diff options
author | Taru Karttunen <taruti@taruti.net> | 2011-03-30 15:46:40 +0300 |
---|---|---|
committer | Taru Karttunen <taruti@taruti.net> | 2011-03-30 15:46:40 +0300 |
commit | e5888a1ffdae813d7575f5fb02275c6bb07e5199 (patch) | |
tree | d8d51eac403f07814b9e936eed0c9a79195e2450 /sys/src/9/ip/rudp.c |
Import sources from 2011-03-30 iso image
Diffstat (limited to 'sys/src/9/ip/rudp.c')
-rwxr-xr-x | sys/src/9/ip/rudp.c | 1055 |
1 files changed, 1055 insertions, 0 deletions
diff --git a/sys/src/9/ip/rudp.c b/sys/src/9/ip/rudp.c new file mode 100755 index 000000000..f3e205aba --- /dev/null +++ b/sys/src/9/ip/rudp.c @@ -0,0 +1,1055 @@ +/* + * Reliable User Datagram Protocol, currently only for IPv4. + * This protocol is compatible with UDP's packet format. + * It could be done over UDP if need be. + */ +#include "u.h" +#include "../port/lib.h" +#include "mem.h" +#include "dat.h" +#include "fns.h" +#include "../port/error.h" + +#include "ip.h" + +#define DEBUG 0 +#define DPRINT if(DEBUG)print + +#define SEQDIFF(a,b) ( (a)>=(b)?\ + (a)-(b):\ + 0xffffffffUL-((b)-(a)) ) +#define INSEQ(a,start,end) ( (start)<=(end)?\ + ((a)>(start)&&(a)<=(end)):\ + ((a)>(start)||(a)<=(end)) ) +#define UNACKED(r) SEQDIFF(r->sndseq, r->ackrcvd) +#define NEXTSEQ(a) ( (a)+1 == 0 ? 1 : (a)+1 ) + +enum +{ + UDP_PHDRSIZE = 12, /* pseudo header */ +// UDP_HDRSIZE = 20, /* pseudo header + udp header */ + UDP_RHDRSIZE = 36, /* pseudo header + udp header + rudp header */ + UDP_IPHDR = 8, /* ip header */ + IP_UDPPROTO = 254, + UDP_USEAD7 = 52, /* size of new ipv6 headers struct */ + + Rudprxms = 200, + Rudptickms = 50, + Rudpmaxxmit = 10, + Maxunacked = 100, +}; + +#define Hangupgen 0xffffffff /* used only in hangup messages */ + +typedef struct Udphdr Udphdr; +struct Udphdr +{ + /* ip header */ + uchar vihl; /* Version and header length */ + uchar tos; /* Type of service */ + uchar length[2]; /* packet length */ + uchar id[2]; /* Identification */ + uchar frag[2]; /* Fragment information */ + + /* pseudo header starts here */ + uchar Unused; + uchar udpproto; /* Protocol */ + uchar udpplen[2]; /* Header plus data length */ + uchar udpsrc[4]; /* Ip source */ + uchar udpdst[4]; /* Ip destination */ + + /* udp header */ + uchar udpsport[2]; /* Source port */ + uchar udpdport[2]; /* Destination port */ + uchar udplen[2]; /* data length */ + uchar udpcksum[2]; /* Checksum */ +}; + +typedef struct Rudphdr Rudphdr; +struct Rudphdr +{ + /* ip header */ + uchar vihl; /* Version and header length */ + uchar tos; /* Type of service */ + uchar length[2]; /* packet length */ + uchar id[2]; /* Identification */ + uchar frag[2]; /* Fragment information */ + + /* pseudo header starts here */ + uchar Unused; + uchar udpproto; /* Protocol */ + uchar udpplen[2]; /* Header plus data length */ + uchar udpsrc[4]; /* Ip source */ + uchar udpdst[4]; /* Ip destination */ + + /* udp header */ + uchar udpsport[2]; /* Source port */ + uchar udpdport[2]; /* Destination port */ + uchar udplen[2]; /* data length (includes rudp header) */ + uchar udpcksum[2]; /* Checksum */ + + /* rudp header */ + uchar relseq[4]; /* id of this packet (or 0) */ + uchar relsgen[4]; /* generation/time stamp */ + uchar relack[4]; /* packet being acked (or 0) */ + uchar relagen[4]; /* generation/time stamp */ +}; + + +/* + * one state structure per destination + */ +typedef struct Reliable Reliable; +struct Reliable +{ + Ref; + + Reliable *next; + + uchar addr[IPaddrlen]; /* always V6 when put here */ + ushort port; + + Block *unacked; /* unacked msg list */ + Block *unackedtail; /* and its tail */ + + int timeout; /* time since first unacked msg sent */ + int xmits; /* number of times first unacked msg sent */ + + ulong sndseq; /* next packet to be sent */ + ulong sndgen; /* and its generation */ + + ulong rcvseq; /* last packet received */ + ulong rcvgen; /* and its generation */ + + ulong acksent; /* last ack sent */ + ulong ackrcvd; /* last msg for which ack was rcvd */ + + /* flow control */ + QLock lock; + Rendez vous; + int blocked; +}; + + + +/* MIB II counters */ +typedef struct Rudpstats Rudpstats; +struct Rudpstats +{ + ulong rudpInDatagrams; + ulong rudpNoPorts; + ulong rudpInErrors; + ulong rudpOutDatagrams; +}; + +typedef struct Rudppriv Rudppriv; +struct Rudppriv +{ + Ipht ht; + + /* MIB counters */ + Rudpstats ustats; + + /* non-MIB stats */ + ulong csumerr; /* checksum errors */ + ulong lenerr; /* short packet */ + ulong rxmits; /* # of retransmissions */ + ulong orders; /* # of out of order pkts */ + + /* keeping track of the ack kproc */ + int ackprocstarted; + QLock apl; +}; + + +static ulong generation = 0; +static Rendez rend; + +/* + * protocol specific part of Conv + */ +typedef struct Rudpcb Rudpcb; +struct Rudpcb +{ + QLock; + uchar headers; + uchar randdrop; + Reliable *r; +}; + +/* + * local functions + */ +void relsendack(Conv*, Reliable*, int); +int reliput(Conv*, Block*, uchar*, ushort); +Reliable *relstate(Rudpcb*, uchar*, ushort, char*); +void relput(Reliable*); +void relforget(Conv *, uchar*, int, int); +void relackproc(void *); +void relackq(Reliable *, Block*); +void relhangup(Conv *, Reliable*); +void relrexmit(Conv *, Reliable*); +void relput(Reliable*); +void rudpkick(void *x); + +static void +rudpstartackproc(Proto *rudp) +{ + Rudppriv *rpriv; + char kpname[KNAMELEN]; + + rpriv = rudp->priv; + if(rpriv->ackprocstarted == 0){ + qlock(&rpriv->apl); + if(rpriv->ackprocstarted == 0){ + sprint(kpname, "#I%drudpack", rudp->f->dev); + kproc(kpname, relackproc, rudp); + rpriv->ackprocstarted = 1; + } + qunlock(&rpriv->apl); + } +} + +static char* +rudpconnect(Conv *c, char **argv, int argc) +{ + char *e; + Rudppriv *upriv; + + upriv = c->p->priv; + rudpstartackproc(c->p); + e = Fsstdconnect(c, argv, argc); + Fsconnected(c, e); + iphtadd(&upriv->ht, c); + + return e; +} + + +static int +rudpstate(Conv *c, char *state, int n) +{ + Rudpcb *ucb; + Reliable *r; + int m; + + m = snprint(state, n, "%s", c->inuse?"Open":"Closed"); + ucb = (Rudpcb*)c->ptcl; + qlock(ucb); + for(r = ucb->r; r; r = r->next) + m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r)); + m += snprint(state+m, n-m, "\n"); + qunlock(ucb); + return m; +} + +static char* +rudpannounce(Conv *c, char** argv, int argc) +{ + char *e; + Rudppriv *upriv; + + upriv = c->p->priv; + rudpstartackproc(c->p); + e = Fsstdannounce(c, argv, argc); + if(e != nil) + return e; + Fsconnected(c, nil); + iphtadd(&upriv->ht, c); + + return nil; +} + +static void +rudpcreate(Conv *c) +{ + c->rq = qopen(64*1024, Qmsg, 0, 0); + c->wq = qopen(64*1024, Qkick, rudpkick, c); +} + +static void +rudpclose(Conv *c) +{ + Rudpcb *ucb; + Reliable *r, *nr; + Rudppriv *upriv; + + upriv = c->p->priv; + iphtrem(&upriv->ht, c); + + /* force out any delayed acks */ + ucb = (Rudpcb*)c->ptcl; + qlock(ucb); + for(r = ucb->r; r; r = r->next){ + if(r->acksent != r->rcvseq) + relsendack(c, r, 0); + } + qunlock(ucb); + + qclose(c->rq); + qclose(c->wq); + qclose(c->eq); + ipmove(c->laddr, IPnoaddr); + ipmove(c->raddr, IPnoaddr); + c->lport = 0; + c->rport = 0; + + ucb->headers = 0; + ucb->randdrop = 0; + qlock(ucb); + for(r = ucb->r; r; r = nr){ + if(r->acksent != r->rcvseq) + relsendack(c, r, 0); + nr = r->next; + relhangup(c, r); + relput(r); + } + ucb->r = 0; + + qunlock(ucb); +} + +/* + * randomly don't send packets + */ +static void +doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos) +{ + Rudpcb *ucb; + + ucb = (Rudpcb*)c->ptcl; + if(ucb->randdrop && nrand(100) < ucb->randdrop) + freeblist(bp); + else + ipoput4(f, bp, x, ttl, tos, nil); +} + +int +flow(void *v) +{ + Reliable *r = v; + + return UNACKED(r) <= Maxunacked; +} + +void +rudpkick(void *x) +{ + Conv *c = x; + Udphdr *uh; + ushort rport; + uchar laddr[IPaddrlen], raddr[IPaddrlen]; + Block *bp; + Rudpcb *ucb; + Rudphdr *rh; + Reliable *r; + int dlen, ptcllen; + Rudppriv *upriv; + Fs *f; + + upriv = c->p->priv; + f = c->p->f; + + netlog(c->p->f, Logrudp, "rudp: kick\n"); + bp = qget(c->wq); + if(bp == nil) + return; + + ucb = (Rudpcb*)c->ptcl; + switch(ucb->headers) { + case 7: + /* get user specified addresses */ + bp = pullupblock(bp, UDP_USEAD7); + if(bp == nil) + return; + ipmove(raddr, bp->rp); + bp->rp += IPaddrlen; + ipmove(laddr, bp->rp); + bp->rp += IPaddrlen; + /* pick interface closest to dest */ + if(ipforme(f, laddr) != Runi) + findlocalip(f, laddr, raddr); + bp->rp += IPaddrlen; /* Ignore ifc address */ + rport = nhgets(bp->rp); + bp->rp += 2+2; /* Ignore local port */ + break; + default: + ipmove(raddr, c->raddr); + ipmove(laddr, c->laddr); + rport = c->rport; + break; + } + + dlen = blocklen(bp); + + /* Make space to fit rudp & ip header */ + bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE); + if(bp == nil) + return; + + uh = (Udphdr *)(bp->rp); + uh->vihl = IP_VER4; + + rh = (Rudphdr*)uh; + + ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE); + uh->Unused = 0; + uh->udpproto = IP_UDPPROTO; + uh->frag[0] = 0; + uh->frag[1] = 0; + hnputs(uh->udpplen, ptcllen); + switch(ucb->headers){ + case 7: + v6tov4(uh->udpdst, raddr); + hnputs(uh->udpdport, rport); + v6tov4(uh->udpsrc, laddr); + break; + default: + v6tov4(uh->udpdst, c->raddr); + hnputs(uh->udpdport, c->rport); + if(ipcmp(c->laddr, IPnoaddr) == 0) + findlocalip(f, c->laddr, c->raddr); + v6tov4(uh->udpsrc, c->laddr); + break; + } + hnputs(uh->udpsport, c->lport); + hnputs(uh->udplen, ptcllen); + uh->udpcksum[0] = 0; + uh->udpcksum[1] = 0; + + qlock(ucb); + r = relstate(ucb, raddr, rport, "kick"); + r->sndseq = NEXTSEQ(r->sndseq); + hnputl(rh->relseq, r->sndseq); + hnputl(rh->relsgen, r->sndgen); + + hnputl(rh->relack, r->rcvseq); /* ACK last rcvd packet */ + hnputl(rh->relagen, r->rcvgen); + + if(r->rcvseq != r->acksent) + r->acksent = r->rcvseq; + + hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE)); + + relackq(r, bp); + qunlock(ucb); + + upriv->ustats.rudpOutDatagrams++; + + DPRINT("sent: %lud/%lud, %lud/%lud\n", + r->sndseq, r->sndgen, r->rcvseq, r->rcvgen); + + doipoput(c, f, bp, 0, c->ttl, c->tos); + + if(waserror()) { + relput(r); + qunlock(&r->lock); + nexterror(); + } + + /* flow control of sorts */ + qlock(&r->lock); + if(UNACKED(r) > Maxunacked){ + r->blocked = 1; + sleep(&r->vous, flow, r); + r->blocked = 0; + } + + qunlock(&r->lock); + relput(r); + poperror(); +} + +void +rudpiput(Proto *rudp, Ipifc *ifc, Block *bp) +{ + int len, olen, ottl; + Udphdr *uh; + Conv *c; + Rudpcb *ucb; + uchar raddr[IPaddrlen], laddr[IPaddrlen]; + ushort rport, lport; + Rudppriv *upriv; + Fs *f; + uchar *p; + + upriv = rudp->priv; + f = rudp->f; + + upriv->ustats.rudpInDatagrams++; + + uh = (Udphdr*)(bp->rp); + + /* Put back pseudo header for checksum + * (remember old values for icmpnoconv()) + */ + ottl = uh->Unused; + uh->Unused = 0; + len = nhgets(uh->udplen); + olen = nhgets(uh->udpplen); + hnputs(uh->udpplen, len); + + v4tov6(raddr, uh->udpsrc); + v4tov6(laddr, uh->udpdst); + lport = nhgets(uh->udpdport); + rport = nhgets(uh->udpsport); + + if(nhgets(uh->udpcksum)) { + if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) { + upriv->ustats.rudpInErrors++; + upriv->csumerr++; + netlog(f, Logrudp, "rudp: checksum error %I\n", raddr); + DPRINT("rudp: checksum error %I\n", raddr); + freeblist(bp); + return; + } + } + + qlock(rudp); + + c = iphtlook(&upriv->ht, raddr, rport, laddr, lport); + if(c == nil){ + /* no conversation found */ + upriv->ustats.rudpNoPorts++; + qunlock(rudp); + netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport, + laddr, lport); + uh->Unused = ottl; + hnputs(uh->udpplen, olen); + icmpnoconv(f, bp); + freeblist(bp); + return; + } + ucb = (Rudpcb*)c->ptcl; + qlock(ucb); + qunlock(rudp); + + if(reliput(c, bp, raddr, rport) < 0){ + qunlock(ucb); + freeb(bp); + return; + } + + /* + * Trim the packet down to data size + */ + + len -= (UDP_RHDRSIZE-UDP_PHDRSIZE); + bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len); + if(bp == nil) { + netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n", + raddr, rport, laddr, lport); + DPRINT("rudp: len err %I.%d -> %I.%d\n", + raddr, rport, laddr, lport); + upriv->lenerr++; + return; + } + + netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n", + raddr, rport, laddr, lport, len); + + switch(ucb->headers){ + case 7: + /* pass the src address */ + bp = padblock(bp, UDP_USEAD7); + p = bp->rp; + ipmove(p, raddr); p += IPaddrlen; + ipmove(p, laddr); p += IPaddrlen; + ipmove(p, ifc->lifc->local); p += IPaddrlen; + hnputs(p, rport); p += 2; + hnputs(p, lport); + break; + default: + /* connection oriented rudp */ + if(ipcmp(c->raddr, IPnoaddr) == 0){ + /* save the src address in the conversation */ + ipmove(c->raddr, raddr); + c->rport = rport; + + /* reply with the same ip address (if not broadcast) */ + if(ipforme(f, laddr) == Runi) + ipmove(c->laddr, laddr); + else + v4tov6(c->laddr, ifc->lifc->local); + } + break; + } + if(bp->next) + bp = concatblock(bp); + + if(qfull(c->rq)) { + netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport, + laddr, lport); + freeblist(bp); + } + else + qpass(c->rq, bp); + + qunlock(ucb); +} + +static char *rudpunknown = "unknown rudp ctl request"; + +char* +rudpctl(Conv *c, char **f, int n) +{ + Rudpcb *ucb; + uchar ip[IPaddrlen]; + int x; + + ucb = (Rudpcb*)c->ptcl; + if(n < 1) + return rudpunknown; + + if(strcmp(f[0], "headers") == 0){ + ucb->headers = 7; /* new headers format */ + return nil; + } else if(strcmp(f[0], "hangup") == 0){ + if(n < 3) + return "bad syntax"; + if (parseip(ip, f[1]) == -1) + return Ebadip; + x = atoi(f[2]); + qlock(ucb); + relforget(c, ip, x, 1); + qunlock(ucb); + return nil; + } else if(strcmp(f[0], "randdrop") == 0){ + x = 10; /* default is 10% */ + if(n > 1) + x = atoi(f[1]); + if(x > 100 || x < 0) + return "illegal rudp drop rate"; + ucb->randdrop = x; + return nil; + } + return rudpunknown; +} + +void +rudpadvise(Proto *rudp, Block *bp, char *msg) +{ + Udphdr *h; + uchar source[IPaddrlen], dest[IPaddrlen]; + ushort psource, pdest; + Conv *s, **p; + + h = (Udphdr*)(bp->rp); + + v4tov6(dest, h->udpdst); + v4tov6(source, h->udpsrc); + psource = nhgets(h->udpsport); + pdest = nhgets(h->udpdport); + + /* Look for a connection */ + for(p = rudp->conv; *p; p++) { + s = *p; + if(s->rport == pdest) + if(s->lport == psource) + if(ipcmp(s->raddr, dest) == 0) + if(ipcmp(s->laddr, source) == 0){ + qhangup(s->rq, msg); + qhangup(s->wq, msg); + break; + } + } + freeblist(bp); +} + +int +rudpstats(Proto *rudp, char *buf, int len) +{ + Rudppriv *upriv; + + upriv = rudp->priv; + return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n", + upriv->ustats.rudpInDatagrams, + upriv->ustats.rudpNoPorts, + upriv->ustats.rudpInErrors, + upriv->ustats.rudpOutDatagrams, + upriv->rxmits, + upriv->orders); +} + +void +rudpinit(Fs *fs) +{ + + Proto *rudp; + + rudp = smalloc(sizeof(Proto)); + rudp->priv = smalloc(sizeof(Rudppriv)); + rudp->name = "rudp"; + rudp->connect = rudpconnect; + rudp->announce = rudpannounce; + rudp->ctl = rudpctl; + rudp->state = rudpstate; + rudp->create = rudpcreate; + rudp->close = rudpclose; + rudp->rcv = rudpiput; + rudp->advise = rudpadvise; + rudp->stats = rudpstats; + rudp->ipproto = IP_UDPPROTO; + rudp->nc = 32; + rudp->ptclsize = sizeof(Rudpcb); + + Fsproto(fs, rudp); +} + +/*********************************************/ +/* Here starts the reliable helper functions */ +/*********************************************/ +/* + * Enqueue a copy of an unacked block for possible retransmissions + */ +void +relackq(Reliable *r, Block *bp) +{ + Block *np; + + np = copyblock(bp, blocklen(bp)); + if(r->unacked) + r->unackedtail->list = np; + else { + /* restart timer */ + r->timeout = 0; + r->xmits = 1; + r->unacked = np; + } + r->unackedtail = np; + np->list = nil; +} + +/* + * retransmit unacked blocks + */ +void +relackproc(void *a) +{ + Rudpcb *ucb; + Proto *rudp; + Reliable *r; + Conv **s, *c; + + rudp = (Proto *)a; + +loop: + tsleep(&up->sleep, return0, 0, Rudptickms); + + for(s = rudp->conv; *s; s++) { + c = *s; + ucb = (Rudpcb*)c->ptcl; + qlock(ucb); + + for(r = ucb->r; r; r = r->next) { + if(r->unacked != nil){ + r->timeout += Rudptickms; + if(r->timeout > Rudprxms*r->xmits) + relrexmit(c, r); + } + if(r->acksent != r->rcvseq) + relsendack(c, r, 0); + } + qunlock(ucb); + } + goto loop; +} + +/* + * get the state record for a conversation + */ +Reliable* +relstate(Rudpcb *ucb, uchar *addr, ushort port, char *from) +{ + Reliable *r, **l; + + l = &ucb->r; + for(r = *l; r; r = *l){ + if(memcmp(addr, r->addr, IPaddrlen) == 0 && + port == r->port) + break; + l = &r->next; + } + + /* no state for this addr/port, create some */ + if(r == nil){ + while(generation == 0) + generation = rand(); + + DPRINT("from %s new state %lud for %I!%ud\n", + from, generation, addr, port); + + r = smalloc(sizeof(Reliable)); + memmove(r->addr, addr, IPaddrlen); + r->port = port; + r->unacked = 0; + if(generation == Hangupgen) + generation++; + r->sndgen = generation++; + r->sndseq = 0; + r->ackrcvd = 0; + r->rcvgen = 0; + r->rcvseq = 0; + r->acksent = 0; + r->xmits = 0; + r->timeout = 0; + r->ref = 0; + incref(r); /* one reference for being in the list */ + + *l = r; + } + + incref(r); + return r; +} + +void +relput(Reliable *r) +{ + if(decref(r) == 0) + free(r); +} + +/* + * forget a Reliable state + */ +void +relforget(Conv *c, uchar *ip, int port, int originator) +{ + Rudpcb *ucb; + Reliable *r, **l; + + ucb = (Rudpcb*)c->ptcl; + + l = &ucb->r; + for(r = *l; r; r = *l){ + if(ipcmp(ip, r->addr) == 0 && port == r->port){ + *l = r->next; + if(originator) + relsendack(c, r, 1); + relhangup(c, r); + relput(r); /* remove from the list */ + break; + } + l = &r->next; + } +} + +/* + * process a rcvd reliable packet. return -1 if not to be passed to user process, + * 0 therwise. + * + * called with ucb locked. + */ +int +reliput(Conv *c, Block *bp, uchar *addr, ushort port) +{ + Block *nbp; + Rudpcb *ucb; + Rudppriv *upriv; + Udphdr *uh; + Reliable *r; + Rudphdr *rh; + ulong seq, ack, sgen, agen, ackreal; + int rv = -1; + + /* get fields */ + uh = (Udphdr*)(bp->rp); + rh = (Rudphdr*)uh; + seq = nhgetl(rh->relseq); + sgen = nhgetl(rh->relsgen); + ack = nhgetl(rh->relack); + agen = nhgetl(rh->relagen); + + upriv = c->p->priv; + ucb = (Rudpcb*)c->ptcl; + r = relstate(ucb, addr, port, "input"); + + DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n", + seq, sgen, ack, agen, r->sndgen); + + /* if acking an incorrect generation, ignore */ + if(ack && agen != r->sndgen) + goto out; + + /* Look for a hangup */ + if(sgen == Hangupgen) { + if(agen == r->sndgen) + relforget(c, addr, port, 0); + goto out; + } + + /* make sure we're not talking to a new remote side */ + if(r->rcvgen != sgen){ + if(seq != 0 && seq != 1) + goto out; + + /* new connection */ + if(r->rcvgen != 0){ + DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen); + relhangup(c, r); + } + r->rcvgen = sgen; + } + + /* dequeue acked packets */ + if(ack && agen == r->sndgen){ + ackreal = 0; + while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){ + nbp = r->unacked; + r->unacked = nbp->list; + DPRINT("%lud/%lud acked, r->sndgen = %lud\n", + ack, agen, r->sndgen); + freeb(nbp); + r->ackrcvd = NEXTSEQ(r->ackrcvd); + ackreal = 1; + } + + /* flow control */ + if(UNACKED(r) < Maxunacked/8 && r->blocked) + wakeup(&r->vous); + + /* + * retransmit next packet if the acked packet + * was transmitted more than once + */ + if(ackreal && r->unacked != nil){ + r->timeout = 0; + if(r->xmits > 1){ + r->xmits = 1; + relrexmit(c, r); + } + } + + } + + /* no message or input queue full */ + if(seq == 0 || qfull(c->rq)) + goto out; + + /* refuse out of order delivery */ + if(seq != NEXTSEQ(r->rcvseq)){ + relsendack(c, r, 0); /* tell him we got it already */ + upriv->orders++; + DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq)); + goto out; + } + r->rcvseq = seq; + + rv = 0; +out: + relput(r); + return rv; +} + +void +relsendack(Conv *c, Reliable *r, int hangup) +{ + Udphdr *uh; + Block *bp; + Rudphdr *rh; + int ptcllen; + Fs *f; + + bp = allocb(UDP_IPHDR + UDP_RHDRSIZE); + if(bp == nil) + return; + bp->wp += UDP_IPHDR + UDP_RHDRSIZE; + f = c->p->f; + uh = (Udphdr *)(bp->rp); + uh->vihl = IP_VER4; + rh = (Rudphdr*)uh; + + ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE); + uh->Unused = 0; + uh->udpproto = IP_UDPPROTO; + uh->frag[0] = 0; + uh->frag[1] = 0; + hnputs(uh->udpplen, ptcllen); + + v6tov4(uh->udpdst, r->addr); + hnputs(uh->udpdport, r->port); + hnputs(uh->udpsport, c->lport); + if(ipcmp(c->laddr, IPnoaddr) == 0) + findlocalip(f, c->laddr, c->raddr); + v6tov4(uh->udpsrc, c->laddr); + hnputs(uh->udplen, ptcllen); + + if(hangup) + hnputl(rh->relsgen, Hangupgen); + else + hnputl(rh->relsgen, r->sndgen); + hnputl(rh->relseq, 0); + hnputl(rh->relagen, r->rcvgen); + hnputl(rh->relack, r->rcvseq); + + if(r->acksent < r->rcvseq) + r->acksent = r->rcvseq; + + uh->udpcksum[0] = 0; + uh->udpcksum[1] = 0; + hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE)); + + DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen); + doipoput(c, f, bp, 0, c->ttl, c->tos); +} + + +/* + * called with ucb locked (and c locked if user initiated close) + */ +void +relhangup(Conv *c, Reliable *r) +{ + int n; + Block *bp; + char hup[ERRMAX]; + + n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port); + qproduce(c->eq, hup, n); + + /* + * dump any unacked outgoing messages + */ + for(bp = r->unacked; bp != nil; bp = r->unacked){ + r->unacked = bp->list; + bp->list = nil; + freeb(bp); + } + + r->rcvgen = 0; + r->rcvseq = 0; + r->acksent = 0; + if(generation == Hangupgen) + generation++; + r->sndgen = generation++; + r->sndseq = 0; + r->ackrcvd = 0; + r->xmits = 0; + r->timeout = 0; + wakeup(&r->vous); +} + +/* + * called with ucb locked + */ +void +relrexmit(Conv *c, Reliable *r) +{ + Rudppriv *upriv; + Block *np; + Fs *f; + + upriv = c->p->priv; + f = c->p->f; + r->timeout = 0; + if(r->xmits++ > Rudpmaxxmit){ + relhangup(c, r); + return; + } + + upriv->rxmits++; + np = copyblock(r->unacked, blocklen(r->unacked)); + DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1); + doipoput(c, f, np, 0, c->ttl, c->tos); +} |