Revision 6c69435c
Added by jahoti about 2 years ago
test/proxy_core.py | ||
---|---|---|
8 | 8 |
Be sure to run this inside your intended certificates directory. |
9 | 9 |
""" |
10 | 10 |
|
11 |
import os, socket, ssl, sys, threading, time |
|
11 |
import os, socket, ssl, subprocess, sys, threading, time
|
|
12 | 12 |
from http.server import HTTPServer, BaseHTTPRequestHandler |
13 | 13 |
from socketserver import ThreadingMixIn |
14 |
from subprocess import Popen, PIPE |
|
15 | 14 |
|
16 |
gen_cert_req, lock = 'openssl req -new -key cert.key -subj /CN=%s', threading.Lock() |
|
17 |
sign_cert_req = 'openssl x509 -req -days 3650 -CA ca.crt -CAkey ca.key -set_serial %d -out %s'
|
|
15 |
gen_cert_req, lock = 'openssl req -new -key %scert.key -subj /CN=%s', threading.Lock()
|
|
16 |
sign_cert_req = 'openssl x509 -req -days 3650 -CA %sca.crt -CAkey %sca.key -set_serial %d -out %s'
|
|
18 | 17 |
|
18 |
def popen(command, *args, **kwargs): |
|
19 |
return subprocess.Popen((command % args).split(' '), **kwargs) |
|
19 | 20 |
|
20 | 21 |
class ProxyRequestHandler(BaseHTTPRequestHandler): |
21 | 22 |
"""Handles a network request made to the proxy""" |
23 |
certdir = '' |
|
22 | 24 |
|
23 | 25 |
def log_error(self, format, *args): |
24 | 26 |
# suppress "Request timed out: timeout('timed out',)" |
... | ... | |
29 | 31 |
|
30 | 32 |
def do_CONNECT(self): |
31 | 33 |
hostname = self.path.split(':')[0] |
32 |
certpath = '%s.crt' % (hostname if hostname != 'ca' else 'CA')
|
|
34 |
certpath = '%s%s.crt' % (certdir, hostname if hostname != 'ca' else 'CA')
|
|
33 | 35 |
|
34 | 36 |
with lock: |
35 | 37 |
if not os.path.isfile(certpath): |
36 |
p1 = Popen((gen_cert_req % hostname).split(' '), stdout=PIPE).stdout
|
|
37 |
Popen((sign_cert_req % (time.time() * 1000, certpath)).split(' '), stdin=p1, stderr=PIPE).communicate()
|
|
38 |
p1 = popen(gen_cert_req, certdir, hostname, stdout=subprocess.PIPE).stdout
|
|
39 |
popen(sign_cert_req, certdir, certdir, time.time() * 1000, certpath, stdin=p1, stderr=subprocess.PIPE).communicate()
|
|
38 | 40 |
|
39 | 41 |
self.send_response(200) |
40 | 42 |
self.end_headers() |
41 | 43 |
|
42 |
self.connection = ssl.wrap_socket(self.connection, keyfile='cert.key', certfile=certpath, server_side=True) |
|
44 |
self.connection = ssl.wrap_socket(self.connection, keyfile=certdir+'cert.key', certfile=certpath, server_side=True)
|
|
43 | 45 |
self.rfile = self.connection.makefile('rb', self.rbufsize) |
44 | 46 |
self.wfile = self.connection.makefile('wb', self.wbufsize) |
45 | 47 |
|
Also available in: Unified diff
Support a custom certificates directory in test/server.py