1
|
# SPDX-License-Identifier: CC0-1.0
|
2
|
|
3
|
"""
|
4
|
Haketilo unit tests - routing HTTP requests through background script
|
5
|
"""
|
6
|
|
7
|
# This file is part of Haketilo
|
8
|
#
|
9
|
# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
|
10
|
#
|
11
|
# This program is free software: you can redistribute it and/or modify
|
12
|
# it under the terms of the CC0 1.0 Universal License as published by
|
13
|
# the Creative Commons Corporation.
|
14
|
#
|
15
|
# This program is distributed in the hope that it will be useful,
|
16
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
17
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
18
|
# CC0 1.0 Universal License for more details.
|
19
|
|
20
|
import pytest
|
21
|
import json
|
22
|
from selenium.webdriver.support.ui import WebDriverWait
|
23
|
|
24
|
from ..script_loader import load_script
|
25
|
from ..world_wide_library import some_data
|
26
|
|
27
|
datas = {
|
28
|
'resource': 'https://anotherdoma.in/resource/blocked/by/CORS.json',
|
29
|
'nonexistent': 'https://nxdoma.in/resource.json',
|
30
|
'invalid': 'w3csucks://invalid.url/',
|
31
|
'redirected_ok': 'https://site.with.scripts.block.ed',
|
32
|
'redirected_err': 'https://site.with.scripts.block.ed'
|
33
|
}
|
34
|
|
35
|
for name, url in [*datas.items()]:
|
36
|
datas[name] = {'url': url}
|
37
|
|
38
|
datas['redirected_ok']['init'] = {'redirect': 'follow'}
|
39
|
datas['redirected_err']['init'] = {'redirect': 'error'}
|
40
|
|
41
|
content_script = '''\
|
42
|
const datas = %s;
|
43
|
|
44
|
async function fetch_resources() {
|
45
|
const results = {};
|
46
|
const promises = [];
|
47
|
for (const [name, data] of Object.entries(datas)) {
|
48
|
const sending = browser.runtime.sendMessage(["CORS_bypass", data]);
|
49
|
promises.push(sending.then(response => results[name] = response));
|
50
|
}
|
51
|
|
52
|
await Promise.all(promises);
|
53
|
|
54
|
window.wrappedJSObject.haketilo_fetch_results = results;
|
55
|
}
|
56
|
|
57
|
fetch_resources();
|
58
|
'''
|
59
|
|
60
|
content_script = content_script % json.dumps(datas);
|
61
|
|
62
|
@pytest.mark.ext_data({
|
63
|
'content_script': content_script,
|
64
|
'background_script':
|
65
|
lambda: load_script('background/CORS_bypass_server.js') + '; start();'
|
66
|
})
|
67
|
@pytest.mark.usefixtures('webextension')
|
68
|
def test_CORS_bypass_server(driver, execute_in_page):
|
69
|
"""
|
70
|
Test if CORS bypassing works and if errors get properly forwarded.
|
71
|
"""
|
72
|
driver.get('https://gotmyowndoma.in/')
|
73
|
|
74
|
# First, verify that requests without CORS bypass measures fail.
|
75
|
results = execute_in_page(
|
76
|
'''
|
77
|
const result = {};
|
78
|
let promises = [];
|
79
|
for (const [name, data] of Object.entries(arguments[0])) {
|
80
|
const [ok_cb, err_cb] =
|
81
|
["ok", "err"].map(status => () => result[name] = status);
|
82
|
promises.push(fetch(data.url).then(ok_cb, err_cb));
|
83
|
}
|
84
|
// Make the promises non-failing.
|
85
|
promises = promises.map(p => new Promise(cb => p.then(cb, cb)));
|
86
|
returnval(Promise.all(promises).then(() => result));
|
87
|
''',
|
88
|
{**datas, 'sameorigin': './nonexistent_resource'})
|
89
|
|
90
|
assert results == dict([*[(k, 'err') for k in datas.keys()],
|
91
|
('sameorigin', 'ok')])
|
92
|
|
93
|
done = lambda d: d.execute_script('return window.haketilo_fetch_results;')
|
94
|
results = WebDriverWait(driver, 10).until(done)
|
95
|
|
96
|
assert set(results['invalid'].keys()) == {'error'}
|
97
|
assert results['invalid']['error']['fileName'].endswith('background.js')
|
98
|
assert type(results['invalid']['error']['lineNumber']) is int
|
99
|
assert type(results['invalid']['error']['message']) is str
|
100
|
assert results['invalid']['error']['name'] == 'TypeError'
|
101
|
|
102
|
assert results['nonexistent']['status'] == 404
|
103
|
assert results['nonexistent']['statusText'] == 'Not Found'
|
104
|
assert any([name.lower() == 'content-length'
|
105
|
for name, value in results['nonexistent']['headers']])
|
106
|
assert bytes.fromhex(results['nonexistent']['body']) == \
|
107
|
b'Handler for this URL not found.'
|
108
|
|
109
|
assert results['resource']['status'] == 200
|
110
|
assert results['resource']['statusText'] == 'OK'
|
111
|
assert any([name.lower() == 'content-length'
|
112
|
for name, value in results['resource']['headers']])
|
113
|
assert bytes.fromhex(results['resource']['body']) == b'{"some": "data"}'
|
114
|
|
115
|
assert results['redirected_ok']['status'] == 200
|
116
|
assert results['redirected_err']['error']['name'] == 'TypeError'
|