#!/usr/bin/python
'''Queue scheduler for torrent jobs.

Provides:

* ``urlopen`` -- opens a URL, falling back to unquoted and ``file:`` forms.
* ``infohash`` -- hex SHA-1 of the bencoded ``info`` dictionary of a torrent.
* ``RateController`` and subclasses plus ``distribute_rate``/``change_rates``
  -- helpers used by the (currently disabled) bandwidth balancing code.
* ``Scheduler`` -- a thread that periodically dispatches queued jobs through
  a controller, pauses jobs that satisfy the seeding policy and triggers
  scrapes.

NOTE(review): this file was reconstructed from a whitespace-collapsed copy;
block nesting was inferred from syntax and should be checked against the
upstream original where it matters.
'''
from threading import Thread,Lock,Event,Semaphore
from Queue import Queue as SyncQueue
from urllib2 import URLError,HTTPError,build_opener
import urllib
import os,random
import time
import sched
from sha import sha
from BitTornado.bencode import bdecode,bencode
from BitCrawler.aurllib import urlopen as aurlopen
from queue import Queue,QueueEntry,History
import policy
from log import get_logger
from i18n import *
import util

try:
    from BitTornado.BT1.download import Download
except ImportError:
    try:
        from BitTorrent.download import Download
    except ImportError:
        print('BitTorrent not found')
        import sys
        sys.exit()

try:
    sum([1])
except NameError:
    # Fallback for interpreters that lack the builtin.
    def sum(seq,start=0):
        '''Return ``start`` plus the sum of the items of ``seq``.'''
        s = start
        for i in seq:
            s += i
        return s

_opener = None

def urlopen(url,data=None,referer=None):
    '''Open ``url`` and return the file-like object from ``aurlopen``.

    The URL is tried, in order, as given, unquoted, as a ``file:`` URL built
    from the path, and as a ``file:`` URL built from the unquoted path.

    HTTPError is always propagated immediately (it is re-raised before the
    broader URLError handler can swallow it); ValueError, URLError and
    OSError trigger the next fallback. Errors from the last attempt
    propagate to the caller.
    '''
    global _opener
    if not _opener:
        # The opener is created but the actual fetch goes through aurlopen.
        _opener = build_opener()
        _opener.addheaders = [('User-Agent', 'Mozilla/4.0 (compatible; Windows; Linux)')]
    #_urlopen = _opener.open
    _urlopen = aurlopen
    try:
        fd = _urlopen(url,data,referer=referer)
    except HTTPError:
        raise
    except (ValueError,URLError,OSError):
        try:
            fd = _urlopen(urllib.unquote(url),data,referer=referer)
        except HTTPError:
            raise
        except (ValueError,URLError,OSError):
            from nturl2path import pathname2url
            try:
                fd = _urlopen('file:'+pathname2url(url),data,referer=referer)
            except HTTPError:
                raise
            except (ValueError,URLError,OSError):
                fd = _urlopen('file:'+pathname2url(urllib.unquote(url)),referer=referer)
    return fd

def infohash(metadata):
    '''Return the hex SHA-1 digest of the bencoded ``info`` entry.

    ``metadata`` is the raw bencoded torrent data; it is decoded, its
    ``'info'`` value is re-encoded and hashed.
    '''
    resp = bdecode(metadata)
    info = resp['info']
    infohash_sha = sha(bencode(info))
    return infohash_sha.hexdigest()

class RateController:
    '''Holds the measured rate of a job and the new rate to be applied.

    ``current_rate`` is the value passed in at construction and is never
    modified; ``new_rate`` starts equal to it and is adjusted through
    ``change_rate``. Subclasses implement ``apply`` to push ``new_rate``
    to the job.
    '''

    # Lower bound enforced on new_rate; also the activity threshold.
    MINIMUM_RATE = 3

    def __init__(self,job,current_rate):
        self.job = job
        self.current_rate = current_rate
        self.new_rate = current_rate

    def __repr__(self):
        return '(%s,%0.1f,%0.1f)' % (self.job.id,self.current_rate,self.new_rate)

    def apply(self):
        '''Push ``new_rate`` to the job; no-op in the base class.'''
        pass

    def __cmp__(self,o):
        # Order controllers by their measured rate (used by list.sort()).
        return cmp(self.current_rate,o.current_rate)

    def __lt__(self,o):
        # Same ordering as __cmp__, for sorting where __cmp__ is not consulted.
        return self.current_rate < o.current_rate

    def change_rate(self,offset):
        '''Add ``offset`` to ``new_rate``, never going below MINIMUM_RATE.'''
        self.new_rate = max(self.new_rate+offset,RateController.MINIMUM_RATE)

    def is_active(self):
        '''True when the measured rate exceeds MINIMUM_RATE.'''
        return self.current_rate > RateController.MINIMUM_RATE

    def is_seeding(self):
        return self.job.state == STATE_SEEDING

    def is_leeching(self):
        return self.job.state == STATE_RUNNING

class UploadRateController(RateController):
    '''Rate controller seeded from ``job.up_rate`` (divided by 1000.0).'''

    def __init__(self,job):
        RateController.__init__(self,job,job.up_rate/1000.0)

    def apply(self):
        self.job.dow.setUploadRate(self.new_rate)

class DownloadRateController(RateController):
    '''Rate controller seeded from ``job.down_rate`` (divided by 1000.0).'''

    def __init__(self,job):
        RateController.__init__(self,job,job.down_rate/1000.0)

    def apply(self):
        self.job.dow.setDownloadRate(self.new_rate)

def distribute_rate(jobs,avail_bw,threshold_bw,step):
    '''Take ``avail_bw`` away from ``jobs`` in decrements of ``step``.

    On every pass each active job whose ``new_rate`` is above
    ``threshold_bw`` is lowered by ``step`` and ``avail_bw`` is reduced by
    the same amount; inactive jobs are raised by the remaining
    ``avail_bw``. When a pass changes nothing the threshold is lowered by
    ``step`` and the pass is repeated.

    Returns immediately when ``step`` is not positive or when no job is
    active, because in either case ``avail_bw`` could never reach zero and
    the loop would not terminate.
    '''
    if step <= 0 or not [j for j in jobs if j.is_active()]:
        return
    while avail_bw > 0:
        saved_bw = avail_bw
        for j in jobs:
            if j.is_active() and j.new_rate > threshold_bw:
                j.change_rate(-step)
                avail_bw -= step
            elif not j.is_active():
                j.change_rate(+avail_bw)
        if saved_bw == avail_bw:
            # Nobody was above the threshold: lower it and try again.
            threshold_bw -= step

def change_rates(jobs,rate):
    '''Apply ``change_rate(rate)`` to every controller in ``jobs``.'''
    for j in jobs:
        j.change_rate(rate)

class Scheduler(Thread):
    '''Thread that drives the job queue.

    ``run`` schedules ``schedule`` and ``scrape`` on an internal
    ``sched.scheduler`` whose delay function is ``wait``; ``stop`` sets the
    quit event, which makes ``wait`` cancel all pending events so that
    ``run`` can return.
    '''

    file_semaphore = Semaphore()

    def __init__(self,controller,dispatch,error):
        '''Set up the scheduler.

        ``controller`` is registered back with ``set_scheduler(self)``;
        ``dispatch`` is the callable invoked as
        ``dispatch(item, finished_cb, failed_cb)`` to start a job;
        ``error`` is stored as ``self.error``.
        '''
        Thread.__init__(self)
        self.policy = policy.get_policy()
        self.controller = controller
        self.controller.set_scheduler(self)
        self.do_dispatch = dispatch
        self.error = error
        self.queue = Queue()
        self.queue.load()
        self.lock = Lock()
        self._quit = Event()
        self.sched = sched.scheduler(time.time,self.wait)
        # Items that could not be added because the lock was held.
        self.add_queue = SyncQueue(0)
        self.num_run = 0
        self.log = get_logger()

    def job(self,id):
        '''Return the queue item for ``id`` after requesting its status.'''
        item = self.queue.get(id)
        if item:
            self.controller.request_status(item)
        return item

    def jobs(self):
        '''Request a status update for every item and return ``queue.get()``.'''
        for item in self.queue.get():
            self.controller.request_status(item)
        return self.queue.get()

    def save(self):
        self.queue.save()

    def add(self,item):
        '''Add ``item`` to the queue without blocking.

        If the lock is busy the item is parked in ``add_queue``;
        ``schedule`` drains that queue later. Exceptions raised by the
        queue are printed, not propagated.
        '''
        if self.lock.acquire(0):
            try:
                self.queue.add(item)
            except Exception as why:
                import traceback
                traceback.print_exc()
                print(why)
            finally:
                self.lock.release()
        else:
            self.add_queue.put(item)

    def _enqueue_torrent(self,torrent_file,metadata,save_torrent,saveas,priority):
        '''Write ``metadata`` to ``torrent_file`` if requested and queue it.

        Creates the containing directory when it is missing and returns the
        new QueueEntry.
        '''
        torrent_path = os.path.dirname(torrent_file)
        if torrent_path and not os.path.exists(torrent_path):
            os.makedirs(torrent_path)
        if save_torrent:
            fd = open(torrent_file,'wb')
            try:
                fd.write(metadata)
            finally:
                fd.close()
        qe = QueueEntry(torrent_file,
                        dest_path=saveas or self.policy(policy.DEST_PATH))
        qe.local_policy.update(policy.PRIORITIES,priority or '')
        self.add(qe)
        return qe

    def add_metadata(self,metadata,saveas=None,priority='',url=''):
        '''Queue a torrent given its raw ``metadata``.

        The torrent is stored as ``<infohash>.torrent`` under the
        TORRENT_PATH policy directory, unless ``url`` is a ``file://`` URL
        that already resolves inside that directory, in which case that
        file is used as is and nothing is written.

        Returns ``{'id': ...}`` on success or ``{'error': message}`` when
        any exception occurs (the traceback is printed).
        '''
        try:
            filename = '%s.torrent' % infohash(metadata)
            torrent_file = os.path.join(self.policy(policy.TORRENT_PATH),
                                        filename)
            save_torrent = 1
            if url.startswith('file://'):
                url = url[7:]
                url = os.path.realpath(url)
                if url.startswith(self.policy(policy.TORRENT_PATH)):
                    torrent_file = url
                    save_torrent = 0
            qe = self._enqueue_torrent(torrent_file,metadata,save_torrent,
                                       saveas,priority)
        except Exception as why:
            import traceback
            traceback.print_exc()
            return {'error':str(why)}
        return {'id':qe.id}

    def add_url(self,url,referer=None,saveas=None,priority=''):
        '''Fetch a torrent from ``url`` and queue it.

        The target file name is derived from the URL: ``file://`` URLs are
        mapped to their local path, ``file:/X|...`` URLs to a drive-letter
        path (redirected into TORRENT_PATH when the path contains
        'Temporary Internet Files'), and anything else to the last URL
        component under TORRENT_PATH, or ``<infohash>.torrent`` when that
        component contains a ``?``. The fetched data is always written to
        the target file.

        Returns ``{'id': ...}`` on success or ``{'error': message}`` when
        any exception occurs.
        '''
        try:
            fd = urlopen(url,referer=referer)
            try:
                meta = fd.read()
                fullurl = fd.geturl()
            finally:
                fd.close()
            if fullurl.startswith('file://'):
                torrent_file = urllib.unquote(url)
                if torrent_file.startswith('file://'):
                    torrent_file = torrent_file[7:]
            elif fullurl.startswith('file:/') and fullurl[8:9] == '|':
                # Slice instead of index so short URLs fall through to the
                # generic branch rather than raising IndexError.
                torrent_file = urllib.unquote(url)
                torrent_file = torrent_file[7]+':'+torrent_file[9:]
                if torrent_file.find('Temporary Internet Files') >= 0:
                    filename = os.path.split(url)[1]
                    torrent_file = os.path.join(self.policy(policy.TORRENT_PATH),
                                                filename)
            else:
                filename = urllib.unquote(os.path.split(url)[1])
                if filename.find('?') >= 0:
                    filename = '%s.torrent' % infohash(meta)
                torrent_file = os.path.join(self.policy(policy.TORRENT_PATH),
                                            filename)
            qe = self._enqueue_torrent(torrent_file,meta,1,saveas,priority)
        except Exception as why:
            return {'error':str(why)}
        return {'id':qe.id}

    def remove(self,item):
        '''Pause ``item`` and remove it from the queue.

        Exceptions raised by the queue are printed, not propagated.
        '''
        self.pause(item)
        self.lock.acquire()
        try:
            self.queue.remove(item)
        except Exception:
            import traceback
            traceback.print_exc()
        finally:
            self.lock.release()

    def pause(self,item):
        '''Stop ``item`` if it is running or seeding and mark it paused.

        A seeding item becomes STATE_FINISHED, anything else becomes
        STATE_PAUSED; the previous state is kept in ``item.old_state``.
        '''
        if item.state == STATE_PAUSED:
            return
        if item.state in [STATE_RUNNING,STATE_SEEDING]:
            self.controller.remove(item)
            self.num_run -= 1
        item.old_state = item.state
        if item.state == STATE_SEEDING:
            item.state = STATE_FINISHED
        else:
            item.state = STATE_PAUSED
        item.paused()

    def resume(self,item,force=0):
        '''Put ``item`` back to STATE_WAITING; dispatch at once if ``force``.'''
        if item.state in [STATE_RUNNING,STATE_SEEDING]:
            return
        item.state = STATE_WAITING
        if force:
            self.dispatch(item)

    def dispatch(self,item):
        '''Build the parameter list for ``item`` from policy and start it.

        If the dispatch callable raises, the running counter is restored,
        the item is set to STATE_PAUSED and the message is stored in
        ``item.error``.
        '''
        item.state = STATE_RUNNING
        item.started()
        minport = self.policy(policy.MIN_PORT)
        maxport = self.policy(policy.MAX_PORT)
        minpeer = self.policy(policy.MIN_PEER)
        maxpeer = self.policy(policy.MAX_PEER)
        maxinit = self.policy(policy.MAX_INITIATE)
        # Never let the upper port bound fall below the lower one.
        maxport = max(minport,maxport)
        rerequest_interval = self.policy(policy.REREQUEST_INTERVAL)
        item.params = ['--minport',minport,
                       '--maxport',maxport,
                       '--spew',1,
                       '--min_peers',minpeer,
                       '--max_connections',maxpeer,
                       '--max_initiate',maxinit,
                       '--rerequest_interval',rerequest_interval,
                       '--save_in',item.dest_path,
                       '--max_upload_rate',self.policy(policy.MAX_UPLOAD_RATE),
                       '--max_download_rate',self.policy(policy.MAX_DOWNLOAD_RATE)]
        report_ip = self.policy(policy.REPORT_IP)
        if report_ip:
            item.params += ['--ip',report_ip]
        item.params += [item.file]
        self.num_run += 1
        try:
            self.do_dispatch(item,self.cb_finished,self.cb_failed)
        except Exception as why:
            self.num_run -= 1
            item.state = STATE_PAUSED
            item.error = str(why)

    def cb_finished(self,item):
        '''Completion callback: mark ``item`` finished and record it in History.'''
        self.lock.acquire()
        try:
            item.state = STATE_FINISHED
            item.finished()
        except Exception:
            import traceback
            traceback.print_exc()
        finally:
            self.lock.release()
        hist = History()
        hist.load()
        hist.add(item.file)
        hist.save()

    def cb_failed(self,item):
        '''Failure callback; intentionally does nothing.'''
        #item.state = STATE_WAITING
        pass

    def calculate_upload_rate(self):
        '''Rebalance upload rates between leeching and seeding jobs.

        Currently disabled: the method returns immediately and the code
        below the ``return`` is never executed.
        '''
        return
        completes = []
        incompletes = []
        for j in self.jobs():
            if not j.dow or not hasattr(j.dow.d,'downloader'):
                continue
            if j.state == STATE_RUNNING:
                incompletes.append(UploadRateController(j))
            if j.state == STATE_SEEDING:
                completes.append(UploadRateController(j))
        used_upload_bw = sum([j.current_rate for j in incompletes])
        used_seed_bw = sum([j.current_rate for j in completes])
        max_bw = float(self.policy(policy.MAX_UPLOAD_RATE))
        max_seed_bw = float(self.policy(policy.MAX_SEED_RATE))
        max_upload_bw = max_bw-min(max_seed_bw,used_seed_bw)
        max_seed_bw = max(max_bw-max_upload_bw,max_seed_bw)
        distribute_step = 5
        if used_seed_bw < max_seed_bw:
            avail_bw = max_seed_bw-used_seed_bw
            change_rates(completes,+avail_bw)
        else:
            avail_bw = used_seed_bw-max_seed_bw
            avg_bw = avail_bw/max(len(completes),1)
            compl = completes[:]
            compl.sort()
            distribute_rate(compl,avail_bw,avg_bw,distribute_step)
        if used_upload_bw < max_upload_bw:
            avail_bw = max_upload_bw-used_upload_bw
            change_rates(incompletes,+avail_bw)
        else:
            avail_bw = used_upload_bw-max_upload_bw
            avg_bw = avail_bw/max(len(incompletes),1)
            incompl = incompletes[:]
            incompl.sort()
            distribute_rate(incompl,avail_bw,avg_bw,distribute_step)
        controllers = incompletes+completes
        self.log.verbose('Upload Rate: %s\n' % repr(controllers))
        for j in controllers:
            j.apply()

    def calculate_download_rate(self):
        '''Simple download rate adjuster. Could be enhanced with priorities.

        Currently disabled: the method returns immediately and the code
        below the ``return`` is never executed.
        '''
        return
        incompletes = []
        for j in self.jobs():
            if not j.dow or not hasattr(j.dow.d,'downloader'):
                continue
            if j.state == STATE_RUNNING:
                incompletes.append(DownloadRateController(j))
        used_bw = sum([j.current_rate for j in incompletes])
        max_bw = float(self.policy(policy.MAX_DOWNLOAD_RATE)) or 1000000
        distribute_step = 5
        if used_bw < max_bw:
            avail_bw = max_bw-used_bw
            change_rates(incompletes,+avail_bw)
        else:
            avail_bw = used_bw-max_bw
            avg_bw = avail_bw/max(len(incompletes),1)
            incompl = incompletes[:]
            incompl.sort()
            distribute_rate(incompl,avail_bw,avg_bw,distribute_step)
        self.log.verbose('Download Rate: %s\n' % repr(incompletes))
        for j in incompletes:
            j.apply()

    def old_calculate_download_rate(self):
        '''Simple download rate adjuster. Could be enhanced with priorities.

        Not called from anywhere in this file.
        NOTE(review): the nesting of the inner loop and of the ``else``
        branch was inferred from a whitespace-collapsed copy -- confirm
        against upstream before reviving this method. The ``while`` loop
        also has no exit when no rate can be raised.
        '''
        incompletes = []
        dl_rates = []
        for j in self.jobs():
            if not j.dow or not hasattr(j.dow.d,'downloader'):
                continue
            if j.state == STATE_RUNNING:
                incompletes.append(j)
                dr = j.down_rate/1000
                dl_rates.append(dr)
        max_bw = float(self.policy(policy.MAX_DOWNLOAD_RATE)) or 1000000
        active_downs = len(incompletes)
        used_bw = sum(dl_rates)
        if used_bw == 0:
            # Also covers the no-running-jobs case, avoiding division by zero.
            return
        average_max_bw = max_bw / active_downs
        if len(incompletes) > 1 and used_bw >= max_bw * 0.85:
            avail_bw = max_bw - used_bw
            avail_avg_bw = avail_bw / active_downs
            soft_max_perc_allowed = 0.9
            while avail_bw > 0.5:
                for act in range(active_downs):
                    new_rate = dl_rates[act] + avail_avg_bw
                    if new_rate < (soft_max_perc_allowed + (1 - soft_max_perc_allowed) / active_downs) * max_bw:
                        dl_rates[act] = new_rate
                        avail_bw += -avail_avg_bw
                avail_avg_bw = avail_avg_bw / active_downs
        else:
            dl_rates = [average_max_bw] * len(incompletes)
        # Both lists always have the same length, so zip pairs them fully.
        for job,rate in zip(incompletes,dl_rates):
            job.dow.setDownloadRate(rate)

    def terminate_session(self):
        '''Stop finished seeders, then pause everything if disk space is low.

        A free-space value of -1 from ``util.get_free_space`` skips the
        disk-space check.
        '''
        self.terminate_seeding()
        free_space = util.get_free_space(self.policy(policy.DEST_PATH))
        if free_space == -1:
            return
        if free_space < self.policy(policy.MIN_FREE_SPACE):
            for j in self.jobs():
                if j.state in [STATE_RUNNING,STATE_SEEDING]:
                    self.pause(j)

    def terminate_seeding(self):
        '''Pause seeding jobs that satisfy the seeding policy.

        A seeding job is paused when it meets every minimum (share ratio,
        seed time, seeder count, seeder/peer ratio) and at least one of the
        corresponding maximums. Limits come from the job's own policy when
        it has USE_LOCAL_POLICY set, otherwise from the global policy.
        '''
        now = time.time()
        for j in self.jobs():
            pol = j.get_policy()
            if pol(policy.USE_LOCAL_POLICY):
                limits = pol
            else:
                limits = self.policy
            min_share_ratio = limits(policy.MIN_SHARE_RATIO)
            max_share_ratio = limits(policy.MAX_SHARE_RATIO)
            min_seed_time = limits(policy.MIN_SEED_TIME)
            max_seed_time = limits(policy.MAX_SEED_TIME)
            min_seeder = limits(policy.MIN_SEEDER)
            max_seeder = limits(policy.MAX_SEEDER)
            min_peer_ratio = limits(policy.MIN_PEER_RATIO)
            max_peer_ratio = limits(policy.MAX_PEER_RATIO)
            seed_time = now-j.finished_time
            try:
                seeder = int(j.currentseed)
            except ValueError:
                seeder = 0
            try:
                # Float division: integer division would truncate the ratio.
                peer_ratio = float(seeder)/int(j.currentpeer)
            except (ValueError,ZeroDivisionError):
                peer_ratio = max_peer_ratio
            if j.state == STATE_SEEDING and \
               j.share_ratio >= min_share_ratio and \
               seed_time >= min_seed_time and \
               seeder >= min_seeder and \
               peer_ratio >= min_peer_ratio and \
               (j.share_ratio >= max_share_ratio or \
                seed_time >= max_seed_time or \
                seeder >= max_seeder or \
                peer_ratio >= max_peer_ratio):
                self.pause(j)

    def schedule(self):
        '''One scheduling pass; re-arms itself after SCHEDULING_INTERVAL.

        Dispatches waiting items while the controller is initialized and
        fewer than MAX_JOB_RUN jobs are running, then applies the
        termination policy, saves the queue, drains ``add_queue`` and
        starts a one-off scrape if anything was dispatched.
        '''
        dispatched_count = 0
        self.lock.acquire()
        try:
            while self.controller.initialized():
                item = self.queue.get_next()
                if not item:
                    break
                if self.num_run >= self.policy(policy.MAX_JOB_RUN):
                    break
                self.dispatch(item)
                dispatched_count += 1
            self.calculate_upload_rate()
            self.calculate_download_rate()
        except Exception as why:
            import traceback
            traceback.print_exc()
            print(why)
        finally:
            self.lock.release()
        self.terminate_session()
        self.queue.save()
        while not self.add_queue.empty():
            self.add(self.add_queue.get())
        if dispatched_count > 0:
            self.scrape(rerun=0)
        self.add_task(self.policy(policy.SCHEDULING_INTERVAL),self.schedule,())

    def scrape_func(self,jobs,rerun=1):
        '''Scrape thread body: update every job, optionally re-arm ``scrape``.'''
        for j in jobs:
            j.update_scrape(interactive=1)
        if rerun:
            self.add_task(self.policy(policy.SCRAPE_INTERVAL),self.scrape,())

    def scrape(self,rerun=1):
        '''Start a background scrape of running/seeding jobs.

        Does nothing when a previous scrape thread is still alive.
        '''
        if hasattr(self,'scrape_thread'):
            if self.scrape_thread.isAlive():
                return
        active_jobs = []
        for j in self.jobs():
            if j.state in [STATE_RUNNING,STATE_SEEDING]:
                active_jobs.append(j)
        self.scrape_thread = Thread(target=self.scrape_func,
                                    args=(active_jobs,rerun))
        self.scrape_thread.start()

    def stop(self):
        '''Signal the thread to quit, remove all jobs and save the queue.

        Waits for a running scrape thread to finish before saving.
        '''
        self._quit.set()
        for j in self.jobs():
            self.controller.remove(j)
        if hasattr(self,'scrape_thread'):
            if self.scrape_thread.isAlive():
                print('waiting scrape')
                self.scrape_thread.join()
                print('terminated scrape')
        self.queue.save()

    def wait(self,delay):
        '''Delay function for ``self.sched``; interruptible by ``stop``.

        Sleeps up to ``delay`` seconds on the quit event. Once the event is
        set, every pending event is cancelled so ``sched.run`` returns.
        Events are cancelled through the scheduler API on a snapshot of the
        queue, because ``scheduler.queue`` may be a read-only copy that
        cannot be emptied in place.
        '''
        self._quit.wait(delay)
        if self._quit.isSet():
            for event in list(self.sched.queue):
                try:
                    self.sched.cancel(event)
                except ValueError:
                    # Event already gone from the queue.
                    pass

    def add_task(self,delay,action,argument):
        '''Schedule ``action(*argument)`` after ``delay`` seconds.'''
        self.sched.enter(delay,0,action,argument)

    def run(self):
        '''Thread entry point: run the scheduler until the quit event is set.'''
        self.add_task(0,self.schedule,())
        self.add_task(0,self.scrape,())
        while not self._quit.isSet():
            self.sched.run()
        print('queue stopped')