treewide: adopt pathlib.Path and type hinting (MR 2252)

With the new chroot type, we can now write fancy paths in the pythonic
way. Convert most of the codebase over, as well as adding various other
type hints.

Signed-off-by: Caleb Connolly <caleb@postmarketos.org>
This commit is contained in:
Caleb Connolly 2024-04-04 06:14:14 +02:00 committed by Oliver Smith
parent 00383bf354
commit 31cc898dd5
No known key found for this signature in database
GPG key ID: 5AE7F5513E0885CB
64 changed files with 513 additions and 385 deletions

View file

@ -2,10 +2,9 @@
# SPDX-License-Identifier: GPL-3.0-or-later
# PYTHON_ARGCOMPLETE_OK
import sys
import logging
import os
import traceback
from argparse import Namespace
from typing import Any, Optional
from pmb.helpers.exceptions import BuildFailedError, NonBugError
@ -42,7 +41,9 @@ def print_log_hint(args: Any) -> None:
def main() -> int:
# Wrap everything to display nice error messages
args = None
# FIXME: can't use PmbArgs here because it creates a circular import
args: Any
try:
# Parse arguments, set up logging
args = parse.arguments()

View file

@ -76,8 +76,8 @@ def rewrite(args: PmbArgs, pkgname, path_original="", fields={}, replace_pkgname
if path_original:
lines_new = [
"# Automatically generated aport, do not edit!\n",
"# Generator: pmbootstrap aportgen " + pkgname + "\n",
"# Based on: " + path_original + "\n",
f"# Generator: pmbootstrap aportgen {pkgname}\n",
f"# Based on: {path_original}\n",
"\n",
]
else:

View file

@ -12,7 +12,8 @@ import pmb.parse.arch
from pmb.core import Chroot, ChrootType
def arch_from_deviceinfo(args: PmbArgs, pkgname, aport):
# FIXME (#2324): type hint Arch
def arch_from_deviceinfo(args: PmbArgs, pkgname, aport: Path) -> Optional[str]:
"""
The device- packages are noarch packages. But it only makes sense to build
them for the device's architecture, which is specified in the deviceinfo
@ -23,10 +24,10 @@ def arch_from_deviceinfo(args: PmbArgs, pkgname, aport):
"""
# Require a deviceinfo file in the aport
if not pkgname.startswith("device-"):
return
deviceinfo = aport + "/deviceinfo"
if not os.path.exists(deviceinfo):
return
return None
deviceinfo = aport / "deviceinfo"
if not deviceinfo.exists():
return None
# Return its arch
device = pkgname.split("-", 1)[1]
@ -35,7 +36,7 @@ def arch_from_deviceinfo(args: PmbArgs, pkgname, aport):
return arch
def arch(args: PmbArgs, pkgname):
def arch(args: PmbArgs, pkgname: str):
"""
Find a good default in case the user did not specify for which architecture
a package should be built.
@ -47,6 +48,8 @@ def arch(args: PmbArgs, pkgname):
* first arch in the APKBUILD
"""
aport = pmb.helpers.pmaports.find(args, pkgname)
if not aport:
raise FileNotFoundError(f"APKBUILD not found for {pkgname}")
ret = arch_from_deviceinfo(args, pkgname, aport)
if ret:
return ret
@ -80,7 +83,7 @@ def chroot(apkbuild: Dict[str, str], arch: str) -> Chroot:
if "pmb:cross-native" in apkbuild["options"]:
return Chroot.native()
return Chroot(ChrootType.BUILDROOT, arch)
return Chroot.buildroot(arch)
def crosscompile(args: PmbArgs, apkbuild, arch, suffix: Chroot):

View file

@ -1,5 +1,6 @@
# Copyright 2023 Robert Yang
# SPDX-License-Identifier: GPL-3.0-or-later
from typing import List
from pmb.helpers import logging
import os
from pathlib import Path
@ -9,7 +10,7 @@ import pmb.aportgen
import pmb.build
import pmb.build.autodetect
import pmb.chroot
from pmb.core.types import PmbArgs
from pmb.core.types import PathString, PmbArgs
import pmb.helpers
import pmb.helpers.mount
import pmb.helpers.pmaports
@ -89,7 +90,7 @@ def find_kbuild_output_dir(function_body):
"can't resolve it, please open an issue.")
def modify_apkbuild(args: PmbArgs, pkgname, aport):
def modify_apkbuild(args: PmbArgs, pkgname: str, aport: Path):
"""Modify kernel APKBUILD to package build output from envkernel.sh."""
apkbuild_path = aport + "/APKBUILD"
apkbuild = pmb.parse.apkbuild(apkbuild_path)
@ -110,7 +111,7 @@ def modify_apkbuild(args: PmbArgs, pkgname, aport):
pmb.aportgen.core.rewrite(args, pkgname, apkbuild_path, fields=fields)
def run_abuild(args: PmbArgs, pkgname, arch, apkbuild_path, kbuild_out):
def run_abuild(args: PmbArgs, pkgname: str, arch: str, apkbuild_path: Path, kbuild_out):
"""
Prepare build environment and run abuild.
@ -142,17 +143,16 @@ def run_abuild(args: PmbArgs, pkgname, arch, apkbuild_path, kbuild_out):
pmb.build.copy_to_buildpath(args, pkgname)
# Create symlink from abuild working directory to envkernel build directory
build_output = Path("" if kbuild_out == "" else "/" + kbuild_out)
if False or build_output != "":
if os.path.islink(chroot / "mnt/linux" / build_output) and \
os.path.lexists(chroot / "mnt/linux" / build_output):
pmb.chroot.root(args, ["rm", "/mnt/linux" / build_output])
if kbuild_out != "":
if os.path.islink(chroot / "mnt/linux" / kbuild_out) and \
os.path.lexists(chroot / "mnt/linux" / kbuild_out):
pmb.chroot.root(args, ["rm", "/mnt/linux" / kbuild_out])
pmb.chroot.root(args, ["ln", "-s", "/mnt/linux",
build_path / "src"])
pmb.chroot.root(args, ["ln", "-s", kbuild_out_source,
build_path / "src" / build_output])
build_path / "src" / kbuild_out])
cmd = ["cp", apkbuild_path, chroot / build_path / "APKBUILD"]
cmd: List[PathString] = ["cp", apkbuild_path, chroot / build_path / "APKBUILD"]
pmb.helpers.run.root(args, cmd)
# Create the apk package
@ -167,10 +167,10 @@ def run_abuild(args: PmbArgs, pkgname, arch, apkbuild_path, kbuild_out):
pmb.helpers.mount.umount_all(args, chroot / "mnt/linux")
# Clean up symlinks
if build_output != "":
if os.path.islink(chroot / "mnt/linux" / build_output) and \
os.path.lexists(chroot / "mnt/linux" / build_output):
pmb.chroot.root(args, ["rm", "/mnt/linux" / build_output])
if kbuild_out != "":
if os.path.islink(chroot / "mnt/linux" / kbuild_out) and \
os.path.lexists(chroot / "mnt/linux" / kbuild_out):
pmb.chroot.root(args, ["rm", "/mnt/linux" / kbuild_out])
pmb.chroot.root(args, ["rm", build_path / "src"])

View file

@ -33,13 +33,14 @@ def copy_to_buildpath(args: PmbArgs, package, chroot: Chroot=Chroot.native()):
# Copy aport contents with resolved symlinks
pmb.helpers.run.root(args, ["mkdir", "-p", build])
for entry in aport.iterdir():
file = entry.name
# Don't copy those dirs, as those have probably been generated by running `abuild`
# on the host system directly and not cleaning up after itself.
# Those dirs might contain broken symlinks and cp fails resolving them.
if entry.name in ["src", "pkg"]:
logging.warn(f"WARNING: Not copying {entry}, looks like a leftover from abuild")
if file in ["src", "pkg"]:
logging.warning(f"WARNING: Not copying {file}, looks like a leftover from abuild")
continue
pmb.helpers.run.root(args, ["cp", "-rL", aport / entry, build / entry])
pmb.helpers.run.root(args, ["cp", "-rL", aport / file, build / file])
pmb.chroot.root(args, ["chown", "-R", "pmos:pmos",
"/home/pmos/build"], chroot)

View file

@ -168,7 +168,7 @@ def packages_get_locally_built_apks(args: PmbArgs, packages, arch: str):
return ret
def install_run_apk(args: PmbArgs, to_add, to_add_local, to_del, suffix):
def install_run_apk(args: PmbArgs, to_add, to_add_local, to_del, chroot: Chroot):
"""
Run apk to add packages, and ensure only the desired packages get
explicitly marked as installed.
@ -178,7 +178,7 @@ def install_run_apk(args: PmbArgs, to_add, to_add_local, to_del, suffix):
:param to_del: list of pkgnames to be deleted, this should be set to
conflicting dependencies in any of the packages to be
installed or their dependencies (e.g. ["unl0kr"])
:param suffix: the chroot suffix, e.g. "native" or "rootfs_qemu-amd64"
:param chroot: the chroot suffix, e.g. "native" or "rootfs_qemu-amd64"
"""
# Sanitize packages: don't allow '--allow-untrusted' and other options
# to be passed to apk!
@ -210,16 +210,16 @@ def install_run_apk(args: PmbArgs, to_add, to_add_local, to_del, suffix):
command = ["--no-network"] + command
if i == 0:
pmb.helpers.apk.apk_with_progress(args, ["apk"] + command,
chroot=True, suffix=suffix)
run_in_chroot=True, chroot=chroot)
else:
# Virtual package related commands don't actually install or remove
# packages, but only mark the right ones as explicitly installed.
# They finish up almost instantly, so don't display a progress bar.
pmb.chroot.root(args, ["apk", "--no-progress"] + command,
chroot=suffix)
chroot=chroot)
def install(args: PmbArgs, packages, suffix: Chroot=Chroot.native(), build=True):
def install(args: PmbArgs, packages, chroot: Chroot=Chroot.native(), build=True):
"""
Install packages from pmbootstrap's local package index or the pmOS/Alpine
binary package mirrors. Iterate over all dependencies recursively, and
@ -232,7 +232,7 @@ def install(args: PmbArgs, packages, suffix: Chroot=Chroot.native(), build=True)
special case that all packages are expected to be in Alpine's
repositories, set this to False for performance optimization.
"""
arch = pmb.parse.arch.from_chroot_suffix(args, suffix)
arch = pmb.parse.arch.from_chroot_suffix(args, chroot)
if not packages:
logging.verbose("pmb.chroot.apk.install called with empty packages list,"
@ -240,10 +240,10 @@ def install(args: PmbArgs, packages, suffix: Chroot=Chroot.native(), build=True)
return
# Initialize chroot
check_min_version(args, suffix)
pmb.chroot.init(args, suffix)
check_min_version(args, chroot)
pmb.chroot.init(args, chroot)
packages_with_depends = pmb.parse.depends.recurse(args, packages, suffix)
packages_with_depends = pmb.parse.depends.recurse(args, packages, chroot)
to_add, to_del = packages_split_to_add_del(packages_with_depends)
if build:
@ -253,8 +253,8 @@ def install(args: PmbArgs, packages, suffix: Chroot=Chroot.native(), build=True)
to_add_local = packages_get_locally_built_apks(args, to_add, arch)
to_add_no_deps, _ = packages_split_to_add_del(packages)
logging.info(f"({suffix}) install {' '.join(to_add_no_deps)}")
install_run_apk(args, to_add_no_deps, to_add_local, to_del, suffix)
logging.info(f"({chroot}) install {' '.join(to_add_no_deps)}")
install_run_apk(args, to_add_no_deps, to_add_local, to_del, chroot)
def installed(args: PmbArgs, suffix: Chroot=Chroot.native()):

View file

@ -67,8 +67,8 @@ def extract_temp(tar, sigfilename):
for ftype in ret.keys():
member = tar.getmember(ret[ftype]["filename"])
handle, path = tempfile.mkstemp(ftype, "pmbootstrap")
handle = open(handle, "wb")
fd, path = tempfile.mkstemp(ftype, "pmbootstrap")
handle = open(fd, "wb")
ret[ftype]["temp_path"] = path
shutil.copyfileobj(tar.extractfile(member), handle)
@ -119,8 +119,7 @@ def extract(args: PmbArgs, version, apk_path):
logging.debug("Verify the version reported by the apk.static binary"
f" (must match the package version {version})")
os.chmod(temp_path, os.stat(temp_path).st_mode | stat.S_IEXEC)
version_bin = pmb.helpers.run.user(args, [temp_path, "--version"],
output_return=True)
version_bin = pmb.helpers.run.user_output(args, [temp_path, "--version"])
version_bin = version_bin.split(" ")[1].split(",")[0]
if not version.startswith(f"{version_bin}-r"):
os.unlink(temp_path)
@ -174,4 +173,4 @@ def run(args: PmbArgs, parameters):
if args.offline:
parameters = ["--no-network"] + parameters
pmb.helpers.apk.apk_with_progress(
args, [pmb.config.work / "apk.static"] + parameters, chroot=False)
args, [pmb.config.work / "apk.static"] + parameters, run_in_chroot=False)

View file

@ -5,7 +5,6 @@ import filecmp
from typing import List
from pmb.helpers import logging
import os
import filecmp
import pmb.chroot
import pmb.chroot.apk_static
@ -17,7 +16,7 @@ import pmb.helpers.run
import pmb.parse.arch
from pmb.core import Chroot, ChrootType
cache_chroot_is_outdated = []
cache_chroot_is_outdated: List[str] = []
class UsrMerge(enum.Enum):
"""
@ -98,7 +97,7 @@ def warn_if_chroot_is_outdated(args: PmbArgs, chroot: Chroot):
global cache_chroot_is_outdated
# Only check / display the warning once per session
if chroot in cache_chroot_is_outdated:
if str(chroot) in cache_chroot_is_outdated:
return
if pmb.config.workdir.chroots_outdated(args, chroot):
@ -107,7 +106,7 @@ def warn_if_chroot_is_outdated(args: PmbArgs, chroot: Chroot):
f" {days_warn} days. Consider running"
" 'pmbootstrap zap'.")
cache_chroot_is_outdated += [chroot]
cache_chroot_is_outdated += [str(chroot)]
def init(args: PmbArgs, chroot: Chroot=Chroot.native(), usr_merge=UsrMerge.AUTO,

View file

@ -93,6 +93,7 @@ def mount(args: PmbArgs, chroot: Chroot=Chroot.native()):
# Mount if necessary
for source, target in mountpoints.items():
target_outer = chroot / target
#raise RuntimeError("test")
pmb.helpers.mount.bind(args, source, target_outer)
@ -101,7 +102,7 @@ def mount_native_into_foreign(args: PmbArgs, chroot: Chroot):
target = chroot / "native"
pmb.helpers.mount.bind(args, source, target)
musl = next(source.glob("/lib/ld-musl-*.so.1")).name
musl = next(source.glob("lib/ld-musl-*.so.1")).name
musl_link = (chroot / "lib" / musl)
if not musl_link.is_symlink():
pmb.helpers.run.root(args, ["ln", "-s", "/native/lib/" + musl,

View file

@ -1,7 +1,7 @@
# Copyright 2023 Oliver Smith
# SPDX-License-Identifier: GPL-3.0-or-later
import os
from pathlib import Path
from pathlib import Path, PurePath
import shutil
from typing import Sequence
@ -11,7 +11,7 @@ import pmb.chroot.binfmt
import pmb.helpers.run
import pmb.helpers.run_core
from pmb.core import Chroot
from pmb.core.types import PathString, PmbArgs
from pmb.core.types import Env, PathString, PmbArgs
def executables_absolute_path():
@ -29,7 +29,7 @@ def executables_absolute_path():
return ret
def root(args: PmbArgs, cmd: Sequence[PathString], chroot: Chroot=Chroot.native(), working_dir: Path=Path("/"), output="log",
def root(args: PmbArgs, cmd: Sequence[PathString], chroot: Chroot=Chroot.native(), working_dir: PurePath=PurePath("/"), output="log",
output_return=False, check=None, env={}, auto_init=True,
disable_timeout=False, add_proxy_env_vars=True):
"""
@ -38,6 +38,7 @@ def root(args: PmbArgs, cmd: Sequence[PathString], chroot: Chroot=Chroot.native(
:param env: dict of environment variables to be passed to the command, e.g.
{"JOBS": "5"}
:param auto_init: automatically initialize the chroot
:param working_dir: chroot-relative working directory
:param add_proxy_env_vars: if True, preserve HTTP_PROXY etc. vars from host
environment. pmb.chroot.user sets this to False
when calling pmb.chroot.root, because it already
@ -59,12 +60,12 @@ def root(args: PmbArgs, cmd: Sequence[PathString], chroot: Chroot=Chroot.native(
msg = f"({chroot}) % "
for key, value in env.items():
msg += f"{key}={value} "
if working_dir != Path("/"):
if working_dir != PurePath("/"):
msg += f"cd {working_dir}; "
msg += " ".join(cmd_str)
# Merge env with defaults into env_all
env_all = {"CHARSET": "UTF-8",
env_all: Env = {"CHARSET": "UTF-8",
"HISTFILE": "~/.ash_history",
"HOME": "/root",
"LANG": "UTF-8",
@ -83,7 +84,7 @@ def root(args: PmbArgs, cmd: Sequence[PathString], chroot: Chroot=Chroot.native(
# cmd_sudo: ["sudo", "env", "-i", "sh", "-c", "PATH=... /sbin/chroot ..."]
executables = executables_absolute_path()
cmd_chroot = [executables["chroot"], chroot.path, "/bin/sh", "-c",
pmb.helpers.run_core.flat_cmd(cmd_str, working_dir)]
pmb.helpers.run_core.flat_cmd(cmd_str, Path(working_dir))]
cmd_sudo = pmb.config.sudo([
"env", "-i", executables["sh"], "-c",
pmb.helpers.run_core.flat_cmd(cmd_chroot, env=env_all)]

View file

@ -11,7 +11,7 @@ from pmb.core.types import PmbArgs
import pmb.helpers.pmaports
import pmb.helpers.run
import pmb.parse.apkindex
from pmb.core import Chroot, ChrootType
from pmb.core import Chroot
def zap(args: PmbArgs, confirm=True, dry=False, pkgs_local=False, http=False,
@ -65,6 +65,7 @@ def zap(args: PmbArgs, confirm=True, dry=False, pkgs_local=False, http=False,
# Delete everything matching the patterns
for pattern in patterns:
logging.debug(f"Deleting {pattern}")
pattern = os.path.realpath(f"{pmb.config.work}/{pattern}")
matches = glob.glob(pattern)
for match in matches:
@ -114,7 +115,7 @@ def zap_pkgs_local_mismatch(args: PmbArgs, confirm=True, dry=False):
continue
# Aport path
aport_path = pmb.helpers.pmaports.find(args, origin, False)
aport_path = pmb.helpers.pmaports.find_optional(args, origin)
if not aport_path:
logging.info(f"% rm {apk_path_short}"
f" ({origin} aport not found)")
@ -154,7 +155,7 @@ def zap_pkgs_online_mismatch(args: PmbArgs, confirm=True, dry=False):
suffix = Chroot.native()
else:
try:
suffix = Chroot(ChrootType.BUILDROOT, arch)
suffix = Chroot.buildroot(arch)
except ValueError:
continue # Ignore invalid directory name

View file

@ -3,14 +3,16 @@
import multiprocessing
import os
from pathlib import Path
from pmb.core.types import PathString
from pmb.core.types import AportGenEntry, PathString
import pmb.parse.arch
import sys
from typing import Sequence
from typing import Dict, List, Sequence, TypedDict
#
# Exported functions
#
# FIXME (#2324): this sucks, we should re-organise this and not rely on "lifting"
# this functions this way
from pmb.config.load import load, sanity_checks
from pmb.config.save import save
from pmb.config.merge_with_args import merge_with_args
@ -24,7 +26,7 @@ from pmb.config.other import is_systemd_selected
pmb_src: Path = Path(Path(__file__) / "../../..").resolve()
apk_keys_path: Path = (pmb_src / "pmb/data/keys")
arch_native = pmb.parse.arch.alpine_native()
work: Path = Path("/unitialised/pmbootstrap/work/dir")
work: Path
# apk-tools minimum version
# https://pkgs.alpinelinux.org/packages?name=apk-tools&branch=edge
@ -67,7 +69,7 @@ required_programs = [
]
def sudo(cmd: Sequence[PathString]) -> Sequence[str]:
def sudo(cmd: Sequence[PathString]) -> Sequence[PathString]:
"""Adapt a command to run as root."""
sudo = which_sudo()
if sudo:
@ -81,7 +83,7 @@ def work_dir(_work: Path) -> None:
work directory before any other code is run. It is not meant to be used
anywhere else."""
global work
if work:
if "work" in globals():
raise RuntimeError("work_dir() called multiple times!")
work = _work
@ -947,10 +949,10 @@ flash_methods = [
# These folders will be mounted at the same location into the native
# chroot, before the flash programs get started.
flash_mount_bind = [
"sys/bus/usb/devices/",
"sys/dev/",
"sys/devices/",
"dev/bus/usb/"
Path("/sys/bus/usb/devices/"),
Path("/sys/dev/"),
Path("/sys/devices/"),
Path("/dev/bus/usb/"),
]
"""
@ -969,7 +971,7 @@ Fastboot specific: $KERNEL_CMDLINE
Heimdall specific: $PARTITION_INITFS
uuu specific: $UUU_SCRIPT
"""
flashers = {
flashers: Dict[str, Dict[str, bool | List[str] | Dict[str, List[List[str]]]]] = {
"fastboot": {
"depends": [], # pmaports.cfg: supported_fastboot_depends
"actions": {
@ -1129,7 +1131,7 @@ git_repos = {
#
# APORTGEN
#
aportgen = {
aportgen: Dict[str, AportGenEntry] = {
"cross": {
"prefixes": ["busybox-static", "gcc", "musl", "grub-efi"],
"confirm_overwrite": False,

View file

@ -423,8 +423,7 @@ def ask_for_device(args: PmbArgs):
device = f"{vendor}-{codename}"
device_path = pmb.helpers.devices.find_path(args, device, 'deviceinfo')
device_exists = device_path is not None
if not device_exists:
if device_path is None:
if device == args.device:
raise RuntimeError(
"This device does not exist anymore, check"
@ -449,7 +448,7 @@ def ask_for_device(args: PmbArgs):
break
kernel = ask_for_device_kernel(args, device)
return (device, device_exists, kernel)
return (device, device_path is not None, kernel)
def ask_for_additional_options(args: PmbArgs, cfg):
@ -739,7 +738,7 @@ def frontend(args: PmbArgs):
# Zap existing chroots
if (work_exists and device_exists and
len(glob.glob(pmb.config.work / "chroot_*")) and
len(list(Chroot.iter_patterns())) and
pmb.helpers.cli.confirm(
args, "Zap existing chroots to apply configuration?",
default=True)):

View file

@ -9,13 +9,14 @@ import pmb.config
from pmb.core.types import PmbArgs
import pmb.helpers.git
import pmb.helpers.pmaports
import pmb.parse.version
def check_legacy_folder():
# Existing pmbootstrap/aports must be a symlink
link = pmb.config.pmb_src + "/aports"
link = pmb.config.pmb_src / "aports"
if os.path.exists(link) and not os.path.islink(link):
raise RuntimeError("The path '" + link + "' should be a"
raise RuntimeError(f"The path '{link}' should be a"
" symlink pointing to the new pmaports"
" repository, which was split from the"
" pmbootstrap repository (#383). Consider"
@ -58,21 +59,21 @@ def check_version_pmaports(real):
raise RuntimeError("Run 'pmbootstrap pull' to update your pmaports.")
def check_version_pmbootstrap(min):
def check_version_pmbootstrap(min_ver):
# Compare versions
real = pmb.__version__
if pmb.parse.version.compare(real, min) >= 0:
if pmb.parse.version.compare(real, min_ver) >= 0:
return
# Show versions
logging.info("NOTE: you are using pmbootstrap version " + real + ", but" +
" version " + min + " is required.")
logging.info(f"NOTE: you are using pmbootstrap version {real}, but"
f" version {min_ver} is required.")
# Error for git clone
pmb_src = pmb.config.pmb_src
if os.path.exists(pmb_src + "/.git"):
if os.path.exists(pmb_src / ".git"):
raise RuntimeError("Please update your local pmbootstrap repository."
" Usually with: 'git -C \"" + pmb_src + "\" pull'")
f" Usually with: 'git -C \"{pmb_src}\" pull'")
# Error for package manager installation
raise RuntimeError("Please update your pmbootstrap version (with your"
@ -121,7 +122,7 @@ def read_config(args: PmbArgs):
path_cfg = args.aports / "pmaports.cfg"
if not os.path.exists(path_cfg):
raise RuntimeError("Invalid pmaports repository, could not find the"
" config: " + path_cfg)
f" config: {path_cfg}")
# Load the config
cfg = configparser.ConfigParser()

View file

@ -7,7 +7,7 @@ from pmb.core.types import PmbArgs
def save(args: PmbArgs, cfg):
logging.debug("Save config: " + args.config)
logging.debug(f"Save config: {args.config}")
os.makedirs(os.path.dirname(args.config), 0o700, True)
with open(args.config, "w") as handle:
cfg.write(handle)

View file

@ -4,7 +4,7 @@
from __future__ import annotations
import enum
from typing import Generator, Optional
from pathlib import Path, PosixPath
from pathlib import Path, PosixPath, PurePosixPath
import pmb.config
class ChrootType(enum.Enum):
@ -86,7 +86,7 @@ class Chroot:
def __truediv__(self, other: object) -> Path:
if isinstance(other, PosixPath):
if isinstance(other, PosixPath) or isinstance(other, PurePosixPath):
# Convert the other path to a relative path
# FIXME: we should avoid creating absolute paths that we actually want
# to make relative to the chroot...
@ -99,8 +99,8 @@ class Chroot:
def __rtruediv__(self, other: object) -> Path:
if isinstance(other, PosixPath):
return other / self.path
if isinstance(other, PosixPath) or isinstance(other, PurePosixPath):
return Path(other) / self.path
if isinstance(other, str):
return other / self.path
@ -120,6 +120,11 @@ class Chroot:
return Chroot(ChrootType.NATIVE)
@staticmethod
def buildroot(arch: str) -> Chroot:
return Chroot(ChrootType.BUILDROOT, arch)
@staticmethod
def rootfs(device: str) -> Chroot:
return Chroot(ChrootType.ROOTFS, device)

View file

@ -3,9 +3,24 @@
from argparse import Namespace
from pathlib import Path
from typing import Dict, Union
from typing import Dict, List, Optional, Tuple, TypedDict, Union
PathString = Union[Path, str]
Env = Dict[str, PathString]
# These types are not definitive / API, they exist to describe the current
# state of things so that we can improve our type hinting coverage and make
# future refactoring efforts easier.
class PartitionLayout(TypedDict):
kernel: Optional[int]
boot: int
reserve: Optional[int]
root: int
class AportGenEntry(TypedDict):
prefixes: List[str]
confirm_overwrite: bool
# Property list generated with:
# $ rg --vimgrep "((^|\s)args\.\w+)" --only-matching | cut -d"." -f3 | sort | uniq
@ -28,7 +43,7 @@ class PmbArgs(Namespace):
autoinstall: str
boot_size: str
build_default_device_arch: str
build_pkgs_on_install: str
build_pkgs_on_install: bool
buildroot: str
built: str
ccache_size: str
@ -38,17 +53,17 @@ class PmbArgs(Namespace):
command: str
config: Path
config_channels: str
details: str
details: bool
details_to_stdout: str
device: str
deviceinfo: Dict[str, str]
deviceinfo_parse_kernel: str
devices: str
disk: str
disk: Path
dry: str
efi: str
envkernel: str
export_folder: str
export_folder: Path
extra_packages: str
extra_space: str
fast: str
@ -58,7 +73,8 @@ class PmbArgs(Namespace):
folder: str
force: str
fork_alpine: str
from_argparse: str
# This is a filthy lie
from_argparse: "PmbArgs"
full_disk_encryption: str
hook: str
host: str
@ -68,7 +84,7 @@ class PmbArgs(Namespace):
install_base: str
install_blockdev: str
install_cgpt: str
install_key: str
install_key: bool
install_local_pkgs: str
install_recommends: str
is_default_channel: str
@ -80,7 +96,7 @@ class PmbArgs(Namespace):
lines: str
log: Path
mirror_alpine: str
mirrors_postmarketos: str
mirrors_postmarketos: List[str]
name: str
no_depends: str
no_fde: str
@ -91,15 +107,16 @@ class PmbArgs(Namespace):
no_sshd: str
odin_flashable_tar: str
offline: str
ondev_cp: str
ondev_cp: List[Tuple[str, str]]
on_device_installer: str
ondev_no_rootfs: str
overview: str
package: str
packages: str
# FIXME (#2324): figure out the args.package vs args.packages situation
package: str | List[str]
packages: List[str]
partition: str
password: str
path: str
path: Path
pkgname: str
pkgname_pkgver_srcurl: str
port: str
@ -122,7 +139,7 @@ class PmbArgs(Namespace):
rsync: str
scripts: str
second_storage: str
selected_providers: str
selected_providers: Dict[str, str]
sparse: str
split: str
src: str
@ -131,7 +148,7 @@ class PmbArgs(Namespace):
sudo_timer: str
suffix: str
systemd: str
timeout: str
timeout: float
ui: str
ui_extras: str
user: str

View file

@ -28,7 +28,7 @@ def frontend(args: PmbArgs):
pmb.chroot.initfs.build(args, flavor, Chroot(ChrootType.ROOTFS, args.device))
# Do the export, print all files
logging.info("Export symlinks to: " + target)
logging.info(f"Export symlinks to: {target}")
if args.odin_flashable_tar:
pmb.export.odin(args, flavor, target)
pmb.export.symlinks(args, flavor, target)

View file

@ -45,7 +45,7 @@ def symlinks(args: PmbArgs, flavor, folder: Path):
# Generate a list of patterns
chroot_native = Chroot.native()
path_boot = Chroot(ChrootType.ROOTFS, args.device) / "boot"
chroot_buildroot = Chroot(ChrootType.BUILDROOT, args.deviceinfo['arch'])
chroot_buildroot = Chroot.buildroot(args.deviceinfo['arch'])
files: List[Path] = [
path_boot / f"boot.img{suffix}",
path_boot / f"uInitrd{suffix}",

View file

@ -93,7 +93,7 @@ def sideload(args: PmbArgs):
pmb.flasher.install_depends(args)
# Mount the buildroot
chroot = Chroot(ChrootType.BUILDROOT, args.deviceinfo["arch"])
chroot = Chroot.buildroot(args.deviceinfo["arch"])
mountpoint = "/mnt/" / chroot
pmb.helpers.mount.bind(args, chroot.path,
Chroot.native().path / mountpoint)

View file

@ -22,6 +22,8 @@ def run(args: PmbArgs, action, flavor=None):
# Verify action
method = args.flash_method or args.deviceinfo["flash_method"]
cfg = pmb.config.flashers[method]
if not isinstance(cfg["actions"], dict):
raise TypeError(f"Flashers misconfigured! {method} key 'actions' should be a dictionary")
if action not in cfg["actions"]:
raise RuntimeError("action " + action + " is not"
" configured for method " + method + "!"

View file

@ -1,5 +1,6 @@
# Copyright 2023 Oliver Smith
# SPDX-License-Identifier: GPL-3.0-or-later
from typing import Optional
import pmb.config.pmaports
from pmb.core.types import PmbArgs
@ -15,6 +16,9 @@ def variables(args: PmbArgs, flavor, method):
# updated and minimum pmbootstrap version bumped.
# See also https://gitlab.com/postmarketOS/pmbootstrap/-/issues/2243
_partition_kernel: Optional[str]
_partition_rootfs: Optional[str]
if method.startswith("fastboot"):
_partition_kernel = args.deviceinfo["flash_fastboot_partition_kernel"]\
or "boot"

View file

@ -1,10 +1,12 @@
# Copyright 2023 Johannes Marbach, Oliver Smith
# SPDX-License-Identifier: GPL-3.0-or-later
import os
from pathlib import Path
from typing import List, Sequence
import pmb.chroot.root
import pmb.config.pmaports
from pmb.core.types import PmbArgs
from pmb.core.types import PathString, PmbArgs
import pmb.helpers.cli
import pmb.helpers.run
import pmb.helpers.run_core
@ -12,7 +14,7 @@ import pmb.parse.version
from pmb.core import Chroot
def _run(args: PmbArgs, command, chroot=False, suffix: Chroot=Chroot.native(), output="log"):
def _run(args: PmbArgs, command, run_in_chroot=False, chroot: Chroot=Chroot.native(), output="log"):
"""Run a command.
:param command: command in list form
@ -23,8 +25,8 @@ def _run(args: PmbArgs, command, chroot=False, suffix: Chroot=Chroot.native(), o
See pmb.helpers.run_core.core() for a detailed description of all other
arguments and the return value.
"""
if chroot:
return pmb.chroot.root(args, command, output=output, suffix=suffix,
if run_in_chroot:
return pmb.chroot.root(args, command, output=output, chroot=chroot,
disable_timeout=True)
return pmb.helpers.run.root(args, command, output=output)
@ -41,7 +43,7 @@ def _prepare_fifo(args: PmbArgs, run_in_chroot=False, chroot: Chroot=Chroot.nati
relative to the host)
"""
if run_in_chroot:
fifo = "/tmp/apk_progress_fifo"
fifo = Path("/tmp/apk_progress_fifo")
fifo_outside = chroot / fifo
else:
_run(args, ["mkdir", "-p", pmb.config.work / "tmp"])
@ -82,7 +84,7 @@ def _compute_progress(line):
return cur / tot if tot > 0 else 0
def apk_with_progress(args: PmbArgs, command, chroot=False, suffix: Chroot=Chroot.native()):
def apk_with_progress(args: PmbArgs, command: Sequence[PathString], run_in_chroot=False, chroot: Chroot=Chroot.native()):
"""Run an apk subcommand while printing a progress bar to STDOUT.
:param command: apk subcommand in list form
@ -91,12 +93,13 @@ def apk_with_progress(args: PmbArgs, command, chroot=False, suffix: Chroot=Chroo
set to True.
:raises RuntimeError: when the apk command fails
"""
fifo, fifo_outside = _prepare_fifo(args, chroot, suffix)
command_with_progress = _create_command_with_progress(command, fifo)
log_msg = " ".join(command)
with _run(args, ['cat', fifo], chroot=chroot, suffix=suffix,
fifo, fifo_outside = _prepare_fifo(args, run_in_chroot, chroot)
_command: List[str] = [os.fspath(c) for c in command]
command_with_progress = _create_command_with_progress(_command, fifo)
log_msg = " ".join(_command)
with _run(args, ['cat', fifo], run_in_chroot=run_in_chroot, chroot=chroot,
output="pipe") as p_cat:
with _run(args, command_with_progress, chroot=chroot, suffix=suffix,
with _run(args, command_with_progress, run_in_chroot=run_in_chroot, chroot=chroot,
output="background") as p_apk:
while p_apk.poll() is None:
line = p_cat.stdout.readline().decode('utf-8')

View file

@ -6,15 +6,15 @@ from pmb.helpers import logging
import os
import re
import urllib.parse
from typing import Optional
from typing import Dict, Optional
from pmb.core.types import PmbArgs
import pmb.helpers.file
import pmb.helpers.http
import pmb.helpers.pmaports
req_headers = None
req_headers_github = None
req_headers: Dict[str, str]
req_headers_github: Dict[str, str]
ANITYA_API_BASE = "https://release-monitoring.org/api/v2"
GITHUB_API_BASE = "https://api.github.com"
@ -272,7 +272,6 @@ def upgrade(args: PmbArgs, pkgname, git=True, stable=True) -> None:
if stable:
upgrade_stable_package(args, pkgname, package)
return False
def upgrade_all(args: PmbArgs) -> None:
"""Upgrade all packages, based on args.all, args.all_git and args.all_stable."""

View file

@ -1,11 +1,11 @@
# Copyright 2023 Oliver Smith
# SPDX-License-Identifier: GPL-3.0-or-later
import copy
import os
from pathlib import Path
import pmb.config
from pmb.core.types import PmbArgs
import pmb.helpers.git
import pmb.helpers.args
"""This file constructs the args variable, which is passed to almost all
functions in the pmbootstrap code base. Here's a listing of the kind of
@ -75,7 +75,7 @@ def check_pmaports_path(args: PmbArgs):
"""
if args.from_argparse.aports and not os.path.exists(args.aports):
raise ValueError("pmaports path (specified with --aports) does"
" not exist: " + args.aports)
f" not exist: {args.aports}")
def replace_placeholders(args: PmbArgs):
@ -108,7 +108,7 @@ def add_deviceinfo(args: PmbArgs):
" <https://postmarketos.org/newarch>")
def init(args: PmbArgs):
def init(args: PmbArgs) -> PmbArgs:
# Basic initialization
fix_mirrors_postmarketos(args)
pmb.config.merge_with_args(args)

View file

@ -1,13 +1,13 @@
# Copyright 2023 Oliver Smith
# SPDX-License-Identifier: GPL-3.0-or-later
import os
import glob
from pathlib import Path
from typing import Optional
from pmb.core.types import PmbArgs
import pmb.parse
def find_path(args: PmbArgs, codename: str, file='') -> Path:
def find_path(args: PmbArgs, codename: str, file='') -> Optional[Path]:
"""Find path to device APKBUILD under `device/*/device-`.
:param codename: device codename
@ -58,7 +58,7 @@ def list_apkbuilds(args: PmbArgs):
""":returns: { "first-device": {"pkgname": ..., "pkgver": ...}, ... }"""
ret = {}
for device in list_codenames(args):
apkbuild_path = f"{args.aports}/device/*/device-{device}/APKBUILD"
apkbuild_path = next(args.aports.glob(f"device/*/device-{device}/APKBUILD"))
ret[device] = pmb.parse.apkbuild(apkbuild_path)
return ret

View file

@ -7,16 +7,17 @@ import time
from pmb.core.types import PmbArgs
import pmb.helpers.run
import pmb.helpers.pmaports
def replace(path, old, new):
def replace(path: Path, old: str, new: str):
text = ""
with open(path, "r", encoding="utf-8") as handle:
with path.open("r", encoding="utf-8") as handle:
text = handle.read()
text = text.replace(old, new)
with open(path, "w", encoding="utf-8") as handle:
with path.open("w", encoding="utf-8") as handle:
handle.write(text)
@ -29,7 +30,7 @@ def replace_apkbuild(args: PmbArgs, pkgname, key, new, in_quotes=False):
:param in_quotes: expect the value to be in quotation marks ("")
"""
# Read old value
path = pmb.helpers.pmaports.find(args, pkgname) + "/APKBUILD"
path = pmb.helpers.pmaports.find(args, pkgname) / "APKBUILD"
apkbuild = pmb.parse.apkbuild(path)
old = apkbuild[key]
@ -95,7 +96,7 @@ def symlink(args: PmbArgs, file: Path, link: Path):
if (os.path.islink(link) and
os.path.realpath(os.readlink(link)) == os.path.realpath(file)):
return
raise RuntimeError("File exists: " + link)
raise RuntimeError(f"File exists: {link}")
elif link.is_symlink():
link.unlink()

View file

@ -1,6 +1,7 @@
# Copyright 2023 Oliver Smith
# SPDX-License-Identifier: GPL-3.0-or-later
import json
from typing import List, Sequence
from pmb.helpers import logging
import os
from pathlib import Path
@ -14,7 +15,7 @@ import pmb.chroot.initfs
import pmb.chroot.other
import pmb.ci
import pmb.config
from pmb.core.types import PmbArgs
from pmb.core.types import PathString, PmbArgs
import pmb.export
import pmb.flasher
import pmb.helpers.aportupgrade
@ -35,7 +36,6 @@ import pmb.netboot
import pmb.parse
import pmb.qemu
import pmb.sideload
from argparse import Namespace
from pmb.core import ChrootType, Chroot
@ -48,13 +48,13 @@ def _parse_flavor(args: PmbArgs, autoinstall=True):
# identifier that is typically in the form
# "postmarketos-<manufacturer>-<device/chip>", e.g.
# "postmarketos-qcom-sdm845"
suffix = Chroot(ChrootType.ROOTFS, args.device)
chroot = Chroot(ChrootType.ROOTFS, args.device)
flavor = pmb.chroot.other.kernel_flavor_installed(
args, suffix, autoinstall)
args, chroot, autoinstall)
if not flavor:
raise RuntimeError(
"No kernel flavors installed in chroot " + suffix + "! Please let"
f"No kernel flavors installed in chroot '{chroot}'! Please let"
" your device package depend on a package starting with 'linux-'.")
return flavor
@ -64,9 +64,9 @@ def _parse_suffix(args: PmbArgs) -> Chroot:
return Chroot(ChrootType.ROOTFS, args.device)
elif args.buildroot:
if args.buildroot == "device":
return Chroot(ChrootType.BUILDROOT, args.deviceinfo["arch"])
return Chroot.buildroot(args.deviceinfo["arch"])
else:
return Chroot(ChrootType.BUILDROOT, args.buildroot)
return Chroot.buildroot(args.buildroot)
elif args.suffix:
(_t, s) = args.suffix.split("_")
t: ChrootType = ChrootType(_t)
@ -416,11 +416,16 @@ def kconfig(args: PmbArgs):
raise RuntimeError("kconfig check failed!")
# Default to all kernel packages
packages: List[str]
# FIXME (#2324): figure out the args.package vs args.packages situation
if isinstance(args.package, list):
packages = args.package
else:
packages = [args.package]
if not args.package:
for aport in pmb.helpers.pmaports.get_list(args):
if aport.startswith("linux-"):
packages.append(aport.split("linux-")[1])
for pkg in pmb.helpers.pmaports.get_list(args):
if pkg.startswith("linux-"):
packages.append(pkg.split("linux-")[1])
# Iterate over all kernels
error = False
@ -431,7 +436,7 @@ def kconfig(args: PmbArgs):
pkgname = package if package.startswith("linux-") \
else "linux-" + package
aport = pmb.helpers.pmaports.find(args, pkgname)
apkbuild = pmb.parse.apkbuild(f"{aport}/APKBUILD")
apkbuild = pmb.parse.apkbuild(aport)
if "!pmb:kconfigcheck" in apkbuild["options"]:
skipped += 1
continue
@ -449,7 +454,7 @@ def kconfig(args: PmbArgs):
logging.info("kconfig check succeeded!")
elif args.action_kconfig in ["edit", "migrate"]:
if args.package:
pkgname = args.package
pkgname = args.package if isinstance(args.package, str) else args.package[0]
else:
pkgname = args.deviceinfo["codename"]
use_oldconfig = args.action_kconfig == "migrate"
@ -472,7 +477,7 @@ def deviceinfo_parse(args: PmbArgs):
def apkbuild_parse(args: PmbArgs):
# Default to all packages
packages = args.packages
packages: Sequence[str] = args.packages
if not packages:
packages = pmb.helpers.pmaports.get_list(args)
@ -480,8 +485,7 @@ def apkbuild_parse(args: PmbArgs):
for package in packages:
print(package + ":")
aport = pmb.helpers.pmaports.find(args, package)
path = aport + "/APKBUILD"
print(json.dumps(pmb.parse.apkbuild(path), indent=4,
print(json.dumps(pmb.parse.apkbuild(aport), indent=4,
sort_keys=True))
@ -489,8 +493,7 @@ def apkindex_parse(args: PmbArgs):
result = pmb.parse.apkindex.parse(args.apkindex_path)
if args.package:
if args.package not in result:
raise RuntimeError("Package not found in the APKINDEX: " +
args.package)
raise RuntimeError(f"Package not found in the APKINDEX: {args.package}")
result = result[args.package]
print(json.dumps(result, indent=4))
@ -536,14 +539,14 @@ def shutdown(args: PmbArgs):
def stats(args: PmbArgs):
# Chroot suffix
suffix = "native"
chroot = Chroot.native()
if args.arch != pmb.config.arch_native:
suffix = "buildroot_" + args.arch
chroot = Chroot.buildroot(args.arch)
# Install ccache and display stats
pmb.chroot.apk.install(args, ["ccache"], suffix)
logging.info(f"({suffix}) % ccache -s")
pmb.chroot.user(args, ["ccache", "-s"], suffix, output="stdout")
pmb.chroot.apk.install(args, ["ccache"], chroot)
logging.info(f"({chroot}) % ccache -s")
pmb.chroot.user(args, ["ccache", "-s"], chroot, output="stdout")
def work_migrate(args: PmbArgs):
@ -558,7 +561,7 @@ def log(args: PmbArgs):
pmb.helpers.run.user(args, ["truncate", "-s", "0", args.log])
pmb.helpers.run.user(args, ["truncate", "-s", "0", log_testsuite])
cmd = ["tail", "-n", args.lines, "-F"]
cmd: List[PathString] = ["tail", "-n", args.lines, "-F"]
# Follow the testsuite's log file too if it exists. It will be created when
# starting a test case that writes to it (git -C test grep log_testsuite).
@ -620,7 +623,7 @@ def pull(args: PmbArgs):
def lint(args: PmbArgs):
packages = args.packages
packages: Sequence[str] = args.packages
if not packages:
packages = pmb.helpers.pmaports.get_list(args)

View file

@ -1,6 +1,8 @@
# Copyright 2023 Oliver Smith
# SPDX-License-Identifier: GPL-3.0-or-later
import configparser
from pathlib import Path
from typing import Dict
from pmb.helpers import logging
import os
from pathlib import Path
@ -22,7 +24,7 @@ def get_path(args: PmbArgs, name_repo):
"""
if name_repo == "pmaports":
return args.aports
return pmb.config.work / "cache_git/" + name_repo
return pmb.config.work / "cache_git" / name_repo
def clone(args: PmbArgs, name_repo):
@ -66,7 +68,7 @@ def rev_parse(args: PmbArgs, path, revision="HEAD", extra_args: list = []):
or (with ``--abbrev-ref``): the branch name, e.g. "master"
"""
command = ["git", "rev-parse"] + extra_args + [revision]
rev = pmb.helpers.run.user(args, command, path, output_return=True)
rev = pmb.helpers.run.user_output(args, command, path)
return rev.rstrip()
@ -84,7 +86,7 @@ def can_fast_forward(args: PmbArgs, path, branch_upstream, branch="HEAD"):
def clean_worktree(args: PmbArgs, path):
"""Check if there are not any modified files in the git dir."""
command = ["git", "status", "--porcelain"]
return pmb.helpers.run.user(args, command, path, output_return=True) == ""
return pmb.helpers.run.user_output(args, command, path) == ""
def get_upstream_remote(args: PmbArgs, name_repo):
@ -95,7 +97,7 @@ def get_upstream_remote(args: PmbArgs, name_repo):
urls = pmb.config.git_repos[name_repo]
path = get_path(args, name_repo)
command = ["git", "remote", "-v"]
output = pmb.helpers.run.user(args, command, path, output_return=True)
output = pmb.helpers.run.user_output(args, command, path)
for line in output.split("\n"):
if any(u in line for u in urls):
return line.split("\t", 1)[0]
@ -127,8 +129,8 @@ def parse_channels_cfg(args):
else:
remote = get_upstream_remote(args, "pmaports")
command = ["git", "show", f"{remote}/master:channels.cfg"]
stdout = pmb.helpers.run.user(args, command, args.aports,
output_return=True, check=False)
stdout = pmb.helpers.run.user_output(args, command, args.aports,
check=False)
try:
cfg.read_string(stdout)
except configparser.MissingSectionHeaderError:
@ -139,7 +141,7 @@ def parse_channels_cfg(args):
" pmaports clone")
# Meta section
ret = {"channels": {}}
ret: Dict[str, Dict[str, str | Dict[str, str]]] = {"channels": {}}
ret["meta"] = {"recommended": cfg.get("channels.cfg", "recommended")}
# Channels
@ -153,7 +155,8 @@ def parse_channels_cfg(args):
for key in ["description", "branch_pmaports", "branch_aports",
"mirrordir_alpine"]:
value = cfg.get(channel, key)
ret["channels"][channel_new][key] = value
# FIXME: how to type this properly??
ret["channels"][channel_new][key] = value # type: ignore[index]
pmb.helpers.other.cache[cache_key] = ret
return ret
@ -261,11 +264,10 @@ def get_files(args: PmbArgs, path):
:returns: all files in a git repository as list, relative to path
"""
ret = []
files = pmb.helpers.run.user(args, ["git", "ls-files"], path,
output_return=True).split("\n")
files += pmb.helpers.run.user(args, ["git", "ls-files",
"--exclude-standard", "--other"], path,
output_return=True).split("\n")
files = pmb.helpers.run.user_output(args, ["git", "ls-files"], path).split("\n")
files += pmb.helpers.run.user_output(args, ["git", "ls-files",
"--exclude-standard", "--other"],
path).split("\n")
for file in files:
if os.path.exists(f"{path}/{file}"):
ret += [file]

View file

@ -4,12 +4,17 @@ import hashlib
import json
from pmb.helpers import logging
import os
from pathlib import Path
import shutil
import urllib.request
from pmb.core.types import PmbArgs
import pmb.helpers.run
def cache_file(prefix: str, url: str) -> Path:
prefix = prefix.replace("/", "_")
return Path(f"{prefix}_{hashlib.sha256(url.encode('utf-8')).hexdigest()}")
def download(args: PmbArgs, url, prefix, cache=True, loglevel=logging.INFO,
allow_404=False):
@ -33,9 +38,7 @@ def download(args: PmbArgs, url, prefix, cache=True, loglevel=logging.INFO,
pmb.helpers.run.user(args, ["mkdir", "-p", pmb.config.work / "cache_http"])
# Check if file exists in cache
prefix = prefix.replace("/", "_")
path = (pmb.config.work / "cache_http/" + prefix + "_" +
hashlib.sha256(url.encode("utf-8")).hexdigest())
path = pmb.config.work / "cache_http" / cache_file(prefix, url)
if os.path.exists(path):
if cache:
return path

View file

@ -1,5 +1,6 @@
# Copyright 2023 Danct12 <danct12@disroot.org>
# SPDX-License-Identifier: GPL-3.0-or-later
from pathlib import Path
from pmb.helpers import logging
import os
@ -20,7 +21,7 @@ def check(args: PmbArgs, pkgnames):
# Mount pmaports.git inside the chroot so that we don't have to copy the
# package folders
pmaports = "/mnt/pmaports"
pmaports = Path("/mnt/pmaports")
pmb.build.mount_pmaports(args, pmaports)
# Locate all APKBUILDs and make the paths be relative to the pmaports
@ -28,9 +29,8 @@ def check(args: PmbArgs, pkgnames):
apkbuilds = []
for pkgname in pkgnames:
aport = pmb.helpers.pmaports.find(args, pkgname)
if not os.path.exists(aport + "/APKBUILD"):
raise ValueError("Path does not contain an APKBUILD file:" +
aport)
if not (aport / "APKBUILD").exists():
raise ValueError(f"Path does not contain an APKBUILD file: {aport}")
relpath = os.path.relpath(aport, args.aports)
apkbuilds.append(f"{relpath}/APKBUILD")

View file

@ -3,15 +3,29 @@
import logging
import os
import sys
from typing import TextIO
import pmb.config
from pmb.core.types import PmbArgs
logfd = None
logfd: TextIO
CRITICAL = logging.CRITICAL
FATAL = logging.FATAL
ERROR = logging.ERROR
WARNING = logging.WARNING
WARN = logging.WARN
INFO = logging.INFO
DEBUG = logging.DEBUG
NOTSET = logging.NOTSET
VERBOSE = 5
class log_handler(logging.StreamHandler):
"""Write to stdout and to the already opened log file."""
_args = None
_args: PmbArgs
def __init__(self, args: PmbArgs):
super().__init__()
self._args = args
def emit(self, record):
try:
@ -80,13 +94,13 @@ def add_verbose_log_level():
All stackoverflow user contributions are licensed as CC-BY-SA:
https://creativecommons.org/licenses/by-sa/3.0/
"""
logging.VERBOSE = 5
logging.addLevelName(logging.VERBOSE, "VERBOSE")
logging.Logger.verbose = lambda inst, msg, * \
args, **kwargs: inst.log(logging.VERBOSE, msg, *args, **kwargs)
logging.verbose = lambda msg, *args, **kwargs: logging.log(logging.VERBOSE,
setattr(logging, "VERBOSE", VERBOSE)
logging.addLevelName(VERBOSE, "VERBOSE")
setattr(logging.Logger, "verbose", lambda inst, msg, * \
args, **kwargs: inst.log(VERBOSE, msg, *args, **kwargs))
setattr(logging, "verbose", lambda msg, *args, **kwargs: logging.log(VERBOSE,
msg, *args,
**kwargs)
**kwargs))
def init(args: PmbArgs):
@ -118,11 +132,10 @@ def init(args: PmbArgs):
add_verbose_log_level()
root_logger.setLevel(logging.DEBUG)
if args.verbose:
root_logger.setLevel(logging.VERBOSE)
root_logger.setLevel(VERBOSE)
# Add a custom log handler
handler = log_handler()
log_handler._args = args
handler = log_handler(args)
handler.setFormatter(formatter)
root_logger.addHandler(handler)

View file

@ -37,6 +37,8 @@ def bind(args: PmbArgs, source: Path, destination: Path, create_folders=True, um
else:
return
print(f"Mounting {source} -> {destination}")
# Check/create folders
for path in [source, destination]:
if os.path.exists(path):
@ -44,8 +46,7 @@ def bind(args: PmbArgs, source: Path, destination: Path, create_folders=True, um
if create_folders:
pmb.helpers.run.root(args, ["mkdir", "-p", path])
else:
raise RuntimeError("Mount failed, folder does not exist: " +
path)
raise RuntimeError(f"Mount failed, folder does not exist: {path}")
# Actually mount the folder
pmb.helpers.run.root(args, ["mount", "--bind", source, destination])
@ -89,8 +90,7 @@ def umount_all_list(prefix: Path, source: Path=Path("/proc/mounts")) -> List[Pat
for line in handle:
words = line.split()
if len(words) < 2:
raise RuntimeError("Failed to parse line in " + source + ": " +
line)
raise RuntimeError(f"Failed to parse line in {source}: {line}")
mountpoint = Path(words[1].replace(r"\040(deleted)", ""))
if mountpoint.is_relative_to(prefix): # is subpath
ret.append(mountpoint)
@ -103,7 +103,7 @@ def umount_all(args: PmbArgs, folder: Path):
for mountpoint in umount_all_list(folder):
pmb.helpers.run.root(args, ["umount", mountpoint])
if ismount(mountpoint):
raise RuntimeError("Failed to umount: " + mountpoint)
raise RuntimeError(f"Failed to umount: {mountpoint}")
def mount_device_rootfs(args: PmbArgs, chroot_rootfs: Chroot) -> PurePath:
@ -114,7 +114,7 @@ def mount_device_rootfs(args: PmbArgs, chroot_rootfs: Chroot) -> PurePath:
"rootfs_qemu-amd64")
:returns: the mountpoint (relative to the native chroot)
"""
mountpoint = PurePath("/mnt", chroot_rootfs.dirname())
mountpoint = PurePath("/mnt", chroot_rootfs.dirname)
pmb.helpers.mount.bind(args, chroot_rootfs.path,
Chroot.native() / mountpoint)
return mountpoint

View file

@ -12,6 +12,8 @@ import pmb.helpers.pmaports
import pmb.helpers.run
from typing import Dict, Any
from typing import Any, Dict
def folder_size(args: PmbArgs, path: Path):
"""Run `du` to calculate the size of a folder.

View file

@ -9,6 +9,7 @@ See also:
- pmb/helpers/repo.py (work with binary package repos)
"""
import copy
from typing import Any, Dict
from pmb.helpers import logging
from pmb.core.types import PmbArgs
@ -57,7 +58,7 @@ def get(args: PmbArgs, pkgname, arch, replace_subpkgnames=False, must_exist=True
]
# Find in pmaports
ret = None
ret: Dict[str, Any] = {}
pmaport = pmb.helpers.pmaports.get(args, pkgname, False)
if pmaport:
ret = {"arch": pmaport["arch"],

View file

@ -18,7 +18,7 @@ def package(args: PmbArgs, pkgname, reason="", dry=False):
:param dry: don't modify the APKBUILD, just print the message
"""
# Current and new pkgrel
path = pmb.helpers.pmaports.find(args, pkgname) + "/APKBUILD"
path = pmb.helpers.pmaports.find(args, pkgname) / "APKBUILD"
apkbuild = pmb.parse.apkbuild(path)
pkgrel = int(apkbuild["pkgrel"])
pkgrel_new = pkgrel + 1
@ -91,7 +91,7 @@ def auto_apkindex_package(args: PmbArgs, arch, aport, apk, dry=False):
# (which means dynamic libraries that the package was linked
# against) and packages for which no aport exists.
if (depend.startswith("so:") or
not pmb.helpers.pmaports.find(args, depend, False)):
not pmb.helpers.pmaports.find_optional(args, depend)):
missing.append(depend)
# Increase pkgrel
@ -107,7 +107,7 @@ def auto(args: PmbArgs, dry=False):
for arch in pmb.config.build_device_architectures:
paths = pmb.helpers.repo.apkindex_files(args, arch, alpine=False)
for path in paths:
logging.info("scan " + path)
logging.info(f"scan {path}")
index = pmb.parse.apkindex.parse(path, False)
for pkgname, apk in index.items():
origin = apk["origin"]
@ -116,12 +116,12 @@ def auto(args: PmbArgs, dry=False):
logging.verbose(
f"{pkgname}: origin '{origin}' found again")
continue
aport_path = pmb.helpers.pmaports.find(args, origin, False)
aport_path = pmb.helpers.pmaports.find_optional(args, origin)
if not aport_path:
logging.warning("{}: origin '{}' aport not found".format(
pkgname, origin))
continue
aport = pmb.parse.apkbuild(f"{aport_path}/APKBUILD")
aport = pmb.parse.apkbuild(aport_path)
if auto_apkindex_package(args, arch, aport, apk, dry):
ret.append(pkgname)
return ret

View file

@ -8,7 +8,6 @@ See also:
"""
import glob
from pmb.helpers import logging
import os
from pathlib import Path
from typing import Optional, Sequence, Dict
@ -44,7 +43,7 @@ def get_list(args: PmbArgs) -> Sequence[str]:
return list(_find_apkbuilds(args).keys())
def guess_main_dev(args: PmbArgs, subpkgname):
def guess_main_dev(args: PmbArgs, subpkgname) -> Optional[Path]:
"""Check if a package without "-dev" at the end exists in pmaports or not, and log the appropriate message.
Don't call this function directly, use guess_main() instead.
@ -57,7 +56,7 @@ def guess_main_dev(args: PmbArgs, subpkgname):
if path:
logging.verbose(subpkgname + ": guessed to be a subpackage of " +
pkgname + " (just removed '-dev')")
return os.path.dirname(path)
return path.parent
logging.verbose(subpkgname + ": guessed to be a subpackage of " + pkgname +
", which we can't find in pmaports, so it's probably in"
@ -147,7 +146,7 @@ def find(args: PmbArgs, package: str, must_exist=True) -> Path:
"""
# Try to get a cached result first (we assume that the aports don't change
# in one pmbootstrap call)
ret = Path()
ret: Optional[Path] = None
if package in pmb.helpers.other.cache["find_aport"]:
ret = pmb.helpers.other.cache["find_aport"][package]
else:
@ -165,7 +164,7 @@ def find(args: PmbArgs, package: str, must_exist=True) -> Path:
guess = guess_main(args, package)
if guess:
# Parse the APKBUILD and verify if the guess was right
if _find_package_in_apkbuild(package, f'{guess}/APKBUILD'):
if _find_package_in_apkbuild(package, guess / "APKBUILD"):
ret = guess
else:
# Otherwise parse all APKBUILDs (takes time!), is the
@ -183,7 +182,7 @@ def find(args: PmbArgs, package: str, must_exist=True) -> Path:
ret = guess
# Crash when necessary
if ret is None and must_exist:
if ret is None:
raise RuntimeError("Could not find aport for package: " +
package)
@ -192,6 +191,13 @@ def find(args: PmbArgs, package: str, must_exist=True) -> Path:
return ret
def find_optional(args: PmbArgs, package: str) -> Optional[Path]:
try:
return find(args, package)
except RuntimeError:
return None
def get(args: PmbArgs, pkgname, must_exist=True, subpackages=True):
"""Find and parse an APKBUILD file.
@ -213,9 +219,12 @@ def get(args: PmbArgs, pkgname, must_exist=True, subpackages=True):
"""
pkgname = pmb.helpers.package.remove_operators(pkgname)
if subpackages:
aport = find(args, pkgname, must_exist)
aport = find_optional(args, pkgname)
if aport:
return pmb.parse.apkbuild(f"{aport}/APKBUILD")
return pmb.parse.apkbuild(aport / "APKBUILD")
elif must_exist:
raise RuntimeError("Could not find APKBUILD for package:"
f" {pkgname}")
else:
path = _find_apkbuilds(args).get(pkgname)
if path:
@ -251,7 +260,8 @@ def find_providers(args: PmbArgs, provide):
key=lambda p: p[1].get('provider_priority', 0))
def get_repo(args: PmbArgs, pkgname, must_exist=True):
# FIXME (#2324): split into an _optional variant or drop must_exist
def get_repo(args: PmbArgs, pkgname, must_exist=True) -> Optional[str]:
"""Get the repository folder of an aport.
:pkgname: package name
@ -259,10 +269,14 @@ def get_repo(args: PmbArgs, pkgname, must_exist=True):
:returns: a string like "main", "device", "cross", ...
or None when the aport could not be found
"""
aport = find(args, pkgname, must_exist)
aport: Optional[Path]
if must_exist:
aport = find(args, pkgname)
else:
aport = find_optional(args, pkgname)
if not aport:
return None
return os.path.basename(os.path.dirname(aport))
return aport.parent.name
def check_arches(arches, arch):
@ -284,7 +298,7 @@ def check_arches(arches, arch):
return False
def get_channel_new(channel):
def get_channel_new(channel: str) -> str:
"""Translate legacy channel names to the new ones.
Legacy names are still supported for compatibility with old branches (pmb#2015).

View file

@ -17,9 +17,10 @@ import pmb.config.pmaports
from pmb.core.types import PmbArgs
import pmb.helpers.http
import pmb.helpers.run
import pmb.helpers.other
def hash(url, length=8):
def apkindex_hash(url: str, length: int=8) -> Path:
r"""Generate the hash that APK adds to the APKINDEX and apk packages in its apk cache folder.
It is the "12345678" part in this example:
@ -43,7 +44,7 @@ def hash(url, length=8):
ret += xd[(binary[i] >> 4) & 0xf]
ret += xd[binary[i] & 0xf]
return ret
return Path(f"APKINDEX.{ret}.tar.gz")
def urls(args: PmbArgs, user_repository=True, postmarketos_mirror=True, alpine=True):
@ -112,7 +113,7 @@ def apkindex_files(args: PmbArgs, arch=None, user_repository=True, pmos=True,
# Resolve the APKINDEX.$HASH.tar.gz files
for url in urls(args, False, pmos, alpine):
ret.append(pmb.config.work / f"cache_apk_{arch}" / f"APKINDEX.{hash(url)}.tar.gz")
ret.append(pmb.config.work / f"cache_apk_{arch}" / apkindex_hash(url))
return ret
@ -151,7 +152,7 @@ def update(args: PmbArgs, arch=None, force=False, existing_only=False):
# APKINDEX file name from the URL
url_full = url + "/" + arch + "/APKINDEX.tar.gz"
cache_apk_outside = pmb.config.work / f"cache_apk_{arch}"
apkindex = cache_apk_outside / f"APKINDEX.{hash(url)}.tar.gz"
apkindex = cache_apk_outside / f"APKINDEX.{apkindex_hash(url)}.tar.gz"
# Find update reason, possibly skip non-existing or known 404 files
reason = None
@ -217,5 +218,5 @@ def alpine_apkindex_path(args: PmbArgs, repo="main", arch=None):
# Find it on disk
channel_cfg = pmb.config.pmaports.read_config_channel(args)
repo_link = f"{args.mirror_alpine}{channel_cfg['mirrordir_alpine']}/{repo}"
cache_folder = pmb.config.work / "cache_apk_" + arch
return cache_folder + "/APKINDEX." + hash(repo_link) + ".tar.gz"
cache_folder = pmb.config.work / (f"cache_apk_{arch}")
return cache_folder / apkindex_hash(repo_link)

View file

@ -1,5 +1,6 @@
# Copyright 2024 Oliver Smith
# SPDX-License-Identifier: GPL-3.0-or-later
from pmb.core.chroot import Chroot
from pmb.helpers import logging
import glob
@ -10,7 +11,7 @@ from pmb.core.types import PmbArgs
progress_done = 0
progress_total = 0
progress_step = None
progress_step: str
def get_arch(args: PmbArgs):
@ -115,7 +116,7 @@ def log_progress(msg):
progress_done += 1
def run_steps(args: PmbArgs, steps, arch, suffix):
def run_steps(args: PmbArgs, steps, arch, chroot: Chroot):
global progress_step
for step, bootstrap_line in steps.items():
@ -128,14 +129,14 @@ def run_steps(args: PmbArgs, steps, arch, suffix):
if "[usr_merge]" in bootstrap_line:
usr_merge = pmb.chroot.UsrMerge.ON
if suffix != "native":
if chroot != Chroot.native():
log_progress(f"initializing native chroot (merge /usr: {usr_merge.name})")
# Native chroot needs pmOS binary package repo for cross compilers
pmb.chroot.init(args, "native", usr_merge)
pmb.chroot.init(args, Chroot.native(), usr_merge)
log_progress(f"initializing {suffix} chroot (merge /usr: {usr_merge.name})")
log_progress(f"initializing {chroot} chroot (merge /usr: {usr_merge.name})")
# Initialize without pmOS binary package repo
pmb.chroot.init(args, suffix, usr_merge, postmarketos_mirror=False)
pmb.chroot.init(args, chroot, usr_merge, postmarketos_mirror=False)
for package in get_packages(bootstrap_line):
log_progress(f"building {package}")

View file

@ -34,7 +34,7 @@ def filter_aport_packages(args: PmbArgs, arch, pkgnames):
"""
ret = []
for pkgname in pkgnames:
if pmb.helpers.pmaports.find(args, pkgname, False):
if pmb.helpers.pmaports.find_optional(args, pkgname):
ret += [pkgname]
return ret

View file

@ -2,13 +2,14 @@
# SPDX-License-Identifier: GPL-3.0-or-later
import os
from pathlib import Path
import subprocess
import pmb.helpers.run_core
from typing import Any, Dict, List, Optional, Sequence
from pmb.core.types import PathString, PmbArgs
from pmb.core.types import Env, PathString, PmbArgs
def user(args: PmbArgs, cmd: Sequence[PathString], working_dir: Path=Path("/"), output: str="log", output_return: bool=False,
check: Optional[bool]=None, env: Dict[Any, Any]={}, sudo: bool=False) -> str:
def user(args: PmbArgs, cmd: Sequence[PathString], working_dir: Optional[Path] = None, output: str = "log", output_return: bool = False,
check: Optional[bool] = None, env: Env = {}, sudo: bool = False) -> str | int | subprocess.Popen:
"""
Run a command on the host system as user.
@ -22,8 +23,8 @@ def user(args: PmbArgs, cmd: Sequence[PathString], working_dir: Path=Path("/"),
# Readable log message (without all the escaping)
msg = "% "
for key, value in env.items():
msg += key + "=" + value + " "
if working_dir != Path("/"):
msg += f"{key}={value} "
if working_dir is not None:
msg += f"cd {os.fspath(working_dir)}; "
msg += " ".join(cmd_parts)
@ -35,6 +36,15 @@ def user(args: PmbArgs, cmd: Sequence[PathString], working_dir: Path=Path("/"),
return pmb.helpers.run_core.core(args, msg, cmd_parts, working_dir, output,
output_return, check, sudo)
# FIXME: should probably use some kind of wrapper class / builder pattern for all these parameters...
def user_output(args: PmbArgs, cmd: Sequence[PathString], working_dir: Optional[Path] = None, output: str = "log",
check: Optional[bool] = None, env: Env = {}, sudo: bool = False) -> str:
ret = user(args, cmd, working_dir, output, output_return=True, check=check, env=env, sudo=sudo)
if not isinstance(ret, str):
raise TypeError("Expected str output, got " + str(ret))
return ret
def root(args: PmbArgs, cmd: Sequence[PathString], working_dir=None, output="log", output_return=False,
check=None, env={}):

View file

@ -1,6 +1,7 @@
# Copyright 2023 Oliver Smith
# SPDX-License-Identifier: GPL-3.0-or-later
import fcntl
from pmb.core.types import PathString, Env
from pmb.helpers import logging
import os
from pathlib import Path
@ -10,17 +11,17 @@ import subprocess
import sys
import threading
import time
from typing import Sequence
from pmb.core.chroot import Chroot
from typing import Dict, Optional, Sequence
import pmb.helpers.run
from pmb.core.types import PathString, PmbArgs
from pmb.core.types import Env, PathString, PmbArgs
"""For a detailed description of all output modes, read the description of
core() at the bottom. All other functions in this file get (indirectly)
called by core(). """
def flat_cmd(cmd: Sequence[PathString], working_dir: Path=Path("/"), env={}):
def flat_cmd(cmd: Sequence[PathString], working_dir: Optional[Path]=None, env: Env={}):
"""Convert a shell command passed as list into a flat shell string with proper escaping.
:param cmd: command as list, e.g. ["echo", "string with spaces"]
@ -35,14 +36,14 @@ def flat_cmd(cmd: Sequence[PathString], working_dir: Path=Path("/"), env={}):
# Merge env and cmd into escaped list
escaped = []
for key, value in env.items():
escaped.append(key + "=" + shlex.quote(value))
escaped.append(key + "=" + shlex.quote(os.fspath(value)))
for i in range(len(cmd)):
escaped.append(shlex.quote(os.fspath(cmd[i])))
# Prepend working dir
ret = " ".join(escaped)
if working_dir != Path("/"):
ret = "cd " + shlex.quote(working_dir) + ";" + ret
if working_dir is not None:
ret = "cd " + shlex.quote(str(working_dir)) + ";" + ret
return ret
@ -84,6 +85,7 @@ def pipe(cmd, working_dir=None):
return ret
# FIXME (#2324): These types make no sense at all
def pipe_read(process, output_to_stdout=False, output_return=False,
output_return_buffer=False):
"""Read all output from a subprocess, copy it to the log and optionally stdout and a buffer variable.
@ -179,17 +181,21 @@ def foreground_pipe(args: PmbArgs, cmd, working_dir=None, output_to_stdout=False
stdin=stdin)
# Make process.stdout non-blocking
handle = process.stdout.fileno()
stdout = process.stdout or None
if not stdout:
raise RuntimeError("Process has no stdout?!")
handle = stdout.fileno()
flags = fcntl.fcntl(handle, fcntl.F_GETFL)
fcntl.fcntl(handle, fcntl.F_SETFL, flags | os.O_NONBLOCK)
# While process exists wait for output (with timeout)
output_buffer = []
output_buffer: list[bytes] = []
sel = selectors.DefaultSelector()
sel.register(process.stdout, selectors.EVENT_READ)
timeout = args.timeout if output_timeout else None
sel.register(stdout, selectors.EVENT_READ)
timeout = args.timeout
while process.poll() is None:
wait_start = time.perf_counter() if output_timeout else None
wait_start = time.perf_counter()
sel.select(timeout)
# On timeout raise error (we need to measure time on our own, because

View file

@ -16,8 +16,8 @@ def list(args: PmbArgs, arch):
ret = [("none", "Bare minimum OS image for testing and manual"
" customization. The \"console\" UI should be selected if"
" a graphical UI is not desired.")]
for path in sorted(glob.glob(args.aports + "/main/postmarketos-ui-*")):
apkbuild = pmb.parse.apkbuild(f"{path}/APKBUILD")
for path in sorted(args.aports.glob("main/postmarketos-ui-*")):
apkbuild = pmb.parse.apkbuild(path)
ui = os.path.basename(path).split("-", 2)[2]
if pmb.helpers.package.check_arch(args, apkbuild["pkgname"], arch):
ret.append((ui, apkbuild["pkgdesc"]))

View file

@ -6,8 +6,8 @@ import re
import glob
import shlex
import sys
from typing import Dict, List
from pathlib import Path, PurePath
from typing import Dict, List, Optional, Sequence, TypedDict
from pathlib import Path
import pmb.chroot
import pmb.chroot.apk
@ -15,7 +15,7 @@ import pmb.chroot.other
import pmb.chroot.initfs
import pmb.config
import pmb.config.pmaports
from pmb.core.types import PmbArgs
from pmb.core.types import PartitionLayout, PmbArgs
import pmb.helpers.devices
from pmb.helpers.mount import mount_device_rootfs
import pmb.helpers.run
@ -60,8 +60,11 @@ def get_nonfree_packages(args: PmbArgs, device):
["device-nokia-n900-nonfree-firmware"]
"""
# Read subpackages
apkbuild = pmb.parse.apkbuild(pmb.helpers.devices.find_path(args, device,
'APKBUILD'))
device_path = pmb.helpers.devices.find_path(args, device, 'APKBUILD')
if not device_path:
raise RuntimeError(f"Device package not found for {device}")
apkbuild = pmb.parse.apkbuild(device_path)
subpackages = apkbuild["subpackages"]
# Check for firmware and userland
@ -123,7 +126,7 @@ def copy_files_from_chroot(args: PmbArgs, chroot: Chroot):
pmb.helpers.run.root(args, ["rm", fifo])
# Get all folders inside the device rootfs (except for home)
folders: List[PurePath] = []
folders: List[str] = []
for path in mountpoint_outside.glob("*"):
if path.name == "home":
continue
@ -150,7 +153,7 @@ def create_home_from_skel(args: PmbArgs):
rootfs = (Chroot.native() / "mnt/install")
# In btrfs, home subvol & home dir is created in format.py
if args.filesystem != "btrfs":
pmb.helpers.run.root(args, ["mkdir", rootfs + "/home"])
pmb.helpers.run.root(args, ["mkdir", rootfs / "home"])
home = (rootfs / "home" / args.user)
if (rootfs / "etc/skel").exists():
@ -310,8 +313,7 @@ def copy_ssh_keys(args: PmbArgs):
target = Chroot.native() / "mnt/install/home/" / args.user / ".ssh"
pmb.helpers.run.root(args, ["mkdir", target])
pmb.helpers.run.root(args, ["chmod", "700", target])
pmb.helpers.run.root(args, ["cp", authorized_keys, target +
"/authorized_keys"])
pmb.helpers.run.root(args, ["cp", authorized_keys, target / "authorized_keys"])
pmb.helpers.run.root(args, ["rm", authorized_keys])
pmb.helpers.run.root(args, ["chown", "-R", "10000:10000", target])
@ -538,9 +540,9 @@ def generate_binary_list(args: PmbArgs, chroot: Chroot, step):
binaries = args.deviceinfo["sd_embed_firmware"].split(",")
for binary_offset in binaries:
binary, offset = binary_offset.split(':')
binary, _offset = binary_offset.split(':')
try:
offset = int(offset)
offset = int(_offset)
except ValueError:
raise RuntimeError("Value for firmware binary offset is "
f"not valid: {offset}")
@ -627,9 +629,9 @@ def write_cgpt_kpart(args: PmbArgs, layout, suffix: Chroot):
def sanity_check_boot_size(args: PmbArgs):
default = pmb.config.defaults["boot_size"]
if int(args.boot_size) >= int(default):
if isinstance(default, str) and int(args.boot_size) >= int(default):
return
logging.error("ERROR: your pmbootstrap has a small boot_size of"
logging.error("ERROR: your pmbootstrap has a small/invalid boot_size of"
f" {args.boot_size} configured, probably because the config"
" has been created with an old version.")
logging.error("This can lead to problems later on, we recommend setting it"
@ -700,11 +702,12 @@ def get_partition_layout(reserve, kernel):
:returns: the partition layout, e.g. without reserve and kernel:
{"kernel": None, "boot": 1, "reserve": None, "root": 2}
"""
ret = {}
ret["kernel"] = None
ret["boot"] = 1
ret["reserve"] = None
ret["root"] = 2
ret: PartitionLayout = {
"kernel": None,
"boot": 1,
"reserve": None,
"root": 2,
}
if kernel:
ret["kernel"] = 1
@ -761,11 +764,11 @@ def create_fstab(args: PmbArgs, layout, chroot: Chroot):
# Do not install fstab into target rootfs when using on-device
# installer. Provide fstab only to installer suffix
if args.on_device_installer and "rootfs_" in chroot:
if args.on_device_installer and chroot.type == ChrootType.ROOTFS:
return
boot_dev = f"/dev/installp{layout['boot']}"
root_dev = f"/dev/installp{layout['root']}"
boot_dev = Path(f"/dev/installp{layout['boot']}")
root_dev = Path(f"/dev/installp{layout['root']}")
boot_mount_point = f"UUID={get_uuid(args, boot_dev)}"
root_mount_point = "/dev/mapper/root" if args.full_disk_encryption \
@ -806,7 +809,7 @@ def create_fstab(args: PmbArgs, layout, chroot: Chroot):
def install_system_image(args: PmbArgs, size_reserve, chroot: Chroot, step, steps,
boot_label="pmOS_boot", root_label="pmOS_root",
split=False, disk=None):
split=False, disk: Optional[Path]=None):
"""
:param size_reserve: empty partition between root and boot in MiB (pma#463)
:param suffix: the chroot suffix, where the rootfs that will be installed
@ -912,6 +915,8 @@ def print_flash_info(args: PmbArgs):
method = args.deviceinfo["flash_method"]
flasher = pmb.config.flashers.get(method, {})
flasher_actions = flasher.get("actions", {})
if not isinstance(flasher_actions, dict):
raise TypeError(f"flasher actions must be a dictionary, got: {flasher_actions}")
requires_split = flasher.get("split", False)
if method == "none":
@ -929,11 +934,9 @@ def print_flash_info(args: PmbArgs):
logging.info("* pmbootstrap flasher flash_rootfs")
logging.info(" Flashes the generated rootfs image to your device:")
if args.split:
logging.info(f" {Chroot.native()}/home/pmos/rootfs/"
f"{args.device}-rootfs.img")
logging.info(f" {Chroot.native() / 'home/pmos/rootfs' / args.device}-rootfs.img")
else:
logging.info(f" {Chroot.native()}/home/pmos/rootfs/"
f"{args.device}.img")
logging.info(f" {Chroot.native() / 'home/pmos/rootfs' / args.device}.img")
logging.info(" (NOTE: This file has a partition table, which"
" contains /boot and / subpartitions. That way we"
" don't need to change the partition layout on your"
@ -975,8 +978,7 @@ def print_flash_info(args: PmbArgs):
" Use 'pmbootstrap flasher boot' to do that.)")
if "flash_lk2nd" in flasher_actions and \
os.path.exists(f"{Chroot(ChrootType.ROOTFS, args.device)}"
"/boot/lk2nd.img"):
(Chroot(ChrootType.ROOTFS, args.device) / "/boot/lk2nd.img").exists():
logging.info("* Your device supports and may even require"
" flashing lk2nd. You should flash it before"
" flashing anything else. Use 'pmbootstrap flasher"
@ -991,7 +993,7 @@ def print_flash_info(args: PmbArgs):
def install_recovery_zip(args: PmbArgs, steps):
logging.info(f"*** ({steps}/{steps}) CREATING RECOVERY-FLASHABLE ZIP ***")
suffix = "buildroot_" + args.deviceinfo["arch"]
mount_device_rootfs(args, Chroot(ChrootType.ROOTFS, args.device), suffix)
mount_device_rootfs(args, Chroot.rootfs(args.device))
pmb.install.recovery.create_zip(args, suffix)
# Flash information
@ -1003,7 +1005,7 @@ def install_recovery_zip(args: PmbArgs, steps):
def install_on_device_installer(args: PmbArgs, step, steps):
# Generate the rootfs image
if not args.ondev_no_rootfs:
suffix_rootfs = Chroot(ChrootType.ROOTFS, args.device)
suffix_rootfs = Chroot.rootfs(args.device)
install_system_image(args, 0, suffix_rootfs, step=step, steps=steps,
split=True)
step += 2
@ -1132,7 +1134,7 @@ def get_recommends(args: PmbArgs, packages) -> Sequence[str]:
"""
global get_recommends_visited
ret = []
ret: List[str] = []
if not args.install_recommends:
return ret

View file

@ -1,5 +1,6 @@
# Copyright 2023 Oliver Smith
# SPDX-License-Identifier: GPL-3.0-or-later
from typing import Optional
from pmb.helpers import logging
import os
import glob
@ -20,7 +21,7 @@ def previous_install(args: PmbArgs, path: Path):
:param path: path to disk block device (e.g. /dev/mmcblk0)
"""
label = ""
for blockdevice_outside in [f"{path}1", f"{path}p1"]:
for blockdevice_outside in [path.with_stem(f"{path.name}1"), path.with_stem(f"{path.name}p1")]:
if not os.path.exists(blockdevice_outside):
continue
blockdevice_inside = "/dev/diskp1"
@ -39,14 +40,14 @@ def previous_install(args: PmbArgs, path: Path):
return "pmOS_boot" in label
def mount_disk(args: PmbArgs, path):
def mount_disk(args: PmbArgs, path: Path):
"""
:param path: path to disk block device (e.g. /dev/mmcblk0)
"""
# Sanity checks
if not os.path.exists(path):
raise RuntimeError(f"The disk block device does not exist: {path}")
for path_mount in glob.glob(f"{path}*"):
for path_mount in path.parent.glob(f"{path.name}*"):
if pmb.helpers.mount.ismount(path_mount):
raise RuntimeError(f"{path_mount} is mounted! Will not attempt to"
" format this!")
@ -124,7 +125,7 @@ def create_and_mount_image(args: PmbArgs, size_boot, size_root, size_reserve,
pmb.helpers.mount.bind_file(args, device, Chroot.native() / mount_point)
def create(args: PmbArgs, size_boot, size_root, size_reserve, split, disk):
def create(args: PmbArgs, size_boot, size_root, size_reserve, split, disk: Optional[Path]):
"""
Create /dev/install (the "install blockdevice").

View file

@ -2,11 +2,13 @@
# SPDX-License-Identifier: GPL-3.0-or-later
import glob
import json
from pathlib import Path
from typing import List, Optional
from pmb.helpers import logging
import os
import time
from pmb.core.types import PmbArgs
from pmb.core.types import PathString, PmbArgs
import pmb.helpers.mount
import pmb.helpers.run
import pmb.chroot
@ -14,19 +16,19 @@ from pmb.core import Chroot
def init(args: PmbArgs):
if not os.path.isdir("/sys/module/loop"):
if not Path("/sys/module/loop").is_dir():
pmb.helpers.run.root(args, ["modprobe", "loop"])
for loopdevice in glob.glob("/dev/loop*"):
if os.path.isdir(loopdevice):
for loopdevice in Path("/dev/").glob("loop*"):
if loopdevice.is_dir():
continue
pmb.helpers.mount.bind_file(args, loopdevice, Chroot.native() / loopdevice)
def mount(args: PmbArgs, img_path):
def mount(args: PmbArgs, img_path: Path):
"""
:param img_path: Path to the img file inside native chroot.
"""
logging.debug("(native) mount " + img_path + " (loop)")
logging.debug(f"(native) mount {img_path} (loop)")
# Try to mount multiple times (let the kernel module initialize #1594)
for i in range(0, 5):
@ -39,20 +41,23 @@ def mount(args: PmbArgs, img_path):
# Mount and return on success
init(args)
losetup_cmd = ["losetup", "-f", img_path]
losetup_cmd: List[PathString] = ["losetup", "-f", img_path]
sector_size = args.deviceinfo["rootfs_image_sector_size"]
if sector_size:
losetup_cmd += ["-b", str(int(sector_size))]
pmb.chroot.root(args, losetup_cmd, check=False)
if device_by_back_file(args, img_path):
try:
device_by_back_file(args, img_path)
return
except RuntimeError:
pass
# Failure: raise exception
raise RuntimeError("Failed to mount loop device: " + img_path)
raise RuntimeError(f"Failed to mount loop device: {img_path}")
def device_by_back_file(args: PmbArgs, back_file, auto_init=True):
def device_by_back_file(args: PmbArgs, back_file: Path, auto_init=True) -> Path:
"""
Get the /dev/loopX device that points to a specific image file.
"""
@ -61,22 +66,24 @@ def device_by_back_file(args: PmbArgs, back_file, auto_init=True):
losetup_output = pmb.chroot.root(args, ["losetup", "--json", "--list"],
output_return=True, auto_init=auto_init)
if not losetup_output:
return None
raise RuntimeError("losetup failed")
# Find the back_file
losetup = json.loads(losetup_output)
for loopdevice in losetup["loopdevices"]:
if loopdevice["back-file"] == back_file:
return loopdevice["name"]
return None
if Path(loopdevice["back-file"]) == back_file:
return Path(loopdevice["name"])
raise RuntimeError(f"Failed to find loop device for {back_file}")
def umount(args: PmbArgs, img_path, auto_init=True):
def umount(args: PmbArgs, img_path: Path, auto_init=True):
"""
:param img_path: Path to the img file inside native chroot.
"""
device: Path
try:
device = device_by_back_file(args, img_path, auto_init)
if not device:
except RuntimeError:
return
logging.debug("(native) umount " + device)
logging.debug(f"(native) umount {device}")
pmb.chroot.root(args, ["losetup", "-d", device], auto_init=auto_init)

View file

@ -1,5 +1,7 @@
# Copyright 2023 Oliver Smith
# SPDX-License-Identifier: GPL-3.0-or-later
from pathlib import Path
from typing import Optional
from pmb.helpers import logging
import os
import time
@ -10,25 +12,28 @@ import pmb.install.losetup
from pmb.core import Chroot
def partitions_mount(args: PmbArgs, layout, disk):
# FIXME (#2324): this function drops disk to a string because it's easier
# to manipulate, this is probably bad.
def partitions_mount(args: PmbArgs, layout, disk: Optional[Path]):
"""
Mount blockdevices of partitions inside native chroot
:param layout: partition layout from get_partition_layout()
:param disk: path to disk block device (e.g. /dev/mmcblk0) or None
"""
prefix = disk
if not disk:
img_path = "/home/pmos/rootfs/" + args.device + ".img"
prefix = pmb.install.losetup.device_by_back_file(args, img_path)
img_path = Path("/home/pmos/rootfs") / f"{args.device}.img"
disk = pmb.install.losetup.device_by_back_file(args, img_path)
logging.info(f"Mounting partitions of {disk} inside the chroot")
tries = 20
# Devices ending with a number have a "p" before the partition number,
# /dev/sda1 has no "p", but /dev/mmcblk0p1 has. See add_partition() in
# block/partitions/core.c of linux.git.
partition_prefix = prefix
if str.isdigit(prefix[-1:]):
partition_prefix = f"{prefix}p"
partition_prefix = str(disk)
if str.isdigit(disk.name[-1:]):
partition_prefix = f"{disk}p"
found = False
for i in range(tries):
@ -40,7 +45,7 @@ def partitions_mount(args: PmbArgs, layout, disk):
time.sleep(0.1)
if not found:
raise RuntimeError(f"Unable to find the first partition of {prefix}, "
raise RuntimeError(f"Unable to find the first partition of {disk}, "
f"expected it to be at {partition_prefix}1!")
partitions = [layout["boot"], layout["root"]]
@ -49,7 +54,7 @@ def partitions_mount(args: PmbArgs, layout, disk):
partitions += [layout["kernel"]]
for i in partitions:
source = f"{partition_prefix}{i}"
source = Path(f"{partition_prefix}{i}")
target = Chroot.native() / "dev" / f"installp{i}"
pmb.helpers.mount.bind_file(args, source, target)

View file

@ -1,5 +1,6 @@
# Copyright 2023 Attila Szollosi
# SPDX-License-Identifier: GPL-3.0-or-later
from pathlib import Path
from pmb.helpers import logging
import pmb.chroot
@ -13,7 +14,7 @@ def create_zip(args: PmbArgs, suffix):
"""
Create android recovery compatible installer zip.
"""
zip_root = "/var/lib/postmarketos-android-recovery-installer/"
zip_root = Path("/var/lib/postmarketos-android-recovery-installer/")
rootfs = "/mnt/rootfs_" + args.device
flavor = pmb.helpers.frontend._parse_flavor(args)
method = args.deviceinfo["flash_method"]
@ -70,4 +71,4 @@ def create_zip(args: PmbArgs, suffix):
["gzip", "-f1", "rootfs.tar"],
["build-recovery-zip", args.device]]
for command in commands:
pmb.chroot.root(args, command, suffix, zip_root)
pmb.chroot.root(args, command, suffix, working_dir=zip_root)

View file

@ -1,18 +1,19 @@
# Copyright 2023 Dylan Van Assche
# SPDX-License-Identifier: GPL-3.0-or-later
from typing import List
from pmb.helpers import logging
from pmb.core.types import PmbArgs
import pmb.helpers.pmaports
def get_groups(args: PmbArgs):
def get_groups(args: PmbArgs) -> List[str]:
""" Get all groups to which the user additionally must be added.
The list of groups are listed in _pmb_groups of the UI and
UI-extras package.
:returns: list of groups, e.g. ["feedbackd", "udev"] """
ret = []
ret: List[str] = []
if args.ui == "none":
return ret

View file

@ -206,7 +206,7 @@ def _parse_attributes(path, lines, apkbuild_attributes, ret):
ret[attribute] = replace_variable(ret, value)
if "subpackages" in apkbuild_attributes:
subpackages = OrderedDict()
subpackages: OrderedDict[str, str] = OrderedDict()
for subpkg in ret["subpackages"].split(" "):
if subpkg:
_parse_subpackage(path, lines, ret, subpackages, subpkg)
@ -326,8 +326,12 @@ def apkbuild(path: Path, check_pkgver=True, check_pkgname=True):
:returns: relevant variables from the APKBUILD. Arrays get returned as
arrays.
"""
if path.is_dir():
if path.name != "APKBUILD":
path = path / "APKBUILD"
if not path.exists():
raise FileNotFoundError(f"{path.relative_to(pmb.config.work)} not found!")
# Try to get a cached result first (we assume that the aports don't change
# in one pmbootstrap call)
if path in pmb.helpers.other.cache["apkbuild"]:

View file

@ -1,6 +1,7 @@
# Copyright 2023 Oliver Smith
# SPDX-License-Identifier: GPL-3.0-or-later
import collections
from typing import Any, Dict, List
from pmb.helpers import logging
from pathlib import Path
import tarfile
@ -34,7 +35,7 @@ def parse_next_block(path: Path, lines, start):
:returns: None, when there are no more blocks
"""
# Parse until we hit an empty line or end of file
ret = {}
ret: Dict[str, Any] = {}
mapping = {
"A": "arch",
"D": "depends",
@ -60,9 +61,8 @@ def parse_next_block(path: Path, lines, start):
for letter, key in mapping.items():
if line.startswith(letter + ":"):
if key in ret:
raise RuntimeError(
"Key " + key + " (" + letter + ":) specified twice"
" in block: " + str(ret) + ", file: " + path)
raise RuntimeError(f"Key {key} ({letter}:) specified twice"
f" in block: {ret}, file: {path}")
ret[key] = line[2:-1]
# Format and return the block
@ -91,9 +91,9 @@ def parse_next_block(path: Path, lines, start):
# No more blocks
elif ret != {}:
raise RuntimeError("Last block in " + path + " does not end"
raise RuntimeError(f"Last block in {path} does not end"
" with a new line! Delete the file and"
" try again. Last block: " + str(ret))
f" try again. Last block: {ret}")
return None
@ -168,7 +168,7 @@ def parse(path: Path, multiple_providers=True):
# Require the file to exist
if not path.is_file():
logging.verbose("NOTE: APKINDEX not found, assuming no binary packages"
" exist for that architecture: " + path)
f" exist for that architecture: {path}")
return {}
# Try to get a cached result first
@ -185,14 +185,14 @@ def parse(path: Path, multiple_providers=True):
# Read all lines
if tarfile.is_tarfile(path):
with tarfile.open(path, "r:gz") as tar:
with tar.extractfile(tar.getmember("APKINDEX")) as handle:
with tar.extractfile(tar.getmember("APKINDEX")) as handle: # type:ignore[union-attr]
lines = handle.readlines()
else:
with path.open("r", encoding="utf-8") as handle:
lines = handle.readlines()
# Parse the whole APKINDEX file
ret = collections.OrderedDict()
ret: Dict[str, Any] = collections.OrderedDict()
start = [0]
while True:
block = parse_next_block(path, lines, start)
@ -201,8 +201,8 @@ def parse(path: Path, multiple_providers=True):
# Skip virtual packages
if "timestamp" not in block:
logging.verbose("Skipped virtual package " + str(block) + " in"
" file: " + path)
logging.verbose(f"Skipped virtual package {block} in"
f" file: {path}")
continue
# Add the next package and all aliases
@ -232,11 +232,11 @@ def parse_blocks(path: Path):
"""
# Parse all lines
with tarfile.open(path, "r:gz") as tar:
with tar.extractfile(tar.getmember("APKINDEX")) as handle:
with tar.extractfile(tar.getmember("APKINDEX")) as handle: # type:ignore[union-attr]
lines = handle.readlines()
# Parse lines into blocks
ret = []
ret: List[str] = []
start = [0]
while True:
block = pmb.parse.apkindex.parse_next_block(path, lines, start)
@ -251,7 +251,7 @@ def clear_cache(path: Path):
:returns: True on successful deletion, False otherwise
"""
logging.verbose("Clear APKINDEX cache for: " + path)
logging.verbose(f"Clear APKINDEX cache for: {path}")
if path in pmb.helpers.other.cache["apkindex"]:
del pmb.helpers.other.cache["apkindex"][path]
return True
@ -281,7 +281,7 @@ def providers(args: PmbArgs, package, arch=None, must_exist=True, indexes=None):
package = pmb.helpers.package.remove_operators(package)
ret = collections.OrderedDict()
ret: Dict[str, Any] = collections.OrderedDict()
for path in indexes:
# Skip indexes not providing the package
index_packages = parse(path)
@ -295,10 +295,8 @@ def providers(args: PmbArgs, package, arch=None, must_exist=True, indexes=None):
if provider_pkgname in ret:
version_last = ret[provider_pkgname]["version"]
if pmb.parse.version.compare(version, version_last) == -1:
logging.verbose(package + ": provided by: " +
provider_pkgname + "-" + version + " in " +
path + " (but " + version_last + " is"
" higher)")
logging.verbose(f"{package}: provided by: {provider_pkgname}-{version}"
f"in {path} (but {version_last} is higher)")
continue
# Add the provider to ret
@ -306,7 +304,8 @@ def providers(args: PmbArgs, package, arch=None, must_exist=True, indexes=None):
ret[provider_pkgname] = provider
if ret == {} and must_exist:
logging.debug("Searched in APKINDEX files: " + ", ".join(indexes))
import os
logging.debug(f"Searched in APKINDEX files: {', '.join([os.fspath(x) for x in indexes])}")
raise RuntimeError("Could not find package '" + package + "'!")
return ret
@ -319,7 +318,7 @@ def provider_highest_priority(providers, pkgname):
:param pkgname: the package name we are interested in (for the log message)
"""
max_priority = 0
priority_providers = collections.OrderedDict()
priority_providers: collections.OrderedDict[str, str] = collections.OrderedDict()
for provider_name, provider in providers.items():
priority = int(provider.get("provider_priority", -1))
if priority > max_priority:

View file

@ -3,10 +3,13 @@
import argparse
import copy
import os
from pathlib import Path
import sys
from pmb.core.types import PmbArgs
try:
import argcomplete
import argcomplete # type:ignore[import-untyped]
except ImportError:
pass
@ -109,7 +112,7 @@ def arguments_install(subparser):
help="do not create an image file, instead"
" write to the given block device (SD card, USB"
" stick, etc.), for example: '/dev/mmcblk0'",
metavar="BLOCKDEV")
metavar="BLOCKDEV", type=lambda x: Path(x))
group.add_argument("--android-recovery-zip",
help="generate TWRP flashable zip (recommended read:"
" https://postmarketos.org/recoveryzip)",
@ -205,7 +208,8 @@ def arguments_export(subparser):
ret.add_argument("export_folder", help="export folder, defaults to"
" /tmp/postmarketOS-export",
default="/tmp/postmarketOS-export", nargs="?")
default=Path("/tmp/postmarketOS-export"), nargs="?",
type=lambda x: Path(x))
ret.add_argument("--odin", help="odin flashable tar"
" (boot.img/kernel+initramfs only)",
action="store_true", dest="odin_flashable_tar")
@ -651,8 +655,8 @@ def get_parser():
f" default: {mirrors_pmos_default}",
metavar="URL", action="append", default=[])
parser.add_argument("-m", "--mirror-alpine", dest="mirror_alpine",
help="Alpine Linux mirror, default: " +
pmb.config.defaults["mirror_alpine"],
help="Alpine Linux mirror, default: "
f"{pmb.config.defaults['mirror_alpine']}",
metavar="URL")
parser.add_argument("-j", "--jobs", help="parallel jobs when compiling")
parser.add_argument("-E", "--extra-space",
@ -923,7 +927,8 @@ def get_parser():
# Action: bootimg_analyze
bootimg_analyze = sub.add_parser("bootimg_analyze", help="Extract all the"
" information from an existing boot.img")
bootimg_analyze.add_argument("path", help="path to the boot.img")
bootimg_analyze.add_argument("path", help="path to the boot.img",
type=lambda x: Path(x))
bootimg_analyze.add_argument("--force", "-f", action="store_true",
help="force even if the file seems to be"
" invalid")
@ -940,7 +945,7 @@ def get_parser():
def arguments():
# Parse and extend arguments (also backup unmodified result from argparse)
args = get_parser().parse_args()
args: PmbArgs = get_parser().parse_args() # type: ignore
setattr(args, "from_argparse", copy.deepcopy(args))
setattr(args.from_argparse, "from_argparse", args.from_argparse)

View file

@ -10,8 +10,8 @@ import pmb.config
def binfmt_info(arch_qemu):
# Parse the info file
full = {}
info = pmb.config.pmb_src + "/pmb/data/qemu-user-binfmt.txt"
logging.verbose("parsing: " + info)
info = pmb.config.pmb_src / "pmb/data/qemu-user-binfmt.txt"
logging.verbose(f"parsing: {info}")
with open(info, "r") as handle:
for line in handle:
if line.startswith('#') or "=" not in line:

View file

@ -1,9 +1,10 @@
# Copyright 2023 Lary Gibaud
# SPDX-License-Identifier: GPL-3.0-or-later
import re
from typing import Optional
def arm_big_little_first_group_ncpus():
def arm_big_little_first_group_ncpus() -> Optional[int]:
"""
Infer from /proc/cpuinfo on aarch64 if this is a big/little architecture
(if there is different processor models) and the number of cores in the

View file

@ -17,12 +17,12 @@ def package_from_aports(args: PmbArgs, pkgname_depend):
depends, version. The version is the combined pkgver and pkgrel.
"""
# Get the aport
aport = pmb.helpers.pmaports.find(args, pkgname_depend, False)
aport = pmb.helpers.pmaports.find_optional(args, pkgname_depend)
if not aport:
return None
# Parse its version
apkbuild = pmb.parse.apkbuild(f"{aport}/APKBUILD")
apkbuild = pmb.parse.apkbuild(aport / "APKBUILD")
pkgname = apkbuild["pkgname"]
version = apkbuild["pkgver"] + "-r" + apkbuild["pkgrel"]
@ -118,7 +118,7 @@ def package_from_index(args: PmbArgs, pkgname_depend, pkgnames_install, package_
return provider
def recurse(args: PmbArgs, pkgnames, suffix: Chroot=Chroot.native()):
def recurse(args: PmbArgs, pkgnames, suffix: Chroot=Chroot.native()) -> Sequence[str]:
"""
Find all dependencies of the given pkgnames.
@ -134,8 +134,8 @@ def recurse(args: PmbArgs, pkgnames, suffix: Chroot=Chroot.native()):
# Iterate over todo-list until is is empty
todo = list(pkgnames)
required_by = {}
ret = []
required_by: Dict[str, Set[str]] = {}
ret: List[str] = []
while len(todo):
# Skip already passed entries
pkgname_depend = todo.pop(0)

View file

@ -113,7 +113,8 @@ def _parse_kernel_suffix(args: PmbArgs, info, device, kernel):
return ret
def deviceinfo(args: PmbArgs, device=None, kernel=None):
# FIXME (#2324): Make deviceinfo a type! (class!!!)
def deviceinfo(args: PmbArgs, device=None, kernel=None) -> Dict[str, str]:
"""
:param device: defaults to args.device
:param kernel: defaults to args.kernel

View file

@ -253,21 +253,25 @@ def check(args: PmbArgs, pkgname, components_list=[], details=False, must_exist=
# Read all kernel configs in the aport
ret = True
aport = pmb.helpers.pmaports.find(args, "linux-" + flavor, must_exist=must_exist)
if aport is None:
aport: Path
try:
aport = pmb.helpers.pmaports.find(args, "linux-" + flavor)
except RuntimeError as e:
if must_exist:
raise e
return None
apkbuild = pmb.parse.apkbuild(f"{aport}/APKBUILD")
apkbuild = pmb.parse.apkbuild(aport / "APKBUILD")
pkgver = apkbuild["pkgver"]
# We only enforce optional checks for community & main devices
enforce_check = aport.split("/")[-2] in ["community", "main"]
enforce_check = aport.parts[-2] in ["community", "main"]
for name in get_all_component_names():
if f"pmb:kconfigcheck-{name}" in apkbuild["options"] and \
name not in components_list:
components_list += [name]
for config_path in glob.glob(aport + "/config-*"):
for config_path in aport.glob("config-*"):
# The architecture of the config is in the name, so it just needs to be
# extracted
config_name = os.path.basename(config_path)

View file

@ -108,12 +108,12 @@ def parse_suffix(rest):
C equivalent: get_token(), case TOKEN_SUFFIX
"""
suffixes = collections.OrderedDict([
name_suffixes = collections.OrderedDict([
("pre", ["alpha", "beta", "pre", "rc"]),
("post", ["cvs", "svn", "git", "hg", "p"]),
])
for name, suffixes in suffixes.items():
for name, suffixes in name_suffixes.items():
for i, suffix in enumerate(suffixes):
if not rest.startswith(suffix):
continue
@ -203,7 +203,7 @@ def validate(version):
return True
def compare(a_version, b_version, fuzzy=False):
def compare(a_version: str, b_version: str, fuzzy=False):
"""
Compare two versions A and B to find out which one is higher, or if
both are equal.
@ -307,4 +307,4 @@ def check_string(a_version, rule):
# Compare
result = compare(a_version, b_version)
return result in expected_results
return not expected_results or result in expected_results

View file

@ -1,5 +1,6 @@
# Copyright 2023 Pablo Castellano, Oliver Smith
# SPDX-License-Identifier: GPL-3.0-or-later
import subprocess
from typing import Sequence
from pmb.helpers import logging
import os
@ -16,7 +17,7 @@ import pmb.chroot.other
import pmb.chroot.initfs
import pmb.config
import pmb.config.pmaports
from pmb.core.types import PmbArgs
from pmb.core.types import PathString, PmbArgs
import pmb.helpers.run
import pmb.parse.arch
import pmb.parse.cpuinfo
@ -30,7 +31,7 @@ def system_image(args: PmbArgs):
"""
path = Chroot.native() / "home/pmos/rootfs" / f"{args.device}.img"
if not path.exists():
logging.debug("Could not find rootfs: " + path)
logging.debug(f"Could not find rootfs: {path}")
raise RuntimeError("The rootfs has not been generated yet, please "
"run 'pmbootstrap install' first.")
return path
@ -76,10 +77,10 @@ def create_gdk_loader_cache(args: PmbArgs) -> Path:
cache_path = gdk_cache_dir / "loaders.cache"
if not (chroot_native / cache_path).is_file():
raise RuntimeError("gdk pixbuf cache file not found: " + cache_path)
raise RuntimeError(f"gdk pixbuf cache file not found: {cache_path}")
pmb.chroot.root(args, ["cp", cache_path, custom_cache_path])
cmd = ["sed", "-i", "-e",
cmd: Sequence[PathString] = ["sed", "-i", "-e",
f"s@\"{gdk_cache_dir}@\"{chroot_native / gdk_cache_dir}@",
custom_cache_path]
pmb.chroot.root(args, cmd)
@ -138,11 +139,12 @@ def command_qemu(args: PmbArgs, arch, img_path, img_path_2nd=None):
if "gtk" in args.qemu_display:
gdk_cache = create_gdk_loader_cache(args)
env.update({"GTK_THEME": "Default",
"GDK_PIXBUF_MODULE_FILE": gdk_cache,
"XDG_DATA_DIRS": ":".join([
chroot_native / "usr/local/share",
chroot_native / "usr/share"
# FIXME: why does mypy think the values here should all be paths??
env.update({"GTK_THEME": "Default", # type: ignore[dict-item]
"GDK_PIXBUF_MODULE_FILE": str(gdk_cache), # type: ignore[dict-item]
"XDG_DATA_DIRS": ":".join([ # type: ignore[dict-item]
str(chroot_native / "usr/local/share"),
str(chroot_native / "usr/share"),
])})
command = []
@ -162,9 +164,9 @@ def command_qemu(args: PmbArgs, arch, img_path, img_path_2nd=None):
command += [chroot_native / "lib" / f"ld-musl-{pmb.config.arch_native}.so.1"]
command += ["--library-path=" + ":".join([
chroot_native / "lib",
chroot_native / "usr/lib" +
chroot_native / "usr/lib/pulseaudio"
str(chroot_native / "lib"),
str(chroot_native / "usr/lib"),
str(chroot_native / "usr/lib/pulseaudio"),
])]
command += [chroot_native / "usr/bin" / f"qemu-system-{arch}"]
command += ["-L", chroot_native / "usr/share/qemu/"]
@ -188,7 +190,7 @@ def command_qemu(args: PmbArgs, arch, img_path, img_path_2nd=None):
else:
command += ["stdio"]
command += ["-drive", "file=" + img_path + ",format=raw,if=virtio"]
command += ["-drive", f"file={img_path},format=raw,if=virtio"]
if img_path_2nd:
command += ["-drive", "file=" + img_path_2nd + ",format=raw,if=virtio"]
@ -387,5 +389,5 @@ def run(args: PmbArgs):
"send Ctrl+C to the VM, run:")
logging.info("$ pmbootstrap config qemu_redir_stdio True")
finally:
if process:
if isinstance(process, subprocess.Popen):
process.terminate()

View file

@ -4,9 +4,8 @@ import os
from typing import List
from pmb.helpers import logging
import shlex
from argparse import Namespace
from pmb.core.types import PmbArgs
from pmb.core.types import PathString, PmbArgs
import pmb.helpers.run
import pmb.helpers.run_core
import pmb.parse.apkindex
@ -22,19 +21,19 @@ def scp_abuild_key(args: PmbArgs, user: str, host: str, port: str):
:param host: target device ssh hostname
:param port: target device ssh port """
keys = (pmb.config.work / "config_abuild").glob("*.pub")
keys = list((pmb.config.work / "config_abuild").glob("*.pub"))
key = keys[0]
key_name = os.path.basename(key)
logging.info(f"Copying signing key ({key_name}) to {user}@{host}")
command = ['scp', '-P', port, key, f'{user}@{host}:/tmp']
command: List[PathString] = ['scp', '-P', port, key, f'{user}@{host}:/tmp']
pmb.helpers.run.user(args, command, output="interactive")
logging.info(f"Installing signing key at {user}@{host}")
keyname = os.path.join("/tmp", os.path.basename(key))
remote_cmd = ['sudo', '-p', pmb.config.sideload_sudo_prompt,
remote_cmd_l: List[PathString] = ['sudo', '-p', pmb.config.sideload_sudo_prompt,
'-S', 'mv', '-n', keyname, "/etc/apk/keys/"]
remote_cmd = pmb.helpers.run_core.flat_cmd(remote_cmd)
remote_cmd = pmb.helpers.run_core.flat_cmd(remote_cmd_l)
command = ['ssh', '-t', '-p', port, f'{user}@{host}', remote_cmd]
pmb.helpers.run.user(args, command, output="tui")
@ -43,7 +42,7 @@ def ssh_find_arch(args: PmbArgs, user: str, host: str, port: str) -> str:
"""Connect to a device via ssh and query the architecture."""
logging.info(f"Querying architecture of {user}@{host}")
command = ["ssh", "-p", port, f"{user}@{host}", "uname -m"]
output = pmb.helpers.run.user(args, command, output_return=True)
output = pmb.helpers.run.user_output(args, command)
# Split by newlines so we can pick out any irrelevant output, e.g. the "permanently
# added to list of known hosts" warnings.
output_lines = output.strip().splitlines()

View file

@ -25,7 +25,7 @@ def args(tmpdir, request):
def test_hash():
url = "https://nl.alpinelinux.org/alpine/edge/testing"
hash = "865a153c"
assert pmb.helpers.repo.hash(url, 8) == hash
assert pmb.helpers.repo.apkindex_hash(url, 8) == hash
def test_alpine_apkindex_path(args: PmbArgs):

View file

@ -73,7 +73,7 @@ def setup_work(args: PmbArgs, tmpdir):
pmb.helpers.run.user(args, ["./pmbootstrap.py", "shutdown"])
# Link everything from work (except for "packages") to the tmpdir
for path in glob.glob(pmb.config.work / "*"):
for path in pmb.config.work.glob("*"):
if os.path.basename(path) != "packages":
pmb.helpers.run.user(args, ["ln", "-s", path, tmpdir + "/"])
@ -91,7 +91,7 @@ def setup_work(args: PmbArgs, tmpdir):
f"{tmpdir}/_aports/main/{pkgname}"])
# Copy pmaports.cfg
pmb.helpers.run.user(args, ["cp", args.aports + "/pmaports.cfg", tmpdir +
pmb.helpers.run.user(args, ["cp", args.aports / "pmaports.cfg", tmpdir +
"/_aports"])
# Empty packages folder

View file

@ -133,7 +133,7 @@ def is_running(args: PmbArgs, programs, timeout=300, sleep_before_retry=1):
ssh_works = False
end = time.monotonic() + timeout
last_try = 0
last_try = 0.0
while last_try < end:
# Sleep only when last try exited immediately

View file

@ -1,6 +1,7 @@
# Copyright 2023 Oliver Smith
# SPDX-License-Identifier: GPL-3.0-or-later
""" Test pmb.helpers.run_core """
from typing import Sequence
from pmb.core.types import PmbArgs
import pytest
import re
@ -69,7 +70,7 @@ def test_pipe(args: PmbArgs):
def test_foreground_pipe(args: PmbArgs):
func = pmb.helpers.run_core.foreground_pipe
cmd = ["echo", "test"]
cmd: Sequence[str] = ["echo", "test"]
# Normal run
assert func(args, cmd) == (0, "")