py: Add unittests.py

Add a simple unittests.py as a base for python unittests.

Signed-off-by: Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
Reviewed-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
Signed-off-by: Kieran Bingham <kieran.bingham@ideasonboard.com>
This commit is contained in:
Tomi Valkeinen 2022-05-09 13:10:22 +03:00 committed by Kieran Bingham
parent 6e92cb9dc4
commit 06cb7130c4
3 changed files with 370 additions and 0 deletions

View file

@ -18,6 +18,7 @@ subdir('log')
subdir('media_device')
subdir('pipeline')
subdir('process')
subdir('py')
subdir('serialization')
subdir('stream')
subdir('v4l2_compat')

17
test/py/meson.build Normal file
View file

@ -0,0 +1,17 @@
# SPDX-License-Identifier: CC0-1.0
if not pycamera_enabled
subdir_done()
endif
pymod = import('python')
py3 = pymod.find_installation('python3')
pypathdir = meson.project_build_root() / 'src' / 'py'
test('pyunittests',
py3,
args : files('unittests.py'),
env : ['PYTHONPATH=' + pypathdir],
suite : 'pybindings',
is_parallel : false)

352
test/py/unittests.py Executable file
View file

@ -0,0 +1,352 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0-or-later
# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
from collections import defaultdict
import errno
import gc
import libcamera as libcam
import os
import selectors
import time
import unittest
import weakref
class BaseTestCase(unittest.TestCase):
def assertZero(self, a, msg=None):
self.assertEqual(a, 0, msg)
class SimpleTestMethods(BaseTestCase):
def test_get_ref(self):
cm = libcam.CameraManager.singleton()
wr_cm = weakref.ref(cm)
cam = cm.get('platform/vimc.0 Sensor B')
self.assertIsNotNone(cam)
wr_cam = weakref.ref(cam)
cm = None
gc.collect()
self.assertIsNotNone(wr_cm())
cam = None
gc.collect()
self.assertIsNone(wr_cm())
self.assertIsNone(wr_cam())
def test_acquire_release(self):
cm = libcam.CameraManager.singleton()
cam = cm.get('platform/vimc.0 Sensor B')
self.assertIsNotNone(cam)
ret = cam.acquire()
self.assertZero(ret)
ret = cam.release()
self.assertZero(ret)
def test_double_acquire(self):
cm = libcam.CameraManager.singleton()
cam = cm.get('platform/vimc.0 Sensor B')
self.assertIsNotNone(cam)
ret = cam.acquire()
self.assertZero(ret)
libcam.log_set_level('Camera', 'FATAL')
ret = cam.acquire()
self.assertEqual(ret, -errno.EBUSY)
libcam.log_set_level('Camera', 'ERROR')
ret = cam.release()
self.assertZero(ret)
ret = cam.release()
# I expected EBUSY, but looks like double release works fine
self.assertZero(ret)
class CameraTesterBase(BaseTestCase):
def setUp(self):
self.cm = libcam.CameraManager.singleton()
self.cam = next((cam for cam in self.cm.cameras if 'platform/vimc' in cam.id), None)
if self.cam is None:
self.cm = None
self.skipTest('No vimc found')
ret = self.cam.acquire()
if ret != 0:
self.cam = None
self.cm = None
raise Exception('Failed to acquire camera')
def tearDown(self):
# If a test fails, the camera may be in running state. So always stop.
self.cam.stop()
ret = self.cam.release()
if ret != 0:
raise Exception('Failed to release camera')
self.cam = None
self.cm = None
class AllocatorTestMethods(CameraTesterBase):
def test_allocator(self):
cam = self.cam
camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture])
self.assertTrue(camconfig.size == 1)
wr_camconfig = weakref.ref(camconfig)
streamconfig = camconfig.at(0)
wr_streamconfig = weakref.ref(streamconfig)
ret = cam.configure(camconfig)
self.assertZero(ret)
stream = streamconfig.stream
wr_stream = weakref.ref(stream)
# stream should keep streamconfig and camconfig alive
streamconfig = None
camconfig = None
gc.collect()
self.assertIsNotNone(wr_camconfig())
self.assertIsNotNone(wr_streamconfig())
allocator = libcam.FrameBufferAllocator(cam)
ret = allocator.allocate(stream)
self.assertTrue(ret > 0)
wr_allocator = weakref.ref(allocator)
buffers = allocator.buffers(stream)
buffers = None
buffer = allocator.buffers(stream)[0]
self.assertIsNotNone(buffer)
wr_buffer = weakref.ref(buffer)
allocator = None
gc.collect()
self.assertIsNotNone(wr_buffer())
self.assertIsNotNone(wr_allocator())
self.assertIsNotNone(wr_stream())
buffer = None
gc.collect()
self.assertIsNone(wr_buffer())
self.assertIsNone(wr_allocator())
self.assertIsNotNone(wr_stream())
stream = None
gc.collect()
self.assertIsNone(wr_stream())
self.assertIsNone(wr_camconfig())
self.assertIsNone(wr_streamconfig())
class SimpleCaptureMethods(CameraTesterBase):
def test_sleep(self):
cm = self.cm
cam = self.cam
camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture])
self.assertTrue(camconfig.size == 1)
streamconfig = camconfig.at(0)
fmts = streamconfig.formats
ret = cam.configure(camconfig)
self.assertZero(ret)
stream = streamconfig.stream
allocator = libcam.FrameBufferAllocator(cam)
ret = allocator.allocate(stream)
self.assertTrue(ret > 0)
num_bufs = len(allocator.buffers(stream))
reqs = []
for i in range(num_bufs):
req = cam.create_request(i)
self.assertIsNotNone(req)
buffer = allocator.buffers(stream)[i]
ret = req.add_buffer(stream, buffer)
self.assertZero(ret)
reqs.append(req)
buffer = None
ret = cam.start()
self.assertZero(ret)
for req in reqs:
ret = cam.queue_request(req)
self.assertZero(ret)
reqs = None
gc.collect()
time.sleep(0.5)
reqs = cm.get_ready_requests()
self.assertTrue(len(reqs) == num_bufs)
for i, req in enumerate(reqs):
self.assertTrue(i == req.cookie)
reqs = None
gc.collect()
ret = cam.stop()
self.assertZero(ret)
def test_select(self):
cm = self.cm
cam = self.cam
camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture])
self.assertTrue(camconfig.size == 1)
streamconfig = camconfig.at(0)
fmts = streamconfig.formats
ret = cam.configure(camconfig)
self.assertZero(ret)
stream = streamconfig.stream
allocator = libcam.FrameBufferAllocator(cam)
ret = allocator.allocate(stream)
self.assertTrue(ret > 0)
num_bufs = len(allocator.buffers(stream))
reqs = []
for i in range(num_bufs):
req = cam.create_request(i)
self.assertIsNotNone(req)
buffer = allocator.buffers(stream)[i]
ret = req.add_buffer(stream, buffer)
self.assertZero(ret)
reqs.append(req)
buffer = None
ret = cam.start()
self.assertZero(ret)
for req in reqs:
ret = cam.queue_request(req)
self.assertZero(ret)
reqs = None
gc.collect()
sel = selectors.DefaultSelector()
sel.register(cm.efd, selectors.EVENT_READ)
reqs = []
running = True
while running:
events = sel.select()
for key, mask in events:
os.read(key.fileobj, 8)
ready_reqs = cm.get_ready_requests()
self.assertTrue(len(ready_reqs) > 0)
reqs += ready_reqs
if len(reqs) == num_bufs:
running = False
self.assertTrue(len(reqs) == num_bufs)
for i, req in enumerate(reqs):
self.assertTrue(i == req.cookie)
reqs = None
gc.collect()
ret = cam.stop()
self.assertZero(ret)
# Recursively expand slist's objects into olist, using seen to track already
# processed objects.
def _getr(slist, olist, seen):
for e in slist:
if id(e) in seen:
continue
seen.add(id(e))
olist.append(e)
tl = gc.get_referents(e)
if tl:
_getr(tl, olist, seen)
def get_all_objects(ignored=[]):
gcl = gc.get_objects()
olist = []
seen = set()
seen.add(id(gcl))
seen.add(id(olist))
seen.add(id(seen))
seen.update(set([id(o) for o in ignored]))
_getr(gcl, olist, seen)
return olist
def create_type_count_map(olist):
map = defaultdict(int)
for o in olist:
map[type(o)] += 1
return map
def diff_type_count_maps(before, after):
return [(k, after[k] - before[k]) for k in after if after[k] != before[k]]
if __name__ == '__main__':
# \todo This is an attempt to see the Python objects that are not collected,
# but this doesn't work very well, as things always leak a bit.
test_leaks = False
if test_leaks:
gc.unfreeze()
gc.collect()
obs_before = get_all_objects()
unittest.main(exit=False)
if test_leaks:
gc.unfreeze()
gc.collect()
obs_after = get_all_objects([obs_before])
before = create_type_count_map(obs_before)
after = create_type_count_map(obs_after)
leaks = diff_type_count_maps(before, after)
if len(leaks) > 0:
print(leaks)