Project

General

Profile

« Previous | Next » 

Revision 61f0aa75

Added by koszko over 1 year ago

support piggybacking on APT packages

View differences:

.gitmodules
4 4
#
5 5
# Available under the terms of Creative Commons Zero v1.0 Universal.
6 6

  
7
[submodule "src/hydrilla/schemas"]
7
[submodule "hydrilla-json-schemas"]
8 8
	path = src/hydrilla/schemas
9 9
	url = ../hydrilla-json-schemas
10
[submodule "src/test/source-package-example"]
10
[submodule "hydrilla-source-package-example"]
11 11
	path = tests/source-package-example
12 12
	url = ../hydrilla-source-package-example
conftest.py
7 7
import sys
8 8
from pathlib import Path
9 9

  
10
import pytest
11

  
10 12
here = Path(__file__).resolve().parent
11 13
sys.path.insert(0, str(here / 'src'))
14

  
15
@pytest.fixture(autouse=True)
16
def no_requests(monkeypatch):
17
    """Remove requests.sessions.Session.request for all tests."""
18
    monkeypatch.delattr('requests.sessions.Session.request')
19

  
20
@pytest.fixture
21
def mock_subprocess_run(monkeypatch, request):
22
    """
23
    Temporarily replace subprocess.run() with a function supplied through pytest
24
    marker 'subprocess_run'.
25

  
26
    The marker excepts 2 arguments:
27
    * the module inside which the subprocess attribute should be mocked and
28
    * a run() function to use.
29
    """
30
    where, mocked_run = request.node.get_closest_marker('subprocess_run').args
31

  
32
    class MockedSubprocess:
33
        """Minimal mocked version of the subprocess module."""
34
        run = mocked_run
35

  
36
    monkeypatch.setattr(where, 'subprocess', MockedSubprocess)
pyproject.toml
13 13

  
14 14
[tool.pytest.ini_options]
15 15
minversion = "6.0"
16
addopts = "-ra -q"
16
addopts = "-ra"
17 17
testpaths = [
18 18
    "tests"
19 19
]
20
markers = [
21
    "subprocess_run: define how mocked subprocess.run should behave"
22
]
src/hydrilla/builder/build.py
30 30
import json
31 31
import re
32 32
import zipfile
33
from pathlib import Path
33
import subprocess
34
from pathlib import Path, PurePosixPath
34 35
from hashlib import sha256
35 36
from sys import stderr
37
from contextlib import contextmanager
38
from tempfile import TemporaryDirectory, TemporaryFile
39
from typing import Optional, Iterable, Union
36 40

  
37 41
import jsonschema
38 42
import click
39 43

  
40 44
from .. import util
41 45
from . import _version
46
from . import local_apt
47
from .piggybacking import Piggybacked
48
from .common_errors import *
42 49

  
43 50
here = Path(__file__).resolve().parent
44 51

  
45 52
_ = util.translation(here / 'locales').gettext
46 53

  
47
index_validator = util.validator_for('package_source-1.0.1.schema.json')
54
index_validator = util.validator_for('package_source-2.schema.json')
48 55

  
49 56
schemas_root = 'https://hydrilla.koszko.org/schemas'
50 57

  
......
53 60
    'version': _version.version
54 61
}
55 62

  
56
class FileReferenceError(Exception):
57
    """
58
    Exception used to report various problems concerning files referenced from
59
    source package's index.json.
60
    """
61

  
62
class ReuseError(Exception):
63
class ReuseError(SubprocessError):
63 64
    """
64 65
    Exception used to report various problems when calling the REUSE tool.
65 66
    """
66 67

  
67
class FileBuffer:
68
    """
69
    Implement a file-like object that buffers data written to it.
70
    """
71
    def __init__(self):
72
        """
73
        Initialize FileBuffer.
74
        """
75
        self.chunks = []
76

  
77
    def write(self, b):
78
        """
79
        Buffer 'b', return number of bytes buffered.
80

  
81
        'b' is expected to be an instance of 'bytes' or 'str', in which case it
82
        gets encoded as UTF-8.
83
        """
84
        if type(b) is str:
85
            b = b.encode()
86
        self.chunks.append(b)
87
        return len(b)
88

  
89
    def flush(self):
90
        """
91
        A no-op mock of file-like object's flush() method.
92
        """
93
        pass
94

  
95
    def get_bytes(self):
96
        """
97
        Return all data written so far concatenated into a single 'bytes'
98
        object.
99
        """
100
        return b''.join(self.chunks)
101

  
102
def generate_spdx_report(root):
68
def generate_spdx_report(root: Path) -> bytes:
103 69
    """
104 70
    Use REUSE tool to generate an SPDX report for sources under 'root' and
105 71
    return the report's contents as 'bytes'.
106 72

  
107
    'root' shall be an instance of pathlib.Path.
108

  
109 73
    In case the directory tree under 'root' does not constitute a
110
    REUSE-compliant package, linting report is printed to standard output and
111
    an exception is raised.
74
    REUSE-compliant package, as exception is raised with linting report
75
    included in it.
112 76

  
113
    In case the reuse package is not installed, an exception is also raised.
77
    In case the reuse tool is not installed, an exception is also raised.
114 78
    """
115
    try:
116
        from reuse._main import main as reuse_main
117
    except ModuleNotFoundError:
118
        raise ReuseError(_('couldnt_import_reuse_is_it_installed'))
79
    for command in [
80
            ['reuse', '--root', str(root), 'lint'],
81
            ['reuse', '--root', str(root), 'spdx']
82
    ]:
83
        try:
84
            cp = subprocess.run(command, capture_output=True, text=True)
85
        except FileNotFoundError:
86
            raise ReuseError(_('couldnt_execute_reuse_is_it_installed'))
119 87

  
120
    mocked_output = FileBuffer()
121
    if reuse_main(args=['--root', str(root), 'lint'], out=mocked_output) != 0:
122
        stderr.write(mocked_output.get_bytes().decode())
123
        raise ReuseError(_('spdx_report_from_reuse_incompliant'))
88
        if cp.returncode != 0:
89
            msg = _('reuse_command_{}_failed').format(' '.join(command))
90
            raise ReuseError(msg, cp)
124 91

  
125
    mocked_output = FileBuffer()
126
    if reuse_main(args=['--root', str(root), 'spdx'], out=mocked_output) != 0:
127
        stderr.write(mocked_output.get_bytes().decode())
128
        raise ReuseError("Couldn't generate an SPDX report for package.")
129

  
130
    return mocked_output.get_bytes()
92
    return cp.stdout.encode()
131 93

  
132 94
class FileRef:
133 95
    """Represent reference to a file in the package."""
134
    def __init__(self, path: Path, contents: bytes):
96
    def __init__(self, path: PurePosixPath, contents: bytes) -> None:
135 97
        """Initialize FileRef."""
136
        self.include_in_distribution = False
137
        self.include_in_zipfile      = True
138
        self.path                    = path
139
        self.contents                = contents
98
        self.include_in_distribution   = False
99
        self.include_in_source_archive = True
100
        self.path                      = path
101
        self.contents                  = contents
140 102

  
141 103
        self.contents_hash = sha256(contents).digest().hex()
142 104

  
143
    def make_ref_dict(self, filename: str):
105
    def make_ref_dict(self) -> dict[str, str]:
144 106
        """
145 107
        Represent the file reference through a dict that can be included in JSON
146 108
        defintions.
147 109
        """
148 110
        return {
149
            'file':   filename,
111
            'file':   str(self.path),
150 112
            'sha256': self.contents_hash
151 113
        }
152 114

  
115
@contextmanager
116
def piggybacked_system(piggyback_def: Optional[dict],
117
                       piggyback_files: Optional[Path]) \
118
                       -> Iterable[Piggybacked]:
119
    """
120
    Resolve resources from a foreign software packaging system. Optionally, use
121
    package files (.deb's, etc.) from a specified directory instead of resolving
122
    and downloading them.
123
    """
124
    if piggyback_def is None:
125
        yield Piggybacked()
126
    else:
127
        # apt is the only supported system right now
128
        assert piggyback_def['system'] == 'apt'
129

  
130
        with local_apt.piggybacked_system(piggyback_def, piggyback_files) \
131
             as piggybacked:
132
            yield piggybacked
133

  
153 134
class Build:
154 135
    """
155 136
    Build a Hydrilla package.
156 137
    """
157
    def __init__(self, srcdir, index_json_path):
138
    def __init__(self, srcdir: Path, index_json_path: Path,
139
                 piggyback_files: Optional[Path]=None):
158 140
        """
159 141
        Initialize a build. All files to be included in a distribution package
160 142
        are loaded into memory, all data gets validated and all necessary
161 143
        computations (e.g. preparing of hashes) are performed.
162

  
163
        'srcdir' and 'index_json' are expected to be pathlib.Path objects.
164 144
        """
165 145
        self.srcdir          = srcdir.resolve()
166
        self.index_json_path = index_json_path
146
        self.piggyback_files = piggyback_files
147
        # TODO: the piggyback files we set are ignored for now; use them
148
        if piggyback_files is None:
149
            piggyback_default_path = \
150
                srcdir.parent / f'{srcdir.name}.foreign-packages'
151
            if piggyback_default_path.exists():
152
                self.piggyback_files = piggyback_default_path
167 153
        self.files_by_path   = {}
168 154
        self.resource_list   = []
169 155
        self.mapping_list    = []
170 156

  
171 157
        if not index_json_path.is_absolute():
172
            self.index_json_path = (self.srcdir / self.index_json_path)
173

  
174
        self.index_json_path = self.index_json_path.resolve()
158
            index_json_path = (self.srcdir / index_json_path)
175 159

  
176
        with open(self.index_json_path, 'rt') as index_file:
160
        with open(index_json_path, 'rt') as index_file:
177 161
            index_json_text = index_file.read()
178 162

  
179 163
        index_obj = json.loads(util.strip_json_comments(index_json_text))
180 164

  
181
        self.files_by_path[self.srcdir / 'index.json'] = \
182
            FileRef(self.srcdir / 'index.json', index_json_text.encode())
165
        index_desired_path = PurePosixPath('index.json')
166
        self.files_by_path[index_desired_path] = \
167
            FileRef(index_desired_path, index_json_text.encode())
183 168

  
184 169
        self._process_index_json(index_obj)
185 170

  
186
    def _process_file(self, filename: str, include_in_distribution: bool=True):
171
    def _process_file(self, filename: Union[str, PurePosixPath],
172
                      piggybacked: Piggybacked,
173
                      include_in_distribution: bool=True):
187 174
        """
188 175
        Resolve 'filename' relative to srcdir, load it to memory (if not loaded
189 176
        before), compute its hash and store its information in
190 177
        'self.files_by_path'.
191 178

  
192
        'filename' shall represent a relative path using '/' as a separator.
179
        'filename' shall represent a relative path withing package directory.
193 180

  
194 181
        if 'include_in_distribution' is True it shall cause the file to not only
195 182
        be included in the source package's zipfile, but also written as one of
196 183
        built package's files.
197 184

  
185
        For each file an attempt is made to resolve it using 'piggybacked'
186
        object. If a file is found and pulled from foreign software packaging
187
        system this way, it gets automatically excluded from inclusion in
188
        Hydrilla source package's zipfile.
189

  
198 190
        Return file's reference object that can be included in JSON defintions
199 191
        of various kinds.
200 192
        """
201
        path = self.srcdir
202
        for segment in filename.split('/'):
203
            path /= segment
204

  
205
        path = path.resolve()
206
        if not path.is_relative_to(self.srcdir):
207
            raise FileReferenceError(_('loading_{}_outside_package_dir')
208
                                     .format(filename))
209

  
210
        if str(path.relative_to(self.srcdir)) == 'index.json':
211
            raise FileReferenceError(_('loading_reserved_index_json'))
193
        include_in_source_archive = True
194

  
195
        desired_path = PurePosixPath(filename)
196
        if '..' in desired_path.parts:
197
            msg = _('path_contains_double_dot_{}').format(filename)
198
            raise FileReferenceError(msg)
199

  
200
        path = piggybacked.resolve_file(desired_path)
201
        if path is None:
202
            path = (self.srcdir / desired_path).resolve()
203
            if not path.is_relative_to(self.srcdir):
204
                raise FileReferenceError(_('loading_{}_outside_package_dir')
205
                                         .format(filename))
206

  
207
            if str(path.relative_to(self.srcdir)) == 'index.json':
208
                raise FileReferenceError(_('loading_reserved_index_json'))
209
        else:
210
            include_in_source_archive = False
212 211

  
213
        file_ref = self.files_by_path.get(path)
212
        file_ref = self.files_by_path.get(desired_path)
214 213
        if file_ref is None:
215 214
            with open(path, 'rb') as file_handle:
216 215
                contents = file_handle.read()
217 216

  
218
            file_ref = FileRef(path, contents)
219
            self.files_by_path[path] = file_ref
217
            file_ref = FileRef(desired_path, contents)
218
            self.files_by_path[desired_path] = file_ref
220 219

  
221 220
        if include_in_distribution:
222 221
            file_ref.include_in_distribution = True
223 222

  
224
        return file_ref.make_ref_dict(filename)
223
        if not include_in_source_archive:
224
            file_ref.include_in_source_archive = False
225

  
226
        return file_ref.make_ref_dict()
225 227

  
226
    def _prepare_source_package_zip(self, root_dir_name: str):
228
    def _prepare_source_package_zip(self, source_name: str,
229
                                    piggybacked: Piggybacked) -> str:
227 230
        """
228 231
        Create and store in memory a .zip archive containing files needed to
229 232
        build this source package.
230 233

  
231
        'root_dir_name' shall not contain any slashes ('/').
234
        'src_dir_name' shall not contain any slashes ('/').
232 235

  
233 236
        Return zipfile's sha256 sum's hexstring.
234 237
        """
235
        fb = FileBuffer()
236
        root_dir_path = Path(root_dir_name)
238
        tf = TemporaryFile()
239
        source_dir_path      = PurePosixPath(source_name)
240
        piggybacked_dir_path = PurePosixPath(f'{source_name}.foreign-packages')
237 241

  
238
        def zippath(file_path):
239
            file_path = root_dir_path / file_path.relative_to(self.srcdir)
240
            return file_path.as_posix()
241

  
242
        with zipfile.ZipFile(fb, 'w') as xpi:
242
        with zipfile.ZipFile(tf, 'w') as zf:
243 243
            for file_ref in self.files_by_path.values():
244
                if file_ref.include_in_zipfile:
245
                    xpi.writestr(zippath(file_ref.path), file_ref.contents)
244
                if file_ref.include_in_source_archive:
245
                    zf.writestr(str(source_dir_path / file_ref.path),
246
                                file_ref.contents)
247

  
248
            for desired_path, real_path in piggybacked.archive_files():
249
                zf.writestr(str(piggybacked_dir_path / desired_path),
250
                            real_path.read_bytes())
246 251

  
247
        self.source_zip_contents = fb.get_bytes()
252
        tf.seek(0)
253
        self.source_zip_contents = tf.read()
248 254

  
249 255
        return sha256(self.source_zip_contents).digest().hex()
250 256

  
251
    def _process_item(self, item_def: dict):
257
    def _process_item(self, item_def: dict, piggybacked: Piggybacked):
252 258
        """
253 259
        Process 'item_def' as definition of a resource/mapping and store in
254 260
        memory its processed form and files used by it.
......
266 272

  
267 273
            copy_props.append('revision')
268 274

  
269
            script_file_refs = [self._process_file(f['file'])
275
            script_file_refs = [self._process_file(f['file'], piggybacked)
270 276
                                for f in item_def.get('scripts', [])]
271 277

  
272 278
            deps = [{'identifier': res_ref['identifier']}
273 279
                    for res_ref in item_def.get('dependencies', [])]
274 280

  
275 281
            new_item_obj = {
276
                'dependencies': deps,
282
                'dependencies': [*piggybacked.package_must_depend, *deps],
277 283
                'scripts':      script_file_refs
278 284
            }
279 285
        else:
......
308 314
        in it.
309 315
        """
310 316
        index_validator.validate(index_obj)
317
        match = re.match(r'.*-((([1-9][0-9]*|0)\.)+)schema\.json$',
318
                         index_obj['$schema'])
319
        self.source_schema_ver = \
320
            [int(n) for n in filter(None, match.group(1).split('.'))]
311 321

  
312
        schema = f'{schemas_root}/api_source_description-1.schema.json'
322
        out_schema = f'{schemas_root}/api_source_description-1.schema.json'
313 323

  
314 324
        self.source_name = index_obj['source_name']
315 325

  
316 326
        generate_spdx = index_obj.get('reuse_generate_spdx_report', False)
317 327
        if generate_spdx:
318 328
            contents  = generate_spdx_report(self.srcdir)
319
            spdx_path = (self.srcdir / 'report.spdx').resolve()
329
            spdx_path = PurePosixPath('report.spdx')
320 330
            spdx_ref  = FileRef(spdx_path, contents)
321 331

  
322
            spdx_ref.include_in_zipfile = False
332
            spdx_ref.include_in_source_archive = False
323 333
            self.files_by_path[spdx_path] = spdx_ref
324 334

  
325
        self.copyright_file_refs = \
326
            [self._process_file(f['file']) for f in index_obj['copyright']]
335
        piggyback_def = None
336
        if self.source_schema_ver >= [1, 1] and 'piggyback_on' in index_obj:
337
            piggyback_def = index_obj['piggyback_on']
327 338

  
328
        if generate_spdx and not spdx_ref.include_in_distribution:
329
            raise FileReferenceError(_('report_spdx_not_in_copyright_list'))
339
        with piggybacked_system(piggyback_def, self.piggyback_files) \
340
             as piggybacked:
341
            copyright_to_process = [
342
                *(file_ref['file'] for file_ref in index_obj['copyright']),
343
                *piggybacked.package_license_files
344
            ]
345
            self.copyright_file_refs = [self._process_file(f, piggybacked)
346
                                        for f in copyright_to_process]
330 347

  
331
        item_refs = [self._process_item(d) for d in index_obj['definitions']]
348
            if generate_spdx and not spdx_ref.include_in_distribution:
349
                raise FileReferenceError(_('report_spdx_not_in_copyright_list'))
332 350

  
333
        for file_ref in index_obj.get('additional_files', []):
334
            self._process_file(file_ref['file'], include_in_distribution=False)
351
            item_refs = [self._process_item(d, piggybacked)
352
                         for d in index_obj['definitions']]
335 353

  
336
        root_dir_path = Path(self.source_name)
354
            for file_ref in index_obj.get('additional_files', []):
355
                self._process_file(file_ref['file'], piggybacked,
356
                                   include_in_distribution=False)
337 357

  
338
        source_archives_obj = {
339
            'zip' : {
340
                'sha256': self._prepare_source_package_zip(root_dir_path)
341
            }
342
        }
358
            zipfile_sha256 = self._prepare_source_package_zip\
359
                (self.source_name, piggybacked)
360

  
361
            source_archives_obj = {'zip' : {'sha256': zipfile_sha256}}
343 362

  
344 363
        self.source_description = {
345
            '$schema':            schema,
364
            '$schema':            out_schema,
346 365
            'source_name':        self.source_name,
347 366
            'source_copyright':   self.copyright_file_refs,
348 367
            'upstream_url':       index_obj['upstream_url'],
......
398 417

  
399 418
dir_type = click.Path(exists=True, file_okay=False, resolve_path=True)
400 419

  
420
@click.command(help=_('build_package_from_srcdir_to_dstdir'))
401 421
@click.option('-s', '--srcdir', default='./', type=dir_type, show_default=True,
402 422
              help=_('source_directory_to_build_from'))
403 423
@click.option('-i', '--index-json', default='index.json', type=click.Path(),
404 424
              help=_('path_instead_of_index_json'))
425
@click.option('-p', '--piggyback-files', type=click.Path(),
426
              help=_('path_instead_for_piggyback_files'))
405 427
@click.option('-d', '--dstdir', type=dir_type, required=True,
406 428
              help=_('built_package_files_destination'))
407 429
@click.version_option(version=_version.version, prog_name='Hydrilla builder',
408 430
                      message=_('%(prog)s_%(version)s_license'),
409 431
                      help=_('version_printing'))
410
def perform(srcdir, index_json, dstdir):
411
    """<this will be replaced by a localized docstring for Click to pick up>"""
412
    build = Build(Path(srcdir), Path(index_json))
413
    build.write_package_files(Path(dstdir))
414

  
415
perform.__doc__ = _('build_package_from_srcdir_to_dstdir')
432
def perform(srcdir, index_json, piggyback_files, dstdir):
433
    """
434
    Execute Hydrilla builder to turn source package into a distributable one.
416 435

  
417
perform = click.command()(perform)
436
    This command is meant to be the entry point of hydrilla-builder command
437
    exported by this package.
438
    """
439
    build = Build(Path(srcdir), Path(index_json),
440
                  piggyback_files and Path(piggyback_files))
441
    build.write_package_files(Path(dstdir))
src/hydrilla/builder/common_errors.py
1
# SPDX-License-Identifier: AGPL-3.0-or-later
2

  
3
# Error classes.
4
#
5
# This file is part of Hydrilla
6
#
7
# Copyright (C) 2022 Wojtek Kosior
8
#
9
# This program is free software: you can redistribute it and/or modify
10
# it under the terms of the GNU Affero General Public License as
11
# published by the Free Software Foundation, either version 3 of the
12
# License, or (at your option) any later version.
13
#
14
# This program is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
# GNU Affero General Public License for more details.
18
#
19
# You should have received a copy of the GNU Affero General Public License
20
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
21
#
22
#
23
# I, Wojtek Kosior, thereby promise not to sue for violation of this
24
# file's license. Although I request that you do not make use this code
25
# in a proprietary program, I am not going to enforce this in court.
26

  
27
"""
28
This module defines error types for use in other parts of Hydrilla builder.
29
"""
30

  
31
# Enable using with Python 3.7.
32
from __future__ import annotations
33

  
34
from pathlib import Path
35

  
36
from .. import util
37

  
38
here = Path(__file__).resolve().parent
39

  
40
_ = util.translation(here / 'locales').gettext
41

  
42
class DistroError(Exception):
43
    """
44
    Exception used to report problems when resolving an OS distribution.
45
    """
46

  
47
class FileReferenceError(Exception):
48
    """
49
    Exception used to report various problems concerning files referenced from
50
    source package.
51
    """
52

  
53
class SubprocessError(Exception):
54
    """
55
    Exception used to report problems related to execution of external
56
    processes, includes. various problems when calling apt-* and dpkg-*
57
    commands.
58
    """
59
    def __init__(self, msg: str, cp: Optional[CP]=None) -> None:
60
        """Initialize this SubprocessError"""
61
        if cp and cp.stdout:
62
            msg = '\n\n'.join([msg, _('STDOUT_OUTPUT_heading'), cp.stdout])
63

  
64
        if cp and cp.stderr:
65
            msg = '\n\n'.join([msg, _('STDERR_OUTPUT_heading'), cp.stderr])
66

  
67
        super().__init__(msg)
src/hydrilla/builder/local_apt.py
1
# SPDX-License-Identifier: AGPL-3.0-or-later
2

  
3
# Using a local APT.
4
#
5
# This file is part of Hydrilla
6
#
7
# Copyright (C) 2022 Wojtek Kosior
8
#
9
# This program is free software: you can redistribute it and/or modify
10
# it under the terms of the GNU Affero General Public License as
11
# published by the Free Software Foundation, either version 3 of the
12
# License, or (at your option) any later version.
13
#
14
# This program is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
# GNU Affero General Public License for more details.
18
#
19
# You should have received a copy of the GNU Affero General Public License
20
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
21
#
22
#
23
# I, Wojtek Kosior, thereby promise not to sue for violation of this
24
# file's license. Although I request that you do not make use this code
25
# in a proprietary program, I am not going to enforce this in court.
26

  
27
# Enable using with Python 3.7.
28
from __future__ import annotations
29

  
30
import zipfile
31
import shutil
32
import re
33
import subprocess
34
CP = subprocess.CompletedProcess
35
from pathlib import Path, PurePosixPath
36
from tempfile import TemporaryDirectory, NamedTemporaryFile
37
from hashlib import sha256
38
from contextlib import contextmanager
39
from typing import Optional, Iterable
40

  
41
from .. import util
42
from .piggybacking import Piggybacked
43
from .common_errors import *
44

  
45
here = Path(__file__).resolve().parent
46

  
47
_ = util.translation(here / 'locales').gettext
48

  
49
"""
50
Default cache directory to save APT configurations and downloaded GPG keys in.
51
"""
52
default_apt_cache_dir = Path.home() / '.cache' / 'hydrilla' / 'builder' / 'apt'
53

  
54
"""
55
Default keyserver to use.
56
"""
57
default_keyserver = 'hkps://keyserver.ubuntu.com:443'
58

  
59
"""
60
Default keys to download when using a local APT.
61
"""
62
default_keys = [
63
    # Trisquel
64
    'E6C27099CA21965B734AEA31B4EFB9F38D8AEBF1',
65
    '60364C9869F92450421F0C22B138CA450C05112F',
66
    # Ubuntu
67
    '630239CC130E1A7FD81A27B140976EAF437D05B5',
68
    '790BC7277767219C42C86F933B4FE6ACC0B21F32',
69
    'F6ECB3762474EDA9D21B7022871920D1991BC93C',
70
    # Debian
71
    '6D33866EDD8FFA41C0143AEDDCC9EFBF77E11517',
72
    '80D15823B7FD1561F9F7BCDDDC30D7C23CBBABEE',
73
    'AC530D520F2F3269F5E98313A48449044AAD5C5D'
74
]
75

  
76
"""sources.list file contents for known distros."""
77
default_lists = {
78
    'nabia': [f'{type} http://archive.trisquel.info/trisquel/ nabia{suf} main'
79
              for type in ('deb', 'deb-src')
80
              for suf in ('', '-updates', '-security')]
81
}
82

  
83
class GpgError(Exception):
84
    """
85
    Exception used to report various problems when calling GPG.
86
    """
87

  
88
class AptError(SubprocessError):
89
    """
90
    Exception used to report various problems when calling apt-* and dpkg-*
91
    commands.
92
    """
93

  
94
def run(command, **kwargs):
95
    """A wrapped around subprocess.run that sets some default options."""
96
    return subprocess.run(command, **kwargs, env={'LANG': 'en_US'},
97
                          capture_output=True, text=True)
98

  
99
class Apt:
100
    """
101
    This class represents an APT instance and can be used to call apt-get
102
    commands with it.
103
    """
104
    def __init__(self, apt_conf: str) -> None:
105
        """Initialize this Apt object."""
106
        self.apt_conf = apt_conf
107

  
108
    def get(self, *args: str, **kwargs) -> CP:
109
        """
110
        Run apt-get with the specified arguments and raise a meaningful AptError
111
        when something goes wrong.
112
        """
113
        command = ['apt-get', '-c', self.apt_conf, *args]
114
        try:
115
            cp = run(command, **kwargs)
116
        except FileNotFoundError:
117
            raise AptError(_('couldnt_execute_apt_get_is_it_installed'))
118

  
119
        if cp.returncode != 0:
120
            msg = _('apt_get_command_{}_failed').format(' '.join(command))
121
            raise AptError(msg, cp)
122

  
123
        return cp
124

  
125
def cache_dir() -> Path:
126
    """
127
    Return the directory used to cache data (APT configurations, keyrings) to
128
    speed up repeated operations.
129

  
130
    This function first ensures the directory exists.
131
    """
132
    default_apt_cache_dir.mkdir(parents=True, exist_ok=True)
133
    return default_apt_cache_dir
134

  
135
class SourcesList:
136
    """Representation of apt's sources.list contents."""
137
    def __init__(self, list: [str]=[], codename: Optional[str]=None) -> None:
138
        """Initialize this SourcesList."""
139
        self.codename = None
140
        self.list = [*list]
141
        self.has_extra_entries = bool(self.list)
142

  
143
        if codename is not None:
144
            if codename not in default_lists:
145
                raise DistroError(_('distro_{}_unknown').format(codename))
146

  
147
            self.codename = codename
148
            self.list.extend(default_lists[codename])
149

  
150
    def identity(self) -> str:
151
        """
152
        Produce a string that uniquely identifies this sources.list contents.
153
        """
154
        if self.codename and not self.has_extra_entries:
155
            return self.codename
156

  
157
        return sha256('\n'.join(sorted(self.list)).encode()).digest().hex()
158

  
159
def apt_conf(directory: Path) -> str:
160
    """
161
    Given local APT's directory, produce a configuration suitable for running
162
    APT there.
163

  
164
    'directory' must not contain any special characters including quotes and
165
    spaces.
166
    """
167
    return f'''
168
Dir "{directory}";
169
Dir::State "{directory}/var/lib/apt";
170
Dir::State::status "{directory}/var/lib/dpkg/status";
171
Dir::Etc::SourceList "{directory}/etc/apt.sources.list";
172
Dir::Etc::SourceParts "";
173
Dir::Cache "{directory}/var/cache/apt";
174
pkgCacheGen::Essential "none";
175
Dir::Etc::Trusted "{directory}/etc/trusted.gpg";
176
'''
177

  
178
def apt_keyring(keys: [str]) -> bytes:
179
    """
180
    Download the requested keys if necessary and export them as a keyring
181
    suitable for passing to APT.
182

  
183
    The keyring is returned as a bytes value that should be written to a file.
184
    """
185
    try:
186
        from gnupg import GPG
187
    except ModuleNotFoundError:
188
        raise GpgError(_('couldnt_import_gnupg_is_it_installed'))
189

  
190
    gpg = GPG(keyring=str(cache_dir() / 'master_keyring.gpg'))
191
    for key in keys:
192
        if gpg.list_keys(keys=[key]) != []:
193
            continue
194

  
195
        if gpg.recv_keys(default_keyserver, key).imported == 0:
196
            raise GpgError(_('gpg_couldnt_recv_key'))
197

  
198
    return gpg.export_keys(keys, armor=False, minimal=True)
199

  
200
def cache_apt_root(apt_root: Path, destination_zip: Path) -> None:
201
    """
202
    Zip an APT root directory for later use and move the zipfile to the
203
    requested destination.
204
    """
205
    temporary_zip_path = None
206
    try:
207
        tmpfile = NamedTemporaryFile(suffix='.zip', prefix='tmp_',
208
                                     dir=cache_dir(), delete=False)
209
        temporary_zip_path = Path(tmpfile.name)
210

  
211
        to_skip = {Path('etc') / 'apt.conf', Path('etc') / 'trusted.gpg'}
212

  
213
        with zipfile.ZipFile(tmpfile, 'w') as zf:
214
            for member in apt_root.rglob('*'):
215
                relative = member.relative_to(apt_root)
216
                if relative not in to_skip:
217
                    # This call will also properly add empty folders to zip file
218
                    zf.write(member, relative, zipfile.ZIP_DEFLATED)
219

  
220
        shutil.move(temporary_zip_path, destination_zip)
221
    finally:
222
        if temporary_zip_path is not None and temporary_zip_path.exists():
223
            temporary_zip_path.unlink()
224

  
225
def setup_local_apt(directory: Path, list: SourcesList, keys: [str]) -> Apt:
226
    """
227
    Create files and directories necessary for running APT without root rights
228
    inside 'directory'.
229

  
230
    'directory' must not contain any special characters including quotes and
231
    spaces and must be empty.
232

  
233
    Return an Apt object that can be used to call apt-get commands.
234
    """
235
    apt_root = directory / 'apt_root'
236

  
237
    conf_text     = apt_conf(apt_root)
238
    keyring_bytes = apt_keyring(keys)
239

  
240
    apt_zipfile = cache_dir() / f'apt_{list.identity()}.zip'
241
    if apt_zipfile.exists():
242
        with zipfile.ZipFile(apt_zipfile) as zf:
243
            zf.extractall(apt_root)
244

  
245
    for to_create in (
246
            apt_root / 'var' / 'lib' / 'apt' / 'partial',
247
            apt_root / 'var' / 'lib' / 'apt' / 'lists',
248
            apt_root / 'var' / 'cache' / 'apt' / 'archives' / 'partial',
249
            apt_root / 'etc' / 'apt' / 'preferences.d',
250
            apt_root / 'var' / 'lib' / 'dpkg',
251
            apt_root / 'var' / 'log' / 'apt'
252
    ):
253
        to_create.mkdir(parents=True, exist_ok=True)
254

  
255
    conf_path    = apt_root / 'etc' / 'apt.conf'
256
    trusted_path = apt_root / 'etc' / 'trusted.gpg'
257
    status_path  = apt_root / 'var' / 'lib' / 'dpkg' / 'status'
258
    list_path    = apt_root / 'etc' / 'apt.sources.list'
259

  
260
    conf_path.write_text(conf_text)
261
    trusted_path.write_bytes(keyring_bytes)
262
    status_path.touch()
263
    list_path.write_text('\n'.join(list.list))
264

  
265
    apt = Apt(str(conf_path))
266
    apt.get('update')
267

  
268
    cache_apt_root(apt_root, apt_zipfile)
269

  
270
    return apt
271

  
272
@contextmanager
273
def local_apt(list: SourcesList, keys: [str]) -> Iterable[Apt]:
274
    """
275
    Create a temporary directory with proper local APT configuration in it.
276
    Yield an Apt object that can be used to issue apt-get commands.
277

  
278
    This function returns a context manager that will remove the directory on
279
    close.
280
    """
281
    with TemporaryDirectory() as td:
282
        td = Path(td)
283
        yield setup_local_apt(td, list, keys)
284

  
285
def download_apt_packages(list: SourcesList, keys: [str], packages: [str],
286
                          destination_dir: Path, with_deps=False) -> [str]:
287
    """
288
    Set up a local APT, update it using the specified sources.list configuration
289
    and use it to download the specified packages.
290

  
291
    This function downloads a .deb file of the packages matching the current
292
    architecture (which includes packages with architecture 'all') as well as
293
    all theis corresponding source package files and (if requested) the debs
294
    and source files of all their declared dependencies.
295

  
296
    Return value is a list of names of all downloaded files.
297
    """
298
    with local_apt(list, keys) as apt:
299
        if with_deps:
300
            cp = apt.get('install', '--yes', '--just-print', *packages)
301

  
302
            deps_listing = re.match(
303
                r'''
304
                .*
305
                The\sfollowing\sNEW\spackages\swill\sbe\sinstalled:
306
                (.*)
307
                0\supgraded,
308
                ''',
309
                cp.stdout,
310
                re.MULTILINE | re.DOTALL | re.VERBOSE)
311

  
312
            if deps_listing is None:
313
                raise AptError(_('apt_install_output_not_understood'), cp)
314

  
315
            packages = deps_listing.group(1).split()
316

  
317
        # Download .debs to indirectly to destination_dir by first placing them
318
        # in a temporary subdirectory.
319
        with TemporaryDirectory(dir=destination_dir) as td:
320
            td = Path(td)
321
            cp = apt.get('download', *packages, cwd=td)
322

  
323
            deb_name_regex = re.compile(
324
                r'''
325
                ^
326
                (?P<name>[^_]+)
327
                _
328
                (?P<ver>[^_]+)
329
                _
330
                .+              # architecture (or 'all')
331
                \.deb
332
                $
333
                ''',
334
                re.VERBOSE)
335

  
336
            names_vers = []
337
            downloaded = []
338
            for deb_file in td.iterdir():
339
                match = deb_name_regex.match(deb_file.name)
340
                if match is None:
341
                    msg = _('apt_download_gave_bad_filename_{}')\
342
                        .format(deb_file.name)
343
                    raise AptError(msg, cp)
344

  
345
                names_vers.append((match.group('name'), match.group('ver')))
346
                downloaded.append(deb_file.name)
347

  
348
            apt.get('source', '--download-only',
349
                    *[f'{n}={v}' for n, v in names_vers], cwd=td)
350

  
351
            for source_file in td.iterdir():
352
                if source_file.name in downloaded:
353
                    continue
354

  
355
                downloaded.append(source_file.name)
356

  
357
            for filename in downloaded:
358
                shutil.move(td / filename, destination_dir / filename)
359

  
360
    return downloaded
361

  
362
@contextmanager
363
def piggybacked_system(piggyback_def: dict, foreign_packages: Optional[Path]) \
364
    -> Iterable[Piggybacked]:
365
    """
366
    Resolve resources from APT. Optionally, use package files (.deb's, etc.)
367
    from a specified directory instead of resolving and downloading them.
368

  
369
    The directories and files created for the yielded Piggybacked object shall
370
    be deleted when this context manager gets closed.
371
    """
372
    assert piggyback_def['system'] == 'apt'
373

  
374
    with TemporaryDirectory() as td:
375
        td       = Path(td)
376
        root     = td / 'root'
377
        root.mkdir()
378

  
379
        if foreign_packages is None:
380
            archives = td / 'archives'
381
            archives.mkdir()
382

  
383
            sources_list = SourcesList(piggyback_def.get('sources_list', []),
384
                                       piggyback_def.get('distribution'))
385
            packages = piggyback_def['packages']
386
            with_deps = piggyback_def['dependencies']
387
            pgp_keys = [
388
                *default_keys,
389
                *piggyback_def.get('trusted_keys', [])
390
            ]
391

  
392
            download_apt_packages(
393
                list=sources_list,
394
                keys=pgp_keys,
395
                packages=packages,
396
                destination_dir=archives,
397
                with_deps=with_deps
398
            )
399
        else:
400
            archives = foreign_packages / 'apt'
401

  
402
        for deb in archives.glob('*.deb'):
403
            command = ['dpkg-deb', '-x', str(deb), str(root)]
404
            try:
405
                cp = run(command)
406
            except FileNotFoundError:
407
                raise AptError(_('couldnt_execute_dpkg_deb_is_it_installed'))
408

  
409
            if cp.returncode != 0:
410
                msg = _('dpkg_deb_command_{}_failed').format(' '.join(command))
411
                raise AptError(msg, cp)
412

  
413
        docs_dir = root / 'usr' / 'share' / 'doc'
414
        copyright_paths = [p / 'copyright' for p in docs_dir.iterdir()] \
415
                    if docs_dir.exists() else []
416
        copyright_paths = [PurePosixPath('.apt-root') / p.relative_to(root)
417
                           for p in copyright_paths if p.exists()]
418

  
419
        standard_depends = piggyback_def.get('depend_on_base_packages', True)
420
        must_depend = [{'identifier': 'apt-common-licenses'}] \
421
            if standard_depends else []
422

  
423
        yield Piggybacked(
424
            archives={'apt': archives},
425
            roots={'.apt-root': root},
426
            package_license_files=copyright_paths,
427
            package_must_depend=must_depend
428
        )
src/hydrilla/builder/piggybacking.py
1
# SPDX-License-Identifier: AGPL-3.0-or-later
2

  
3
# Handling of software packaged for other distribution systems.
4
#
5
# This file is part of Hydrilla
6
#
7
# Copyright (C) 2022 Wojtek Kosior
8
#
9
# This program is free software: you can redistribute it and/or modify
10
# it under the terms of the GNU Affero General Public License as
11
# published by the Free Software Foundation, either version 3 of the
12
# License, or (at your option) any later version.
13
#
14
# This program is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
# GNU Affero General Public License for more details.
18
#
19
# You should have received a copy of the GNU Affero General Public License
20
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
21
#
22
#
23
# I, Wojtek Kosior, thereby promise not to sue for violation of this
24
# file's license. Although I request that you do not make use this code
25
# in a proprietary program, I am not going to enforce this in court.
26

  
27
"""
28
This module contains definitions that may be reused by multiple piggybacked
29
software system backends.
30
"""
31

  
32
# Enable using with Python 3.7.
33
from __future__ import annotations
34

  
35
from pathlib import Path, PurePosixPath
36
from typing import Optional, Iterable
37

  
38
from .. import util
39
from .common_errors import *
40

  
41
here = Path(__file__).resolve().parent
42

  
43
_ = util.translation(here / 'locales').gettext
44

  
45
class Piggybacked:
46
    """
47
    Store information about foreign resources in use.
48

  
49
    Public attributes:
50
        'package_must_depend' (read-only)
51
        'package_license_files' (read-only)
52
    """
53
    def __init__(self, archives: dict[str, Path]={}, roots: dict[str, Path]={},
54
                 package_license_files: list[PurePosixPath]=[],
55
                 package_must_depend: list[dict]=[]):
56
        """
57
        Initialize this Piggybacked object.
58

  
59
        'archives' maps piggybacked system names to directories that contain
60
        package(s)' archive files. An 'archives' object may look like
61
        {'apt': PosixPath('/path/to/dir/with/debs/and/tarballs')}.
62

  
63
        'roots' associates directory names to be virtually inserted under
64
        Hydrilla source package directory with paths to real filesystem
65
        directories that hold their desired contents, i.e. unpacked foreign
66
        packages.
67

  
68
        'package_license_files' lists paths to license files that should be
69
        included with the Haketilo package that will be produced. The paths are
70
        to be resolved using 'roots' dictionary.
71

  
72
        'package_must_depend' lists names of Haketilo packages that the produced
73
        package will additionally depend on. This is meant to help distribute
74
        common licenses with a separate Haketilo package.
75
        """
76
        self.archives              = archives
77
        self.roots                 = roots
78
        self.package_license_files = package_license_files
79
        self.package_must_depend   = package_must_depend
80

  
81
    def resolve_file(self, file_ref_name: PurePosixPath) -> Optional[Path]:
82
        """
83
        'file_ref_name' is a path as may appear in an index.json file. Check if
84
        the file belongs to one of the roots we have and return either a path
85
        to the relevant file under this root or None.
86

  
87
        It is not being checked whether the file actually exists in the
88
        filesystem.
89
        """
90
        parts = file_ref_name.parts
91
        root_path = self.roots.get(parts and parts[0])
92
        path = root_path
93
        if path is None:
94
            return None
95

  
96
        for part in parts[1:]:
97
            path = path / part
98

  
99
        path = path.resolve()
100

  
101
        if not path.is_relative_to(root_path):
102
            raise FileReferenceError(_('loading_{}_outside_piggybacked_dir')
103
                                     .format(file_ref_name))
104

  
105
        return path
106

  
107
    def archive_files(self) -> Iterable[tuple[PurePosixPath, Path]]:
108
        """
109
        Yield all archive files in use. Each yielded tuple holds file's desired
110
        path relative to the piggybacked archives directory to be created and
111
        its current real path.
112
        """
113
        for system, real_dir in self.archives.items():
114
            for path in real_dir.rglob('*'):
115
                yield PurePosixPath(system) / path.relative_to(real_dir), path
src/hydrilla/schemas
1
Subproject commit 09634f3446866f712a022327683b1149d8f46bf0
1
Subproject commit 4b4da5a02bc311603469eea7b3dfd4f1bbb911fd
tests/__init__.py
1
# SPDX-License-Identifier: CC0-1.0
2

  
3
# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
4
#
5
# Available under the terms of Creative Commons Zero v1.0 Universal.
tests/helpers.py
1
# SPDX-License-Identifier: CC0-1.0
2

  
3
# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
4
#
5
# Available under the terms of Creative Commons Zero v1.0 Universal.
6

  
7
import re
8

  
9
variable_word_re = re.compile(r'^<(.+)>$')
10

  
11
def process_command(command, expected_command):
12
    """Validate the command line and extract its variable parts (if any)."""
13
    assert len(command) == len(expected_command)
14

  
15
    extracted = {}
16
    for word, expected_word in zip(command, expected_command):
17
        match = variable_word_re.match(expected_word)
18
        if match:
19
            extracted[match.group(1)] = word
20
        else:
21
            assert word == expected_word
22

  
23
    return extracted
24

  
25
def run_missing_executable(command, **kwargs):
26
    """
27
    Instead of running a command, raise FileNotFoundError as if its executable
28
    was missing.
29
    """
30
    raise FileNotFoundError('dummy')
31

  
32
class MockedCompletedProcess:
33
    """
34
    Object with some fields similar to those of subprocess.CompletedProcess.
35
    """
36
    def __init__(self, args, returncode=0,
37
                 stdout='some output', stderr='some error output',
38
                 text_output=True):
39
        """
40
        Initialize MockedCompletedProcess. Convert strings to bytes if needed.
41
        """
42
        self.args       = args
43
        self.returncode = returncode
44

  
45
        if type(stdout) is str and not text_output:
46
            stdout = stdout.encode()
47
        if type(stderr) is str and not text_output:
48
            stderr = stderr.encode()
49

  
50
        self.stdout = stdout
51
        self.stderr = stderr
tests/test_build.py
1
# SPDX-License-Identifier: CC0-1.0
2

  
3
# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
4
#
5
# Available under the terms of Creative Commons Zero v1.0 Universal.
6

  
7
# Enable using with Python 3.7.
8
from __future__ import annotations
9

  
10
import pytest
11
import json
12
import shutil
13

  
14
from tempfile import TemporaryDirectory
15
from pathlib import Path, PurePosixPath
16
from hashlib import sha256
17
from zipfile import ZipFile
18
from contextlib import contextmanager
19

  
20
from jsonschema import ValidationError
21

  
22
from hydrilla import util as hydrilla_util
23
from hydrilla.builder import build, _version, local_apt
24
from hydrilla.builder.common_errors import *
25

  
26
from .helpers import *
27

  
28
here = Path(__file__).resolve().parent
29

  
30
expected_generated_by = {
31
    'name': 'hydrilla.builder',
32
    'version': _version.version
33
}
34

  
35
orig_srcdir = here / 'source-package-example'
36

  
37
index_text = (orig_srcdir / 'index.json').read_text()
38
index_obj = json.loads(hydrilla_util.strip_json_comments(index_text))
39

  
40
def read_files(*file_list):
41
    """
42
    Take names of files under srcdir and return a dict that maps them to their
43
    contents (as bytes).
44
    """
45
    return dict((name, (orig_srcdir / name).read_bytes()) for name in file_list)
46

  
47
dist_files = {
48
    **read_files('LICENSES/CC0-1.0.txt', 'bye.js', 'hello.js', 'message.js'),
49
    'report.spdx': b'dummy spdx output'
50
}
51
src_files = {
52
    **dist_files,
53
    **read_files('README.txt', 'README.txt.license', '.reuse/dep5',
54
                 'index.json')
55
}
56
extra_archive_files = {
57
}
58

  
59
sha256_hashes = dict((name, sha256(contents).digest().hex())
60
                     for name, contents in src_files.items())
61

  
62
del src_files['report.spdx']
63

  
64
expected_resources = [{
65
    '$schema': 'https://hydrilla.koszko.org/schemas/api_resource_description-1.schema.json',
66
    'source_name': 'hello',
67
    'source_copyright': [{
68
        'file': 'report.spdx',
69
        'sha256': sha256_hashes['report.spdx']
70
    }, {
71
        'file': 'LICENSES/CC0-1.0.txt',
72
        'sha256': sha256_hashes['LICENSES/CC0-1.0.txt']
73
    }],
74
    'type': 'resource',
75
    'identifier': 'helloapple',
76
    'long_name': 'Hello Apple',
77
    'uuid': 'a6754dcb-58d8-4b7a-a245-24fd7ad4cd68',
78
    'version': [2021, 11, 10],
79
    'revision': 1,
80
    'description': 'greets an apple',
81
    'dependencies': [{'identifier': 'hello-message'}],
82
    'scripts': [{
83
        'file': 'hello.js',
84
        'sha256': sha256_hashes['hello.js']
85
    }, {
86
        'file': 'bye.js',
87
        'sha256': sha256_hashes['bye.js']
88
    }],
89
    'generated_by': expected_generated_by
90
}, {
91
    '$schema': 'https://hydrilla.koszko.org/schemas/api_resource_description-1.schema.json',
92
    'source_name': 'hello',
93
    'source_copyright': [{
94
        'file': 'report.spdx',
95
        'sha256': sha256_hashes['report.spdx']
96
    }, {
97
        'file': 'LICENSES/CC0-1.0.txt',
98
        'sha256': sha256_hashes['LICENSES/CC0-1.0.txt']
99
    }],
100
    'type': 'resource',
101
    'identifier': 'hello-message',
102
    'long_name': 'Hello Message',
103
    'uuid': '1ec36229-298c-4b35-8105-c4f2e1b9811e',
104
    'version': [2021, 11, 10],
105
    'revision': 2,
106
    'description': 'define messages for saying hello and bye',
107
    'dependencies': [],
108
    'scripts': [{
109
        'file': 'message.js',
110
        'sha256': sha256_hashes['message.js']
111
    }],
112
    'generated_by': expected_generated_by
113
}]
114

  
115
expected_mapping = {
116
    '$schema': 'https://hydrilla.koszko.org/schemas/api_mapping_description-1.schema.json',
117
    'source_name': 'hello',
118
    'source_copyright': [{
119
        'file': 'report.spdx',
120
        'sha256': sha256_hashes['report.spdx']
121
    }, {
122
        'file': 'LICENSES/CC0-1.0.txt',
123
        'sha256': sha256_hashes['LICENSES/CC0-1.0.txt']
124
    }],
125
    'type': 'mapping',
126
    'identifier': 'helloapple',
127
    'long_name': 'Hello Apple',
128
    'uuid': '54d23bba-472e-42f5-9194-eaa24c0e3ee7',
129
    'version': [2021, 11, 10],
130
    'description': 'causes apple to get greeted on Hydrillabugs issue tracker',
131
    'payloads': {
132
	'https://hydrillabugs.koszko.org/***': {
133
	    'identifier': 'helloapple'
134
	},
135
	'https://hachettebugs.koszko.org/***': {
136
	    'identifier': 'helloapple'
137
        }
138
    },
139
    'generated_by': expected_generated_by
140
}
141

  
142
expected_source_description = {
143
    '$schema': 'https://hydrilla.koszko.org/schemas/api_source_description-1.schema.json',
144
    'source_name': 'hello',
145
    'source_copyright': [{
146
        'file': 'report.spdx',
147
        'sha256': sha256_hashes['report.spdx']
148
    }, {
149
        'file': 'LICENSES/CC0-1.0.txt',
150
        'sha256': sha256_hashes['LICENSES/CC0-1.0.txt']
151
    }],
152
    'source_archives': {
153
        'zip': {
154
            'sha256': '!!!!value to fill during test!!!!',
155
        }
156
    },
157
    'upstream_url': 'https://git.koszko.org/hydrilla-source-package-example',
158
    'definitions': [{
159
        'type': 'resource',
160
        'identifier': 'helloapple',
161
        'long_name': 'Hello Apple',
162
        'version': [2021, 11, 10],
163
    }, {
164
        'type':       'resource',
165
        'identifier': 'hello-message',
166
        'long_name': 'Hello Message',
167
        'version':     [2021, 11, 10],
168
    }, {
169
        'type': 'mapping',
170
        'identifier': 'helloapple',
171
	'long_name': 'Hello Apple',
172
        'version': [2021, 11, 10],
173
    }],
174
    'generated_by': expected_generated_by
175
}
176

  
177
expected = [*expected_resources, expected_mapping, expected_source_description]
178

  
179
@pytest.fixture
180
def tmpdir() -> Iterable[str]:
181
    """
182
    Provide test case with a temporary directory that will be automatically
183
    deleted after the test.
184
    """
185
    with TemporaryDirectory() as tmpdir:
186
        yield Path(tmpdir)
187

  
188
def run_reuse(command, **kwargs):
189
    """
190
    Instead of running a 'reuse' command, check if 'mock_reuse_missing' file
191
    exists under root directory. If yes, raise FileNotFoundError as if 'reuse'
192
    command was missing. If not, check if 'README.txt.license' file exists
193
    in the requested directory and return zero if it does.
194
    """
195
    expected = ['reuse', '--root', '<root>',
196
                'lint' if 'lint' in command else 'spdx']
197

  
198
    root_path = Path(process_command(command, expected)['root'])
199

  
200
    if (root_path / 'mock_reuse_missing').exists():
201
        raise FileNotFoundError('dummy')
202

  
203
    is_reuse_compliant = (root_path / 'README.txt.license').exists()
204

  
205
    return MockedCompletedProcess(command, 1 - is_reuse_compliant,
206
                                  stdout=f'dummy {expected[-1]} output',
207
                                  text_output=kwargs.get('text'))
208

  
209
mocked_piggybacked_archives = [
210
    PurePosixPath('apt/something.deb'),
211
    PurePosixPath('apt/something.orig.tar.gz'),
212
    PurePosixPath('apt/something.debian.tar.xz'),
213
    PurePosixPath('othersystem/other-something.tar.gz')
214
]
215

  
216
@pytest.fixture
217
def mock_piggybacked_apt_system(monkeypatch):
218
    """Make local_apt.piggybacked_system() return a mocked result."""
219
    # We set 'td' to a temporary dir path further below.
220
    td = None
221

  
222
    class MockedPiggybacked:
223
        """Minimal mock of Piggybacked object."""
224
        package_license_files = [PurePosixPath('.apt-root/.../copyright')]
225
        package_must_depend = [{'identifier': 'apt-common-licenses'}]
226

  
227
        def resolve_file(path):
228
            """
229
            For each path that starts with '.apt-root' return a valid
230
            dummy file path.
231
            """
232
            if path.parts[0] != '.apt-root':
233
                return None
234

  
235
            (td / path.name).write_text(f'dummy {path.name}')
236

  
237
            return (td / path.name)
238

  
239
        def archive_files():
240
            """Yield some valid dummy file path tuples."""
241
            for desired_path in mocked_piggybacked_archives:
242
                real_path = td / desired_path.name
243
                real_path.write_text(f'dummy {desired_path.name}')
244

  
245
                yield desired_path, real_path
246

  
247
    @contextmanager
248
    def mocked_piggybacked_system(piggyback_def, piggyback_files):
249
        """Mock the execution of local_apt.piggybacked_system()."""
250
        assert piggyback_def == {
251
	    'system': 'apt',
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff