Project

General

Profile

« Previous | Next » 

Revision 25ff1c9d

Added by koszko over 1 year ago

  • ID 25ff1c9d01b337387411644393ac4eb04d511cad
  • Parent 9dda3aa9

this commit is a backup of a part of bigger unfinished work and will be force-pushed over later

View differences:

.gitmodules
4 4
#
5 5
# Available under the terms of Creative Commons Zero v1.0 Universal.
6 6

  
7
[submodule "src/hydrilla/schemas"]
7
[submodule "hydrilla-json-schemas"]
8 8
	path = src/hydrilla/schemas
9 9
	url = ../hydrilla-json-schemas
10
[submodule "src/test/source-package-example"]
10
[submodule "hydrilla-source-package-example"]
11 11
	path = tests/source-package-example
12 12
	url = ../hydrilla-source-package-example
13
[submodule "hydrilla-source-package-example-apt"]
14
	path = tests/source-package-example-apt
15
	url = ../hydrilla-source-package-example-apt
pyproject.toml
17 17
testpaths = [
18 18
    "tests"
19 19
]
20
markers = [
21
    "subprocess_run: define how mocked subprocess.run should behave"
22
]
src/hydrilla/builder/local_apt.py
1
# SPDX-License-Identifier: AGPL-3.0-or-later
2

  
3
# Using a local APT.
4
#
5
# This file is part of Hydrilla
6
#
7
# Copyright (C) 2022 Wojtek Kosior
8
#
9
# This program is free software: you can redistribute it and/or modify
10
# it under the terms of the GNU Affero General Public License as
11
# published by the Free Software Foundation, either version 3 of the
12
# License, or (at your option) any later version.
13
#
14
# This program is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
# GNU Affero General Public License for more details.
18
#
19
# You should have received a copy of the GNU Affero General Public License
20
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
21
#
22
#
23
# I, Wojtek Kosior, thereby promise not to sue for violation of this
24
# file's license. Although I request that you do not make use this code
25
# in a proprietary program, I am not going to enforce this in court.
26

  
27
# Enable using with Python 3.7.
28
from __future__ import annotations
29

  
30
import zipfile
31
import shutil
32
import re
33
import subprocess
34
CP = subprocess.CompletedProcess
35
from pathlib import Path
36
from tempfile import TemporaryDirectory, NamedTemporaryFile
37
from hashlib import sha256
38
from contextlib import contextmanager
39
from typing import Optional, Iterable
40

  
41
from .. import util
42

  
43
here = Path(__file__).resolve().parent
44

  
45
_ = util.translation(here / 'locales').gettext
46

  
47
"""
48
Default cache directory to save APT configurations and downloaded GPG keys in.
49
"""
50
default_apt_cache_dir = Path.home() / '.cache' / 'hydrilla' / 'builder' / 'apt'
51

  
52
"""
53
Default keyserver to use.
54
"""
55
default_keyserver = 'hkps://keyserver.ubuntu.com:443'
56

  
57
"""
58
Default keys to download when using a local APT.
59
"""
60
default_keys = [
61
    # Trisquel
62
    'B4EFB9F38D8AEBF1', 'B138CA450C05112F',
63
    # Ubuntu
64
    '40976EAF437D05B5', '3B4FE6ACC0B21F32', '871920D1991BC93C',
65
    # Debian
66
    '9D6D8F6BC857C906', '8B48AD6246925553', 'DCC9EFBF77E11517', '648ACFD622F3D138',
67
    '54404762BBB6E853'
68
]
69

  
70
"""sources.list file contents for known distros."""
71
default_lists = {
72
    'nabia': [f'{type} http://archive.trisquel.info/trisquel/ nabia{suf} main'
73
              for type in ('deb', 'deb-src')
74
              for suf in ('', '-updates', '-security')]
75
}
76

  
77
class GpgError(Exception):
78
    """
79
    Exception used to report various problems when calling GPG.
80
    """
81

  
82
class DistroError(Exception):
83
    """
84
    Exception used to report problems when resolving an OS distribution.
85
    """
86

  
87
class AptError(Exception):
88
    """
89
    Exception used to report various problems when calling apt-* commands.
90
    """
91
    def __init__(self, msg: str, cp: Optional[CP]=None) -> None:
92
        """Initialize this AptError"""
93
        if cp and cp.stdout:
94
            msg = '\n\n'.join([msg, _('STDOUT_OUTPUT_heading'), cp.stdout])
95

  
96
        if cp and cp.stderr:
97
            msg = '\n\n'.join([msg, _('STDERR_OUTPUT_heading'), cp.stderr])
98

  
99
        super().__init__(msg)
100

  
101
class Apt:
102
    """
103
    This class represents an APT instance and can be used to call apt-get
104
    commands with it.
105
    """
106
    def __init__(self, apt_conf: str) -> None:
107
        """Initialize this Apt object."""
108
        self.apt_conf = apt_conf
109

  
110
    def get(self, *args: str, **kwargs) -> CP:
111
        """
112
        Run apt-get with the specified arguments and raise a meaningful AptError
113
        when something goes wrong.
114
        """
115
        command = ['apt-get', '-c', self.apt_conf, *args]
116
        try:
117
            cp = subprocess.run(command, **kwargs, env={'LANG': 'en_US'},
118
                                capture_output=True, text=True)
119
        except FileNotFoundError:
120
            raise AptError(_('couldnt_execute_apt_get_is_it_installed'))
121

  
122
        if cp.returncode != 0:
123
            msg = _('apt_get_command_{}_failed').format(' '.join(command))
124
            raise AptError(msg, cp)
125

  
126
        return cp
127

  
128
def cache_dir() -> Path:
129
    """
130
    Return the directory used to cache data (APT configurations, keyrings) to
131
    speed up repeated operations.
132

  
133
    This function first ensures the directory exists.
134
    """
135
    default_apt_cache_dir.mkdir(parents=True, exist_ok=True)
136
    return default_apt_cache_dir
137

  
138
class SourcesList:
139
    """Representation of apt's sources.list contents."""
140
    def __init__(self, list: [str]=[], codename: Optional[str]=None) -> None:
141
        """Initialize this SourcesList."""
142
        self.codename = None
143
        self.list = [*list]
144
        self.has_extra_entries = bool(self.list)
145

  
146
        if codename is not None:
147
            if codename not in default_lists:
148
                raise DistroError(_('distro_{}_unknown').format(codename))
149

  
150
            self.codename = codename
151
            self.list.extend(default_lists[codename])
152

  
153
    def identity(self) -> str:
154
        """
155
        Produce a string that uniquely identifies this sources.list contents.
156
        """
157
        if self.codename and not self.has_extra_entries:
158
            return self.codename
159

  
160
        return sha256('\n'.join(sorted(self.list)).encode()).digest().hex()
161

  
162
def apt_conf(directory: Path) -> str:
163
    """
164
    Given local APT's directory, produce a configuration suitable for running
165
    APT there.
166

  
167
    'directory' must not contain any special characters including quotes and
168
    spaces.
169
    """
170
    return f'''
171
Dir "{directory}";
172
Dir::State "{directory}/var/lib/apt";
173
Dir::State::status "{directory}/var/lib/dpkg/status";
174
Dir::Etc::SourceList "{directory}/etc/apt.sources.list";
175
Dir::Etc::SourceParts "";
176
Dir::Cache "{directory}/var/cache/apt";
177
pkgCacheGen::Essential "none";
178
Dir::Etc::Trusted "{directory}/etc/trusted.gpg";
179
'''
180

  
181
def apt_keyring(keys: [str]) -> bytes:
182
    """
183
    Download the requested keys if necessary and export them as a keyring
184
    suitable for passing to APT.
185

  
186
    The keyring is returned as a bytes value that should be written to a file.
187
    """
188
    try:
189
        from gnupg import GPG
190
    except ModuleNotFoundError:
191
        raise GpgError(_('couldnt_import_gnupg_is_it_installed'))
192

  
193
    gpg = GPG(keyring=str(cache_dir() / 'master_keyring.gpg'))
194
    for key in keys:
195
        if gpg.list_keys(keys=[key]) != []:
196
            continue
197

  
198
        if gpg.recv_keys(default_keyserver, key).imported == 0:
199
            raise GpgError(_('gpg_couldnt_recv_key'))
200

  
201
    return gpg.export_keys(keys, armor=False, minimal=True)
202

  
203
def cache_apt_root(apt_root: Path, destination_zip: Path) -> None:
204
    """
205
    Zip an APT root directory for later use and move the zipfile to the
206
    requested destination.
207
    """
208
    temporary_zip_path = None
209
    try:
210
        tmpfile = NamedTemporaryFile(suffix='.zip', prefix='tmp_',
211
                                     dir=cache_dir(), delete=False)
212
        temporary_zip_path = Path(tmpfile.name)
213

  
214
        to_skip = {Path('etc') / 'apt.conf', Path('etc') / 'trusted.gpg'}
215

  
216
        with zipfile.ZipFile(tmpfile, 'w') as zf:
217
            for member in apt_root.rglob('*'):
218
                relative = member.relative_to(apt_root)
219
                if relative not in to_skip:
220
                    # This call will also properly add empty folders to zip file
221
                    zf.write(member, relative, zipfile.ZIP_DEFLATED)
222

  
223
        shutil.move(temporary_zip_path, destination_zip)
224
    finally:
225
        if temporary_zip_path is not None and temporary_zip_path.exists():
226
            temporary_zip_path.unlink()
227

  
228
def setup_local_apt(directory: Path, list: SourcesList, keys: [str]) -> Apt:
229
    """
230
    Create files and directories necessary for running APT without root rights
231
    inside 'directory'.
232

  
233
    'directory' must not contain any special characters including quotes and
234
    spaces and must be empty.
235

  
236
    Return an Apt object that can be used to call apt-get commands.
237
    """
238
    apt_root = directory / 'apt_root'
239

  
240
    conf_text     = apt_conf(apt_root)
241
    keyring_bytes = apt_keyring(keys)
242

  
243
    apt_zipfile = cache_dir() / f'apt_{list.identity()}.zip'
244
    if apt_zipfile.exists():
245
        with zipfile.ZipFile(apt_zipfile) as zf:
246
            zf.extractall(apt_root)
247

  
248
    for to_create in (
249
            apt_root / 'var' / 'lib' / 'apt' / 'partial',
250
            apt_root / 'var' / 'lib' / 'apt' / 'lists',
251
            apt_root / 'var' / 'cache' / 'apt' / 'archives' / 'partial',
252
            apt_root / 'etc' / 'apt' / 'preferences.d',
253
            apt_root / 'var' / 'lib' / 'dpkg',
254
            apt_root / 'var' / 'log' / 'apt'
255
    ):
256
        to_create.mkdir(parents=True, exist_ok=True)
257

  
258
    conf_path    = apt_root / 'etc' / 'apt.conf'
259
    trusted_path = apt_root / 'etc' / 'trusted.gpg'
260
    status_path  = apt_root / 'var' / 'lib' / 'dpkg' / 'status'
261
    list_path    = apt_root / 'etc' / 'apt.sources.list'
262

  
263
    conf_path.write_text(conf_text)
264
    trusted_path.write_bytes(keyring_bytes)
265
    status_path.touch()
266
    list_path.write_text('\n'.join(list.list))
267

  
268
    apt = Apt(str(conf_path))
269
    apt.get('update')
270

  
271
    cache_apt_root(apt_root, apt_zipfile)
272

  
273
    return apt
274

  
275
@contextmanager
276
def local_apt(list: SourcesList, keys: [str]) -> Iterable[Apt]:
277
    """
278
    Create a temporary directory with proper local APT configuration in it.
279
    Yield an Apt object that can be used to issue apt-get commands.
280

  
281
    This function returns a context manager that will remove the directory on
282
    close.
283
    """
284
    with TemporaryDirectory() as td:
285
        td = Path(td)
286
        yield setup_local_apt(td, list, keys)
287

  
288
def download_apt_packages(list: SourcesList, keys: [str], packages: [str],
289
                          destination_dir: Path, with_deps=False) -> [str]:
290
    """
291
    Set up a local APT, update it using the specified sources.list configuration
292
    and use it to download the specified packages.
293

  
294
    This function downloads a .deb file of the packages matching the current
295
    architecture (which includes packages with architecture 'all') as well as
296
    all theis corresponding source package files and (if requested) the debs
297
    and source files of all their declared dependencies.
298

  
299
    Return value is a list of names of all downloaded files.
300
    """
301
    with local_apt(list, keys) as apt:
302
        if with_deps:
303
            cp = apt.get('install', '--yes', '--just-print', *packages)
304

  
305
            deps_listing = re.match(
306
                r'''
307
                .*
308
                The\sfollowing\sNEW\spackages\swill\sbe\sinstalled:
309
                (.*)
310
                0\supgraded,
311
                ''',
312
                cp.stdout,
313
                re.MULTILINE | re.DOTALL | re.VERBOSE)
314

  
315
            if deps_listing is None:
316
                raise AptError(_('apt_install_output_not_understood'), cp)
317

  
318
            packages = deps_listing.group(1).split()
319

  
320
        # Download .debs to indirectly to destination_dir by first placing them
321
        # in a temporary subdirectory.
322
        with TemporaryDirectory(dir=destination_dir) as td:
323
            td = Path(td)
324
            cp = apt.get('download', *packages, cwd=td)
325

  
326
            deb_name_regex = re.compile(
327
                r'''
328
                ^
329
                (?P<name>[^_]+)
330
                _
331
                (?P<ver>[^_]+)
332
                _
333
                .+              # architecture (or 'all')
334
                \.deb
335
                $
336
                ''',
337
                re.VERBOSE)
338

  
339
            names_vers = []
340
            downloaded = []
341
            for deb_file in td.glob('*'):
342
                match = deb_name_regex.match(deb_file.name)
343
                if match is None:
344
                    msg = _('apt_download_gave_bad_filename_{}')\
345
                        .format(deb_file.name)
346
                    raise AptError(msg, cp)
347

  
348
                names_vers.append((match.group('name'), match.group('ver')))
349
                downloaded.append(deb_file.name)
350

  
351
            apt.get('source', '--download-only',
352
                    *[f'{n}={v}' for n, v in names_vers], cwd=td)
353

  
354
            for source_file in td.glob('*'):
355
                if source_file.name in downloaded:
356
                    continue
357

  
358
                downloaded.append(source_file.name)
359

  
360
            for filename in downloaded:
361
                shutil.move(td / filename, destination_dir / filename)
362

  
363
    return downloaded
tests/source-package-example-apt
1
Subproject commit 9294298e538fd5c9ef647270e6a48df85b4950f8
tests/test_local_apt.py
1
# SPDX-License-Identifier: CC0-1.0
2

  
3
# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
4
#
5
# Available under the terms of Creative Commons Zero v1.0 Universal.
6

  
7
# Enable using with Python 3.7.
8
from __future__ import annotations
9

  
10
import pytest
11
import tempfile
12
import re
13
from pathlib import Path
14
from zipfile import ZipFile
15
#from hashlib import sha256
16

  
17
from hydrilla.builder import local_apt
18

  
19
here = Path(__file__).resolve().parent
20

  
21
@pytest.fixture(autouse=True)
22
def no_requests(monkeypatch):
23
    """Remove requests.sessions.Session.request for all tests."""
24
    monkeypatch.delattr('requests.sessions.Session.request')
25

  
26
@pytest.fixture
27
def mock_cache_dir(monkeypatch):
28
    """Make local_apt.py cache files to a temporary directory."""
29
    with tempfile.TemporaryDirectory() as td:
30
        td_path = Path(td)
31
        monkeypatch.setattr(local_apt, 'default_apt_cache_dir', td_path)
32
        yield td_path
33

  
34
@pytest.fixture
35
def mock_gnupg_import(monkeypatch, mock_cache_dir):
36
    """Mock gnupg library when imported dynamically."""
37

  
38
    gnupg_mock_dir = mock_cache_dir / 'gnupg_mock'
39
    gnupg_mock_dir.mkdir()
40
    (gnupg_mock_dir / 'gnupg.py').write_text('GPG = None\n')
41

  
42
    monkeypatch.syspath_prepend(str(gnupg_mock_dir))
43

  
44
    import gnupg
45

  
46
    keyring_path = mock_cache_dir / 'master_keyring.gpg'
47

  
48
    class MockedImportResult:
49
        """gnupg.ImportResult replacement"""
50
        def __init__(self):
51
            """Initialize MockedImportResult object."""
52
            self.imported = 1
53

  
54
    class MockedGPG:
55
        """GPG replacement that does not really invoke GPG."""
56
        def __init__(self, keyring):
57
            """Verify the keyring path and initialize MockedGPG."""
58
            assert keyring == str(keyring_path)
59

  
60
            self.known_keys = {*keyring_path.read_text().split('\n')} \
61
                if keyring_path.exists() else set()
62

  
63
        def recv_keys(self, keyserver, key):
64
            """Mock key receiving - record requested key as received."""
65
            assert keyserver == local_apt.default_keyserver
66
            assert key not in self.known_keys
67

  
68
            self.known_keys.add(key)
69
            keyring_path.write_text('\n'.join(self.known_keys))
70

  
71
            return MockedImportResult()
72

  
73
        def list_keys(self, keys=None):
74
            """Mock key listing - return a list with dummy items."""
75
            if keys is None:
76
                return ['dummy'] * len(self.known_keys)
77
            else:
78
                return ['dummy' for k in keys if k in self.known_keys]
79

  
80
        def export_keys(self, keys, **kwargs):
81
            """
82
            Mock key export - check that the call has the expected arguments and
83
            return a dummy bytes array.
84
            """
85
            assert kwargs['armor']   == False
86
            assert kwargs['minimal'] == True
87
            assert {*keys} == self.known_keys
88

  
89
            return b'<dummy keys export>'
90

  
91
    monkeypatch.setattr(gnupg, 'GPG', MockedGPG)
92

  
93
class MockedCompletedProcess:
94
    """
95
    Object with some fields similar to those of subprocess.CompletedProcess.
96
    """
97
    def __init__(self, args, returncode, stdout, stderr):
98
        """Initialize MockedCompletedProcess"""
99
        self.args       = args
100
        self.returncode = returncode
101
        self.stdout     = stdout
102
        self.stderr     = stderr
103

  
104
"""
105
Output of 'apt-get install --yes --just-print libjs-mathjax' on some APT-based
106
system.
107
"""
108
sample_install_stdout = '''\
109
NOTE: This is only a simulation!
110
      apt-get needs root privileges for real execution.
111
      Keep also in mind that locking is deactivated,
112
      so don't depend on the relevance to the real current situation!
113
Reading package lists...
114
Building dependency tree...
115
Reading state information...
116
The following additional packages will be installed:
117
  fonts-mathjax
118
Suggested packages:
119
  fonts-mathjax-extras fonts-stix libjs-mathjax-doc
120
The following NEW packages will be installed:
121
  fonts-mathjax libjs-mathjax
122
0 upgraded, 2 newly installed, 0 to remove and 0 not upgraded.
123
Inst fonts-mathjax (2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all])
124
Inst libjs-mathjax (2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all])
125
Conf fonts-mathjax (2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all])
126
Conf libjs-mathjax (2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all])
127
'''
128

  
129
class MockedSubprocess:
130
    """subprocess with a run() function that does not spawn a process."""
131
    def __init__(self, update_return_value=0, install_return_value=0,
132
                 download_return_value=0, source_return_value=0):
133
        """Initialize MockedSubprocess object."""
134
        self.update_return_value   = update_return_value
135
        self.install_return_value  = install_return_value
136
        self.download_return_value = download_return_value
137
        self.source_return_value   = source_return_value
138

  
139
    def run(self, command, **kwargs):
140
        """
141
        Instead of running an 'apt-get update' command just touch some file
142
        in apt root to indicate that the call was made. Instead of running an
143
        'apt-get install' command just print a possible output of one. Instead
144
        if running an 'apt-get download' command just write some dummy .deb
145
        files to the appropriate directory.
146
        """
147
        assert kwargs['env'] == {'LANG': 'en_US'}
148
        assert kwargs['capture_output'] == True
149

  
150
        if 'update' in command:
151
            expected_command = ['apt-get', '-c', '<conf_path>', 'update']
152
        elif 'install' in command:
153
            expected_command = ['apt-get', '-c', '<conf_path>', 'install',
154
                                '--yes', '--just-print', 'libjs-mathjax']
155
        elif 'download' in command:
156
            expected_command = ['apt-get', '-c', '<conf_path>', 'download',
157
                                'libjs-mathjax']
158
            if 'fonts-mathjax' in command:
159
                expected_command.insert(-1, 'fonts-mathjax')
160
        elif 'source' in command:
161
            expected_command = ['apt-get', '-c', '<conf_path>', 'source',
162
                                '--download-only', 'libjs-mathjax=2.7.9+dfsg-1']
163
            if 'fonts-mathjax=2.7.9+dfsg-1' in command:
164
                if command[-1] == 'fonts-mathjax=2.7.9+dfsg-1':
165
                    expected_command.append('fonts-mathjax=2.7.9+dfsg-1')
166
                else:
167
                    expected_command.insert(-1, 'fonts-mathjax=2.7.9+dfsg-1')
168
        else:
169
            raise Exception(f'unknown apt command: {" ".join(command)}')
170

  
171
        assert len(expected_command) == len(command)
172

  
173
        for word, expected in zip(command, expected_command):
174
            if expected == '<conf_path>':
175
                conf_path = Path(word)
176
            else:
177
                assert word == expected
178

  
179
        if 'update' in command:
180
            (conf_path.parent / 'update_called').touch()
181

  
182
        if 'download' in command:
183
            destination = Path(kwargs.get('cwd') or Path.cwd())
184

  
185
            (destination / 'libjs-mathjax_2.7.9+dfsg-1_all.deb')\
186
                .write_text('dummy libjs-mathjax_2.7.9+dfsg-1_all.deb')
187

  
188
            if 'fonts-mathjax' in command:
189
                (destination / 'fonts-mathjax_2.7.9+dfsg-1_all.deb')\
190
                    .write_text('dummy fonts-mathjax_2.7.9+dfsg-1_all.deb')
191

  
192
        if 'source' in command:
193
            destination = Path(kwargs.get('cwd') or Path.cwd())
194
            for filename in [
195
                    'mathjax_2.7.9+dfsg-1.debian.tar.xz',
196
                    'mathjax_2.7.9+dfsg-1.dsc',
197
                    'mathjax_2.7.9+dfsg.orig.tar.xz'
198
            ]:
199
                (destination / filename).write_text(f'dummy {filename}')
200

  
201
        if kwargs.get('text'):
202
            stderr = 'some error output'
203
            stdout = 'some output'
204
            if 'install' in command:
205
                stdout = sample_install_stdout
206
        else:
207
            stdout = b'some bin output'
208
            stderr = b'some bin error output'
209
            if 'install' in command:
210
                stdout = sample_install_stdout.encode()
211

  
212
        return_val = (self.update_return_value   if 'update'   in command else \
213
                      self.install_return_value  if 'install'  in command else \
214
                      self.download_return_value if 'download' in command else \
215
                      self.source_return_value)
216

  
217
        return MockedCompletedProcess(command, return_val, stdout, stderr)
218

  
219
@pytest.fixture
220
def mock_apt_get_update(monkeypatch, request):
221
    """Mock 'apt-get update' command when called through subprocess.run."""
222
    marker = request.node.get_closest_marker('subprocess_run')
223
    run_mock_opts = marker.args[0] if marker else {}
224
    monkeypatch.setattr(local_apt, 'subprocess',
225
                        MockedSubprocess(**run_mock_opts))
226

  
227
class MockedSubprocessRaises:
228
    """subprocess with a run() function that always raises FileNotFoundError."""
229
    def run(self, command, **kwargs):
230
        """
231
        Instead of running an 'apt-get' command we act as if it was missing.
232
        """
233
        raise FileNotFoundError('dummy')
234

  
235
@pytest.fixture
236
def mock_apt_get_update_raises(monkeypatch):
237
    """Mock missing 'apt-get' command when called through subprocess.run."""
238

  
239
    monkeypatch.setattr(local_apt, 'subprocess', MockedSubprocessRaises())
240

  
241
def test_local_apt_contextmanager(mock_cache_dir, mock_apt_get_update,
242
                                  mock_gnupg_import):
243
    """
244
    Verify that the local_apt() function creates a proper apt environment and
245
    that it also properly restores it from cache.
246
    """
247
    sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
248

  
249
    with local_apt.local_apt(sources_list, local_apt.default_keys) as apt:
250
        apt_root = Path(apt.apt_conf).parent.parent
251

  
252
        assert (apt_root / 'etc' / 'trusted.gpg').read_bytes() == \
253
            b'<dummy keys export>'
254

  
255
        assert (apt_root / 'etc' / 'update_called').exists()
256

  
257
        assert (apt_root / 'etc' / 'apt.sources.list').read_text() == \
258
            'deb-src sth\ndeb sth'
259

  
260
        conf_lines = (apt_root / 'etc' / 'apt.conf').read_text().split('\n')
261

  
262
        # check mocked keyring
263
        assert {*local_apt.default_keys} == \
264
            {*(mock_cache_dir / 'master_keyring.gpg').read_text().split('\n')}
265

  
266
    assert not apt_root.exists()
267

  
268
    expected_conf = {
269
        'Dir':                    str(apt_root),
270
        'Dir::State':             f'{apt_root}/var/lib/apt',
271
        'Dir::State::status':     f'{apt_root}/var/lib/dpkg/status',
272
        'Dir::Etc::SourceList':   f'{apt_root}/etc/apt.sources.list',
273
        'Dir::Etc::SourceParts':  '',
274
        'Dir::Cache':             f'{apt_root}/var/cache/apt',
275
        'pkgCacheGen::Essential': 'none',
276
        'Dir::Etc::Trusted':      f'{apt_root}/etc/trusted.gpg',
277
    }
278

  
279
    conf_regex = re.compile(r'^(?P<key>\S+)\s"(?P<val>\S*)";$')
280
    assert dict([(m.group('key'), m.group('val'))
281
                 for l in conf_lines if l for m in [conf_regex.match(l)]]) == \
282
        expected_conf
283

  
284
    with ZipFile(mock_cache_dir / f'apt_{sources_list.identity()}.zip') as zf:
285
        # reuse the same APT, its cached zip file should exist now
286
        with local_apt.local_apt(sources_list, local_apt.default_keys) as apt:
287
            apt_root = Path(apt.apt_conf).parent.parent
288

  
289
            expected_members = {*apt_root.rglob('*')}
290
            expected_members.remove(apt_root / 'etc' / 'apt.conf')
291
            expected_members.remove(apt_root / 'etc' / 'trusted.gpg')
292

  
293
            names = zf.namelist()
294
            assert len(names) == len(expected_members)
295

  
296
            for name in names:
297
                path = apt_root / name
298
                assert path in expected_members
299
                assert zf.read(name) == \
300
                    (b'' if path.is_dir() else path.read_bytes())
301

  
302
    assert not apt_root.exists()
303

  
304
@pytest.mark.subprocess_run({'update_return_value': 1})
305
def test_local_apt_update_failing(mock_cache_dir, mock_apt_get_update,
306
                                  mock_gnupg_import):
307
    """
308
    Verify that the local_apt() function raises a proper error when
309
    'apt-get update' command returns non-0.
310
    """
311
    sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
312

  
313
    with pytest.raises(local_apt.AptError) as excinfo:
314
        with local_apt.local_apt(sources_list, local_apt.default_keys) as apt:
315
            pass
316

  
317
    assert len(excinfo.value.args) == 1
318

  
319
    assert re.match(r'.*\n\n.*\n\nsome output\n\n.*\n\nsome error output',
320
                    excinfo.value.args[0])
321

  
322
def test_local_apt_missing(mock_cache_dir, mock_apt_get_update_raises,
323
                           mock_gnupg_import):
324
    """
325
    Verify that the local_apt() function raises a proper error when 'apt-get'
326
    command is missing.
327
    """
328
    sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
329

  
330
    with pytest.raises(local_apt.AptError) as excinfo:
331
        with local_apt.local_apt(sources_list, local_apt.default_keys) as apt:
332
            pass
333

  
334
    assert len(excinfo.value.args) == 1
335
    assert isinstance(excinfo.value.args[0], str)
336
    assert '\n' not in excinfo.value.args[0]
337

  
338
def test_local_apt_download(mock_cache_dir, mock_apt_get_update,
339
                            mock_gnupg_import):
340
    """
341
    Verify that download_apt_packages() function properly performs the download
342
    of .debs and sources.
343
    """
344
    sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
345
    destination = mock_cache_dir / 'destination'
346
    destination.mkdir()
347

  
348
    local_apt.download_apt_packages(sources_list, local_apt.default_keys,
349
                                    ['libjs-mathjax'], destination)
350

  
351
    libjs_mathjax_path = destination / 'libjs-mathjax_2.7.9+dfsg-1_all.deb'
352
    fonts_mathjax_path = destination / 'fonts-mathjax_2.7.9+dfsg-1_all.deb'
353

  
354
    source_paths = [
355
        destination / 'mathjax_2.7.9+dfsg-1.debian.tar.xz',
356
        destination / 'mathjax_2.7.9+dfsg-1.dsc',
357
        destination / 'mathjax_2.7.9+dfsg.orig.tar.xz'
358
    ]
359

  
360
    assert {*destination.glob('*')} == {libjs_mathjax_path, *source_paths}
361

  
362
    local_apt.download_apt_packages(sources_list, local_apt.default_keys,
363
                                    ['libjs-mathjax'], destination,
364
                                    with_deps=True)
365

  
366
    assert {*destination.glob('*')} == \
367
        {libjs_mathjax_path, fonts_mathjax_path, *source_paths}
368

  
369
@pytest.mark.subprocess_run({'install_return_value': 1})
370
def test_local_apt_install_failing(mock_cache_dir, mock_apt_get_update,
371
                                   mock_gnupg_import):
372
    """
373
    Verify that the download_apt_packages() function raises a proper error when
374
    'apt-get install' command returns non-0.
375
    """
376
    sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
377
    destination = mock_cache_dir / 'destination'
378
    destination.mkdir()
379

  
380
    with pytest.raises(local_apt.AptError) as excinfo:
381
        local_apt.download_apt_packages(sources_list, local_apt.default_keys,
382
                                        ['libjs-mathjax'], destination,
383
                                        with_deps=True)
384

  
385
    assert len(excinfo.value.args) == 1
386

  
387
    assert re.match(r'^.*\n\n.*\n\n', excinfo.value.args[0])
388
    assert re.search(r'\n\nsome error output$', excinfo.value.args[0])
389
    assert sample_install_stdout in excinfo.value.args[0]
390

  
391
    assert [*destination.glob('*')] == []
392

  
393
@pytest.mark.subprocess_run({'download_return_value': 1})
394
def test_local_apt_download_failing(mock_cache_dir, mock_apt_get_update,
395
                                    mock_gnupg_import):
396
    """
397
    Verify that the download_apt_packages() function raises a proper error when
398
    'apt-get download' command returns non-0.
399
    """
400
    sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
401
    destination = mock_cache_dir / 'destination'
402
    destination.mkdir()
403

  
404
    with pytest.raises(local_apt.AptError) as excinfo:
405
        local_apt.download_apt_packages(sources_list, local_apt.default_keys,
406
                                        ['libjs-mathjax'], destination)
407

  
408
    assert len(excinfo.value.args) == 1
409

  
410
    assert re.match(r'.*\n\n.*\n\nsome output\n\n.*\n\nsome error output',
411
                    excinfo.value.args[0])
412

  
413
    assert [*destination.glob('*')] == []
414

  
415
@pytest.mark.subprocess_run({'source_return_value': 1})
416
def test_local_apt_source_failing(mock_cache_dir, mock_apt_get_update,
417
                                  mock_gnupg_import):
418
    """
419
    Verify that the download_apt_packages() function raises a proper error when
420
    'apt-get source' command returns non-0.
421
    """
422
    sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
423
    destination = mock_cache_dir / 'destination'
424
    destination.mkdir()
425

  
426
    with pytest.raises(local_apt.AptError) as excinfo:
427
        local_apt.download_apt_packages(sources_list, local_apt.default_keys,
428
                                        ['libjs-mathjax'], destination)
429

  
430
    assert len(excinfo.value.args) == 1
431

  
432
    assert re.match(r'.*\n\n.*\n\nsome output\n\n.*\n\nsome error output',
433
                    excinfo.value.args[0])
434

  
435
    assert [*destination.glob('*')] == []
436

  
437
def test_sources_list():
438
    list = local_apt.SourcesList([], 'nabia')
439
    assert list.identity() == 'nabia'
440

  
441
    with pytest.raises(local_apt.DistroError):
442
        local_apt.SourcesList([], 'nabiaƂ')
443

  
444
    list = local_apt.SourcesList(['deb sth', 'deb-src sth'], 'nabia')
445
    assert list.identity() == \
446
        'ef28d408b96046eae45c8ab3094ce69b2ac0c02a887e796b1d3d1a4f06fb49f1'

Also available in: Unified diff