circuitpack/unpack.py

259 lines
7.3 KiB
Python
Raw Normal View History

2024-06-30 08:43:56 -07:00
"""
CircuitPack extractor - install CircuitPack packages
"""
import os
import requests
try:
from typing import Sequence, BinaryIO, Iterable, Union
except ImportError:
pass
def _mkdir_p(parts: Sequence[str]) -> None:
    """
    Create the directory named by joining `parts` with "/", creating any
    missing parent directories first. An already-existing directory is left
    alone.

    An empty path (no components at all, or only the empty component that
    precedes the leading "/" of an absolute path) names nothing that can be
    created, so it is silently skipped.

    Raises `OSError` if the directory cannot be created and the path does
    not already refer to a directory.
    """
    whole = "/".join(parts)
    if not whole:
        # `os.mkdir("")` raises `FileNotFoundError`; without this guard the
        # handler below would recurse on an ever-empty `parts` until the
        # interpreter hits its recursion limit.
        return
    # This function is based in part on code from CPython's `pathlib` module,
    # specifically the `pathlib2` backport, the relevant portion of which does
    # not differ from CPython's tree as of the time of writing.
    #
    # The code used is available at the following URL:
    #
    # https://github.com/jazzband/pathlib2/blob/0ac11b8d697069abea58a8188061f1a1870977af/src/pathlib2/__init__.py#L1124
    #
    # That file includes the following copyright notice:
    #
    # Copyright (c) 2001-2022 Python Software Foundation; All Rights Reserved
    # Copyright (c) 2014-2022 Matthias C. M. Troffaes and contributors
    # Copyright (c) 2012-2014 Antoine Pitrou and contributors
    #
    # Distributed under the terms of the MIT License.
    #
    # (end copyright notice)
    #
    # I determined the logic and magic numbers to use for directory checking
    # from CPython's `stat` module, but did not directly use a significant
    # amount of its code. `stat`'s source code can be found here:
    #
    # https://github.com/python/cpython/blob/717d2bd1d95879992d30d417399b1ec03ee05028/Lib/stat.py
    try:
        os.mkdir(whole)
    except FileNotFoundError:
        # A parent is missing: create it, then retry this level.
        _mkdir_p(parts[:-1])
        os.mkdir(whole)
    except OSError:
        # 0o170000 masks the file-type bits of `st_mode`; 0o040000 is the
        # value those bits take for a directory. Anything else is an error.
        if not os.stat(whole).st_mode & 0o170000 == 0o040000:
            raise
    # (end code derived from `pathlib`)
2024-06-30 08:43:56 -07:00
class InvalidFileError(Exception):
    """
    CircuitPack package file is invalid.

    Derives from `Exception` (not `BaseException`) so that ordinary
    `except Exception` handlers see it; `BaseException` is meant for
    interpreter-level exits such as `KeyboardInterrupt` and `SystemExit`.
    """
class _ResponseFile:  # pylint: disable=too-few-public-methods
    """
    Minimal read-only, file-like view over the body of an HTTP response.

    Chunks are pulled on demand from `response.iter_content(chunk_size)` and
    held in `self.buffer` until a `read` or `readline` call consumes them.
    """

    def __init__(
        self, response: "requests.Response", chunk_size: int = 128
    ) -> None:
        # Iterator over the body chunks, and the bytes fetched so far that
        # have not yet been handed to a caller.
        self.iter = response.iter_content(chunk_size)
        self.buffer = b""

    def read(self, length: int) -> bytes:
        """
        Return the next `length` bytes of the body. Fewer bytes (possibly
        none) are returned only once the body is exhausted.
        """
        try:
            while len(self.buffer) < length:
                self.buffer += next(self.iter)
        except StopIteration:
            pass
        response = self.buffer[:length]
        self.buffer = self.buffer[length:]
        return response

    def readline(self) -> bytes:
        """
        Return the next line of the body, including its trailing newline
        (as a binary file object's `readline` does). The final line is
        returned without a newline if the body does not end with one, and
        `b""` is returned once the body is exhausted.
        """
        try:
            while b"\n" not in self.buffer:
                self.buffer += next(self.iter)
        except StopIteration:
            pass
        if b"\n" in self.buffer:
            # Hand back exactly one line; everything after the first
            # newline stays buffered for later calls.
            line, self.buffer = self.buffer.split(b"\n", 1)
            return line + b"\n"
        response = self.buffer
        self.buffer = b""
        return response
2024-06-30 08:43:56 -07:00
def unpack(
    package: Union[BinaryIO, _ResponseFile], target: str
) -> Iterable[str]:
    """
    Unpack the given file-like object `package` into the directory `target`,
    which may, but is not required to, already exist. Return the unpacked
    paths, relative to `target`.

    The package is read as: the 8-byte header `CPAv001\\n`, an 8-byte decimal
    index length, the index itself (one `<file name> <length>` line per file,
    each terminated by a newline), then the contents of every file back to
    back in index order.

    Raises `InvalidFileError` if the header, index or data is malformed or
    truncated, or if an index entry has a negative length or a file name
    that is empty, absolute, or contains a `..` component. The whole index
    is validated before anything is written to `target`.
    """
    if not target.endswith("/"):
        target += "/"
    if package.read(8) != b"CPAv001\n":
        raise InvalidFileError("wrong header")
    try:
        index_length = int(package.read(8).decode("utf-8"))
    except ValueError as exception:
        raise InvalidFileError("invalid index length") from exception
    if index_length < 0:
        raise InvalidFileError("invalid index length")
    raw_index_bytes = package.read(index_length)
    if len(raw_index_bytes) < index_length:
        raise InvalidFileError("file cut short (incomplete index)")
    try:
        raw_index = raw_index_bytes.decode("utf-8")
    except UnicodeDecodeError as exception:
        raise InvalidFileError("invalid character in index") from exception
    # Every index line ends with a newline, so the final split element is the
    # empty remainder after the last one and is dropped.
    index_split = [line.split(" ") for line in raw_index.split("\n")[:-1]]
    try:
        # Kept as an ordered list (not a dict) so that the data is consumed
        # in exactly the order the index lists it, even if a name repeats.
        entries = [
            (file_name, int(length)) for file_name, length in index_split
        ]
    except ValueError as exception:
        raise InvalidFileError(
            "incorrect number of fields or invalid number in index"
        ) from exception
    for file_name, length in entries:
        if length < 0:
            raise InvalidFileError("negative file length in index")
        # The package may come from the network: refuse names that would
        # be written outside of `target`.
        if (
            not file_name
            or file_name.startswith("/")
            or ".." in file_name.split("/")
        ):
            raise InvalidFileError("unsafe file name in index")
    for file_name, length in entries:
        path = target + file_name
        _mkdir_p(path.split("/")[:-1])
        with open(path, "wb+") as file:
            if file.write(package.read(length)) < length:
                raise InvalidFileError(
                    "file cut short (or, possibly, a write error)"
                )
    return dict(entries).keys()
def unpack_url(url: str, target: str) -> Iterable[str]:
    """
    Fetch the package located at `url` and extract it into the directory
    `target` (created if it does not exist yet). Return the extracted paths,
    relative to `target`.
    """
    response = requests.get(url)
    package = _ResponseFile(response)
    return unpack(package, target)
def install(
    name: str, version_hash: str, repo: str, target: str, records: str
) -> None:
    """
    Fetch `<repo>/<name>.<version_hash>.cpa`, extract it into the directory
    `target`, and write a record file named `name` inside `records`.

    The record holds the version hash on its first line, followed by one
    unpacked path per line.
    """
    base_url = repo if repo.endswith("/") else repo + "/"
    unpacked = unpack_url(f"{base_url}{name}.{version_hash}.cpa", target)
    record_path = f"{records}/{name}"
    with open(record_path, "w+", encoding="utf-8") as record:
        record.write(f"{version_hash}\n")
        record.writelines(f"{file_name}\n" for file_name in unpacked)
def uninstall(name: str, target: str, records: str) -> None:
    """
    Uninstall package `name` from `target` using the information stored in
    `records`.

    Every path listed in the package's record file (after the leading
    version-hash line, up to the first blank line or end of file) is removed
    from `target`, then the record file itself is removed. Files that are
    already missing are skipped, so a partially removed package can still
    be uninstalled.

    Raises `OSError` if the record cannot be read or a file that exists
    cannot be removed.
    """
    record_path = f"{records}/{name}"
    with open(record_path, "r", encoding="utf-8") as record:
        record.readline()  # Skip over version hash
        for raw_line in record:
            file_name = raw_line.strip()
            if not file_name:
                break
            try:
                os.remove(f"{target}/{file_name}")
            except FileNotFoundError:
                # Already gone (e.g. deleted by hand); the goal state is
                # reached either way, so keep going.
                pass
    os.remove(record_path)
def _vacuum(root: str, delete_this: bool = False) -> None:
    """
    Recursively remove empty directories below `root`. `root` itself is
    removed as well (if it ends up empty) only when `delete_this` is true.

    A `root` that is a regular file or does not exist is treated as having
    nothing to clean up.
    """
    try:
        children = os.listdir(root)
    except (NotADirectoryError, FileNotFoundError):
        # Not a directory, or nothing there at all: nothing to descend into.
        children = []
    for item in children:
        _vacuum(f"{root}/{item}", True)
    if delete_this:
        try:
            os.rmdir(root)
        except OSError:
            # Best effort: `rmdir` refuses non-empty directories and
            # non-directories, which are exactly what must be kept.
            pass
def vacuum(target: str, paths: Iterable[str] = ("lib",)) -> None:
    """
    Clean out empty directories below each entry of `paths`, where every
    entry is taken relative to `target`.
    """
    roots = (f"{target}/{path}" for path in paths)
    for root in roots:
        _vacuum(root)
def parse_spec(spec_text: str) -> dict[str, tuple[str, str]]:
    """
    Parse a package spec into a mapping of package name to
    `(version_hash, repo)`.

    Each non-blank line of `spec_text` must hold exactly three
    comma-separated fields: name, version hash and repo. Whitespace around
    a line is ignored; a later line for the same name replaces an earlier
    one. A line with any other number of fields raises `ValueError`.
    """
    stripped_lines = (raw.strip() for raw in spec_text.split("\n"))
    rows = (entry.split(",") for entry in stripped_lines if entry)
    return {
        name: (version_hash, repo) for name, version_hash, repo in rows
    }
def get_status(records: str) -> dict[str, str]:
    """
    Return a mapping of record file name to the stripped first line of that
    file, for every entry in the directory `records`. The first line of a
    record is where `install` writes the package's version hash.
    """
    installed = {}
    for entry in os.listdir(records):
        record_path = f"{records}/{entry}"
        with open(record_path, encoding="utf-8") as record:
            first_line = record.readline()
        installed[entry] = first_line.strip()
    return installed
def install_spec(
    spec_text: str, target: str, records: str, paths: Iterable[str] = ("lib",)
) -> None:
    """
    Bring the packages installed in `target` in line with the spec given in
    `spec_text`, using the record files in `records`.

    Installed packages that are absent from the spec, or whose recorded
    version differs from the spec's, are uninstalled; empty directories
    under `paths` are then cleaned up; finally every spec entry without a
    record is installed.
    """
    wanted = parse_spec(spec_text)
    for name, installed_version in get_status(records).items():
        entry = wanted.get(name)
        # Spec values are always tuples, so `None` means "not in the spec".
        if entry is None or entry[0] != installed_version:
            uninstall(name, target, records)
    vacuum(target, paths)
    # Re-read the records: the uninstalls above may have removed some.
    remaining = get_status(records)
    for name, (version, repo) in wanted.items():
        if name in remaining:
            continue
        install(name, version, repo, target, records)