diff options
author | Taru Karttunen <taruti@taruti.net> | 2011-03-30 15:46:40 +0300 |
---|---|---|
committer | Taru Karttunen <taruti@taruti.net> | 2011-03-30 15:46:40 +0300 |
commit | e5888a1ffdae813d7575f5fb02275c6bb07e5199 (patch) | |
tree | d8d51eac403f07814b9e936eed0c9a79195e2450 /sys/src/libventi/send.c |
Import sources from 2011-03-30 iso image
Diffstat (limited to 'sys/src/libventi/send.c')
-rwxr-xr-x | sys/src/libventi/send.c | 251 |
1 files changed, 251 insertions, 0 deletions
diff --git a/sys/src/libventi/send.c b/sys/src/libventi/send.c new file mode 100755 index 000000000..2b88818da --- /dev/null +++ b/sys/src/libventi/send.c @@ -0,0 +1,251 @@ +#include <u.h> +#include <libc.h> +#include <venti.h> +#include "queue.h" + +long ventisendbytes, ventisendpackets; +long ventirecvbytes, ventirecvpackets; + +static int +_vtsend(VtConn *z, Packet *p) +{ + IOchunk ioc; + int n, tot; + uchar buf[2]; + + if(z->state != VtStateConnected) { + werrstr("session not connected"); + return -1; + } + + /* add framing */ + n = packetsize(p); + if(n >= (1<<16)) { + werrstr("packet too large"); + packetfree(p); + return -1; + } + buf[0] = n>>8; + buf[1] = n; + packetprefix(p, buf, 2); + ventisendbytes += n+2; + ventisendpackets++; + + tot = 0; + for(;;){ + n = packetfragments(p, &ioc, 1, 0); + if(n == 0) + break; + if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){ + vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p); + packetfree(p); + return -1; + } + packetconsume(p, nil, ioc.len); + tot += ioc.len; + } + vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot); + packetfree(p); + return 1; +} + +static int +interrupted(void) +{ + char e[ERRMAX]; + + rerrstr(e, sizeof e); + return strstr(e, "interrupted") != nil; +} + + +static Packet* +_vtrecv(VtConn *z) +{ + uchar buf[10], *b; + int n; + Packet *p; + int size, len; + + if(z->state != VtStateConnected) { + werrstr("session not connected"); + return nil; + } + + p = z->part; + /* get enough for head size */ + size = packetsize(p); + while(size < 2) { + b = packettrailer(p, 2); + assert(b != nil); + if(0) fprint(2, "%d read hdr\n", getpid()); + n = read(z->infd, b, 2); + if(0) fprint(2, "%d got %d (%r)\n", getpid(), n); + if(n==0 || (n<0 && !interrupted())) + goto Err; + size += n; + packettrim(p, 0, size); + } + + if(packetconsume(p, buf, 2) < 0) + goto Err; + len = (buf[0] << 8) | buf[1]; + size -= 2; + + while(size < len) { + n = len - size; + if(n > MaxFragSize) + n = MaxFragSize; + b = packettrailer(p, n); + if(0) fprint(2, "%d read body %d\n", getpid(), n); + n = read(z->infd, b, n); + if(0) fprint(2, "%d got %d (%r)\n", getpid(), n); + if(n > 0) + size += n; + packettrim(p, 0, size); + if(n==0 || (n<0 && !interrupted())) + goto Err; + } + ventirecvbytes += len; + ventirecvpackets++; + p = packetsplit(p, len); + vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len); + return p; +Err: + vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr); + return nil; +} + +/* + * If you fork off two procs running vtrecvproc and vtsendproc, + * then vtrecv/vtsend (and thus vtrpc) will never block except on + * rendevouses, which is nice when it's running in one thread of many. + */ +void +vtrecvproc(void *v) +{ + Packet *p; + VtConn *z; + Queue *q; + + z = v; + q = _vtqalloc(); + + qlock(&z->lk); + z->readq = q; + qlock(&z->inlk); + rwakeup(&z->rpcfork); + qunlock(&z->lk); + + while((p = _vtrecv(z)) != nil) + if(_vtqsend(q, p) < 0){ + packetfree(p); + break; + } + qunlock(&z->inlk); + qlock(&z->lk); + _vtqhangup(q); + while((p = _vtnbqrecv(q)) != nil) + packetfree(p); + _vtqdecref(q); + z->readq = nil; + rwakeup(&z->rpcfork); + qunlock(&z->lk); + vthangup(z); +} + +void +vtsendproc(void *v) +{ + Queue *q; + Packet *p; + VtConn *z; + + z = v; + q = _vtqalloc(); + + qlock(&z->lk); + z->writeq = q; + qlock(&z->outlk); + rwakeup(&z->rpcfork); + qunlock(&z->lk); + + while((p = _vtqrecv(q)) != nil) + if(_vtsend(z, p) < 0) + break; + qunlock(&z->outlk); + qlock(&z->lk); + _vtqhangup(q); + while((p = _vtnbqrecv(q)) != nil) + packetfree(p); + _vtqdecref(q); + z->writeq = nil; + rwakeup(&z->rpcfork); + qunlock(&z->lk); + return; +} + +Packet* +vtrecv(VtConn *z) +{ + Packet *p; + Queue *q; + + qlock(&z->lk); + if(z->state != VtStateConnected){ + werrstr("not connected"); + qunlock(&z->lk); + return nil; + } + if(z->readq){ + q = _vtqincref(z->readq); + qunlock(&z->lk); + p = _vtqrecv(q); + _vtqdecref(q); + return p; + } + + qlock(&z->inlk); + qunlock(&z->lk); + p = _vtrecv(z); + qunlock(&z->inlk); + if(!p) + vthangup(z); + return p; +} + +int +vtsend(VtConn *z, Packet *p) +{ + Queue *q; + + qlock(&z->lk); + if(z->state != VtStateConnected){ + packetfree(p); + werrstr("not connected"); + qunlock(&z->lk); + return -1; + } + if(z->writeq){ + q = _vtqincref(z->writeq); + qunlock(&z->lk); + if(_vtqsend(q, p) < 0){ + _vtqdecref(q); + packetfree(p); + return -1; + } + _vtqdecref(q); + return 0; + } + + qlock(&z->outlk); + qunlock(&z->lk); + if(_vtsend(z, p) < 0){ + qunlock(&z->outlk); + vthangup(z); + return -1; + } + qunlock(&z->outlk); + return 0; +} + |