Project

General

Profile

Download (14.2 KB) Statistics
| Branch: | Tag: | Revision:

hydrilla-builder / src / hydrilla / builder / local_apt.py @ 61f0aa75

1
# SPDX-License-Identifier: AGPL-3.0-or-later
2

    
3
# Using a local APT.
4
#
5
# This file is part of Hydrilla
6
#
7
# Copyright (C) 2022 Wojtek Kosior
8
#
9
# This program is free software: you can redistribute it and/or modify
10
# it under the terms of the GNU Affero General Public License as
11
# published by the Free Software Foundation, either version 3 of the
12
# License, or (at your option) any later version.
13
#
14
# This program is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
# GNU Affero General Public License for more details.
18
#
19
# You should have received a copy of the GNU Affero General Public License
20
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
21
#
22
#
23
# I, Wojtek Kosior, thereby promise not to sue for violation of this
24
# file's license. Although I request that you do not make use this code
25
# in a proprietary program, I am not going to enforce this in court.
26

    
27
# Enable using with Python 3.7.
28
from __future__ import annotations
29

    
30
import zipfile
31
import shutil
32
import re
33
import subprocess
34
CP = subprocess.CompletedProcess
35
from pathlib import Path, PurePosixPath
36
from tempfile import TemporaryDirectory, NamedTemporaryFile
37
from hashlib import sha256
38
from contextlib import contextmanager
39
from typing import Optional, Iterable
40

    
41
from .. import util
42
from .piggybacking import Piggybacked
43
from .common_errors import *
44

    
45
here = Path(__file__).resolve().parent
46

    
47
_ = util.translation(here / 'locales').gettext
48

    
49
"""
50
Default cache directory to save APT configurations and downloaded GPG keys in.
51
"""
52
default_apt_cache_dir = Path.home() / '.cache' / 'hydrilla' / 'builder' / 'apt'
53

    
54
"""
55
Default keyserver to use.
56
"""
57
default_keyserver = 'hkps://keyserver.ubuntu.com:443'
58

    
59
"""
60
Default keys to download when using a local APT.
61
"""
62
default_keys = [
63
    # Trisquel
64
    'E6C27099CA21965B734AEA31B4EFB9F38D8AEBF1',
65
    '60364C9869F92450421F0C22B138CA450C05112F',
66
    # Ubuntu
67
    '630239CC130E1A7FD81A27B140976EAF437D05B5',
68
    '790BC7277767219C42C86F933B4FE6ACC0B21F32',
69
    'F6ECB3762474EDA9D21B7022871920D1991BC93C',
70
    # Debian
71
    '6D33866EDD8FFA41C0143AEDDCC9EFBF77E11517',
72
    '80D15823B7FD1561F9F7BCDDDC30D7C23CBBABEE',
73
    'AC530D520F2F3269F5E98313A48449044AAD5C5D'
74
]
75

    
76
"""sources.list file contents for known distros."""
77
default_lists = {
78
    'nabia': [f'{type} http://archive.trisquel.info/trisquel/ nabia{suf} main'
79
              for type in ('deb', 'deb-src')
80
              for suf in ('', '-updates', '-security')]
81
}
82

    
83
class GpgError(Exception):
84
    """
85
    Exception used to report various problems when calling GPG.
86
    """
87

    
88
class AptError(SubprocessError):
89
    """
90
    Exception used to report various problems when calling apt-* and dpkg-*
91
    commands.
92
    """
93

    
94
def run(command, **kwargs):
95
    """A wrapped around subprocess.run that sets some default options."""
96
    return subprocess.run(command, **kwargs, env={'LANG': 'en_US'},
97
                          capture_output=True, text=True)
98

    
99
class Apt:
100
    """
101
    This class represents an APT instance and can be used to call apt-get
102
    commands with it.
103
    """
104
    def __init__(self, apt_conf: str) -> None:
105
        """Initialize this Apt object."""
106
        self.apt_conf = apt_conf
107

    
108
    def get(self, *args: str, **kwargs) -> CP:
109
        """
110
        Run apt-get with the specified arguments and raise a meaningful AptError
111
        when something goes wrong.
112
        """
113
        command = ['apt-get', '-c', self.apt_conf, *args]
114
        try:
115
            cp = run(command, **kwargs)
116
        except FileNotFoundError:
117
            raise AptError(_('couldnt_execute_apt_get_is_it_installed'))
118

    
119
        if cp.returncode != 0:
120
            msg = _('apt_get_command_{}_failed').format(' '.join(command))
121
            raise AptError(msg, cp)
122

    
123
        return cp
124

    
125
def cache_dir() -> Path:
126
    """
127
    Return the directory used to cache data (APT configurations, keyrings) to
128
    speed up repeated operations.
129

    
130
    This function first ensures the directory exists.
131
    """
132
    default_apt_cache_dir.mkdir(parents=True, exist_ok=True)
133
    return default_apt_cache_dir
134

    
135
class SourcesList:
136
    """Representation of apt's sources.list contents."""
137
    def __init__(self, list: [str]=[], codename: Optional[str]=None) -> None:
138
        """Initialize this SourcesList."""
139
        self.codename = None
140
        self.list = [*list]
141
        self.has_extra_entries = bool(self.list)
142

    
143
        if codename is not None:
144
            if codename not in default_lists:
145
                raise DistroError(_('distro_{}_unknown').format(codename))
146

    
147
            self.codename = codename
148
            self.list.extend(default_lists[codename])
149

    
150
    def identity(self) -> str:
151
        """
152
        Produce a string that uniquely identifies this sources.list contents.
153
        """
154
        if self.codename and not self.has_extra_entries:
155
            return self.codename
156

    
157
        return sha256('\n'.join(sorted(self.list)).encode()).digest().hex()
158

    
159
def apt_conf(directory: Path) -> str:
160
    """
161
    Given local APT's directory, produce a configuration suitable for running
162
    APT there.
163

    
164
    'directory' must not contain any special characters including quotes and
165
    spaces.
166
    """
167
    return f'''
168
Dir "{directory}";
169
Dir::State "{directory}/var/lib/apt";
170
Dir::State::status "{directory}/var/lib/dpkg/status";
171
Dir::Etc::SourceList "{directory}/etc/apt.sources.list";
172
Dir::Etc::SourceParts "";
173
Dir::Cache "{directory}/var/cache/apt";
174
pkgCacheGen::Essential "none";
175
Dir::Etc::Trusted "{directory}/etc/trusted.gpg";
176
'''
177

    
178
def apt_keyring(keys: [str]) -> bytes:
179
    """
180
    Download the requested keys if necessary and export them as a keyring
181
    suitable for passing to APT.
182

    
183
    The keyring is returned as a bytes value that should be written to a file.
184
    """
185
    try:
186
        from gnupg import GPG
187
    except ModuleNotFoundError:
188
        raise GpgError(_('couldnt_import_gnupg_is_it_installed'))
189

    
190
    gpg = GPG(keyring=str(cache_dir() / 'master_keyring.gpg'))
191
    for key in keys:
192
        if gpg.list_keys(keys=[key]) != []:
193
            continue
194

    
195
        if gpg.recv_keys(default_keyserver, key).imported == 0:
196
            raise GpgError(_('gpg_couldnt_recv_key'))
197

    
198
    return gpg.export_keys(keys, armor=False, minimal=True)
199

    
200
def cache_apt_root(apt_root: Path, destination_zip: Path) -> None:
201
    """
202
    Zip an APT root directory for later use and move the zipfile to the
203
    requested destination.
204
    """
205
    temporary_zip_path = None
206
    try:
207
        tmpfile = NamedTemporaryFile(suffix='.zip', prefix='tmp_',
208
                                     dir=cache_dir(), delete=False)
209
        temporary_zip_path = Path(tmpfile.name)
210

    
211
        to_skip = {Path('etc') / 'apt.conf', Path('etc') / 'trusted.gpg'}
212

    
213
        with zipfile.ZipFile(tmpfile, 'w') as zf:
214
            for member in apt_root.rglob('*'):
215
                relative = member.relative_to(apt_root)
216
                if relative not in to_skip:
217
                    # This call will also properly add empty folders to zip file
218
                    zf.write(member, relative, zipfile.ZIP_DEFLATED)
219

    
220
        shutil.move(temporary_zip_path, destination_zip)
221
    finally:
222
        if temporary_zip_path is not None and temporary_zip_path.exists():
223
            temporary_zip_path.unlink()
224

    
225
def setup_local_apt(directory: Path, list: SourcesList, keys: [str]) -> Apt:
226
    """
227
    Create files and directories necessary for running APT without root rights
228
    inside 'directory'.
229

    
230
    'directory' must not contain any special characters including quotes and
231
    spaces and must be empty.
232

    
233
    Return an Apt object that can be used to call apt-get commands.
234
    """
235
    apt_root = directory / 'apt_root'
236

    
237
    conf_text     = apt_conf(apt_root)
238
    keyring_bytes = apt_keyring(keys)
239

    
240
    apt_zipfile = cache_dir() / f'apt_{list.identity()}.zip'
241
    if apt_zipfile.exists():
242
        with zipfile.ZipFile(apt_zipfile) as zf:
243
            zf.extractall(apt_root)
244

    
245
    for to_create in (
246
            apt_root / 'var' / 'lib' / 'apt' / 'partial',
247
            apt_root / 'var' / 'lib' / 'apt' / 'lists',
248
            apt_root / 'var' / 'cache' / 'apt' / 'archives' / 'partial',
249
            apt_root / 'etc' / 'apt' / 'preferences.d',
250
            apt_root / 'var' / 'lib' / 'dpkg',
251
            apt_root / 'var' / 'log' / 'apt'
252
    ):
253
        to_create.mkdir(parents=True, exist_ok=True)
254

    
255
    conf_path    = apt_root / 'etc' / 'apt.conf'
256
    trusted_path = apt_root / 'etc' / 'trusted.gpg'
257
    status_path  = apt_root / 'var' / 'lib' / 'dpkg' / 'status'
258
    list_path    = apt_root / 'etc' / 'apt.sources.list'
259

    
260
    conf_path.write_text(conf_text)
261
    trusted_path.write_bytes(keyring_bytes)
262
    status_path.touch()
263
    list_path.write_text('\n'.join(list.list))
264

    
265
    apt = Apt(str(conf_path))
266
    apt.get('update')
267

    
268
    cache_apt_root(apt_root, apt_zipfile)
269

    
270
    return apt
271

    
272
@contextmanager
273
def local_apt(list: SourcesList, keys: [str]) -> Iterable[Apt]:
274
    """
275
    Create a temporary directory with proper local APT configuration in it.
276
    Yield an Apt object that can be used to issue apt-get commands.
277

    
278
    This function returns a context manager that will remove the directory on
279
    close.
280
    """
281
    with TemporaryDirectory() as td:
282
        td = Path(td)
283
        yield setup_local_apt(td, list, keys)
284

    
285
def download_apt_packages(list: SourcesList, keys: [str], packages: [str],
286
                          destination_dir: Path, with_deps=False) -> [str]:
287
    """
288
    Set up a local APT, update it using the specified sources.list configuration
289
    and use it to download the specified packages.
290

    
291
    This function downloads a .deb file of the packages matching the current
292
    architecture (which includes packages with architecture 'all') as well as
293
    all theis corresponding source package files and (if requested) the debs
294
    and source files of all their declared dependencies.
295

    
296
    Return value is a list of names of all downloaded files.
297
    """
298
    with local_apt(list, keys) as apt:
299
        if with_deps:
300
            cp = apt.get('install', '--yes', '--just-print', *packages)
301

    
302
            deps_listing = re.match(
303
                r'''
304
                .*
305
                The\sfollowing\sNEW\spackages\swill\sbe\sinstalled:
306
                (.*)
307
                0\supgraded,
308
                ''',
309
                cp.stdout,
310
                re.MULTILINE | re.DOTALL | re.VERBOSE)
311

    
312
            if deps_listing is None:
313
                raise AptError(_('apt_install_output_not_understood'), cp)
314

    
315
            packages = deps_listing.group(1).split()
316

    
317
        # Download .debs to indirectly to destination_dir by first placing them
318
        # in a temporary subdirectory.
319
        with TemporaryDirectory(dir=destination_dir) as td:
320
            td = Path(td)
321
            cp = apt.get('download', *packages, cwd=td)
322

    
323
            deb_name_regex = re.compile(
324
                r'''
325
                ^
326
                (?P<name>[^_]+)
327
                _
328
                (?P<ver>[^_]+)
329
                _
330
                .+              # architecture (or 'all')
331
                \.deb
332
                $
333
                ''',
334
                re.VERBOSE)
335

    
336
            names_vers = []
337
            downloaded = []
338
            for deb_file in td.iterdir():
339
                match = deb_name_regex.match(deb_file.name)
340
                if match is None:
341
                    msg = _('apt_download_gave_bad_filename_{}')\
342
                        .format(deb_file.name)
343
                    raise AptError(msg, cp)
344

    
345
                names_vers.append((match.group('name'), match.group('ver')))
346
                downloaded.append(deb_file.name)
347

    
348
            apt.get('source', '--download-only',
349
                    *[f'{n}={v}' for n, v in names_vers], cwd=td)
350

    
351
            for source_file in td.iterdir():
352
                if source_file.name in downloaded:
353
                    continue
354

    
355
                downloaded.append(source_file.name)
356

    
357
            for filename in downloaded:
358
                shutil.move(td / filename, destination_dir / filename)
359

    
360
    return downloaded
361

    
362
@contextmanager
363
def piggybacked_system(piggyback_def: dict, foreign_packages: Optional[Path]) \
364
    -> Iterable[Piggybacked]:
365
    """
366
    Resolve resources from APT. Optionally, use package files (.deb's, etc.)
367
    from a specified directory instead of resolving and downloading them.
368

    
369
    The directories and files created for the yielded Piggybacked object shall
370
    be deleted when this context manager gets closed.
371
    """
372
    assert piggyback_def['system'] == 'apt'
373

    
374
    with TemporaryDirectory() as td:
375
        td       = Path(td)
376
        root     = td / 'root'
377
        root.mkdir()
378

    
379
        if foreign_packages is None:
380
            archives = td / 'archives'
381
            archives.mkdir()
382

    
383
            sources_list = SourcesList(piggyback_def.get('sources_list', []),
384
                                       piggyback_def.get('distribution'))
385
            packages = piggyback_def['packages']
386
            with_deps = piggyback_def['dependencies']
387
            pgp_keys = [
388
                *default_keys,
389
                *piggyback_def.get('trusted_keys', [])
390
            ]
391

    
392
            download_apt_packages(
393
                list=sources_list,
394
                keys=pgp_keys,
395
                packages=packages,
396
                destination_dir=archives,
397
                with_deps=with_deps
398
            )
399
        else:
400
            archives = foreign_packages / 'apt'
401

    
402
        for deb in archives.glob('*.deb'):
403
            command = ['dpkg-deb', '-x', str(deb), str(root)]
404
            try:
405
                cp = run(command)
406
            except FileNotFoundError:
407
                raise AptError(_('couldnt_execute_dpkg_deb_is_it_installed'))
408

    
409
            if cp.returncode != 0:
410
                msg = _('dpkg_deb_command_{}_failed').format(' '.join(command))
411
                raise AptError(msg, cp)
412

    
413
        docs_dir = root / 'usr' / 'share' / 'doc'
414
        copyright_paths = [p / 'copyright' for p in docs_dir.iterdir()] \
415
                    if docs_dir.exists() else []
416
        copyright_paths = [PurePosixPath('.apt-root') / p.relative_to(root)
417
                           for p in copyright_paths if p.exists()]
418

    
419
        standard_depends = piggyback_def.get('depend_on_base_packages', True)
420
        must_depend = [{'identifier': 'apt-common-licenses'}] \
421
            if standard_depends else []
422

    
423
        yield Piggybacked(
424
            archives={'apt': archives},
425
            roots={'.apt-root': root},
426
            package_license_files=copyright_paths,
427
            package_must_depend=must_depend
428
        )
(5-5/6)