From a45854b5d8dc699e68c5b538e299d5ef8fda59f3 Mon Sep 17 00:00:00 2001 From: Matthias Schiffer Date: Sun, 21 Oct 2012 01:28:33 +0200 Subject: Aggregate similar updates --- ffd/send.c | 127 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----- ffd/send.h | 9 +++++ ffd/update.c | 51 ++++++++++++++++-------- 3 files changed, 161 insertions(+), 26 deletions(-) diff --git a/ffd/send.c b/ffd/send.c index ebda0de..dc6544b 100644 --- a/ffd/send.c +++ b/ffd/send.c @@ -27,6 +27,7 @@ #include "ffd.h" #include "neigh.h" #include "packet.h" +#include "send.h" #include "tlv.h" #include "tlv_types.h" @@ -40,6 +41,27 @@ #include + +typedef struct _announce_info_t { + ffd_node_id_t node; + uint16_t type; + uint16_t key; +} announce_info_t; + +struct _ffd_update_t { + ffd_iface_t *iface; + ffd_neigh_t *neigh; + + ffd_packet_t *packet; + size_t max_len; + + ffd_node_id_t node; + + unsigned n_updates; + announce_info_t announces[]; +}; + + static bool send_eth(const eth_addr_t *addr, unsigned ifindex, const void *buf, size_t len) { static const uint8_t zeros[46] = {0}; @@ -223,6 +245,34 @@ static bool add_update(ffd_packet_t *packet, size_t max_len, ffd_node_id_t *node return true; } +static bool add_retract(ffd_packet_t *packet, size_t max_len, ffd_node_id_t *node_id, ffd_node_id_t node, uint16_t type, uint16_t key) { + uint16_t len = packet->len; + + if (!node_id || !ffd_are_node_ids_equal(node_id, &node)) { + if (!add_node_id(packet, max_len, node)) + return false; + + if (node_id) + *node_id = node; + } + + ffd_tlv_update_t *update = ffd_tlv_add(packet, FFD_PACKET_MAX, TLV_UPDATE, sizeof(ffd_tlv_update_t)); + if (!update) { + packet->len = len; + return false; + } + + update->flags = 0; + update->reserved = 0; + update->interval = htons(FFD_UPDATE_INTERVAL); + update->seqno = 0; + update->metric = 0xffff; /* no need to htons */ + update->type = htons(type); + update->key = htons(key); + + return true; +} + void ffd_send_update(ffd_iface_t *iface, ffd_neigh_t *neigh, ffd_announce_t *announce, bool with_data) { ffd_packet_t *packet = alloca(sizeof(ffd_packet_t)+FFD_PACKET_MAX); @@ -264,21 +314,80 @@ void ffd_send_retract(ffd_neigh_t *neigh, ffd_node_id_t node, uint16_t type, uin if (!add_node_id(packet, FFD_PACKET_MAX, node)) return; - ffd_tlv_update_t *req = ffd_tlv_add(packet, FFD_PACKET_MAX, TLV_UPDATE, sizeof(ffd_tlv_update_t)); - if (!req) + ffd_tlv_update_t *update = ffd_tlv_add(packet, FFD_PACKET_MAX, TLV_UPDATE, sizeof(ffd_tlv_update_t)); + if (!update) return; - req->flags = 0; - req->reserved = 0; - req->interval = htons(FFD_UPDATE_INTERVAL); - req->seqno = 0; - req->metric = 0xffff; /* no need to htons */ - req->type = htons(type); - req->key = htons(key); + update->flags = 0; + update->reserved = 0; + update->interval = htons(FFD_UPDATE_INTERVAL); + update->seqno = 0; + update->metric = 0xffff; /* no need to htons */ + update->type = htons(type); + update->key = htons(key); send_neigh(neigh, packet); } +ffd_update_t* ffd_send_update_new(ffd_iface_t *iface, ffd_neigh_t *neigh) { + unsigned max_updates = (FFD_PACKET_MAX+sizeof(ffd_tlv_update_t)+1)/(sizeof(ffd_tlv_update_t)+2); + ffd_update_t *update = calloc(1, sizeof(ffd_update_t) + max_updates*sizeof(announce_info_t)); + update->iface = iface; + update->neigh = neigh; + + update->max_len = FFD_PACKET_MAX; + update->packet = malloc(sizeof(ffd_packet_t)+FFD_PACKET_MAX); + update->packet->version_magic = htons(FFD_VERSION_MAGIC); + update->packet->len = 0; + + return update; +} + +static inline bool is_update_duplicate(ffd_update_t *update, ffd_node_id_t node, uint16_t type, uint16_t key) { + unsigned i; + for (i = 0; i < update->n_updates; i++) { + if (ffd_are_node_ids_equal(&update->announces[i].node, &node) + && update->announces[i].type == type && update->announces[i].key == key) + return true; + } + + return false; +} + +bool ffd_send_update_add(ffd_update_t *update, ffd_announce_t *announce) { + if (is_update_duplicate(update, announce->node, announce->type, announce->key)) + return false; + + if (!add_update(update->packet, update->max_len, &update->node, announce, false, update->neigh)) + return false; + + update->announces[update->n_updates++] = (announce_info_t){announce->node, announce->type, announce->key}; + + return true; +} + +bool ffd_send_update_retract(ffd_update_t *update, ffd_node_id_t node, uint16_t type, uint16_t key) { + if (is_update_duplicate(update, node, type, key)) + return false; + + if (!add_retract(update->packet, update->max_len, &update->node, node, type, key)) + return false; + + update->announces[update->n_updates++] = (announce_info_t){node, type, key}; + + return true; +} + +void ffd_send_update_finish(ffd_update_t *update) { + fprintf(stderr, "debug: sending %u aggregated update(s).\n", update->n_updates); + + if (update->packet->len) + send_any(update->iface, update->neigh, update->packet); + + free(update->packet); + free(update); +} + void ffd_send_announce_request(ffd_iface_t *iface, ffd_neigh_t *neigh, ffd_node_id_t node, uint16_t type, uint16_t key, bool with_data) { ffd_packet_t *packet = alloca(sizeof(ffd_packet_t)+FFD_PACKET_MAX); diff --git a/ffd/send.h b/ffd/send.h index ce70199..fbd9062 100644 --- a/ffd/send.h +++ b/ffd/send.h @@ -30,6 +30,9 @@ #include "ffd.h" +typedef struct _ffd_update_t ffd_update_t; + + void ffd_send_ack(ffd_neigh_t *neigh, uint16_t nonce); void ffd_send_hellos(void); void ffd_send_update(ffd_iface_t *iface, ffd_neigh_t *neigh, ffd_announce_t *announce, bool with_data); @@ -37,6 +40,12 @@ void ffd_send_retract(ffd_neigh_t *neigh, ffd_node_id_t node, uint16_t type, uin void ffd_send_announce_request(ffd_iface_t *iface, ffd_neigh_t *neigh, ffd_node_id_t node, uint16_t type, uint16_t key, bool with_data); void ffd_send_seqno_request(ffd_neigh_t *neigh, ffd_announce_t *announce, uint16_t seqno); +ffd_update_t* ffd_send_update_new(ffd_iface_t *iface, ffd_neigh_t *neigh); +bool ffd_send_update_add(ffd_update_t *update, ffd_announce_t *announce); +bool ffd_send_update_retract(ffd_update_t *update, ffd_node_id_t node, uint16_t type, uint16_t key); +void ffd_send_update_finish(ffd_update_t *update); + + static inline void ffd_send_seqno_request_for(ffd_neigh_t *neigh, ffd_announce_t *announce) { if (FFD_IS_INFINITY(announce->feasibility_distance)) return; diff --git a/ffd/update.c b/ffd/update.c index 5ae3006..d273fec 100644 --- a/ffd/update.c +++ b/ffd/update.c @@ -35,18 +35,18 @@ static ffd_queue_t *pending_updates = NULL; -typedef struct _update_arg { +typedef struct _update_arg_t { ffd_node_id_t node; uint16_t type; uint16_t key; ffd_neigh_t *neigh; unsigned ref; -} update_arg; +} update_arg_t; -static inline update_arg* update_new(const ffd_node_id_t *node, uint16_t type, uint16_t key, ffd_neigh_t *neigh) { - update_arg *arg = malloc(sizeof(update_arg)); +static inline update_arg_t* update_new(const ffd_node_id_t *node, uint16_t type, uint16_t key, ffd_neigh_t *neigh) { + update_arg_t *arg = malloc(sizeof(update_arg_t)); arg->node = *node; arg->type = type; @@ -61,12 +61,12 @@ static inline update_arg* update_new(const ffd_node_id_t *node, uint16_t type, u return arg; } -static inline update_arg* update_ref(update_arg *arg) { +static inline update_arg_t* update_ref(update_arg_t *arg) { arg->ref++; return arg; } -static inline void update_unref(update_arg *arg) { +static inline void update_unref(update_arg_t *arg) { if (!--arg->ref) { if (arg->neigh) ffd_neigh_unref(arg->neigh); @@ -79,7 +79,7 @@ void ffd_update_enqueue(const ffd_node_id_t *node, uint16_t type, uint16_t key, if (neigh) ffd_neigh_ref(neigh); - update_arg *arg = update_new(node, type, key, neigh); + update_arg_t *arg = update_new(node, type, key, neigh); if (urgent) { ffd_queue_put_delayed(&pending_updates, NULL, &now, FFD_URGENT_DELAY, update_ref(arg)); @@ -99,19 +99,36 @@ int ffd_update_timeout(void) { void ffd_update_run(void) { while (!ffd_update_timeout()) { - update_arg *arg = pending_updates->arg; + ffd_neigh_t *neigh = ((update_arg_t*)pending_updates->arg)->neigh; - ffd_neigh_t *neigh = arg->neigh; - ffd_announce_t *announce = ffd_announce_find(&arg->node, arg->type, arg->key); + ffd_update_t *update = ffd_send_update_new(NULL, neigh); - fprintf(stderr, "debug: sending scheduled update.\n"); + fprintf(stderr, "debug: sending scheduled updates.\n"); - if (announce) - ffd_send_update(NULL, neigh, announce, false); - else - ffd_send_retract(neigh, arg->node, arg->type, arg->key); + ffd_queue_t **entry; + for (entry = &pending_updates; *entry;) { + update_arg_t *arg = (*entry)->arg; - update_unref(arg); - ffd_queue_drop(&pending_updates); + if (arg->neigh != neigh) { + entry = &(*entry)->next; + continue; + } + + ffd_announce_t *announce = ffd_announce_find(&arg->node, arg->type, arg->key); + bool ret; + + if (announce) + ret = ffd_send_update_add(update, announce); + else + ret = ffd_send_update_retract(update, arg->node, arg->type, arg->key); + + if (!ret) + break; + + update_unref(arg); + ffd_queue_drop(entry); + } + + ffd_send_update_finish(update); } } -- cgit v1.2.3