Add code
This commit is contained in:
parent
88929a992d
commit
19ed511769
111
cppack.py
Normal file
111
cppack.py
Normal file
|
@ -0,0 +1,111 @@
|
||||||
|
"""
|
||||||
|
CircuitPack packager - create CircuitPack packages
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import pathlib
|
||||||
|
import hashlib
|
||||||
|
from typing import Iterable, Union
|
||||||
|
|
||||||
|
|
||||||
|
def pack(contents: dict[str, bytes]) -> bytes:
|
||||||
|
"""
|
||||||
|
Pack the data from `contents`, a `dict` with paths (as `str`s) given as
|
||||||
|
keys and file contents (as `bytes`es) as values, and return the archive
|
||||||
|
as a `bytes`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
header = b"CPAv001\n"
|
||||||
|
|
||||||
|
index = b""
|
||||||
|
data = b""
|
||||||
|
for name, content in contents.items():
|
||||||
|
index += f"{name} {len(content)}\n".encode("utf-8")
|
||||||
|
data += content
|
||||||
|
|
||||||
|
length = f"{len(index):07}\n".encode("utf-8")
|
||||||
|
|
||||||
|
return header + length + index + data
|
||||||
|
|
||||||
|
|
||||||
|
def pack_files(
|
||||||
|
paths: Iterable[Union[str, os.PathLike[str]]],
|
||||||
|
directory: Union[str, os.PathLike[str]] = ".",
|
||||||
|
) -> bytes:
|
||||||
|
"""
|
||||||
|
Pack the files specified in `paths`, relative to `directory` (which
|
||||||
|
corresponds to the root of the device), and return the archive as a
|
||||||
|
`bytes`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
contents = {}
|
||||||
|
for path in paths:
|
||||||
|
with open(os.path.join(directory, path), "rb") as file_to_pack:
|
||||||
|
contents[str(path)] = file_to_pack.read()
|
||||||
|
return pack(contents)
|
||||||
|
|
||||||
|
|
||||||
|
def pack_dir(directory: Union[str, os.PathLike[str]]) -> bytes:
|
||||||
|
"""
|
||||||
|
Pack all files in `directory` (which corresponds to the root of the
|
||||||
|
device), and return the archive as a `bytes`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
paths = []
|
||||||
|
|
||||||
|
directory = pathlib.Path(directory)
|
||||||
|
|
||||||
|
for root, _, files in os.walk(directory):
|
||||||
|
for file in files:
|
||||||
|
path = pathlib.Path(root, file).relative_to(directory)
|
||||||
|
paths.append(path)
|
||||||
|
|
||||||
|
return pack_files(paths, directory)
|
||||||
|
|
||||||
|
|
||||||
|
def name_file(name: str, packed: bytes) -> str:
|
||||||
|
"""
|
||||||
|
Find and return the file name for the package named `name` with contents
|
||||||
|
`packed`.
|
||||||
|
"""
|
||||||
|
return name + "." + hashlib.sha256(packed).hexdigest()[:16] + ".cpa"
|
||||||
|
|
||||||
|
|
||||||
|
def pack_to_file(
|
||||||
|
name: str,
|
||||||
|
directory: Union[str, os.PathLike[str]],
|
||||||
|
output: Union[str, os.PathLike[str]] = ".",
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
Pack all files in `directory` (which corresponds to the root of the
|
||||||
|
device), find the file name based on the package name (`name`) and the hash
|
||||||
|
of the archive, save the archive under this name in directory `output`
|
||||||
|
(defaults to the current directory), and return the path to the archive.
|
||||||
|
"""
|
||||||
|
|
||||||
|
packed = pack_dir(directory)
|
||||||
|
|
||||||
|
path = os.path.join(output, name_file(name, packed))
|
||||||
|
|
||||||
|
with open(path, "wb+") as package_file:
|
||||||
|
package_file.write(packed)
|
||||||
|
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(prog="cppack")
|
||||||
|
|
||||||
|
parser.add_argument("name", help="package name")
|
||||||
|
parser.add_argument(
|
||||||
|
"directory", help="directory to pack; corresponds to device root"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--output", help="directory in which to place package", default="."
|
||||||
|
)
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
print(pack_to_file(args.name, args.directory, args.output))
|
148
unpack.py
Normal file
148
unpack.py
Normal file
|
@ -0,0 +1,148 @@
|
||||||
|
"""
|
||||||
|
CircuitPack extractor - install CircuitPack packages
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import requests
|
||||||
|
|
||||||
|
try:
|
||||||
|
from typing import Sequence, BinaryIO, Iterable, Union
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _mkdir_p(parts: Sequence[str]) -> None:
|
||||||
|
whole = "/".join(parts)
|
||||||
|
|
||||||
|
# This function is based in part on code from CPython's `pathlib` module,
|
||||||
|
# specifically the `pathlib2` backport, the relevant portion of which does
|
||||||
|
# not differ from CPython's tree as of the time of writing.
|
||||||
|
#
|
||||||
|
# The code used is available at the following URL:
|
||||||
|
#
|
||||||
|
# https://github.com/jazzband/pathlib2/blob/0ac11b8d697069abea58a8188061f1a1870977af/src/pathlib2/__init__.py#L1124
|
||||||
|
#
|
||||||
|
# That file includes the following copyright notice:
|
||||||
|
#
|
||||||
|
# Copyright (c) 2001-2022 Python Software Foundation; All Rights Reserved
|
||||||
|
# Copyright (c) 2014-2022 Matthias C. M. Troffaes and contributors
|
||||||
|
# Copyright (c) 2012-2014 Antoine Pitrou and contributors
|
||||||
|
#
|
||||||
|
# Distributed under the terms of the MIT License.
|
||||||
|
#
|
||||||
|
# (end copyright notice)
|
||||||
|
#
|
||||||
|
# I determined the logic and magic numbers to use for directory checking
|
||||||
|
# from CPython's `stat` module, but did not directly use a significant
|
||||||
|
# amount of its code. `stat`'s source code can be found here:
|
||||||
|
#
|
||||||
|
# https://github.com/python/cpython/blob/717d2bd1d95879992d30d417399b1ec03ee05028/Lib/stat.py
|
||||||
|
|
||||||
|
try:
|
||||||
|
os.mkdir(whole)
|
||||||
|
except FileNotFoundError:
|
||||||
|
_mkdir_p(parts[:-1])
|
||||||
|
os.mkdir(whole)
|
||||||
|
except OSError:
|
||||||
|
if not os.stat(whole).st_mode & 0o170000 == 0o040000:
|
||||||
|
raise
|
||||||
|
|
||||||
|
# (end code derived from `pathlib`)
|
||||||
|
|
||||||
|
class InvalidFileError(BaseException):
|
||||||
|
"""
|
||||||
|
CircuitPack package file is invalid.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class _ResponseFile: # pylint: disable=too-few-public-methods
|
||||||
|
def __init__(
|
||||||
|
self, response: requests.Response, chunk_size: int = 128
|
||||||
|
) -> None:
|
||||||
|
self.iter = response.iter_content(chunk_size)
|
||||||
|
self.buffer = b""
|
||||||
|
|
||||||
|
def read( # pylint: disable=missing-function-docstring
|
||||||
|
self, length: int
|
||||||
|
) -> bytes:
|
||||||
|
try:
|
||||||
|
while len(self.buffer) < length:
|
||||||
|
self.buffer += next(self.iter)
|
||||||
|
except StopIteration:
|
||||||
|
pass
|
||||||
|
|
||||||
|
response = self.buffer[:length]
|
||||||
|
self.buffer = self.buffer[length:]
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
def unpack(package: Union[BinaryIO, _ResponseFile], target: str) -> Iterable[str]:
|
||||||
|
# TODO: add a way to limit where packages can store files?
|
||||||
|
"""
|
||||||
|
Unpack the given file-like object `package` into the directory `target`,
|
||||||
|
which may, but is not required to, already exist. Return the unpacked
|
||||||
|
paths, relative to `target`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not target.endswith("/"):
|
||||||
|
target += "/"
|
||||||
|
|
||||||
|
if package.read(8) != b"CPAv001\n":
|
||||||
|
raise InvalidFileError("wrong header")
|
||||||
|
|
||||||
|
try:
|
||||||
|
length = int(package.read(8).decode("utf-8"))
|
||||||
|
except ValueError as exception:
|
||||||
|
raise InvalidFileError("invalid index length") from exception
|
||||||
|
|
||||||
|
raw_index_bytes = package.read(length)
|
||||||
|
if len(raw_index_bytes) < length:
|
||||||
|
raise InvalidFileError("file cut short (incomplete index)")
|
||||||
|
|
||||||
|
try:
|
||||||
|
raw_index = raw_index_bytes.decode("utf-8")
|
||||||
|
except UnicodeDecodeError as exception:
|
||||||
|
raise InvalidFileError("invalid character in index") from exception
|
||||||
|
|
||||||
|
index_split = [line.split(" ") for line in raw_index.split("\n")[:-1]]
|
||||||
|
|
||||||
|
try:
|
||||||
|
index = {file_name: int(length) for file_name, length in index_split}
|
||||||
|
except ValueError as exception:
|
||||||
|
raise InvalidFileError(
|
||||||
|
"incorrect number of fields or invalid number in index"
|
||||||
|
) from exception
|
||||||
|
|
||||||
|
for file_name, length in index.items():
|
||||||
|
path = target + file_name
|
||||||
|
_mkdir_p(path.split("/")[:-1])
|
||||||
|
with open(path, "wb+") as file:
|
||||||
|
if file.write(package.read(length)) < length:
|
||||||
|
raise InvalidFileError(
|
||||||
|
"file cut short (or, possibly, a write error)"
|
||||||
|
)
|
||||||
|
|
||||||
|
return index.keys()
|
||||||
|
|
||||||
|
|
||||||
|
def unpack_url(url: str, target: str) -> Iterable[str]:
|
||||||
|
"""
|
||||||
|
Download the package at `url` and unpack it into the directory `target`,
|
||||||
|
which may, but is not required to, already exist. Return the unpacked
|
||||||
|
paths, relative to `target`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
return unpack(_ResponseFile(requests.get(url)), target)
|
||||||
|
|
||||||
|
def install(name: str, version_hash: str, repo: str, target: str, records: str) -> None:
|
||||||
|
if not repo.endswith("/"):
|
||||||
|
repo = repo + "/"
|
||||||
|
|
||||||
|
file_names = unpack_url(f"{repo}{name}.{version_hash}.cpa", target)
|
||||||
|
|
||||||
|
with open(f"{records}/{name}", "w+") as f:
|
||||||
|
f.write(f"{version_hash}\n")
|
||||||
|
|
||||||
|
for file_name in file_names:
|
||||||
|
f.write(f"{file_name}\n")
|
Loading…
Reference in New Issue
Block a user