summaryrefslogtreecommitdiff
path: root/sys/src/libventi/rpc.c
diff options
context:
space:
mode:
authorTaru Karttunen <taruti@taruti.net>2011-03-30 15:46:40 +0300
committerTaru Karttunen <taruti@taruti.net>2011-03-30 15:46:40 +0300
commite5888a1ffdae813d7575f5fb02275c6bb07e5199 (patch)
treed8d51eac403f07814b9e936eed0c9a79195e2450 /sys/src/libventi/rpc.c
Import sources from 2011-03-30 iso image
Diffstat (limited to 'sys/src/libventi/rpc.c')
-rwxr-xr-xsys/src/libventi/rpc.c173
1 files changed, 173 insertions, 0 deletions
diff --git a/sys/src/libventi/rpc.c b/sys/src/libventi/rpc.c
new file mode 100755
index 000000000..5a820a995
--- /dev/null
+++ b/sys/src/libventi/rpc.c
@@ -0,0 +1,173 @@
+/*
+ * Multiplexed Venti client. It would be nice if we
+ * could turn this into a generic library routine rather
+ * than keep it Venti specific. A user-level 9P client
+ * could use something like this too.
+ *
+ * (Actually it does - this should be replaced with libmux,
+ * which should be renamed librpcmux.)
+ *
+ * This is a little more complicated than it might be
+ * because we want it to work well within and without libthread.
+ *
+ * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel.
+ */
+
+#include <u.h>
+#include <libc.h>
+#include <venti.h>
+
+typedef struct Rwait Rwait;
+struct Rwait
+{
+ Rendez r;
+ Packet *p;
+ int done;
+ int sleeping;
+};
+
+static int gettag(VtConn*, Rwait*);
+static void puttag(VtConn*, Rwait*, int);
+static void muxrpc(VtConn*, Packet*);
+
+Packet*
+_vtrpc(VtConn *z, Packet *p, VtFcall *tx)
+{
+ int i;
+ uchar tag, buf[2], *top;
+ Rwait *r, *rr;
+
+ /* must malloc because stack could be private */
+ r = vtmallocz(sizeof(Rwait));
+
+ qlock(&z->lk);
+ r->r.l = &z->lk;
+ tag = gettag(z, r);
+ if(tx){
+ /* vtfcallrpc can't print packet because it doesn't have tag */
+ tx->tag = tag;
+ if(chattyventi)
+ fprint(2, "%s -> %F\n", argv0, tx);
+ }
+
+ /* slam tag into packet */
+ top = packetpeek(p, buf, 0, 2);
+ if(top == nil){
+ packetfree(p);
+ return nil;
+ }
+ if(top == buf){
+ werrstr("first two bytes must be in same packet fragment");
+ packetfree(p);
+ vtfree(r);
+ return nil;
+ }
+ top[1] = tag;
+ qunlock(&z->lk);
+ if(vtsend(z, p) < 0){
+ vtfree(r);
+ return nil;
+ }
+
+ qlock(&z->lk);
+ /* wait for the muxer to give us our packet */
+ r->sleeping = 1;
+ z->nsleep++;
+ while(z->muxer && !r->done)
+ rsleep(&r->r);
+ z->nsleep--;
+ r->sleeping = 0;
+
+ /* if not done, there's no muxer: start muxing */
+ if(!r->done){
+ if(z->muxer)
+ abort();
+ z->muxer = 1;
+ while(!r->done){
+ qunlock(&z->lk);
+ if((p = vtrecv(z)) == nil){
+ werrstr("unexpected eof on venti connection");
+ z->muxer = 0;
+ vtfree(r);
+ return nil;
+ }
+ qlock(&z->lk);
+ muxrpc(z, p);
+ }
+ z->muxer = 0;
+ /* if there is anyone else sleeping, wake first unfinished to mux */
+ if(z->nsleep)
+ for(i=0; i<256; i++){
+ rr = z->wait[i];
+ if(rr && rr->sleeping && !rr->done){
+ rwakeup(&rr->r);
+ break;
+ }
+ }
+ }
+
+ p = r->p;
+ puttag(z, r, tag);
+ vtfree(r);
+ qunlock(&z->lk);
+ return p;
+}
+
+Packet*
+vtrpc(VtConn *z, Packet *p)
+{
+ return _vtrpc(z, p, nil);
+}
+
+static int
+gettag(VtConn *z, Rwait *r)
+{
+ int i;
+
+Again:
+ while(z->ntag == 256)
+ rsleep(&z->tagrend);
+ for(i=0; i<256; i++)
+ if(z->wait[i] == 0){
+ z->ntag++;
+ z->wait[i] = r;
+ return i;
+ }
+ fprint(2, "libventi: ntag botch\n");
+ goto Again;
+}
+
+static void
+puttag(VtConn *z, Rwait *r, int tag)
+{
+ assert(z->wait[tag] == r);
+ z->wait[tag] = nil;
+ z->ntag--;
+ rwakeup(&z->tagrend);
+}
+
+static void
+muxrpc(VtConn *z, Packet *p)
+{
+ uchar tag, buf[2], *top;
+ Rwait *r;
+
+ if((top = packetpeek(p, buf, 0, 2)) == nil){
+ fprint(2, "libventi: short packet in vtrpc\n");
+ packetfree(p);
+ return;
+ }
+
+ tag = top[1];
+ if((r = z->wait[tag]) == nil){
+ fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag);
+abort();
+ packetfree(p);
+ return;
+ }
+
+ r->p = p;
+ r->done = 1;
+ rwakeup(&r->r);
+}
+