gstreamer: Split completed request processing to a separate function

Simplify the task run function futher by moving the processing of
completed requests to a separate function. No functional change
intended, only increased readability.

Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
Reviewed-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
Reviewed-by: Umang Jain <umang.jain@ideasonboard.com>
This commit is contained in:
Laurent Pinchart 2022-06-21 22:47:53 +03:00
parent cc9998c90f
commit 29ef923877

View file

@ -135,6 +135,7 @@ struct GstLibcameraSrcState {
int queueRequest();
void requestCompleted(Request *request);
int processRequest();
};
struct _GstLibcameraSrc {
@ -254,6 +255,64 @@ GstLibcameraSrcState::requestCompleted(Request *request)
gst_task_resume(src_->task);
}
/* Must be called with stream_lock held. */
int GstLibcameraSrcState::processRequest()
{
std::unique_ptr<RequestWrap> wrap;
{
MutexLocker locker(lock_);
if (!completedRequests_.empty()) {
wrap = std::move(completedRequests_.front());
completedRequests_.pop();
}
}
if (!wrap)
return -ENODATA;
GstFlowReturn ret = GST_FLOW_OK;
gst_flow_combiner_reset(src_->flow_combiner);
for (GstPad *srcpad : srcpads_) {
Stream *stream = gst_libcamera_pad_get_stream(srcpad);
GstBuffer *buffer = wrap->detachBuffer(stream);
FrameBuffer *fb = gst_libcamera_buffer_get_frame_buffer(buffer);
if (GST_CLOCK_TIME_IS_VALID(wrap->pts_)) {
GST_BUFFER_PTS(buffer) = wrap->pts_;
gst_libcamera_pad_set_latency(srcpad, wrap->latency_);
} else {
GST_BUFFER_PTS(buffer) = 0;
}
GST_BUFFER_OFFSET(buffer) = fb->metadata().sequence;
GST_BUFFER_OFFSET_END(buffer) = fb->metadata().sequence;
ret = gst_pad_push(srcpad, buffer);
ret = gst_flow_combiner_update_pad_flow(src_->flow_combiner,
srcpad, ret);
}
if (ret != GST_FLOW_OK) {
if (ret == GST_FLOW_EOS) {
g_autoptr(GstEvent) eos = gst_event_new_eos();
guint32 seqnum = gst_util_seqnum_next();
gst_event_set_seqnum(eos, seqnum);
for (GstPad *srcpad : srcpads_)
gst_pad_push_event(srcpad, gst_event_ref(eos));
} else if (ret != GST_FLOW_FLUSHING) {
GST_ELEMENT_FLOW_ERROR(src_, ret);
}
return -EPIPE;
}
return 0;
}
static bool
gst_libcamera_src_open(GstLibcameraSrc *self)
{
@ -321,8 +380,13 @@ gst_libcamera_src_task_run(gpointer user_data)
GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
GstLibcameraSrcState *state = self->state;
int err = state->queueRequest();
if (err == -ENOMEM) {
/*
* Create and queue one request. If no buffers are available the
* function returns -ENOBUFS, which we ignore here as that's not a
* fatal error.
*/
int ret = state->queueRequest();
if (ret == -ENOMEM) {
GST_ELEMENT_ERROR(self, RESOURCE, NO_SPACE_LEFT,
("Failed to allocate request for camera '%s'.",
state->cam_->id().c_str()),
@ -331,58 +395,16 @@ gst_libcamera_src_task_run(gpointer user_data)
return;
}
std::unique_ptr<RequestWrap> wrap;
{
MutexLocker locker(state->lock_);
if (!state->completedRequests_.empty()) {
wrap = std::move(state->completedRequests_.front());
state->completedRequests_.pop();
}
}
if (!wrap) {
gst_task_pause(self->task);
return;
}
GstFlowReturn ret = GST_FLOW_OK;
gst_flow_combiner_reset(self->flow_combiner);
for (GstPad *srcpad : state->srcpads_) {
Stream *stream = gst_libcamera_pad_get_stream(srcpad);
GstBuffer *buffer = wrap->detachBuffer(stream);
FrameBuffer *fb = gst_libcamera_buffer_get_frame_buffer(buffer);
if (GST_CLOCK_TIME_IS_VALID(wrap->pts_)) {
GST_BUFFER_PTS(buffer) = wrap->pts_;
gst_libcamera_pad_set_latency(srcpad, wrap->latency_);
} else {
GST_BUFFER_PTS(buffer) = 0;
}
GST_BUFFER_OFFSET(buffer) = fb->metadata().sequence;
GST_BUFFER_OFFSET_END(buffer) = fb->metadata().sequence;
ret = gst_pad_push(srcpad, buffer);
ret = gst_flow_combiner_update_pad_flow(self->flow_combiner,
srcpad, ret);
}
if (ret != GST_FLOW_OK) {
if (ret == GST_FLOW_EOS) {
g_autoptr(GstEvent) eos = gst_event_new_eos();
guint32 seqnum = gst_util_seqnum_next();
gst_event_set_seqnum(eos, seqnum);
for (GstPad *srcpad : state->srcpads_)
gst_pad_push_event(srcpad, gst_event_ref(eos));
} else if (ret != GST_FLOW_FLUSHING) {
GST_ELEMENT_FLOW_ERROR(self, ret);
}
/* Process one completed request, if available. */
ret = state->processRequest();
switch (ret) {
case -EPIPE:
gst_task_stop(self->task);
return;
case -ENODATA:
gst_task_pause(self->task);
return;
}
/*