Aggregate similar updates

This commit is contained in:
Matthias Schiffer 2012-10-21 01:28:33 +02:00
parent 2bfd77c934
commit a45854b5d8
3 changed files with 161 additions and 26 deletions

View file

@ -27,6 +27,7 @@
#include "ffd.h"
#include "neigh.h"
#include "packet.h"
#include "send.h"
#include "tlv.h"
#include "tlv_types.h"
@ -40,6 +41,27 @@
#include <sys/uio.h>
typedef struct _announce_info_t {
ffd_node_id_t node;
uint16_t type;
uint16_t key;
} announce_info_t;
struct _ffd_update_t {
ffd_iface_t *iface;
ffd_neigh_t *neigh;
ffd_packet_t *packet;
size_t max_len;
ffd_node_id_t node;
unsigned n_updates;
announce_info_t announces[];
};
static bool send_eth(const eth_addr_t *addr, unsigned ifindex, const void *buf, size_t len) {
static const uint8_t zeros[46] = {0};
@ -223,6 +245,34 @@ static bool add_update(ffd_packet_t *packet, size_t max_len, ffd_node_id_t *node
return true;
}
static bool add_retract(ffd_packet_t *packet, size_t max_len, ffd_node_id_t *node_id, ffd_node_id_t node, uint16_t type, uint16_t key) {
uint16_t len = packet->len;
if (!node_id || !ffd_are_node_ids_equal(node_id, &node)) {
if (!add_node_id(packet, max_len, node))
return false;
if (node_id)
*node_id = node;
}
ffd_tlv_update_t *update = ffd_tlv_add(packet, FFD_PACKET_MAX, TLV_UPDATE, sizeof(ffd_tlv_update_t));
if (!update) {
packet->len = len;
return false;
}
update->flags = 0;
update->reserved = 0;
update->interval = htons(FFD_UPDATE_INTERVAL);
update->seqno = 0;
update->metric = 0xffff; /* no need to htons */
update->type = htons(type);
update->key = htons(key);
return true;
}
void ffd_send_update(ffd_iface_t *iface, ffd_neigh_t *neigh, ffd_announce_t *announce, bool with_data) {
ffd_packet_t *packet = alloca(sizeof(ffd_packet_t)+FFD_PACKET_MAX);
@ -264,21 +314,80 @@ void ffd_send_retract(ffd_neigh_t *neigh, ffd_node_id_t node, uint16_t type, uin
if (!add_node_id(packet, FFD_PACKET_MAX, node))
return;
ffd_tlv_update_t *req = ffd_tlv_add(packet, FFD_PACKET_MAX, TLV_UPDATE, sizeof(ffd_tlv_update_t));
if (!req)
ffd_tlv_update_t *update = ffd_tlv_add(packet, FFD_PACKET_MAX, TLV_UPDATE, sizeof(ffd_tlv_update_t));
if (!update)
return;
req->flags = 0;
req->reserved = 0;
req->interval = htons(FFD_UPDATE_INTERVAL);
req->seqno = 0;
req->metric = 0xffff; /* no need to htons */
req->type = htons(type);
req->key = htons(key);
update->flags = 0;
update->reserved = 0;
update->interval = htons(FFD_UPDATE_INTERVAL);
update->seqno = 0;
update->metric = 0xffff; /* no need to htons */
update->type = htons(type);
update->key = htons(key);
send_neigh(neigh, packet);
}
ffd_update_t* ffd_send_update_new(ffd_iface_t *iface, ffd_neigh_t *neigh) {
unsigned max_updates = (FFD_PACKET_MAX+sizeof(ffd_tlv_update_t)+1)/(sizeof(ffd_tlv_update_t)+2);
ffd_update_t *update = calloc(1, sizeof(ffd_update_t) + max_updates*sizeof(announce_info_t));
update->iface = iface;
update->neigh = neigh;
update->max_len = FFD_PACKET_MAX;
update->packet = malloc(sizeof(ffd_packet_t)+FFD_PACKET_MAX);
update->packet->version_magic = htons(FFD_VERSION_MAGIC);
update->packet->len = 0;
return update;
}
static inline bool is_update_duplicate(ffd_update_t *update, ffd_node_id_t node, uint16_t type, uint16_t key) {
unsigned i;
for (i = 0; i < update->n_updates; i++) {
if (ffd_are_node_ids_equal(&update->announces[i].node, &node)
&& update->announces[i].type == type && update->announces[i].key == key)
return true;
}
return false;
}
bool ffd_send_update_add(ffd_update_t *update, ffd_announce_t *announce) {
if (is_update_duplicate(update, announce->node, announce->type, announce->key))
return false;
if (!add_update(update->packet, update->max_len, &update->node, announce, false, update->neigh))
return false;
update->announces[update->n_updates++] = (announce_info_t){announce->node, announce->type, announce->key};
return true;
}
bool ffd_send_update_retract(ffd_update_t *update, ffd_node_id_t node, uint16_t type, uint16_t key) {
if (is_update_duplicate(update, node, type, key))
return false;
if (!add_retract(update->packet, update->max_len, &update->node, node, type, key))
return false;
update->announces[update->n_updates++] = (announce_info_t){node, type, key};
return true;
}
void ffd_send_update_finish(ffd_update_t *update) {
fprintf(stderr, "debug: sending %u aggregated update(s).\n", update->n_updates);
if (update->packet->len)
send_any(update->iface, update->neigh, update->packet);
free(update->packet);
free(update);
}
void ffd_send_announce_request(ffd_iface_t *iface, ffd_neigh_t *neigh, ffd_node_id_t node, uint16_t type, uint16_t key, bool with_data) {
ffd_packet_t *packet = alloca(sizeof(ffd_packet_t)+FFD_PACKET_MAX);

View file

@ -30,6 +30,9 @@
#include "ffd.h"
typedef struct _ffd_update_t ffd_update_t;
void ffd_send_ack(ffd_neigh_t *neigh, uint16_t nonce);
void ffd_send_hellos(void);
void ffd_send_update(ffd_iface_t *iface, ffd_neigh_t *neigh, ffd_announce_t *announce, bool with_data);
@ -37,6 +40,12 @@ void ffd_send_retract(ffd_neigh_t *neigh, ffd_node_id_t node, uint16_t type, uin
void ffd_send_announce_request(ffd_iface_t *iface, ffd_neigh_t *neigh, ffd_node_id_t node, uint16_t type, uint16_t key, bool with_data);
void ffd_send_seqno_request(ffd_neigh_t *neigh, ffd_announce_t *announce, uint16_t seqno);
ffd_update_t* ffd_send_update_new(ffd_iface_t *iface, ffd_neigh_t *neigh);
bool ffd_send_update_add(ffd_update_t *update, ffd_announce_t *announce);
bool ffd_send_update_retract(ffd_update_t *update, ffd_node_id_t node, uint16_t type, uint16_t key);
void ffd_send_update_finish(ffd_update_t *update);
static inline void ffd_send_seqno_request_for(ffd_neigh_t *neigh, ffd_announce_t *announce) {
if (FFD_IS_INFINITY(announce->feasibility_distance))
return;

View file

@ -35,18 +35,18 @@
static ffd_queue_t *pending_updates = NULL;
typedef struct _update_arg {
typedef struct _update_arg_t {
ffd_node_id_t node;
uint16_t type;
uint16_t key;
ffd_neigh_t *neigh;
unsigned ref;
} update_arg;
} update_arg_t;
static inline update_arg* update_new(const ffd_node_id_t *node, uint16_t type, uint16_t key, ffd_neigh_t *neigh) {
update_arg *arg = malloc(sizeof(update_arg));
static inline update_arg_t* update_new(const ffd_node_id_t *node, uint16_t type, uint16_t key, ffd_neigh_t *neigh) {
update_arg_t *arg = malloc(sizeof(update_arg_t));
arg->node = *node;
arg->type = type;
@ -61,12 +61,12 @@ static inline update_arg* update_new(const ffd_node_id_t *node, uint16_t type, u
return arg;
}
static inline update_arg* update_ref(update_arg *arg) {
static inline update_arg_t* update_ref(update_arg_t *arg) {
arg->ref++;
return arg;
}
static inline void update_unref(update_arg *arg) {
static inline void update_unref(update_arg_t *arg) {
if (!--arg->ref) {
if (arg->neigh)
ffd_neigh_unref(arg->neigh);
@ -79,7 +79,7 @@ void ffd_update_enqueue(const ffd_node_id_t *node, uint16_t type, uint16_t key,
if (neigh)
ffd_neigh_ref(neigh);
update_arg *arg = update_new(node, type, key, neigh);
update_arg_t *arg = update_new(node, type, key, neigh);
if (urgent) {
ffd_queue_put_delayed(&pending_updates, NULL, &now, FFD_URGENT_DELAY, update_ref(arg));
@ -99,19 +99,36 @@ int ffd_update_timeout(void) {
void ffd_update_run(void) {
while (!ffd_update_timeout()) {
update_arg *arg = pending_updates->arg;
ffd_neigh_t *neigh = ((update_arg_t*)pending_updates->arg)->neigh;
ffd_neigh_t *neigh = arg->neigh;
ffd_announce_t *announce = ffd_announce_find(&arg->node, arg->type, arg->key);
ffd_update_t *update = ffd_send_update_new(NULL, neigh);
fprintf(stderr, "debug: sending scheduled update.\n");
fprintf(stderr, "debug: sending scheduled updates.\n");
if (announce)
ffd_send_update(NULL, neigh, announce, false);
else
ffd_send_retract(neigh, arg->node, arg->type, arg->key);
ffd_queue_t **entry;
for (entry = &pending_updates; *entry;) {
update_arg_t *arg = (*entry)->arg;
update_unref(arg);
ffd_queue_drop(&pending_updates);
if (arg->neigh != neigh) {
entry = &(*entry)->next;
continue;
}
ffd_announce_t *announce = ffd_announce_find(&arg->node, arg->type, arg->key);
bool ret;
if (announce)
ret = ffd_send_update_add(update, announce);
else
ret = ffd_send_update_retract(update, arg->node, arg->type, arg->key);
if (!ret)
break;
update_unref(arg);
ffd_queue_drop(entry);
}
ffd_send_update_finish(update);
}
}