Add option to auto-select the best mirror

Signed-off-by: Jens Reidel <adrian@travitia.xyz>
Part-of: https://gitlab.postmarketos.org/postmarketOS/pmbootstrap/-/merge_requests/2559
This commit is contained in:
Jens Reidel 2025-02-27 03:14:44 +01:00 committed by Oliver Smith
parent 987793bb24
commit ec0163ce63
No known key found for this signature in database
GPG key ID: 5AE7F5513E0885CB
2 changed files with 46 additions and 11 deletions

View file

@ -11,6 +11,7 @@ import glob
import json import json
import os import os
import shutil import shutil
import urllib
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@ -612,7 +613,7 @@ def ask_for_additional_options(config: Config) -> None:
def ask_for_mirror() -> str: def ask_for_mirror() -> str:
regex = "^[1-9][0-9]*$" # single non-zero number only regex = "^(?:[1-9][0-9]*|best)$" # single non-zero number only
json_path = pmb.helpers.http.download( json_path = pmb.helpers.http.download(
"https://postmarketos.org/mirrors.json", "pmos_mirrors", cache=False "https://postmarketos.org/mirrors.json", "pmos_mirrors", cache=False
@ -627,6 +628,7 @@ def ask_for_mirror() -> str:
for key in keys: for key in keys:
logging.info(f"[{i}]\t{key} ({mirrors[key]['location']})") logging.info(f"[{i}]\t{key} ({mirrors[key]['location']})")
i += 1 i += 1
logging.info("choose 'best' to select the one closest to you")
urls = [] urls = []
for key in keys: for key in keys:
@ -645,23 +647,41 @@ def ask_for_mirror() -> str:
if len(link_list) > 0: if len(link_list) > 0:
urls.append(link_list[0]) urls.append(link_list[0])
mirror_indexes = [] mirror_index = "best"
mirror = get_context().config.mirrors["pmaports"] mirror = get_context().config.mirrors["pmaports"]
for i in range(len(urls)): for i in range(len(urls)):
if urls[i] == mirror: if urls[i] == mirror:
mirror_indexes.append(str(i + 1)) mirror_index = str(i + 1)
break break
mirror = "" mirror = ""
# require one valid mirror index selected by user # require one valid mirror index selected by user
while len(mirror) == 0: while len(mirror) == 0:
answer = pmb.helpers.cli.ask( answer = pmb.helpers.cli.ask("Select a mirror", None, mirror_index, validation_regex=regex)
"Select a mirror", None, ",".join(mirror_indexes), validation_regex=regex if answer == "best":
) timings = []
i = int(answer) # determine the best available mirror
if i < 1 or i > len(urls): for url in urls:
logging.info("You must select one valid mirror!") try:
mirror = urls[i - 1] timings.append((pmb.helpers.http.measure_latency(url), url))
except urllib.error.HTTPError:
logging.warning(f"{url} was unavailable, skipping!")
continue
timings.sort(key=lambda i: i[0])
try:
latency, mirror = timings[0]
logging.info(
f"Best mirror was {mirror} with a latency of {round(latency * 1000, 2)}ms"
)
except IndexError:
logging.error(
"No mirror was available! Please check your internet connection. Falling back to the main mirror"
)
mirror = urls[0]
else:
i = int(answer)
if i < 1 or i > len(urls):
logging.info("You must select one valid mirror!")
mirror = urls[i - 1]
return mirror return mirror

View file

@ -6,6 +6,7 @@ from pmb.helpers import logging
import os import os
from pathlib import Path from pathlib import Path
import shutil import shutil
import time
import urllib.request import urllib.request
from typing import Any, Literal, overload from typing import Any, Literal, overload
import pmb.helpers.cli import pmb.helpers.cli
@ -149,3 +150,17 @@ def retrieve_json(url: str, headers: dict[str, str] | None = None) -> Any:
See retrieve() for the meaning of the parameters. See retrieve() for the meaning of the parameters.
""" """
return json.loads(retrieve(url, headers, False)) return json.loads(retrieve(url, headers, False))
def measure_latency(url: str) -> float:
"""Requests a URL and returns the total time it took to perform the request.
:param url: the http(s) address of the resource to fetch
:returns: seconds it took to complete the request
"""
req = urllib.request.Request(url)
start_time = time.monotonic()
with urllib.request.urlopen(req) as response:
response.read()
return time.monotonic() - start_time