Project

General

Profile

Download (13.8 KB) Statistics
| Branch: | Tag: | Revision:

hydrilla-builder / src / hydrilla / builder / build.py @ 33097569

1
# SPDX-License-Identifier: AGPL-3.0-or-later
2

    
3
# Building Hydrilla packages.
4
#
5
# This file is part of Hydrilla
6
#
7
# Copyright (C) 2022 Wojtek Kosior
8
#
9
# This program is free software: you can redistribute it and/or modify
10
# it under the terms of the GNU Affero General Public License as
11
# published by the Free Software Foundation, either version 3 of the
12
# License, or (at your option) any later version.
13
#
14
# This program is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
# GNU Affero General Public License for more details.
18
#
19
# You should have received a copy of the GNU Affero General Public License
20
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
21
#
22
#
23
# I, Wojtek Kosior, thereby promise not to sue for violation of this
24
# file's license. Although I request that you do not make use this code
25
# in a proprietary program, I am not going to enforce this in court.
26

    
27
import json
28
import re
29
import zipfile
30
from pathlib import Path
31
from hashlib import sha256
32
from sys import stderr
33

    
34
import jsonschema
35
import click
36

    
37
from .. import util
38

    
39
here = Path(__file__).resolve().parent
40

    
41
_ = util.translation(here / 'locales').gettext
42

    
43
index_validator = util.validator_for('package_source-1.schema.json')
44

    
45
class FileReferenceError(Exception):
    """
    Signal a problem with a file that source package's index.json refers to
    (e.g. a path outside the package or a reserved filename).
    """
50

    
51
class ReuseError(Exception):
    """
    Signal a failure related to the REUSE tool (missing installation,
    lint failure or report generation failure).
    """
55

    
56
class FileBuffer:
    """
    Minimal write-only file-like object that accumulates everything written
    to it in memory.
    """
    def __init__(self):
        """Start with an empty buffer."""
        self.chunks = []

    def write(self, b):
        """
        Buffer 'b', return number of bytes buffered.

        'b' is expected to be an instance of 'bytes' or 'str', in which case it
        gets encoded as UTF-8.
        """
        data = b.encode() if type(b) is str else b
        self.chunks.append(data)
        return len(data)

    def flush(self):
        """A no-op mock of file-like object's flush() method."""

    def get_bytes(self):
        """
        Return all data written so far concatenated into a single 'bytes'
        object.
        """
        return b''.join(self.chunks)
90

    
91
def generate_spdx_report(root):
    """
    Use REUSE tool to generate an SPDX report for sources under 'root' and
    return the report's contents as 'bytes'.

    'root' shall be an instance of pathlib.Path.

    In case the directory tree under 'root' does not constitute a
    REUSE-compliant package, linting report is printed to standard output and
    an exception is raised.

    In case the reuse package is not installed, an exception is also raised.
    """
    try:
        from reuse._main import main as reuse_main
    except ModuleNotFoundError:
        # Bug fix: the exception object was previously constructed but never
        # raised, so execution continued and crashed later with a NameError
        # on 'reuse_main'.
        raise ReuseError(_('couldnt_import_reuse_is_it_installed'))

    # Run 'reuse lint' first; its output is only shown when it fails.
    mocked_output = FileBuffer()
    if reuse_main(args=['--root', str(root), 'lint'], out=mocked_output) != 0:
        stderr.write(mocked_output.get_bytes().decode())
        raise ReuseError(_('spdx_report_from_reuse_incompliant'))

    # 'reuse spdx' writes the actual report into our in-memory buffer.
    mocked_output = FileBuffer()
    if reuse_main(args=['--root', str(root), 'spdx'], out=mocked_output) != 0:
        stderr.write(mocked_output.get_bytes().decode())
        raise ReuseError("Couldn't generate an SPDX report for package.")

    return mocked_output.get_bytes()
120

    
121
class FileRef:
    """Represent reference to a file in the package."""
    def __init__(self, path: Path, contents: bytes):
        """Initialize FileRef."""
        # Inclusion flags: every file goes into the source zipfile by
        # default, but only explicitly marked ones get distributed.
        self.include_in_distribution = False
        self.include_in_zipfile = True
        self.path = path
        self.contents = contents
        self.contents_hash = sha256(contents).digest().hex()

    def make_ref_dict(self, filename: str):
        """
        Represent the file reference through a dict that can be included in JSON
        defintions.
        """
        return {'file': filename, 'sha256': self.contents_hash}
141

    
142
class Build:
    """
    Build a Hydrilla package.
    """
    def __init__(self, srcdir, index_json_path):
        """
        Initialize a build. All files to be included in a distribution package
        are loaded into memory, all data gets validated and all necessary
        computations (e.g. preparing of hashes) are performed.

        'srcdir' and 'index_json' are expected to be pathlib.Path objects.
        """
        self.srcdir          = srcdir.resolve()
        self.index_json_path = index_json_path
        self.files_by_path   = {}
        self.resource_list   = []
        self.mapping_list    = []

        # A relative index.json path is interpreted relative to srcdir.
        if not index_json_path.is_absolute():
            self.index_json_path = (self.srcdir / self.index_json_path)

        self.index_json_path = self.index_json_path.resolve()

        with open(self.index_json_path, 'rt') as index_file:
            index_json_text = index_file.read()

        # index.json may contain comments; strip them before JSON parsing.
        index_obj = json.loads(util.strip_json_comments(index_json_text))

        # The (unstripped) index.json text itself is recorded under the
        # canonical 'index.json' path regardless of where it was loaded from.
        self.files_by_path[self.srcdir / 'index.json'] = \
            FileRef(self.srcdir / 'index.json', index_json_text.encode())

        self._process_index_json(index_obj)

    def _process_file(self, filename: str, include_in_distribution: bool=True):
        """
        Resolve 'filename' relative to srcdir, load it to memory (if not loaded
        before), compute its hash and store its information in
        'self.files_by_path'.

        'filename' shall represent a relative path using '/' as a separator.

        if 'include_in_distribution' is True it shall cause the file to not only
        be included in the source package's zipfile, but also written as one of
        built package's files.

        Return file's reference object that can be included in JSON defintions
        of various kinds.
        """
        path = self.srcdir
        for segment in filename.split('/'):
            path /= segment

        # Guard against path traversal: the resolved path must stay inside
        # the source package directory.
        path = path.resolve()
        if not path.is_relative_to(self.srcdir):
            raise FileReferenceError(_('loading_{}_outside_package_dir')
                                     .format(filename))

        # 'index.json' is reserved — it is loaded by __init__ and must not
        # be referenced as a package file.
        if str(path.relative_to(self.srcdir)) == 'index.json':
            raise FileReferenceError(_('loading_reserved_index_json'))

        # Load each referenced file at most once, keyed by resolved path.
        file_ref = self.files_by_path.get(path)
        if file_ref is None:
            with open(path, 'rb') as file_handle:
                contents = file_handle.read()

            file_ref = FileRef(path, contents)
            self.files_by_path[path] = file_ref

        # Flag is only ever promoted to True, never reset — a file included
        # in distribution by one reference stays included.
        if include_in_distribution:
            file_ref.include_in_distribution = True

        return file_ref.make_ref_dict(filename)

    def _prepare_source_package_zip(self, root_dir_name: str):
        """
        Create and store in memory a .zip archive containing files needed to
        build this source package.

        'root_dir_name' shall not contain any slashes ('/').

        Return zipfile's sha256 sum's hexstring.
        """
        # NOTE(review): despite the 'str' annotation, the caller in
        # _process_index_json passes a pathlib.Path; Path() accepts both.
        fb = FileBuffer()
        root_dir_path = Path(root_dir_name)

        # Map an absolute source path to its path inside the archive,
        # rooted at 'root_dir_name'.
        def zippath(file_path):
            file_path = root_dir_path / file_path.relative_to(self.srcdir)
            return file_path.as_posix()

        # FileBuffer acts as the writable file object backing the archive.
        with zipfile.ZipFile(fb, 'w') as xpi:
            for file_ref in self.files_by_path.values():
                if file_ref.include_in_zipfile:
                    xpi.writestr(zippath(file_ref.path), file_ref.contents)

        self.source_zip_contents = fb.get_bytes()

        return sha256(self.source_zip_contents).digest().hex()

    def _process_item(self, item_def: dict):
        """
        Process 'item_def' as definition of a resource/mapping and store in
        memory its processed form and files used by it.

        Return a minimal item reference suitable for using in source
        description.
        """
        # Properties copied verbatim from the definition into the built item.
        copy_props = ['type', 'identifier', 'long_name', 'uuid', 'description']
        if 'comment' in item_def:
            copy_props.append('comment')

        if item_def['type'] == 'resource':
            # Resources carry scripts and dependencies (and a 'revision').
            item_list = self.resource_list

            copy_props.append('revision')

            script_file_refs = [self._process_file(f['file'])
                                for f in item_def.get('scripts', [])]

            deps = [{'identifier': res_ref['identifier']}
                    for res_ref in item_def.get('dependencies', [])]

            new_item_obj = {
                'dependencies': deps,
                'scripts':      script_file_refs
            }
        else:
            # Mappings carry pattern->resource payloads.
            item_list = self.mapping_list

            payloads = {}
            for pat, res_ref in item_def.get('payloads', {}).items():
                payloads[pat] = {'identifier': res_ref['identifier']}

            new_item_obj = {
                'payloads': payloads
            }

        new_item_obj.update([(p, item_def[p]) for p in copy_props])

        new_item_obj['version'] = util.normalize_version(item_def['version'])
        new_item_obj['api_schema_version'] = [1, 0, 1]
        new_item_obj['source_copyright'] = self.copyright_file_refs
        new_item_obj['source_name'] = self.source_name

        item_list.append(new_item_obj)

        # Minimal reference used in the source description.
        props_in_ref = ('type', 'identifier', 'version', 'long_name')
        return dict([(prop, new_item_obj[prop]) for prop in props_in_ref])

    def _process_index_json(self, index_obj: dict):
        """
        Process 'index_obj' as contents of source package's index.json and store
        in memory this source package's zipfile as well as package's individual
        files and computed definitions of the source package and items defined
        in it.
        """
        index_validator.validate(index_obj)

        self.source_name = index_obj['source_name']

        generate_spdx = index_obj.get('reuse_generate_spdx_report', False)
        if generate_spdx:
            # The generated report is registered as a file of the package but
            # excluded from the source zipfile (it is derived, not a source).
            contents  = generate_spdx_report(self.srcdir)
            spdx_path = (self.srcdir / 'report.spdx').resolve()
            spdx_ref  = FileRef(spdx_path, contents)

            spdx_ref.include_in_zipfile = False
            self.files_by_path[spdx_path] = spdx_ref

        self.copyright_file_refs = \
            [self._process_file(f['file']) for f in index_obj['copyright']]

        # If an SPDX report was generated, the 'copyright' list must have
        # referenced it (which marks it for distribution above).
        if generate_spdx and not spdx_ref.include_in_distribution:
            raise FileReferenceError(_('report_spdx_not_in_copyright_list'))

        item_refs = [self._process_item(d) for d in index_obj['definitions']]

        # Additional files go into the source zipfile only.
        for file_ref in index_obj.get('additional_files', []):
            self._process_file(file_ref['file'], include_in_distribution=False)

        root_dir_path = Path(self.source_name)

        source_archives_obj = {
            'zip' : {
                'sha256': self._prepare_source_package_zip(root_dir_path)
            }
        }

        self.source_description = {
            'api_schema_version': [1, 0, 1],
            'source_name':        self.source_name,
            'source_copyright':   self.copyright_file_refs,
            'upstream_url':       index_obj['upstream_url'],
            'definitions':        item_refs,
            'source_archives':    source_archives_obj
        }

        if 'comment' in index_obj:
            self.source_description['comment'] = index_obj['comment']

    def write_source_package_zip(self, dstpath: Path):
        """
        Create a .zip archive containing files needed to build this source
        package and write it at 'dstpath'.
        """
        with open(dstpath, 'wb') as output:
            output.write(self.source_zip_contents)

    def write_package_files(self, dstpath: Path):
        """Write package files under 'dstpath' for distribution."""
        # Distributed files are stored content-addressed, named by their
        # sha256 hex digest under <dstpath>/file/sha256/.
        file_dir_path = (dstpath / 'file' / 'sha256').resolve()
        file_dir_path.mkdir(parents=True, exist_ok=True)

        for file_ref in self.files_by_path.values():
            if file_ref.include_in_distribution:
                file_path = file_dir_path / file_ref.contents_hash
                file_path.write_bytes(file_ref.contents)

        source_dir_path = (dstpath / 'source').resolve()
        source_dir_path.mkdir(parents=True, exist_ok=True)
        source_name = self.source_description["source_name"]

        with open(source_dir_path / f'{source_name}.json', 'wt') as output:
            json.dump(self.source_description, output)

        with open(source_dir_path / f'{source_name}.zip', 'wb') as output:
            output.write(self.source_zip_contents)

        # Item definitions land under <dstpath>/<type>/<identifier>/<version>.
        for item_type, item_list in [
                ('resource', self.resource_list),
                ('mapping', self.mapping_list)
        ]:
            item_type_dir_path = (dstpath / item_type).resolve()

            for item_def in item_list:
                item_dir_path = item_type_dir_path / item_def['identifier']
                item_dir_path.mkdir(parents=True, exist_ok=True)

                version = '.'.join([str(n) for n in item_def['version']])
                with open(item_dir_path / version, 'wt') as output:
                    json.dump(item_def, output)
382

    
383
# Reusable click parameter type for options that must name existing
# directories.
dir_type = click.Path(exists=True, file_okay=False, resolve_path=True)

# NOTE(review): the @click.option decorators are applied here, but the
# function is only turned into a click command at the bottom of the file —
# after its __doc__ has been replaced with a localized string, so that click
# picks up the translated help text.
@click.option('-s', '--srcdir', default='./', type=dir_type, show_default=True,
              help=_('source_directory_to_build_from'))
@click.option('-i', '--index-json', default='index.json', type=click.Path(),
              help=_('path_instead_of_index_json'))
@click.option('-d', '--dstdir', type=dir_type, required=True,
              help=_('built_package_files_destination'))
def perform(srcdir, index_json, dstdir):
    """<this will be replaced by a localized docstring for Click to pick up>"""
    build = Build(Path(srcdir), Path(index_json))
    build.write_package_files(Path(dstdir))

# Install the localized help text before wrapping with click.command().
perform.__doc__ = _('build_package_from_srcdir_to_dstdir')

perform = click.command()(perform)
(3-3/3)