summaryrefslogtreecommitdiff
path: root/sys/src/9/ip/rudp.c
diff options
context:
space:
mode:
authorTaru Karttunen <taruti@taruti.net>2011-03-30 15:46:40 +0300
committerTaru Karttunen <taruti@taruti.net>2011-03-30 15:46:40 +0300
commite5888a1ffdae813d7575f5fb02275c6bb07e5199 (patch)
treed8d51eac403f07814b9e936eed0c9a79195e2450 /sys/src/9/ip/rudp.c
Import sources from 2011-03-30 iso image
Diffstat (limited to 'sys/src/9/ip/rudp.c')
-rwxr-xr-xsys/src/9/ip/rudp.c1055
1 files changed, 1055 insertions, 0 deletions
diff --git a/sys/src/9/ip/rudp.c b/sys/src/9/ip/rudp.c
new file mode 100755
index 000000000..f3e205aba
--- /dev/null
+++ b/sys/src/9/ip/rudp.c
@@ -0,0 +1,1055 @@
+/*
+ * Reliable User Datagram Protocol, currently only for IPv4.
+ * This protocol is compatible with UDP's packet format.
+ * It could be done over UDP if need be.
+ */
+#include "u.h"
+#include "../port/lib.h"
+#include "mem.h"
+#include "dat.h"
+#include "fns.h"
+#include "../port/error.h"
+
+#include "ip.h"
+
+#define DEBUG 0
+#define DPRINT if(DEBUG)print
+
+#define SEQDIFF(a,b) ( (a)>=(b)?\
+ (a)-(b):\
+ 0xffffffffUL-((b)-(a)) )
+#define INSEQ(a,start,end) ( (start)<=(end)?\
+ ((a)>(start)&&(a)<=(end)):\
+ ((a)>(start)||(a)<=(end)) )
+#define UNACKED(r) SEQDIFF(r->sndseq, r->ackrcvd)
+#define NEXTSEQ(a) ( (a)+1 == 0 ? 1 : (a)+1 )
+
+enum
+{
+ UDP_PHDRSIZE = 12, /* pseudo header */
+// UDP_HDRSIZE = 20, /* pseudo header + udp header */
+ UDP_RHDRSIZE = 36, /* pseudo header + udp header + rudp header */
+ UDP_IPHDR = 8, /* ip header */
+ IP_UDPPROTO = 254,
+ UDP_USEAD7 = 52, /* size of new ipv6 headers struct */
+
+ Rudprxms = 200,
+ Rudptickms = 50,
+ Rudpmaxxmit = 10,
+ Maxunacked = 100,
+};
+
+#define Hangupgen 0xffffffff /* used only in hangup messages */
+
+typedef struct Udphdr Udphdr;
+struct Udphdr
+{
+ /* ip header */
+ uchar vihl; /* Version and header length */
+ uchar tos; /* Type of service */
+ uchar length[2]; /* packet length */
+ uchar id[2]; /* Identification */
+ uchar frag[2]; /* Fragment information */
+
+ /* pseudo header starts here */
+ uchar Unused;
+ uchar udpproto; /* Protocol */
+ uchar udpplen[2]; /* Header plus data length */
+ uchar udpsrc[4]; /* Ip source */
+ uchar udpdst[4]; /* Ip destination */
+
+ /* udp header */
+ uchar udpsport[2]; /* Source port */
+ uchar udpdport[2]; /* Destination port */
+ uchar udplen[2]; /* data length */
+ uchar udpcksum[2]; /* Checksum */
+};
+
+typedef struct Rudphdr Rudphdr;
+struct Rudphdr
+{
+ /* ip header */
+ uchar vihl; /* Version and header length */
+ uchar tos; /* Type of service */
+ uchar length[2]; /* packet length */
+ uchar id[2]; /* Identification */
+ uchar frag[2]; /* Fragment information */
+
+ /* pseudo header starts here */
+ uchar Unused;
+ uchar udpproto; /* Protocol */
+ uchar udpplen[2]; /* Header plus data length */
+ uchar udpsrc[4]; /* Ip source */
+ uchar udpdst[4]; /* Ip destination */
+
+ /* udp header */
+ uchar udpsport[2]; /* Source port */
+ uchar udpdport[2]; /* Destination port */
+ uchar udplen[2]; /* data length (includes rudp header) */
+ uchar udpcksum[2]; /* Checksum */
+
+ /* rudp header */
+ uchar relseq[4]; /* id of this packet (or 0) */
+ uchar relsgen[4]; /* generation/time stamp */
+ uchar relack[4]; /* packet being acked (or 0) */
+ uchar relagen[4]; /* generation/time stamp */
+};
+
+
+/*
+ * one state structure per destination
+ */
+typedef struct Reliable Reliable;
+struct Reliable
+{
+ Ref;
+
+ Reliable *next;
+
+ uchar addr[IPaddrlen]; /* always V6 when put here */
+ ushort port;
+
+ Block *unacked; /* unacked msg list */
+ Block *unackedtail; /* and its tail */
+
+ int timeout; /* time since first unacked msg sent */
+ int xmits; /* number of times first unacked msg sent */
+
+ ulong sndseq; /* next packet to be sent */
+ ulong sndgen; /* and its generation */
+
+ ulong rcvseq; /* last packet received */
+ ulong rcvgen; /* and its generation */
+
+ ulong acksent; /* last ack sent */
+ ulong ackrcvd; /* last msg for which ack was rcvd */
+
+ /* flow control */
+ QLock lock;
+ Rendez vous;
+ int blocked;
+};
+
+
+
+/* MIB II counters */
+typedef struct Rudpstats Rudpstats;
+struct Rudpstats
+{
+ ulong rudpInDatagrams;
+ ulong rudpNoPorts;
+ ulong rudpInErrors;
+ ulong rudpOutDatagrams;
+};
+
+typedef struct Rudppriv Rudppriv;
+struct Rudppriv
+{
+ Ipht ht;
+
+ /* MIB counters */
+ Rudpstats ustats;
+
+ /* non-MIB stats */
+ ulong csumerr; /* checksum errors */
+ ulong lenerr; /* short packet */
+ ulong rxmits; /* # of retransmissions */
+ ulong orders; /* # of out of order pkts */
+
+ /* keeping track of the ack kproc */
+ int ackprocstarted;
+ QLock apl;
+};
+
+
+static ulong generation = 0;
+static Rendez rend;
+
+/*
+ * protocol specific part of Conv
+ */
+typedef struct Rudpcb Rudpcb;
+struct Rudpcb
+{
+ QLock;
+ uchar headers;
+ uchar randdrop;
+ Reliable *r;
+};
+
+/*
+ * local functions
+ */
+void relsendack(Conv*, Reliable*, int);
+int reliput(Conv*, Block*, uchar*, ushort);
+Reliable *relstate(Rudpcb*, uchar*, ushort, char*);
+void relput(Reliable*);
+void relforget(Conv *, uchar*, int, int);
+void relackproc(void *);
+void relackq(Reliable *, Block*);
+void relhangup(Conv *, Reliable*);
+void relrexmit(Conv *, Reliable*);
+void relput(Reliable*);
+void rudpkick(void *x);
+
+static void
+rudpstartackproc(Proto *rudp)
+{
+ Rudppriv *rpriv;
+ char kpname[KNAMELEN];
+
+ rpriv = rudp->priv;
+ if(rpriv->ackprocstarted == 0){
+ qlock(&rpriv->apl);
+ if(rpriv->ackprocstarted == 0){
+ sprint(kpname, "#I%drudpack", rudp->f->dev);
+ kproc(kpname, relackproc, rudp);
+ rpriv->ackprocstarted = 1;
+ }
+ qunlock(&rpriv->apl);
+ }
+}
+
+static char*
+rudpconnect(Conv *c, char **argv, int argc)
+{
+ char *e;
+ Rudppriv *upriv;
+
+ upriv = c->p->priv;
+ rudpstartackproc(c->p);
+ e = Fsstdconnect(c, argv, argc);
+ Fsconnected(c, e);
+ iphtadd(&upriv->ht, c);
+
+ return e;
+}
+
+
+static int
+rudpstate(Conv *c, char *state, int n)
+{
+ Rudpcb *ucb;
+ Reliable *r;
+ int m;
+
+ m = snprint(state, n, "%s", c->inuse?"Open":"Closed");
+ ucb = (Rudpcb*)c->ptcl;
+ qlock(ucb);
+ for(r = ucb->r; r; r = r->next)
+ m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r));
+ m += snprint(state+m, n-m, "\n");
+ qunlock(ucb);
+ return m;
+}
+
+static char*
+rudpannounce(Conv *c, char** argv, int argc)
+{
+ char *e;
+ Rudppriv *upriv;
+
+ upriv = c->p->priv;
+ rudpstartackproc(c->p);
+ e = Fsstdannounce(c, argv, argc);
+ if(e != nil)
+ return e;
+ Fsconnected(c, nil);
+ iphtadd(&upriv->ht, c);
+
+ return nil;
+}
+
+static void
+rudpcreate(Conv *c)
+{
+ c->rq = qopen(64*1024, Qmsg, 0, 0);
+ c->wq = qopen(64*1024, Qkick, rudpkick, c);
+}
+
+static void
+rudpclose(Conv *c)
+{
+ Rudpcb *ucb;
+ Reliable *r, *nr;
+ Rudppriv *upriv;
+
+ upriv = c->p->priv;
+ iphtrem(&upriv->ht, c);
+
+ /* force out any delayed acks */
+ ucb = (Rudpcb*)c->ptcl;
+ qlock(ucb);
+ for(r = ucb->r; r; r = r->next){
+ if(r->acksent != r->rcvseq)
+ relsendack(c, r, 0);
+ }
+ qunlock(ucb);
+
+ qclose(c->rq);
+ qclose(c->wq);
+ qclose(c->eq);
+ ipmove(c->laddr, IPnoaddr);
+ ipmove(c->raddr, IPnoaddr);
+ c->lport = 0;
+ c->rport = 0;
+
+ ucb->headers = 0;
+ ucb->randdrop = 0;
+ qlock(ucb);
+ for(r = ucb->r; r; r = nr){
+ if(r->acksent != r->rcvseq)
+ relsendack(c, r, 0);
+ nr = r->next;
+ relhangup(c, r);
+ relput(r);
+ }
+ ucb->r = 0;
+
+ qunlock(ucb);
+}
+
+/*
+ * randomly don't send packets
+ */
+static void
+doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos)
+{
+ Rudpcb *ucb;
+
+ ucb = (Rudpcb*)c->ptcl;
+ if(ucb->randdrop && nrand(100) < ucb->randdrop)
+ freeblist(bp);
+ else
+ ipoput4(f, bp, x, ttl, tos, nil);
+}
+
+int
+flow(void *v)
+{
+ Reliable *r = v;
+
+ return UNACKED(r) <= Maxunacked;
+}
+
+void
+rudpkick(void *x)
+{
+ Conv *c = x;
+ Udphdr *uh;
+ ushort rport;
+ uchar laddr[IPaddrlen], raddr[IPaddrlen];
+ Block *bp;
+ Rudpcb *ucb;
+ Rudphdr *rh;
+ Reliable *r;
+ int dlen, ptcllen;
+ Rudppriv *upriv;
+ Fs *f;
+
+ upriv = c->p->priv;
+ f = c->p->f;
+
+ netlog(c->p->f, Logrudp, "rudp: kick\n");
+ bp = qget(c->wq);
+ if(bp == nil)
+ return;
+
+ ucb = (Rudpcb*)c->ptcl;
+ switch(ucb->headers) {
+ case 7:
+ /* get user specified addresses */
+ bp = pullupblock(bp, UDP_USEAD7);
+ if(bp == nil)
+ return;
+ ipmove(raddr, bp->rp);
+ bp->rp += IPaddrlen;
+ ipmove(laddr, bp->rp);
+ bp->rp += IPaddrlen;
+ /* pick interface closest to dest */
+ if(ipforme(f, laddr) != Runi)
+ findlocalip(f, laddr, raddr);
+ bp->rp += IPaddrlen; /* Ignore ifc address */
+ rport = nhgets(bp->rp);
+ bp->rp += 2+2; /* Ignore local port */
+ break;
+ default:
+ ipmove(raddr, c->raddr);
+ ipmove(laddr, c->laddr);
+ rport = c->rport;
+ break;
+ }
+
+ dlen = blocklen(bp);
+
+ /* Make space to fit rudp & ip header */
+ bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE);
+ if(bp == nil)
+ return;
+
+ uh = (Udphdr *)(bp->rp);
+ uh->vihl = IP_VER4;
+
+ rh = (Rudphdr*)uh;
+
+ ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE);
+ uh->Unused = 0;
+ uh->udpproto = IP_UDPPROTO;
+ uh->frag[0] = 0;
+ uh->frag[1] = 0;
+ hnputs(uh->udpplen, ptcllen);
+ switch(ucb->headers){
+ case 7:
+ v6tov4(uh->udpdst, raddr);
+ hnputs(uh->udpdport, rport);
+ v6tov4(uh->udpsrc, laddr);
+ break;
+ default:
+ v6tov4(uh->udpdst, c->raddr);
+ hnputs(uh->udpdport, c->rport);
+ if(ipcmp(c->laddr, IPnoaddr) == 0)
+ findlocalip(f, c->laddr, c->raddr);
+ v6tov4(uh->udpsrc, c->laddr);
+ break;
+ }
+ hnputs(uh->udpsport, c->lport);
+ hnputs(uh->udplen, ptcllen);
+ uh->udpcksum[0] = 0;
+ uh->udpcksum[1] = 0;
+
+ qlock(ucb);
+ r = relstate(ucb, raddr, rport, "kick");
+ r->sndseq = NEXTSEQ(r->sndseq);
+ hnputl(rh->relseq, r->sndseq);
+ hnputl(rh->relsgen, r->sndgen);
+
+ hnputl(rh->relack, r->rcvseq); /* ACK last rcvd packet */
+ hnputl(rh->relagen, r->rcvgen);
+
+ if(r->rcvseq != r->acksent)
+ r->acksent = r->rcvseq;
+
+ hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE));
+
+ relackq(r, bp);
+ qunlock(ucb);
+
+ upriv->ustats.rudpOutDatagrams++;
+
+ DPRINT("sent: %lud/%lud, %lud/%lud\n",
+ r->sndseq, r->sndgen, r->rcvseq, r->rcvgen);
+
+ doipoput(c, f, bp, 0, c->ttl, c->tos);
+
+ if(waserror()) {
+ relput(r);
+ qunlock(&r->lock);
+ nexterror();
+ }
+
+ /* flow control of sorts */
+ qlock(&r->lock);
+ if(UNACKED(r) > Maxunacked){
+ r->blocked = 1;
+ sleep(&r->vous, flow, r);
+ r->blocked = 0;
+ }
+
+ qunlock(&r->lock);
+ relput(r);
+ poperror();
+}
+
+void
+rudpiput(Proto *rudp, Ipifc *ifc, Block *bp)
+{
+ int len, olen, ottl;
+ Udphdr *uh;
+ Conv *c;
+ Rudpcb *ucb;
+ uchar raddr[IPaddrlen], laddr[IPaddrlen];
+ ushort rport, lport;
+ Rudppriv *upriv;
+ Fs *f;
+ uchar *p;
+
+ upriv = rudp->priv;
+ f = rudp->f;
+
+ upriv->ustats.rudpInDatagrams++;
+
+ uh = (Udphdr*)(bp->rp);
+
+ /* Put back pseudo header for checksum
+ * (remember old values for icmpnoconv())
+ */
+ ottl = uh->Unused;
+ uh->Unused = 0;
+ len = nhgets(uh->udplen);
+ olen = nhgets(uh->udpplen);
+ hnputs(uh->udpplen, len);
+
+ v4tov6(raddr, uh->udpsrc);
+ v4tov6(laddr, uh->udpdst);
+ lport = nhgets(uh->udpdport);
+ rport = nhgets(uh->udpsport);
+
+ if(nhgets(uh->udpcksum)) {
+ if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) {
+ upriv->ustats.rudpInErrors++;
+ upriv->csumerr++;
+ netlog(f, Logrudp, "rudp: checksum error %I\n", raddr);
+ DPRINT("rudp: checksum error %I\n", raddr);
+ freeblist(bp);
+ return;
+ }
+ }
+
+ qlock(rudp);
+
+ c = iphtlook(&upriv->ht, raddr, rport, laddr, lport);
+ if(c == nil){
+ /* no conversation found */
+ upriv->ustats.rudpNoPorts++;
+ qunlock(rudp);
+ netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport,
+ laddr, lport);
+ uh->Unused = ottl;
+ hnputs(uh->udpplen, olen);
+ icmpnoconv(f, bp);
+ freeblist(bp);
+ return;
+ }
+ ucb = (Rudpcb*)c->ptcl;
+ qlock(ucb);
+ qunlock(rudp);
+
+ if(reliput(c, bp, raddr, rport) < 0){
+ qunlock(ucb);
+ freeb(bp);
+ return;
+ }
+
+ /*
+ * Trim the packet down to data size
+ */
+
+ len -= (UDP_RHDRSIZE-UDP_PHDRSIZE);
+ bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len);
+ if(bp == nil) {
+ netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n",
+ raddr, rport, laddr, lport);
+ DPRINT("rudp: len err %I.%d -> %I.%d\n",
+ raddr, rport, laddr, lport);
+ upriv->lenerr++;
+ return;
+ }
+
+ netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n",
+ raddr, rport, laddr, lport, len);
+
+ switch(ucb->headers){
+ case 7:
+ /* pass the src address */
+ bp = padblock(bp, UDP_USEAD7);
+ p = bp->rp;
+ ipmove(p, raddr); p += IPaddrlen;
+ ipmove(p, laddr); p += IPaddrlen;
+ ipmove(p, ifc->lifc->local); p += IPaddrlen;
+ hnputs(p, rport); p += 2;
+ hnputs(p, lport);
+ break;
+ default:
+ /* connection oriented rudp */
+ if(ipcmp(c->raddr, IPnoaddr) == 0){
+ /* save the src address in the conversation */
+ ipmove(c->raddr, raddr);
+ c->rport = rport;
+
+ /* reply with the same ip address (if not broadcast) */
+ if(ipforme(f, laddr) == Runi)
+ ipmove(c->laddr, laddr);
+ else
+ v4tov6(c->laddr, ifc->lifc->local);
+ }
+ break;
+ }
+ if(bp->next)
+ bp = concatblock(bp);
+
+ if(qfull(c->rq)) {
+ netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport,
+ laddr, lport);
+ freeblist(bp);
+ }
+ else
+ qpass(c->rq, bp);
+
+ qunlock(ucb);
+}
+
+static char *rudpunknown = "unknown rudp ctl request";
+
+char*
+rudpctl(Conv *c, char **f, int n)
+{
+ Rudpcb *ucb;
+ uchar ip[IPaddrlen];
+ int x;
+
+ ucb = (Rudpcb*)c->ptcl;
+ if(n < 1)
+ return rudpunknown;
+
+ if(strcmp(f[0], "headers") == 0){
+ ucb->headers = 7; /* new headers format */
+ return nil;
+ } else if(strcmp(f[0], "hangup") == 0){
+ if(n < 3)
+ return "bad syntax";
+ if (parseip(ip, f[1]) == -1)
+ return Ebadip;
+ x = atoi(f[2]);
+ qlock(ucb);
+ relforget(c, ip, x, 1);
+ qunlock(ucb);
+ return nil;
+ } else if(strcmp(f[0], "randdrop") == 0){
+ x = 10; /* default is 10% */
+ if(n > 1)
+ x = atoi(f[1]);
+ if(x > 100 || x < 0)
+ return "illegal rudp drop rate";
+ ucb->randdrop = x;
+ return nil;
+ }
+ return rudpunknown;
+}
+
+void
+rudpadvise(Proto *rudp, Block *bp, char *msg)
+{
+ Udphdr *h;
+ uchar source[IPaddrlen], dest[IPaddrlen];
+ ushort psource, pdest;
+ Conv *s, **p;
+
+ h = (Udphdr*)(bp->rp);
+
+ v4tov6(dest, h->udpdst);
+ v4tov6(source, h->udpsrc);
+ psource = nhgets(h->udpsport);
+ pdest = nhgets(h->udpdport);
+
+ /* Look for a connection */
+ for(p = rudp->conv; *p; p++) {
+ s = *p;
+ if(s->rport == pdest)
+ if(s->lport == psource)
+ if(ipcmp(s->raddr, dest) == 0)
+ if(ipcmp(s->laddr, source) == 0){
+ qhangup(s->rq, msg);
+ qhangup(s->wq, msg);
+ break;
+ }
+ }
+ freeblist(bp);
+}
+
+int
+rudpstats(Proto *rudp, char *buf, int len)
+{
+ Rudppriv *upriv;
+
+ upriv = rudp->priv;
+ return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n",
+ upriv->ustats.rudpInDatagrams,
+ upriv->ustats.rudpNoPorts,
+ upriv->ustats.rudpInErrors,
+ upriv->ustats.rudpOutDatagrams,
+ upriv->rxmits,
+ upriv->orders);
+}
+
+void
+rudpinit(Fs *fs)
+{
+
+ Proto *rudp;
+
+ rudp = smalloc(sizeof(Proto));
+ rudp->priv = smalloc(sizeof(Rudppriv));
+ rudp->name = "rudp";
+ rudp->connect = rudpconnect;
+ rudp->announce = rudpannounce;
+ rudp->ctl = rudpctl;
+ rudp->state = rudpstate;
+ rudp->create = rudpcreate;
+ rudp->close = rudpclose;
+ rudp->rcv = rudpiput;
+ rudp->advise = rudpadvise;
+ rudp->stats = rudpstats;
+ rudp->ipproto = IP_UDPPROTO;
+ rudp->nc = 32;
+ rudp->ptclsize = sizeof(Rudpcb);
+
+ Fsproto(fs, rudp);
+}
+
+/*********************************************/
+/* Here starts the reliable helper functions */
+/*********************************************/
+/*
+ * Enqueue a copy of an unacked block for possible retransmissions
+ */
+void
+relackq(Reliable *r, Block *bp)
+{
+ Block *np;
+
+ np = copyblock(bp, blocklen(bp));
+ if(r->unacked)
+ r->unackedtail->list = np;
+ else {
+ /* restart timer */
+ r->timeout = 0;
+ r->xmits = 1;
+ r->unacked = np;
+ }
+ r->unackedtail = np;
+ np->list = nil;
+}
+
+/*
+ * retransmit unacked blocks
+ */
+void
+relackproc(void *a)
+{
+ Rudpcb *ucb;
+ Proto *rudp;
+ Reliable *r;
+ Conv **s, *c;
+
+ rudp = (Proto *)a;
+
+loop:
+ tsleep(&up->sleep, return0, 0, Rudptickms);
+
+ for(s = rudp->conv; *s; s++) {
+ c = *s;
+ ucb = (Rudpcb*)c->ptcl;
+ qlock(ucb);
+
+ for(r = ucb->r; r; r = r->next) {
+ if(r->unacked != nil){
+ r->timeout += Rudptickms;
+ if(r->timeout > Rudprxms*r->xmits)
+ relrexmit(c, r);
+ }
+ if(r->acksent != r->rcvseq)
+ relsendack(c, r, 0);
+ }
+ qunlock(ucb);
+ }
+ goto loop;
+}
+
+/*
+ * get the state record for a conversation
+ */
+Reliable*
+relstate(Rudpcb *ucb, uchar *addr, ushort port, char *from)
+{
+ Reliable *r, **l;
+
+ l = &ucb->r;
+ for(r = *l; r; r = *l){
+ if(memcmp(addr, r->addr, IPaddrlen) == 0 &&
+ port == r->port)
+ break;
+ l = &r->next;
+ }
+
+ /* no state for this addr/port, create some */
+ if(r == nil){
+ while(generation == 0)
+ generation = rand();
+
+ DPRINT("from %s new state %lud for %I!%ud\n",
+ from, generation, addr, port);
+
+ r = smalloc(sizeof(Reliable));
+ memmove(r->addr, addr, IPaddrlen);
+ r->port = port;
+ r->unacked = 0;
+ if(generation == Hangupgen)
+ generation++;
+ r->sndgen = generation++;
+ r->sndseq = 0;
+ r->ackrcvd = 0;
+ r->rcvgen = 0;
+ r->rcvseq = 0;
+ r->acksent = 0;
+ r->xmits = 0;
+ r->timeout = 0;
+ r->ref = 0;
+ incref(r); /* one reference for being in the list */
+
+ *l = r;
+ }
+
+ incref(r);
+ return r;
+}
+
+void
+relput(Reliable *r)
+{
+ if(decref(r) == 0)
+ free(r);
+}
+
+/*
+ * forget a Reliable state
+ */
+void
+relforget(Conv *c, uchar *ip, int port, int originator)
+{
+ Rudpcb *ucb;
+ Reliable *r, **l;
+
+ ucb = (Rudpcb*)c->ptcl;
+
+ l = &ucb->r;
+ for(r = *l; r; r = *l){
+ if(ipcmp(ip, r->addr) == 0 && port == r->port){
+ *l = r->next;
+ if(originator)
+ relsendack(c, r, 1);
+ relhangup(c, r);
+ relput(r); /* remove from the list */
+ break;
+ }
+ l = &r->next;
+ }
+}
+
+/*
+ * process a rcvd reliable packet. return -1 if not to be passed to user process,
+ * 0 therwise.
+ *
+ * called with ucb locked.
+ */
+int
+reliput(Conv *c, Block *bp, uchar *addr, ushort port)
+{
+ Block *nbp;
+ Rudpcb *ucb;
+ Rudppriv *upriv;
+ Udphdr *uh;
+ Reliable *r;
+ Rudphdr *rh;
+ ulong seq, ack, sgen, agen, ackreal;
+ int rv = -1;
+
+ /* get fields */
+ uh = (Udphdr*)(bp->rp);
+ rh = (Rudphdr*)uh;
+ seq = nhgetl(rh->relseq);
+ sgen = nhgetl(rh->relsgen);
+ ack = nhgetl(rh->relack);
+ agen = nhgetl(rh->relagen);
+
+ upriv = c->p->priv;
+ ucb = (Rudpcb*)c->ptcl;
+ r = relstate(ucb, addr, port, "input");
+
+ DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n",
+ seq, sgen, ack, agen, r->sndgen);
+
+ /* if acking an incorrect generation, ignore */
+ if(ack && agen != r->sndgen)
+ goto out;
+
+ /* Look for a hangup */
+ if(sgen == Hangupgen) {
+ if(agen == r->sndgen)
+ relforget(c, addr, port, 0);
+ goto out;
+ }
+
+ /* make sure we're not talking to a new remote side */
+ if(r->rcvgen != sgen){
+ if(seq != 0 && seq != 1)
+ goto out;
+
+ /* new connection */
+ if(r->rcvgen != 0){
+ DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen);
+ relhangup(c, r);
+ }
+ r->rcvgen = sgen;
+ }
+
+ /* dequeue acked packets */
+ if(ack && agen == r->sndgen){
+ ackreal = 0;
+ while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){
+ nbp = r->unacked;
+ r->unacked = nbp->list;
+ DPRINT("%lud/%lud acked, r->sndgen = %lud\n",
+ ack, agen, r->sndgen);
+ freeb(nbp);
+ r->ackrcvd = NEXTSEQ(r->ackrcvd);
+ ackreal = 1;
+ }
+
+ /* flow control */
+ if(UNACKED(r) < Maxunacked/8 && r->blocked)
+ wakeup(&r->vous);
+
+ /*
+ * retransmit next packet if the acked packet
+ * was transmitted more than once
+ */
+ if(ackreal && r->unacked != nil){
+ r->timeout = 0;
+ if(r->xmits > 1){
+ r->xmits = 1;
+ relrexmit(c, r);
+ }
+ }
+
+ }
+
+ /* no message or input queue full */
+ if(seq == 0 || qfull(c->rq))
+ goto out;
+
+ /* refuse out of order delivery */
+ if(seq != NEXTSEQ(r->rcvseq)){
+ relsendack(c, r, 0); /* tell him we got it already */
+ upriv->orders++;
+ DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq));
+ goto out;
+ }
+ r->rcvseq = seq;
+
+ rv = 0;
+out:
+ relput(r);
+ return rv;
+}
+
+void
+relsendack(Conv *c, Reliable *r, int hangup)
+{
+ Udphdr *uh;
+ Block *bp;
+ Rudphdr *rh;
+ int ptcllen;
+ Fs *f;
+
+ bp = allocb(UDP_IPHDR + UDP_RHDRSIZE);
+ if(bp == nil)
+ return;
+ bp->wp += UDP_IPHDR + UDP_RHDRSIZE;
+ f = c->p->f;
+ uh = (Udphdr *)(bp->rp);
+ uh->vihl = IP_VER4;
+ rh = (Rudphdr*)uh;
+
+ ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE);
+ uh->Unused = 0;
+ uh->udpproto = IP_UDPPROTO;
+ uh->frag[0] = 0;
+ uh->frag[1] = 0;
+ hnputs(uh->udpplen, ptcllen);
+
+ v6tov4(uh->udpdst, r->addr);
+ hnputs(uh->udpdport, r->port);
+ hnputs(uh->udpsport, c->lport);
+ if(ipcmp(c->laddr, IPnoaddr) == 0)
+ findlocalip(f, c->laddr, c->raddr);
+ v6tov4(uh->udpsrc, c->laddr);
+ hnputs(uh->udplen, ptcllen);
+
+ if(hangup)
+ hnputl(rh->relsgen, Hangupgen);
+ else
+ hnputl(rh->relsgen, r->sndgen);
+ hnputl(rh->relseq, 0);
+ hnputl(rh->relagen, r->rcvgen);
+ hnputl(rh->relack, r->rcvseq);
+
+ if(r->acksent < r->rcvseq)
+ r->acksent = r->rcvseq;
+
+ uh->udpcksum[0] = 0;
+ uh->udpcksum[1] = 0;
+ hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE));
+
+ DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen);
+ doipoput(c, f, bp, 0, c->ttl, c->tos);
+}
+
+
+/*
+ * called with ucb locked (and c locked if user initiated close)
+ */
+void
+relhangup(Conv *c, Reliable *r)
+{
+ int n;
+ Block *bp;
+ char hup[ERRMAX];
+
+ n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port);
+ qproduce(c->eq, hup, n);
+
+ /*
+ * dump any unacked outgoing messages
+ */
+ for(bp = r->unacked; bp != nil; bp = r->unacked){
+ r->unacked = bp->list;
+ bp->list = nil;
+ freeb(bp);
+ }
+
+ r->rcvgen = 0;
+ r->rcvseq = 0;
+ r->acksent = 0;
+ if(generation == Hangupgen)
+ generation++;
+ r->sndgen = generation++;
+ r->sndseq = 0;
+ r->ackrcvd = 0;
+ r->xmits = 0;
+ r->timeout = 0;
+ wakeup(&r->vous);
+}
+
+/*
+ * called with ucb locked
+ */
+void
+relrexmit(Conv *c, Reliable *r)
+{
+ Rudppriv *upriv;
+ Block *np;
+ Fs *f;
+
+ upriv = c->p->priv;
+ f = c->p->f;
+ r->timeout = 0;
+ if(r->xmits++ > Rudpmaxxmit){
+ relhangup(c, r);
+ return;
+ }
+
+ upriv->rxmits++;
+ np = copyblock(r->unacked, blocklen(r->unacked));
+ DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1);
+ doipoput(c, f, np, 0, c->ttl, c->tos);
+}