1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
#include <u.h>
#include <libc.h>
#include <thread.h>
#include <fcall.h>
#include <9p.h>
static int
_reqqueuenote(void *uregs, char *note)
{
Reqqueue *q;
if(strcmp(note, "flush") != 0)
return 0;
q = *threaddata();
if(q != nil){
q->cur = nil;
notejmp(uregs, q->flush, 1);
}
return 1;
}
static void
_reqqueueproc(void *v)
{
Reqqueue *q;
Req *r;
void (*f)(Req *);
q = v;
*threaddata() = q;
rfork(RFNOTEG);
threadnotify(_reqqueuenote, 1);
for(;;){
qlock(q);
q->cur = nil;
while(q->next == q)
rsleep(q);
r = (Req*)(((char*)q->next) - ((char*)&((Req*)0)->qu));
r->qu.next->prev = r->qu.prev;
r->qu.prev->next = r->qu.next;
f = r->qu.f;
qlock(&r->lk);
memset(&r->qu, 0, sizeof(r->qu));
qunlock(&r->lk);
q->cur = r;
if(setjmp(q->flush)){
respond(r, "interrupted");
continue;
}
qunlock(q);
f(r);
}
}
Reqqueue *
reqqueuecreate(void)
{
Reqqueue *q;
q = emalloc9p(sizeof(*q));
memset(q, 0, sizeof(*q));
q->l = q;
q->next = q->prev = q;
q->pid = threadpid(proccreate(_reqqueueproc, q, mainstacksize));
print("%d\n", q->pid);
return q;
}
void
reqqueuepush(Reqqueue *q, Req *r, void (*f)(Req *))
{
qlock(q);
r->qu.f = f;
r->qu.next = q;
r->qu.prev = q->prev;
q->prev->next = &r->qu;
q->prev = &r->qu;
rwakeupall(q);
qunlock(q);
}
void
reqqueueflush(Reqqueue *q, Req *r)
{
qlock(q);
if(q->cur == r){
postnote(PNPROC, q->pid, "flush");
qunlock(q);
}else{
if(r->qu.next != nil){
r->qu.next->prev = r->qu.prev;
r->qu.prev->next = r->qu.next;
}
qlock(&r->lk);
memset(&r->qu, 0, sizeof(r->qu));
qunlock(&r->lk);
qunlock(q);
respond(r, "interrupted");
}
}
int
reqqueueflushed(void)
{
Reqqueue *q;
q = *threaddata();
qlock(q);
if(setjmp(q->flush))
return 1;
qunlock(q);
return 0;
}
|