mirror of
https://git.libcamera.org/libcamera/libcamera.git
synced 2025-07-12 23:09:45 +03:00
The SimpleTestMethods() function tests that incorrect calls to the
Camera.acquire() method raise an exception. Before doing so, it sets the
log level for the Camera category to FATAL, in order to avoid showing
misleading errors in the test log, and then restores the log level to
ERROR after running the test. ERROR is however not the default log
level. Restore the log level to INFO instead, in order to avoid losing
log messages in subsequent tests.
Fixes: 06cb7130c4
("py: Add unittests.py")
Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
Reviewed-by: Kieran Bingham <kieran.bingham@ideasonboard.com>
Reviewed-by: Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
365 lines
8.9 KiB
Python
Executable file
365 lines
8.9 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
# SPDX-License-Identifier: GPL-2.0-or-later
|
|
# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
|
|
|
|
from collections import defaultdict
|
|
import gc
|
|
import libcamera as libcam
|
|
import selectors
|
|
import typing
|
|
import unittest
|
|
import weakref
|
|
|
|
|
|
class BaseTestCase(unittest.TestCase):
    """unittest.TestCase extended with zero-value and liveness assertions.

    The ``wr`` / ``wr_list`` arguments are callables in the style of
    ``weakref.ref``: calling one returns the referent, or None when the
    referent no longer exists.
    """

    def assertZero(self, a, msg=None):
        # Fail unless a compares equal to 0.
        self.assertEqual(a, 0, msg)

    def assertIsAlive(self, wr, msg='object not alive'):
        # Fail if the referent of wr is gone.
        self.assertIsNotNone(wr(), msg)

    def assertIsDead(self, wr, msg='object not dead'):
        # Fail if the referent of wr still exists.
        self.assertIsNone(wr(), msg)

    def assertIsAllAlive(self, wr_list, msg='object not alive'):
        # Compare against None instead of relying on truthiness, so that a
        # live but falsy referent (e.g. an empty container) counts as alive,
        # exactly as assertIsAlive() would treat it.
        self.assertTrue(all(wr() is not None for wr in wr_list), msg)

    def assertIsAllDead(self, wr_list, msg='object not dead'):
        # Same reasoning as assertIsAllAlive(): only a None referent is dead.
        self.assertTrue(all(wr() is None for wr in wr_list), msg)
|
|
|
|
|
|
class SimpleTestMethods(BaseTestCase):
    """Basic CameraManager / Camera checks run against one vimc camera."""

    # Id of the camera looked up by the tests of this class.
    CAMERA_ID = 'platform/vimc.0 Sensor B'

    def test_get_ref(self):
        # The camera must keep the camera manager alive: the manager may only
        # die once the camera reference is dropped as well.
        cm = libcam.CameraManager.singleton()
        wr_cm = weakref.ref(cm)

        cam = cm.get(self.CAMERA_ID)
        self.assertIsNotNone(cam)
        wr_cam = weakref.ref(cam)

        del cm
        gc.collect()
        self.assertIsAlive(wr_cm)

        del cam
        gc.collect()
        self.assertIsDead(wr_cm)
        self.assertIsDead(wr_cam)

    def test_acquire_release(self):
        cm = libcam.CameraManager.singleton()
        cam = cm.get(self.CAMERA_ID)
        self.assertIsNotNone(cam)

        cam.acquire()

        cam.release()

    def test_double_acquire(self):
        cm = libcam.CameraManager.singleton()
        cam = cm.get(self.CAMERA_ID)
        self.assertIsNotNone(cam)

        cam.acquire()

        try:
            # Silence the error that the second acquire() is expected to log.
            libcam.log_set_level('Camera', 'FATAL')
            try:
                with self.assertRaises(RuntimeError):
                    cam.acquire()
            finally:
                # Restore the log level even if the assertion fails, so that
                # later tests do not silently lose their log messages.
                libcam.log_set_level('Camera', 'INFO')
        finally:
            # Release even on failure so the camera is not left acquired.
            cam.release()

        # I expected exception here, but looks like double release works fine
        cam.release()

    def test_version(self):
        cm = libcam.CameraManager.singleton()
        self.assertIsInstance(cm.version, str)
|
|
|
|
|
|
class CameraTesterBase(BaseTestCase):
    """Fixture that gives each test an acquired vimc camera.

    setUp() stores the camera manager in ``self.cm`` and the first camera
    whose id contains 'platform/vimc' in ``self.cam``; the test is skipped
    when there is no such camera. tearDown() stops and releases the camera,
    then checks that both objects are gone once the references are dropped.
    """

    cm: typing.Any
    cam: typing.Any

    def setUp(self):
        self.cm = libcam.CameraManager.singleton()

        # No local variable is bound to a camera here, so the only lasting
        # references are the attributes cleared in tearDown().
        self.cam = next(filter(lambda c: 'platform/vimc' in c.id, self.cm.cameras), None)

        if self.cam is None:
            self.cm = None
            self.skipTest('No vimc found')

        self.cam.acquire()

        self.wr_cam, self.wr_cm = weakref.ref(self.cam), weakref.ref(self.cm)

    def tearDown(self):
        # A failed test may leave the camera running, hence the unconditional
        # stop before releasing it.
        self.cam.stop()
        self.cam.release()

        # Drop the camera first, then the manager.
        self.cam = self.cm = None

        for ref in (self.wr_cm, self.wr_cam):
            self.assertIsDead(ref)
|
|
|
|
|
|
class AllocatorTestMethods(CameraTesterBase):
    """Lifetime checks for configurations, streams, allocators and buffers."""

    def test_allocator(self):
        # The order of the del / gc.collect() / assert statements below is the
        # substance of this test: each step drops one Python reference and
        # checks which objects are kept alive by the remaining ones.
        cam = self.cam

        camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture])
        # assertEqual reports the actual size on failure.
        self.assertEqual(camconfig.size, 1)
        wr_camconfig = weakref.ref(camconfig)

        streamconfig = camconfig.at(0)
        wr_streamconfig = weakref.ref(streamconfig)

        cam.configure(camconfig)

        stream = streamconfig.stream
        wr_stream = weakref.ref(stream)

        # stream should keep streamconfig and camconfig alive
        del streamconfig
        del camconfig
        gc.collect()
        self.assertIsAlive(wr_camconfig)
        self.assertIsAlive(wr_streamconfig)

        allocator = libcam.FrameBufferAllocator(cam)
        num_bufs = allocator.allocate(stream)
        # assertGreater reports the actual buffer count on failure.
        self.assertGreater(num_bufs, 0)
        wr_allocator = weakref.ref(allocator)

        buffers = allocator.buffers(stream)
        self.assertIsNotNone(buffers)
        del buffers

        buffer = allocator.buffers(stream)[0]
        self.assertIsNotNone(buffer)
        wr_buffer = weakref.ref(buffer)

        # The buffer is expected to keep its allocator alive.
        del allocator
        gc.collect()
        self.assertIsAlive(wr_buffer)
        self.assertIsAlive(wr_allocator)
        self.assertIsAlive(wr_stream)

        # Dropping the buffer is expected to free buffer and allocator, but
        # not the stream, which is still referenced locally.
        del buffer
        gc.collect()
        self.assertIsDead(wr_buffer)
        self.assertIsDead(wr_allocator)
        self.assertIsAlive(wr_stream)

        # Dropping the stream is expected to free both configuration objects.
        del stream
        gc.collect()
        self.assertIsDead(wr_stream)
        self.assertIsDead(wr_camconfig)
        self.assertIsDead(wr_streamconfig)
|
|
|
|
|
|
class SimpleCaptureMethods(CameraTesterBase):
    """Capture one request per allocated buffer and check the completion order.

    Both tests queue the same requests; they differ only in how the event
    loop around the camera manager's event fd is written.
    """

    def test_blocking(self):
        cm = self.cm
        cam = self.cam

        camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture])
        self.assertEqual(camconfig.size, 1)

        streamconfig = camconfig.at(0)
        fmts = streamconfig.formats
        self.assertIsNotNone(fmts)
        fmts = None

        cam.configure(camconfig)

        stream = streamconfig.stream

        allocator = libcam.FrameBufferAllocator(cam)
        num_bufs = allocator.allocate(stream)
        self.assertGreater(num_bufs, 0)

        num_bufs = len(allocator.buffers(stream))

        # One request per buffer, using the buffer index as the cookie.
        reqs = []
        for i in range(num_bufs):
            req = cam.create_request(i)
            self.assertIsNotNone(req)

            buffer = allocator.buffers(stream)[i]
            req.add_buffer(stream, buffer)

            reqs.append(req)

        buffer = None

        cam.start()

        for req in reqs:
            cam.queue_request(req)

        # Drop the list of requests while they are queued.
        reqs = None
        gc.collect()

        reqs = []

        # The context manager closes the selector on every exit path,
        # including a failing assertion or an exception from the bindings.
        with selectors.DefaultSelector() as sel:
            sel.register(cm.event_fd, selectors.EVENT_READ)

            while True:
                events = sel.select()
                if not events:
                    continue

                ready_reqs = cm.get_ready_requests()

                reqs += ready_reqs

                if len(reqs) == num_bufs:
                    break

        # Requests must complete in the order they were queued.
        for i, req in enumerate(reqs):
            self.assertEqual(i, req.cookie)

        reqs = None
        gc.collect()

        cam.stop()

    def test_select(self):
        cm = self.cm
        cam = self.cam

        camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture])
        self.assertEqual(camconfig.size, 1)

        streamconfig = camconfig.at(0)
        fmts = streamconfig.formats
        self.assertIsNotNone(fmts)
        fmts = None

        cam.configure(camconfig)

        stream = streamconfig.stream

        allocator = libcam.FrameBufferAllocator(cam)
        num_bufs = allocator.allocate(stream)
        self.assertGreater(num_bufs, 0)

        num_bufs = len(allocator.buffers(stream))

        # One request per buffer, using the buffer index as the cookie.
        reqs = []
        for i in range(num_bufs):
            req = cam.create_request(i)
            self.assertIsNotNone(req)

            buffer = allocator.buffers(stream)[i]
            req.add_buffer(stream, buffer)

            reqs.append(req)

        buffer = None

        cam.start()

        for req in reqs:
            cam.queue_request(req)

        # Drop the list of requests while they are queued.
        reqs = None
        gc.collect()

        reqs = []

        # The context manager closes the selector on every exit path,
        # including a failing assertion or an exception from the bindings.
        with selectors.DefaultSelector() as sel:
            sel.register(cm.event_fd, selectors.EVENT_READ)

            running = True
            while running:
                events = sel.select()
                for _ in events:
                    ready_reqs = cm.get_ready_requests()

                    reqs += ready_reqs

                    if len(reqs) == num_bufs:
                        running = False

        self.assertEqual(len(reqs), num_bufs)

        # Requests must complete in the order they were queued.
        for i, req in enumerate(reqs):
            self.assertEqual(i, req.cookie)

        reqs = None
        gc.collect()

        cam.stop()
|
|
|
|
|
|
# Recursively expand slist's objects into olist, using seen to track already
|
|
# processed objects.
|
|
def _getr(slist, olist, seen):
|
|
for e in slist:
|
|
if id(e) in seen:
|
|
continue
|
|
seen.add(id(e))
|
|
olist.append(e)
|
|
tl = gc.get_referents(e)
|
|
if tl:
|
|
_getr(tl, olist, seen)
|
|
|
|
|
|
def get_all_objects(ignored=()):
    """Return a list of the objects reachable from the garbage collector.

    Starts from gc.get_objects() and expands it with _getr(). The working
    containers created here and every object in ``ignored`` are marked as
    already seen, so they are not added to the result themselves.

    ``ignored`` is any iterable of objects to leave out. The default is an
    immutable empty tuple rather than a shared mutable list.
    """
    gcl = gc.get_objects()
    olist = []
    seen = set()

    # Keep this function's own bookkeeping out of the result.
    seen.add(id(gcl))
    seen.add(id(olist))
    seen.add(id(seen))
    seen.update(id(o) for o in ignored)

    _getr(gcl, olist, seen)

    return olist
|
|
|
|
|
|
def create_type_count_map(olist):
    """Return a defaultdict(int) mapping each type to its number of objects in olist."""
    counts = defaultdict(int)
    for kind in map(type, olist):
        counts[kind] += 1
    return counts
|
|
|
|
|
|
def diff_type_count_maps(before, after):
    """Return [(type, after_count - before_count)] for types whose count changed.

    Only the keys of ``after`` are examined, in ``after``'s iteration order.
    A type missing from ``before`` counts as 0 there. ``before`` is read with
    get(), so it may be a plain dict and is never modified.
    """
    return [(k, n - before.get(k, 0))
            for k, n in after.items()
            if n != before.get(k, 0)]
|
|
|
|
|
|
if __name__ == '__main__':
    import sys

    # \todo This is an attempt to see the Python objects that are not collected,
    # but this doesn't work very well, as things always leak a bit.
    test_leaks = False

    if test_leaks:
        gc.unfreeze()
        gc.collect()

        obs_before = get_all_objects()

    # exit=False keeps the interpreter alive for the optional leak report
    # below; the outcome is kept so it can be turned into the exit status.
    program = unittest.main(exit=False)

    if test_leaks:
        gc.unfreeze()
        gc.collect()

        # Ignore the snapshot list itself when taking the second snapshot.
        obs_after = get_all_objects([obs_before])  # type: ignore

        before = create_type_count_map(obs_before)  # type: ignore
        after = create_type_count_map(obs_after)

        leaks = diff_type_count_maps(before, after)
        if len(leaks) > 0:
            print(leaks)

    # With exit=False unittest.main() does not set the exit status itself, so
    # report failures explicitly; otherwise the script would always exit 0.
    sys.exit(0 if program.result.wasSuccessful() else 1)
|