diff --git a/pmb/__init__.py b/pmb/__init__.py index 3a04bb82..43787957 100644 --- a/pmb/__init__.py +++ b/pmb/__init__.py @@ -2,10 +2,9 @@ # SPDX-License-Identifier: GPL-3.0-or-later # PYTHON_ARGCOMPLETE_OK import sys -import logging import os import traceback -from argparse import Namespace +from typing import Any, Optional from pmb.helpers.exceptions import BuildFailedError, NonBugError @@ -42,7 +41,9 @@ def print_log_hint(args: Any) -> None: def main() -> int: # Wrap everything to display nice error messages - args = None + + # FIXME: can't use PmbArgs here because it creates a circular import + args: Any try: # Parse arguments, set up logging args = parse.arguments() diff --git a/pmb/aportgen/core.py b/pmb/aportgen/core.py index fb226ffb..932673f8 100644 --- a/pmb/aportgen/core.py +++ b/pmb/aportgen/core.py @@ -75,9 +75,9 @@ def rewrite(args: PmbArgs, pkgname, path_original="", fields={}, replace_pkgname # Header if path_original: lines_new = [ - "# Automatically generated aport, do not edit!\n", - "# Generator: pmbootstrap aportgen " + pkgname + "\n", - "# Based on: " + path_original + "\n", + "# Automatically generated aport, do not edit!\n", + f"# Generator: pmbootstrap aportgen {pkgname}\n", + f"# Based on: {path_original}\n", "\n", ] else: diff --git a/pmb/build/autodetect.py b/pmb/build/autodetect.py index 1cd22766..33ac0426 100644 --- a/pmb/build/autodetect.py +++ b/pmb/build/autodetect.py @@ -12,7 +12,8 @@ import pmb.parse.arch from pmb.core import Chroot, ChrootType -def arch_from_deviceinfo(args: PmbArgs, pkgname, aport): +# FIXME (#2324): type hint Arch +def arch_from_deviceinfo(args: PmbArgs, pkgname, aport: Path) -> Optional[str]: """ The device- packages are noarch packages. But it only makes sense to build them for the device's architecture, which is specified in the deviceinfo @@ -23,10 +24,10 @@ def arch_from_deviceinfo(args: PmbArgs, pkgname, aport): """ # Require a deviceinfo file in the aport if not pkgname.startswith("device-"): - return - deviceinfo = aport + "/deviceinfo" - if not os.path.exists(deviceinfo): - return + return None + deviceinfo = aport / "deviceinfo" + if not deviceinfo.exists(): + return None # Return its arch device = pkgname.split("-", 1)[1] @@ -35,7 +36,7 @@ def arch_from_deviceinfo(args: PmbArgs, pkgname, aport): return arch -def arch(args: PmbArgs, pkgname): +def arch(args: PmbArgs, pkgname: str): """ Find a good default in case the user did not specify for which architecture a package should be built. @@ -47,6 +48,8 @@ def arch(args: PmbArgs, pkgname): * first arch in the APKBUILD """ aport = pmb.helpers.pmaports.find(args, pkgname) + if not aport: + raise FileNotFoundError(f"APKBUILD not found for {pkgname}") ret = arch_from_deviceinfo(args, pkgname, aport) if ret: return ret @@ -80,7 +83,7 @@ def chroot(apkbuild: Dict[str, str], arch: str) -> Chroot: if "pmb:cross-native" in apkbuild["options"]: return Chroot.native() - return Chroot(ChrootType.BUILDROOT, arch) + return Chroot.buildroot(arch) def crosscompile(args: PmbArgs, apkbuild, arch, suffix: Chroot): diff --git a/pmb/build/envkernel.py b/pmb/build/envkernel.py index 1ab78f40..c0c49554 100644 --- a/pmb/build/envkernel.py +++ b/pmb/build/envkernel.py @@ -1,5 +1,6 @@ # Copyright 2023 Robert Yang # SPDX-License-Identifier: GPL-3.0-or-later +from typing import List from pmb.helpers import logging import os from pathlib import Path @@ -9,7 +10,7 @@ import pmb.aportgen import pmb.build import pmb.build.autodetect import pmb.chroot -from pmb.core.types import PmbArgs +from pmb.core.types import PathString, PmbArgs import pmb.helpers import pmb.helpers.mount import pmb.helpers.pmaports @@ -89,7 +90,7 @@ def find_kbuild_output_dir(function_body): "can't resolve it, please open an issue.") -def modify_apkbuild(args: PmbArgs, pkgname, aport): +def modify_apkbuild(args: PmbArgs, pkgname: str, aport: Path): """Modify kernel APKBUILD to package build output from envkernel.sh.""" apkbuild_path = aport + "/APKBUILD" apkbuild = pmb.parse.apkbuild(apkbuild_path) @@ -110,7 +111,7 @@ def modify_apkbuild(args: PmbArgs, pkgname, aport): pmb.aportgen.core.rewrite(args, pkgname, apkbuild_path, fields=fields) -def run_abuild(args: PmbArgs, pkgname, arch, apkbuild_path, kbuild_out): +def run_abuild(args: PmbArgs, pkgname: str, arch: str, apkbuild_path: Path, kbuild_out): """ Prepare build environment and run abuild. @@ -142,17 +143,16 @@ def run_abuild(args: PmbArgs, pkgname, arch, apkbuild_path, kbuild_out): pmb.build.copy_to_buildpath(args, pkgname) # Create symlink from abuild working directory to envkernel build directory - build_output = Path("" if kbuild_out == "" else "/" + kbuild_out) - if False or build_output != "": - if os.path.islink(chroot / "mnt/linux" / build_output) and \ - os.path.lexists(chroot / "mnt/linux" / build_output): - pmb.chroot.root(args, ["rm", "/mnt/linux" / build_output]) + if kbuild_out != "": + if os.path.islink(chroot / "mnt/linux" / kbuild_out) and \ + os.path.lexists(chroot / "mnt/linux" / kbuild_out): + pmb.chroot.root(args, ["rm", "/mnt/linux" / kbuild_out]) pmb.chroot.root(args, ["ln", "-s", "/mnt/linux", build_path / "src"]) pmb.chroot.root(args, ["ln", "-s", kbuild_out_source, - build_path / "src" / build_output]) + build_path / "src" / kbuild_out]) - cmd = ["cp", apkbuild_path, chroot / build_path / "APKBUILD"] + cmd: List[PathString] = ["cp", apkbuild_path, chroot / build_path / "APKBUILD"] pmb.helpers.run.root(args, cmd) # Create the apk package @@ -167,10 +167,10 @@ def run_abuild(args: PmbArgs, pkgname, arch, apkbuild_path, kbuild_out): pmb.helpers.mount.umount_all(args, chroot / "mnt/linux") # Clean up symlinks - if build_output != "": - if os.path.islink(chroot / "mnt/linux" / build_output) and \ - os.path.lexists(chroot / "mnt/linux" / build_output): - pmb.chroot.root(args, ["rm", "/mnt/linux" / build_output]) + if kbuild_out != "": + if os.path.islink(chroot / "mnt/linux" / kbuild_out) and \ + os.path.lexists(chroot / "mnt/linux" / kbuild_out): + pmb.chroot.root(args, ["rm", "/mnt/linux" / kbuild_out]) pmb.chroot.root(args, ["rm", build_path / "src"]) diff --git a/pmb/build/other.py b/pmb/build/other.py index efea6bc5..a644b9f1 100644 --- a/pmb/build/other.py +++ b/pmb/build/other.py @@ -33,13 +33,14 @@ def copy_to_buildpath(args: PmbArgs, package, chroot: Chroot=Chroot.native()): # Copy aport contents with resolved symlinks pmb.helpers.run.root(args, ["mkdir", "-p", build]) for entry in aport.iterdir(): + file = entry.name # Don't copy those dirs, as those have probably been generated by running `abuild` # on the host system directly and not cleaning up after itself. # Those dirs might contain broken symlinks and cp fails resolving them. - if entry.name in ["src", "pkg"]: - logging.warn(f"WARNING: Not copying {entry}, looks like a leftover from abuild") + if file in ["src", "pkg"]: + logging.warning(f"WARNING: Not copying {file}, looks like a leftover from abuild") continue - pmb.helpers.run.root(args, ["cp", "-rL", aport / entry, build / entry]) + pmb.helpers.run.root(args, ["cp", "-rL", aport / file, build / file]) pmb.chroot.root(args, ["chown", "-R", "pmos:pmos", "/home/pmos/build"], chroot) diff --git a/pmb/chroot/apk.py b/pmb/chroot/apk.py index 0dabe200..82c0c7dd 100644 --- a/pmb/chroot/apk.py +++ b/pmb/chroot/apk.py @@ -168,7 +168,7 @@ def packages_get_locally_built_apks(args: PmbArgs, packages, arch: str): return ret -def install_run_apk(args: PmbArgs, to_add, to_add_local, to_del, suffix): +def install_run_apk(args: PmbArgs, to_add, to_add_local, to_del, chroot: Chroot): """ Run apk to add packages, and ensure only the desired packages get explicitly marked as installed. @@ -178,7 +178,7 @@ def install_run_apk(args: PmbArgs, to_add, to_add_local, to_del, suffix): :param to_del: list of pkgnames to be deleted, this should be set to conflicting dependencies in any of the packages to be installed or their dependencies (e.g. ["unl0kr"]) - :param suffix: the chroot suffix, e.g. "native" or "rootfs_qemu-amd64" + :param chroot: the chroot suffix, e.g. "native" or "rootfs_qemu-amd64" """ # Sanitize packages: don't allow '--allow-untrusted' and other options # to be passed to apk! @@ -210,16 +210,16 @@ def install_run_apk(args: PmbArgs, to_add, to_add_local, to_del, suffix): command = ["--no-network"] + command if i == 0: pmb.helpers.apk.apk_with_progress(args, ["apk"] + command, - chroot=True, suffix=suffix) + run_in_chroot=True, chroot=chroot) else: # Virtual package related commands don't actually install or remove # packages, but only mark the right ones as explicitly installed. # They finish up almost instantly, so don't display a progress bar. pmb.chroot.root(args, ["apk", "--no-progress"] + command, - chroot=suffix) + chroot=chroot) -def install(args: PmbArgs, packages, suffix: Chroot=Chroot.native(), build=True): +def install(args: PmbArgs, packages, chroot: Chroot=Chroot.native(), build=True): """ Install packages from pmbootstrap's local package index or the pmOS/Alpine binary package mirrors. Iterate over all dependencies recursively, and @@ -232,7 +232,7 @@ def install(args: PmbArgs, packages, suffix: Chroot=Chroot.native(), build=True) special case that all packages are expected to be in Alpine's repositories, set this to False for performance optimization. """ - arch = pmb.parse.arch.from_chroot_suffix(args, suffix) + arch = pmb.parse.arch.from_chroot_suffix(args, chroot) if not packages: logging.verbose("pmb.chroot.apk.install called with empty packages list," @@ -240,10 +240,10 @@ def install(args: PmbArgs, packages, suffix: Chroot=Chroot.native(), build=True) return # Initialize chroot - check_min_version(args, suffix) - pmb.chroot.init(args, suffix) + check_min_version(args, chroot) + pmb.chroot.init(args, chroot) - packages_with_depends = pmb.parse.depends.recurse(args, packages, suffix) + packages_with_depends = pmb.parse.depends.recurse(args, packages, chroot) to_add, to_del = packages_split_to_add_del(packages_with_depends) if build: @@ -253,8 +253,8 @@ def install(args: PmbArgs, packages, suffix: Chroot=Chroot.native(), build=True) to_add_local = packages_get_locally_built_apks(args, to_add, arch) to_add_no_deps, _ = packages_split_to_add_del(packages) - logging.info(f"({suffix}) install {' '.join(to_add_no_deps)}") - install_run_apk(args, to_add_no_deps, to_add_local, to_del, suffix) + logging.info(f"({chroot}) install {' '.join(to_add_no_deps)}") + install_run_apk(args, to_add_no_deps, to_add_local, to_del, chroot) def installed(args: PmbArgs, suffix: Chroot=Chroot.native()): diff --git a/pmb/chroot/apk_static.py b/pmb/chroot/apk_static.py index 515dd65c..61a8b516 100644 --- a/pmb/chroot/apk_static.py +++ b/pmb/chroot/apk_static.py @@ -67,8 +67,8 @@ def extract_temp(tar, sigfilename): for ftype in ret.keys(): member = tar.getmember(ret[ftype]["filename"]) - handle, path = tempfile.mkstemp(ftype, "pmbootstrap") - handle = open(handle, "wb") + fd, path = tempfile.mkstemp(ftype, "pmbootstrap") + handle = open(fd, "wb") ret[ftype]["temp_path"] = path shutil.copyfileobj(tar.extractfile(member), handle) @@ -119,8 +119,7 @@ def extract(args: PmbArgs, version, apk_path): logging.debug("Verify the version reported by the apk.static binary" f" (must match the package version {version})") os.chmod(temp_path, os.stat(temp_path).st_mode | stat.S_IEXEC) - version_bin = pmb.helpers.run.user(args, [temp_path, "--version"], - output_return=True) + version_bin = pmb.helpers.run.user_output(args, [temp_path, "--version"]) version_bin = version_bin.split(" ")[1].split(",")[0] if not version.startswith(f"{version_bin}-r"): os.unlink(temp_path) @@ -174,4 +173,4 @@ def run(args: PmbArgs, parameters): if args.offline: parameters = ["--no-network"] + parameters pmb.helpers.apk.apk_with_progress( - args, [pmb.config.work / "apk.static"] + parameters, chroot=False) + args, [pmb.config.work / "apk.static"] + parameters, run_in_chroot=False) diff --git a/pmb/chroot/init.py b/pmb/chroot/init.py index 8eaccca6..9b26aeea 100644 --- a/pmb/chroot/init.py +++ b/pmb/chroot/init.py @@ -5,7 +5,6 @@ import filecmp from typing import List from pmb.helpers import logging import os -import filecmp import pmb.chroot import pmb.chroot.apk_static @@ -17,7 +16,7 @@ import pmb.helpers.run import pmb.parse.arch from pmb.core import Chroot, ChrootType -cache_chroot_is_outdated = [] +cache_chroot_is_outdated: List[str] = [] class UsrMerge(enum.Enum): """ @@ -98,7 +97,7 @@ def warn_if_chroot_is_outdated(args: PmbArgs, chroot: Chroot): global cache_chroot_is_outdated # Only check / display the warning once per session - if chroot in cache_chroot_is_outdated: + if str(chroot) in cache_chroot_is_outdated: return if pmb.config.workdir.chroots_outdated(args, chroot): @@ -107,7 +106,7 @@ def warn_if_chroot_is_outdated(args: PmbArgs, chroot: Chroot): f" {days_warn} days. Consider running" " 'pmbootstrap zap'.") - cache_chroot_is_outdated += [chroot] + cache_chroot_is_outdated += [str(chroot)] def init(args: PmbArgs, chroot: Chroot=Chroot.native(), usr_merge=UsrMerge.AUTO, diff --git a/pmb/chroot/mount.py b/pmb/chroot/mount.py index 2930417f..b6919149 100644 --- a/pmb/chroot/mount.py +++ b/pmb/chroot/mount.py @@ -93,6 +93,7 @@ def mount(args: PmbArgs, chroot: Chroot=Chroot.native()): # Mount if necessary for source, target in mountpoints.items(): target_outer = chroot / target + #raise RuntimeError("test") pmb.helpers.mount.bind(args, source, target_outer) @@ -101,7 +102,7 @@ def mount_native_into_foreign(args: PmbArgs, chroot: Chroot): target = chroot / "native" pmb.helpers.mount.bind(args, source, target) - musl = next(source.glob("/lib/ld-musl-*.so.1")).name + musl = next(source.glob("lib/ld-musl-*.so.1")).name musl_link = (chroot / "lib" / musl) if not musl_link.is_symlink(): pmb.helpers.run.root(args, ["ln", "-s", "/native/lib/" + musl, diff --git a/pmb/chroot/root.py b/pmb/chroot/root.py index 9c825217..b783b074 100644 --- a/pmb/chroot/root.py +++ b/pmb/chroot/root.py @@ -1,7 +1,7 @@ # Copyright 2023 Oliver Smith # SPDX-License-Identifier: GPL-3.0-or-later import os -from pathlib import Path +from pathlib import Path, PurePath import shutil from typing import Sequence @@ -11,7 +11,7 @@ import pmb.chroot.binfmt import pmb.helpers.run import pmb.helpers.run_core from pmb.core import Chroot -from pmb.core.types import PathString, PmbArgs +from pmb.core.types import Env, PathString, PmbArgs def executables_absolute_path(): @@ -29,7 +29,7 @@ def executables_absolute_path(): return ret -def root(args: PmbArgs, cmd: Sequence[PathString], chroot: Chroot=Chroot.native(), working_dir: Path=Path("/"), output="log", +def root(args: PmbArgs, cmd: Sequence[PathString], chroot: Chroot=Chroot.native(), working_dir: PurePath=PurePath("/"), output="log", output_return=False, check=None, env={}, auto_init=True, disable_timeout=False, add_proxy_env_vars=True): """ @@ -38,6 +38,7 @@ def root(args: PmbArgs, cmd: Sequence[PathString], chroot: Chroot=Chroot.native( :param env: dict of environment variables to be passed to the command, e.g. {"JOBS": "5"} :param auto_init: automatically initialize the chroot + :param working_dir: chroot-relative working directory :param add_proxy_env_vars: if True, preserve HTTP_PROXY etc. vars from host environment. pmb.chroot.user sets this to False when calling pmb.chroot.root, because it already @@ -59,12 +60,12 @@ def root(args: PmbArgs, cmd: Sequence[PathString], chroot: Chroot=Chroot.native( msg = f"({chroot}) % " for key, value in env.items(): msg += f"{key}={value} " - if working_dir != Path("/"): + if working_dir != PurePath("/"): msg += f"cd {working_dir}; " msg += " ".join(cmd_str) # Merge env with defaults into env_all - env_all = {"CHARSET": "UTF-8", + env_all: Env = {"CHARSET": "UTF-8", "HISTFILE": "~/.ash_history", "HOME": "/root", "LANG": "UTF-8", @@ -83,7 +84,7 @@ def root(args: PmbArgs, cmd: Sequence[PathString], chroot: Chroot=Chroot.native( # cmd_sudo: ["sudo", "env", "-i", "sh", "-c", "PATH=... /sbin/chroot ..."] executables = executables_absolute_path() cmd_chroot = [executables["chroot"], chroot.path, "/bin/sh", "-c", - pmb.helpers.run_core.flat_cmd(cmd_str, working_dir)] + pmb.helpers.run_core.flat_cmd(cmd_str, Path(working_dir))] cmd_sudo = pmb.config.sudo([ "env", "-i", executables["sh"], "-c", pmb.helpers.run_core.flat_cmd(cmd_chroot, env=env_all)] diff --git a/pmb/chroot/zap.py b/pmb/chroot/zap.py index 8b1ffab3..c8369ce4 100644 --- a/pmb/chroot/zap.py +++ b/pmb/chroot/zap.py @@ -11,7 +11,7 @@ from pmb.core.types import PmbArgs import pmb.helpers.pmaports import pmb.helpers.run import pmb.parse.apkindex -from pmb.core import Chroot, ChrootType +from pmb.core import Chroot def zap(args: PmbArgs, confirm=True, dry=False, pkgs_local=False, http=False, @@ -65,6 +65,7 @@ def zap(args: PmbArgs, confirm=True, dry=False, pkgs_local=False, http=False, # Delete everything matching the patterns for pattern in patterns: + logging.debug(f"Deleting {pattern}") pattern = os.path.realpath(f"{pmb.config.work}/{pattern}") matches = glob.glob(pattern) for match in matches: @@ -114,7 +115,7 @@ def zap_pkgs_local_mismatch(args: PmbArgs, confirm=True, dry=False): continue # Aport path - aport_path = pmb.helpers.pmaports.find(args, origin, False) + aport_path = pmb.helpers.pmaports.find_optional(args, origin) if not aport_path: logging.info(f"% rm {apk_path_short}" f" ({origin} aport not found)") @@ -154,7 +155,7 @@ def zap_pkgs_online_mismatch(args: PmbArgs, confirm=True, dry=False): suffix = Chroot.native() else: try: - suffix = Chroot(ChrootType.BUILDROOT, arch) + suffix = Chroot.buildroot(arch) except ValueError: continue # Ignore invalid directory name diff --git a/pmb/config/__init__.py b/pmb/config/__init__.py index d89ba1a0..4a5a16a3 100644 --- a/pmb/config/__init__.py +++ b/pmb/config/__init__.py @@ -3,14 +3,16 @@ import multiprocessing import os from pathlib import Path -from pmb.core.types import PathString +from pmb.core.types import AportGenEntry, PathString import pmb.parse.arch import sys -from typing import Sequence +from typing import Dict, List, Sequence, TypedDict # # Exported functions # +# FIXME (#2324): this sucks, we should re-organise this and not rely on "lifting" +# this functions this way from pmb.config.load import load, sanity_checks from pmb.config.save import save from pmb.config.merge_with_args import merge_with_args @@ -24,7 +26,7 @@ from pmb.config.other import is_systemd_selected pmb_src: Path = Path(Path(__file__) / "../../..").resolve() apk_keys_path: Path = (pmb_src / "pmb/data/keys") arch_native = pmb.parse.arch.alpine_native() -work: Path = Path("/unitialised/pmbootstrap/work/dir") +work: Path # apk-tools minimum version # https://pkgs.alpinelinux.org/packages?name=apk-tools&branch=edge @@ -67,7 +69,7 @@ required_programs = [ ] -def sudo(cmd: Sequence[PathString]) -> Sequence[str]: +def sudo(cmd: Sequence[PathString]) -> Sequence[PathString]: """Adapt a command to run as root.""" sudo = which_sudo() if sudo: @@ -81,7 +83,7 @@ def work_dir(_work: Path) -> None: work directory before any other code is run. It is not meant to be used anywhere else.""" global work - if work: + if "work" in globals(): raise RuntimeError("work_dir() called multiple times!") work = _work @@ -947,10 +949,10 @@ flash_methods = [ # These folders will be mounted at the same location into the native # chroot, before the flash programs get started. flash_mount_bind = [ - "sys/bus/usb/devices/", - "sys/dev/", - "sys/devices/", - "dev/bus/usb/" + Path("/sys/bus/usb/devices/"), + Path("/sys/dev/"), + Path("/sys/devices/"), + Path("/dev/bus/usb/"), ] """ @@ -969,7 +971,7 @@ Fastboot specific: $KERNEL_CMDLINE Heimdall specific: $PARTITION_INITFS uuu specific: $UUU_SCRIPT """ -flashers = { +flashers: Dict[str, Dict[str, bool | List[str] | Dict[str, List[List[str]]]]] = { "fastboot": { "depends": [], # pmaports.cfg: supported_fastboot_depends "actions": { @@ -1129,7 +1131,7 @@ git_repos = { # # APORTGEN # -aportgen = { +aportgen: Dict[str, AportGenEntry] = { "cross": { "prefixes": ["busybox-static", "gcc", "musl", "grub-efi"], "confirm_overwrite": False, diff --git a/pmb/config/init.py b/pmb/config/init.py index 736236c6..7e2098d3 100644 --- a/pmb/config/init.py +++ b/pmb/config/init.py @@ -423,8 +423,7 @@ def ask_for_device(args: PmbArgs): device = f"{vendor}-{codename}" device_path = pmb.helpers.devices.find_path(args, device, 'deviceinfo') - device_exists = device_path is not None - if not device_exists: + if device_path is None: if device == args.device: raise RuntimeError( "This device does not exist anymore, check" @@ -449,7 +448,7 @@ def ask_for_device(args: PmbArgs): break kernel = ask_for_device_kernel(args, device) - return (device, device_exists, kernel) + return (device, device_path is not None, kernel) def ask_for_additional_options(args: PmbArgs, cfg): @@ -739,7 +738,7 @@ def frontend(args: PmbArgs): # Zap existing chroots if (work_exists and device_exists and - len(glob.glob(pmb.config.work / "chroot_*")) and + len(list(Chroot.iter_patterns())) and pmb.helpers.cli.confirm( args, "Zap existing chroots to apply configuration?", default=True)): diff --git a/pmb/config/pmaports.py b/pmb/config/pmaports.py index 49d8add5..bad0680f 100644 --- a/pmb/config/pmaports.py +++ b/pmb/config/pmaports.py @@ -9,13 +9,14 @@ import pmb.config from pmb.core.types import PmbArgs import pmb.helpers.git import pmb.helpers.pmaports +import pmb.parse.version def check_legacy_folder(): # Existing pmbootstrap/aports must be a symlink - link = pmb.config.pmb_src + "/aports" + link = pmb.config.pmb_src / "aports" if os.path.exists(link) and not os.path.islink(link): - raise RuntimeError("The path '" + link + "' should be a" + raise RuntimeError(f"The path '{link}' should be a" " symlink pointing to the new pmaports" " repository, which was split from the" " pmbootstrap repository (#383). Consider" @@ -58,21 +59,21 @@ def check_version_pmaports(real): raise RuntimeError("Run 'pmbootstrap pull' to update your pmaports.") -def check_version_pmbootstrap(min): +def check_version_pmbootstrap(min_ver): # Compare versions real = pmb.__version__ - if pmb.parse.version.compare(real, min) >= 0: + if pmb.parse.version.compare(real, min_ver) >= 0: return # Show versions - logging.info("NOTE: you are using pmbootstrap version " + real + ", but" + - " version " + min + " is required.") + logging.info(f"NOTE: you are using pmbootstrap version {real}, but" + f" version {min_ver} is required.") # Error for git clone pmb_src = pmb.config.pmb_src - if os.path.exists(pmb_src + "/.git"): + if os.path.exists(pmb_src / ".git"): raise RuntimeError("Please update your local pmbootstrap repository." - " Usually with: 'git -C \"" + pmb_src + "\" pull'") + f" Usually with: 'git -C \"{pmb_src}\" pull'") # Error for package manager installation raise RuntimeError("Please update your pmbootstrap version (with your" @@ -121,7 +122,7 @@ def read_config(args: PmbArgs): path_cfg = args.aports / "pmaports.cfg" if not os.path.exists(path_cfg): raise RuntimeError("Invalid pmaports repository, could not find the" - " config: " + path_cfg) + f" config: {path_cfg}") # Load the config cfg = configparser.ConfigParser() diff --git a/pmb/config/save.py b/pmb/config/save.py index 84848766..87713df6 100644 --- a/pmb/config/save.py +++ b/pmb/config/save.py @@ -7,7 +7,7 @@ from pmb.core.types import PmbArgs def save(args: PmbArgs, cfg): - logging.debug("Save config: " + args.config) + logging.debug(f"Save config: {args.config}") os.makedirs(os.path.dirname(args.config), 0o700, True) with open(args.config, "w") as handle: cfg.write(handle) diff --git a/pmb/core/chroot.py b/pmb/core/chroot.py index 04a4d691..2d6547d7 100644 --- a/pmb/core/chroot.py +++ b/pmb/core/chroot.py @@ -4,7 +4,7 @@ from __future__ import annotations import enum from typing import Generator, Optional -from pathlib import Path, PosixPath +from pathlib import Path, PosixPath, PurePosixPath import pmb.config class ChrootType(enum.Enum): @@ -86,7 +86,7 @@ class Chroot: def __truediv__(self, other: object) -> Path: - if isinstance(other, PosixPath): + if isinstance(other, PosixPath) or isinstance(other, PurePosixPath): # Convert the other path to a relative path # FIXME: we should avoid creating absolute paths that we actually want # to make relative to the chroot... @@ -99,8 +99,8 @@ class Chroot: def __rtruediv__(self, other: object) -> Path: - if isinstance(other, PosixPath): - return other / self.path + if isinstance(other, PosixPath) or isinstance(other, PurePosixPath): + return Path(other) / self.path if isinstance(other, str): return other / self.path @@ -120,6 +120,11 @@ class Chroot: return Chroot(ChrootType.NATIVE) + @staticmethod + def buildroot(arch: str) -> Chroot: + return Chroot(ChrootType.BUILDROOT, arch) + + @staticmethod def rootfs(device: str) -> Chroot: return Chroot(ChrootType.ROOTFS, device) diff --git a/pmb/core/types.py b/pmb/core/types.py index 54c17cd0..d989c7ee 100644 --- a/pmb/core/types.py +++ b/pmb/core/types.py @@ -3,9 +3,24 @@ from argparse import Namespace from pathlib import Path -from typing import Dict, Union +from typing import Dict, List, Optional, Tuple, TypedDict, Union PathString = Union[Path, str] +Env = Dict[str, PathString] + +# These types are not definitive / API, they exist to describe the current +# state of things so that we can improve our type hinting coverage and make +# future refactoring efforts easier. + +class PartitionLayout(TypedDict): + kernel: Optional[int] + boot: int + reserve: Optional[int] + root: int + +class AportGenEntry(TypedDict): + prefixes: List[str] + confirm_overwrite: bool # Property list generated with: # $ rg --vimgrep "((^|\s)args\.\w+)" --only-matching | cut -d"." -f3 | sort | uniq @@ -28,7 +43,7 @@ class PmbArgs(Namespace): autoinstall: str boot_size: str build_default_device_arch: str - build_pkgs_on_install: str + build_pkgs_on_install: bool buildroot: str built: str ccache_size: str @@ -38,17 +53,17 @@ class PmbArgs(Namespace): command: str config: Path config_channels: str - details: str + details: bool details_to_stdout: str device: str deviceinfo: Dict[str, str] deviceinfo_parse_kernel: str devices: str - disk: str + disk: Path dry: str efi: str envkernel: str - export_folder: str + export_folder: Path extra_packages: str extra_space: str fast: str @@ -58,7 +73,8 @@ class PmbArgs(Namespace): folder: str force: str fork_alpine: str - from_argparse: str + # This is a filthy lie + from_argparse: "PmbArgs" full_disk_encryption: str hook: str host: str @@ -68,7 +84,7 @@ class PmbArgs(Namespace): install_base: str install_blockdev: str install_cgpt: str - install_key: str + install_key: bool install_local_pkgs: str install_recommends: str is_default_channel: str @@ -80,7 +96,7 @@ class PmbArgs(Namespace): lines: str log: Path mirror_alpine: str - mirrors_postmarketos: str + mirrors_postmarketos: List[str] name: str no_depends: str no_fde: str @@ -91,15 +107,16 @@ class PmbArgs(Namespace): no_sshd: str odin_flashable_tar: str offline: str - ondev_cp: str + ondev_cp: List[Tuple[str, str]] on_device_installer: str ondev_no_rootfs: str overview: str - package: str - packages: str + # FIXME (#2324): figure out the args.package vs args.packages situation + package: str | List[str] + packages: List[str] partition: str password: str - path: str + path: Path pkgname: str pkgname_pkgver_srcurl: str port: str @@ -122,7 +139,7 @@ class PmbArgs(Namespace): rsync: str scripts: str second_storage: str - selected_providers: str + selected_providers: Dict[str, str] sparse: str split: str src: str @@ -131,7 +148,7 @@ class PmbArgs(Namespace): sudo_timer: str suffix: str systemd: str - timeout: str + timeout: float ui: str ui_extras: str user: str diff --git a/pmb/export/frontend.py b/pmb/export/frontend.py index e05dd1c6..31e2b0d0 100644 --- a/pmb/export/frontend.py +++ b/pmb/export/frontend.py @@ -28,7 +28,7 @@ def frontend(args: PmbArgs): pmb.chroot.initfs.build(args, flavor, Chroot(ChrootType.ROOTFS, args.device)) # Do the export, print all files - logging.info("Export symlinks to: " + target) + logging.info(f"Export symlinks to: {target}") if args.odin_flashable_tar: pmb.export.odin(args, flavor, target) pmb.export.symlinks(args, flavor, target) diff --git a/pmb/export/symlinks.py b/pmb/export/symlinks.py index e953c32b..85a2babc 100644 --- a/pmb/export/symlinks.py +++ b/pmb/export/symlinks.py @@ -45,7 +45,7 @@ def symlinks(args: PmbArgs, flavor, folder: Path): # Generate a list of patterns chroot_native = Chroot.native() path_boot = Chroot(ChrootType.ROOTFS, args.device) / "boot" - chroot_buildroot = Chroot(ChrootType.BUILDROOT, args.deviceinfo['arch']) + chroot_buildroot = Chroot.buildroot(args.deviceinfo['arch']) files: List[Path] = [ path_boot / f"boot.img{suffix}", path_boot / f"uInitrd{suffix}", diff --git a/pmb/flasher/frontend.py b/pmb/flasher/frontend.py index d5597284..7de4b093 100644 --- a/pmb/flasher/frontend.py +++ b/pmb/flasher/frontend.py @@ -93,7 +93,7 @@ def sideload(args: PmbArgs): pmb.flasher.install_depends(args) # Mount the buildroot - chroot = Chroot(ChrootType.BUILDROOT, args.deviceinfo["arch"]) + chroot = Chroot.buildroot(args.deviceinfo["arch"]) mountpoint = "/mnt/" / chroot pmb.helpers.mount.bind(args, chroot.path, Chroot.native().path / mountpoint) diff --git a/pmb/flasher/run.py b/pmb/flasher/run.py index df720819..bf0623d3 100644 --- a/pmb/flasher/run.py +++ b/pmb/flasher/run.py @@ -22,6 +22,8 @@ def run(args: PmbArgs, action, flavor=None): # Verify action method = args.flash_method or args.deviceinfo["flash_method"] cfg = pmb.config.flashers[method] + if not isinstance(cfg["actions"], dict): + raise TypeError(f"Flashers misconfigured! {method} key 'actions' should be a dictionary") if action not in cfg["actions"]: raise RuntimeError("action " + action + " is not" " configured for method " + method + "!" diff --git a/pmb/flasher/variables.py b/pmb/flasher/variables.py index a861418d..48d58300 100644 --- a/pmb/flasher/variables.py +++ b/pmb/flasher/variables.py @@ -1,5 +1,6 @@ # Copyright 2023 Oliver Smith # SPDX-License-Identifier: GPL-3.0-or-later +from typing import Optional import pmb.config.pmaports from pmb.core.types import PmbArgs @@ -15,6 +16,9 @@ def variables(args: PmbArgs, flavor, method): # updated and minimum pmbootstrap version bumped. # See also https://gitlab.com/postmarketOS/pmbootstrap/-/issues/2243 + _partition_kernel: Optional[str] + _partition_rootfs: Optional[str] + if method.startswith("fastboot"): _partition_kernel = args.deviceinfo["flash_fastboot_partition_kernel"]\ or "boot" diff --git a/pmb/helpers/apk.py b/pmb/helpers/apk.py index 47770e77..0e496d83 100644 --- a/pmb/helpers/apk.py +++ b/pmb/helpers/apk.py @@ -1,10 +1,12 @@ # Copyright 2023 Johannes Marbach, Oliver Smith # SPDX-License-Identifier: GPL-3.0-or-later import os +from pathlib import Path +from typing import List, Sequence import pmb.chroot.root import pmb.config.pmaports -from pmb.core.types import PmbArgs +from pmb.core.types import PathString, PmbArgs import pmb.helpers.cli import pmb.helpers.run import pmb.helpers.run_core @@ -12,7 +14,7 @@ import pmb.parse.version from pmb.core import Chroot -def _run(args: PmbArgs, command, chroot=False, suffix: Chroot=Chroot.native(), output="log"): +def _run(args: PmbArgs, command, run_in_chroot=False, chroot: Chroot=Chroot.native(), output="log"): """Run a command. :param command: command in list form @@ -23,8 +25,8 @@ def _run(args: PmbArgs, command, chroot=False, suffix: Chroot=Chroot.native(), o See pmb.helpers.run_core.core() for a detailed description of all other arguments and the return value. """ - if chroot: - return pmb.chroot.root(args, command, output=output, suffix=suffix, + if run_in_chroot: + return pmb.chroot.root(args, command, output=output, chroot=chroot, disable_timeout=True) return pmb.helpers.run.root(args, command, output=output) @@ -41,7 +43,7 @@ def _prepare_fifo(args: PmbArgs, run_in_chroot=False, chroot: Chroot=Chroot.nati relative to the host) """ if run_in_chroot: - fifo = "/tmp/apk_progress_fifo" + fifo = Path("/tmp/apk_progress_fifo") fifo_outside = chroot / fifo else: _run(args, ["mkdir", "-p", pmb.config.work / "tmp"]) @@ -82,7 +84,7 @@ def _compute_progress(line): return cur / tot if tot > 0 else 0 -def apk_with_progress(args: PmbArgs, command, chroot=False, suffix: Chroot=Chroot.native()): +def apk_with_progress(args: PmbArgs, command: Sequence[PathString], run_in_chroot=False, chroot: Chroot=Chroot.native()): """Run an apk subcommand while printing a progress bar to STDOUT. :param command: apk subcommand in list form @@ -91,12 +93,13 @@ def apk_with_progress(args: PmbArgs, command, chroot=False, suffix: Chroot=Chroo set to True. :raises RuntimeError: when the apk command fails """ - fifo, fifo_outside = _prepare_fifo(args, chroot, suffix) - command_with_progress = _create_command_with_progress(command, fifo) - log_msg = " ".join(command) - with _run(args, ['cat', fifo], chroot=chroot, suffix=suffix, + fifo, fifo_outside = _prepare_fifo(args, run_in_chroot, chroot) + _command: List[str] = [os.fspath(c) for c in command] + command_with_progress = _create_command_with_progress(_command, fifo) + log_msg = " ".join(_command) + with _run(args, ['cat', fifo], run_in_chroot=run_in_chroot, chroot=chroot, output="pipe") as p_cat: - with _run(args, command_with_progress, chroot=chroot, suffix=suffix, + with _run(args, command_with_progress, run_in_chroot=run_in_chroot, chroot=chroot, output="background") as p_apk: while p_apk.poll() is None: line = p_cat.stdout.readline().decode('utf-8') diff --git a/pmb/helpers/aportupgrade.py b/pmb/helpers/aportupgrade.py index 63ea2c82..5c1998ee 100644 --- a/pmb/helpers/aportupgrade.py +++ b/pmb/helpers/aportupgrade.py @@ -6,15 +6,15 @@ from pmb.helpers import logging import os import re import urllib.parse -from typing import Optional +from typing import Dict, Optional from pmb.core.types import PmbArgs import pmb.helpers.file import pmb.helpers.http import pmb.helpers.pmaports -req_headers = None -req_headers_github = None +req_headers: Dict[str, str] +req_headers_github: Dict[str, str] ANITYA_API_BASE = "https://release-monitoring.org/api/v2" GITHUB_API_BASE = "https://api.github.com" @@ -272,7 +272,6 @@ def upgrade(args: PmbArgs, pkgname, git=True, stable=True) -> None: if stable: upgrade_stable_package(args, pkgname, package) - return False def upgrade_all(args: PmbArgs) -> None: """Upgrade all packages, based on args.all, args.all_git and args.all_stable.""" diff --git a/pmb/helpers/args.py b/pmb/helpers/args.py index f1e09e66..3d96e66c 100644 --- a/pmb/helpers/args.py +++ b/pmb/helpers/args.py @@ -1,11 +1,11 @@ # Copyright 2023 Oliver Smith # SPDX-License-Identifier: GPL-3.0-or-later import copy -import os from pathlib import Path import pmb.config from pmb.core.types import PmbArgs import pmb.helpers.git +import pmb.helpers.args """This file constructs the args variable, which is passed to almost all functions in the pmbootstrap code base. Here's a listing of the kind of @@ -75,7 +75,7 @@ def check_pmaports_path(args: PmbArgs): """ if args.from_argparse.aports and not os.path.exists(args.aports): raise ValueError("pmaports path (specified with --aports) does" - " not exist: " + args.aports) + f" not exist: {args.aports}") def replace_placeholders(args: PmbArgs): @@ -108,7 +108,7 @@ def add_deviceinfo(args: PmbArgs): " ") -def init(args: PmbArgs): +def init(args: PmbArgs) -> PmbArgs: # Basic initialization fix_mirrors_postmarketos(args) pmb.config.merge_with_args(args) diff --git a/pmb/helpers/devices.py b/pmb/helpers/devices.py index f6037f80..9a8e8914 100644 --- a/pmb/helpers/devices.py +++ b/pmb/helpers/devices.py @@ -1,13 +1,13 @@ # Copyright 2023 Oliver Smith # SPDX-License-Identifier: GPL-3.0-or-later import os -import glob from pathlib import Path +from typing import Optional from pmb.core.types import PmbArgs import pmb.parse -def find_path(args: PmbArgs, codename: str, file='') -> Path: +def find_path(args: PmbArgs, codename: str, file='') -> Optional[Path]: """Find path to device APKBUILD under `device/*/device-`. :param codename: device codename @@ -58,7 +58,7 @@ def list_apkbuilds(args: PmbArgs): """:returns: { "first-device": {"pkgname": ..., "pkgver": ...}, ... }""" ret = {} for device in list_codenames(args): - apkbuild_path = f"{args.aports}/device/*/device-{device}/APKBUILD" + apkbuild_path = next(args.aports.glob(f"device/*/device-{device}/APKBUILD")) ret[device] = pmb.parse.apkbuild(apkbuild_path) return ret diff --git a/pmb/helpers/file.py b/pmb/helpers/file.py index 3ab59673..f8e9f292 100644 --- a/pmb/helpers/file.py +++ b/pmb/helpers/file.py @@ -7,16 +7,17 @@ import time from pmb.core.types import PmbArgs import pmb.helpers.run +import pmb.helpers.pmaports -def replace(path, old, new): +def replace(path: Path, old: str, new: str): text = "" - with open(path, "r", encoding="utf-8") as handle: + with path.open("r", encoding="utf-8") as handle: text = handle.read() text = text.replace(old, new) - with open(path, "w", encoding="utf-8") as handle: + with path.open("w", encoding="utf-8") as handle: handle.write(text) @@ -29,7 +30,7 @@ def replace_apkbuild(args: PmbArgs, pkgname, key, new, in_quotes=False): :param in_quotes: expect the value to be in quotation marks ("") """ # Read old value - path = pmb.helpers.pmaports.find(args, pkgname) + "/APKBUILD" + path = pmb.helpers.pmaports.find(args, pkgname) / "APKBUILD" apkbuild = pmb.parse.apkbuild(path) old = apkbuild[key] @@ -95,7 +96,7 @@ def symlink(args: PmbArgs, file: Path, link: Path): if (os.path.islink(link) and os.path.realpath(os.readlink(link)) == os.path.realpath(file)): return - raise RuntimeError("File exists: " + link) + raise RuntimeError(f"File exists: {link}") elif link.is_symlink(): link.unlink() diff --git a/pmb/helpers/frontend.py b/pmb/helpers/frontend.py index 222587b1..9e6f992e 100644 --- a/pmb/helpers/frontend.py +++ b/pmb/helpers/frontend.py @@ -1,6 +1,7 @@ # Copyright 2023 Oliver Smith # SPDX-License-Identifier: GPL-3.0-or-later import json +from typing import List, Sequence from pmb.helpers import logging import os from pathlib import Path @@ -14,7 +15,7 @@ import pmb.chroot.initfs import pmb.chroot.other import pmb.ci import pmb.config -from pmb.core.types import PmbArgs +from pmb.core.types import PathString, PmbArgs import pmb.export import pmb.flasher import pmb.helpers.aportupgrade @@ -35,7 +36,6 @@ import pmb.netboot import pmb.parse import pmb.qemu import pmb.sideload -from argparse import Namespace from pmb.core import ChrootType, Chroot @@ -48,13 +48,13 @@ def _parse_flavor(args: PmbArgs, autoinstall=True): # identifier that is typically in the form # "postmarketos--", e.g. # "postmarketos-qcom-sdm845" - suffix = Chroot(ChrootType.ROOTFS, args.device) + chroot = Chroot(ChrootType.ROOTFS, args.device) flavor = pmb.chroot.other.kernel_flavor_installed( - args, suffix, autoinstall) + args, chroot, autoinstall) if not flavor: raise RuntimeError( - "No kernel flavors installed in chroot " + suffix + "! Please let" + f"No kernel flavors installed in chroot '{chroot}'! Please let" " your device package depend on a package starting with 'linux-'.") return flavor @@ -64,9 +64,9 @@ def _parse_suffix(args: PmbArgs) -> Chroot: return Chroot(ChrootType.ROOTFS, args.device) elif args.buildroot: if args.buildroot == "device": - return Chroot(ChrootType.BUILDROOT, args.deviceinfo["arch"]) + return Chroot.buildroot(args.deviceinfo["arch"]) else: - return Chroot(ChrootType.BUILDROOT, args.buildroot) + return Chroot.buildroot(args.buildroot) elif args.suffix: (_t, s) = args.suffix.split("_") t: ChrootType = ChrootType(_t) @@ -416,11 +416,16 @@ def kconfig(args: PmbArgs): raise RuntimeError("kconfig check failed!") # Default to all kernel packages - packages = args.package + packages: List[str] + # FIXME (#2324): figure out the args.package vs args.packages situation + if isinstance(args.package, list): + packages = args.package + else: + packages = [args.package] if not args.package: - for aport in pmb.helpers.pmaports.get_list(args): - if aport.startswith("linux-"): - packages.append(aport.split("linux-")[1]) + for pkg in pmb.helpers.pmaports.get_list(args): + if pkg.startswith("linux-"): + packages.append(pkg.split("linux-")[1]) # Iterate over all kernels error = False @@ -431,7 +436,7 @@ def kconfig(args: PmbArgs): pkgname = package if package.startswith("linux-") \ else "linux-" + package aport = pmb.helpers.pmaports.find(args, pkgname) - apkbuild = pmb.parse.apkbuild(f"{aport}/APKBUILD") + apkbuild = pmb.parse.apkbuild(aport) if "!pmb:kconfigcheck" in apkbuild["options"]: skipped += 1 continue @@ -449,7 +454,7 @@ def kconfig(args: PmbArgs): logging.info("kconfig check succeeded!") elif args.action_kconfig in ["edit", "migrate"]: if args.package: - pkgname = args.package + pkgname = args.package if isinstance(args.package, str) else args.package[0] else: pkgname = args.deviceinfo["codename"] use_oldconfig = args.action_kconfig == "migrate" @@ -472,7 +477,7 @@ def deviceinfo_parse(args: PmbArgs): def apkbuild_parse(args: PmbArgs): # Default to all packages - packages = args.packages + packages: Sequence[str] = args.packages if not packages: packages = pmb.helpers.pmaports.get_list(args) @@ -480,8 +485,7 @@ def apkbuild_parse(args: PmbArgs): for package in packages: print(package + ":") aport = pmb.helpers.pmaports.find(args, package) - path = aport + "/APKBUILD" - print(json.dumps(pmb.parse.apkbuild(path), indent=4, + print(json.dumps(pmb.parse.apkbuild(aport), indent=4, sort_keys=True)) @@ -489,8 +493,7 @@ def apkindex_parse(args: PmbArgs): result = pmb.parse.apkindex.parse(args.apkindex_path) if args.package: if args.package not in result: - raise RuntimeError("Package not found in the APKINDEX: " + - args.package) + raise RuntimeError(f"Package not found in the APKINDEX: {args.package}") result = result[args.package] print(json.dumps(result, indent=4)) @@ -536,14 +539,14 @@ def shutdown(args: PmbArgs): def stats(args: PmbArgs): # Chroot suffix - suffix = "native" + chroot = Chroot.native() if args.arch != pmb.config.arch_native: - suffix = "buildroot_" + args.arch + chroot = Chroot.buildroot(args.arch) # Install ccache and display stats - pmb.chroot.apk.install(args, ["ccache"], suffix) - logging.info(f"({suffix}) % ccache -s") - pmb.chroot.user(args, ["ccache", "-s"], suffix, output="stdout") + pmb.chroot.apk.install(args, ["ccache"], chroot) + logging.info(f"({chroot}) % ccache -s") + pmb.chroot.user(args, ["ccache", "-s"], chroot, output="stdout") def work_migrate(args: PmbArgs): @@ -558,7 +561,7 @@ def log(args: PmbArgs): pmb.helpers.run.user(args, ["truncate", "-s", "0", args.log]) pmb.helpers.run.user(args, ["truncate", "-s", "0", log_testsuite]) - cmd = ["tail", "-n", args.lines, "-F"] + cmd: List[PathString] = ["tail", "-n", args.lines, "-F"] # Follow the testsuite's log file too if it exists. It will be created when # starting a test case that writes to it (git -C test grep log_testsuite). @@ -620,7 +623,7 @@ def pull(args: PmbArgs): def lint(args: PmbArgs): - packages = args.packages + packages: Sequence[str] = args.packages if not packages: packages = pmb.helpers.pmaports.get_list(args) diff --git a/pmb/helpers/git.py b/pmb/helpers/git.py index 5266bad4..a799986d 100644 --- a/pmb/helpers/git.py +++ b/pmb/helpers/git.py @@ -1,6 +1,8 @@ # Copyright 2023 Oliver Smith # SPDX-License-Identifier: GPL-3.0-or-later import configparser +from pathlib import Path +from typing import Dict from pmb.helpers import logging import os from pathlib import Path @@ -22,7 +24,7 @@ def get_path(args: PmbArgs, name_repo): """ if name_repo == "pmaports": return args.aports - return pmb.config.work / "cache_git/" + name_repo + return pmb.config.work / "cache_git" / name_repo def clone(args: PmbArgs, name_repo): @@ -66,7 +68,7 @@ def rev_parse(args: PmbArgs, path, revision="HEAD", extra_args: list = []): or (with ``--abbrev-ref``): the branch name, e.g. "master" """ command = ["git", "rev-parse"] + extra_args + [revision] - rev = pmb.helpers.run.user(args, command, path, output_return=True) + rev = pmb.helpers.run.user_output(args, command, path) return rev.rstrip() @@ -84,7 +86,7 @@ def can_fast_forward(args: PmbArgs, path, branch_upstream, branch="HEAD"): def clean_worktree(args: PmbArgs, path): """Check if there are not any modified files in the git dir.""" command = ["git", "status", "--porcelain"] - return pmb.helpers.run.user(args, command, path, output_return=True) == "" + return pmb.helpers.run.user_output(args, command, path) == "" def get_upstream_remote(args: PmbArgs, name_repo): @@ -95,7 +97,7 @@ def get_upstream_remote(args: PmbArgs, name_repo): urls = pmb.config.git_repos[name_repo] path = get_path(args, name_repo) command = ["git", "remote", "-v"] - output = pmb.helpers.run.user(args, command, path, output_return=True) + output = pmb.helpers.run.user_output(args, command, path) for line in output.split("\n"): if any(u in line for u in urls): return line.split("\t", 1)[0] @@ -127,8 +129,8 @@ def parse_channels_cfg(args): else: remote = get_upstream_remote(args, "pmaports") command = ["git", "show", f"{remote}/master:channels.cfg"] - stdout = pmb.helpers.run.user(args, command, args.aports, - output_return=True, check=False) + stdout = pmb.helpers.run.user_output(args, command, args.aports, + check=False) try: cfg.read_string(stdout) except configparser.MissingSectionHeaderError: @@ -139,7 +141,7 @@ def parse_channels_cfg(args): " pmaports clone") # Meta section - ret = {"channels": {}} + ret: Dict[str, Dict[str, str | Dict[str, str]]] = {"channels": {}} ret["meta"] = {"recommended": cfg.get("channels.cfg", "recommended")} # Channels @@ -153,7 +155,8 @@ def parse_channels_cfg(args): for key in ["description", "branch_pmaports", "branch_aports", "mirrordir_alpine"]: value = cfg.get(channel, key) - ret["channels"][channel_new][key] = value + # FIXME: how to type this properly?? + ret["channels"][channel_new][key] = value # type: ignore[index] pmb.helpers.other.cache[cache_key] = ret return ret @@ -261,11 +264,10 @@ def get_files(args: PmbArgs, path): :returns: all files in a git repository as list, relative to path """ ret = [] - files = pmb.helpers.run.user(args, ["git", "ls-files"], path, - output_return=True).split("\n") - files += pmb.helpers.run.user(args, ["git", "ls-files", - "--exclude-standard", "--other"], path, - output_return=True).split("\n") + files = pmb.helpers.run.user_output(args, ["git", "ls-files"], path).split("\n") + files += pmb.helpers.run.user_output(args, ["git", "ls-files", + "--exclude-standard", "--other"], + path).split("\n") for file in files: if os.path.exists(f"{path}/{file}"): ret += [file] diff --git a/pmb/helpers/http.py b/pmb/helpers/http.py index 6e3393ea..b4035019 100644 --- a/pmb/helpers/http.py +++ b/pmb/helpers/http.py @@ -4,12 +4,17 @@ import hashlib import json from pmb.helpers import logging import os +from pathlib import Path import shutil import urllib.request from pmb.core.types import PmbArgs import pmb.helpers.run +def cache_file(prefix: str, url: str) -> Path: + prefix = prefix.replace("/", "_") + return Path(f"{prefix}_{hashlib.sha256(url.encode('utf-8')).hexdigest()}") + def download(args: PmbArgs, url, prefix, cache=True, loglevel=logging.INFO, allow_404=False): @@ -33,9 +38,7 @@ def download(args: PmbArgs, url, prefix, cache=True, loglevel=logging.INFO, pmb.helpers.run.user(args, ["mkdir", "-p", pmb.config.work / "cache_http"]) # Check if file exists in cache - prefix = prefix.replace("/", "_") - path = (pmb.config.work / "cache_http/" + prefix + "_" + - hashlib.sha256(url.encode("utf-8")).hexdigest()) + path = pmb.config.work / "cache_http" / cache_file(prefix, url) if os.path.exists(path): if cache: return path diff --git a/pmb/helpers/lint.py b/pmb/helpers/lint.py index 48b7f5a9..6ba597d8 100644 --- a/pmb/helpers/lint.py +++ b/pmb/helpers/lint.py @@ -1,5 +1,6 @@ # Copyright 2023 Danct12 # SPDX-License-Identifier: GPL-3.0-or-later +from pathlib import Path from pmb.helpers import logging import os @@ -20,7 +21,7 @@ def check(args: PmbArgs, pkgnames): # Mount pmaports.git inside the chroot so that we don't have to copy the # package folders - pmaports = "/mnt/pmaports" + pmaports = Path("/mnt/pmaports") pmb.build.mount_pmaports(args, pmaports) # Locate all APKBUILDs and make the paths be relative to the pmaports @@ -28,9 +29,8 @@ def check(args: PmbArgs, pkgnames): apkbuilds = [] for pkgname in pkgnames: aport = pmb.helpers.pmaports.find(args, pkgname) - if not os.path.exists(aport + "/APKBUILD"): - raise ValueError("Path does not contain an APKBUILD file:" + - aport) + if not (aport / "APKBUILD").exists(): + raise ValueError(f"Path does not contain an APKBUILD file: {aport}") relpath = os.path.relpath(aport, args.aports) apkbuilds.append(f"{relpath}/APKBUILD") diff --git a/pmb/helpers/logging.py b/pmb/helpers/logging.py index a47cf309..c17afb53 100644 --- a/pmb/helpers/logging.py +++ b/pmb/helpers/logging.py @@ -3,15 +3,29 @@ import logging import os import sys +from typing import TextIO import pmb.config from pmb.core.types import PmbArgs -logfd = None +logfd: TextIO +CRITICAL = logging.CRITICAL +FATAL = logging.FATAL +ERROR = logging.ERROR +WARNING = logging.WARNING +WARN = logging.WARN +INFO = logging.INFO +DEBUG = logging.DEBUG +NOTSET = logging.NOTSET +VERBOSE = 5 class log_handler(logging.StreamHandler): """Write to stdout and to the already opened log file.""" - _args = None + _args: PmbArgs + + def __init__(self, args: PmbArgs): + super().__init__() + self._args = args def emit(self, record): try: @@ -80,13 +94,13 @@ def add_verbose_log_level(): All stackoverflow user contributions are licensed as CC-BY-SA: https://creativecommons.org/licenses/by-sa/3.0/ """ - logging.VERBOSE = 5 - logging.addLevelName(logging.VERBOSE, "VERBOSE") - logging.Logger.verbose = lambda inst, msg, * \ - args, **kwargs: inst.log(logging.VERBOSE, msg, *args, **kwargs) - logging.verbose = lambda msg, *args, **kwargs: logging.log(logging.VERBOSE, + setattr(logging, "VERBOSE", VERBOSE) + logging.addLevelName(VERBOSE, "VERBOSE") + setattr(logging.Logger, "verbose", lambda inst, msg, * \ + args, **kwargs: inst.log(VERBOSE, msg, *args, **kwargs)) + setattr(logging, "verbose", lambda msg, *args, **kwargs: logging.log(VERBOSE, msg, *args, - **kwargs) + **kwargs)) def init(args: PmbArgs): @@ -118,11 +132,10 @@ def init(args: PmbArgs): add_verbose_log_level() root_logger.setLevel(logging.DEBUG) if args.verbose: - root_logger.setLevel(logging.VERBOSE) + root_logger.setLevel(VERBOSE) # Add a custom log handler - handler = log_handler() - log_handler._args = args + handler = log_handler(args) handler.setFormatter(formatter) root_logger.addHandler(handler) diff --git a/pmb/helpers/mount.py b/pmb/helpers/mount.py index d4564ed5..fcdb1d0d 100644 --- a/pmb/helpers/mount.py +++ b/pmb/helpers/mount.py @@ -36,6 +36,8 @@ def bind(args: PmbArgs, source: Path, destination: Path, create_folders=True, um umount_all(args, destination) else: return + + print(f"Mounting {source} -> {destination}") # Check/create folders for path in [source, destination]: @@ -44,8 +46,7 @@ def bind(args: PmbArgs, source: Path, destination: Path, create_folders=True, um if create_folders: pmb.helpers.run.root(args, ["mkdir", "-p", path]) else: - raise RuntimeError("Mount failed, folder does not exist: " + - path) + raise RuntimeError(f"Mount failed, folder does not exist: {path}") # Actually mount the folder pmb.helpers.run.root(args, ["mount", "--bind", source, destination]) @@ -89,8 +90,7 @@ def umount_all_list(prefix: Path, source: Path=Path("/proc/mounts")) -> List[Pat for line in handle: words = line.split() if len(words) < 2: - raise RuntimeError("Failed to parse line in " + source + ": " + - line) + raise RuntimeError(f"Failed to parse line in {source}: {line}") mountpoint = Path(words[1].replace(r"\040(deleted)", "")) if mountpoint.is_relative_to(prefix): # is subpath ret.append(mountpoint) @@ -103,7 +103,7 @@ def umount_all(args: PmbArgs, folder: Path): for mountpoint in umount_all_list(folder): pmb.helpers.run.root(args, ["umount", mountpoint]) if ismount(mountpoint): - raise RuntimeError("Failed to umount: " + mountpoint) + raise RuntimeError(f"Failed to umount: {mountpoint}") def mount_device_rootfs(args: PmbArgs, chroot_rootfs: Chroot) -> PurePath: @@ -114,7 +114,7 @@ def mount_device_rootfs(args: PmbArgs, chroot_rootfs: Chroot) -> PurePath: "rootfs_qemu-amd64") :returns: the mountpoint (relative to the native chroot) """ - mountpoint = PurePath("/mnt", chroot_rootfs.dirname()) + mountpoint = PurePath("/mnt", chroot_rootfs.dirname) pmb.helpers.mount.bind(args, chroot_rootfs.path, Chroot.native() / mountpoint) return mountpoint diff --git a/pmb/helpers/other.py b/pmb/helpers/other.py index ab0dcd29..5a141715 100644 --- a/pmb/helpers/other.py +++ b/pmb/helpers/other.py @@ -12,6 +12,8 @@ import pmb.helpers.pmaports import pmb.helpers.run from typing import Dict, Any +from typing import Any, Dict + def folder_size(args: PmbArgs, path: Path): """Run `du` to calculate the size of a folder. diff --git a/pmb/helpers/package.py b/pmb/helpers/package.py index d54586ef..af071c29 100644 --- a/pmb/helpers/package.py +++ b/pmb/helpers/package.py @@ -9,6 +9,7 @@ See also: - pmb/helpers/repo.py (work with binary package repos) """ import copy +from typing import Any, Dict from pmb.helpers import logging from pmb.core.types import PmbArgs @@ -57,7 +58,7 @@ def get(args: PmbArgs, pkgname, arch, replace_subpkgnames=False, must_exist=True ] # Find in pmaports - ret = None + ret: Dict[str, Any] = {} pmaport = pmb.helpers.pmaports.get(args, pkgname, False) if pmaport: ret = {"arch": pmaport["arch"], diff --git a/pmb/helpers/pkgrel_bump.py b/pmb/helpers/pkgrel_bump.py index 068d2689..36dfaa90 100644 --- a/pmb/helpers/pkgrel_bump.py +++ b/pmb/helpers/pkgrel_bump.py @@ -18,7 +18,7 @@ def package(args: PmbArgs, pkgname, reason="", dry=False): :param dry: don't modify the APKBUILD, just print the message """ # Current and new pkgrel - path = pmb.helpers.pmaports.find(args, pkgname) + "/APKBUILD" + path = pmb.helpers.pmaports.find(args, pkgname) / "APKBUILD" apkbuild = pmb.parse.apkbuild(path) pkgrel = int(apkbuild["pkgrel"]) pkgrel_new = pkgrel + 1 @@ -91,7 +91,7 @@ def auto_apkindex_package(args: PmbArgs, arch, aport, apk, dry=False): # (which means dynamic libraries that the package was linked # against) and packages for which no aport exists. if (depend.startswith("so:") or - not pmb.helpers.pmaports.find(args, depend, False)): + not pmb.helpers.pmaports.find_optional(args, depend)): missing.append(depend) # Increase pkgrel @@ -107,7 +107,7 @@ def auto(args: PmbArgs, dry=False): for arch in pmb.config.build_device_architectures: paths = pmb.helpers.repo.apkindex_files(args, arch, alpine=False) for path in paths: - logging.info("scan " + path) + logging.info(f"scan {path}") index = pmb.parse.apkindex.parse(path, False) for pkgname, apk in index.items(): origin = apk["origin"] @@ -116,12 +116,12 @@ def auto(args: PmbArgs, dry=False): logging.verbose( f"{pkgname}: origin '{origin}' found again") continue - aport_path = pmb.helpers.pmaports.find(args, origin, False) + aport_path = pmb.helpers.pmaports.find_optional(args, origin) if not aport_path: logging.warning("{}: origin '{}' aport not found".format( pkgname, origin)) continue - aport = pmb.parse.apkbuild(f"{aport_path}/APKBUILD") + aport = pmb.parse.apkbuild(aport_path) if auto_apkindex_package(args, arch, aport, apk, dry): ret.append(pkgname) return ret diff --git a/pmb/helpers/pmaports.py b/pmb/helpers/pmaports.py index 6ee769d1..f7ef3f8d 100644 --- a/pmb/helpers/pmaports.py +++ b/pmb/helpers/pmaports.py @@ -8,7 +8,6 @@ See also: """ import glob from pmb.helpers import logging -import os from pathlib import Path from typing import Optional, Sequence, Dict @@ -44,7 +43,7 @@ def get_list(args: PmbArgs) -> Sequence[str]: return list(_find_apkbuilds(args).keys()) -def guess_main_dev(args: PmbArgs, subpkgname): +def guess_main_dev(args: PmbArgs, subpkgname) -> Optional[Path]: """Check if a package without "-dev" at the end exists in pmaports or not, and log the appropriate message. Don't call this function directly, use guess_main() instead. @@ -57,7 +56,7 @@ def guess_main_dev(args: PmbArgs, subpkgname): if path: logging.verbose(subpkgname + ": guessed to be a subpackage of " + pkgname + " (just removed '-dev')") - return os.path.dirname(path) + return path.parent logging.verbose(subpkgname + ": guessed to be a subpackage of " + pkgname + ", which we can't find in pmaports, so it's probably in" @@ -147,7 +146,7 @@ def find(args: PmbArgs, package: str, must_exist=True) -> Path: """ # Try to get a cached result first (we assume that the aports don't change # in one pmbootstrap call) - ret = Path() + ret: Optional[Path] = None if package in pmb.helpers.other.cache["find_aport"]: ret = pmb.helpers.other.cache["find_aport"][package] else: @@ -165,7 +164,7 @@ def find(args: PmbArgs, package: str, must_exist=True) -> Path: guess = guess_main(args, package) if guess: # Parse the APKBUILD and verify if the guess was right - if _find_package_in_apkbuild(package, f'{guess}/APKBUILD'): + if _find_package_in_apkbuild(package, guess / "APKBUILD"): ret = guess else: # Otherwise parse all APKBUILDs (takes time!), is the @@ -183,7 +182,7 @@ def find(args: PmbArgs, package: str, must_exist=True) -> Path: ret = guess # Crash when necessary - if ret is None and must_exist: + if ret is None: raise RuntimeError("Could not find aport for package: " + package) @@ -192,6 +191,13 @@ def find(args: PmbArgs, package: str, must_exist=True) -> Path: return ret +def find_optional(args: PmbArgs, package: str) -> Optional[Path]: + try: + return find(args, package) + except RuntimeError: + return None + + def get(args: PmbArgs, pkgname, must_exist=True, subpackages=True): """Find and parse an APKBUILD file. @@ -213,9 +219,12 @@ def get(args: PmbArgs, pkgname, must_exist=True, subpackages=True): """ pkgname = pmb.helpers.package.remove_operators(pkgname) if subpackages: - aport = find(args, pkgname, must_exist) + aport = find_optional(args, pkgname) if aport: - return pmb.parse.apkbuild(f"{aport}/APKBUILD") + return pmb.parse.apkbuild(aport / "APKBUILD") + elif must_exist: + raise RuntimeError("Could not find APKBUILD for package:" + f" {pkgname}") else: path = _find_apkbuilds(args).get(pkgname) if path: @@ -251,7 +260,8 @@ def find_providers(args: PmbArgs, provide): key=lambda p: p[1].get('provider_priority', 0)) -def get_repo(args: PmbArgs, pkgname, must_exist=True): +# FIXME (#2324): split into an _optional variant or drop must_exist +def get_repo(args: PmbArgs, pkgname, must_exist=True) -> Optional[str]: """Get the repository folder of an aport. :pkgname: package name @@ -259,10 +269,14 @@ def get_repo(args: PmbArgs, pkgname, must_exist=True): :returns: a string like "main", "device", "cross", ... or None when the aport could not be found """ - aport = find(args, pkgname, must_exist) + aport: Optional[Path] + if must_exist: + aport = find(args, pkgname) + else: + aport = find_optional(args, pkgname) if not aport: return None - return os.path.basename(os.path.dirname(aport)) + return aport.parent.name def check_arches(arches, arch): @@ -284,7 +298,7 @@ def check_arches(arches, arch): return False -def get_channel_new(channel): +def get_channel_new(channel: str) -> str: """Translate legacy channel names to the new ones. Legacy names are still supported for compatibility with old branches (pmb#2015). diff --git a/pmb/helpers/repo.py b/pmb/helpers/repo.py index e26f7944..5da24d89 100644 --- a/pmb/helpers/repo.py +++ b/pmb/helpers/repo.py @@ -17,9 +17,10 @@ import pmb.config.pmaports from pmb.core.types import PmbArgs import pmb.helpers.http import pmb.helpers.run +import pmb.helpers.other -def hash(url, length=8): +def apkindex_hash(url: str, length: int=8) -> Path: r"""Generate the hash that APK adds to the APKINDEX and apk packages in its apk cache folder. It is the "12345678" part in this example: @@ -43,7 +44,7 @@ def hash(url, length=8): ret += xd[(binary[i] >> 4) & 0xf] ret += xd[binary[i] & 0xf] - return ret + return Path(f"APKINDEX.{ret}.tar.gz") def urls(args: PmbArgs, user_repository=True, postmarketos_mirror=True, alpine=True): @@ -112,7 +113,7 @@ def apkindex_files(args: PmbArgs, arch=None, user_repository=True, pmos=True, # Resolve the APKINDEX.$HASH.tar.gz files for url in urls(args, False, pmos, alpine): - ret.append(pmb.config.work / f"cache_apk_{arch}" / f"APKINDEX.{hash(url)}.tar.gz") + ret.append(pmb.config.work / f"cache_apk_{arch}" / apkindex_hash(url)) return ret @@ -151,7 +152,7 @@ def update(args: PmbArgs, arch=None, force=False, existing_only=False): # APKINDEX file name from the URL url_full = url + "/" + arch + "/APKINDEX.tar.gz" cache_apk_outside = pmb.config.work / f"cache_apk_{arch}" - apkindex = cache_apk_outside / f"APKINDEX.{hash(url)}.tar.gz" + apkindex = cache_apk_outside / f"APKINDEX.{apkindex_hash(url)}.tar.gz" # Find update reason, possibly skip non-existing or known 404 files reason = None @@ -217,5 +218,5 @@ def alpine_apkindex_path(args: PmbArgs, repo="main", arch=None): # Find it on disk channel_cfg = pmb.config.pmaports.read_config_channel(args) repo_link = f"{args.mirror_alpine}{channel_cfg['mirrordir_alpine']}/{repo}" - cache_folder = pmb.config.work / "cache_apk_" + arch - return cache_folder + "/APKINDEX." + hash(repo_link) + ".tar.gz" + cache_folder = pmb.config.work / (f"cache_apk_{arch}") + return cache_folder / apkindex_hash(repo_link) diff --git a/pmb/helpers/repo_bootstrap.py b/pmb/helpers/repo_bootstrap.py index 671a5d80..02aa43a0 100644 --- a/pmb/helpers/repo_bootstrap.py +++ b/pmb/helpers/repo_bootstrap.py @@ -1,5 +1,6 @@ # Copyright 2024 Oliver Smith # SPDX-License-Identifier: GPL-3.0-or-later +from pmb.core.chroot import Chroot from pmb.helpers import logging import glob @@ -10,7 +11,7 @@ from pmb.core.types import PmbArgs progress_done = 0 progress_total = 0 -progress_step = None +progress_step: str def get_arch(args: PmbArgs): @@ -115,7 +116,7 @@ def log_progress(msg): progress_done += 1 -def run_steps(args: PmbArgs, steps, arch, suffix): +def run_steps(args: PmbArgs, steps, arch, chroot: Chroot): global progress_step for step, bootstrap_line in steps.items(): @@ -128,14 +129,14 @@ def run_steps(args: PmbArgs, steps, arch, suffix): if "[usr_merge]" in bootstrap_line: usr_merge = pmb.chroot.UsrMerge.ON - if suffix != "native": + if chroot != Chroot.native(): log_progress(f"initializing native chroot (merge /usr: {usr_merge.name})") # Native chroot needs pmOS binary package repo for cross compilers - pmb.chroot.init(args, "native", usr_merge) + pmb.chroot.init(args, Chroot.native(), usr_merge) - log_progress(f"initializing {suffix} chroot (merge /usr: {usr_merge.name})") + log_progress(f"initializing {chroot} chroot (merge /usr: {usr_merge.name})") # Initialize without pmOS binary package repo - pmb.chroot.init(args, suffix, usr_merge, postmarketos_mirror=False) + pmb.chroot.init(args, chroot, usr_merge, postmarketos_mirror=False) for package in get_packages(bootstrap_line): log_progress(f"building {package}") diff --git a/pmb/helpers/repo_missing.py b/pmb/helpers/repo_missing.py index 3d8138b2..4edfe246 100644 --- a/pmb/helpers/repo_missing.py +++ b/pmb/helpers/repo_missing.py @@ -34,7 +34,7 @@ def filter_aport_packages(args: PmbArgs, arch, pkgnames): """ ret = [] for pkgname in pkgnames: - if pmb.helpers.pmaports.find(args, pkgname, False): + if pmb.helpers.pmaports.find_optional(args, pkgname): ret += [pkgname] return ret diff --git a/pmb/helpers/run.py b/pmb/helpers/run.py index d0c8747d..e39f117e 100644 --- a/pmb/helpers/run.py +++ b/pmb/helpers/run.py @@ -2,13 +2,14 @@ # SPDX-License-Identifier: GPL-3.0-or-later import os from pathlib import Path +import subprocess import pmb.helpers.run_core from typing import Any, Dict, List, Optional, Sequence -from pmb.core.types import PathString, PmbArgs +from pmb.core.types import Env, PathString, PmbArgs -def user(args: PmbArgs, cmd: Sequence[PathString], working_dir: Path=Path("/"), output: str="log", output_return: bool=False, - check: Optional[bool]=None, env: Dict[Any, Any]={}, sudo: bool=False) -> str: +def user(args: PmbArgs, cmd: Sequence[PathString], working_dir: Optional[Path] = None, output: str = "log", output_return: bool = False, + check: Optional[bool] = None, env: Env = {}, sudo: bool = False) -> str | int | subprocess.Popen: """ Run a command on the host system as user. @@ -22,8 +23,8 @@ def user(args: PmbArgs, cmd: Sequence[PathString], working_dir: Path=Path("/"), # Readable log message (without all the escaping) msg = "% " for key, value in env.items(): - msg += key + "=" + value + " " - if working_dir != Path("/"): + msg += f"{key}={value} " + if working_dir is not None: msg += f"cd {os.fspath(working_dir)}; " msg += " ".join(cmd_parts) @@ -35,6 +36,15 @@ def user(args: PmbArgs, cmd: Sequence[PathString], working_dir: Path=Path("/"), return pmb.helpers.run_core.core(args, msg, cmd_parts, working_dir, output, output_return, check, sudo) +# FIXME: should probably use some kind of wrapper class / builder pattern for all these parameters... +def user_output(args: PmbArgs, cmd: Sequence[PathString], working_dir: Optional[Path] = None, output: str = "log", + check: Optional[bool] = None, env: Env = {}, sudo: bool = False) -> str: + ret = user(args, cmd, working_dir, output, output_return=True, check=check, env=env, sudo=sudo) + if not isinstance(ret, str): + raise TypeError("Expected str output, got " + str(ret)) + + return ret + def root(args: PmbArgs, cmd: Sequence[PathString], working_dir=None, output="log", output_return=False, check=None, env={}): diff --git a/pmb/helpers/run_core.py b/pmb/helpers/run_core.py index 4fae4999..e4379fe7 100644 --- a/pmb/helpers/run_core.py +++ b/pmb/helpers/run_core.py @@ -1,6 +1,7 @@ # Copyright 2023 Oliver Smith # SPDX-License-Identifier: GPL-3.0-or-later import fcntl +from pmb.core.types import PathString, Env from pmb.helpers import logging import os from pathlib import Path @@ -10,17 +11,17 @@ import subprocess import sys import threading import time -from typing import Sequence -from pmb.core.chroot import Chroot +from typing import Dict, Optional, Sequence import pmb.helpers.run -from pmb.core.types import PathString, PmbArgs + +from pmb.core.types import Env, PathString, PmbArgs """For a detailed description of all output modes, read the description of core() at the bottom. All other functions in this file get (indirectly) called by core(). """ -def flat_cmd(cmd: Sequence[PathString], working_dir: Path=Path("/"), env={}): +def flat_cmd(cmd: Sequence[PathString], working_dir: Optional[Path]=None, env: Env={}): """Convert a shell command passed as list into a flat shell string with proper escaping. :param cmd: command as list, e.g. ["echo", "string with spaces"] @@ -35,14 +36,14 @@ def flat_cmd(cmd: Sequence[PathString], working_dir: Path=Path("/"), env={}): # Merge env and cmd into escaped list escaped = [] for key, value in env.items(): - escaped.append(key + "=" + shlex.quote(value)) + escaped.append(key + "=" + shlex.quote(os.fspath(value))) for i in range(len(cmd)): escaped.append(shlex.quote(os.fspath(cmd[i]))) # Prepend working dir ret = " ".join(escaped) - if working_dir != Path("/"): - ret = "cd " + shlex.quote(working_dir) + ";" + ret + if working_dir is not None: + ret = "cd " + shlex.quote(str(working_dir)) + ";" + ret return ret @@ -84,6 +85,7 @@ def pipe(cmd, working_dir=None): return ret +# FIXME (#2324): These types make no sense at all def pipe_read(process, output_to_stdout=False, output_return=False, output_return_buffer=False): """Read all output from a subprocess, copy it to the log and optionally stdout and a buffer variable. @@ -179,17 +181,21 @@ def foreground_pipe(args: PmbArgs, cmd, working_dir=None, output_to_stdout=False stdin=stdin) # Make process.stdout non-blocking - handle = process.stdout.fileno() + stdout = process.stdout or None + if not stdout: + raise RuntimeError("Process has no stdout?!") + + handle = stdout.fileno() flags = fcntl.fcntl(handle, fcntl.F_GETFL) fcntl.fcntl(handle, fcntl.F_SETFL, flags | os.O_NONBLOCK) # While process exists wait for output (with timeout) - output_buffer = [] + output_buffer: list[bytes] = [] sel = selectors.DefaultSelector() - sel.register(process.stdout, selectors.EVENT_READ) - timeout = args.timeout if output_timeout else None + sel.register(stdout, selectors.EVENT_READ) + timeout = args.timeout while process.poll() is None: - wait_start = time.perf_counter() if output_timeout else None + wait_start = time.perf_counter() sel.select(timeout) # On timeout raise error (we need to measure time on our own, because diff --git a/pmb/helpers/ui.py b/pmb/helpers/ui.py index d341fe04..882ccde0 100644 --- a/pmb/helpers/ui.py +++ b/pmb/helpers/ui.py @@ -16,8 +16,8 @@ def list(args: PmbArgs, arch): ret = [("none", "Bare minimum OS image for testing and manual" " customization. The \"console\" UI should be selected if" " a graphical UI is not desired.")] - for path in sorted(glob.glob(args.aports + "/main/postmarketos-ui-*")): - apkbuild = pmb.parse.apkbuild(f"{path}/APKBUILD") + for path in sorted(args.aports.glob("main/postmarketos-ui-*")): + apkbuild = pmb.parse.apkbuild(path) ui = os.path.basename(path).split("-", 2)[2] if pmb.helpers.package.check_arch(args, apkbuild["pkgname"], arch): ret.append((ui, apkbuild["pkgdesc"])) diff --git a/pmb/install/_install.py b/pmb/install/_install.py index e8a1f8da..8a204a21 100644 --- a/pmb/install/_install.py +++ b/pmb/install/_install.py @@ -6,8 +6,8 @@ import re import glob import shlex import sys -from typing import Dict, List -from pathlib import Path, PurePath +from typing import Dict, List, Optional, Sequence, TypedDict +from pathlib import Path import pmb.chroot import pmb.chroot.apk @@ -15,7 +15,7 @@ import pmb.chroot.other import pmb.chroot.initfs import pmb.config import pmb.config.pmaports -from pmb.core.types import PmbArgs +from pmb.core.types import PartitionLayout, PmbArgs import pmb.helpers.devices from pmb.helpers.mount import mount_device_rootfs import pmb.helpers.run @@ -60,8 +60,11 @@ def get_nonfree_packages(args: PmbArgs, device): ["device-nokia-n900-nonfree-firmware"] """ # Read subpackages - apkbuild = pmb.parse.apkbuild(pmb.helpers.devices.find_path(args, device, - 'APKBUILD')) + device_path = pmb.helpers.devices.find_path(args, device, 'APKBUILD') + if not device_path: + raise RuntimeError(f"Device package not found for {device}") + + apkbuild = pmb.parse.apkbuild(device_path) subpackages = apkbuild["subpackages"] # Check for firmware and userland @@ -123,7 +126,7 @@ def copy_files_from_chroot(args: PmbArgs, chroot: Chroot): pmb.helpers.run.root(args, ["rm", fifo]) # Get all folders inside the device rootfs (except for home) - folders: List[PurePath] = [] + folders: List[str] = [] for path in mountpoint_outside.glob("*"): if path.name == "home": continue @@ -150,7 +153,7 @@ def create_home_from_skel(args: PmbArgs): rootfs = (Chroot.native() / "mnt/install") # In btrfs, home subvol & home dir is created in format.py if args.filesystem != "btrfs": - pmb.helpers.run.root(args, ["mkdir", rootfs + "/home"]) + pmb.helpers.run.root(args, ["mkdir", rootfs / "home"]) home = (rootfs / "home" / args.user) if (rootfs / "etc/skel").exists(): @@ -310,8 +313,7 @@ def copy_ssh_keys(args: PmbArgs): target = Chroot.native() / "mnt/install/home/" / args.user / ".ssh" pmb.helpers.run.root(args, ["mkdir", target]) pmb.helpers.run.root(args, ["chmod", "700", target]) - pmb.helpers.run.root(args, ["cp", authorized_keys, target + - "/authorized_keys"]) + pmb.helpers.run.root(args, ["cp", authorized_keys, target / "authorized_keys"]) pmb.helpers.run.root(args, ["rm", authorized_keys]) pmb.helpers.run.root(args, ["chown", "-R", "10000:10000", target]) @@ -538,9 +540,9 @@ def generate_binary_list(args: PmbArgs, chroot: Chroot, step): binaries = args.deviceinfo["sd_embed_firmware"].split(",") for binary_offset in binaries: - binary, offset = binary_offset.split(':') + binary, _offset = binary_offset.split(':') try: - offset = int(offset) + offset = int(_offset) except ValueError: raise RuntimeError("Value for firmware binary offset is " f"not valid: {offset}") @@ -627,9 +629,9 @@ def write_cgpt_kpart(args: PmbArgs, layout, suffix: Chroot): def sanity_check_boot_size(args: PmbArgs): default = pmb.config.defaults["boot_size"] - if int(args.boot_size) >= int(default): + if isinstance(default, str) and int(args.boot_size) >= int(default): return - logging.error("ERROR: your pmbootstrap has a small boot_size of" + logging.error("ERROR: your pmbootstrap has a small/invalid boot_size of" f" {args.boot_size} configured, probably because the config" " has been created with an old version.") logging.error("This can lead to problems later on, we recommend setting it" @@ -700,11 +702,12 @@ def get_partition_layout(reserve, kernel): :returns: the partition layout, e.g. without reserve and kernel: {"kernel": None, "boot": 1, "reserve": None, "root": 2} """ - ret = {} - ret["kernel"] = None - ret["boot"] = 1 - ret["reserve"] = None - ret["root"] = 2 + ret: PartitionLayout = { + "kernel": None, + "boot": 1, + "reserve": None, + "root": 2, + } if kernel: ret["kernel"] = 1 @@ -761,11 +764,11 @@ def create_fstab(args: PmbArgs, layout, chroot: Chroot): # Do not install fstab into target rootfs when using on-device # installer. Provide fstab only to installer suffix - if args.on_device_installer and "rootfs_" in chroot: + if args.on_device_installer and chroot.type == ChrootType.ROOTFS: return - boot_dev = f"/dev/installp{layout['boot']}" - root_dev = f"/dev/installp{layout['root']}" + boot_dev = Path(f"/dev/installp{layout['boot']}") + root_dev = Path(f"/dev/installp{layout['root']}") boot_mount_point = f"UUID={get_uuid(args, boot_dev)}" root_mount_point = "/dev/mapper/root" if args.full_disk_encryption \ @@ -806,7 +809,7 @@ def create_fstab(args: PmbArgs, layout, chroot: Chroot): def install_system_image(args: PmbArgs, size_reserve, chroot: Chroot, step, steps, boot_label="pmOS_boot", root_label="pmOS_root", - split=False, disk=None): + split=False, disk: Optional[Path]=None): """ :param size_reserve: empty partition between root and boot in MiB (pma#463) :param suffix: the chroot suffix, where the rootfs that will be installed @@ -912,6 +915,8 @@ def print_flash_info(args: PmbArgs): method = args.deviceinfo["flash_method"] flasher = pmb.config.flashers.get(method, {}) flasher_actions = flasher.get("actions", {}) + if not isinstance(flasher_actions, dict): + raise TypeError(f"flasher actions must be a dictionary, got: {flasher_actions}") requires_split = flasher.get("split", False) if method == "none": @@ -929,11 +934,9 @@ def print_flash_info(args: PmbArgs): logging.info("* pmbootstrap flasher flash_rootfs") logging.info(" Flashes the generated rootfs image to your device:") if args.split: - logging.info(f" {Chroot.native()}/home/pmos/rootfs/" - f"{args.device}-rootfs.img") + logging.info(f" {Chroot.native() / 'home/pmos/rootfs' / args.device}-rootfs.img") else: - logging.info(f" {Chroot.native()}/home/pmos/rootfs/" - f"{args.device}.img") + logging.info(f" {Chroot.native() / 'home/pmos/rootfs' / args.device}.img") logging.info(" (NOTE: This file has a partition table, which" " contains /boot and / subpartitions. That way we" " don't need to change the partition layout on your" @@ -975,8 +978,7 @@ def print_flash_info(args: PmbArgs): " Use 'pmbootstrap flasher boot' to do that.)") if "flash_lk2nd" in flasher_actions and \ - os.path.exists(f"{Chroot(ChrootType.ROOTFS, args.device)}" - "/boot/lk2nd.img"): + (Chroot(ChrootType.ROOTFS, args.device) / "/boot/lk2nd.img").exists(): logging.info("* Your device supports and may even require" " flashing lk2nd. You should flash it before" " flashing anything else. Use 'pmbootstrap flasher" @@ -991,7 +993,7 @@ def print_flash_info(args: PmbArgs): def install_recovery_zip(args: PmbArgs, steps): logging.info(f"*** ({steps}/{steps}) CREATING RECOVERY-FLASHABLE ZIP ***") suffix = "buildroot_" + args.deviceinfo["arch"] - mount_device_rootfs(args, Chroot(ChrootType.ROOTFS, args.device), suffix) + mount_device_rootfs(args, Chroot.rootfs(args.device)) pmb.install.recovery.create_zip(args, suffix) # Flash information @@ -1003,7 +1005,7 @@ def install_recovery_zip(args: PmbArgs, steps): def install_on_device_installer(args: PmbArgs, step, steps): # Generate the rootfs image if not args.ondev_no_rootfs: - suffix_rootfs = Chroot(ChrootType.ROOTFS, args.device) + suffix_rootfs = Chroot.rootfs(args.device) install_system_image(args, 0, suffix_rootfs, step=step, steps=steps, split=True) step += 2 @@ -1132,7 +1134,7 @@ def get_recommends(args: PmbArgs, packages) -> Sequence[str]: """ global get_recommends_visited - ret = [] + ret: List[str] = [] if not args.install_recommends: return ret diff --git a/pmb/install/blockdevice.py b/pmb/install/blockdevice.py index cf1ab79e..c6b75c02 100644 --- a/pmb/install/blockdevice.py +++ b/pmb/install/blockdevice.py @@ -1,5 +1,6 @@ # Copyright 2023 Oliver Smith # SPDX-License-Identifier: GPL-3.0-or-later +from typing import Optional from pmb.helpers import logging import os import glob @@ -20,7 +21,7 @@ def previous_install(args: PmbArgs, path: Path): :param path: path to disk block device (e.g. /dev/mmcblk0) """ label = "" - for blockdevice_outside in [f"{path}1", f"{path}p1"]: + for blockdevice_outside in [path.with_stem(f"{path.name}1"), path.with_stem(f"{path.name}p1")]: if not os.path.exists(blockdevice_outside): continue blockdevice_inside = "/dev/diskp1" @@ -39,14 +40,14 @@ def previous_install(args: PmbArgs, path: Path): return "pmOS_boot" in label -def mount_disk(args: PmbArgs, path): +def mount_disk(args: PmbArgs, path: Path): """ :param path: path to disk block device (e.g. /dev/mmcblk0) """ # Sanity checks if not os.path.exists(path): raise RuntimeError(f"The disk block device does not exist: {path}") - for path_mount in glob.glob(f"{path}*"): + for path_mount in path.parent.glob(f"{path.name}*"): if pmb.helpers.mount.ismount(path_mount): raise RuntimeError(f"{path_mount} is mounted! Will not attempt to" " format this!") @@ -124,7 +125,7 @@ def create_and_mount_image(args: PmbArgs, size_boot, size_root, size_reserve, pmb.helpers.mount.bind_file(args, device, Chroot.native() / mount_point) -def create(args: PmbArgs, size_boot, size_root, size_reserve, split, disk): +def create(args: PmbArgs, size_boot, size_root, size_reserve, split, disk: Optional[Path]): """ Create /dev/install (the "install blockdevice"). diff --git a/pmb/install/losetup.py b/pmb/install/losetup.py index 77ff3cd1..a3e2945e 100644 --- a/pmb/install/losetup.py +++ b/pmb/install/losetup.py @@ -2,11 +2,13 @@ # SPDX-License-Identifier: GPL-3.0-or-later import glob import json +from pathlib import Path +from typing import List, Optional from pmb.helpers import logging import os import time -from pmb.core.types import PmbArgs +from pmb.core.types import PathString, PmbArgs import pmb.helpers.mount import pmb.helpers.run import pmb.chroot @@ -14,19 +16,19 @@ from pmb.core import Chroot def init(args: PmbArgs): - if not os.path.isdir("/sys/module/loop"): + if not Path("/sys/module/loop").is_dir(): pmb.helpers.run.root(args, ["modprobe", "loop"]) - for loopdevice in glob.glob("/dev/loop*"): - if os.path.isdir(loopdevice): + for loopdevice in Path("/dev/").glob("loop*"): + if loopdevice.is_dir(): continue pmb.helpers.mount.bind_file(args, loopdevice, Chroot.native() / loopdevice) -def mount(args: PmbArgs, img_path): +def mount(args: PmbArgs, img_path: Path): """ :param img_path: Path to the img file inside native chroot. """ - logging.debug("(native) mount " + img_path + " (loop)") + logging.debug(f"(native) mount {img_path} (loop)") # Try to mount multiple times (let the kernel module initialize #1594) for i in range(0, 5): @@ -39,20 +41,23 @@ def mount(args: PmbArgs, img_path): # Mount and return on success init(args) - losetup_cmd = ["losetup", "-f", img_path] + losetup_cmd: List[PathString] = ["losetup", "-f", img_path] sector_size = args.deviceinfo["rootfs_image_sector_size"] if sector_size: losetup_cmd += ["-b", str(int(sector_size))] pmb.chroot.root(args, losetup_cmd, check=False) - if device_by_back_file(args, img_path): + try: + device_by_back_file(args, img_path) return + except RuntimeError: + pass # Failure: raise exception - raise RuntimeError("Failed to mount loop device: " + img_path) + raise RuntimeError(f"Failed to mount loop device: {img_path}") -def device_by_back_file(args: PmbArgs, back_file, auto_init=True): +def device_by_back_file(args: PmbArgs, back_file: Path, auto_init=True) -> Path: """ Get the /dev/loopX device that points to a specific image file. """ @@ -61,22 +66,24 @@ def device_by_back_file(args: PmbArgs, back_file, auto_init=True): losetup_output = pmb.chroot.root(args, ["losetup", "--json", "--list"], output_return=True, auto_init=auto_init) if not losetup_output: - return None + raise RuntimeError("losetup failed") # Find the back_file losetup = json.loads(losetup_output) for loopdevice in losetup["loopdevices"]: - if loopdevice["back-file"] == back_file: - return loopdevice["name"] - return None + if Path(loopdevice["back-file"]) == back_file: + return Path(loopdevice["name"]) + raise RuntimeError(f"Failed to find loop device for {back_file}") -def umount(args: PmbArgs, img_path, auto_init=True): +def umount(args: PmbArgs, img_path: Path, auto_init=True): """ :param img_path: Path to the img file inside native chroot. """ - device = device_by_back_file(args, img_path, auto_init) - if not device: + device: Path + try: + device = device_by_back_file(args, img_path, auto_init) + except RuntimeError: return - logging.debug("(native) umount " + device) + logging.debug(f"(native) umount {device}") pmb.chroot.root(args, ["losetup", "-d", device], auto_init=auto_init) diff --git a/pmb/install/partition.py b/pmb/install/partition.py index 8973127f..56344ad4 100644 --- a/pmb/install/partition.py +++ b/pmb/install/partition.py @@ -1,5 +1,7 @@ # Copyright 2023 Oliver Smith # SPDX-License-Identifier: GPL-3.0-or-later +from pathlib import Path +from typing import Optional from pmb.helpers import logging import os import time @@ -10,25 +12,28 @@ import pmb.install.losetup from pmb.core import Chroot -def partitions_mount(args: PmbArgs, layout, disk): +# FIXME (#2324): this function drops disk to a string because it's easier +# to manipulate, this is probably bad. +def partitions_mount(args: PmbArgs, layout, disk: Optional[Path]): """ Mount blockdevices of partitions inside native chroot :param layout: partition layout from get_partition_layout() :param disk: path to disk block device (e.g. /dev/mmcblk0) or None """ - prefix = disk if not disk: - img_path = "/home/pmos/rootfs/" + args.device + ".img" - prefix = pmb.install.losetup.device_by_back_file(args, img_path) + img_path = Path("/home/pmos/rootfs") / f"{args.device}.img" + disk = pmb.install.losetup.device_by_back_file(args, img_path) + + logging.info(f"Mounting partitions of {disk} inside the chroot") tries = 20 # Devices ending with a number have a "p" before the partition number, # /dev/sda1 has no "p", but /dev/mmcblk0p1 has. See add_partition() in # block/partitions/core.c of linux.git. - partition_prefix = prefix - if str.isdigit(prefix[-1:]): - partition_prefix = f"{prefix}p" + partition_prefix = str(disk) + if str.isdigit(disk.name[-1:]): + partition_prefix = f"{disk}p" found = False for i in range(tries): @@ -40,7 +45,7 @@ def partitions_mount(args: PmbArgs, layout, disk): time.sleep(0.1) if not found: - raise RuntimeError(f"Unable to find the first partition of {prefix}, " + raise RuntimeError(f"Unable to find the first partition of {disk}, " f"expected it to be at {partition_prefix}1!") partitions = [layout["boot"], layout["root"]] @@ -49,7 +54,7 @@ def partitions_mount(args: PmbArgs, layout, disk): partitions += [layout["kernel"]] for i in partitions: - source = f"{partition_prefix}{i}" + source = Path(f"{partition_prefix}{i}") target = Chroot.native() / "dev" / f"installp{i}" pmb.helpers.mount.bind_file(args, source, target) diff --git a/pmb/install/recovery.py b/pmb/install/recovery.py index 7d6d59dc..f4dfc4ba 100644 --- a/pmb/install/recovery.py +++ b/pmb/install/recovery.py @@ -1,5 +1,6 @@ # Copyright 2023 Attila Szollosi # SPDX-License-Identifier: GPL-3.0-or-later +from pathlib import Path from pmb.helpers import logging import pmb.chroot @@ -13,7 +14,7 @@ def create_zip(args: PmbArgs, suffix): """ Create android recovery compatible installer zip. """ - zip_root = "/var/lib/postmarketos-android-recovery-installer/" + zip_root = Path("/var/lib/postmarketos-android-recovery-installer/") rootfs = "/mnt/rootfs_" + args.device flavor = pmb.helpers.frontend._parse_flavor(args) method = args.deviceinfo["flash_method"] @@ -70,4 +71,4 @@ def create_zip(args: PmbArgs, suffix): ["gzip", "-f1", "rootfs.tar"], ["build-recovery-zip", args.device]] for command in commands: - pmb.chroot.root(args, command, suffix, zip_root) + pmb.chroot.root(args, command, suffix, working_dir=zip_root) diff --git a/pmb/install/ui.py b/pmb/install/ui.py index 1fe68759..79a4b132 100644 --- a/pmb/install/ui.py +++ b/pmb/install/ui.py @@ -1,18 +1,19 @@ # Copyright 2023 Dylan Van Assche # SPDX-License-Identifier: GPL-3.0-or-later +from typing import List from pmb.helpers import logging from pmb.core.types import PmbArgs import pmb.helpers.pmaports -def get_groups(args: PmbArgs): +def get_groups(args: PmbArgs) -> List[str]: """ Get all groups to which the user additionally must be added. The list of groups are listed in _pmb_groups of the UI and UI-extras package. :returns: list of groups, e.g. ["feedbackd", "udev"] """ - ret = [] + ret: List[str] = [] if args.ui == "none": return ret diff --git a/pmb/parse/_apkbuild.py b/pmb/parse/_apkbuild.py index 81314970..39c9ed17 100644 --- a/pmb/parse/_apkbuild.py +++ b/pmb/parse/_apkbuild.py @@ -206,7 +206,7 @@ def _parse_attributes(path, lines, apkbuild_attributes, ret): ret[attribute] = replace_variable(ret, value) if "subpackages" in apkbuild_attributes: - subpackages = OrderedDict() + subpackages: OrderedDict[str, str] = OrderedDict() for subpkg in ret["subpackages"].split(" "): if subpkg: _parse_subpackage(path, lines, ret, subpackages, subpkg) @@ -326,8 +326,12 @@ def apkbuild(path: Path, check_pkgver=True, check_pkgname=True): :returns: relevant variables from the APKBUILD. Arrays get returned as arrays. """ - if path.is_dir(): + if path.name != "APKBUILD": path = path / "APKBUILD" + + if not path.exists(): + raise FileNotFoundError(f"{path.relative_to(pmb.config.work)} not found!") + # Try to get a cached result first (we assume that the aports don't change # in one pmbootstrap call) if path in pmb.helpers.other.cache["apkbuild"]: diff --git a/pmb/parse/apkindex.py b/pmb/parse/apkindex.py index ec403361..655141f4 100644 --- a/pmb/parse/apkindex.py +++ b/pmb/parse/apkindex.py @@ -1,6 +1,7 @@ # Copyright 2023 Oliver Smith # SPDX-License-Identifier: GPL-3.0-or-later import collections +from typing import Any, Dict, List from pmb.helpers import logging from pathlib import Path import tarfile @@ -34,7 +35,7 @@ def parse_next_block(path: Path, lines, start): :returns: None, when there are no more blocks """ # Parse until we hit an empty line or end of file - ret = {} + ret: Dict[str, Any] = {} mapping = { "A": "arch", "D": "depends", @@ -60,9 +61,8 @@ def parse_next_block(path: Path, lines, start): for letter, key in mapping.items(): if line.startswith(letter + ":"): if key in ret: - raise RuntimeError( - "Key " + key + " (" + letter + ":) specified twice" - " in block: " + str(ret) + ", file: " + path) + raise RuntimeError(f"Key {key} ({letter}:) specified twice" + f" in block: {ret}, file: {path}") ret[key] = line[2:-1] # Format and return the block @@ -91,9 +91,9 @@ def parse_next_block(path: Path, lines, start): # No more blocks elif ret != {}: - raise RuntimeError("Last block in " + path + " does not end" - " with a new line! Delete the file and" - " try again. Last block: " + str(ret)) + raise RuntimeError(f"Last block in {path} does not end" + " with a new line! Delete the file and" + f" try again. Last block: {ret}") return None @@ -168,7 +168,7 @@ def parse(path: Path, multiple_providers=True): # Require the file to exist if not path.is_file(): logging.verbose("NOTE: APKINDEX not found, assuming no binary packages" - " exist for that architecture: " + path) + f" exist for that architecture: {path}") return {} # Try to get a cached result first @@ -185,14 +185,14 @@ def parse(path: Path, multiple_providers=True): # Read all lines if tarfile.is_tarfile(path): with tarfile.open(path, "r:gz") as tar: - with tar.extractfile(tar.getmember("APKINDEX")) as handle: + with tar.extractfile(tar.getmember("APKINDEX")) as handle: # type:ignore[union-attr] lines = handle.readlines() else: with path.open("r", encoding="utf-8") as handle: lines = handle.readlines() # Parse the whole APKINDEX file - ret = collections.OrderedDict() + ret: Dict[str, Any] = collections.OrderedDict() start = [0] while True: block = parse_next_block(path, lines, start) @@ -201,8 +201,8 @@ def parse(path: Path, multiple_providers=True): # Skip virtual packages if "timestamp" not in block: - logging.verbose("Skipped virtual package " + str(block) + " in" - " file: " + path) + logging.verbose(f"Skipped virtual package {block} in" + f" file: {path}") continue # Add the next package and all aliases @@ -232,11 +232,11 @@ def parse_blocks(path: Path): """ # Parse all lines with tarfile.open(path, "r:gz") as tar: - with tar.extractfile(tar.getmember("APKINDEX")) as handle: + with tar.extractfile(tar.getmember("APKINDEX")) as handle: # type:ignore[union-attr] lines = handle.readlines() # Parse lines into blocks - ret = [] + ret: List[str] = [] start = [0] while True: block = pmb.parse.apkindex.parse_next_block(path, lines, start) @@ -251,7 +251,7 @@ def clear_cache(path: Path): :returns: True on successful deletion, False otherwise """ - logging.verbose("Clear APKINDEX cache for: " + path) + logging.verbose(f"Clear APKINDEX cache for: {path}") if path in pmb.helpers.other.cache["apkindex"]: del pmb.helpers.other.cache["apkindex"][path] return True @@ -281,7 +281,7 @@ def providers(args: PmbArgs, package, arch=None, must_exist=True, indexes=None): package = pmb.helpers.package.remove_operators(package) - ret = collections.OrderedDict() + ret: Dict[str, Any] = collections.OrderedDict() for path in indexes: # Skip indexes not providing the package index_packages = parse(path) @@ -295,10 +295,8 @@ def providers(args: PmbArgs, package, arch=None, must_exist=True, indexes=None): if provider_pkgname in ret: version_last = ret[provider_pkgname]["version"] if pmb.parse.version.compare(version, version_last) == -1: - logging.verbose(package + ": provided by: " + - provider_pkgname + "-" + version + " in " + - path + " (but " + version_last + " is" - " higher)") + logging.verbose(f"{package}: provided by: {provider_pkgname}-{version}" + f"in {path} (but {version_last} is higher)") continue # Add the provider to ret @@ -306,7 +304,8 @@ def providers(args: PmbArgs, package, arch=None, must_exist=True, indexes=None): ret[provider_pkgname] = provider if ret == {} and must_exist: - logging.debug("Searched in APKINDEX files: " + ", ".join(indexes)) + import os + logging.debug(f"Searched in APKINDEX files: {', '.join([os.fspath(x) for x in indexes])}") raise RuntimeError("Could not find package '" + package + "'!") return ret @@ -319,7 +318,7 @@ def provider_highest_priority(providers, pkgname): :param pkgname: the package name we are interested in (for the log message) """ max_priority = 0 - priority_providers = collections.OrderedDict() + priority_providers: collections.OrderedDict[str, str] = collections.OrderedDict() for provider_name, provider in providers.items(): priority = int(provider.get("provider_priority", -1)) if priority > max_priority: diff --git a/pmb/parse/arguments.py b/pmb/parse/arguments.py index b2345e54..dba136d4 100644 --- a/pmb/parse/arguments.py +++ b/pmb/parse/arguments.py @@ -3,10 +3,13 @@ import argparse import copy import os +from pathlib import Path import sys +from pmb.core.types import PmbArgs + try: - import argcomplete + import argcomplete # type:ignore[import-untyped] except ImportError: pass @@ -109,7 +112,7 @@ def arguments_install(subparser): help="do not create an image file, instead" " write to the given block device (SD card, USB" " stick, etc.), for example: '/dev/mmcblk0'", - metavar="BLOCKDEV") + metavar="BLOCKDEV", type=lambda x: Path(x)) group.add_argument("--android-recovery-zip", help="generate TWRP flashable zip (recommended read:" " https://postmarketos.org/recoveryzip)", @@ -205,7 +208,8 @@ def arguments_export(subparser): ret.add_argument("export_folder", help="export folder, defaults to" " /tmp/postmarketOS-export", - default="/tmp/postmarketOS-export", nargs="?") + default=Path("/tmp/postmarketOS-export"), nargs="?", + type=lambda x: Path(x)) ret.add_argument("--odin", help="odin flashable tar" " (boot.img/kernel+initramfs only)", action="store_true", dest="odin_flashable_tar") @@ -651,8 +655,8 @@ def get_parser(): f" default: {mirrors_pmos_default}", metavar="URL", action="append", default=[]) parser.add_argument("-m", "--mirror-alpine", dest="mirror_alpine", - help="Alpine Linux mirror, default: " + - pmb.config.defaults["mirror_alpine"], + help="Alpine Linux mirror, default: " + f"{pmb.config.defaults['mirror_alpine']}", metavar="URL") parser.add_argument("-j", "--jobs", help="parallel jobs when compiling") parser.add_argument("-E", "--extra-space", @@ -923,7 +927,8 @@ def get_parser(): # Action: bootimg_analyze bootimg_analyze = sub.add_parser("bootimg_analyze", help="Extract all the" " information from an existing boot.img") - bootimg_analyze.add_argument("path", help="path to the boot.img") + bootimg_analyze.add_argument("path", help="path to the boot.img", + type=lambda x: Path(x)) bootimg_analyze.add_argument("--force", "-f", action="store_true", help="force even if the file seems to be" " invalid") @@ -940,7 +945,7 @@ def get_parser(): def arguments(): # Parse and extend arguments (also backup unmodified result from argparse) - args = get_parser().parse_args() + args: PmbArgs = get_parser().parse_args() # type: ignore setattr(args, "from_argparse", copy.deepcopy(args)) setattr(args.from_argparse, "from_argparse", args.from_argparse) diff --git a/pmb/parse/binfmt_info.py b/pmb/parse/binfmt_info.py index f1780686..369cbf52 100644 --- a/pmb/parse/binfmt_info.py +++ b/pmb/parse/binfmt_info.py @@ -10,8 +10,8 @@ import pmb.config def binfmt_info(arch_qemu): # Parse the info file full = {} - info = pmb.config.pmb_src + "/pmb/data/qemu-user-binfmt.txt" - logging.verbose("parsing: " + info) + info = pmb.config.pmb_src / "pmb/data/qemu-user-binfmt.txt" + logging.verbose(f"parsing: {info}") with open(info, "r") as handle: for line in handle: if line.startswith('#') or "=" not in line: diff --git a/pmb/parse/cpuinfo.py b/pmb/parse/cpuinfo.py index 33ff17fb..dacea09d 100644 --- a/pmb/parse/cpuinfo.py +++ b/pmb/parse/cpuinfo.py @@ -1,9 +1,10 @@ # Copyright 2023 Lary Gibaud # SPDX-License-Identifier: GPL-3.0-or-later import re +from typing import Optional -def arm_big_little_first_group_ncpus(): +def arm_big_little_first_group_ncpus() -> Optional[int]: """ Infer from /proc/cpuinfo on aarch64 if this is a big/little architecture (if there is different processor models) and the number of cores in the diff --git a/pmb/parse/depends.py b/pmb/parse/depends.py index cfc0c41c..e095f827 100644 --- a/pmb/parse/depends.py +++ b/pmb/parse/depends.py @@ -17,12 +17,12 @@ def package_from_aports(args: PmbArgs, pkgname_depend): depends, version. The version is the combined pkgver and pkgrel. """ # Get the aport - aport = pmb.helpers.pmaports.find(args, pkgname_depend, False) + aport = pmb.helpers.pmaports.find_optional(args, pkgname_depend) if not aport: return None # Parse its version - apkbuild = pmb.parse.apkbuild(f"{aport}/APKBUILD") + apkbuild = pmb.parse.apkbuild(aport / "APKBUILD") pkgname = apkbuild["pkgname"] version = apkbuild["pkgver"] + "-r" + apkbuild["pkgrel"] @@ -118,7 +118,7 @@ def package_from_index(args: PmbArgs, pkgname_depend, pkgnames_install, package_ return provider -def recurse(args: PmbArgs, pkgnames, suffix: Chroot=Chroot.native()): +def recurse(args: PmbArgs, pkgnames, suffix: Chroot=Chroot.native()) -> Sequence[str]: """ Find all dependencies of the given pkgnames. @@ -134,8 +134,8 @@ def recurse(args: PmbArgs, pkgnames, suffix: Chroot=Chroot.native()): # Iterate over todo-list until is is empty todo = list(pkgnames) - required_by = {} - ret = [] + required_by: Dict[str, Set[str]] = {} + ret: List[str] = [] while len(todo): # Skip already passed entries pkgname_depend = todo.pop(0) diff --git a/pmb/parse/deviceinfo.py b/pmb/parse/deviceinfo.py index db0be192..330bebfa 100644 --- a/pmb/parse/deviceinfo.py +++ b/pmb/parse/deviceinfo.py @@ -113,7 +113,8 @@ def _parse_kernel_suffix(args: PmbArgs, info, device, kernel): return ret -def deviceinfo(args: PmbArgs, device=None, kernel=None): +# FIXME (#2324): Make deviceinfo a type! (class!!!) +def deviceinfo(args: PmbArgs, device=None, kernel=None) -> Dict[str, str]: """ :param device: defaults to args.device :param kernel: defaults to args.kernel diff --git a/pmb/parse/kconfig.py b/pmb/parse/kconfig.py index 29045299..5b9426e5 100644 --- a/pmb/parse/kconfig.py +++ b/pmb/parse/kconfig.py @@ -253,21 +253,25 @@ def check(args: PmbArgs, pkgname, components_list=[], details=False, must_exist= # Read all kernel configs in the aport ret = True - aport = pmb.helpers.pmaports.find(args, "linux-" + flavor, must_exist=must_exist) - if aport is None: + aport: Path + try: + aport = pmb.helpers.pmaports.find(args, "linux-" + flavor) + except RuntimeError as e: + if must_exist: + raise e return None - apkbuild = pmb.parse.apkbuild(f"{aport}/APKBUILD") + apkbuild = pmb.parse.apkbuild(aport / "APKBUILD") pkgver = apkbuild["pkgver"] # We only enforce optional checks for community & main devices - enforce_check = aport.split("/")[-2] in ["community", "main"] + enforce_check = aport.parts[-2] in ["community", "main"] for name in get_all_component_names(): if f"pmb:kconfigcheck-{name}" in apkbuild["options"] and \ name not in components_list: components_list += [name] - for config_path in glob.glob(aport + "/config-*"): + for config_path in aport.glob("config-*"): # The architecture of the config is in the name, so it just needs to be # extracted config_name = os.path.basename(config_path) diff --git a/pmb/parse/version.py b/pmb/parse/version.py index 1f8d5968..d2c1648e 100644 --- a/pmb/parse/version.py +++ b/pmb/parse/version.py @@ -108,12 +108,12 @@ def parse_suffix(rest): C equivalent: get_token(), case TOKEN_SUFFIX """ - suffixes = collections.OrderedDict([ + name_suffixes = collections.OrderedDict([ ("pre", ["alpha", "beta", "pre", "rc"]), ("post", ["cvs", "svn", "git", "hg", "p"]), ]) - for name, suffixes in suffixes.items(): + for name, suffixes in name_suffixes.items(): for i, suffix in enumerate(suffixes): if not rest.startswith(suffix): continue @@ -203,7 +203,7 @@ def validate(version): return True -def compare(a_version, b_version, fuzzy=False): +def compare(a_version: str, b_version: str, fuzzy=False): """ Compare two versions A and B to find out which one is higher, or if both are equal. @@ -307,4 +307,4 @@ def check_string(a_version, rule): # Compare result = compare(a_version, b_version) - return result in expected_results + return not expected_results or result in expected_results diff --git a/pmb/qemu/run.py b/pmb/qemu/run.py index ded45eab..dd314131 100644 --- a/pmb/qemu/run.py +++ b/pmb/qemu/run.py @@ -1,5 +1,6 @@ # Copyright 2023 Pablo Castellano, Oliver Smith # SPDX-License-Identifier: GPL-3.0-or-later +import subprocess from typing import Sequence from pmb.helpers import logging import os @@ -16,7 +17,7 @@ import pmb.chroot.other import pmb.chroot.initfs import pmb.config import pmb.config.pmaports -from pmb.core.types import PmbArgs +from pmb.core.types import PathString, PmbArgs import pmb.helpers.run import pmb.parse.arch import pmb.parse.cpuinfo @@ -30,7 +31,7 @@ def system_image(args: PmbArgs): """ path = Chroot.native() / "home/pmos/rootfs" / f"{args.device}.img" if not path.exists(): - logging.debug("Could not find rootfs: " + path) + logging.debug(f"Could not find rootfs: {path}") raise RuntimeError("The rootfs has not been generated yet, please " "run 'pmbootstrap install' first.") return path @@ -76,10 +77,10 @@ def create_gdk_loader_cache(args: PmbArgs) -> Path: cache_path = gdk_cache_dir / "loaders.cache" if not (chroot_native / cache_path).is_file(): - raise RuntimeError("gdk pixbuf cache file not found: " + cache_path) + raise RuntimeError(f"gdk pixbuf cache file not found: {cache_path}") pmb.chroot.root(args, ["cp", cache_path, custom_cache_path]) - cmd = ["sed", "-i", "-e", + cmd: Sequence[PathString] = ["sed", "-i", "-e", f"s@\"{gdk_cache_dir}@\"{chroot_native / gdk_cache_dir}@", custom_cache_path] pmb.chroot.root(args, cmd) @@ -138,11 +139,12 @@ def command_qemu(args: PmbArgs, arch, img_path, img_path_2nd=None): if "gtk" in args.qemu_display: gdk_cache = create_gdk_loader_cache(args) - env.update({"GTK_THEME": "Default", - "GDK_PIXBUF_MODULE_FILE": gdk_cache, - "XDG_DATA_DIRS": ":".join([ - chroot_native / "usr/local/share", - chroot_native / "usr/share" + # FIXME: why does mypy think the values here should all be paths?? + env.update({"GTK_THEME": "Default", # type: ignore[dict-item] + "GDK_PIXBUF_MODULE_FILE": str(gdk_cache), # type: ignore[dict-item] + "XDG_DATA_DIRS": ":".join([ # type: ignore[dict-item] + str(chroot_native / "usr/local/share"), + str(chroot_native / "usr/share"), ])}) command = [] @@ -162,9 +164,9 @@ def command_qemu(args: PmbArgs, arch, img_path, img_path_2nd=None): command += [chroot_native / "lib" / f"ld-musl-{pmb.config.arch_native}.so.1"] command += ["--library-path=" + ":".join([ - chroot_native / "lib", - chroot_native / "usr/lib" + - chroot_native / "usr/lib/pulseaudio" + str(chroot_native / "lib"), + str(chroot_native / "usr/lib"), + str(chroot_native / "usr/lib/pulseaudio"), ])] command += [chroot_native / "usr/bin" / f"qemu-system-{arch}"] command += ["-L", chroot_native / "usr/share/qemu/"] @@ -188,7 +190,7 @@ def command_qemu(args: PmbArgs, arch, img_path, img_path_2nd=None): else: command += ["stdio"] - command += ["-drive", "file=" + img_path + ",format=raw,if=virtio"] + command += ["-drive", f"file={img_path},format=raw,if=virtio"] if img_path_2nd: command += ["-drive", "file=" + img_path_2nd + ",format=raw,if=virtio"] @@ -387,5 +389,5 @@ def run(args: PmbArgs): "send Ctrl+C to the VM, run:") logging.info("$ pmbootstrap config qemu_redir_stdio True") finally: - if process: + if isinstance(process, subprocess.Popen): process.terminate() diff --git a/pmb/sideload/__init__.py b/pmb/sideload/__init__.py index 272e9229..610a28c6 100644 --- a/pmb/sideload/__init__.py +++ b/pmb/sideload/__init__.py @@ -4,9 +4,8 @@ import os from typing import List from pmb.helpers import logging import shlex -from argparse import Namespace -from pmb.core.types import PmbArgs +from pmb.core.types import PathString, PmbArgs import pmb.helpers.run import pmb.helpers.run_core import pmb.parse.apkindex @@ -22,19 +21,19 @@ def scp_abuild_key(args: PmbArgs, user: str, host: str, port: str): :param host: target device ssh hostname :param port: target device ssh port """ - keys = (pmb.config.work / "config_abuild").glob("*.pub") + keys = list((pmb.config.work / "config_abuild").glob("*.pub")) key = keys[0] key_name = os.path.basename(key) logging.info(f"Copying signing key ({key_name}) to {user}@{host}") - command = ['scp', '-P', port, key, f'{user}@{host}:/tmp'] + command: List[PathString] = ['scp', '-P', port, key, f'{user}@{host}:/tmp'] pmb.helpers.run.user(args, command, output="interactive") logging.info(f"Installing signing key at {user}@{host}") keyname = os.path.join("/tmp", os.path.basename(key)) - remote_cmd = ['sudo', '-p', pmb.config.sideload_sudo_prompt, + remote_cmd_l: List[PathString] = ['sudo', '-p', pmb.config.sideload_sudo_prompt, '-S', 'mv', '-n', keyname, "/etc/apk/keys/"] - remote_cmd = pmb.helpers.run_core.flat_cmd(remote_cmd) + remote_cmd = pmb.helpers.run_core.flat_cmd(remote_cmd_l) command = ['ssh', '-t', '-p', port, f'{user}@{host}', remote_cmd] pmb.helpers.run.user(args, command, output="tui") @@ -43,7 +42,7 @@ def ssh_find_arch(args: PmbArgs, user: str, host: str, port: str) -> str: """Connect to a device via ssh and query the architecture.""" logging.info(f"Querying architecture of {user}@{host}") command = ["ssh", "-p", port, f"{user}@{host}", "uname -m"] - output = pmb.helpers.run.user(args, command, output_return=True) + output = pmb.helpers.run.user_output(args, command) # Split by newlines so we can pick out any irrelevant output, e.g. the "permanently # added to list of known hosts" warnings. output_lines = output.strip().splitlines() diff --git a/test/test_helpers_repo.py b/test/test_helpers_repo.py index 768c0c17..44a63a9f 100644 --- a/test/test_helpers_repo.py +++ b/test/test_helpers_repo.py @@ -25,7 +25,7 @@ def args(tmpdir, request): def test_hash(): url = "https://nl.alpinelinux.org/alpine/edge/testing" hash = "865a153c" - assert pmb.helpers.repo.hash(url, 8) == hash + assert pmb.helpers.repo.apkindex_hash(url, 8) == hash def test_alpine_apkindex_path(args: PmbArgs): diff --git a/test/test_pkgrel_bump.py b/test/test_pkgrel_bump.py index 4bd0a3fe..2b5e0837 100644 --- a/test/test_pkgrel_bump.py +++ b/test/test_pkgrel_bump.py @@ -73,7 +73,7 @@ def setup_work(args: PmbArgs, tmpdir): pmb.helpers.run.user(args, ["./pmbootstrap.py", "shutdown"]) # Link everything from work (except for "packages") to the tmpdir - for path in glob.glob(pmb.config.work / "*"): + for path in pmb.config.work.glob("*"): if os.path.basename(path) != "packages": pmb.helpers.run.user(args, ["ln", "-s", path, tmpdir + "/"]) @@ -91,7 +91,7 @@ def setup_work(args: PmbArgs, tmpdir): f"{tmpdir}/_aports/main/{pkgname}"]) # Copy pmaports.cfg - pmb.helpers.run.user(args, ["cp", args.aports + "/pmaports.cfg", tmpdir + + pmb.helpers.run.user(args, ["cp", args.aports / "pmaports.cfg", tmpdir + "/_aports"]) # Empty packages folder diff --git a/test/test_qemu_running_processes.py b/test/test_qemu_running_processes.py index a543aa21..e6f98727 100644 --- a/test/test_qemu_running_processes.py +++ b/test/test_qemu_running_processes.py @@ -133,7 +133,7 @@ def is_running(args: PmbArgs, programs, timeout=300, sleep_before_retry=1): ssh_works = False end = time.monotonic() + timeout - last_try = 0 + last_try = 0.0 while last_try < end: # Sleep only when last try exited immediately diff --git a/test/test_run_core.py b/test/test_run_core.py index 5988466f..ac4a9615 100644 --- a/test/test_run_core.py +++ b/test/test_run_core.py @@ -1,6 +1,7 @@ # Copyright 2023 Oliver Smith # SPDX-License-Identifier: GPL-3.0-or-later """ Test pmb.helpers.run_core """ +from typing import Sequence from pmb.core.types import PmbArgs import pytest import re @@ -69,7 +70,7 @@ def test_pipe(args: PmbArgs): def test_foreground_pipe(args: PmbArgs): func = pmb.helpers.run_core.foreground_pipe - cmd = ["echo", "test"] + cmd: Sequence[str] = ["echo", "test"] # Normal run assert func(args, cmd) == (0, "")