meta: introduce a Cache decorator (MR 2252)

Generalise pmb.helpers.other.cache with a more python decorator.

The Cache decorator takes a list of function arguments to use as cache
keys, keyword args can be used to restrict caching so that it is skipped
entirely unless the attribute has a specific value.

For example, pmb.helpers.pmaports.get() has the decorator:
@Cache("pkgname", subpackages=True)

This means the return value will be cached only when subpackages is
True, otherwise it will always miss.

Signed-off-by: Caleb Connolly <caleb@postmarketos.org>
This commit is contained in:
Caleb Connolly 2024-06-09 13:22:58 +02:00 committed by Oliver Smith
parent 79cf2e8910
commit 185d8bcef5
No known key found for this signature in database
GPG key ID: 5AE7F5513E0885CB
20 changed files with 244 additions and 165 deletions

View file

@ -104,7 +104,7 @@ def menuconfig(args: PmbArgs, pkgname: str, use_oldconfig):
apkbuild = pmb.parse.apkbuild(aport / "APKBUILD")
arch = args.arch or get_arch(apkbuild)
chroot = pmb.build.autodetect.chroot(apkbuild, arch)
cross = pmb.build.autodetect.crosscompile(apkbuild, arch, chroot)
cross = pmb.build.autodetect.crosscompile(apkbuild, arch)
hostspec = arch.alpine_triple()
# Set up build tools and makedepends

View file

@ -16,13 +16,15 @@ import pmb.helpers.other
import pmb.helpers.pmaports
import pmb.helpers.repo
import pmb.helpers.run
from pmb.meta import Cache
import pmb.parse.apkindex
import pmb.parse.depends
import pmb.parse.version
from pmb.core import Chroot, get_context
def update_repository_list(suffix: Chroot, postmarketos_mirror=True,
@Cache("chroot")
def update_repository_list(chroot: Chroot, postmarketos_mirror=True,
check=False):
"""
Update /etc/apk/repositories, if it is outdated (when the user changed the
@ -34,12 +36,8 @@ def update_repository_list(suffix: Chroot, postmarketos_mirror=True,
Only for this purpose, the "check" parameter should be set to
True.
"""
# Skip if we already did this
if suffix in pmb.helpers.other.cache["apk_repository_list_updated"]:
return
# Read old entries or create folder structure
path = suffix / "etc/apk/repositories"
path = chroot / "etc/apk/repositories"
lines_old: List[str] = []
if path.exists():
# Read all old lines
@ -54,7 +52,6 @@ def update_repository_list(suffix: Chroot, postmarketos_mirror=True,
exclude = ["pmaports"] if not postmarketos_mirror else []
lines_new = pmb.helpers.repo.urls(mirrors_exclude=exclude)
if lines_old == lines_new:
pmb.helpers.other.cache["apk_repository_list_updated"].append(suffix)
return
# Check phase: raise error when still outdated
@ -62,25 +59,22 @@ def update_repository_list(suffix: Chroot, postmarketos_mirror=True,
raise RuntimeError(f"Failed to update: {path}")
# Update the file
logging.debug(f"({suffix}) update /etc/apk/repositories")
logging.debug(f"({chroot}) update /etc/apk/repositories")
if path.exists():
pmb.helpers.run.root(["rm", path])
for line in lines_new:
pmb.helpers.run.root(["sh", "-c", "echo "
f"{shlex.quote(line)} >> {path}"])
update_repository_list(suffix, postmarketos_mirror, True)
update_repository_list(chroot, postmarketos_mirror, True)
@Cache("chroot")
def check_min_version(chroot: Chroot=Chroot.native()):
"""
Check the minimum apk version, before running it the first time in the
current session (lifetime of one pmbootstrap call).
"""
# Skip if we already did this
if chroot.path in pmb.helpers.other.cache["apk_min_version_checked"]:
return
# Skip if apk is not installed yet
if not (chroot / "sbin/apk").exists():
logging.debug(f"NOTE: Skipped apk version check for chroot '{chroot}'"
@ -94,9 +88,6 @@ def check_min_version(chroot: Chroot=Chroot.native()):
"Delete your http cache and zap all chroots, then try again:"
" 'pmbootstrap zap -hc'")
# Mark this suffix as checked
pmb.helpers.other.cache["apk_min_version_checked"].append(chroot.path)
def packages_split_to_add_del(packages):
"""

View file

@ -3,6 +3,7 @@
import enum
import filecmp
from typing import List
from pmb.meta import Cache
from pmb.helpers import logging
import os
@ -95,7 +96,8 @@ def warn_if_chroot_is_outdated(chroot: Chroot):
cache_chroot_is_outdated += [str(chroot)]
def init(chroot: Chroot=Chroot.native(), usr_merge=UsrMerge.AUTO,
@Cache("chroot")
def init(chroot: Chroot, usr_merge=UsrMerge.AUTO,
postmarketos_mirror=True):
"""
Initialize a chroot by copying the resolv.conf and updating
@ -111,10 +113,6 @@ def init(chroot: Chroot=Chroot.native(), usr_merge=UsrMerge.AUTO,
arch = chroot.arch
config = get_context().config
already_setup = str(chroot) in pmb.helpers.other.cache["pmb.chroot.init"]
if already_setup:
logging.warning(f"({chroot}) FIXME! init() called multiple times!")
return
pmb.chroot.mount(chroot)
mark_in_chroot(chroot)
@ -123,7 +121,6 @@ def init(chroot: Chroot=Chroot.native(), usr_merge=UsrMerge.AUTO,
copy_resolv_conf(chroot)
pmb.chroot.apk.update_repository_list(chroot, postmarketos_mirror)
warn_if_chroot_is_outdated(chroot)
pmb.helpers.other.cache["pmb.chroot.init"][str(chroot)] = True
return
# Require apk-tools-static
@ -187,5 +184,3 @@ def init(chroot: Chroot=Chroot.native(), usr_merge=UsrMerge.AUTO,
command = ["--force-missing-repositories"] + command
pmb.chroot.root(["apk"] + command, chroot)
pmb.helpers.other.cache["pmb.chroot.init"][str(chroot)] = True

View file

@ -1,5 +1,6 @@
# Copyright 2023 Oliver Smith
# SPDX-License-Identifier: GPL-3.0-or-later
from pmb.core.pkgrepo import pkgrepo_default_path
from pmb.helpers import logging
import os
from pathlib import Path
@ -83,7 +84,7 @@ def mount(chroot: Chroot):
# Get all mountpoints
arch = chroot.arch
channel = pmb.config.pmaports.read_config(support_systemd=False)["channel"]
channel = pmb.config.pmaports.read_config(pkgrepo_default_path())["channel"]
mountpoints: Dict[Path, Path] = {}
for src_template, target_template in pmb.config.chroot_mount_bind.items():
src_template = src_template.replace("$WORK", os.fspath(get_context().config.work))

View file

@ -12,6 +12,7 @@ import pmb.config.pmaports
import pmb.config.workdir
import pmb.helpers.pmaports
import pmb.helpers.run
from pmb.meta import Cache
import pmb.parse.apkindex
from pmb.core import Chroot, get_context
@ -77,7 +78,7 @@ def zap(confirm=True, dry=False, pkgs_local=False, http=False,
pmb.config.workdir.clean()
# Chroots were zapped, so no repo lists exist anymore
pmb.helpers.other.cache["apk_repository_list_updated"].clear()
Cache.clear_cache(pmb.chroot.apk.update_repository_list)
# Print amount of cleaned up space
if dry:

View file

@ -120,10 +120,10 @@ def copy_git_repo_to_chroot(topdir):
:param topdir: top directory of the git repository, get it with:
pmb.helpers.git.get_topdir()
"""
pmb.chroot.init()
tarball_path = Chroot.native() / "tmp/git.tar.gz"
"""
chroot = Chroot.native()
pmb.chroot.init(chroot)
tarball_path = chroot / "tmp/git.tar.gz"
files = pmb.helpers.git.get_files(topdir)
with open(f"{tarball_path}.files", "w") as handle:

View file

@ -4,8 +4,10 @@ from pmb.core import Config
from pmb.core.config import SystemdConfig
import pmb.helpers.ui
import pmb.config.pmaports
from pmb.meta import Cache
@Cache()
def is_systemd_selected(config: Config):
if "systemd" not in pmb.config.pmaports.read_config_repos():
return False

View file

@ -9,6 +9,7 @@ import os
import sys
import pmb.config
from pmb.meta import Cache
from pmb.types import PmbArgs
import pmb.helpers.git
import pmb.helpers.pmaports
@ -59,12 +60,9 @@ def check_version_pmbootstrap(min_ver):
" of pmbootstrap from git.")
@Cache()
def read_config_repos():
""" Read the sections starting with "repo:" from pmaports.cfg. """
# Try cache first
cache_key = "pmb.config.pmaports.read_config_repos"
if pmb.helpers.other.cache[cache_key]:
return pmb.helpers.other.cache[cache_key]
cfg = configparser.ConfigParser()
cfg.read(f"{pkgrepo_default_path()}/pmaports.cfg")
@ -76,22 +74,17 @@ def read_config_repos():
repo = section.split("repo:", 1)[1]
ret[repo] = cfg[section]
# Cache and return
pmb.helpers.other.cache[cache_key] = ret
return ret
def read_config(aports: Optional[Path] = None, support_systemd=True):
"""Read and verify pmaports.cfg."""
if not aports:
aports = pkgrepo_default_path()
name = aports.name
if support_systemd and aports.name == "systemd":
name = f"systemd-{aports.name}"
# Try cache first
cache_key = "pmb.config.pmaports.read_config"
if support_systemd and aports.name in pmb.helpers.other.cache[cache_key]:
return pmb.helpers.other.cache[cache_key][name]
@Cache("aports")
def read_config(aports: Optional[Path] = None):
"""Read and verify pmaports.cfg. If aports is not
specified and systemd is enabled, the returned channel
will be the systemd one (e.g. systemd-edge instead of edge)
since we'll use the first pkgrepo which is systemd."""
if aports is None:
aports = pkgrepo_paths()[0]
systemd = aports.name == "systemd"
# extra-repos don't have a pmaports.cfg
@ -123,11 +116,9 @@ def read_config(aports: Optional[Path] = None, support_systemd=True):
# Translate legacy channel names
ret["channel"] = pmb.helpers.pmaports.get_channel_new(ret["channel"])
if "systemd" in name:
if systemd:
ret["channel"] = "systemd-" + ret["channel"]
# Cache and return
pmb.helpers.other.cache[cache_key][name] = ret
return ret
@ -153,7 +144,7 @@ def read_config_channel():
"""
aports = pkgrepo_default_path()
channel = read_config(support_systemd=False)["channel"]
channel = read_config(aports)["channel"]
channels_cfg = pmb.helpers.git.parse_channels_cfg(aports)
if channel in channels_cfg["channels"]:

View file

@ -12,7 +12,7 @@ from typing import Optional
import pmb.config
import pmb.config.pmaports
from pmb.core import Chroot, get_context
from pmb.types import PmbArgs
from pmb.core.pkgrepo import pkgrepo_default_path
def chroot_save_init(suffix: Chroot):
@ -29,7 +29,7 @@ def chroot_save_init(suffix: Chroot):
cfg[key] = {}
# Update sections
channel = pmb.config.pmaports.read_config(support_systemd=False)["channel"]
channel = pmb.config.pmaports.read_config(pkgrepo_default_path())["channel"]
cfg["chroot-channels"][str(suffix)] = channel
cfg["chroot-init-dates"][str(suffix)] = str(int(time.time()))
@ -83,7 +83,8 @@ def chroot_check_channel(chroot: Chroot):
raise RuntimeError(f"{msg_unknown} {msg_again}")
# Exclude systemd repo
channel = pmb.config.pmaports.read_config(support_systemd=False)["channel"]
aports = pkgrepo_default_path()
channel = pmb.config.pmaports.read_config(aports)["channel"]
channel_cfg = cfg[key][str(chroot)]
if channel != channel_cfg:
raise RuntimeError(f"Chroot '{chroot}' was created for the"

View file

@ -100,7 +100,7 @@ class Chroot:
if not isinstance(other, Chroot):
return NotImplemented
return self.type() == other.type() and self.name() == other.name()
return self.type == other.type and self.name() == other.name()
def __truediv__(self, other: object) -> Path:
@ -129,6 +129,7 @@ class Chroot:
return NotImplemented
@property
def type(self) -> ChrootType:
return self.__type

View file

@ -5,14 +5,11 @@ from typing import Any, Dict, Generator, List, Optional, Tuple
import pmb.config
from pmb.core.context import get_context
from pmb.meta import Cache
_cache: Dict[str, Any] = {"pkgrepo_paths": []}
@Cache(skip_extras=False)
def pkgrepo_paths(skip_extras = False) -> List[Path]:
global _cache
if not skip_extras and _cache["pkgrepo_paths"]:
return _cache["pkgrepo_paths"]
config = get_context().config
paths = list(map(lambda x: Path(x),
config.aports))
@ -29,7 +26,6 @@ def pkgrepo_paths(skip_extras = False) -> List[Path]:
out_paths.append(p / "extra-repos/systemd")
out_paths.append(p)
_cache["pkgrepo_paths"] = out_paths
return out_paths
def pkgrepo_default_path() -> Path:

View file

@ -15,6 +15,7 @@ import pmb.chroot.apk
import pmb.config
import pmb.helpers.pmaports
import pmb.helpers.run
from pmb.meta import Cache
def get_path(name_repo: str):
@ -107,6 +108,7 @@ def get_upstream_remote(aports: Path):
" repository: {}".format(name_repo, urls, aports))
@Cache("aports")
def parse_channels_cfg(aports: Path):
"""Parse channels.cfg from pmaports.git, origin/master branch.
@ -119,11 +121,6 @@ def parse_channels_cfg(aports: Path):
"mirrordir_alpine": ...},
...}}
"""
# Cache during one pmbootstrap run
cache_key = "pmb.helpers.git.parse_channels_cfg"
if pmb.helpers.other.cache[cache_key]:
return pmb.helpers.other.cache[cache_key]
# Read with configparser
cfg = configparser.ConfigParser()
remote = get_upstream_remote(aports)
@ -157,7 +154,6 @@ def parse_channels_cfg(aports: Path):
# FIXME: how to type this properly??
ret["channels"][channel_new][key] = value # type: ignore[index]
pmb.helpers.other.cache[cache_key] = ret
return ret

View file

@ -288,18 +288,5 @@ def lookup(key):
"""
cache: Dict[str, Any] = {
"apkindex": {},
"apkbuild": {},
"apk_min_version_checked": [],
"apk_repository_list_updated": [],
"built": {},
"deviceinfo": {},
"find_aport": {},
"pmb.helpers.package.depends_recurse": {},
"pmb.helpers.package.get": {},
"pmb.helpers.repo.update": {"404": [], "offline_msg_shown": False},
"pmb.helpers.git.parse_channels_cfg": {},
"pmb.config.pmaports.read_config": {},
"pmb.config.pmaports.read_config_repos": None,
"pmb.chroot.init": {},
"pkgrepo_paths": [],
}

View file

@ -15,7 +15,7 @@ from pmb.core.context import get_context
from pmb.helpers import logging
import pmb.build._package
from pmb.types import PmbArgs
from pmb.meta import Cache
import pmb.helpers.pmaports
import pmb.helpers.repo
@ -28,6 +28,7 @@ def remove_operators(package):
return package
@Cache("pkgname", "arch", "replace_subpkgnames")
def get(pkgname, arch, replace_subpkgnames=False, must_exist=True):
"""Find a package in pmaports, and as fallback in the APKINDEXes of the binary packages.
@ -47,19 +48,6 @@ def get(pkgname, arch, replace_subpkgnames=False, must_exist=True):
* None if the package was not found
"""
# Cached result
cache_key = "pmb.helpers.package.get"
if (
arch in pmb.helpers.other.cache[cache_key] and
pkgname in pmb.helpers.other.cache[cache_key][arch] and
replace_subpkgnames in pmb.helpers.other.cache[cache_key][arch][
pkgname
]
):
return pmb.helpers.other.cache[cache_key][arch][pkgname][
replace_subpkgnames
]
# Find in pmaports
ret: Dict[str, Any] = {}
pmaport = pmb.helpers.pmaports.get(pkgname, False)
@ -118,13 +106,6 @@ def get(pkgname, arch, replace_subpkgnames=False, must_exist=True):
# Save to cache and return
if ret:
if arch not in pmb.helpers.other.cache[cache_key]:
pmb.helpers.other.cache[cache_key][arch] = {}
if pkgname not in pmb.helpers.other.cache[cache_key][arch]:
pmb.helpers.other.cache[cache_key][arch][pkgname] = {}
pmb.helpers.other.cache[cache_key][arch][pkgname][
replace_subpkgnames
] = ret
return ret
# Could not find the package
@ -134,6 +115,7 @@ def get(pkgname, arch, replace_subpkgnames=False, must_exist=True):
" could not find this package in any APKINDEX!")
@Cache("pkgname", "arch")
def depends_recurse(pkgname, arch):
"""Recursively resolve all of the package's dependencies.
@ -143,12 +125,6 @@ def depends_recurse(pkgname, arch):
["busybox-static-armhf", "device-samsung-i9100",
"linux-samsung-i9100", ...]
"""
# Cached result
cache_key = "pmb.helpers.package.depends_recurse"
if (arch in pmb.helpers.other.cache[cache_key] and
pkgname in pmb.helpers.other.cache[cache_key][arch]):
return pmb.helpers.other.cache[cache_key][arch][pkgname]
# Build ret (by iterating over the queue)
queue = [pkgname]
ret = []
@ -166,10 +142,6 @@ def depends_recurse(pkgname, arch):
ret += [package["pkgname"]]
ret.sort()
# Save to cache and return
if arch not in pmb.helpers.other.cache[cache_key]:
pmb.helpers.other.cache[cache_key][arch] = {}
pmb.helpers.other.cache[cache_key][arch][pkgname] = ret
return ret

View file

@ -14,6 +14,7 @@ from pmb.helpers import logging
from pathlib import Path
from typing import Any, Optional, Sequence, Dict, Tuple
from pmb.meta import Cache
import pmb.parse
def _find_apkbuilds(skip_extra_repos=False) -> Dict[str, Path]:
@ -139,6 +140,7 @@ def _find_package_in_apkbuild(package: str, path: Path) -> bool:
return False
@Cache("package", "subpackages", skip_extra_repos=False)
def find(package, must_exist=True, subpackages=True, skip_extra_repos=False):
"""Find the directory in pmaports that provides a package or subpackage.
If you want the parsed APKBUILD instead, use pmb.helpers.pmaports.get().
@ -155,50 +157,43 @@ def find(package, must_exist=True, subpackages=True, skip_extra_repos=False):
# Try to get a cached result first (we assume that the aports don't change
# in one pmbootstrap call)
ret: Optional[Path] = None
if package in pmb.helpers.other.cache["find_aport"]:
ret = pmb.helpers.other.cache["find_aport"][package]
else:
# Sanity check
if "*" in package:
raise RuntimeError("Invalid pkgname: " + package)
# Sanity check
if "*" in package:
raise RuntimeError("Invalid pkgname: " + package)
# Try to find an APKBUILD with the exact pkgname we are looking for
path = _find_apkbuilds(skip_extra_repos).get(package)
if path:
logging.verbose(f"{package}: found apkbuild: {path}")
ret = path.parent
elif subpackages:
# No luck, take a guess what APKBUILD could have the package we are
# looking for as subpackage
guess = guess_main(package)
if guess:
# Parse the APKBUILD and verify if the guess was right
if _find_package_in_apkbuild(package, guess / "APKBUILD"):
ret = guess
else:
# Otherwise parse all APKBUILDs (takes time!), is the
# package we are looking for a subpackage of any of those?
for path_current in _find_apkbuilds().values():
if _find_package_in_apkbuild(package, path_current):
ret = path_current.parent
break
# Try to find an APKBUILD with the exact pkgname we are looking for
path = _find_apkbuilds(skip_extra_repos).get(package)
if path:
logging.verbose(f"{package}: found apkbuild: {path}")
ret = path.parent
elif subpackages:
# No luck, take a guess what APKBUILD could have the package we are
# looking for as subpackage
guess = guess_main(package)
if guess:
# Parse the APKBUILD and verify if the guess was right
if _find_package_in_apkbuild(package, guess / "APKBUILD"):
ret = guess
else:
# Otherwise parse all APKBUILDs (takes time!), is the
# package we are looking for a subpackage of any of those?
for path_current in _find_apkbuilds().values():
if _find_package_in_apkbuild(package, path_current):
ret = path_current.parent
break
# If we still didn't find anything, as last resort: assume our
# initial guess was right and the APKBUILD parser just didn't
# find the subpackage in there because it is behind shell logic
# that we don't parse.
if not ret:
ret = guess
# If we still didn't find anything, as last resort: assume our
# initial guess was right and the APKBUILD parser just didn't
# find the subpackage in there because it is behind shell logic
# that we don't parse.
if not ret:
ret = guess
# Crash when necessary
if ret is None and must_exist:
raise RuntimeError("Could not find aport for package: " +
package)
# Save result in cache (only if subpackage search was enabled)
if subpackages and not skip_extra_repos:
pmb.helpers.other.cache["find_aport"][package] = ret
return ret
@ -209,6 +204,8 @@ def find_optional(package: str) -> Optional[Path]:
return None
# The only caller with subpackages=False is ui.check_option()
@Cache("pkgname", subpackages=True)
def get_with_path(pkgname, must_exist=True, subpackages=True, skip_extra_repos=False) -> Tuple[Optional[Path], Optional[Dict[str, Any]]]:
"""Find and parse an APKBUILD file.

View file

@ -11,12 +11,13 @@ import os
import hashlib
from pmb.core import get_context
from pmb.core.arch import Arch
from pmb.core.pkgrepo import pkgrepo_names, pkgrepo_paths
from pmb.core.pkgrepo import pkgrepo_names
from pmb.helpers import logging
from pathlib import Path
from typing import List, Optional, Set
import pmb.config.pmaports
from pmb.meta import Cache
from pmb.types import PmbArgs
import pmb.helpers.http
import pmb.helpers.run
@ -126,7 +127,8 @@ def apkindex_files(arch: Optional[Arch]=None, user_repository=True,
return ret
def update(arch=None, force=False, existing_only=False):
@Cache("arch", force=False)
def update(arch: Optional[Arch]=None, force=False, existing_only=False):
"""Download the APKINDEX files for all URLs depending on the architectures.
:param arch: * one Alpine architecture name ("x86_64", "armhf", ...)

View file

@ -3,7 +3,6 @@
from pmb.helpers import logging
import pmb.build
from pmb.types import PmbArgs
import pmb.helpers.package
import pmb.helpers.pmaports

156
pmb/meta/__init__.py Normal file
View file

@ -0,0 +1,156 @@
# Copyright 2024 Caleb Connolly
# SPDX-License-Identifier: GPL-3.0-or-later
import copy
from typing import Callable, Dict, Optional
class Wrapper:
def __init__(self, cache: "Cache", func: Callable):
self.cache = cache
self.func = func
self.__module__ = func.__module__
self.__name__ = func.__name__
self.fhash = hash(func)
# When someone attempts to call a cached function, they'll
# actually end up here. We first check if we have a cached
# result and if not then we do the actual function call and
# cache it if applicable
def __call__(self, *args, **kwargs):
#print(f"Cache.wrapper({args}, {kwargs})")
# Build the cache key from the function arguments that we
# care about, which might be none of them
key = self.cache.build_key(self.func, *args, **kwargs)
# Don't cache
if key is None:
# print(f"Cache: {func.__name__} NULL key!")
return self.func(*args, **kwargs)
if key not in self.cache.cache:
try:
self.cache.cache[key] = self.func(*args, **kwargs)
except Exception as e:
raise e
elif self.cache.cache_deepcopy:
self.cache.cache[key] = copy.deepcopy(self.cache.cache[key])
#print(f"Cache: {func.__name__}({key})")
return self.cache.cache[key]
# This is a hacky workaround to let us fetch the hash of the
# underlying function, since hash(cached_func()) would just get
# the hash of the wrapper
def __getattribute__(self, name: str):
if name == "@realhash":
return self.fhash
return super(Wrapper, self).__getattribute__(name)
class Cache:
"""Cache decorator for caching function results based on parameters.
:param args: a list of function arguments to use as the cache key.
:param kwargs: these are arguments where we should only cache if the
function is called with the given value. For example, in pmb.build._package
we never want to use the cached result when called with force=True."""
_cache: Dict[int, "Cache"] = {}
def __init__(self, *args, cache_deepcopy=False, **kwargs):
for a in args:
if not isinstance(a, str):
raise ValueError(f"Cache key must be a string, not {type(a)}")
if len(args) != len(set(args)):
raise ValueError("Duplicate cache key properties")
# print(f"Cache.__init__({args})")
self.cache = {}
self.params = args
self.kwargs = kwargs
self.cache_deepcopy = cache_deepcopy
# Build the cache key, or return None to not cache in the case where
# we only cache when an argument has a specific value
def build_key(self, func: Callable, *args, **kwargs) -> Optional[str]:
key = "~"
# Easy case: cache irrelevant of arguments
if not self.params and not self.kwargs:
return key
#print(f"Cache.build_key({func}, {args}, {kwargs})")
argnames = list(func.__code__.co_varnames)[:func.__code__.co_argcount]
# Build a dictionary of the arguments passed to the function and their values
# including the default values
# This is a silly mess because I wanted to avoid using inspect, but the reflection
# stuff is not fun to work with...
_kwargs = {}
kwargs_start = len(argnames)-len(list(func.__defaults__ or [])) - 1
for i in range(len(argnames)-1, 0, -1):
arg = argnames[i]
if arg not in self.kwargs:
continue
if arg in kwargs:
_kwargs[argnames[i]] = kwargs[arg]
elif i >= kwargs_start:
#print(f"{func.__name__} -- {i}: {argnames[i]}")
_kwargs[argnames[i]] = list(func.__defaults__ or [])[kwargs_start + i - 1]
passed_args = dict(zip(argnames, args + tuple(_kwargs)))
#print(f"Cache.build_key({func}, {args}, {kwargs}) -- {passed_args}")
if self.kwargs:
for k, v in self.kwargs.items():
if k not in argnames:
raise ValueError(f"Cache key attribute {k} is not a valid parameter to {func.__name__}()")
# Get the value passed into the function, or the default value
# FIXME: could get a false hit if this is None
passed_val = passed_args.get(k, _kwargs.get(k))
if passed_val != v:
return None
else:
key += f"{k}=({v})~"
if self.params:
for i, param in enumerate(args + tuple(kwargs.keys())):
if argnames[i] in self.params[0]:
if param.__str__ != object.__str__:
key += f"{param}~"
else:
raise ValueError(f"Cache key argument {argnames[i]} to function"
f" {func.__name__} must be a stringable type")
return key
def __call__(self, func: Callable):
fhash = hash(func)
Cache._cache[fhash] = self
# print(f"Cache: {func.__module__}.{func.__name__} hash {fhash}")
# print(f"Cache.__call__({func})")
argnames = func.__code__.co_varnames
for a in self.params:
if a not in argnames:
raise ValueError(f"Cache key attribute {a} is not a valid parameter to {func.__name__}()")
return Wrapper(self, func)
def clear(self):
self.cache.clear()
@staticmethod
def clear_cache(func: Callable):
key = None
try:
key = getattr(func, "@realhash")
except AttributeError:
return
cache = getattr(Cache, "_cache")
if key not in cache:
return
cache[key].clear()

View file

@ -9,6 +9,7 @@ import re
from collections import OrderedDict
import pmb.config
from pmb.meta import Cache
from pmb.types import PmbArgs
import pmb.helpers.devices
import pmb.parse.version
@ -313,6 +314,7 @@ def _parse_subpackage(path, lines, apkbuild, subpackages, subpkg):
subpackages[subpkgname] = ret
@Cache("path")
def apkbuild(path: Path, check_pkgver=True, check_pkgname=True):
"""
Parse relevant information out of the APKBUILD file. This is not meant
@ -333,11 +335,6 @@ def apkbuild(path: Path, check_pkgver=True, check_pkgname=True):
if not path.exists():
raise FileNotFoundError(f"{path.relative_to(get_context().config.work)} not found!")
# Try to get a cached result first (we assume that the aports don't change
# in one pmbootstrap call)
if path in pmb.helpers.other.cache["apkbuild"]:
return pmb.helpers.other.cache["apkbuild"][path]
# Read the file and check line endings
lines = read_file(path)
@ -364,7 +361,6 @@ def apkbuild(path: Path, check_pkgver=True, check_pkgname=True):
f" APKBUILD: {path}")
# Fill cache
pmb.helpers.other.cache["apkbuild"][path] = ret
return ret

View file

@ -10,6 +10,7 @@ import os
import pmb.config
import pmb.helpers.other
import pmb.helpers.devices
from pmb.meta import Cache
# FIXME: It feels weird to handle this at parse time.
# we should instead have the Deviceinfo object store
@ -57,7 +58,7 @@ def _parse_kernel_suffix(info, device, kernel):
return ret
@Cache("device", "kernel")
def deviceinfo(device=None, kernel=None) -> "Deviceinfo":
"""
:param device: defaults to args.device
@ -69,9 +70,6 @@ def deviceinfo(device=None, kernel=None) -> "Deviceinfo":
if not kernel:
kernel = context.config.kernel
if device in pmb.helpers.other.cache["deviceinfo"]:
return pmb.helpers.other.cache["deviceinfo"][device]
path = pmb.helpers.devices.find_path(device, 'deviceinfo')
if not path:
raise RuntimeError(
@ -79,10 +77,7 @@ def deviceinfo(device=None, kernel=None) -> "Deviceinfo":
" start a new device port or to choose another device. It may have"
" been renamed, see <https://postmarketos.org/renamed>")
di = Deviceinfo(path, kernel)
pmb.helpers.other.cache["deviceinfo"][device] = di
return di
return Deviceinfo(path, kernel)
class Deviceinfo:
"""Variables from deviceinfo. Reference: <https://postmarketos.org/deviceinfo>