libcamera: ipc: unix: Make socket operation asynchronous
Blocking socket operation when receiving messages may lead to long delays, and possibly a complete deadlock, if the remote side delays sending of the payload after the header, or doesn't send the payload at all. To avoid this, make the socket non-blocking and implement a simple state machine to receive the header synchronously with the socket read notification. The payload read is still synchronous with the receive() method to avoid data copies. Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com> Reviewed-by: Niklas Söderlund <niklas.soderlund@ragnatech.se>
This commit is contained in:
parent
a00fdabacd
commit
f137451817
2 changed files with 63 additions and 27 deletions
|
@ -49,6 +49,8 @@ private:
|
|||
void dataNotifier(EventNotifier *notifier);
|
||||
|
||||
int fd_;
|
||||
bool headerReceived_;
|
||||
struct Header header_;
|
||||
EventNotifier *notifier_;
|
||||
};
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
|
||||
#include "ipc_unixsocket.h"
|
||||
|
||||
#include <poll.h>
|
||||
#include <string.h>
|
||||
#include <sys/socket.h>
|
||||
#include <unistd.h>
|
||||
|
@ -49,10 +50,10 @@ LOG_DEFINE_CATEGORY(IPCUnixSocket)
|
|||
* transporting entire payloads with guaranteed ordering.
|
||||
*
|
||||
* The IPC design is asynchronous, a message is queued to a receiver which gets
|
||||
* notified that a message is ready to be consumed by a signal. The queuer of
|
||||
* the message gets no notification when a message is delivered nor processed.
|
||||
* If such interactions are needed a protocol specific to the users use-case
|
||||
* should be implemented on top of the IPC objects.
|
||||
* notified that a message is ready to be consumed by the \ref readyRead
|
||||
* signal. The sender of the message gets no notification when a message is
|
||||
* delivered nor processed. If such interactions are needed a protocol specific
|
||||
* to the users use-case should be implemented on top of the IPC objects.
|
||||
*
|
||||
* Establishment of an IPC channel is asymmetrical. The side that initiates
|
||||
* communication first instantiates a local side socket and creates the channel
|
||||
|
@ -64,7 +65,7 @@ LOG_DEFINE_CATEGORY(IPCUnixSocket)
|
|||
*/
|
||||
|
||||
IPCUnixSocket::IPCUnixSocket()
|
||||
: fd_(-1), notifier_(nullptr)
|
||||
: fd_(-1), headerReceived_(false), notifier_(nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -89,7 +90,7 @@ int IPCUnixSocket::create()
|
|||
int sockets[2];
|
||||
int ret;
|
||||
|
||||
ret = socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets);
|
||||
ret = socketpair(AF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0, sockets);
|
||||
if (ret) {
|
||||
ret = -errno;
|
||||
LOG(IPCUnixSocket, Error)
|
||||
|
@ -142,6 +143,7 @@ void IPCUnixSocket::close()
|
|||
::close(fd_);
|
||||
|
||||
fd_ = -1;
|
||||
headerReceived_ = false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -193,38 +195,38 @@ int IPCUnixSocket::send(const Payload &payload)
|
|||
* \param[out] payload Payload where to write the received message
|
||||
*
|
||||
* This method receives the message payload from the IPC channel and writes it
|
||||
* to the \a payload. It blocks until one message is received, if an
|
||||
* asynchronous behavior is desired this method should be called when the
|
||||
* readyRead signal is emitted.
|
||||
* to the \a payload. If no message payload is available, it returns
|
||||
* immediately with -EAGAIN. The \ref readyRead signal shall be used to receive
|
||||
* notification of message availability.
|
||||
*
|
||||
* \todo Add state machine to make sure we don't block forever and that
|
||||
* a header is always followed by a payload.
|
||||
*
|
||||
* \return 0 on success or a negative error code otherwise
|
||||
* \retval -EAGAIN No message payload is available
|
||||
* \retval -ENOTCONN The socket is not connected (neither create() nor bind()
|
||||
* has been called)
|
||||
*/
|
||||
int IPCUnixSocket::receive(Payload *payload)
|
||||
{
|
||||
Header hdr;
|
||||
int ret;
|
||||
|
||||
if (!isBound())
|
||||
return -ENOTCONN;
|
||||
|
||||
if (!payload)
|
||||
return -EINVAL;
|
||||
if (!headerReceived_)
|
||||
return -EAGAIN;
|
||||
|
||||
ret = ::recv(fd_, &hdr, sizeof(hdr), 0);
|
||||
if (ret < 0) {
|
||||
ret = -errno;
|
||||
LOG(IPCUnixSocket, Error)
|
||||
<< "Failed to recv header: " << strerror(-ret);
|
||||
payload->data.resize(header_.data);
|
||||
payload->fds.resize(header_.fds);
|
||||
|
||||
int ret = recvData(payload->data.data(), header_.data,
|
||||
payload->fds.data(), header_.fds);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
}
|
||||
|
||||
payload->data.resize(hdr.data);
|
||||
payload->fds.resize(hdr.fds);
|
||||
headerReceived_ = false;
|
||||
notifier_->setEnabled(true);
|
||||
|
||||
return recvData(payload->data.data(), hdr.data, payload->fds.data(), hdr.fds);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -232,7 +234,8 @@ int IPCUnixSocket::receive(Payload *payload)
|
|||
* \brief A Signal emitted when a message is ready to be read
|
||||
*/
|
||||
|
||||
int IPCUnixSocket::sendData(const void *buffer, size_t length, const int32_t *fds, unsigned int num)
|
||||
int IPCUnixSocket::sendData(const void *buffer, size_t length,
|
||||
const int32_t *fds, unsigned int num)
|
||||
{
|
||||
struct iovec iov[1];
|
||||
iov[0].iov_base = const_cast<void *>(buffer);
|
||||
|
@ -266,7 +269,8 @@ int IPCUnixSocket::sendData(const void *buffer, size_t length, const int32_t *fd
|
|||
return 0;
|
||||
}
|
||||
|
||||
int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned int num)
|
||||
int IPCUnixSocket::recvData(void *buffer, size_t length,
|
||||
int32_t *fds, unsigned int num)
|
||||
{
|
||||
struct iovec iov[1];
|
||||
iov[0].iov_base = buffer;
|
||||
|
@ -291,6 +295,7 @@ int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned
|
|||
|
||||
if (recvmsg(fd_, &msg, 0) < 0) {
|
||||
int ret = -errno;
|
||||
if (ret != -EAGAIN)
|
||||
LOG(IPCUnixSocket, Error)
|
||||
<< "Failed to recvmsg: " << strerror(-ret);
|
||||
return ret;
|
||||
|
@ -303,6 +308,35 @@ int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned
|
|||
|
||||
void IPCUnixSocket::dataNotifier(EventNotifier *notifier)
|
||||
{
|
||||
int ret;
|
||||
|
||||
if (!headerReceived_) {
|
||||
/* Receive the header. */
|
||||
ret = ::recv(fd_, &header_, sizeof(header_), 0);
|
||||
if (ret < 0) {
|
||||
ret = -errno;
|
||||
LOG(IPCUnixSocket, Error)
|
||||
<< "Failed to receive header: " << strerror(-ret);
|
||||
return;
|
||||
}
|
||||
|
||||
headerReceived_ = true;
|
||||
}
|
||||
|
||||
/*
|
||||
* If the payload has arrived, disable the notifier and emit the
|
||||
* readyRead signal. The notifier will be reenabled by the receive()
|
||||
* method.
|
||||
*/
|
||||
struct pollfd fds = { fd_, POLLIN, 0 };
|
||||
ret = poll(&fds, 1, 0);
|
||||
if (ret < 0)
|
||||
return;
|
||||
|
||||
if (!(fds.revents & POLLIN))
|
||||
return;
|
||||
|
||||
notifier_->setEnabled(false);
|
||||
readyRead.emit(this);
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue