From b9054051b9bf232968dc52b4732a65381c6a0e5f Mon Sep 17 00:00:00 2001 From: Matthias Schiffer Date: Sat, 14 Apr 2012 16:59:54 +0200 Subject: Reduce task queue usage --- src/fastd.c | 166 +++++++++++++++++++++--------------------- src/fastd.h | 4 + src/handshake.c | 2 +- src/protocol_ec25519_fhmqvc.c | 10 +-- src/task.c | 45 ------------ src/task.h | 21 ------ 6 files changed, 95 insertions(+), 153 deletions(-) diff --git a/src/fastd.c b/src/fastd.c index 711d903..71d0a04 100644 --- a/src/fastd.c +++ b/src/fastd.c @@ -186,6 +186,91 @@ static void close_sockets(fastd_context *ctx) { } } +static void fastd_send_type(fastd_context *ctx, fastd_peer *peer, uint8_t packet_type, fastd_buffer buffer) { + int sockfd; + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + + switch (peer->address.sa.sa_family) { + case AF_INET: + msg.msg_name = &peer->address.in; + msg.msg_namelen = sizeof(struct sockaddr_in); + sockfd = ctx->sockfd; + break; + + case AF_INET6: + msg.msg_name = &peer->address.in6; + msg.msg_namelen = sizeof(struct sockaddr_in6); + sockfd = ctx->sock6fd; + break; + + default: + exit_bug(ctx, "unsupported address family"); + } + + struct iovec iov[2] = { + { .iov_base = &packet_type, .iov_len = 1 }, + { .iov_base = buffer.data, .iov_len = buffer.len } + }; + + msg.msg_iov = iov; + msg.msg_iovlen = buffer.len ? 2 : 1; + + sendmsg(sockfd, &msg, 0); + + fastd_buffer_free(buffer); +} + +void fastd_send(fastd_context *ctx, fastd_peer *peer, fastd_buffer buffer) { + fastd_send_type(ctx, peer, PACKET_DATA, buffer); +} + +void fastd_send_handshake(fastd_context *ctx, fastd_peer *peer, fastd_buffer buffer) { + fastd_send_type(ctx, peer, PACKET_HANDSHAKE, buffer); +} + +void fastd_handle_receive(fastd_context *ctx, fastd_peer *peer, fastd_buffer buffer) { + if (ctx->conf->mode == MODE_TAP) { + const fastd_eth_addr *src_addr = fastd_get_source_address(ctx, buffer); + + if (fastd_eth_addr_is_unicast(src_addr)) + fastd_peer_eth_addr_add(ctx, peer, src_addr); + } + + if (write(ctx->tunfd, buffer.data, buffer.len) < 0) + warn_errno(ctx, "write"); + + if (ctx->conf->mode == MODE_TAP && ctx->conf->peer_to_peer) { + const fastd_eth_addr *dest_addr = fastd_get_dest_address(ctx, buffer); + + if (fastd_eth_addr_is_unicast(dest_addr)) { + fastd_peer *dest_peer = fastd_peer_find_by_eth_addr(ctx, dest_addr); + + if (dest_peer && dest_peer != peer && dest_peer->state == STATE_ESTABLISHED) { + ctx->conf->protocol->send(ctx, dest_peer, buffer); + } + else { + fastd_buffer_free(buffer); + } + } + else { + fastd_peer *dest_peer; + for (dest_peer = ctx->peers; dest_peer; dest_peer = dest_peer->next) { + if (dest_peer != peer && dest_peer->state == STATE_ESTABLISHED) { + fastd_buffer send_buffer = fastd_buffer_alloc(buffer.len, ctx->conf->method->min_encrypt_head_space(ctx), 0); + memcpy(send_buffer.data, buffer.data, buffer.len); + ctx->conf->protocol->send(ctx, dest_peer, send_buffer); + } + } + + fastd_buffer_free(buffer); + } + } + else { + fastd_buffer_free(buffer); + } +} + static void on_up(fastd_context *ctx) { if (!ctx->conf->on_up) return; @@ -273,87 +358,6 @@ static void handle_tasks(fastd_context *ctx) { fastd_task *task; while ((task = fastd_task_get(ctx)) != NULL) { switch (task->type) { - case TASK_SEND: - if (task->peer) { - int sockfd; - struct msghdr msg; - memset(&msg, 0, sizeof(msg)); - - switch (task->peer->address.sa.sa_family) { - case AF_INET: - msg.msg_name = &task->peer->address.in; - msg.msg_namelen = sizeof(struct sockaddr_in); - sockfd = ctx->sockfd; - break; - - case AF_INET6: - msg.msg_name = &task->peer->address.in6; - msg.msg_namelen = sizeof(struct sockaddr_in6); - sockfd = ctx->sock6fd; - break; - - default: - exit_bug(ctx, "unsupported address family"); - } - - uint8_t packet_type = task->send.packet_type; - - struct iovec iov[2] = { - { .iov_base = &packet_type, .iov_len = 1 }, - { .iov_base = task->send.buffer.data, .iov_len = task->send.buffer.len } - }; - - msg.msg_iov = iov; - msg.msg_iovlen = task->send.buffer.len ? 2 : 1; - - sendmsg(sockfd, &msg, 0); - } - - fastd_buffer_free(task->send.buffer); - break; - - case TASK_HANDLE_RECV: - if (ctx->conf->mode == MODE_TAP) { - const fastd_eth_addr *src_addr = fastd_get_source_address(ctx, task->handle_recv.buffer); - - if (fastd_eth_addr_is_unicast(src_addr)) - fastd_peer_eth_addr_add(ctx, task->peer, src_addr); - } - - if (write(ctx->tunfd, task->handle_recv.buffer.data, task->handle_recv.buffer.len) < 0) - warn_errno(ctx, "write"); - - if (ctx->conf->mode == MODE_TAP && ctx->conf->peer_to_peer) { - const fastd_eth_addr *dest_addr = fastd_get_dest_address(ctx, task->handle_recv.buffer); - - if (fastd_eth_addr_is_unicast(dest_addr)) { - fastd_peer *dest_peer = fastd_peer_find_by_eth_addr(ctx, dest_addr); - - if (dest_peer && dest_peer != task->peer && dest_peer->state == STATE_ESTABLISHED) { - ctx->conf->protocol->send(ctx, dest_peer, task->handle_recv.buffer); - } - else { - fastd_buffer_free(task->handle_recv.buffer); - } - } - else { - fastd_peer *dest_peer; - for (dest_peer = ctx->peers; dest_peer; dest_peer = dest_peer->next) { - if (dest_peer != task->peer && dest_peer->state == STATE_ESTABLISHED) { - fastd_buffer send_buffer = fastd_buffer_alloc(task->handle_recv.buffer.len, ctx->conf->method->min_encrypt_head_space(ctx), 0); - memcpy(send_buffer.data, task->handle_recv.buffer.data, task->handle_recv.buffer.len); - ctx->conf->protocol->send(ctx, dest_peer, send_buffer); - } - } - - fastd_buffer_free(task->handle_recv.buffer); - } - } - else { - fastd_buffer_free(task->handle_recv.buffer); - } - break; - case TASK_HANDSHAKE: pr_debug(ctx, "sending handshake to %P...", task->peer); ctx->conf->protocol->handshake_init(ctx, task->peer); diff --git a/src/fastd.h b/src/fastd.h index 2113ccd..102c2c4 100644 --- a/src/fastd.h +++ b/src/fastd.h @@ -163,6 +163,10 @@ struct _fastd_string_stack { char str[]; }; +void fastd_send(fastd_context *ctx, fastd_peer *peer, fastd_buffer buffer); +void fastd_send_handshake(fastd_context *ctx, fastd_peer *peer, fastd_buffer buffer); +void fastd_handle_receive(fastd_context *ctx, fastd_peer *peer, fastd_buffer buffer); + void fastd_printf(const fastd_context *ctx, const char *format, ...); void fastd_read_peer_dir(fastd_context *ctx, fastd_config *conf, const char *dir); diff --git a/src/handshake.c b/src/handshake.c index 5553eb9..c301e4e 100644 --- a/src/handshake.c +++ b/src/handshake.c @@ -210,7 +210,7 @@ void fastd_handshake_handle(fastd_context *ctx, fastd_peer *peer, fastd_buffer b fastd_handshake_add_uint8(ctx, &reply_buffer, RECORD_REPLY_CODE, reply_code); fastd_handshake_add_uint8(ctx, &reply_buffer, RECORD_ERROR_DETAIL, error_detail); - fastd_task_put_send_handshake(ctx, peer, reply_buffer); + fastd_send_handshake(ctx, peer, reply_buffer); } else { ctx->conf->protocol->handshake_handle(ctx, peer, &handshake); diff --git a/src/protocol_ec25519_fhmqvc.c b/src/protocol_ec25519_fhmqvc.c index 3168d03..b4dab07 100644 --- a/src/protocol_ec25519_fhmqvc.c +++ b/src/protocol_ec25519_fhmqvc.c @@ -242,7 +242,7 @@ static void protocol_handshake_init(fastd_context *ctx, fastd_peer *peer) { fastd_handshake_add(ctx, &buffer, RECORD_SENDER_HANDSHAKE_KEY, PUBLICKEYBYTES, handshake->public_key.p); - fastd_task_put_send_handshake(ctx, peer, buffer); + fastd_send_handshake(ctx, peer, buffer); } static inline bool has_field(const fastd_handshake *handshake, uint8_t type, size_t length) { @@ -303,7 +303,7 @@ static void respond_handshake(fastd_context *ctx, fastd_peer *peer, const fastd_ fastd_handshake_add(ctx, &buffer, RECORD_RECEIPIENT_HANDSHAKE_KEY, PUBLICKEYBYTES, peer->protocol_state->accepting_handshake->peer_key.p); fastd_handshake_add(ctx, &buffer, RECORD_T, HMACBYTES, hmacbuf); - fastd_task_put_send_handshake(ctx, peer, buffer); + fastd_send_handshake(ctx, peer, buffer); peer->protocol_state->accepting_handshake->state = HANDSHAKE_STATE_RESPONSE; } @@ -423,7 +423,7 @@ static void finish_handshake(fastd_context *ctx, fastd_peer *peer, const fastd_h fastd_handshake_add(ctx, &buffer, RECORD_RECEIPIENT_HANDSHAKE_KEY, PUBLICKEYBYTES, peer->protocol_state->initiating_handshake->peer_key.p); fastd_handshake_add(ctx, &buffer, RECORD_T, HMACBYTES, hmacbuf); - fastd_task_put_send_handshake(ctx, peer, buffer); + fastd_send_handshake(ctx, peer, buffer); establish(ctx, peer, peer->protocol_state->initiating_handshake->peer_config, true, &peer->protocol_state->initiating_handshake->public_key, @@ -650,7 +650,7 @@ static void protocol_handle_recv(fastd_context *ctx, fastd_peer *peer, fastd_buf fastd_peer_seen(ctx, peer); if (recv_buffer.len) - fastd_task_put_handle_recv(ctx, peer, recv_buffer); + fastd_handle_receive(ctx, peer, recv_buffer); else fastd_buffer_free(recv_buffer); @@ -679,7 +679,7 @@ static void protocol_send(fastd_context *ctx, fastd_peer *peer, fastd_buffer buf if (!ctx->conf->method->encrypt(ctx, session->method_state, &send_buffer, buffer)) goto fail; - fastd_task_put_send(ctx, peer, send_buffer); + fastd_send(ctx, peer, send_buffer); fastd_task_delete_peer_keepalives(ctx, peer); fastd_task_schedule_keepalive(ctx, peer, ctx->conf->keepalive_interval*1000); diff --git a/src/task.c b/src/task.c index fb75766..3483853 100644 --- a/src/task.c +++ b/src/task.c @@ -31,35 +31,6 @@ fastd_task* fastd_task_get(fastd_context *ctx) { return container_of(fastd_queue_get(ctx, &ctx->task_queue), fastd_task, entry); } -static void fastd_task_put_send_type(fastd_context *ctx, fastd_peer *peer, uint8_t packet_type, fastd_buffer buffer) { - fastd_task *task = malloc(sizeof(fastd_task)); - - task->type = TASK_SEND; - task->peer = peer; - task->send.packet_type = packet_type; - task->send.buffer = buffer; - - fastd_queue_put(ctx, &ctx->task_queue, &task->entry, 0); -} - -void fastd_task_put_send_handshake(fastd_context *ctx, fastd_peer *peer, fastd_buffer buffer) { - fastd_task_put_send_type(ctx, peer, PACKET_HANDSHAKE, buffer); -} - -void fastd_task_put_send(fastd_context *ctx, fastd_peer *peer, fastd_buffer buffer) { - fastd_task_put_send_type(ctx, peer, PACKET_DATA, buffer); -} - -void fastd_task_put_handle_recv(fastd_context *ctx, fastd_peer *peer, fastd_buffer buffer) { - fastd_task *task = malloc(sizeof(fastd_task)); - - task->type = TASK_HANDLE_RECV; - task->peer = peer; - task->handle_recv.buffer = buffer; - - fastd_queue_put(ctx, &ctx->task_queue, &task->entry, 0); -} - static bool is_handshake(fastd_queue_entry *data, void *extra) { fastd_task *task = container_of(data, fastd_task, entry); fastd_peer *peer = extra; @@ -153,22 +124,6 @@ static bool delete_task(fastd_queue_entry *data, void *extra) { if (e->keepalive_only && task->type != TASK_KEEPALIVE) return true; - switch (task->type) { - case TASK_SEND: - fastd_buffer_free(task->send.buffer); - break; - - case TASK_HANDLE_RECV: - fastd_buffer_free(task->handle_recv.buffer); - break; - - case TASK_HANDSHAKE: - break; - - case TASK_KEEPALIVE: - break; - } - free(task); return false; diff --git a/src/task.h b/src/task.h index 259ccfc..954d141 100644 --- a/src/task.h +++ b/src/task.h @@ -34,8 +34,6 @@ typedef enum _fastd_task_type { - TASK_SEND, - TASK_HANDLE_RECV, TASK_HANDSHAKE, TASK_KEEPALIVE, } fastd_task_type; @@ -43,25 +41,11 @@ typedef enum _fastd_task_type { typedef struct _fastd_task_any { } fastd_task_any; -typedef struct _fastd_task_send { - fastd_packet_type packet_type; - fastd_buffer buffer; -} fastd_task_send; - -typedef struct _fastd_task_handle_recv { - fastd_buffer buffer; -} fastd_task_handle_recv; - typedef struct _fastd_task { fastd_queue_entry entry; fastd_task_type type; fastd_peer *peer; - - union { - fastd_task_send send; - fastd_task_handle_recv handle_recv; - }; } fastd_task; @@ -72,11 +56,6 @@ static inline int fastd_task_timeout(fastd_context *ctx) { fastd_task* fastd_task_get(fastd_context *ctx); -void fastd_task_put_send_handshake(fastd_context *ctx, fastd_peer *peer, fastd_buffer buffer); - -void fastd_task_put_send(fastd_context *ctx, fastd_peer *peer, fastd_buffer buffer); -void fastd_task_put_handle_recv(fastd_context *ctx, fastd_peer *peer, fastd_buffer buffer); - void fastd_task_schedule_handshake(fastd_context *ctx, fastd_peer *peer, int timeout); void fastd_task_schedule_keepalive(fastd_context *ctx, fastd_peer *peer, int timeout); -- cgit v1.2.3