diff options
author | Taru Karttunen <taruti@taruti.net> | 2011-03-30 15:46:40 +0300 |
---|---|---|
committer | Taru Karttunen <taruti@taruti.net> | 2011-03-30 15:46:40 +0300 |
commit | e5888a1ffdae813d7575f5fb02275c6bb07e5199 (patch) | |
tree | d8d51eac403f07814b9e936eed0c9a79195e2450 /sys/src/cmd/cwfs/net.c |
Import sources from 2011-03-30 iso image
Diffstat (limited to 'sys/src/cmd/cwfs/net.c')
-rwxr-xr-x | sys/src/cmd/cwfs/net.c | 454 |
1 files changed, 454 insertions, 0 deletions
diff --git a/sys/src/cmd/cwfs/net.c b/sys/src/cmd/cwfs/net.c new file mode 100755 index 000000000..97553be6a --- /dev/null +++ b/sys/src/cmd/cwfs/net.c @@ -0,0 +1,454 @@ +/* network i/o */ + +#include "all.h" +#include "io.h" +#include <fcall.h> /* 9p2000 */ +#include <thread.h> + +enum { + Maxfdata = 8192, + Nqueue = 200, /* queue size (tunable) */ + + Netclosed = 0, /* Connection state */ + Netopen, +}; + +/* + * the kernel file server read packets directly from + * its ethernet(s) and did all the protocol processing. + * if the incoming packets were 9p (over il/ip), they + * were queued for the server processes to operate upon. + * + * in user mode, we have one process per incoming connection + * instead, and those processes get just the data, minus + * tcp and ip headers, so they just see a stream of 9p messages, + * which they then queue for the server processes. + * + * there used to be more queueing (in the kernel), with separate + * processes for ethernet input, il input, 9p processing, il output + * and ethernet output, and queues connecting them. we now let + * the kernel's network queues, protocol stacks and processes do + * much of this work. + * + * partly as a result of this, we can now process 9p messages + * transported via tcp, exploit multiple x86 processors, and + * were able to shed 70% of the file server's source, by line count. + * + * the upshot is that Ether (now Network) is no longer a perfect fit for + * the way network i/o is done now. the notion of `connection' + * is being introduced to complement it. + */ + +typedef struct Network Network; +typedef struct Netconn Netconn; +typedef struct Conn9p Conn9p; + +/* a network, not necessarily an ethernet */ +struct Network { + int ctlrno; + char iname[NAMELEN]; + char oname[NAMELEN]; + + char *dialstr; + char anndir[40]; + char lisdir[40]; + int annfd; /* fd from announce */ +}; + +/* an open tcp (or other transport) connection */ +struct Netconn { + Queue* reply; /* network output */ + char* raddr; /* remote caller's addr */ + Chan* chan; /* list of tcp channels */ + + int alloc; /* flag: allocated */ + + int state; + Conn9p* conn9p; /* not reference-counted */ + + Lock; +}; + +/* + * incoming 9P network connection from a given machine. + * typically will multiplex 9P sessions for multiple users. + */ +struct Conn9p { + QLock; + Ref; + int fd; + char* dir; + Netconn*netconn; /* cross-connection */ + char* raddr; +}; + +static Network netif[Maxnets]; +static struct { + Lock; + Chan* chan; +} netchans; +static Queue *netoq; /* only one network output queue is needed */ + +char *annstrs[Maxnets] = { + "tcp!*!9fs", +}; + +/* never returns nil */ +static Chan* +getchan(Conn9p *conn9p) +{ + Netconn *netconn; + Chan *cp, *xcp; + + lock(&netchans); + + /* look for conn9p's Chan */ + xcp = nil; + for(cp = netchans.chan; cp; cp = netconn->chan) { + netconn = cp->pdata; + if(!netconn->alloc) + xcp = cp; /* remember free Chan */ + else if(netconn->raddr != nil && + strcmp(conn9p->raddr, netconn->raddr) == 0) { + unlock(&netchans); + return cp; /* found conn9p's Chan */ + } + } + + /* conn9p's Chan not found; if no free Chan, allocate & fill in one */ + cp = xcp; + if(cp == nil) { + cp = fs_chaninit(Devnet, 1, sizeof(Netconn)); + netconn = cp->pdata; + netconn->chan = netchans.chan; + netconn->state = Netopen; /* a guess */ + /* cross-connect netconn and conn9p */ + netconn->conn9p = conn9p; /* not reference-counted */ + conn9p->netconn = netconn; + netchans.chan = cp; + } + + /* fill in Chan's netconn */ + netconn = cp->pdata; + netconn->raddr = strdup(conn9p->raddr); + + /* fill in Chan */ + cp->send = serveq; + if (cp->reply == nil) + cp->reply = netoq; + netconn->reply = netoq; + cp->protocol = nil; + cp->msize = 0; + cp->whotime = 0; + strncpy(cp->whochan, conn9p->raddr, sizeof cp->whochan); +// cp->whoprint = tcpwhoprint; + netconn->alloc = 1; + + unlock(&netchans); + return cp; +} + +static char * +fd2name(int fd) +{ + char data[128]; + + if (fd2path(fd, data, sizeof data) < 0) + return strdup("/GOK"); + return strdup(data); +} + +static void +hangupdfd(int dfd) +{ + int ctlfd; + char *end, *data; + + data = fd2name(dfd); + close(dfd); + + end = strstr(data, "/data"); + if (end != nil) + strcpy(end, "/ctl"); + ctlfd = open(data, OWRITE); + if (ctlfd >= 0) { + hangup(ctlfd); + close(ctlfd); + } + free(data); +} + +void +closechan(int n) +{ + Chan *cp; + + for(cp = chans; cp; cp = cp->next) + if(cp->whotime != 0 && cp->chan == n) + fileinit(cp); +} + +void +nethangup(Chan *cp, char *msg, int dolock) +{ + Netconn *netconn; + + netconn = cp->pdata; + netconn->state = Netclosed; + + if(msg != nil) + print("hangup! %s %s\n", msg, netconn->raddr); + + fileinit(cp); + cp->whotime = 0; + strcpy(cp->whoname, "<none>"); + + if(dolock) + lock(&netchans); + netconn->alloc = 0; + free(netconn->raddr); + netconn->raddr = nil; + if(dolock) + unlock(&netchans); +} + +void +chanhangup(Chan *cp, char *msg, int dolock) +{ + Netconn *netconn = cp->pdata; + Conn9p *conn9p = netconn->conn9p; + + if (conn9p->fd > 0) + hangupdfd(conn9p->fd); /* drop it */ + nethangup(cp, msg, dolock); +} + +/* + * returns length of next 9p message (including the length) and + * leaves it in the first few bytes of abuf. + */ +static long +size9pmsg(int fd, void *abuf, uint n) +{ + int m; + uchar *buf = abuf; + + if (n < BIT32SZ) + return -1; /* caller screwed up */ + + /* read count */ + m = readn(fd, buf, BIT32SZ); + if(m != BIT32SZ){ + if(m < 0) + return -1; + return 0; + } + return GBIT32(buf); +} + +static int +readalloc9pmsg(int fd, Msgbuf **mbp) +{ + int m, len; + uchar lenbuf[BIT32SZ]; + Msgbuf *mb; + + *mbp = nil; + len = size9pmsg(fd, lenbuf, BIT32SZ); + if (len <= 0) + return len; + if(len <= BIT32SZ || len > IOHDRSZ+Maxfdata){ + werrstr("bad length in 9P2000 message header"); + return -1; + } + if ((mb = mballoc(len, nil, Mbeth1)) == nil) + panic("readalloc9pmsg: mballoc failed"); + *mbp = mb; + memmove(mb->data, lenbuf, BIT32SZ); + len -= BIT32SZ; + m = readn(fd, mb->data+BIT32SZ, len); + if(m < len) + return 0; + return BIT32SZ+m; +} + +static void +connection(void *v) +{ + int n; + char buf[64]; + Chan *chan9p; + Conn9p *conn9p = v; + Msgbuf *mb; + NetConnInfo *nci; + + incref(conn9p); /* count connections */ + nci = getnetconninfo(conn9p->dir, conn9p->fd); + if (nci == nil) + panic("connection: getnetconninfo(%s, %d) failed", + conn9p->dir, conn9p->fd); + conn9p->raddr = nci->raddr; + + chan9p = getchan(conn9p); + print("new connection on %s pid %d from %s\n", + conn9p->dir, getpid(), conn9p->raddr); + + /* + * reading from a pipe or a network device + * will give an error after a few eof reads. + * however, we cannot tell the difference + * between a zero-length read and an interrupt + * on the processes writing to us, + * so we wait for the error. + */ + while (conn9p->fd > 0 && (n = readalloc9pmsg(conn9p->fd, &mb)) >= 0) { + if(n == 0) + continue; + mb->param = (uintptr)conn9p; /* has fd for replies */ + mb->chan = chan9p; + + assert(mb->magic == Mbmagic); + incref(conn9p); /* & count packets in flight */ + fs_send(serveq, mb); /* to 9P server processes */ + /* mb will be freed by receiving process */ + } + + rerrstr(buf, sizeof buf); + + qlock(conn9p); + print("connection hung up from %s\n", conn9p->dir); + if (conn9p->fd > 0) /* not poisoned yet? */ + hangupdfd(conn9p->fd); /* poison the fd */ + + nethangup(chan9p, "remote hung up", 1); + closechan(chan9p->chan); + + conn9p->fd = -1; /* poison conn9p */ + if (decref(conn9p) == 0) { /* last conn.? turn the lights off */ + free(conn9p->dir); + qunlock(conn9p); + free(conn9p); + } else + qunlock(conn9p); + + freenetconninfo(nci); + + if(buf[0] == '\0' || strstr(buf, "hungup") != nil) + exits(""); + sysfatal("mount read, pid %d", getpid()); +} + +static void +neti(void *v) +{ + int lisfd, accfd; + Network *net; + Conn9p *conn9p; + + net = v; + print("net%di\n", net->ctlrno); + for(;;) { + lisfd = listen(net->anndir, net->lisdir); + if (lisfd < 0) { + print("listen %s failed: %r\n", net->anndir); + continue; + } + + /* got new call on lisfd */ + accfd = accept(lisfd, net->lisdir); + if (accfd < 0) { + print("accept %d (from %s) failed: %r\n", + lisfd, net->lisdir); + continue; + } + + /* accepted that call */ + conn9p = malloc(sizeof *conn9p); + conn9p->dir = strdup(net->lisdir); + conn9p->fd = accfd; + newproc(connection, conn9p, smprint("9P read %s", conn9p->dir)); + close(lisfd); + } +} + +/* only need one of these for all network connections, thus all interfaces */ +static void +neto(void *) +{ + int len, datafd; + Msgbuf *mb; + Conn9p *conn9p; + + print("neto\n"); + for(;;) { + /* receive 9P answer from 9P server processes */ + while((mb = fs_recv(netoq, 0)) == nil) + continue; + + if(mb->data == nil) { + print("neto: pkt nil cat=%d free=%d\n", + mb->category, mb->flags&FREE); + if(!(mb->flags & FREE)) + mbfree(mb); + continue; + } + + /* send answer back over the network connection in the reply */ + len = mb->count; + conn9p = (Conn9p *)mb->param; + assert(conn9p); + + qlock(conn9p); + datafd = conn9p->fd; + assert(len >= 0); + /* datafd < 0 probably indicates poisoning by the read side */ + if (datafd < 0 || write(datafd, mb->data, len) != len) { + print( "network write error (%r);"); + print(" closing connection for %s\n", conn9p->dir); + nethangup(getchan(conn9p), "network write error", 1); + if (datafd > 0) + hangupdfd(datafd); /* drop it */ + conn9p->fd = -1; /* poison conn9p */ + } + mbfree(mb); + if (decref(conn9p) == 0) + panic("neto: zero ref count"); + qunlock(conn9p); + } +} + +void +netstart(void) +{ + int netorun = 0; + Network *net; + + if(netoq == nil) + netoq = newqueue(Nqueue, "network reply"); + for(net = &netif[0]; net < &netif[Maxnets]; net++){ + if(net->dialstr == nil) + continue; + sprint(net->oname, "neto"); + if (netorun++ == 0) + newproc(neto, nil, net->oname); + sprint(net->iname, "net%di", net->ctlrno); + newproc(neti, net, net->iname); + } +} + +void +netinit(void) +{ + Network *net; + + for (net = netif; net < netif + Maxnets; net++) { + net->dialstr = annstrs[net - netif]; + if (net->dialstr == nil) + continue; + net->annfd = announce(net->dialstr, net->anndir); + /* /bin/service/tcp564 may already have grabbed the port */ + if (net->annfd < 0) + sysfatal("can't announce %s: %r", net->dialstr); + print("netinit: announced on %s\n", net->dialstr); + } +} |