mirror of
https://gitlab.postmarketos.org/postmarketOS/pmbootstrap.git
synced 2025-07-13 11:29:46 +03:00
1175 lines
38 KiB
Python
1175 lines
38 KiB
Python
# SPDX-License-Identifier: LGPL-2.1-or-later
|
|
# FIXME: this file is wayyy off lol
|
|
# ruff: noqa
|
|
|
|
"""
|
|
This is a standalone implementation of sandboxing which is used by mkosi. Note that this is
|
|
invoked many times while building the image and as a result, the performance of this script has a
|
|
substantial impact on the performance of mkosi itself. To keep the runtime of this script to a
|
|
minimum, please don't import any extra modules if it can be avoided.
|
|
|
|
"""
|
|
|
|
import ctypes
|
|
import os
|
|
import sys
|
|
import warnings # noqa: F401 (loaded lazily by os.execvp() which happens too late)
|
|
|
|
__version__ = "26~devel"
|
|
|
|
# The following constants are taken from the Linux kernel headers.
# They are hard-coded here instead of being imported, in line with the module docstring's
# request to avoid extra imports.
AT_EMPTY_PATH = 0x1000
AT_FDCWD = -100
AT_NO_AUTOMOUNT = 0x800
AT_RECURSIVE = 0x8000
AT_SYMLINK_NOFOLLOW = 0x100
BTRFS_SUPER_MAGIC = 0x9123683E
# Capability numbers; used as bit positions by have_effective_cap().
CAP_NET_ADMIN = 12
CAP_SYS_ADMIN = 21
# Namespace flags passed to unshare().
CLONE_NEWIPC = 0x08000000
CLONE_NEWPID = 0x20000000
CLONE_NEWNET = 0x40000000
CLONE_NEWNS = 0x00020000
CLONE_NEWUSER = 0x10000000
# errno values.
EBADF = 9
# NOTE(review): this name looks like an accidental fusion of UNSHARE_EPERM_MSG and EPERM. Nothing
# in the visible part of this file reads it - confirm and remove.
UNSHARE_EPERM_MSGEPERM = 1
EPERM = 1
ENOENT = 2
ENOSYS = 38
# fcntl() commands used by pack_file_descriptors().
F_DUPFD = 0
F_GETFD = 1
# ioctl() requests used by lsattr() and chattr().
FS_IOC_GETFLAGS = 0x80086601
FS_IOC_SETFLAGS = 0x40086602
FS_NOCOW_FL = 0x00800000
# Number of cap_user_data_t elements and header version used by cap_permitted_to_ambient().
LINUX_CAPABILITY_U32S_3 = 2
LINUX_CAPABILITY_VERSION_3 = 0x20080522
MNT_DETACH = 2
MOUNT_ATTR_RDONLY = 0x00000001
MOUNT_ATTR_NOSUID = 0x00000002
MOUNT_ATTR_NODEV = 0x00000004
MOUNT_ATTR_NOEXEC = 0x00000008
# Size passed to mount_setattr() for the mount_attr structure (four 64-bit fields).
MOUNT_ATTR_SIZE_VER0 = 32
MOVE_MOUNT_F_EMPTY_PATH = 0x00000004
MS_BIND = 4096
MS_MOVE = 8192
MS_REC = 16384
MS_SHARED = 1 << 20
MS_SLAVE = 1 << 19
# Raw syscall numbers; mount_rbind() passes these to libc.syscall() when libc does not export
# the corresponding wrapper function.
NR_mount_setattr = 442
NR_move_mount = 429
NR_open_tree = 428
OPEN_TREE_CLOEXEC = os.O_CLOEXEC
OPEN_TREE_CLONE = 1
OVERLAYFS_SUPER_MAGIC = 0x794C7630
PR_CAP_AMBIENT = 47
PR_CAP_AMBIENT_RAISE = 2
# These definitions are taken from the libseccomp headers
SCMP_ACT_ALLOW = 0x7FFF0000
# NOTE(review): the low bits (the errno to return) are 0 here, which is presumably what makes
# suppressed syscalls look like successful no-ops - verify against the libseccomp headers.
SCMP_ACT_ERRNO = 0x00050000
# First file descriptor number that pack_file_descriptors() packs descriptors into.
SD_LISTEN_FDS_START = 3
SIGSTOP = 19
|
|
|
|
|
|
class mount_attr(ctypes.Structure):
    """ctypes layout of the attribute structure that mount_rbind() hands to mount_setattr()."""

    # Four 64-bit members, 32 bytes in total (matches MOUNT_ATTR_SIZE_VER0).
    _fields_ = (
        ("attr_set", ctypes.c_uint64),
        ("attr_clr", ctypes.c_uint64),
        ("propagation", ctypes.c_uint64),
        ("userns_fd", ctypes.c_uint64),
    )
|
|
|
|
|
|
class cap_user_header_t(ctypes.Structure):
    """Header argument for capget()/capset(): capability ABI version followed by a pid."""

    # __user_cap_header_struct
    _fields_ = (
        ("version", ctypes.c_uint32),
        ("pid", ctypes.c_int),
    )
|
|
|
|
|
|
class cap_user_data_t(ctypes.Structure):
    """One 32-bit slice of the effective/permitted/inheritable sets used by capget()/capset()."""

    # __user_cap_data_struct
    _fields_ = (
        ("effective", ctypes.c_uint32),
        ("permitted", ctypes.c_uint32),
        ("inheritable", ctypes.c_uint32),
    )
|
|
|
|
|
|
# Handle used for every direct libc call in this module. use_errno=True is what allows
# oserror() to read the failure code through ctypes.get_errno().
libc = ctypes.CDLL(None, use_errno=True)

# Declare the C signatures up front so ctypes converts the Python arguments according to the
# declared types (e.g. None is passed as NULL for the c_char_p parameters of mount()).
libc.syscall.restype = ctypes.c_long
libc.unshare.argtypes = (ctypes.c_int,)
libc.statfs.argtypes = (ctypes.c_char_p, ctypes.c_void_p)
libc.eventfd.argtypes = (ctypes.c_int, ctypes.c_int)
libc.mount.argtypes = (
    ctypes.c_char_p,
    ctypes.c_char_p,
    ctypes.c_char_p,
    ctypes.c_ulong,
    ctypes.c_char_p,
)
libc.pivot_root.argtypes = (ctypes.c_char_p, ctypes.c_char_p)
libc.umount2.argtypes = (ctypes.c_char_p, ctypes.c_int)
libc.capget.argtypes = (ctypes.c_void_p, ctypes.c_void_p)
libc.capset.argtypes = (ctypes.c_void_p, ctypes.c_void_p)
# The third fcntl() argument is always an integer in this module (see pack_file_descriptors()).
libc.fcntl.argtypes = (ctypes.c_int, ctypes.c_int, ctypes.c_int)
|
|
|
|
|
|
def terminal_is_dumb() -> bool:
    """Return True unless both stdout and stderr are ttys and $TERM is not "dumb"."""
    if not sys.stdout.isatty():
        return True

    if not sys.stderr.isatty():
        return True

    return os.getenv("TERM", "") == "dumb"
|
|
|
|
|
|
class Style:
    """ANSI escape sequences for coloured output; every entry is empty on a dumb terminal."""

    # fmt: off
    bold: str   = "" if terminal_is_dumb() else "\033[0;1;39m"
    blue: str   = "" if terminal_is_dumb() else "\033[0;1;34m"
    gray: str   = "" if terminal_is_dumb() else "\033[0;38;5;245m"
    red: str    = "" if terminal_is_dumb() else "\033[31;1m"
    yellow: str = "" if terminal_is_dumb() else "\033[33;1m"
    reset: str  = "" if terminal_is_dumb() else "\033[0m"
    # fmt: on
|
|
|
|
|
|
ENOSYS_MSG = f"""\
|
|
{Style.red}mkosi was unable to invoke the {{syscall}}() system call.{Style.reset}
|
|
This probably means either the system call is not implemented by the running kernel version ({{kver}}) or the
|
|
system call is prohibited via seccomp if mkosi is being executed inside a containerized environment.\
|
|
"""
|
|
|
|
|
|
def oserror(syscall: str, filename: str = "") -> None:
    """
    Raise an OSError built from the errno that ctypes recorded for the last failed call.

    syscall is only used for the extra explanation printed to stderr when the errno is ENOSYS.
    filename, when non-empty, is attached to the exception. This function never returns normally.
    """
    code = ctypes.get_errno()

    if code == ENOSYS:
        explanation = ENOSYS_MSG.format(syscall=syscall, kver=os.uname().version)
        print(explanation, file=sys.stderr)

    raise OSError(code, os.strerror(code), filename or None)
|
|
|
|
|
|
def unshare(flags: int) -> None:
    """Call libc's unshare() with the given CLONE_* flags; raise OSError (via oserror) on failure."""
    rc = libc.unshare(flags)
    if rc < 0:
        oserror("unshare")
|
|
|
|
|
|
def statfs(path: str) -> int:
    """Return the first field that statfs() reports for path; raise OSError on failure."""
    # struct statfs is 120 bytes, i.e. 15 longs. Only the first field is needed and it is a
    # long, so an array of 15 longs stands in for the full struct declaration.
    words = (ctypes.c_long * 15)()

    rc = libc.statfs(path.encode(), ctypes.byref(words))
    if rc < 0:
        oserror("statfs", path)

    return int(words[0])
|
|
|
|
|
|
def mount(src: str, dst: str, type: str, flags: int, options: str) -> None:
    """
    Call libc's mount(). Empty src, type or options strings are passed as NULL pointers.

    Raises OSError (via oserror, with dst as the filename) when the call fails.
    """
    srcb, typeb, optionsb = (value.encode() if value else None for value in (src, type, options))

    rc = libc.mount(srcb, dst.encode(), typeb, flags, optionsb)
    if rc < 0:
        oserror("mount", dst)
|
|
|
|
|
|
def umount2(path: str, flags: int = 0) -> None:
    """Call libc's umount2() on path; raise OSError (via oserror) when it fails."""
    rc = libc.umount2(path.encode(), flags)
    if rc < 0:
        oserror("umount2", path)
|
|
|
|
|
|
def cap_permitted_to_ambient() -> None:
    """
    When unsharing a user namespace and mapping the current user to itself, the user has a full
    set of capabilities in the user namespace. This allows the user to do mounts after unsharing a
    mount namespace for example. However, these capabilities are lost again when the user executes
    a subprocess. As we also want subprocesses invoked by the user to be able to mount stuff, we
    make sure the capabilities are inherited by adding all the user's capabilities to the inherited
    and ambient capabilities set, which makes sure that they are passed down to subprocesses.
    """
    hdr = cap_user_header_t(LINUX_CAPABILITY_VERSION_3, 0)
    data = (cap_user_data_t * LINUX_CAPABILITY_U32S_3)()

    if libc.capget(ctypes.addressof(hdr), ctypes.byref(data)) < 0:
        oserror("capget")

    # Copy the permitted set into the inheritable set for both 32-bit halves.
    data[0].inheritable = data[0].permitted
    data[1].inheritable = data[1].permitted

    if libc.capset(ctypes.addressof(hdr), ctypes.byref(data)) < 0:
        oserror("capset")

    # Combine the two 32-bit halves into one 64-bit mask of effective capabilities.
    low, high = data[0].effective, data[1].effective
    effective = (high << 32) | low

    with open("/proc/sys/kernel/cap_last_cap", "rb") as f:
        last_cap = int(f.read())

    libc.prctl.argtypes = (
        ctypes.c_int,
        ctypes.c_ulong,
        ctypes.c_ulong,
        ctypes.c_ulong,
        ctypes.c_ulong,
    )

    # Never go beyond the 64 bits of the mask nor beyond the last capability the kernel reports.
    nbits = ctypes.sizeof(ctypes.c_uint64) * 8
    for cap in range(min(last_cap + 1, nbits)):
        if not effective & (1 << cap):
            continue

        if libc.prctl(PR_CAP_AMBIENT, PR_CAP_AMBIENT_RAISE, cap, 0, 0) < 0:
            oserror("prctl")
|
|
|
|
|
|
def have_effective_cap(capability: int) -> bool:
    """
    Report whether bit number capability is set in the CapEff mask of /proc/self/status.

    Returns False when the file has no CapEff line.
    """
    prefix = b"CapEff:"

    with open("/proc/self/status", "rb") as f:
        for line in f:
            if not line.startswith(prefix):
                continue

            # The remainder of the line is the mask in hexadecimal.
            mask = int(line[len(prefix):], 16)
            return (mask & (1 << capability)) != 0

    return False
|
|
|
|
|
|
def seccomp_suppress(*, chown: bool = False, sync: bool = False) -> None:
    """
    There's still a few files and directories left in distributions in /usr and /etc that are
    not owned by root. This causes package managers to fail to install the corresponding packages
    when run from a single uid user namespace. Unfortunately, non-root users can only create files
    owned by their own uid. To still allow non-root users to build images, if requested we install
    a seccomp filter that makes calls to chown() and friends a noop.

    With sync=True the same is done for the sync() family of system calls. When neither flag is
    set this function does nothing and libseccomp is not loaded.

    Raises FileNotFoundError if libseccomp.so.2 cannot be loaded.
    """
    if not chown and not sync:
        return

    # ctypes.CDLL() never returns None; it raises OSError when the library cannot be loaded, so
    # translate that into the FileNotFoundError this function is meant to raise.
    try:
        libseccomp = ctypes.CDLL("libseccomp.so.2")
    except OSError as e:
        raise FileNotFoundError("libseccomp.so.2") from e

    libseccomp.seccomp_init.argtypes = (ctypes.c_uint32,)
    libseccomp.seccomp_init.restype = ctypes.c_void_p
    libseccomp.seccomp_release.argtypes = (ctypes.c_void_p,)
    libseccomp.seccomp_release.restype = None
    libseccomp.seccomp_syscall_resolve_name.argtypes = (ctypes.c_char_p,)
    libseccomp.seccomp_rule_add_exact.argtypes = (
        ctypes.c_void_p,
        ctypes.c_uint32,
        ctypes.c_int,
        ctypes.c_uint,
    )
    libseccomp.seccomp_load.argtypes = (ctypes.c_void_p,)

    # Everything that is not explicitly suppressed below stays allowed.
    seccomp = libseccomp.seccomp_init(SCMP_ACT_ALLOW)

    suppress = []
    if chown:
        suppress += [
            b"chown",
            b"chown32",
            b"fchown",
            b"fchown32",
            b"fchownat",
            b"lchown",
            b"lchown32",
        ]
    if sync:
        suppress += [
            b"fdatasync",
            b"fsync",
            b"msync",
            b"sync",
            b"sync_file_range",
            b"sync_file_range2",
            b"syncfs",
        ]

    try:
        # The return values are deliberately not checked, as before: installing the filter is
        # best effort.
        for syscall in suppress:
            nr = libseccomp.seccomp_syscall_resolve_name(syscall)
            libseccomp.seccomp_rule_add_exact(seccomp, SCMP_ACT_ERRNO, nr, 0)

        libseccomp.seccomp_load(seccomp)
    finally:
        libseccomp.seccomp_release(seccomp)
|
|
|
|
|
|
def lsattr(path: str) -> int:
    """
    Return the flags that the FS_IOC_GETFLAGS ioctl reports for path.

    Raises OSError if path cannot be opened or the ioctl fails. The file descriptor is closed on
    every exit path.
    """
    attr = ctypes.c_int()

    fd = os.open(path, os.O_CLOEXEC | os.O_RDONLY)
    try:
        libc.ioctl.argtypes = (ctypes.c_int, ctypes.c_long, ctypes.c_void_p)
        if libc.ioctl(fd, FS_IOC_GETFLAGS, ctypes.byref(attr)) < 0:
            # Capture errno before the descriptor is closed in the finally clause.
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err), path)
    finally:
        os.close(fd)

    return attr.value
|
|
|
|
|
|
def chattr(path: str, attr: int) -> None:
    """
    Apply the flags in attr to path with the FS_IOC_SETFLAGS ioctl.

    Raises OSError if path cannot be opened or the ioctl fails. The file descriptor is closed on
    every exit path.
    """
    cattr = ctypes.c_int(attr)

    fd = os.open(path, os.O_CLOEXEC | os.O_RDONLY)
    try:
        libc.ioctl.argtypes = (ctypes.c_int, ctypes.c_long, ctypes.c_void_p)
        if libc.ioctl(fd, FS_IOC_SETFLAGS, ctypes.byref(cattr)) < 0:
            # Capture errno before the descriptor is closed in the finally clause.
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err), path)
    finally:
        os.close(fd)
|
|
|
|
|
|
def join_new_session_keyring() -> None:
    """
    Call keyctl_join_session_keyring(NULL) from libkeyutils.

    Raises FileNotFoundError if libkeyutils.so.1 cannot be loaded and OSError (via oserror) if
    the call returns -1.
    """
    # ctypes.CDLL() raises OSError instead of returning None when loading fails. use_errno=True
    # is needed so that ctypes.get_errno(), which oserror() reads, reflects the failure of calls
    # made through this handle.
    try:
        libkeyutils = ctypes.CDLL("libkeyutils.so.1", use_errno=True)
    except OSError as e:
        raise FileNotFoundError("libkeyutils.so.1") from e

    libkeyutils.keyctl_join_session_keyring.argtypes = (ctypes.c_char_p,)
    libkeyutils.keyctl_join_session_keyring.restype = ctypes.c_int32

    keyring = libkeyutils.keyctl_join_session_keyring(None)
    if keyring == -1:
        oserror("keyctl")
|
|
|
|
|
|
def mount_rbind(src: str, dst: str, attrs: int = 0) -> None:
    """
    When using the old mount syscall to do a recursive bind mount, mount options are not
    applied recursively. Because we want to do recursive read-only bind mounts in some cases, we
    use the new mount API for that which does allow recursively changing mount options when doing
    bind mounts.

    The sequence is open_tree() on src, mount_setattr() to set attrs (MOUNT_ATTR_* bits) on the
    resulting descriptor, then move_mount() to attach it at dst. Each step raises OSError (via
    oserror) when it fails; the descriptor is closed in all cases.
    """
    flags = AT_NO_AUTOMOUNT | AT_RECURSIVE | AT_SYMLINK_NOFOLLOW | OPEN_TREE_CLONE

    # Accessing a function that libc does not export raises AttributeError, in which case the raw
    # syscall number is used instead. The same pattern is repeated for all three calls below.
    try:
        libc.open_tree.argtypes = (ctypes.c_int, ctypes.c_char_p, ctypes.c_uint)
        fd = libc.open_tree(AT_FDCWD, src.encode(), flags)
    except AttributeError:
        libc.syscall.argtypes = (ctypes.c_long, ctypes.c_int, ctypes.c_char_p, ctypes.c_uint)
        fd = libc.syscall(NR_open_tree, AT_FDCWD, src.encode(), flags)

    if fd < 0:
        oserror("open_tree", src)

    try:
        attr = mount_attr()
        attr.attr_set = attrs

        # The empty path together with AT_EMPTY_PATH makes the calls operate on fd itself.
        flags = AT_EMPTY_PATH | AT_RECURSIVE

        try:
            libc.mount_setattr.argtypes = (
                ctypes.c_int,
                ctypes.c_char_p,
                ctypes.c_uint,
                ctypes.c_void_p,
                ctypes.c_size_t,
            )
            r = libc.mount_setattr(fd, b"", flags, ctypes.addressof(attr), MOUNT_ATTR_SIZE_VER0)
        except AttributeError:
            # libc.syscall is variadic, so its argtypes are redefined before every use.
            libc.syscall.argtypes = (
                ctypes.c_long,
                ctypes.c_int,
                ctypes.c_char_p,
                ctypes.c_uint,
                ctypes.c_void_p,
                ctypes.c_size_t,
            )
            r = libc.syscall(
                NR_mount_setattr, fd, b"", flags, ctypes.addressof(attr), MOUNT_ATTR_SIZE_VER0
            )

        if r < 0:
            oserror("mount_setattr", src)

        try:
            libc.move_mount.argtypes = (
                ctypes.c_int,
                ctypes.c_char_p,
                ctypes.c_int,
                ctypes.c_char_p,
                ctypes.c_uint,
            )
            r = libc.move_mount(fd, b"", AT_FDCWD, dst.encode(), MOVE_MOUNT_F_EMPTY_PATH)
        except AttributeError:
            libc.syscall.argtypes = (
                ctypes.c_long,
                ctypes.c_int,
                ctypes.c_char_p,
                ctypes.c_int,
                ctypes.c_char_p,
                ctypes.c_uint,
            )
            r = libc.syscall(
                NR_move_mount, fd, b"", AT_FDCWD, dst.encode(), MOVE_MOUNT_F_EMPTY_PATH
            )

        if r < 0:
            oserror("move_mount", dst)
    finally:
        # The descriptor is only needed for the duration of this function.
        os.close(fd)
|
|
|
|
|
|
class umask:
    """
    Context manager that installs a process umask on entry and restores the previous one on exit.

    After entry, self.mask holds the previous umask rather than the requested one.
    """

    def __init__(self, mask: int) -> None:
        self.mask = mask

    def __enter__(self) -> None:
        # os.umask() returns the mask that was active before; keep it for __exit__().
        previous = os.umask(self.mask)
        self.mask = previous

    def __exit__(self, *args: object, **kwargs: object) -> None:
        os.umask(self.mask)
|
|
|
|
|
|
def become_user(uid: int, gid: int) -> None:
    """
    This function implements the required dance to unshare a user namespace and set up its uid
    and gid mappings. The mappings have to be written by a process running outside of the
    unshared user namespace, so we fork off a child process, make it wait until the parent
    process has unshared a user namespace, and then let it run newuidmap and newgidmap against
    the parent.

    Inside the namespace, id 0 is mapped to uid (respectively gid) on the outside; the
    remaining ranges are fixed and shared between the uid and the gid map.

    Raises OSError if eventfd() or unshare() fail, or if the helper child exits with a non-zero
    status (the exit status is used as the errno of the exception).
    """
    ppid = os.getpid()

    event = libc.eventfd(0, 0)
    if event < 0:
        oserror("eventfd")

    # Additional (inside id, outside id, count) triples appended to both maps.
    extra_ranges = (
        "1",
        "100000",
        "9999",
        # 12345 is the UID of the "pmos" or "build" user in our containers
        "12345",
        "110000",
        "1",
        "10000",
        "120000",
        "1",
    )

    pid = os.fork()
    if pid == 0:
        try:
            # Block until the parent signals that it has unshared its user namespace.
            os.read(event, ctypes.sizeof(ctypes.c_uint64))
            os.close(event)
            # Fork again for UID map, bweh this is suuuper gross
            newpid = os.fork()
            if newpid == 0:
                os.execl(
                    "/usr/bin/newuidmap",
                    "newuidmap",
                    str(ppid),
                    "0",
                    str(uid),
                    "1",
                    *extra_ranges,
                )
            else:
                os.waitpid(newpid, 0)
                # The gid map must be based on gid; this previously passed uid by mistake.
                os.execl(
                    "/usr/bin/newgidmap",
                    "newgidmap",
                    str(ppid),
                    "0",
                    str(gid),
                    "1",
                    *extra_ranges,
                )
        except OSError as e:
            print(e.strerror, flush=True)
            os._exit(e.errno or 1)
        except BaseException as e:
            print(e, flush=True)
            os._exit(1)
        else:
            print("", flush=True)
            os._exit(0)

    try:
        unshare(CLONE_NEWUSER)
    except OSError as e:
        if e.errno == EPERM:
            print(UNSHARE_EPERM_MSG, file=sys.stderr)
        raise
    finally:
        # Always wake up and reap the child, even when unshare() failed.
        os.write(event, ctypes.c_uint64(1))
        os.close(event)
        _, status = os.waitpid(pid, 0)

    rc = os.waitstatus_to_exitcode(status)
    if rc != 0:
        raise OSError(rc, os.strerror(rc))
|
|
|
|
|
|
def acquire_privileges(*, become_root: bool = False) -> bool:
    """
    Enter a new user namespace unless the process is already sufficiently privileged.

    Returns False without doing anything when CAP_SYS_ADMIN is already effective and either the
    process runs as uid 0 or root was not requested. Otherwise calls become_user() and returns
    True.
    """
    if have_effective_cap(CAP_SYS_ADMIN):
        if os.getuid() == 0 or not become_root:
            return False

    if not become_root:
        become_user(os.getuid(), os.getgid())
        # Only in this case are the capabilities additionally made ambient.
        cap_permitted_to_ambient()
    else:
        become_user(0, 0)

    return True
|
|
|
|
|
|
def userns_has_single_user() -> bool:
    """
    Return True if /proc/self/uid_map has exactly one line whose last column is 1.

    Returns False when the file does not exist.
    """
    try:
        with open("/proc/self/uid_map", "rb") as f:
            entries = f.readlines()
    except FileNotFoundError:
        return False

    if len(entries) != 1:
        return False

    count = entries[0].split()[-1]
    return int(count) == 1
|
|
|
|
|
|
def chase(root: str, path: str) -> str:
    """
    Resolve path as seen from inside root and return the result prefixed with root.

    For root "/" this is simply os.path.realpath(path). Otherwise the process temporarily chroots
    into root to resolve the path and afterwards restores the previous root directory and
    working directory.
    """
    if root == "/":
        return os.path.realpath(path)

    saved_cwd = os.getcwd()
    # Keep a handle on the current root directory so it can be returned to from inside the chroot.
    rootfd = os.open("/", os.O_CLOEXEC | os.O_PATH | os.O_DIRECTORY)

    try:
        os.chroot(root)
        os.chdir("/")
        resolved = os.path.realpath(path)
        return joinpath(root, resolved)
    finally:
        os.fchdir(rootfd)
        os.close(rootfd)
        os.chroot(".")
        os.chdir(saved_cwd)
|
|
|
|
|
|
def splitpath(path: str) -> tuple[str, ...]:
    """Split path on "/" and return its non-empty components as a tuple."""
    return tuple(filter(None, path.split("/")))
|
|
|
|
|
|
def joinpath(path: str, *paths: str) -> str:
    """Join paths below path; leading slashes are stripped so later parts never replace path."""
    relative_parts = (part.lstrip("/") for part in paths)
    return os.path.join(path, *relative_parts)
|
|
|
|
|
|
def is_relative_to(one: str, two: str) -> bool:
    """Return True if the common path of one and two is exactly two."""
    shared = os.path.commonpath([one, two])
    return shared == two
|
|
|
|
|
|
def pack_file_descriptors() -> int:
    """
    Renumber the open file descriptors from SD_LISTEN_FDS_START upwards so that they form one
    contiguous range, and return how many there are.

    Descriptors below SD_LISTEN_FDS_START are left alone. Raises OSError if duplicating or
    closing a descriptor fails.
    """
    fds = []

    with os.scandir("/proc/self/fd") as it:
        for e in it:
            if not e.is_symlink() and (e.is_file() or e.is_dir()):
                continue

            try:
                fd = int(e.name)
            except ValueError:
                continue

            if fd < SD_LISTEN_FDS_START:
                continue

            fds.append(fd)

    # os.scandir() either opens a file descriptor to the given path or dups the given file descriptor. Either
    # way, there will be an extra file descriptor in the fds array that's not valid anymore now, so find out
    # which one and drop it.
    fds = sorted(fd for fd in fds if libc.fcntl(fd, F_GETFD, 0) >= 0)

    # The following is a reimplementation of pack_fds() in systemd.

    if not fds:
        return 0

    start = 0
    while True:
        restart_from = -1

        for i in range(start, len(fds)):
            if fds[i] == SD_LISTEN_FDS_START + i:
                continue

            # F_DUPFD yields the lowest free descriptor that is >= the requested number.
            nfd = libc.fcntl(fds[i], F_DUPFD, SD_LISTEN_FDS_START + i)
            if nfd < 0:
                oserror("fcntl")

            try:
                os.close(fds[i])
            except OSError as e:
                if e.errno != EBADF:
                    raise

            fds[i] = nfd

            # The wanted slot was still occupied; remember the first such index and retry from
            # there once the descriptors after it have been moved.
            if nfd != (SD_LISTEN_FDS_START + i) and restart_from < 0:
                restart_from = i

        if restart_from < 0:
            break

        start = restart_from

    assert fds[0] == SD_LISTEN_FDS_START

    return len(fds)
|
|
|
|
|
|
class FSOperation:
    """
    Base class for a filesystem operation that targets dst inside the new root.

    relative is stored for subclasses and for the ordering done by optimize(). Subclasses
    implement execute().
    """

    def __init__(self, dst: str, *, relative: bool = False) -> None:
        self.dst = dst
        self.relative = relative

    def execute(self, oldroot: str, newroot: str) -> None:
        raise NotImplementedError()

    @classmethod
    def optimize(cls, fsops: list["FSOperation"]) -> list["FSOperation"]:
        """Deduplicate and prune bind mounts, then return all operations sorted by destination."""
        # A dict is used as an insertion-ordered set; of equal bind operations the first one wins.
        binds: dict[BindOperation, None] = dict.fromkeys(
            fsop for fsop in fsops if isinstance(fsop, BindOperation)
        )
        rest = [fsop for fsop in fsops if not isinstance(fsop, BindOperation)]

        def covered(m: "BindOperation") -> bool:
            # True if another bind with the same settings already mounts a parent of m.src onto
            # the matching parent of m.dst.
            for n in binds:
                if (
                    m != n
                    and m.readonly == n.readonly
                    and m.required == n.required
                    and m.relative == n.relative
                    and is_relative_to(m.src, n.src)
                    and is_relative_to(m.dst, n.dst)
                    and os.path.relpath(m.src, n.src) == os.path.relpath(m.dst, n.dst)
                ):
                    return True

            return False

        # Drop all bind mounts that are mounted from beneath another bind mount to the same
        # location within the new rootfs.
        optimized = [m for m in binds if not covered(m)]

        def order(fsop: "FSOperation") -> tuple[bool, tuple[str, ...]]:
            return (fsop.relative, splitpath(fsop.dst))

        # Make sure bind mounts override other operations on the same destination by appending them
        # to the rest and depending on python's stable sort behavior. Additionally, relative operations
        # always go last.
        return sorted([*rest, *optimized], key=order)
|
|
|
|
|
|
class BindOperation(FSOperation):
    """
    Recursively bind mount src onto dst inside the new root.

    readonly makes the mount read-only, required controls whether a missing source is an error,
    and relative makes src be resolved inside the new root instead of the old root.
    """

    def __init__(
        self, src: str, dst: str, *, readonly: bool, required: bool, relative: bool
    ) -> None:
        self.src = src
        self.readonly = readonly
        self.required = required
        super().__init__(dst, relative=relative)

    def _key(self) -> tuple[object, ...]:
        # Identity used by both __hash__() and __eq__(). As before, relative is not part of it.
        return (splitpath(self.src), splitpath(self.dst), self.readonly, self.required)

    def __hash__(self) -> int:
        return hash(self._key())

    def __eq__(self, other: object) -> bool:
        # Compare the key itself rather than its hash so that a hash collision can never make two
        # different bind operations compare equal.
        return isinstance(other, BindOperation) and self._key() == other._key()

    def execute(self, oldroot: str, newroot: str) -> None:
        src = chase(newroot if self.relative else oldroot, self.src)

        # A missing optional source is silently skipped.
        if not os.path.exists(src) and not self.required:
            return

        attrs = MOUNT_ATTR_RDONLY if self.readonly else 0

        # If we're mounting a file on top of a symlink, mount directly on top of the symlink instead of
        # resolving it.
        dst = joinpath(newroot, self.dst)
        if not os.path.isdir(src) and os.path.islink(dst):
            return mount_rbind(src, dst, attrs=attrs)

        dst = chase(newroot, self.dst)
        if not os.path.exists(dst):
            isfile = os.path.isfile(src)

            with umask(~0o755):
                os.makedirs(os.path.dirname(dst), exist_ok=True)

            # Create a mount point of the same kind (file or directory) as the source.
            with umask(~0o644 if isfile else ~0o755):
                if isfile:
                    os.close(os.open(dst, os.O_CREAT | os.O_CLOEXEC | os.O_EXCL))
                else:
                    os.mkdir(dst)

        mount_rbind(src, dst, attrs=attrs)
|
|
|
|
|
|
class ProcOperation(FSOperation):
    """Recursively bind mount the old root's proc directory onto dst in the new root."""

    def execute(self, oldroot: str, newroot: str) -> None:
        target = chase(newroot, self.dst)
        with umask(~0o755):
            os.makedirs(target, exist_ok=True)

        source = joinpath(oldroot, "proc")
        mount_rbind(source, target)
|
|
|
|
|
|
class BinfmtOperation(FSOperation):
    """Mount a binfmt_misc filesystem on dst in the new root, creating the directory if needed."""

    def execute(self, oldroot: str, newroot: str) -> None:
        target = chase(newroot, self.dst)
        with umask(~0o755):
            os.makedirs(target, exist_ok=True)

        mount("binfmt_misc", target, "binfmt_misc", 0, "")
|
|
|
|
|
|
class DevOperation(FSOperation):
    """
    Populate dst in the new root with a minimal /dev: a tmpfs holding a few device nodes bind
    mounted from the old root, the usual symlinks, shm, a devpts instance and optionally a
    console bound to ttyname.
    """

    def __init__(self, ttyname: str, dst: str) -> None:
        self.ttyname = ttyname
        super().__init__(dst)

    def execute(self, oldroot: str, newroot: str) -> None:
        def bind_node(source: str, target: str) -> None:
            # Create an empty file to serve as the mount point, then bind the node on top of it.
            os.close(os.open(target, os.O_CREAT | os.O_CLOEXEC | os.O_EXCL))
            mount(source, target, "", MS_BIND, "")

        # We don't put actual devices in /dev, just the API stuff in there that all manner of
        # things depend on, like /dev/null.
        devdir = chase(newroot, self.dst)
        with umask(~0o755):
            os.makedirs(devdir, exist_ok=True)

        # Note that the mode is crucial here. If the default mode (1777) is used, trying to access
        # /dev/null fails with EACCESS for unknown reasons.
        mount("tmpfs", devdir, "tmpfs", 0, "mode=0755")

        for node in ("null", "zero", "full", "random", "urandom", "tty", "fuse"):
            nsrc = joinpath(oldroot, "dev", node)
            # fuse is the only node that is allowed to be missing.
            if node == "fuse" and not os.path.exists(nsrc):
                continue

            bind_node(nsrc, joinpath(devdir, node))

        for i, node in enumerate(("stdin", "stdout", "stderr")):
            os.symlink(f"/proc/self/fd/{i}", joinpath(devdir, node))

        os.symlink("/proc/self/fd", joinpath(devdir, "fd"))
        os.symlink("/proc/kcore", joinpath(devdir, "core"))

        with umask(~0o1777):
            os.mkdir(joinpath(devdir, "shm"), mode=0o1777)

        pts = joinpath(devdir, "pts")
        with umask(~0o755):
            os.mkdir(pts)

        mount("devpts", pts, "devpts", 0, "newinstance,ptmxmode=0666,mode=620")

        os.symlink("pts/ptmx", joinpath(devdir, "ptmx"))

        if self.ttyname:
            bind_node(joinpath(oldroot, self.ttyname), joinpath(devdir, "console"))
|
|
|
|
|
|
class TmpfsOperation(FSOperation):
    """Mount a fresh tmpfs on dst in the new root, creating the directory if needed."""

    def execute(self, oldroot: str, newroot: str) -> None:
        target = chase(newroot, self.dst)
        with umask(~0o755):
            os.makedirs(target, exist_ok=True)

        # Temporary directories keep the tmpfs default mode; everything else gets 0755.
        if target.endswith(("/tmp", "/var/tmp")):
            options = ""
        else:
            options = "mode=0755"

        mount("tmpfs", target, "tmpfs", 0, options)
|
|
|
|
|
|
class DirOperation(FSOperation):
    """Create the directory dst in the new root unless it already exists."""

    def execute(self, oldroot: str, newroot: str) -> None:
        target = chase(newroot, self.dst)
        with umask(~0o755):
            os.makedirs(os.path.dirname(target), exist_ok=True)

        # Temporary directories are sticky and world-writable; everything else is 0755.
        is_tmp = target.endswith(("/tmp", "/var/tmp"))
        mode = 0o1777 if is_tmp else 0o755

        if os.path.exists(target):
            return

        with umask(~mode):
            os.mkdir(target, mode=mode)
|
|
|
|
|
|
class SymlinkOperation(FSOperation):
    """
    Create a symlink at dst in the new root that points to src.

    If dst already exists as something other than a directory, the symlink is created at a
    temporary location and bind mounted over dst instead.
    """

    def __init__(self, src: str, dst: str) -> None:
        self.src = src
        super().__init__(dst)

    def execute(self, oldroot: str, newroot: str) -> None:
        dst = joinpath(newroot, self.dst)
        try:
            os.symlink(self.src, dst)
            return
        except FileExistsError:
            # Nothing to do if the right symlink is already there.
            if os.path.islink(dst) and os.readlink(dst) == self.src:
                return

            if os.path.isdir(dst):
                raise

        # If the target already exists and is not a directory, create the symlink somewhere else and mount
        # it over the existing file or symlink.
        os.symlink(self.src, "/symlink")
        try:
            mount_rbind("/symlink", dst)
        finally:
            # Remove the temporary symlink even if the mount failed so it does not linger.
            os.unlink("/symlink")
|
|
|
|
|
|
class WriteOperation(FSOperation):
    """Write the string data to the file dst in the new root, replacing any previous content."""

    def __init__(self, data: str, dst: str) -> None:
        self.data = data
        super().__init__(dst)

    def execute(self, oldroot: str, newroot: str) -> None:
        target = chase(newroot, self.dst)
        parent = os.path.dirname(target)

        with umask(~0o755):
            os.makedirs(parent, exist_ok=True)

        with open(target, "wb") as out:
            out.write(self.data.encode())
|
|
|
|
|
|
class OverlayOperation(FSOperation):
    """
    Mount an overlay filesystem on dst in the new root.

    lowerdirs, upperdir and workdir are resolved in the old root. The special upperdir value
    "tmpfs" puts the upper and work directories on a tmpfs that is mounted on dst first.
    """

    def __init__(self, lowerdirs: tuple[str, ...], upperdir: str, workdir: str, dst: str) -> None:
        self.lowerdirs = lowerdirs
        self.upperdir = upperdir
        self.workdir = workdir
        super().__init__(dst)

    # This supports being used as a context manager so we can reuse the logic for mount_overlay()
    # in mounts.py.
    def __enter__(self) -> None:
        self.execute("/", "/")

    def __exit__(self, *args: object, **kwargs: object) -> None:
        umount2(self.dst)

    def execute(self, oldroot: str, newroot: str) -> None:
        lowerdirs = tuple(chase(oldroot, p) for p in self.lowerdirs)

        if self.upperdir and self.upperdir != "tmpfs":
            upperdir = chase(oldroot, self.upperdir)
        else:
            upperdir = self.upperdir

        workdir = chase(oldroot, self.workdir) if self.workdir else None
        target = chase(newroot, self.dst)

        with umask(~0o755):
            os.makedirs(os.path.dirname(target), exist_ok=True)

        # Temporary directories are sticky and world-writable; everything else is 0755.
        mode = 0o1777 if target.endswith(("/tmp", "/var/tmp")) else 0o755
        if not os.path.exists(target):
            with umask(~mode):
                os.mkdir(target, mode=mode)

        options = [f"lowerdir={':'.join(lowerdirs)}", "userxattr"]
        # Disable the inodes index and metacopy (only copy metadata upwards if possible)
        # options. If these are enabled (e.g., if the kernel enables them by default),
        # the mount will fail if the upper directory has been earlier used with a different
        # lower directory, such as with a build overlay that was generated on top of a
        # different temporary root.
        # See https://www.kernel.org/doc/html/latest/filesystems/overlayfs.html#sharing-and-copying-layers
        # and https://github.com/systemd/mkosi/issues/1841.
        options.append("index=off")
        options.append("metacopy=off")

        if upperdir == "tmpfs":
            mount("tmpfs", target, "tmpfs", 0, "mode=0755")

            with umask(~mode):
                os.mkdir(f"{target}/upper", mode=mode)
            with umask(~0o755):
                os.mkdir(f"{target}/cache")

            options.append(f"upperdir={target}/upper")
            options.append(f"workdir={target}/cache")
        else:
            if upperdir:
                options.append(f"upperdir={upperdir}")
            if workdir:
                options.append(f"workdir={workdir}")

        mount("overlayfs", target, "overlay", 0, ",".join(options))
|
|
|
|
|
|
# Escape sequences used in the HELP text below; both are empty when file descriptor 2 (stderr)
# is not a terminal.
ANSI_HIGHLIGHT = "\x1b[0;1;39m" if os.isatty(2) else ""
ANSI_NORMAL = "\x1b[0m" if os.isatty(2) else ""
|
|
|
|
HELP = f"""\
|
|
mkosi-sandbox [OPTIONS...] COMMAND [ARGUMENTS...]
|
|
|
|
{ANSI_HIGHLIGHT}Run the specified command in a custom sandbox.{ANSI_NORMAL}
|
|
|
|
-h --help Show this help
|
|
--version Show package version
|
|
--tmpfs DST Mount a new tmpfs on DST
|
|
--dev DST Mount dev on DST
|
|
--proc DST Mount procfs on DST
|
|
--dir DST Create a new directory at DST
|
|
--bind SRC DST Bind mount the host path SRC to DST
|
|
--bind-try SRC DST Bind mount the host path SRC to DST if it exists
|
|
--ro-bind SRC DST Bind mount the host path SRC to DST read-only
|
|
--ro-bind-try SRC DST Bind mount the host path SRC to DST read-only if it exists
|
|
--symlink SRC DST Create a symlink at DST pointing to SRC
|
|
--write DATA DST Write DATA to DST
|
|
--overlay-lowerdir DIR Add a lower directory for the next overlayfs mount
|
|
--overlay-upperdir DIR Set the upper directory for the next overlayfs mount
|
|
--overlay-workdir DIR Set the working directory for the next overlayfs mount
|
|
--overlay DST Mount an overlay filesystem at DST
|
|
--unsetenv NAME Unset the environment variable with name NAME
|
|
--setenv NAME VALUE Set the environment variable with name NAME to VALUE
|
|
--chdir DIR Change the working directory in the sandbox to DIR
|
|
--same-dir Change the working directory in the sandbox to $PWD
|
|
--become-root Map the current user/group to root:root in the sandbox
|
|
--suppress-chown Make chown() syscalls in the sandbox a noop
|
|
--suppress-sync Make sync() syscalls in the sandbox a noop
|
|
--unshare-net Unshare the network namespace if possible
|
|
--unshare-ipc Unshare the IPC namespace if possible
|
|
--suspend Stop process before execve()
|
|
|
|
See the mkosi-sandbox(1) man page for details.\
|
|
"""
|
|
|
|
|
|
UNSHARE_EPERM_MSG = f"""\
|
|
{Style.red}mkosi was forbidden to unshare namespaces{Style.reset}.
|
|
This probably means your distribution has restricted unprivileged user namespaces.
|
|
Please consult the REQUIREMENTS section of the mkosi man page, e.g. via "mkosi
|
|
documentation", for workarounds.\
|
|
"""
|
|
|
|
|
|
def setup_mounts(fsops: list[FSOperation]) -> None:
    """
    Build the sandbox root filesystem and make it the root of the calling process.

    A tmpfs workspace is mounted on /tmp and made the root, with the previous root reachable as
    "oldroot". Each operation in fsops is then executed in the given order against "oldroot" and
    "newroot", and finally "newroot" becomes the root and the old root is detached.

    Raises OSError if any of the mount, pivot_root or umount calls fail (except for the first
    pivot_root(), for which a fallback exists).
    """
    # We need a workspace to setup the sandbox, the easiest way to do this in a tmpfs, since it's
    # automatically cleaned up. We need a mountpoint to put the workspace on and it can't be root,
    # so let's use /tmp which is almost guaranteed to exist.
    mount("tmpfs", "/tmp", "tmpfs", 0, "")

    os.chdir("/tmp")

    with umask(~0o755):
        # This is where we set up the sandbox rootfs
        os.mkdir("newroot")
        # This is the old rootfs which is used as the source for mounts in the new rootfs.
        os.mkdir("oldroot")

    # Make sure that newroot is a mountpoint.
    mount("newroot", "newroot", "", MS_BIND | MS_REC, "")

    # Make the workspace in /tmp / and put the old rootfs in oldroot.
    if libc.pivot_root(b".", b"oldroot") < 0:
        # pivot_root() can fail in the initramfs since / isn't a mountpoint there, so let's fall
        # back to MS_MOVE if that's the case.

        # First we move the old rootfs to oldroot.
        mount("/", "oldroot", "", MS_BIND | MS_REC, "")

        # Then we move the workspace (/tmp) to /.
        mount(".", "/", "", MS_MOVE, "")

        # chroot and chdir to fully make the workspace the new root.
        os.chroot(".")
        os.chdir(".")

        # When we use MS_MOVE we have to unmount oldroot/tmp manually to reveal the original /tmp
        # again as it might contain stuff that we want to mount into the sandbox.
        umount2("oldroot/tmp", MNT_DETACH)

    # The paths are relative to the workspace, which is both the root and the working directory
    # at this point.
    for fsop in fsops:
        fsop.execute("oldroot", "newroot")

    # Now that we're done setting up the sandbox let's pivot root into newroot to make it the new
    # root. We use the pivot_root(".", ".") process described in the pivot_root() man page.

    os.chdir("newroot")

    # We're guaranteed to have / be a mount when we get here, so pivot_root() won't fail anymore,
    # even if we're in the initramfs.
    if libc.pivot_root(b".", b".") < 0:
        oserror("pivot_root")

    # As documented in the pivot_root() man page, this will unmount the old rootfs.
    umount2(".", MNT_DETACH)

    # Avoid surprises by making sure the sandbox's mount propagation is shared. This doesn't
    # actually mean mounts get propagated into the host. Instead, a new mount propagation peer
    # group is set up.
    mount("", ".", "", MS_SHARED | MS_REC, "")
|
|
|
|
|
|
def main() -> None:
    """Parse the command line, set up the sandbox and execute the requested command.

    Options are consumed from ``sys.argv[1:]`` until ``--`` or the first argument that does not
    start with ``-`` is found; everything after that is the command to execute, defaulting to
    ``bash`` when no command is given. On success this function does not return because
    ``os.execvp()`` replaces the current process.

    Raises:
        ValueError: If an option is not recognized or the destination of a filesystem operation
            is not an absolute path (this includes an empty destination).
        IndexError: If an option is missing one of its arguments.
        OSError: Re-raised when ``unshare()`` or ``os.execvp()`` fails. A missing executable does
            not raise but makes the process exit with status 127 instead.
    """
    # We don't use argparse as it takes +- 10ms to import and since this is primarily for internal
    # use, it's not necessary to have amazing UX for this CLI interface so it's trivial to write
    # ourselves.

    # The arguments are stored reversed so that argv.pop() yields them in command line order.
    argv = list(reversed(sys.argv[1:]))
    fsops: list[FSOperation] = []
    setenv: list[tuple[str, str]] = []
    unsetenv: list[str] = []
    lowerdirs: list[str] = []
    upperdir = ""
    workdir = ""
    chdir = None
    become_root = suppress_chown = suppress_sync = unshare_net = False
    unshare_ipc = suspend = pack_fds = False

    # Looked up before parsing because it is passed to every DevOperation created below.
    ttyname = os.ttyname(2) if os.isatty(2) else ""

    while argv:
        arg = argv.pop()

        if arg == "--":
            break

        if arg in ("-h", "--help"):
            print(HELP, file=sys.stderr)
            sys.exit(0)
        elif arg == "--version":
            print(__version__, file=sys.stderr)
            sys.exit(0)

        # Where an option takes two arguments, the argv.pop() calls are evaluated left to right,
        # so the first pop() is the first argument given on the command line.
        if arg == "--tmpfs":
            fsops.append(TmpfsOperation(argv.pop()))
        elif arg == "--dev":
            fsops.append(DevOperation(ttyname, argv.pop()))
        elif arg == "--proc":
            fsops.append(ProcOperation(argv.pop()))
        elif arg == "--dir":
            fsops.append(DirOperation(argv.pop()))
        elif arg in ("--bind", "--ro-bind", "--bind-try", "--ro-bind-try"):
            readonly = arg.startswith("--ro")
            required = not arg.endswith("-try")
            src = argv.pop()
            fsops.append(
                BindOperation(
                    # A leading "+" on the source is stripped and passed on as relative=True.
                    src.removeprefix("+"),
                    argv.pop(),
                    readonly=readonly,
                    required=required,
                    relative=src.startswith("+"),
                )
            )
        elif arg == "--symlink":
            fsops.append(SymlinkOperation(argv.pop(), argv.pop()))
        elif arg == "--write":
            fsops.append(WriteOperation(argv.pop(), argv.pop()))
        elif arg == "--overlay-lowerdir":
            lowerdirs.append(argv.pop())
        elif arg == "--overlay-upperdir":
            upperdir = argv.pop()
        elif arg == "--overlay-workdir":
            workdir = argv.pop()
        elif arg == "--overlay":
            fsops.append(
                OverlayOperation(tuple(reversed(lowerdirs)), upperdir, workdir, argv.pop())
            )
            # The collected overlay settings only apply to a single --overlay, so reset them.
            upperdir = ""
            workdir = ""
            lowerdirs = []
        elif arg == "--unsetenv":
            unsetenv.append(argv.pop())
        elif arg == "--setenv":
            setenv.append((argv.pop(), argv.pop()))
        elif arg == "--chdir":
            chdir = argv.pop()
        elif arg == "--same-dir":
            chdir = os.getcwd()
        elif arg == "--become-root":
            become_root = True
        elif arg == "--suppress-chown":
            suppress_chown = True
        elif arg == "--suppress-sync":
            suppress_sync = True
        elif arg == "--unshare-net":
            unshare_net = True
        elif arg == "--unshare-ipc":
            unshare_ipc = True
        elif arg == "--suspend":
            suspend = True
        elif arg == "--pack-fds":
            pack_fds = True
        elif arg.startswith("-"):
            raise ValueError(f"Unrecognized option {arg}")
        else:
            # First non-option argument: put it back as it is the start of the command.
            argv.append(arg)
            break

    # Restore command line order for whatever is left, which is the command to execute.
    argv.reverse()

    argv = argv or ["bash"]

    # Make sure all destination paths are absolute. startswith() is used instead of indexing so
    # that an empty destination is reported as a non-absolute path rather than raising IndexError.
    for fsop in fsops:
        if not fsop.dst.startswith("/"):
            raise ValueError(f"{fsop.dst} is not an absolute path")

    fsops = FSOperation.optimize(fsops)

    for k, v in setenv:
        os.environ[k] = v

    for e in unsetenv:
        if e in os.environ:
            del os.environ[e]

    if pack_fds:
        nfds = pack_file_descriptors()
        if nfds > 0:
            os.environ["LISTEN_FDS"] = str(nfds)
            # os.execvp() keeps the pid, so the pid of this process is the one to advertise.
            os.environ["LISTEN_PID"] = str(os.getpid())

    namespaces = CLONE_NEWNS
    if unshare_net and have_effective_cap(CAP_NET_ADMIN):
        namespaces |= CLONE_NEWNET
    if unshare_ipc:
        namespaces |= CLONE_NEWIPC

    userns = acquire_privileges(become_root=become_root)

    seccomp_suppress(
        # If we're root in a user namespace with a single user, we're still not going to be able to
        # chown() stuff, so check for that and apply the seccomp filter as well in that case.
        chown=suppress_chown and (userns or userns_has_single_user()),
        sync=suppress_sync,
    )

    try:
        unshare(namespaces)
    except OSError as e:
        # This can happen here as well as in become_user, it depends on exactly
        # how the userns restrictions are implemented.
        if e.errno == EPERM:
            print(UNSHARE_EPERM_MSG, file=sys.stderr)
        raise

    # If we unshared the user namespace the mount propagation of root is changed to slave
    # automatically.
    if not userns:
        mount("", "/", "", MS_SLAVE | MS_REC, "")

    setup_mounts(fsops)

    if chdir:
        os.chdir(chdir)

    if suspend:
        os.kill(os.getpid(), SIGSTOP)

    try:
        os.execvp(argv[0], argv)
    except OSError as e:
        # Let's return a recognizable error when the binary we're going to execute is not found.
        # We use 127 as that's the exit code used by shells when a program to execute is not found.
        if e.errno == ENOENT:
            sys.exit(127)

        raise
|
|
|
|
|
|
if __name__ == "__main__":
    # Only run the sandbox when this file is executed as a script, not when it is imported.
    main()
|