summaryrefslogtreecommitdiff
path: root/sys/src/9/port/devpipe.c
diff options
context:
space:
mode:
authorTaru Karttunen <taruti@taruti.net>2011-03-30 15:46:40 +0300
committerTaru Karttunen <taruti@taruti.net>2011-03-30 15:46:40 +0300
commite5888a1ffdae813d7575f5fb02275c6bb07e5199 (patch)
treed8d51eac403f07814b9e936eed0c9a79195e2450 /sys/src/9/port/devpipe.c
Import sources from 2011-03-30 iso image
Diffstat (limited to 'sys/src/9/port/devpipe.c')
-rwxr-xr-xsys/src/9/port/devpipe.c391
1 files changed, 391 insertions, 0 deletions
diff --git a/sys/src/9/port/devpipe.c b/sys/src/9/port/devpipe.c
new file mode 100755
index 000000000..bea354b78
--- /dev/null
+++ b/sys/src/9/port/devpipe.c
@@ -0,0 +1,391 @@
+#include "u.h"
+#include "../port/lib.h"
+#include "mem.h"
+#include "dat.h"
+#include "fns.h"
+#include "../port/error.h"
+
+#include "netif.h"
+
+typedef struct Pipe Pipe;
+struct Pipe
+{
+ QLock;
+ Pipe *next;
+ int ref;
+ ulong path;
+ Queue *q[2];
+ int qref[2];
+};
+
+struct
+{
+ Lock;
+ ulong path;
+} pipealloc;
+
+enum
+{
+ Qdir,
+ Qdata0,
+ Qdata1,
+};
+
+Dirtab pipedir[] =
+{
+ ".", {Qdir,0,QTDIR}, 0, DMDIR|0500,
+ "data", {Qdata0}, 0, 0600,
+ "data1", {Qdata1}, 0, 0600,
+};
+#define NPIPEDIR 3
+
+static void
+pipeinit(void)
+{
+ if(conf.pipeqsize == 0){
+ if(conf.nmach > 1)
+ conf.pipeqsize = 256*1024;
+ else
+ conf.pipeqsize = 32*1024;
+ }
+}
+
+/*
+ * create a pipe, no streams are created until an open
+ */
+static Chan*
+pipeattach(char *spec)
+{
+ Pipe *p;
+ Chan *c;
+
+ c = devattach('|', spec);
+ p = malloc(sizeof(Pipe));
+ if(p == 0)
+ exhausted("memory");
+ p->ref = 1;
+
+ p->q[0] = qopen(conf.pipeqsize, 0, 0, 0);
+ if(p->q[0] == 0){
+ free(p);
+ exhausted("memory");
+ }
+ p->q[1] = qopen(conf.pipeqsize, 0, 0, 0);
+ if(p->q[1] == 0){
+ free(p->q[0]);
+ free(p);
+ exhausted("memory");
+ }
+
+ lock(&pipealloc);
+ p->path = ++pipealloc.path;
+ unlock(&pipealloc);
+
+ mkqid(&c->qid, NETQID(2*p->path, Qdir), 0, QTDIR);
+ c->aux = p;
+ c->dev = 0;
+ return c;
+}
+
+static int
+pipegen(Chan *c, char*, Dirtab *tab, int ntab, int i, Dir *dp)
+{
+ Qid q;
+ int len;
+ Pipe *p;
+
+ if(i == DEVDOTDOT){
+ devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
+ return 1;
+ }
+ i++; /* skip . */
+ if(tab==0 || i>=ntab)
+ return -1;
+
+ tab += i;
+ p = c->aux;
+ switch((ulong)tab->qid.path){
+ case Qdata0:
+ len = qlen(p->q[0]);
+ break;
+ case Qdata1:
+ len = qlen(p->q[1]);
+ break;
+ default:
+ len = tab->length;
+ break;
+ }
+ mkqid(&q, NETQID(NETID(c->qid.path), tab->qid.path), 0, QTFILE);
+ devdir(c, q, tab->name, len, eve, tab->perm, dp);
+ return 1;
+}
+
+
+static Walkqid*
+pipewalk(Chan *c, Chan *nc, char **name, int nname)
+{
+ Walkqid *wq;
+ Pipe *p;
+
+ wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
+ if(wq != nil && wq->clone != nil && wq->clone != c){
+ p = c->aux;
+ qlock(p);
+ p->ref++;
+ if(c->flag & COPEN){
+ print("channel open in pipewalk\n");
+ switch(NETTYPE(c->qid.path)){
+ case Qdata0:
+ p->qref[0]++;
+ break;
+ case Qdata1:
+ p->qref[1]++;
+ break;
+ }
+ }
+ qunlock(p);
+ }
+ return wq;
+}
+
+static int
+pipestat(Chan *c, uchar *db, int n)
+{
+ Pipe *p;
+ Dir dir;
+
+ p = c->aux;
+
+ switch(NETTYPE(c->qid.path)){
+ case Qdir:
+ devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
+ break;
+ case Qdata0:
+ devdir(c, c->qid, "data", qlen(p->q[0]), eve, 0600, &dir);
+ break;
+ case Qdata1:
+ devdir(c, c->qid, "data1", qlen(p->q[1]), eve, 0600, &dir);
+ break;
+ default:
+ panic("pipestat");
+ }
+ n = convD2M(&dir, db, n);
+ if(n < BIT16SZ)
+ error(Eshortstat);
+ return n;
+}
+
+/*
+ * if the stream doesn't exist, create it
+ */
+static Chan*
+pipeopen(Chan *c, int omode)
+{
+ Pipe *p;
+
+ if(c->qid.type & QTDIR){
+ if(omode != OREAD)
+ error(Ebadarg);
+ c->mode = omode;
+ c->flag |= COPEN;
+ c->offset = 0;
+ return c;
+ }
+
+ p = c->aux;
+ qlock(p);
+ switch(NETTYPE(c->qid.path)){
+ case Qdata0:
+ p->qref[0]++;
+ break;
+ case Qdata1:
+ p->qref[1]++;
+ break;
+ }
+ qunlock(p);
+
+ c->mode = openmode(omode);
+ c->flag |= COPEN;
+ c->offset = 0;
+ c->iounit = qiomaxatomic;
+ return c;
+}
+
+static void
+pipeclose(Chan *c)
+{
+ Pipe *p;
+
+ p = c->aux;
+ qlock(p);
+
+ if(c->flag & COPEN){
+ /*
+ * closing either side hangs up the stream
+ */
+ switch(NETTYPE(c->qid.path)){
+ case Qdata0:
+ p->qref[0]--;
+ if(p->qref[0] == 0){
+ qhangup(p->q[1], 0);
+ qclose(p->q[0]);
+ }
+ break;
+ case Qdata1:
+ p->qref[1]--;
+ if(p->qref[1] == 0){
+ qhangup(p->q[0], 0);
+ qclose(p->q[1]);
+ }
+ break;
+ }
+ }
+
+
+ /*
+ * if both sides are closed, they are reusable
+ */
+ if(p->qref[0] == 0 && p->qref[1] == 0){
+ qreopen(p->q[0]);
+ qreopen(p->q[1]);
+ }
+
+ /*
+ * free the structure on last close
+ */
+ p->ref--;
+ if(p->ref == 0){
+ qunlock(p);
+ free(p->q[0]);
+ free(p->q[1]);
+ free(p);
+ } else
+ qunlock(p);
+}
+
+static long
+piperead(Chan *c, void *va, long n, vlong)
+{
+ Pipe *p;
+
+ p = c->aux;
+
+ switch(NETTYPE(c->qid.path)){
+ case Qdir:
+ return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
+ case Qdata0:
+ return qread(p->q[0], va, n);
+ case Qdata1:
+ return qread(p->q[1], va, n);
+ default:
+ panic("piperead");
+ }
+ return -1; /* not reached */
+}
+
+static Block*
+pipebread(Chan *c, long n, ulong offset)
+{
+ Pipe *p;
+
+ p = c->aux;
+
+ switch(NETTYPE(c->qid.path)){
+ case Qdata0:
+ return qbread(p->q[0], n);
+ case Qdata1:
+ return qbread(p->q[1], n);
+ }
+
+ return devbread(c, n, offset);
+}
+
+/*
+ * a write to a closed pipe causes a note to be sent to
+ * the process.
+ */
+static long
+pipewrite(Chan *c, void *va, long n, vlong)
+{
+ Pipe *p;
+
+ if(!islo())
+ print("pipewrite hi %#p\n", getcallerpc(&c));
+ if(waserror()) {
+ /* avoid notes when pipe is a mounted queue */
+ if((c->flag & CMSG) == 0)
+ postnote(up, 1, "sys: write on closed pipe", NUser);
+ nexterror();
+ }
+
+ p = c->aux;
+
+ switch(NETTYPE(c->qid.path)){
+ case Qdata0:
+ n = qwrite(p->q[1], va, n);
+ break;
+
+ case Qdata1:
+ n = qwrite(p->q[0], va, n);
+ break;
+
+ default:
+ panic("pipewrite");
+ }
+
+ poperror();
+ return n;
+}
+
+static long
+pipebwrite(Chan *c, Block *bp, ulong)
+{
+ long n;
+ Pipe *p;
+
+ if(waserror()) {
+ /* avoid notes when pipe is a mounted queue */
+ if((c->flag & CMSG) == 0)
+ postnote(up, 1, "sys: write on closed pipe", NUser);
+ nexterror();
+ }
+
+ p = c->aux;
+ switch(NETTYPE(c->qid.path)){
+ case Qdata0:
+ n = qbwrite(p->q[1], bp);
+ break;
+
+ case Qdata1:
+ n = qbwrite(p->q[0], bp);
+ break;
+
+ default:
+ n = 0;
+ panic("pipebwrite");
+ }
+
+ poperror();
+ return n;
+}
+
+Dev pipedevtab = {
+ '|',
+ "pipe",
+
+ devreset,
+ pipeinit,
+ devshutdown,
+ pipeattach,
+ pipewalk,
+ pipestat,
+ pipeopen,
+ devcreate,
+ pipeclose,
+ piperead,
+ pipebread,
+ pipewrite,
+ pipebwrite,
+ devremove,
+ devwstat,
+};