Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bartercast statistics community #1230

Merged
merged 7 commits into from
Feb 18, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Tribler/Main/tribler_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ def define_communities(*args):
from Tribler.community.metadata.community import MetadataCommunity
from Tribler.community.tunnel.tunnel_community import TunnelSettings
from Tribler.community.tunnel.hidden_community import HiddenTunnelCommunity
from Tribler.community.bartercast4.community import BarterCommunity

# make sure this is only called once
session.remove_observer(define_communities)
Expand All @@ -515,6 +516,7 @@ def define_communities(*args):
# must be called on the Dispersy thread
dispersy.define_auto_load(SearchCommunity, session.dispersy_member, load=True, kargs=default_kwargs)
dispersy.define_auto_load(AllChannelCommunity, session.dispersy_member, load=True, kargs=default_kwargs)
dispersy.define_auto_load(BarterCommunity, session.dispersy_member, load=True)

# load metadata community
# dispersy.define_auto_load(MetadataCommunity, session.dispersy_member, load=True, kargs=default_kwargs)
Expand Down
93 changes: 92 additions & 1 deletion Tribler/Main/vwxGUI/DispersyDebugFrame.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from Tribler.Main.vwxGUI.GuiUtility import GUIUtility
from Tribler.Main.Utility.GuiDBHandler import startWorker, GUI_PRI_DISPERSY
from Tribler.Main.Utility.utility import compute_ratio, eta_value, size_format
from operator import itemgetter
from Tribler.community.bartercast4.statistics import BartercastStatisticTypes, _barter_statistics

DATA_NONE = ""

Expand Down Expand Up @@ -77,11 +79,13 @@ def __init__(self, parent, id, dispersy):
self.__community_panel = CommunityPanel(self.__notebook, -1)
self.__rawinfo_panel = RawInfoPanel(self.__notebook, -1)
self.__runtime_panel = RuntimeProfilingPanel(self.__notebook, -1)
self.__sharedstatistics_panel = SharedStatisticsPanel(self.__notebook, -1)

self.__notebook.AddPage(self.__summary_panel, "Summary")
self.__notebook.AddPage(self.__community_panel, "Community")
self.__notebook.AddPage(self.__rawinfo_panel, "Raw Info")
self.__notebook.AddPage(self.__runtime_panel, "Runtime Profiling")
self.__notebook.AddPage(self.__sharedstatistics_panel, "Network Health")

hsizer = wx.BoxSizer(wx.HORIZONTAL)
self.__incstuff_checkbox = wx.CheckBox(self, -1, "include stuff")
Expand Down Expand Up @@ -131,6 +135,7 @@ def do_gui(delayedResult):
self.__community_panel.UpdateInfo(stats)
self.__rawinfo_panel.UpdateInfo(stats)
self.__runtime_panel.UpdateInfo(stats)
self.__sharedstatistics_panel.UpdateInfo(_barter_statistics)
self.Layout()

startWorker(do_gui, do_db, uId=u"DispersyDebugFrame_UpdateInfo", priority=GUI_PRI_DISPERSY)
Expand Down Expand Up @@ -296,7 +301,7 @@ def OnListCtrlSelected(self, event):
self.__detail_panel.UpdateInfo(community_data)

def UpdateInfo(self, stats):
community_list = sorted(stats.communities, key=lambda community:
community_list = sorted(stats.communities, key=lambda community:
(not community.dispersy_enable_candidate_walker,
community.classification, community.cid))
self.__community_data_list = []
Expand Down Expand Up @@ -669,3 +674,89 @@ def UpdateInfo(self, stats):

if prev_selection_idx is not None:
self.__list1.Select(prev_selection_idx)

# --------------------------------------------------
# Shared Statistics Panel
# --------------------------------------------------
class SharedStatisticsPanel(wx.Panel):

def __init__(self, parent, id):
super(SharedStatisticsPanel, self).__init__(parent, id)
self.SetBackgroundColour(LIST_GREY)

self.__info = None
self.__selected_statistic = None

self.__statistic_list = AutoWidthListCtrl(self, -1, style=wx.LC_REPORT | wx.LC_ALIGN_LEFT |
wx.LC_SINGLE_SEL | wx.BORDER_SUNKEN)
self.__statistic_list.InsertColumn(0, "Statistic", width=200)
self.__statistic_list.InsertColumn(1, "Total count")
self.__statistic_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.OnStatisticSelected)

self.__detail_list = AutoWidthListCtrl(self, -1, style=wx.LC_REPORT | wx.BORDER_SUNKEN)
self.__detail_list.InsertColumn(0, "Pubkey", width=200)
self.__detail_list.InsertColumn(1, "Count")

hsizer = wx.BoxSizer(wx.HORIZONTAL)
hsizer.Add(self.__statistic_list, 1, wx.EXPAND | wx.RIGHT, 2)
hsizer.Add(self.__detail_list, 2, wx.EXPAND)
self.SetSizer(hsizer)

def OnStatisticSelected(self, event):
stat = self.__info[event.GetIndex()][0]
if self.__selected_statistic == stat:
return

self.__selected_statistic = stat
self.__detail_list.UpdateData(self.__info[event.GetIndex()][1])

def UpdateInfo(self, stats):
if not getattr(stats, "bartercast", None):
return

self.__STATISTICS = stats.bartercast.keys()
raw_info = {}

if stats is None:
self.__statistic_list.DeleteAllItems()
self.__detail_list.DeleteAllItems()
return

idx = 0
# initialize info list so we can replace elements
if not self.__info or len(self.__info) < len(self.__STATISTICS):
self.__info = [None] * len(self.__STATISTICS)

for stat in self.__STATISTICS:

self.__info[idx] = (stat, [])
raw_info[stat] = stats.bartercast[stat]

data_list = raw_info[stat]
# data_list.sort(key=lambda kv: kv[1], reverse=True)
data_list = sorted(data_list.items(), key=itemgetter(1), reverse=True)

total_count = 0

# for key, value in data_list.items():
for item in data_list:
key = item[0]
value = item[1]
# @TODO: maintain this total in Statistics?
total_count += value

# only draw updated values if we are inspecting the statistic
# if self.__selected_statistic is stat:
peer_str = "%s" % key
count_str = "%s" % value
self.__info[idx][1].append((peer_str, count_str))

total_count_str = "%s" % total_count

# update GUI
if idx < self.__statistic_list.GetItemCount():
self.__statistic_list.SetStringItem(idx, 0, BartercastStatisticTypes.reverse_mapping[stat])
self.__statistic_list.SetStringItem(idx, 1, total_count_str)
else:
self.__statistic_list.Append([BartercastStatisticTypes.reverse_mapping[stat]])
idx += 1
5 changes: 3 additions & 2 deletions Tribler/community/allchannel/community.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,9 @@ def disp_create_votecast(self, cid, vote, timestamp, store=True, update=True, fo
else:
communityclass = PreviewChannelCommunity

community = self._get_channel_community(cid)
community = self.dispersy.reclassify_community(community, communityclass)
community_old = self._get_channel_community(cid)
community = self.dispersy.reclassify_community(community_old, communityclass)
community._candidates = community_old._candidates

# check if we need to cancel a previous vote
latest_dispersy_id = self._votecast_db.get_latest_vote_dispersy_id(community._channel_id, None)
Expand Down
Empty file.
164 changes: 164 additions & 0 deletions Tribler/community/bartercast4/community.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
# Written by Cor-Paul Bezemer
from conversion import StatisticsConversion
from payload import StatisticsRequestPayload, StatisticsResponsePayload

from Tribler.dispersy.authentication import MemberAuthentication
from Tribler.dispersy.community import Community
from Tribler.dispersy.conversion import DefaultConversion
from Tribler.dispersy.destination import CandidateDestination
from Tribler.dispersy.distribution import DirectDistribution
from Tribler.dispersy.message import Message, DelayMessageByProof
from Tribler.dispersy.resolution import PublicResolution
from .statistics import BartercastStatisticTypes, _barter_statistics
from twisted.internet.task import LoopingCall
from twisted.python import log

BARTERCAST_PERSIST_INTERVAL = 120.0


class BarterCommunity(Community):
@classmethod
def get_master_members(cls, dispersy):
# generated: Thu Oct 30 12:59:19 2014
# curve: NID_sect571r1
# len: 571 bits ~ 144 bytes signature
# pub: 170 3081a7301006072a8648ce3d020106052b81040027038192000405ef988346197abe009065e6f9f517263063495554e4d278074feb1be3e81586b44f90b8a11f170f0a059d8f26c259118e6afc775f3d1e7c46462c9de0ec2bb94e480390622056b002c1f121acc52c18a0857ce59e79cf73642a4787fcdc5398d332000fbd44b16f14b005c0910d81cb85392fd036f32a242044c8263e0c6b9dc10b68f9c30540cfbd8a6bb5ccec786e
# pub-sha1 59accbc05521d8b894e8e6ef8d686411384cdec9
#-----BEGIN PUBLIC KEY-----
# MIGnMBAGByqGSM49AgEGBSuBBAAnA4GSAAQF75iDRhl6vgCQZeb59RcmMGNJVVTk
# 0ngHT+sb4+gVhrRPkLihHxcPCgWdjybCWRGOavx3Xz0efEZGLJ3g7Cu5TkgDkGIg
# VrACwfEhrMUsGKCFfOWeec9zZCpHh/zcU5jTMgAPvUSxbxSwBcCRDYHLhTkv0Dbz
# KiQgRMgmPgxrncELaPnDBUDPvYprtczseG4=
#-----END PUBLIC KEY-----
master_key = "3081a7301006072a8648ce3d020106052b81040027038192000405ef988346197abe009065e6f9f517263063495554e4d278074feb1be3e81586b44f90b8a11f170f0a059d8f26c259118e6afc775f3d1e7c46462c9de0ec2bb94e480390622056b002c1f121acc52c18a0857ce59e79cf73642a4787fcdc5398d332000fbd44b16f14b005c0910d81cb85392fd036f32a242044c8263e0c6b9dc10b68f9c30540cfbd8a6bb5ccec786e".decode("HEX")
master = dispersy.get_member(public_key=master_key)
return [master]

def __init__(self, dispersy, master, my_member):
super(BarterCommunity, self).__init__(dispersy, master, my_member)
self._dispersy = dispersy
log.msg("joined BC community")
self.init_database()

# add task for persisting bartercast statistics every BARTERCAST_PERSIST_INTERVAL seconds
self._logger.debug("bartercast persist task started")
self.register_task("bartercast persist",
LoopingCall(self.backup_bartercast_statistics)).start(BARTERCAST_PERSIST_INTERVAL, now=False)

def init_database(self):
log.msg("loading BC statistics from database")
_barter_statistics.load_statistics(self._dispersy)

def initiate_meta_messages(self):
return super(BarterCommunity, self).initiate_meta_messages() + [
Message(self, u"stats-request",
MemberAuthentication(),
PublicResolution(),
DirectDistribution(),
CandidateDestination(),
StatisticsRequestPayload(),
self.check_stats_request,
self.on_stats_request),
Message(self, u"stats-response",
MemberAuthentication(),
PublicResolution(),
DirectDistribution(),
CandidateDestination(),
StatisticsResponsePayload(),
self.check_stats_response,
self.on_stats_response)
]

def initialize(self, integrate_with_tribler=False, auto_join_channel=False):
super(BarterCommunity, self).initialize()

def initiate_conversions(self):
return [DefaultConversion(self), StatisticsConversion(self)]

@property
def dispersy_sync_response_limit(self):
return 1

@property
def dispersy_sync_skip_enable(self):
return False

@property
def dispersy_sync_cache_enable(self):
return False

def create_stats_request(self, candidate, stats_type):
log.msg("Creating stats-request for type %d to member: %s" % (stats_type, candidate._association.mid.encode("hex")))
meta = self.get_meta_message(u"stats-request")
message = meta.impl(authentication=(self._my_member,),
distribution=(self.claim_global_time(),),
destination=(candidate,),
payload=(stats_type,))
self._dispersy._forward([message])

def check_stats_request(self, messages):
for message in messages:
allowed, _ = self._timeline.check(message)
if allowed:
yield message
else:
yield DelayMessageByProof(message)

def on_stats_request(self, messages):
log.msg("IN: stats-request")
for message in messages:
log.msg("stats-request: %s %s" % (message._distribution.global_time, message.payload.stats_type))
# send back stats-response
self.create_stats_response(message.payload.stats_type, message.candidate)

def create_stats_response(self, stats_type, candidate):
log.msg("OUT: stats-response")
meta = self.get_meta_message(u"stats-response")
records = _barter_statistics.get_top_n_bartercast_statistics(stats_type, 5)
log.msg("sending stats for type %d: %s" % (stats_type, records))

message = meta.impl(authentication=(self._my_member,),
distribution=(self.claim_global_time(),),
destination=(candidate,),
payload=(stats_type, records))
self._dispersy._forward([message])

def check_stats_response(self, messages):
for message in messages:
allowed, _ = self._timeline.check(message)
if allowed:
yield message
else:
yield DelayMessageByProof(message)

def on_stats_response(self, messages):
log.msg("IN: stats-response")
for message in messages:
log.msg("stats-response: %s %s %s"
% (message._distribution.global_time, message.payload.stats_type, message.payload.records))
for r in message.payload.records:
_barter_statistics.log_interaction(self._dispersy,
message.payload.stats_type,
message.authentication.member.mid.encode('hex'),
r[0], int(r[1].encode('hex'), 16))

# bartercast accounting stuff
def backup_bartercast_statistics(self):
self._logger.debug("merging bartercast statistics")
_barter_statistics.persist(self, 1)


class BarterCommunityCrawler(BarterCommunity):

def __init__(self, *args, **kargs):
super(BarterCommunityCrawler, self).__init__(*args, **kargs)

def on_introduction_response(self, messages):
super(BarterCommunity, self).on_introduction_response(messages)
for message in messages:
log.msg("in on_introduction_response: Requesting stats from %s" % message.candidate)
for t in BartercastStatisticTypes.reverse_mapping:
self.create_stats_request(message.candidate, t)

def start_walking(self):
self.register_task("take step", LoopingCall(self.take_step)).start(1.0, now=True)
51 changes: 51 additions & 0 deletions Tribler/community/bartercast4/conversion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from struct import pack, unpack_from
from Tribler.dispersy.conversion import BinaryConversion


class StatisticsConversion(BinaryConversion):

MTU_SIZE = 1500

def __init__(self, community):
super(StatisticsConversion, self).__init__(community, "\x02")
self.define_meta_message(chr(1), community.get_meta_message(u"stats-request"),
self._encode_statistics_request, self._decode_statistics_request)
self.define_meta_message(chr(2), community.get_meta_message(u"stats-response"),
self._encode_statistics_response, self._decode_statistics_response)

def _encode_statistics_request(self, message):
stats_type = message.payload.stats_type
return pack("!i", stats_type),

def _decode_statistics_request(self, placeholder, offset, data):
stats_type, = unpack_from("!i", data, offset)
offset += 4
return offset, placeholder.meta.payload.implement(stats_type)

# TODO fix for dictionaries larger than MTU (split message)
def _encode_statistics_response(self, message):
stats_type = message.payload.stats_type
records = message.payload.records
packed = pack("!i", stats_type)
for r in records:
peer_id = r[0].encode('utf8')
value = r[1]
packed = packed + pack("!H%dsi" % len(peer_id), len(peer_id), peer_id, value)
return packed,

def _decode_statistics_response(self, placeholder, offset, data):
stats_type, = unpack_from("!i", data, offset)
offset += 4
records = []
while offset < len(data):
len_key, = unpack_from("!H", data, offset)
if len_key < 1:
break
offset += 2
key, = unpack_from("!%ds" % len_key, data, offset)
offset += len_key
value = data[offset: offset + 4]
offset += 4
r = [key, value]
records.append(r)
return offset, placeholder.meta.payload.implement(stats_type, records)
23 changes: 23 additions & 0 deletions Tribler/community/bartercast4/payload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from Tribler.dispersy.payload import Payload


class StatisticsRequestPayload(Payload):
'''
Request statistics for key 'key' from peer.
'''

class Implementation(Payload.Implementation):

def __init__(self, meta, stats_type):
super(StatisticsRequestPayload.Implementation, self).__init__(meta)
self.stats_type = stats_type


class StatisticsResponsePayload(Payload):

class Implementation(Payload.Implementation):

def __init__(self, meta, stats_type, records):
super(StatisticsResponsePayload.Implementation, self).__init__(meta)
self.stats_type = stats_type
self.records = records
Loading