pmb: Add more types and fix type errors (MR 2514)

This commit is contained in:
Newbyte 2024-12-20 16:29:33 +01:00
parent 3ee75e61a5
commit 01264bd39f
No known key found for this signature in database
GPG key ID: 5AE7F5513E0885CB
6 changed files with 102 additions and 38 deletions

View file

@ -102,7 +102,7 @@ def _prepare_fifo() -> Path:
return fifo return fifo
def _create_command_with_progress(command, fifo): def _create_command_with_progress(command: list[str], fifo: Path) -> list[str]:
"""Build a full apk command from a subcommand, set up to redirect progress into a fifo. """Build a full apk command from a subcommand, set up to redirect progress into a fifo.
:param command: apk subcommand in list form :param command: apk subcommand in list form
@ -132,7 +132,7 @@ def _compute_progress(line: str) -> float:
return cur / tot if tot > 0 else 0 return cur / tot if tot > 0 else 0
def _apk_with_progress(command: Sequence[str]) -> None: def _apk_with_progress(command: list[str]) -> None:
"""Run an apk subcommand while printing a progress bar to STDOUT. """Run an apk subcommand while printing a progress bar to STDOUT.
:param command: apk subcommand in list form :param command: apk subcommand in list form

View file

@ -55,7 +55,7 @@ def read_signature_info(tar: tarfile.TarFile) -> tuple[str, str]:
return (sigfilename, sigkey_path) return (sigfilename, sigkey_path)
def extract_temp(tar, sigfilename): def extract_temp(tar: tarfile.TarFile, sigfilename: str) -> dict[str, dict]:
""" """
Extract apk.static and signature as temporary files. Extract apk.static and signature as temporary files.
""" """
@ -64,19 +64,25 @@ def extract_temp(tar, sigfilename):
"sig": {"filename": sigfilename, "temp_path": None}, "sig": {"filename": sigfilename, "temp_path": None},
} }
for ftype in ret.keys(): for ftype in ret.keys():
member = tar.getmember(ret[ftype]["filename"]) filename = ret[ftype]["filename"]
if filename is None:
raise AssertionError
member = tar.getmember(filename)
fd, path = tempfile.mkstemp(ftype, "pmbootstrap") fd, path = tempfile.mkstemp(ftype, "pmbootstrap")
handle = open(fd, "wb") handle = open(fd, "wb")
ret[ftype]["temp_path"] = path ret[ftype]["temp_path"] = path
shutil.copyfileobj(tar.extractfile(member), handle) extracted_file = tar.extractfile(member)
if extracted_file is None:
raise AssertionError
shutil.copyfileobj(extracted_file, handle)
logging.debug(f"extracted: {path}") logging.debug(f"extracted: {path}")
handle.close() handle.close()
return ret return ret
def verify_signature(files, sigkey_path): def verify_signature(files: dict[str, dict], sigkey_path: str) -> None:
""" """
Verify the signature with openssl. Verify the signature with openssl.

View file

@ -188,8 +188,31 @@ def show_pkg_not_found_systemd_hint(package: str, with_extra_repos: WithExtraRep
) )
@overload
def find(
package: str,
must_exist: Literal[True] = ...,
subpackages: bool = ...,
with_extra_repos: WithExtraRepos = ...,
) -> Path: ...
@overload
def find(
package: str,
must_exist: bool = ...,
subpackages: bool = ...,
with_extra_repos: WithExtraRepos = ...,
) -> Path | None: ...
@Cache("package", "subpackages", "with_extra_repos") @Cache("package", "subpackages", "with_extra_repos")
def find(package, must_exist=True, subpackages=True, with_extra_repos="default"): def find(
package: str,
must_exist: bool = True,
subpackages: bool = True,
with_extra_repos: WithExtraRepos = "default",
) -> Path | None:
"""Find the directory in pmaports that provides a package or subpackage. """Find the directory in pmaports that provides a package or subpackage.
If you want the parsed APKBUILD instead, use pmb.helpers.pmaports.get(). If you want the parsed APKBUILD instead, use pmb.helpers.pmaports.get().

View file

@ -2,7 +2,7 @@
# SPDX-License-Identifier: GPL-3.0-or-later # SPDX-License-Identifier: GPL-3.0-or-later
import fcntl import fcntl
from pmb.core.context import get_context from pmb.core.context import get_context
from pmb.types import PathString, Env, RunOutputType from pmb.types import PathString, Env, RunOutputType, RunReturnType
from pmb.helpers import logging from pmb.helpers import logging
import os import os
from pathlib import Path from pathlib import Path
@ -72,7 +72,9 @@ def sanity_checks(
raise RuntimeError("Can't use output_return with output: " + output) raise RuntimeError("Can't use output_return with output: " + output)
def background(cmd: str, working_dir: PathString | None = None) -> subprocess.Popen: def background(
cmd: PathString | Sequence[PathString], working_dir: PathString | None = None
) -> subprocess.Popen:
"""Run a subprocess in background and redirect its output to the log.""" """Run a subprocess in background and redirect its output to the log."""
ret = subprocess.Popen( ret = subprocess.Popen(
cmd, stdout=pmb.helpers.logging.logfd, stderr=pmb.helpers.logging.logfd, cwd=working_dir cmd, stdout=pmb.helpers.logging.logfd, stderr=pmb.helpers.logging.logfd, cwd=working_dir
@ -81,7 +83,9 @@ def background(cmd: str, working_dir: PathString | None = None) -> subprocess.Po
return ret return ret
def pipe(cmd: str, working_dir: PathString | None = None) -> subprocess.Popen: def pipe(
cmd: PathString | Sequence[PathString], working_dir: PathString | None = None
) -> subprocess.Popen:
"""Run a subprocess in background and redirect its output to a pipe.""" """Run a subprocess in background and redirect its output to a pipe."""
ret = subprocess.Popen( ret = subprocess.Popen(
cmd, cmd,
@ -114,6 +118,16 @@ def pipe_read(
) -> None: ... ) -> None: ...
@overload
def pipe_read(
process: subprocess.Popen,
output_to_stdout: bool = ...,
output_log: bool = ...,
output_return: bool = ...,
output_return_buffer: list[bytes] | None = ...,
) -> None: ...
def pipe_read( def pipe_read(
process: subprocess.Popen, process: subprocess.Popen,
output_to_stdout: bool = False, output_to_stdout: bool = False,
@ -195,15 +209,15 @@ def kill_command(pid: int, sudo: bool) -> None:
def foreground_pipe( def foreground_pipe(
cmd, cmd: PathString | Sequence[PathString],
working_dir=None, working_dir: Path | None = None,
output_to_stdout=False, output_to_stdout: bool = False,
output_return=False, output_return: bool = False,
output_log=True, output_log: bool = True,
output_timeout=True, output_timeout: bool = True,
sudo=False, sudo: bool = False,
stdin=None, stdin: int | None = None,
): ) -> tuple[int, str]:
"""Run a subprocess in foreground with redirected output. """Run a subprocess in foreground with redirected output.
Optionally kill it after being silent for too long. Optionally kill it after being silent for too long.
@ -270,7 +284,9 @@ def foreground_pipe(
return (process.returncode, b"".join(output_buffer).decode("utf-8")) return (process.returncode, b"".join(output_buffer).decode("utf-8"))
def foreground_tui(cmd: str, working_dir: PathString | None = None) -> int: def foreground_tui(
cmd: PathString | Sequence[PathString], working_dir: PathString | None = None
) -> int:
"""Run a subprocess in foreground without redirecting any of its output. """Run a subprocess in foreground without redirecting any of its output.
This is the only way text-based user interfaces (ncurses programs like This is the only way text-based user interfaces (ncurses programs like
@ -342,15 +358,15 @@ def add_proxy_env_vars(env: Env) -> None:
def core( def core(
log_message, log_message: str,
cmd, cmd: Sequence[PathString],
working_dir=None, working_dir: Path | None = None,
output="log", output: RunOutputType = "log",
output_return=False, output_return: bool = False,
check=None, check: bool | None = None,
sudo=False, sudo: bool = False,
disable_timeout=False, disable_timeout: bool = False,
): ) -> RunReturnType:
"""Run a command and create a log entry. """Run a command and create a log entry.
This is a low level function not meant to be used directly. Use one of the This is a low level function not meant to be used directly. Use one of the

View file

@ -1,10 +1,11 @@
# Copyright 2023 Oliver Smith # Copyright 2023 Oliver Smith
# SPDX-License-Identifier: GPL-3.0-or-later # SPDX-License-Identifier: GPL-3.0-or-later
import argparse import argparse
from collections.abc import Sequence
import os import os
from pathlib import Path from pathlib import Path
import sys import sys
from typing import cast from typing import Any, cast
from pmb.core.arch import Arch from pmb.core.arch import Arch
from pmb.core import Config from pmb.core import Config
@ -45,10 +46,16 @@ def toggle_other_boolean_flags(
""" """
class SetOtherDestinationsAction(argparse.Action): class SetOtherDestinationsAction(argparse.Action):
def __init__(self, option_strings, dest, **kwargs): def __init__(self, option_strings: list[str], dest: str, **kwargs: Any) -> None:
super().__init__(option_strings, dest, nargs=0, const=value, default=value, **kwargs) super().__init__(option_strings, dest, nargs=0, const=value, default=value, **kwargs)
def __call__(self, parser, namespace, values, option_string=None): def __call__(
self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: str | Sequence[Any] | None,
option_string: str | None = None,
) -> None:
for destination in other_destinations: for destination in other_destinations:
setattr(namespace, destination, value) setattr(namespace, destination, value)
@ -720,7 +727,7 @@ def arguments_kconfig(subparser: argparse._SubParsersAction) -> None:
add_kernel_arg(migrate, nargs=1) add_kernel_arg(migrate, nargs=1)
def arguments_repo_bootstrap(subparser): def arguments_repo_bootstrap(subparser: argparse._SubParsersAction) -> argparse.ArgumentParser:
arch_choices = Arch.supported() arch_choices = Arch.supported()
ret = subparser.add_parser("repo_bootstrap") ret = subparser.add_parser("repo_bootstrap")
@ -792,16 +799,26 @@ def arguments_ci(subparser: argparse._SubParsersAction) -> argparse.ArgumentPars
return ret return ret
def package_completer(prefix, action, parser=None, parsed_args=None): def package_completer(
prefix: str,
action: str,
parser: argparse.ArgumentParser | None = None,
parsed_args: list[str] | None = None,
) -> set[str]:
packages = set( packages = set(
package for package in pmb.helpers.pmaports.get_list() if package.startswith(prefix) package for package in pmb.helpers.pmaports.get_list() if package.startswith(prefix)
) )
return packages return packages
def kernel_completer(prefix, action, parser=None, parsed_args=None): def kernel_completer(
prefix: str,
action: str,
parser: argparse.ArgumentParser | None = None,
parsed_args: list[str] | None = None,
) -> list[str]:
""":returns: matched linux-* packages, with linux-* prefix and without""" """:returns: matched linux-* packages, with linux-* prefix and without"""
ret = [] ret: list[str] = []
# Full package name, starting with "linux-" # Full package name, starting with "linux-"
if len("linux-") < len(prefix) and prefix.startswith("linux-") or "linux-".startswith(prefix): if len("linux-") < len(prefix) and prefix.startswith("linux-") or "linux-".startswith(prefix):
@ -814,7 +831,9 @@ def kernel_completer(prefix, action, parser=None, parsed_args=None):
return ret return ret
def add_packages_arg(subparser, name="packages", *args, **kwargs): def add_packages_arg(
subparser: argparse.ArgumentParser, name: str = "packages", *args: str, **kwargs: Any
) -> None:
arg = subparser.add_argument(name, *args, **kwargs) arg = subparser.add_argument(name, *args, **kwargs)
if "argcomplete" in sys.modules: if "argcomplete" in sys.modules:
arg.completer = package_completer # type: ignore[attr-defined] arg.completer = package_completer # type: ignore[attr-defined]

View file

@ -129,7 +129,7 @@ def parse_suffix(rest: str) -> tuple[str, int, bool]:
return (rest, 0, True) return (rest, 0, True)
def get_token(previous, rest): def get_token(previous: str, rest: str) -> tuple[str, int, str]:
""" """
This function does three things: This function does three things:
* get the next token * get the next token