Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(236)

Side by Side Diff: recipe_engine/third_party/setuptools/tests/server.py

Issue 1344583003: Recipe package system. (Closed) Base URL: git@github.com:luci/recipes-py.git@master
Patch Set: Recompiled proto Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 """Basic http server for tests to simulate PyPI or custom indexes
2 """
3 import sys
4 import time
5 import threading
6 from setuptools.compat import BaseHTTPRequestHandler
7 from setuptools.compat import (urllib2, URLError, HTTPServer,
8 SimpleHTTPRequestHandler)
9
10 class IndexServer(HTTPServer):
11 """Basic single-threaded http server simulating a package index
12
13 You can use this server in unittest like this::
14 s = IndexServer()
15 s.start()
16 index_url = s.base_url() + 'mytestindex'
17 # do some test requests to the index
18 # The index files should be located in setuptools/tests/indexes
19 s.stop()
20 """
21 def __init__(self, server_address=('', 0),
22 RequestHandlerClass=SimpleHTTPRequestHandler):
23 HTTPServer.__init__(self, server_address, RequestHandlerClass)
24 self._run = True
25
26 def serve(self):
27 while self._run:
28 self.handle_request()
29
30 def start(self):
31 self.thread = threading.Thread(target=self.serve)
32 self.thread.start()
33
34 def stop(self):
35 "Stop the server"
36
37 # Let the server finish the last request and wait for a new one.
38 time.sleep(0.1)
39
40 # self.shutdown is not supported on python < 2.6, so just
41 # set _run to false, and make a request, causing it to
42 # terminate.
43 self._run = False
44 url = 'http://127.0.0.1:%(server_port)s/' % vars(self)
45 try:
46 if sys.version_info >= (2, 6):
47 urllib2.urlopen(url, timeout=5)
48 else:
49 urllib2.urlopen(url)
50 except URLError:
51 # ignore any errors; all that's important is the request
52 pass
53 self.thread.join()
54 self.socket.close()
55
56 def base_url(self):
57 port = self.server_port
58 return 'http://127.0.0.1:%s/setuptools/tests/indexes/' % port
59
60 class RequestRecorder(BaseHTTPRequestHandler):
61 def do_GET(self):
62 requests = vars(self.server).setdefault('requests', [])
63 requests.append(self)
64 self.send_response(200, 'OK')
65
66 class MockServer(HTTPServer, threading.Thread):
67 """
68 A simple HTTP Server that records the requests made to it.
69 """
70 def __init__(self, server_address=('', 0),
71 RequestHandlerClass=RequestRecorder):
72 HTTPServer.__init__(self, server_address, RequestHandlerClass)
73 threading.Thread.__init__(self)
74 self.setDaemon(True)
75 self.requests = []
76
77 def run(self):
78 self.serve_forever()
79
80 def url(self):
81 return 'http://localhost:%(server_port)s/' % vars(self)
82 url = property(url)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698