summaryrefslogtreecommitdiff
path: root/sys/src/cmd/webfs/buq.c
diff options
context:
space:
mode:
authorcinap_lenrek <cinap_lenrek@rei2.9hal>2012-01-11 16:17:54 +0100
committercinap_lenrek <cinap_lenrek@rei2.9hal>2012-01-11 16:17:54 +0100
commit75e1ef0ab60acb6bccc54254b82770aec5786ead (patch)
treed273fc755a20e67801aa0a13df30ab75b2883419 /sys/src/cmd/webfs/buq.c
parent62fb4f97177d8e76f1fd49bb9d0073007b7c9bcc (diff)
new webfs, rc based hget
Diffstat (limited to 'sys/src/cmd/webfs/buq.c')
-rw-r--r--sys/src/cmd/webfs/buq.c263
1 files changed, 263 insertions, 0 deletions
diff --git a/sys/src/cmd/webfs/buq.c b/sys/src/cmd/webfs/buq.c
new file mode 100644
index 000000000..cbf5c9672
--- /dev/null
+++ b/sys/src/cmd/webfs/buq.c
@@ -0,0 +1,263 @@
+#include <u.h>
+#include <libc.h>
+#include <ctype.h>
+#include <fcall.h>
+#include <thread.h>
+#include <9p.h>
+
+#include "dat.h"
+#include "fns.h"
+
+static void
+matchreq(Buq *q)
+{
+ Req *r;
+ Buf *b;
+ int l;
+
+ while(r = q->rh){
+ if((b = q->bh) == nil){
+ if(q->closed){
+ if((q->rh = r->aux) == nil)
+ q->rt = &q->rh;
+ if(r->ifcall.type == Tread)
+ r->ofcall.count = 0;
+ respond(r, q->error);
+ continue;
+ }
+ break;
+ }
+ if((q->rh = r->aux) == nil)
+ q->rt = &q->rh;
+ if(r->ifcall.type == Topen){
+ respond(r, nil);
+ continue;
+ }
+ l = b->ep - b->rp;
+ if(l > r->ifcall.count)
+ l = r->ifcall.count;
+ memmove(r->ofcall.data, b->rp, l);
+ r->ofcall.count = l;
+ respond(r, nil);
+ b->rp += l;
+ q->size -= l;
+ if(b->rp >= b->ep){
+ if((q->bh = b->next) == nil)
+ q->bt = &q->bh;
+ if(r = b->wreq){
+ r->ofcall.count = r->ifcall.count;
+ respond(r, nil);
+ }
+ free(b);
+ }
+ }
+ rwakeupall(&q->rz);
+}
+
+int
+buread(Buq *q, void *v, int l)
+{
+ Req *r;
+ Buf *b;
+
+ qlock(q);
+ while((b = q->bh) == nil){
+ if(q->closed){
+ l = 0;
+ if(q->error){
+ werrstr("%s", q->error);
+ l = -1;
+ }
+ qunlock(q);
+ return l;
+ }
+ rsleep(&q->rz);
+ }
+ if(l > (b->ep - b->rp))
+ l = b->ep - b->rp;
+ memmove(v, b->rp, l);
+ b->rp += l;
+ q->size -= l;
+ rwakeup(&q->rz);
+ if(b->rp < b->ep){
+ qunlock(q);
+ return l;
+ }
+ if((q->bh = b->next) == nil)
+ q->bt = &q->bh;
+ qunlock(q);
+ if(r = b->wreq){
+ r->ofcall.count = r->ifcall.count;
+ respond(r, nil);
+ }
+ free(b);
+ return l;
+}
+
+int
+buwrite(Buq *q, void *v, int l)
+{
+ Buf *b;
+
+ b = emalloc(sizeof(*b) + l);
+ b->wreq = nil;
+ b->rp = b->end;
+ b->ep = b->rp + l;
+ memmove(b->rp, v, l);
+ b->next = nil;
+ qlock(q);
+ if(q->closed){
+ l = 0;
+ if(q->error){
+ werrstr("%s", q->error);
+ l = -1;
+ }
+ qunlock(q);
+ free(b);
+ return l;
+ }
+ *q->bt = b;
+ q->bt = &b->next;
+ q->size += l;
+ matchreq(q);
+ while(!q->closed && q->size >= q->limit)
+ rsleep(&q->rz);
+ qunlock(q);
+ return l;
+}
+
+void
+buclose(Buq *q, char *error)
+{
+ if(q == nil)
+ return;
+ qlock(q);
+ if(!q->closed){
+ if(error)
+ q->error = estrdup9p(error);
+ q->closed = 1;
+ matchreq(q);
+ }
+ qunlock(q);
+}
+
+Buq*
+bualloc(int limit)
+{
+ Buq *q;
+
+ q = emalloc(sizeof(*q));
+ q->limit = limit;
+ q->rt = &q->rh;
+ q->bt = &q->bh;
+ q->rz.l = q;
+ incref(q);
+ return q;
+}
+
+void
+bufree(Buq *q)
+{
+ Buf *b;
+ Key *k;
+
+ if(q == nil || decref(q))
+ return;
+ while(b = q->bh){
+ q->bh = b->next;
+ free(b);
+ }
+ freeurl(q->url);
+ while(k = q->hdr){
+ q->hdr = k->next;
+ free(k);
+ }
+ free(q->error);
+ free(q);
+}
+
+void
+bureq(Buq *q, Req *r)
+{
+ Buf *b;
+ int l;
+
+ switch(r->ifcall.type){
+ default:
+ respond(r, "bug in bureq");
+ return;
+ case Twrite:
+ l = r->ifcall.count;
+ if((q->size + l) < q->limit){
+ r->ofcall.count = buwrite(q, r->ifcall.data, r->ifcall.count);
+ respond(r, nil);
+ return;
+ }
+ b = emalloc(sizeof(*b));
+ b->wreq = r;
+ b->rp = (uchar*)r->ifcall.data;
+ b->ep = b->rp + l;
+ b->next = nil;
+ qlock(q);
+ *q->bt = b;
+ q->bt = &b->next;
+ q->size += l;
+ break;
+ case Tread:
+ case Topen:
+ r->aux = nil;
+ qlock(q);
+ *q->rt = r;
+ q->rt = (Req**)&r->aux;
+ break;
+ }
+ matchreq(q);
+ qunlock(q);
+}
+
+void
+buflushreq(Buq *q, Req *r)
+{
+ Buf **bb, *b;
+ Req **rr;
+ int l;
+
+ switch(r->ifcall.type){
+ default:
+ respond(r, "bug in bufflushreq");
+ return;
+ case Twrite:
+ qlock(q);
+ for(bb = &q->bh; b = *bb; bb = &b->next){
+ if(b->wreq != r)
+ continue;
+ /* fake successfull write */
+ l = b->ep - b->rp;
+ b = realloc(b, sizeof(*b) + l);
+ memmove(b->end, b->rp, l);
+ b->rp = b->end;
+ b->ep = b->rp + l;
+ b->wreq = nil;
+ *bb = b;
+ if(b->next == nil)
+ q->bt = &b->next;
+ r->ofcall.count = r->ifcall.count;
+ respond(r, nil);
+ break;
+ }
+ break;
+ case Topen:
+ case Tread:
+ qlock(q);
+ for(rr = &q->rh; *rr; rr = (Req**)&((*rr)->aux)){
+ if(*rr != r)
+ continue;
+ if((*rr = r->aux) == nil)
+ q->rt = rr;
+ respond(r, "interrupted");
+ break;
+ }
+ break;
+ }
+ qunlock(q);
+}