From d3eb3ac8887ea10811125e745bda77750f67fd25 Mon Sep 17 00:00:00 2001 From: Matthias Schiffer Date: Wed, 23 Apr 2014 17:36:59 +0200 Subject: Use socketpair instead of pipe for internal message pipes While pipe2 recently got O_DIRECT on Linux, we'll just use SOCK_DGRAM to stay portable, and get proper handling of full queues. --- src/async.c | 72 +++++++++++++++++++++++++++++++++++++++++-------------------- src/fastd.c | 3 ++- 2 files changed, 51 insertions(+), 24 deletions(-) diff --git a/src/async.c b/src/async.c index 144a82c..e1b3709 100644 --- a/src/async.c +++ b/src/async.c @@ -27,23 +27,22 @@ #include "async.h" #include "fastd.h" +#include + + +typedef struct fastd_async_hdr { + fastd_async_type_t type; + size_t len; +} fastd_async_hdr_t; + void fastd_async_init(void) { fastd_open_pipe(&ctx.async_rfd, &ctx.async_wfd); + fastd_setfl(ctx.async_wfd, O_NONBLOCK, 0); } -static void handle_resolve_return(void) { - fastd_async_resolve_return_t resolve_return; - while (read(ctx.async_rfd, &resolve_return, sizeof(resolve_return)) < 0) { - if (errno != EINTR) - exit_errno("handle_resolve_return: read"); - } - - fastd_peer_address_t addresses[resolve_return.n_addr]; - while (read(ctx.async_rfd, &addresses, sizeof(addresses)) < 0) { - if (errno != EINTR) - exit_errno("handle_resolve_return: read"); - } +static void handle_resolve_return(const void *buf) { + const fastd_async_resolve_return_t *resolve_return = buf; size_t i; for (i = 0; i < VECTOR_LEN(ctx.peers); i++) { @@ -54,32 +53,49 @@ static void handle_resolve_return(void) { fastd_remote_t *remote; for (remote = peer->remotes; remote; remote = remote->next) { - if (remote == resolve_return.remote) + if (remote == resolve_return->remote) break; } if (!remote) continue; - fastd_peer_handle_resolve(peer, remote, resolve_return.n_addr, addresses); + fastd_peer_handle_resolve(peer, remote, resolve_return->n_addr, resolve_return->addr); break; } - fastd_remote_unref(resolve_return.remote); + fastd_remote_unref(resolve_return->remote); } void fastd_async_handle(void) { - fastd_async_type_t type; + fastd_async_hdr_t header; + struct iovec vec[2] = { + { .iov_base = &header, .iov_len = sizeof(header) }, + }; + struct msghdr msg = { + .msg_iov = vec, + .msg_iovlen = 1, + }; - while (read(ctx.async_rfd, &type, sizeof(type)) < 0) { + while (recvmsg(ctx.async_rfd, &msg, MSG_PEEK) < 0) { if (errno != EINTR) - exit_errno("fastd_async_handle: read"); + exit_errno("fastd_async_handle: recvmsg"); } - switch (type) { + uint8_t buf[header.len]; + vec[1].iov_base = buf; + vec[1].iov_len = sizeof(buf); + msg.msg_iovlen = 2; + + while (recvmsg(ctx.async_rfd, &msg, 0) < 0) { + if (errno != EINTR) + exit_errno("fastd_async_handle: recvmsg"); + } + + switch (header.type) { case ASYNC_TYPE_RESOLVE_RETURN: - handle_resolve_return(); + handle_resolve_return(buf); break; default: @@ -88,11 +104,21 @@ void fastd_async_handle(void) { } void fastd_async_enqueue(fastd_async_type_t type, const void *data, size_t len) { + fastd_async_hdr_t header; + /* use memset to zero the holes in the struct to make valgrind happy */ + memset(&header, 0, sizeof(header)); + header.type = type; + header.len = len; + struct iovec vec[2] = { - { .iov_base = &type, .iov_len = sizeof(type) }, + { .iov_base = &header, .iov_len = sizeof(header) }, { .iov_base = (void *)data, .iov_len = len }, }; + struct msghdr msg = { + .msg_iov = vec, + .msg_iovlen = 2, + }; - if (writev(ctx.async_wfd, vec, 2) < 0) - pr_error_errno("fastd_async_enqueue"); + if (sendmsg(ctx.async_wfd, &msg, 0) < 0) + pr_warn_errno("fastd_async_enqueue: sendmsg"); } diff --git a/src/fastd.c b/src/fastd.c index b6dabf1..e8b4677 100644 --- a/src/fastd.c +++ b/src/fastd.c @@ -124,7 +124,8 @@ static void init_signals(void) { void fastd_open_pipe(int *readfd, int *writefd) { int pipefd[2]; - if (pipe(pipefd)) + /* use socketpair with SOCK_DGRAM instead of pipe2 with O_DIRECT to keep this portable */ + if (socketpair(AF_UNIX, SOCK_DGRAM, 0, pipefd)) exit_errno("pipe"); fastd_setfd(pipefd[0], FD_CLOEXEC, 0); -- cgit v1.2.3