From 56ccdb40aba41facf5f2762f8b163222c7acdc88 Mon Sep 17 00:00:00 2001 From: Matthias Schiffer Date: Mon, 18 Mar 2013 18:06:24 +0100 Subject: Add support for scheduled jobs --- include/gmrf/gmrf.h | 4 ++++ mmss/CMakeLists.txt | 1 + mmss/iface.c | 6 ++++++ mmss/mmss.c | 33 ++++++++++++++++++++++++++------- mmss/mmss.h | 20 ++++++++++++++++++++ mmss/queue.c | 2 +- mmss/schedule.c | 45 +++++++++++++++++++++++++++++++++++++++++++++ mmss/types.h | 1 + 8 files changed, 104 insertions(+), 8 deletions(-) create mode 100644 mmss/schedule.c diff --git a/include/gmrf/gmrf.h b/include/gmrf/gmrf.h index f6cf9c4..bccf338 100644 --- a/include/gmrf/gmrf.h +++ b/include/gmrf/gmrf.h @@ -59,6 +59,10 @@ size_t gmrf_iface_get_mtu(gmrf_t *gmrf, gmrf_iface_t *iface); bool gmrf_iface_send(gmrf_t *gmrf, gmrf_iface_t *iface, const void *data, size_t len, const gmrf_addr_t *dest); bool gmrf_iface_send_bc(gmrf_t *gmrf, gmrf_iface_t *iface, const void *data, size_t len); +typedef void (*gmrf_scheduled_func)(gmrf_t *gmrf, gmrf_context_t *ctx, void *arg); + +void gmrf_schedule(gmrf_t *gmrf, gmrf_scheduled_func f, void *arg, unsigned delay); + /* implemented by protocol */ extern const char *gmrf_protocol_name; extern const char *gmrf_protocol_version; diff --git a/mmss/CMakeLists.txt b/mmss/CMakeLists.txt index 7fcc4cb..5105436 100644 --- a/mmss/CMakeLists.txt +++ b/mmss/CMakeLists.txt @@ -5,5 +5,6 @@ add_executable(mmss mmss.c protocol.c queue.c + schedule.c ) target_link_libraries(mmss dl) diff --git a/mmss/iface.c b/mmss/iface.c index cecfed0..b175fa7 100644 --- a/mmss/iface.c +++ b/mmss/iface.c @@ -68,3 +68,9 @@ bool gmrf_iface_send_bc(gmrf_t *gmrf, gmrf_iface_t *iface, const void *data, siz return true; } + +void mmss_dispatch(mmss_packet_t *packet) { + packet->dest->node->proto->handle_packet(packet->dest->node, packet->dest->node->ctx, packet->dest, + &packet->source->address, packet->data, packet->len); + free(packet); +} diff --git a/mmss/mmss.c b/mmss/mmss.c index ae87afb..31a5623 100644 --- a/mmss/mmss.c +++ b/mmss/mmss.c @@ -39,6 +39,21 @@ static void init_nodes(gmrf_t *nodes) { } +static inline int timeout_min(int a, int b) { + if (a < 0) + return b; + else if (b < 0) + return a; + else + return min(a, b); +} + + +static int get_queue_timeout(const mmss_t *mmss) { + return timeout_min(mmss_queue_timeout(mmss, &mmss->packet_queue), mmss_queue_timeout(mmss, &mmss->scheduled_queue)); +} + + int main(int argc, char *argv[]) { if (argc != 2) { fprintf(stderr, "usage: %s protocol_module\n", argv[0]); @@ -75,7 +90,7 @@ int main(int argc, char *argv[]) { init_nodes(nodes); while (true) { - int timeout = mmss_queue_timeout(&mmss, &mmss.packet_queue); + int timeout = get_queue_timeout(&mmss); if (timeout < 0) { fprintf(stderr, "Nothing queued, deadlock occured.\n"); @@ -84,23 +99,27 @@ int main(int argc, char *argv[]) { if (timeout > 0) { assert(!mmss_queue_get(&mmss, &mmss.packet_queue)); + assert(!mmss_queue_get(&mmss, &mmss.scheduled_queue)); mmss.now += timeout; - timeout = mmss_queue_timeout(&mmss, &mmss.packet_queue); + timeout = get_queue_timeout(&mmss); } assert(timeout == 0); while (timeout == 0) { mmss_packet_t *packet = mmss_queue_get(&mmss, &mmss.packet_queue); - assert(packet); + mmss_scheduled_t *scheduled = mmss_queue_get(&mmss, &mmss.scheduled_queue); + + assert(packet || scheduled); - packet->dest->node->proto->handle_packet(packet->dest->node, packet->dest->node->ctx, packet->dest, - &packet->source->address, packet->data, packet->len); + if(packet) + mmss_dispatch(packet); - free(packet); + if (scheduled) + mmss_run_scheduled(scheduled); - timeout = mmss_queue_timeout(&mmss, &mmss.packet_queue); + timeout = get_queue_timeout(&mmss); } } diff --git a/mmss/mmss.h b/mmss/mmss.h index 55e86d5..59e3d4e 100644 --- a/mmss/mmss.h +++ b/mmss/mmss.h @@ -36,6 +36,7 @@ struct mmss { uint64_t now; mmss_queue_t packet_queue; + mmss_queue_t scheduled_queue; }; struct mmss_network { @@ -56,6 +57,12 @@ struct mmss_packet { uint8_t data[]; }; +struct mmss_scheduled { + gmrf_t *node; + gmrf_scheduled_func f; + void *arg; +}; + struct gmrf { gmrf_t *next; @@ -80,4 +87,17 @@ struct gmrf_iface { const mmss_protocol_t* mmss_load_protocol(const char *module); +void mmss_dispatch(mmss_packet_t *packet); +void mmss_run_scheduled(mmss_scheduled_t *scheduled); + + +static inline int max(int a, int b) { + return (a > b) ? a : b; +} + +static inline int min(int a, int b) { + return (a < b) ? a : b; +} + + #endif /* _GMRF_MMSS_MMSS_H_ */ diff --git a/mmss/queue.c b/mmss/queue.c index cf1e10a..c7c475b 100644 --- a/mmss/queue.c +++ b/mmss/queue.c @@ -40,7 +40,7 @@ struct mmss_queue_entry { void mmss_queue_put(mmss_t *mmss, mmss_queue_t *queue, void *data, uint64_t timeout) { - while (*queue && timeout > (*queue)->timeout) + while (*queue && timeout >= (*queue)->timeout) queue = &(*queue)->next; mmss_queue_entry_t *entry = malloc(sizeof(mmss_queue_entry_t)); diff --git a/mmss/schedule.c b/mmss/schedule.c new file mode 100644 index 0000000..0474613 --- /dev/null +++ b/mmss/schedule.c @@ -0,0 +1,45 @@ +/* + Copyright (c) 2013, Matthias Schiffer + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + + +#include "mmss.h" + +#include + + +void gmrf_schedule(gmrf_t *gmrf, gmrf_scheduled_func f, void *arg, unsigned delay) { + mmss_scheduled_t *scheduled = calloc(1, sizeof(mmss_scheduled_t)); + + scheduled->node = gmrf; + scheduled->f = f; + scheduled->arg = arg; + + mmss_queue_put(gmrf->mmss, &gmrf->mmss->scheduled_queue, scheduled, gmrf->mmss->now+delay); +} + +void mmss_run_scheduled(mmss_scheduled_t *scheduled) { + scheduled->f(scheduled->node, scheduled->node->ctx, scheduled->arg); + free(scheduled); +} diff --git a/mmss/types.h b/mmss/types.h index 4ce1e81..04c77be 100644 --- a/mmss/types.h +++ b/mmss/types.h @@ -30,5 +30,6 @@ typedef struct mmss mmss_t; typedef struct mmss_network mmss_network_t; typedef struct mmss_packet mmss_packet_t; +typedef struct mmss_scheduled mmss_scheduled_t; #endif /* _GMRF_MMSS_TYPES_H_ */ -- cgit v1.2.3