Project

General

Profile

Download (14.4 KB) Statistics
| Branch: | Tag: | Revision:

hydrilla-builder / src / hydrilla / builder / local_apt.py @ 866922f8

1
# SPDX-License-Identifier: AGPL-3.0-or-later
2

    
3
# Using a local APT.
4
#
5
# This file is part of Hydrilla
6
#
7
# Copyright (C) 2022 Wojtek Kosior
8
#
9
# This program is free software: you can redistribute it and/or modify
10
# it under the terms of the GNU Affero General Public License as
11
# published by the Free Software Foundation, either version 3 of the
12
# License, or (at your option) any later version.
13
#
14
# This program is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
# GNU Affero General Public License for more details.
18
#
19
# You should have received a copy of the GNU Affero General Public License
20
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
21
#
22
#
23
# I, Wojtek Kosior, thereby promise not to sue for violation of this
24
# file's license. Although I request that you do not make use this code
25
# in a proprietary program, I am not going to enforce this in court.
26

    
27
# Enable using with Python 3.7.
28
from __future__ import annotations
29

    
30
import zipfile
31
import shutil
32
import re
33
import subprocess
34
CP = subprocess.CompletedProcess
35
from pathlib import Path, PurePosixPath
36
from tempfile import TemporaryDirectory, NamedTemporaryFile
37
from hashlib import sha256
38
from urllib.parse import unquote
39
from contextlib import contextmanager
40
from typing import Optional, Iterable
41

    
42
from .. import util
43
from .piggybacking import Piggybacked
44
from .common_errors import *
45

    
46
here = Path(__file__).resolve().parent
47

    
48
_ = util.translation(here / 'locales').gettext
49

    
50
"""
51
Default cache directory to save APT configurations and downloaded GPG keys in.
52
"""
53
default_apt_cache_dir = Path.home() / '.cache' / 'hydrilla' / 'builder' / 'apt'
54

    
55
"""
56
Default keyserver to use.
57
"""
58
default_keyserver = 'hkps://keyserver.ubuntu.com:443'
59

    
60
"""
61
Default keys to download when using a local APT.
62
"""
63
default_keys = [
64
    # Trisquel
65
    'E6C27099CA21965B734AEA31B4EFB9F38D8AEBF1',
66
    '60364C9869F92450421F0C22B138CA450C05112F',
67
    # Ubuntu
68
    '630239CC130E1A7FD81A27B140976EAF437D05B5',
69
    '790BC7277767219C42C86F933B4FE6ACC0B21F32',
70
    'F6ECB3762474EDA9D21B7022871920D1991BC93C',
71
    # Debian
72
    '6D33866EDD8FFA41C0143AEDDCC9EFBF77E11517',
73
    '80D15823B7FD1561F9F7BCDDDC30D7C23CBBABEE',
74
    'AC530D520F2F3269F5E98313A48449044AAD5C5D'
75
]
76

    
77
"""sources.list file contents for known distros."""
78
default_lists = {
79
    'nabia': [f'{type} http://archive.trisquel.info/trisquel/ nabia{suf} main'
80
              for type in ('deb', 'deb-src')
81
              for suf in ('', '-updates', '-security')]
82
}
83

    
84
class GpgError(Exception):
85
    """
86
    Exception used to report various problems when calling GPG.
87
    """
88

    
89
class AptError(SubprocessError):
90
    """
91
    Exception used to report various problems when calling apt-* and dpkg-*
92
    commands.
93
    """
94

    
95
def run(command, **kwargs):
96
    """A wrapped around subprocess.run that sets some default options."""
97
    return subprocess.run(command, **kwargs, env={'LANG': 'en_US'},
98
                          capture_output=True, text=True)
99

    
100
class Apt:
101
    """
102
    This class represents an APT instance and can be used to call apt-get
103
    commands with it.
104
    """
105
    def __init__(self, apt_conf: str) -> None:
106
        """Initialize this Apt object."""
107
        self.apt_conf = apt_conf
108

    
109
    def get(self, *args: str, **kwargs) -> CP:
110
        """
111
        Run apt-get with the specified arguments and raise a meaningful AptError
112
        when something goes wrong.
113
        """
114
        command = ['apt-get', '-c', self.apt_conf, *args]
115
        try:
116
            cp = run(command, **kwargs)
117
        except FileNotFoundError:
118
            msg = _('couldnt_execute_{}_is_it_installed').format('apt-get')
119
            raise AptError(msg)
120

    
121
        if cp.returncode != 0:
122
            msg = _('command_{}_failed').format(' '.join(command))
123
            raise AptError(msg, cp)
124

    
125
        return cp
126

    
127
def cache_dir() -> Path:
128
    """
129
    Return the directory used to cache data (APT configurations, keyrings) to
130
    speed up repeated operations.
131

    
132
    This function first ensures the directory exists.
133
    """
134
    default_apt_cache_dir.mkdir(parents=True, exist_ok=True)
135
    return default_apt_cache_dir
136

    
137
class SourcesList:
138
    """Representation of apt's sources.list contents."""
139
    def __init__(self, list: [str]=[], codename: Optional[str]=None) -> None:
140
        """Initialize this SourcesList."""
141
        self.codename = None
142
        self.list = [*list]
143
        self.has_extra_entries = bool(self.list)
144

    
145
        if codename is not None:
146
            if codename not in default_lists:
147
                raise DistroError(_('distro_{}_unknown').format(codename))
148

    
149
            self.codename = codename
150
            self.list.extend(default_lists[codename])
151

    
152
    def identity(self) -> str:
153
        """
154
        Produce a string that uniquely identifies this sources.list contents.
155
        """
156
        if self.codename and not self.has_extra_entries:
157
            return self.codename
158

    
159
        return sha256('\n'.join(sorted(self.list)).encode()).digest().hex()
160

    
161
def apt_conf(directory: Path) -> str:
162
    """
163
    Given local APT's directory, produce a configuration suitable for running
164
    APT there.
165

    
166
    'directory' must not contain any special characters including quotes and
167
    spaces.
168
    """
169
    return f'''
170
Dir "{directory}";
171
Dir::State "{directory}/var/lib/apt";
172
Dir::State::status "{directory}/var/lib/dpkg/status";
173
Dir::Etc::SourceList "{directory}/etc/apt.sources.list";
174
Dir::Etc::SourceParts "";
175
Dir::Cache "{directory}/var/cache/apt";
176
pkgCacheGen::Essential "none";
177
Dir::Etc::Trusted "{directory}/etc/trusted.gpg";
178
'''
179

    
180
def apt_keyring(keys: [str]) -> bytes:
181
    """
182
    Download the requested keys if necessary and export them as a keyring
183
    suitable for passing to APT.
184

    
185
    The keyring is returned as a bytes value that should be written to a file.
186
    """
187
    try:
188
        from gnupg import GPG
189
    except ModuleNotFoundError:
190
        raise GpgError(_('couldnt_import_{}_is_it_installed').format('gnupg'))
191

    
192
    gpg = GPG(keyring=str(cache_dir() / 'master_keyring.gpg'))
193
    for key in keys:
194
        if gpg.list_keys(keys=[key]) != []:
195
            continue
196

    
197
        if gpg.recv_keys(default_keyserver, key).imported == 0:
198
            raise GpgError(_('gpg_couldnt_recv_key_{}').format(key))
199

    
200
    return gpg.export_keys(keys, armor=False, minimal=True)
201

    
202
def cache_apt_root(apt_root: Path, destination_zip: Path) -> None:
203
    """
204
    Zip an APT root directory for later use and move the zipfile to the
205
    requested destination.
206
    """
207
    temporary_zip_path = None
208
    try:
209
        tmpfile = NamedTemporaryFile(suffix='.zip', prefix='tmp_',
210
                                     dir=cache_dir(), delete=False)
211
        temporary_zip_path = Path(tmpfile.name)
212

    
213
        to_skip = {Path('etc') / 'apt.conf', Path('etc') / 'trusted.gpg'}
214

    
215
        with zipfile.ZipFile(tmpfile, 'w') as zf:
216
            for member in apt_root.rglob('*'):
217
                relative = member.relative_to(apt_root)
218
                if relative not in to_skip:
219
                    # This call will also properly add empty folders to zip file
220
                    zf.write(member, relative, zipfile.ZIP_DEFLATED)
221

    
222
        shutil.move(temporary_zip_path, destination_zip)
223
    finally:
224
        if temporary_zip_path is not None and temporary_zip_path.exists():
225
            temporary_zip_path.unlink()
226

    
227
def setup_local_apt(directory: Path, list: SourcesList, keys: [str]) -> Apt:
228
    """
229
    Create files and directories necessary for running APT without root rights
230
    inside 'directory'.
231

    
232
    'directory' must not contain any special characters including quotes and
233
    spaces and must be empty.
234

    
235
    Return an Apt object that can be used to call apt-get commands.
236
    """
237
    apt_root = directory / 'apt_root'
238

    
239
    conf_text     = apt_conf(apt_root)
240
    keyring_bytes = apt_keyring(keys)
241

    
242
    apt_zipfile = cache_dir() / f'apt_{list.identity()}.zip'
243
    if apt_zipfile.exists():
244
        with zipfile.ZipFile(apt_zipfile) as zf:
245
            zf.extractall(apt_root)
246

    
247
    for to_create in (
248
            apt_root / 'var' / 'lib' / 'apt' / 'partial',
249
            apt_root / 'var' / 'lib' / 'apt' / 'lists',
250
            apt_root / 'var' / 'cache' / 'apt' / 'archives' / 'partial',
251
            apt_root / 'etc' / 'apt' / 'preferences.d',
252
            apt_root / 'var' / 'lib' / 'dpkg',
253
            apt_root / 'var' / 'log' / 'apt'
254
    ):
255
        to_create.mkdir(parents=True, exist_ok=True)
256

    
257
    conf_path    = apt_root / 'etc' / 'apt.conf'
258
    trusted_path = apt_root / 'etc' / 'trusted.gpg'
259
    status_path  = apt_root / 'var' / 'lib' / 'dpkg' / 'status'
260
    list_path    = apt_root / 'etc' / 'apt.sources.list'
261

    
262
    conf_path.write_text(conf_text)
263
    trusted_path.write_bytes(keyring_bytes)
264
    status_path.touch()
265
    list_path.write_text('\n'.join(list.list))
266

    
267
    apt = Apt(str(conf_path))
268
    apt.get('update')
269

    
270
    cache_apt_root(apt_root, apt_zipfile)
271

    
272
    return apt
273

    
274
@contextmanager
275
def local_apt(list: SourcesList, keys: [str]) -> Iterable[Apt]:
276
    """
277
    Create a temporary directory with proper local APT configuration in it.
278
    Yield an Apt object that can be used to issue apt-get commands.
279

    
280
    This function returns a context manager that will remove the directory on
281
    close.
282
    """
283
    with TemporaryDirectory() as td:
284
        td = Path(td)
285
        yield setup_local_apt(td, list, keys)
286

    
287
def download_apt_packages(list: SourcesList, keys: [str], packages: [str],
288
                          destination_dir: Path, with_deps=False) -> [str]:
289
    """
290
    Set up a local APT, update it using the specified sources.list configuration
291
    and use it to download the specified packages.
292

    
293
    This function downloads a .deb file of the packages matching the current
294
    architecture (which includes packages with architecture 'all') as well as
295
    all theis corresponding source package files and (if requested) the debs
296
    and source files of all their declared dependencies.
297

    
298
    Return value is a list of names of all downloaded files.
299
    """
300
    install_line_regex = re.compile(r'^Inst (?P<name>\S+) \((?P<version>\S+) ')
301

    
302
    with local_apt(list, keys) as apt:
303
        if with_deps:
304
            cp = apt.get('install', '--yes', '--just-print', *packages)
305

    
306
            lines = cp.stdout.split('\n')
307
            matches = [install_line_regex.match(l) for l in lines]
308
            packages = [f'{m.group("name")}={m.group("version")}'
309
                        for m in matches if m]
310

    
311
            if not packages:
312
                raise AptError(_('apt_install_output_not_understood'), cp)
313

    
314
        # Download .debs to indirectly to destination_dir by first placing them
315
        # in a temporary subdirectory.
316
        with TemporaryDirectory(dir=destination_dir) as td:
317
            td = Path(td)
318
            cp = apt.get('download', *packages, cwd=td)
319

    
320
            deb_name_regex = re.compile(
321
                r'''
322
                ^
323
                (?P<name>[^_]+)
324
                _
325
                (?P<ver>[^_]+)
326
                _
327
                .+              # architecture (or 'all')
328
                \.deb
329
                $
330
                ''',
331
                re.VERBOSE)
332

    
333
            names_vers = []
334
            downloaded = []
335
            for deb_file in td.iterdir():
336
                match = deb_name_regex.match(deb_file.name)
337
                if match is None:
338
                    msg = _('apt_download_gave_bad_filename_{}')\
339
                        .format(deb_file.name)
340
                    raise AptError(msg, cp)
341

    
342
                names_vers.append((
343
                    unquote(match.group('name')),
344
                    unquote(match.group('ver'))
345
                ))
346
                downloaded.append(deb_file.name)
347

    
348
            apt.get('source', '--download-only',
349
                    *[f'{n}={v}' for n, v in names_vers], cwd=td)
350

    
351
            for source_file in td.iterdir():
352
                if source_file.name in downloaded:
353
                    continue
354

    
355
                downloaded.append(source_file.name)
356

    
357
            for filename in downloaded:
358
                shutil.move(td / filename, destination_dir / filename)
359

    
360
    return downloaded
361

    
362
@contextmanager
363
def piggybacked_system(piggyback_def: dict, foreign_packages: Optional[Path]) \
364
    -> Iterable[Piggybacked]:
365
    """
366
    Resolve resources from APT. Optionally, use package files (.deb's, etc.)
367
    from a specified directory instead of resolving and downloading them.
368

    
369
    The directories and files created for the yielded Piggybacked object shall
370
    be deleted when this context manager gets closed.
371
    """
372
    assert piggyback_def['system'] == 'apt'
373

    
374
    with TemporaryDirectory() as td:
375
        td       = Path(td)
376
        root     = td / 'root'
377
        root.mkdir()
378

    
379
        if foreign_packages is None:
380
            archives = td / 'archives'
381
            archives.mkdir()
382

    
383
            sources_list = SourcesList(piggyback_def.get('sources_list', []),
384
                                       piggyback_def.get('distribution'))
385
            packages = piggyback_def['packages']
386
            with_deps = piggyback_def['dependencies']
387
            pgp_keys = [
388
                *default_keys,
389
                *piggyback_def.get('trusted_keys', [])
390
            ]
391

    
392
            download_apt_packages(
393
                list=sources_list,
394
                keys=pgp_keys,
395
                packages=packages,
396
                destination_dir=archives,
397
                with_deps=with_deps
398
            )
399
        else:
400
            archives = foreign_packages / 'apt'
401

    
402
        for deb in archives.glob('*.deb'):
403
            command = ['dpkg-deb', '-x', str(deb), str(root)]
404
            try:
405
                cp = run(command)
406
            except FileNotFoundError:
407
                msg = _('couldnt_execute_{}_is_it_installed'.format('dpkg-deb'))
408
                raise AptError(msg)
409

    
410
            if cp.returncode != 0:
411
                msg = _('command_{}_failed').format(' '.join(command))
412
                raise AptError(msg, cp)
413

    
414
        docs_dir = root / 'usr' / 'share' / 'doc'
415
        copyright_paths = [p / 'copyright' for p in docs_dir.iterdir()] \
416
                    if docs_dir.exists() else []
417
        copyright_paths = [PurePosixPath('.apt-root') / p.relative_to(root)
418
                           for p in copyright_paths if p.exists()]
419

    
420
        standard_depends = piggyback_def.get('depend_on_base_packages', True)
421
        must_depend = [{'identifier': 'apt-common-licenses'}] \
422
            if standard_depends else []
423

    
424
        yield Piggybacked(
425
            archives={'apt': archives},
426
            roots={'.apt-root': root},
427
            package_license_files=copyright_paths,
428
            package_must_depend=must_depend
429
        )
(5-5/6)