drop args from helpers.git and chroot.apk (MR 2252)

be gone!

Signed-off-by: Caleb Connolly <caleb@postmarketos.org>
This commit is contained in:
Caleb Connolly 2024-05-24 19:47:53 +02:00 committed by Oliver Smith
parent 52338c5e76
commit 1be8653935
No known key found for this signature in database
GPG key ID: 5AE7F5513E0885CB
17 changed files with 85 additions and 70 deletions

View file

@ -164,7 +164,7 @@ def get_upstream_aport(args: PmbArgs, pkgname, arch=None):
example: /opt/pmbootstrap_work/cache_git/aports/upstream/main/gcc
"""
# APKBUILD
pmb.helpers.git.clone(args, "aports_upstream")
pmb.helpers.git.clone("aports_upstream")
aports_upstream_path = pmb.config.work / "cache_git/aports_upstream"
if getattr(args, "fork_alpine_retain_branch", False):

View file

@ -20,7 +20,7 @@ import pmb.parse.apkindex
import pmb.parse.arch
import pmb.parse.depends
import pmb.parse.version
from pmb.core import Chroot
from pmb.core import Chroot, get_context
def update_repository_list(args: PmbArgs, suffix: Chroot, postmarketos_mirror=True,
@ -88,7 +88,7 @@ def check_min_version(args: PmbArgs, chroot: Chroot=Chroot.native()):
return
# Compare
version_installed = installed(args, chroot)["apk-tools"]["version"]
version_installed = installed(chroot)["apk-tools"]["version"]
pmb.helpers.apk.check_outdated(
args, version_installed,
"Delete your http cache and zap all chroots, then try again:"
@ -174,7 +174,7 @@ def packages_get_locally_built_apks(args: PmbArgs, packages, arch: str) -> List[
return ret
def install_run_apk(args: PmbArgs, to_add, to_add_local, to_del, chroot: Chroot):
def install_run_apk(to_add, to_add_local, to_del, chroot: Chroot):
"""
Run apk to add packages, and ensure only the desired packages get
explicitly marked as installed.
@ -186,6 +186,7 @@ def install_run_apk(args: PmbArgs, to_add, to_add_local, to_del, chroot: Chroot)
installed or their dependencies (e.g. ["unl0kr"])
:param chroot: the chroot suffix, e.g. "native" or "rootfs_qemu-amd64"
"""
context = get_context()
# Sanitize packages: don't allow '--allow-untrusted' and other options
# to be passed to apk!
for package in to_add + [os.fspath(p) for p in to_add_local] + to_del:
@ -221,10 +222,10 @@ def install_run_apk(args: PmbArgs, to_add, to_add_local, to_del, chroot: Chroot)
if os.getenv("PMB_APK_FORCE_MISSING_REPOSITORIES") == "1":
command = ["--force-missing-repositories"] + command
if args.offline:
if context.offline:
command = ["--no-network"] + command
if i == 0:
pmb.helpers.apk.apk_with_progress(args, [apk_static] + command)
pmb.helpers.apk.apk_with_progress([apk_static] + command)
else:
# Virtual package related commands don't actually install or remove
# packages, but only mark the right ones as explicitly installed.
@ -271,10 +272,10 @@ def install(args: PmbArgs, packages, chroot: Chroot, build=True):
to_add_no_deps, _ = packages_split_to_add_del(packages)
logging.info(f"({chroot}) install {' '.join(to_add_no_deps)}")
install_run_apk(args, to_add_no_deps, to_add_local, to_del, chroot)
install_run_apk(to_add_no_deps, to_add_local, to_del, chroot)
def installed(args: PmbArgs, suffix: Chroot=Chroot.native()):
def installed(suffix: Chroot=Chroot.native()):
"""
Read the list of installed packages (which has almost the same format, as
an APKINDEX, but with more keys).

View file

@ -172,5 +172,4 @@ def run(args: PmbArgs, parameters):
if args.offline:
parameters = ["--no-network"] + parameters
pmb.helpers.apk.apk_with_progress(
args, [pmb.config.work / "apk.static"] + parameters)
pmb.helpers.apk.apk_with_progress([pmb.config.work / "apk.static"] + parameters)

View file

@ -13,7 +13,7 @@ from pmb.core.types import PmbArgs
def list_chroot(args: PmbArgs, suffix: Chroot, remove_prefix=True):
ret = []
prefix = pmb.config.initfs_hook_prefix
for pkgname in pmb.chroot.apk.installed(args, suffix).keys():
for pkgname in pmb.chroot.apk.installed(suffix).keys():
if pkgname.startswith(prefix):
if remove_prefix:
ret.append(pkgname[len(prefix):])

View file

@ -124,7 +124,7 @@ def copy_git_repo_to_chroot(args: PmbArgs, topdir):
"""
pmb.chroot.init(args)
tarball_path = Chroot.native() / "tmp/git.tar.gz"
files = pmb.helpers.git.get_files(args, topdir)
files = pmb.helpers.git.get_files(topdir)
with open(f"{tarball_path}.files", "w") as handle:
for file in files:

View file

@ -669,7 +669,7 @@ def frontend(args: PmbArgs):
# Copy the git hooks if master was checked out. (Don't symlink them and
# only do it on master, so the git hooks don't change unexpectedly when
# having a random branch checked out.)
branch_current = pmb.helpers.git.rev_parse(args, args.aports,
branch_current = pmb.helpers.git.rev_parse(args.aports,
extra_args=["--abbrev-ref"])
if branch_current == "master":
logging.info("NOTE: pmaports is on master branch, copying git hooks.")

View file

@ -31,7 +31,7 @@ def clone(args: PmbArgs):
" recipes (pmaports)...")
# Set up the native chroot and clone pmaports
pmb.helpers.git.clone(args, "pmaports")
pmb.helpers.git.clone("pmaports")
def symlink(args: PmbArgs):
@ -159,11 +159,11 @@ def read_config_channel(args: PmbArgs):
return channels_cfg["channels"][channel]
# Channel not in channels.cfg, try to be helpful
branch = pmb.helpers.git.rev_parse(args, args.aports,
branch = pmb.helpers.git.rev_parse(args.aports,
extra_args=["--abbrev-ref"])
branches_official = pmb.helpers.git.get_branches_official(args, "pmaports")
branches_official = pmb.helpers.git.get_branches_official("pmaports")
branches_official = ", ".join(branches_official)
remote = pmb.helpers.git.get_upstream_remote(args, "pmaports")
remote = pmb.helpers.git.get_upstream_remote("pmaports")
logging.info("NOTE: fix the error by rebasing or cherry picking relevant"
" commits from this branch onto a branch that is on a"
f" supported channel: {branches_official}")
@ -199,7 +199,7 @@ def switch_to_channel_branch(args: PmbArgs, channel_new):
# List current and new branches/channels
channels_cfg = pmb.helpers.git.parse_channels_cfg(args)
branch_new = channels_cfg["channels"][channel_new]["branch_pmaports"]
branch_current = pmb.helpers.git.rev_parse(args, args.aports,
branch_current = pmb.helpers.git.rev_parse(args.aports,
extra_args=["--abbrev-ref"])
logging.info(f"Currently checked out branch '{branch_current}' of"
f" pmaports.git is on channel '{channel_current}'.")

View file

@ -16,6 +16,8 @@ class Context():
log: Path
# The architecture of the selected device
device_arch: Optional[str]
offline: bool
aports: Path
def __init__(self):
self.details_to_stdout = False
@ -24,3 +26,5 @@ class Context():
self.log = pmb.config.work / "log.txt"
self.quiet = False
self.device_arch = None
self.offline = False
self.aports = pmb.config.work / "cache_git" / "pmaports"

View file

@ -105,7 +105,7 @@ class PmbArgs(Namespace):
no_reboot: str
no_sshd: str
odin_flashable_tar: str
offline: str
offline: bool
ondev_cp: List[Tuple[str, str]]
on_device_installer: str
ondev_no_rootfs: str

View file

@ -13,7 +13,7 @@ import pmb.helpers.run_core
import pmb.parse.version
def _prepare_fifo(args: PmbArgs):
def _prepare_fifo():
"""Prepare the progress fifo for reading / writing.
:param chroot: whether to run the command inside the chroot or on the host
@ -62,7 +62,7 @@ def _compute_progress(line):
return cur / tot if tot > 0 else 0
def apk_with_progress(args: PmbArgs, command: Sequence[PathString]):
def apk_with_progress(command: Sequence[PathString]):
"""Run an apk subcommand while printing a progress bar to STDOUT.
:param command: apk subcommand in list form
@ -71,7 +71,7 @@ def apk_with_progress(args: PmbArgs, command: Sequence[PathString]):
set to True.
:raises RuntimeError: when the apk command fails
"""
fifo, fifo_outside = _prepare_fifo(args)
fifo, fifo_outside = _prepare_fifo()
_command: List[str] = [os.fspath(c) for c in command]
command_with_progress = _create_command_with_progress(_command, fifo)
log_msg = " ".join(_command)
@ -82,8 +82,8 @@ def apk_with_progress(args: PmbArgs, command: Sequence[PathString]):
while p_apk.poll() is None:
line = p_cat.stdout.readline().decode('utf-8')
progress = _compute_progress(line)
pmb.helpers.cli.progress_print(args, progress)
pmb.helpers.cli.progress_flush(args)
pmb.helpers.cli.progress_print(progress)
pmb.helpers.cli.progress_flush()
pmb.helpers.run_core.check_return_code(p_apk.returncode,
log_msg)

View file

@ -1,6 +1,7 @@
# Copyright 2023 Oliver Smith
# SPDX-License-Identifier: GPL-3.0-or-later
import copy
import os
from pathlib import Path
import pmb.config
from pmb.core.types import PmbArgs
@ -120,17 +121,14 @@ def init(args: PmbArgs) -> PmbArgs:
context.details_to_stdout = args.details_to_stdout
context.sudo_timer = args.sudo_timer
context.quiet = args.quiet
context.offline = args.offline
if args.aports:
print(f"Using pmaports from: {args.aports}")
context.aports = args.aports
# Initialize logs (we could raise errors below)
pmb.helpers.logging.init(args)
# Remove attributes from args so they don't get used by mistake
delattr(args, "timeout")
delattr(args, "details_to_stdout")
delattr(args, "sudo_timer")
delattr(args, "log")
delattr(args, "quiet")
# Initialization code which may raise errors
check_pmaports_path(args)
if args.action not in ["init", "checksum", "config", "bootimg_analyze", "log",
@ -140,6 +138,15 @@ def init(args: PmbArgs) -> PmbArgs:
pmb.helpers.git.parse_channels_cfg(args)
context.device_arch = args.deviceinfo["arch"]
# Remove attributes from args so they don't get used by mistake
delattr(args, "timeout")
delattr(args, "details_to_stdout")
delattr(args, "sudo_timer")
delattr(args, "log")
delattr(args, "quiet")
delattr(args, "offline")
delattr(args, "aports")
return args

View file

@ -112,7 +112,7 @@ def confirm(args: PmbArgs, question="Continue?", default=False, no_assumptions=F
return answer == "y"
def progress_print(args: PmbArgs, progress):
def progress_print(progress):
"""Print a snapshot of a progress bar to STDOUT.
Call progress_flush to end printing progress and clear the line. No output is printed in
@ -135,7 +135,7 @@ def progress_print(args: PmbArgs, progress):
sys.stdout.write("\u001b8\u001b[0K")
def progress_flush(args):
def progress_flush():
"""Finish printing a progress bar.
This will erase the line. Does nothing in non-interactive mode.

View file

@ -584,7 +584,7 @@ def bootimg_analyze(args: PmbArgs):
def pull(args: PmbArgs):
failed = []
for repo in pmb.config.git_repos.keys():
if pmb.helpers.git.pull(args, repo) < 0:
if pmb.helpers.git.pull(repo) < 0:
failed.append(repo)
if not failed:
@ -602,7 +602,7 @@ def pull(args: PmbArgs):
logging.info("")
logging.info("Fix and try again:")
for name_repo in failed:
logging.info("* " + pmb.helpers.git.get_path(args, name_repo))
logging.info("* " + pmb.helpers.git.get_path(name_repo))
logging.info("---")
return False
@ -623,7 +623,7 @@ def status(args: PmbArgs) -> None:
def ci(args: PmbArgs):
topdir = pmb.helpers.git.get_topdir(args, Path.cwd())
topdir = pmb.helpers.git.get_topdir(Path.cwd())
if not os.path.exists(topdir):
logging.error("ERROR: change your current directory to a git"
" repository (e.g. pmbootstrap, pmaports) before running"
@ -657,7 +657,7 @@ def ci(args: PmbArgs):
if "slow" not in script_data["options"]:
scripts_selected[script] = script_data
if not pmb.helpers.git.clean_worktree(args, topdir):
if not pmb.helpers.git.clean_worktree(topdir):
logging.warning("WARNING: this git repository has uncommitted changes")
if not scripts_selected:

View file

@ -3,6 +3,8 @@
import configparser
from pathlib import Path
from typing import Dict
from pmb.core import get_context
from pmb.core.context import Context
from pmb.helpers import logging
import os
from pathlib import Path
@ -10,12 +12,11 @@ from pathlib import Path
import pmb.build
import pmb.chroot.apk
import pmb.config
from pmb.core.types import PmbArgs
import pmb.helpers.pmaports
import pmb.helpers.run
def get_path(args: PmbArgs, name_repo):
def get_path(context: Context, name_repo):
"""Get the path to the repository.
The path is either the default one in the work dir, or a user-specified one in args.
@ -23,11 +24,11 @@ def get_path(args: PmbArgs, name_repo):
:returns: full path to repository
"""
if name_repo == "pmaports":
return args.aports
return context.aports
return pmb.config.work / "cache_git" / name_repo
def clone(args: PmbArgs, name_repo):
def clone(name_repo):
"""Clone a git repository to $WORK/cache_git/$name_repo.
(or to the overridden path set in args, as with ``pmbootstrap --aports``).
@ -39,7 +40,7 @@ def clone(args: PmbArgs, name_repo):
if name_repo not in pmb.config.git_repos:
raise ValueError("No git repository configured for " + name_repo)
path = get_path(args, name_repo)
path = get_path(get_context(), name_repo)
if not os.path.exists(path):
# Build git command
url = pmb.config.git_repos[name_repo][0]
@ -58,7 +59,7 @@ def clone(args: PmbArgs, name_repo):
open(fetch_head, "w").close()
def rev_parse(args: PmbArgs, path, revision="HEAD", extra_args: list = []):
def rev_parse(path, revision="HEAD", extra_args: list = []):
"""Run "git rev-parse" in a specific repository dir.
:param path: to the git repository
@ -72,7 +73,7 @@ def rev_parse(args: PmbArgs, path, revision="HEAD", extra_args: list = []):
return rev.rstrip()
def can_fast_forward(args: PmbArgs, path, branch_upstream, branch="HEAD"):
def can_fast_forward(path, branch_upstream, branch="HEAD"):
command = ["git", "merge-base", "--is-ancestor", branch, branch_upstream]
ret = pmb.helpers.run.user(command, path, check=False)
if ret == 0:
@ -83,19 +84,19 @@ def can_fast_forward(args: PmbArgs, path, branch_upstream, branch="HEAD"):
raise RuntimeError("Unexpected exit code from git: " + str(ret))
def clean_worktree(args: PmbArgs, path):
def clean_worktree(path):
"""Check if there are not any modified files in the git dir."""
command = ["git", "status", "--porcelain"]
return pmb.helpers.run.user_output(command, path) == ""
def get_upstream_remote(args: PmbArgs, name_repo):
def get_upstream_remote(context: Context, name_repo):
"""Find the remote, which matches the git URL from the config.
Usually "origin", but the user may have set up their git repository differently.
"""
urls = pmb.config.git_repos[name_repo]
path = get_path(args, name_repo)
path = get_path(context, name_repo)
command = ["git", "remote", "-v"]
output = pmb.helpers.run.user_output(command, path)
for line in output.split("\n"):
@ -105,7 +106,7 @@ def get_upstream_remote(args: PmbArgs, name_repo):
" repository: {}".format(name_repo, urls, path))
def parse_channels_cfg(args):
def parse_channels_cfg():
"""Parse channels.cfg from pmaports.git, origin/master branch.
Reference: https://postmarketos.org/channels.cfg
@ -122,11 +123,13 @@ def parse_channels_cfg(args):
if pmb.helpers.other.cache[cache_key]:
return pmb.helpers.other.cache[cache_key]
context = get_context()
# Read with configparser
cfg = configparser.ConfigParser()
remote = get_upstream_remote(args, "pmaports")
remote = get_upstream_remote(context, "pmaports")
command = ["git", "show", f"{remote}/master:channels.cfg"]
stdout = pmb.helpers.run.user_output(command, args.aports,
stdout = pmb.helpers.run.user_output(command, context.aports,
check=False)
try:
cfg.read_string(stdout)
@ -159,7 +162,7 @@ def parse_channels_cfg(args):
return ret
def get_branches_official(args: PmbArgs, name_repo):
def get_branches_official(name_repo):
"""Get all branches that point to official release channels.
:returns: list of supported branches, e.g. ["master", "3.11"]
@ -170,14 +173,14 @@ def get_branches_official(args: PmbArgs, name_repo):
if name_repo != "pmaports":
return ["master"]
channels_cfg = parse_channels_cfg(args)
channels_cfg = parse_channels_cfg()
ret = []
for channel, channel_data in channels_cfg["channels"].items():
ret.append(channel_data["branch_pmaports"])
return ret
def pull(args: PmbArgs, name_repo):
def pull(name_repo):
"""Check if on official branch and essentially try ``git pull --ff-only``.
Instead of really doing ``git pull --ff-only``, do it in multiple steps
@ -186,16 +189,17 @@ def pull(args: PmbArgs, name_repo):
:returns: integer, >= 0 on success, < 0 on error
"""
branches_official = get_branches_official(args, name_repo)
branches_official = get_branches_official(name_repo)
context = get_context()
# Skip if repo wasn't cloned
path = get_path(args, name_repo)
path = get_path(context, name_repo)
if not os.path.exists(path):
logging.debug(name_repo + ": repo was not cloned, skipping pull!")
return 1
# Skip if not on official branch
branch = rev_parse(args, path, extra_args=["--abbrev-ref"])
branch = rev_parse(path, extra_args=["--abbrev-ref"])
msg_start = "{} (branch: {}):".format(name_repo, branch)
if branch not in branches_official:
logging.warning("{} not on one of the official branches ({}), skipping"
@ -204,13 +208,13 @@ def pull(args: PmbArgs, name_repo):
return -1
# Skip if workdir is not clean
if not clean_worktree(args, path):
if not clean_worktree(path):
logging.warning(msg_start + " workdir is not clean, skipping pull!")
return -2
# Skip if branch is tracking different remote
branch_upstream = get_upstream_remote(args, name_repo) + "/" + branch
remote_ref = rev_parse(args, path, branch + "@{u}", ["--abbrev-ref"])
branch_upstream = get_upstream_remote(context, name_repo) + "/" + branch
remote_ref = rev_parse(path, branch + "@{u}", ["--abbrev-ref"])
if remote_ref != branch_upstream:
logging.warning("{} is tracking unexpected remote branch '{}' instead"
" of '{}'".format(msg_start, remote_ref,
@ -219,16 +223,16 @@ def pull(args: PmbArgs, name_repo):
# Fetch (exception on failure, meaning connection to server broke)
logging.info(msg_start + " git pull --ff-only")
if not args.offline:
if not context.offline:
pmb.helpers.run.user(["git", "fetch"], path)
# Skip if already up to date
if rev_parse(args, path, branch) == rev_parse(args, path, branch_upstream):
if rev_parse(path, branch) == rev_parse(path, branch_upstream):
logging.info(msg_start + " already up to date")
return 2
# Skip if we can't fast-forward
if not can_fast_forward(args, path, branch_upstream):
if not can_fast_forward(path, branch_upstream):
logging.warning("{} can't fast-forward to {}, looks like you changed"
" the git history of your local branch. Skipping pull!"
"".format(msg_start, branch_upstream))
@ -241,7 +245,7 @@ def pull(args: PmbArgs, name_repo):
return 0
def get_topdir(args: PmbArgs, path: Path):
def get_topdir(path: Path):
"""Get top-dir of git repo.
:returns: a string with the top dir of the git repository,
@ -251,7 +255,7 @@ def get_topdir(args: PmbArgs, path: Path):
path, output_return=True, check=False).rstrip()
def get_files(args: PmbArgs, path):
def get_files(path):
"""Get all files inside a git repository, that are either already in the git tree or are not in gitignore.
Do not list deleted files. To be used for creating a tarball of the git repository.

View file

@ -20,12 +20,12 @@ def print_channel(args: PmbArgs) -> None:
channel = pmaports_cfg["channel"]
# Get branch name (if on branch) or current commit
path = pmb.helpers.git.get_path(args, "pmaports")
ref = pmb.helpers.git.rev_parse(args, path, extra_args=["--abbrev-ref"])
path = pmb.helpers.git.get_path("pmaports")
ref = pmb.helpers.git.rev_parse(path, extra_args=["--abbrev-ref"])
if ref == "HEAD":
ref = pmb.helpers.git.rev_parse(args, path)[0:8]
ref = pmb.helpers.git.rev_parse(path)[0:8]
if not pmb.helpers.git.clean_worktree(args, path):
if not pmb.helpers.git.clean_worktree(path):
ref += ", dirty"
value = f"{channel} (pmaports: {ref})"

View file

@ -47,7 +47,7 @@ def test_switch_to_channel_branch(args: PmbArgs, monkeypatch, tmpdir):
run_git(["checkout", "-b", "v20.05"])
run_git(["checkout", "master"])
assert func(args, "v20.05") is True
branch = pmb.helpers.git.rev_parse(args, path, extra_args=["--abbrev-ref"])
branch = pmb.helpers.git.rev_parse(path, extra_args=["--abbrev-ref"])
assert branch == "v20.05"

View file

@ -128,7 +128,7 @@ def test_parse_channels_cfg(args: PmbArgs):
def test_pull_non_existing(args: PmbArgs):
assert pmb.helpers.git.pull(args, "non-existing-repo-name") == 1
assert pmb.helpers.git.pull("non-existing-repo-name") == 1
def test_pull(args: PmbArgs, monkeypatch, tmpdir):