#!/usr/bin/env python
"""Extract per-peer statistics from dispersy experiment logs.

For every peer directory the script parses ``output/dispersy.log`` (through
``ldecoder.parse``) and writes plain-text, space-separated time series next to
it, then writes experiment-wide summaries under ``<peers_directory>/output/``.
"""
from time import mktime
import re
from os import listdir
import os
from ldecoder import parse
from sys import argv, exit

# Global variables (yes, not the brightest thing)
# total record count per node at time measure_interval_stop
measured_record_count = {}
# per node: {timeoffset: [received, created, total, coverage]} for the whole run
record_count = {}
# per node: [traffic_start, traffic_stop, traffic_stop - traffic_start]
traffic_count = {}
# highest node number to process; set by main() from the 'peer.count' file
node_count = 0


def get_nodes(peer_dir):
    """Yield the path of every usable peer directory inside *peer_dir*.

    A directory is usable when its name is exactly five digits, its number
    does not exceed the module-level ``node_count`` and it contains
    ``output/dispersy.log``.  Paths are yielded in ``listdir`` order.
    """
    # Anchored on both sides: the rest of the script uses the last five
    # characters of the path as node id, and an entry such as '00001.bak'
    # would otherwise pass the filter and crash in int().
    pattern = re.compile(r'^[0-9]{5}$')
    for entry in listdir(peer_dir):
        if not pattern.match(entry):
            continue
        log_path = os.path.join(peer_dir, entry, 'output', 'dispersy.log')
        if int(entry) <= node_count and os.path.exists(log_path):
            yield peer_dir + "/" + entry


def get_title(node):
    """Return a short display name for the path *node*.

    The last five characters are returned when the path contains a run of
    five digits; "tracker" is returned for a longer path ending in
    "tracker"; anything else gives "???".
    """
    if re.match(".*[0-9]{5}", node):
        return node[-5:]
    if len(node) > 7 and node[-7:] == "tracker":
        return "tracker"
    return "???"


def get_first_datetime(peers_directory):
    """Return the earliest first-entry timestamp over all peer logs.

    Only the first entry of each ``dispersy.log`` is read.  Raises
    ``ValueError`` (from ``min``) when no peer directory is found.
    """
    first_stamps = []
    for node in get_nodes(peers_directory):
        _, stamp, _, _ = next(parse(node + "/output/dispersy.log"))
        first_stamps.append(stamp)
    return min(first_stamps)


def _to_epoch(stamp):
    """Convert an object exposing ``timetuple()`` to integer epoch seconds."""
    return int(mktime(stamp.timetuple()))


def _write_row(handle, *fields):
    """Write *fields* to *handle* as one space-separated line."""
    handle.write(" ".join([str(field) for field in fields]))
    handle.write("\n")


def _open_series(handles, path, header, first_row):
    """Open *path* for writing, register it in *handles*, write its preamble."""
    handle = open(path, "w+")
    # Registered before anything else can fail so the caller always closes it.
    handles.append(handle)
    handle.write(header)
    handle.write(first_row)
    return handle


def _process_node(node, first, measure_interval_start, measure_interval_stop,
                  msg_distribution, dropped_msg_distribution):
    """Parse one peer log and write that peer's time-series files.

    Writes received-record.txt, created-record.txt, total-record.txt,
    drop.txt and stat.txt into ``<node>/output/`` and updates the module-level
    ``record_count``, ``measured_record_count`` and ``traffic_count`` as well
    as the two distribution dictionaries passed in.

    Returns the timestamp of the last log entry, or None for an empty log.
    """
    node_id = node[-5:]
    out_dir = node + "/output"
    record_header = "# timestamp timeoffset num-records\n"

    started_monitoring = False
    done_monitoring = False
    saved_total = False
    write_total = False
    c_received_record = 0
    c_created_record = 0
    stamp = None

    handles = []
    try:
        h_received = _open_series(handles, out_dir + "/received-record.txt",
                                  record_header, "0 0 0\n")
        h_created = _open_series(handles, out_dir + "/created-record.txt",
                                 record_header, "0 0 0\n")
        h_total = _open_series(handles, out_dir + "/total-record.txt",
                               record_header, "0 0 0\n")
        h_drop = _open_series(handles, out_dir + "/drop.txt",
                              "# timestamp timeoffset num-drops\n", "0 0 0\n")
        h_stat = _open_series(handles, out_dir + "/stat.txt",
                              "# timestamp timeoffset total-send total-received\n",
                              "0 0 0 0\n")

        print(" * %s" % node_id)
        record_count[node_id] = {}

        for _lineno, stamp, message, kargs in parse(out_dir + "/dispersy.log"):
            if message == "created-record":
                c_created_record += 1
                c_received_record -= 1  # each created record is also counted as received
                now = _to_epoch(stamp)
                _write_row(h_created, now, now - first, c_created_record)
                write_total = True

            elif message == "handled-record":
                c_received_record += 1
                now = _to_epoch(stamp)
                _write_row(h_received, now, now - first, c_received_record)
                write_total = True

            elif message == "statistics":
                now = _to_epoch(stamp)
                if "total_send" in kargs and "total_received" in kargs:
                    # Both values are sequences; element 1 is the one reported.
                    total_send = kargs["total_send"][1]
                    total_received = kargs["total_received"][1]
                    timeoffset = now - first
                    _write_row(h_stat, now, timeoffset, total_send, total_received)

                    # statistics are gathered every second, so we don't need to
                    # do much checking for interval limits in here.  If that
                    # ever changes, additional processing should include:
                    # - saving the initial timeoffset when traffic_start was seen
                    # - saving the final timeoffset when traffic_stop was seen
                    # - saving the speed identified for that interval
                    if (not started_monitoring and not done_monitoring
                            and timeoffset >= measure_interval_start):
                        started_monitoring = True
                        traffic_start = total_send + total_received

                    if started_monitoring and not done_monitoring:
                        traffic_stop = total_send + total_received
                        traffic_count[node_id] = [traffic_start, traffic_stop,
                                                  traffic_stop - traffic_start]
                        if timeoffset >= measure_interval_stop:
                            done_monitoring = True

                if "total_dropped" in kargs:
                    _write_row(h_drop, now, now - first, kargs["total_dropped"])

            elif message == "statistics-successful-messages":
                # Keep, per message name, the highest count and the node it came from.
                for key, value in kargs.items():
                    msg_distribution[key] = max(
                        (value, node), msg_distribution.get(key, (0, node)))

            elif message == "statistics-dropped-messages":
                for key, value in kargs.items():
                    dropped_msg_distribution[key] = max(
                        (value, node), dropped_msg_distribution.get(key, (0, node)))

            if write_total:
                write_total = False
                timeoffset = now - first
                total = c_received_record + c_created_record
                _write_row(h_total, now, timeoffset, total)

                # The 4th slot (coverage) is filled in later by generate().
                entry = [c_received_record, c_created_record, total, 0.0]
                record_count[node_id][timeoffset] = entry

                if (started_monitoring and not saved_total
                        and timeoffset <= measure_interval_stop):
                    # ends up holding the last count at or before measure_interval_stop
                    measured_record_count[node_id] = entry
                if not saved_total and timeoffset > measure_interval_stop:
                    saved_total = True
    finally:
        # Close (and thereby flush) everything: created-record.txt is read back
        # by sum_created_records() before generate() returns.
        for handle in handles:
            handle.close()

    if not started_monitoring:
        print("!!!! Statistics time-range is outside the experiment time.")

    return stamp


def generate(peers_directory, measure_interval_start, measure_interval_stop, time_ratio):
    """Process every peer log and write all per-peer and summary files.

    Parameters:
        peers_directory: directory holding the five-digit peer directories and
            an existing ``output/`` directory for the summaries.
        measure_interval_start, measure_interval_stop: measurement window, in
            seconds since the first log entry of the experiment.  The stop is
            clamped to the experiment length for the summary computations.
        time_ratio: divisor turning the experiment byte rate into the "real"
            byte rate.

    Summary files written to ``<peers_directory>/output/``: first_last.txt,
    dispersy-msg-distribution.txt, dispersy-dropped-msg-distribution.txt,
    traffic_count_per_peer.txt, records_per_peer.txt,
    peers_with_all_records_in_time.txt, measure_info.txt (and, through
    sum_created_records(), sum_created_records.txt).

    Returns ``(first, last)``: the epoch seconds of the first and last log
    entries.  Raises ``ValueError`` when no peer directory is found.
    """
    first = _to_epoch(get_first_datetime(peers_directory))
    output_dir = peers_directory + "/output"

    # Process nodes in numeric order of their five-digit id.
    node_list = sorted(get_nodes(peers_directory), key=lambda path: int(path[-5:]))

    dispersy_msg_distribution = {}
    dispersy_dropped_msg_distribution = {}
    lasts = []
    for node in node_list:
        last_stamp = _process_node(node, first,
                                   measure_interval_start, measure_interval_stop,
                                   dispersy_msg_distribution,
                                   dispersy_dropped_msg_distribution)
        if last_stamp is not None:
            lasts.append(last_stamp)

    last = _to_epoch(max(lasts))
    experiment_length = last - first

    if measure_interval_stop > experiment_length:
        print("!!! interval stop after experiment end - adjusting interval stop from %d to %d"
              % (measure_interval_stop, experiment_length))
        measure_interval_stop = experiment_length
    measure_interval_length = measure_interval_stop - measure_interval_start

    sum_at_measure_time, sum_at_experiment_end = sum_created_records(
        peers_directory, first, last, measure_interval_stop)

    print("# experiment took %s seconds" % experiment_length)
    print("# number of records at experiment end (%d): %d records"
          % (experiment_length, sum_at_experiment_end))

    with open(output_dir + "/first_last.txt", "w") as h_first_last:
        h_first_last.write("%d %d\n" % (first, last))

    distributions = (
        ("/dispersy-msg-distribution.txt", dispersy_msg_distribution),
        ("/dispersy-dropped-msg-distribution.txt", dispersy_dropped_msg_distribution),
    )
    for file_name, distribution in distributions:
        with open(output_dir + file_name, "w+") as h_distribution:
            h_distribution.write("# msg_name count\n")
            for msg, (count, origin) in distribution.items():
                h_distribution.write("%s %d %s\n" % (msg, count, origin))

    sum_traffic = 0
    with open(output_dir + "/traffic_count_per_peer.txt", "w+") as h_traffic_count:
        h_traffic_count.write("# peer-id traffic_start traffic_stop traffic_total\n")
        for node_id, (t_start, t_stop, t_total) in traffic_count.items():
            sum_traffic += t_total
            h_traffic_count.write("%s %d %d %d\n" % (node_id, t_start, t_stop, t_total))

    if traffic_count and measure_interval_length > 0:
        bytes_per_second_experiment = (
            float(sum_traffic) / len(traffic_count) / measure_interval_length)
    else:
        # No peer reported statistics inside the window, or the window is
        # empty: report zero instead of dying on a division by zero.
        print("!!! no usable traffic statistics in the measure interval - reporting 0 bytes/second")
        bytes_per_second_experiment = 0.0
    bytes_per_second_real = bytes_per_second_experiment / time_ratio

    sum_coverage = 0.0
    with open(output_dir + "/records_per_peer.txt", "w+") as h_record_count:
        h_record_count.write(
            "# peer-id records_received records_created records_total coverage\n")
        for node_id, entry in measured_record_count.items():
            # sum_at_measure_time is -1 when no record was created before the
            # interval stop; coverage is then reported as 0 instead of negative.
            if sum_at_measure_time > 0:
                entry[3] = float(entry[2]) * 100 / sum_at_measure_time
            else:
                entry[3] = 0.0
            sum_coverage += entry[3]
            h_record_count.write("%s %d %d %d %f\n"
                                 % (node_id, entry[0], entry[1], entry[2], entry[3]))

    if measured_record_count:
        average_coverage = sum_coverage / len(measured_record_count)
    else:
        average_coverage = 0.0

    # count, for each second, how many peers hold all records of the experiment
    records_in_time = dict((second, 0) for second in range(experiment_length + 1))
    for updates in record_count.values():
        update_times = sorted(updates)
        for index, update_time in enumerate(update_times):
            # position 2 of an entry is the peer's total number of records
            if updates[update_time][2] < sum_at_experiment_end:
                continue
            # the count stays valid until the next update (or the experiment end)
            if index == len(update_times) - 1:
                valid_until = experiment_length + 1
            else:
                valid_until = update_times[index + 1]
            for second in range(update_time, valid_until):
                records_in_time[second] += 1

    with open(output_dir + "/peers_with_all_records_in_time.txt", "w+") as h_in_time:
        h_in_time.write("# time record_count\n")
        for second in range(experiment_length + 1):
            h_in_time.write("%d %d\n" % (second, records_in_time[second]))

    with open(output_dir + "/measure_info.txt", "w+") as h_measure_info:
        h_measure_info.write("experiment_length=%d\n" % experiment_length)
        h_measure_info.write("measure_interval_start=%d\n" % measure_interval_start)
        h_measure_info.write("measure_interval_stop=%d\n" % measure_interval_stop)
        h_measure_info.write("total_records_at_interval_stop=%d\n" % sum_at_measure_time)
        h_measure_info.write("total_records_at_experiment_end=%d\n" % sum_at_experiment_end)
        h_measure_info.write("average_coverage_at_interval_stop=%f\n" % average_coverage)
        h_measure_info.write("bytes_per_second_experiment=%f\n" % bytes_per_second_experiment)
        h_measure_info.write("bytes_per_second_real=%f\n" % bytes_per_second_real)
        h_measure_info.write("bits_per_second_real=%f\n" % (bytes_per_second_real * 8))

    return first, last


def sum_created_records(peers_directory, first, last, measure_interval_stop):
    """Write ``<peers_directory>/output/sum_created_records.txt``.

    The file holds, per time offset at which any peer created a record, the
    cumulative number of records created in the whole system.  It is built
    from the ``created-record.txt`` file of every peer.  *first* and *last*
    are accepted for interface compatibility and are not used.

    Returns ``(sum_at_measure_time, sum_at_end)``: the cumulative count at the
    last offset not after *measure_interval_stop* (-1 when there is none) and
    the cumulative count at the end of the experiment.
    """
    # time offset -> number of records created at that offset, over all peers
    created_per_offset = {}
    for node in get_nodes(peers_directory):
        with open(node + "/output/created-record.txt") as h_created_record:
            for line in h_created_record:
                if line[0] == "#":
                    continue
                _, offset, records = line.split()
                offset = int(offset)
                records = int(records)
                if records == 0:
                    continue  # the initial "0 0 0" placeholder row is not a record
                created_per_offset[offset] = created_per_offset.get(offset, 0) + 1

    sum_at_measure_time = -1
    running_sum = 0
    with open(peers_directory + '/output/sum_created_records.txt', "w") as fp:
        fp.write("0 0\n")
        for offset in sorted(created_per_offset):
            running_sum += created_per_offset[offset]
            fp.write("%s %s\n" % (offset, running_sum))
            # Offsets are ascending, so this keeps the last sum at or before
            # measure_interval_stop.
            if offset <= measure_interval_stop:
                sum_at_measure_time = running_sum

    return (sum_at_measure_time, running_sum)


def main(peers_directory, measure_interval_start, measure_interval_stop, time_ratio):
    """Read the peer count from ``<peers_directory>/peer.count`` and run generate()."""
    global node_count
    with open(peers_directory + '/peer.count', 'r') as peer_count:
        node_count = int(peer_count.readline())

    generate(peers_directory, measure_interval_start, measure_interval_stop, time_ratio)


if __name__ == "__main__":
    if len(argv) != 5:
        print("Usage: %s <peers-directory> <measure-interval-start> "
              "<measure-interval-stop> <time-ratio>" % argv[0])
        exit(1)

    main(argv[1], int(argv[2]), int(argv[3]), float(argv[4]))