diff options
author | Taru Karttunen <taruti@taruti.net> | 2011-03-30 15:46:40 +0300 |
---|---|---|
committer | Taru Karttunen <taruti@taruti.net> | 2011-03-30 15:46:40 +0300 |
commit | e5888a1ffdae813d7575f5fb02275c6bb07e5199 (patch) | |
tree | d8d51eac403f07814b9e936eed0c9a79195e2450 /sys/src/libthread |
Import sources from 2011-03-30 iso image
Diffstat (limited to 'sys/src/libthread')
44 files changed, 4554 insertions, 0 deletions
diff --git a/sys/src/libthread/386.c b/sys/src/libthread/386.c new file mode 100755 index 000000000..e85248fd2 --- /dev/null +++ b/sys/src/libthread/386.c @@ -0,0 +1,24 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +static void +launcher386(void (*f)(void *arg), void *arg) +{ + (*f)(arg); + threadexits(nil); +} + +void +_threadinitstack(Thread *t, void (*f)(void*), void *arg) +{ + ulong *tos; + + tos = (ulong*)&t->stk[t->stksize&~7]; + *--tos = (ulong)arg; + *--tos = (ulong)f; + t->sched[JMPBUFPC] = (ulong)launcher386+JMPBUFDPC; + t->sched[JMPBUFSP] = (ulong)tos - 8; /* old PC and new PC */ +} + diff --git a/sys/src/libthread/alpha.c b/sys/src/libthread/alpha.c new file mode 100755 index 000000000..e74855346 --- /dev/null +++ b/sys/src/libthread/alpha.c @@ -0,0 +1,32 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +/* first argument goes in a register and on the stack; avoid it */ +static void +launcheralpha(int, void (*f)(void *arg), void *arg) +{ + (*f)(arg); + threadexits(nil); +} + +void +_threadinitstack(Thread *t, void (*f)(void*), void *arg) +{ + ulong *tos; + + tos = (ulong*)&t->stk[t->stksize&~7]; + + *--tos = 0; /* pad arguments to 8 bytes */ + *--tos = (ulong)arg; + *--tos = (ulong)f; + *--tos = 0; /* first arg */ + + *--tos = 0; /* for alignment with... */ + *--tos = 0; /* ... place to store return PC */ + + t->sched[JMPBUFPC] = (ulong)launcheralpha+JMPBUFDPC; + t->sched[JMPBUFSP] = (ulong)tos; +} + diff --git a/sys/src/libthread/amd64.c b/sys/src/libthread/amd64.c new file mode 100755 index 000000000..c0b0e30e1 --- /dev/null +++ b/sys/src/libthread/amd64.c @@ -0,0 +1,26 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +/* first argument goes in a register; simplest just to ignore it */ +static void +launcheramd64(int, void (*f)(void *arg), void *arg) +{ + (*f)(arg); + threadexits(nil); +} + +void +_threadinitstack(Thread *t, void (*f)(void*), void *arg) +{ + uvlong *tos; + + tos = (uvlong*)&t->stk[t->stksize&~7]; + *--tos = (uvlong)arg; + *--tos = (uvlong)f; + *--tos = 0; /* first arg to launcheramd64 */ + t->sched[JMPBUFPC] = (uvlong)launcheramd64+JMPBUFDPC; + t->sched[JMPBUFSP] = (uvlong)tos - 2*8; /* old PC and new PC */ +} + diff --git a/sys/src/libthread/arm.c b/sys/src/libthread/arm.c new file mode 100755 index 000000000..cbaf90001 --- /dev/null +++ b/sys/src/libthread/arm.c @@ -0,0 +1,28 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +/* first argument goes in a register; simplest just to ignore it */ +static void +launcherarm(int, void (*f)(void *arg), void *arg) +{ + (*f)(arg); + threadexits(nil); +} + +void +_threadinitstack(Thread *t, void (*f)(void*), void *arg) +{ + ulong *tos; + + tos = (ulong*)&t->stk[t->stksize&~7]; + *--tos = (ulong)arg; + *--tos = (ulong)f; + *--tos = 0; /* first arg to launchermips */ + *--tos = 0; /* place to store return PC */ + + t->sched[JMPBUFPC] = (ulong)launcherarm+JMPBUFDPC; + t->sched[JMPBUFSP] = (ulong)tos; +} + diff --git a/sys/src/libthread/channel.acid b/sys/src/libthread/channel.acid new file mode 100755 index 000000000..d008bd423 --- /dev/null +++ b/sys/src/libthread/channel.acid @@ -0,0 +1,675 @@ +sizeof_1_ = 8; +aggr _1_ +{ + 'U' 0 lo; + 'U' 4 hi; +}; + +defn +_1_(addr) { + complex _1_ addr; + print(" lo ", addr.lo, "\n"); + print(" hi ", addr.hi, "\n"); +}; + +sizeofFPdbleword = 8; +aggr FPdbleword +{ + 'F' 0 x; + { + 'U' 0 lo; + 'U' 4 hi; + }; +}; + +defn +FPdbleword(addr) { + complex FPdbleword addr; + print(" x ", addr.x, "\n"); + print("_1_ {\n"); + _1_(addr+0); + print("}\n"); +}; + +UTFmax = 3; +Runesync = 128; +Runeself = 128; +Runeerror = 65533; +sizeofFmt = 48; +aggr Fmt +{ + 'b' 0 runes; + 'X' 4 start; + 'X' 8 to; + 'X' 12 stop; + 'X' 16 flush; + 'X' 20 farg; + 'D' 24 nfmt; + 'X' 28 args; + 'D' 32 r; + 'D' 36 width; + 'D' 40 prec; + 'U' 44 flags; +}; + +defn +Fmt(addr) { + complex Fmt addr; + print(" runes ", addr.runes, "\n"); + print(" start ", addr.start\X, "\n"); + print(" to ", addr.to\X, "\n"); + print(" stop ", addr.stop\X, "\n"); + print(" flush ", addr.flush\X, "\n"); + print(" farg ", addr.farg\X, "\n"); + print(" nfmt ", addr.nfmt, "\n"); + print(" args ", addr.args\X, "\n"); + print(" r ", addr.r, "\n"); + print(" width ", addr.width, "\n"); + print(" prec ", addr.prec, "\n"); + print(" flags ", addr.flags, "\n"); +}; + +FmtWidth = 1; +FmtLeft = 2; +FmtPrec = 4; +FmtSharp = 8; +FmtSpace = 16; +FmtSign = 32; +FmtZero = 64; +FmtUnsigned = 128; +FmtShort = 256; +FmtLong = 512; +FmtVLong = 1024; +FmtComma = 2048; +FmtByte = 4096; +FmtFlag = 8192; +sizeofTm = 40; +aggr Tm +{ + 'D' 0 sec; + 'D' 4 min; + 'D' 8 hour; + 'D' 12 mday; + 'D' 16 mon; + 'D' 20 year; + 'D' 24 wday; + 'D' 28 yday; + 'a' 32 zone; + 'D' 36 tzoff; +}; + +defn +Tm(addr) { + complex Tm addr; + print(" sec ", addr.sec, "\n"); + print(" min ", addr.min, "\n"); + print(" hour ", addr.hour, "\n"); + print(" mday ", addr.mday, "\n"); + print(" mon ", addr.mon, "\n"); + print(" year ", addr.year, "\n"); + print(" wday ", addr.wday, "\n"); + print(" yday ", addr.yday, "\n"); + print(" zone ", addr.zone, "\n"); + print(" tzoff ", addr.tzoff, "\n"); +}; + +PNPROC = 1; +PNGROUP = 2; +Profoff = 0; +Profuser = 1; +Profkernel = 2; +Proftime = 3; +Profsample = 4; +sizeofLock = 4; +aggr Lock +{ + 'D' 0 val; +}; + +defn +Lock(addr) { + complex Lock addr; + print(" val ", addr.val, "\n"); +}; + +sizeofQLp = 12; +aggr QLp +{ + 'D' 0 inuse; + 'A' QLp 4 next; + 'C' 8 state; +}; + +defn +QLp(addr) { + complex QLp addr; + print(" inuse ", addr.inuse, "\n"); + print(" next ", addr.next\X, "\n"); + print(" state ", addr.state, "\n"); +}; + +sizeofQLock = 16; +aggr QLock +{ + Lock 0 lock; + 'D' 4 locked; + 'A' QLp 8 $head; + 'A' QLp 12 $tail; +}; + +defn +QLock(addr) { + complex QLock addr; + print("Lock lock {\n"); + Lock(addr.lock); + print("}\n"); + print(" locked ", addr.locked, "\n"); + print(" $head ", addr.$head\X, "\n"); + print(" $tail ", addr.$tail\X, "\n"); +}; + +sizeofRWLock = 20; +aggr RWLock +{ + Lock 0 lock; + 'D' 4 readers; + 'D' 8 writer; + 'A' QLp 12 $head; + 'A' QLp 16 $tail; +}; + +defn +RWLock(addr) { + complex RWLock addr; + print("Lock lock {\n"); + Lock(addr.lock); + print("}\n"); + print(" readers ", addr.readers, "\n"); + print(" writer ", addr.writer, "\n"); + print(" $head ", addr.$head\X, "\n"); + print(" $tail ", addr.$tail\X, "\n"); +}; + +sizeofRendez = 12; +aggr Rendez +{ + 'A' QLock 0 l; + 'A' QLp 4 $head; + 'A' QLp 8 $tail; +}; + +defn +Rendez(addr) { + complex Rendez addr; + print(" l ", addr.l\X, "\n"); + print(" $head ", addr.$head\X, "\n"); + print(" $tail ", addr.$tail\X, "\n"); +}; + +sizeofNetConnInfo = 36; +aggr NetConnInfo +{ + 'X' 0 dir; + 'X' 4 root; + 'X' 8 spec; + 'X' 12 lsys; + 'X' 16 lserv; + 'X' 20 rsys; + 'X' 24 rserv; + 'X' 28 laddr; + 'X' 32 raddr; +}; + +defn +NetConnInfo(addr) { + complex NetConnInfo addr; + print(" dir ", addr.dir\X, "\n"); + print(" root ", addr.root\X, "\n"); + print(" spec ", addr.spec\X, "\n"); + print(" lsys ", addr.lsys\X, "\n"); + print(" lserv ", addr.lserv\X, "\n"); + print(" rsys ", addr.rsys\X, "\n"); + print(" rserv ", addr.rserv\X, "\n"); + print(" laddr ", addr.laddr\X, "\n"); + print(" raddr ", addr.raddr\X, "\n"); +}; + +RFNAMEG = 1; +RFENVG = 2; +RFFDG = 4; +RFNOTEG = 8; +RFPROC = 16; +RFMEM = 32; +RFNOWAIT = 64; +RFCNAMEG = 1024; +RFCENVG = 2048; +RFCFDG = 4096; +RFREND = 8192; +RFNOMNT = 16384; +sizeofQid = 16; +aggr Qid +{ + 'W' 0 path; + 'U' 8 vers; + 'b' 12 type; +}; + +defn +Qid(addr) { + complex Qid addr; + print(" path ", addr.path, "\n"); + print(" vers ", addr.vers, "\n"); + print(" type ", addr.type, "\n"); +}; + +sizeofDir = 60; +aggr Dir +{ + 'u' 0 type; + 'U' 4 dev; + Qid 8 qid; + 'U' 24 mode; + 'U' 28 atime; + 'U' 32 mtime; + 'V' 36 length; + 'X' 44 name; + 'X' 48 uid; + 'X' 52 gid; + 'X' 56 muid; +}; + +defn +Dir(addr) { + complex Dir addr; + print(" type ", addr.type, "\n"); + print(" dev ", addr.dev, "\n"); + print("Qid qid {\n"); + Qid(addr.qid); + print("}\n"); + print(" mode ", addr.mode, "\n"); + print(" atime ", addr.atime, "\n"); + print(" mtime ", addr.mtime, "\n"); + print(" length ", addr.length, "\n"); + print(" name ", addr.name\X, "\n"); + print(" uid ", addr.uid\X, "\n"); + print(" gid ", addr.gid\X, "\n"); + print(" muid ", addr.muid\X, "\n"); +}; + +sizeofWaitmsg = 20; +aggr Waitmsg +{ + 'D' 0 pid; + 'a' 4 time; + 'X' 16 msg; +}; + +defn +Waitmsg(addr) { + complex Waitmsg addr; + print(" pid ", addr.pid, "\n"); + print(" time ", addr.time, "\n"); + print(" msg ", addr.msg\X, "\n"); +}; + +sizeofIOchunk = 8; +aggr IOchunk +{ + 'X' 0 addr; + 'U' 4 len; +}; + +defn +IOchunk(addr) { + complex IOchunk addr; + print(" addr ", addr.addr\X, "\n"); + print(" len ", addr.len, "\n"); +}; + +Nqwds = 2; +Nqshift = 5; +Nqmask = -1; +Nqbits = 64; +sizeofChannel = 36; +aggr Channel +{ + 'D' 0 s; + 'U' 4 f; + 'U' 8 n; + 'D' 12 e; + 'D' 16 freed; + 'X' 20 qentry; + 'D' 24 nentry; + 'D' 28 closed; + 'a' 32 v; +}; + +defn +Channel(addr) { + complex Channel addr; + print(" s ", addr.s, "\n"); + print(" f ", addr.f, "\n"); + print(" n ", addr.n, "\n"); + print(" e ", addr.e, "\n"); + print(" freed ", addr.freed, "\n"); + print(" qentry ", addr.qentry\X, "\n"); + print(" nentry ", addr.nentry, "\n"); + print(" closed ", addr.closed, "\n"); + print(" v ", addr.v, "\n"); +}; + +CHANEND = 0; +CHANSND = 1; +CHANRCV = 2; +CHANNOP = 3; +CHANNOBLK = 4; +sizeofAlt = 24; +aggr Alt +{ + 'A' Channel 0 c; + 'X' 4 v; + 'D' 8 op; + 'X' 12 err; + 'A' Channel 16 tag; + 'D' 20 entryno; +}; + +defn +Alt(addr) { + complex Alt addr; + print(" c ", addr.c\X, "\n"); + print(" v ", addr.v\X, "\n"); + print(" op ", addr.op, "\n"); + print(" err ", addr.err\X, "\n"); + print(" tag ", addr.tag\X, "\n"); + print(" entryno ", addr.entryno, "\n"); +}; + +sizeofRef = 4; +aggr Ref +{ + 'D' 0 ref; +}; + +defn +Ref(addr) { + complex Ref addr; + print(" ref ", addr.ref, "\n"); +}; + +Dead = 0; +Running = 1; +Ready = 2; +Rendezvous = 3; +Channone = 0; +Chanalt = 1; +Chansend = 2; +Chanrecv = 3; +RENDHASH = 13; +Printsize = 2048; +NPRIV = 8; +sizeofRgrp = 56; +aggr Rgrp +{ + Lock 0 lock; + 'a' 4 hash; +}; + +defn +Rgrp(addr) { + complex Rgrp addr; + print("Lock lock {\n"); + Lock(addr.lock); + print("}\n"); + print(" hash ", addr.hash, "\n"); +}; + +sizeofTqueue = 12; +aggr Tqueue +{ + 'D' 0 asleep; + 'X' 4 $head; + 'X' 8 $tail; +}; + +defn +Tqueue(addr) { + complex Tqueue addr; + print(" asleep ", addr.asleep, "\n"); + print(" $head ", addr.$head\X, "\n"); + print(" $tail ", addr.$tail\X, "\n"); +}; + +sizeofThread = 120; +aggr Thread +{ + Lock 0 lock; + 'a' 4 sched; + 'D' 12 id; + 'D' 16 grp; + 'D' 20 moribund; + 'D' 24 state; + 'D' 28 nextstate; + 'X' 32 stk; + 'U' 36 stksize; + 'A' Thread 40 next; + 'X' 44 proc; + 'A' Thread 48 nextt; + 'D' 52 ret; + 'X' 56 cmdname; + 'D' 60 inrendez; + 'A' Thread 64 rendhash; + 'X' 68 rendtag; + 'X' 72 rendval; + 'D' 76 rendbreak; + 'D' 80 chan; + 'A' Alt 84 alt; + 'a' 88 udata; +}; + +defn +Thread(addr) { + complex Thread addr; + print("Lock lock {\n"); + Lock(addr.lock); + print("}\n"); + print(" sched ", addr.sched, "\n"); + print(" id ", addr.id, "\n"); + print(" grp ", addr.grp, "\n"); + print(" moribund ", addr.moribund, "\n"); + print(" state ", addr.state, "\n"); + print(" nextstate ", addr.nextstate, "\n"); + print(" stk ", addr.stk\X, "\n"); + print(" stksize ", addr.stksize, "\n"); + print(" next ", addr.next\X, "\n"); + print(" proc ", addr.proc\X, "\n"); + print(" nextt ", addr.nextt\X, "\n"); + print(" ret ", addr.ret, "\n"); + print(" cmdname ", addr.cmdname\X, "\n"); + print(" inrendez ", addr.inrendez, "\n"); + print(" rendhash ", addr.rendhash\X, "\n"); + print(" rendtag ", addr.rendtag\X, "\n"); + print(" rendval ", addr.rendval\X, "\n"); + print(" rendbreak ", addr.rendbreak, "\n"); + print(" chan ", addr.chan, "\n"); + print(" alt ", addr.alt\X, "\n"); + print(" udata ", addr.udata, "\n"); +}; + +sizeofExecargs = 16; +aggr Execargs +{ + 'X' 0 prog; + 'X' 4 args; + 'a' 8 fd; +}; + +defn +Execargs(addr) { + complex Execargs addr; + print(" prog ", addr.prog\X, "\n"); + print(" args ", addr.args\X, "\n"); + print(" fd ", addr.fd, "\n"); +}; + +sizeofProc = 2424; +aggr Proc +{ + Lock 0 lock; + 'a' 4 sched; + 'D' 12 pid; + 'D' 16 splhi; + 'A' Thread 20 thread; + 'D' 24 needexec; + Execargs 28 exec; + 'A' Proc 44 newproc; + 'a' 48 exitstr; + 'D' 176 rforkflag; + 'D' 180 nthreads; + Tqueue 184 threads; + Tqueue 196 ready; + Lock 208 readylock; + 'a' 212 printbuf; + 'D' 2260 blocked; + 'D' 2264 pending; + 'D' 2268 nonotes; + 'U' 2272 nextID; + 'A' Proc 2276 next; + 'X' 2280 arg; + 'a' 2284 str; + 'X' 2412 wdata; + 'X' 2416 udata; + 'C' 2420 threadint; +}; + +defn +Proc(addr) { + complex Proc addr; + print("Lock lock {\n"); + Lock(addr.lock); + print("}\n"); + print(" sched ", addr.sched, "\n"); + print(" pid ", addr.pid, "\n"); + print(" splhi ", addr.splhi, "\n"); + print(" thread ", addr.thread\X, "\n"); + print(" needexec ", addr.needexec, "\n"); + print("Execargs exec {\n"); + Execargs(addr.exec); + print("}\n"); + print(" newproc ", addr.newproc\X, "\n"); + print(" exitstr ", addr.exitstr, "\n"); + print(" rforkflag ", addr.rforkflag, "\n"); + print(" nthreads ", addr.nthreads, "\n"); + print("Tqueue threads {\n"); + Tqueue(addr.threads); + print("}\n"); + print("Tqueue ready {\n"); + Tqueue(addr.ready); + print("}\n"); + print("Lock readylock {\n"); + Lock(addr.readylock); + print("}\n"); + print(" printbuf ", addr.printbuf, "\n"); + print(" blocked ", addr.blocked, "\n"); + print(" pending ", addr.pending, "\n"); + print(" nonotes ", addr.nonotes, "\n"); + print(" nextID ", addr.nextID, "\n"); + print(" next ", addr.next\X, "\n"); + print(" arg ", addr.arg\X, "\n"); + print(" str ", addr.str, "\n"); + print(" wdata ", addr.wdata\X, "\n"); + print(" udata ", addr.udata\X, "\n"); + print(" threadint ", addr.threadint, "\n"); +}; + +sizeofPqueue = 12; +aggr Pqueue +{ + Lock 0 lock; + 'A' Proc 4 $head; + 'A' Proc 8 $tail; +}; + +defn +Pqueue(addr) { + complex Pqueue addr; + print("Lock lock {\n"); + Lock(addr.lock); + print("}\n"); + print(" $head ", addr.$head\X, "\n"); + print(" $tail ", addr.$tail\X, "\n"); +}; + +sizeofIoproc = 160; +aggr Ioproc +{ + 'D' 0 tid; + 'A' Channel 4 c; + 'A' Channel 8 creply; + 'D' 12 inuse; + 'X' 16 op; + 'X' 20 arg; + 'D' 24 ret; + 'a' 28 err; + 'A' Ioproc 156 next; +}; + +defn +Ioproc(addr) { + complex Ioproc addr; + print(" tid ", addr.tid, "\n"); + print(" c ", addr.c\X, "\n"); + print(" creply ", addr.creply\X, "\n"); + print(" inuse ", addr.inuse, "\n"); + print(" op ", addr.op\X, "\n"); + print(" arg ", addr.arg\X, "\n"); + print(" ret ", addr.ret, "\n"); + print(" err ", addr.err, "\n"); + print(" next ", addr.next\X, "\n"); +}; + +complex Pqueue _threadpq; +complex Channel _threadwaitchan; +complex Rgrp _threadrgrp; +CHANCLOSD = 12649965; +complex Lock chanlock; +complex Channel _chanfree:c; +complex Channel chanfree:c; +complex Channel chaninit:c; +complex Channel chancreate:c; +complex Channel isopenfor:c; +complex Alt alt:alts; +complex Alt alt:a; +complex Alt alt:xa; +complex Alt alt:ca; +complex Channel alt:c; +complex Thread alt:t; +complex Channel chanclose:c; +complex Alt chanclose:a; +complex Channel chanclosing:c; +complex Channel runop:c; +complex Channel recv:c; +complex Channel nbrecv:c; +complex Channel send:c; +complex Channel nbsend:c; +complex Channel channelsize:c; +complex Channel sendul:c; +complex Channel recvul:c; +complex Channel sendp:c; +complex Channel recvp:c; +complex Channel nbsendul:c; +complex Channel nbrecvul:c; +complex Channel nbsendp:c; +complex Channel nbrecvp:c; +complex Channel emptyentry:c; +complex Alt enqueue:a; +complex Channel enqueue:c; +complex Alt dequeue:a; +complex Channel dequeue:c; +complex Alt canexec:a; +complex Channel canexec:c; +complex Alt altexecbuffered:a; +complex Channel altexecbuffered:c; +complex Alt altexec:a; +complex Alt altexec:b; +complex Channel altexec:c; diff --git a/sys/src/libthread/channel.c b/sys/src/libthread/channel.c new file mode 100755 index 000000000..54bd5373d --- /dev/null +++ b/sys/src/libthread/channel.c @@ -0,0 +1,613 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +/* Value to indicate the channel is closed */ +enum { + CHANCLOSD = 0xc105ed, +}; + +static char errcl[] = "channel was closed"; +static Lock chanlock; /* central channel access lock */ + +static void enqueue(Alt*, Channel**); +static void dequeue(Alt*); +static int canexec(Alt*); +static int altexec(Alt*, int); + +#define Closed ((void*)CHANCLOSD) +#define Intred ((void*)~0) /* interrupted */ + +static void +_chanfree(Channel *c) +{ + int i, inuse; + + if(c->closed == 1) /* chanclose is ongoing */ + inuse = 1; + else{ + inuse = 0; + for(i = 0; i < c->nentry; i++) /* alt ongoing */ + if(c->qentry[i]) + inuse = 1; + } + if(inuse) + c->freed = 1; + else{ + if(c->qentry) + free(c->qentry); + free(c); + } +} + +void +chanfree(Channel *c) +{ + lock(&chanlock); + _chanfree(c); + unlock(&chanlock); +} + +int +chaninit(Channel *c, int elemsize, int elemcnt) +{ + if(elemcnt < 0 || elemsize <= 0 || c == nil) + return -1; + c->f = 0; + c->n = 0; + c->closed = 0; + c->freed = 0; + c->e = elemsize; + c->s = elemcnt; + _threaddebug(DBGCHAN, "chaninit %p", c); + return 1; +} + +Channel* +chancreate(int elemsize, int elemcnt) +{ + Channel *c; + + if(elemcnt < 0 || elemsize <= 0) + return nil; + c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1); + c->e = elemsize; + c->s = elemcnt; + _threaddebug(DBGCHAN, "chancreate %p", c); + return c; +} + +static int +isopenfor(Channel *c, int op) +{ + return c->closed == 0 || (op == CHANRCV && c->n > 0); +} + +int +alt(Alt *alts) +{ + Alt *a, *xa, *ca; + Channel volatile *c; + int n, s, waiting, allreadycl; + void* r; + Thread *t; + + /* + * The point of going splhi here is that note handlers + * might reasonably want to use channel operations, + * but that will hang if the note comes while we hold the + * chanlock. Instead, we delay the note until we've dropped + * the lock. + */ + t = _threadgetproc()->thread; + if(t->moribund || _threadexitsallstatus) + yield(); /* won't return */ + s = _procsplhi(); + lock(&chanlock); + t->alt = alts; + t->chan = Chanalt; + + /* test whether any channels can proceed */ + n = 0; + a = nil; + + for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){ + xa->entryno = -1; + if(xa->op == CHANNOP) + continue; + + c = xa->c; + if(c==nil){ + unlock(&chanlock); + _procsplx(s); + t->chan = Channone; + return -1; + } + + if(isopenfor(c, xa->op) && canexec(xa)) + if(nrand(++n) == 0) + a = xa; + } + + + if(a==nil){ + /* nothing can proceed */ + if(xa->op == CHANNOBLK){ + unlock(&chanlock); + _procsplx(s); + t->chan = Channone; + if(xa->op == CHANNOBLK) + return xa - alts; + } + + /* enqueue on all channels open for us. */ + c = nil; + ca = nil; + waiting = 0; + allreadycl = 0; + for(xa=alts; xa->op!=CHANEND; xa++) + if(xa->op==CHANNOP) + continue; + else if(isopenfor(xa->c, xa->op)){ + waiting = 1; + enqueue(xa, &c); + } else if(xa->err != errcl) + ca = xa; + else + allreadycl = 1; + + if(waiting == 0) + if(ca != nil){ + /* everything was closed, select last channel */ + ca->err = errcl; + unlock(&chanlock); + _procsplx(s); + t->chan = Channone; + return ca - alts; + } else if(allreadycl){ + /* everything was already closed */ + unlock(&chanlock); + _procsplx(s); + t->chan = Channone; + return -1; + } + /* + * wait for successful rendezvous. + * we can't just give up if the rendezvous + * is interrupted -- someone else might come + * along and try to rendezvous with us, so + * we need to be here. + * if the channel was closed, the op is done + * and we flag an error for the entry. + */ + Again: + unlock(&chanlock); + _procsplx(s); + r = _threadrendezvous(&c, 0); + s = _procsplhi(); + lock(&chanlock); + + if(r==Intred){ /* interrupted */ + if(c!=nil) /* someone will meet us; go back */ + goto Again; + c = (Channel*)~0; /* so no one tries to meet us */ + } + + /* dequeue from channels, find selected one */ + a = nil; + for(xa=alts; xa->op!=CHANEND; xa++){ + if(xa->op==CHANNOP) + continue; + if(xa->c == c){ + a = xa; + a->err = nil; + if(r == Closed) + a->err = errcl; + } + dequeue(xa); + } + unlock(&chanlock); + _procsplx(s); + if(a == nil){ /* we were interrupted */ + assert(c==(Channel*)~0); + return -1; + } + }else + altexec(a, s); /* unlocks chanlock, does splx */ + _sched(); + t->chan = Channone; + return a - alts; +} + +int +chanclose(Channel *c) +{ + Alt *a; + int i, s; + + s = _procsplhi(); /* note handlers; see :/^alt */ + lock(&chanlock); + if(c->closed){ + /* Already close; we fail but it's ok. don't print */ + unlock(&chanlock); + _procsplx(s); + return -1; + } + c->closed = 1; /* Being closed */ + /* + * Locate entries that will fail due to close + * (send, and receive if nothing buffered) and wake them up. + * the situation cannot change because all queries + * should be committed by now and new ones will find the channel + * closed. We still need to take the lock during the iteration + * because we can wake threads on qentrys we have not seen yet + * as in alt and there would be a race in the access to *a. + */ + for(i = 0; i < c->nentry; i++){ + if((a = c->qentry[i]) == nil || *a->tag != nil) + continue; + + if(a->op != CHANSND && (a->op != CHANRCV || c->n != 0)) + continue; + *a->tag = c; + unlock(&chanlock); + _procsplx(s); + while(_threadrendezvous(a->tag, Closed) == Intred) + ; + s = _procsplhi(); + lock(&chanlock); + } + + c->closed = 2; /* Fully closed */ + if(c->freed) + _chanfree(c); + unlock(&chanlock); + _procsplx(s); + return 0; +} + +int +chanclosing(Channel *c) +{ + int n, s; + + s = _procsplhi(); /* note handlers; see :/^alt */ + lock(&chanlock); + if(c->closed == 0) + n = -1; + else + n = c->n; + unlock(&chanlock); + _procsplx(s); + return n; +} + +/* + * superseded by chanclosing +int +chanisclosed(Channel *c) +{ + return chanisclosing(c) >= 0; +} + */ + +static int +runop(int op, Channel *c, void *v, int nb) +{ + int r; + Alt a[2]; + + /* + * we could do this without calling alt, + * but the only reason would be performance, + * and i'm not convinced it matters. + */ + a[0].op = op; + a[0].c = c; + a[0].v = v; + a[0].err = nil; + a[1].op = CHANEND; + if(nb) + a[1].op = CHANNOBLK; + switch(r=alt(a)){ + case -1: /* interrupted */ + return -1; + case 1: /* nonblocking, didn't accomplish anything */ + assert(nb); + return 0; + case 0: + /* + * Okay, but return -1 if the op is done because of a close. + */ + if(a[0].err != nil) + return -1; + return 1; + default: + fprint(2, "ERROR: channel alt returned %d\n", r); + abort(); + return -1; + } +} + +int +recv(Channel *c, void *v) +{ + return runop(CHANRCV, c, v, 0); +} + +int +nbrecv(Channel *c, void *v) +{ + return runop(CHANRCV, c, v, 1); +} + +int +send(Channel *c, void *v) +{ + return runop(CHANSND, c, v, 0); +} + +int +nbsend(Channel *c, void *v) +{ + return runop(CHANSND, c, v, 1); +} + +static void +channelsize(Channel *c, int sz) +{ + if(c->e != sz){ + fprint(2, "expected channel with elements of size %d, got size %d\n", + sz, c->e); + abort(); + } +} + +int +sendul(Channel *c, ulong v) +{ + channelsize(c, sizeof(ulong)); + return send(c, &v); +} + +ulong +recvul(Channel *c) +{ + ulong v; + + channelsize(c, sizeof(ulong)); + if(recv(c, &v) < 0) + return ~0; + return v; +} + +int +sendp(Channel *c, void *v) +{ + channelsize(c, sizeof(void*)); + return send(c, &v); +} + +void* +recvp(Channel *c) +{ + void *v; + + channelsize(c, sizeof(void*)); + if(recv(c, &v) < 0) + return nil; + return v; +} + +int +nbsendul(Channel *c, ulong v) +{ + channelsize(c, sizeof(ulong)); + return nbsend(c, &v); +} + +ulong +nbrecvul(Channel *c) +{ + ulong v; + + channelsize(c, sizeof(ulong)); + if(nbrecv(c, &v) == 0) + return 0; + return v; +} + +int +nbsendp(Channel *c, void *v) +{ + channelsize(c, sizeof(void*)); + return nbsend(c, &v); +} + +void* +nbrecvp(Channel *c) +{ + void *v; + + channelsize(c, sizeof(void*)); + if(nbrecv(c, &v) == 0) + return nil; + return v; +} + +static int +emptyentry(Channel *c) +{ + int i, extra; + + assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry)); + + for(i=0; i<c->nentry; i++) + if(c->qentry[i]==nil) + return i; + + extra = 16; + c->nentry += extra; + c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0])); + if(c->qentry == nil) + sysfatal("realloc channel entries: %r"); + memset(&c->qentry[i], 0, extra*sizeof(c->qentry[0])); + return i; +} + +static void +enqueue(Alt *a, Channel **c) +{ + int i; + + _threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c); + a->tag = c; + i = emptyentry(a->c); + a->c->qentry[i] = a; +} + +static void +dequeue(Alt *a) +{ + int i; + Channel *c; + + c = a->c; + for(i=0; i<c->nentry; i++) + if(c->qentry[i]==a){ + _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c); + c->qentry[i] = nil; + /* release if freed and not closing */ + if(c->freed && c->closed != 1) + _chanfree(c); + return; + } +} + +static int +canexec(Alt *a) +{ + int i, otherop; + Channel *c; + + c = a->c; + /* are there senders or receivers blocked? */ + otherop = (CHANSND+CHANRCV) - a->op; + for(i=0; i<c->nentry; i++) + if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){ + _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c); + return 1; + } + + /* is there room in the channel? */ + if((a->op==CHANSND && c->n < c->s) + || (a->op==CHANRCV && c->n > 0)){ + _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c); + return 1; + } + + return 0; +} + +static void* +altexecbuffered(Alt *a, int willreplace) +{ + uchar *v; + Channel *c; + + c = a->c; + /* use buffered channel queue */ + if(a->op==CHANRCV && c->n > 0){ + _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c); + v = c->v + c->e*(c->f%c->s); + if(!willreplace) + c->n--; + c->f++; + return v; + } + if(a->op==CHANSND && c->n < c->s){ + _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c); + v = c->v + c->e*((c->f+c->n)%c->s); + if(!willreplace) + c->n++; + return v; + } + abort(); + return nil; +} + +static void +altcopy(void *dst, void *src, int sz) +{ + if(dst){ + if(src) + memmove(dst, src, sz); + else + memset(dst, 0, sz); + } +} + +static int +altexec(Alt *a, int spl) +{ + volatile Alt *b; + int i, n, otherop; + Channel *c; + void *me, *waiter, *buf; + + c = a->c; + + /* rendezvous with others */ + otherop = (CHANSND+CHANRCV) - a->op; + n = 0; + b = nil; + me = a->v; + for(i=0; i<c->nentry; i++) + if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil) + if(nrand(++n) == 0) + b = c->qentry[i]; + if(b != nil){ + _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b); + waiter = b->v; + if(c->s && c->n){ + /* + * if buffer is full and there are waiters + * and we're meeting a waiter, + * we must be receiving. + * + * we use the value in the channel buffer, + * copy the waiter's value into the channel buffer + * on behalf of the waiter, and then wake the waiter. + */ + if(a->op!=CHANRCV) + abort(); + buf = altexecbuffered(a, 1); + altcopy(me, buf, c->e); + altcopy(buf, waiter, c->e); + }else{ + if(a->op==CHANRCV) + altcopy(me, waiter, c->e); + else + altcopy(waiter, me, c->e); + } + *b->tag = c; /* commits us to rendezvous */ + _threaddebug(DBGCHAN, "unlocking the chanlock"); + unlock(&chanlock); + _procsplx(spl); + _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock); + while(_threadrendezvous(b->tag, 0) == Intred) + ; + return 1; + } + + buf = altexecbuffered(a, 0); + if(a->op==CHANRCV) + altcopy(me, buf, c->e); + else + altcopy(buf, me, c->e); + + unlock(&chanlock); + _procsplx(spl); + return 1; +} diff --git a/sys/src/libthread/chanprint.c b/sys/src/libthread/chanprint.c new file mode 100755 index 000000000..781c6f6fa --- /dev/null +++ b/sys/src/libthread/chanprint.c @@ -0,0 +1,20 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> + +int +chanprint(Channel *c, char *fmt, ...) +{ + va_list arg; + char *p; + int n; + + va_start(arg, fmt); + p = vsmprint(fmt, arg); + va_end(arg); + if(p == nil) + sysfatal("vsmprint failed: %r"); + n = sendp(c, p); + yield(); /* let recipient handle message immediately */ + return n; +} diff --git a/sys/src/libthread/create.c b/sys/src/libthread/create.c new file mode 100755 index 000000000..c1564af35 --- /dev/null +++ b/sys/src/libthread/create.c @@ -0,0 +1,152 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +Pqueue _threadpq; + +static int +nextID(void) +{ + static Lock l; + static int id; + int i; + + lock(&l); + i = ++id; + unlock(&l); + return i; +} + +/* + * Create and initialize a new Thread structure attached to a given proc. + */ +static int +newthread(Proc *p, void (*f)(void *arg), void *arg, uint stacksize, char *name, int grp) +{ + int id; + Thread *t; + + if(stacksize < 32) + sysfatal("bad stacksize %d", stacksize); + t = _threadmalloc(sizeof(Thread), 1); + t->stksize = stacksize; + t->stk = _threadmalloc(stacksize, 0); + memset(t->stk, 0xFE, stacksize); + _threadinitstack(t, f, arg); + t->grp = grp; + if(name) + t->cmdname = strdup(name); + t->id = nextID(); + id = t->id; + t->next = (Thread*)~0; + t->proc = p; + _threaddebug(DBGSCHED, "create thread %d.%d name %s", p->pid, t->id, name); + lock(&p->lock); + p->nthreads++; + if(p->threads.head == nil) + p->threads.head = t; + else + *p->threads.tail = t; + p->threads.tail = &t->nextt; + t->nextt = nil; + t->state = Ready; + _threadready(t); + unlock(&p->lock); + return id; +} + +/* + * Create a new thread and schedule it to run. + * The thread grp is inherited from the currently running thread. + */ +int +threadcreate(void (*f)(void *arg), void *arg, uint stacksize) +{ + return newthread(_threadgetproc(), f, arg, stacksize, nil, threadgetgrp()); +} + +/* + * Create and initialize a new Proc structure with a single Thread + * running inside it. Add the Proc to the global process list. + */ +Proc* +_newproc(void (*f)(void *arg), void *arg, uint stacksize, char *name, int grp, int rforkflag) +{ + Proc *p; + + p = _threadmalloc(sizeof *p, 1); + p->pid = -1; + p->rforkflag = rforkflag; + newthread(p, f, arg, stacksize, name, grp); + + lock(&_threadpq.lock); + if(_threadpq.head == nil) + _threadpq.head = p; + else + *_threadpq.tail = p; + _threadpq.tail = &p->next; + unlock(&_threadpq.lock); + return p; +} + +int +procrfork(void (*f)(void *), void *arg, uint stacksize, int rforkflag) +{ + Proc *p; + int id; + + p = _threadgetproc(); + assert(p->newproc == nil); + p->newproc = _newproc(f, arg, stacksize, nil, p->thread->grp, rforkflag); + id = p->newproc->threads.head->id; + _sched(); + return id; +} + +int +proccreate(void (*f)(void*), void *arg, uint stacksize) +{ + return procrfork(f, arg, stacksize, 0); +} + +void +_freeproc(Proc *p) +{ + Thread *t, *nextt; + + for(t = p->threads.head; t; t = nextt){ + if(t->cmdname) + free(t->cmdname); + assert(t->stk != nil); + free(t->stk); + nextt = t->nextt; + free(t); + } + free(p); +} + +void +_freethread(Thread *t) +{ + Proc *p; + Thread **l; + + p = t->proc; + lock(&p->lock); + for(l=&p->threads.head; *l; l=&(*l)->nextt){ + if(*l == t){ + *l = t->nextt; + if(*l == nil) + p->threads.tail = l; + break; + } + } + unlock(&p->lock); + if (t->cmdname) + free(t->cmdname); + assert(t->stk != nil); + free(t->stk); + free(t); +} + diff --git a/sys/src/libthread/debug.c b/sys/src/libthread/debug.c new file mode 100755 index 000000000..183869d18 --- /dev/null +++ b/sys/src/libthread/debug.c @@ -0,0 +1,51 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +int _threaddebuglevel; + +void +_threaddebug(ulong flag, char *fmt, ...) +{ + char buf[128]; + va_list arg; + Fmt f; + Proc *p; + + if((_threaddebuglevel&flag) == 0) + return; + + fmtfdinit(&f, 2, buf, sizeof buf); + + p = _threadgetproc(); + if(p==nil) + fmtprint(&f, "noproc "); + else if(p->thread) + fmtprint(&f, "%d.%d ", p->pid, p->thread->id); + else + fmtprint(&f, "%d._ ", p->pid); + + va_start(arg, fmt); + fmtvprint(&f, fmt, arg); + va_end(arg); + fmtprint(&f, "\n"); + fmtfdflush(&f); +} + +void +_threadassert(char *s) +{ + char buf[256]; + int n; + Proc *p; + + p = _threadgetproc(); + if(p && p->thread) + n = sprint(buf, "%d.%d ", p->pid, p->thread->id); + else + n = 0; + snprint(buf+n, sizeof(buf)-n, "%s: assertion failed\n", s); + write(2, buf, strlen(buf)); + abort(); +} diff --git a/sys/src/libthread/dial.c b/sys/src/libthread/dial.c new file mode 100755 index 000000000..d2ca013da --- /dev/null +++ b/sys/src/libthread/dial.c @@ -0,0 +1,221 @@ +/* + * old single-process version of dial that libthread can cope with + */ +#include <u.h> +#include <libc.h> + +typedef struct DS DS; + +static int call(char*, char*, DS*); +static int csdial(DS*); +static void _dial_string_parse(char*, DS*); + +enum +{ + Maxstring = 128, + Maxpath = 256, +}; + +struct DS { + /* dist string */ + char buf[Maxstring]; + char *netdir; + char *proto; + char *rem; + + /* other args */ + char *local; + char *dir; + int *cfdp; +}; + + +/* + * the dialstring is of the form '[/net/]proto!dest' + */ +int +_threaddial(char *dest, char *local, char *dir, int *cfdp) +{ + DS ds; + int rv; + char err[ERRMAX], alterr[ERRMAX]; + + ds.local = local; + ds.dir = dir; + ds.cfdp = cfdp; + + _dial_string_parse(dest, &ds); + if(ds.netdir) + return csdial(&ds); + + ds.netdir = "/net"; + rv = csdial(&ds); + if(rv >= 0) + return rv; + err[0] = '\0'; + errstr(err, sizeof err); + if(strstr(err, "refused") != 0){ + werrstr("%s", err); + return rv; + } + ds.netdir = "/net.alt"; + rv = csdial(&ds); + if(rv >= 0) + return rv; + + alterr[0] = 0; + errstr(alterr, sizeof alterr); + if(strstr(alterr, "translate") || strstr(alterr, "does not exist")) + werrstr("%s", err); + else + werrstr("%s", alterr); + return rv; +} + +static int +csdial(DS *ds) +{ + int n, fd, rv; + char *p, buf[Maxstring], clone[Maxpath], err[ERRMAX], besterr[ERRMAX]; + + /* + * open connection server + */ + snprint(buf, sizeof(buf), "%s/cs", ds->netdir); + fd = open(buf, ORDWR); + if(fd < 0){ + /* no connection server, don't translate */ + snprint(clone, sizeof(clone), "%s/%s/clone", ds->netdir, ds->proto); + return call(clone, ds->rem, ds); + } + + /* + * ask connection server to translate + */ + snprint(buf, sizeof(buf), "%s!%s", ds->proto, ds->rem); + if(write(fd, buf, strlen(buf)) < 0){ + close(fd); + return -1; + } + + /* + * loop through each address from the connection server till + * we get one that works. + */ + *besterr = 0; + rv = -1; + seek(fd, 0, 0); + while((n = read(fd, buf, sizeof(buf) - 1)) > 0){ + buf[n] = 0; + p = strchr(buf, ' '); + if(p == 0) + continue; + *p++ = 0; + rv = call(buf, p, ds); + if(rv >= 0) + break; + err[0] = '\0'; + errstr(err, sizeof err); + if(strstr(err, "does not exist") == 0) + strcpy(besterr, err); + } + close(fd); + + if(rv < 0 && *besterr) + werrstr("%s", besterr); + else + werrstr("%s", err); + return rv; +} + +static int +call(char *clone, char *dest, DS *ds) +{ + int fd, cfd, n; + char cname[Maxpath], name[Maxpath], data[Maxpath], *p; + + /* because cs is in a different name space, replace the mount point */ + if(*clone == '/'){ + p = strchr(clone+1, '/'); + if(p == nil) + p = clone; + else + p++; + } else + p = clone; + snprint(cname, sizeof cname, "%s/%s", ds->netdir, p); + + cfd = open(cname, ORDWR); + if(cfd < 0) + return -1; + + /* get directory name */ + n = read(cfd, name, sizeof(name)-1); + if(n < 0){ + close(cfd); + return -1; + } + name[n] = 0; + for(p = name; *p == ' '; p++) + ; + snprint(name, sizeof(name), "%ld", strtoul(p, 0, 0)); + p = strrchr(cname, '/'); + *p = 0; + if(ds->dir) + snprint(ds->dir, NETPATHLEN, "%s/%s", cname, name); + snprint(data, sizeof(data), "%s/%s/data", cname, name); + + /* connect */ + if(ds->local) + snprint(name, sizeof(name), "connect %s %s", dest, ds->local); + else + snprint(name, sizeof(name), "connect %s", dest); + if(write(cfd, name, strlen(name)) < 0){ + close(cfd); + return -1; + } + + /* open data connection */ + fd = open(data, ORDWR); + if(fd < 0){ + close(cfd); + return -1; + } + if(ds->cfdp) + *ds->cfdp = cfd; + else + close(cfd); + return fd; +} + +/* + * parse a dial string + */ +static void +_dial_string_parse(char *str, DS *ds) +{ + char *p, *p2; + + strncpy(ds->buf, str, Maxstring); + ds->buf[Maxstring-1] = 0; + + p = strchr(ds->buf, '!'); + if(p == 0) { + ds->netdir = 0; + ds->proto = "net"; + ds->rem = ds->buf; + } else { + if(*ds->buf != '/' && *ds->buf != '#'){ + ds->netdir = 0; + ds->proto = ds->buf; + } else { + for(p2 = p; *p2 != '/'; p2--) + ; + *p2++ = 0; + ds->netdir = ds->buf; + ds->proto = p2; + } + *p = 0; + ds->rem = p + 1; + } +} diff --git a/sys/src/libthread/example.c b/sys/src/libthread/example.c new file mode 100755 index 000000000..dcef548a5 --- /dev/null +++ b/sys/src/libthread/example.c @@ -0,0 +1,85 @@ +/* +Threadmain spawns two subprocesses, one +to read the mouse, and one to receive +timer events. The events are sent via a +channel to the main proc which prints a +word when an event comes in. When mouse +button three is pressed, the application +terminates. +*/ + +#include <u.h> +#include <libc.h> +#include <thread.h> + +enum +{ + STACK = 2048, +}; + +void +mouseproc(void *arg) +{ + char m[48]; + int mfd; + Channel *mc; + + mc = arg; + if((mfd = open("/dev/mouse", OREAD)) < 0) + sysfatal("open /dev/mouse: %r"); + for(;;){ + if(read(mfd, m, sizeof m) != sizeof m) + sysfatal("eof"); + if(atoi(m+1+2*12)&4) + sysfatal("button 3"); + send(mc, m); + } +} + +void +clockproc(void *arg) +{ + int t; + Channel *c; + + c = arg; + for(t=0;; t++){ + sleep(1000); + sendul(c, t); + } +} + + +void +threadmain(int argc, char *argv[]) +{ + char m[48]; + int t; + Alt a[] = { + /* c v op */ + {nil, m, CHANRCV}, + {nil, &t, CHANRCV}, + {nil, nil, CHANEND}, + }; + + /* create mouse event channel and mouse process */ + a[0].c = chancreate(sizeof m, 0); + proccreate(mouseproc, a[0].c, STACK); + + /* create clock event channel and clock process */ + a[1].c = chancreate(sizeof(ulong), 0); /* clock event channel */ + proccreate(clockproc, a[1].c, STACK); + + for(;;){ + switch(alt(a)){ + case 0: /*mouse event */ + fprint(2, "click "); + break; + case 1: /* clock event */ + fprint(2, "tic "); + break; + default: + sysfatal("can't happen"); + } + } +} diff --git a/sys/src/libthread/exec.c b/sys/src/libthread/exec.c new file mode 100755 index 000000000..b93eeb28f --- /dev/null +++ b/sys/src/libthread/exec.c @@ -0,0 +1,80 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +#define PIPEMNT "/mnt/temp" + +void +procexec(Channel *pidc, char *prog, char *args[]) +{ + int n; + Proc *p; + Thread *t; + + _threaddebug(DBGEXEC, "procexec %s", prog); + /* must be only thread in proc */ + p = _threadgetproc(); + t = p->thread; + if(p->threads.head != t || p->threads.head->nextt != nil){ + werrstr("not only thread in proc"); + Bad: + if(pidc) + sendul(pidc, ~0); + return; + } + + /* + * We want procexec to behave like exec; if exec succeeds, + * never return, and if it fails, return with errstr set. + * Unfortunately, the exec happens in another proc since + * we have to wait for the exec'ed process to finish. + * To provide the semantics, we open a pipe with the + * write end close-on-exec and hand it to the proc that + * is doing the exec. If the exec succeeds, the pipe will + * close so that our read below fails. If the exec fails, + * then the proc doing the exec sends the errstr down the + * pipe to us. + */ + if(bind("#|", PIPEMNT, MREPL) < 0) + goto Bad; + if((p->exec.fd[0] = open(PIPEMNT "/data", OREAD)) < 0){ + unmount(nil, PIPEMNT); + goto Bad; + } + if((p->exec.fd[1] = open(PIPEMNT "/data1", OWRITE|OCEXEC)) < 0){ + close(p->exec.fd[0]); + unmount(nil, PIPEMNT); + goto Bad; + } + unmount(nil, PIPEMNT); + + /* exec in parallel via the scheduler */ + assert(p->needexec==0); + p->exec.prog = prog; + p->exec.args = args; + p->needexec = 1; + _sched(); + + close(p->exec.fd[1]); + if((n = read(p->exec.fd[0], p->exitstr, ERRMAX-1)) > 0){ /* exec failed */ + p->exitstr[n] = '\0'; + errstr(p->exitstr, ERRMAX); + close(p->exec.fd[0]); + goto Bad; + } + close(p->exec.fd[0]); + + if(pidc) + sendul(pidc, t->ret); + + /* wait for exec'ed program, then exit */ + _schedexecwait(); +} + +void +procexecl(Channel *pidc, char *f, ...) +{ + procexec(pidc, f, &f+1); +} + diff --git a/sys/src/libthread/exit.c b/sys/src/libthread/exit.c new file mode 100755 index 000000000..d36e11af4 --- /dev/null +++ b/sys/src/libthread/exit.c @@ -0,0 +1,72 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" +#include <tos.h> + +char *_threadexitsallstatus; +Channel *_threadwaitchan; + +void +threadexits(char *exitstr) +{ + Proc *p; + Thread *t; + + p = _threadgetproc(); + t = p->thread; + t->moribund = 1; + if(exitstr==nil) + exitstr=""; + utfecpy(p->exitstr, p->exitstr+ERRMAX, exitstr); + _sched(); +} + +void +threadexitsall(char *exitstr) +{ + Proc *p; + int pid[64]; + int i, npid, mypid; + + if(exitstr == nil) + exitstr = ""; + _threadexitsallstatus = exitstr; + _threaddebug(DBGSCHED, "_threadexitsallstatus set to %p", _threadexitsallstatus); + mypid = _tos->pid; //getpid(); + + /* + * signal others. + * copying all the pids first avoids other threads + * teardown procedures getting in the way. + * + * avoid mallocs since malloc can post a note which can + * call threadexitsall... + */ + for(;;){ + lock(&_threadpq.lock); + npid = 0; + for(p = _threadpq.head; p && npid < nelem(pid); p=p->next){ + if(p->threadint == 0 && p->pid != mypid){ + pid[npid++] = p->pid; + p->threadint = 1; + } + } + unlock(&_threadpq.lock); + if(npid == 0) + break; + for(i=0; i<npid; i++) + postnote(PNPROC, pid[i], "threadint"); + } + + /* leave */ + exits(exitstr); +} + +Channel* +threadwaitchan(void) +{ + if(_threadwaitchan==nil) + _threadwaitchan = chancreate(sizeof(Waitmsg*), 16); + return _threadwaitchan; +} diff --git a/sys/src/libthread/id.c b/sys/src/libthread/id.c new file mode 100755 index 000000000..ebb563307 --- /dev/null +++ b/sys/src/libthread/id.c @@ -0,0 +1,139 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" +#include <tos.h> + +int +threadid(void) +{ + return _threadgetproc()->thread->id; +} + +int +threadpid(int id) +{ + int pid; + Proc *p; + Thread *t; + + if (id < 0) + return -1; + if (id == 0) + return _threadgetproc()->pid; + lock(&_threadpq.lock); + for (p = _threadpq.head; p; p = p->next){ + lock(&p->lock); + for (t = p->threads.head; t; t = t->nextt) + if (t->id == id){ + pid = p->pid; + unlock(&p->lock); + unlock(&_threadpq.lock); + return pid; + } + unlock(&p->lock); + } + unlock(&_threadpq.lock); + return -1; +} + +int +threadsetgrp(int ng) +{ + int og; + Thread *t; + + t = _threadgetproc()->thread; + og = t->grp; + t->grp = ng; + return og; +} + +int +threadgetgrp(void) +{ + return _threadgetproc()->thread->grp; +} + +void +threadsetname(char *fmt, ...) +{ + int fd; + char buf[128]; + va_list arg; + Proc *p; + Thread *t; + + p = _threadgetproc(); + t = p->thread; + if (t->cmdname) + free(t->cmdname); + va_start(arg, fmt); + t->cmdname = vsmprint(fmt, arg); + va_end(arg); + if(t->cmdname && p->nthreads == 1){ + snprint(buf, sizeof buf, "#p/%lud/args", _tos->pid); //getpid()); + if((fd = open(buf, OWRITE)) >= 0){ + write(fd, t->cmdname, strlen(t->cmdname)+1); + close(fd); + } + } +} + +char* +threadgetname(void) +{ + return _threadgetproc()->thread->cmdname; +} + +void** +threaddata(void) +{ + return &_threadgetproc()->thread->udata[0]; +} + +void** +_workerdata(void) +{ + return &_threadgetproc()->wdata; +} + +void** +procdata(void) +{ + return &_threadgetproc()->udata; +} + +static Lock privlock; +static int privmask = 1; + +int +tprivalloc(void) +{ + int i; + + lock(&privlock); + for(i=0; i<NPRIV; i++) + if(!(privmask&(1<<i))){ + privmask |= 1<<i; + unlock(&privlock); + return i; + } + unlock(&privlock); + return -1; +} + +void +tprivfree(int i) +{ + if(i < 0 || i >= NPRIV) + abort(); + lock(&privlock); + privmask &= ~(1<<i); +} + +void** +tprivaddr(int i) +{ + return &_threadgetproc()->thread->udata[i]; +} diff --git a/sys/src/libthread/iocall.c b/sys/src/libthread/iocall.c new file mode 100755 index 000000000..4035c40f8 --- /dev/null +++ b/sys/src/libthread/iocall.c @@ -0,0 +1,52 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +long +iocall(Ioproc *io, long (*op)(va_list*), ...) +{ + int ret, inted; + Ioproc *msg; + + if(send(io->c, &io) == -1){ + werrstr("interrupted"); + return -1; + } + assert(!io->inuse); + io->inuse = 1; + io->op = op; + va_start(io->arg, op); + msg = io; + inted = 0; + while(send(io->creply, &msg) == -1){ + msg = nil; + inted = 1; + } + if(inted){ + werrstr("interrupted"); + return -1; + } + + /* + * If we get interrupted, we have to stick around so that + * the IO proc has someone to talk to. Send it an interrupt + * and try again. + */ + inted = 0; + while(recv(io->creply, nil) == -1){ + inted = 1; + iointerrupt(io); + } + USED(inted); + va_end(io->arg); + ret = io->ret; + if(ret < 0) + errstr(io->err, sizeof io->err); + io->inuse = 0; + + /* release resources */ + while(send(io->creply, &io) == -1) + ; + return ret; +} diff --git a/sys/src/libthread/ioclose.c b/sys/src/libthread/ioclose.c new file mode 100755 index 000000000..dffad1b89 --- /dev/null +++ b/sys/src/libthread/ioclose.c @@ -0,0 +1,19 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +static long +_ioclose(va_list *arg) +{ + int fd; + + fd = va_arg(*arg, int); + return close(fd); +} + +int +ioclose(Ioproc *io, int fd) +{ + return iocall(io, _ioclose, fd); +} diff --git a/sys/src/libthread/iodial.c b/sys/src/libthread/iodial.c new file mode 100755 index 000000000..c3128954b --- /dev/null +++ b/sys/src/libthread/iodial.c @@ -0,0 +1,24 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +static long +_iodial(va_list *arg) +{ + char *addr, *local, *dir; + int *cdfp; + + addr = va_arg(*arg, char*); + local = va_arg(*arg, char*); + dir = va_arg(*arg, char*); + cdfp = va_arg(*arg, int*); + + return dial(addr, local, dir, cdfp); +} + +int +iodial(Ioproc *io, char *addr, char *local, char *dir, int *cdfp) +{ + return iocall(io, _iodial, addr, local, dir, cdfp); +} diff --git a/sys/src/libthread/ioopen.c b/sys/src/libthread/ioopen.c new file mode 100755 index 000000000..9cec70464 --- /dev/null +++ b/sys/src/libthread/ioopen.c @@ -0,0 +1,21 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +static long +_ioopen(va_list *arg) +{ + char *path; + int mode; + + path = va_arg(*arg, char*); + mode = va_arg(*arg, int); + return open(path, mode); +} + +int +ioopen(Ioproc *io, char *path, int mode) +{ + return iocall(io, _ioopen, path, mode); +} diff --git a/sys/src/libthread/ioproc.c b/sys/src/libthread/ioproc.c new file mode 100755 index 000000000..e5452ad35 --- /dev/null +++ b/sys/src/libthread/ioproc.c @@ -0,0 +1,77 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +enum +{ + STACK = 8192, +}; + +void +iointerrupt(Ioproc *io) +{ + if(!io->inuse) + return; + threadint(io->tid); +} + +static void +xioproc(void *a) +{ + Ioproc *io, *x; + io = a; + /* + * first recvp acquires the ioproc. + * second tells us that the data is ready. + */ + for(;;){ + while(recv(io->c, &x) == -1) + ; + if(x == 0) /* our cue to leave */ + break; + assert(x == io); + + /* caller is now committed -- even if interrupted he'll return */ + while(recv(io->creply, &x) == -1) + ; + if(x == 0) /* caller backed out */ + continue; + assert(x == io); + + io->ret = io->op(&io->arg); + if(io->ret < 0) + rerrstr(io->err, sizeof io->err); + while(send(io->creply, &io) == -1) + ; + while(recv(io->creply, &x) == -1) + ; + } +} + +Ioproc* +ioproc(void) +{ + Ioproc *io; + + io = mallocz(sizeof(*io), 1); + if(io == nil) + sysfatal("ioproc malloc: %r"); + io->c = chancreate(sizeof(void*), 0); + io->creply = chancreate(sizeof(void*), 0); + io->tid = proccreate(xioproc, io, STACK); + return io; +} + +void +closeioproc(Ioproc *io) +{ + if(io == nil) + return; + iointerrupt(io); + while(send(io->c, 0) == -1) + ; + chanfree(io->c); + chanfree(io->creply); + free(io); +} diff --git a/sys/src/libthread/ioread.c b/sys/src/libthread/ioread.c new file mode 100755 index 000000000..c370cd797 --- /dev/null +++ b/sys/src/libthread/ioread.c @@ -0,0 +1,23 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +static long +_ioread(va_list *arg) +{ + int fd; + void *a; + long n; + + fd = va_arg(*arg, int); + a = va_arg(*arg, void*); + n = va_arg(*arg, long); + return read(fd, a, n); +} + +long +ioread(Ioproc *io, int fd, void *a, long n) +{ + return iocall(io, _ioread, fd, a, n); +} diff --git a/sys/src/libthread/ioreadn.c b/sys/src/libthread/ioreadn.c new file mode 100755 index 000000000..4941156af --- /dev/null +++ b/sys/src/libthread/ioreadn.c @@ -0,0 +1,23 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +static long +_ioreadn(va_list *arg) +{ + int fd; + void *a; + long n; + + fd = va_arg(*arg, int); + a = va_arg(*arg, void*); + n = va_arg(*arg, long); + return readn(fd, a, n); +} + +long +ioreadn(Ioproc *io, int fd, void *a, long n) +{ + return iocall(io, _ioreadn, fd, a, n); +} diff --git a/sys/src/libthread/iosleep.c b/sys/src/libthread/iosleep.c new file mode 100755 index 000000000..799b5377b --- /dev/null +++ b/sys/src/libthread/iosleep.c @@ -0,0 +1,19 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +static long +_iosleep(va_list *arg) +{ + long n; + + n = va_arg(*arg, long); + return sleep(n); +} + +int +iosleep(Ioproc *io, long n) +{ + return iocall(io, _iosleep, n); +} diff --git a/sys/src/libthread/iowrite.c b/sys/src/libthread/iowrite.c new file mode 100755 index 000000000..885776c87 --- /dev/null +++ b/sys/src/libthread/iowrite.c @@ -0,0 +1,23 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +static long +_iowrite(va_list *arg) +{ + int fd; + void *a; + long n; + + fd = va_arg(*arg, int); + a = va_arg(*arg, void*); + n = va_arg(*arg, long); + return write(fd, a, n); +} + +long +iowrite(Ioproc *io, int fd, void *a, long n) +{ + return iocall(io, _iowrite, fd, a, n); +} diff --git a/sys/src/libthread/kill.c b/sys/src/libthread/kill.c new file mode 100755 index 000000000..46c442066 --- /dev/null +++ b/sys/src/libthread/kill.c @@ -0,0 +1,90 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +static void tinterrupt(Proc*, Thread*); + +static void +threadxxxgrp(int grp, int dokill) +{ + Proc *p; + Thread *t; + + lock(&_threadpq.lock); + for(p=_threadpq.head; p; p=p->next){ + lock(&p->lock); + for(t=p->threads.head; t; t=t->nextt) + if(t->grp == grp){ + if(dokill) + t->moribund = 1; + tinterrupt(p, t); + } + unlock(&p->lock); + } + unlock(&_threadpq.lock); + _threadbreakrendez(); +} + +static void +threadxxx(int id, int dokill) +{ + Proc *p; + Thread *t; + + lock(&_threadpq.lock); + for(p=_threadpq.head; p; p=p->next){ + lock(&p->lock); + for(t=p->threads.head; t; t=t->nextt) + if(t->id == id){ + if(dokill) + t->moribund = 1; + tinterrupt(p, t); + unlock(&p->lock); + unlock(&_threadpq.lock); + _threadbreakrendez(); + return; + } + unlock(&p->lock); + } + unlock(&_threadpq.lock); + _threaddebug(DBGNOTE, "Can't find thread to kill"); + return; +} + +void +threadkillgrp(int grp) +{ + threadxxxgrp(grp, 1); +} + +void +threadkill(int id) +{ + threadxxx(id, 1); +} + +void +threadintgrp(int grp) +{ + threadxxxgrp(grp, 0); +} + +void +threadint(int id) +{ + threadxxx(id, 0); +} + +static void +tinterrupt(Proc *p, Thread *t) +{ + switch(t->state){ + case Running: + postnote(PNPROC, p->pid, "threadint"); + break; + case Rendezvous: + _threadflagrendez(t); + break; + } +} diff --git a/sys/src/libthread/lib.c b/sys/src/libthread/lib.c new file mode 100755 index 000000000..9ae037aab --- /dev/null +++ b/sys/src/libthread/lib.c @@ -0,0 +1,38 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +static long totalmalloc; + +void* +_threadmalloc(long size, int z) +{ + void *m; + + m = malloc(size); + if (m == nil) + sysfatal("Malloc of size %ld failed: %r", size); + setmalloctag(m, getcallerpc(&size)); + totalmalloc += size; + if (size > 100000000) { + fprint(2, "Malloc of size %ld, total %ld\n", size, totalmalloc); + abort(); + } + if (z) + memset(m, 0, size); + return m; +} + +void +_threadsysfatal(char *fmt, va_list arg) +{ + char buf[1024]; /* size doesn't matter; we're about to exit */ + + vseprint(buf, buf+sizeof(buf), fmt, arg); + if(argv0) + fprint(2, "%s: %s\n", argv0, buf); + else + fprint(2, "%s\n", buf); + threadexitsall(buf); +} diff --git a/sys/src/libthread/main.c b/sys/src/libthread/main.c new file mode 100755 index 000000000..72e3a8701 --- /dev/null +++ b/sys/src/libthread/main.c @@ -0,0 +1,224 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +typedef struct Mainarg Mainarg; +struct Mainarg +{ + int argc; + char **argv; +}; + +int mainstacksize; +int _threadnotefd; +int _threadpasserpid; +static jmp_buf _mainjmp; +static void mainlauncher(void*); +extern void (*_sysfatal)(char*, va_list); +extern void (*__assert)(char*); +extern int (*_dial)(char*, char*, char*, int*); + +extern int _threaddial(char*, char*, char*, int*); + +static Proc **mainp; + +void +main(int argc, char **argv) +{ + Mainarg *a; + Proc *p; + + rfork(RFREND); + mainp = &p; + if(setjmp(_mainjmp)) + _schedinit(p); + +//_threaddebuglevel = (DBGSCHED|DBGCHAN|DBGREND)^~0; + _systhreadinit(); + _qlockinit(_threadrendezvous); + _sysfatal = _threadsysfatal; + _dial = _threaddial; + __assert = _threadassert; + notify(_threadnote); + if(mainstacksize == 0) + mainstacksize = 8*1024; + + a = _threadmalloc(sizeof *a, 1); + a->argc = argc; + a->argv = argv; + + p = _newproc(mainlauncher, a, mainstacksize, "threadmain", 0, 0); + _schedinit(p); + abort(); /* not reached */ +} + +static void +mainlauncher(void *arg) +{ + Mainarg *a; + + a = arg; + threadmain(a->argc, a->argv); + threadexits("threadmain"); +} + +static char* +skip(char *p) +{ + while(*p == ' ') + p++; + while(*p != ' ' && *p != 0) + p++; + return p; +} + +static long +_times(long *t) +{ + char b[200], *p; + int f; + ulong r; + + memset(b, 0, sizeof(b)); + f = open("/dev/cputime", OREAD|OCEXEC); + if(f < 0) + return 0; + if(read(f, b, sizeof(b)) <= 0){ + close(f); + return 0; + } + p = b; + if(t) + t[0] = atol(p); + p = skip(p); + if(t) + t[1] = atol(p); + p = skip(p); + r = atol(p); + if(t){ + p = skip(p); + t[2] = atol(p); + p = skip(p); + t[3] = atol(p); + } + return r; +} + +static void +efork(Execargs *e) +{ + char buf[ERRMAX]; + + _threaddebug(DBGEXEC, "_schedexec %s", e->prog); + close(e->fd[0]); + exec(e->prog, e->args); + _threaddebug(DBGEXEC, "_schedexec failed: %r"); + rerrstr(buf, sizeof buf); + if(buf[0]=='\0') + strcpy(buf, "exec failed"); + write(e->fd[1], buf, strlen(buf)); + close(e->fd[1]); + _exits(buf); +} + +int +_schedexec(Execargs *e) +{ + int pid; + + switch(pid = rfork(RFREND|RFNOTEG|RFFDG|RFMEM|RFPROC)){ + case 0: + efork(e); + default: + return pid; + } +} + +int +_schedfork(Proc *p) +{ + int pid; + + switch(pid = rfork(RFPROC|RFMEM|RFNOWAIT|p->rforkflag)){ + case 0: + *mainp = p; /* write to stack, so local to proc */ + longjmp(_mainjmp, 1); + default: + return pid; + } +} + +void +_schedexit(Proc *p) +{ + char ex[ERRMAX]; + Proc **l; + + lock(&_threadpq.lock); + for(l=&_threadpq.head; *l; l=&(*l)->next){ + if(*l == p){ + *l = p->next; + if(*l == nil) + _threadpq.tail = l; + break; + } + } + unlock(&_threadpq.lock); + + utfecpy(ex, ex+sizeof ex, p->exitstr); + free(p); + _exits(ex); +} + +void +_schedexecwait(void) +{ + int pid; + Channel *c; + Proc *p; + Thread *t; + Waitmsg *w; + + p = _threadgetproc(); + t = p->thread; + pid = t->ret; + _threaddebug(DBGEXEC, "_schedexecwait %d", t->ret); + + rfork(RFCFDG); + for(;;){ + w = wait(); + if(w == nil) + break; + if(w->pid == pid) + break; + free(w); + } + if(w != nil){ + if((c = _threadwaitchan) != nil) + sendp(c, w); + else + free(w); + } + threadexits("procexec"); +} + +static Proc **procp; + +void +_systhreadinit(void) +{ + procp = privalloc(); +} + +Proc* +_threadgetproc(void) +{ + return *procp; +} + +void +_threadsetproc(Proc *p) +{ + *procp = p; +} diff --git a/sys/src/libthread/mips.c b/sys/src/libthread/mips.c new file mode 100755 index 000000000..23904aeca --- /dev/null +++ b/sys/src/libthread/mips.c @@ -0,0 +1,27 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +/* first argument goes in a register; simplest just to ignore it */ +static void +launchermips(int, void (*f)(void *arg), void *arg) +{ + (*f)(arg); + threadexits(nil); +} + +void +_threadinitstack(Thread *t, void (*f)(void*), void *arg) +{ + ulong *tos; + + tos = (ulong*)&t->stk[t->stksize&~7]; + *--tos = (ulong)arg; + *--tos = (ulong)f; + *--tos = 0; /* first arg to launchermips */ + *--tos = 0; /* place to store return PC */ + t->sched[JMPBUFPC] = (ulong)launchermips+JMPBUFDPC; + t->sched[JMPBUFSP] = (ulong)tos; +} + diff --git a/sys/src/libthread/mkfile b/sys/src/libthread/mkfile new file mode 100755 index 000000000..2a5a3cb2d --- /dev/null +++ b/sys/src/libthread/mkfile @@ -0,0 +1,80 @@ +</$objtype/mkfile + +LIB=/$objtype/lib/libthread.a + +OFILES=\ + $objtype.$O\ + channel.$O\ + chanprint.$O\ + create.$O\ + debug.$O\ + dial.$O\ + exec.$O\ + exit.$O\ + id.$O\ + iocall.$O\ + ioclose.$O\ + iodial.$O\ + ioopen.$O\ + ioproc.$O\ + ioread.$O\ + ioreadn.$O\ + iosleep.$O\ + iowrite.$O\ + kill.$O\ + lib.$O\ + main.$O\ + note.$O\ + ref.$O\ + rendez.$O\ + sched.$O\ + +CFILES=\ + 386.c\ + alpha.c\ + amd64.c\ + arm.c\ + channel.c\ + chanprint.c\ + create.c\ + debug.c\ + dial.c\ + example.c\ + exec.c\ + exit.c\ + id.c\ + iocall.c\ + ioclose.c\ + iodial.c\ + ioopen.c\ + ioproc.c\ + ioread.c\ + ioreadn.c\ + iowrite.c\ + kill.c\ + lib.c\ + main.c\ + mips.c\ + note.c\ + power.c\ + rendez.c\ + sched.c\ + test.c\ + tprimes.c\ + +HFILES=\ + /sys/include/thread.h\ + threadimpl.h\ + +UPDATE=mkfile\ + /386/lib/libthread.a\ + $HFILES\ + $CFILES\ + $SFILES\ + +all:V: $LIB sched.acid channel.acid + +</sys/src/cmd/mksyslib + +$O.tprimes: tprimes.$O $LIB + $LD -o $target $prereq diff --git a/sys/src/libthread/note.c b/sys/src/libthread/note.c new file mode 100755 index 000000000..74f7e9013 --- /dev/null +++ b/sys/src/libthread/note.c @@ -0,0 +1,142 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +int _threadnopasser; + +#define NFN 33 +#define ERRLEN 48 +typedef struct Note Note; +struct Note +{ + Lock inuse; + Proc *proc; /* recipient */ + char s[ERRMAX]; /* arg2 */ +}; + +static Note notes[128]; +static Note *enotes = notes+nelem(notes); +static int (*onnote[NFN])(void*, char*); +static int onnotepid[NFN]; +static Lock onnotelock; + +int +threadnotify(int (*f)(void*, char*), int in) +{ + int i, topid; + int (*from)(void*, char*), (*to)(void*, char*); + + if(in){ + from = nil; + to = f; + topid = _threadgetproc()->pid; + }else{ + from = f; + to = nil; + topid = 0; + } + lock(&onnotelock); + for(i=0; i<NFN; i++) + if(onnote[i]==from){ + onnote[i] = to; + onnotepid[i] = topid; + break; + } + unlock(&onnotelock); + return i<NFN; +} + +static void +delayednotes(Proc *p, void *v) +{ + int i; + Note *n; + int (*fn)(void*, char*); + + if(!p->pending) + return; + + p->pending = 0; + for(n=notes; n<enotes; n++){ + if(n->proc == p){ + for(i=0; i<NFN; i++){ + if(onnotepid[i]!=p->pid || (fn = onnote[i])==nil) + continue; + if((*fn)(v, n->s)) + break; + } + if(i==NFN){ + _threaddebug(DBGNOTE, "Unhandled note %s, proc %p\n", n->s, p); + if(v != nil) + noted(NDFLT); + else if(strncmp(n->s, "sys:", 4)==0) + abort(); + threadexitsall(n->s); + } + n->proc = nil; + unlock(&n->inuse); + } + } +} + +void +_threadnote(void *v, char *s) +{ + Proc *p; + Note *n; + + _threaddebug(DBGNOTE, "Got note %s", s); + if(strncmp(s, "sys:", 4) == 0) + noted(NDFLT); + + if(_threadexitsallstatus){ + _threaddebug(DBGNOTE, "Threadexitsallstatus = '%s'\n", _threadexitsallstatus); + _exits(_threadexitsallstatus); + } + + if(strcmp(s, "threadint")==0) + noted(NCONT); + + p = _threadgetproc(); + if(p == nil) + noted(NDFLT); + + for(n=notes; n<enotes; n++) + if(canlock(&n->inuse)) + break; + if(n==enotes) + sysfatal("libthread: too many delayed notes"); + utfecpy(n->s, n->s+ERRMAX, s); + n->proc = p; + p->pending = 1; + if(!p->splhi) + delayednotes(p, v); + noted(NCONT); +} + +int +_procsplhi(void) +{ + int s; + Proc *p; + + p = _threadgetproc(); + s = p->splhi; + p->splhi = 1; + return s; +} + +void +_procsplx(int s) +{ + Proc *p; + + p = _threadgetproc(); + p->splhi = s; + if(s) + return; + if(p->pending) + delayednotes(p, nil); +} + diff --git a/sys/src/libthread/power.c b/sys/src/libthread/power.c new file mode 100755 index 000000000..f43237624 --- /dev/null +++ b/sys/src/libthread/power.c @@ -0,0 +1,27 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +/* first argument goes in a register; simplest just to ignore it */ +static void +launcherpower(int, void (*f)(void *arg), void *arg) +{ + (*f)(arg); + threadexits(nil); +} + +void +_threadinitstack(Thread *t, void (*f)(void*), void *arg) +{ + ulong *tos; + + tos = (ulong*)&t->stk[t->stksize&~7]; + *--tos = (ulong)arg; + *--tos = (ulong)f; + *--tos = 0; /* first arg to launchermips */ + *--tos = 0; /* place to store return PC */ + t->sched[JMPBUFPC] = (ulong)launcherpower+JMPBUFDPC; + t->sched[JMPBUFSP] = (ulong)tos; +} + diff --git a/sys/src/libthread/ref.c b/sys/src/libthread/ref.c new file mode 100755 index 000000000..9605af7ff --- /dev/null +++ b/sys/src/libthread/ref.c @@ -0,0 +1,16 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +void +incref(Ref *r) +{ + ainc(&r->ref); +} + +long +decref(Ref *r) +{ + return adec(&r->ref); +} diff --git a/sys/src/libthread/rendez.c b/sys/src/libthread/rendez.c new file mode 100755 index 000000000..8a6d0f8a6 --- /dev/null +++ b/sys/src/libthread/rendez.c @@ -0,0 +1,99 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" + +Rgrp _threadrgrp; +static int isdirty; + +static void* +finish(Thread *t, void *val) +{ + void *ret; + + ret = t->rendval; + t->rendval = val; + while(t->state == Running) + sleep(0); + lock(&t->proc->lock); + if(t->state == Rendezvous){ /* not always true: might be Dead */ + t->state = Ready; + _threadready(t); + } + unlock(&t->proc->lock); + return ret; +} + +void* +_threadrendezvous(void *tag, void *val) +{ + void *ret; + Thread *t, **l; + + lock(&_threadrgrp.lock); + l = &_threadrgrp.hash[((uintptr)tag)%nelem(_threadrgrp.hash)]; + for(t=*l; t; l=&t->rendhash, t=*l){ + if(t->rendtag==tag){ + _threaddebug(DBGREND, "Rendezvous with thread %d.%d", t->proc->pid, t->id); + *l = t->rendhash; + ret = finish(t, val); + unlock(&_threadrgrp.lock); + return ret; + } + } + + /* Going to sleep here. */ + t = _threadgetproc()->thread; + t->rendbreak = 0; + t->inrendez = 1; + t->rendtag = tag; + t->rendval = val; + t->rendhash = *l; + *l = t; + t->nextstate = Rendezvous; + _threaddebug(DBGREND, "Rendezvous for tag %p", t->rendtag); + unlock(&_threadrgrp.lock); + _sched(); + t->inrendez = 0; + _threaddebug(DBGREND, "Woke after rendezvous; val is %p", t->rendval); + return t->rendval; +} + +/* + * This is called while holding _threadpq.lock and p->lock, + * so we can't lock _threadrgrp.lock. Instead our caller has + * to call _threadbreakrendez after dropping those locks. + */ +void +_threadflagrendez(Thread *t) +{ + t->rendbreak = 1; + isdirty = 1; +} + +void +_threadbreakrendez(void) +{ + int i; + Thread *t, **l; + + if(isdirty == 0) + return; + lock(&_threadrgrp.lock); + if(isdirty == 0){ + unlock(&_threadrgrp.lock); + return; + } + isdirty = 0; + for(i=0; i<nelem(_threadrgrp.hash); i++){ + l = &_threadrgrp.hash[i]; + for(t=*l; t; t=*l){ + if(t->rendbreak){ + *l = t->rendhash; + finish(t, (void*)~0); + }else + l=&t->rendhash; + } + } + unlock(&_threadrgrp.lock); +} diff --git a/sys/src/libthread/sched.acid b/sys/src/libthread/sched.acid new file mode 100755 index 000000000..3bd7fa2a1 --- /dev/null +++ b/sys/src/libthread/sched.acid @@ -0,0 +1,692 @@ +sizeof_1_ = 8; +aggr _1_ +{ + 'U' 0 lo; + 'U' 4 hi; +}; + +defn +_1_(addr) { + complex _1_ addr; + print(" lo ", addr.lo, "\n"); + print(" hi ", addr.hi, "\n"); +}; + +sizeofFPdbleword = 8; +aggr FPdbleword +{ + 'F' 0 x; + { + 'U' 0 lo; + 'U' 4 hi; + }; +}; + +defn +FPdbleword(addr) { + complex FPdbleword addr; + print(" x ", addr.x, "\n"); + print("_1_ {\n"); + _1_(addr+0); + print("}\n"); +}; + +UTFmax = 3; +Runesync = 128; +Runeself = 128; +Runeerror = 65533; +sizeofFmt = 48; +aggr Fmt +{ + 'b' 0 runes; + 'X' 4 start; + 'X' 8 to; + 'X' 12 stop; + 'X' 16 flush; + 'X' 20 farg; + 'D' 24 nfmt; + 'X' 28 args; + 'D' 32 r; + 'D' 36 width; + 'D' 40 prec; + 'U' 44 flags; +}; + +defn +Fmt(addr) { + complex Fmt addr; + print(" runes ", addr.runes, "\n"); + print(" start ", addr.start\X, "\n"); + print(" to ", addr.to\X, "\n"); + print(" stop ", addr.stop\X, "\n"); + print(" flush ", addr.flush\X, "\n"); + print(" farg ", addr.farg\X, "\n"); + print(" nfmt ", addr.nfmt, "\n"); + print(" args ", addr.args\X, "\n"); + print(" r ", addr.r, "\n"); + print(" width ", addr.width, "\n"); + print(" prec ", addr.prec, "\n"); + print(" flags ", addr.flags, "\n"); +}; + +FmtWidth = 1; +FmtLeft = 2; +FmtPrec = 4; +FmtSharp = 8; +FmtSpace = 16; +FmtSign = 32; +FmtZero = 64; +FmtUnsigned = 128; +FmtShort = 256; +FmtLong = 512; +FmtVLong = 1024; +FmtComma = 2048; +FmtByte = 4096; +FmtFlag = 8192; +sizeofTm = 40; +aggr Tm +{ + 'D' 0 sec; + 'D' 4 min; + 'D' 8 hour; + 'D' 12 mday; + 'D' 16 mon; + 'D' 20 year; + 'D' 24 wday; + 'D' 28 yday; + 'a' 32 zone; + 'D' 36 tzoff; +}; + +defn +Tm(addr) { + complex Tm addr; + print(" sec ", addr.sec, "\n"); + print(" min ", addr.min, "\n"); + print(" hour ", addr.hour, "\n"); + print(" mday ", addr.mday, "\n"); + print(" mon ", addr.mon, "\n"); + print(" year ", addr.year, "\n"); + print(" wday ", addr.wday, "\n"); + print(" yday ", addr.yday, "\n"); + print(" zone ", addr.zone, "\n"); + print(" tzoff ", addr.tzoff, "\n"); +}; + +PNPROC = 1; +PNGROUP = 2; +Profoff = 0; +Profuser = 1; +Profkernel = 2; +Proftime = 3; +Profsample = 4; +sizeofLock = 4; +aggr Lock +{ + 'D' 0 val; +}; + +defn +Lock(addr) { + complex Lock addr; + print(" val ", addr.val, "\n"); +}; + +sizeofQLp = 12; +aggr QLp +{ + 'D' 0 inuse; + 'A' QLp 4 next; + 'C' 8 state; +}; + +defn +QLp(addr) { + complex QLp addr; + print(" inuse ", addr.inuse, "\n"); + print(" next ", addr.next\X, "\n"); + print(" state ", addr.state, "\n"); +}; + +sizeofQLock = 16; +aggr QLock +{ + Lock 0 lock; + 'D' 4 locked; + 'A' QLp 8 $head; + 'A' QLp 12 $tail; +}; + +defn +QLock(addr) { + complex QLock addr; + print("Lock lock {\n"); + Lock(addr.lock); + print("}\n"); + print(" locked ", addr.locked, "\n"); + print(" $head ", addr.$head\X, "\n"); + print(" $tail ", addr.$tail\X, "\n"); +}; + +sizeofRWLock = 20; +aggr RWLock +{ + Lock 0 lock; + 'D' 4 readers; + 'D' 8 writer; + 'A' QLp 12 $head; + 'A' QLp 16 $tail; +}; + +defn +RWLock(addr) { + complex RWLock addr; + print("Lock lock {\n"); + Lock(addr.lock); + print("}\n"); + print(" readers ", addr.readers, "\n"); + print(" writer ", addr.writer, "\n"); + print(" $head ", addr.$head\X, "\n"); + print(" $tail ", addr.$tail\X, "\n"); +}; + +sizeofRendez = 12; +aggr Rendez +{ + 'A' QLock 0 l; + 'A' QLp 4 $head; + 'A' QLp 8 $tail; +}; + +defn +Rendez(addr) { + complex Rendez addr; + print(" l ", addr.l\X, "\n"); + print(" $head ", addr.$head\X, "\n"); + print(" $tail ", addr.$tail\X, "\n"); +}; + +sizeofNetConnInfo = 36; +aggr NetConnInfo +{ + 'X' 0 dir; + 'X' 4 root; + 'X' 8 spec; + 'X' 12 lsys; + 'X' 16 lserv; + 'X' 20 rsys; + 'X' 24 rserv; + 'X' 28 laddr; + 'X' 32 raddr; +}; + +defn +NetConnInfo(addr) { + complex NetConnInfo addr; + print(" dir ", addr.dir\X, "\n"); + print(" root ", addr.root\X, "\n"); + print(" spec ", addr.spec\X, "\n"); + print(" lsys ", addr.lsys\X, "\n"); + print(" lserv ", addr.lserv\X, "\n"); + print(" rsys ", addr.rsys\X, "\n"); + print(" rserv ", addr.rserv\X, "\n"); + print(" laddr ", addr.laddr\X, "\n"); + print(" raddr ", addr.raddr\X, "\n"); +}; + +RFNAMEG = 1; +RFENVG = 2; +RFFDG = 4; +RFNOTEG = 8; +RFPROC = 16; +RFMEM = 32; +RFNOWAIT = 64; +RFCNAMEG = 1024; +RFCENVG = 2048; +RFCFDG = 4096; +RFREND = 8192; +RFNOMNT = 16384; +sizeofQid = 16; +aggr Qid +{ + 'W' 0 path; + 'U' 8 vers; + 'b' 12 type; +}; + +defn +Qid(addr) { + complex Qid addr; + print(" path ", addr.path, "\n"); + print(" vers ", addr.vers, "\n"); + print(" type ", addr.type, "\n"); +}; + +sizeofDir = 60; +aggr Dir +{ + 'u' 0 type; + 'U' 4 dev; + Qid 8 qid; + 'U' 24 mode; + 'U' 28 atime; + 'U' 32 mtime; + 'V' 36 length; + 'X' 44 name; + 'X' 48 uid; + 'X' 52 gid; + 'X' 56 muid; +}; + +defn +Dir(addr) { + complex Dir addr; + print(" type ", addr.type, "\n"); + print(" dev ", addr.dev, "\n"); + print("Qid qid {\n"); + Qid(addr.qid); + print("}\n"); + print(" mode ", addr.mode, "\n"); + print(" atime ", addr.atime, "\n"); + print(" mtime ", addr.mtime, "\n"); + print(" length ", addr.length, "\n"); + print(" name ", addr.name\X, "\n"); + print(" uid ", addr.uid\X, "\n"); + print(" gid ", addr.gid\X, "\n"); + print(" muid ", addr.muid\X, "\n"); +}; + +sizeofWaitmsg = 20; +aggr Waitmsg +{ + 'D' 0 pid; + 'a' 4 time; + 'X' 16 msg; +}; + +defn +Waitmsg(addr) { + complex Waitmsg addr; + print(" pid ", addr.pid, "\n"); + print(" time ", addr.time, "\n"); + print(" msg ", addr.msg\X, "\n"); +}; + +sizeofIOchunk = 8; +aggr IOchunk +{ + 'X' 0 addr; + 'U' 4 len; +}; + +defn +IOchunk(addr) { + complex IOchunk addr; + print(" addr ", addr.addr\X, "\n"); + print(" len ", addr.len, "\n"); +}; + +Nqwds = 2; +Nqshift = 5; +Nqmask = -1; +Nqbits = 64; +sizeofChannel = 36; +aggr Channel +{ + 'D' 0 s; + 'U' 4 f; + 'U' 8 n; + 'D' 12 e; + 'D' 16 freed; + 'X' 20 qentry; + 'D' 24 nentry; + 'D' 28 closed; + 'a' 32 v; +}; + +defn +Channel(addr) { + complex Channel addr; + print(" s ", addr.s, "\n"); + print(" f ", addr.f, "\n"); + print(" n ", addr.n, "\n"); + print(" e ", addr.e, "\n"); + print(" freed ", addr.freed, "\n"); + print(" qentry ", addr.qentry\X, "\n"); + print(" nentry ", addr.nentry, "\n"); + print(" closed ", addr.closed, "\n"); + print(" v ", addr.v, "\n"); +}; + +CHANEND = 0; +CHANSND = 1; +CHANRCV = 2; +CHANNOP = 3; +CHANNOBLK = 4; +sizeofAlt = 24; +aggr Alt +{ + 'A' Channel 0 c; + 'X' 4 v; + 'D' 8 op; + 'X' 12 err; + 'A' Channel 16 tag; + 'D' 20 entryno; +}; + +defn +Alt(addr) { + complex Alt addr; + print(" c ", addr.c\X, "\n"); + print(" v ", addr.v\X, "\n"); + print(" op ", addr.op, "\n"); + print(" err ", addr.err\X, "\n"); + print(" tag ", addr.tag\X, "\n"); + print(" entryno ", addr.entryno, "\n"); +}; + +sizeofRef = 4; +aggr Ref +{ + 'D' 0 ref; +}; + +defn +Ref(addr) { + complex Ref addr; + print(" ref ", addr.ref, "\n"); +}; + +Dead = 0; +Running = 1; +Ready = 2; +Rendezvous = 3; +Channone = 0; +Chanalt = 1; +Chansend = 2; +Chanrecv = 3; +RENDHASH = 13; +Printsize = 2048; +NPRIV = 8; +sizeofRgrp = 56; +aggr Rgrp +{ + Lock 0 lock; + 'a' 4 hash; +}; + +defn +Rgrp(addr) { + complex Rgrp addr; + print("Lock lock {\n"); + Lock(addr.lock); + print("}\n"); + print(" hash ", addr.hash, "\n"); +}; + +sizeofTqueue = 12; +aggr Tqueue +{ + 'D' 0 asleep; + 'X' 4 $head; + 'X' 8 $tail; +}; + +defn +Tqueue(addr) { + complex Tqueue addr; + print(" asleep ", addr.asleep, "\n"); + print(" $head ", addr.$head\X, "\n"); + print(" $tail ", addr.$tail\X, "\n"); +}; + +sizeofThread = 120; +aggr Thread +{ + Lock 0 lock; + 'a' 4 sched; + 'D' 12 id; + 'D' 16 grp; + 'D' 20 moribund; + 'D' 24 state; + 'D' 28 nextstate; + 'X' 32 stk; + 'U' 36 stksize; + 'A' Thread 40 next; + 'X' 44 proc; + 'A' Thread 48 nextt; + 'D' 52 ret; + 'X' 56 cmdname; + 'D' 60 inrendez; + 'A' Thread 64 rendhash; + 'X' 68 rendtag; + 'X' 72 rendval; + 'D' 76 rendbreak; + 'D' 80 chan; + 'A' Alt 84 alt; + 'a' 88 udata; +}; + +defn +Thread(addr) { + complex Thread addr; + print("Lock lock {\n"); + Lock(addr.lock); + print("}\n"); + print(" sched ", addr.sched, "\n"); + print(" id ", addr.id, "\n"); + print(" grp ", addr.grp, "\n"); + print(" moribund ", addr.moribund, "\n"); + print(" state ", addr.state, "\n"); + print(" nextstate ", addr.nextstate, "\n"); + print(" stk ", addr.stk\X, "\n"); + print(" stksize ", addr.stksize, "\n"); + print(" next ", addr.next\X, "\n"); + print(" proc ", addr.proc\X, "\n"); + print(" nextt ", addr.nextt\X, "\n"); + print(" ret ", addr.ret, "\n"); + print(" cmdname ", addr.cmdname\X, "\n"); + print(" inrendez ", addr.inrendez, "\n"); + print(" rendhash ", addr.rendhash\X, "\n"); + print(" rendtag ", addr.rendtag\X, "\n"); + print(" rendval ", addr.rendval\X, "\n"); + print(" rendbreak ", addr.rendbreak, "\n"); + print(" chan ", addr.chan, "\n"); + print(" alt ", addr.alt\X, "\n"); + print(" udata ", addr.udata, "\n"); +}; + +sizeofExecargs = 16; +aggr Execargs +{ + 'X' 0 prog; + 'X' 4 args; + 'a' 8 fd; +}; + +defn +Execargs(addr) { + complex Execargs addr; + print(" prog ", addr.prog\X, "\n"); + print(" args ", addr.args\X, "\n"); + print(" fd ", addr.fd, "\n"); +}; + +sizeofProc = 2424; +aggr Proc +{ + Lock 0 lock; + 'a' 4 sched; + 'D' 12 pid; + 'D' 16 splhi; + 'A' Thread 20 thread; + 'D' 24 needexec; + Execargs 28 exec; + 'A' Proc 44 newproc; + 'a' 48 exitstr; + 'D' 176 rforkflag; + 'D' 180 nthreads; + Tqueue 184 threads; + Tqueue 196 ready; + Lock 208 readylock; + 'a' 212 printbuf; + 'D' 2260 blocked; + 'D' 2264 pending; + 'D' 2268 nonotes; + 'U' 2272 nextID; + 'A' Proc 2276 next; + 'X' 2280 arg; + 'a' 2284 str; + 'X' 2412 wdata; + 'X' 2416 udata; + 'C' 2420 threadint; +}; + +defn +Proc(addr) { + complex Proc addr; + print("Lock lock {\n"); + Lock(addr.lock); + print("}\n"); + print(" sched ", addr.sched, "\n"); + print(" pid ", addr.pid, "\n"); + print(" splhi ", addr.splhi, "\n"); + print(" thread ", addr.thread\X, "\n"); + print(" needexec ", addr.needexec, "\n"); + print("Execargs exec {\n"); + Execargs(addr.exec); + print("}\n"); + print(" newproc ", addr.newproc\X, "\n"); + print(" exitstr ", addr.exitstr, "\n"); + print(" rforkflag ", addr.rforkflag, "\n"); + print(" nthreads ", addr.nthreads, "\n"); + print("Tqueue threads {\n"); + Tqueue(addr.threads); + print("}\n"); + print("Tqueue ready {\n"); + Tqueue(addr.ready); + print("}\n"); + print("Lock readylock {\n"); + Lock(addr.readylock); + print("}\n"); + print(" printbuf ", addr.printbuf, "\n"); + print(" blocked ", addr.blocked, "\n"); + print(" pending ", addr.pending, "\n"); + print(" nonotes ", addr.nonotes, "\n"); + print(" nextID ", addr.nextID, "\n"); + print(" next ", addr.next\X, "\n"); + print(" arg ", addr.arg\X, "\n"); + print(" str ", addr.str, "\n"); + print(" wdata ", addr.wdata\X, "\n"); + print(" udata ", addr.udata\X, "\n"); + print(" threadint ", addr.threadint, "\n"); +}; + +sizeofPqueue = 12; +aggr Pqueue +{ + Lock 0 lock; + 'A' Proc 4 $head; + 'A' Proc 8 $tail; +}; + +defn +Pqueue(addr) { + complex Pqueue addr; + print("Lock lock {\n"); + Lock(addr.lock); + print("}\n"); + print(" $head ", addr.$head\X, "\n"); + print(" $tail ", addr.$tail\X, "\n"); +}; + +sizeofIoproc = 160; +aggr Ioproc +{ + 'D' 0 tid; + 'A' Channel 4 c; + 'A' Channel 8 creply; + 'D' 12 inuse; + 'X' 16 op; + 'X' 20 arg; + 'D' 24 ret; + 'a' 28 err; + 'A' Ioproc 156 next; +}; + +defn +Ioproc(addr) { + complex Ioproc addr; + print(" tid ", addr.tid, "\n"); + print(" c ", addr.c\X, "\n"); + print(" creply ", addr.creply\X, "\n"); + print(" inuse ", addr.inuse, "\n"); + print(" op ", addr.op\X, "\n"); + print(" arg ", addr.arg\X, "\n"); + print(" ret ", addr.ret, "\n"); + print(" err ", addr.err, "\n"); + print(" next ", addr.next\X, "\n"); +}; + +complex Pqueue _threadpq; +complex Channel _threadwaitchan; +complex Rgrp _threadrgrp; +sizeof_2_ = 24; +aggr _2_ +{ + 'X' 0 pp; + 'X' 4 next; + 'X' 8 last; + 'X' 12 first; + 'U' 16 pid; + 'U' 20 what; +}; + +defn +_2_(addr) { + complex _2_ addr; + print(" pp ", addr.pp\X, "\n"); + print(" next ", addr.next\X, "\n"); + print(" last ", addr.last\X, "\n"); + print(" first ", addr.first\X, "\n"); + print(" pid ", addr.pid, "\n"); + print(" what ", addr.what, "\n"); +}; + +sizeofTos = 56; +aggr Tos +{ + _2_ 0 prof; + 'W' 24 cyclefreq; + 'V' 32 kcycles; + 'V' 40 pcycles; + 'U' 48 pid; + 'U' 52 clock; +}; + +defn +Tos(addr) { + complex Tos addr; + print("_2_ prof {\n"); + _2_(addr.prof); + print("}\n"); + print(" cyclefreq ", addr.cyclefreq, "\n"); + print(" kcycles ", addr.kcycles, "\n"); + print(" pcycles ", addr.pcycles, "\n"); + print(" pid ", addr.pid, "\n"); + print(" clock ", addr.clock, "\n"); +}; + +complex Tos _tos; +complex Proc _schedinit:p; +complex Thread _schedinit:t; +complex Thread _schedinit:l; +complex Proc needstack:p; +complex Thread needstack:t; +complex Proc _sched:p; +complex Thread _sched:t; +complex Proc runthread:p; +complex Thread runthread:t; +complex Tqueue runthread:q; +complex Thread _threadready:t; +complex Tqueue _threadready:q; diff --git a/sys/src/libthread/sched.c b/sys/src/libthread/sched.c new file mode 100755 index 000000000..50603a3c4 --- /dev/null +++ b/sys/src/libthread/sched.c @@ -0,0 +1,189 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include "threadimpl.h" +#include <tos.h> + +static Thread *runthread(Proc*); + +static char *_psstate[] = { + "Moribund", + "Dead", + "Exec", + "Fork", + "Running", + "Ready", + "Rendezvous", +}; + +static char* +psstate(int s) +{ + if(s < 0 || s >= nelem(_psstate)) + return "unknown"; + return _psstate[s]; +} + +void +_schedinit(void *arg) +{ + Proc *p; + Thread *t, **l; + + p = arg; + _threadsetproc(p); + p->pid = _tos->pid; //getpid(); + while(setjmp(p->sched)) + ; + _threaddebug(DBGSCHED, "top of schedinit, _threadexitsallstatus=%p", _threadexitsallstatus); + if(_threadexitsallstatus) + exits(_threadexitsallstatus); + lock(&p->lock); + if((t=p->thread) != nil){ + p->thread = nil; + if(t->moribund){ + t->state = Dead; + for(l=&p->threads.head; *l; l=&(*l)->nextt) + if(*l == t){ + *l = t->nextt; + if(*l==nil) + p->threads.tail = l; + p->nthreads--; + break; + } + unlock(&p->lock); + if(t->inrendez){ + _threadflagrendez(t); + _threadbreakrendez(); + } + free(t->stk); + free(t->cmdname); + free(t); /* XXX how do we know there are no references? */ + t = nil; + _sched(); + } + if(p->needexec){ + t->ret = _schedexec(&p->exec); + p->needexec = 0; + } + if(p->newproc){ + t->ret = _schedfork(p->newproc); + p->newproc = nil; + } + t->state = t->nextstate; + if(t->state == Ready) + _threadready(t); + } + unlock(&p->lock); + _sched(); +} + +void +needstack(int n) +{ + int x; + Proc *p; + Thread *t; + + p = _threadgetproc(); + t = p->thread; + + if((uchar*)&x - n < (uchar*)t->stk){ + fprint(2, "%s %lud: &x=%p n=%d t->stk=%p\n", + argv0, _tos->pid, &x, n, t->stk); + fprint(2, "%s %lud: stack overflow\n", argv0, _tos->pid); + abort(); + } +} + +void +_sched(void) +{ + Proc *p; + Thread *t; + +Resched: + p = _threadgetproc(); + if((t = p->thread) != nil){ + needstack(128); + _threaddebug(DBGSCHED, "pausing, state=%s", psstate(t->state)); + if(setjmp(t->sched)==0) + longjmp(p->sched, 1); + return; + }else{ + t = runthread(p); + if(t == nil){ + _threaddebug(DBGSCHED, "all threads gone; exiting"); + _schedexit(p); + } + _threaddebug(DBGSCHED, "running %d.%d", t->proc->pid, t->id); + p->thread = t; + if(t->moribund){ + _threaddebug(DBGSCHED, "%d.%d marked to die"); + goto Resched; + } + t->state = Running; + t->nextstate = Ready; + longjmp(t->sched, 1); + } +} + +static Thread* +runthread(Proc *p) +{ + Thread *t; + Tqueue *q; + + if(p->nthreads==0) + return nil; + q = &p->ready; + lock(&p->readylock); + if(q->head == nil){ + q->asleep = 1; + _threaddebug(DBGSCHED, "sleeping for more work"); + unlock(&p->readylock); + while(rendezvous(q, 0) == (void*)~0){ + if(_threadexitsallstatus) + exits(_threadexitsallstatus); + } + /* lock picked up from _threadready */ + } + t = q->head; + q->head = t->next; + unlock(&p->readylock); + return t; +} + +void +_threadready(Thread *t) +{ + Tqueue *q; + + assert(t->state == Ready); + _threaddebug(DBGSCHED, "readying %d.%d", t->proc->pid, t->id); + q = &t->proc->ready; + lock(&t->proc->readylock); + t->next = nil; + if(q->head==nil) + q->head = t; + else + *q->tail = t; + q->tail = &t->next; + if(q->asleep){ + q->asleep = 0; + /* lock passes to runthread */ + _threaddebug(DBGSCHED, "waking process %d", t->proc->pid); + while(rendezvous(q, 0) == (void*)~0){ + if(_threadexitsallstatus) + exits(_threadexitsallstatus); + } + }else + unlock(&t->proc->readylock); +} + +void +yield(void) +{ + _sched(); +} + diff --git a/sys/src/libthread/test.c b/sys/src/libthread/test.c new file mode 100755 index 000000000..f0cfb8820 --- /dev/null +++ b/sys/src/libthread/test.c @@ -0,0 +1,14 @@ +#include <u.h> +#include <libc.h> + +void +printabcd(int a, int b, int c, int d) +{ + print("%d %d %d %d\n", a, b, c, d); +} + +void +main(int argc, char **argv) +{ + printabcd(atoi(argv[0]), atoi(argv[1]), atoi(argv[2]), atoi(argv[3])); +} diff --git a/sys/src/libthread/threadimpl.h b/sys/src/libthread/threadimpl.h new file mode 100755 index 000000000..c5f7c0f2e --- /dev/null +++ b/sys/src/libthread/threadimpl.h @@ -0,0 +1,198 @@ +/* + * Some notes on locking: + * + * All the locking woes come from implementing + * threadinterrupt (and threadkill). + * + * _threadgetproc()->thread is always a live pointer. + * p->threads, p->ready, and _threadrgrp also contain + * live thread pointers. These may only be consulted + * while holding p->lock or _threadrgrp.lock; in procs + * other than p, the pointers are only guaranteed to be live + * while the lock is still being held. + * + * Thread structures can only be freed by the proc + * they belong to. Threads marked with t->inrendez + * need to be extracted from the _threadrgrp before + * being freed. + * + * _threadrgrp.lock cannot be acquired while holding p->lock. + */ + +typedef struct Pqueue Pqueue; +typedef struct Rgrp Rgrp; +typedef struct Tqueue Tqueue; +typedef struct Thread Thread; +typedef struct Execargs Execargs; +typedef struct Proc Proc; + +/* must match list in sched.c */ +typedef enum +{ + Dead, + Running, + Ready, + Rendezvous, +} State; + +typedef enum +{ + Channone, + Chanalt, + Chansend, + Chanrecv, +} Chanstate; + +enum +{ + RENDHASH = 13, + Printsize = 2048, + NPRIV = 8, +}; + +struct Rgrp +{ + Lock lock; + Thread *hash[RENDHASH]; +}; + +struct Tqueue /* Thread queue */ +{ + int asleep; + Thread *head; + Thread **tail; +}; + +struct Thread +{ + Lock lock; /* protects thread data structure */ + jmp_buf sched; /* for context switches */ + int id; /* thread id */ + int grp; /* thread group */ + int moribund; /* thread needs to die */ + State state; /* run state */ + State nextstate; /* next run state */ + uchar *stk; /* top of stack (lowest address of stack) */ + uint stksize; /* stack size */ + Thread *next; /* next on ready queue */ + + Proc *proc; /* proc of this thread */ + Thread *nextt; /* next on list of threads in this proc*/ + int ret; /* return value for Exec, Fork */ + + char *cmdname; /* ptr to name of thread */ + + int inrendez; + Thread *rendhash; /* Trgrp linked list */ + void* rendtag; /* rendezvous tag */ + void* rendval; /* rendezvous value */ + int rendbreak; /* rendezvous has been taken */ + + Chanstate chan; /* which channel operation is current */ + Alt *alt; /* pointer to current alt structure (debugging) */ + + void* udata[NPRIV]; /* User per-thread data pointer */ +}; + +struct Execargs +{ + char *prog; + char **args; + int fd[2]; +}; + +struct Proc +{ + Lock lock; + jmp_buf sched; /* for context switches */ + int pid; /* process id */ + int splhi; /* delay notes */ + Thread *thread; /* running thread */ + + int needexec; + Execargs exec; /* exec argument */ + Proc *newproc; /* fork argument */ + char exitstr[ERRMAX]; /* exit status */ + + int rforkflag; + int nthreads; + Tqueue threads; /* All threads of this proc */ + Tqueue ready; /* Runnable threads */ + Lock readylock; + + char printbuf[Printsize]; + int blocked; /* In a rendezvous */ + int pending; /* delayed note pending */ + int nonotes; /* delay notes */ + uint nextID; /* ID of most recently created thread */ + Proc *next; /* linked list of Procs */ + + void *arg; /* passed between shared and unshared stk */ + char str[ERRMAX]; /* used by threadexits to avoid malloc */ + + void* wdata; /* Lib(worker) per-proc data pointer */ + void* udata; /* User per-proc data pointer */ + char threadint; /* tag for threadexitsall() */ +}; + +struct Pqueue { /* Proc queue */ + Lock lock; + Proc *head; + Proc **tail; +}; + +struct Ioproc +{ + int tid; + Channel *c, *creply; + int inuse; + long (*op)(va_list*); + va_list arg; + long ret; + char err[ERRMAX]; + Ioproc *next; +}; + +void _freeproc(Proc*); +void _freethread(Thread*); +Proc* _newproc(void(*)(void*), void*, uint, char*, int, int); +int _procsplhi(void); +void _procsplx(int); +void _sched(void); +int _schedexec(Execargs*); +void _schedexecwait(void); +void _schedexit(Proc*); +int _schedfork(Proc*); +void _schedinit(void*); +void _systhreadinit(void); +void _threadassert(char*); +void _threadbreakrendez(void); +void _threaddebug(ulong, char*, ...); +void _threadexitsall(char*); +void _threadflagrendez(Thread*); +Proc* _threadgetproc(void); +void _threadsetproc(Proc*); +void _threadinitstack(Thread*, void(*)(void*), void*); +void* _threadmalloc(long, int); +void _threadnote(void*, char*); +void _threadready(Thread*); +void* _threadrendezvous(void*, void*); +void _threadsignal(void); +void _threadsysfatal(char*, va_list); +void** _workerdata(void); + +extern int _threaddebuglevel; +extern char* _threadexitsallstatus; +extern Pqueue _threadpq; +extern Channel* _threadwaitchan; +extern Rgrp _threadrgrp; + +#define DBGAPPL (1 << 0) +#define DBGSCHED (1 << 16) +#define DBGCHAN (1 << 17) +#define DBGREND (1 << 18) +/* #define DBGKILL (1 << 19) */ +#define DBGNOTE (1 << 20) +#define DBGEXEC (1 << 21) + +#define ioproc_arg(io, type) (va_arg((io)->arg, type)) diff --git a/sys/src/libthread/tprimes.c b/sys/src/libthread/tprimes.c new file mode 100755 index 000000000..50ab63996 --- /dev/null +++ b/sys/src/libthread/tprimes.c @@ -0,0 +1,58 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> + +int quiet; +int goal; +int buffer; +int (*fn)(void(*)(void*), void*, uint) = threadcreate; + +void +primethread(void *arg) +{ + Channel *c, *nc; + int p, i; + + c = arg; + p = recvul(c); + if(p > goal) + threadexitsall(nil); + if(!quiet) + print("%d\n", p); + nc = chancreate(sizeof(ulong), buffer); + (*fn)(primethread, nc, 1024); + for(;;){ + i = recvul(c); + if(i%p) + sendul(nc, i); + } +} + +void +threadmain(int argc, char **argv) +{ + int i; + Channel *c; + + ARGBEGIN{ + case 'q': + quiet = 1; + break; + case 'b': + buffer = atoi(ARGF()); + break; + case 'p': + fn=proccreate; + break; + }ARGEND + + if(argc>0) + goal = atoi(argv[0]); + else + goal = 100; + + c = chancreate(sizeof(ulong), buffer); + threadcreate(primethread, c, 1024); + for(i=2;; i++) + sendul(c, i); +} diff --git a/sys/src/libthread/xinc386.s b/sys/src/libthread/xinc386.s new file mode 100755 index 000000000..006c821c9 --- /dev/null +++ b/sys/src/libthread/xinc386.s @@ -0,0 +1,29 @@ +/*#define XADDL(s,d) BYTE $0x0F; BYTE $0xC1; BYTE $((0<<6)|(s<<3)|(d))*/ + +/*TEXT xadd(SB),$0 /* long xadd(long *, long); */ + +/* MOVL l+0(FP),BX +/* MOVL i+4(FP),AX +/* LOCK +/* XADDL(0,3) +/* RET +*/ + +TEXT _xinc(SB),$0 /* void _xinc(long *); */ + + MOVL l+0(FP),AX + LOCK + INCL 0(AX) + RET + +TEXT _xdec(SB),$0 /* long _xdec(long *); */ + + MOVL l+0(FP),AX + LOCK + DECL 0(AX) + JZ iszero + MOVL $1, AX + RET +iszero: + MOVL $0, AX + RET diff --git a/sys/src/libthread/xincalpha.s b/sys/src/libthread/xincalpha.s new file mode 100755 index 000000000..5ac5ed54f --- /dev/null +++ b/sys/src/libthread/xincalpha.s @@ -0,0 +1,19 @@ +TEXT _xdec(SB), $-8 + MOVQ R0, R1 /* p */ +dec1: + MOVLL (R1), R0 /* *p */ + SUBL $1, R0 + MOVQ R0, R2 + MOVLC R2, (R1) /* --(*p) */ + BEQ R2, dec1 /* write failed, retry */ + RET + +TEXT _xinc(SB), $-8 + MOVQ R0, R1 /* p */ +inc1: + MOVLL (R1), R0 /* *p */ + ADDL $1, R0 + MOVLC R0, (R1) /* (*p)++ */ + BEQ R0, inc1 /* write failed, retry */ + RET + diff --git a/sys/src/libthread/xincamd64.s b/sys/src/libthread/xincamd64.s new file mode 100755 index 000000000..ed1c81419 --- /dev/null +++ b/sys/src/libthread/xincamd64.s @@ -0,0 +1,20 @@ +/*TEXT xadd(SB),$0 /* long xadd(long *, long); */ + +/* MOVL i+8(FP),AX +/* LOCK +/* XADDL AX, (RARG) +/* RET +*/ + +TEXT _xinc(SB),$0 /* void _xinc(long *); */ + + LOCK; INCL 0(RARG) + RET + +TEXT _xdec(SB),$0 /* long _xdec(long *); */ + + MOVL $0, AX + MOVL $1, BX + LOCK; DECL 0(RARG) + CMOVLNE BX, AX + RET diff --git a/sys/src/libthread/xincarm.c b/sys/src/libthread/xincarm.c new file mode 100755 index 000000000..a8e092914 --- /dev/null +++ b/sys/src/libthread/xincarm.c @@ -0,0 +1 @@ +#include "xincport.h" diff --git a/sys/src/libthread/xincmips.s b/sys/src/libthread/xincmips.s new file mode 100755 index 000000000..8e53c1adf --- /dev/null +++ b/sys/src/libthread/xincmips.s @@ -0,0 +1,46 @@ +/* + * R4000 user level lock code + */ + +#define LL(base, rt) WORD $((060<<26)|((base)<<21)|((rt)<<16)) +#define SC(base, rt) WORD $((070<<26)|((base)<<21)|((rt)<<16)) +#define NOOP WORD $0x27 + +#ifdef oldstyle +TEXT xadd(SB), $0 + + MOVW R1, R2 /* address of counter */ +loop: MOVW n+4(FP), R3 /* increment */ + LL(2, 1) + NOOP + ADD R1,R3,R3 + SC(2, 3) + NOOP + BEQ R3,loop + RET +#endif + +TEXT _xinc(SB), $0 + + MOVW R1, R2 /* address of counter */ +loop: MOVW $1, R3 + LL(2, 1) + NOOP + ADD R1,R3,R3 + SC(2, 3) + NOOP + BEQ R3,loop + RET + +TEXT _xdec(SB), $0 + + MOVW R1, R2 /* address of counter */ +loop1: MOVW $-1, R3 + LL(2, 1) + NOOP + ADD R1,R3,R3 + MOVW R3, R1 + SC(2, 3) + NOOP + BEQ R3,loop1 + RET diff --git a/sys/src/libthread/xincport.h b/sys/src/libthread/xincport.h new file mode 100755 index 000000000..d5d8fe323 --- /dev/null +++ b/sys/src/libthread/xincport.h @@ -0,0 +1,25 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> + +static Lock l; + +void +_xinc(long *p) +{ + + lock(&l); + (*p)++; + unlock(&l); +} + +long +_xdec(long *p) +{ + long r; + + lock(&l); + r = --(*p); + unlock(&l); + return r; +} diff --git a/sys/src/libthread/xincpower.s b/sys/src/libthread/xincpower.s new file mode 100755 index 000000000..66bff2e39 --- /dev/null +++ b/sys/src/libthread/xincpower.s @@ -0,0 +1,21 @@ +TEXT _xinc(SB),$0 /* void _xinc(long *); */ + + MOVW R3, R4 +xincloop: + LWAR (R4), R3 + ADD $1, R3 + DCBT (R4) /* fix 405 errata cpu_210 */ + STWCCC R3, (R4) + BNE xincloop + RETURN + +TEXT _xdec(SB),$0 /* long _xdec(long *); */ + + MOVW R3, R4 +xdecloop: + LWAR (R4), R3 + ADD $-1, R3 + DCBT (R4) /* fix 405 errata cpu_210 */ + STWCCC R3, (R4) + BNE xdecloop + RETURN |