summaryrefslogtreecommitdiff
path: root/sys/src/cmd/unix/drawterm/kern/qio.c
diff options
context:
space:
mode:
authorcinap_lenrek <cinap_lenrek@felloff.net>2013-11-23 01:05:33 +0100
committercinap_lenrek <cinap_lenrek@felloff.net>2013-11-23 01:05:33 +0100
commit2f9ae0f8ac8610e13ced184847b57b87fe5db580 (patch)
treef9ad2223d518585a2cfe9ea1c73e1e37d07bf637 /sys/src/cmd/unix/drawterm/kern/qio.c
parentea5797c0731203c09ec5fb7172e77eab2750f1a9 (diff)
removing (outdated) drawterm
drawterm is much better maintained by russ cox, so removing this outdated copy. for a more recent version, go to: http://swtch.com/drawterm/
Diffstat (limited to 'sys/src/cmd/unix/drawterm/kern/qio.c')
-rw-r--r--sys/src/cmd/unix/drawterm/kern/qio.c1524
1 files changed, 0 insertions, 1524 deletions
diff --git a/sys/src/cmd/unix/drawterm/kern/qio.c b/sys/src/cmd/unix/drawterm/kern/qio.c
deleted file mode 100644
index edee200be..000000000
--- a/sys/src/cmd/unix/drawterm/kern/qio.c
+++ /dev/null
@@ -1,1524 +0,0 @@
-#include "u.h"
-#include "lib.h"
-#include "dat.h"
-#include "fns.h"
-#include "error.h"
-
-static ulong padblockcnt;
-static ulong concatblockcnt;
-static ulong pullupblockcnt;
-static ulong copyblockcnt;
-static ulong consumecnt;
-static ulong producecnt;
-static ulong qcopycnt;
-
-static int debugging;
-
-#define QDEBUG if(0)
-
-/*
- * IO queues
- */
-struct Queue
-{
- Lock lk;
-
- Block* bfirst; /* buffer */
- Block* blast;
-
- int len; /* bytes allocated to queue */
- int dlen; /* data bytes in queue */
- int limit; /* max bytes in queue */
- int inilim; /* initial limit */
- int state;
- int noblock; /* true if writes return immediately when q full */
- int eof; /* number of eofs read by user */
-
- void (*kick)(void*); /* restart output */
- void (*bypass)(void*, Block*); /* bypass queue altogether */
- void* arg; /* argument to kick */
-
- QLock rlock; /* mutex for reading processes */
- Rendez rr; /* process waiting to read */
- QLock wlock; /* mutex for writing processes */
- Rendez wr; /* process waiting to write */
-
- char err[ERRMAX];
-};
-
-enum
-{
- Maxatomic = 64*1024,
-};
-
-uint qiomaxatomic = Maxatomic;
-
-void
-ixsummary(void)
-{
- debugging ^= 1;
- iallocsummary();
- print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
- padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
- print("consume %lud, produce %lud, qcopy %lud\n",
- consumecnt, producecnt, qcopycnt);
-}
-
-/*
- * free a list of blocks
- */
-void
-freeblist(Block *b)
-{
- Block *next;
-
- for(; b != 0; b = next){
- next = b->next;
- b->next = 0;
- freeb(b);
- }
-}
-
-/*
- * pad a block to the front (or the back if size is negative)
- */
-Block*
-padblock(Block *bp, int size)
-{
- int n;
- Block *nbp;
-
- QDEBUG checkb(bp, "padblock 1");
- if(size >= 0){
- if(bp->rp - bp->base >= size){
- bp->rp -= size;
- return bp;
- }
-
- if(bp->next)
- panic("padblock 0x%p", getcallerpc(&bp));
- n = BLEN(bp);
- padblockcnt++;
- nbp = allocb(size+n);
- nbp->rp += size;
- nbp->wp = nbp->rp;
- memmove(nbp->wp, bp->rp, n);
- nbp->wp += n;
- freeb(bp);
- nbp->rp -= size;
- } else {
- size = -size;
-
- if(bp->next)
- panic("padblock 0x%p", getcallerpc(&bp));
-
- if(bp->lim - bp->wp >= size)
- return bp;
-
- n = BLEN(bp);
- padblockcnt++;
- nbp = allocb(size+n);
- memmove(nbp->wp, bp->rp, n);
- nbp->wp += n;
- freeb(bp);
- }
- QDEBUG checkb(nbp, "padblock 1");
- return nbp;
-}
-
-/*
- * return count of bytes in a string of blocks
- */
-int
-blocklen(Block *bp)
-{
- int len;
-
- len = 0;
- while(bp) {
- len += BLEN(bp);
- bp = bp->next;
- }
- return len;
-}
-
-/*
- * return count of space in blocks
- */
-int
-blockalloclen(Block *bp)
-{
- int len;
-
- len = 0;
- while(bp) {
- len += BALLOC(bp);
- bp = bp->next;
- }
- return len;
-}
-
-/*
- * copy the string of blocks into
- * a single block and free the string
- */
-Block*
-concatblock(Block *bp)
-{
- int len;
- Block *nb, *f;
-
- if(bp->next == 0)
- return bp;
-
- nb = allocb(blocklen(bp));
- for(f = bp; f; f = f->next) {
- len = BLEN(f);
- memmove(nb->wp, f->rp, len);
- nb->wp += len;
- }
- concatblockcnt += BLEN(nb);
- freeblist(bp);
- QDEBUG checkb(nb, "concatblock 1");
- return nb;
-}
-
-/*
- * make sure the first block has at least n bytes
- */
-Block*
-pullupblock(Block *bp, int n)
-{
- int i;
- Block *nbp;
-
- /*
- * this should almost always be true, it's
- * just to avoid every caller checking.
- */
- if(BLEN(bp) >= n)
- return bp;
-
- /*
- * if not enough room in the first block,
- * add another to the front of the list.
- */
- if(bp->lim - bp->rp < n){
- nbp = allocb(n);
- nbp->next = bp;
- bp = nbp;
- }
-
- /*
- * copy bytes from the trailing blocks into the first
- */
- n -= BLEN(bp);
- while((nbp = bp->next)){
- i = BLEN(nbp);
- if(i > n) {
- memmove(bp->wp, nbp->rp, n);
- pullupblockcnt++;
- bp->wp += n;
- nbp->rp += n;
- QDEBUG checkb(bp, "pullupblock 1");
- return bp;
- } else {
- /* shouldn't happen but why crash if it does */
- if(i < 0){
- print("pullup negative length packet\n");
- i = 0;
- }
- memmove(bp->wp, nbp->rp, i);
- pullupblockcnt++;
- bp->wp += i;
- bp->next = nbp->next;
- nbp->next = 0;
- freeb(nbp);
- n -= i;
- if(n == 0){
- QDEBUG checkb(bp, "pullupblock 2");
- return bp;
- }
- }
- }
- freeb(bp);
- return 0;
-}
-
-/*
- * make sure the first block has at least n bytes
- */
-Block*
-pullupqueue(Queue *q, int n)
-{
- Block *b;
-
- if(BLEN(q->bfirst) >= n)
- return q->bfirst;
- q->bfirst = pullupblock(q->bfirst, n);
- for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
- ;
- q->blast = b;
- return q->bfirst;
-}
-
-/*
- * trim to len bytes starting at offset
- */
-Block *
-trimblock(Block *bp, int offset, int len)
-{
- ulong l;
- Block *nb, *startb;
-
- QDEBUG checkb(bp, "trimblock 1");
- if(blocklen(bp) < offset+len) {
- freeblist(bp);
- return nil;
- }
-
- while((l = BLEN(bp)) < offset) {
- offset -= l;
- nb = bp->next;
- bp->next = nil;
- freeb(bp);
- bp = nb;
- }
-
- startb = bp;
- bp->rp += offset;
-
- while((l = BLEN(bp)) < len) {
- len -= l;
- bp = bp->next;
- }
-
- bp->wp -= (BLEN(bp) - len);
-
- if(bp->next) {
- freeblist(bp->next);
- bp->next = nil;
- }
-
- return startb;
-}
-
-/*
- * copy 'count' bytes into a new block
- */
-Block*
-copyblock(Block *bp, int count)
-{
- int l;
- Block *nbp;
-
- QDEBUG checkb(bp, "copyblock 0");
- nbp = allocb(count);
- for(; count > 0 && bp != 0; bp = bp->next){
- l = BLEN(bp);
- if(l > count)
- l = count;
- memmove(nbp->wp, bp->rp, l);
- nbp->wp += l;
- count -= l;
- }
- if(count > 0){
- memset(nbp->wp, 0, count);
- nbp->wp += count;
- }
- copyblockcnt++;
- QDEBUG checkb(nbp, "copyblock 1");
-
- return nbp;
-}
-
-Block*
-adjustblock(Block* bp, int len)
-{
- int n;
- Block *nbp;
-
- if(len < 0){
- freeb(bp);
- return nil;
- }
-
- if(bp->rp+len > bp->lim){
- nbp = copyblock(bp, len);
- freeblist(bp);
- QDEBUG checkb(nbp, "adjustblock 1");
-
- return nbp;
- }
-
- n = BLEN(bp);
- if(len > n)
- memset(bp->wp, 0, len-n);
- bp->wp = bp->rp+len;
- QDEBUG checkb(bp, "adjustblock 2");
-
- return bp;
-}
-
-
-/*
- * throw away up to count bytes from a
- * list of blocks. Return count of bytes
- * thrown away.
- */
-int
-pullblock(Block **bph, int count)
-{
- Block *bp;
- int n, bytes;
-
- bytes = 0;
- if(bph == nil)
- return 0;
-
- while(*bph != nil && count != 0) {
- bp = *bph;
- n = BLEN(bp);
- if(count < n)
- n = count;
- bytes += n;
- count -= n;
- bp->rp += n;
- QDEBUG checkb(bp, "pullblock ");
- if(BLEN(bp) == 0) {
- *bph = bp->next;
- bp->next = nil;
- freeb(bp);
- }
- }
- return bytes;
-}
-
-/*
- * get next block from a queue, return null if nothing there
- */
-Block*
-qget(Queue *q)
-{
- int dowakeup;
- Block *b;
-
- /* sync with qwrite */
- ilock(&q->lk);
-
- b = q->bfirst;
- if(b == nil){
- q->state |= Qstarve;
- iunlock(&q->lk);
- return nil;
- }
- q->bfirst = b->next;
- b->next = 0;
- q->len -= BALLOC(b);
- q->dlen -= BLEN(b);
- QDEBUG checkb(b, "qget");
-
- /* if writer flow controlled, restart */
- if((q->state & Qflow) && q->len < q->limit/2){
- q->state &= ~Qflow;
- dowakeup = 1;
- } else
- dowakeup = 0;
-
- iunlock(&q->lk);
-
- if(dowakeup)
- wakeup(&q->wr);
-
- return b;
-}
-
-/*
- * throw away the next 'len' bytes in the queue
- */
-int
-qdiscard(Queue *q, int len)
-{
- Block *b;
- int dowakeup, n, sofar;
-
- ilock(&q->lk);
- for(sofar = 0; sofar < len; sofar += n){
- b = q->bfirst;
- if(b == nil)
- break;
- QDEBUG checkb(b, "qdiscard");
- n = BLEN(b);
- if(n <= len - sofar){
- q->bfirst = b->next;
- b->next = 0;
- q->len -= BALLOC(b);
- q->dlen -= BLEN(b);
- freeb(b);
- } else {
- n = len - sofar;
- b->rp += n;
- q->dlen -= n;
- }
- }
-
- /*
- * if writer flow controlled, restart
- *
- * This used to be
- * q->len < q->limit/2
- * but it slows down tcp too much for certain write sizes.
- * I really don't understand it completely. It may be
- * due to the queue draining so fast that the transmission
- * stalls waiting for the app to produce more data. - presotto
- */
- if((q->state & Qflow) && q->len < q->limit){
- q->state &= ~Qflow;
- dowakeup = 1;
- } else
- dowakeup = 0;
-
- iunlock(&q->lk);
-
- if(dowakeup)
- wakeup(&q->wr);
-
- return sofar;
-}
-
-/*
- * Interrupt level copy out of a queue, return # bytes copied.
- */
-int
-qconsume(Queue *q, void *vp, int len)
-{
- Block *b;
- int n, dowakeup;
- uchar *p = vp;
- Block *tofree = nil;
-
- /* sync with qwrite */
- ilock(&q->lk);
-
- for(;;) {
- b = q->bfirst;
- if(b == 0){
- q->state |= Qstarve;
- iunlock(&q->lk);
- return -1;
- }
- QDEBUG checkb(b, "qconsume 1");
-
- n = BLEN(b);
- if(n > 0)
- break;
- q->bfirst = b->next;
- q->len -= BALLOC(b);
-
- /* remember to free this */
- b->next = tofree;
- tofree = b;
- };
-
- if(n < len)
- len = n;
- memmove(p, b->rp, len);
- consumecnt += n;
- b->rp += len;
- q->dlen -= len;
-
- /* discard the block if we're done with it */
- if((q->state & Qmsg) || len == n){
- q->bfirst = b->next;
- b->next = 0;
- q->len -= BALLOC(b);
- q->dlen -= BLEN(b);
-
- /* remember to free this */
- b->next = tofree;
- tofree = b;
- }
-
- /* if writer flow controlled, restart */
- if((q->state & Qflow) && q->len < q->limit/2){
- q->state &= ~Qflow;
- dowakeup = 1;
- } else
- dowakeup = 0;
-
- iunlock(&q->lk);
-
- if(dowakeup)
- wakeup(&q->wr);
-
- if(tofree != nil)
- freeblist(tofree);
-
- return len;
-}
-
-int
-qpass(Queue *q, Block *b)
-{
- int dlen, len, dowakeup;
-
- /* sync with qread */
- dowakeup = 0;
- ilock(&q->lk);
- if(q->len >= q->limit){
- freeblist(b);
- iunlock(&q->lk);
- return -1;
- }
- if(q->state & Qclosed){
- freeblist(b);
- iunlock(&q->lk);
- return BALLOC(b);
- }
-
- /* add buffer to queue */
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- len = BALLOC(b);
- dlen = BLEN(b);
- QDEBUG checkb(b, "qpass");
- while(b->next){
- b = b->next;
- QDEBUG checkb(b, "qpass");
- len += BALLOC(b);
- dlen += BLEN(b);
- }
- q->blast = b;
- q->len += len;
- q->dlen += dlen;
-
- if(q->len >= q->limit/2)
- q->state |= Qflow;
-
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
- iunlock(&q->lk);
-
- if(dowakeup)
- wakeup(&q->rr);
-
- return len;
-}
-
-int
-qpassnolim(Queue *q, Block *b)
-{
- int dlen, len, dowakeup;
-
- /* sync with qread */
- dowakeup = 0;
- ilock(&q->lk);
-
- if(q->state & Qclosed){
- freeblist(b);
- iunlock(&q->lk);
- return BALLOC(b);
- }
-
- /* add buffer to queue */
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- len = BALLOC(b);
- dlen = BLEN(b);
- QDEBUG checkb(b, "qpass");
- while(b->next){
- b = b->next;
- QDEBUG checkb(b, "qpass");
- len += BALLOC(b);
- dlen += BLEN(b);
- }
- q->blast = b;
- q->len += len;
- q->dlen += dlen;
-
- if(q->len >= q->limit/2)
- q->state |= Qflow;
-
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
- iunlock(&q->lk);
-
- if(dowakeup)
- wakeup(&q->rr);
-
- return len;
-}
-
-/*
- * if the allocated space is way out of line with the used
- * space, reallocate to a smaller block
- */
-Block*
-packblock(Block *bp)
-{
- Block **l, *nbp;
- int n;
-
- for(l = &bp; *l; l = &(*l)->next){
- nbp = *l;
- n = BLEN(nbp);
- if((n<<2) < BALLOC(nbp)){
- *l = allocb(n);
- memmove((*l)->wp, nbp->rp, n);
- (*l)->wp += n;
- (*l)->next = nbp->next;
- freeb(nbp);
- }
- }
-
- return bp;
-}
-
-int
-qproduce(Queue *q, void *vp, int len)
-{
- Block *b;
- int dowakeup;
- uchar *p = vp;
-
- /* sync with qread */
- dowakeup = 0;
- ilock(&q->lk);
-
- /* no waiting receivers, room in buffer? */
- if(q->len >= q->limit){
- q->state |= Qflow;
- iunlock(&q->lk);
- return -1;
- }
-
- /* save in buffer */
- b = iallocb(len);
- if(b == 0){
- iunlock(&q->lk);
- return 0;
- }
- memmove(b->wp, p, len);
- producecnt += len;
- b->wp += len;
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- q->blast = b;
- /* b->next = 0; done by iallocb() */
- q->len += BALLOC(b);
- q->dlen += BLEN(b);
- QDEBUG checkb(b, "qproduce");
-
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
-
- if(q->len >= q->limit)
- q->state |= Qflow;
- iunlock(&q->lk);
-
- if(dowakeup)
- wakeup(&q->rr);
-
- return len;
-}
-
-/*
- * copy from offset in the queue
- */
-Block*
-qcopy(Queue *q, int len, ulong offset)
-{
- int sofar;
- int n;
- Block *b, *nb;
- uchar *p;
-
- nb = allocb(len);
-
- ilock(&q->lk);
-
- /* go to offset */
- b = q->bfirst;
- for(sofar = 0; ; sofar += n){
- if(b == nil){
- iunlock(&q->lk);
- return nb;
- }
- n = BLEN(b);
- if(sofar + n > offset){
- p = b->rp + offset - sofar;
- n -= offset - sofar;
- break;
- }
- QDEBUG checkb(b, "qcopy");
- b = b->next;
- }
-
- /* copy bytes from there */
- for(sofar = 0; sofar < len;){
- if(n > len - sofar)
- n = len - sofar;
- memmove(nb->wp, p, n);
- qcopycnt += n;
- sofar += n;
- nb->wp += n;
- b = b->next;
- if(b == nil)
- break;
- n = BLEN(b);
- p = b->rp;
- }
- iunlock(&q->lk);
-
- return nb;
-}
-
-/*
- * called by non-interrupt code
- */
-Queue*
-qopen(int limit, int msg, void (*kick)(void*), void *arg)
-{
- Queue *q;
-
- q = malloc(sizeof(Queue));
- if(q == 0)
- return 0;
-
- q->limit = q->inilim = limit;
- q->kick = kick;
- q->arg = arg;
- q->state = msg;
-
- q->state |= Qstarve;
- q->eof = 0;
- q->noblock = 0;
-
- return q;
-}
-
-/* open a queue to be bypassed */
-Queue*
-qbypass(void (*bypass)(void*, Block*), void *arg)
-{
- Queue *q;
-
- q = malloc(sizeof(Queue));
- if(q == 0)
- return 0;
-
- q->limit = 0;
- q->arg = arg;
- q->bypass = bypass;
- q->state = 0;
-
- return q;
-}
-
-static int
-notempty(void *a)
-{
- Queue *q = a;
-
- return (q->state & Qclosed) || q->bfirst != 0;
-}
-
-/*
- * wait for the queue to be non-empty or closed.
- * called with q ilocked.
- */
-static int
-qwait(Queue *q)
-{
- /* wait for data */
- for(;;){
- if(q->bfirst != nil)
- break;
-
- if(q->state & Qclosed){
- if(++q->eof > 3)
- return -1;
- if(*q->err && strcmp(q->err, Ehungup) != 0)
- return -1;
- return 0;
- }
-
- q->state |= Qstarve; /* flag requesting producer to wake me */
- iunlock(&q->lk);
- sleep(&q->rr, notempty, q);
- ilock(&q->lk);
- }
- return 1;
-}
-
-/*
- * add a block list to a queue
- */
-void
-qaddlist(Queue *q, Block *b)
-{
- /* queue the block */
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- q->len += blockalloclen(b);
- q->dlen += blocklen(b);
- while(b->next)
- b = b->next;
- q->blast = b;
-}
-
-/*
- * called with q ilocked
- */
-Block*
-qremove(Queue *q)
-{
- Block *b;
-
- b = q->bfirst;
- if(b == nil)
- return nil;
- q->bfirst = b->next;
- b->next = nil;
- q->dlen -= BLEN(b);
- q->len -= BALLOC(b);
- QDEBUG checkb(b, "qremove");
- return b;
-}
-
-/*
- * copy the contents of a string of blocks into
- * memory. emptied blocks are freed. return
- * pointer to first unconsumed block.
- */
-Block*
-bl2mem(uchar *p, Block *b, int n)
-{
- int i;
- Block *next;
-
- for(; b != nil; b = next){
- i = BLEN(b);
- if(i > n){
- memmove(p, b->rp, n);
- b->rp += n;
- return b;
- }
- memmove(p, b->rp, i);
- n -= i;
- p += i;
- b->rp += i;
- next = b->next;
- freeb(b);
- }
- return nil;
-}
-
-/*
- * copy the contents of memory into a string of blocks.
- * return nil on error.
- */
-Block*
-mem2bl(uchar *p, int len)
-{
- int n;
- Block *b, *first, **l;
-
- first = nil;
- l = &first;
- if(waserror()){
- freeblist(first);
- nexterror();
- }
- do {
- n = len;
- if(n > Maxatomic)
- n = Maxatomic;
-
- *l = b = allocb(n);
- /* setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); */
- memmove(b->wp, p, n);
- b->wp += n;
- p += n;
- len -= n;
- l = &b->next;
- } while(len > 0);
- poperror();
-
- return first;
-}
-
-/*
- * put a block back to the front of the queue
- * called with q ilocked
- */
-void
-qputback(Queue *q, Block *b)
-{
- b->next = q->bfirst;
- if(q->bfirst == nil)
- q->blast = b;
- q->bfirst = b;
- q->len += BALLOC(b);
- q->dlen += BLEN(b);
-}
-
-/*
- * flow control, get producer going again
- * called with q ilocked
- */
-static void
-qwakeup_iunlock(Queue *q)
-{
- int dowakeup = 0;
-
- /* if writer flow controlled, restart */
- if((q->state & Qflow) && q->len < q->limit/2){
- q->state &= ~Qflow;
- dowakeup = 1;
- }
-
- iunlock(&q->lk);
-
- /* wakeup flow controlled writers */
- if(dowakeup){
- if(q->kick)
- q->kick(q->arg);
- wakeup(&q->wr);
- }
-}
-
-/*
- * get next block from a queue (up to a limit)
- */
-Block*
-qbread(Queue *q, int len)
-{
- Block *b, *nb;
- int n;
-
- qlock(&q->rlock);
- if(waserror()){
- qunlock(&q->rlock);
- nexterror();
- }
-
- ilock(&q->lk);
- switch(qwait(q)){
- case 0:
- /* queue closed */
- iunlock(&q->lk);
- qunlock(&q->rlock);
- poperror();
- return nil;
- case -1:
- /* multiple reads on a closed queue */
- iunlock(&q->lk);
- error(q->err);
- }
-
- /* if we get here, there's at least one block in the queue */
- b = qremove(q);
- n = BLEN(b);
-
- /* split block if it's too big and this is not a message queue */
- nb = b;
- if(n > len){
- if((q->state&Qmsg) == 0){
- n -= len;
- b = allocb(n);
- memmove(b->wp, nb->rp+len, n);
- b->wp += n;
- qputback(q, b);
- }
- nb->wp = nb->rp + len;
- }
-
- /* restart producer */
- qwakeup_iunlock(q);
-
- poperror();
- qunlock(&q->rlock);
- return nb;
-}
-
-/*
- * read a queue. if no data is queued, post a Block
- * and wait on its Rendez.
- */
-long
-qread(Queue *q, void *vp, int len)
-{
- Block *b, *first, **l;
- int m, n;
-
- qlock(&q->rlock);
- if(waserror()){
- qunlock(&q->rlock);
- nexterror();
- }
-
- ilock(&q->lk);
-again:
- switch(qwait(q)){
- case 0:
- /* queue closed */
- iunlock(&q->lk);
- qunlock(&q->rlock);
- poperror();
- return 0;
- case -1:
- /* multiple reads on a closed queue */
- iunlock(&q->lk);
- error(q->err);
- }
-
- /* if we get here, there's at least one block in the queue */
- if(q->state & Qcoalesce){
- /* when coalescing, 0 length blocks just go away */
- b = q->bfirst;
- if(BLEN(b) <= 0){
- freeb(qremove(q));
- goto again;
- }
-
- /* grab the first block plus as many
- * following blocks as will completely
- * fit in the read.
- */
- n = 0;
- l = &first;
- m = BLEN(b);
- for(;;) {
- *l = qremove(q);
- l = &b->next;
- n += m;
-
- b = q->bfirst;
- if(b == nil)
- break;
- m = BLEN(b);
- if(n+m > len)
- break;
- }
- } else {
- first = qremove(q);
- n = BLEN(first);
- }
-
- /* copy to user space outside of the ilock */
- iunlock(&q->lk);
- b = bl2mem(vp, first, len);
- ilock(&q->lk);
-
- /* take care of any left over partial block */
- if(b != nil){
- n -= BLEN(b);
- if(q->state & Qmsg)
- freeb(b);
- else
- qputback(q, b);
- }
-
- /* restart producer */
- qwakeup_iunlock(q);
-
- poperror();
- qunlock(&q->rlock);
- return n;
-}
-
-static int
-qnotfull(void *a)
-{
- Queue *q = a;
-
- return q->len < q->limit || (q->state & Qclosed);
-}
-
-ulong noblockcnt;
-
-/*
- * add a block to a queue obeying flow control
- */
-long
-qbwrite(Queue *q, Block *b)
-{
- int n, dowakeup;
-
- n = BLEN(b);
-
- if(q->bypass){
- (*q->bypass)(q->arg, b);
- return n;
- }
-
- dowakeup = 0;
- qlock(&q->wlock);
- if(waserror()){
- if(b != nil)
- freeb(b);
- qunlock(&q->wlock);
- nexterror();
- }
-
- ilock(&q->lk);
-
- /* give up if the queue is closed */
- if(q->state & Qclosed){
- iunlock(&q->lk);
- error(q->err);
- }
-
- /* if nonblocking, don't queue over the limit */
- if(q->len >= q->limit){
- if(q->noblock){
- iunlock(&q->lk);
- freeb(b);
- noblockcnt += n;
- qunlock(&q->wlock);
- poperror();
- return n;
- }
- }
-
- /* queue the block */
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- q->blast = b;
- b->next = 0;
- q->len += BALLOC(b);
- q->dlen += n;
- QDEBUG checkb(b, "qbwrite");
- b = nil;
-
- /* make sure other end gets awakened */
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
- iunlock(&q->lk);
-
- /* get output going again */
- if(q->kick && (dowakeup || (q->state&Qkick)))
- q->kick(q->arg);
-
- /* wakeup anyone consuming at the other end */
- if(dowakeup){
- wakeup(&q->rr);
-
- /* if we just wokeup a higher priority process, let it run */
- /*
- p = wakeup(&q->rr);
- if(p != nil && p->priority > up->priority)
- sched();
- */
- }
-
- /*
- * flow control, wait for queue to get below the limit
- * before allowing the process to continue and queue
- * more. We do this here so that postnote can only
- * interrupt us after the data has been queued. This
- * means that things like 9p flushes and ssl messages
- * will not be disrupted by software interrupts.
- *
- * Note - this is moderately dangerous since a process
- * that keeps getting interrupted and rewriting will
- * queue infinite crud.
- */
- for(;;){
- if(q->noblock || qnotfull(q))
- break;
-
- ilock(&q->lk);
- q->state |= Qflow;
- iunlock(&q->lk);
- sleep(&q->wr, qnotfull, q);
- }
- USED(b);
-
- qunlock(&q->wlock);
- poperror();
- return n;
-}
-
-/*
- * write to a queue. only Maxatomic bytes at a time is atomic.
- */
-int
-qwrite(Queue *q, void *vp, int len)
-{
- int n, sofar;
- Block *b;
- uchar *p = vp;
-
- QDEBUG if(!islo())
- print("qwrite hi %p\n", getcallerpc(&q));
-
- sofar = 0;
- do {
- n = len-sofar;
- if(n > Maxatomic)
- n = Maxatomic;
-
- b = allocb(n);
- /* setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); */
- if(waserror()){
- freeb(b);
- nexterror();
- }
- memmove(b->wp, p+sofar, n);
- poperror();
- b->wp += n;
-
- qbwrite(q, b);
-
- sofar += n;
- } while(sofar < len && (q->state & Qmsg) == 0);
-
- return len;
-}
-
-/*
- * used by print() to write to a queue. Since we may be splhi or not in
- * a process, don't qlock.
- */
-int
-qiwrite(Queue *q, void *vp, int len)
-{
- int n, sofar, dowakeup;
- Block *b;
- uchar *p = vp;
-
- dowakeup = 0;
-
- sofar = 0;
- do {
- n = len-sofar;
- if(n > Maxatomic)
- n = Maxatomic;
-
- b = iallocb(n);
- if(b == nil)
- break;
- memmove(b->wp, p+sofar, n);
- b->wp += n;
-
- ilock(&q->lk);
-
- QDEBUG checkb(b, "qiwrite");
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- q->blast = b;
- q->len += BALLOC(b);
- q->dlen += n;
-
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
-
- iunlock(&q->lk);
-
- if(dowakeup){
- if(q->kick)
- q->kick(q->arg);
- wakeup(&q->rr);
- }
-
- sofar += n;
- } while(sofar < len && (q->state & Qmsg) == 0);
-
- return sofar;
-}
-
-/*
- * be extremely careful when calling this,
- * as there is no reference accounting
- */
-void
-qfree(Queue *q)
-{
- qclose(q);
- free(q);
-}
-
-/*
- * Mark a queue as closed. No further IO is permitted.
- * All blocks are released.
- */
-void
-qclose(Queue *q)
-{
- Block *bfirst;
-
- if(q == nil)
- return;
-
- /* mark it */
- ilock(&q->lk);
- q->state |= Qclosed;
- q->state &= ~(Qflow|Qstarve);
- strcpy(q->err, Ehungup);
- bfirst = q->bfirst;
- q->bfirst = 0;
- q->len = 0;
- q->dlen = 0;
- q->noblock = 0;
- iunlock(&q->lk);
-
- /* free queued blocks */
- freeblist(bfirst);
-
- /* wake up readers/writers */
- wakeup(&q->rr);
- wakeup(&q->wr);
-}
-
-/*
- * Mark a queue as closed. Wakeup any readers. Don't remove queued
- * blocks.
- */
-void
-qhangup(Queue *q, char *msg)
-{
- /* mark it */
- ilock(&q->lk);
- q->state |= Qclosed;
- if(msg == 0 || *msg == 0)
- strcpy(q->err, Ehungup);
- else
- strncpy(q->err, msg, ERRMAX-1);
- iunlock(&q->lk);
-
- /* wake up readers/writers */
- wakeup(&q->rr);
- wakeup(&q->wr);
-}
-
-/*
- * return non-zero if the q is hungup
- */
-int
-qisclosed(Queue *q)
-{
- return q->state & Qclosed;
-}
-
-/*
- * mark a queue as no longer hung up
- */
-void
-qreopen(Queue *q)
-{
- ilock(&q->lk);
- q->state &= ~Qclosed;
- q->state |= Qstarve;
- q->eof = 0;
- q->limit = q->inilim;
- iunlock(&q->lk);
-}
-
-/*
- * return bytes queued
- */
-int
-qlen(Queue *q)
-{
- return q->dlen;
-}
-
-/*
- * return space remaining before flow control
- */
-int
-qwindow(Queue *q)
-{
- int l;
-
- l = q->limit - q->len;
- if(l < 0)
- l = 0;
- return l;
-}
-
-/*
- * return true if we can read without blocking
- */
-int
-qcanread(Queue *q)
-{
- return q->bfirst!=0;
-}
-
-/*
- * change queue limit
- */
-void
-qsetlimit(Queue *q, int limit)
-{
- q->limit = limit;
-}
-
-/*
- * set blocking/nonblocking
- */
-void
-qnoblock(Queue *q, int onoff)
-{
- q->noblock = onoff;
-}
-
-/*
- * flush the output queue
- */
-void
-qflush(Queue *q)
-{
- Block *bfirst;
-
- /* mark it */
- ilock(&q->lk);
- bfirst = q->bfirst;
- q->bfirst = 0;
- q->len = 0;
- q->dlen = 0;
- iunlock(&q->lk);
-
- /* free queued blocks */
- freeblist(bfirst);
-
- /* wake up readers/writers */
- wakeup(&q->wr);
-}
-
-int
-qfull(Queue *q)
-{
- return q->state & Qflow;
-}
-
-int
-qstate(Queue *q)
-{
- return q->state;
-}