Revision e9b7f4d7
Added by jahoti about 2 years ago
| test/proxy_core.py | ||
|---|---|---|
| 1 |
#!/usr/bin/env python3 |
|
| 2 |
# |
|
| 3 | 1 |
# Copyright (c) 2015, inaz2 |
| 4 | 2 |
# Copyright (C) 2021 jahoti <jahoti@tilde.team> |
| 5 | 3 |
# Licensing information is collated in the `copyright` file |
| ... | ... | |
| 38 | 36 |
p1 = Popen((gen_cert_req % hostname).split(' '), stdout=PIPE).stdout
|
| 39 | 37 |
Popen((sign_cert_req % (time.time() * 1000, certpath)).split(' '), stdin=p1, stderr=PIPE).communicate()
|
| 40 | 38 |
|
| 41 |
self.wfile.write('HTTP/1.1 200 Connection Established\r\n')
|
|
| 39 |
self.send_response(200)
|
|
| 42 | 40 |
self.end_headers() |
| 43 | 41 |
|
| 44 | 42 |
self.connection = ssl.wrap_socket(self.connection, keyfile='cert.key', certfile=certpath, server_side=True) |
| ... | ... | |
| 48 | 46 |
self.close_connection = int(self.headers.get('Proxy-Connection', '').lower() == 'close')
|
| 49 | 47 |
|
| 50 | 48 |
def proxy(self): |
| 51 |
if self.path == 'http://hachette.test/': |
|
| 52 |
with open('ca.crt', 'rb') as f:
|
|
| 53 |
data = f.read() |
|
| 54 |
|
|
| 55 |
self.wfile.write('HTTP/1.1 200 OK\r\n')
|
|
| 56 |
self.send_header('Content-Type', 'application/x-x509-ca-cert')
|
|
| 57 |
self.send_header('Content-Length', len(data))
|
|
| 58 |
self.send_header('Connection', 'close')
|
|
| 59 |
self.end_headers() |
|
| 60 |
self.wfile.write(data) |
|
| 61 |
return |
|
| 62 |
|
|
| 63 | 49 |
content_length = int(self.headers.get('Content-Length', 0))
|
| 64 | 50 |
req_body = self.rfile.read(content_length) if content_length else None |
| 65 | 51 |
|
| ... | ... | |
| 79 | 65 |
|
| 80 | 66 |
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): |
| 81 | 67 |
"""The actual proxy server""" |
| 82 |
address_family, daemon_threads, handler = socket.AF_INET6, True, ProxyRequestHandler
|
|
| 68 |
address_family, daemon_threads = socket.AF_INET6, True
|
|
| 83 | 69 |
|
| 84 | 70 |
def handle_error(self, request, client_address): |
| 85 | 71 |
# suppress socket/ssl related errors |
| 86 | 72 |
cls, e = sys.exc_info()[:2] |
| 87 | 73 |
if not (cls is socket.error or cls is ssl.SSLError): |
| 88 | 74 |
return HTTPServer.handle_error(self, request, client_address) |
| 89 |
|
|
| 90 |
|
|
| 91 |
def start(server_class, port=1337): |
|
| 92 |
"""Start up the proxy/server""" |
|
| 93 |
|
|
| 94 |
print('Serving HTTP Proxy')
|
|
| 95 |
httpd = server_class(('::1', port), ProxyRequestHandler)
|
|
| 96 |
httpd.serve_forever() |
|
Also available in: Unified diff
Enable the hijacking proxy in the test suite to serve responses