Use acked updates for seqno request replies

This commit is contained in:
Matthias Schiffer 2012-10-21 05:32:43 +02:00
parent aa15b96d39
commit 758611ccd2
6 changed files with 56 additions and 12 deletions

View file

@ -40,6 +40,7 @@ static uint16_t nonce = 0;
typedef struct _ack_arg_t { typedef struct _ack_arg_t {
void (*cb)(uint16_t nonce, void *arg); void (*cb)(uint16_t nonce, void *arg);
void (*free)(uint16_t nonce, void *arg);
void *arg; void *arg;
uint16_t nonce; uint16_t nonce;
@ -56,6 +57,7 @@ static void ack_resend(const struct timespec *timeout, void *argp) {
ack_arg_t *arg = argp; ack_arg_t *arg = argp;
if (GET_ACK(arg->nonce) || !(--arg->retries)) { if (GET_ACK(arg->nonce) || !(--arg->retries)) {
arg->free(arg->nonce, arg->arg);
free(arg); free(arg);
return; return;
} }
@ -65,12 +67,13 @@ static void ack_resend(const struct timespec *timeout, void *argp) {
ffd_queue_put_delayed(&ack_requests, ack_resend, timeout, arg->interval, arg); ffd_queue_put_delayed(&ack_requests, ack_resend, timeout, arg->interval, arg);
} }
void ffd_ack_request(void (*cb)(uint16_t nonce, void *arg), unsigned interval, unsigned retries, void *arg) { void ffd_ack_request(void (*cb)(uint16_t nonce, void *arg), void (*free_cb)(uint16_t nonce, void *arg), unsigned interval, unsigned retries, void *arg) {
UNSET_ACK(nonce); UNSET_ACK(nonce);
cb(nonce, arg); cb(nonce, arg);
ack_arg_t *ack_arg = malloc(sizeof(ack_arg_t)); ack_arg_t *ack_arg = malloc(sizeof(ack_arg_t));
ack_arg->cb = cb; ack_arg->cb = cb;
ack_arg->free = free_cb;
ack_arg->arg = arg; ack_arg->arg = arg;
ack_arg->nonce = nonce++; ack_arg->nonce = nonce++;
ack_arg->interval = interval; ack_arg->interval = interval;

View file

@ -407,14 +407,14 @@ static void handle_tlv_seqno_req(const ffd_tlv_seqno_req_t *tlv_req, size_t len,
if ((int16_t)(seqno-announce->selected->metric_seqno.seqno) <= 0) { if ((int16_t)(seqno-announce->selected->metric_seqno.seqno) <= 0) {
fprintf(stderr, "debug: received seqno request, seqno already ok\n"); fprintf(stderr, "debug: received seqno request, seqno already ok\n");
ffd_update_enqueue(&announce->node, announce->type, announce->key, neigh, false); ffd_update_enqueue(&announce->node, announce->type, announce->key, neigh, true);
return; return;
} }
if (!announce->selected->neigh) { if (!announce->selected->neigh) {
fprintf(stderr, "debug: received seqno request, incrementing seqno\n"); fprintf(stderr, "debug: received seqno request, incrementing seqno\n");
announce->selected->metric_seqno.seqno++; announce->selected->metric_seqno.seqno++;
ffd_update_enqueue(&announce->node, announce->type, announce->key, neigh, false); ffd_update_enqueue(&announce->node, announce->type, announce->key, neigh, true);
return; return;
} }

View file

@ -209,7 +209,7 @@ bool ffd_announce_seqno_request(ffd_announce_t *announce, ffd_neigh_t *neigh, ui
void ffd_announce_free(ffd_announce_t *announce); void ffd_announce_free(ffd_announce_t *announce);
void ffd_ack_handle(uint16_t n); void ffd_ack_handle(uint16_t n);
void ffd_ack_request(void (*cb)(uint16_t nonce, void *arg), unsigned interval, unsigned retries, void *arg); void ffd_ack_request(void (*cb)(uint16_t nonce, void *arg), void (*free_cb)(uint16_t nonce, void *arg), unsigned interval, unsigned retries, void *arg);
int ffd_ack_timeout(void); int ffd_ack_timeout(void);
void ffd_ack_run(void); void ffd_ack_run(void);

View file

@ -188,6 +188,18 @@ void ffd_send_hellos(void) {
} }
} }
static bool add_ack_request(ffd_packet_t *packet, uint16_t nonce) {
ffd_tlv_ack_req_t *tlv = ffd_tlv_add(packet, FFD_PACKET_MAX, TLV_ACK_REQ, sizeof(ffd_tlv_ack_req_t));
if (!tlv)
return false;
tlv->reserved = 0;
tlv->nonce = htons(nonce);
tlv->interval = htons(FFD_ACK_INTERVAL);
return true;
}
static bool add_node_id(ffd_packet_t *packet, size_t max_len, ffd_node_id_t node_id) { static bool add_node_id(ffd_packet_t *packet, size_t max_len, ffd_node_id_t node_id) {
ffd_tlv_node_id_t *tlv = ffd_tlv_add(packet, FFD_PACKET_MAX, TLV_NODE_ID, sizeof(ffd_tlv_node_id_t)); ffd_tlv_node_id_t *tlv = ffd_tlv_add(packet, FFD_PACKET_MAX, TLV_NODE_ID, sizeof(ffd_tlv_node_id_t));
if (!tlv) if (!tlv)
@ -329,7 +341,7 @@ void ffd_send_retract(ffd_neigh_t *neigh, ffd_node_id_t node, uint16_t type, uin
send_neigh(neigh, packet); send_neigh(neigh, packet);
} }
ffd_update_t* ffd_send_update_new(ffd_iface_t *iface, ffd_neigh_t *neigh) { ffd_update_t* ffd_send_update_new(ffd_iface_t *iface, ffd_neigh_t *neigh, const uint16_t *nonce) {
unsigned max_updates = (FFD_PACKET_MAX+sizeof(ffd_tlv_update_t)+1)/(sizeof(ffd_tlv_update_t)+2); unsigned max_updates = (FFD_PACKET_MAX+sizeof(ffd_tlv_update_t)+1)/(sizeof(ffd_tlv_update_t)+2);
ffd_update_t *update = calloc(1, sizeof(ffd_update_t) + max_updates*sizeof(announce_info_t)); ffd_update_t *update = calloc(1, sizeof(ffd_update_t) + max_updates*sizeof(announce_info_t));
update->iface = iface; update->iface = iface;
@ -340,6 +352,9 @@ ffd_update_t* ffd_send_update_new(ffd_iface_t *iface, ffd_neigh_t *neigh) {
update->packet->version_magic = htons(FFD_VERSION_MAGIC); update->packet->version_magic = htons(FFD_VERSION_MAGIC);
update->packet->len = 0; update->packet->len = 0;
if (nonce)
add_ack_request(update->packet, *nonce);
return update; return update;
} }

View file

@ -40,7 +40,7 @@ void ffd_send_retract(ffd_neigh_t *neigh, ffd_node_id_t node, uint16_t type, uin
void ffd_send_announce_request(ffd_iface_t *iface, ffd_neigh_t *neigh, ffd_node_id_t node, uint16_t type, uint16_t key, bool with_data); void ffd_send_announce_request(ffd_iface_t *iface, ffd_neigh_t *neigh, ffd_node_id_t node, uint16_t type, uint16_t key, bool with_data);
void ffd_send_seqno_request(ffd_neigh_t *neigh, ffd_announce_t *announce, uint16_t seqno); void ffd_send_seqno_request(ffd_neigh_t *neigh, ffd_announce_t *announce, uint16_t seqno);
ffd_update_t* ffd_send_update_new(ffd_iface_t *iface, ffd_neigh_t *neigh); ffd_update_t* ffd_send_update_new(ffd_iface_t *iface, ffd_neigh_t *neigh, const uint16_t *nonce);
bool ffd_send_update_add(ffd_update_t *update, ffd_announce_t *announce); bool ffd_send_update_add(ffd_update_t *update, ffd_announce_t *announce);
bool ffd_send_update_retract(ffd_update_t *update, ffd_node_id_t node, uint16_t type, uint16_t key); bool ffd_send_update_retract(ffd_update_t *update, ffd_node_id_t node, uint16_t type, uint16_t key);
void ffd_send_update_finish(ffd_update_t *update); void ffd_send_update_finish(ffd_update_t *update);

View file

@ -75,6 +75,27 @@ static inline void update_unref(update_arg_t *arg) {
} }
} }
static void update_request_ack(uint16_t nonce, void *argp) {
update_arg_t *arg = argp;
fprintf(stderr, "debug: sending acked update with nonce %u.\n", nonce);
ffd_update_t *update = ffd_send_update_new(NULL, arg->neigh, &nonce);
ffd_announce_t *announce = ffd_announce_find(&arg->node, arg->type, arg->key);
if (announce)
ffd_send_update_add(update, announce);
else
ffd_send_update_retract(update, arg->node, arg->type, arg->key);
ffd_send_update_finish(update);
}
static void update_request_ack_free(uint16_t nonce, void *arg) {
update_unref(arg);
}
void ffd_update_enqueue(const ffd_node_id_t *node, uint16_t type, uint16_t key, ffd_neigh_t *neigh, bool urgent) { void ffd_update_enqueue(const ffd_node_id_t *node, uint16_t type, uint16_t key, ffd_neigh_t *neigh, bool urgent) {
if (neigh) if (neigh)
ffd_neigh_ref(neigh); ffd_neigh_ref(neigh);
@ -82,12 +103,17 @@ void ffd_update_enqueue(const ffd_node_id_t *node, uint16_t type, uint16_t key,
update_arg_t *arg = update_new(node, type, key, neigh); update_arg_t *arg = update_new(node, type, key, neigh);
if (urgent) { if (urgent) {
if (neigh) {
ffd_ack_request(update_request_ack, update_request_ack_free, FFD_ACK_INTERVAL, 10, update_ref(arg));
}
else {
ffd_queue_put_delayed(&pending_updates, NULL, &now, FFD_URGENT_DELAY, update_ref(arg)); ffd_queue_put_delayed(&pending_updates, NULL, &now, FFD_URGENT_DELAY, update_ref(arg));
ffd_queue_put_delayed(&pending_updates, NULL, &now, 2*FFD_URGENT_DELAY, update_ref(arg)); ffd_queue_put_delayed(&pending_updates, NULL, &now, 2*FFD_URGENT_DELAY, update_ref(arg));
ffd_queue_put_delayed(&pending_updates, NULL, &now, 3*FFD_URGENT_DELAY, update_ref(arg)); ffd_queue_put_delayed(&pending_updates, NULL, &now, 3*FFD_URGENT_DELAY, update_ref(arg));
ffd_queue_put_delayed(&pending_updates, NULL, &now, 4*FFD_URGENT_DELAY, update_ref(arg)); ffd_queue_put_delayed(&pending_updates, NULL, &now, 4*FFD_URGENT_DELAY, update_ref(arg));
ffd_queue_put_delayed(&pending_updates, NULL, &now, 5*FFD_URGENT_DELAY, update_ref(arg)); ffd_queue_put_delayed(&pending_updates, NULL, &now, 5*FFD_URGENT_DELAY, update_ref(arg));
} }
}
else { else {
ffd_queue_put_delayed(&pending_updates, NULL, &now, FFD_DELAY, update_ref(arg)); ffd_queue_put_delayed(&pending_updates, NULL, &now, FFD_DELAY, update_ref(arg));
} }
@ -101,7 +127,7 @@ void ffd_update_run(void) {
while (!ffd_update_timeout()) { while (!ffd_update_timeout()) {
ffd_neigh_t *neigh = ((update_arg_t*)pending_updates->arg)->neigh; ffd_neigh_t *neigh = ((update_arg_t*)pending_updates->arg)->neigh;
ffd_update_t *update = ffd_send_update_new(NULL, neigh); ffd_update_t *update = ffd_send_update_new(NULL, neigh, NULL);
fprintf(stderr, "debug: sending scheduled updates.\n"); fprintf(stderr, "debug: sending scheduled updates.\n");