summary refs log tree commit diff
path: root/sys/src/cmd/cwfs/net.c
diff options
context:
space:
mode:
author Taru Karttunen <taruti@taruti.net> 2011-03-30 15:46:40 +0300
committer Taru Karttunen <taruti@taruti.net> 2011-03-30 15:46:40 +0300
commit e5888a1ffdae813d7575f5fb02275c6bb07e5199 (patch)
tree d8d51eac403f07814b9e936eed0c9a79195e2450 /sys/src/cmd/cwfs/net.c
Import sources from 2011-03-30 iso image
Diffstat (limited to 'sys/src/cmd/cwfs/net.c')
-rwxr-xr-x sys/src/cmd/cwfs/net.c 454
1 files changed, 454 insertions, 0 deletions
diff --git a/sys/src/cmd/cwfs/net.c b/sys/src/cmd/cwfs/net.c
new file mode 100755
index 000000000..97553be6a
--- /dev/null
+++ b/sys/src/cmd/cwfs/net.c
@@ -0,0 +1,454 @@
+/* network i/o */
+
+#include "all.h"
+#include "io.h"
+#include <fcall.h> /* 9p2000 */
+#include <thread.h>
+
+/* size limits and connection states used by the network i/o code in this file */
+enum {
+ Maxfdata = 8192, /* with IOHDRSZ, bounds the length readalloc9pmsg accepts */
+ Nqueue = 200, /* queue size (tunable) */
+
+ Netclosed = 0, /* Connection state */
+ Netopen,
+};
+
+/*
+ * the kernel file server read packets directly from
+ * its ethernet(s) and did all the protocol processing.
+ * if the incoming packets were 9p (over il/ip), they
+ * were queued for the server processes to operate upon.
+ *
+ * in user mode, we have one process per incoming connection
+ * instead, and those processes get just the data, minus
+ * tcp and ip headers, so they just see a stream of 9p messages,
+ * which they then queue for the server processes.
+ *
+ * there used to be more queueing (in the kernel), with separate
+ * processes for ethernet input, il input, 9p processing, il output
+ * and ethernet output, and queues connecting them. we now let
+ * the kernel's network queues, protocol stacks and processes do
+ * much of this work.
+ *
+ * partly as a result of this, we can now process 9p messages
+ * transported via tcp, exploit multiple x86 processors, and
+ * were able to shed 70% of the file server's source, by line count.
+ *
+ * the upshot is that Ether (now Network) is no longer a perfect fit for
+ * the way network i/o is done now. the notion of `connection'
+ * is being introduced to complement it.
+ */
+
+/* declared up front because Netconn and Conn9p point at each other */
+typedef struct Network Network;
+typedef struct Netconn Netconn;
+typedef struct Conn9p Conn9p;
+
+/* a network, not necessarily an ethernet */
+struct Network {
+ int ctlrno; /* number printed in the "net%di" process name */
+ char iname[NAMELEN]; /* name given to the neti (listener) process */
+ char oname[NAMELEN]; /* name given to the neto (output) process */
+
+ char *dialstr; /* string passed to announce; nil marks an unused slot */
+ char anndir[40]; /* directory filled in by announce, passed to listen */
+ char lisdir[40]; /* directory filled in by listen, passed to accept */
+ int annfd; /* fd from announce */
+};
+
+/*
+ * an open tcp (or other transport) connection.
+ * one lives in the pdata of each Chan that getchan creates.
+ */
+struct Netconn {
+ Queue* reply; /* network output */
+ char* raddr; /* remote caller's addr; strdup'd by getchan, freed by nethangup */
+ Chan* chan; /* list of tcp channels: next Chan after this one on netchans */
+
+ int alloc; /* flag: allocated */
+
+ int state; /* Netclosed or Netopen */
+ Conn9p* conn9p; /* not reference-counted */
+
+ Lock;
+};
+
+/*
+ * incoming 9P network connection from a given machine.
+ * typically will multiplex 9P sessions for multiple users.
+ *
+ * created by neti for each accepted call and handed to a
+ * connection process; neto finds it again through Msgbuf.param.
+ */
+struct Conn9p {
+ QLock; /* held by connection and neto while they use or poison fd */
+ Ref; /* the connection process plus one per message in flight */
+ int fd; /* data fd from accept; set to -1 once the connection is dead */
+ char* dir; /* strdup'd copy of the listen directory */
+ Netconn*netconn; /* cross-connection */
+ char* raddr; /* remote address, taken from getnetconninfo */
+};
+
+/* one slot per possible network; slots with a nil dialstr are skipped */
+static Network netif[Maxnets];
+/* list of network Chans, linked through Netconn.chan; the Lock guards it */
+static struct {
+ Lock;
+ Chan* chan;
+} netchans;
+static Queue *netoq; /* only one network output queue is needed */
+
+/* announce strings, copied slot for slot into netif[].dialstr by netinit */
+char *annstrs[Maxnets] = {
+ "tcp!*!9fs",
+};
+
+/*
+ * find the Chan serving conn9p's remote address, or set one up.
+ * never returns nil.
+ *
+ * the netchans list is searched under its lock for an allocated
+ * Chan whose raddr matches conn9p->raddr.  if there is none, a
+ * free Chan is recycled, or a new one is created and pushed on
+ * the front of the list.  the Chan is then (re)initialised for
+ * this connection and marked allocated.
+ *
+ * conn9p->raddr must be a valid string.
+ */
+static Chan*
+getchan(Conn9p *conn9p)
+{
+	Netconn *netconn;
+	Chan *cp, *xcp;
+
+	lock(&netchans);
+
+	/* look for conn9p's Chan */
+	xcp = nil;
+	for(cp = netchans.chan; cp; cp = netconn->chan) {
+		netconn = cp->pdata;
+		if(!netconn->alloc)
+			xcp = cp;		/* remember free Chan */
+		else if(netconn->raddr != nil &&
+		    strcmp(conn9p->raddr, netconn->raddr) == 0) {
+			unlock(&netchans);
+			return cp;		/* found conn9p's Chan */
+		}
+	}
+
+	/* conn9p's Chan not found; if no free Chan, allocate & link in one */
+	cp = xcp;
+	if(cp == nil) {
+		cp = fs_chaninit(Devnet, 1, sizeof(Netconn));
+		netconn = cp->pdata;
+		netconn->chan = netchans.chan;
+		netchans.chan = cp;
+	}
+
+	/*
+	 * fill in Chan's netconn.  the cross-connection is made for
+	 * recycled Chans too; otherwise netconn->conn9p would keep
+	 * pointing at the previous (possibly freed) connection.
+	 */
+	netconn = cp->pdata;
+	netconn->state = Netopen;		/* a guess */
+	netconn->conn9p = conn9p;		/* not reference-counted */
+	conn9p->netconn = netconn;
+	netconn->raddr = strdup(conn9p->raddr);
+
+	/* fill in Chan */
+	cp->send = serveq;
+	if (cp->reply == nil)
+		cp->reply = netoq;
+	netconn->reply = netoq;
+	cp->protocol = nil;
+	cp->msize = 0;
+	cp->whotime = 0;
+	/* strncpy does not terminate on truncation, so do it by hand */
+	strncpy(cp->whochan, conn9p->raddr, sizeof cp->whochan - 1);
+	cp->whochan[sizeof cp->whochan - 1] = '\0';
+//	cp->whoprint = tcpwhoprint;
+	netconn->alloc = 1;
+
+	unlock(&netchans);
+	return cp;
+}
+
+/*
+ * return a strdup'd copy of the path fd2path reports for fd,
+ * or of "/GOK" if fd2path fails.  the caller frees the result.
+ */
+static char *
+fd2name(int fd)
+{
+	char path[128];
+	char *name;
+
+	name = "/GOK";
+	if (fd2path(fd, path, sizeof path) >= 0)
+		name = path;
+	return strdup(name);
+}
+
+/*
+ * close the data fd dfd and hang up the connection behind it.
+ *
+ * the name of dfd is looked up before it is closed; the "/data"
+ * part of that name is rewritten to "/ctl" and the hangup
+ * request is sent to that ctl file.  if the name cannot be
+ * obtained or contains no "/data", dfd is only closed: opening
+ * the unmodified name would send the hangup request to a file
+ * that is not a ctl file.
+ */
+static void
+hangupdfd(int dfd)
+{
+	int ctlfd;
+	char *end, *data;
+
+	data = fd2name(dfd);		/* must precede the close */
+	close(dfd);
+	if (data == nil)		/* strdup failed in fd2name */
+		return;
+
+	end = strstr(data, "/data");
+	if (end != nil) {
+		strcpy(end, "/ctl");	/* shorter than "/data", so it fits */
+		ctlfd = open(data, OWRITE);
+		if (ctlfd >= 0) {
+			hangup(ctlfd);
+			close(ctlfd);
+		}
+	}
+	free(data);
+}
+
+/*
+ * run fileinit on every Chan in the chans list whose channel
+ * number is n and whose whotime is non-zero.
+ */
+void
+closechan(int n)
+{
+	Chan *c;
+
+	c = chans;
+	while(c != nil) {
+		if(c->chan == n && c->whotime != 0)
+			fileinit(c);
+		c = c->next;
+	}
+}
+
+/*
+ * mark cp's network connection closed and make its Chan
+ * available for reuse by getchan.
+ *
+ * msg, if not nil, is printed along with the remote address.
+ * when dolock is non-zero the netchans lock is taken around the
+ * release; a caller passing zero presumably holds it already.
+ */
+void
+nethangup(Chan *cp, char *msg, int dolock)
+{
+	Netconn *nc;
+
+	nc = cp->pdata;
+	nc->state = Netclosed;
+	if(msg)
+		print("hangup! %s %s\n", msg, nc->raddr);
+
+	/* reset the Chan itself */
+	fileinit(cp);
+	cp->whotime = 0;
+	strcpy(cp->whoname, "<none>");
+
+	/* give the Chan back and drop our copy of the address */
+	if(dolock)
+		lock(&netchans);
+	nc->alloc = 0;
+	free(nc->raddr);
+	nc->raddr = nil;
+	if(dolock)
+		unlock(&netchans);
+}
+
+/*
+ * drop the network connection behind cp, then release cp via
+ * nethangup.  msg and dolock are passed through to nethangup.
+ *
+ * the fd is poisoned (set to -1) before it is closed, as
+ * connection and neto do, so that they do not later close the
+ * same descriptor number a second time, by which point it may
+ * belong to a different connection.
+ * NOTE(review): the Conn9p qlock is not taken here because the
+ * locking state of callers is not visible from this file.
+ */
+void
+chanhangup(Chan *cp, char *msg, int dolock)
+{
+	int fd;
+	Netconn *netconn = cp->pdata;
+	Conn9p *conn9p = netconn->conn9p;
+
+	if (conn9p != nil && conn9p->fd > 0) {
+		fd = conn9p->fd;
+		conn9p->fd = -1;	/* poison conn9p */
+		hangupdfd(fd);		/* drop it */
+	}
+	nethangup(cp, msg, dolock);
+}
+
+/*
+ * read the 4-byte length that starts the next 9p message on fd
+ * into the first bytes of abuf, which holds n bytes.
+ *
+ * returns the message length (which includes the length field
+ * itself), 0 if fewer than BIT32SZ bytes arrived, or -1 if the
+ * read failed or abuf is too small to hold the length.
+ */
+static long
+size9pmsg(int fd, void *abuf, uint n)
+{
+	int got;
+	uchar *hdr;
+
+	if (n < BIT32SZ)
+		return -1;		/* caller screwed up */
+
+	hdr = abuf;
+	got = readn(fd, hdr, BIT32SZ);
+	if (got == BIT32SZ)
+		return GBIT32(hdr);
+	return got < 0 ? -1 : 0;
+}
+
+/*
+ * read one complete 9p message from fd into a freshly allocated
+ * Msgbuf.
+ *
+ * returns the message length with *mbp set to the Msgbuf, which
+ * the caller then owns; 0 on a short read of the length or the
+ * body; or a negative value on a read error or an implausible
+ * length.  *mbp is nil whenever the result is not positive, so
+ * the caller never has a buffer to free in those cases.
+ */
+static int
+readalloc9pmsg(int fd, Msgbuf **mbp)
+{
+	int m, len;
+	uchar lenbuf[BIT32SZ];
+	Msgbuf *mb;
+
+	*mbp = nil;
+	len = size9pmsg(fd, lenbuf, BIT32SZ);
+	if (len <= 0)
+		return len;
+	if(len <= BIT32SZ || len > IOHDRSZ+Maxfdata){
+		werrstr("bad length in 9P2000 message header");
+		return -1;
+	}
+	if ((mb = mballoc(len, nil, Mbeth1)) == nil)
+		panic("readalloc9pmsg: mballoc failed");
+	memmove(mb->data, lenbuf, BIT32SZ);
+	len -= BIT32SZ;
+	m = readn(fd, mb->data+BIT32SZ, len);
+	if(m < len){
+		/*
+		 * incomplete message: the caller ignores a zero
+		 * return, so the buffer must be released here.
+		 */
+		mbfree(mb);
+		return 0;
+	}
+	*mbp = mb;
+	return BIT32SZ+m;
+}
+
+/*
+ * per-connection reader process; v is the Conn9p made by neti.
+ *
+ * reads 9p messages from the connection and queues each one on
+ * serveq, tagged with the Conn9p (so neto can find the fd for
+ * the reply) and the connection's Chan.  the Conn9p holds one
+ * reference for this process and one for each message in flight.
+ *
+ * when reading fails the fd is hung up and poisoned, the Chan
+ * is released, and the Conn9p is freed if this was the last
+ * reference.  the process exits quietly if the error string is
+ * empty or mentions "hungup", and calls sysfatal otherwise.
+ */
+static void
+connection(void *v)
+{
+	int n;
+	char buf[64];
+	Chan *chan9p;
+	Conn9p *conn9p = v;
+	Msgbuf *mb;
+	NetConnInfo *nci;
+
+	incref(conn9p);			/* count connections */
+	nci = getnetconninfo(conn9p->dir, conn9p->fd);
+	if (nci == nil)
+		panic("connection: getnetconninfo(%s, %d) failed",
+			conn9p->dir, conn9p->fd);
+	/*
+	 * keep a private copy of the remote address: conn9p can
+	 * outlive this process while replies are still in flight,
+	 * and neto passes it to getchan, which reads raddr.
+	 */
+	conn9p->raddr = strdup(nci->raddr);
+	freenetconninfo(nci);
+	if (conn9p->raddr == nil)
+		panic("connection: out of memory");
+
+	chan9p = getchan(conn9p);
+	print("new connection on %s pid %d from %s\n",
+		conn9p->dir, getpid(), conn9p->raddr);
+
+	/*
+	 * reading from a pipe or a network device
+	 * will give an error after a few eof reads.
+	 * however, we cannot tell the difference
+	 * between a zero-length read and an interrupt
+	 * on the processes writing to us,
+	 * so we wait for the error.
+	 */
+	while (conn9p->fd > 0 && (n = readalloc9pmsg(conn9p->fd, &mb)) >= 0) {
+		if(n == 0)
+			continue;
+		mb->param = (uintptr)conn9p;	/* has fd for replies */
+		mb->chan = chan9p;
+
+		assert(mb->magic == Mbmagic);
+		incref(conn9p);		/* & count packets in flight */
+		fs_send(serveq, mb);	/* to 9P server processes */
+		/* mb will be freed by receiving process */
+	}
+
+	/* save the reason before anything else can change it */
+	rerrstr(buf, sizeof buf);
+
+	qlock(conn9p);
+	print("connection hung up from %s\n", conn9p->dir);
+	if (conn9p->fd > 0)		/* not poisoned yet? */
+		hangupdfd(conn9p->fd);	/* poison the fd */
+
+	nethangup(chan9p, "remote hung up", 1);
+	closechan(chan9p->chan);
+
+	conn9p->fd = -1;		/* poison conn9p */
+	if (decref(conn9p) == 0) {	/* last conn.? turn the lights off */
+		free(conn9p->dir);
+		free(conn9p->raddr);
+		qunlock(conn9p);
+		free(conn9p);
+	} else
+		qunlock(conn9p);
+
+	if(buf[0] == '\0' || strstr(buf, "hungup") != nil)
+		exits("");
+	sysfatal("mount read, pid %d", getpid());
+}
+
+/*
+ * listener process for one announced network; v is its Network.
+ *
+ * loops forever: waits for a call with listen, accepts it, and
+ * starts a connection process to read 9p messages from it.  the
+ * listen fd is closed once the call has been accepted or
+ * rejected, so it cannot accumulate across failed accepts.
+ */
+static void
+neti(void *v)
+{
+	int lisfd, accfd;
+	Network *net;
+	Conn9p *conn9p;
+
+	net = v;
+	print("net%di\n", net->ctlrno);
+	for(;;) {
+		lisfd = listen(net->anndir, net->lisdir);
+		if (lisfd < 0) {
+			print("listen %s failed: %r\n", net->anndir);
+			continue;
+		}
+
+		/* got new call on lisfd */
+		accfd = accept(lisfd, net->lisdir);
+		if (accfd < 0) {
+			print("accept %d (from %s) failed: %r\n",
+				lisfd, net->lisdir);
+			close(lisfd);
+			continue;
+		}
+
+		/*
+		 * accepted that call.  the Conn9p must start zeroed:
+		 * it embeds a QLock and a Ref, and its pointers are
+		 * examined before they are all assigned.
+		 */
+		conn9p = mallocz(sizeof *conn9p, 1);
+		if (conn9p == nil)
+			panic("neti: out of memory");
+		conn9p->dir = strdup(net->lisdir);
+		if (conn9p->dir == nil)
+			panic("neti: out of memory");
+		conn9p->fd = accfd;
+		newproc(connection, conn9p, smprint("9P read %s", conn9p->dir));
+		close(lisfd);
+	}
+}
+
+/*
+ * network output process.
+ * only need one of these for all network connections, thus all interfaces.
+ *
+ * takes 9P replies off netoq and writes each to the fd of the
+ * Conn9p recorded in the reply's param.  if that fd has been
+ * poisoned or the write comes up short, the connection is hung
+ * up and poisoned.  every reply handled drops the reference
+ * that connection took when it queued the request.
+ */
+static void
+neto(void *)
+{
+	int len, fd, failed;
+	Msgbuf *reply;
+	Conn9p *conn9p;
+
+	print("neto\n");
+	for(;;) {
+		/* receive 9P answer from 9P server processes */
+		do
+			reply = fs_recv(netoq, 0);
+		while(reply == nil);
+
+		if(reply->data == nil) {
+			print("neto: pkt nil cat=%d free=%d\n",
+				reply->category, reply->flags&FREE);
+			if(!(reply->flags & FREE))
+				mbfree(reply);
+			continue;
+		}
+
+		/* send answer back over the network connection in the reply */
+		len = reply->count;
+		conn9p = (Conn9p *)reply->param;
+		assert(conn9p);
+
+		qlock(conn9p);
+		fd = conn9p->fd;
+		assert(len >= 0);
+		/* fd < 0 probably indicates poisoning by the read side */
+		failed = fd < 0;
+		if (!failed)
+			failed = write(fd, reply->data, len) != len;
+		if (failed) {
+			print( "network write error (%r);");
+			print(" closing connection for %s\n", conn9p->dir);
+			nethangup(getchan(conn9p), "network write error", 1);
+			if (fd > 0)
+				hangupdfd(fd);	/* drop it */
+			conn9p->fd = -1;	/* poison conn9p */
+		}
+		mbfree(reply);
+		if (decref(conn9p) == 0)
+			panic("neto: zero ref count");
+		qunlock(conn9p);
+	}
+}
+
+/*
+ * start the network processes: create the reply queue if it
+ * does not exist yet, then for every configured network start
+ * an input process, plus a single output process shared by all.
+ */
+void
+netstart(void)
+{
+	int i, started;
+	Network *net;
+
+	if(netoq == nil)
+		netoq = newqueue(Nqueue, "network reply");
+
+	started = 0;
+	for(i = 0; i < Maxnets; i++){
+		net = &netif[i];
+		if(net->dialstr != nil){
+			sprint(net->oname, "neto");
+			if(!started){
+				/* the first configured net starts neto */
+				started = 1;
+				newproc(neto, nil, net->oname);
+			}
+			sprint(net->iname, "net%di", net->ctlrno);
+			newproc(neti, net, net->iname);
+		}
+	}
+}
+
+/*
+ * fill in netif from annstrs and announce each configured
+ * network.  slots whose announce string is nil are left unused.
+ * each slot gets its index as ctlrno so that the "net%di"
+ * process names differ between networks.
+ * calls sysfatal if an announce fails.
+ */
+void
+netinit(void)
+{
+	Network *net;
+
+	for (net = netif; net < netif + Maxnets; net++) {
+		net->ctlrno = net - netif;
+		net->dialstr = annstrs[net - netif];
+		if (net->dialstr == nil)
+			continue;
+		net->annfd = announce(net->dialstr, net->anndir);
+		/* /bin/service/tcp564 may already have grabbed the port */
+		if (net->annfd < 0)
+			sysfatal("can't announce %s: %r", net->dialstr);
+		print("netinit: announced on %s\n", net->dialstr);
+	}
+}