Revision 6c69435c
Added by jahoti about 2 years ago
| test/proxy_core.py | ||
|---|---|---|
| 8 | 8 | Be sure to run this inside your intended certificates directory. | 
| 9 | 9 | """ | 
| 10 | 10 |  | 
| 11 | import os, socket, ssl, sys, threading, time | |
| 11 | import os, socket, ssl, subprocess, sys, threading, time
 | |
| 12 | 12 | from http.server import HTTPServer, BaseHTTPRequestHandler | 
| 13 | 13 | from socketserver import ThreadingMixIn | 
| 14 | from subprocess import Popen, PIPE | |
| 15 | 14 |  | 
| 16 | gen_cert_req, lock = 'openssl req -new -key cert.key -subj /CN=%s', threading.Lock() | |
| 17 | sign_cert_req = 'openssl x509 -req -days 3650 -CA ca.crt -CAkey ca.key -set_serial %d -out %s'
 | |
| 15 | gen_cert_req, lock = 'openssl req -new -key %scert.key -subj /CN=%s', threading.Lock()
 | |
| 16 | sign_cert_req = 'openssl x509 -req -days 3650 -CA %sca.crt -CAkey %sca.key -set_serial %d -out %s'
 | |
| 18 | 17 |  | 
| 18 | def popen(command, *args, **kwargs): | |
| 19 |     return subprocess.Popen((command % args).split(' '), **kwargs)
 | |
| 19 | 20 |  | 
| 20 | 21 | class ProxyRequestHandler(BaseHTTPRequestHandler): | 
| 21 | 22 | """Handles a network request made to the proxy""" | 
| 23 | certdir = '' | |
| 22 | 24 |  | 
| 23 | 25 | def log_error(self, format, *args): | 
| 24 | 26 | 		# suppress "Request timed out: timeout('timed out',)"
 | 
| ... | ... | |
| 29 | 31 |  | 
| 30 | 32 | def do_CONNECT(self): | 
| 31 | 33 | 		hostname = self.path.split(':')[0]
 | 
| 32 | 		certpath = '%s.crt' % (hostname if hostname != 'ca' else 'CA')
 | |
| 34 | 		certpath = '%s%s.crt' % (certdir, hostname if hostname != 'ca' else 'CA')
 | |
| 33 | 35 |  | 
| 34 | 36 | with lock: | 
| 35 | 37 | if not os.path.isfile(certpath): | 
| 36 | 				p1 = Popen((gen_cert_req % hostname).split(' '), stdout=PIPE).stdout
 | |
| 37 | 				Popen((sign_cert_req % (time.time() * 1000, certpath)).split(' '), stdin=p1, stderr=PIPE).communicate()
 | |
| 38 | 				p1 = popen(gen_cert_req, certdir, hostname, stdout=subprocess.PIPE).stdout
 | |
| 39 | 				popen(sign_cert_req, certdir, certdir, time.time() * 1000, certpath, stdin=p1, stderr=subprocess.PIPE).communicate()
 | |
| 38 | 40 |  | 
| 39 | 41 | self.send_response(200) | 
| 40 | 42 | self.end_headers() | 
| 41 | 43 |  | 
| 42 | self.connection = ssl.wrap_socket(self.connection, keyfile='cert.key', certfile=certpath, server_side=True) | |
| 44 | 		self.connection = ssl.wrap_socket(self.connection, keyfile=certdir+'cert.key', certfile=certpath, server_side=True)
 | |
| 43 | 45 | 		self.rfile = self.connection.makefile('rb', self.rbufsize)
 | 
| 44 | 46 | 		self.wfile = self.connection.makefile('wb', self.wbufsize)
 | 
| 45 | 47 |  | 
Also available in: Unified diff
Support a custom certificates directory in test/server.py