forked from Mirror/pmbootstrap
Replace the "kill_as_root" argument with a much simpler "sudo" argument and remove the now obsolete check for the output mode of "kill_as_root". "kill_as_root" would only get set to True if both conditions are met: a) command is running with sudo b) command is running with an output mode ("log" or "stdout") where pmb.helpers.run_core would kill it if it does not output anything before a timeout is reached The new "sudo" argument just indicates if the command is running with sudo (a), regardless of the output mode (b).
154 lines
4.6 KiB
Python
154 lines
4.6 KiB
Python
# Copyright 2020 Oliver Smith
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
""" Test pmb.helpers.run_core """
|
|
import sys
|
|
import subprocess
|
|
import pytest
|
|
|
|
import pmb_test # noqa
|
|
import pmb.helpers.run_core
|
|
|
|
|
|
@pytest.fixture
|
|
def args(request):
|
|
import pmb.parse
|
|
sys.argv = ["pmbootstrap.py", "chroot"]
|
|
args = pmb.parse.arguments()
|
|
args.log = args.work + "/log_testsuite.txt"
|
|
pmb.helpers.logging.init(args)
|
|
request.addfinalizer(args.logfd.close)
|
|
return args
|
|
|
|
|
|
def test_sanity_checks():
|
|
func = pmb.helpers.run_core.sanity_checks
|
|
|
|
# Invalid output
|
|
with pytest.raises(RuntimeError) as e:
|
|
func("invalid-output")
|
|
assert str(e.value).startswith("Invalid output value")
|
|
|
|
# Background and check
|
|
func("background", check=None)
|
|
for check in [True, False]:
|
|
with pytest.raises(RuntimeError) as e:
|
|
func("background", check=check)
|
|
assert str(e.value).startswith("Can't use check with")
|
|
|
|
# output_return
|
|
func("log", output_return=True)
|
|
with pytest.raises(RuntimeError) as e:
|
|
func("tui", output_return=True)
|
|
assert str(e.value).startswith("Can't use output_return with")
|
|
|
|
|
|
def test_background(args):
|
|
# Sleep in background
|
|
process = pmb.helpers.run_core.background(args, ["sleep", "1"], "/")
|
|
|
|
# Check if it is still running
|
|
assert process.poll() is None
|
|
|
|
|
|
def test_pipe(args):
|
|
# Sleep in background
|
|
process = pmb.helpers.run_core.pipe(args, ["sleep", "1"], "/")
|
|
|
|
# Check if it is still running
|
|
assert process.poll() is None
|
|
|
|
# Print output in background
|
|
process = pmb.helpers.run_core.pipe(args, ["echo", "-n", "hello"], "/")
|
|
|
|
# Read output
|
|
assert process.communicate()[0].decode('utf-8') == "hello"
|
|
|
|
|
|
def test_foreground_pipe(args):
|
|
func = pmb.helpers.run_core.foreground_pipe
|
|
cmd = ["echo", "test"]
|
|
|
|
# Normal run
|
|
assert func(args, cmd) == (0, "")
|
|
|
|
# Return output
|
|
assert func(args, cmd, output_return=True) == (0, "test\n")
|
|
|
|
# Kill with output timeout
|
|
cmd = ["sh", "-c", "echo first; sleep 2; echo second"]
|
|
args.timeout = 0.3
|
|
ret = func(args, cmd, output_return=True, output_timeout=True)
|
|
assert ret == (-9, "first\n")
|
|
|
|
# Kill with output timeout as root
|
|
cmd = ["sudo", "sh", "-c", "printf first; sleep 2; printf second"]
|
|
args.timeout = 0.3
|
|
ret = func(args, cmd, output_return=True, output_timeout=True,
|
|
sudo=True)
|
|
assert ret == (-9, "first")
|
|
|
|
# Finish before timeout
|
|
cmd = ["sh", "-c", "echo first; sleep 0.1; echo second; sleep 0.1;"
|
|
"echo third; sleep 0.1; echo fourth"]
|
|
args.timeout = 0.2
|
|
ret = func(args, cmd, output_return=True, output_timeout=True)
|
|
assert ret == (0, "first\nsecond\nthird\nfourth\n")
|
|
|
|
# Check if all child processes are killed after timeout.
|
|
# The first command uses ps to get its process group id (pgid) and echo it
|
|
# to stdout. All of the test commands will be running under that pgid.
|
|
cmd = ["sudo", "sh", "-c",
|
|
"pgid=$(ps -p ${1:-$$} -o pgid=);echo $pgid | tr -d '\n';" +
|
|
"sleep 10 | sleep 20 | sleep 30"]
|
|
args.timeout = 0.3
|
|
ret = func(args, cmd, output_return=True, output_timeout=True,
|
|
sudo=True)
|
|
pgid = str(ret[1])
|
|
|
|
cmd = ["ps", "-e", "-o", "pgid=,comm=", "--noheaders"]
|
|
ret = subprocess.run(cmd, check=True, stdout=subprocess.PIPE)
|
|
procs = str(ret.stdout.decode("utf-8")).rstrip().split('\n')
|
|
child_procs = []
|
|
for process in procs:
|
|
items = process.split(maxsplit=1)
|
|
if len(items) != 2:
|
|
continue
|
|
if pgid == items[0] and "sleep" in items[1]:
|
|
child_procs.append(items)
|
|
assert len(child_procs) == 0
|
|
|
|
|
|
def test_foreground_tui():
|
|
func = pmb.helpers.run_core.foreground_tui
|
|
assert func(["echo", "test"]) == 0
|
|
|
|
|
|
def test_core(args):
|
|
# Background
|
|
func = pmb.helpers.run_core.core
|
|
msg = "test"
|
|
process = func(args, msg, ["sleep", "1"], output="background")
|
|
assert process.poll() is None
|
|
|
|
# Foreground (TUI)
|
|
ret = func(args, msg, ["echo", "test"], output="tui")
|
|
assert ret == 0
|
|
|
|
# Foreground (pipe)
|
|
ret = func(args, msg, ["echo", "test"], output="log")
|
|
assert ret == 0
|
|
|
|
# Return output
|
|
ret = func(args, msg, ["echo", "test"], output="log", output_return=True)
|
|
assert ret == "test\n"
|
|
|
|
# Check the return code
|
|
with pytest.raises(RuntimeError) as e:
|
|
func(args, msg, ["false"], output="log")
|
|
assert str(e.value).startswith("Command failed:")
|
|
|
|
# Kill with timeout
|
|
args.timeout = 0.2
|
|
with pytest.raises(RuntimeError) as e:
|
|
func(args, msg, ["sleep", "1"], output="log")
|
|
assert str(e.value).startswith("Command failed:")
|