summaryrefslogtreecommitdiff
path: root/sys/src/cmd/aan.c
diff options
context:
space:
mode:
authorTaru Karttunen <taruti@taruti.net>2011-03-30 15:46:40 +0300
committerTaru Karttunen <taruti@taruti.net>2011-03-30 15:46:40 +0300
commite5888a1ffdae813d7575f5fb02275c6bb07e5199 (patch)
treed8d51eac403f07814b9e936eed0c9a79195e2450 /sys/src/cmd/aan.c
Import sources from 2011-03-30 iso image
Diffstat (limited to 'sys/src/cmd/aan.c')
-rwxr-xr-xsys/src/cmd/aan.c520
1 files changed, 520 insertions, 0 deletions
diff --git a/sys/src/cmd/aan.c b/sys/src/cmd/aan.c
new file mode 100755
index 000000000..7931dfe96
--- /dev/null
+++ b/sys/src/cmd/aan.c
@@ -0,0 +1,520 @@
+#include <u.h>
+#include <libc.h>
+#include <auth.h>
+#include <fcall.h>
+#include <thread.h>
+
+#define NS(x) ((vlong)x)
+#define US(x) (NS(x) * 1000LL)
+#define MS(x) (US(x) * 1000LL)
+#define S(x) (MS(x) * 1000LL)
+
+#define LOGNAME "aan"
+
+enum {
+ Synctime = S(8),
+ Nbuf = 10,
+ K = 1024,
+ Bufsize = 8 * K,
+ Stacksize = 8 * K,
+ Timer = 0, // Alt channels.
+ Unsent = 1,
+ Maxto = 24 * 3600, // A full day to reconnect.
+};
+
+typedef struct Endpoints Endpoints;
+struct Endpoints {
+ char *lsys;
+ char *lserv;
+ char *rsys;
+ char *rserv;
+};
+
+typedef struct {
+ ulong nb; // Number of data bytes in this message
+ ulong msg; // Message number
+ ulong acked; // Number of messages acked
+} Hdr;
+
+typedef struct t_Buf {
+ Hdr hdr;
+ uchar buf[Bufsize];
+} Buf;
+
+static char *progname;
+static Channel *unsent;
+static Channel *unacked;
+static Channel *empty;
+static int netfd;
+static int inmsg;
+static char *devdir;
+static int debug;
+static int done;
+static char *dialstring;
+static int maxto = Maxto;
+static char *Logname = LOGNAME;
+static int client;
+
+static Alt a[] = {
+ /* c v op */
+ { nil, nil, CHANRCV }, // timer
+ { nil, nil, CHANRCV }, // unsent
+ { nil, nil, CHANEND },
+};
+
+static void fromnet(void*);
+static void fromclient(void*);
+static void reconnect(void);
+static void synchronize(void);
+static int sendcommand(ulong, ulong);
+static void showmsg(int, char *, Buf *);
+static int writen(int, uchar *, int);
+static int getport(char *);
+static void dmessage(int, char *, ...);
+static void timerproc(void *);
+static Endpoints *getendpoints(char *);
+static void freeendpoints(Endpoints *);
+
+static void
+usage(void)
+{
+ fprint(2, "Usage: %s [-cd] [-m maxto] dialstring|netdir\n", progname);
+ exits("usage");
+}
+
+static int
+catch(void *, char *s)
+{
+ if (!strcmp(s, "alarm")) {
+ syslog(0, Logname, "Timed out while waiting for client on %s, exiting...",
+ devdir);
+ threadexitsall(nil);
+ }
+ return 0;
+}
+
+void
+threadmain(int argc, char **argv)
+{
+ int i, failed;
+ Buf *b;
+ Channel *timer;
+ vlong synctime;
+
+ progname = argv[0];
+ ARGBEGIN {
+ case 'c':
+ client++;
+ break;
+ case 'd':
+ debug++;
+ break;
+ case 'm':
+ maxto = (int)strtol(EARGF(usage()), (char **)nil, 0);
+ break;
+ default:
+ usage();
+ } ARGEND;
+
+ if (argc != 1)
+ usage();
+
+ if (!client) {
+ char *p;
+
+ devdir = argv[0];
+ if ((p = strstr(devdir, "/local")) != nil)
+ *p = '\0';
+ }
+ else
+ dialstring = argv[0];
+
+ if (debug > 0) {
+ int fd = open("#c/cons", OWRITE|OCEXEC);
+ dup(fd, 2);
+ }
+
+ fmtinstall('F', fcallfmt);
+
+ atnotify(catch, 1);
+
+ unsent = chancreate(sizeof(Buf *), Nbuf);
+ unacked = chancreate(sizeof(Buf *), Nbuf);
+ empty = chancreate(sizeof(Buf *), Nbuf);
+ timer = chancreate(sizeof(uchar *), 1);
+
+ for (i = 0; i != Nbuf; i++) {
+ Buf *b = malloc(sizeof(Buf));
+ sendp(empty, b);
+ }
+
+ netfd = -1;
+
+ if (proccreate(fromnet, nil, Stacksize) < 0)
+ sysfatal("%s; Cannot start fromnet; %r", progname);
+
+ reconnect(); // Set up the initial connection.
+ synchronize();
+
+ if (proccreate(fromclient, nil, Stacksize) < 0)
+ sysfatal("%s; Cannot start fromclient; %r", progname);
+
+ if (proccreate(timerproc, timer, Stacksize) < 0)
+ sysfatal("%s; Cannot start timerproc; %r", progname);
+
+ a[Timer].c = timer;
+ a[Unsent].c = unsent;
+ a[Unsent].v = &b;
+
+ synctime = nsec() + Synctime;
+ failed = 0;
+ while (!done) {
+ vlong now;
+ int delta;
+
+ if (failed) {
+ // Wait for the netreader to die.
+ while (netfd >= 0) {
+ dmessage(1, "main; waiting for netreader to die\n");
+ sleep(1000);
+ }
+
+ // the reader died; reestablish the world.
+ reconnect();
+ synchronize();
+ failed = 0;
+ }
+
+ now = nsec();
+ delta = (synctime - nsec()) / MS(1);
+
+ if (delta <= 0) {
+ Hdr hdr;
+
+ hdr.nb = 0;
+ hdr.acked = inmsg;
+ hdr.msg = -1;
+
+ if (writen(netfd, (uchar *)&hdr, sizeof(Hdr)) < 0) {
+ dmessage(2, "main; writen failed; %r\n");
+ failed = 1;
+ continue;
+ }
+ synctime = nsec() + Synctime;
+ assert(synctime > now);
+ }
+
+ switch (alt(a)) {
+ case Timer:
+ break;
+
+ case Unsent:
+ sendp(unacked, b);
+
+ b->hdr.acked = inmsg;
+
+ if (writen(netfd, (uchar *)&b->hdr, sizeof(Hdr)) < 0) {
+ dmessage(2, "main; writen failed; %r\n");
+ failed = 1;
+ }
+
+ if (writen(netfd, b->buf, b->hdr.nb) < 0) {
+ dmessage(2, "main; writen failed; %r\n");
+ failed = 1;
+ }
+
+ if (b->hdr.nb == 0)
+ done = 1;
+ break;
+ }
+ }
+ syslog(0, Logname, "exiting...");
+ threadexitsall(nil);
+}
+
+
+static void
+fromclient(void*)
+{
+ static int outmsg;
+
+ for (;;) {
+ Buf *b;
+
+ b = recvp(empty);
+ if ((int)(b->hdr.nb = read(0, b->buf, Bufsize)) <= 0) {
+ if ((int)b->hdr.nb < 0)
+ dmessage(2, "fromclient; Cannot read 9P message; %r\n");
+ else
+ dmessage(2, "fromclient; Client terminated\n");
+ b->hdr.nb = 0;
+ }
+ b->hdr.msg = outmsg++;
+
+ showmsg(1, "fromclient", b);
+ sendp(unsent, b);
+
+ if (b->hdr.nb == 0)
+ break;
+ }
+}
+
+static void
+fromnet(void*)
+{
+ static int lastacked;
+ Buf *b;
+
+ b = (Buf *)malloc(sizeof(Buf));
+ assert(b);
+
+ while (!done) {
+ int len, acked, i;
+
+ while (netfd < 0) {
+ dmessage(1, "fromnet; waiting for connection... (inmsg %d)\n",
+ inmsg);
+ sleep(1000);
+ }
+
+ // Read the header.
+ if ((len = readn(netfd, &b->hdr, sizeof(Hdr))) <= 0) {
+ if (len < 0)
+ dmessage(1, "fromnet; (hdr) network failure; %r\n");
+ else
+ dmessage(1, "fromnet; (hdr) network closed\n");
+ close(netfd);
+ netfd = -1;
+ continue;
+ }
+ dmessage(2, "fromnet: Got message, size %d, nb %d, msg %d\n", len,
+ b->hdr.nb, b->hdr.msg);
+
+ if (b->hdr.nb == 0) {
+ if ((long)b->hdr.msg >= 0) {
+ dmessage(1, "fromnet; network closed\n");
+ break;
+ }
+ continue;
+ }
+
+ if ((len = readn(netfd, b->buf, b->hdr.nb)) <= 0 || len != b->hdr.nb) {
+ if (len == 0)
+ dmessage(1, "fromnet; network closed\n");
+ else
+ dmessage(1, "fromnet; network failure; %r\n");
+ close(netfd);
+ netfd = -1;
+ continue;
+ }
+
+ if (b->hdr.msg < inmsg) {
+ dmessage(1, "fromnet; skipping message %d, currently at %d\n",
+ b->hdr.msg, inmsg);
+ continue;
+ }
+
+ // Process the acked list.
+ acked = b->hdr.acked - lastacked;
+ for (i = 0; i != acked; i++) {
+ Buf *rb;
+
+ rb = recvp(unacked);
+ if (rb->hdr.msg != lastacked + i) {
+ dmessage(1, "rb %p, msg %d, lastacked %d, i %d\n",
+ rb, rb? rb->hdr.msg: -2, lastacked, i);
+ assert(0);
+ }
+ rb->hdr.msg = -1;
+ sendp(empty, rb);
+ }
+ lastacked = b->hdr.acked;
+
+ inmsg++;
+
+ showmsg(1, "fromnet", b);
+
+ if (writen(1, b->buf, len) < 0)
+ sysfatal("fromnet; cannot write to client; %r");
+ }
+ done = 1;
+}
+
+static void
+reconnect(void)
+{
+ char ldir[40];
+ int lcfd, fd;
+
+ if (dialstring) {
+ syslog(0, Logname, "dialing %s", dialstring);
+ while ((fd = dial(dialstring, nil, nil, nil)) < 0) {
+ char err[32];
+
+ err[0] = '\0';
+ errstr(err, sizeof err);
+ if (strstr(err, "connection refused")) {
+ dmessage(1, "reconnect; server died...\n");
+ threadexitsall("server died...");
+ }
+ dmessage(1, "reconnect: dialed %s; %s\n", dialstring, err);
+ sleep(1000);
+ }
+ syslog(0, Logname, "reconnected to %s", dialstring);
+ }
+ else {
+ Endpoints *ep;
+
+ syslog(0, Logname, "waiting for connection on %s", devdir);
+ alarm(maxto * 1000);
+ if ((lcfd = listen(devdir, ldir)) < 0)
+ sysfatal("reconnect; cannot listen; %r");
+
+ if ((fd = accept(lcfd, ldir)) < 0)
+ sysfatal("reconnect; cannot accept; %r");
+ alarm(0);
+ close(lcfd);
+
+ ep = getendpoints(ldir);
+ dmessage(1, "rsys '%s'\n", ep->rsys);
+ syslog(0, Logname, "connected from %s", ep->rsys);
+ freeendpoints(ep);
+ }
+
+ netfd = fd; // Wakes up the netreader.
+}
+
+static void
+synchronize(void)
+{
+ Channel *tmp;
+ Buf *b;
+
+ // Ignore network errors here. If we fail during
+ // synchronization, the next alarm will pick up
+ // the error.
+
+ tmp = chancreate(sizeof(Buf *), Nbuf);
+ while ((b = nbrecvp(unacked)) != nil) {
+ writen(netfd, (uchar *)b, sizeof(Hdr) + b->hdr.nb);
+ sendp(tmp, b);
+ }
+ chanfree(unacked);
+ unacked = tmp;
+}
+
+static void
+showmsg(int level, char *s, Buf *b)
+{
+ if (b == nil) {
+ dmessage(level, "%s; b == nil\n", s);
+ return;
+ }
+
+ dmessage(level,
+ "%s; (len %d) %X %X %X %X %X %X %X %X %X (%p)\n", s,
+ b->hdr.nb,
+ b->buf[0], b->buf[1], b->buf[2],
+ b->buf[3], b->buf[4], b->buf[5],
+ b->buf[6], b->buf[7], b->buf[8], b);
+}
+
+static int
+writen(int fd, uchar *buf, int nb)
+{
+ int len = nb;
+
+ while (nb > 0) {
+ int n;
+
+ if (fd < 0)
+ return -1;
+
+ if ((n = write(fd, buf, nb)) < 0) {
+ dmessage(1, "writen; Write failed; %r\n");
+ return -1;
+ }
+ dmessage(2, "writen: wrote %d bytes\n", n);
+
+ buf += n;
+ nb -= n;
+ }
+ return len;
+}
+
+static void
+timerproc(void *x)
+{
+ Channel *timer = x;
+ while (!done) {
+ sleep((Synctime / MS(1)) >> 1);
+ sendp(timer, "timer");
+ }
+}
+
+static void
+dmessage(int level, char *fmt, ...)
+{
+ va_list arg;
+
+ if (level > debug)
+ return;
+
+ va_start(arg, fmt);
+ vfprint(2, fmt, arg);
+ va_end(arg);
+}
+
+static void
+getendpoint(char *dir, char *file, char **sysp, char **servp)
+{
+ int fd, n;
+ char buf[128];
+ char *sys, *serv;
+
+ sys = serv = 0;
+
+ snprint(buf, sizeof buf, "%s/%s", dir, file);
+ fd = open(buf, OREAD);
+ if(fd >= 0){
+ n = read(fd, buf, sizeof(buf)-1);
+ if(n>0){
+ buf[n-1] = 0;
+ serv = strchr(buf, '!');
+ if(serv){
+ *serv++ = 0;
+ serv = strdup(serv);
+ }
+ sys = strdup(buf);
+ }
+ close(fd);
+ }
+ if(serv == 0)
+ serv = strdup("unknown");
+ if(sys == 0)
+ sys = strdup("unknown");
+ *servp = serv;
+ *sysp = sys;
+}
+
+static Endpoints *
+getendpoints(char *dir)
+{
+ Endpoints *ep;
+
+ ep = malloc(sizeof(*ep));
+ getendpoint(dir, "local", &ep->lsys, &ep->lserv);
+ getendpoint(dir, "remote", &ep->rsys, &ep->rserv);
+ return ep;
+}
+
+static void
+freeendpoints(Endpoints *ep)
+{
+ free(ep->lsys);
+ free(ep->rsys);
+ free(ep->lserv);
+ free(ep->rserv);
+ free(ep);
+}
+