diff options
author | cinap_lenrek <cinap_lenrek@centraldogma> | 2011-10-24 00:53:27 +0200 |
---|---|---|
committer | cinap_lenrek <cinap_lenrek@centraldogma> | 2011-10-24 00:53:27 +0200 |
commit | f5fe39ad7afa9b761821178ba57f28cea03396b8 (patch) | |
tree | 8e15c945b474ddaa12a9f2b7d910a23ff00efc84 | |
parent | 98d01a7719188f1fae193da77ed0949d903ed764 (diff) |
torrent: listen support
-rw-r--r-- | sys/src/cmd/ip/torrent.c | 325 |
1 files changed, 202 insertions, 123 deletions
diff --git a/sys/src/cmd/ip/torrent.c b/sys/src/cmd/ip/torrent.c index cf5304274..2c37e7b93 100644 --- a/sys/src/cmd/ip/torrent.c +++ b/sys/src/cmd/ip/torrent.c @@ -6,6 +6,7 @@ typedef struct Dict Dict; typedef struct Piece Piece; typedef struct File File; +typedef struct Stats Stats; struct Dict { @@ -33,13 +34,21 @@ struct File vlong len; }; +struct Stats +{ + Lock; + vlong up; + vlong down; + vlong left; +}; + enum { MAXIO = 16*1024, }; int debug, sflag, pflag, vflag; -int pidgroup = -1; -int port = 48123; +int killgroup = -1; +int port = 6881; char *mntweb = "/mnt/web"; uchar infohash[20]; uchar peerid[20]; @@ -50,8 +59,10 @@ Piece *pieces; int nhavemap; uchar *havemap; +int nhavepieces; File *files; +Stats stats; void freedict(Dict *d) @@ -208,7 +219,13 @@ havepiece(int x) free(p); if(memcmp(hash, pieces[x].hash, 20)) return 0; - havemap[x>>3] |= m; + lock(&stats); + if((havemap[x>>3] & m) == 0){ + havemap[x>>3] |= m; + nhavepieces++; + stats.left -= pieces[x].len; + } + unlock(&stats); return 1; } @@ -309,86 +326,60 @@ Err: return -1; } -void -peer(char *ip, char *port) -{ - static Dict *peers; - static QLock peerslk; + +int +peer(int fd, int incoming, char *addr) +{ uchar buf[64+MAXIO], *map, *told, *p, m; - char *addr; - int retry, i, o, l, x, n, fd; int mechoking, hechoking; int mewant, hewant; int workpiece; - Dict *d; + int i, o, l, x, n; - if(ip == nil || port == nil) - return; + if(debug) fprint(2, "peer %s: %s connected\n", addr, incoming ? "incoming" : "outgoing"); - d = mallocz(sizeof(*d) + 64, 1); - snprint(addr = d->str, 64, "tcp!%s!%s", ip, port); - qlock(&peerslk); - if(dlook(peers, addr)){ - qunlock(&peerslk); - free(d); - return; + for(i=0; i<2; i++){ + if((incoming && i) || (!incoming && !i)){ + if(debug) fprint(2, "peer %s: -> handshake\n", addr); + n = pack(buf, sizeof(buf), "*________**", + 20, "\x13BitTorrent protocol", + sizeof(infohash), infohash, + sizeof(peerid), peerid); + if(write(fd, buf, n) != n) + return 1; + } + if((incoming && !i) || (!incoming && i)){ + n = 20 + 8 + sizeof(infohash); + if((n = readn(fd, buf, n)) != n) + return 1; + if(memcmp(buf, "\x13BitTorrent protocol", 20)) + return 0; + if(debug) fprint(2, "peer %s: <- handshake\n", addr); + if(memcmp(infohash, buf + 20 + 8, sizeof(infohash))) + return 0; + } } - d->len = strlen(addr); - d->typ = 'd'; - d->val = d; - d->next = peers; - peers = d; - qunlock(&peerslk); - - if(rfork(RFFDG|RFPROC|RFMEM) <= 0) - return; + if(readn(fd, buf, sizeof(peerid)) != sizeof(peerid)) + return 1; + if(memcmp(peerid, buf, sizeof(peerid)) == 0) + return 0; + if(debug) fprint(2, "peer %s: peerid %.*s\n", addr, sizeof(peerid), (char*)buf); - fd = -1; - retry = 0; - map = malloc(nhavemap); + mechoking = 1; + hechoking = 1; + mewant = 0; + hewant = 0; + workpiece = -1; + map = mallocz(nhavemap, 1); told = malloc(nhavemap); -Retry: - if(fd >= 0){ - close(fd); - sleep(10000 + nrand(5000)); - } - if(++retry >= 10) - goto Exit; - - if(debug) fprint(2, "dial %s\n", addr); - if((fd = dial(addr, nil, nil, nil)) < 0) - goto Retry; - - if(debug) fprint(2, "peer %s: -> handshake\n", addr); - n = pack(buf, sizeof(buf), "*________**", - 20, "\x13BitTorrent protocol", - sizeof(infohash), infohash, - sizeof(peerid), peerid); - if(write(fd, buf, n) != n) - goto Retry; - - if(read(fd, buf, 1) != 1) - goto Retry; - n = buf[0] + 8 + sizeof(infohash) + sizeof(peerid); - if((n = readn(fd, buf+1, n)) != n) - goto Retry; - if(debug) fprint(2, "peer %s: <- handshake %.*s\n", addr, buf[0], (char*)buf+1); - if(memcmp(infohash, buf + 1 + buf[0] + 8, sizeof(infohash))) - goto Exit; if(debug) fprint(2, "peer %s: -> bitfield %d\n", addr, nhavemap); memmove(told, havemap, nhavemap); n = pack(buf, sizeof(buf), "lb*", nhavemap+1, 0x05, nhavemap, havemap); if(write(fd, buf, n) != n) - goto Retry; + goto Out; - mechoking = 1; - hechoking = 1; - mewant = 0; - hewant = 0; - workpiece = -1; - memset(map, 0, nhavemap); for(;;){ for(i=0; i<nhavemap; i++){ if(told[i] != havemap[i]){ @@ -399,7 +390,7 @@ Retry: if(debug) fprint(2, "peer %s: -> have %d\n", addr, x); n = pack(buf, sizeof(buf), "lbl", 1+4, 0x04, x); if(write(fd, buf, n) != n) - goto Retry; + goto Out; } } if(!mewant && (map[i] & ~havemap[i])){ @@ -407,7 +398,7 @@ Retry: if(debug) fprint(2, "peer %s: -> interested\n", addr); n = pack(buf, sizeof(buf), "lb", 1, 0x02); if(write(fd, buf, n) != n) - goto Retry; + goto Out; } } if(!hechoking && mewant){ @@ -423,7 +414,7 @@ Retry: if(debug) fprint(2, "peer %s: -> request %d %d %d\n", addr, x, o, l); n = pack(buf, sizeof(buf), "lblll", 1+4+4+4, 0x06, x, o, l); if(write(fd, buf, n) != n) - goto Retry; + goto Out; workpiece = x; } } @@ -432,21 +423,21 @@ Retry: if(debug) fprint(2, "peer %s: -> unchoke\n", addr); n = pack(buf, sizeof(buf), "lb", 1, 0x01); if(write(fd, buf, n) != n) - goto Retry; + goto Out; } if(readn(fd, buf, 4) != 4) - goto Retry; + break; unpack(buf, 4, "l", &n); + if(n < 0 || n > sizeof(buf)) + break; if(n == 0) continue; - if(n < 0 || n > sizeof(buf)) - goto Retry; if(readn(fd, buf, n) != n) - goto Retry; - retry = 0; - p = buf+1; + break; + n--; + p = buf+1; switch(*buf){ case 0x00: // Choke hechoking = 1; @@ -467,7 +458,7 @@ Retry: break; case 0x04: // Have <piceindex> if(unpack(p, n, "l", &x) < 0) - goto Retry; + goto Out; if(debug) fprint(2, "peer %s: <- have %d\n", addr, x); if(x < 0 || x >= npieces) continue; @@ -481,7 +472,7 @@ Retry: break; case 0x06: // Request <index> <begin> <length> if(unpack(p, n, "lll", &x, &o, &l) < 0) - goto Retry; + goto Out; if(debug) fprint(2, "peer %s: <- request %d %d %d\n", addr, x, o, l); if(x < 0 || x >= npieces) continue; @@ -496,13 +487,19 @@ Retry: n = pack(buf, sizeof(buf), "lbll", 1+4+4+l, 0x07, x, o); n += l; if(write(fd, buf, n) != n) - goto Retry; + goto Out; + lock(&stats); + stats.up += n; + unlock(&stats); break; case 0x07: // Piece <index> <begin> <block> if(unpack(p, n, "ll", &x, &o) != 8) - goto Retry; + goto Out; p += 8; n -= 8; + lock(&stats); + stats.down += n; + unlock(&stats); if(debug) fprint(2, "peer %s: <- piece %d %d %d\n", addr, x, o, n); if(x < 0 || x >= npieces) continue; @@ -517,19 +514,102 @@ Retry: break; case 0x08: // Cancel <index> <begin> <length> if(unpack(p, n, "lll", &x, &o, &l) < 0) - goto Retry; + goto Out; if(debug) fprint(2, "peer %s: <- cancel %d %d %d\n", addr, x, o, l); break; case 0x09: // Port <port> if(unpack(p, n, "l", &x) < 0) - goto Retry; + goto Out; if(debug) fprint(2, "peer %s: <- port %d\n", addr, x); break; } } -Exit: + +Out: free(told); free(map); + return 1; +} + +void +server(void) +{ + char addr[64], adir[40], ldir[40]; + int afd, lfd, dfd; + NetConnInfo *ni; + + afd = -1; + for(port=6881; port<6890; port++){ + snprint(addr, sizeof(addr), "tcp!*!%d", port); + if((afd = announce(addr, adir)) >= 0) + break; + } + if(afd < 0){ + fprint(2, "announce: %r"); + return; + } + if(rfork(RFFDG|RFPROC|RFMEM)) + return; + for(;;){ + if((lfd = listen(adir, ldir)) < 0){ + fprint(2, "listen: %r"); + break; + } + if(rfork(RFFDG|RFPROC|RFMEM)){ + close(lfd); + continue; + } + if((dfd = accept(lfd, ldir)) < 0){ + fprint(2, "accept: %r"); + break; + } + ni = getnetconninfo(ldir, dfd); + peer(dfd, 1, ni ? ni->raddr : "???"); + if(ni) freenetconninfo(ni); + break; + } + exits(0); +} + +void +client(char *ip, char *port) +{ + static Dict *peers; + static QLock peerslk; + int try, fd; + char *addr; + Dict *d; + + if(ip == nil || port == nil) + return; + + d = mallocz(sizeof(*d) + 64, 1); + snprint(addr = d->str, 64, "tcp!%s!%s", ip, port); + qlock(&peerslk); + if(dlook(peers, addr)){ + qunlock(&peerslk); + free(d); + return; + } + d->len = strlen(addr); + d->typ = 'd'; + d->val = d; + d->next = peers; + peers = d; + qunlock(&peerslk); + + if(debug) fprint(2, "client %s\n", addr); + + if(rfork(RFFDG|RFPROC|RFMEM)) + return; + for(try = 0; try < 10; try++){ + if((fd = dial(addr, nil, nil, nil)) >= 0){ + if(!peer(fd, 0, addr)) + break; + close(fd); + } + sleep((1000<<try)+nrand(5000)); + } exits(0); } @@ -571,9 +651,9 @@ tracker(char *url) static Dict *trackers; static QLock trackerslk; + Dict *d, *l; int n, fd; char *p; - Dict *d, *l; if(url == nil) return; @@ -594,17 +674,32 @@ tracker(char *url) url = d->str; qunlock(&trackerslk); - if(rfork(RFFDG|RFPROC|RFMEM) <= 0) + if(debug) fprint(2, "tracker %s\n", url); + + if(rfork(RFPROC|RFMEM)) return; for(;;){ + vlong up, down, left; + + lock(&stats); + up = stats.up; + down = stats.down; + left = stats.left; + unlock(&stats); + d = nil; - if((fd = hopen("%s?info_hash=%.*H&peer_id=%.*H&port=%d&compact=1", - url, sizeof(infohash), infohash, sizeof(peerid), peerid, port)) >= 0){ + if((fd = hopen("%s?info_hash=%.*H&peer_id=%.*H&port=%d&" + "uploaded=%lld&downloaded=%lld&left=%lld&" + "compact=1", + url, sizeof(infohash), infohash, sizeof(peerid), peerid, port, + up, down, left)) >= 0){ n = readall(fd, &p); close(fd); bparse(p, p+n, &d); free(p); + } else { + if(debug) fprint(2, "tracker %s: %r\n", url); } if(l = dlook(d, "peers")){ if(l->typ == 's'){ @@ -617,10 +712,10 @@ tracker(char *url) snprint(ip, sizeof(ip), "%d.%d.%d.%d", b[0], b[1], b[2], b[3]); snprint(port, sizeof(port), "%d", b[4]<<8 | b[5]); - peer(ip, port); + client(ip, port); } } else for(; l && l->typ == 'l'; l = l->next) - peer(dstr(dlook(l->val, "ip")), dstr(dlook(l->val, "port"))); + client(dstr(dlook(l->val, "ip")), dstr(dlook(l->val, "port"))); } n = 0; if(p = dstr(dlook(d, "interval"))) @@ -651,34 +746,9 @@ Hfmt(Fmt *f) } int -progress(void) -{ - int i, c; - uchar m; - c = 0; - for(i=0; i<nhavemap; i++) - for(m = 0x80; m; m>>=1) - if(havemap[i] & m) - c++; - if(pflag) - print("%d %d\n", c, npieces); - return c == npieces; -} - -void -killcohort(void) +killnote(void *, char *) { - int i; - for(i=0;i!=3;i++){ /* It's a long way to the kitchen */ - postnote(PNGROUP, pidgroup, "kill"); - sleep(1); - } -} - -int -catchnote(void *, char *msg) -{ - exits(msg); + postnote(PNGROUP, killgroup, "kill"); return 0; } @@ -788,6 +858,7 @@ main(int argc, char *argv[]) else pieces[i].len = blocksize; len -= pieces[i].len; + stats.left += pieces[i].len; } if(len) sysfatal("pieces do not match file length"); @@ -795,23 +866,31 @@ main(int argc, char *argv[]) for(i = 0; i<npieces; i++) havepiece(i); - switch(i = rfork(RFPROC|RFMEM|RFNOTEG|RFNAMEG)){ + srand(time(0)); + atnotify(killnote, 1); + switch(i = rfork(RFPROC|RFMEM|RFNOTEG)){ case -1: sysfatal("fork: %r"); case 0: memmove(peerid, "-NF9001-", 8); - genrandom(peerid+8, sizeof(peerid)-8); + for(i=8; i<sizeof(peerid); i++) + peerid[i] = nrand(10)+'0'; + server(); tracker(dstr(dlook(torrent, "announce"))); for(d = dlook(torrent, "announce-list"); d && d->typ == 'l'; d = d->next) if(d->val && d->val->typ == 'l') tracker(dstr(d->val->val)); + while(waitpid() != -1) + ; break; default: - pidgroup = i; - atexit(killcohort); - atnotify(catchnote, 1); - while(!progress() || sflag) + killgroup = i; + while((nhavepieces < npieces) || sflag){ + if(pflag) + print("%d %d\n", nhavepieces, npieces); sleep(1000); + } } + postnote(PNGROUP, killgroup, "kill"); exits(0); } |