py: Move MappedFrameBuffer to libcamera.utils

Move MappedFrameBuffer to libcamera.utils, instead of extending
FrameBuffer class with a new mmap() method. This keeps us more aligned
to the C++ API.

Signed-off-by: Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
Reviewed-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
This commit is contained in:
Tomi Valkeinen 2022-05-27 17:44:24 +03:00 committed by Laurent Pinchart
parent 210ce547a4
commit 9e4388cca5
6 changed files with 87 additions and 83 deletions

View file

@ -9,6 +9,7 @@
import argparse
import binascii
import libcamera as libcam
import libcamera.utils
import sys
import traceback
@ -327,7 +328,7 @@ def request_handler(state, ctx, req):
crcs = []
if ctx['opt-crc']:
with fb.mmap() as mfb:
with libcamera.utils.MappedFrameBuffer(fb) as mfb:
plane_crcs = [binascii.crc32(p) for p in mfb.planes]
crcs.append(plane_crcs)
@ -345,7 +346,7 @@ def request_handler(state, ctx, req):
print(f'\t{ctrl} = {val}')
if ctx['opt-save-frames']:
with fb.mmap() as mfb:
with libcamera.utils.MappedFrameBuffer(fb) as mfb:
filename = 'frame-{}-{}-{}.data'.format(ctx['id'], stream_name, ctx['reqs-completed'])
with open(filename, 'wb') as f:
for p in mfb.planes:

View file

@ -9,6 +9,7 @@ from PIL import Image
from PIL.ImageQt import ImageQt
from PyQt5 import QtCore, QtGui, QtWidgets
import libcamera as libcam
import libcamera.utils
import numpy as np
import sys
@ -285,7 +286,7 @@ class MainWindow(QtWidgets.QWidget):
controlsLayout.addStretch()
def buf_to_qpixmap(self, stream, fb):
with fb.mmap() as mfb:
with libcamera.utils.MappedFrameBuffer(fb) as mfb:
cfg = stream.configuration
if cfg.pixel_format == libcam.formats.MJPEG:

View file

@ -2,83 +2,3 @@
# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
from ._libcamera import *
class MappedFrameBuffer:
def __init__(self, fb):
self.__fb = fb
def __enter__(self):
import os
import mmap
fb = self.__fb
# Collect information about the buffers
bufinfos = {}
for i in range(fb.num_planes):
fd = fb.fd(i)
if fd not in bufinfos:
buflen = os.lseek(fd, 0, os.SEEK_END)
bufinfos[fd] = {'maplen': 0, 'buflen': buflen}
else:
buflen = bufinfos[fd]['buflen']
if fb.offset(i) > buflen or fb.offset(i) + fb.length(i) > buflen:
raise RuntimeError(f'plane is out of buffer: buffer length={buflen}, ' +
f'plane offset={fb.offset(i)}, plane length={fb.length(i)}')
bufinfos[fd]['maplen'] = max(bufinfos[fd]['maplen'], fb.offset(i) + fb.length(i))
# mmap the buffers
maps = []
for fd, info in bufinfos.items():
map = mmap.mmap(fd, info['maplen'], mmap.MAP_SHARED, mmap.PROT_READ | mmap.PROT_WRITE)
info['map'] = map
maps.append(map)
self.__maps = tuple(maps)
# Create memoryviews for the planes
planes = []
for i in range(fb.num_planes):
fd = fb.fd(i)
info = bufinfos[fd]
mv = memoryview(info['map'])
start = fb.offset(i)
end = fb.offset(i) + fb.length(i)
mv = mv[start:end]
planes.append(mv)
self.__planes = tuple(planes)
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
for p in self.__planes:
p.release()
for mm in self.__maps:
mm.close()
@property
def planes(self):
return self.__planes
def __FrameBuffer__mmap(self):
return MappedFrameBuffer(self)
FrameBuffer.mmap = __FrameBuffer__mmap

View file

@ -72,6 +72,10 @@ run_command('ln', '-fsT', files('__init__.py'),
meson.current_build_dir() / '__init__.py',
check: true)
run_command('ln', '-fsT', meson.current_source_dir() / 'utils',
meson.current_build_dir() / 'utils',
check: true)
install_data(['__init__.py'], install_dir : destdir)
# \todo Generate stubs when building. See https://peps.python.org/pep-0484/#stub-files

View file

@ -0,0 +1,74 @@
# SPDX-License-Identifier: LGPL-2.1-or-later
# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
class MappedFrameBuffer:
def __init__(self, fb):
self.__fb = fb
def __enter__(self):
import os
import mmap
fb = self.__fb
# Collect information about the buffers
bufinfos = {}
for i in range(fb.num_planes):
fd = fb.fd(i)
if fd not in bufinfos:
buflen = os.lseek(fd, 0, os.SEEK_END)
bufinfos[fd] = {'maplen': 0, 'buflen': buflen}
else:
buflen = bufinfos[fd]['buflen']
if fb.offset(i) > buflen or fb.offset(i) + fb.length(i) > buflen:
raise RuntimeError(f'plane is out of buffer: buffer length={buflen}, ' +
f'plane offset={fb.offset(i)}, plane length={fb.length(i)}')
bufinfos[fd]['maplen'] = max(bufinfos[fd]['maplen'], fb.offset(i) + fb.length(i))
# mmap the buffers
maps = []
for fd, info in bufinfos.items():
map = mmap.mmap(fd, info['maplen'], mmap.MAP_SHARED, mmap.PROT_READ | mmap.PROT_WRITE)
info['map'] = map
maps.append(map)
self.__maps = tuple(maps)
# Create memoryviews for the planes
planes = []
for i in range(fb.num_planes):
fd = fb.fd(i)
info = bufinfos[fd]
mv = memoryview(info['map'])
start = fb.offset(i)
end = fb.offset(i) + fb.length(i)
mv = mv[start:end]
planes.append(mv)
self.__planes = tuple(planes)
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
for p in self.__planes:
p.release()
for mm in self.__maps:
mm.close()
@property
def planes(self):
return self.__planes

View file

@ -0,0 +1,4 @@
# SPDX-License-Identifier: LGPL-2.1-or-later
# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
from .MappedFrameBuffer import MappedFrameBuffer