From c5aeb0dc4228c6396f35395194e0ec9cc91519c4 Mon Sep 17 00:00:00 2001 From: Matthias Schiffer Date: Tue, 10 Nov 2015 20:55:47 +0100 Subject: Implement generic task queue to handle handshakes and maintenance --- src/CMakeLists.txt | 1 + src/fastd.c | 16 ++--------- src/fastd.h | 5 ++-- src/peer.c | 13 +++------ src/peer.h | 8 +++--- src/poll.c | 18 +++++-------- src/task.c | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/task.h | 66 +++++++++++++++++++++++++++++++++++++++++++++ src/types.h | 8 ++++++ 9 files changed, 171 insertions(+), 42 deletions(-) create mode 100644 src/task.c create mode 100644 src/task.h (limited to 'src') diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 1e6bb83..c92e457 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -40,6 +40,7 @@ add_executable(fastd shell.c socket.c status.c + task.c vector.c verify.c ${BISON_fastd_config_parse_OUTPUTS} diff --git a/src/fastd.c b/src/fastd.c index 8723add..603def1 100644 --- a/src/fastd.c +++ b/src/fastd.c @@ -503,7 +503,7 @@ static inline void init(int argc, char *argv[]) { init_config(&status_fd); fastd_update_time(); - ctx.next_maintenance = ctx.now + MAINTENANCE_INTERVAL; + fastd_task_schedule(&ctx.next_maintenance, TASK_TYPE_MAINTENANCE, ctx.now + MAINTENANCE_INTERVAL); fastd_receive_unknown_init(); @@ -574,17 +574,6 @@ static inline void init(int argc, char *argv[]) { } -/** Performs periodic maintenance tasks */ -static inline void maintenance(void) { - if (!fastd_timed_out(ctx.next_maintenance)) - return; - - fastd_socket_handle_binds(); - fastd_peer_maintenance(); - - ctx.next_maintenance += MAINTENANCE_INTERVAL; -} - /** Reaps zombies of asynchronous shell commands. */ static inline void reap_zombies(void) { size_t i; @@ -637,10 +626,9 @@ static inline void handle_signals(void) { /** A single iteration of fastd's main loop */ static inline void run(void) { - fastd_peer_handle_handshake_queue(); + fastd_task_handle(); fastd_poll_handle(); - maintenance(); handle_signals(); } diff --git a/src/fastd.h b/src/fastd.h index d15033c..9d44b21 100644 --- a/src/fastd.h +++ b/src/fastd.h @@ -41,6 +41,7 @@ #include "poll.h" #include "sem.h" #include "shell.h" +#include "task.h" #include "util.h" #include "vector.h" @@ -312,8 +313,8 @@ struct fastd_context { size_t peer_addr_ht_used; /**< The current number of entries in the peer address hashtable */ VECTOR(fastd_peer_t *) *peer_addr_ht; /**< An array of hash buckets for the peer hash table */ - fastd_pqueue_t *handshake_queue; /**< A priority queue of the peers currently queued for handshakes (ordered by the time of the next handshake) */ - fastd_timeout_t next_maintenance; /**< The time of the next maintenance call */ + fastd_pqueue_t *task_queue; /**< Priority queue of scheduled tasks */ + fastd_task_t next_maintenance; /**< Schedules the next maintenance call */ VECTOR(pid_t) async_pids; /**< PIDs of asynchronously executed commands which still have to be reaped */ fastd_poll_fd_t async_rfd; /**< The read side of the pipe used to send data from other threads to the main thread */ diff --git a/src/peer.c b/src/peer.c index 87df22d..7f1ccb5 100644 --- a/src/peer.c +++ b/src/peer.c @@ -264,9 +264,7 @@ void fastd_peer_reset_socket(fastd_peer_t *peer) { */ void fastd_peer_schedule_handshake(fastd_peer_t *peer, int delay) { fastd_peer_unschedule_handshake(peer); - - peer->handshake_entry.value = ctx.now + delay; - fastd_pqueue_insert(&ctx.handshake_queue, &peer->handshake_entry); + fastd_task_schedule(&peer->handshake_task, TASK_TYPE_HANDSHAKE, ctx.now + delay); } /** Checks if the peer group \e group1 lies in \e group2 */ @@ -819,13 +817,8 @@ static void send_handshake(fastd_peer_t *peer, fastd_remote_t *next_remote) { } /** Sends a handshake to one peer, if a scheduled handshake is due */ -void fastd_peer_handle_handshake_queue(void) { - if (!ctx.handshake_queue) - return; - if (!fastd_timed_out(ctx.handshake_queue->value)) - return; - - fastd_peer_t *peer = container_of(ctx.handshake_queue, fastd_peer_t, handshake_entry); +void fastd_peer_handle_handshake_task(fastd_task_t *task) { + fastd_peer_t *peer = container_of(task, fastd_peer_t, handshake_task); fastd_peer_schedule_handshake_default(peer); diff --git a/src/peer.h b/src/peer.h index 5f5da54..e83859e 100644 --- a/src/peer.h +++ b/src/peer.h @@ -91,7 +91,7 @@ struct fastd_peer { fastd_peer_state_t state; /**< The peer's state */ - fastd_pqueue_t handshake_entry; /**< Entry in the handshake queue */ + fastd_task_t handshake_task; /**< Entry in the handshake queue */ fastd_timeout_t last_handshake_timeout; /**< No handshakes are sent to the peer until this timeout has occured to avoid flooding the peer */ fastd_timeout_t last_handshake_response_timeout; /**< All handshakes from last_handshake_address will be ignored until this timeout has occured */ fastd_timeout_t establish_handshake_timeout; /**< A timeout during which all handshakes for this peer will be ignored after a new connection has been established */ @@ -175,7 +175,7 @@ static inline void fastd_peer_schedule_handshake_default(fastd_peer_t *peer) { /** Cancels a scheduled handshake */ static inline void fastd_peer_unschedule_handshake(fastd_peer_t *peer) { - fastd_pqueue_remove(&peer->handshake_entry); + fastd_task_unschedule(&peer->handshake_task); } #ifdef WITH_DYNAMIC_PEERS @@ -192,7 +192,7 @@ static inline void fastd_peer_set_verified(fastd_peer_t *peer, bool ok) { /** Checks if there's a handshake queued for the peer */ static inline bool fastd_peer_handshake_scheduled(fastd_peer_t *peer) { - return fastd_pqueue_linked(&peer->handshake_entry); + return fastd_task_scheduled(&peer->handshake_task); } /** Checks if a peer is floating (is has at least one floating remote or no remotes at all) */ @@ -270,7 +270,7 @@ static inline bool fastd_eth_addr_is_unicast(fastd_eth_addr_t addr) { void fastd_peer_eth_addr_add(fastd_peer_t *peer, fastd_eth_addr_t addr); bool fastd_peer_find_by_eth_addr(const fastd_eth_addr_t addr, fastd_peer_t **peer); -void fastd_peer_handle_handshake_queue(void); +void fastd_peer_handle_handshake_task(fastd_task_t *task); void fastd_peer_maintenance(void); void fastd_peer_reset_all(void); diff --git a/src/poll.c b/src/poll.c index 0c4dfa9..fed7001 100644 --- a/src/poll.c +++ b/src/poll.c @@ -51,12 +51,13 @@ #endif -/** Returns the time to the next handshake or -1 */ -static inline int handshake_timeout(void) { - if (!ctx.handshake_queue) +/** Returns the time to the next task or -1 */ +static inline int task_timeout(void) { + fastd_timeout_t timeout; + if (!fastd_task_timeout(&timeout)) return -1; - int diff_msec = ctx.handshake_queue->value - ctx.now; + int diff_msec = timeout - ctx.now; if (diff_msec < 0) return 0; else @@ -221,14 +222,7 @@ bool fastd_poll_fd_close(fastd_poll_fd_t *fd) { void fastd_poll_handle(void) { size_t i; - int maintenance_timeout = ctx.next_maintenance - ctx.now; - - if (maintenance_timeout < 0) - maintenance_timeout = 0; - - int timeout = handshake_timeout(); - if (timeout < 0 || timeout > maintenance_timeout) - timeout = maintenance_timeout; + int timeout = task_timeout(); if (!VECTOR_LEN(ctx.pollfds)) { for (i = 0; i < VECTOR_LEN(ctx.fds); i++) { diff --git a/src/task.c b/src/task.c new file mode 100644 index 0000000..44a8ebf --- /dev/null +++ b/src/task.c @@ -0,0 +1,78 @@ +/* + Copyright (c) 2012-2015, Matthias Schiffer + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +/** + \file + + Task queue +*/ + +#include "task.h" +#include "peer.h" + + +/** Performs periodic maintenance tasks */ +static inline void maintenance(void) { + fastd_socket_handle_binds(); + fastd_peer_maintenance(); + + fastd_task_reschedule_relative(&ctx.next_maintenance, MAINTENANCE_INTERVAL); +} + +static void handle_task(void) { + fastd_task_t *task = container_of(ctx.task_queue, fastd_task_t, entry); + fastd_pqueue_remove(ctx.task_queue); + + switch (task->type) { + case TASK_TYPE_MAINTENANCE: + maintenance(); + break; + + case TASK_TYPE_HANDSHAKE: + fastd_peer_handle_handshake_task(task); + break; + + default: + exit_bug("unknown task type"); + } +} + +void fastd_task_handle(void) { + while (ctx.task_queue && fastd_timed_out(ctx.task_queue->value)) + handle_task(); +} + +void fastd_task_reschedule(fastd_task_t *task, fastd_timeout_t timeout) { + task->entry.value = timeout; + fastd_pqueue_insert(&ctx.task_queue, &task->entry); +} + +bool fastd_task_timeout(fastd_timeout_t *timeout) { + if (!ctx.task_queue) + return false; + + *timeout = ctx.task_queue->value; + return true; +} diff --git a/src/task.h b/src/task.h new file mode 100644 index 0000000..4c00207 --- /dev/null +++ b/src/task.h @@ -0,0 +1,66 @@ +/* + Copyright (c) 2012-2015, Matthias Schiffer + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +/** + \file + + Task queue +*/ + +#pragma once + +#include "pqueue.h" + + +struct fastd_task { + fastd_pqueue_t entry; + fastd_task_type_t type; +}; + + +void fastd_task_handle(void); + +void fastd_task_reschedule(fastd_task_t *task, fastd_timeout_t timeout); + + +static inline bool fastd_task_scheduled(fastd_task_t *task) { + return fastd_pqueue_linked(&task->entry); +} + +static inline void fastd_task_unschedule(fastd_task_t *task) { + fastd_pqueue_remove(&task->entry); +} + +static inline void fastd_task_reschedule_relative(fastd_task_t *task, int64_t delay) { + fastd_task_reschedule(task, task->entry.value + delay); +} + +static inline void fastd_task_schedule(fastd_task_t *task, fastd_task_type_t type, fastd_timeout_t timeout) { + task->type = type; + fastd_task_reschedule(task, timeout); +} + + +bool fastd_task_timeout(fastd_timeout_t *timeout); diff --git a/src/types.h b/src/types.h index d5c8719..5ba6481 100644 --- a/src/types.h +++ b/src/types.h @@ -85,6 +85,13 @@ typedef enum fastd_poll_type { POLL_TYPE_SOCKET, /**< A network socket */ } fastd_poll_type_t; +/** Task types */ +typedef enum fastd_task_type { + TASK_TYPE_UNSPEC = 0, /**< Unspecified task type */ + TASK_TYPE_MAINTENANCE, /**< Scheduled maintenance */ + TASK_TYPE_HANDSHAKE, /**< Scheduled handshake */ +} fastd_task_type_t; + /** A timestamp used as a timeout */ typedef int64_t fastd_timeout_t; @@ -93,6 +100,7 @@ typedef int64_t fastd_timeout_t; typedef struct fastd_buffer fastd_buffer_t; typedef struct fastd_poll_fd fastd_poll_fd_t; typedef struct fastd_pqueue fastd_pqueue_t; +typedef struct fastd_task fastd_task_t; typedef union fastd_peer_address fastd_peer_address_t; typedef struct fastd_bind_address fastd_bind_address_t; -- cgit v1.2.3