forked from Mirror/pmbootstrap
105 lines
3.2 KiB
Python
105 lines
3.2 KiB
Python
# Copyright 2023 Oliver Smith
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
import json
|
|
from pathlib import Path
|
|
from pmb.core.context import get_context
|
|
from pmb.helpers import logging
|
|
import time
|
|
|
|
from pmb.types import PathString
|
|
import pmb.helpers.mount
|
|
import pmb.helpers.run
|
|
import pmb.chroot
|
|
from pmb.core import Chroot
|
|
|
|
|
|
def init() -> None:
|
|
if not Path("/sys/module/loop").is_dir():
|
|
pmb.helpers.run.root(["modprobe", "loop"])
|
|
for loopdevice in Path("/dev/").glob("loop*"):
|
|
if loopdevice.is_dir():
|
|
continue
|
|
pmb.helpers.mount.bind_file(loopdevice, Chroot.native() / loopdevice)
|
|
|
|
|
|
def mount(img_path: Path, _sector_size: int | None = None) -> Path:
|
|
"""
|
|
:param img_path: Path to the img file inside native chroot.
|
|
"""
|
|
logging.debug(f"(native) mount {img_path} (loop)")
|
|
|
|
# Try to mount multiple times (let the kernel module initialize #1594)
|
|
for i in range(5):
|
|
# Retry
|
|
if i > 0:
|
|
logging.debug("loop module might not be initialized yet, retry in one second...")
|
|
time.sleep(1)
|
|
|
|
# Mount and return on success
|
|
init()
|
|
|
|
sector_size = None
|
|
if _sector_size:
|
|
sector_size = str(_sector_size)
|
|
sector_size = sector_size or pmb.parse.deviceinfo().rootfs_image_sector_size
|
|
|
|
losetup_cmd: list[PathString] = ["losetup", "-f", img_path]
|
|
if sector_size:
|
|
losetup_cmd += ["-b", str(int(sector_size))]
|
|
|
|
pmb.chroot.root(losetup_cmd, check=False)
|
|
|
|
try:
|
|
return device_by_back_file(img_path)
|
|
except RuntimeError as e:
|
|
if i == 4:
|
|
raise e
|
|
|
|
raise AssertionError("This should never be reached")
|
|
|
|
|
|
def device_by_back_file(back_file: Path) -> Path:
|
|
"""
|
|
Get the /dev/loopX device that points to a specific image file.
|
|
"""
|
|
|
|
# Get list from losetup
|
|
losetup_output = pmb.chroot.root(["losetup", "--json", "--list"], output_return=True)
|
|
if not losetup_output:
|
|
raise RuntimeError("losetup failed")
|
|
|
|
# Find the back_file
|
|
losetup = json.loads(losetup_output)
|
|
for loopdevice in losetup["loopdevices"]:
|
|
if loopdevice["back-file"] is not None and Path(loopdevice["back-file"]) == back_file:
|
|
return Path(loopdevice["name"])
|
|
raise RuntimeError(f"Failed to find loop device for {back_file}")
|
|
|
|
|
|
def umount(img_path: Path) -> None:
|
|
"""
|
|
:param img_path: Path to the img file inside native chroot.
|
|
"""
|
|
device: Path
|
|
try:
|
|
device = device_by_back_file(img_path)
|
|
except RuntimeError:
|
|
return
|
|
logging.debug(f"(native) umount {device}")
|
|
pmb.chroot.root(["losetup", "-d", device])
|
|
|
|
|
|
def detach_all() -> None:
|
|
"""
|
|
Detach all loop devices used by pmbootstrap
|
|
"""
|
|
losetup_output = pmb.helpers.run.root(["losetup", "--json", "--list"], output_return=True)
|
|
if not losetup_output:
|
|
return
|
|
losetup = json.loads(losetup_output)
|
|
work = get_context().config.work
|
|
for loopdevice in losetup["loopdevices"]:
|
|
if Path(loopdevice["back-file"]).is_relative_to(work):
|
|
pmb.helpers.run.root(["kpartx", "-d", loopdevice["name"]], check=False)
|
|
pmb.helpers.run.root(["losetup", "-d", loopdevice["name"]])
|
|
return
|