diff options
author | aiju <devnull@localhost> | 2018-12-08 15:07:53 +0000 |
---|---|---|
committer | aiju <devnull@localhost> | 2018-12-08 15:07:53 +0000 |
commit | 58fa29447b845f91dfc2a6734f525ed47375393b (patch) | |
tree | e2ee80e7728e26bc74e1f667ea48968b11a24fc6 /sys | |
parent | 03e60450c2acc20866867cc5d3649aaed07d0326 (diff) |
dtracy: add support for aggregations
Diffstat (limited to 'sys')
-rw-r--r-- | sys/include/dtracy.h | 61 | ||||
-rw-r--r-- | sys/src/9/port/devdtracy.c | 57 | ||||
-rw-r--r-- | sys/src/cmd/dtracy/act.c | 79 | ||||
-rw-r--r-- | sys/src/cmd/dtracy/agg.c | 199 | ||||
-rw-r--r-- | sys/src/cmd/dtracy/cgen.c | 4 | ||||
-rw-r--r-- | sys/src/cmd/dtracy/dat.h | 18 | ||||
-rw-r--r-- | sys/src/cmd/dtracy/dtracy.c | 43 | ||||
-rw-r--r-- | sys/src/cmd/dtracy/fns.h | 4 | ||||
-rw-r--r-- | sys/src/cmd/dtracy/mkfile | 1 | ||||
-rw-r--r-- | sys/src/cmd/dtracy/parse.y | 7 | ||||
-rw-r--r-- | sys/src/libdtracy/agg.c | 138 | ||||
-rw-r--r-- | sys/src/libdtracy/chan.c | 59 | ||||
-rw-r--r-- | sys/src/libdtracy/mkfile | 1 | ||||
-rw-r--r-- | sys/src/libdtracy/pack.c | 20 | ||||
-rw-r--r-- | sys/src/libdtracy/prog.c | 34 |
15 files changed, 703 insertions, 22 deletions
diff --git a/sys/include/dtracy.h b/sys/include/dtracy.h index f1f335694..208b4a687 100644 --- a/sys/include/dtracy.h +++ b/sys/include/dtracy.h @@ -9,7 +9,14 @@ enum { DTSTRMAX = 256, DTRECMAX = 1024, + + DTMAXAGGBUF = 16, + + DTBUFSZ = 65536, + DTANUMBUCKETS = 1024, + DTABUCKETS = DTBUFSZ - 4 * DTANUMBUCKETS, }; +#define DTANIL ((u32int)-1) typedef struct DTName DTName; typedef struct DTProbe DTProbe; @@ -21,6 +28,7 @@ typedef struct DTEnab DTEnab; typedef struct DTChan DTChan; typedef struct DTExpr DTExpr; typedef struct DTProvider DTProvider; +typedef struct DTAgg DTAgg; typedef struct DTBuf DTBuf; struct DTName { @@ -30,7 +38,7 @@ struct DTName { }; /* - we assign all pairs (probe,action-group) (called an enabling or DTEnab) a unique ID. + we assign all pairs (probe,action-group) (called an enabling or DTEnab) a unique ID called EPID. we could also use probe IDs and action group IDs but using a single 32-bit ID for both is more flexible/efficient. */ struct DTEnab { @@ -123,14 +131,52 @@ struct DTExpr { u32int *b; }; +/* + aggregation buffers are hashtables and use a different record format. + there are DTANUMBUCKETS 4-byte buckets at the end of the buffer. + each entry is (link,id,key,val) with a 4-byte link field for the hash chains and a 4-byte aggregation id. + + the aggregation id actually contains all the data in the DTAgg struct: + 4-bit type + 12-bit keysize in qwords + 16-bit unique id + + the struct is just for kernel convenience +*/ + +enum { + AGGCNT, + AGGSUM, + AGGAVG, + AGGSTD, + AGGMIN, + AGGMAX, +}; + +struct DTAgg { + int id; + u16int keysize; /* in bytes */ + u16int recsize; + uchar type; +}; + /* an action is an expression, plus info about what to do with the result */ struct DTAct { enum { ACTTRACE, /* record the result. size is the number of bytes used. 0 <= size <= 8 */ ACTTRACESTR, /* take the result to be a pointer to a null-terminated string. store it as zero-padded char[size]. */ + /* + ACTAGGKEY and ACTAGGVAL together record a value in an aggregation. + they must occur as a pair and targ must point to an already allocated aggregation buffer. + currently 0 <= size <= 8. + */ + ACTAGGKEY, + ACTAGGVAL, + ACTCANCEL, /* (must be last action) don't write anything into the main buffer. used to avoid pointless records when using aggregations. */ } type; DTExpr *p; int size; + DTAgg agg; }; /* an action group is an optional predicate and a set of actions. */ @@ -144,14 +190,13 @@ struct DTActGr { int reclen; /* record size, including 12-byte header */ }; -/* a clause list probe wildcard expressions and an action group. only used during set-up. */ +/* a clause lists probe wildcard expressions and an action group. only used during set-up. */ struct DTClause { int nprob; char **probs; DTActGr *gr; }; -enum { DTBUFSZ = 65536 }; struct DTBuf { int wr; uchar data[DTBUFSZ]; @@ -170,6 +215,9 @@ struct DTChan { /* we have 2 buffers per cpu, one for writing and one for reading. dtcread() swaps them if empty. */ DTBuf **wrbufs; DTBuf **rdbufs; + /* aggregations use separate buffers */ + DTBuf **aggwrbufs; + DTBuf **aggrdbufs; /* list of enablings. */ DTEnab *enab; @@ -191,7 +239,7 @@ int dtefmt(Fmt *); /* action group functions */ void dtgpack(Fmt *, DTActGr *); char *dtgunpack(char *, DTActGr **); -int dtgverify(DTActGr *); +int dtgverify(DTChan *, DTActGr *); void dtgfree(DTActGr *); /* clause functions */ @@ -205,9 +253,14 @@ void dtcfree(DTChan *); int dtcaddgr(DTChan *, DTName, DTActGr *); int dtcaddcl(DTChan *, DTClause *); int dtcread(DTChan *, void *, int); +int dtcaggread(DTChan *, void *, int); void dtcreset(DTChan *); void dtcrun(DTChan *, int); +/* aggbuf functions */ +int dtaunpackid(DTAgg *); +void dtarecord(DTChan *, int, DTAgg *, uchar *, int, s64int); + extern DTProvider *dtproviders[]; extern int dtnmach; diff --git a/sys/src/9/port/devdtracy.c b/sys/src/9/port/devdtracy.c index e35bb6d7e..a80576895 100644 --- a/sys/src/9/port/devdtracy.c +++ b/sys/src/9/port/devdtracy.c @@ -38,7 +38,7 @@ prog(DTKChan *p, char *s) dtclfree(c); if(rc < 0){ dtcreset(p->ch); - error("failed to add clause"); + error(up->syserrstr); } } } @@ -54,6 +54,7 @@ enum { Qprog, Qbuf, Qepid, + Qaggbuf, }; static Dirtab dtracydir[] = { @@ -61,6 +62,7 @@ static Dirtab dtracydir[] = { "prog", { Qprog, 0, 0 }, 0, 0660, "buf", { Qbuf, 0, 0, }, 0, 0440, "epid", { Qepid, 0, 0 }, 0, 0440, + "aggbuf", { Qaggbuf, 0, 0 }, 0, 0440, }; enum { @@ -270,10 +272,49 @@ epidread(DTKAux *aux, DTChan *c, char *a, long n, vlong off) } static long +lockedread(DTChan *c, void *a, long n, int(*readf)(DTChan *, void *, int)) +{ + long rc; + + if(waserror()){ + qunlock(&dtracylock); + nexterror(); + } + eqlock(&dtracylock); + rc = readf(c, a, n); + qunlock(&dtracylock); + poperror(); + return rc; +} + +static long +handleread(DTChan *c, void *a, long n, int(*readf)(DTChan *, void *, int)) +{ + long rc, m; + int i; + + for(;;){ + rc = lockedread(c, a, n, readf); + if(rc < 0) return -1; + if(rc > 0) break; + tsleep(&up->sleep, return0, 0, 250); + } + m = rc; + for(i = 0; i < 3 && m < n/2; i++){ + tsleep(&up->sleep, return0, 0, 50); + rc = lockedread(c, (uchar *)a + m, n - m, readf); + if(rc < 0) break; + m += rc; + } + return m; +} + +static long dtracyread(Chan *c, void *a, long n, vlong off) { int rc; DTKChan *p; + DTChan *ch; eqlock(&dtracylock); if(waserror()){ @@ -299,9 +340,15 @@ dtracyread(Chan *c, void *a, long n, vlong off) rc = readstr(off, a, n, up->genbuf); break; case Qbuf: - while(rc = dtcread(p->ch, a, n), rc == 0) - tsleep(&up->sleep, return0, 0, 250); - break; + ch = p->ch; + qunlock(&dtracylock); + poperror(); + return handleread(ch, a, n, dtcread); + case Qaggbuf: + ch = p->ch; + qunlock(&dtracylock); + poperror(); + return handleread(ch, a, n, dtcaggread); case Qepid: rc = epidread(c->aux, p->ch, a, n, off); break; @@ -460,8 +507,6 @@ dtgetvar(int v) switch(v){ case DTV_PID: return up != nil ? up->pid : 0; - case DTV_MACHNO: - return m->machno; default: return 0; } diff --git a/sys/src/cmd/dtracy/act.c b/sys/src/cmd/dtracy/act.c index 335d1bd23..063be6cb9 100644 --- a/sys/src/cmd/dtracy/act.c +++ b/sys/src/cmd/dtracy/act.c @@ -55,6 +55,27 @@ addprobe(char *s) clause->probs[clause->nprob++] = strdup(s); } +static char *aggtypes[] = { + [AGGCNT] "count", + [AGGMIN] "min", + [AGGMAX] "max", + [AGGSUM] "sum", + [AGGAVG] "avg", + [AGGSTD] "std", +}; + +int +aggtype(Symbol *s) +{ + int i; + + for(i = 0; i < nelem(aggtypes); i++) + if(strcmp(s->name, aggtypes[i]) == 0) + return i; + error("%s unknown aggregation type", s->name); + return 0; +} + void addstat(int type, ...) { @@ -73,6 +94,19 @@ addstat(int type, ...) case STATPRINT: case STATPRINTF: break; + case STATAGG: + s->agg.name = va_arg(va, Symbol *); + s->agg.key = va_arg(va, Node *); + s->agg.type = aggtype(va_arg(va, Symbol *)); + s->agg.value = va_arg(va, Node *); + if(s->agg.type == AGGCNT){ + if(s->agg.value != nil) + error("too many arguments for count()"); + }else{ + if(s->agg.value == nil) + error("need argument for %s()", aggtypes[s->agg.type]); + } + break; default: sysfatal("addstat: unknown type %d", type); } @@ -158,12 +192,26 @@ prepprintf(Node **arg, int narg, DTActGr *g, int *recoff) (*arg)->str = fmtstrflush(&f); } +int aggid; + +int +allagg(Clause *c) +{ + Stat *s; + + for(s = c->stats; s < c->stats + c->nstats; s++) + if(s->type != STATAGG) + return 0; + return 1; +} + DTClause * mkdtclause(Clause *c) { DTClause *d; Stat *s; int recoff, i; + Node *n; d = emalloc(sizeof(DTClause)); d->nprob = c->nprob; @@ -175,7 +223,7 @@ mkdtclause(Clause *c) for(s = c->stats; s < c->stats + c->nstats; s++) switch(s->type){ case STATEXPR: - actgradd(d->gr, (DTAct){ACTTRACE, codegen(s->n), 0}); + actgradd(d->gr, (DTAct){ACTTRACE, codegen(s->n), 0, noagg}); break; case STATPRINT: for(i = 0; i < s->narg; i++) @@ -184,7 +232,22 @@ mkdtclause(Clause *c) case STATPRINTF: prepprintf(s->arg, s->narg, d->gr, &recoff); break; + case STATAGG: { + DTAgg agg = {.id = s->agg.type << 28 | 1 << 16 | aggid++}; + assert(dtaunpackid(&agg) >= 0); + aggs = realloc(aggs, sizeof(Agg) * aggid); + memset(&aggs[aggid-1], 0, sizeof(Agg)); + aggs[aggid-1].DTAgg = agg; + aggs[aggid-1].name = strdup(s->agg.name == nil ? "" : s->agg.name->name); + actgradd(d->gr, (DTAct){ACTAGGKEY, codegen(s->agg.key), 8, agg}); + n = s->agg.value; + if(n == nil) n = node(ONUM, 0ULL); + actgradd(d->gr, (DTAct){ACTAGGVAL, codegen(n), 8, agg}); + break; } + } + if(allagg(c)) + actgradd(d->gr, (DTAct){ACTCANCEL, codegen(node(ONUM, 0)), 0, noagg}); return d; } @@ -392,6 +455,7 @@ parseclause(Clause *cl, uchar *p, uchar *e, Enab *en, Biobuf *bp) case STATPRINTF: execprintf(s->arg, s->narg, p, e, en); break; + case STATAGG: break; default: sysfatal("parseclause: unknown type %d", s->type); } @@ -546,6 +610,17 @@ dump(void) print("\t\ttrace string (%d bytes)\n", a->size); dumpexpr(a->p, "\t\t\t"); break; + case ACTAGGKEY: + print("\t\taggregation key (%s,%d,%d)\n", a->agg.type >= nelem(aggtypes) ? "???" : aggtypes[a->agg.type], a->agg.keysize, (u16int)a->agg.id); + dumpexpr(a->p, "\t\t\t"); + break; + case ACTAGGVAL: + print("\t\taggregation value (%s,%d,%d)\n", a->agg.type >= nelem(aggtypes) ? "???" : aggtypes[a->agg.type], a->agg.keysize, (u16int)a->agg.id); + dumpexpr(a->p, "\t\t\t"); + break; + case ACTCANCEL: + print("\t\tcancel record\n"); + break; default: print("\t\t??? %d\n", a->type); } @@ -564,6 +639,8 @@ dump(void) for(j = 0; j < s->narg; j++) print("\t\t\targ %ε\n", s->arg[j]); break; + case STATAGG: + break; default: print("\t\t??? %d\n", s->type); } diff --git a/sys/src/cmd/dtracy/agg.c b/sys/src/cmd/dtracy/agg.c new file mode 100644 index 000000000..c54636479 --- /dev/null +++ b/sys/src/cmd/dtracy/agg.c @@ -0,0 +1,199 @@ +#include <u.h> +#include <libc.h> +#include <dtracy.h> +#include <bio.h> +#include <avl.h> +#include <mp.h> +#include "dat.h" +#include "fns.h" + +typedef struct ANode ANode; + +struct ANode { + Avl; + s64int val, cnt; + u64int sq[2]; + int keysize; + uchar key[1]; +}; + +Agg *aggs; +static Avltree **trees; +static ANode *key; +int interrupted; + +static int +aggcmp(Avl *ap, Avl *bp) +{ + ANode *a, *b; + + a = (ANode *) ap; + b = (ANode *) bp; + return memcmp(a->key, b->key, a->keysize); +} + +static void +createrecord(int type, ANode *n, s64int *q) +{ + switch(type){ + case AGGCNT: n->cnt = q[0]; break; + case AGGSUM: case AGGMIN: case AGGMAX: n->val = q[0]; break; + case AGGAVG: n->cnt = q[1]; n->val = q[0]; break; + case AGGSTD: n->cnt = q[1]; n->val = q[0]; n->sq[0] = q[2]; n->sq[1] = q[3]; break; + default: abort(); + } +} + +static void +updaterecord(int type, ANode *n, s64int *q) +{ + u64int r; + + switch(type){ + case AGGCNT: n->cnt += q[0]; break; + case AGGSUM: n->val += q[0]; break; + case AGGAVG: n->cnt += q[1]; n->val += q[0]; break; + case AGGSTD: + n->cnt += q[1]; + n->val += q[0]; + r = n->sq[0] + q[2]; + if(r < q[2]) n->sq[1]++; + n->sq[0] = r; + n->sq[1] += q[3]; + break; + default: abort(); + } +} + + +int +aggparsebuf(uchar *p, int n) +{ + uchar *e; + Agg *a; + u32int id; + Avltree *tp; + ANode *np; + + e = p + n; + for(; p + 8 < e; p += a->recsize){ + id = *(u32int*)&p[4]; + if((u16int)id >= aggid){ + inval: + fprint(2, "invalid record in aggregation buffer\n"); + return -1; + } + a = &aggs[(u16int)id]; + if(a->type != id>>28) goto inval; + if(a->keysize != (id>>13&0x7ff8)) goto inval; + if(p + a->recsize > e) goto inval; + tp = trees[(u16int)id]; + key->keysize = a->keysize; + memcpy(key->key, &p[8], a->keysize); + np = (ANode *) avllookup(tp, key, 0); + if(np == nil){ + np = emalloc(sizeof(ANode) - 1 + a->keysize); + *np = *key; + createrecord(a->type, np, (s64int*)&p[8+a->keysize]); + avlinsert(tp, np); + }else + updaterecord(a->type, np, (s64int*)&p[8+a->keysize]); + } + return 0; +} + +void +agginit(void) +{ + int i, m; + + trees = emalloc(sizeof(Avltree *) * aggid); + m = 0; + for(i = 0; i < aggid; i++){ + trees[i] = avlcreate(aggcmp); + if(aggs[i].keysize > m) + m = aggs[i].keysize; + } + key = emalloc(sizeof(ANode) - 1 + m); +} + +int +aggnote(void *, char *note) +{ + if(strcmp(note, "interrupt") != 0 || interrupted) + return 0; + interrupted = 1; + return 1; +} + +void +aggkeyprint(Fmt *f, Agg *, ANode *a) +{ + fmtprint(f, "%20lld ", *(u64int*)a->key); +} + +static double +variance(ANode *a) +{ + mpint *x, *y, *z; + double r; + + x = vtomp(a->val, nil); + y = uvtomp(a->sq[0], nil); + z = vtomp(a->sq[1], nil); + mpleft(z, 64, z); + mpadd(z, y, y); + vtomp(a->cnt, z); + mpmul(x, x, x); + mpmul(y, z, y); + mpsub(y, x, x); + r = mptod(x) / a->cnt; + mpfree(x); + mpfree(y); + mpfree(z); + return r; +} + +void +aggvalprint(Fmt *f, int type, ANode *a) +{ + double x, s; + + switch(type){ + case AGGCNT: fmtprint(f, "%20lld", a->cnt); break; + case AGGSUM: case AGGMIN: case AGGMAX: fmtprint(f, "%20lld", a->val); break; + case AGGAVG: fmtprint(f, "%20g", (double)a->val / a->cnt); break; + case AGGSTD: + x = (double)a->val / a->cnt; + s = variance(a); + if(s < 0) + fmtprint(f, "%20g %20s", x, "NaN"); + else{ + fmtprint(f, "%20g %20g", x, sqrt(s)); + } + break; + default: + abort(); + } +} + +void +aggdump(void) +{ + Fmt f; + char buf[8192]; + int i; + ANode *a; + + fmtfdinit(&f, 1, buf, sizeof(buf)); + for(i = 0; i < aggid; i++){ + a = (ANode *) avlmin(trees[i]); + for(; a != nil; a = (ANode *) avlnext(a)){ + fmtprint(&f, "%s\t", aggs[i].name); + aggkeyprint(&f, &aggs[i], a); + aggvalprint(&f, aggs[i].type, a); + fmtprint(&f, "\n"); + } + } + fmtfdflush(&f); +} diff --git a/sys/src/cmd/dtracy/cgen.c b/sys/src/cmd/dtracy/cgen.c index 6b4f77d19..ace7f63f2 100644 --- a/sys/src/cmd/dtracy/cgen.c +++ b/sys/src/cmd/dtracy/cgen.c @@ -296,10 +296,10 @@ tracegen(Node *n, DTActGr *g, int *recoff) case ORECORD: switch(n->typ->type){ case TYPINT: - actgradd(g, (DTAct){ACTTRACE, codegen(n->n1), n->typ->size}); + actgradd(g, (DTAct){ACTTRACE, codegen(n->n1), n->typ->size, noagg}); break; case TYPSTRING: - actgradd(g, (DTAct){ACTTRACESTR, codegen(n->n1), n->typ->size}); + actgradd(g, (DTAct){ACTTRACESTR, codegen(n->n1), n->typ->size, noagg}); break; default: sysfatal("tracegen: don't know how to record %τ", n->typ); diff --git a/sys/src/cmd/dtracy/dat.h b/sys/src/cmd/dtracy/dat.h index fc6737b4c..5087a609e 100644 --- a/sys/src/cmd/dtracy/dat.h +++ b/sys/src/cmd/dtracy/dat.h @@ -5,6 +5,7 @@ typedef struct Clause Clause; typedef struct Enab Enab; typedef struct Stat Stat; typedef struct Type Type; +typedef struct Agg Agg; enum { SYMHASH = 256, @@ -89,10 +90,19 @@ struct Stat { STATEXPR, STATPRINT, STATPRINTF, + STATAGG, } type; + /* STATEXPR */ Node *n; + /* STATPRINT, STATPRINTF */ int narg; Node **arg; + /* STATAGG */ + struct { + Symbol *name; + int type; + Node *key, *value; + } agg; }; struct Clause { @@ -112,6 +122,11 @@ struct Enab { Enab *next; }; +struct Agg { + DTAgg; + char *name; +}; + extern int errors; #pragma varargck type "α" int @@ -121,3 +136,6 @@ extern int errors; #pragma varargck argpos error 1 extern int dflag; +extern DTAgg noagg; +extern int aggid; +extern Agg *aggs; diff --git a/sys/src/cmd/dtracy/dtracy.c b/sys/src/cmd/dtracy/dtracy.c index 8585180f2..540ed79bd 100644 --- a/sys/src/cmd/dtracy/dtracy.c +++ b/sys/src/cmd/dtracy/dtracy.c @@ -5,6 +5,8 @@ #include "dat.h" #include "fns.h" +DTAgg noagg; + char *dtracyroot = "#Δ"; int dtracyno; int ctlfd, buffd; @@ -160,23 +162,56 @@ err: } -void +int bufread(Biobuf *bp) { static uchar buf[65536]; int n; n = read(buffd, buf, sizeof(buf)); - if(n < 0) sysfatal("bufread: %r"); + if(n < 0) + sysfatal("bufread: %r"); if(parsebuf(buf, n, bp) < 0) sysfatal("parsebuf: %r"); Bflush(bp); + return 0; +} + +void +aggproc(void) +{ + char buf[65536]; + int buffd, n; + extern int interrupted; + + switch(rfork(RFPROC|RFMEM)){ + case -1: sysfatal("rfork: %r"); + case 0: return; + default: break; + } + snprint(buf, sizeof(buf), "%s/%d/aggbuf", dtracyroot, dtracyno); + buffd = open(buf, OREAD); + if(buffd < 0) sysfatal("open: %r"); + agginit(); + atnotify(aggnote, 1); + while(!interrupted){ + n = read(buffd, buf, sizeof(buf)); + if(n < 0){ + if(interrupted) + break; + sysfatal("aggbufread: %r"); + } + if(aggparsebuf((uchar *) buf, n) < 0) + exits("error"); + } + aggdump(); + exits(nil); } static void usage(void) { - fprint(2, "usage: %s [ -cd ] script\n", argv0); + fprint(2, "usage: %s [ -d ] script\n", argv0); exits("usage"); } @@ -217,6 +252,8 @@ main(int argc, char **argv) fprint(ctlfd, "go"); out = Bfdopen(1, OWRITE); if(out == nil) sysfatal("Bfdopen: %r"); + if(aggid > 0) + aggproc(); for(;;) bufread(out); } diff --git a/sys/src/cmd/dtracy/fns.h b/sys/src/cmd/dtracy/fns.h index f418e4f34..83a677025 100644 --- a/sys/src/cmd/dtracy/fns.h +++ b/sys/src/cmd/dtracy/fns.h @@ -33,3 +33,7 @@ Type *type(int, ...); int min(int, int); int max(int, int); Node *addtype(Type *, Node *); +int aggparsebuf(uchar *, int); +int aggnote(void *, char *); +void aggdump(void); +void agginit(void); diff --git a/sys/src/cmd/dtracy/mkfile b/sys/src/cmd/dtracy/mkfile index 7d1b4b45c..0c0d36afa 100644 --- a/sys/src/cmd/dtracy/mkfile +++ b/sys/src/cmd/dtracy/mkfile @@ -9,6 +9,7 @@ OFILES=\ cgen.$O\ act.$O\ type.$O\ + agg.$O\ YFILES=parse.y diff --git a/sys/src/cmd/dtracy/parse.y b/sys/src/cmd/dtracy/parse.y index 9f231dbc9..81a57f79b 100644 --- a/sys/src/cmd/dtracy/parse.y +++ b/sys/src/cmd/dtracy/parse.y @@ -16,7 +16,8 @@ Type *t; } -%type <n> expr +%type <n> expr optexpr +%type <sym> optsym %type <t> type %token <sym> TSYM @@ -63,7 +64,9 @@ stats0: stat | stats0 ';' stat stat: expr { addstat(STATEXPR, exprcheck($1, 0)); } | TPRINT { addstat(STATPRINT); } pelist | TPRINTF { addstat(STATPRINTF); } pelist - +| '@' optsym '[' expr ']' '=' TSYM '(' optexpr ')' { addstat(STATAGG, $2, $4, $7, $9); } +optsym: TSYM | { $$ = nil; } +optexpr: expr | { $$ = nil; } pelist: '(' ')' diff --git a/sys/src/libdtracy/agg.c b/sys/src/libdtracy/agg.c new file mode 100644 index 000000000..d374ed913 --- /dev/null +++ b/sys/src/libdtracy/agg.c @@ -0,0 +1,138 @@ +#include <u.h> +#include <libc.h> +#include <dtracy.h> + +int +dtaunpackid(DTAgg *a) +{ + a->type = a->id >> 28 & 15; + a->keysize = a->id >> 13 & 0x7ff8; + switch(a->type){ + case AGGCNT: + case AGGSUM: + case AGGMIN: + case AGGMAX: + a->recsize = 8 + a->keysize + 8; + return 0; + case AGGAVG: + a->recsize = 8 + a->keysize + 16; + return 0; + case AGGSTD: + a->recsize = 8 + a->keysize + 32; + return 0; + default: + return -1; + } +} + +static u64int +hash(uchar *s, int n, int m) +{ + u64int h; + int i; + + h = 0xcbf29ce484222325ULL; + for(i = 0; i < n; i++){ + h ^= s[i]; + h *= 0x100000001b3ULL; + } + for(; i < m; i++) + h *= 0x100000001b3ULL; + return h; +} + +static int +keyeq(uchar *a, uchar *b, int n, int m) +{ + int i; + + for(i = 0; i < n; i++) + if(a[i] != b[i]) + return 0; + for(; i < m; i++) + if(a[i] != 0) + return 0; + return 1; +} + +/* calculate v*v with 128 bits precision and add it to the 128-bit word at q */ +static void +addsquare(u64int *q, s64int v) +{ + u32int v0; + s32int v1; + s64int s0, s1, s2; + u64int r; + + v0 = v; + v1 = v>>32; + s0 = (s64int)v0 * (s64int)v0; + s1 = (s64int)v0 * (s64int)v1; + s2 = (s64int)v1 * (s64int)v1; + r = s0 + (s1<<33); + if(r < (u64int)s0) q[1]++; + q[0] += r; + if(q[0] < r) q[1]++; + q[1] += s2 + (s1>>31); +} + +static void +updaterecord(int type, u64int *q, s64int val) +{ + switch(type){ + case AGGCNT: q[0] += 1; break; + case AGGSUM: q[0] += val; break; + case AGGAVG: q[0] += val; q[1]++; break; + case AGGMIN: if(val < q[0]) q[0] = val; break; + case AGGMAX: if(val > q[0]) q[0] = val; break; + case AGGSTD: q[0] += val; q[1]++; addsquare(&q[2], val); break; + } +} + +static void +createrecord(int type, u64int *q, s64int val) +{ + switch(type){ + case AGGCNT: q[0] = 1; break; + case AGGSUM: case AGGMIN: case AGGMAX: q[0] = val; break; + case AGGAVG: q[0] = val; q[1] = 1; break; + case AGGSTD: q[0] = val; q[1] = 1; q[2] = 0; q[3] = 0; addsquare(&q[2], val); break; + } +} + +/* runs in probe context */ +void +dtarecord(DTChan *ch, int mach, DTAgg *a, uchar *key, int nkey, s64int val) +{ + u64int h; + u32int *p, *q; + DTBuf *c; + + c = ch->aggwrbufs[mach]; + h = hash(key, nkey, a->keysize); + p = (u32int*)(c->data + DTABUCKETS + (h % DTANUMBUCKETS) * 4); + while(*p != DTANIL){ + assert((uint)*p < DTABUCKETS); + q = (u32int*)(c->data + *p); + if(q[1] == a->id && keyeq((uchar*)(q + 2), key, nkey, a->keysize) == 0){ + updaterecord(a->type, (u64int*)(q + 2 + a->keysize / 4), val); + return; + } + p = q; + } + if(c->wr + a->recsize > DTABUCKETS) + return; + *p = c->wr; + q = (u32int*)(c->data + c->wr); + q[0] = DTANIL; + q[1] = a->id; + if(nkey == a->keysize) + memmove(&q[2], key, nkey); + else if(nkey > a->keysize){ + memmove(&q[2], key, nkey); + memset((uchar*)q + 8 + nkey, 0, a->keysize - nkey); + }else + memmove(&q[2], key, a->keysize); + createrecord(a->type, (u64int*)(q + 2 + a->keysize / 4), val); + c->wr += a->recsize; +} diff --git a/sys/src/libdtracy/chan.c b/sys/src/libdtracy/chan.c index 16414834f..c29f24eb4 100644 --- a/sys/src/libdtracy/chan.c +++ b/sys/src/libdtracy/chan.c @@ -44,6 +44,14 @@ dtcnew(void) c->rdbufs[i] = dtmalloc(sizeof(DTBuf)); c->wrbufs[i] = dtmalloc(sizeof(DTBuf)); } + c->aggrdbufs = dtmalloc(sizeof(DTBuf *) * dtnmach); + c->aggwrbufs = dtmalloc(sizeof(DTBuf *) * dtnmach); + for(i = 0; i < dtnmach; i++){ + c->aggrdbufs[i] = dtmalloc(sizeof(DTBuf)); + c->aggwrbufs[i] = dtmalloc(sizeof(DTBuf)); + memset(c->aggrdbufs[i]->data, -1, DTBUFSZ); + memset(c->aggwrbufs[i]->data, -1, DTBUFSZ); + } return c; } @@ -63,6 +71,12 @@ dtcfree(DTChan *ch) } free(ch->rdbufs); free(ch->wrbufs); + for(i = 0; i < dtnmach; i++){ + free(ch->aggrdbufs[i]); + free(ch->aggwrbufs[i]); + } + free(ch->aggrdbufs); + free(ch->aggwrbufs); free(ch); } @@ -73,7 +87,7 @@ dtcaddgr(DTChan *c, DTName name, DTActGr *gr) DTEnab *ep; int i, nl, n; - if(dtgverify(gr) < 0) + if(dtgverify(c, gr) < 0) return -1; gr->chan = c; @@ -194,13 +208,54 @@ dtcread(DTChan *c, void *buf, int n) return 0; } +static void +dtcaggbufswap(DTChan *c, int n) +{ + DTBuf *z; + + dtmachlock(n); + z = c->aggrdbufs[n]; + c->aggrdbufs[n] = c->aggwrbufs[n]; + c->aggwrbufs[n] = z; + dtmachunlock(n); +} + +int +dtcaggread(DTChan *c, void *buf, int n) +{ + int i, swapped; + + if(c->state == DTCFAULT){ + werrstr("%s", c->errstr); + return -1; + } + for(i = 0; i < dtnmach; i++){ + if(swapped = c->aggrdbufs[i]->wr == 0) + dtcaggbufswap(c, i); + if(c->aggrdbufs[i]->wr != 0){ + if(c->aggrdbufs[i]->wr > n){ + werrstr("short read"); + return -1; + } + n = c->aggrdbufs[i]->wr; + memmove(buf, c->aggrdbufs[i]->data, n); + c->aggrdbufs[i]->wr = 0; + memset(c->aggrdbufs[i]->data + DTABUCKETS, -1, 4 * DTANUMBUCKETS); + if(!swapped) + dtcaggbufswap(c, i); + return n; + } + } + return 0; +} + void dtcreset(DTChan *c) { DTEnab *ep, *eq; for(ep = c->enab; ep != nil; ep = ep->channext){ - /* careful! has to look atomic for etptrigger */ + /* careful! has to look atomic for dtptrigger */ ep->probprev->probnext = ep->probnext; ep->probnext->probprev = ep->probprev; } diff --git a/sys/src/libdtracy/mkfile b/sys/src/libdtracy/mkfile index 29f7d91bd..e91c70740 100644 --- a/sys/src/libdtracy/mkfile +++ b/sys/src/libdtracy/mkfile @@ -8,6 +8,7 @@ OFILES=\ dtefmt.$O\ pack.$O\ chan.$O\ + agg.$O\ HFILES=\ /sys/include/dtracy.h\ diff --git a/sys/src/libdtracy/pack.c b/sys/src/libdtracy/pack.c index 34efd8374..de5721454 100644 --- a/sys/src/libdtracy/pack.c +++ b/sys/src/libdtracy/pack.c @@ -27,6 +27,12 @@ dtgpack(Fmt *f, DTActGr *g) fmtprint(f, "t%d\n", g->acts[i].type); fmtprint(f, "s%d\n", g->acts[i].size); dtepack(f, g->acts[i].p); + switch(g->acts[i].type){ + case ACTAGGKEY: + case ACTAGGVAL: + fmtprint(f, "A%#.8ux\n", g->acts[i].agg.id); + break; + } } fmtprint(f, "G"); } @@ -132,6 +138,18 @@ dtgunpack(char *s, DTActGr **rp) case ACTTRACESTR: g->reclen += g->acts[i].size; break; + case ACTAGGKEY: + if(*s++ != 'A') goto fail; + s = u32unpack(s, (u32int *) &g->acts[i].agg.id); + if(s == nil) goto fail; + break; + case ACTAGGVAL: + if(*s++ != 'A') goto fail; + s = u32unpack(s, (u32int *) &g->acts[i].agg.id); + if(s == nil) goto fail; + break; + case ACTCANCEL: + break; default: goto fail; } @@ -182,7 +200,7 @@ dtclfree(DTClause *c) int i; if(c == nil) return; - if(--c->gr->ref == 0) + if(c->gr != nil && --c->gr->ref == 0) dtgfree(c->gr); for(i = 0; i < c->nprob; i++) free(c->probs[i]); diff --git a/sys/src/libdtracy/prog.c b/sys/src/libdtracy/prog.c index 2c294068b..dfc670a67 100644 --- a/sys/src/libdtracy/prog.c +++ b/sys/src/libdtracy/prog.c @@ -80,7 +80,7 @@ invalid: } int -dtgverify(DTActGr *g) +dtgverify(DTChan *, DTActGr *g) { int i; @@ -96,6 +96,26 @@ dtgverify(DTActGr *g) if(g->acts[i].p == nil || dteverify(g->acts[i].p) < 0 || (uint)g->acts[i].size > DTRECMAX) return -1; break; + case ACTAGGKEY: + if(g->acts[i].p == nil || dteverify(g->acts[i].p) < 0 || (uint)g->acts[i].size > 8) + return -1; + if(i == g->nact - 1 || g->acts[i+1].type != ACTAGGVAL || g->acts[i+1].agg.id != g->acts[i].agg.id) + return -1; + break; + case ACTAGGVAL: + if(g->acts[i].p == nil || dteverify(g->acts[i].p) < 0 || (uint)g->acts[i].size > 8) + return -1; + if(i == 0 || g->acts[i-1].type != ACTAGGKEY) + return -1; + if(dtaunpackid(&g->acts[i].agg) < 0) + return -1; + break; + case ACTCANCEL: + if(g->acts[i].p == nil || dteverify(g->acts[i].p) < 0) + return -1; + if(i != g->nact - 1) + return -1; + break; default: return -1; } @@ -225,6 +245,7 @@ dtgexec(DTActGr *g, ExecInfo *info) DTBuf *b; u8int *bp; s64int v; + uchar aggkey[8]; int i, j; b = g->chan->wrbufs[info->machno]; @@ -240,6 +261,8 @@ dtgexec(DTActGr *g, ExecInfo *info) PUT4(info->epid); PUT8(info->ts); for(i = 0; i < g->nact; i++){ + if(g->acts[i].type == ACTCANCEL) + return 0; if(dteexec(g->acts[i].p, info, &v) < 0) return -1; switch(g->acts[i].type){ @@ -256,6 +279,15 @@ dtgexec(DTActGr *g, ExecInfo *info) } bp += g->acts[i].size; break; + case ACTAGGKEY: + for(j = 0; j < g->acts[i].size; j++){ + aggkey[j] = v; + v >>= 8; + } + break; + case ACTAGGVAL: + dtarecord(g->chan, info->machno, &g->acts[i].agg, aggkey, g->acts[i-1].size, v); + break; } } assert(bp - b->data - b->wr == g->reclen); |