Project

General

Profile

Download (14.5 KB) Statistics
| Branch: | Tag: | Revision:

hydrilla-builder / src / hydrilla / builder / local_apt.py @ 496d90f7

1
# SPDX-License-Identifier: AGPL-3.0-or-later
2

    
3
# Using a local APT.
4
#
5
# This file is part of Hydrilla
6
#
7
# Copyright (C) 2022 Wojtek Kosior
8
#
9
# This program is free software: you can redistribute it and/or modify
10
# it under the terms of the GNU Affero General Public License as
11
# published by the Free Software Foundation, either version 3 of the
12
# License, or (at your option) any later version.
13
#
14
# This program is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
# GNU Affero General Public License for more details.
18
#
19
# You should have received a copy of the GNU Affero General Public License
20
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
21
#
22
#
23
# I, Wojtek Kosior, thereby promise not to sue for violation of this
24
# file's license. Although I request that you do not make use this code
25
# in a proprietary program, I am not going to enforce this in court.
26

    
27
# Enable using with Python 3.7.
28
from __future__ import annotations
29

    
30
import zipfile
31
import shutil
32
import re
33
import subprocess
34
CP = subprocess.CompletedProcess
35
from pathlib import Path, PurePosixPath
36
from tempfile import TemporaryDirectory, NamedTemporaryFile
37
from hashlib import sha256
38
from urllib.parse import unquote
39
from contextlib import contextmanager
40
from typing import Optional, Iterable
41

    
42
from .. import util
43
from .piggybacking import Piggybacked
44
from .common_errors import *
45

    
46
here = Path(__file__).resolve().parent
47

    
48
_ = util.translation(here / 'locales').gettext
49

    
50
"""
51
Default cache directory to save APT configurations and downloaded GPG keys in.
52
"""
53
default_apt_cache_dir = Path.home() / '.cache' / 'hydrilla' / 'builder' / 'apt'
54

    
55
"""
56
Default keyserver to use.
57
"""
58
default_keyserver = 'hkps://keyserver.ubuntu.com:443'
59

    
60
"""
61
Default keys to download when using a local APT.
62
"""
63
default_keys = [
64
    # Trisquel
65
    'E6C27099CA21965B734AEA31B4EFB9F38D8AEBF1',
66
    '60364C9869F92450421F0C22B138CA450C05112F',
67
    # Ubuntu
68
    '630239CC130E1A7FD81A27B140976EAF437D05B5',
69
    '790BC7277767219C42C86F933B4FE6ACC0B21F32',
70
    'F6ECB3762474EDA9D21B7022871920D1991BC93C',
71
    # Debian
72
    '6D33866EDD8FFA41C0143AEDDCC9EFBF77E11517',
73
    '80D15823B7FD1561F9F7BCDDDC30D7C23CBBABEE',
74
    'AC530D520F2F3269F5E98313A48449044AAD5C5D'
75
]
76

    
77
"""sources.list file contents for known distros."""
78
default_lists = {
79
    'nabia': [f'{type} http://archive.trisquel.info/trisquel/ nabia{suf} main'
80
              for type in ('deb', 'deb-src')
81
              for suf in ('', '-updates', '-security')]
82
}
83

    
84
class GpgError(Exception):
85
    """
86
    Exception used to report various problems when calling GPG.
87
    """
88

    
89
class AptError(SubprocessError):
90
    """
91
    Exception used to report various problems when calling apt-* and dpkg-*
92
    commands.
93
    """
94

    
95
def run(command, **kwargs):
96
    """A wrapped around subprocess.run that sets some default options."""
97
    return subprocess.run(command, **kwargs, env={'LANG': 'en_US'},
98
                          capture_output=True, text=True)
99

    
100
class Apt:
101
    """
102
    This class represents an APT instance and can be used to call apt-get
103
    commands with it.
104
    """
105
    def __init__(self, apt_conf: str) -> None:
106
        """Initialize this Apt object."""
107
        self.apt_conf = apt_conf
108

    
109
    def get(self, *args: str, **kwargs) -> CP:
110
        """
111
        Run apt-get with the specified arguments and raise a meaningful AptError
112
        when something goes wrong.
113
        """
114
        command = ['apt-get', '-c', self.apt_conf, *args]
115
        try:
116
            cp = run(command, **kwargs)
117
        except FileNotFoundError:
118
            msg = _('couldnt_execute_{}_is_it_installed').format('apt-get')
119
            raise AptError(msg)
120

    
121
        if cp.returncode != 0:
122
            msg = _('command_{}_failed').format(' '.join(command))
123
            raise AptError(msg, cp)
124

    
125
        return cp
126

    
127
def cache_dir() -> Path:
128
    """
129
    Return the directory used to cache data (APT configurations, keyrings) to
130
    speed up repeated operations.
131

    
132
    This function first ensures the directory exists.
133
    """
134
    default_apt_cache_dir.mkdir(parents=True, exist_ok=True)
135
    return default_apt_cache_dir
136

    
137
class SourcesList:
138
    """Representation of apt's sources.list contents."""
139
    def __init__(self, list: [str]=[], codename: Optional[str]=None) -> None:
140
        """Initialize this SourcesList."""
141
        self.codename = None
142
        self.list = [*list]
143
        self.has_extra_entries = bool(self.list)
144

    
145
        if codename is not None:
146
            if codename not in default_lists:
147
                raise DistroError(_('distro_{}_unknown').format(codename))
148

    
149
            self.codename = codename
150
            self.list.extend(default_lists[codename])
151

    
152
    def identity(self) -> str:
153
        """
154
        Produce a string that uniquely identifies this sources.list contents.
155
        """
156
        if self.codename and not self.has_extra_entries:
157
            return self.codename
158

    
159
        return sha256('\n'.join(sorted(self.list)).encode()).digest().hex()
160

    
161
def apt_conf(directory: Path) -> str:
162
    """
163
    Given local APT's directory, produce a configuration suitable for running
164
    APT there.
165

    
166
    'directory' must not contain any special characters including quotes and
167
    spaces.
168
    """
169
    return f'''
170
Architecture "amd64";
171
Dir "{directory}";
172
Dir::State "{directory}/var/lib/apt";
173
Dir::State::status "{directory}/var/lib/dpkg/status";
174
Dir::Etc::SourceList "{directory}/etc/apt.sources.list";
175
Dir::Etc::SourceParts "";
176
Dir::Cache "{directory}/var/cache/apt";
177
pkgCacheGen::Essential "none";
178
Dir::Etc::Trusted "{directory}/etc/trusted.gpg";
179
'''
180

    
181
def apt_keyring(keys: [str]) -> bytes:
182
    """
183
    Download the requested keys if necessary and export them as a keyring
184
    suitable for passing to APT.
185

    
186
    The keyring is returned as a bytes value that should be written to a file.
187
    """
188
    try:
189
        from gnupg import GPG
190
    except ModuleNotFoundError:
191
        raise GpgError(_('couldnt_import_{}_is_it_installed').format('gnupg'))
192

    
193
    gpg = GPG(keyring=str(cache_dir() / 'master_keyring.gpg'))
194
    for key in keys:
195
        if gpg.list_keys(keys=[key]) != []:
196
            continue
197

    
198
        if gpg.recv_keys(default_keyserver, key).imported == 0:
199
            raise GpgError(_('gpg_couldnt_recv_key_{}').format(key))
200

    
201
    return gpg.export_keys(keys, armor=False, minimal=True)
202

    
203
def cache_apt_root(apt_root: Path, destination_zip: Path) -> None:
204
    """
205
    Zip an APT root directory for later use and move the zipfile to the
206
    requested destination.
207
    """
208
    temporary_zip_path = None
209
    try:
210
        tmpfile = NamedTemporaryFile(suffix='.zip', prefix='tmp_',
211
                                     dir=cache_dir(), delete=False)
212
        temporary_zip_path = Path(tmpfile.name)
213

    
214
        to_skip = {Path('etc') / 'apt.conf', Path('etc') / 'trusted.gpg'}
215

    
216
        with zipfile.ZipFile(tmpfile, 'w') as zf:
217
            for member in apt_root.rglob('*'):
218
                relative = member.relative_to(apt_root)
219
                if relative not in to_skip:
220
                    # This call will also properly add empty folders to zip file
221
                    zf.write(member, relative, zipfile.ZIP_DEFLATED)
222

    
223
        shutil.move(temporary_zip_path, destination_zip)
224
    finally:
225
        if temporary_zip_path is not None and temporary_zip_path.exists():
226
            temporary_zip_path.unlink()
227

    
228
def setup_local_apt(directory: Path, list: SourcesList, keys: [str]) -> Apt:
229
    """
230
    Create files and directories necessary for running APT without root rights
231
    inside 'directory'.
232

    
233
    'directory' must not contain any special characters including quotes and
234
    spaces and must be empty.
235

    
236
    Return an Apt object that can be used to call apt-get commands.
237
    """
238
    apt_root = directory / 'apt_root'
239

    
240
    conf_text     = apt_conf(apt_root)
241
    keyring_bytes = apt_keyring(keys)
242

    
243
    apt_zipfile = cache_dir() / f'apt_{list.identity()}.zip'
244
    if apt_zipfile.exists():
245
        with zipfile.ZipFile(apt_zipfile) as zf:
246
            zf.extractall(apt_root)
247

    
248
    for to_create in (
249
            apt_root / 'var' / 'lib' / 'apt' / 'partial',
250
            apt_root / 'var' / 'lib' / 'apt' / 'lists',
251
            apt_root / 'var' / 'cache' / 'apt' / 'archives' / 'partial',
252
            apt_root / 'etc' / 'apt' / 'preferences.d',
253
            apt_root / 'var' / 'lib' / 'dpkg',
254
            apt_root / 'var' / 'log' / 'apt'
255
    ):
256
        to_create.mkdir(parents=True, exist_ok=True)
257

    
258
    conf_path    = apt_root / 'etc' / 'apt.conf'
259
    trusted_path = apt_root / 'etc' / 'trusted.gpg'
260
    status_path  = apt_root / 'var' / 'lib' / 'dpkg' / 'status'
261
    list_path    = apt_root / 'etc' / 'apt.sources.list'
262

    
263
    conf_path.write_text(conf_text)
264
    trusted_path.write_bytes(keyring_bytes)
265
    status_path.touch()
266
    list_path.write_text('\n'.join(list.list))
267

    
268
    apt = Apt(str(conf_path))
269
    apt.get('update')
270

    
271
    cache_apt_root(apt_root, apt_zipfile)
272

    
273
    return apt
274

    
275
@contextmanager
276
def local_apt(list: SourcesList, keys: [str]) -> Iterable[Apt]:
277
    """
278
    Create a temporary directory with proper local APT configuration in it.
279
    Yield an Apt object that can be used to issue apt-get commands.
280

    
281
    This function returns a context manager that will remove the directory on
282
    close.
283
    """
284
    with TemporaryDirectory() as td:
285
        td = Path(td)
286
        yield setup_local_apt(td, list, keys)
287

    
288
def download_apt_packages(list: SourcesList, keys: [str], packages: [str],
289
                          destination_dir: Path, with_deps: bool) -> [str]:
290
    """
291
    Set up a local APT, update it using the specified sources.list configuration
292
    and use it to download the specified packages.
293

    
294
    This function downloads .deb files of packages matching the amd64
295
    architecture (which includes packages with architecture 'all') as well as
296
    all their corresponding source package files and (if requested) the debs
297
    and source files of all their declared dependencies.
298

    
299
    Return value is a list of names of all downloaded files.
300
    """
301
    install_line_regex = re.compile(r'^Inst (?P<name>\S+) \((?P<version>\S+) ')
302

    
303
    with local_apt(list, keys) as apt:
304
        if with_deps:
305
            cp = apt.get('install', '--yes', '--just-print', *packages)
306

    
307
            lines = cp.stdout.split('\n')
308
            matches = [install_line_regex.match(l) for l in lines]
309
            packages = [f'{m.group("name")}={m.group("version")}'
310
                        for m in matches if m]
311

    
312
            if not packages:
313
                raise AptError(_('apt_install_output_not_understood'), cp)
314

    
315
        # Download .debs to indirectly to destination_dir by first placing them
316
        # in a temporary subdirectory.
317
        with TemporaryDirectory(dir=destination_dir) as td:
318
            td = Path(td)
319
            cp = apt.get('download', *packages, cwd=td)
320

    
321
            deb_name_regex = re.compile(
322
                r'''
323
                ^
324
                (?P<name>[^_]+)
325
                _
326
                (?P<ver>[^_]+)
327
                _
328
                .+              # architecture (or 'all')
329
                \.deb
330
                $
331
                ''',
332
                re.VERBOSE)
333

    
334
            names_vers = []
335
            downloaded = []
336
            for deb_file in td.iterdir():
337
                match = deb_name_regex.match(deb_file.name)
338
                if match is None:
339
                    msg = _('apt_download_gave_bad_filename_{}')\
340
                        .format(deb_file.name)
341
                    raise AptError(msg, cp)
342

    
343
                names_vers.append((
344
                    unquote(match.group('name')),
345
                    unquote(match.group('ver'))
346
                ))
347
                downloaded.append(deb_file.name)
348

    
349
            apt.get('source', '--download-only',
350
                    *[f'{n}={v}' for n, v in names_vers], cwd=td)
351

    
352
            for source_file in td.iterdir():
353
                if source_file.name in downloaded:
354
                    continue
355

    
356
                downloaded.append(source_file.name)
357

    
358
            for filename in downloaded:
359
                shutil.move(td / filename, destination_dir / filename)
360

    
361
    return downloaded
362

    
363
@contextmanager
364
def piggybacked_system(piggyback_def: dict, foreign_packages: Optional[Path]) \
365
    -> Iterable[Piggybacked]:
366
    """
367
    Resolve resources from APT. Optionally, use package files (.deb's, etc.)
368
    from a specified directory instead of resolving and downloading them.
369

    
370
    The directories and files created for the yielded Piggybacked object shall
371
    be deleted when this context manager gets closed.
372
    """
373
    assert piggyback_def['system'] == 'apt'
374

    
375
    with TemporaryDirectory() as td:
376
        td       = Path(td)
377
        root     = td / 'root'
378
        root.mkdir()
379

    
380
        if foreign_packages is None:
381
            archives = td / 'archives'
382
            archives.mkdir()
383
        else:
384
            archives = foreign_packages / 'apt'
385
            archives.mkdir(exist_ok=True)
386

    
387
        if [*archives.glob('*.deb')] == []:
388
            sources_list = SourcesList(piggyback_def.get('sources_list', []),
389
                                       piggyback_def.get('distribution'))
390
            packages = piggyback_def['packages']
391
            with_deps = piggyback_def['dependencies']
392
            pgp_keys = [
393
                *default_keys,
394
                *piggyback_def.get('trusted_keys', [])
395
            ]
396

    
397
            download_apt_packages(
398
                list=sources_list,
399
                keys=pgp_keys,
400
                packages=packages,
401
                destination_dir=archives,
402
                with_deps=with_deps
403
            )
404

    
405
        for deb in archives.glob('*.deb'):
406
            command = ['dpkg-deb', '-x', str(deb), str(root)]
407
            try:
408
                cp = run(command)
409
            except FileNotFoundError:
410
                msg = _('couldnt_execute_{}_is_it_installed'.format('dpkg-deb'))
411
                raise AptError(msg)
412

    
413
            if cp.returncode != 0:
414
                msg = _('command_{}_failed').format(' '.join(command))
415
                raise AptError(msg, cp)
416

    
417
        docs_dir = root / 'usr' / 'share' / 'doc'
418
        copyright_paths = [p / 'copyright' for p in docs_dir.iterdir()] \
419
                    if docs_dir.exists() else []
420
        copyright_paths = [PurePosixPath('.apt-root') / p.relative_to(root)
421
                           for p in copyright_paths if p.exists()]
422

    
423
        standard_depends = piggyback_def.get('depend_on_base_packages', True)
424
        must_depend = [{'identifier': 'apt-common-licenses'}] \
425
            if standard_depends else []
426

    
427
        yield Piggybacked(
428
            archives={'apt': archives},
429
            roots={'.apt-root': root},
430
            package_license_files=copyright_paths,
431
            package_must_depend=must_depend
432
        )
(5-5/6)