#!/usr/bin/env python
"""Rank the package mirrors of the running distribution.

Every mirror is pinged in parallel; the mirrors with the lowest average
round-trip time, plus two randomly chosen ones, are then asked for the
first 50 KiB of a ``Packages.gz`` file and listed by download speed.

All ``print`` calls use the single-argument parenthesised form so that the
module behaves the same under Python 2 and Python 3.
"""

import os
import random
import re
import socket
import subprocess
import tempfile
import threading
import time
import urllib
from timeit import Timer

try:
    import Queue  # Python 2
except ImportError:
    import queue as Queue  # Python 3

try:
    from urllib.request import urlopen  # Python 3
except ImportError:
    from urllib2 import urlopen  # Python 2

import aptsources.distro
import aptsources.sourceslist

# Applies to every socket created afterwards, including the HTTP downloads.
socket.setdefaulttimeout(2)


class MirrorTest:
    """Ping a set of mirrors, then measure download speed of the best ones.

    The whole measurement runs from the constructor and the outcome is
    printed to standard output.  Mirror objects must provide a ``hostname``
    attribute and a ``get_repo_urls()`` method.
    """

    class PingWorker(threading.Thread):
        """Thread that takes mirrors from a queue and records their RTT."""

        def __init__(self, jobs, results, id):
            """Create a worker.

            jobs    -- queue of mirror objects still to be pinged
            results -- shared list receiving ``[rtt, host, mirror]`` entries
            id      -- label used in the progress output
            """
            threading.Thread.__init__(self)
            self.id = id
            self.jobs = jobs
            self.results = results
            # Captures the second ("avg") field of ping's rtt summary line.
            self.match_result = re.compile(r"^rtt .* = [\.\d]+/([\.\d]+)/.*")

        def _ping(self, host):
            """Return the average RTT reported by ``ping`` or None."""
            try:
                # Argument list instead of a shell string: the host name is
                # never interpreted by a shell.
                proc = subprocess.Popen(
                    ["ping", "-q", "-c", "4", "-W", "2", "-i", "0.3", host],
                    stdout=subprocess.PIPE,
                    universal_newlines=True,
                )
            except OSError:
                # ping could not be started at all.
                return None
            # communicate() drains and closes the pipe and reaps the child.
            output = proc.communicate()[0]
            rtt = None
            for line in output.splitlines():
                found = self.match_result.match(line)
                if not found:
                    continue
                try:
                    rtt = float(found.group(1))
                except ValueError:
                    # The pattern also accepts dot-only strings.
                    continue
            return rtt

        def run(self):
            """Ping queued mirrors until every mirror has been handled."""
            while MirrorTest.completed_pings < MirrorTest.todo:
                try:
                    mirror = self.jobs.get(True, 1)
                except Queue.Empty:
                    # Other workers may still be busy; re-check the counter.
                    continue
                rtt = None
                host = None
                try:
                    host = mirror.hostname
                    print("Pinging (Worker %s) %s (%s) ..."
                          % (self.id, host, MirrorTest.completed_pings))
                    rtt = self._ping(host)
                finally:
                    # Count the job even on failure, otherwise the loop
                    # condition above could never become false.
                    with MirrorTest.completed_pings_lock:
                        MirrorTest.completed_pings += 1
                        if rtt is not None:
                            self.results.append([rtt, host, mirror])

    def speed_test(self, mirror):
        """Return the download speed of *mirror* in KiB/s, 0 on failure.

        At most 51200 bytes of ``self.test_file`` are fetched from the
        mirror's first repository URL.  The speed is based on the number of
        bytes actually received, so short files are not over-rated.
        HTTP error responses raise inside ``urlopen`` and therefore score 0.
        """
        url = "%s/%s" % (mirror.get_repo_urls()[0], self.test_file)
        print("Downloading %s ..." % url)
        start = time.time()
        try:
            response = urlopen(url)
            try:
                data = response.read(51200)
                elapsed = time.time() - start
            finally:
                response.close()
        except Exception:
            # Best effort: an unreachable or broken mirror simply scores 0.
            return 0
        if elapsed <= 0:
            return 0
        return len(data) / 1024.0 / elapsed

    def __init__(self, hosts, test_file):
        """Run the ping and download tests and print the rankings.

        hosts     -- iterable of mirror objects
        test_file -- path, relative to a repository URL, of the file to
                     download for the speed test
        """
        self.test_file = test_file
        # Accept any iterable (e.g. a dict view) but index a real list.
        hosts = list(hosts)
        jobs = Queue.Queue()
        results = []
        for host in hosts:
            jobs.put(host)

        self.threads = []
        MirrorTest.completed_pings = 0
        MirrorTest.completed_pings_lock = threading.Lock()
        MirrorTest.todo = len(hosts)
        for number in range(10):
            worker = MirrorTest.PingWorker(jobs, results, number)
            self.threads.append(worker)
            worker.start()
        for worker in self.threads:
            worker.join()

        # Sort on (rtt, host) only, so ties never compare mirror objects.
        results.sort(key=lambda entry: entry[:2])
        print("\n\nTop ten RTTs:")
        for entry in results[0:10]:
            print("%s: %s" % (entry[1], entry[0]))
        print("\n\n")

        # Put two random mirrors in front of the ten best-pinging ones.
        # random.choice covers every index and cannot run past the end.
        if hosts:
            for _ in range(2):
                results.insert(0, [0, "rand", random.choice(hosts)])

        final = [(self.speed_test(entry[2]), entry[2])
                 for entry in results[0:12]]
        final.sort(key=lambda item: item[0], reverse=True)
        print("\n\nBest mirrors:")
        for speed, mirror in final:
            print("%s: %s KByte/s" % (mirror.hostname, int(speed)))


if __name__ == "__main__":
    distro = aptsources.distro.get_distro()
    distro.get_sources(aptsources.sourceslist.SourcesList())
    # Ask dpkg for the architecture name used in the archive layout.
    dpkg = subprocess.Popen(["dpkg", "--print-architecture"],
                            stdout=subprocess.PIPE,
                            universal_newlines=True)
    arch = dpkg.communicate()[0].strip()
    test_file = "dists/%s/%s/binary-%s/Packages.gz" % \
                (distro.source_template.name,
                 distro.source_template.components[0].name,
                 arch)
    app = MirrorTest(distro.source_template.mirror_set.values(),
                     test_file)