mirror of
https://gitlab.postmarketos.org/postmarketOS/pmbootstrap.git
synced 2025-07-12 19:09:56 +03:00
This fixes an issue where the local apk index has two packages in it that provide the same thing, but this function was returning only the one with the shortest name... which could be very bad :) https://gitlab.postmarketos.org/postmarketOS/pmaports/-/merge_requests/6475#note_480167 In the process, add tests for the pmb.parse.apkindex.package function. Some of the tests would fail without this patch, surfacing a bug where provider priorities were not considered but should have been, which this patch fixes. Co-authored-by: Clayton Craft <craftyguy@postmarketos.org> Part-of: https://gitlab.postmarketos.org/postmarketOS/pmbootstrap/-/merge_requests/2594
503 lines
17 KiB
Python
503 lines
17 KiB
Python
# Copyright 2023 Oliver Smith
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
import collections
|
|
from typing import cast, overload, Any, Literal
|
|
from collections.abc import Sequence
|
|
from pmb.core.apkindex_block import ApkindexBlock
|
|
from pmb.core.arch import Arch
|
|
from pmb.helpers import logging
|
|
from pathlib import Path
|
|
import tarfile
|
|
import pmb.chroot.apk
|
|
import pmb.helpers.package
|
|
import pmb.helpers.repo
|
|
import pmb.parse.version
|
|
|
|
# Maps the first character of an APKINDEX line to the key name under which
# parse_next_block() stores the rest of that line. Lines starting with a
# character that is not listed here are not stored.
apkindex_map = {
    "A": "arch",
    "D": "depends",
    "o": "origin",
    "P": "pkgname",
    "p": "provides",
    "k": "provider_priority",
    "t": "timestamp",
    "V": "version",
}

# Keys that must be present in every parsed block; parse_next_block() raises
# a RuntimeError if any of them is missing.
required_apkindex_keys = ["arch", "pkgname", "version"]
|
|
|
|
|
|
def _strip_version_operator(value: str) -> str:
    """Return ``value`` truncated before its first version operator character.

    The cut happens at the earliest ``>``, ``=``, ``<`` or ``~`` found in the
    string, so ``"foo>=1.0"``, ``"foo<=1.0"`` and ``"foo~=1.0"`` all become
    ``"foo"``. A value without any operator is returned unchanged.
    """
    for index, char in enumerate(value):
        if char in (">", "=", "<", "~"):
            return value[:index]
    return value


def parse_next_block(path: Path, lines: list[str]) -> ApkindexBlock | None:
    """Parse the next block in an APKINDEX.

    :param path: to the APKINDEX.tar.gz (only used in error messages)
    :param lines: all not yet parsed lines from the "APKINDEX" file inside the
                  archive. The list is consumed from the end: every line that
                  belongs to the parsed block gets removed from it.
    :returns: ApkindexBlock
    :returns: None, when there are no more blocks
    :raises RuntimeError: when a key appears twice in the block, a required
                          key is missing or provider_priority is not a number
    """
    # Parse until we hit the start of the block or run out of lines
    ret: dict[str, Any] = {}
    required_found = 0  # Count the required keys we found
    while lines:
        # We parse backwards for performance (pop(0) is super slow)
        line = lines.pop()
        if not line:
            continue

        # The checksum key is always the FIRST in the block, so when we find
        # it (while walking backwards) we know we're done.
        k = line[0]
        if k == "C":
            break

        # Parse keys from the mapping, silently skip everything else
        key = apkindex_map.get(k, None)
        if key:
            if key in ret:
                raise RuntimeError(f"Key {key} specified twice in block: {ret}, file: {path}")
            if key in required_apkindex_keys:
                required_found += 1
            ret[key] = line[2:]

    # Nothing left to parse and nothing collected: end of the index
    if not lines and not ret:
        return None

    # Check for required keys
    if required_found != len(required_apkindex_keys):
        for key in required_apkindex_keys:
            if key not in ret:
                raise RuntimeError(f"Missing required key '{key}' in block {ret}, file: {path}")
        raise RuntimeError(
            f"Expected {len(required_apkindex_keys)} required keys,"
            f" but found {required_found} in block: {ret}, file: {path}"
        )

    # Format optional lists. All version operators are ignored for now, only
    # the name in front of the first operator character is kept.
    for key in ["provides", "depends"]:
        raw = ret.get(key)
        if raw:
            ret[key] = [_strip_version_operator(value) for value in raw.split(" ")]
        else:
            ret[key] = []

    # An absent or empty provider_priority is represented as None
    provider_priority = ret.get("provider_priority")
    if provider_priority:
        if not provider_priority.isdigit():
            raise RuntimeError(
                f"Invalid provider_priority: '{provider_priority}' parsing block {ret}"
            )
        provider_priority = int(provider_priority)
    else:
        provider_priority = None

    return ApkindexBlock(
        arch=Arch.from_str(ret["arch"]),
        depends=ret["depends"],
        origin=ret.get("origin"),
        pkgname=ret["pkgname"],
        provides=ret["provides"],
        provider_priority=provider_priority,
        timestamp=ret.get("timestamp"),
        version=ret["version"],
    )
|
|
|
|
|
|
@overload
def parse_add_block(
    ret: dict[str, ApkindexBlock],
    block: ApkindexBlock,
    alias: str | None = ...,
    multiple_providers: Literal[False] = ...,
) -> None: ...


@overload
def parse_add_block(
    ret: dict[str, dict[str, ApkindexBlock]],
    block: ApkindexBlock,
    alias: str | None = ...,
    multiple_providers: Literal[True] = ...,
) -> None: ...


@overload
def parse_add_block(
    ret: dict[str, ApkindexBlock] | dict[str, dict[str, ApkindexBlock]],
    block: ApkindexBlock,
    alias: str | None = ...,
    multiple_providers: bool = ...,
) -> None: ...


def parse_add_block(
    ret: dict[str, ApkindexBlock] | dict[str, dict[str, ApkindexBlock]],
    block: ApkindexBlock,
    alias: str | None = None,
    multiple_providers: bool = True,
) -> None:
    """Insert one parsed block into the dictionary that parse() is building.

    A block that is already stored under the same alias (and, with
    multiple_providers, the same pkgname) is only replaced when its version
    is not higher than the version of the new block.

    :param ret: dictionary of all packages in the APKINDEX that is getting
                built right now. It is modified in place.
    :param block: return value from parse_next_block().
    :param alias: name to store the block under. Defaults to the pkgname,
                  could be an alias from the "provides" list.
    :param multiple_providers: assume that there are more than one provider for
                               the alias, i.e. store ``{pkgname: block}``
                               per alias instead of a single block. This makes
                               sense when parsing the APKINDEX files from a
                               repository (#1122), but not when parsing apk's
                               installed packages DB.
    """
    name = block.pkgname
    alias = alias or name

    # Look up what we stored earlier for this alias (and pkgname), if anything
    previous = None
    if multiple_providers:
        ret = cast(dict[str, dict[str, ApkindexBlock]], ret)
        if alias in ret and name in ret[alias]:
            known_providers = ret[alias]
            if not isinstance(known_providers, dict):
                raise AssertionError
            previous = known_providers[name]
    elif alias in ret:
        existing = cast(dict[str, ApkindexBlock], ret)[alias]
        if not isinstance(existing, ApkindexBlock):
            raise AssertionError
        previous = existing

    # Keep the stored block when it is strictly newer than the incoming one
    if previous and pmb.parse.version.compare(previous.version, block.version) == 1:
        return

    # Store the new block
    if not multiple_providers:
        cast(dict[str, ApkindexBlock], ret)[alias] = block
        return

    nested = cast(dict[str, dict[str, ApkindexBlock]], ret)
    if alias not in nested:
        nested[alias] = {}
    cast(dict[str, ApkindexBlock], nested[alias])[name] = block
|
|
|
|
|
|
@overload
def parse(path: Path, multiple_providers: Literal[False] = ...) -> dict[str, ApkindexBlock]: ...


@overload
def parse(
    path: Path, multiple_providers: Literal[True] = ...
) -> dict[str, dict[str, ApkindexBlock]]: ...


def parse(
    path: Path, multiple_providers: bool = True
) -> dict[str, ApkindexBlock] | dict[str, dict[str, ApkindexBlock]]:
    """Parse an APKINDEX.tar.gz file, and return its content as dictionary.

    Results are kept in ``pmb.helpers.other.cache["apkindex"]``, separately
    for both values of multiple_providers, and are reused as long as the
    modification time of the file does not change.

    :param path: path to an APKINDEX.tar.gz file or apk package database
                 (almost the same format, but not compressed).
    :param multiple_providers: assume that there are more than one provider for
                               the alias. This makes sense when parsing the
                               APKINDEX files from a repository (#1122), but
                               not when parsing apk's installed packages DB.
    :returns: an empty dict if the file does not exist or has no lines.

        Without multiple_providers: ``{ pkgname: ApkindexBlock, ... }``, e.g.
        ``{ "postmarketos-mkinitfs": ApkindexBlock, "so:libGL.so.1": ApkindexBlock, ...}``

        With multiple_providers: ``{ provide: { pkgname: ApkindexBlock, ... }, ... }``,
        e.g. ``{ "postmarketos-mkinitfs": {"postmarketos-mkinitfs": ApkindexBlock},
        "so:libGL.so.1": {"mesa-egl": ApkindexBlock, "libhybris": ApkindexBlock}, ...}``

        *NOTE:* each ``ApkindexBlock`` is a return value from ``parse_next_block()``.
    """
    # Without the file there is nothing to parse
    if not path.is_file():
        logging.verbose(
            "NOTE: APKINDEX not found, assuming no binary packages"
            f" exist for that architecture: {path}"
        )
        return {}

    # Serve from the cache when the file was not modified since it was parsed
    mtime = path.lstat().st_mtime
    variant = "multiple" if multiple_providers else "single"
    key = cache_key(path)
    if key in pmb.helpers.other.cache["apkindex"]:
        entry = pmb.helpers.other.cache["apkindex"][key]
        if entry["lastmod"] != mtime:
            # Outdated: drop everything cached for this file
            clear_cache(path)
        elif variant in entry:
            return entry[variant]

    # Read all lines, either from the archive member or from the plain file
    lines: list[str]
    if tarfile.is_tarfile(path):
        with (
            tarfile.open(path, "r:gz") as tar,
            tar.extractfile(tar.getmember("APKINDEX")) as member,  # type:ignore[union-attr]
        ):
            lines = member.read().decode().splitlines()
    else:
        with path.open("r", encoding="utf-8") as plain:
            lines = plain.read().splitlines()

    # The APKINDEX might be empty, for example if you run "pmbootstrap index" and have no local
    # packages
    if not lines:
        return {}

    result: dict[str, ApkindexBlock] = collections.OrderedDict()

    # Drop a trailing newline entry. splitlines() already strips the line
    # terminators, so this is only a defensive check.
    if lines[-1] == "\n":
        lines.pop()

    # Consume block after block until parse_next_block() reports the end
    while block := parse_next_block(path, lines):
        # Skip virtual packages
        if block.timestamp is None:
            logging.verbose(f"Skipped virtual package {block} in file: {path}")
            continue

        # Register the package under its own name and under every alias
        parse_add_block(result, block, None, multiple_providers)
        if block.provides is not None:
            for alias in block.provides:
                parse_add_block(result, block, alias, multiple_providers)

    # Remember the result for the next call
    if key not in pmb.helpers.other.cache["apkindex"]:
        pmb.helpers.other.cache["apkindex"][key] = {"lastmod": mtime}
    pmb.helpers.other.cache["apkindex"][key][variant] = result
    return result
|
|
|
|
|
|
def parse_blocks(path: Path) -> list[ApkindexBlock]:
    """
    Collect every block of an APKINDEX.tar.gz in a flat list.

    :path: full path to the APKINDEX.tar.gz file.
    :returns: all blocks in the APKINDEX, in the order parse_next_block()
              yields them. They are neither grouped by pkgname nor are
              duplicates with lower versions removed (use parse() if you need
              these features).
    """
    # Read the "APKINDEX" member of the archive line by line
    with (
        tarfile.open(path, "r:gz") as tar,
        tar.extractfile(tar.getmember("APKINDEX")) as member,  # type:ignore[union-attr]
    ):
        remaining = member.read().decode().splitlines()

    # Turn the lines into blocks until none are left
    blocks: list[ApkindexBlock] = []
    while block := pmb.parse.apkindex.parse_next_block(path, remaining):
        blocks.append(block)
    return blocks
|
|
|
|
|
|
# FIXME: come up with something better here...
|
|
def cache_key(path: Path) -> int:
    """Return the key under which results for ``path`` are cached.

    :param path: path of the APKINDEX file
    :returns: the hash value of ``path``
    """
    key = hash(path)
    return key
|
|
|
|
|
|
def clear_cache(path: Path) -> bool:
    """
    Drop everything the APKINDEX parsing cache holds for one file.

    :param path: path of the APKINDEX file whose cache entry gets removed
    :returns: True on successful deletion, False otherwise
    """
    key = cache_key(path)
    logging.verbose(f"Clear APKINDEX cache for: {key}")

    if key not in pmb.helpers.other.cache["apkindex"]:
        # Nothing cached for this file, list the known keys to ease debugging
        known_keys = str(pmb.helpers.other.cache["apkindex"].keys())
        logging.verbose("Nothing to do, path was not in cache:" + known_keys)
        return False

    del pmb.helpers.other.cache["apkindex"][key]
    return True
|
|
|
|
|
|
def providers(
    package: str,
    arch: Arch | None = None,
    must_exist: bool = True,
    indexes: list[Path] | None = None,
    user_repository: bool = True,
) -> dict[str, ApkindexBlock]:
    """
    Get all packages, which provide one package.

    :param package: of which you want to have the providers. Version
                    operators are removed before the lookup.
    :param arch: defaults to native arch, only relevant for indexes=None
    :param must_exist: When set to true, raise an exception when the package is
                       not provided at all.
    :param indexes: list of APKINDEX.tar.gz paths, defaults to all index files
                    (depending on arch)
    :param user_repository: add path to index of locally built packages
    :returns: dictionary of parsed packages, indexed by the pkgname of the
              provider. If the same provider is found in more than one index,
              lower versions do not replace the one found earlier. Example
              for package="so:libGL.so.1":
              ``{"mesa-egl": ApkindexBlock, "libhybris": ApkindexBlock}``
    :raises RuntimeError: when must_exist is set and no provider was found
    """
    if not indexes:
        indexes = pmb.helpers.repo.apkindex_files(arch, user_repository=user_repository)

    package = pmb.helpers.package.remove_operators(package)

    ret: dict[str, ApkindexBlock] = collections.OrderedDict()
    for path in indexes:
        # Skip indexes not providing the package
        index_packages = parse(path)
        if package not in index_packages:
            continue

        indexed_package = index_packages[package]

        # parse() is called with multiple_providers enabled here, so a plain
        # block instead of a {pkgname: block} dict is not expected
        if isinstance(indexed_package, ApkindexBlock):
            raise AssertionError

        # Iterate over found providers
        for provider_pkgname, provider in indexed_package.items():
            # Skip lower versions of providers we already found
            version = provider.version
            if provider_pkgname in ret:
                version_last = ret[provider_pkgname].version
                if pmb.parse.version.compare(version, version_last) == -1:
                    logging.verbose(
                        f"{package}: provided by: {provider_pkgname}-{version}"
                        f" in {path} (but {version_last} is higher)"
                    )
                    continue

            # Add the provider to ret
            logging.verbose(f"{package}: provided by: {provider_pkgname}-{version} in {path}")
            ret[provider_pkgname] = provider

    if not ret and must_exist:
        import os

        logging.debug(f"Searched in APKINDEX files: {', '.join(map(os.fspath, indexes))}")
        raise RuntimeError("Could not find package '" + package + "'!")

    return ret
|
|
|
|
|
|
def provider_highest_priority(
    providers: dict[str, ApkindexBlock], pkgname: str
) -> dict[str, ApkindexBlock]:
    """Reduce the providers to those sharing the highest provider_priority.

    A debug message is logged if a selection was made.

    :param providers: returned dict from providers(), must not be empty
    :param pkgname: the package name we are interested in (for the log message)
    :returns: the provider(s) with the highest provider_priority (in their
              original order), or ``providers`` itself if none of them has a
              provider_priority set
    """
    # Providers without a provider_priority rank below every real priority
    ranks = {
        name: -1 if candidate.provider_priority is None else int(candidate.provider_priority)
        for name, candidate in providers.items()
    }

    # The winning priority is never lower than 0, which keeps providers
    # without a provider_priority out of the selection
    max_priority = max([0, *ranks.values()])

    priority_providers: collections.OrderedDict[str, ApkindexBlock] = collections.OrderedDict(
        (name, providers[name]) for name, rank in ranks.items() if rank == max_priority
    )

    # None of the providers seems to have a provider_priority defined
    if not priority_providers:
        return providers

    logging.debug(
        f"{pkgname}: picked provider(s) with highest priority "
        f"{max_priority}: {', '.join(priority_providers.keys())}"
    )
    return priority_providers
|
|
|
|
|
|
def provider_shortest(providers: dict[str, ApkindexBlock], pkgname: str) -> ApkindexBlock:
    """Pick the provider whose pkgname is the shortest and log the decision.

    In most cases this should be sufficient, e.g. 'mesa-purism-gc7000-egl,
    mesa-egl' or 'gtk+2.0-maemo, gtk+2.0'. Among equally short names, the
    first one in the dict wins.

    :param providers: returned dict from providers(), must not be empty
    :param pkgname: the package name we are interested in (for the log message)
    :returns: the block of the chosen provider
    """
    names = list(providers)
    ret = min(names, key=len)

    # Only worth a message if there actually was a choice to make
    if len(names) != 1:
        logging.debug(
            f"{pkgname}: has multiple providers ("
            f"{', '.join(names)}), picked shortest: {ret}"
        )
    return providers[ret]
|
|
|
|
|
|
# This can't be cached because the APKINDEX can change during pmbootstrap build!
|
|
def package(
    package: str,
    arch: Arch | None = None,
    must_exist: bool = True,
    indexes: list[Path] | None = None,
    user_repository: bool = True,
) -> ApkindexBlock | None:
    """
    Look up the APKINDEX data of one package.

    The lookup prefers, in this order: a provider whose pkgname equals the
    requested name, the single provider with the highest provider_priority,
    and finally the provider with the shortest pkgname among the remaining
    candidates.

    :param package: of which you want to have the apkindex data
    :param arch: defaults to native arch, only relevant for indexes=None
    :param must_exist: When set to true, raise an exception when the package is
                       not provided at all.
    :param indexes: list of APKINDEX.tar.gz paths, defaults to all index files
                    (depending on arch)
    :param user_repository: add path to index of locally built packages
    :returns: ApkindexBlock or None when the package was not found.
    """
    candidates = providers(package, arch, must_exist, indexes, user_repository=user_repository)

    # Provider with the same package
    if package in candidates:
        return candidates[package]

    # No provider
    if not candidates:
        if must_exist:
            raise RuntimeError("Package '" + package + "' not found in any APKINDEX.")
        return None

    # Only one provider with max priority: return it
    prioritized = provider_highest_priority(candidates, package)
    if len(prioritized) == 1:
        return next(iter(prioritized.values()))

    # Multiple providers with max priority: choose among them, otherwise
    # choose among all providers
    pool = prioritized if len(prioritized) > 1 else candidates
    return pmb.parse.apkindex.provider_shortest(pool, package)
|