gstreamer: pool: Replace GstAtomicQueue with deque and mutex

The GstAtomicQueue only supports 2 threads, one pushing, and one
popping. We pop and push on error cases and we may have multiple threads
downstream returning buffer (using tee), which breaks this assumption.

On top of which, the release function, that notifies when the queue goes
from empty to not-empty relies on a racy empty check. The downstream
thread that does this check is effectively concurrent with our thread
calling acquire().

Fix this by replacing the GstAtomicQueue with a std::deque, and protect
access to that using the object lock.

Bug: https://bugs.libcamera.org/show_bug.cgi?id=201
Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
Reviewed-by: Kieran Bingham <kieran.bingham@ideasonboard.com>
Acked-by: Jacopo Mondi <jacopo.mondi@ideasonboard.com>
Signed-off-by: Kieran Bingham <kieran.bingham@ideasonboard.com>
This commit is contained in:
Nicolas Dufresne 2024-06-05 15:41:20 -04:00 committed by Kieran Bingham
parent 01132257b9
commit 04f1f20337

View file

@ -8,6 +8,7 @@
#include "gstlibcamerapool.h"
#include <deque>
#include <libcamera/stream.h>
#include "gstlibcamera-utils.h"
@ -24,24 +25,41 @@ static guint signals[N_SIGNALS];
struct _GstLibcameraPool {
GstBufferPool parent;
GstAtomicQueue *queue;
std::deque<GstBuffer *> *queue;
GstLibcameraAllocator *allocator;
Stream *stream;
};
G_DEFINE_TYPE(GstLibcameraPool, gst_libcamera_pool, GST_TYPE_BUFFER_POOL)
static GstBuffer *
gst_libcamera_pool_pop_buffer(GstLibcameraPool *self)
{
GLibLocker lock(GST_OBJECT(self));
GstBuffer *buf;
if (self->queue->empty())
return nullptr;
buf = self->queue->front();
self->queue->pop_front();
return buf;
}
static GstFlowReturn
gst_libcamera_pool_acquire_buffer(GstBufferPool *pool, GstBuffer **buffer,
[[maybe_unused]] GstBufferPoolAcquireParams *params)
{
GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
GstBuffer *buf = GST_BUFFER(gst_atomic_queue_pop(self->queue));
GstBuffer *buf = gst_libcamera_pool_pop_buffer(self);
if (!buf)
return GST_FLOW_ERROR;
if (!gst_libcamera_allocator_prepare_buffer(self->allocator, self->stream, buf)) {
gst_atomic_queue_push(self->queue, buf);
GLibLocker lock(GST_OBJECT(self));
self->queue->push_back(buf);
return GST_FLOW_ERROR;
}
@ -64,9 +82,13 @@ static void
gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
{
GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
bool do_notify = gst_atomic_queue_length(self->queue) == 0;
bool do_notify;
gst_atomic_queue_push(self->queue, buffer);
{
GLibLocker lock(GST_OBJECT(self));
do_notify = self->queue->empty();
self->queue->push_back(buffer);
}
if (do_notify)
g_signal_emit(self, signals[SIGNAL_BUFFER_NOTIFY], 0);
@ -75,7 +97,7 @@ gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
static void
gst_libcamera_pool_init(GstLibcameraPool *self)
{
self->queue = gst_atomic_queue_new(4);
self->queue = new std::deque<GstBuffer *>();
}
static void
@ -84,10 +106,10 @@ gst_libcamera_pool_finalize(GObject *object)
GstLibcameraPool *self = GST_LIBCAMERA_POOL(object);
GstBuffer *buf;
while ((buf = GST_BUFFER(gst_atomic_queue_pop(self->queue))))
while ((buf = gst_libcamera_pool_pop_buffer(self)))
gst_buffer_unref(buf);
gst_atomic_queue_unref(self->queue);
delete self->queue;
g_object_unref(self->allocator);
G_OBJECT_CLASS(gst_libcamera_pool_parent_class)->finalize(object);
@ -122,7 +144,7 @@ gst_libcamera_pool_new(GstLibcameraAllocator *allocator, Stream *stream)
gsize pool_size = gst_libcamera_allocator_get_pool_size(allocator, stream);
for (gsize i = 0; i < pool_size; i++) {
GstBuffer *buffer = gst_buffer_new();
gst_atomic_queue_push(pool->queue, buffer);
pool->queue->push_back(buffer);
}
return pool;