diff options
author | Taru Karttunen <taruti@taruti.net> | 2011-03-30 15:46:40 +0300 |
---|---|---|
committer | Taru Karttunen <taruti@taruti.net> | 2011-03-30 15:46:40 +0300 |
commit | e5888a1ffdae813d7575f5fb02275c6bb07e5199 (patch) | |
tree | d8d51eac403f07814b9e936eed0c9a79195e2450 /sys/src/libventi/rpc.c |
Import sources from 2011-03-30 iso image
Diffstat (limited to 'sys/src/libventi/rpc.c')
-rwxr-xr-x | sys/src/libventi/rpc.c | 173 |
1 files changed, 173 insertions, 0 deletions
diff --git a/sys/src/libventi/rpc.c b/sys/src/libventi/rpc.c new file mode 100755 index 000000000..5a820a995 --- /dev/null +++ b/sys/src/libventi/rpc.c @@ -0,0 +1,173 @@ +/* + * Multiplexed Venti client. It would be nice if we + * could turn this into a generic library routine rather + * than keep it Venti specific. A user-level 9P client + * could use something like this too. + * + * (Actually it does - this should be replaced with libmux, + * which should be renamed librpcmux.) + * + * This is a little more complicated than it might be + * because we want it to work well within and without libthread. + * + * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel. + */ + +#include <u.h> +#include <libc.h> +#include <venti.h> + +typedef struct Rwait Rwait; +struct Rwait +{ + Rendez r; + Packet *p; + int done; + int sleeping; +}; + +static int gettag(VtConn*, Rwait*); +static void puttag(VtConn*, Rwait*, int); +static void muxrpc(VtConn*, Packet*); + +Packet* +_vtrpc(VtConn *z, Packet *p, VtFcall *tx) +{ + int i; + uchar tag, buf[2], *top; + Rwait *r, *rr; + + /* must malloc because stack could be private */ + r = vtmallocz(sizeof(Rwait)); + + qlock(&z->lk); + r->r.l = &z->lk; + tag = gettag(z, r); + if(tx){ + /* vtfcallrpc can't print packet because it doesn't have tag */ + tx->tag = tag; + if(chattyventi) + fprint(2, "%s -> %F\n", argv0, tx); + } + + /* slam tag into packet */ + top = packetpeek(p, buf, 0, 2); + if(top == nil){ + packetfree(p); + return nil; + } + if(top == buf){ + werrstr("first two bytes must be in same packet fragment"); + packetfree(p); + vtfree(r); + return nil; + } + top[1] = tag; + qunlock(&z->lk); + if(vtsend(z, p) < 0){ + vtfree(r); + return nil; + } + + qlock(&z->lk); + /* wait for the muxer to give us our packet */ + r->sleeping = 1; + z->nsleep++; + while(z->muxer && !r->done) + rsleep(&r->r); + z->nsleep--; + r->sleeping = 0; + + /* if not done, there's no muxer: start muxing */ + if(!r->done){ + if(z->muxer) + abort(); + z->muxer = 1; + while(!r->done){ + qunlock(&z->lk); + if((p = vtrecv(z)) == nil){ + werrstr("unexpected eof on venti connection"); + z->muxer = 0; + vtfree(r); + return nil; + } + qlock(&z->lk); + muxrpc(z, p); + } + z->muxer = 0; + /* if there is anyone else sleeping, wake first unfinished to mux */ + if(z->nsleep) + for(i=0; i<256; i++){ + rr = z->wait[i]; + if(rr && rr->sleeping && !rr->done){ + rwakeup(&rr->r); + break; + } + } + } + + p = r->p; + puttag(z, r, tag); + vtfree(r); + qunlock(&z->lk); + return p; +} + +Packet* +vtrpc(VtConn *z, Packet *p) +{ + return _vtrpc(z, p, nil); +} + +static int +gettag(VtConn *z, Rwait *r) +{ + int i; + +Again: + while(z->ntag == 256) + rsleep(&z->tagrend); + for(i=0; i<256; i++) + if(z->wait[i] == 0){ + z->ntag++; + z->wait[i] = r; + return i; + } + fprint(2, "libventi: ntag botch\n"); + goto Again; +} + +static void +puttag(VtConn *z, Rwait *r, int tag) +{ + assert(z->wait[tag] == r); + z->wait[tag] = nil; + z->ntag--; + rwakeup(&z->tagrend); +} + +static void +muxrpc(VtConn *z, Packet *p) +{ + uchar tag, buf[2], *top; + Rwait *r; + + if((top = packetpeek(p, buf, 0, 2)) == nil){ + fprint(2, "libventi: short packet in vtrpc\n"); + packetfree(p); + return; + } + + tag = top[1]; + if((r = z->wait[tag]) == nil){ + fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag); +abort(); + packetfree(p); + return; + } + + r->p = p; + r->done = 1; + rwakeup(&r->r); +} + |