diff --git a/unpack.py b/unpack.py index 4d1247f..39f4bfd 100644 --- a/unpack.py +++ b/unpack.py @@ -77,6 +77,21 @@ class _ResponseFile: # pylint: disable=too-few-public-methods return response + def readline(self) -> bytes: # pylint: disable=missing-function-docstring + try: + while not b"\n" in self.buffer: + self.buffer += next(self.iter) + except StopIteration: + pass + + if b"\n" == self.buffer: + response, self.buffer = self.buffer.split(b"\n", 1) + return response + else: + response = self.buffer + self.buffer = b"" + return response + def unpack( package: Union[BinaryIO, _ResponseFile], target: str @@ -165,7 +180,7 @@ def uninstall(name: str, target: str, records: str) -> None: """ with open(f"{records}/{name}", "r", encoding="utf-8") as record: - record.readline() # Skip over version hash + record.readline() # Skip over version hash while True: file_name = record.readline().strip() @@ -176,3 +191,68 @@ def uninstall(name: str, target: str, records: str) -> None: break os.remove(f"{records}/{name}") + + +def _vacuum(root: str, delete_this: bool = False) -> None: + try: + for item in os.listdir(root): + _vacuum(f"{root}/{item}", True) + except NotADirectoryError: + pass + + if delete_this: + try: + os.rmdir(root) + except OSError: + pass + + +def vacuum(target: str, paths: Iterable[str] = ("lib",)) -> None: + """ + Remove empty directories under `paths`, each relative to `target`. + """ + + for path in paths: + _vacuum(f"{target}/{path}") + + +def parse_spec(spec_text: str) -> dict[str, tuple[str, str]]: + spec = {} + + for full_line in spec_text.split("\n"): + line = full_line.strip() + if line: + name, version_hash, repo = line.split(",") + spec[name] = (version_hash, repo) + + return spec + + +def get_status(records: str) -> dict[str, str]: + status = {} + + for name in os.listdir(records): + with open(f"{records}/{name}", encoding="utf-8") as f: + status[name] = f.readline().strip() + + return status + + +def install_spec( + spec_text: str, target: str, records: str, paths: Iterable[str] = ("lib",) +) -> None: + spec = parse_spec(spec_text) + status = get_status(records) + + for name, version in status.items(): + if not name in spec or spec[name][0] != version: + uninstall(name, target, records) + + vacuum(target, paths) + status = get_status(records) + + for name, version_repo in spec.items(): + version, repo = version_repo + + if not name in status: + install(name, version, repo, target, records)