summaryrefslogtreecommitdiff
path: root/sys/src/ape/lib/ap/plan9/_buf.c
diff options
context:
space:
mode:
authorTaru Karttunen <taruti@taruti.net>2011-03-30 15:46:40 +0300
committerTaru Karttunen <taruti@taruti.net>2011-03-30 15:46:40 +0300
commite5888a1ffdae813d7575f5fb02275c6bb07e5199 (patch)
treed8d51eac403f07814b9e936eed0c9a79195e2450 /sys/src/ape/lib/ap/plan9/_buf.c
Import sources from 2011-03-30 iso image
Diffstat (limited to 'sys/src/ape/lib/ap/plan9/_buf.c')
-rwxr-xr-xsys/src/ape/lib/ap/plan9/_buf.c472
1 files changed, 472 insertions, 0 deletions
diff --git a/sys/src/ape/lib/ap/plan9/_buf.c b/sys/src/ape/lib/ap/plan9/_buf.c
new file mode 100755
index 000000000..21fe282bb
--- /dev/null
+++ b/sys/src/ape/lib/ap/plan9/_buf.c
@@ -0,0 +1,472 @@
+#define _BSDTIME_EXTENSION
+#define _LOCK_EXTENSION
+#include "lib.h"
+#include <stdlib.h>
+#include <errno.h>
+#include <unistd.h>
+#include <signal.h>
+#include <string.h>
+#include <stdio.h>
+#include <lock.h>
+#include <sys/time.h>
+#include <sys/select.h>
+#include <unistd.h>
+#include "sys9.h"
+
+typedef struct Muxseg {
+ Lock lock; /* for mutual exclusion access to buffer variables */
+ int curfds; /* number of fds currently buffered */
+ int selwait; /* true if selecting process is waiting */
+ int waittime; /* time for timer process to wait */
+ fd_set rwant; /* fd's that select wants to read */
+ fd_set ewant; /* fd's that select wants to know eof info on */
+ Muxbuf bufs[INITBUFS]; /* can grow, via segbrk() */
+} Muxseg;
+
+#define MUXADDR ((void*)0x6000000)
+static Muxseg *mux = 0; /* shared memory segment */
+
+/* _muxsid and _killmuxsid are known in libbsd's listen.c */
+int _muxsid = -1; /* group id of copy processes */
+static int _mainpid = -1;
+static int timerpid = -1; /* pid of a timer process */
+
+void _killmuxsid(void);
+static void _copyproc(int, Muxbuf*);
+static void _timerproc(void);
+static void _resettimer(void);
+
+static int copynotehandler(void *, char *);
+
+/* assume FD_SETSIZE is 96 */
+#define FD_ANYSET(p) ((p)->fds_bits[0] || (p)->fds_bits[1] || (p)->fds_bits[2])
+
+/*
+ * Start making fd read-buffered: make the shared segment, if necessary,
+ * allocate a slot (index into mux->bufs), and fork a child to read the fd
+ * and write into the slot-indexed buffer.
+ * Return -1 if we can't do it.
+ */
+int
+_startbuf(int fd)
+{
+ long i, n, slot;
+ int pid, sid;
+ Fdinfo *f;
+ Muxbuf *b;
+
+ if(mux == 0){
+ _RFORK(RFREND);
+ mux = (Muxseg*)_SEGATTACH(0, "shared", MUXADDR, sizeof(Muxseg));
+ if((long)mux == -1){
+ _syserrno();
+ return -1;
+ }
+ /* segattach has returned zeroed memory */
+ atexit(_killmuxsid);
+ }
+
+ if(fd == -1)
+ return 0;
+
+ lock(&mux->lock);
+ slot = mux->curfds++;
+ if(mux->curfds > INITBUFS) {
+ if(_SEGBRK(mux, mux->bufs+mux->curfds) < 0){
+ _syserrno();
+ unlock(&mux->lock);
+ return -1;
+ }
+ }
+
+ f = &_fdinfo[fd];
+ b = &mux->bufs[slot];
+ b->n = 0;
+ b->putnext = b->data;
+ b->getnext = b->data;
+ b->eof = 0;
+ b->fd = fd;
+ if(_mainpid == -1)
+ _mainpid = getpid();
+ if((pid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
+ /* copy process ... */
+ if(_muxsid == -1) {
+ _RFORK(RFNOTEG);
+ _muxsid = getpgrp();
+ } else
+ setpgid(getpid(), _muxsid);
+ _NOTIFY(copynotehandler);
+ for(i=0; i<OPEN_MAX; i++)
+ if(i!=fd && (_fdinfo[i].flags&FD_ISOPEN))
+ _CLOSE(i);
+ _RENDEZVOUS(0, _muxsid);
+ _copyproc(fd, b);
+ }
+
+ /* parent process continues ... */
+ b->copypid = pid;
+ f->buf = b;
+ f->flags |= FD_BUFFERED;
+ unlock(&mux->lock);
+ _muxsid = _RENDEZVOUS(0, 0);
+ /* leave fd open in parent so system doesn't reuse it */
+ return 0;
+}
+
+/*
+ * The given buffered fd is being closed.
+ * Set the fd field in the shared buffer to -1 to tell copyproc
+ * to exit, and kill the copyproc.
+ */
+void
+_closebuf(int fd)
+{
+ Muxbuf *b;
+
+ b = _fdinfo[fd].buf;
+ if(!b)
+ return;
+ lock(&mux->lock);
+ b->fd = -1;
+ unlock(&mux->lock);
+ kill(b->copypid, SIGKILL);
+}
+
+/* child copy procs execute this until eof */
+static void
+_copyproc(int fd, Muxbuf *b)
+{
+ unsigned char *e;
+ int n;
+ int nzeros;
+
+ e = &b->data[PERFDMAX];
+ for(;;) {
+ /* make sure there's room */
+ lock(&mux->lock);
+ if(e - b->putnext < READMAX) {
+ if(b->getnext == b->putnext) {
+ b->getnext = b->putnext = b->data;
+ unlock(&mux->lock);
+ } else {
+ /* sleep until there's room */
+ b->roomwait = 1;
+ unlock(&mux->lock);
+ _RENDEZVOUS((unsigned long)&b->roomwait, 0);
+ }
+ } else
+ unlock(&mux->lock);
+ /*
+ * A Zero-length _READ might mean a zero-length write
+ * happened, or it might mean eof; try several times to
+ * disambiguate (posix read() discards 0-length messages)
+ */
+ nzeros = 0;
+ do {
+ n = _READ(fd, b->putnext, READMAX);
+ if(b->fd == -1) {
+ _exit(0); /* we've been closed */
+ }
+ } while(n == 0 && ++nzeros < 3);
+ lock(&mux->lock);
+ if(n <= 0) {
+ b->eof = 1;
+ if(mux->selwait && FD_ISSET(fd, &mux->ewant)) {
+ mux->selwait = 0;
+ unlock(&mux->lock);
+ _RENDEZVOUS((unsigned long)&mux->selwait, fd);
+ } else if(b->datawait) {
+ b->datawait = 0;
+ unlock(&mux->lock);
+ _RENDEZVOUS((unsigned long)&b->datawait, 0);
+ } else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
+ mux->selwait = 0;
+ unlock(&mux->lock);
+ _RENDEZVOUS((unsigned long)&mux->selwait, fd);
+ } else
+ unlock(&mux->lock);
+ _exit(0);
+ } else {
+ b->putnext += n;
+ b->n += n;
+ if(b->n > 0) {
+ /* parent process cannot be both in datawait and selwait */
+ if(b->datawait) {
+ b->datawait = 0;
+ unlock(&mux->lock);
+ /* wake up _bufreading process */
+ _RENDEZVOUS((unsigned long)&b->datawait, 0);
+ } else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
+ mux->selwait = 0;
+ unlock(&mux->lock);
+ /* wake up selecting process */
+ _RENDEZVOUS((unsigned long)&mux->selwait, fd);
+ } else
+ unlock(&mux->lock);
+ } else
+ unlock(&mux->lock);
+ }
+ }
+}
+
+/* like read(), for a buffered fd; extra arg noblock says don't wait for data if true */
+int
+_readbuf(int fd, void *addr, int nwant, int noblock)
+{
+ Muxbuf *b;
+ int ngot;
+
+ b = _fdinfo[fd].buf;
+ if(b->eof && b->n == 0) {
+goteof:
+ return 0;
+ }
+ if(b->n == 0 && noblock) {
+ errno = EAGAIN;
+ return -1;
+ }
+ /* make sure there's data */
+ lock(&mux->lock);
+ ngot = b->putnext - b->getnext;
+ if(ngot == 0) {
+ /* maybe EOF just happened */
+ if(b->eof) {
+ unlock(&mux->lock);
+ goto goteof;
+ }
+ /* sleep until there's data */
+ b->datawait = 1;
+ unlock(&mux->lock);
+ _RENDEZVOUS((unsigned long)&b->datawait, 0);
+ lock(&mux->lock);
+ ngot = b->putnext - b->getnext;
+ }
+ if(ngot == 0) {
+ unlock(&mux->lock);
+ goto goteof;
+ }
+ if(ngot > nwant)
+ ngot = nwant;
+ memcpy(addr, b->getnext, ngot);
+ b->getnext += ngot;
+ b->n -= ngot;
+ if(b->getnext == b->putnext && b->roomwait) {
+ b->getnext = b->putnext = b->data;
+ b->roomwait = 0;
+ unlock(&mux->lock);
+ /* wake up copy process */
+ _RENDEZVOUS((unsigned long)&b->roomwait, 0);
+ } else
+ unlock(&mux->lock);
+ return ngot;
+}
+
+int
+select(int nfds, fd_set *rfds, fd_set *wfds, fd_set *efds, struct timeval *timeout)
+{
+ int n, i, tmp, t, slots, fd, err;
+ Fdinfo *f;
+ Muxbuf *b;
+
+ if(timeout)
+ t = timeout->tv_sec*1000 + (timeout->tv_usec+999)/1000;
+ else
+ t = -1;
+ if(!((rfds && FD_ANYSET(rfds)) || (wfds && FD_ANYSET(wfds))
+ || (efds && FD_ANYSET(efds)))) {
+ /* no requested fds */
+ if(t > 0)
+ _SLEEP(t);
+ return 0;
+ }
+
+ _startbuf(-1);
+
+ /* make sure all requested rfds and efds are buffered */
+ if(nfds >= OPEN_MAX)
+ nfds = OPEN_MAX;
+ for(i = 0; i < nfds; i++)
+ if((rfds && FD_ISSET(i, rfds)) || (efds && FD_ISSET(i, efds))){
+ f = &_fdinfo[i];
+ if(!(f->flags&FD_BUFFERED))
+ if(_startbuf(i) != 0) {
+ return -1;
+ }
+ b = f->buf;
+ if(rfds && FD_ISSET(i,rfds) && b->eof && b->n == 0)
+ if(efds == 0 || !FD_ISSET(i,efds)) {
+ errno = EBADF; /* how X tells a client is gone */
+ return -1;
+ }
+ }
+
+ /* check wfds; for now, we'll say they are all ready */
+ n = 0;
+ if(wfds && FD_ANYSET(wfds)){
+ for(i = 0; i<nfds; i++)
+ if(FD_ISSET(i, wfds)) {
+ n++;
+ }
+ }
+
+ lock(&mux->lock);
+
+ slots = mux->curfds;
+ FD_ZERO(&mux->rwant);
+ FD_ZERO(&mux->ewant);
+
+ for(i = 0; i<slots; i++) {
+ b = &mux->bufs[i];
+ fd = b->fd;
+ if(fd == -1)
+ continue;
+ err = 0;
+ if(efds && FD_ISSET(fd, efds)) {
+ if(b->eof && b->n == 0){
+ err = 1;
+ n++;
+ }else{
+ FD_CLR(fd, efds);
+ FD_SET(fd, &mux->ewant);
+ }
+ }
+ if(rfds && FD_ISSET(fd, rfds)) {
+ if(!err && (b->n > 0 || b->eof))
+ n++;
+ else{
+ FD_CLR(fd, rfds);
+ FD_SET(fd, &mux->rwant);
+ }
+ }
+ }
+ if(n || !(FD_ANYSET(&mux->rwant) || FD_ANYSET(&mux->ewant)) || t == 0) {
+ FD_ZERO(&mux->rwant);
+ FD_ZERO(&mux->ewant);
+ unlock(&mux->lock);
+ return n;
+ }
+
+ if(timeout) {
+ mux->waittime = t;
+ if(timerpid == -1)
+ _timerproc();
+ else
+ _resettimer();
+ }
+ mux->selwait = 1;
+ unlock(&mux->lock);
+ fd = _RENDEZVOUS((unsigned long)&mux->selwait, 0);
+ if(fd >= 0) {
+ b = _fdinfo[fd].buf;
+ if(FD_ISSET(fd, &mux->rwant)) {
+ FD_SET(fd, rfds);
+ n = 1;
+ } else if(FD_ISSET(fd, &mux->ewant) && b->eof && b->n == 0) {
+ FD_SET(fd, efds);
+ n = 1;
+ }
+ }
+ FD_ZERO(&mux->rwant);
+ FD_ZERO(&mux->ewant);
+ return n;
+}
+
+static int timerreset;
+static int timerpid;
+
+static void
+alarmed(int v)
+{
+ timerreset = 1;
+}
+
+/* a little over an hour */
+#define LONGWAIT 4000001
+
+static void
+_killtimerproc(void)
+{
+ if(timerpid > 0)
+ kill(timerpid, SIGKILL);
+}
+
+static void
+_timerproc(void)
+{
+ int i;
+
+ if((timerpid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
+ /* timer process */
+ setpgid(getpid(), _muxsid);
+ signal(SIGALRM, alarmed);
+ for(i=0; i<OPEN_MAX; i++)
+ _CLOSE(i);
+ _RENDEZVOUS(1, 0);
+ for(;;) {
+ _SLEEP(mux->waittime);
+ if(timerreset) {
+ timerreset = 0;
+ } else {
+ lock(&mux->lock);
+ if(mux->selwait && mux->waittime != LONGWAIT) {
+ mux->selwait = 0;
+ mux->waittime = LONGWAIT;
+ unlock(&mux->lock);
+ _RENDEZVOUS((unsigned long)&mux->selwait, -2);
+ } else {
+ mux->waittime = LONGWAIT;
+ unlock(&mux->lock);
+ }
+ }
+ }
+ }
+ atexit(_killtimerproc);
+ /* parent process continues */
+ _RENDEZVOUS(1, 0);
+}
+
+static void
+_resettimer(void)
+{
+ kill(timerpid, SIGALRM);
+}
+
+void
+_killmuxsid(void)
+{
+ if(_muxsid != -1 && (_mainpid == getpid() || _mainpid == -1))
+ kill(-_muxsid,SIGTERM);
+}
+
+/* call this on fork(), because reading a BUFFERED fd won't work in child */
+void
+_detachbuf(void)
+{
+ int i;
+ Fdinfo *f;
+
+ if(mux == 0)
+ return;
+ _SEGDETACH(mux);
+ for(i = 0; i < OPEN_MAX; i++){
+ f = &_fdinfo[i];
+ if(f->flags&FD_BUFFERED)
+ f->flags = (f->flags&~FD_BUFFERED) | FD_BUFFEREDX;
+ /* mark 'poisoned' */
+ }
+ mux = 0;
+ _muxsid = -1;
+ _mainpid = -1;
+ timerpid = -1;
+}
+
+static int
+copynotehandler(void *u, char *msg)
+{
+ int i;
+ void(*f)(int);
+
+ if(_finishing)
+ _finish(0, 0);
+ _NOTED(1);
+}