Project

General

Profile

Download (4.71 KB) Statistics
| Branch: | Tag: | Revision:

haketilo / test / haketilo_test / unit / test_repo_query_cacher.py @ 13a707c6

1
# SPDX-License-Identifier: CC0-1.0
2

    
3
"""
4
Haketilo unit tests - caching responses from remote repositories
5
"""
6

    
7
# This file is part of Haketilo
8
#
9
# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
10
#
11
# This program is free software: you can redistribute it and/or modify
12
# it under the terms of the CC0 1.0 Universal License as published by
13
# the Creative Commons Corporation.
14
#
15
# This program is distributed in the hope that it will be useful,
16
# but WITHOUT ANY WARRANTY; without even the implied warranty of
17
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
# CC0 1.0 Universal License for more details.
19

    
20
import pytest
21
import json
22
from selenium.webdriver.support.ui import WebDriverWait
23

    
24
from ..script_loader import load_script
25

    
26
def content_script():
27
    script = load_script('content/repo_query_cacher.js')
28
    return f'{script}; {tab_id_asker}; start();'
29

    
30
def bypass_js():
31
    return load_script('background/CORS_bypass_server.js') + '; start();'
32

    
33
def fetch_through_cache(driver, tab_id, url):
34
    return driver.execute_script(
35
        '''
36
        return browser.tabs.sendMessage(arguments[0],
37
                                        ["repo_query", arguments[1]]);
38
        ''',
39
        tab_id, url)
40

    
41
"""
42
tab_id_responder is meant to be appended to background script of a test
43
extension.
44
"""
45
tab_id_responder = '''
46
function tell_tab_id(msg, sender, respond_cb) {
47
    if (msg[0] === "learn_tab_id")
48
        respond_cb(sender.tab.id);
49
}
50
browser.runtime.onMessage.addListener(tell_tab_id);
51
'''
52

    
53
"""
54
tab_id_asker is meant to be appended to content script of a test extension.
55
"""
56
tab_id_asker = '''
57
browser.runtime.sendMessage(["learn_tab_id"])
58
    .then(tid => window.wrappedJSObject.haketilo_tab = tid);
59
'''
60

    
61
def run_content_script_in_new_window(driver, url):
62
    """
63
    Expect an extension to be loaded which had tab_id_responder and tab_id_asker
64
    appended to its background and content scripts, respectively.
65
    Open the provided url in a new tab, find its tab id and return it, with
66
    current window changed back to the initial one.
67
    """
68
    handle0 = driver.current_window_handle
69
    initial_handles = [*driver.window_handles]
70
    driver.execute_script('window.open(arguments[0], "_blank");', url)
71
    window_added = lambda d: set(d.window_handles) != set(initial_handles)
72
    WebDriverWait(driver, 10).until(window_added)
73
    new_handle = [*set(driver.window_handles).difference(initial_handles)][0]
74

    
75
    driver.switch_to.window(new_handle)
76

    
77
    get_tab_id = lambda d: d.execute_script('return window.haketilo_tab;')
78
    tab_id = WebDriverWait(driver, 10).until(get_tab_id)
79

    
80
    driver.switch_to.window(handle0)
81
    return tab_id
82

    
83
@pytest.mark.ext_data({
84
    'content_script': content_script,
85
    'background_script': lambda: bypass_js() + ';' + tab_id_responder
86
})
87
@pytest.mark.usefixtures('webextension')
88
def test_repo_query_cacher_normal_use(driver):
89
    """
90
    Test if HTTP requests made through our cacher return correct results.
91
    """
92
    tab_id = run_content_script_in_new_window(driver, 'https://gotmyowndoma.in')
93

    
94
    result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/')
95
    assert set(result.keys()) == {'status', 'statusText', 'headers', 'body'}
96
    counter_initial = json.loads(bytes.fromhex(result['body']))['counter']
97
    assert type(counter_initial) is int
98

    
99
    for i in range(2):
100
        result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/')
101
        assert json.loads(bytes.fromhex(result['body'])) \
102
            == {'counter': counter_initial}
103

    
104
    tab_id = run_content_script_in_new_window(driver, 'https://gotmyowndoma.in')
105
    result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/')
106
    assert json.loads(bytes.fromhex(result['body'])) \
107
        == {'counter': counter_initial + 1}
108

    
109
    for i in range(2):
110
        result = fetch_through_cache(driver, tab_id, 'https://nxdoma.in/')
111
        assert result['status'] == 404
112

    
113
    for i in range(2):
114
        result = fetch_through_cache(driver, tab_id, 'bad://url')
115
        assert set(result.keys()) == {'error'}
116
        assert result['error']['name'] == 'TypeError'
117

    
118
@pytest.mark.ext_data({
119
    'content_script': content_script,
120
    'background_script': tab_id_responder
121
})
122
@pytest.mark.usefixtures('webextension')
123
def test_repo_query_cacher_bgscript_error(driver):
124
    """
125
    Test if our cacher properly reports errors in communication with the
126
    background script.
127
    """
128
    tab_id = run_content_script_in_new_window(driver, 'https://gotmyowndoma.in')
129

    
130
    result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/')
131
    assert set(result.keys()) == {'error'}
132
    assert set(result['error'].keys()) == \
133
        {'name', 'message', 'fileName', 'lineNumber'}
134
    assert result['error']['message'] == \
135
        "Couldn't communicate with background script."
(21-21/25)