diff options
author | Ori Bernstein <ori@eigenstate.org> | 2021-01-23 11:03:05 -0800 |
---|---|---|
committer | Ori Bernstein <ori@eigenstate.org> | 2021-01-23 11:03:05 -0800 |
commit | f321298c551e4333fcf2819eabf7ce67ea443e20 (patch) | |
tree | 49e79f111fb9a232b0300eab438b30a20e04d8e4 /sys/src/cmd/upas | |
parent | 5e20e8f963482a2008ed70cc8fa5973078248aed (diff) |
upas/runq: support parallel queue processing, drop -a
When running a mail queue, it's useful to run it with limited
parallelism. This helps mailing lists process messages in a
reasonable time.
At the same time, we can remove the load balancing from runq,
since the kinds of systems that this matters on no longer
exist, and running multiple queues at once can be better
done through xargs.
Diffstat (limited to 'sys/src/cmd/upas')
-rw-r--r-- | sys/src/cmd/upas/q/runq.c | 487 |
1 files changed, 189 insertions, 298 deletions
diff --git a/sys/src/cmd/upas/q/runq.c b/sys/src/cmd/upas/q/runq.c index 69efe4570..16445ea36 100644 --- a/sys/src/cmd/upas/q/runq.c +++ b/sys/src/cmd/upas/q/runq.c @@ -1,9 +1,25 @@ #include "common.h" #include <ctype.h> +typedef struct Job Job; + +struct Job { + Job *next; + int pid; + int ac; + int dfd; + char **av; + char *buf; /* backing for av */ + Dir *dp; /* not owned */ + Mlock *l; + Biobuf *b; +}; + void doalldirs(void); void dodir(char*); -void dofile(Dir*); +Job* dofile(Dir*); +Job* donefile(Job*, Waitmsg*); +void freejob(Job*); void rundir(char*); char* file(char*, char); void warning(char*, void*); @@ -17,7 +33,6 @@ char *cmd; char *root; int debug; int giveup = 2*24*60*60; -int load; int limit; /* the current directory */ @@ -28,67 +43,48 @@ char *curdir; char *runqlog = "runq"; -int *pidlist; char **badsys; /* array of recalcitrant systems */ int nbad; -int npid = 50; -int sflag; /* single thread per directory */ -int aflag; /* all directories */ +int njob = 1; /* number of concurrent jobs to invoke */ int Eflag; /* ignore E.xxxxxx dates */ int Rflag; /* no giving up, ever */ void usage(void) { - fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n"); - exits(""); + fprint(2, "usage: runq [-dE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n"); + exits("usage"); } void main(int argc, char **argv) { - char *qdir, *x; + char *qdir; qdir = 0; ARGBEGIN{ - case 'l': - x = ARGF(); - if(x == 0) - usage(); - load = atoi(x); - if(load < 0) - load = 0; - break; case 'E': Eflag++; break; case 'R': /* no giving up -- just leave stuff in the queue */ Rflag++; break; - case 'a': - aflag++; - break; case 'd': debug++; break; case 'r': - limit = atoi(ARGF()); - break; - case 's': - sflag++; + limit = atoi(EARGF(usage())); break; case 't': - giveup = 60*60*atoi(ARGF()); + giveup = 60*60*atoi(EARGF(usage())); break; case 'q': - qdir = ARGF(); - if(qdir == 0) - usage(); + qdir = EARGF(usage()); break; case 'n': - npid = atoi(ARGF()); - if(npid == 0) + njob = atoi(EARGF(usage())); + if(njob == 0) usage(); break; }ARGEND; @@ -96,27 +92,17 @@ main(int argc, char **argv) if(argc != 2) usage(); - pidlist = malloc(npid*sizeof(*pidlist)); - if(pidlist == 0) - error("can't malloc", 0); - - if(aflag == 0 && qdir == 0) { + if(qdir == nil) qdir = getuser(); - if(qdir == 0) - error("unknown user", 0); - } + if(qdir == nil) + error("unknown user", 0); root = argv[0]; cmd = argv[1]; if(chdir(root) < 0) error("can't cd to %s", root); - doload(1); - if(aflag) - doalldirs(); - else - dodir(qdir); - doload(0); + dodir(qdir); exits(0); } @@ -142,74 +128,6 @@ emptydir(char *name) return 0; } -int -forkltd(void) -{ - int i; - int pid; - - for(i = 0; i < npid; i++){ - if(pidlist[i] <= 0) - break; - } - - while(i >= npid){ - pid = waitpid(); - if(pid < 0){ - syslog(0, runqlog, "forkltd confused"); - exits(0); - } - - for(i = 0; i < npid; i++) - if(pidlist[i] == pid) - break; - } - pidlist[i] = fork(); - return pidlist[i]; -} - -/* - * run all user directories, must be bootes (or root on unix) to do this - */ -void -doalldirs(void) -{ - Dir *db; - int fd; - long i, n; - - - fd = open(".", OREAD); - if(fd == -1){ - warning("reading %s", root); - return; - } - n = dirreadall(fd, &db); - if(n > 0){ - for(i=0; i<n; i++){ - if(db[i].qid.type & QTDIR){ - if(emptydir(db[i].name)) - continue; - switch(forkltd()){ - case -1: - syslog(0, runqlog, "out of procs"); - doload(0); - exits(0); - case 0: - if(sysdetach() < 0) - error("%r", 0); - dodir(db[i].name); - exits(0); - default: - break; - } - } - } - free(db); - } - close(fd); -} - /* * cd to a user directory and run it */ @@ -234,30 +152,57 @@ dodir(char *name) void rundir(char *name) { - int fd; - long i; + Job *hd, *j, **p; + int nlive, fidx, fd, found; + Waitmsg *w; - if(aflag && sflag) - fd = sysopenlocked(".", OREAD); - else - fd = open(".", OREAD); + fd = open(".", OREAD); if(fd == -1){ warning("reading %s", name); return; } + fidx= 0; + hd = nil; + nlive = 0; nfiles = dirreadall(fd, &dirbuf); - if(nfiles > 0){ - for(i=0; i<nfiles; i++){ - if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.') + while(nlive > 0 || fidx< nfiles){ + while(fidx< nfiles && nlive < njob){ + if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){ + fidx++; continue; - dofile(&dirbuf[i]); + } + if((j = dofile(&dirbuf[fidx])) != nil){ + nlive++; + j->next = hd; + hd = j; + } + fidx++; } - free(dirbuf); + if(nlive == 0){ + fprint(2, "nothing live\n"); + break; + } +rescan: + if((w = wait()) == nil){ + syslog(0, "runq", "wait error: %r"); + break; + } + found = 0; + for(p = &hd; *p != nil; p = &(*p)->next){ + if(w->pid == (*p)->pid){ + *p = donefile(*p, w); + found++; + nlive--; + break; + } + } + free(w); + if(!found) + goto rescan; } - if(aflag && sflag) - sysunlockfile(fd); - else - close(fd); + assert(hd == nil); + free(dirbuf); + close(fd); } /* @@ -314,17 +259,16 @@ keeplockalive(char *path, int fd) } /* - * try a message + * Launch trying a message, returning a job + * tracks the running pid. */ -void +Job* dofile(Dir *dp) { + int dtime, efd, i, etime; + Job *j; Dir *d; - int dfd, ac, dtime, efd, pid, i, etime; - char *buf, *cp, **av; - Waitmsg *wm; - Biobuf *b; - Mlock *l = nil; + char *cp; if(debug) fprint(2, "dofile %s\n", dp->name); @@ -337,14 +281,14 @@ dofile(Dir *dp) if(d == nil){ syslog(0, runqlog, "no data file for %s", dp->name); remmatch(dp->name); - return; + return nil; } if(dp->length == 0){ if(time(0)-dp->mtime > 15*60){ syslog(0, runqlog, "empty ctl file for %s", dp->name); remmatch(dp->name); } - return; + return nil; } dtime = d->mtime; free(d); @@ -358,31 +302,35 @@ dofile(Dir *dp) if(etime - dtime < 60*60){ /* up to the first hour, try every 15 minutes */ if(time(0) - etime < 15*60) - return; + return nil; } else { /* after the first hour, try once an hour */ if(time(0) - etime < 60*60) - return; + return nil; } - } /* * open control and data */ - b = sysopen(file(dp->name, 'C'), "rl", 0660); - if(b == 0) { + j = malloc(sizeof(Job)); + if(j == nil) + return nil; + memset(j, 0, sizeof(Job)); + j->dp = dp; + j->dfd = -1; + j->b = sysopen(file(dp->name, 'C'), "rl", 0660); + if(j->b == 0) { if(debug) fprint(2, "can't open %s: %r\n", file(dp->name, 'C')); - return; + return nil; } - dfd = open(file(dp->name, 'D'), OREAD); - if(dfd < 0){ + j->dfd = open(file(dp->name, 'D'), OREAD); + if(j->dfd < 0){ if(debug) fprint(2, "can't open %s: %r\n", file(dp->name, 'D')); - Bterm(b); - sysunlockfile(Bfildes(b)); - return; + freejob(j); + return nil; } /* @@ -390,48 +338,36 @@ dofile(Dir *dp) * - read args into (malloc'd) buffer * - malloc a vector and copy pointers to args into it */ - buf = malloc(dp->length+1); - if(buf == 0){ + + j->buf = malloc(dp->length+1); + if(j->buf == nil){ warning("buffer allocation", 0); - Bterm(b); - sysunlockfile(Bfildes(b)); - close(dfd); - return; + freejob(j); + return nil; } - if(Bread(b, buf, dp->length) != dp->length){ + if(Bread(j->b, j->buf, dp->length) != dp->length){ warning("reading control file %s\n", dp->name); - Bterm(b); - sysunlockfile(Bfildes(b)); - close(dfd); - free(buf); - return; + freejob(j); + return nil; } - buf[dp->length] = 0; - av = malloc(2*sizeof(char*)); - if(av == 0){ + j->buf[dp->length] = 0; + j->av = malloc(2*sizeof(char*)); + if(j->av == 0){ warning("argv allocation", 0); - close(dfd); - free(buf); - Bterm(b); - sysunlockfile(Bfildes(b)); - return; + freejob(j); + return nil; } - for(ac = 1, cp = buf; *cp; ac++){ + for(j->ac = 1, cp = j->buf; *cp; j->ac++){ while(isspace(*cp)) *cp++ = 0; if(*cp == 0) break; - av = realloc(av, (ac+2)*sizeof(char*)); - if(av == 0){ + j->av = realloc(j->av, (j->ac+2)*sizeof(char*)); + if(j->av == 0){ warning("argv allocation", 0); - close(dfd); - free(buf); - Bterm(b); - sysunlockfile(Bfildes(b)); - return; } - av[ac] = cp; + j->av[j->ac] = cp; while(*cp && !isspace(*cp)){ if(*cp++ == '"'){ while(*cp && *cp != '"') @@ -441,18 +377,18 @@ dofile(Dir *dp) } } } - av[0] = cmd; - av[ac] = 0; + j->av[0] = cmd; + j->av[j->ac] = 0; if(!Eflag &&time(0) - dtime > giveup){ - if(returnmail(av, dp->name, "Giveup") != 0) - logit("returnmail failed", dp->name, av); + if(returnmail(j->av, dp->name, "Giveup") != 0) + logit("returnmail failed", dp->name, j->av); remmatch(dp->name); goto done; } for(i = 0; i < nbad; i++){ - if(strcmp(av[3], badsys[i]) == 0) + if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0) goto done; } @@ -460,33 +396,34 @@ dofile(Dir *dp) * Ken's fs, for example, gives us 5 minutes of inactivity before * the lock goes stale, so we have to keep reading it. */ - l = keeplockalive(file(dp->name, 'C'), Bfildes(b)); + j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b)); /* * transfer */ - pid = fork(); - switch(pid){ + j->pid = fork(); + switch(j->pid){ case -1: - sysunlock(l); - sysunlockfile(Bfildes(b)); + sysunlock(j->l); + sysunlockfile(Bfildes(j->b)); syslog(0, runqlog, "out of procs"); exits(0); case 0: if(debug) { - fprint(2, "Starting %s", cmd); - for(ac = 0; av[ac]; ac++) - fprint(2, " %s", av[ac]); + fprint(2, "Starting %s\n", cmd); + for(i = 0; j->av[i]; i++) + fprint(2, " %s", j->av[i]); fprint(2, "\n"); } - logit("execing", dp->name, av); + logit("execing", dp->name, j->av); close(0); - dup(dfd, 0); - close(dfd); + dup(j->dfd, 0); + close(j->dfd); close(2); efd = open(file(dp->name, 'E'), OWRITE); if(efd < 0){ - if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser()); + if(debug) + syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser()); efd = create(file(dp->name, 'E'), OWRITE, 0666); if(efd < 0){ if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser()); @@ -494,50 +431,72 @@ dofile(Dir *dp) } } seek(efd, 0, 2); - exec(cmd, av); + exec(cmd, j->av); error("can't exec %s", cmd); break; default: - for(;;){ - wm = wait(); - if(wm == nil) - error("wait failed: %r", ""); - if(wm->pid == pid) - break; - free(wm); - } - if(debug) - fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg); + return j; + } +done: + freejob(j); + return nil; +} - if(wm->msg[0]){ - if(debug) - fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); - if(!Rflag && strstr(wm->msg, "Retry")==0){ - /* return the message and remove it */ - if(returnmail(av, dp->name, wm->msg) != 0) - logit("returnmail failed", dp->name, av); - remmatch(dp->name); - } else { - /* add sys to bad list and try again later */ - nbad++; - badsys = realloc(badsys, nbad*sizeof(char*)); - badsys[nbad-1] = strdup(av[3]); - } +/* + * Handle the completion of a job. + * Wait for the pid, check its status, + * and then pop the job off the list. + * Return the next running job. + */ +Job* +donefile(Job *j, Waitmsg *wm) +{ + Job *n; + + if(debug) + fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg); + + if(wm->msg[0]){ + if(debug) + fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); + if(!Rflag && strstr(wm->msg, "Retry")==0){ + /* return the message and remove it */ + if(returnmail(j->av, j->dp->name, wm->msg) != 0) + logit("returnmail failed", j->dp->name, j->av); + remmatch(j->dp->name); } else { - /* it worked remove the message */ - remmatch(dp->name); + /* add sys to bad list and try again later */ + nbad++; + badsys = realloc(badsys, nbad*sizeof(char*)); + badsys[nbad-1] = strdup(j->av[3]); } - free(wm); - + } else { + /* it worked remove the message */ + remmatch(j->dp->name); } -done: - if (l) - sysunlock(l); - Bterm(b); - sysunlockfile(Bfildes(b)); - free(buf); - free(av); - close(dfd); + n = j->next; + freejob(j); + return n; +} + +/* + * Release resources associated with + * a job. + */ +void +freejob(Job *j) +{ + if(j->b != nil){ + sysunlockfile(Bfildes(j->b)); + Bterm(j->b); + } + if(j->dfd != -1) + close(j->dfd); + if(j->l != nil) + sysunlock(j->l); + free(j->buf); + free(j->av); + free(j); } @@ -691,71 +650,3 @@ logit(char *msg, char *file, char **av) } syslog(0, runqlog, "%s", buf); } - -char *loadfile = ".runqload"; - -/* - * load balancing - */ -void -doload(int start) -{ - int fd; - char buf[32]; - int i, n; - Mlock *l; - Dir *d; - - if(load <= 0) - return; - - if(chdir(root) < 0){ - load = 0; - return; - } - - l = syslock(loadfile); - fd = open(loadfile, ORDWR); - if(fd < 0){ - fd = create(loadfile, 0666, ORDWR); - if(fd < 0){ - load = 0; - sysunlock(l); - return; - } - } - - /* get current load */ - i = 0; - n = read(fd, buf, sizeof(buf)-1); - if(n >= 0){ - buf[n] = 0; - i = atoi(buf); - } - if(i < 0) - i = 0; - - /* ignore load if file hasn't been changed in 30 minutes */ - d = dirfstat(fd); - if(d != nil){ - if(d->mtime + 30*60 < time(0)) - i = 0; - free(d); - } - - /* if load already too high, give up */ - if(start && i >= load){ - sysunlock(l); - exits(0); - } - - /* increment/decrement load */ - if(start) - i++; - else - i--; - seek(fd, 0, 0); - fprint(fd, "%d\n", i); - sysunlock(l); - close(fd); -} |