from CurrentRateMeasure import Measure from math import ceil from sys import stdout from threading import Lock,Condition import SocketServer import BaseHTTPServer import os,sys,string,time import random,socket,thread,re from BT1.PiecePicker import PiecePicker class PiecePickerStreaming(PiecePicker): """ Implements piece picking for streaming video. Keeps track of playback point and avoids requesting obsolete pieces. """ def __init__(self, numpieces, rarest_first_cutoff = 1, rarest_first_priority_cutoff = 3, priority_step = 20, helper = None, rate_predictor = None): PiecePicker.__init__( self, numpieces, rarest_first_cutoff, rarest_first_priority_cutoff, priority_step, helper, rate_predictor ) # size of each piece self.piece_length = 0 # range of pieces to download, inclusive: (first,last) self.download_range = (0,self.numpieces-1) # playback module self.videoplayback = None def set_videoplayback(self, videoplayback): self.videoplayback = videoplayback # update its information for i in xrange(0,self.numpieces): if self.has[i]: self.videoplayback.complete( i, downloaded=False ) def got_have(self, piece): PiecePicker.got_have( self, piece ) def lost_have(self, piece): PiecePicker.lost_have( self, piece ) def complete(self, piece): PiecePicker.complete( self, piece ) if self.videoplayback: self.videoplayback.complete( piece ) def set_download_range(self, begin, end): self.download_range = [begin,end] def streaming_piece_filter(self, piece): return (piece >= self.download_range[0] and piece <= self.download_range[1]) def next(self, haves, wantfunc, sdownload, complete_first = False, helper_con = False): def newwantfunc( piece ): return self.streaming_piece_filter( piece ) and wantfunc( piece ) return PiecePicker.next(self, haves, newwantfunc, sdownload, complete_first, helper_con) class PiecePickerEDF(PiecePickerStreaming): """ Earliest Deadline First -- pick the piece with the lowest number. """ def _next(self, haves, wantfunc, complete_first, helper_con): """ Determine which piece to download next from a peer. haves: set of pieces owned by that peer wantfunc: custom piece filter complete_first: whether to complete partial pieces first helper_con: """ for i in xrange(self.download_range[0],self.download_range[1]+1): if self.has[i]: continue if not wantfunc(i): continue if not haves[i]: continue if self.helper is None or helper_con or not self.helper.is_ignored(i): return i class PiecePickerBiToS(PiecePickerStreaming): """ BiToS -- define a high-priority set, and select out of it with probability p. """ # size of high probability set, as a fraction of the movie HIGH_PROB_SETSIZE = 0.05 # p -- probability of selecting a piece out of the high probability set P = 0.8 def _next(self, haves, wantfunc, complete_first, helper_con): """ Determine which piece to download next from a peer. haves: set of pieces owned by that peer wantfunc: custom piece filter complete_first: whether to complete partial pieces first helper_con: """ def select( f, t ): for i in xrange(f,t): if self.has[i]: continue if not wantfunc(i): continue if not haves[i]: continue if self.helper is None or helper_con or not self.helper.is_ignored(i): return i return None highprob = random.uniform(0,1) < self.P highprob_cutoff = self.download_range[0] + int(self.HIGH_PROB_SETSIZE * self.numpieces) if highprob_cutoff >= self.download_range[1]: highprob = False if highprob: choice = select( self.download_range[0], highprob_cutoff ) if choice is None: choice = select( highprob_cutoff, self.download_range[1]+1 ) else: choice = select( highprob_cutoff, self.download_range[1]+1 ) return choice PiecePickerVOD = PiecePickerBiToS class MovieSelector: """ Selects a movie out of a torrent and provides information regarding the pieces and offsets within the torrent. """ EXTENSIONS = ['asf','avi','dv','flc','mpeg','mpeg4','mpg4','mp4','mpg','mov','ogm','qt','rm','swf','vob','wmv'] def __init__(self, fileselector, storagewrapper, piecepicker): self.fileselector = fileselector self.piecepicker = piecepicker self.storagewrapper = storagewrapper # information about all the files in the .torrent self.fileinfo = [] # information about the movie to download: # (filename,offset,length) self.download_fileinfo = None # (first_piece,offset),(last_piece,offset) self.download_range = None # size of each piece self.piece_length = 0 self.first_piece_length = 0 self.last_piece_length = 0 self.bitrate = None self.size = None self.duration = None def set_bitrate(self,bitrate): self.bitrate = bitrate self.duration = self.size / self.bitrate def set_duration(self,duration): self.duration = duration self.bitrate = self.size / self.duration def select_movie(self,filename): """ Select a movie to download. """ for name,offset,length,begin,end in self.fileinfo: if name == filename: self.download_fileinfo = (name,offset,length) self.download_range = (begin,end) self.size = length self.first_piece_length = self.piece_length - begin[1] self.last_piece_length = end[1] self.piecepicker.set_download_range(begin[0],end[0]) print "Selected: %s" % (self.download_fileinfo,) return def autoselect_movie(self): """ Autoselect a movie to download. """ # filter movies movie_extensions = [".%s" % e for e in self.EXTENSIONS] def is_movie( filename ): for ext in movie_extensions: if filename.endswith( ext ): return True else: return False # select first one movies = [f for f in self.fileinfo if is_movie( f[0] )] movies.sort() if not movies: print "No movies found!" else: print "Movies found: %s" % ([f[0] for f in movies],) self.select_movie( movies[0][0] ) def parse_torrent(self): """ Parse .torrent file information. """ fileinfo = [] total = 0 self.piece_length = piece_length = self.fileselector.piece_length for file, length in self.fileselector.files: if not length: filepieces.append(()) else: # filename, offset, length, (first_piece,offset), (last_piece,offset) info = (file, total, length, ( int(total/piece_length), total % piece_length ), ( int(ceil((total+length-1)/piece_length)), (total+length-1) % piece_length ) ) fileinfo.append(info) total += length self.fileinfo = fileinfo def num_movie_pieces(self): """ Returns the size of the movie in pieces. """ if not self.download_fileinfo: return 0 (bpiece,boffset),(epiece,eoffset) = self.download_range return epiece - bpiece + 1 def have_movie_piece(self,piece): """ Returns whether a certain movie piece has been downloaded. """ (bpiece,boffset),(epiece,eoffset) = self.download_range abspiece = piece + bpiece return self.piecepicker.has[abspiece] def get_movie_piece(self,piece): """ Returns the data of a certain piece in the movie (0=first piece), or None. """ if not self.download_fileinfo: return None if piece < 0 or piece >= self.num_movie_pieces(): return None (bpiece,boffset),(epiece,eoffset) = self.download_range abspiece = piece + bpiece if not self.piecepicker.has[abspiece]: return None begin,length = 0,self.piece_length if abspiece == bpiece: begin = boffset length -= boffset if abspiece == epiece: cutoff = self.piece_length - (eoffset + 1) length -= cutoff data = self.storagewrapper.do_get_piece(abspiece, begin, length) return data.tostring() def movie_piece_length(self,piece): """ Returns the length of a certain piece. """ if piece == 0: return self.first_piece_length if piece == self.num_movie_pieces(): return self.last_piece_length return self.piece_length class VideoPlayback: """ Takes care of providing a bytestream interface based on the available pieces. """ # location of ffmpeg/ffplay FFPLAY="/Users/jandavidmol/svn/ffmpeg-EBU/ffplay" FFMPEG="/Users/jandavidmol/svn/ffmpeg-EBU/ffmpeg" # number of packets required to preparse the video MAX_PREPARSE_PACKETS=10 # max number of pieces in queue to player BUFFER_LENGTH = 2 # amount of time (seconds) to push a packet into # the player queue ahead of schedule PIECE_DUE_SKEW = 0.1 def __init__(self,movieselector,piecepicker,rawserver): self.movieselector = movieselector self.piecepicker = piecepicker self.rawserver = rawserver self.downloadrate = Measure( 10 ) # position of playback, in pieces self.playback_pos = 0 self.playing = False # buffer: a link to the piecepicker buffer self.has = self.piecepicker.has # number of pieces in buffer self.pieces_in_buffer = 0 # start periodic tasks self.tick_second() self.lock = Lock() self.data_ready = Condition() self.playing = False self.prebuffering = True self.estimated_bitrate = None httpd = HTTPServer( self ) httpd.background_serve() self.start_ffplay() # link to others (last thing to do) self.piecepicker.set_videoplayback( self ) def start_ffplay(self): """ Start the external player. """ os.system( "%s -once -x 320 -y 240 http://localhost:8088/ &" % self.FFPLAY ) def parse_video(self): """ Feeds the first MAX_PREPARSE_PACKETS to ffmpeg to determine video bitrate. """ # start ffmpeg (child_out,child_in) = os.popen2( "%s -y -i - -vcodec copy -acodec copy -f avi /dev/null 2>&1" % self.FFMPEG ) # feed all the pieces for i in xrange(0,self.MAX_PREPARSE_PACKETS): piece = self.movieselector.get_movie_piece( i ) if piece is None: break try: child_out.write( piece ) except IOError: break child_out.close() # find the bitrate in the output bitrate = None r = re.compile( "bitrate= *([0-9.]+)kbits/s" ) for x in child_in.readlines(): occ = r.findall( x ) if occ: # use the latest mentioning of bitrate bitrate = float( occ[-1] ) * 1024 / 8 return bitrate def complete(self,abspiece,downloaded=True): """ Called when a piece has been downloaded or was available from the start (disk). """ # determine piece number relative to movie bpiece = self.movieselector.download_range[0][0] piece = abspiece - bpiece if downloaded: self.downloadrate.update_rate( self.movieselector.piece_length ) if piece >= self.playback_pos: self.pieces_in_buffer += 1 if self.playing and self.pos == piece: # we were delaying for this piece self.refill_buffer() if self.prebuffering: if self.estimated_bitrate is None: # extract bitrate once we got the first MAX_PREPARSE_PACKETS if piece < self.MAX_PREPARSE_PACKETS: gotall = True for i in xrange(0,self.MAX_PREPARSE_PACKETS): if not self.movieselector.have_movie_piece( i ): gotall = False break if gotall: bitrate = self.parse_video() if bitrate is not None: self.estimated_bitrate = bitrate self.movieselector.set_bitrate( bitrate ) print "[Estimated bitrate: %.2f KByte/s]" % (bitrate/1024) if self.estimated_bitrate is not None and self.enough_buffer(): # enough buffer and could estimated bitrate - start streaming print "[Prebuffering done]" self.data_ready.acquire() self.prebuffering = False self.data_ready.notify() self.data_ready.release() else: print "[Prebuffering: %.2f seconds left]" % (self.expected_buffering_time()) def set_playback_pos(self,playback_pos): """ Update the playback position. """ # fast forward for i in xrange(self.playback_pos,playback_pos+1): if self.has[i]: self.pieces_in_buffer -= 1 # fast rewind for i in xrange(playback_pos,self.playback_pos+1): if self.has[i]: self.pieces_in_buffer += 1 self.piecepicker.download_range[0] += (playback_pos - self.playback_pos) self.playback_pos = playback_pos def inc_playback_pos(self): self.set_playback_pos( self.playback_pos + 1 ) def expected_download_time(self): """ Expected download time left. """ pieces_left = self.movieselector.num_movie_pieces() - self.playback_pos - self.pieces_in_buffer expected_download_speed = self.downloadrate.rate if expected_download_speed == 0: return 0 if pieces_left <= 0: return 0 return pieces_left * self.movieselector.piece_length / expected_download_speed def expected_playback_time(self): """ Expected playback time left. """ pieces_to_play = self.movieselector.num_movie_pieces() - self.playback_pos if pieces_to_play <= 0: return 0 bitrate = self.movieselector.bitrate if bitrate is None or bitrate == 0: bitrate = 300*1024/8 return pieces_to_play * self.movieselector.piece_length / bitrate def expected_buffering_time(self): """ Expected time required for buffering. """ return max( 0, self.expected_download_time() - self.expected_playback_time() ) def enough_buffer(self): """ Returns True if we can safely start playback without expecting to run out of buffer. """ return self.expected_buffering_time() == 0 def tick_second(self): self.rawserver.add_task( self.tick_second, 1.0 ) print "Estimated download time: %5ds [%7.2f Kbyte/s]" % (self.expected_download_time(),self.downloadrate.rate/1024) if self.playing and self.playbackrate.rate > 0 and not self.prebuffering: self.movieselector.set_bitrate( self.playbackrate.rate ) print "Estimated playback time: %5ds [%7.2f Kbyte/s]" % (self.expected_playback_time(),self.playbackrate.rate/1024) stdout.flush() def size( self ): return self.movieselector.size def read( self, numbytes = None ): """ Read at most numbytes from the stream. If numbytes is not given, pieces are returned. The bytes read will be returned, or None in case of an error or end-of-stream. """ if not self.curpiece: # curpiece_pos could be set to something other than 0! # for instance, a seek request sets curpiece_pos but does not # set curpiece. x = self.pop() if x is None: return None piecenr,self.curpiece = x if numbytes is None: # default on one piece per read numbytes = self.movieselector.piece_length curpos = self.curpiece_pos left = len(self.curpiece) - curpos if left > numbytes: # piece contains enough -- return what was requested data = self.curpiece[curpos:curpos+numbytes] self.curpiece_pos += numbytes else: # return remainder of the piece data = self.curpiece[curpos:] self.curpiece = "" self.curpiece_pos = 0 return data def start( self, bytepos = 0 ): """ Initialise to start playing at position `bytepos'. """ # Determine piece number and offset if bytepos < self.movieselector.first_piece_length: piece = 0 offset = bytepos else: bytepos -= self.movieselector.first_piece_length piece = bytepos / self.movieselector.piece_length + 1 offset = bytepos % self.movieselector.piece_length print "=== START request at offset %d (piece %d) ===" % (bytepos,piece) # Initialise all playing variables self.data_ready.acquire() self.curpiece = "" self.curpiece_pos = offset self.pos = piece self.set_playback_pos( piece ) self.outbuf = [] self.start_playback = time.time() self.playing = True self.prebuffering = True self.playbackrate = Measure( 60 ) self.data_ready.release() self.refill_thread() def stop( self ): """ Playback is stopped. """ print "=== STOP request === " self.playing = False # clear buffer and notify possible readers self.data_ready.acquire() self.outbuf = [] self.prebuffering = False self.data_ready.notify() self.data_ready.release() def done( self ): if not self.playing: return True return self.pos == self.numpieces() and self.curpiece_pos >= len(self.curpiece) def numpieces( self ): return self.movieselector.num_movie_pieces() def piece( self, i ): """ Returns piece #i, or NULL of not available. """ # Simulate packet loss here #if i in [8,9,10,11,12,20,23,26,30,31,40]: # return None piece = self.movieselector.get_movie_piece( i ) return piece def piece_due( self, i ): """ Return the time when we expect to have to send a certain piece to the player. """ now = time.time() - self.start_playback if i == 0: # now return time.time() bytepos = self.movieselector.first_piece_length + (i-1) * self.movieselector.piece_length return self.start_playback + bytepos / self.movieselector.bitrate - self.PIECE_DUE_SKEW def refill_buffer( self ): """ Push pieces into the player FIFO when needed and able. """ self.data_ready.acquire() if self.prebuffering: self.data_ready.release() return while self.pos < self.numpieces() and len( self.outbuf ) < self.BUFFER_LENGTH: piece = self.piece( self.pos ) if piece: # piece found -- add it to the queue print "[%d: pushed]" % self.pos self.outbuf.append( (self.pos,piece) ) self.data_ready.notify() elif time.time() < self.piece_due( self.pos ): # wait for packet print "[%d: waiting %.2fs]" % (self.pos,self.piece_due(self.pos)-time.time()) break else: # drop packet print "[%d: dropped]" % self.pos self.pos += 1 self.inc_playback_pos() self.data_ready.release() def refill_thread( self ): if self.playing: self.refill_buffer() self.rawserver.add_task( self.refill_thread, 0.1 ) def pop( self ): self.data_ready.acquire() while self.prebuffering and not self.done(): # wait until done prebuffering self.data_ready.wait() while not self.outbuf and not self.done(): # wait until a piece is available self.data_ready.wait() if not self.outbuf: piece = None else: piece = self.outbuf.pop( 0 ) self.playbackrate.update_rate( len(piece) ) self.data_ready.release() return piece class HTTPServer(BaseHTTPServer.HTTPServer): def __init__( self, movietransport, port=8088 ): self.movietransport = movietransport BaseHTTPServer.HTTPServer.__init__( self, ("",port), SimpleServer ) def background_serve( self ): thread.start_new_thread( self.serve_forever, () ) class SimpleServer(BaseHTTPServer.BaseHTTPRequestHandler): def do_GET(self): movie = self.server.movietransport size = movie.size() firstbyte, lastbyte = 0, size-1 range = self.headers.getheader('range') if range: type, seek = string.split(range,'=') firstbyte, lastbyte = string.split(seek,'-') movie.start( int(firstbyte) ) self.send_response(200) self.send_header("Content-Type", "video/mpeg") self.send_header("Content-Length", size) self.end_headers() while not movie.done(): data = movie.read() if not data: break try: self.wfile.write(data) except IOError: print "client closed connection for ", self.path break except socket.error: break movie.stop()