Revision 25ff1c9d
Added by koszko over 1 year ago
- ID 25ff1c9d01b337387411644393ac4eb04d511cad
- Parent 9dda3aa9
| .gitmodules | ||
|---|---|---|
| 4 | 4 |
# |
| 5 | 5 |
# Available under the terms of Creative Commons Zero v1.0 Universal. |
| 6 | 6 |
|
| 7 |
[submodule "src/hydrilla/schemas"]
|
|
| 7 |
[submodule "hydrilla-json-schemas"]
|
|
| 8 | 8 |
path = src/hydrilla/schemas |
| 9 | 9 |
url = ../hydrilla-json-schemas |
| 10 |
[submodule "src/test/source-package-example"]
|
|
| 10 |
[submodule "hydrilla-source-package-example"]
|
|
| 11 | 11 |
path = tests/source-package-example |
| 12 | 12 |
url = ../hydrilla-source-package-example |
| 13 |
[submodule "hydrilla-source-package-example-apt"] |
|
| 14 |
path = tests/source-package-example-apt |
|
| 15 |
url = ../hydrilla-source-package-example-apt |
|
| pyproject.toml | ||
|---|---|---|
| 17 | 17 |
testpaths = [ |
| 18 | 18 |
"tests" |
| 19 | 19 |
] |
| 20 |
markers = [ |
|
| 21 |
"subprocess_run: define how mocked subprocess.run should behave" |
|
| 22 |
] |
|
| src/hydrilla/builder/local_apt.py | ||
|---|---|---|
| 1 |
# SPDX-License-Identifier: AGPL-3.0-or-later |
|
| 2 |
|
|
| 3 |
# Using a local APT. |
|
| 4 |
# |
|
| 5 |
# This file is part of Hydrilla |
|
| 6 |
# |
|
| 7 |
# Copyright (C) 2022 Wojtek Kosior |
|
| 8 |
# |
|
| 9 |
# This program is free software: you can redistribute it and/or modify |
|
| 10 |
# it under the terms of the GNU Affero General Public License as |
|
| 11 |
# published by the Free Software Foundation, either version 3 of the |
|
| 12 |
# License, or (at your option) any later version. |
|
| 13 |
# |
|
| 14 |
# This program is distributed in the hope that it will be useful, |
|
| 15 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
| 16 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
| 17 |
# GNU Affero General Public License for more details. |
|
| 18 |
# |
|
| 19 |
# You should have received a copy of the GNU Affero General Public License |
|
| 20 |
# along with this program. If not, see <https://www.gnu.org/licenses/>. |
|
| 21 |
# |
|
| 22 |
# |
|
| 23 |
# I, Wojtek Kosior, thereby promise not to sue for violation of this |
|
| 24 |
# file's license. Although I request that you do not make use this code |
|
| 25 |
# in a proprietary program, I am not going to enforce this in court. |
|
| 26 |
|
|
| 27 |
# Enable using with Python 3.7. |
|
| 28 |
from __future__ import annotations |
|
| 29 |
|
|
| 30 |
import zipfile |
|
| 31 |
import shutil |
|
| 32 |
import re |
|
| 33 |
import subprocess |
|
| 34 |
CP = subprocess.CompletedProcess |
|
| 35 |
from pathlib import Path |
|
| 36 |
from tempfile import TemporaryDirectory, NamedTemporaryFile |
|
| 37 |
from hashlib import sha256 |
|
| 38 |
from contextlib import contextmanager |
|
| 39 |
from typing import Optional, Iterable |
|
| 40 |
|
|
| 41 |
from .. import util |
|
| 42 |
|
|
| 43 |
here = Path(__file__).resolve().parent |
|
| 44 |
|
|
| 45 |
_ = util.translation(here / 'locales').gettext |
|
| 46 |
|
|
| 47 |
""" |
|
| 48 |
Default cache directory to save APT configurations and downloaded GPG keys in. |
|
| 49 |
""" |
|
| 50 |
default_apt_cache_dir = Path.home() / '.cache' / 'hydrilla' / 'builder' / 'apt' |
|
| 51 |
|
|
| 52 |
""" |
|
| 53 |
Default keyserver to use. |
|
| 54 |
""" |
|
| 55 |
default_keyserver = 'hkps://keyserver.ubuntu.com:443' |
|
| 56 |
|
|
| 57 |
""" |
|
| 58 |
Default keys to download when using a local APT. |
|
| 59 |
""" |
|
| 60 |
default_keys = [ |
|
| 61 |
# Trisquel |
|
| 62 |
'B4EFB9F38D8AEBF1', 'B138CA450C05112F', |
|
| 63 |
# Ubuntu |
|
| 64 |
'40976EAF437D05B5', '3B4FE6ACC0B21F32', '871920D1991BC93C', |
|
| 65 |
# Debian |
|
| 66 |
'9D6D8F6BC857C906', '8B48AD6246925553', 'DCC9EFBF77E11517', '648ACFD622F3D138', |
|
| 67 |
'54404762BBB6E853' |
|
| 68 |
] |
|
| 69 |
|
|
| 70 |
"""sources.list file contents for known distros.""" |
|
| 71 |
default_lists = {
|
|
| 72 |
'nabia': [f'{type} http://archive.trisquel.info/trisquel/ nabia{suf} main'
|
|
| 73 |
for type in ('deb', 'deb-src')
|
|
| 74 |
for suf in ('', '-updates', '-security')]
|
|
| 75 |
} |
|
| 76 |
|
|
| 77 |
class GpgError(Exception): |
|
| 78 |
""" |
|
| 79 |
Exception used to report various problems when calling GPG. |
|
| 80 |
""" |
|
| 81 |
|
|
| 82 |
class DistroError(Exception): |
|
| 83 |
""" |
|
| 84 |
Exception used to report problems when resolving an OS distribution. |
|
| 85 |
""" |
|
| 86 |
|
|
| 87 |
class AptError(Exception): |
|
| 88 |
""" |
|
| 89 |
Exception used to report various problems when calling apt-* commands. |
|
| 90 |
""" |
|
| 91 |
def __init__(self, msg: str, cp: Optional[CP]=None) -> None: |
|
| 92 |
"""Initialize this AptError""" |
|
| 93 |
if cp and cp.stdout: |
|
| 94 |
msg = '\n\n'.join([msg, _('STDOUT_OUTPUT_heading'), cp.stdout])
|
|
| 95 |
|
|
| 96 |
if cp and cp.stderr: |
|
| 97 |
msg = '\n\n'.join([msg, _('STDERR_OUTPUT_heading'), cp.stderr])
|
|
| 98 |
|
|
| 99 |
super().__init__(msg) |
|
| 100 |
|
|
| 101 |
class Apt: |
|
| 102 |
""" |
|
| 103 |
This class represents an APT instance and can be used to call apt-get |
|
| 104 |
commands with it. |
|
| 105 |
""" |
|
| 106 |
def __init__(self, apt_conf: str) -> None: |
|
| 107 |
"""Initialize this Apt object.""" |
|
| 108 |
self.apt_conf = apt_conf |
|
| 109 |
|
|
| 110 |
def get(self, *args: str, **kwargs) -> CP: |
|
| 111 |
""" |
|
| 112 |
Run apt-get with the specified arguments and raise a meaningful AptError |
|
| 113 |
when something goes wrong. |
|
| 114 |
""" |
|
| 115 |
command = ['apt-get', '-c', self.apt_conf, *args] |
|
| 116 |
try: |
|
| 117 |
cp = subprocess.run(command, **kwargs, env={'LANG': 'en_US'},
|
|
| 118 |
capture_output=True, text=True) |
|
| 119 |
except FileNotFoundError: |
|
| 120 |
raise AptError(_('couldnt_execute_apt_get_is_it_installed'))
|
|
| 121 |
|
|
| 122 |
if cp.returncode != 0: |
|
| 123 |
msg = _('apt_get_command_{}_failed').format(' '.join(command))
|
|
| 124 |
raise AptError(msg, cp) |
|
| 125 |
|
|
| 126 |
return cp |
|
| 127 |
|
|
| 128 |
def cache_dir() -> Path: |
|
| 129 |
""" |
|
| 130 |
Return the directory used to cache data (APT configurations, keyrings) to |
|
| 131 |
speed up repeated operations. |
|
| 132 |
|
|
| 133 |
This function first ensures the directory exists. |
|
| 134 |
""" |
|
| 135 |
default_apt_cache_dir.mkdir(parents=True, exist_ok=True) |
|
| 136 |
return default_apt_cache_dir |
|
| 137 |
|
|
| 138 |
class SourcesList: |
|
| 139 |
"""Representation of apt's sources.list contents.""" |
|
| 140 |
def __init__(self, list: [str]=[], codename: Optional[str]=None) -> None: |
|
| 141 |
"""Initialize this SourcesList.""" |
|
| 142 |
self.codename = None |
|
| 143 |
self.list = [*list] |
|
| 144 |
self.has_extra_entries = bool(self.list) |
|
| 145 |
|
|
| 146 |
if codename is not None: |
|
| 147 |
if codename not in default_lists: |
|
| 148 |
raise DistroError(_('distro_{}_unknown').format(codename))
|
|
| 149 |
|
|
| 150 |
self.codename = codename |
|
| 151 |
self.list.extend(default_lists[codename]) |
|
| 152 |
|
|
| 153 |
def identity(self) -> str: |
|
| 154 |
""" |
|
| 155 |
Produce a string that uniquely identifies this sources.list contents. |
|
| 156 |
""" |
|
| 157 |
if self.codename and not self.has_extra_entries: |
|
| 158 |
return self.codename |
|
| 159 |
|
|
| 160 |
return sha256('\n'.join(sorted(self.list)).encode()).digest().hex()
|
|
| 161 |
|
|
| 162 |
def apt_conf(directory: Path) -> str: |
|
| 163 |
""" |
|
| 164 |
Given local APT's directory, produce a configuration suitable for running |
|
| 165 |
APT there. |
|
| 166 |
|
|
| 167 |
'directory' must not contain any special characters including quotes and |
|
| 168 |
spaces. |
|
| 169 |
""" |
|
| 170 |
return f''' |
|
| 171 |
Dir "{directory}";
|
|
| 172 |
Dir::State "{directory}/var/lib/apt";
|
|
| 173 |
Dir::State::status "{directory}/var/lib/dpkg/status";
|
|
| 174 |
Dir::Etc::SourceList "{directory}/etc/apt.sources.list";
|
|
| 175 |
Dir::Etc::SourceParts ""; |
|
| 176 |
Dir::Cache "{directory}/var/cache/apt";
|
|
| 177 |
pkgCacheGen::Essential "none"; |
|
| 178 |
Dir::Etc::Trusted "{directory}/etc/trusted.gpg";
|
|
| 179 |
''' |
|
| 180 |
|
|
| 181 |
def apt_keyring(keys: [str]) -> bytes: |
|
| 182 |
""" |
|
| 183 |
Download the requested keys if necessary and export them as a keyring |
|
| 184 |
suitable for passing to APT. |
|
| 185 |
|
|
| 186 |
The keyring is returned as a bytes value that should be written to a file. |
|
| 187 |
""" |
|
| 188 |
try: |
|
| 189 |
from gnupg import GPG |
|
| 190 |
except ModuleNotFoundError: |
|
| 191 |
raise GpgError(_('couldnt_import_gnupg_is_it_installed'))
|
|
| 192 |
|
|
| 193 |
gpg = GPG(keyring=str(cache_dir() / 'master_keyring.gpg')) |
|
| 194 |
for key in keys: |
|
| 195 |
if gpg.list_keys(keys=[key]) != []: |
|
| 196 |
continue |
|
| 197 |
|
|
| 198 |
if gpg.recv_keys(default_keyserver, key).imported == 0: |
|
| 199 |
raise GpgError(_('gpg_couldnt_recv_key'))
|
|
| 200 |
|
|
| 201 |
return gpg.export_keys(keys, armor=False, minimal=True) |
|
| 202 |
|
|
| 203 |
def cache_apt_root(apt_root: Path, destination_zip: Path) -> None: |
|
| 204 |
""" |
|
| 205 |
Zip an APT root directory for later use and move the zipfile to the |
|
| 206 |
requested destination. |
|
| 207 |
""" |
|
| 208 |
temporary_zip_path = None |
|
| 209 |
try: |
|
| 210 |
tmpfile = NamedTemporaryFile(suffix='.zip', prefix='tmp_', |
|
| 211 |
dir=cache_dir(), delete=False) |
|
| 212 |
temporary_zip_path = Path(tmpfile.name) |
|
| 213 |
|
|
| 214 |
to_skip = {Path('etc') / 'apt.conf', Path('etc') / 'trusted.gpg'}
|
|
| 215 |
|
|
| 216 |
with zipfile.ZipFile(tmpfile, 'w') as zf: |
|
| 217 |
for member in apt_root.rglob('*'):
|
|
| 218 |
relative = member.relative_to(apt_root) |
|
| 219 |
if relative not in to_skip: |
|
| 220 |
# This call will also properly add empty folders to zip file |
|
| 221 |
zf.write(member, relative, zipfile.ZIP_DEFLATED) |
|
| 222 |
|
|
| 223 |
shutil.move(temporary_zip_path, destination_zip) |
|
| 224 |
finally: |
|
| 225 |
if temporary_zip_path is not None and temporary_zip_path.exists(): |
|
| 226 |
temporary_zip_path.unlink() |
|
| 227 |
|
|
| 228 |
def setup_local_apt(directory: Path, list: SourcesList, keys: [str]) -> Apt: |
|
| 229 |
""" |
|
| 230 |
Create files and directories necessary for running APT without root rights |
|
| 231 |
inside 'directory'. |
|
| 232 |
|
|
| 233 |
'directory' must not contain any special characters including quotes and |
|
| 234 |
spaces and must be empty. |
|
| 235 |
|
|
| 236 |
Return an Apt object that can be used to call apt-get commands. |
|
| 237 |
""" |
|
| 238 |
apt_root = directory / 'apt_root' |
|
| 239 |
|
|
| 240 |
conf_text = apt_conf(apt_root) |
|
| 241 |
keyring_bytes = apt_keyring(keys) |
|
| 242 |
|
|
| 243 |
apt_zipfile = cache_dir() / f'apt_{list.identity()}.zip'
|
|
| 244 |
if apt_zipfile.exists(): |
|
| 245 |
with zipfile.ZipFile(apt_zipfile) as zf: |
|
| 246 |
zf.extractall(apt_root) |
|
| 247 |
|
|
| 248 |
for to_create in ( |
|
| 249 |
apt_root / 'var' / 'lib' / 'apt' / 'partial', |
|
| 250 |
apt_root / 'var' / 'lib' / 'apt' / 'lists', |
|
| 251 |
apt_root / 'var' / 'cache' / 'apt' / 'archives' / 'partial', |
|
| 252 |
apt_root / 'etc' / 'apt' / 'preferences.d', |
|
| 253 |
apt_root / 'var' / 'lib' / 'dpkg', |
|
| 254 |
apt_root / 'var' / 'log' / 'apt' |
|
| 255 |
): |
|
| 256 |
to_create.mkdir(parents=True, exist_ok=True) |
|
| 257 |
|
|
| 258 |
conf_path = apt_root / 'etc' / 'apt.conf' |
|
| 259 |
trusted_path = apt_root / 'etc' / 'trusted.gpg' |
|
| 260 |
status_path = apt_root / 'var' / 'lib' / 'dpkg' / 'status' |
|
| 261 |
list_path = apt_root / 'etc' / 'apt.sources.list' |
|
| 262 |
|
|
| 263 |
conf_path.write_text(conf_text) |
|
| 264 |
trusted_path.write_bytes(keyring_bytes) |
|
| 265 |
status_path.touch() |
|
| 266 |
list_path.write_text('\n'.join(list.list))
|
|
| 267 |
|
|
| 268 |
apt = Apt(str(conf_path)) |
|
| 269 |
apt.get('update')
|
|
| 270 |
|
|
| 271 |
cache_apt_root(apt_root, apt_zipfile) |
|
| 272 |
|
|
| 273 |
return apt |
|
| 274 |
|
|
| 275 |
@contextmanager |
|
| 276 |
def local_apt(list: SourcesList, keys: [str]) -> Iterable[Apt]: |
|
| 277 |
""" |
|
| 278 |
Create a temporary directory with proper local APT configuration in it. |
|
| 279 |
Yield an Apt object that can be used to issue apt-get commands. |
|
| 280 |
|
|
| 281 |
This function returns a context manager that will remove the directory on |
|
| 282 |
close. |
|
| 283 |
""" |
|
| 284 |
with TemporaryDirectory() as td: |
|
| 285 |
td = Path(td) |
|
| 286 |
yield setup_local_apt(td, list, keys) |
|
| 287 |
|
|
| 288 |
def download_apt_packages(list: SourcesList, keys: [str], packages: [str], |
|
| 289 |
destination_dir: Path, with_deps=False) -> [str]: |
|
| 290 |
""" |
|
| 291 |
Set up a local APT, update it using the specified sources.list configuration |
|
| 292 |
and use it to download the specified packages. |
|
| 293 |
|
|
| 294 |
This function downloads a .deb file of the packages matching the current |
|
| 295 |
architecture (which includes packages with architecture 'all') as well as |
|
| 296 |
all theis corresponding source package files and (if requested) the debs |
|
| 297 |
and source files of all their declared dependencies. |
|
| 298 |
|
|
| 299 |
Return value is a list of names of all downloaded files. |
|
| 300 |
""" |
|
| 301 |
with local_apt(list, keys) as apt: |
|
| 302 |
if with_deps: |
|
| 303 |
cp = apt.get('install', '--yes', '--just-print', *packages)
|
|
| 304 |
|
|
| 305 |
deps_listing = re.match( |
|
| 306 |
r''' |
|
| 307 |
.* |
|
| 308 |
The\sfollowing\sNEW\spackages\swill\sbe\sinstalled: |
|
| 309 |
(.*) |
|
| 310 |
0\supgraded, |
|
| 311 |
''', |
|
| 312 |
cp.stdout, |
|
| 313 |
re.MULTILINE | re.DOTALL | re.VERBOSE) |
|
| 314 |
|
|
| 315 |
if deps_listing is None: |
|
| 316 |
raise AptError(_('apt_install_output_not_understood'), cp)
|
|
| 317 |
|
|
| 318 |
packages = deps_listing.group(1).split() |
|
| 319 |
|
|
| 320 |
# Download .debs to indirectly to destination_dir by first placing them |
|
| 321 |
# in a temporary subdirectory. |
|
| 322 |
with TemporaryDirectory(dir=destination_dir) as td: |
|
| 323 |
td = Path(td) |
|
| 324 |
cp = apt.get('download', *packages, cwd=td)
|
|
| 325 |
|
|
| 326 |
deb_name_regex = re.compile( |
|
| 327 |
r''' |
|
| 328 |
^ |
|
| 329 |
(?P<name>[^_]+) |
|
| 330 |
_ |
|
| 331 |
(?P<ver>[^_]+) |
|
| 332 |
_ |
|
| 333 |
.+ # architecture (or 'all') |
|
| 334 |
\.deb |
|
| 335 |
$ |
|
| 336 |
''', |
|
| 337 |
re.VERBOSE) |
|
| 338 |
|
|
| 339 |
names_vers = [] |
|
| 340 |
downloaded = [] |
|
| 341 |
for deb_file in td.glob('*'):
|
|
| 342 |
match = deb_name_regex.match(deb_file.name) |
|
| 343 |
if match is None: |
|
| 344 |
msg = _('apt_download_gave_bad_filename_{}')\
|
|
| 345 |
.format(deb_file.name) |
|
| 346 |
raise AptError(msg, cp) |
|
| 347 |
|
|
| 348 |
names_vers.append((match.group('name'), match.group('ver')))
|
|
| 349 |
downloaded.append(deb_file.name) |
|
| 350 |
|
|
| 351 |
apt.get('source', '--download-only',
|
|
| 352 |
*[f'{n}={v}' for n, v in names_vers], cwd=td)
|
|
| 353 |
|
|
| 354 |
for source_file in td.glob('*'):
|
|
| 355 |
if source_file.name in downloaded: |
|
| 356 |
continue |
|
| 357 |
|
|
| 358 |
downloaded.append(source_file.name) |
|
| 359 |
|
|
| 360 |
for filename in downloaded: |
|
| 361 |
shutil.move(td / filename, destination_dir / filename) |
|
| 362 |
|
|
| 363 |
return downloaded |
|
| tests/source-package-example-apt | ||
|---|---|---|
| 1 |
Subproject commit 9294298e538fd5c9ef647270e6a48df85b4950f8 |
|
| tests/test_local_apt.py | ||
|---|---|---|
| 1 |
# SPDX-License-Identifier: CC0-1.0 |
|
| 2 |
|
|
| 3 |
# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> |
|
| 4 |
# |
|
| 5 |
# Available under the terms of Creative Commons Zero v1.0 Universal. |
|
| 6 |
|
|
| 7 |
# Enable using with Python 3.7. |
|
| 8 |
from __future__ import annotations |
|
| 9 |
|
|
| 10 |
import pytest |
|
| 11 |
import tempfile |
|
| 12 |
import re |
|
| 13 |
from pathlib import Path |
|
| 14 |
from zipfile import ZipFile |
|
| 15 |
#from hashlib import sha256 |
|
| 16 |
|
|
| 17 |
from hydrilla.builder import local_apt |
|
| 18 |
|
|
| 19 |
here = Path(__file__).resolve().parent |
|
| 20 |
|
|
| 21 |
@pytest.fixture(autouse=True) |
|
| 22 |
def no_requests(monkeypatch): |
|
| 23 |
"""Remove requests.sessions.Session.request for all tests.""" |
|
| 24 |
monkeypatch.delattr('requests.sessions.Session.request')
|
|
| 25 |
|
|
| 26 |
@pytest.fixture |
|
| 27 |
def mock_cache_dir(monkeypatch): |
|
| 28 |
"""Make local_apt.py cache files to a temporary directory.""" |
|
| 29 |
with tempfile.TemporaryDirectory() as td: |
|
| 30 |
td_path = Path(td) |
|
| 31 |
monkeypatch.setattr(local_apt, 'default_apt_cache_dir', td_path) |
|
| 32 |
yield td_path |
|
| 33 |
|
|
| 34 |
@pytest.fixture |
|
| 35 |
def mock_gnupg_import(monkeypatch, mock_cache_dir): |
|
| 36 |
"""Mock gnupg library when imported dynamically.""" |
|
| 37 |
|
|
| 38 |
gnupg_mock_dir = mock_cache_dir / 'gnupg_mock' |
|
| 39 |
gnupg_mock_dir.mkdir() |
|
| 40 |
(gnupg_mock_dir / 'gnupg.py').write_text('GPG = None\n')
|
|
| 41 |
|
|
| 42 |
monkeypatch.syspath_prepend(str(gnupg_mock_dir)) |
|
| 43 |
|
|
| 44 |
import gnupg |
|
| 45 |
|
|
| 46 |
keyring_path = mock_cache_dir / 'master_keyring.gpg' |
|
| 47 |
|
|
| 48 |
class MockedImportResult: |
|
| 49 |
"""gnupg.ImportResult replacement""" |
|
| 50 |
def __init__(self): |
|
| 51 |
"""Initialize MockedImportResult object.""" |
|
| 52 |
self.imported = 1 |
|
| 53 |
|
|
| 54 |
class MockedGPG: |
|
| 55 |
"""GPG replacement that does not really invoke GPG.""" |
|
| 56 |
def __init__(self, keyring): |
|
| 57 |
"""Verify the keyring path and initialize MockedGPG.""" |
|
| 58 |
assert keyring == str(keyring_path) |
|
| 59 |
|
|
| 60 |
self.known_keys = {*keyring_path.read_text().split('\n')} \
|
|
| 61 |
if keyring_path.exists() else set() |
|
| 62 |
|
|
| 63 |
def recv_keys(self, keyserver, key): |
|
| 64 |
"""Mock key receiving - record requested key as received.""" |
|
| 65 |
assert keyserver == local_apt.default_keyserver |
|
| 66 |
assert key not in self.known_keys |
|
| 67 |
|
|
| 68 |
self.known_keys.add(key) |
|
| 69 |
keyring_path.write_text('\n'.join(self.known_keys))
|
|
| 70 |
|
|
| 71 |
return MockedImportResult() |
|
| 72 |
|
|
| 73 |
def list_keys(self, keys=None): |
|
| 74 |
"""Mock key listing - return a list with dummy items.""" |
|
| 75 |
if keys is None: |
|
| 76 |
return ['dummy'] * len(self.known_keys) |
|
| 77 |
else: |
|
| 78 |
return ['dummy' for k in keys if k in self.known_keys] |
|
| 79 |
|
|
| 80 |
def export_keys(self, keys, **kwargs): |
|
| 81 |
""" |
|
| 82 |
Mock key export - check that the call has the expected arguments and |
|
| 83 |
return a dummy bytes array. |
|
| 84 |
""" |
|
| 85 |
assert kwargs['armor'] == False |
|
| 86 |
assert kwargs['minimal'] == True |
|
| 87 |
assert {*keys} == self.known_keys
|
|
| 88 |
|
|
| 89 |
return b'<dummy keys export>' |
|
| 90 |
|
|
| 91 |
monkeypatch.setattr(gnupg, 'GPG', MockedGPG) |
|
| 92 |
|
|
| 93 |
class MockedCompletedProcess: |
|
| 94 |
""" |
|
| 95 |
Object with some fields similar to those of subprocess.CompletedProcess. |
|
| 96 |
""" |
|
| 97 |
def __init__(self, args, returncode, stdout, stderr): |
|
| 98 |
"""Initialize MockedCompletedProcess""" |
|
| 99 |
self.args = args |
|
| 100 |
self.returncode = returncode |
|
| 101 |
self.stdout = stdout |
|
| 102 |
self.stderr = stderr |
|
| 103 |
|
|
| 104 |
""" |
|
| 105 |
Output of 'apt-get install --yes --just-print libjs-mathjax' on some APT-based |
|
| 106 |
system. |
|
| 107 |
""" |
|
| 108 |
sample_install_stdout = '''\ |
|
| 109 |
NOTE: This is only a simulation! |
|
| 110 |
apt-get needs root privileges for real execution. |
|
| 111 |
Keep also in mind that locking is deactivated, |
|
| 112 |
so don't depend on the relevance to the real current situation! |
|
| 113 |
Reading package lists... |
|
| 114 |
Building dependency tree... |
|
| 115 |
Reading state information... |
|
| 116 |
The following additional packages will be installed: |
|
| 117 |
fonts-mathjax |
|
| 118 |
Suggested packages: |
|
| 119 |
fonts-mathjax-extras fonts-stix libjs-mathjax-doc |
|
| 120 |
The following NEW packages will be installed: |
|
| 121 |
fonts-mathjax libjs-mathjax |
|
| 122 |
0 upgraded, 2 newly installed, 0 to remove and 0 not upgraded. |
|
| 123 |
Inst fonts-mathjax (2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all]) |
|
| 124 |
Inst libjs-mathjax (2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all]) |
|
| 125 |
Conf fonts-mathjax (2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all]) |
|
| 126 |
Conf libjs-mathjax (2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all]) |
|
| 127 |
''' |
|
| 128 |
|
|
| 129 |
class MockedSubprocess: |
|
| 130 |
"""subprocess with a run() function that does not spawn a process.""" |
|
| 131 |
def __init__(self, update_return_value=0, install_return_value=0, |
|
| 132 |
download_return_value=0, source_return_value=0): |
|
| 133 |
"""Initialize MockedSubprocess object.""" |
|
| 134 |
self.update_return_value = update_return_value |
|
| 135 |
self.install_return_value = install_return_value |
|
| 136 |
self.download_return_value = download_return_value |
|
| 137 |
self.source_return_value = source_return_value |
|
| 138 |
|
|
| 139 |
def run(self, command, **kwargs): |
|
| 140 |
""" |
|
| 141 |
Instead of running an 'apt-get update' command just touch some file |
|
| 142 |
in apt root to indicate that the call was made. Instead of running an |
|
| 143 |
'apt-get install' command just print a possible output of one. Instead |
|
| 144 |
if running an 'apt-get download' command just write some dummy .deb |
|
| 145 |
files to the appropriate directory. |
|
| 146 |
""" |
|
| 147 |
assert kwargs['env'] == {'LANG': 'en_US'}
|
|
| 148 |
assert kwargs['capture_output'] == True |
|
| 149 |
|
|
| 150 |
if 'update' in command: |
|
| 151 |
expected_command = ['apt-get', '-c', '<conf_path>', 'update'] |
|
| 152 |
elif 'install' in command: |
|
| 153 |
expected_command = ['apt-get', '-c', '<conf_path>', 'install', |
|
| 154 |
'--yes', '--just-print', 'libjs-mathjax'] |
|
| 155 |
elif 'download' in command: |
|
| 156 |
expected_command = ['apt-get', '-c', '<conf_path>', 'download', |
|
| 157 |
'libjs-mathjax'] |
|
| 158 |
if 'fonts-mathjax' in command: |
|
| 159 |
expected_command.insert(-1, 'fonts-mathjax') |
|
| 160 |
elif 'source' in command: |
|
| 161 |
expected_command = ['apt-get', '-c', '<conf_path>', 'source', |
|
| 162 |
'--download-only', 'libjs-mathjax=2.7.9+dfsg-1'] |
|
| 163 |
if 'fonts-mathjax=2.7.9+dfsg-1' in command: |
|
| 164 |
if command[-1] == 'fonts-mathjax=2.7.9+dfsg-1': |
|
| 165 |
expected_command.append('fonts-mathjax=2.7.9+dfsg-1')
|
|
| 166 |
else: |
|
| 167 |
expected_command.insert(-1, 'fonts-mathjax=2.7.9+dfsg-1') |
|
| 168 |
else: |
|
| 169 |
raise Exception(f'unknown apt command: {" ".join(command)}')
|
|
| 170 |
|
|
| 171 |
assert len(expected_command) == len(command) |
|
| 172 |
|
|
| 173 |
for word, expected in zip(command, expected_command): |
|
| 174 |
if expected == '<conf_path>': |
|
| 175 |
conf_path = Path(word) |
|
| 176 |
else: |
|
| 177 |
assert word == expected |
|
| 178 |
|
|
| 179 |
if 'update' in command: |
|
| 180 |
(conf_path.parent / 'update_called').touch() |
|
| 181 |
|
|
| 182 |
if 'download' in command: |
|
| 183 |
destination = Path(kwargs.get('cwd') or Path.cwd())
|
|
| 184 |
|
|
| 185 |
(destination / 'libjs-mathjax_2.7.9+dfsg-1_all.deb')\ |
|
| 186 |
.write_text('dummy libjs-mathjax_2.7.9+dfsg-1_all.deb')
|
|
| 187 |
|
|
| 188 |
if 'fonts-mathjax' in command: |
|
| 189 |
(destination / 'fonts-mathjax_2.7.9+dfsg-1_all.deb')\ |
|
| 190 |
.write_text('dummy fonts-mathjax_2.7.9+dfsg-1_all.deb')
|
|
| 191 |
|
|
| 192 |
if 'source' in command: |
|
| 193 |
destination = Path(kwargs.get('cwd') or Path.cwd())
|
|
| 194 |
for filename in [ |
|
| 195 |
'mathjax_2.7.9+dfsg-1.debian.tar.xz', |
|
| 196 |
'mathjax_2.7.9+dfsg-1.dsc', |
|
| 197 |
'mathjax_2.7.9+dfsg.orig.tar.xz' |
|
| 198 |
]: |
|
| 199 |
(destination / filename).write_text(f'dummy {filename}')
|
|
| 200 |
|
|
| 201 |
if kwargs.get('text'):
|
|
| 202 |
stderr = 'some error output' |
|
| 203 |
stdout = 'some output' |
|
| 204 |
if 'install' in command: |
|
| 205 |
stdout = sample_install_stdout |
|
| 206 |
else: |
|
| 207 |
stdout = b'some bin output' |
|
| 208 |
stderr = b'some bin error output' |
|
| 209 |
if 'install' in command: |
|
| 210 |
stdout = sample_install_stdout.encode() |
|
| 211 |
|
|
| 212 |
return_val = (self.update_return_value if 'update' in command else \ |
|
| 213 |
self.install_return_value if 'install' in command else \ |
|
| 214 |
self.download_return_value if 'download' in command else \ |
|
| 215 |
self.source_return_value) |
|
| 216 |
|
|
| 217 |
return MockedCompletedProcess(command, return_val, stdout, stderr) |
|
| 218 |
|
|
| 219 |
@pytest.fixture |
|
| 220 |
def mock_apt_get_update(monkeypatch, request): |
|
| 221 |
"""Mock 'apt-get update' command when called through subprocess.run.""" |
|
| 222 |
marker = request.node.get_closest_marker('subprocess_run')
|
|
| 223 |
run_mock_opts = marker.args[0] if marker else {}
|
|
| 224 |
monkeypatch.setattr(local_apt, 'subprocess', |
|
| 225 |
MockedSubprocess(**run_mock_opts)) |
|
| 226 |
|
|
| 227 |
class MockedSubprocessRaises: |
|
| 228 |
"""subprocess with a run() function that always raises FileNotFoundError.""" |
|
| 229 |
def run(self, command, **kwargs): |
|
| 230 |
""" |
|
| 231 |
Instead of running an 'apt-get' command we act as if it was missing. |
|
| 232 |
""" |
|
| 233 |
raise FileNotFoundError('dummy')
|
|
| 234 |
|
|
| 235 |
@pytest.fixture |
|
| 236 |
def mock_apt_get_update_raises(monkeypatch): |
|
| 237 |
"""Mock missing 'apt-get' command when called through subprocess.run.""" |
|
| 238 |
|
|
| 239 |
monkeypatch.setattr(local_apt, 'subprocess', MockedSubprocessRaises()) |
|
| 240 |
|
|
| 241 |
def test_local_apt_contextmanager(mock_cache_dir, mock_apt_get_update, |
|
| 242 |
mock_gnupg_import): |
|
| 243 |
""" |
|
| 244 |
Verify that the local_apt() function creates a proper apt environment and |
|
| 245 |
that it also properly restores it from cache. |
|
| 246 |
""" |
|
| 247 |
sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) |
|
| 248 |
|
|
| 249 |
with local_apt.local_apt(sources_list, local_apt.default_keys) as apt: |
|
| 250 |
apt_root = Path(apt.apt_conf).parent.parent |
|
| 251 |
|
|
| 252 |
assert (apt_root / 'etc' / 'trusted.gpg').read_bytes() == \ |
|
| 253 |
b'<dummy keys export>' |
|
| 254 |
|
|
| 255 |
assert (apt_root / 'etc' / 'update_called').exists() |
|
| 256 |
|
|
| 257 |
assert (apt_root / 'etc' / 'apt.sources.list').read_text() == \ |
|
| 258 |
'deb-src sth\ndeb sth' |
|
| 259 |
|
|
| 260 |
conf_lines = (apt_root / 'etc' / 'apt.conf').read_text().split('\n')
|
|
| 261 |
|
|
| 262 |
# check mocked keyring |
|
| 263 |
assert {*local_apt.default_keys} == \
|
|
| 264 |
{*(mock_cache_dir / 'master_keyring.gpg').read_text().split('\n')}
|
|
| 265 |
|
|
| 266 |
assert not apt_root.exists() |
|
| 267 |
|
|
| 268 |
expected_conf = {
|
|
| 269 |
'Dir': str(apt_root), |
|
| 270 |
'Dir::State': f'{apt_root}/var/lib/apt',
|
|
| 271 |
'Dir::State::status': f'{apt_root}/var/lib/dpkg/status',
|
|
| 272 |
'Dir::Etc::SourceList': f'{apt_root}/etc/apt.sources.list',
|
|
| 273 |
'Dir::Etc::SourceParts': '', |
|
| 274 |
'Dir::Cache': f'{apt_root}/var/cache/apt',
|
|
| 275 |
'pkgCacheGen::Essential': 'none', |
|
| 276 |
'Dir::Etc::Trusted': f'{apt_root}/etc/trusted.gpg',
|
|
| 277 |
} |
|
| 278 |
|
|
| 279 |
conf_regex = re.compile(r'^(?P<key>\S+)\s"(?P<val>\S*)";$') |
|
| 280 |
assert dict([(m.group('key'), m.group('val'))
|
|
| 281 |
for l in conf_lines if l for m in [conf_regex.match(l)]]) == \ |
|
| 282 |
expected_conf |
|
| 283 |
|
|
| 284 |
with ZipFile(mock_cache_dir / f'apt_{sources_list.identity()}.zip') as zf:
|
|
| 285 |
# reuse the same APT, its cached zip file should exist now |
|
| 286 |
with local_apt.local_apt(sources_list, local_apt.default_keys) as apt: |
|
| 287 |
apt_root = Path(apt.apt_conf).parent.parent |
|
| 288 |
|
|
| 289 |
expected_members = {*apt_root.rglob('*')}
|
|
| 290 |
expected_members.remove(apt_root / 'etc' / 'apt.conf') |
|
| 291 |
expected_members.remove(apt_root / 'etc' / 'trusted.gpg') |
|
| 292 |
|
|
| 293 |
names = zf.namelist() |
|
| 294 |
assert len(names) == len(expected_members) |
|
| 295 |
|
|
| 296 |
for name in names: |
|
| 297 |
path = apt_root / name |
|
| 298 |
assert path in expected_members |
|
| 299 |
assert zf.read(name) == \ |
|
| 300 |
(b'' if path.is_dir() else path.read_bytes()) |
|
| 301 |
|
|
| 302 |
assert not apt_root.exists() |
|
| 303 |
|
|
| 304 |
@pytest.mark.subprocess_run({'update_return_value': 1})
|
|
| 305 |
def test_local_apt_update_failing(mock_cache_dir, mock_apt_get_update, |
|
| 306 |
mock_gnupg_import): |
|
| 307 |
""" |
|
| 308 |
Verify that the local_apt() function raises a proper error when |
|
| 309 |
'apt-get update' command returns non-0. |
|
| 310 |
""" |
|
| 311 |
sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) |
|
| 312 |
|
|
| 313 |
with pytest.raises(local_apt.AptError) as excinfo: |
|
| 314 |
with local_apt.local_apt(sources_list, local_apt.default_keys) as apt: |
|
| 315 |
pass |
|
| 316 |
|
|
| 317 |
assert len(excinfo.value.args) == 1 |
|
| 318 |
|
|
| 319 |
assert re.match(r'.*\n\n.*\n\nsome output\n\n.*\n\nsome error output', |
|
| 320 |
excinfo.value.args[0]) |
|
| 321 |
|
|
| 322 |
def test_local_apt_missing(mock_cache_dir, mock_apt_get_update_raises, |
|
| 323 |
mock_gnupg_import): |
|
| 324 |
""" |
|
| 325 |
Verify that the local_apt() function raises a proper error when 'apt-get' |
|
| 326 |
command is missing. |
|
| 327 |
""" |
|
| 328 |
sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) |
|
| 329 |
|
|
| 330 |
with pytest.raises(local_apt.AptError) as excinfo: |
|
| 331 |
with local_apt.local_apt(sources_list, local_apt.default_keys) as apt: |
|
| 332 |
pass |
|
| 333 |
|
|
| 334 |
assert len(excinfo.value.args) == 1 |
|
| 335 |
assert isinstance(excinfo.value.args[0], str) |
|
| 336 |
assert '\n' not in excinfo.value.args[0] |
|
| 337 |
|
|
| 338 |
def test_local_apt_download(mock_cache_dir, mock_apt_get_update, |
|
| 339 |
mock_gnupg_import): |
|
| 340 |
""" |
|
| 341 |
Verify that download_apt_packages() function properly performs the download |
|
| 342 |
of .debs and sources. |
|
| 343 |
""" |
|
| 344 |
sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) |
|
| 345 |
destination = mock_cache_dir / 'destination' |
|
| 346 |
destination.mkdir() |
|
| 347 |
|
|
| 348 |
local_apt.download_apt_packages(sources_list, local_apt.default_keys, |
|
| 349 |
['libjs-mathjax'], destination) |
|
| 350 |
|
|
| 351 |
libjs_mathjax_path = destination / 'libjs-mathjax_2.7.9+dfsg-1_all.deb' |
|
| 352 |
fonts_mathjax_path = destination / 'fonts-mathjax_2.7.9+dfsg-1_all.deb' |
|
| 353 |
|
|
| 354 |
source_paths = [ |
|
| 355 |
destination / 'mathjax_2.7.9+dfsg-1.debian.tar.xz', |
|
| 356 |
destination / 'mathjax_2.7.9+dfsg-1.dsc', |
|
| 357 |
destination / 'mathjax_2.7.9+dfsg.orig.tar.xz' |
|
| 358 |
] |
|
| 359 |
|
|
| 360 |
assert {*destination.glob('*')} == {libjs_mathjax_path, *source_paths}
|
|
| 361 |
|
|
| 362 |
local_apt.download_apt_packages(sources_list, local_apt.default_keys, |
|
| 363 |
['libjs-mathjax'], destination, |
|
| 364 |
with_deps=True) |
|
| 365 |
|
|
| 366 |
assert {*destination.glob('*')} == \
|
|
| 367 |
{libjs_mathjax_path, fonts_mathjax_path, *source_paths}
|
|
| 368 |
|
|
| 369 |
@pytest.mark.subprocess_run({'install_return_value': 1})
|
|
| 370 |
def test_local_apt_install_failing(mock_cache_dir, mock_apt_get_update, |
|
| 371 |
mock_gnupg_import): |
|
| 372 |
""" |
|
| 373 |
Verify that the download_apt_packages() function raises a proper error when |
|
| 374 |
'apt-get install' command returns non-0. |
|
| 375 |
""" |
|
| 376 |
sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) |
|
| 377 |
destination = mock_cache_dir / 'destination' |
|
| 378 |
destination.mkdir() |
|
| 379 |
|
|
| 380 |
with pytest.raises(local_apt.AptError) as excinfo: |
|
| 381 |
local_apt.download_apt_packages(sources_list, local_apt.default_keys, |
|
| 382 |
['libjs-mathjax'], destination, |
|
| 383 |
with_deps=True) |
|
| 384 |
|
|
| 385 |
assert len(excinfo.value.args) == 1 |
|
| 386 |
|
|
| 387 |
assert re.match(r'^.*\n\n.*\n\n', excinfo.value.args[0]) |
|
| 388 |
assert re.search(r'\n\nsome error output$', excinfo.value.args[0]) |
|
| 389 |
assert sample_install_stdout in excinfo.value.args[0] |
|
| 390 |
|
|
| 391 |
assert [*destination.glob('*')] == []
|
|
| 392 |
|
|
| 393 |
@pytest.mark.subprocess_run({'download_return_value': 1})
|
|
| 394 |
def test_local_apt_download_failing(mock_cache_dir, mock_apt_get_update, |
|
| 395 |
mock_gnupg_import): |
|
| 396 |
""" |
|
| 397 |
Verify that the download_apt_packages() function raises a proper error when |
|
| 398 |
'apt-get download' command returns non-0. |
|
| 399 |
""" |
|
| 400 |
sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) |
|
| 401 |
destination = mock_cache_dir / 'destination' |
|
| 402 |
destination.mkdir() |
|
| 403 |
|
|
| 404 |
with pytest.raises(local_apt.AptError) as excinfo: |
|
| 405 |
local_apt.download_apt_packages(sources_list, local_apt.default_keys, |
|
| 406 |
['libjs-mathjax'], destination) |
|
| 407 |
|
|
| 408 |
assert len(excinfo.value.args) == 1 |
|
| 409 |
|
|
| 410 |
assert re.match(r'.*\n\n.*\n\nsome output\n\n.*\n\nsome error output', |
|
| 411 |
excinfo.value.args[0]) |
|
| 412 |
|
|
| 413 |
assert [*destination.glob('*')] == []
|
|
| 414 |
|
|
| 415 |
@pytest.mark.subprocess_run({'source_return_value': 1})
|
|
| 416 |
def test_local_apt_source_failing(mock_cache_dir, mock_apt_get_update, |
|
| 417 |
mock_gnupg_import): |
|
| 418 |
""" |
|
| 419 |
Verify that the download_apt_packages() function raises a proper error when |
|
| 420 |
'apt-get source' command returns non-0. |
|
| 421 |
""" |
|
| 422 |
sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) |
|
| 423 |
destination = mock_cache_dir / 'destination' |
|
| 424 |
destination.mkdir() |
|
| 425 |
|
|
| 426 |
with pytest.raises(local_apt.AptError) as excinfo: |
|
| 427 |
local_apt.download_apt_packages(sources_list, local_apt.default_keys, |
|
| 428 |
['libjs-mathjax'], destination) |
|
| 429 |
|
|
| 430 |
assert len(excinfo.value.args) == 1 |
|
| 431 |
|
|
| 432 |
assert re.match(r'.*\n\n.*\n\nsome output\n\n.*\n\nsome error output', |
|
| 433 |
excinfo.value.args[0]) |
|
| 434 |
|
|
| 435 |
assert [*destination.glob('*')] == []
|
|
| 436 |
|
|
| 437 |
def test_sources_list(): |
|
| 438 |
list = local_apt.SourcesList([], 'nabia') |
|
| 439 |
assert list.identity() == 'nabia' |
|
| 440 |
|
|
| 441 |
with pytest.raises(local_apt.DistroError): |
|
| 442 |
local_apt.SourcesList([], 'nabiaĆ') |
|
| 443 |
|
|
| 444 |
list = local_apt.SourcesList(['deb sth', 'deb-src sth'], 'nabia') |
|
| 445 |
assert list.identity() == \ |
|
| 446 |
'ef28d408b96046eae45c8ab3094ce69b2ac0c02a887e796b1d3d1a4f06fb49f1' |
|
Also available in: Unified diff
this commit is a backup of a part of bigger unfinished work and will be force-pushed over later