From 19ed511769468946bb94e3d46e43da91b897a5d9 Mon Sep 17 00:00:00 2001 From: Samuel Sloniker Date: Sun, 30 Jun 2024 08:43:56 -0700 Subject: [PATCH] Add code --- cppack.py | 111 ++++++++++++++++++++++++++++++++++++++++ unpack.py | 148 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 259 insertions(+) create mode 100644 cppack.py create mode 100644 unpack.py diff --git a/cppack.py b/cppack.py new file mode 100644 index 0000000..85a90af --- /dev/null +++ b/cppack.py @@ -0,0 +1,111 @@ +""" +CircuitPack packager - create CircuitPack packages +""" + +import os +import pathlib +import hashlib +from typing import Iterable, Union + + +def pack(contents: dict[str, bytes]) -> bytes: + """ + Pack the data from `contents`, a `dict` with paths (as `str`s) given as + keys and file contents (as `bytes`es) as values, and return the archive + as a `bytes`. + """ + + header = b"CPAv001\n" + + index = b"" + data = b"" + for name, content in contents.items(): + index += f"{name} {len(content)}\n".encode("utf-8") + data += content + + length = f"{len(index):07}\n".encode("utf-8") + + return header + length + index + data + + +def pack_files( + paths: Iterable[Union[str, os.PathLike[str]]], + directory: Union[str, os.PathLike[str]] = ".", +) -> bytes: + """ + Pack the files specified in `paths`, relative to `directory` (which + corresponds to the root of the device), and return the archive as a + `bytes`. + """ + + contents = {} + for path in paths: + with open(os.path.join(directory, path), "rb") as file_to_pack: + contents[str(path)] = file_to_pack.read() + return pack(contents) + + +def pack_dir(directory: Union[str, os.PathLike[str]]) -> bytes: + """ + Pack all files in `directory` (which corresponds to the root of the + device), and return the archive as a `bytes`. + """ + + paths = [] + + directory = pathlib.Path(directory) + + for root, _, files in os.walk(directory): + for file in files: + path = pathlib.Path(root, file).relative_to(directory) + paths.append(path) + + return pack_files(paths, directory) + + +def name_file(name: str, packed: bytes) -> str: + """ + Find and return the file name for the package named `name` with contents + `packed`. + """ + return name + "." + hashlib.sha256(packed).hexdigest()[:16] + ".cpa" + + +def pack_to_file( + name: str, + directory: Union[str, os.PathLike[str]], + output: Union[str, os.PathLike[str]] = ".", +) -> str: + """ + Pack all files in `directory` (which corresponds to the root of the + device), find the file name based on the package name (`name`) and the hash + of the archive, save the archive under this name in directory `output` + (defaults to the current directory), and return the path to the archive. + """ + + packed = pack_dir(directory) + + path = os.path.join(output, name_file(name, packed)) + + with open(path, "wb+") as package_file: + package_file.write(packed) + + return path + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(prog="cppack") + + parser.add_argument("name", help="package name") + parser.add_argument( + "directory", help="directory to pack; corresponds to device root" + ) + parser.add_argument( + "--output", help="directory in which to place package", default="." + ) + + args = parser.parse_args() + + print(pack_to_file(args.name, args.directory, args.output)) diff --git a/unpack.py b/unpack.py new file mode 100644 index 0000000..53ba19b --- /dev/null +++ b/unpack.py @@ -0,0 +1,148 @@ +""" +CircuitPack extractor - install CircuitPack packages +""" + +import os +import requests + +try: + from typing import Sequence, BinaryIO, Iterable, Union +except ImportError: + pass + + +def _mkdir_p(parts: Sequence[str]) -> None: + whole = "/".join(parts) + + # This function is based in part on code from CPython's `pathlib` module, + # specifically the `pathlib2` backport, the relevant portion of which does + # not differ from CPython's tree as of the time of writing. + # + # The code used is available at the following URL: + # + # https://github.com/jazzband/pathlib2/blob/0ac11b8d697069abea58a8188061f1a1870977af/src/pathlib2/__init__.py#L1124 + # + # That file includes the following copyright notice: + # + # Copyright (c) 2001-2022 Python Software Foundation; All Rights Reserved + # Copyright (c) 2014-2022 Matthias C. M. Troffaes and contributors + # Copyright (c) 2012-2014 Antoine Pitrou and contributors + # + # Distributed under the terms of the MIT License. + # + # (end copyright notice) + # + # I determined the logic and magic numbers to use for directory checking + # from CPython's `stat` module, but did not directly use a significant + # amount of its code. `stat`'s source code can be found here: + # + # https://github.com/python/cpython/blob/717d2bd1d95879992d30d417399b1ec03ee05028/Lib/stat.py + + try: + os.mkdir(whole) + except FileNotFoundError: + _mkdir_p(parts[:-1]) + os.mkdir(whole) + except OSError: + if not os.stat(whole).st_mode & 0o170000 == 0o040000: + raise + + # (end code derived from `pathlib`) + +class InvalidFileError(BaseException): + """ + CircuitPack package file is invalid. + """ + + +class _ResponseFile: # pylint: disable=too-few-public-methods + def __init__( + self, response: requests.Response, chunk_size: int = 128 + ) -> None: + self.iter = response.iter_content(chunk_size) + self.buffer = b"" + + def read( # pylint: disable=missing-function-docstring + self, length: int + ) -> bytes: + try: + while len(self.buffer) < length: + self.buffer += next(self.iter) + except StopIteration: + pass + + response = self.buffer[:length] + self.buffer = self.buffer[length:] + + return response + + +def unpack(package: Union[BinaryIO, _ResponseFile], target: str) -> Iterable[str]: + # TODO: add a way to limit where packages can store files? + """ + Unpack the given file-like object `package` into the directory `target`, + which may, but is not required to, already exist. Return the unpacked + paths, relative to `target`. + """ + + if not target.endswith("/"): + target += "/" + + if package.read(8) != b"CPAv001\n": + raise InvalidFileError("wrong header") + + try: + length = int(package.read(8).decode("utf-8")) + except ValueError as exception: + raise InvalidFileError("invalid index length") from exception + + raw_index_bytes = package.read(length) + if len(raw_index_bytes) < length: + raise InvalidFileError("file cut short (incomplete index)") + + try: + raw_index = raw_index_bytes.decode("utf-8") + except UnicodeDecodeError as exception: + raise InvalidFileError("invalid character in index") from exception + + index_split = [line.split(" ") for line in raw_index.split("\n")[:-1]] + + try: + index = {file_name: int(length) for file_name, length in index_split} + except ValueError as exception: + raise InvalidFileError( + "incorrect number of fields or invalid number in index" + ) from exception + + for file_name, length in index.items(): + path = target + file_name + _mkdir_p(path.split("/")[:-1]) + with open(path, "wb+") as file: + if file.write(package.read(length)) < length: + raise InvalidFileError( + "file cut short (or, possibly, a write error)" + ) + + return index.keys() + + +def unpack_url(url: str, target: str) -> Iterable[str]: + """ + Download the package at `url` and unpack it into the directory `target`, + which may, but is not required to, already exist. Return the unpacked + paths, relative to `target`. + """ + + return unpack(_ResponseFile(requests.get(url)), target) + +def install(name: str, version_hash: str, repo: str, target: str, records: str) -> None: + if not repo.endswith("/"): + repo = repo + "/" + + file_names = unpack_url(f"{repo}{name}.{version_hash}.cpa", target) + + with open(f"{records}/{name}", "w+") as f: + f.write(f"{version_hash}\n") + + for file_name in file_names: + f.write(f"{file_name}\n")