Revision e9b7f4d7
Added by jahoti about 2 years ago
copyright | ||
---|---|---|
85 | 85 |
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
86 | 86 |
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
87 | 87 |
|
88 |
Files: test/server.py |
|
89 |
Copyright: 2021 jahoti <jahoti@tilde.team> |
|
90 |
License: AGPL-3+ |
|
91 |
|
|
88 | 92 |
Files: licenses/* |
89 | 93 |
Copyright: 2001, 2002, 2011-2013 Creative Commons |
90 | 94 |
License: CC-BY-4.0 |
test/proxy_core.py | ||
---|---|---|
1 |
#!/usr/bin/env python3 |
|
2 |
# |
|
3 | 1 |
# Copyright (c) 2015, inaz2 |
4 | 2 |
# Copyright (C) 2021 jahoti <jahoti@tilde.team> |
5 | 3 |
# Licensing information is collated in the `copyright` file |
... | ... | |
38 | 36 |
p1 = Popen((gen_cert_req % hostname).split(' '), stdout=PIPE).stdout |
39 | 37 |
Popen((sign_cert_req % (time.time() * 1000, certpath)).split(' '), stdin=p1, stderr=PIPE).communicate() |
40 | 38 |
|
41 |
self.wfile.write('HTTP/1.1 200 Connection Established\r\n')
|
|
39 |
self.send_response(200)
|
|
42 | 40 |
self.end_headers() |
43 | 41 |
|
44 | 42 |
self.connection = ssl.wrap_socket(self.connection, keyfile='cert.key', certfile=certpath, server_side=True) |
... | ... | |
48 | 46 |
self.close_connection = int(self.headers.get('Proxy-Connection', '').lower() == 'close') |
49 | 47 |
|
50 | 48 |
def proxy(self): |
51 |
if self.path == 'http://hachette.test/': |
|
52 |
with open('ca.crt', 'rb') as f: |
|
53 |
data = f.read() |
|
54 |
|
|
55 |
self.wfile.write('HTTP/1.1 200 OK\r\n') |
|
56 |
self.send_header('Content-Type', 'application/x-x509-ca-cert') |
|
57 |
self.send_header('Content-Length', len(data)) |
|
58 |
self.send_header('Connection', 'close') |
|
59 |
self.end_headers() |
|
60 |
self.wfile.write(data) |
|
61 |
return |
|
62 |
|
|
63 | 49 |
content_length = int(self.headers.get('Content-Length', 0)) |
64 | 50 |
req_body = self.rfile.read(content_length) if content_length else None |
65 | 51 |
|
... | ... | |
79 | 65 |
|
80 | 66 |
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): |
81 | 67 |
"""The actual proxy server""" |
82 |
address_family, daemon_threads, handler = socket.AF_INET6, True, ProxyRequestHandler
|
|
68 |
address_family, daemon_threads = socket.AF_INET6, True
|
|
83 | 69 |
|
84 | 70 |
def handle_error(self, request, client_address): |
85 | 71 |
# suppress socket/ssl related errors |
86 | 72 |
cls, e = sys.exc_info()[:2] |
87 | 73 |
if not (cls is socket.error or cls is ssl.SSLError): |
88 | 74 |
return HTTPServer.handle_error(self, request, client_address) |
89 |
|
|
90 |
|
|
91 |
def start(server_class, port=1337): |
|
92 |
"""Start up the proxy/server""" |
|
93 |
|
|
94 |
print('Serving HTTP Proxy') |
|
95 |
httpd = server_class(('::1', port), ProxyRequestHandler) |
|
96 |
httpd.serve_forever() |
test/server.py | ||
---|---|---|
1 |
# Copyright (C) 2021 jahoti <jahoti@tilde.team> |
|
2 |
# Licensing information is collated in the `copyright` file |
|
3 |
|
|
4 |
""" |
|
5 |
A modular "virtual network" proxy, |
|
6 |
wrapping the classes in proxy_core.py |
|
7 |
""" |
|
8 |
|
|
9 |
from proxy_core import * |
|
10 |
from urllib.parse import parse_qs |
|
11 |
|
|
12 |
internet = {} # Add info here later |
|
13 |
mime_types = { |
|
14 |
"7z": "application/x-7z-compressed", "oga": "audio/ogg", |
|
15 |
"abw": "application/x-abiword", "ogv": "video/ogg", |
|
16 |
"arc": "application/x-freearc", "ogx": "application/ogg", |
|
17 |
"bin": "application/octet-stream", "opus": "audio/opus", |
|
18 |
"bz": "application/x-bzip", "otf": "font/otf", |
|
19 |
"bz2": "application/x-bzip2", "pdf": "application/pdf", |
|
20 |
"css": "text/css", "png": "image/png", |
|
21 |
"csv": "text/csv", "sh": "application/x-sh", |
|
22 |
"gif": "image/gif", "svg": "image/svg+xml", |
|
23 |
"gz": "application/gzip", "tar": "application/x-tar", |
|
24 |
"htm": "text/html", "ts": "video/mp2t", |
|
25 |
"html": "text/html", "ttf": "font/ttf", |
|
26 |
"ico": "image/vnd.microsoft.icon", "txt": "text/plain", |
|
27 |
"js": "text/javascript", "wav": "audio/wav", |
|
28 |
"jpeg": "image/jpeg", "weba": "audio/webm", |
|
29 |
"jpg": "image/jpeg", "webm": "video/webm", |
|
30 |
"json": "application/json", "woff": "font/woff", |
|
31 |
"mjs": "text/javascript", "woff2": "font/woff2", |
|
32 |
"mp3": "audio/mpeg", "xhtml": "application/xhtml+xml", |
|
33 |
"mp4": "video/mp4", "zip": "application/zip", |
|
34 |
"mpeg": "video/mpeg", |
|
35 |
"odp": "application/vnd.oasis.opendocument.presentation", |
|
36 |
"ods": "application/vnd.oasis.opendocument.spreadsheet", |
|
37 |
"odt": "application/vnd.oasis.opendocument.text", |
|
38 |
"xml": "application/xml" # text/xml if readable from casual users |
|
39 |
} |
|
40 |
|
|
41 |
class RequestHijacker(ProxyRequestHandler): |
|
42 |
def handle_request(self, req_body): |
|
43 |
path_components = self.path.split('?', maxsplit=1) |
|
44 |
path = path_components[0] |
|
45 |
try: |
|
46 |
# Response format: (status_code, headers (dict. of strings), |
|
47 |
# body as bytes or filename containing body as string) |
|
48 |
if path in internet: |
|
49 |
info = internet[path] |
|
50 |
if type(info) == tuple: |
|
51 |
status_code, headers, body_file = info |
|
52 |
if type(body_file) == str: |
|
53 |
if 'Content-Type' not in headers and '.' in body_file: |
|
54 |
ext = body_file.rsplit('.', maxsplit=1)[-1] |
|
55 |
if ext in mime_types: |
|
56 |
headers['Content-Type'] = mime_types[ext] |
|
57 |
|
|
58 |
with open(body_file, mode='rb') as f: |
|
59 |
body_file = f.read() |
|
60 |
|
|
61 |
else: |
|
62 |
# A function to evaluate to get the response |
|
63 |
get_params, post_params = {}, {} |
|
64 |
if len(path_components) == 2: |
|
65 |
get_params = parse_qs(path_components[1]) |
|
66 |
|
|
67 |
# Parse POST parameters; currently only supports |
|
68 |
# application/x-www-form-urlencoded |
|
69 |
if req_body: |
|
70 |
post_params = parse_qs(req_body.encode()) |
|
71 |
|
|
72 |
status_code, headers, body_file = info(self.command, get_params, post_params) |
|
73 |
if type(body_file) == str: |
|
74 |
body_file = body_file.encode() |
|
75 |
|
|
76 |
if type(status_code) != int or status_code <= 0: |
|
77 |
raise Exception('Invalid status code %r' % status_code) |
|
78 |
|
|
79 |
for header, header_value in headers.items(): |
|
80 |
if type(header) != str: |
|
81 |
raise Exception('Invalid header key %r' % header) |
|
82 |
|
|
83 |
elif type(header_value) != str: |
|
84 |
raise Exception('Invalid header value %r' % header_value) |
|
85 |
else: |
|
86 |
status_code, headers = 404, {'Content-Type': 'text/plain'} |
|
87 |
body_file = b'Handler for this URL not found.' |
|
88 |
|
|
89 |
except Exception as e: |
|
90 |
status_code, headers, body_file = 500, {'Content-Type': 'text/plain'}, b'Internal Error:\n' + repr(e).encode() |
|
91 |
|
|
92 |
headers['Content-Length'] = str(len(body_file)) |
|
93 |
self.send_response(status_code) |
|
94 |
for header, header_value in headers.items(): |
|
95 |
self.send_header(header, header_value) |
|
96 |
|
|
97 |
self.end_headers() |
|
98 |
self.wfile.write(body_file) |
|
99 |
|
|
100 |
|
|
101 |
|
|
102 |
def do_an_internet(port=1337): |
|
103 |
"""Start up the proxy/server""" |
|
104 |
|
|
105 |
httpd = ThreadingHTTPServer(('', port), RequestHijacker) |
|
106 |
httpd.serve_forever() |
Also available in: Unified diff
Enable the hijacking proxy in the test suite to serve responses