summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Schiffer <mschiffer@universe-factory.net>2015-11-10 20:55:47 +0100
committerMatthias Schiffer <mschiffer@universe-factory.net>2015-11-10 20:55:47 +0100
commitc5aeb0dc4228c6396f35395194e0ec9cc91519c4 (patch)
treeff8a08aadad6c84d9042ffcc1ead169977216c01
parent598a8acfc1c1aaefb3c7407e517cc1e0f87b753a (diff)
downloadfastd-c5aeb0dc4228c6396f35395194e0ec9cc91519c4.tar
fastd-c5aeb0dc4228c6396f35395194e0ec9cc91519c4.zip
Implement generic task queue to handle handshakes and maintenance
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/fastd.c16
-rw-r--r--src/fastd.h5
-rw-r--r--src/peer.c13
-rw-r--r--src/peer.h8
-rw-r--r--src/poll.c18
-rw-r--r--src/task.c78
-rw-r--r--src/task.h66
-rw-r--r--src/types.h8
9 files changed, 171 insertions, 42 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 1e6bb83..c92e457 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -40,6 +40,7 @@ add_executable(fastd
shell.c
socket.c
status.c
+ task.c
vector.c
verify.c
${BISON_fastd_config_parse_OUTPUTS}
diff --git a/src/fastd.c b/src/fastd.c
index 8723add..603def1 100644
--- a/src/fastd.c
+++ b/src/fastd.c
@@ -503,7 +503,7 @@ static inline void init(int argc, char *argv[]) {
init_config(&status_fd);
fastd_update_time();
- ctx.next_maintenance = ctx.now + MAINTENANCE_INTERVAL;
+ fastd_task_schedule(&ctx.next_maintenance, TASK_TYPE_MAINTENANCE, ctx.now + MAINTENANCE_INTERVAL);
fastd_receive_unknown_init();
@@ -574,17 +574,6 @@ static inline void init(int argc, char *argv[]) {
}
-/** Performs periodic maintenance tasks */
-static inline void maintenance(void) {
- if (!fastd_timed_out(ctx.next_maintenance))
- return;
-
- fastd_socket_handle_binds();
- fastd_peer_maintenance();
-
- ctx.next_maintenance += MAINTENANCE_INTERVAL;
-}
-
/** Reaps zombies of asynchronous shell commands. */
static inline void reap_zombies(void) {
size_t i;
@@ -637,10 +626,9 @@ static inline void handle_signals(void) {
/** A single iteration of fastd's main loop */
static inline void run(void) {
- fastd_peer_handle_handshake_queue();
+ fastd_task_handle();
fastd_poll_handle();
- maintenance();
handle_signals();
}
diff --git a/src/fastd.h b/src/fastd.h
index d15033c..9d44b21 100644
--- a/src/fastd.h
+++ b/src/fastd.h
@@ -41,6 +41,7 @@
#include "poll.h"
#include "sem.h"
#include "shell.h"
+#include "task.h"
#include "util.h"
#include "vector.h"
@@ -312,8 +313,8 @@ struct fastd_context {
size_t peer_addr_ht_used; /**< The current number of entries in the peer address hashtable */
VECTOR(fastd_peer_t *) *peer_addr_ht; /**< An array of hash buckets for the peer hash table */
- fastd_pqueue_t *handshake_queue; /**< A priority queue of the peers currently queued for handshakes (ordered by the time of the next handshake) */
- fastd_timeout_t next_maintenance; /**< The time of the next maintenance call */
+ fastd_pqueue_t *task_queue; /**< Priority queue of scheduled tasks */
+ fastd_task_t next_maintenance; /**< Schedules the next maintenance call */
VECTOR(pid_t) async_pids; /**< PIDs of asynchronously executed commands which still have to be reaped */
fastd_poll_fd_t async_rfd; /**< The read side of the pipe used to send data from other threads to the main thread */
diff --git a/src/peer.c b/src/peer.c
index 87df22d..7f1ccb5 100644
--- a/src/peer.c
+++ b/src/peer.c
@@ -264,9 +264,7 @@ void fastd_peer_reset_socket(fastd_peer_t *peer) {
*/
void fastd_peer_schedule_handshake(fastd_peer_t *peer, int delay) {
fastd_peer_unschedule_handshake(peer);
-
- peer->handshake_entry.value = ctx.now + delay;
- fastd_pqueue_insert(&ctx.handshake_queue, &peer->handshake_entry);
+ fastd_task_schedule(&peer->handshake_task, TASK_TYPE_HANDSHAKE, ctx.now + delay);
}
/** Checks if the peer group \e group1 lies in \e group2 */
@@ -819,13 +817,8 @@ static void send_handshake(fastd_peer_t *peer, fastd_remote_t *next_remote) {
}
/** Sends a handshake to one peer, if a scheduled handshake is due */
-void fastd_peer_handle_handshake_queue(void) {
- if (!ctx.handshake_queue)
- return;
- if (!fastd_timed_out(ctx.handshake_queue->value))
- return;
-
- fastd_peer_t *peer = container_of(ctx.handshake_queue, fastd_peer_t, handshake_entry);
+void fastd_peer_handle_handshake_task(fastd_task_t *task) {
+ fastd_peer_t *peer = container_of(task, fastd_peer_t, handshake_task);
fastd_peer_schedule_handshake_default(peer);
diff --git a/src/peer.h b/src/peer.h
index 5f5da54..e83859e 100644
--- a/src/peer.h
+++ b/src/peer.h
@@ -91,7 +91,7 @@ struct fastd_peer {
fastd_peer_state_t state; /**< The peer's state */
- fastd_pqueue_t handshake_entry; /**< Entry in the handshake queue */
+ fastd_task_t handshake_task; /**< Entry in the handshake queue */
fastd_timeout_t last_handshake_timeout; /**< No handshakes are sent to the peer until this timeout has occured to avoid flooding the peer */
fastd_timeout_t last_handshake_response_timeout; /**< All handshakes from last_handshake_address will be ignored until this timeout has occured */
fastd_timeout_t establish_handshake_timeout; /**< A timeout during which all handshakes for this peer will be ignored after a new connection has been established */
@@ -175,7 +175,7 @@ static inline void fastd_peer_schedule_handshake_default(fastd_peer_t *peer) {
/** Cancels a scheduled handshake */
static inline void fastd_peer_unschedule_handshake(fastd_peer_t *peer) {
- fastd_pqueue_remove(&peer->handshake_entry);
+ fastd_task_unschedule(&peer->handshake_task);
}
#ifdef WITH_DYNAMIC_PEERS
@@ -192,7 +192,7 @@ static inline void fastd_peer_set_verified(fastd_peer_t *peer, bool ok) {
/** Checks if there's a handshake queued for the peer */
static inline bool fastd_peer_handshake_scheduled(fastd_peer_t *peer) {
- return fastd_pqueue_linked(&peer->handshake_entry);
+ return fastd_task_scheduled(&peer->handshake_task);
}
/** Checks if a peer is floating (is has at least one floating remote or no remotes at all) */
@@ -270,7 +270,7 @@ static inline bool fastd_eth_addr_is_unicast(fastd_eth_addr_t addr) {
void fastd_peer_eth_addr_add(fastd_peer_t *peer, fastd_eth_addr_t addr);
bool fastd_peer_find_by_eth_addr(const fastd_eth_addr_t addr, fastd_peer_t **peer);
-void fastd_peer_handle_handshake_queue(void);
+void fastd_peer_handle_handshake_task(fastd_task_t *task);
void fastd_peer_maintenance(void);
void fastd_peer_reset_all(void);
diff --git a/src/poll.c b/src/poll.c
index 0c4dfa9..fed7001 100644
--- a/src/poll.c
+++ b/src/poll.c
@@ -51,12 +51,13 @@
#endif
-/** Returns the time to the next handshake or -1 */
-static inline int handshake_timeout(void) {
- if (!ctx.handshake_queue)
+/** Returns the time to the next task or -1 */
+static inline int task_timeout(void) {
+ fastd_timeout_t timeout;
+ if (!fastd_task_timeout(&timeout))
return -1;
- int diff_msec = ctx.handshake_queue->value - ctx.now;
+ int diff_msec = timeout - ctx.now;
if (diff_msec < 0)
return 0;
else
@@ -221,14 +222,7 @@ bool fastd_poll_fd_close(fastd_poll_fd_t *fd) {
void fastd_poll_handle(void) {
size_t i;
- int maintenance_timeout = ctx.next_maintenance - ctx.now;
-
- if (maintenance_timeout < 0)
- maintenance_timeout = 0;
-
- int timeout = handshake_timeout();
- if (timeout < 0 || timeout > maintenance_timeout)
- timeout = maintenance_timeout;
+ int timeout = task_timeout();
if (!VECTOR_LEN(ctx.pollfds)) {
for (i = 0; i < VECTOR_LEN(ctx.fds); i++) {
diff --git a/src/task.c b/src/task.c
new file mode 100644
index 0000000..44a8ebf
--- /dev/null
+++ b/src/task.c
@@ -0,0 +1,78 @@
+/*
+ Copyright (c) 2012-2015, Matthias Schiffer <mschiffer@universe-factory.net>
+ All rights reserved.
+
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions are met:
+
+ 1. Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+ 2. Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+/**
+ \file
+
+ Task queue
+*/
+
+#include "task.h"
+#include "peer.h"
+
+
+/** Performs periodic maintenance tasks */
+static inline void maintenance(void) {
+ fastd_socket_handle_binds();
+ fastd_peer_maintenance();
+
+ fastd_task_reschedule_relative(&ctx.next_maintenance, MAINTENANCE_INTERVAL);
+}
+
+static void handle_task(void) {
+ fastd_task_t *task = container_of(ctx.task_queue, fastd_task_t, entry);
+ fastd_pqueue_remove(ctx.task_queue);
+
+ switch (task->type) {
+ case TASK_TYPE_MAINTENANCE:
+ maintenance();
+ break;
+
+ case TASK_TYPE_HANDSHAKE:
+ fastd_peer_handle_handshake_task(task);
+ break;
+
+ default:
+ exit_bug("unknown task type");
+ }
+}
+
+void fastd_task_handle(void) {
+ while (ctx.task_queue && fastd_timed_out(ctx.task_queue->value))
+ handle_task();
+}
+
+void fastd_task_reschedule(fastd_task_t *task, fastd_timeout_t timeout) {
+ task->entry.value = timeout;
+ fastd_pqueue_insert(&ctx.task_queue, &task->entry);
+}
+
+bool fastd_task_timeout(fastd_timeout_t *timeout) {
+ if (!ctx.task_queue)
+ return false;
+
+ *timeout = ctx.task_queue->value;
+ return true;
+}
diff --git a/src/task.h b/src/task.h
new file mode 100644
index 0000000..4c00207
--- /dev/null
+++ b/src/task.h
@@ -0,0 +1,66 @@
+/*
+ Copyright (c) 2012-2015, Matthias Schiffer <mschiffer@universe-factory.net>
+ All rights reserved.
+
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions are met:
+
+ 1. Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+ 2. Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+/**
+ \file
+
+ Task queue
+*/
+
+#pragma once
+
+#include "pqueue.h"
+
+
+struct fastd_task {
+ fastd_pqueue_t entry;
+ fastd_task_type_t type;
+};
+
+
+void fastd_task_handle(void);
+
+void fastd_task_reschedule(fastd_task_t *task, fastd_timeout_t timeout);
+
+
+static inline bool fastd_task_scheduled(fastd_task_t *task) {
+ return fastd_pqueue_linked(&task->entry);
+}
+
+static inline void fastd_task_unschedule(fastd_task_t *task) {
+ fastd_pqueue_remove(&task->entry);
+}
+
+static inline void fastd_task_reschedule_relative(fastd_task_t *task, int64_t delay) {
+ fastd_task_reschedule(task, task->entry.value + delay);
+}
+
+static inline void fastd_task_schedule(fastd_task_t *task, fastd_task_type_t type, fastd_timeout_t timeout) {
+ task->type = type;
+ fastd_task_reschedule(task, timeout);
+}
+
+
+bool fastd_task_timeout(fastd_timeout_t *timeout);
diff --git a/src/types.h b/src/types.h
index d5c8719..5ba6481 100644
--- a/src/types.h
+++ b/src/types.h
@@ -85,6 +85,13 @@ typedef enum fastd_poll_type {
POLL_TYPE_SOCKET, /**< A network socket */
} fastd_poll_type_t;
+/** Task types */
+typedef enum fastd_task_type {
+ TASK_TYPE_UNSPEC = 0, /**< Unspecified task type */
+ TASK_TYPE_MAINTENANCE, /**< Scheduled maintenance */
+ TASK_TYPE_HANDSHAKE, /**< Scheduled handshake */
+} fastd_task_type_t;
+
/** A timestamp used as a timeout */
typedef int64_t fastd_timeout_t;
@@ -93,6 +100,7 @@ typedef int64_t fastd_timeout_t;
typedef struct fastd_buffer fastd_buffer_t;
typedef struct fastd_poll_fd fastd_poll_fd_t;
typedef struct fastd_pqueue fastd_pqueue_t;
+typedef struct fastd_task fastd_task_t;
typedef union fastd_peer_address fastd_peer_address_t;
typedef struct fastd_bind_address fastd_bind_address_t;