Project

General

Profile

Download (11.8 KB) Statistics
| Branch: | Tag: | Revision:

hydrilla-builder / src / hydrilla / builder / local_apt.py @ 25ff1c9d

1
# SPDX-License-Identifier: AGPL-3.0-or-later
2

    
3
# Using a local APT.
4
#
5
# This file is part of Hydrilla
6
#
7
# Copyright (C) 2022 Wojtek Kosior
8
#
9
# This program is free software: you can redistribute it and/or modify
10
# it under the terms of the GNU Affero General Public License as
11
# published by the Free Software Foundation, either version 3 of the
12
# License, or (at your option) any later version.
13
#
14
# This program is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
# GNU Affero General Public License for more details.
18
#
19
# You should have received a copy of the GNU Affero General Public License
20
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
21
#
22
#
23
# I, Wojtek Kosior, thereby promise not to sue for violation of this
24
# file's license. Although I request that you do not make use this code
25
# in a proprietary program, I am not going to enforce this in court.
26

    
27
# Enable using with Python 3.7.
28
from __future__ import annotations
29

    
30
import zipfile
31
import shutil
32
import re
33
import subprocess
34
CP = subprocess.CompletedProcess
35
from pathlib import Path
36
from tempfile import TemporaryDirectory, NamedTemporaryFile
37
from hashlib import sha256
38
from contextlib import contextmanager
39
from typing import Optional, Iterable
40

    
41
from .. import util
42

    
43
here = Path(__file__).resolve().parent
44

    
45
_ = util.translation(here / 'locales').gettext
46

    
47
"""
48
Default cache directory to save APT configurations and downloaded GPG keys in.
49
"""
50
default_apt_cache_dir = Path.home() / '.cache' / 'hydrilla' / 'builder' / 'apt'
51

    
52
"""
53
Default keyserver to use.
54
"""
55
default_keyserver = 'hkps://keyserver.ubuntu.com:443'
56

    
57
"""
58
Default keys to download when using a local APT.
59
"""
60
default_keys = [
61
    # Trisquel
62
    'B4EFB9F38D8AEBF1', 'B138CA450C05112F',
63
    # Ubuntu
64
    '40976EAF437D05B5', '3B4FE6ACC0B21F32', '871920D1991BC93C',
65
    # Debian
66
    '9D6D8F6BC857C906', '8B48AD6246925553', 'DCC9EFBF77E11517', '648ACFD622F3D138',
67
    '54404762BBB6E853'
68
]
69

    
70
"""sources.list file contents for known distros."""
71
default_lists = {
72
    'nabia': [f'{type} http://archive.trisquel.info/trisquel/ nabia{suf} main'
73
              for type in ('deb', 'deb-src')
74
              for suf in ('', '-updates', '-security')]
75
}
76

    
77
class GpgError(Exception):
78
    """
79
    Exception used to report various problems when calling GPG.
80
    """
81

    
82
class DistroError(Exception):
83
    """
84
    Exception used to report problems when resolving an OS distribution.
85
    """
86

    
87
class AptError(Exception):
88
    """
89
    Exception used to report various problems when calling apt-* commands.
90
    """
91
    def __init__(self, msg: str, cp: Optional[CP]=None) -> None:
92
        """Initialize this AptError"""
93
        if cp and cp.stdout:
94
            msg = '\n\n'.join([msg, _('STDOUT_OUTPUT_heading'), cp.stdout])
95

    
96
        if cp and cp.stderr:
97
            msg = '\n\n'.join([msg, _('STDERR_OUTPUT_heading'), cp.stderr])
98

    
99
        super().__init__(msg)
100

    
101
class Apt:
102
    """
103
    This class represents an APT instance and can be used to call apt-get
104
    commands with it.
105
    """
106
    def __init__(self, apt_conf: str) -> None:
107
        """Initialize this Apt object."""
108
        self.apt_conf = apt_conf
109

    
110
    def get(self, *args: str, **kwargs) -> CP:
111
        """
112
        Run apt-get with the specified arguments and raise a meaningful AptError
113
        when something goes wrong.
114
        """
115
        command = ['apt-get', '-c', self.apt_conf, *args]
116
        try:
117
            cp = subprocess.run(command, **kwargs, env={'LANG': 'en_US'},
118
                                capture_output=True, text=True)
119
        except FileNotFoundError:
120
            raise AptError(_('couldnt_execute_apt_get_is_it_installed'))
121

    
122
        if cp.returncode != 0:
123
            msg = _('apt_get_command_{}_failed').format(' '.join(command))
124
            raise AptError(msg, cp)
125

    
126
        return cp
127

    
128
def cache_dir() -> Path:
129
    """
130
    Return the directory used to cache data (APT configurations, keyrings) to
131
    speed up repeated operations.
132

    
133
    This function first ensures the directory exists.
134
    """
135
    default_apt_cache_dir.mkdir(parents=True, exist_ok=True)
136
    return default_apt_cache_dir
137

    
138
class SourcesList:
139
    """Representation of apt's sources.list contents."""
140
    def __init__(self, list: [str]=[], codename: Optional[str]=None) -> None:
141
        """Initialize this SourcesList."""
142
        self.codename = None
143
        self.list = [*list]
144
        self.has_extra_entries = bool(self.list)
145

    
146
        if codename is not None:
147
            if codename not in default_lists:
148
                raise DistroError(_('distro_{}_unknown').format(codename))
149

    
150
            self.codename = codename
151
            self.list.extend(default_lists[codename])
152

    
153
    def identity(self) -> str:
154
        """
155
        Produce a string that uniquely identifies this sources.list contents.
156
        """
157
        if self.codename and not self.has_extra_entries:
158
            return self.codename
159

    
160
        return sha256('\n'.join(sorted(self.list)).encode()).digest().hex()
161

    
162
def apt_conf(directory: Path) -> str:
163
    """
164
    Given local APT's directory, produce a configuration suitable for running
165
    APT there.
166

    
167
    'directory' must not contain any special characters including quotes and
168
    spaces.
169
    """
170
    return f'''
171
Dir "{directory}";
172
Dir::State "{directory}/var/lib/apt";
173
Dir::State::status "{directory}/var/lib/dpkg/status";
174
Dir::Etc::SourceList "{directory}/etc/apt.sources.list";
175
Dir::Etc::SourceParts "";
176
Dir::Cache "{directory}/var/cache/apt";
177
pkgCacheGen::Essential "none";
178
Dir::Etc::Trusted "{directory}/etc/trusted.gpg";
179
'''
180

    
181
def apt_keyring(keys: [str]) -> bytes:
182
    """
183
    Download the requested keys if necessary and export them as a keyring
184
    suitable for passing to APT.
185

    
186
    The keyring is returned as a bytes value that should be written to a file.
187
    """
188
    try:
189
        from gnupg import GPG
190
    except ModuleNotFoundError:
191
        raise GpgError(_('couldnt_import_gnupg_is_it_installed'))
192

    
193
    gpg = GPG(keyring=str(cache_dir() / 'master_keyring.gpg'))
194
    for key in keys:
195
        if gpg.list_keys(keys=[key]) != []:
196
            continue
197

    
198
        if gpg.recv_keys(default_keyserver, key).imported == 0:
199
            raise GpgError(_('gpg_couldnt_recv_key'))
200

    
201
    return gpg.export_keys(keys, armor=False, minimal=True)
202

    
203
def cache_apt_root(apt_root: Path, destination_zip: Path) -> None:
204
    """
205
    Zip an APT root directory for later use and move the zipfile to the
206
    requested destination.
207
    """
208
    temporary_zip_path = None
209
    try:
210
        tmpfile = NamedTemporaryFile(suffix='.zip', prefix='tmp_',
211
                                     dir=cache_dir(), delete=False)
212
        temporary_zip_path = Path(tmpfile.name)
213

    
214
        to_skip = {Path('etc') / 'apt.conf', Path('etc') / 'trusted.gpg'}
215

    
216
        with zipfile.ZipFile(tmpfile, 'w') as zf:
217
            for member in apt_root.rglob('*'):
218
                relative = member.relative_to(apt_root)
219
                if relative not in to_skip:
220
                    # This call will also properly add empty folders to zip file
221
                    zf.write(member, relative, zipfile.ZIP_DEFLATED)
222

    
223
        shutil.move(temporary_zip_path, destination_zip)
224
    finally:
225
        if temporary_zip_path is not None and temporary_zip_path.exists():
226
            temporary_zip_path.unlink()
227

    
228
def setup_local_apt(directory: Path, list: SourcesList, keys: [str]) -> Apt:
229
    """
230
    Create files and directories necessary for running APT without root rights
231
    inside 'directory'.
232

    
233
    'directory' must not contain any special characters including quotes and
234
    spaces and must be empty.
235

    
236
    Return an Apt object that can be used to call apt-get commands.
237
    """
238
    apt_root = directory / 'apt_root'
239

    
240
    conf_text     = apt_conf(apt_root)
241
    keyring_bytes = apt_keyring(keys)
242

    
243
    apt_zipfile = cache_dir() / f'apt_{list.identity()}.zip'
244
    if apt_zipfile.exists():
245
        with zipfile.ZipFile(apt_zipfile) as zf:
246
            zf.extractall(apt_root)
247

    
248
    for to_create in (
249
            apt_root / 'var' / 'lib' / 'apt' / 'partial',
250
            apt_root / 'var' / 'lib' / 'apt' / 'lists',
251
            apt_root / 'var' / 'cache' / 'apt' / 'archives' / 'partial',
252
            apt_root / 'etc' / 'apt' / 'preferences.d',
253
            apt_root / 'var' / 'lib' / 'dpkg',
254
            apt_root / 'var' / 'log' / 'apt'
255
    ):
256
        to_create.mkdir(parents=True, exist_ok=True)
257

    
258
    conf_path    = apt_root / 'etc' / 'apt.conf'
259
    trusted_path = apt_root / 'etc' / 'trusted.gpg'
260
    status_path  = apt_root / 'var' / 'lib' / 'dpkg' / 'status'
261
    list_path    = apt_root / 'etc' / 'apt.sources.list'
262

    
263
    conf_path.write_text(conf_text)
264
    trusted_path.write_bytes(keyring_bytes)
265
    status_path.touch()
266
    list_path.write_text('\n'.join(list.list))
267

    
268
    apt = Apt(str(conf_path))
269
    apt.get('update')
270

    
271
    cache_apt_root(apt_root, apt_zipfile)
272

    
273
    return apt
274

    
275
@contextmanager
276
def local_apt(list: SourcesList, keys: [str]) -> Iterable[Apt]:
277
    """
278
    Create a temporary directory with proper local APT configuration in it.
279
    Yield an Apt object that can be used to issue apt-get commands.
280

    
281
    This function returns a context manager that will remove the directory on
282
    close.
283
    """
284
    with TemporaryDirectory() as td:
285
        td = Path(td)
286
        yield setup_local_apt(td, list, keys)
287

    
288
def download_apt_packages(list: SourcesList, keys: [str], packages: [str],
289
                          destination_dir: Path, with_deps=False) -> [str]:
290
    """
291
    Set up a local APT, update it using the specified sources.list configuration
292
    and use it to download the specified packages.
293

    
294
    This function downloads a .deb file of the packages matching the current
295
    architecture (which includes packages with architecture 'all') as well as
296
    all theis corresponding source package files and (if requested) the debs
297
    and source files of all their declared dependencies.
298

    
299
    Return value is a list of names of all downloaded files.
300
    """
301
    with local_apt(list, keys) as apt:
302
        if with_deps:
303
            cp = apt.get('install', '--yes', '--just-print', *packages)
304

    
305
            deps_listing = re.match(
306
                r'''
307
                .*
308
                The\sfollowing\sNEW\spackages\swill\sbe\sinstalled:
309
                (.*)
310
                0\supgraded,
311
                ''',
312
                cp.stdout,
313
                re.MULTILINE | re.DOTALL | re.VERBOSE)
314

    
315
            if deps_listing is None:
316
                raise AptError(_('apt_install_output_not_understood'), cp)
317

    
318
            packages = deps_listing.group(1).split()
319

    
320
        # Download .debs to indirectly to destination_dir by first placing them
321
        # in a temporary subdirectory.
322
        with TemporaryDirectory(dir=destination_dir) as td:
323
            td = Path(td)
324
            cp = apt.get('download', *packages, cwd=td)
325

    
326
            deb_name_regex = re.compile(
327
                r'''
328
                ^
329
                (?P<name>[^_]+)
330
                _
331
                (?P<ver>[^_]+)
332
                _
333
                .+              # architecture (or 'all')
334
                \.deb
335
                $
336
                ''',
337
                re.VERBOSE)
338

    
339
            names_vers = []
340
            downloaded = []
341
            for deb_file in td.glob('*'):
342
                match = deb_name_regex.match(deb_file.name)
343
                if match is None:
344
                    msg = _('apt_download_gave_bad_filename_{}')\
345
                        .format(deb_file.name)
346
                    raise AptError(msg, cp)
347

    
348
                names_vers.append((match.group('name'), match.group('ver')))
349
                downloaded.append(deb_file.name)
350

    
351
            apt.get('source', '--download-only',
352
                    *[f'{n}={v}' for n, v in names_vers], cwd=td)
353

    
354
            for source_file in td.glob('*'):
355
                if source_file.name in downloaded:
356
                    continue
357

    
358
                downloaded.append(source_file.name)
359

    
360
            for filename in downloaded:
361
                shutil.move(td / filename, destination_dir / filename)
362

    
363
    return downloaded
(4-4/4)