summaryrefslogtreecommitdiff
path: root/sys/src/lib9p
diff options
context:
space:
mode:
authoraiju <aiju@phicode.de>2011-08-16 22:00:34 +0200
committeraiju <aiju@phicode.de>2011-08-16 22:00:34 +0200
commitc65100ffa0031d7a4744b3715b4c4c16da9074e9 (patch)
tree4fd4f3bc670fc2ce1ed6217a13478ac3286c4b40 /sys/src/lib9p
parent2f2c93066909f04e346f739f6ed30f536a85fd28 (diff)
lib9p: added toilet queues
Diffstat (limited to 'sys/src/lib9p')
-rw-r--r--sys/src/lib9p/mkfile1
-rw-r--r--sys/src/lib9p/queue.c113
2 files changed, 114 insertions, 0 deletions
diff --git a/sys/src/lib9p/mkfile b/sys/src/lib9p/mkfile
index abbf8dc65..78129109b 100644
--- a/sys/src/lib9p/mkfile
+++ b/sys/src/lib9p/mkfile
@@ -12,6 +12,7 @@ OFILES=\
req.$O\
parse.$O\
post.$O\
+ queue.$O\
rfork.$O\
srv.$O\
thread.$O\
diff --git a/sys/src/lib9p/queue.c b/sys/src/lib9p/queue.c
new file mode 100644
index 000000000..307c385fc
--- /dev/null
+++ b/sys/src/lib9p/queue.c
@@ -0,0 +1,113 @@
+#include <u.h>
+#include <libc.h>
+#include <thread.h>
+#include <fcall.h>
+#include <9p.h>
+
+static int
+_reqqueuenote(void *uregs, char *note)
+{
+ Reqqueue *q;
+
+ if(strcmp(note, "flush") != 0)
+ return 0;
+ q = *threaddata();
+ if(q != nil){
+ q->cur = nil;
+ notejmp(uregs, q->flush, 1);
+ }
+ return 1;
+}
+
+static void
+_reqqueueproc(void *v)
+{
+ Reqqueue *q;
+ Req *r;
+ void (*f)(Req *);
+
+ q = v;
+ *threaddata() = q;
+ rfork(RFNOTEG);
+ threadnotify(_reqqueuenote, 1);
+ for(;;){
+ qlock(q);
+ q->cur = nil;
+ while(q->next == q)
+ rsleep(q);
+ r = (Req*)(((char*)q->next) - ((char*)&((Req*)0)->qu));
+ r->qu.next->prev = r->qu.prev;
+ r->qu.prev->next = r->qu.next;
+ f = r->qu.f;
+ qlock(&r->lk);
+ memset(&r->qu, 0, sizeof(r->qu));
+ qunlock(&r->lk);
+ q->cur = r;
+ if(setjmp(q->flush)){
+ respond(r, "interrupted");
+ continue;
+ }
+ qunlock(q);
+ f(r);
+ }
+}
+
+Reqqueue *
+reqqueuecreate(void)
+{
+ Reqqueue *q;
+
+ q = emalloc9p(sizeof(*q));
+ memset(q, 0, sizeof(*q));
+ q->l = q;
+ q->next = q->prev = q;
+ q->pid = threadpid(proccreate(_reqqueueproc, q, mainstacksize));
+ print("%d\n", q->pid);
+ return q;
+}
+
+void
+reqqueuepush(Reqqueue *q, Req *r, void (*f)(Req *))
+{
+ qlock(q);
+ r->qu.f = f;
+ r->qu.next = q;
+ r->qu.prev = q->prev;
+ q->prev->next = &r->qu;
+ q->prev = &r->qu;
+ rwakeupall(q);
+ qunlock(q);
+}
+
+void
+reqqueueflush(Reqqueue *q, Req *r)
+{
+ qlock(q);
+ if(q->cur == r){
+ postnote(PNPROC, q->pid, "flush");
+ qunlock(q);
+ }else{
+ if(r->qu.next != nil){
+ r->qu.next->prev = r->qu.prev;
+ r->qu.prev->next = r->qu.next;
+ }
+ qlock(&r->lk);
+ memset(&r->qu, 0, sizeof(r->qu));
+ qunlock(&r->lk);
+ qunlock(q);
+ respond(r, "interrupted");
+ }
+}
+
+int
+reqqueueflushed(void)
+{
+ Reqqueue *q;
+
+ q = *threaddata();
+ qlock(q);
+ if(setjmp(q->flush))
+ return 1;
+ qunlock(q);
+ return 0;
+}