#! /usr/bin/env python
##############################################################
#
# Name: node.py
#
# Description: node class for each peer
#
# Usage:
#
#
# author: Jun Wang j.wang@ewi.tudelft.nl
#
# Note: all console output goes through the print() function (enabled
#       by the __future__ import below), so Python 2.6+ is required.
#
##############################################################
from __future__ import print_function

import sys
import time
import copy

from connectinterface import myconnection
#from sender import *
from peercache import peerCache
from similarity import simMeasure


def now():
    """Return the current time as formatted by time.ctime()."""
    return time.ctime(time.time())


class node:
    #########################################################################
    # Name: node
    # Description: state and behaviour of a single peer.
    #
    #   method = {'randDominant', 'buddyDominant'}
    #       randDominant:  r = buddyCacheSize/randCacheSize
    #       buddyDominant: r = randCacheSize/buddyCacheSize
    #
    #   randCache: random peer cache
    #       entries written by this class carry the keys
    #       'peer_id', 'port', 'ip', 'time' and 'has_pref'
    #   buddyCache: taste buddy cache
    #       entries written by this class carry the keys
    #       'peer_id' and 'rank' (similarity to our own preferences)
    #
    #   The string 'null' is used throughout as the "nothing" marker
    #   (empty cache list, no peer selected, unknown ip/port).
    #########################################################################

    def __init__(self, nodeid=-1, instanceid=-1, numinstance=-1,
                 svrportbase=30000, cltportbase=10000,
                 buddysize=100, cachesize=100,
                 method='buddyDominant', r=1):
        """Create the two caches, the connection and the configuration.

        nodeid, instanceid and numinstance are combined into the peer id
        by getPeerID(); buddysize and cachesize are the maximum sizes of
        the buddy cache and the random cache; method and r drive
        createCandidateTable().
        """
        # two caches
        # currently in DAS2 experiment, these two caches are set to empty
        self.buddyCache = peerCache(cacheMaxSize=buddysize)
        self.randCache = peerCache(cacheMaxSize=cachesize)

        # information of peers
        self.info = {'serverportbase': svrportbase}
        self.info['clientportbase'] = cltportbase
        self.info['nodeid'] = nodeid
        self.info['instanceid'] = instanceid
        self.connection = myconnection()
        self.info['peer_id'] = self.getPeerID(nodeid, instanceid, numinstance)
        self.info['ownip'] = 'null'
        self.info['ownport'] = 'null'

        # buddycast configurations
        self.config = {'method': method}
        self.config['r'] = r
        self.config['buddyCacheSize'] = buddysize
        self.config['randCacheSize'] = cachesize
        self.config['exchange_pref_cache'] = False

    def startServer(self, receiverhandle):
        """Attach this node to receiverhandle and start the server."""
        self.handle = receiverhandle
        self.handle.nodeobj = self
        self.connection.start_server(self.handle, self)

    def register(self):
        """Send a 'REG' message to the super peer and check the reply."""
        peerMsg = {'peer_id': self.info['peer_id'],
                   'port': self.info['ownport']}
        cmd, msg = self.connection.sendMSG(self.connection.superPeerHost,
                                           self.connection.superPeerPort,
                                           'REG', peerMsg)
        if msg != 'REGED':
            print('Client received wrong ack :', self.info['peer_id'])

    def startMonitorServer(self, receiverhandle, port):
        """Attach this node to receiverhandle and start the monitor server."""
        self.handle = receiverhandle
        self.handle.nodeobj = self
        self.connection.start_monitor_server(self.handle, port)

    def initialCache(self, numPeers):
        """Ask the super peer ('INI') for peers and seed the random cache.

        Peers already present in the random cache, and this node itself,
        are skipped. Every added entry gets has_pref = False.
        """
        request = [numPeers, self.info['peer_id']]
        cmd, cache = self.connection.sendMSG(self.connection.superPeerHost,
                                             self.connection.superPeerPort,
                                             'INI', request)
        if cache == 'null':
            print('====this is the first peer!===', end=' ')
            return

        for entry in cache:
            peerid = entry['peer_id']
            unknown = self.randCache.find('peer_id', int(peerid))[0] == -1
            # Compare in int form, like the cache lookup above, so that an
            # id that arrives as a string is still recognised as our own.
            if unknown and int(peerid) != self.info['peer_id']:
                entry['has_pref'] = False
                self.randCache.add(entry)
                print('peerid in cache:', peerid, end=' ')

    def initialUserPref(self):
        """Fetch this peer's preference list from the super peer ('OPL')."""
        cmd, user_pref = self.connection.sendMSG(
            self.connection.superPeerHost,
            self.connection.superPeerPort,
            'OPL', self.info['peer_id'])
        self.info['user_pref'] = user_pref

    def getPeerID(self, nodeid, instanceid, numinstance):
        """Return nodeid*numinstance + instanceid."""
        return nodeid * numinstance + instanceid

    def getNodeInstanceID(self, peerid, numinstance):
        """Not finished yet: currently returns peerid unchanged."""
        return peerid

    def push(self):
        """Select a peer and, unless none was found, exchange with it."""
        selectedPeer = self.selectConnectPeer()
        if selectedPeer != 'null':
            self.exchange(selectedPeer)
        return

    def exchange(self, selectedPeer):
        """Send a 'BC' message to selectedPeer and merge what comes back.

        On an 'ACK' reply the returned random cache (and buddy cache, when
        present) is merged, and the returned user preference, when present,
        is folded into the buddy cache. The buddy cache is then sorted by
        rank and printed.
        """
        print('=============================================================')
        print('[peer:', self.info['peer_id'], ']',
              'push to', '[ peer:', selectedPeer['peer_id'], ', port',
              selectedPeer['port'], ']')

        peerMsg = self.collectExchangeData(selectedPeer)
        cmd, msg = self.connection.sendMSG(selectedPeer['ip'],
                                           selectedPeer['port'],
                                           'BC', peerMsg)
        if cmd == 'ACK':
            if 'buddy' in msg:
                self.mergeCaches(msg['rand'], msg['buddy'])
            else:
                self.mergeCaches(msg['rand'])
            if 'user_pref' in msg:
                # must have msg['peer_id'] and msg['user_pref']
                self.mergeUserPrefCache(msg)

        self.buddyCache.sortedby('rank')
        for i in range(len(self.buddyCache)):
            print('[peer:', self.info['peer_id'], ']',
                  'taste buddy:', self.buddyCache[i])
        print('randCache:')
        print('=============================================================')
        print(' ')

    def collectExchangeData(self, selectedPeer):
        """Build the message sent to selectedPeer during an exchange.

        The message always carries our peer id, port, random cache list and
        user preference; the buddy cache list is added only when
        config['exchange_pref_cache'] is True. 'request_pref' is True when
        selectedPeer['has_pref'] is False. If selectedPeer has no usable
        'has_pref' entry a diagnostic is printed and 'request_pref' is
        left out of the message.
        """
        randCacheList, buddyCacheList = self.getCaches()
        peerMsg = {'peer_id': self.info['peer_id'],
                   'port': self.info['ownport'],
                   'rand': randCacheList,
                   'user_pref': self.info['user_pref']}
        if self.config['exchange_pref_cache'] == True:
            peerMsg['buddy'] = buddyCacheList

        try:
            has_pref = selectedPeer['has_pref']
        except (KeyError, TypeError, IndexError):
            print('I am wrong:', selectedPeer)
        else:
            peerMsg['request_pref'] = (has_pref == False)
        return peerMsg

    def _setHasPref(self, peerid, flag):
        """Set 'has_pref' on the random-cache entry for peerid, if any."""
        results = self.randCache.find('peer_id', int(peerid))
        if results[0] != -1:
            self.randCache[results[0]]['has_pref'] = flag

    def mergeUserPrefCache(self, peer):
        """Rank peer against our preferences and update the buddy cache.

        peer is a message dict with 'peer_id' and 'user_pref'. The
        similarity rank is the first value returned by
        simMeasure().cooccurrence(). A peer already in the buddy cache has
        its entry replaced; otherwise it is added when the cache has room,
        or when its rank is at least that of the lowest-ranked buddy.
        """
        if peer is None:
            return
        my_pref = self.info['user_pref']
        if 'user_pref' not in peer:
            return

        measure = simMeasure()
        # cooccurrence() returns four values; only the rank is used here.
        sim_rank, _my_size, _their_size, _confidence = \
            measure.cooccurrence(my_pref, peer['user_pref'])
        buddy = {'peer_id': peer['peer_id'], 'rank': sim_rank}

        results = self.buddyCache.find('peer_id', int(peer['peer_id']))
        if results[0] != -1:
            # already a buddy: just refresh the stored rank
            self.buddyCache[results[0]] = buddy
            return

        if self.buddyCache.getMaxSize() > self.buddyCache.getCurSize():
            self.buddyCache.add(buddy)
            self.buddyCache.sortedby('rank')
            self._setHasPref(peer['peer_id'], True)
            return

        # max size already
        self.buddyCache.sortedby('rank')
        lowest = self.buddyCache.getPeer(self.buddyCache.getCurSize() - 1)
        if sim_rank >= lowest['rank']:
            # NOTE(review): nothing is removed here; presumably
            # peerCache.add() evicts when full -- confirm in peercache.
            self.buddyCache.add(buddy)
            self.buddyCache.sortedby('rank')
            self._setHasPref(peer['peer_id'], True)
            self._setHasPref(lowest['peer_id'], False)
        return

    def _checkRandCache(self, complaint):
        """Print complaint once if a random-cache entry lacks 'peer_id'."""
        for apeer in self.randCache:
            try:
                apeer['peer_id']
            except Exception:
                print(complaint, self.randCache)
                break

    def mergeCaches(self, randCacheList, buddyCacheList='null'):
        """Merge a received random cache list into our random cache.

        Unknown peers (other than ourselves) are added with
        has_pref = False; known peers get their time/ip/port refreshed when
        the received entry has a larger 'time'. buddyCacheList is accepted
        but not merged yet. Always returns 1.
        """
        self._checkRandCache(
            'some thing wrong with random cache in exchange:')

        if randCacheList != 'null':
            for randpeer in randCacheList:
                try:
                    peerid = int(randpeer['peer_id'])
                    results = self.randCache.find('peer_id', peerid)
                except Exception:
                    # malformed entry: give up on the rest of the list
                    print('some thing woring here:', randpeer, type(randpeer))
                    break

                if results[0] == -1:
                    # Same int form as the lookup, so a string id equal to
                    # our own is not added to our own cache.
                    if peerid != self.info['peer_id']:
                        randpeer['has_pref'] = False
                        self.randCache.add(randpeer)
                    continue

                try:
                    known = self.randCache[results[0]]
                    if known['time'] < randpeer['time']:
                        known['time'] = randpeer['time']
                        known['ip'] = randpeer['ip']
                        known['port'] = randpeer['port']
                except Exception:
                    print('what is that', randpeer)

        self._checkRandCache('some wrong with random cache in exchange:')
        # to do: merge buddyCacheList into the buddy cache as well
        return 1

    def getCaches(self):
        """Return (randCacheList, buddyCacheList); 'null' for an empty cache."""
        randCacheList = 'null'
        if self.randCache.cacheSize != 0:
            randCacheList = self.randCache.getAll()

        buddyCacheList = 'null'
        if self.buddyCache.cacheSize != 0:
            buddyCacheList = self.buddyCache.getAll()

        return randCacheList, buddyCacheList

    def selectConnectPeer(self):
        """Pick the peer to exchange with, or return 'null'.

        A random peer is taken from the random cache when the buddy cache
        is not usable (randDominant with int(r*randCacheSize) < 1, or an
        empty buddy cache). Otherwise a peer is sampled from the candidate
        table and looked up in the random cache; the random-cache entry is
        returned, or 'null' when it is not found there.
        """
        # in randdominant case, r is too small then no need buddycache
        if (self.config['method'] == 'randDominant'
                and int(self.config['r'] * self.config['randCacheSize']) < 1):
            return self.randCache.getRandomPeer()

        # buddy peer does not exist in the beginning
        if self.buddyCache.cacheSize == 0:
            if self.randCache.cacheSize > 0:
                return self.randCache.getRandomPeer()
            print('[peer:', self.info['peer_id'], ']',
                  'no peers in the cache when selecting peers!!!')
            return 'null'

        # buddy peer exists
        candidatePeers = self.createCandidateTable()
        sampled = self.samplingPeers(candidatePeers)
        results = self.randCache.find('peer_id', int(sampled['peer_id']))
        if results[0] != -1:
            return self.randCache[results[0]]

        print('can not find taste peer in randcache',
              sampled['peer_id'], end=' ')
        # Label first, then the dump, to keep the original output order.
        print('randcacne', end=' ')
        print(self.randCache.printAll())
        return 'null'

    def samplingPeers(self, candidatePeers):
        """Return candidatePeers.getSampledPeer('rank')."""
        return candidatePeers.getSampledPeer('rank')

    def _augmentCandidates(self, candidatePeers, randPeers, lowestRank):
        """Add every peer of randPeers missing from candidatePeers.

        Each added peer is given rank lowestRank and user_pref 'null', and
        the candidate table's cacheMaxSize is raised by one per peer.
        """
        for randPeer in randPeers:
            # check randPeer whether exist in candidatePeers or not
            if candidatePeers.find('peer_id',
                                   int(randPeer['peer_id']))[0] != -1:
                continue
            # NOTE(review): this writes into the dict handed out by the
            # random cache; whether that is the cache's own entry depends
            # on peerCache -- confirm.
            randPeer['rank'] = lowestRank
            randPeer['user_pref'] = 'null'
            candidatePeers.cacheMaxSize = candidatePeers.cacheMaxSize + 1
            candidatePeers.add(randPeer)

    def createCandidateTable(self):
        """Build the table of peers that samplingPeers() draws from.

        Starts from a deep copy of the buddy cache and augments it with
        random-cache peers, which receive the lowest buddy rank.

        buddyDominant: all buddies plus max(1, int(r*buddyCacheSize))
                       random peers (or the whole random cache if smaller).
        randDominant:  at most int(r*randCacheSize) buddies plus the whole
                       random cache.

        Raises ValueError for any other config['method'].
        """
        method = self.config['method']

        if method == 'buddyDominant':
            # create a copy of buddyCache
            candidatePeers = copy.deepcopy(self.buddyCache)
            candidatePeers.sortedby('rank')

            # get the min rank value
            lowestRankedPeer = candidatePeers.getPeer(
                candidatePeers.cacheSize - 1)
            lowestRank = lowestRankedPeer['rank']

            # how many peers should be augmented to candidatePeers
            numRandPeers = int(self.config['r']
                               * self.config['buddyCacheSize'])
            if numRandPeers < 1:
                numRandPeers = 1

            # get rand peers
            if numRandPeers < self.randCache.cacheSize:
                randPeers = self.randCache.getRandomPeers(numRandPeers)
            else:
                randPeers = self.randCache.getAll()

            self._augmentCandidates(candidatePeers, randPeers, lowestRank)

        elif method == 'randDominant':
            # create a copy of buddyCache
            candidatePeers = copy.deepcopy(self.buddyCache)
            candidatePeers.printAll()
            candidatePeers.sortedby('rank', order='decrease')

            numBuddyPeers = int(self.config['r']
                                * self.config['randCacheSize'])
            if numBuddyPeers < candidatePeers.cacheSize:
                candidatePeers.removeN(
                    candidatePeers.cacheSize - numBuddyPeers)

            # get the min rank value
            lowestRankedPeer = candidatePeers.getPeer(0)
            lowestRank = lowestRankedPeer['rank']

            self.randCache._shift()
            self._augmentCandidates(candidatePeers, self.randCache,
                                    lowestRank)

        else:
            # Previously this fell through to an UnboundLocalError.
            raise ValueError('unknown buddycast method: %r' % (method,))

        return candidatePeers

    def get(self):
        """Return True."""
        return True


def _selectionHistogram(peer, numTry):
    """Call peer.selectConnectPeer() numTry times; count hits per peer id."""
    hist = {}
    for _ in range(numTry):
        connectPeer = peer.selectConnectPeer()
        peerid = connectPeer['peer_id']
        hist[peerid] = hist.get(peerid, 0) + 1
    return hist


def _printHistogram(hist, numTry):
    """Print each peer id with its hit count and percentage of numTry."""
    for a in hist:
        # 100.0 forces true division; the old integer division truncated
        # the percentage on Python 2.
        print('[peerid:', a, ']', ' Prob:', hist[a],
              hist[a] * 100.0 / numTry, 'percent')


def testNode():
    """Exercise selectConnectPeer() in the default buddyDominant mode."""
    print('this is function for testing selectConnectPeer()')
    peer = node(1, 1, 2, svrportbase=50000, cltportbase=10000,
                buddysize=4, cachesize=8)
    peerid = peer.info['peer_id']
    print('[peer:', peerid, ']', 'created a node object')

    for rank, buddyid in ((6, 2), (6, 1), (3, 3), (1, 4)):
        peer.buddyCache.add({'rank': rank, 'peer_id': buddyid,
                             'user_pref': 'null'})
    print()
    print('buddyCache')
    peer.buddyCache.printAll()

    for randid in (11, 12, 13, 14, 1, 2, 3, 4):
        peer.randCache.add({'ip': '1.1.1.%d' % randid, 'peer_id': randid,
                            'port': 5000, 'time': 2005})
    print()
    print('random cache')
    peer.randCache.printAll()

    randPeers = peer.randCache.getRandomPeers(5)
    print('the random peers')
    print(randPeers)
    print('finished')

    numTry = 2000
    hist = _selectionHistogram(peer, numTry)
    _printHistogram(hist, numTry)

    print('the buddyCache')
    peer.buddyCache.printAll()
    return


def testNode2():
    """Exercise selectConnectPeer() in randDominant mode."""
    print('this is function for testing selectConnectPeer()')
    peer = node(1, 1, 2, svrportbase=50000, cltportbase=10000,
                r=1, buddysize=4, cachesize=4, method='randDominant')
    peerid = peer.info['peer_id']
    print('[peer:', peerid, ']', 'created a node object')

    for rank, buddyid in ((30, 1), (20, 2), (60, 3), (1, 4)):
        peer.buddyCache.add({'rank': rank, 'ip': '1.1.1.%d' % buddyid,
                             'peer_id': buddyid, 'port': 5000,
                             'time': 2005, 'user_pref': 'null'})
    print('buddyCache')
    peer.buddyCache.printAll()
    print('s')
    print('random Cache table')

    for randid in (11, 12, 13, 14):
        peer.randCache.add({'ip': '1.1.1.%d' % randid, 'peer_id': randid,
                            'port': 5000, 'time': 2005})
    print('randCache')
    peer.randCache.printAll()
    print('buddyCache')
    peer.buddyCache.printAll()

    numTry = 5
    hist = _selectionHistogram(peer, numTry)
    _printHistogram(hist, numTry)
    return


if __name__ == '__main__':
    print('++++++++ for buddyDominate mode +++++++++++')
    testNode()
    # print('++++++++ for randDominate mode ++++++++++++')
    # testNode2()