forked from Mirror/pmbootstrap
pmb: Add more type hints (MR 2489)
This commit is contained in:
parent
0e9a2e596f
commit
472726a9dc
9 changed files with 79 additions and 28 deletions
|
@ -192,7 +192,7 @@ def _init(pkgname: str, arch: Arch | None) -> tuple[str, Arch, Any, Chroot, Env]
|
||||||
|
|
||||||
extract_and_patch_sources(pkgname, arch)
|
extract_and_patch_sources(pkgname, arch)
|
||||||
|
|
||||||
env = {
|
env: Env = {
|
||||||
"ARCH": arch.kernel(),
|
"ARCH": arch.kernel(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -77,7 +77,7 @@ class BuildStatus(enum.Enum):
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
return self.value
|
return self.value
|
||||||
|
|
||||||
def necessary(self):
|
def necessary(self) -> bool:
|
||||||
return self in [BuildStatus.OUTDATED, BuildStatus.NEW]
|
return self in [BuildStatus.OUTDATED, BuildStatus.NEW]
|
||||||
|
|
||||||
|
|
||||||
|
@ -132,7 +132,7 @@ def get_status(arch: Arch | None, apkbuild: dict[str, Any]) -> BuildStatus:
|
||||||
return BuildStatus.UNNECESSARY
|
return BuildStatus.UNNECESSARY
|
||||||
|
|
||||||
|
|
||||||
def index_repo(arch=None):
|
def index_repo(arch: Arch | None = None) -> None:
|
||||||
"""Recreate the APKINDEX.tar.gz for a specific repo, and clear the parsing
|
"""Recreate the APKINDEX.tar.gz for a specific repo, and clear the parsing
|
||||||
cache for that file for the current pmbootstrap session (to prevent
|
cache for that file for the current pmbootstrap session (to prevent
|
||||||
rebuilding packages twice, in case the rebuild takes less than a second).
|
rebuilding packages twice, in case the rebuild takes less than a second).
|
||||||
|
|
|
@ -11,7 +11,7 @@ import pmb.parse
|
||||||
import pmb.chroot.apk
|
import pmb.chroot.apk
|
||||||
|
|
||||||
|
|
||||||
def is_registered(arch_qemu: Arch) -> bool:
|
def is_registered(arch_qemu: str | Arch) -> bool:
|
||||||
return os.path.exists(f"/proc/sys/fs/binfmt_misc/qemu-{arch_qemu}")
|
return os.path.exists(f"/proc/sys/fs/binfmt_misc/qemu-{arch_qemu}")
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -65,7 +65,7 @@ class Arch(enum.Enum):
|
||||||
global _cached_native_arch
|
global _cached_native_arch
|
||||||
return _cached_native_arch
|
return _cached_native_arch
|
||||||
|
|
||||||
def is_native(self):
|
def is_native(self) -> bool:
|
||||||
return self == Arch.native()
|
return self == Arch.native()
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -87,7 +87,7 @@ class Arch(enum.Enum):
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
def kernel(self):
|
def kernel(self) -> str:
|
||||||
mapping = {
|
mapping = {
|
||||||
Arch.x86: "x86",
|
Arch.x86: "x86",
|
||||||
Arch.x86_64: "x86_64",
|
Arch.x86_64: "x86_64",
|
||||||
|
@ -102,7 +102,7 @@ class Arch(enum.Enum):
|
||||||
}
|
}
|
||||||
return mapping.get(self, self.value)
|
return mapping.get(self, self.value)
|
||||||
|
|
||||||
def qemu(self):
|
def qemu(self) -> str:
|
||||||
mapping = {
|
mapping = {
|
||||||
Arch.x86: "i386",
|
Arch.x86: "i386",
|
||||||
Arch.armhf: "arm",
|
Arch.armhf: "arm",
|
||||||
|
@ -110,7 +110,7 @@ class Arch(enum.Enum):
|
||||||
}
|
}
|
||||||
return mapping.get(self, self.value)
|
return mapping.get(self, self.value)
|
||||||
|
|
||||||
def alpine_triple(self):
|
def alpine_triple(self) -> str:
|
||||||
"""Get the cross compiler triple for this architecture on Alpine."""
|
"""Get the cross compiler triple for this architecture on Alpine."""
|
||||||
mapping = {
|
mapping = {
|
||||||
Arch.aarch64: "aarch64-alpine-linux-musl",
|
Arch.aarch64: "aarch64-alpine-linux-musl",
|
||||||
|
@ -139,7 +139,7 @@ class Arch(enum.Enum):
|
||||||
|
|
||||||
raise ValueError(f"Can not map Alpine architecture '{self}'" " to the right hostspec value")
|
raise ValueError(f"Can not map Alpine architecture '{self}'" " to the right hostspec value")
|
||||||
|
|
||||||
def cpu_emulation_required(self):
|
def cpu_emulation_required(self) -> bool:
|
||||||
# Obvious case: host arch is target arch
|
# Obvious case: host arch is target arch
|
||||||
if self == Arch.native():
|
if self == Arch.native():
|
||||||
return False
|
return False
|
||||||
|
|
|
@ -14,12 +14,12 @@ from pmb.core.context import get_context
|
||||||
class ReadlineTabCompleter:
|
class ReadlineTabCompleter:
|
||||||
"""Store intermediate state for completer function."""
|
"""Store intermediate state for completer function."""
|
||||||
|
|
||||||
def __init__(self, options):
|
def __init__(self, options: list[str]) -> None:
|
||||||
""":param options: list of possible completions."""
|
""":param options: list of possible completions."""
|
||||||
self.options = sorted(options)
|
self.options = sorted(options)
|
||||||
self.matches = []
|
self.matches: list[str] = []
|
||||||
|
|
||||||
def completer_func(self, input_text, iteration):
|
def completer_func(self, input_text: str, iteration: int) -> str | None:
|
||||||
"""
|
"""
|
||||||
:param input_text: text that shall be autocompleted
|
:param input_text: text that shall be autocompleted
|
||||||
:param iteration: how many times "tab" was hit
|
:param iteration: how many times "tab" was hit
|
||||||
|
@ -104,7 +104,9 @@ def ask(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def confirm(question="Continue?", default=False, no_assumptions=False):
|
def confirm(
|
||||||
|
question: str = "Continue?", default: bool = False, no_assumptions: bool = False
|
||||||
|
) -> bool:
|
||||||
"""Convenience wrapper around ask for simple yes-no questions with validation.
|
"""Convenience wrapper around ask for simple yes-no questions with validation.
|
||||||
|
|
||||||
:param no_assumptions: ask for confirmation, even if "pmbootstrap -y' is set
|
:param no_assumptions: ask for confirmation, even if "pmbootstrap -y' is set
|
||||||
|
@ -118,7 +120,7 @@ def confirm(question="Continue?", default=False, no_assumptions=False):
|
||||||
return answer == "y"
|
return answer == "y"
|
||||||
|
|
||||||
|
|
||||||
def progress_print(progress):
|
def progress_print(progress: float) -> None:
|
||||||
"""Print a snapshot of a progress bar to STDOUT.
|
"""Print a snapshot of a progress bar to STDOUT.
|
||||||
|
|
||||||
Call progress_flush to end printing progress and clear the line. No output is printed in
|
Call progress_flush to end printing progress and clear the line. No output is printed in
|
||||||
|
@ -141,7 +143,7 @@ def progress_print(progress):
|
||||||
sys.stdout.write("\u001b8\u001b[0K")
|
sys.stdout.write("\u001b8\u001b[0K")
|
||||||
|
|
||||||
|
|
||||||
def progress_flush():
|
def progress_flush() -> None:
|
||||||
"""Finish printing a progress bar.
|
"""Finish printing a progress bar.
|
||||||
|
|
||||||
This will erase the line. Does nothing in non-interactive mode.
|
This will erase the line. Does nothing in non-interactive mode.
|
||||||
|
|
|
@ -19,7 +19,7 @@ def find_path(codename: str, file: str = "") -> Path | None:
|
||||||
return g
|
return g
|
||||||
|
|
||||||
|
|
||||||
def list_codenames(vendor=None, archived=True):
|
def list_codenames(vendor: str | None = None, archived: bool = True) -> list[str]:
|
||||||
"""Get all devices, for which aports are available.
|
"""Get all devices, for which aports are available.
|
||||||
|
|
||||||
:param vendor: vendor name to choose devices from, or None for all vendors
|
:param vendor: vendor name to choose devices from, or None for all vendors
|
||||||
|
|
|
@ -7,6 +7,7 @@ import os
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import shutil
|
import shutil
|
||||||
import urllib.request
|
import urllib.request
|
||||||
|
from typing import Literal, overload
|
||||||
import pmb.helpers.cli
|
import pmb.helpers.cli
|
||||||
|
|
||||||
from pmb.core.context import get_context
|
from pmb.core.context import get_context
|
||||||
|
@ -18,9 +19,36 @@ def cache_file(prefix: str, url: str) -> Path:
|
||||||
return Path(f"{prefix}_{hashlib.sha256(url.encode('utf-8')).hexdigest()}")
|
return Path(f"{prefix}_{hashlib.sha256(url.encode('utf-8')).hexdigest()}")
|
||||||
|
|
||||||
|
|
||||||
|
@overload
|
||||||
def download(
|
def download(
|
||||||
url, prefix, cache=True, loglevel=logging.INFO, allow_404=False, flush_progress_bar_on_404=False
|
url: str,
|
||||||
):
|
prefix: str,
|
||||||
|
cache: bool = ...,
|
||||||
|
loglevel: int = ...,
|
||||||
|
allow_404: Literal[False] = ...,
|
||||||
|
flush_progress_bar_on_404: bool = ...,
|
||||||
|
) -> Path: ...
|
||||||
|
|
||||||
|
|
||||||
|
@overload
|
||||||
|
def download(
|
||||||
|
url: str,
|
||||||
|
prefix: str,
|
||||||
|
cache: bool = ...,
|
||||||
|
loglevel: int = ...,
|
||||||
|
allow_404: Literal[True] = ...,
|
||||||
|
flush_progress_bar_on_404: bool = ...,
|
||||||
|
) -> Path | None: ...
|
||||||
|
|
||||||
|
|
||||||
|
def download(
|
||||||
|
url: str,
|
||||||
|
prefix: str,
|
||||||
|
cache: bool = True,
|
||||||
|
loglevel: int = logging.INFO,
|
||||||
|
allow_404: bool = False,
|
||||||
|
flush_progress_bar_on_404: bool = False,
|
||||||
|
) -> Path | None:
|
||||||
"""Download a file to disk.
|
"""Download a file to disk.
|
||||||
|
|
||||||
:param url: the http(s) address of to the file to download
|
:param url: the http(s) address of to the file to download
|
||||||
|
@ -73,7 +101,21 @@ def download(
|
||||||
return path
|
return path
|
||||||
|
|
||||||
|
|
||||||
def retrieve(url, headers=None, allow_404=False):
|
@overload
|
||||||
|
def retrieve(
|
||||||
|
url: str, headers: dict[str, str] | None = ..., allow_404: Literal[False] = ...
|
||||||
|
) -> str: ...
|
||||||
|
|
||||||
|
|
||||||
|
@overload
|
||||||
|
def retrieve(
|
||||||
|
url: str, headers: dict[str, str] | None = ..., allow_404: Literal[True] = ...
|
||||||
|
) -> str | None: ...
|
||||||
|
|
||||||
|
|
||||||
|
def retrieve(
|
||||||
|
url: str, headers: dict[str, str] | None = None, allow_404: bool = False
|
||||||
|
) -> str | None:
|
||||||
"""Fetch the content of a URL and returns it as string.
|
"""Fetch the content of a URL and returns it as string.
|
||||||
|
|
||||||
:param url: the http(s) address of to the resource to fetch
|
:param url: the http(s) address of to the resource to fetch
|
||||||
|
|
|
@ -58,7 +58,7 @@ def get_subpartitions_size(chroot: Chroot) -> tuple[int, int]:
|
||||||
return (boot, round(root))
|
return (boot, round(root))
|
||||||
|
|
||||||
|
|
||||||
def get_nonfree_packages(device):
|
def get_nonfree_packages(device: str) -> list[str]:
|
||||||
"""
|
"""
|
||||||
Get any legacy non-free subpackages in the APKBUILD.
|
Get any legacy non-free subpackages in the APKBUILD.
|
||||||
Also see: https://postmarketos.org/edge/2024/02/15/default-nonfree-fw/
|
Also see: https://postmarketos.org/edge/2024/02/15/default-nonfree-fw/
|
||||||
|
|
|
@ -8,6 +8,7 @@ import os
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import re
|
import re
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
import pmb.config
|
import pmb.config
|
||||||
from pmb.meta import Cache
|
from pmb.meta import Cache
|
||||||
|
@ -33,7 +34,7 @@ revar5 = re.compile(r"([a-zA-Z_]+[a-zA-Z0-9_]*)=")
|
||||||
|
|
||||||
|
|
||||||
def replace_variable(apkbuild: Apkbuild, value: str) -> str:
|
def replace_variable(apkbuild: Apkbuild, value: str) -> str:
|
||||||
def log_key_not_found(match):
|
def log_key_not_found(match: re.Match) -> None:
|
||||||
logging.verbose(
|
logging.verbose(
|
||||||
f"{apkbuild['pkgname']}: key '{match.group(1)}' for"
|
f"{apkbuild['pkgname']}: key '{match.group(1)}' for"
|
||||||
f" replacing '{match.group(0)}' not found, ignoring"
|
f" replacing '{match.group(0)}' not found, ignoring"
|
||||||
|
@ -135,7 +136,9 @@ def read_file(path: Path) -> list[str]:
|
||||||
return lines
|
return lines
|
||||||
|
|
||||||
|
|
||||||
def parse_next_attribute(lines, i, path):
|
def parse_next_attribute(
|
||||||
|
lines: list[str], i: int, path: Path
|
||||||
|
) -> tuple[str, str, int] | tuple[None, None, int]:
|
||||||
"""
|
"""
|
||||||
Parse one attribute from the APKBUILD.
|
Parse one attribute from the APKBUILD.
|
||||||
|
|
||||||
|
@ -196,7 +199,9 @@ def parse_next_attribute(lines, i, path):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _parse_attributes(path, lines, apkbuild_attributes, ret):
|
def _parse_attributes(
|
||||||
|
path: Path, lines: list[str], apkbuild_attributes: dict[str, dict[str, bool]], ret: Apkbuild
|
||||||
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Parse attributes from a list of lines. Variables are replaced with values
|
Parse attributes from a list of lines. Variables are replaced with values
|
||||||
from ret (if found) and split into the format configured in
|
from ret (if found) and split into the format configured in
|
||||||
|
@ -209,7 +214,7 @@ def _parse_attributes(path, lines, apkbuild_attributes, ret):
|
||||||
# Parse all variables first, and replace variables mentioned earlier
|
# Parse all variables first, and replace variables mentioned earlier
|
||||||
for i in range(len(lines)):
|
for i in range(len(lines)):
|
||||||
attribute, value, i = parse_next_attribute(lines, i, path)
|
attribute, value, i = parse_next_attribute(lines, i, path)
|
||||||
if not attribute:
|
if not attribute or not value:
|
||||||
continue
|
continue
|
||||||
ret[attribute] = replace_variable(ret, value)
|
ret[attribute] = replace_variable(ret, value)
|
||||||
|
|
||||||
|
@ -237,7 +242,9 @@ def _parse_attributes(path, lines, apkbuild_attributes, ret):
|
||||||
del ret[attribute]
|
del ret[attribute]
|
||||||
|
|
||||||
|
|
||||||
def _parse_subpackage(path, lines, apkbuild, subpackages, subpkg):
|
def _parse_subpackage(
|
||||||
|
path: Path, lines: list[str], apkbuild: Apkbuild, subpackages: dict[str, Any], subpkg: str
|
||||||
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Attempt to parse attributes from a subpackage function.
|
Attempt to parse attributes from a subpackage function.
|
||||||
This will attempt to locate the subpackage function in the APKBUILD and
|
This will attempt to locate the subpackage function in the APKBUILD and
|
||||||
|
@ -405,7 +412,7 @@ def kernels(device: str) -> dict[str, str] | None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _parse_comment_tags(lines, tag):
|
def _parse_comment_tags(lines: list[str], tag: str) -> list[str]:
|
||||||
"""
|
"""
|
||||||
Parse tags defined as comments in a APKBUILD file. This can be used to
|
Parse tags defined as comments in a APKBUILD file. This can be used to
|
||||||
parse e.g. the maintainers of a package (defined using # Maintainer:).
|
parse e.g. the maintainers of a package (defined using # Maintainer:).
|
||||||
|
@ -422,7 +429,7 @@ def _parse_comment_tags(lines, tag):
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
|
||||||
def maintainers(path):
|
def maintainers(path: Path) -> list[str] | None:
|
||||||
"""
|
"""
|
||||||
Parse maintainers of an APKBUILD file. They should be defined using
|
Parse maintainers of an APKBUILD file. They should be defined using
|
||||||
# Maintainer: (first maintainer) and # Co-Maintainer: (additional
|
# Maintainer: (first maintainer) and # Co-Maintainer: (additional
|
||||||
|
@ -447,7 +454,7 @@ def maintainers(path):
|
||||||
return maintainers
|
return maintainers
|
||||||
|
|
||||||
|
|
||||||
def archived(path):
|
def archived(path: Path) -> str | None:
|
||||||
"""
|
"""
|
||||||
Return if (and why) an APKBUILD might be archived. This should be
|
Return if (and why) an APKBUILD might be archived. This should be
|
||||||
defined using a # Archived: <reason> tag in the APKBUILD.
|
defined using a # Archived: <reason> tag in the APKBUILD.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue