Project

General

Profile

Download (4.46 KB) Statistics
| Branch: | Tag: | Revision:

haketilo / test / unit / test_repo_query_cacher.py @ 4c6a2323

1
# SPDX-License-Identifier: CC0-1.0
2

    
3
"""
4
Haketilo unit tests - caching responses from remote repositories
5
"""
6

    
7
# This file is part of Haketilo
8
#
9
# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
10
#
11
# This program is free software: you can redistribute it and/or modify
12
# it under the terms of the CC0 1.0 Universal License as published by
13
# the Creative Commons Corporation.
14
#
15
# This program is distributed in the hope that it will be useful,
16
# but WITHOUT ANY WARRANTY; without even the implied warranty of
17
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
# CC0 1.0 Universal License for more details.
19

    
20
import pytest
21
import json
22
from selenium.webdriver.support.ui import WebDriverWait
23

    
24
from ..script_loader import load_script
25

    
26
def content_script():
27
    script = load_script('content/repo_query_cacher.js')
28
    return f'{script}; {tab_id_asker}; start();'
29

    
30
def bypass_js():
31
    return load_script('background/CORS_bypass_server.js') + '; start();'
32

    
33
def fetch_through_cache(driver, tab_id, url):
34
    return driver.execute_script(
35
        '''
36
        return browser.tabs.sendMessage(arguments[0],
37
                                        ["repo_query", arguments[1]]);
38
        ''',
39
        tab_id, url)
40

    
41
"""
42
tab_id_responder is meant to be appended to background script of a test
43
extension.
44
"""
45
tab_id_responder = '''
46
function tell_tab_id(msg, sender, respond_cb) {
47
    if (msg[0] === "learn_tab_id")
48
        respond_cb(sender.tab.id);
49
}
50
browser.runtime.onMessage.addListener(tell_tab_id);
51
'''
52

    
53
"""
54
tab_id_asker is meant to be appended to content script of a test extension.
55
"""
56
tab_id_asker = '''
57
browser.runtime.sendMessage(["learn_tab_id"])
58
    .then(tid => window.wrappedJSObject.haketilo_tab = tid);
59
'''
60

    
61
def run_content_script_in_new_window(driver, url):
62
    """
63
    Expect an extension to be loaded which had tab_id_responder and tab_id_asker
64
    appended to its background and content scripts, respectively.
65
    Open the provided url in a new tab, find its tab id and return it, with
66
    current window changed back to the initial one.
67
    """
68
    handle0 = driver.current_window_handle
69
    initial_handles = [*driver.window_handles]
70
    driver.execute_script('window.open(arguments[0], "_blank");', url)
71
    window_added = lambda d: set(d.window_handles) != set(initial_handles)
72
    WebDriverWait(driver, 10).until(window_added)
73
    new_handle = [*set(driver.window_handles).difference(initial_handles)][0]
74

    
75
    driver.switch_to.window(new_handle)
76

    
77
    get_tab_id = lambda d: d.execute_script('return window.haketilo_tab;')
78
    tab_id = WebDriverWait(driver, 10).until(get_tab_id)
79

    
80
    driver.switch_to.window(handle0)
81
    return tab_id
82

    
83
@pytest.mark.ext_data({
84
    'content_script': content_script,
85
    'background_script': lambda: bypass_js() + ';' + tab_id_responder
86
})
87
@pytest.mark.usefixtures('webextension')
88
def test_repo_query_cacher_normal_use(driver, execute_in_page):
89
    """
90
    Test if HTTP requests made through our cacher return correct results.
91
    """
92
    tab_id = run_content_script_in_new_window(driver, 'https://gotmyowndoma.in')
93

    
94
    result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/')
95
    assert set(result.keys()) == {'ok', 'status', 'json'}
96
    counter_initial = result['json']['counter']
97
    assert type(counter_initial) is int
98

    
99
    for i in range(2):
100
        result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/')
101
        assert result['json']['counter'] == counter_initial
102

    
103
    tab_id = run_content_script_in_new_window(driver, 'https://gotmyowndoma.in')
104
    result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/')
105
    assert result['json']['counter'] == counter_initial + 1
106

    
107
    for i in range(2):
108
        result = fetch_through_cache(driver, tab_id, 'https://nxdoma.in/')
109
        assert set(result.keys()) == {'ok', 'status', 'error_json'}
110
        assert result['ok'] == False
111
        assert result['status'] == 404
112

    
113
    for i in range(2):
114
        result = fetch_through_cache(driver, tab_id, 'bad://url')
115
        assert set(result.keys()) == {'error'}
116

    
117
@pytest.mark.ext_data({
118
    'content_script': content_script,
119
    'background_script': tab_id_responder
120
})
121
@pytest.mark.usefixtures('webextension')
122
def test_repo_query_cacher_bgscript_error(driver):
123
    """
124
    Test if our cacher properly reports errors in communication with the
125
    background script.
126
    """
127
    tab_id = run_content_script_in_new_window(driver, 'https://gotmyowndoma.in')
128

    
129
    result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/')
130
    assert set(result.keys()) == {'error'}
(21-21/25)