Project

General

Profile

Download (14.3 KB) Statistics
| Branch: | Tag: | Revision:

hydrilla-builder / src / hydrilla / builder / local_apt.py @ bd588eb9

1
# SPDX-License-Identifier: AGPL-3.0-or-later
2

    
3
# Using a local APT.
4
#
5
# This file is part of Hydrilla
6
#
7
# Copyright (C) 2022 Wojtek Kosior
8
#
9
# This program is free software: you can redistribute it and/or modify
10
# it under the terms of the GNU Affero General Public License as
11
# published by the Free Software Foundation, either version 3 of the
12
# License, or (at your option) any later version.
13
#
14
# This program is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
# GNU Affero General Public License for more details.
18
#
19
# You should have received a copy of the GNU Affero General Public License
20
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
21
#
22
#
23
# I, Wojtek Kosior, thereby promise not to sue for violation of this
24
# file's license. Although I request that you do not make use this code
25
# in a proprietary program, I am not going to enforce this in court.
26

    
27
# Enable using with Python 3.7.
28
from __future__ import annotations
29

    
30
import zipfile
31
import shutil
32
import re
33
import subprocess
34
CP = subprocess.CompletedProcess
35
from pathlib import Path, PurePosixPath
36
from tempfile import TemporaryDirectory, NamedTemporaryFile
37
from hashlib import sha256
38
from contextlib import contextmanager
39
from typing import Optional, Iterable
40

    
41
from .. import util
42
from .piggybacking import Piggybacked
43
from .common_errors import *
44

    
45
here = Path(__file__).resolve().parent
46

    
47
_ = util.translation(here / 'locales').gettext
48

    
49
"""
50
Default cache directory to save APT configurations and downloaded GPG keys in.
51
"""
52
default_apt_cache_dir = Path.home() / '.cache' / 'hydrilla' / 'builder' / 'apt'
53

    
54
"""
55
Default keyserver to use.
56
"""
57
default_keyserver = 'hkps://keyserver.ubuntu.com:443'
58

    
59
"""
60
Default keys to download when using a local APT.
61
"""
62
default_keys = [
63
    # Trisquel
64
    'E6C27099CA21965B734AEA31B4EFB9F38D8AEBF1',
65
    '60364C9869F92450421F0C22B138CA450C05112F',
66
    # Ubuntu
67
    '630239CC130E1A7FD81A27B140976EAF437D05B5',
68
    '790BC7277767219C42C86F933B4FE6ACC0B21F32',
69
    'F6ECB3762474EDA9D21B7022871920D1991BC93C',
70
    # Debian
71
    '6D33866EDD8FFA41C0143AEDDCC9EFBF77E11517',
72
    '80D15823B7FD1561F9F7BCDDDC30D7C23CBBABEE',
73
    'AC530D520F2F3269F5E98313A48449044AAD5C5D'
74
]
75

    
76
"""sources.list file contents for known distros."""
77
default_lists = {
78
    'nabia': [f'{type} http://archive.trisquel.info/trisquel/ nabia{suf} main'
79
              for type in ('deb', 'deb-src')
80
              for suf in ('', '-updates', '-security')]
81
}
82

    
83
class GpgError(Exception):
84
    """
85
    Exception used to report various problems when calling GPG.
86
    """
87

    
88
class AptError(SubprocessError):
89
    """
90
    Exception used to report various problems when calling apt-* and dpkg-*
91
    commands.
92
    """
93

    
94
def run(command, **kwargs):
95
    """A wrapped around subprocess.run that sets some default options."""
96
    return subprocess.run(command, **kwargs, env={'LANG': 'en_US'},
97
                          capture_output=True, text=True)
98

    
99
class Apt:
100
    """
101
    This class represents an APT instance and can be used to call apt-get
102
    commands with it.
103
    """
104
    def __init__(self, apt_conf: str) -> None:
105
        """Initialize this Apt object."""
106
        self.apt_conf = apt_conf
107

    
108
    def get(self, *args: str, **kwargs) -> CP:
109
        """
110
        Run apt-get with the specified arguments and raise a meaningful AptError
111
        when something goes wrong.
112
        """
113
        command = ['apt-get', '-c', self.apt_conf, *args]
114
        try:
115
            cp = run(command, **kwargs)
116
        except FileNotFoundError:
117
            msg = _('couldnt_execute_{}_is_it_installed').format('apt-get')
118
            raise AptError(msg)
119

    
120
        if cp.returncode != 0:
121
            msg = _('command_{}_failed').format(' '.join(command))
122
            raise AptError(msg, cp)
123

    
124
        return cp
125

    
126
def cache_dir() -> Path:
127
    """
128
    Return the directory used to cache data (APT configurations, keyrings) to
129
    speed up repeated operations.
130

    
131
    This function first ensures the directory exists.
132
    """
133
    default_apt_cache_dir.mkdir(parents=True, exist_ok=True)
134
    return default_apt_cache_dir
135

    
136
class SourcesList:
137
    """Representation of apt's sources.list contents."""
138
    def __init__(self, list: [str]=[], codename: Optional[str]=None) -> None:
139
        """Initialize this SourcesList."""
140
        self.codename = None
141
        self.list = [*list]
142
        self.has_extra_entries = bool(self.list)
143

    
144
        if codename is not None:
145
            if codename not in default_lists:
146
                raise DistroError(_('distro_{}_unknown').format(codename))
147

    
148
            self.codename = codename
149
            self.list.extend(default_lists[codename])
150

    
151
    def identity(self) -> str:
152
        """
153
        Produce a string that uniquely identifies this sources.list contents.
154
        """
155
        if self.codename and not self.has_extra_entries:
156
            return self.codename
157

    
158
        return sha256('\n'.join(sorted(self.list)).encode()).digest().hex()
159

    
160
def apt_conf(directory: Path) -> str:
161
    """
162
    Given local APT's directory, produce a configuration suitable for running
163
    APT there.
164

    
165
    'directory' must not contain any special characters including quotes and
166
    spaces.
167
    """
168
    return f'''
169
Dir "{directory}";
170
Dir::State "{directory}/var/lib/apt";
171
Dir::State::status "{directory}/var/lib/dpkg/status";
172
Dir::Etc::SourceList "{directory}/etc/apt.sources.list";
173
Dir::Etc::SourceParts "";
174
Dir::Cache "{directory}/var/cache/apt";
175
pkgCacheGen::Essential "none";
176
Dir::Etc::Trusted "{directory}/etc/trusted.gpg";
177
'''
178

    
179
def apt_keyring(keys: [str]) -> bytes:
180
    """
181
    Download the requested keys if necessary and export them as a keyring
182
    suitable for passing to APT.
183

    
184
    The keyring is returned as a bytes value that should be written to a file.
185
    """
186
    try:
187
        from gnupg import GPG
188
    except ModuleNotFoundError:
189
        raise GpgError(_('couldnt_import_{}_is_it_installed').format('gnupg'))
190

    
191
    gpg = GPG(keyring=str(cache_dir() / 'master_keyring.gpg'))
192
    for key in keys:
193
        if gpg.list_keys(keys=[key]) != []:
194
            continue
195

    
196
        if gpg.recv_keys(default_keyserver, key).imported == 0:
197
            raise GpgError(_('gpg_couldnt_recv_key_{}').format(key))
198

    
199
    return gpg.export_keys(keys, armor=False, minimal=True)
200

    
201
def cache_apt_root(apt_root: Path, destination_zip: Path) -> None:
202
    """
203
    Zip an APT root directory for later use and move the zipfile to the
204
    requested destination.
205
    """
206
    temporary_zip_path = None
207
    try:
208
        tmpfile = NamedTemporaryFile(suffix='.zip', prefix='tmp_',
209
                                     dir=cache_dir(), delete=False)
210
        temporary_zip_path = Path(tmpfile.name)
211

    
212
        to_skip = {Path('etc') / 'apt.conf', Path('etc') / 'trusted.gpg'}
213

    
214
        with zipfile.ZipFile(tmpfile, 'w') as zf:
215
            for member in apt_root.rglob('*'):
216
                relative = member.relative_to(apt_root)
217
                if relative not in to_skip:
218
                    # This call will also properly add empty folders to zip file
219
                    zf.write(member, relative, zipfile.ZIP_DEFLATED)
220

    
221
        shutil.move(temporary_zip_path, destination_zip)
222
    finally:
223
        if temporary_zip_path is not None and temporary_zip_path.exists():
224
            temporary_zip_path.unlink()
225

    
226
def setup_local_apt(directory: Path, list: SourcesList, keys: [str]) -> Apt:
227
    """
228
    Create files and directories necessary for running APT without root rights
229
    inside 'directory'.
230

    
231
    'directory' must not contain any special characters including quotes and
232
    spaces and must be empty.
233

    
234
    Return an Apt object that can be used to call apt-get commands.
235
    """
236
    apt_root = directory / 'apt_root'
237

    
238
    conf_text     = apt_conf(apt_root)
239
    keyring_bytes = apt_keyring(keys)
240

    
241
    apt_zipfile = cache_dir() / f'apt_{list.identity()}.zip'
242
    if apt_zipfile.exists():
243
        with zipfile.ZipFile(apt_zipfile) as zf:
244
            zf.extractall(apt_root)
245

    
246
    for to_create in (
247
            apt_root / 'var' / 'lib' / 'apt' / 'partial',
248
            apt_root / 'var' / 'lib' / 'apt' / 'lists',
249
            apt_root / 'var' / 'cache' / 'apt' / 'archives' / 'partial',
250
            apt_root / 'etc' / 'apt' / 'preferences.d',
251
            apt_root / 'var' / 'lib' / 'dpkg',
252
            apt_root / 'var' / 'log' / 'apt'
253
    ):
254
        to_create.mkdir(parents=True, exist_ok=True)
255

    
256
    conf_path    = apt_root / 'etc' / 'apt.conf'
257
    trusted_path = apt_root / 'etc' / 'trusted.gpg'
258
    status_path  = apt_root / 'var' / 'lib' / 'dpkg' / 'status'
259
    list_path    = apt_root / 'etc' / 'apt.sources.list'
260

    
261
    conf_path.write_text(conf_text)
262
    trusted_path.write_bytes(keyring_bytes)
263
    status_path.touch()
264
    list_path.write_text('\n'.join(list.list))
265

    
266
    apt = Apt(str(conf_path))
267
    apt.get('update')
268

    
269
    cache_apt_root(apt_root, apt_zipfile)
270

    
271
    return apt
272

    
273
@contextmanager
274
def local_apt(list: SourcesList, keys: [str]) -> Iterable[Apt]:
275
    """
276
    Create a temporary directory with proper local APT configuration in it.
277
    Yield an Apt object that can be used to issue apt-get commands.
278

    
279
    This function returns a context manager that will remove the directory on
280
    close.
281
    """
282
    with TemporaryDirectory() as td:
283
        td = Path(td)
284
        yield setup_local_apt(td, list, keys)
285

    
286
def download_apt_packages(list: SourcesList, keys: [str], packages: [str],
287
                          destination_dir: Path, with_deps=False) -> [str]:
288
    """
289
    Set up a local APT, update it using the specified sources.list configuration
290
    and use it to download the specified packages.
291

    
292
    This function downloads a .deb file of the packages matching the current
293
    architecture (which includes packages with architecture 'all') as well as
294
    all theis corresponding source package files and (if requested) the debs
295
    and source files of all their declared dependencies.
296

    
297
    Return value is a list of names of all downloaded files.
298
    """
299
    with local_apt(list, keys) as apt:
300
        if with_deps:
301
            cp = apt.get('install', '--yes', '--just-print', *packages)
302

    
303
            deps_listing = re.match(
304
                r'''
305
                .*
306
                The\sfollowing\sNEW\spackages\swill\sbe\sinstalled:
307
                (.*)
308
                0\supgraded,
309
                ''',
310
                cp.stdout,
311
                re.MULTILINE | re.DOTALL | re.VERBOSE)
312

    
313
            if deps_listing is None:
314
                raise AptError(_('apt_install_output_not_understood'), cp)
315

    
316
            packages = deps_listing.group(1).split()
317

    
318
        # Download .debs to indirectly to destination_dir by first placing them
319
        # in a temporary subdirectory.
320
        with TemporaryDirectory(dir=destination_dir) as td:
321
            td = Path(td)
322
            cp = apt.get('download', *packages, cwd=td)
323

    
324
            deb_name_regex = re.compile(
325
                r'''
326
                ^
327
                (?P<name>[^_]+)
328
                _
329
                (?P<ver>[^_]+)
330
                _
331
                .+              # architecture (or 'all')
332
                \.deb
333
                $
334
                ''',
335
                re.VERBOSE)
336

    
337
            names_vers = []
338
            downloaded = []
339
            for deb_file in td.iterdir():
340
                match = deb_name_regex.match(deb_file.name)
341
                if match is None:
342
                    msg = _('apt_download_gave_bad_filename_{}')\
343
                        .format(deb_file.name)
344
                    raise AptError(msg, cp)
345

    
346
                names_vers.append((match.group('name'), match.group('ver')))
347
                downloaded.append(deb_file.name)
348

    
349
            apt.get('source', '--download-only',
350
                    *[f'{n}={v}' for n, v in names_vers], cwd=td)
351

    
352
            for source_file in td.iterdir():
353
                if source_file.name in downloaded:
354
                    continue
355

    
356
                downloaded.append(source_file.name)
357

    
358
            for filename in downloaded:
359
                shutil.move(td / filename, destination_dir / filename)
360

    
361
    return downloaded
362

    
363
@contextmanager
364
def piggybacked_system(piggyback_def: dict, foreign_packages: Optional[Path]) \
365
    -> Iterable[Piggybacked]:
366
    """
367
    Resolve resources from APT. Optionally, use package files (.deb's, etc.)
368
    from a specified directory instead of resolving and downloading them.
369

    
370
    The directories and files created for the yielded Piggybacked object shall
371
    be deleted when this context manager gets closed.
372
    """
373
    assert piggyback_def['system'] == 'apt'
374

    
375
    with TemporaryDirectory() as td:
376
        td       = Path(td)
377
        root     = td / 'root'
378
        root.mkdir()
379

    
380
        if foreign_packages is None:
381
            archives = td / 'archives'
382
            archives.mkdir()
383

    
384
            sources_list = SourcesList(piggyback_def.get('sources_list', []),
385
                                       piggyback_def.get('distribution'))
386
            packages = piggyback_def['packages']
387
            with_deps = piggyback_def['dependencies']
388
            pgp_keys = [
389
                *default_keys,
390
                *piggyback_def.get('trusted_keys', [])
391
            ]
392

    
393
            download_apt_packages(
394
                list=sources_list,
395
                keys=pgp_keys,
396
                packages=packages,
397
                destination_dir=archives,
398
                with_deps=with_deps
399
            )
400
        else:
401
            archives = foreign_packages / 'apt'
402

    
403
        for deb in archives.glob('*.deb'):
404
            command = ['dpkg-deb', '-x', str(deb), str(root)]
405
            try:
406
                cp = run(command)
407
            except FileNotFoundError:
408
                msg = _('couldnt_execute_{}_is_it_installed'.format('dpkg-deb'))
409
                raise AptError(msg)
410

    
411
            if cp.returncode != 0:
412
                msg = _('command_{}_failed').format(' '.join(command))
413
                raise AptError(msg, cp)
414

    
415
        docs_dir = root / 'usr' / 'share' / 'doc'
416
        copyright_paths = [p / 'copyright' for p in docs_dir.iterdir()] \
417
                    if docs_dir.exists() else []
418
        copyright_paths = [PurePosixPath('.apt-root') / p.relative_to(root)
419
                           for p in copyright_paths if p.exists()]
420

    
421
        standard_depends = piggyback_def.get('depend_on_base_packages', True)
422
        must_depend = [{'identifier': 'apt-common-licenses'}] \
423
            if standard_depends else []
424

    
425
        yield Piggybacked(
426
            archives={'apt': archives},
427
            roots={'.apt-root': root},
428
            package_license_files=copyright_paths,
429
            package_must_depend=must_depend
430
        )
(5-5/6)