forked from Mirror/pmbootstrap
pmb: Add more type hints (MR 2490)
This commit is contained in:
parent
206ba62417
commit
5ed5817e80
36 changed files with 174 additions and 120 deletions
|
@ -35,7 +35,7 @@ def get_cross_package_arches(pkgname: str) -> str:
|
|||
return "x86_64"
|
||||
|
||||
|
||||
def properties(pkgname):
|
||||
def properties(pkgname: str) -> tuple[str, str, AportGenEntry]:
|
||||
"""
|
||||
Get the `pmb.config.aportgen` properties for the aport generator, based on
|
||||
the pkgname prefix.
|
||||
|
|
|
@ -13,7 +13,7 @@ import pmb.parse.apkindex
|
|||
import pmb.parse
|
||||
|
||||
|
||||
def ask_for_architecture():
|
||||
def ask_for_architecture() -> Arch:
|
||||
architectures = [str(a) for a in Arch.supported()]
|
||||
# Don't show armhf, new ports shouldn't use this architecture
|
||||
if "armhf" in architectures:
|
||||
|
@ -32,12 +32,12 @@ def ask_for_architecture():
|
|||
)
|
||||
|
||||
|
||||
def ask_for_manufacturer():
|
||||
def ask_for_manufacturer() -> str:
|
||||
logging.info("Who produced the device (e.g. LG)?")
|
||||
return pmb.helpers.cli.ask("Manufacturer", None, None, False)
|
||||
|
||||
|
||||
def ask_for_name(manufacturer):
|
||||
def ask_for_name(manufacturer: str) -> str:
|
||||
logging.info("What is the official name (e.g. Google Nexus 5)?")
|
||||
ret = pmb.helpers.cli.ask("Name", None, None, False)
|
||||
|
||||
|
@ -47,13 +47,13 @@ def ask_for_name(manufacturer):
|
|||
return ret
|
||||
|
||||
|
||||
def ask_for_year():
|
||||
def ask_for_year() -> str:
|
||||
# Regex from https://stackoverflow.com/a/12240826
|
||||
logging.info("In what year was the device released (e.g. 2012)?")
|
||||
return pmb.helpers.cli.ask("Year", None, None, False, validation_regex=r"^[1-9]\d{3,}$")
|
||||
|
||||
|
||||
def ask_for_chassis():
|
||||
def ask_for_chassis() -> str:
|
||||
types = pmb.config.deviceinfo_chassis_types
|
||||
|
||||
logging.info("What type of device is it?")
|
||||
|
@ -69,7 +69,7 @@ def ask_for_external_storage() -> bool:
|
|||
)
|
||||
|
||||
|
||||
def ask_for_flash_method():
|
||||
def ask_for_flash_method() -> str:
|
||||
while True:
|
||||
logging.info("Which flash method does the device support?")
|
||||
method = pmb.helpers.cli.ask(
|
||||
|
@ -100,7 +100,7 @@ def ask_for_flash_method():
|
|||
)
|
||||
|
||||
|
||||
def ask_for_bootimg():
|
||||
def ask_for_bootimg() -> Bootimg | None:
|
||||
logging.info(
|
||||
"You can analyze a known working boot.img file to"
|
||||
" automatically fill out the flasher information for your"
|
||||
|
|
|
@ -11,7 +11,7 @@ import pmb.helpers.pmaports
|
|||
from pmb.core import Chroot
|
||||
|
||||
|
||||
def update(pkgname):
|
||||
def update(pkgname: str) -> None:
|
||||
"""Fetch all sources and update the checksums in the APKBUILD."""
|
||||
pmb.build.init_abuild_minimal()
|
||||
pmb.build.copy_to_buildpath(pkgname, no_override=True)
|
||||
|
@ -24,7 +24,7 @@ def update(pkgname):
|
|||
pmb.helpers.run.user(["cp", source, target])
|
||||
|
||||
|
||||
def verify(pkgname):
|
||||
def verify(pkgname: str) -> None:
|
||||
"""Fetch all sources and verify their checksums."""
|
||||
pmb.build.init_abuild_minimal()
|
||||
pmb.build.copy_to_buildpath(pkgname)
|
||||
|
|
|
@ -21,7 +21,7 @@ from pmb.core import Chroot
|
|||
from pmb.core.context import get_context
|
||||
|
||||
|
||||
def match_kbuild_out(word):
|
||||
def match_kbuild_out(word: str) -> str | None:
|
||||
"""Look for paths in the following formats:
|
||||
"<prefix>/<kbuild_out>/arch/<arch>/boot"
|
||||
"<prefix>/<kbuild_out>/include/config/kernel.release"
|
||||
|
@ -55,7 +55,7 @@ def match_kbuild_out(word):
|
|||
return "" if out_dir is None else out_dir.strip("/")
|
||||
|
||||
|
||||
def find_kbuild_output_dir(function_body):
|
||||
def find_kbuild_output_dir(function_body: list[str]) -> str:
|
||||
"""Guess what the kernel build output directory is.
|
||||
|
||||
Parses each line of the function word by word, looking for paths which
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
import os
|
||||
from pmb.core.pkgrepo import pkgrepo_default_path
|
||||
from pmb.helpers import logging
|
||||
from pmb.types import PathString
|
||||
from pathlib import Path
|
||||
import pmb.chroot
|
||||
import pmb.helpers.cli
|
||||
|
@ -12,7 +13,7 @@ import pmb.build
|
|||
from pmb.core import Chroot
|
||||
|
||||
|
||||
def newapkbuild(folder, args_passed, force=False):
|
||||
def newapkbuild(folder: PathString, args_passed: list[str], force: bool = False) -> None:
|
||||
# Initialize build environment and build folder
|
||||
pmb.build.init()
|
||||
pmb.chroot.init(Chroot.native())
|
||||
|
|
|
@ -58,7 +58,7 @@ def check_min_version(chroot: Chroot = Chroot.native()) -> None:
|
|||
)
|
||||
|
||||
|
||||
def packages_split_to_add_del(packages):
|
||||
def packages_split_to_add_del(packages: list[str]) -> tuple[list[str], list[str]]:
|
||||
"""
|
||||
Sort packages into "to_add" and "to_del" lists depending on their pkgname
|
||||
starting with an exclamation mark.
|
||||
|
|
|
@ -59,7 +59,7 @@ def mark_in_chroot(chroot: Chroot = Chroot.native()) -> None:
|
|||
pmb.helpers.run.root(["touch", in_chroot_file])
|
||||
|
||||
|
||||
def init_keys():
|
||||
def init_keys() -> None:
|
||||
"""
|
||||
All Alpine and postmarketOS repository keys are shipped with pmbootstrap.
|
||||
Copy them into $WORK/config_apk_keys, which gets mounted inside the various
|
||||
|
|
|
@ -77,7 +77,7 @@ def extract(flavor: str | None, chroot: Chroot, extra: bool = False) -> Path:
|
|||
return outside
|
||||
|
||||
|
||||
def ls(flavor, suffix, extra=False):
|
||||
def ls(flavor: str | None, suffix: Chroot, extra: bool = False) -> None:
|
||||
tmp = "/tmp/initfs-extracted"
|
||||
if extra:
|
||||
tmp = "/tmp/initfs-extra-extracted"
|
||||
|
|
|
@ -12,7 +12,7 @@ from pmb.core import Chroot, ChrootType
|
|||
from pmb.core.context import get_context
|
||||
|
||||
|
||||
def kill_adb():
|
||||
def kill_adb() -> None:
|
||||
"""
|
||||
Kill adb daemon if it's running.
|
||||
"""
|
||||
|
@ -22,7 +22,7 @@ def kill_adb():
|
|||
pmb.chroot.root(["adb", "-P", str(port), "kill-server"])
|
||||
|
||||
|
||||
def kill_sccache():
|
||||
def kill_sccache() -> None:
|
||||
"""
|
||||
Kill sccache daemon if it's running. Unlike ccache it automatically spawns
|
||||
a daemon when you call it and exits after some time of inactivity.
|
||||
|
@ -59,7 +59,7 @@ def shutdown_cryptsetup_device(name: str) -> None:
|
|||
raise RuntimeError("Failed to parse 'cryptsetup status' output!")
|
||||
|
||||
|
||||
def shutdown(only_install_related=False):
|
||||
def shutdown(only_install_related: bool = False) -> None:
|
||||
# Stop daemons
|
||||
kill_adb()
|
||||
kill_sccache()
|
||||
|
|
|
@ -173,7 +173,7 @@ def zap_pkgs_local_mismatch(confirm: bool = True, dry: bool = False) -> None:
|
|||
pmb.build.other.index_repo()
|
||||
|
||||
|
||||
def zap_pkgs_online_mismatch(confirm=True, dry=False):
|
||||
def zap_pkgs_online_mismatch(confirm: bool = True, dry: bool = False) -> None:
|
||||
# Check whether we need to do anything
|
||||
paths = list(get_context().config.work.glob("cache_apk_*"))
|
||||
if not len(paths):
|
||||
|
|
|
@ -5,13 +5,20 @@ import glob
|
|||
from pmb.helpers import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any, TypedDict
|
||||
import pmb.chroot
|
||||
from pmb.types import Env, PmbArgs
|
||||
import pmb.helpers.cli
|
||||
from pmb.core import Chroot
|
||||
|
||||
|
||||
def get_ci_scripts(topdir):
|
||||
class CiScriptDescriptor(TypedDict):
|
||||
description: str
|
||||
options: list[str]
|
||||
artifacts: str | None
|
||||
|
||||
|
||||
def get_ci_scripts(topdir: Path) -> dict[str, CiScriptDescriptor]:
|
||||
"""Find 'pmbootstrap ci'-compatible scripts inside a git repository, and
|
||||
parse their metadata (description, options). The reference is at:
|
||||
https://postmarketos.org/pmb-ci
|
||||
|
@ -21,7 +28,7 @@ def get_ci_scripts(topdir):
|
|||
:returns: a dict of CI scripts found in the git repository, e.g.
|
||||
{"ruff": {"description": "lint all python scripts", "options": []}, ...}
|
||||
"""
|
||||
ret = {}
|
||||
ret: dict[str, CiScriptDescriptor] = {}
|
||||
for script in glob.glob(f"{topdir}/.ci/*.sh"):
|
||||
is_pmb_ci_script = False
|
||||
description = ""
|
||||
|
@ -61,7 +68,7 @@ def get_ci_scripts(topdir):
|
|||
return ret
|
||||
|
||||
|
||||
def sort_scripts_by_speed(scripts):
|
||||
def sort_scripts_by_speed(scripts: dict[str, CiScriptDescriptor]) -> dict[str, CiScriptDescriptor]:
|
||||
"""Order the scripts, so fast scripts run before slow scripts. Whether a
|
||||
script is fast or not is determined by the '# Options: slow' comment in
|
||||
the file.
|
||||
|
@ -88,7 +95,9 @@ def sort_scripts_by_speed(scripts):
|
|||
return ret
|
||||
|
||||
|
||||
def ask_which_scripts_to_run(scripts_available):
|
||||
def ask_which_scripts_to_run(
|
||||
scripts_available: dict[str, CiScriptDescriptor],
|
||||
) -> dict[str, CiScriptDescriptor]:
|
||||
"""Display an interactive prompt about which of the scripts the user
|
||||
wishes to run, or all of them.
|
||||
|
||||
|
@ -117,7 +126,7 @@ def ask_which_scripts_to_run(scripts_available):
|
|||
return ret
|
||||
|
||||
|
||||
def copy_git_repo_to_chroot(topdir):
|
||||
def copy_git_repo_to_chroot(topdir: Path) -> None:
|
||||
"""Create a tarball of the git repo (including unstaged changes and new
|
||||
files) and extract it in chroot_native.
|
||||
|
||||
|
@ -142,7 +151,7 @@ def copy_git_repo_to_chroot(topdir):
|
|||
pmb.chroot.user(["tar", "-xf", "/tmp/git.tar.gz"], working_dir=ci_dir)
|
||||
|
||||
|
||||
def run_scripts(topdir, scripts):
|
||||
def run_scripts(topdir: Path, scripts: dict[str, CiScriptDescriptor]) -> None:
|
||||
"""Run one of the given scripts after another, either natively or in a
|
||||
chroot. Display a progress message and stop on error (without printing
|
||||
a python stack trace).
|
||||
|
|
|
@ -5,6 +5,6 @@
|
|||
class Command:
|
||||
"""Base class for pmbootstrap commands."""
|
||||
|
||||
def run(self):
|
||||
def run(self) -> None:
|
||||
"""Run the command."""
|
||||
raise NotImplementedError()
|
||||
|
|
|
@ -7,8 +7,8 @@ import pmb.build.other
|
|||
|
||||
|
||||
class Index(commands.Command):
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
||||
def run(self):
|
||||
def run(self) -> None:
|
||||
pmb.build.other.index_repo()
|
||||
|
|
|
@ -96,6 +96,6 @@ class KConfigMigrate(commands.Command):
|
|||
self.pkgname_list = [pkgname] if isinstance(pkgname, str) else pkgname
|
||||
self.arch = arch
|
||||
|
||||
def run(self):
|
||||
def run(self) -> None:
|
||||
for pkgname in self.pkgname_list:
|
||||
pmb.build.kconfig.migrate_config(pkgname, self.arch)
|
||||
|
|
|
@ -26,7 +26,7 @@ class RepoBootstrap(commands.Command):
|
|||
progress_total: int = 0
|
||||
progress_step: str
|
||||
|
||||
def check_repo_arg(self):
|
||||
def check_repo_arg(self) -> None:
|
||||
cfg = pmb.config.pmaports.read_config_repos()
|
||||
|
||||
if self.repo in cfg:
|
||||
|
@ -79,7 +79,7 @@ class RepoBootstrap(commands.Command):
|
|||
if self.arch.cpu_emulation_required():
|
||||
self.progress_total += len(steps)
|
||||
|
||||
def log_progress(self, msg):
|
||||
def log_progress(self, msg: str) -> None:
|
||||
percent = int(100 * self.progress_done / self.progress_total)
|
||||
logging.info(f"*** {percent}% [{self.progress_step}] {msg} ***")
|
||||
|
||||
|
@ -130,7 +130,7 @@ class RepoBootstrap(commands.Command):
|
|||
|
||||
self.log_progress("bootstrap complete!")
|
||||
|
||||
def check_existing_pkgs(self):
|
||||
def check_existing_pkgs(self) -> None:
|
||||
channel = pmb.config.pmaports.read_config()["channel"]
|
||||
path = self.context.config.work / "packages" / channel / self.arch
|
||||
|
||||
|
@ -169,7 +169,7 @@ class RepoBootstrap(commands.Command):
|
|||
|
||||
return ret
|
||||
|
||||
def run(self): # noqa: F821
|
||||
def run(self) -> None: # noqa: F821
|
||||
self.check_existing_pkgs()
|
||||
|
||||
steps = self.get_steps()
|
||||
|
|
|
@ -7,8 +7,8 @@ import pmb.chroot
|
|||
|
||||
|
||||
class Shutdown(commands.Command):
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
||||
def run(self):
|
||||
def run(self) -> None:
|
||||
pmb.chroot.shutdown()
|
||||
|
|
|
@ -12,7 +12,7 @@ import time
|
|||
"""Various internal test commands for performance testing and debugging."""
|
||||
|
||||
|
||||
def apkindex_parse_all():
|
||||
def apkindex_parse_all() -> None:
|
||||
indexes = pmb.helpers.repo.apkindex_files(Arch.native())
|
||||
|
||||
pkgs = 0
|
||||
|
@ -26,9 +26,9 @@ def apkindex_parse_all():
|
|||
|
||||
|
||||
class Test(commands.Command):
|
||||
def __init__(self, action: str):
|
||||
def __init__(self, action: str) -> None:
|
||||
self.action = action
|
||||
|
||||
def run(self):
|
||||
def run(self) -> None:
|
||||
if self.action == "apkindex_parse_all":
|
||||
apkindex_parse_all()
|
||||
|
|
|
@ -129,7 +129,7 @@ def chroot_check_channel(chroot: Chroot) -> bool:
|
|||
return False
|
||||
|
||||
|
||||
def clean():
|
||||
def clean() -> bool | None:
|
||||
"""Remove obsolete data data from workdir.cfg.
|
||||
|
||||
:returns: None if workdir does not exist,
|
||||
|
|
|
@ -83,7 +83,7 @@ class Config:
|
|||
|
||||
providers: dict[str, str] = {}
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
# Make sure we aren't modifying the class defaults
|
||||
for key in Config.__annotations__.keys():
|
||||
setattr(self, key, deepcopy(Config.get_default(key)))
|
||||
|
|
|
@ -116,7 +116,7 @@ def _create_command_with_progress(command, fifo):
|
|||
return ["sh", "-c", command_flat]
|
||||
|
||||
|
||||
def _compute_progress(line):
|
||||
def _compute_progress(line: str) -> float:
|
||||
"""Compute the progress as a number between 0 and 1.
|
||||
|
||||
:param line: line as read from the progress fifo
|
||||
|
@ -277,7 +277,7 @@ def cache_clean(arch: Arch) -> None:
|
|||
pmb.helpers.run.root(_command)
|
||||
|
||||
|
||||
def check_outdated(version_installed, action_msg):
|
||||
def check_outdated(version_installed: str, action_msg: str) -> None:
|
||||
"""Check if the provided alpine version is outdated.
|
||||
|
||||
This depends on the alpine mirrordir (edge, v3.12, ...) related to currently checked out
|
||||
|
|
|
@ -7,6 +7,7 @@ import shutil
|
|||
import tarfile
|
||||
import tempfile
|
||||
import stat
|
||||
from pathlib import Path
|
||||
|
||||
import pmb.helpers.apk
|
||||
import pmb.helpers.run
|
||||
|
@ -19,7 +20,7 @@ import pmb.parse.version
|
|||
from pmb.core.context import get_context
|
||||
|
||||
|
||||
def read_signature_info(tar):
|
||||
def read_signature_info(tar: tarfile.TarFile) -> tuple[str, str]:
|
||||
"""
|
||||
Find various information about the signature that was used to sign
|
||||
/sbin/apk.static inside the archive (not to be confused with the normal apk
|
||||
|
@ -107,7 +108,7 @@ def verify_signature(files, sigkey_path):
|
|||
)
|
||||
|
||||
|
||||
def extract(version, apk_path):
|
||||
def extract(version: str, apk_path: Path) -> None:
|
||||
"""
|
||||
Extract everything to temporary locations, verify signatures and reported
|
||||
versions. When everything is right, move the extracted apk.static to the
|
||||
|
@ -147,7 +148,7 @@ def extract(version, apk_path):
|
|||
shutil.move(temp_path, target_path)
|
||||
|
||||
|
||||
def download(file):
|
||||
def download(file: str) -> Path:
|
||||
"""
|
||||
Download a single file from an Alpine mirror.
|
||||
"""
|
||||
|
|
|
@ -6,6 +6,8 @@ import os
|
|||
import re
|
||||
import readline
|
||||
import sys
|
||||
from collections.abc import KeysView
|
||||
from typing import Any
|
||||
|
||||
import pmb.config
|
||||
from pmb.core.context import get_context
|
||||
|
@ -14,7 +16,7 @@ from pmb.core.context import get_context
|
|||
class ReadlineTabCompleter:
|
||||
"""Store intermediate state for completer function."""
|
||||
|
||||
def __init__(self, options: list[str]) -> None:
|
||||
def __init__(self, options: KeysView[str] | dict[str, Any] | list[str]) -> None:
|
||||
""":param options: list of possible completions."""
|
||||
self.options = sorted(options)
|
||||
self.matches: list[str] = []
|
||||
|
@ -38,13 +40,13 @@ class ReadlineTabCompleter:
|
|||
|
||||
|
||||
def ask(
|
||||
question="Continue?",
|
||||
choices=["y", "n"],
|
||||
default="n",
|
||||
lowercase_answer=True,
|
||||
validation_regex=None,
|
||||
complete=None,
|
||||
):
|
||||
question: str = "Continue?",
|
||||
choices: list[str] | None = ["y", "n"],
|
||||
default: int | str | None = "n",
|
||||
lowercase_answer: bool | None = True,
|
||||
validation_regex: str | None = None,
|
||||
complete: KeysView[str] | dict[str, Any] | list[str] | None = None,
|
||||
) -> str:
|
||||
"""Ask a question on the terminal.
|
||||
|
||||
:param question: display prompt
|
||||
|
|
|
@ -84,7 +84,7 @@ def is_up_to_date(path_sources, path_target=None, lastmod_target=None):
|
|||
return lastmod_target >= lastmod_source
|
||||
|
||||
|
||||
def is_older_than(path, seconds):
|
||||
def is_older_than(path: Path, seconds: int) -> bool:
|
||||
"""Check if a single file is older than a given amount of seconds."""
|
||||
if not os.path.exists(path):
|
||||
return True
|
||||
|
|
|
@ -20,7 +20,7 @@ import pmb.helpers.pmaports
|
|||
import pmb.helpers.repo
|
||||
|
||||
|
||||
def remove_operators(package):
|
||||
def remove_operators(package: str) -> str:
|
||||
for operator in [">", ">=", "<=", "=", "<", "~"]:
|
||||
if operator in package:
|
||||
package = package.split(operator)[0]
|
||||
|
|
|
@ -375,7 +375,7 @@ def get_channel_new(channel: str) -> str:
|
|||
return channel
|
||||
|
||||
|
||||
def require_bootstrap_error(repo, arch, trigger_str):
|
||||
def require_bootstrap_error(repo: str, arch: Arch, trigger_str: str) -> None:
|
||||
"""
|
||||
Tell the user that they need to do repo_bootstrap, with some context.
|
||||
|
||||
|
@ -392,7 +392,7 @@ def require_bootstrap_error(repo, arch, trigger_str):
|
|||
)
|
||||
|
||||
|
||||
def require_bootstrap(arch, trigger_str):
|
||||
def require_bootstrap(arch: Arch, trigger_str: str) -> None:
|
||||
"""
|
||||
Check if repo_bootstrap was done, if any is needed.
|
||||
|
||||
|
|
|
@ -1,16 +1,16 @@
|
|||
# Copyright 2023 Oliver Smith
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
from typing import Any
|
||||
|
||||
from pmb.core.arch import Arch
|
||||
from pmb.helpers import logging
|
||||
from pmb.types import Apkbuild
|
||||
|
||||
import pmb.build
|
||||
import pmb.helpers.package
|
||||
import pmb.helpers.pmaports
|
||||
|
||||
|
||||
def filter_missing_packages(arch, pkgnames):
|
||||
def filter_missing_packages(arch: Arch, pkgnames: list[str]) -> list[str]:
|
||||
"""Create a subset of pkgnames with missing or outdated binary packages.
|
||||
|
||||
:param arch: architecture (e.g. "armhf")
|
||||
|
@ -27,7 +27,7 @@ def filter_missing_packages(arch, pkgnames):
|
|||
return ret
|
||||
|
||||
|
||||
def filter_aport_packages(pkgnames):
|
||||
def filter_aport_packages(pkgnames: list[str]) -> list[str]:
|
||||
"""Create a subset of pkgnames where each one has an aport.
|
||||
|
||||
:param pkgnames: list of package names (e.g. ["hello-world", "test12"])
|
||||
|
@ -40,7 +40,7 @@ def filter_aport_packages(pkgnames):
|
|||
return ret
|
||||
|
||||
|
||||
def filter_arch_packages(arch, pkgnames):
|
||||
def filter_arch_packages(arch: Arch, pkgnames: list[str]) -> list[str]:
|
||||
"""Create a subset of pkgnames with packages removed that can not be built for a certain arch.
|
||||
|
||||
:param arch: architecture (e.g. "armhf")
|
||||
|
@ -54,7 +54,7 @@ def filter_arch_packages(arch, pkgnames):
|
|||
return ret
|
||||
|
||||
|
||||
def get_relevant_packages(arch, pkgname=None, built=False):
|
||||
def get_relevant_packages(arch: Arch, pkgname: str | None = None, built: bool = False) -> list[str]:
|
||||
"""Get all packages that can be built for the architecture in question.
|
||||
|
||||
:param arch: architecture (e.g. "armhf")
|
||||
|
@ -92,7 +92,7 @@ def get_relevant_packages(arch, pkgname=None, built=False):
|
|||
return ret
|
||||
|
||||
|
||||
def generate_output_format(arch: Arch, pkgnames: list[str]) -> list[dict[str, Any]]:
|
||||
def generate_output_format(arch: Arch, pkgnames: list[str]) -> list[Apkbuild]:
|
||||
"""Generate the detailed output format.
|
||||
|
||||
:param arch: architecture
|
||||
|
|
|
@ -72,7 +72,7 @@ def sanity_checks(
|
|||
raise RuntimeError("Can't use output_return with output: " + output)
|
||||
|
||||
|
||||
def background(cmd, working_dir=None):
|
||||
def background(cmd: str, working_dir: PathString | None = None) -> subprocess.Popen:
|
||||
"""Run a subprocess in background and redirect its output to the log."""
|
||||
ret = subprocess.Popen(
|
||||
cmd, stdout=pmb.helpers.logging.logfd, stderr=pmb.helpers.logging.logfd, cwd=working_dir
|
||||
|
@ -81,7 +81,7 @@ def background(cmd, working_dir=None):
|
|||
return ret
|
||||
|
||||
|
||||
def pipe(cmd, working_dir=None):
|
||||
def pipe(cmd: str, working_dir: PathString | None = None) -> subprocess.Popen:
|
||||
"""Run a subprocess in background and redirect its output to a pipe."""
|
||||
ret = subprocess.Popen(
|
||||
cmd,
|
||||
|
@ -156,7 +156,9 @@ def pipe_read(
|
|||
return
|
||||
|
||||
|
||||
def kill_process_tree(pid, ppids, sudo):
|
||||
# FIXME: The docstring claims that ppids should be a list of "process ID tuples", but in practice it
|
||||
# gets called with a list of string lists for the ppids argument.
|
||||
def kill_process_tree(pid: int | str, ppids: list[list[str]], sudo: bool) -> None:
|
||||
"""Recursively kill a pid and its child processes.
|
||||
|
||||
:param pid: process id that will be killed
|
||||
|
@ -173,7 +175,7 @@ def kill_process_tree(pid, ppids, sudo):
|
|||
kill_process_tree(child_pid, ppids, sudo)
|
||||
|
||||
|
||||
def kill_command(pid, sudo):
|
||||
def kill_command(pid: int, sudo: bool) -> None:
|
||||
"""Kill a command process and recursively kill its child processes.
|
||||
|
||||
:param pid: process id that will be killed
|
||||
|
@ -268,7 +270,7 @@ def foreground_pipe(
|
|||
return (process.returncode, b"".join(output_buffer).decode("utf-8"))
|
||||
|
||||
|
||||
def foreground_tui(cmd, working_dir=None):
|
||||
def foreground_tui(cmd: str, working_dir: PathString | None = None) -> int:
|
||||
"""Run a subprocess in foreground without redirecting any of its output.
|
||||
|
||||
This is the only way text-based user interfaces (ncurses programs like
|
||||
|
@ -279,7 +281,7 @@ def foreground_tui(cmd, working_dir=None):
|
|||
return process.wait()
|
||||
|
||||
|
||||
def check_return_code(code, log_message):
|
||||
def check_return_code(code: int, log_message: str) -> None:
|
||||
"""Check the return code of a command.
|
||||
|
||||
:param code: exit code to check
|
||||
|
@ -298,7 +300,7 @@ def check_return_code(code, log_message):
|
|||
raise RuntimeError(f"Command failed (exit code {str(code)}): " + log_message)
|
||||
|
||||
|
||||
def sudo_timer_iterate():
|
||||
def sudo_timer_iterate() -> None:
|
||||
"""Run sudo -v and schedule a new timer to repeat the same."""
|
||||
if pmb.config.which_sudo() == "sudo":
|
||||
subprocess.Popen(["sudo", "-v"]).wait()
|
||||
|
@ -310,7 +312,7 @@ def sudo_timer_iterate():
|
|||
timer.start()
|
||||
|
||||
|
||||
def sudo_timer_start():
|
||||
def sudo_timer_start() -> None:
|
||||
"""Start a timer to call sudo -v periodically, so that the password is only needed once."""
|
||||
if "sudo_timer_active" in pmb.helpers.other.cache:
|
||||
return
|
||||
|
@ -319,7 +321,7 @@ def sudo_timer_start():
|
|||
sudo_timer_iterate()
|
||||
|
||||
|
||||
def add_proxy_env_vars(env):
|
||||
def add_proxy_env_vars(env: Env) -> None:
|
||||
"""Add proxy environment variables from host to the environment of the command we are running.
|
||||
|
||||
:param env: dict of environment variables, it will be extended with all of the proxy env vars
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
# Copyright 2023 Clayton Craft
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
import os
|
||||
from pmb.core.arch import Arch
|
||||
from pmb.core.pkgrepo import pkgrepo_iglob
|
||||
from pmb.types import WithExtraRepos
|
||||
import pmb.helpers.pmaports
|
||||
|
@ -8,7 +9,7 @@ import pmb.helpers.package
|
|||
import pmb.parse
|
||||
|
||||
|
||||
def list_ui(arch):
|
||||
def list_ui(arch: Arch) -> list[tuple[str, str]]:
|
||||
"""Get all UIs, for which aports are available with their description.
|
||||
|
||||
:param arch: device architecture, for which the UIs must be available
|
||||
|
|
|
@ -673,7 +673,7 @@ def write_cgpt_kpart(args: PmbArgs, layout: PartitionLayout, suffix: Chroot) ->
|
|||
pmb.chroot.root(["dd", f"if={filename}", f"of=/dev/installp{layout['kernel']}"])
|
||||
|
||||
|
||||
def sanity_check_boot_size():
|
||||
def sanity_check_boot_size() -> None:
|
||||
default = Config().boot_size
|
||||
config = get_context().config
|
||||
if int(config.boot_size) >= int(default):
|
||||
|
|
|
@ -6,7 +6,7 @@ from pmb.core import Chroot
|
|||
from pmb.types import PartitionLayout, PmbArgs, PathString
|
||||
|
||||
|
||||
def install_fsprogs(filesystem):
|
||||
def install_fsprogs(filesystem: str) -> None:
|
||||
"""Install the package required to format a specific filesystem."""
|
||||
fsprogs = pmb.config.filesystems.get(filesystem)
|
||||
if not fsprogs:
|
||||
|
|
|
@ -12,7 +12,7 @@ FuncReturn = TypeVar("FuncReturn")
|
|||
|
||||
|
||||
class Wrapper(Generic[FuncArgs, FuncReturn]):
|
||||
def __init__(self, cache: "Cache", func: Callable[[FuncArgs], FuncReturn]):
|
||||
def __init__(self, cache: "Cache", func: Callable[[FuncArgs], FuncReturn]) -> None:
|
||||
self.cache = cache
|
||||
self.func = func
|
||||
self.disabled = False
|
||||
|
@ -47,12 +47,12 @@ class Wrapper(Generic[FuncArgs, FuncReturn]):
|
|||
|
||||
return self.cache.cache[key]
|
||||
|
||||
def cache_clear(self):
|
||||
def cache_clear(self) -> None:
|
||||
self.cache.clear()
|
||||
self.misses = 0
|
||||
self.hits = 0
|
||||
|
||||
def cache_disable(self):
|
||||
def cache_disable(self) -> None:
|
||||
self.disabled = True
|
||||
|
||||
|
||||
|
@ -64,7 +64,7 @@ class Cache:
|
|||
function is called with the given value. For example, in pmb.build._package
|
||||
we never want to use the cached result when called with force=True."""
|
||||
|
||||
def __init__(self, *args, cache_deepcopy=False, **kwargs):
|
||||
def __init__(self, *args: str, cache_deepcopy: bool = False, **kwargs: Any) -> None:
|
||||
for a in args:
|
||||
if not isinstance(a, str):
|
||||
raise ValueError(f"Cache key must be a string, not {type(a)}")
|
||||
|
@ -72,7 +72,7 @@ class Cache:
|
|||
if len(args) != len(set(args)):
|
||||
raise ValueError("Duplicate cache key properties")
|
||||
|
||||
self.cache = {}
|
||||
self.cache: dict[str, Any] = {}
|
||||
self.params = args
|
||||
self.kwargs = kwargs
|
||||
self.cache_deepcopy = cache_deepcopy
|
||||
|
@ -144,5 +144,5 @@ class Cache:
|
|||
# FIXME: Once PEP-695 generics are in we shouldn't need this.
|
||||
return Wrapper(self, func)
|
||||
|
||||
def clear(self):
|
||||
def clear(self) -> None:
|
||||
self.cache.clear()
|
||||
|
|
|
@ -29,7 +29,9 @@ import pmb.helpers.pmaports
|
|||
"""
|
||||
|
||||
|
||||
def toggle_other_boolean_flags(*other_destinations, value=True):
|
||||
def toggle_other_boolean_flags(
|
||||
*other_destinations: str, value: bool = True
|
||||
) -> type[argparse.Action]:
|
||||
"""Group several argparse flags to one.
|
||||
|
||||
Sets multiple other_destination to value.
|
||||
|
@ -52,12 +54,12 @@ def toggle_other_boolean_flags(*other_destinations, value=True):
|
|||
return SetOtherDestinationsAction
|
||||
|
||||
|
||||
def type_ondev_cp(val):
|
||||
def type_ondev_cp(val: str) -> list[str]:
|
||||
"""Parse and validate arguments to 'pmbootstrap install --ondev --cp'.
|
||||
|
||||
:param val: 'HOST_SRC:CHROOT_DEST' string
|
||||
|
||||
:returns: (HOST_SRC, CHROOT_DEST)
|
||||
:returns: [HOST_SRC, CHROOT_DEST]
|
||||
"""
|
||||
ret = val.split(":")
|
||||
|
||||
|
@ -75,7 +77,7 @@ def type_ondev_cp(val):
|
|||
return ret
|
||||
|
||||
|
||||
def arguments_install(subparser):
|
||||
def arguments_install(subparser: argparse._SubParsersAction) -> None:
|
||||
ret = subparser.add_parser(
|
||||
"install", help="set up device specific chroot and install to SD card or image file"
|
||||
)
|
||||
|
@ -266,7 +268,7 @@ def arguments_install(subparser):
|
|||
)
|
||||
|
||||
|
||||
def arguments_export(subparser):
|
||||
def arguments_export(subparser: argparse._SubParsersAction) -> argparse.ArgumentParser:
|
||||
ret = subparser.add_parser(
|
||||
"export",
|
||||
help="create convenience symlinks"
|
||||
|
@ -297,7 +299,7 @@ def arguments_export(subparser):
|
|||
return ret
|
||||
|
||||
|
||||
def arguments_sideload(subparser):
|
||||
def arguments_sideload(subparser: argparse._SubParsersAction) -> argparse.ArgumentParser:
|
||||
ret = subparser.add_parser(
|
||||
"sideload", help="Push packages to a running phone connected over usb or wifi"
|
||||
)
|
||||
|
@ -325,7 +327,7 @@ def arguments_sideload(subparser):
|
|||
return ret
|
||||
|
||||
|
||||
def arguments_flasher(subparser):
|
||||
def arguments_flasher(subparser: argparse._SubParsersAction) -> argparse.ArgumentParser:
|
||||
ret = subparser.add_parser("flasher", help="flash something to the target device")
|
||||
ret.add_argument("--method", help="override flash method", dest="flash_method", default=None)
|
||||
sub = ret.add_subparsers(dest="action_flasher")
|
||||
|
@ -431,7 +433,7 @@ def arguments_flasher(subparser):
|
|||
return ret
|
||||
|
||||
|
||||
def arguments_initfs(subparser):
|
||||
def arguments_initfs(subparser: argparse._SubParsersAction) -> argparse.ArgumentParser:
|
||||
ret = subparser.add_parser("initfs", help="do something with the initramfs")
|
||||
sub = ret.add_subparsers(dest="action_initfs")
|
||||
|
||||
|
@ -457,7 +459,7 @@ def arguments_initfs(subparser):
|
|||
return ret
|
||||
|
||||
|
||||
def arguments_qemu(subparser):
|
||||
def arguments_qemu(subparser: argparse._SubParsersAction) -> argparse.ArgumentParser:
|
||||
ret = subparser.add_parser("qemu")
|
||||
ret.add_argument("--cmdline", help="override kernel commandline")
|
||||
ret.add_argument(
|
||||
|
@ -549,7 +551,7 @@ def arguments_qemu(subparser):
|
|||
return ret
|
||||
|
||||
|
||||
def arguments_pkgrel_bump(subparser):
|
||||
def arguments_pkgrel_bump(subparser: argparse._SubParsersAction) -> argparse.ArgumentParser:
|
||||
ret = subparser.add_parser(
|
||||
"pkgrel_bump",
|
||||
help="increase the pkgrel to"
|
||||
|
@ -577,7 +579,7 @@ def arguments_pkgrel_bump(subparser):
|
|||
return ret
|
||||
|
||||
|
||||
def arguments_pkgver_bump(subparser):
|
||||
def arguments_pkgver_bump(subparser: argparse._SubParsersAction) -> argparse.ArgumentParser:
|
||||
ret = subparser.add_parser(
|
||||
"pkgver_bump",
|
||||
help="increase the pkgver and reset pkgrel to 0. useful when dealing with metapackages.",
|
||||
|
@ -587,7 +589,7 @@ def arguments_pkgver_bump(subparser):
|
|||
return ret
|
||||
|
||||
|
||||
def arguments_aportupgrade(subparser):
|
||||
def arguments_aportupgrade(subparser: argparse._SubParsersAction) -> argparse.ArgumentParser:
|
||||
ret = subparser.add_parser(
|
||||
"aportupgrade", help="check for outdated packages that need upgrading"
|
||||
)
|
||||
|
@ -609,7 +611,7 @@ def arguments_aportupgrade(subparser):
|
|||
return ret
|
||||
|
||||
|
||||
def arguments_newapkbuild(subparser):
|
||||
def arguments_newapkbuild(subparser: argparse._SubParsersAction) -> None:
|
||||
"""
|
||||
Wrapper for Alpine's "newapkbuild" command.
|
||||
|
||||
|
@ -651,7 +653,7 @@ def arguments_newapkbuild(subparser):
|
|||
)
|
||||
|
||||
|
||||
def arguments_kconfig(subparser):
|
||||
def arguments_kconfig(subparser: argparse._SubParsersAction) -> None:
|
||||
# Allowed architectures
|
||||
arch_choices = Arch.supported()
|
||||
|
||||
|
@ -726,7 +728,7 @@ def arguments_repo_bootstrap(subparser):
|
|||
return ret
|
||||
|
||||
|
||||
def arguments_repo_missing(subparser):
|
||||
def arguments_repo_missing(subparser: argparse._SubParsersAction) -> argparse.ArgumentParser:
|
||||
ret = subparser.add_parser("repo_missing")
|
||||
package = ret.add_argument(
|
||||
"package", nargs="?", help="only look at a specific package and its dependencies"
|
||||
|
@ -745,23 +747,23 @@ def arguments_repo_missing(subparser):
|
|||
return ret
|
||||
|
||||
|
||||
def arguments_lint(subparser):
|
||||
def arguments_lint(subparser: argparse._SubParsersAction) -> None:
|
||||
lint = subparser.add_parser("lint", help="run quality checks on pmaports (required to pass CI)")
|
||||
add_packages_arg(lint, nargs="*")
|
||||
|
||||
|
||||
def arguments_test(subparser):
|
||||
def arguments_test(subparser: argparse._SubParsersAction) -> None:
|
||||
test = subparser.add_parser("test", help="Internal pmbootstrap test tools")
|
||||
sub = test.add_subparsers(dest="action_test", required=True)
|
||||
sub.add_parser("apkindex_parse_all", help="parse all APKINDEX files")
|
||||
|
||||
|
||||
def arguments_status(subparser):
|
||||
def arguments_status(subparser: argparse._SubParsersAction) -> argparse.ArgumentParser:
|
||||
ret = subparser.add_parser("status", help="show a config and pmaports overview")
|
||||
return ret
|
||||
|
||||
|
||||
def arguments_netboot(subparser):
|
||||
def arguments_netboot(subparser: argparse._SubParsersAction) -> argparse.ArgumentParser:
|
||||
ret = subparser.add_parser("netboot", help="launch nbd server with pmOS rootfs")
|
||||
sub = ret.add_subparsers(dest="action_netboot")
|
||||
sub.required = True
|
||||
|
@ -772,7 +774,7 @@ def arguments_netboot(subparser):
|
|||
return ret
|
||||
|
||||
|
||||
def arguments_ci(subparser):
|
||||
def arguments_ci(subparser: argparse._SubParsersAction) -> argparse.ArgumentParser:
|
||||
ret = subparser.add_parser(
|
||||
"ci",
|
||||
help="run continuous integration scripts locally of git repo in current directory",
|
||||
|
@ -1294,7 +1296,7 @@ def get_parser():
|
|||
return parser
|
||||
|
||||
|
||||
def arguments():
|
||||
def arguments() -> PmbArgs:
|
||||
args: PmbArgs = get_parser().parse_args()
|
||||
|
||||
if getattr(args, "fork_alpine_retain_branch", False):
|
||||
|
|
|
@ -7,7 +7,8 @@ import pmb.config
|
|||
# Return: {magic: ..., mask: ...}
|
||||
|
||||
|
||||
def binfmt_info(arch_qemu):
|
||||
# FIXME: Maybe this should use Arch instead of str.
|
||||
def binfmt_info(arch_qemu: str) -> dict[str, str]:
|
||||
# Parse the info file
|
||||
full = {}
|
||||
info = pmb.config.pmb_src / "pmb/data/qemu-user-binfmt.txt"
|
||||
|
|
|
@ -4,6 +4,7 @@ from pathlib import Path
|
|||
from pmb.helpers import logging
|
||||
import re
|
||||
import os
|
||||
from typing import Literal, overload
|
||||
|
||||
import pmb.build
|
||||
import pmb.config
|
||||
|
@ -14,7 +15,7 @@ from pmb.helpers.exceptions import NonBugError
|
|||
from pmb.types import PathString
|
||||
|
||||
|
||||
def is_set(config, option):
|
||||
def is_set(config: str, option: str) -> bool:
|
||||
"""
|
||||
Check, whether a boolean or tristate option is enabled
|
||||
either as builtin or module.
|
||||
|
@ -26,7 +27,7 @@ def is_set(config, option):
|
|||
return re.search("^CONFIG_" + option + "=[ym]$", config, re.M) is not None
|
||||
|
||||
|
||||
def is_set_str(config, option, string):
|
||||
def is_set_str(config: str, option: str, string: str) -> bool:
|
||||
"""
|
||||
Check, whether a config option contains a string as value.
|
||||
|
||||
|
@ -42,7 +43,7 @@ def is_set_str(config, option, string):
|
|||
return False
|
||||
|
||||
|
||||
def is_in_array(config, option, string):
|
||||
def is_in_array(config: str, option: str, string: str) -> bool:
|
||||
"""
|
||||
Check, whether a config option contains string as an array element
|
||||
|
||||
|
@ -59,7 +60,14 @@ def is_in_array(config, option, string):
|
|||
return False
|
||||
|
||||
|
||||
def check_option(component, details, config, config_path, option, option_value):
|
||||
def check_option(
|
||||
component: str,
|
||||
details: bool,
|
||||
config: str,
|
||||
config_path: PathString,
|
||||
option: str,
|
||||
option_value: bool | str | list[str],
|
||||
) -> bool:
|
||||
"""
|
||||
Check, whether one kernel config option has a given value.
|
||||
|
||||
|
@ -72,7 +80,7 @@ def check_option(component, details, config, config_path, option, option_value):
|
|||
:returns: True if the check passed, False otherwise
|
||||
"""
|
||||
|
||||
def warn_ret_false(should_str):
|
||||
def warn_ret_false(should_str: str) -> bool:
|
||||
config_name = os.path.basename(config_path)
|
||||
if details:
|
||||
logging.warning(
|
||||
|
@ -107,8 +115,14 @@ def check_option(component, details, config, config_path, option, option_value):
|
|||
|
||||
|
||||
def check_config_options_set(
|
||||
config, config_path, config_arch, options, component, pkgver, details=False
|
||||
):
|
||||
config: str,
|
||||
config_path: PathString,
|
||||
config_arch: str, # TODO: Replace with Arch type?
|
||||
options: dict[str, dict],
|
||||
component: str,
|
||||
pkgver: str,
|
||||
details: bool = False,
|
||||
) -> bool:
|
||||
"""
|
||||
Check, whether all the kernel config passes all rules of one component.
|
||||
|
||||
|
@ -195,7 +209,27 @@ def check_config(
|
|||
return all(ret)
|
||||
|
||||
|
||||
def check(pkgname, components_list=[], details=False, must_exist=True):
|
||||
@overload
|
||||
def check(
|
||||
pkgname: str,
|
||||
components_list: list[str] = ...,
|
||||
details: bool = ...,
|
||||
must_exist: Literal[False] = ...,
|
||||
) -> bool | None: ...
|
||||
|
||||
|
||||
@overload
|
||||
def check(
|
||||
pkgname: str,
|
||||
components_list: list[str] = ...,
|
||||
details: bool = ...,
|
||||
must_exist: Literal[True] = ...,
|
||||
) -> bool: ...
|
||||
|
||||
|
||||
def check(
|
||||
pkgname: str, components_list: list[str] = [], details: bool = False, must_exist: bool = True
|
||||
) -> bool | None:
|
||||
"""
|
||||
Check for necessary kernel config options in a package.
|
||||
|
||||
|
@ -263,7 +297,8 @@ def check(pkgname, components_list=[], details=False, must_exist=True):
|
|||
return ret
|
||||
|
||||
|
||||
def extract_arch(config_path):
|
||||
# TODO: Make this use the Arch type probably
|
||||
def extract_arch(config_path: PathString) -> str:
|
||||
# Extract the architecture out of the config
|
||||
with open(config_path) as f:
|
||||
config = f.read()
|
||||
|
@ -283,7 +318,7 @@ def extract_arch(config_path):
|
|||
return "unknown"
|
||||
|
||||
|
||||
def extract_version(config_path):
|
||||
def extract_version(config_path: PathString) -> str:
|
||||
# Try to extract the version string out of the comment header
|
||||
with open(config_path) as f:
|
||||
# Read the first 3 lines of the file and get the third line only
|
||||
|
|
|
@ -10,7 +10,7 @@ https://git.alpinelinux.org/cgit/apk-tools/tree/src/version.c
|
|||
"""
|
||||
|
||||
|
||||
def token_value(string):
|
||||
def token_value(string: str) -> int:
|
||||
"""
|
||||
Return the associated value for a given token string (we parse
|
||||
through the version string one token at a time).
|
||||
|
@ -35,7 +35,7 @@ def token_value(string):
|
|||
return order[string]
|
||||
|
||||
|
||||
def next_token(previous, rest):
|
||||
def next_token(previous: str, rest: str) -> tuple[str, str]:
|
||||
"""
|
||||
Parse the next token in the rest of the version string, we're
|
||||
currently looking at.
|
||||
|
@ -90,7 +90,7 @@ def next_token(previous, rest):
|
|||
return (next, rest)
|
||||
|
||||
|
||||
def parse_suffix(rest):
|
||||
def parse_suffix(rest: str) -> tuple[str, int, bool]:
|
||||
"""
|
||||
Cut off the suffix of rest (which is now at the beginning of the
|
||||
rest variable, but regarding the whole version string, it is a
|
||||
|
@ -188,7 +188,7 @@ def get_token(previous, rest):
|
|||
return (next, value, rest)
|
||||
|
||||
|
||||
def validate(version):
|
||||
def validate(version: str) -> bool:
|
||||
"""
|
||||
Check whether one version string is valid.
|
||||
|
||||
|
@ -277,7 +277,7 @@ Convenience functions below are not modeled after apk's version.c.
|
|||
"""
|
||||
|
||||
|
||||
def check_string(a_version, rule):
|
||||
def check_string(a_version: str, rule: str) -> bool:
|
||||
"""
|
||||
Compare a version against a check string. This is used in "pmbootstrap
|
||||
kconfig check", to only require certain options if the pkgver is in a
|
||||
|
|
|
@ -287,7 +287,7 @@ def command_qemu(
|
|||
return (command, env)
|
||||
|
||||
|
||||
def resize_image(img_size_new, img_path):
|
||||
def resize_image(img_size_new: str, img_path: Path) -> None:
|
||||
"""
|
||||
Truncates an image to a specific size. The value must be larger than the
|
||||
current image size, and it must be specified in MiB or GiB units (powers of
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue