1
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
2
|
|
3
|
# Using a local APT.
|
4
|
#
|
5
|
# This file is part of Hydrilla
|
6
|
#
|
7
|
# Copyright (C) 2022 Wojtek Kosior
|
8
|
#
|
9
|
# This program is free software: you can redistribute it and/or modify
|
10
|
# it under the terms of the GNU Affero General Public License as
|
11
|
# published by the Free Software Foundation, either version 3 of the
|
12
|
# License, or (at your option) any later version.
|
13
|
#
|
14
|
# This program is distributed in the hope that it will be useful,
|
15
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
16
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
17
|
# GNU Affero General Public License for more details.
|
18
|
#
|
19
|
# You should have received a copy of the GNU Affero General Public License
|
20
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
21
|
#
|
22
|
#
|
23
|
# I, Wojtek Kosior, thereby promise not to sue for violation of this
|
24
|
# file's license. Although I request that you do not make use this code
|
25
|
# in a proprietary program, I am not going to enforce this in court.
|
26
|
|
27
|
# Enable using with Python 3.7.
|
28
|
from __future__ import annotations
|
29
|
|
30
|
import zipfile
|
31
|
import shutil
|
32
|
import re
|
33
|
import subprocess
|
34
|
CP = subprocess.CompletedProcess
|
35
|
from pathlib import Path
|
36
|
from tempfile import TemporaryDirectory, NamedTemporaryFile
|
37
|
from hashlib import sha256
|
38
|
from contextlib import contextmanager
|
39
|
from typing import Optional, Iterable
|
40
|
|
41
|
from .. import util
|
42
|
|
43
|
here = Path(__file__).resolve().parent
|
44
|
|
45
|
_ = util.translation(here / 'locales').gettext
|
46
|
|
47
|
"""
|
48
|
Default cache directory to save APT configurations and downloaded GPG keys in.
|
49
|
"""
|
50
|
default_apt_cache_dir = Path.home() / '.cache' / 'hydrilla' / 'builder' / 'apt'
|
51
|
|
52
|
"""
|
53
|
Default keyserver to use.
|
54
|
"""
|
55
|
default_keyserver = 'hkps://keyserver.ubuntu.com:443'
|
56
|
|
57
|
"""
|
58
|
Default keys to download when using a local APT.
|
59
|
"""
|
60
|
default_keys = [
|
61
|
# Trisquel
|
62
|
'B4EFB9F38D8AEBF1', 'B138CA450C05112F',
|
63
|
# Ubuntu
|
64
|
'40976EAF437D05B5', '3B4FE6ACC0B21F32', '871920D1991BC93C',
|
65
|
# Debian
|
66
|
'9D6D8F6BC857C906', '8B48AD6246925553', 'DCC9EFBF77E11517', '648ACFD622F3D138',
|
67
|
'54404762BBB6E853'
|
68
|
]
|
69
|
|
70
|
"""sources.list file contents for known distros."""
|
71
|
default_lists = {
|
72
|
'nabia': [f'{type} http://archive.trisquel.info/trisquel/ nabia{suf} main'
|
73
|
for type in ('deb', 'deb-src')
|
74
|
for suf in ('', '-updates', '-security')]
|
75
|
}
|
76
|
|
77
|
class GpgError(Exception):
|
78
|
"""
|
79
|
Exception used to report various problems when calling GPG.
|
80
|
"""
|
81
|
|
82
|
class DistroError(Exception):
|
83
|
"""
|
84
|
Exception used to report problems when resolving an OS distribution.
|
85
|
"""
|
86
|
|
87
|
class AptError(Exception):
|
88
|
"""
|
89
|
Exception used to report various problems when calling apt-* commands.
|
90
|
"""
|
91
|
def __init__(self, msg: str, cp: Optional[CP]=None) -> None:
|
92
|
"""Initialize this AptError"""
|
93
|
if cp and cp.stdout:
|
94
|
msg = '\n\n'.join([msg, _('STDOUT_OUTPUT_heading'), cp.stdout])
|
95
|
|
96
|
if cp and cp.stderr:
|
97
|
msg = '\n\n'.join([msg, _('STDERR_OUTPUT_heading'), cp.stderr])
|
98
|
|
99
|
super().__init__(msg)
|
100
|
|
101
|
class Apt:
|
102
|
"""
|
103
|
This class represents an APT instance and can be used to call apt-get
|
104
|
commands with it.
|
105
|
"""
|
106
|
def __init__(self, apt_conf: str) -> None:
|
107
|
"""Initialize this Apt object."""
|
108
|
self.apt_conf = apt_conf
|
109
|
|
110
|
def get(self, *args: str, **kwargs) -> CP:
|
111
|
"""
|
112
|
Run apt-get with the specified arguments and raise a meaningful AptError
|
113
|
when something goes wrong.
|
114
|
"""
|
115
|
command = ['apt-get', '-c', self.apt_conf, *args]
|
116
|
try:
|
117
|
cp = subprocess.run(command, **kwargs, env={'LANG': 'en_US'},
|
118
|
capture_output=True, text=True)
|
119
|
except FileNotFoundError:
|
120
|
raise AptError(_('couldnt_execute_apt_get_is_it_installed'))
|
121
|
|
122
|
if cp.returncode != 0:
|
123
|
msg = _('apt_get_command_{}_failed').format(' '.join(command))
|
124
|
raise AptError(msg, cp)
|
125
|
|
126
|
return cp
|
127
|
|
128
|
def cache_dir() -> Path:
|
129
|
"""
|
130
|
Return the directory used to cache data (APT configurations, keyrings) to
|
131
|
speed up repeated operations.
|
132
|
|
133
|
This function first ensures the directory exists.
|
134
|
"""
|
135
|
default_apt_cache_dir.mkdir(parents=True, exist_ok=True)
|
136
|
return default_apt_cache_dir
|
137
|
|
138
|
class SourcesList:
|
139
|
"""Representation of apt's sources.list contents."""
|
140
|
def __init__(self, list: [str]=[], codename: Optional[str]=None) -> None:
|
141
|
"""Initialize this SourcesList."""
|
142
|
self.codename = None
|
143
|
self.list = [*list]
|
144
|
self.has_extra_entries = bool(self.list)
|
145
|
|
146
|
if codename is not None:
|
147
|
if codename not in default_lists:
|
148
|
raise DistroError(_('distro_{}_unknown').format(codename))
|
149
|
|
150
|
self.codename = codename
|
151
|
self.list.extend(default_lists[codename])
|
152
|
|
153
|
def identity(self) -> str:
|
154
|
"""
|
155
|
Produce a string that uniquely identifies this sources.list contents.
|
156
|
"""
|
157
|
if self.codename and not self.has_extra_entries:
|
158
|
return self.codename
|
159
|
|
160
|
return sha256('\n'.join(sorted(self.list)).encode()).digest().hex()
|
161
|
|
162
|
def apt_conf(directory: Path) -> str:
|
163
|
"""
|
164
|
Given local APT's directory, produce a configuration suitable for running
|
165
|
APT there.
|
166
|
|
167
|
'directory' must not contain any special characters including quotes and
|
168
|
spaces.
|
169
|
"""
|
170
|
return f'''
|
171
|
Dir "{directory}";
|
172
|
Dir::State "{directory}/var/lib/apt";
|
173
|
Dir::State::status "{directory}/var/lib/dpkg/status";
|
174
|
Dir::Etc::SourceList "{directory}/etc/apt.sources.list";
|
175
|
Dir::Etc::SourceParts "";
|
176
|
Dir::Cache "{directory}/var/cache/apt";
|
177
|
pkgCacheGen::Essential "none";
|
178
|
Dir::Etc::Trusted "{directory}/etc/trusted.gpg";
|
179
|
'''
|
180
|
|
181
|
def apt_keyring(keys: [str]) -> bytes:
|
182
|
"""
|
183
|
Download the requested keys if necessary and export them as a keyring
|
184
|
suitable for passing to APT.
|
185
|
|
186
|
The keyring is returned as a bytes value that should be written to a file.
|
187
|
"""
|
188
|
try:
|
189
|
from gnupg import GPG
|
190
|
except ModuleNotFoundError:
|
191
|
raise GpgError(_('couldnt_import_gnupg_is_it_installed'))
|
192
|
|
193
|
gpg = GPG(keyring=str(cache_dir() / 'master_keyring.gpg'))
|
194
|
for key in keys:
|
195
|
if gpg.list_keys(keys=[key]) != []:
|
196
|
continue
|
197
|
|
198
|
if gpg.recv_keys(default_keyserver, key).imported == 0:
|
199
|
raise GpgError(_('gpg_couldnt_recv_key'))
|
200
|
|
201
|
return gpg.export_keys(keys, armor=False, minimal=True)
|
202
|
|
203
|
def cache_apt_root(apt_root: Path, destination_zip: Path) -> None:
|
204
|
"""
|
205
|
Zip an APT root directory for later use and move the zipfile to the
|
206
|
requested destination.
|
207
|
"""
|
208
|
temporary_zip_path = None
|
209
|
try:
|
210
|
tmpfile = NamedTemporaryFile(suffix='.zip', prefix='tmp_',
|
211
|
dir=cache_dir(), delete=False)
|
212
|
temporary_zip_path = Path(tmpfile.name)
|
213
|
|
214
|
to_skip = {Path('etc') / 'apt.conf', Path('etc') / 'trusted.gpg'}
|
215
|
|
216
|
with zipfile.ZipFile(tmpfile, 'w') as zf:
|
217
|
for member in apt_root.rglob('*'):
|
218
|
relative = member.relative_to(apt_root)
|
219
|
if relative not in to_skip:
|
220
|
# This call will also properly add empty folders to zip file
|
221
|
zf.write(member, relative, zipfile.ZIP_DEFLATED)
|
222
|
|
223
|
shutil.move(temporary_zip_path, destination_zip)
|
224
|
finally:
|
225
|
if temporary_zip_path is not None and temporary_zip_path.exists():
|
226
|
temporary_zip_path.unlink()
|
227
|
|
228
|
def setup_local_apt(directory: Path, list: SourcesList, keys: [str]) -> Apt:
|
229
|
"""
|
230
|
Create files and directories necessary for running APT without root rights
|
231
|
inside 'directory'.
|
232
|
|
233
|
'directory' must not contain any special characters including quotes and
|
234
|
spaces and must be empty.
|
235
|
|
236
|
Return an Apt object that can be used to call apt-get commands.
|
237
|
"""
|
238
|
apt_root = directory / 'apt_root'
|
239
|
|
240
|
conf_text = apt_conf(apt_root)
|
241
|
keyring_bytes = apt_keyring(keys)
|
242
|
|
243
|
apt_zipfile = cache_dir() / f'apt_{list.identity()}.zip'
|
244
|
if apt_zipfile.exists():
|
245
|
with zipfile.ZipFile(apt_zipfile) as zf:
|
246
|
zf.extractall(apt_root)
|
247
|
|
248
|
for to_create in (
|
249
|
apt_root / 'var' / 'lib' / 'apt' / 'partial',
|
250
|
apt_root / 'var' / 'lib' / 'apt' / 'lists',
|
251
|
apt_root / 'var' / 'cache' / 'apt' / 'archives' / 'partial',
|
252
|
apt_root / 'etc' / 'apt' / 'preferences.d',
|
253
|
apt_root / 'var' / 'lib' / 'dpkg',
|
254
|
apt_root / 'var' / 'log' / 'apt'
|
255
|
):
|
256
|
to_create.mkdir(parents=True, exist_ok=True)
|
257
|
|
258
|
conf_path = apt_root / 'etc' / 'apt.conf'
|
259
|
trusted_path = apt_root / 'etc' / 'trusted.gpg'
|
260
|
status_path = apt_root / 'var' / 'lib' / 'dpkg' / 'status'
|
261
|
list_path = apt_root / 'etc' / 'apt.sources.list'
|
262
|
|
263
|
conf_path.write_text(conf_text)
|
264
|
trusted_path.write_bytes(keyring_bytes)
|
265
|
status_path.touch()
|
266
|
list_path.write_text('\n'.join(list.list))
|
267
|
|
268
|
apt = Apt(str(conf_path))
|
269
|
apt.get('update')
|
270
|
|
271
|
cache_apt_root(apt_root, apt_zipfile)
|
272
|
|
273
|
return apt
|
274
|
|
275
|
@contextmanager
|
276
|
def local_apt(list: SourcesList, keys: [str]) -> Iterable[Apt]:
|
277
|
"""
|
278
|
Create a temporary directory with proper local APT configuration in it.
|
279
|
Yield an Apt object that can be used to issue apt-get commands.
|
280
|
|
281
|
This function returns a context manager that will remove the directory on
|
282
|
close.
|
283
|
"""
|
284
|
with TemporaryDirectory() as td:
|
285
|
td = Path(td)
|
286
|
yield setup_local_apt(td, list, keys)
|
287
|
|
288
|
def download_apt_packages(list: SourcesList, keys: [str], packages: [str],
|
289
|
destination_dir: Path, with_deps=False) -> [str]:
|
290
|
"""
|
291
|
Set up a local APT, update it using the specified sources.list configuration
|
292
|
and use it to download the specified packages.
|
293
|
|
294
|
This function downloads a .deb file of the packages matching the current
|
295
|
architecture (which includes packages with architecture 'all') as well as
|
296
|
all theis corresponding source package files and (if requested) the debs
|
297
|
and source files of all their declared dependencies.
|
298
|
|
299
|
Return value is a list of names of all downloaded files.
|
300
|
"""
|
301
|
with local_apt(list, keys) as apt:
|
302
|
if with_deps:
|
303
|
cp = apt.get('install', '--yes', '--just-print', *packages)
|
304
|
|
305
|
deps_listing = re.match(
|
306
|
r'''
|
307
|
.*
|
308
|
The\sfollowing\sNEW\spackages\swill\sbe\sinstalled:
|
309
|
(.*)
|
310
|
0\supgraded,
|
311
|
''',
|
312
|
cp.stdout,
|
313
|
re.MULTILINE | re.DOTALL | re.VERBOSE)
|
314
|
|
315
|
if deps_listing is None:
|
316
|
raise AptError(_('apt_install_output_not_understood'), cp)
|
317
|
|
318
|
packages = deps_listing.group(1).split()
|
319
|
|
320
|
# Download .debs to indirectly to destination_dir by first placing them
|
321
|
# in a temporary subdirectory.
|
322
|
with TemporaryDirectory(dir=destination_dir) as td:
|
323
|
td = Path(td)
|
324
|
cp = apt.get('download', *packages, cwd=td)
|
325
|
|
326
|
deb_name_regex = re.compile(
|
327
|
r'''
|
328
|
^
|
329
|
(?P<name>[^_]+)
|
330
|
_
|
331
|
(?P<ver>[^_]+)
|
332
|
_
|
333
|
.+ # architecture (or 'all')
|
334
|
\.deb
|
335
|
$
|
336
|
''',
|
337
|
re.VERBOSE)
|
338
|
|
339
|
names_vers = []
|
340
|
downloaded = []
|
341
|
for deb_file in td.glob('*'):
|
342
|
match = deb_name_regex.match(deb_file.name)
|
343
|
if match is None:
|
344
|
msg = _('apt_download_gave_bad_filename_{}')\
|
345
|
.format(deb_file.name)
|
346
|
raise AptError(msg, cp)
|
347
|
|
348
|
names_vers.append((match.group('name'), match.group('ver')))
|
349
|
downloaded.append(deb_file.name)
|
350
|
|
351
|
apt.get('source', '--download-only',
|
352
|
*[f'{n}={v}' for n, v in names_vers], cwd=td)
|
353
|
|
354
|
for source_file in td.glob('*'):
|
355
|
if source_file.name in downloaded:
|
356
|
continue
|
357
|
|
358
|
downloaded.append(source_file.name)
|
359
|
|
360
|
for filename in downloaded:
|
361
|
shutil.move(td / filename, destination_dir / filename)
|
362
|
|
363
|
return downloaded
|