diff options
author | Taru Karttunen <taruti@taruti.net> | 2011-03-30 15:46:40 +0300 |
---|---|---|
committer | Taru Karttunen <taruti@taruti.net> | 2011-03-30 15:46:40 +0300 |
commit | e5888a1ffdae813d7575f5fb02275c6bb07e5199 (patch) | |
tree | d8d51eac403f07814b9e936eed0c9a79195e2450 /sys/src/cmd/execnet/client.c |
Import sources from 2011-03-30 iso image
Diffstat (limited to 'sys/src/cmd/execnet/client.c')
-rwxr-xr-x | sys/src/cmd/execnet/client.c | 432 |
1 files changed, 432 insertions, 0 deletions
diff --git a/sys/src/cmd/execnet/client.c b/sys/src/cmd/execnet/client.c new file mode 100755 index 000000000..57414966d --- /dev/null +++ b/sys/src/cmd/execnet/client.c @@ -0,0 +1,432 @@ +#include <u.h> +#include <libc.h> +#include <fcall.h> +#include <thread.h> +#include <9p.h> +#include "dat.h" + +int nclient; +Client **client; +#define Zmsg ((Msg*)~0) +char nocmd[] = ""; + +static void readthread(void*); +static void writethread(void*); +static void kickwriter(Client*); + +int +newclient(void) +{ + int i; + Client *c; + + for(i=0; i<nclient; i++) + if(client[i]->ref==0 && !client[i]->moribund) + return i; + + c = emalloc(sizeof(Client)); + c->writerkick = chancreate(sizeof(void*), 1); + c->execpid = chancreate(sizeof(ulong), 0); + c->cmd = nocmd; + + c->readerproc = ioproc(); + c->writerproc = ioproc(); + c->num = nclient; + if(nclient%16 == 0) + client = erealloc(client, (nclient+16)*sizeof(client[0])); + client[nclient++] = c; + return nclient-1; +} + +void +die(Client *c) +{ + Msg *m, *next; + Req *r, *rnext; + + c->moribund = 1; + kickwriter(c); + iointerrupt(c->readerproc); + iointerrupt(c->writerproc); + if(--c->activethread == 0){ + if(c->cmd != nocmd){ + free(c->cmd); + c->cmd = nocmd; + } + c->pid = 0; + c->moribund = 0; + c->status = Closed; + for(m=c->mq; m && m != Zmsg; m=next){ + next = m->link; + free(m); + } + c->mq = nil; + if(c->rq != nil){ + for(r=c->rq; r; r=rnext){ + rnext = r->aux; + respond(r, "hangup"); + } + c->rq = nil; + } + if(c->wq != nil){ + for(r=c->wq; r; r=rnext){ + rnext = r->aux; + respond(r, "hangup"); + } + c->wq = nil; + } + c->rq = nil; + c->wq = nil; + c->emq = nil; + c->erq = nil; + c->ewq = nil; + } +} + +void +closeclient(Client *c) +{ + if(--c->ref == 0){ + if(c->pid > 0) + postnote(PNPROC, c->pid, "kill"); + c->status = Hangup; + close(c->fd[0]); + c->fd[0] = c->fd[1] = -1; + c->moribund = 1; + kickwriter(c); + iointerrupt(c->readerproc); + iointerrupt(c->writerproc); + c->activethread++; + die(c); + } +} + +void +queuerdreq(Client *c, Req *r) +{ + if(c->rq==nil) + c->erq = &c->rq; + *c->erq = r; + r->aux = nil; + c->erq = (Req**)&r->aux; +} + +void +queuewrreq(Client *c, Req *r) +{ + if(c->wq==nil) + c->ewq = &c->wq; + *c->ewq = r; + r->aux = nil; + c->ewq = (Req**)&r->aux; +} + +void +queuemsg(Client *c, Msg *m) +{ + if(c->mq==nil) + c->emq = &c->mq; + *c->emq = m; + if(m != Zmsg){ + m->link = nil; + c->emq = (Msg**)&m->link; + }else + c->emq = nil; +} + +void +matchmsgs(Client *c) +{ + Req *r; + Msg *m; + int n, rm; + + while(c->rq && c->mq){ + r = c->rq; + c->rq = r->aux; + + rm = 0; + m = c->mq; + if(m == Zmsg){ + respond(r, "execnet: no more data"); + break; + } + n = r->ifcall.count; + if(n >= m->ep - m->rp){ + n = m->ep - m->rp; + c->mq = m->link; + rm = 1; + } + if(n) + memmove(r->ofcall.data, m->rp, n); + if(rm) + free(m); + else + m->rp += n; + r->ofcall.count = n; + respond(r, nil); + } +} + +void +findrdreq(Client *c, Req *r) +{ + Req **l; + + for(l=&c->rq; *l; l=(Req**)&(*l)->aux){ + if(*l == r){ + *l = r->aux; + if(*l == nil) + c->erq = l; + respond(r, "flushed"); + break; + } + } +} + +void +findwrreq(Client *c, Req *r) +{ + Req **l; + + for(l=&c->wq; *l; l=(Req**)&(*l)->aux){ + if(*l == r){ + *l = r->aux; + if(*l == nil) + c->ewq = l; + respond(r, "flushed"); + return; + } + } +} + +void +dataread(Req *r, Client *c) +{ + queuerdreq(c, r); + matchmsgs(c); +} + +static void +readthread(void *a) +{ + uchar *buf; + int n; + Client *c; + Ioproc *io; + Msg *m; + char tmp[32]; + + c = a; + snprint(tmp, sizeof tmp, "read%d", c->num); + threadsetname(tmp); + + buf = emalloc(8192); + io = c->readerproc; + while((n = ioread(io, c->fd[0], buf, 8192)) >= 0){ + m = emalloc(sizeof(Msg)+n); + m->rp = (uchar*)&m[1]; + m->ep = m->rp + n; + if(n) + memmove(m->rp, buf, n); + queuemsg(c, m); + matchmsgs(c); + } + queuemsg(c, Zmsg); + free(buf); + die(c); +} + +static void +kickwriter(Client *c) +{ + nbsendp(c->writerkick, nil); +} + +void +clientflush(Req *or, Client *c) +{ + if(or->ifcall.type == Tread) + findrdreq(c, or); + else{ + if(c->execreq == or){ + c->execreq = nil; + iointerrupt(c->writerproc); + } + findwrreq(c, or); + if(c->curw == or){ + c->curw = nil; + iointerrupt(c->writerproc); + kickwriter(c); + } + } +} + +void +datawrite(Req *r, Client *c) +{ + queuewrreq(c, r); + kickwriter(c); +} + +static void +writethread(void *a) +{ + char e[ERRMAX]; + uchar *buf; + int n; + Ioproc *io; + Req *r; + Client *c; + char tmp[32]; + + c = a; + snprint(tmp, sizeof tmp, "write%d", c->num); + threadsetname(tmp); + + buf = emalloc(8192); + io = c->writerproc; + for(;;){ + while(c->wq == nil){ + if(c->moribund) + goto Out; + recvp(c->writerkick); + if(c->moribund) + goto Out; + } + r = c->wq; + c->wq = r->aux; + c->curw = r; + n = iowrite(io, c->fd[1], r->ifcall.data, r->ifcall.count); + if(chatty9p) + fprint(2, "io->write returns %d\n", n); + if(n >= 0){ + r->ofcall.count = n; + respond(r, nil); + }else{ + rerrstr(e, sizeof e); + respond(r, e); + } + } +Out: + free(buf); + die(c); +} + +static void +execproc(void *a) +{ + int i, fd; + Client *c; + char tmp[32]; + + c = a; + snprint(tmp, sizeof tmp, "execproc%d", c->num); + threadsetname(tmp); + if(pipe(c->fd) < 0){ + rerrstr(c->err, sizeof c->err); + sendul(c->execpid, -1); + return; + } + rfork(RFFDG); + fd = c->fd[1]; + close(c->fd[0]); + dup(fd, 0); + dup(fd, 1); + for(i=3; i<100; i++) /* should do better */ + close(i); + strcpy(c->err, "exec failed"); + procexecl(c->execpid, "/bin/rc", "rc", "-c", c->cmd, nil); +} + +static void +execthread(void *a) +{ + Client *c; + int p; + char tmp[32]; + + c = a; + snprint(tmp, sizeof tmp, "exec%d", c->num); + threadsetname(tmp); + c->execpid = chancreate(sizeof(ulong), 0); + proccreate(execproc, c, STACK); + p = recvul(c->execpid); + chanfree(c->execpid); + c->execpid = nil; + close(c->fd[1]); + c->fd[1] = c->fd[0]; + if(p != -1){ + c->pid = p; + c->activethread = 2; + threadcreate(readthread, c, STACK); + threadcreate(writethread, c, STACK); + if(c->execreq) + respond(c->execreq, nil); + }else{ + if(c->execreq) + respond(c->execreq, c->err); + } +} + +void +ctlwrite(Req *r, Client *c) +{ + char *f[3], *s, *p; + int nf; + + s = emalloc(r->ifcall.count+1); + memmove(s, r->ifcall.data, r->ifcall.count); + s[r->ifcall.count] = '\0'; + + f[0] = s; + p = strchr(s, ' '); + if(p == nil) + nf = 1; + else{ + *p++ = '\0'; + f[1] = p; + nf = 2; + } + + if(f[0][0] == '\0'){ + free(s); + respond(r, nil); + return; + } + + r->ofcall.count = r->ifcall.count; + if(strcmp(f[0], "hangup") == 0){ + if(c->pid == 0){ + respond(r, "connection already hung up"); + goto Out; + } + postnote(PNPROC, c->pid, "kill"); + respond(r, nil); + goto Out; + } + + if(strcmp(f[0], "connect") == 0){ + if(c->cmd != nocmd){ + respond(r, "already have connection"); + goto Out; + } + if(nf == 1){ + respond(r, "need argument to connect"); + goto Out; + } + c->status = Exec; + if(p = strrchr(f[1], '!')) + *p = '\0'; + c->cmd = emalloc(4+1+strlen(f[1])+1); + strcpy(c->cmd, "exec "); + strcat(c->cmd, f[1]); + c->execreq = r; + threadcreate(execthread, c, STACK); + goto Out; + } + + respond(r, "bad or inappropriate control message"); +Out: + free(s); +} |