8 |
8 |
Be sure to run this inside your intended certificates directory.
|
9 |
9 |
"""
|
10 |
10 |
|
11 |
|
import os, socket, ssl, sys, threading, time
|
|
11 |
import os, socket, ssl, subprocess, sys, threading, time
|
12 |
12 |
from http.server import HTTPServer, BaseHTTPRequestHandler
|
13 |
13 |
from socketserver import ThreadingMixIn
|
14 |
|
from subprocess import Popen, PIPE
|
15 |
14 |
|
16 |
|
gen_cert_req, lock = 'openssl req -new -key cert.key -subj /CN=%s', threading.Lock()
|
17 |
|
sign_cert_req = 'openssl x509 -req -days 3650 -CA ca.crt -CAkey ca.key -set_serial %d -out %s'
|
|
15 |
gen_cert_req, lock = 'openssl req -new -key %scert.key -subj /CN=%s', threading.Lock()
|
|
16 |
sign_cert_req = 'openssl x509 -req -days 3650 -CA %sca.crt -CAkey %sca.key -set_serial %d -out %s'
|
18 |
17 |
|
|
18 |
def popen(command, *args, **kwargs):
|
|
19 |
return subprocess.Popen((command % args).split(' '), **kwargs)
|
19 |
20 |
|
20 |
21 |
class ProxyRequestHandler(BaseHTTPRequestHandler):
|
21 |
22 |
"""Handles a network request made to the proxy"""
|
|
23 |
certdir = ''
|
22 |
24 |
|
23 |
25 |
def log_error(self, format, *args):
|
24 |
26 |
# suppress "Request timed out: timeout('timed out',)"
|
... | ... | |
29 |
31 |
|
30 |
32 |
def do_CONNECT(self):
|
31 |
33 |
hostname = self.path.split(':')[0]
|
32 |
|
certpath = '%s.crt' % (hostname if hostname != 'ca' else 'CA')
|
|
34 |
certpath = '%s%s.crt' % (certdir, hostname if hostname != 'ca' else 'CA')
|
33 |
35 |
|
34 |
36 |
with lock:
|
35 |
37 |
if not os.path.isfile(certpath):
|
36 |
|
p1 = Popen((gen_cert_req % hostname).split(' '), stdout=PIPE).stdout
|
37 |
|
Popen((sign_cert_req % (time.time() * 1000, certpath)).split(' '), stdin=p1, stderr=PIPE).communicate()
|
|
38 |
p1 = popen(gen_cert_req, certdir, hostname, stdout=subprocess.PIPE).stdout
|
|
39 |
popen(sign_cert_req, certdir, certdir, time.time() * 1000, certpath, stdin=p1, stderr=subprocess.PIPE).communicate()
|
38 |
40 |
|
39 |
41 |
self.send_response(200)
|
40 |
42 |
self.end_headers()
|
41 |
43 |
|
42 |
|
self.connection = ssl.wrap_socket(self.connection, keyfile='cert.key', certfile=certpath, server_side=True)
|
|
44 |
self.connection = ssl.wrap_socket(self.connection, keyfile=certdir+'cert.key', certfile=certpath, server_side=True)
|
43 |
45 |
self.rfile = self.connection.makefile('rb', self.rbufsize)
|
44 |
46 |
self.wfile = self.connection.makefile('wb', self.wbufsize)
|
45 |
47 |
|
Support a custom certificates directory in test/server.py