Revision 6c69435c
Added by jahoti about 2 years ago
| test/proxy_core.py | ||
|---|---|---|
| 8 | 8 |
Be sure to run this inside your intended certificates directory. |
| 9 | 9 |
""" |
| 10 | 10 |
|
| 11 |
import os, socket, ssl, sys, threading, time |
|
| 11 |
import os, socket, ssl, subprocess, sys, threading, time
|
|
| 12 | 12 |
from http.server import HTTPServer, BaseHTTPRequestHandler |
| 13 | 13 |
from socketserver import ThreadingMixIn |
| 14 |
from subprocess import Popen, PIPE |
|
| 15 | 14 |
|
| 16 |
gen_cert_req, lock = 'openssl req -new -key cert.key -subj /CN=%s', threading.Lock() |
|
| 17 |
sign_cert_req = 'openssl x509 -req -days 3650 -CA ca.crt -CAkey ca.key -set_serial %d -out %s'
|
|
| 15 |
gen_cert_req, lock = 'openssl req -new -key %scert.key -subj /CN=%s', threading.Lock()
|
|
| 16 |
sign_cert_req = 'openssl x509 -req -days 3650 -CA %sca.crt -CAkey %sca.key -set_serial %d -out %s'
|
|
| 18 | 17 |
|
| 18 |
def popen(command, *args, **kwargs): |
|
| 19 |
return subprocess.Popen((command % args).split(' '), **kwargs)
|
|
| 19 | 20 |
|
| 20 | 21 |
class ProxyRequestHandler(BaseHTTPRequestHandler): |
| 21 | 22 |
"""Handles a network request made to the proxy""" |
| 23 |
certdir = '' |
|
| 22 | 24 |
|
| 23 | 25 |
def log_error(self, format, *args): |
| 24 | 26 |
# suppress "Request timed out: timeout('timed out',)"
|
| ... | ... | |
| 29 | 31 |
|
| 30 | 32 |
def do_CONNECT(self): |
| 31 | 33 |
hostname = self.path.split(':')[0]
|
| 32 |
certpath = '%s.crt' % (hostname if hostname != 'ca' else 'CA')
|
|
| 34 |
certpath = '%s%s.crt' % (certdir, hostname if hostname != 'ca' else 'CA')
|
|
| 33 | 35 |
|
| 34 | 36 |
with lock: |
| 35 | 37 |
if not os.path.isfile(certpath): |
| 36 |
p1 = Popen((gen_cert_req % hostname).split(' '), stdout=PIPE).stdout
|
|
| 37 |
Popen((sign_cert_req % (time.time() * 1000, certpath)).split(' '), stdin=p1, stderr=PIPE).communicate()
|
|
| 38 |
p1 = popen(gen_cert_req, certdir, hostname, stdout=subprocess.PIPE).stdout
|
|
| 39 |
popen(sign_cert_req, certdir, certdir, time.time() * 1000, certpath, stdin=p1, stderr=subprocess.PIPE).communicate()
|
|
| 38 | 40 |
|
| 39 | 41 |
self.send_response(200) |
| 40 | 42 |
self.end_headers() |
| 41 | 43 |
|
| 42 |
self.connection = ssl.wrap_socket(self.connection, keyfile='cert.key', certfile=certpath, server_side=True) |
|
| 44 |
self.connection = ssl.wrap_socket(self.connection, keyfile=certdir+'cert.key', certfile=certpath, server_side=True)
|
|
| 43 | 45 |
self.rfile = self.connection.makefile('rb', self.rbufsize)
|
| 44 | 46 |
self.wfile = self.connection.makefile('wb', self.wbufsize)
|
| 45 | 47 |
|
Also available in: Unified diff
Support a custom certificates directory in test/server.py