1
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
2
|
|
3
|
# Using a local APT.
|
4
|
#
|
5
|
# This file is part of Hydrilla
|
6
|
#
|
7
|
# Copyright (C) 2022 Wojtek Kosior
|
8
|
#
|
9
|
# This program is free software: you can redistribute it and/or modify
|
10
|
# it under the terms of the GNU Affero General Public License as
|
11
|
# published by the Free Software Foundation, either version 3 of the
|
12
|
# License, or (at your option) any later version.
|
13
|
#
|
14
|
# This program is distributed in the hope that it will be useful,
|
15
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
16
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
17
|
# GNU Affero General Public License for more details.
|
18
|
#
|
19
|
# You should have received a copy of the GNU Affero General Public License
|
20
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
21
|
#
|
22
|
#
|
23
|
# I, Wojtek Kosior, thereby promise not to sue for violation of this
|
24
|
# file's license. Although I request that you do not make use this code
|
25
|
# in a proprietary program, I am not going to enforce this in court.
|
26
|
|
27
|
# Enable using with Python 3.7.
|
28
|
from __future__ import annotations
|
29
|
|
30
|
import zipfile
|
31
|
import shutil
|
32
|
import re
|
33
|
import subprocess
|
34
|
CP = subprocess.CompletedProcess
|
35
|
from pathlib import Path, PurePosixPath
|
36
|
from tempfile import TemporaryDirectory, NamedTemporaryFile
|
37
|
from hashlib import sha256
|
38
|
from urllib.parse import unquote
|
39
|
from contextlib import contextmanager
|
40
|
from typing import Optional, Iterable
|
41
|
|
42
|
from .. import util
|
43
|
from .piggybacking import Piggybacked
|
44
|
from .common_errors import *
|
45
|
|
46
|
here = Path(__file__).resolve().parent
|
47
|
|
48
|
_ = util.translation(here / 'locales').gettext
|
49
|
|
50
|
"""
|
51
|
Default cache directory to save APT configurations and downloaded GPG keys in.
|
52
|
"""
|
53
|
default_apt_cache_dir = Path.home() / '.cache' / 'hydrilla' / 'builder' / 'apt'
|
54
|
|
55
|
"""
|
56
|
Default keyserver to use.
|
57
|
"""
|
58
|
default_keyserver = 'hkps://keyserver.ubuntu.com:443'
|
59
|
|
60
|
"""
|
61
|
Default keys to download when using a local APT.
|
62
|
"""
|
63
|
default_keys = [
|
64
|
# Trisquel
|
65
|
'E6C27099CA21965B734AEA31B4EFB9F38D8AEBF1',
|
66
|
'60364C9869F92450421F0C22B138CA450C05112F',
|
67
|
# Ubuntu
|
68
|
'630239CC130E1A7FD81A27B140976EAF437D05B5',
|
69
|
'790BC7277767219C42C86F933B4FE6ACC0B21F32',
|
70
|
'F6ECB3762474EDA9D21B7022871920D1991BC93C',
|
71
|
# Debian
|
72
|
'6D33866EDD8FFA41C0143AEDDCC9EFBF77E11517',
|
73
|
'80D15823B7FD1561F9F7BCDDDC30D7C23CBBABEE',
|
74
|
'AC530D520F2F3269F5E98313A48449044AAD5C5D'
|
75
|
]
|
76
|
|
77
|
"""sources.list file contents for known distros."""
|
78
|
default_lists = {
|
79
|
'nabia': [f'{type} http://archive.trisquel.info/trisquel/ nabia{suf} main'
|
80
|
for type in ('deb', 'deb-src')
|
81
|
for suf in ('', '-updates', '-security')]
|
82
|
}
|
83
|
|
84
|
class GpgError(Exception):
|
85
|
"""
|
86
|
Exception used to report various problems when calling GPG.
|
87
|
"""
|
88
|
|
89
|
class AptError(SubprocessError):
|
90
|
"""
|
91
|
Exception used to report various problems when calling apt-* and dpkg-*
|
92
|
commands.
|
93
|
"""
|
94
|
|
95
|
def run(command, **kwargs):
|
96
|
"""A wrapped around subprocess.run that sets some default options."""
|
97
|
return subprocess.run(command, **kwargs, env={'LANG': 'en_US'},
|
98
|
capture_output=True, text=True)
|
99
|
|
100
|
class Apt:
|
101
|
"""
|
102
|
This class represents an APT instance and can be used to call apt-get
|
103
|
commands with it.
|
104
|
"""
|
105
|
def __init__(self, apt_conf: str) -> None:
|
106
|
"""Initialize this Apt object."""
|
107
|
self.apt_conf = apt_conf
|
108
|
|
109
|
def get(self, *args: str, **kwargs) -> CP:
|
110
|
"""
|
111
|
Run apt-get with the specified arguments and raise a meaningful AptError
|
112
|
when something goes wrong.
|
113
|
"""
|
114
|
command = ['apt-get', '-c', self.apt_conf, *args]
|
115
|
try:
|
116
|
cp = run(command, **kwargs)
|
117
|
except FileNotFoundError:
|
118
|
msg = _('couldnt_execute_{}_is_it_installed').format('apt-get')
|
119
|
raise AptError(msg)
|
120
|
|
121
|
if cp.returncode != 0:
|
122
|
msg = _('command_{}_failed').format(' '.join(command))
|
123
|
raise AptError(msg, cp)
|
124
|
|
125
|
return cp
|
126
|
|
127
|
def cache_dir() -> Path:
|
128
|
"""
|
129
|
Return the directory used to cache data (APT configurations, keyrings) to
|
130
|
speed up repeated operations.
|
131
|
|
132
|
This function first ensures the directory exists.
|
133
|
"""
|
134
|
default_apt_cache_dir.mkdir(parents=True, exist_ok=True)
|
135
|
return default_apt_cache_dir
|
136
|
|
137
|
class SourcesList:
|
138
|
"""Representation of apt's sources.list contents."""
|
139
|
def __init__(self, list: [str]=[], codename: Optional[str]=None) -> None:
|
140
|
"""Initialize this SourcesList."""
|
141
|
self.codename = None
|
142
|
self.list = [*list]
|
143
|
self.has_extra_entries = bool(self.list)
|
144
|
|
145
|
if codename is not None:
|
146
|
if codename not in default_lists:
|
147
|
raise DistroError(_('distro_{}_unknown').format(codename))
|
148
|
|
149
|
self.codename = codename
|
150
|
self.list.extend(default_lists[codename])
|
151
|
|
152
|
def identity(self) -> str:
|
153
|
"""
|
154
|
Produce a string that uniquely identifies this sources.list contents.
|
155
|
"""
|
156
|
if self.codename and not self.has_extra_entries:
|
157
|
return self.codename
|
158
|
|
159
|
return sha256('\n'.join(sorted(self.list)).encode()).digest().hex()
|
160
|
|
161
|
def apt_conf(directory: Path) -> str:
|
162
|
"""
|
163
|
Given local APT's directory, produce a configuration suitable for running
|
164
|
APT there.
|
165
|
|
166
|
'directory' must not contain any special characters including quotes and
|
167
|
spaces.
|
168
|
"""
|
169
|
return f'''
|
170
|
Architecture "amd64";
|
171
|
Dir "{directory}";
|
172
|
Dir::State "{directory}/var/lib/apt";
|
173
|
Dir::State::status "{directory}/var/lib/dpkg/status";
|
174
|
Dir::Etc::SourceList "{directory}/etc/apt.sources.list";
|
175
|
Dir::Etc::SourceParts "";
|
176
|
Dir::Cache "{directory}/var/cache/apt";
|
177
|
pkgCacheGen::Essential "none";
|
178
|
Dir::Etc::Trusted "{directory}/etc/trusted.gpg";
|
179
|
'''
|
180
|
|
181
|
def apt_keyring(keys: [str]) -> bytes:
|
182
|
"""
|
183
|
Download the requested keys if necessary and export them as a keyring
|
184
|
suitable for passing to APT.
|
185
|
|
186
|
The keyring is returned as a bytes value that should be written to a file.
|
187
|
"""
|
188
|
try:
|
189
|
from gnupg import GPG
|
190
|
except ModuleNotFoundError:
|
191
|
raise GpgError(_('couldnt_import_{}_is_it_installed').format('gnupg'))
|
192
|
|
193
|
gpg = GPG(keyring=str(cache_dir() / 'master_keyring.gpg'))
|
194
|
for key in keys:
|
195
|
if gpg.list_keys(keys=[key]) != []:
|
196
|
continue
|
197
|
|
198
|
if gpg.recv_keys(default_keyserver, key).imported == 0:
|
199
|
raise GpgError(_('gpg_couldnt_recv_key_{}').format(key))
|
200
|
|
201
|
return gpg.export_keys(keys, armor=False, minimal=True)
|
202
|
|
203
|
def cache_apt_root(apt_root: Path, destination_zip: Path) -> None:
|
204
|
"""
|
205
|
Zip an APT root directory for later use and move the zipfile to the
|
206
|
requested destination.
|
207
|
"""
|
208
|
temporary_zip_path = None
|
209
|
try:
|
210
|
tmpfile = NamedTemporaryFile(suffix='.zip', prefix='tmp_',
|
211
|
dir=cache_dir(), delete=False)
|
212
|
temporary_zip_path = Path(tmpfile.name)
|
213
|
|
214
|
to_skip = {Path('etc') / 'apt.conf', Path('etc') / 'trusted.gpg'}
|
215
|
|
216
|
with zipfile.ZipFile(tmpfile, 'w') as zf:
|
217
|
for member in apt_root.rglob('*'):
|
218
|
relative = member.relative_to(apt_root)
|
219
|
if relative not in to_skip:
|
220
|
# This call will also properly add empty folders to zip file
|
221
|
zf.write(member, relative, zipfile.ZIP_DEFLATED)
|
222
|
|
223
|
shutil.move(temporary_zip_path, destination_zip)
|
224
|
finally:
|
225
|
if temporary_zip_path is not None and temporary_zip_path.exists():
|
226
|
temporary_zip_path.unlink()
|
227
|
|
228
|
def setup_local_apt(directory: Path, list: SourcesList, keys: [str]) -> Apt:
|
229
|
"""
|
230
|
Create files and directories necessary for running APT without root rights
|
231
|
inside 'directory'.
|
232
|
|
233
|
'directory' must not contain any special characters including quotes and
|
234
|
spaces and must be empty.
|
235
|
|
236
|
Return an Apt object that can be used to call apt-get commands.
|
237
|
"""
|
238
|
apt_root = directory / 'apt_root'
|
239
|
|
240
|
conf_text = apt_conf(apt_root)
|
241
|
keyring_bytes = apt_keyring(keys)
|
242
|
|
243
|
apt_zipfile = cache_dir() / f'apt_{list.identity()}.zip'
|
244
|
if apt_zipfile.exists():
|
245
|
with zipfile.ZipFile(apt_zipfile) as zf:
|
246
|
zf.extractall(apt_root)
|
247
|
|
248
|
for to_create in (
|
249
|
apt_root / 'var' / 'lib' / 'apt' / 'partial',
|
250
|
apt_root / 'var' / 'lib' / 'apt' / 'lists',
|
251
|
apt_root / 'var' / 'cache' / 'apt' / 'archives' / 'partial',
|
252
|
apt_root / 'etc' / 'apt' / 'preferences.d',
|
253
|
apt_root / 'var' / 'lib' / 'dpkg',
|
254
|
apt_root / 'var' / 'log' / 'apt'
|
255
|
):
|
256
|
to_create.mkdir(parents=True, exist_ok=True)
|
257
|
|
258
|
conf_path = apt_root / 'etc' / 'apt.conf'
|
259
|
trusted_path = apt_root / 'etc' / 'trusted.gpg'
|
260
|
status_path = apt_root / 'var' / 'lib' / 'dpkg' / 'status'
|
261
|
list_path = apt_root / 'etc' / 'apt.sources.list'
|
262
|
|
263
|
conf_path.write_text(conf_text)
|
264
|
trusted_path.write_bytes(keyring_bytes)
|
265
|
status_path.touch()
|
266
|
list_path.write_text('\n'.join(list.list))
|
267
|
|
268
|
apt = Apt(str(conf_path))
|
269
|
apt.get('update')
|
270
|
|
271
|
cache_apt_root(apt_root, apt_zipfile)
|
272
|
|
273
|
return apt
|
274
|
|
275
|
@contextmanager
|
276
|
def local_apt(list: SourcesList, keys: [str]) -> Iterable[Apt]:
|
277
|
"""
|
278
|
Create a temporary directory with proper local APT configuration in it.
|
279
|
Yield an Apt object that can be used to issue apt-get commands.
|
280
|
|
281
|
This function returns a context manager that will remove the directory on
|
282
|
close.
|
283
|
"""
|
284
|
with TemporaryDirectory() as td:
|
285
|
td = Path(td)
|
286
|
yield setup_local_apt(td, list, keys)
|
287
|
|
288
|
def download_apt_packages(list: SourcesList, keys: [str], packages: [str],
|
289
|
destination_dir: Path, with_deps: bool) -> [str]:
|
290
|
"""
|
291
|
Set up a local APT, update it using the specified sources.list configuration
|
292
|
and use it to download the specified packages.
|
293
|
|
294
|
This function downloads .deb files of packages matching the amd64
|
295
|
architecture (which includes packages with architecture 'all') as well as
|
296
|
all their corresponding source package files and (if requested) the debs
|
297
|
and source files of all their declared dependencies.
|
298
|
|
299
|
Return value is a list of names of all downloaded files.
|
300
|
"""
|
301
|
install_line_regex = re.compile(r'^Inst (?P<name>\S+) \((?P<version>\S+) ')
|
302
|
|
303
|
with local_apt(list, keys) as apt:
|
304
|
if with_deps:
|
305
|
cp = apt.get('install', '--yes', '--just-print', *packages)
|
306
|
|
307
|
lines = cp.stdout.split('\n')
|
308
|
matches = [install_line_regex.match(l) for l in lines]
|
309
|
packages = [f'{m.group("name")}={m.group("version")}'
|
310
|
for m in matches if m]
|
311
|
|
312
|
if not packages:
|
313
|
raise AptError(_('apt_install_output_not_understood'), cp)
|
314
|
|
315
|
# Download .debs to indirectly to destination_dir by first placing them
|
316
|
# in a temporary subdirectory.
|
317
|
with TemporaryDirectory(dir=destination_dir) as td:
|
318
|
td = Path(td)
|
319
|
cp = apt.get('download', *packages, cwd=td)
|
320
|
|
321
|
deb_name_regex = re.compile(
|
322
|
r'''
|
323
|
^
|
324
|
(?P<name>[^_]+)
|
325
|
_
|
326
|
(?P<ver>[^_]+)
|
327
|
_
|
328
|
.+ # architecture (or 'all')
|
329
|
\.deb
|
330
|
$
|
331
|
''',
|
332
|
re.VERBOSE)
|
333
|
|
334
|
names_vers = []
|
335
|
downloaded = []
|
336
|
for deb_file in td.iterdir():
|
337
|
match = deb_name_regex.match(deb_file.name)
|
338
|
if match is None:
|
339
|
msg = _('apt_download_gave_bad_filename_{}')\
|
340
|
.format(deb_file.name)
|
341
|
raise AptError(msg, cp)
|
342
|
|
343
|
names_vers.append((
|
344
|
unquote(match.group('name')),
|
345
|
unquote(match.group('ver'))
|
346
|
))
|
347
|
downloaded.append(deb_file.name)
|
348
|
|
349
|
apt.get('source', '--download-only',
|
350
|
*[f'{n}={v}' for n, v in names_vers], cwd=td)
|
351
|
|
352
|
for source_file in td.iterdir():
|
353
|
if source_file.name in downloaded:
|
354
|
continue
|
355
|
|
356
|
downloaded.append(source_file.name)
|
357
|
|
358
|
for filename in downloaded:
|
359
|
shutil.move(td / filename, destination_dir / filename)
|
360
|
|
361
|
return downloaded
|
362
|
|
363
|
@contextmanager
|
364
|
def piggybacked_system(piggyback_def: dict, foreign_packages: Optional[Path]) \
|
365
|
-> Iterable[Piggybacked]:
|
366
|
"""
|
367
|
Resolve resources from APT. Optionally, use package files (.deb's, etc.)
|
368
|
from a specified directory instead of resolving and downloading them.
|
369
|
|
370
|
The directories and files created for the yielded Piggybacked object shall
|
371
|
be deleted when this context manager gets closed.
|
372
|
"""
|
373
|
assert piggyback_def['system'] == 'apt'
|
374
|
|
375
|
with TemporaryDirectory() as td:
|
376
|
td = Path(td)
|
377
|
root = td / 'root'
|
378
|
root.mkdir()
|
379
|
|
380
|
if foreign_packages is None:
|
381
|
archives = td / 'archives'
|
382
|
archives.mkdir()
|
383
|
else:
|
384
|
archives = foreign_packages / 'apt'
|
385
|
archives.mkdir(exist_ok=True)
|
386
|
|
387
|
if [*archives.glob('*.deb')] == []:
|
388
|
sources_list = SourcesList(piggyback_def.get('sources_list', []),
|
389
|
piggyback_def.get('distribution'))
|
390
|
packages = piggyback_def['packages']
|
391
|
with_deps = piggyback_def['dependencies']
|
392
|
pgp_keys = [
|
393
|
*default_keys,
|
394
|
*piggyback_def.get('trusted_keys', [])
|
395
|
]
|
396
|
|
397
|
download_apt_packages(
|
398
|
list=sources_list,
|
399
|
keys=pgp_keys,
|
400
|
packages=packages,
|
401
|
destination_dir=archives,
|
402
|
with_deps=with_deps
|
403
|
)
|
404
|
|
405
|
for deb in archives.glob('*.deb'):
|
406
|
command = ['dpkg-deb', '-x', str(deb), str(root)]
|
407
|
try:
|
408
|
cp = run(command)
|
409
|
except FileNotFoundError:
|
410
|
msg = _('couldnt_execute_{}_is_it_installed'.format('dpkg-deb'))
|
411
|
raise AptError(msg)
|
412
|
|
413
|
if cp.returncode != 0:
|
414
|
msg = _('command_{}_failed').format(' '.join(command))
|
415
|
raise AptError(msg, cp)
|
416
|
|
417
|
docs_dir = root / 'usr' / 'share' / 'doc'
|
418
|
copyright_paths = [p / 'copyright' for p in docs_dir.iterdir()] \
|
419
|
if docs_dir.exists() else []
|
420
|
copyright_paths = [PurePosixPath('.apt-root') / p.relative_to(root)
|
421
|
for p in copyright_paths if p.exists()]
|
422
|
|
423
|
standard_depends = piggyback_def.get('depend_on_base_packages', True)
|
424
|
must_depend = [{'identifier': 'apt-common-licenses'}] \
|
425
|
if standard_depends else []
|
426
|
|
427
|
yield Piggybacked(
|
428
|
archives={'apt': archives},
|
429
|
roots={'.apt-root': root},
|
430
|
package_license_files=copyright_paths,
|
431
|
package_must_depend=must_depend
|
432
|
)
|