libcamera/test/py/unittests.py
Laurent Pinchart 876730d805 test: py: Fix log level restore in SimpleTestMethods()
The SimpleTestMethods() function tests that incorrect calls to the
Camera.acquire() method raise an exception. Before doing so, it sets the
log level for the Camera category to FATAL, in order to avoid showing
misleading errors in the test log, and then restores the log level to
ERROR after running the test. ERROR is however not the default log
level. Restore the log level to INFO instead, in order to avoid losing
log messages in subsequent tests.

Fixes: 06cb7130c4 ("py: Add unittests.py")
Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
Reviewed-by: Kieran Bingham <kieran.bingham@ideasonboard.com>
Reviewed-by: Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
2024-11-05 15:28:13 +02:00

365 lines
8.9 KiB
Python
Executable file

#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0-or-later
# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
from collections import defaultdict
import gc
import libcamera as libcam
import selectors
import typing
import unittest
import weakref
class BaseTestCase(unittest.TestCase):
def assertZero(self, a, msg=None):
self.assertEqual(a, 0, msg)
def assertIsAlive(self, wr, msg='object not alive'):
self.assertIsNotNone(wr(), msg)
def assertIsDead(self, wr, msg='object not dead'):
self.assertIsNone(wr(), msg)
def assertIsAllAlive(self, wr_list, msg='object not alive'):
self.assertTrue(all([wr() for wr in wr_list]), msg)
def assertIsAllDead(self, wr_list, msg='object not dead'):
self.assertTrue(all([not wr() for wr in wr_list]), msg)
class SimpleTestMethods(BaseTestCase):
def test_get_ref(self):
cm = libcam.CameraManager.singleton()
wr_cm = weakref.ref(cm)
cam = cm.get('platform/vimc.0 Sensor B')
self.assertIsNotNone(cam)
wr_cam = weakref.ref(cam)
del cm
gc.collect()
self.assertIsAlive(wr_cm)
del cam
gc.collect()
self.assertIsDead(wr_cm)
self.assertIsDead(wr_cam)
def test_acquire_release(self):
cm = libcam.CameraManager.singleton()
cam = cm.get('platform/vimc.0 Sensor B')
self.assertIsNotNone(cam)
cam.acquire()
cam.release()
def test_double_acquire(self):
cm = libcam.CameraManager.singleton()
cam = cm.get('platform/vimc.0 Sensor B')
self.assertIsNotNone(cam)
cam.acquire()
libcam.log_set_level('Camera', 'FATAL')
with self.assertRaises(RuntimeError):
cam.acquire()
libcam.log_set_level('Camera', 'INFO')
cam.release()
# I expected exception here, but looks like double release works fine
cam.release()
def test_version(self):
cm = libcam.CameraManager.singleton()
self.assertIsInstance(cm.version, str)
class CameraTesterBase(BaseTestCase):
cm: typing.Any
cam: typing.Any
def setUp(self):
self.cm = libcam.CameraManager.singleton()
self.cam = next((cam for cam in self.cm.cameras if 'platform/vimc' in cam.id), None)
if self.cam is None:
self.cm = None
self.skipTest('No vimc found')
self.cam.acquire()
self.wr_cam = weakref.ref(self.cam)
self.wr_cm = weakref.ref(self.cm)
def tearDown(self):
# If a test fails, the camera may be in running state. So always stop.
self.cam.stop()
self.cam.release()
self.cam = None
self.cm = None
self.assertIsDead(self.wr_cm)
self.assertIsDead(self.wr_cam)
class AllocatorTestMethods(CameraTesterBase):
def test_allocator(self):
cam = self.cam
camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture])
self.assertTrue(camconfig.size == 1)
wr_camconfig = weakref.ref(camconfig)
streamconfig = camconfig.at(0)
wr_streamconfig = weakref.ref(streamconfig)
cam.configure(camconfig)
stream = streamconfig.stream
wr_stream = weakref.ref(stream)
# stream should keep streamconfig and camconfig alive
del streamconfig
del camconfig
gc.collect()
self.assertIsAlive(wr_camconfig)
self.assertIsAlive(wr_streamconfig)
allocator = libcam.FrameBufferAllocator(cam)
num_bufs = allocator.allocate(stream)
self.assertTrue(num_bufs > 0)
wr_allocator = weakref.ref(allocator)
buffers = allocator.buffers(stream)
self.assertIsNotNone(buffers)
del buffers
buffer = allocator.buffers(stream)[0]
self.assertIsNotNone(buffer)
wr_buffer = weakref.ref(buffer)
del allocator
gc.collect()
self.assertIsAlive(wr_buffer)
self.assertIsAlive(wr_allocator)
self.assertIsAlive(wr_stream)
del buffer
gc.collect()
self.assertIsDead(wr_buffer)
self.assertIsDead(wr_allocator)
self.assertIsAlive(wr_stream)
del stream
gc.collect()
self.assertIsDead(wr_stream)
self.assertIsDead(wr_camconfig)
self.assertIsDead(wr_streamconfig)
class SimpleCaptureMethods(CameraTesterBase):
def test_blocking(self):
cm = self.cm
cam = self.cam
camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture])
self.assertTrue(camconfig.size == 1)
streamconfig = camconfig.at(0)
fmts = streamconfig.formats
self.assertIsNotNone(fmts)
fmts = None
cam.configure(camconfig)
stream = streamconfig.stream
allocator = libcam.FrameBufferAllocator(cam)
num_bufs = allocator.allocate(stream)
self.assertTrue(num_bufs > 0)
num_bufs = len(allocator.buffers(stream))
reqs = []
for i in range(num_bufs):
req = cam.create_request(i)
self.assertIsNotNone(req)
buffer = allocator.buffers(stream)[i]
req.add_buffer(stream, buffer)
reqs.append(req)
buffer = None
cam.start()
for req in reqs:
cam.queue_request(req)
reqs = None
gc.collect()
sel = selectors.DefaultSelector()
sel.register(cm.event_fd, selectors.EVENT_READ)
reqs = []
while True:
events = sel.select()
if not events:
continue
ready_reqs = cm.get_ready_requests()
reqs += ready_reqs
if len(reqs) == num_bufs:
break
for i, req in enumerate(reqs):
self.assertTrue(i == req.cookie)
reqs = None
gc.collect()
cam.stop()
def test_select(self):
cm = self.cm
cam = self.cam
camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture])
self.assertTrue(camconfig.size == 1)
streamconfig = camconfig.at(0)
fmts = streamconfig.formats
self.assertIsNotNone(fmts)
fmts = None
cam.configure(camconfig)
stream = streamconfig.stream
allocator = libcam.FrameBufferAllocator(cam)
num_bufs = allocator.allocate(stream)
self.assertTrue(num_bufs > 0)
num_bufs = len(allocator.buffers(stream))
reqs = []
for i in range(num_bufs):
req = cam.create_request(i)
self.assertIsNotNone(req)
buffer = allocator.buffers(stream)[i]
req.add_buffer(stream, buffer)
reqs.append(req)
buffer = None
cam.start()
for req in reqs:
cam.queue_request(req)
reqs = None
gc.collect()
sel = selectors.DefaultSelector()
sel.register(cm.event_fd, selectors.EVENT_READ)
reqs = []
running = True
while running:
events = sel.select()
for _ in events:
ready_reqs = cm.get_ready_requests()
reqs += ready_reqs
if len(reqs) == num_bufs:
running = False
self.assertTrue(len(reqs) == num_bufs)
for i, req in enumerate(reqs):
self.assertTrue(i == req.cookie)
reqs = None
gc.collect()
cam.stop()
# Recursively expand slist's objects into olist, using seen to track already
# processed objects.
def _getr(slist, olist, seen):
for e in slist:
if id(e) in seen:
continue
seen.add(id(e))
olist.append(e)
tl = gc.get_referents(e)
if tl:
_getr(tl, olist, seen)
def get_all_objects(ignored=[]):
gcl = gc.get_objects()
olist = []
seen = set()
seen.add(id(gcl))
seen.add(id(olist))
seen.add(id(seen))
seen.update(set([id(o) for o in ignored]))
_getr(gcl, olist, seen)
return olist
def create_type_count_map(olist):
map = defaultdict(int)
for o in olist:
map[type(o)] += 1
return map
def diff_type_count_maps(before, after):
return [(k, after[k] - before[k]) for k in after if after[k] != before[k]]
if __name__ == '__main__':
# \todo This is an attempt to see the Python objects that are not collected,
# but this doesn't work very well, as things always leak a bit.
test_leaks = False
if test_leaks:
gc.unfreeze()
gc.collect()
obs_before = get_all_objects()
unittest.main(exit=False)
if test_leaks:
gc.unfreeze()
gc.collect()
obs_after = get_all_objects([obs_before]) # type: ignore
before = create_type_count_map(obs_before) # type: ignore
after = create_type_count_map(obs_after)
leaks = diff_type_count_maps(before, after)
if len(leaks) > 0:
print(leaks)