Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade the TagComponent to the KnowledgeComponent #7070

Merged
merged 45 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
283e5c4
Add DB scheme for the `relation` field
drew2a Sep 27, 2022
b69b9d2
Add `HAS_TAG` condition to the `get_infohashes`
drew2a Oct 3, 2022
52e0416
Relax tag validation rules
drew2a Oct 3, 2022
601deb5
Fix get_infohashes with an absent tag
drew2a Oct 3, 2022
ef7dcfb
Ubuntu rules v1
drew2a Oct 3, 2022
8b12740
Ubuntu rules v2
drew2a Oct 3, 2022
87ade19
Ubuntu rules v3
drew2a Oct 3, 2022
664c5e4
Initial prototype for search result bundling
devos50 Sep 21, 2022
3ba7733
Improved prototype
devos50 Sep 23, 2022
8b4abe7
Further improvements
devos50 Sep 26, 2022
e33c9a1
Merge the backed changes and the GUI changes
drew2a Oct 3, 2022
46e26bc
Fix duplicate items bug
drew2a Oct 3, 2022
56636d2
Add Rules for Debian and Linux Mint
drew2a Oct 4, 2022
6ebd9d7
Move Enums to tag_db.py
drew2a Oct 5, 2022
e00ad06
Change tag_db scheme
drew2a Oct 6, 2022
ff8b474
Adopt the tag community
drew2a Oct 6, 2022
79d7fae
Adopt the REST
drew2a Oct 6, 2022
0476d6a
Adopt TagProcessor
drew2a Oct 6, 2022
a7554d7
Finish transfer to KG
drew2a Oct 6, 2022
7c7e875
Fix the upgrader's test
drew2a Oct 6, 2022
101f0cf
Fix tests
drew2a Oct 7, 2022
f996c1a
Linter
drew2a Oct 7, 2022
4fcab3b
Remove HAS
drew2a Oct 7, 2022
ab0c1c2
Add `case_sensitive` argument
drew2a Oct 10, 2022
0002de5
Add subject type
drew2a Oct 10, 2022
62a886a
Rename `Predicate` to `ResourceType`
drew2a Oct 11, 2022
4b3e30b
Swap three digit linux version by two digit version
drew2a Oct 11, 2022
e7f64ec
Add documentation for tag_db
drew2a Oct 11, 2022
0571d97
Refactored snippet logic in search endpoint
devos50 Oct 11, 2022
166e469
Merge pull request #2 from devos50/kg_search_algorithm
drew2a Oct 11, 2022
aede0d1
Added tests for snippet generation when searching
devos50 Oct 11, 2022
d122fb9
Merge pull request #3 from devos50/search_endpoint_tests
drew2a Oct 11, 2022
c7e2373
Swap the relation from TORRENT to TITLE for Content Items
drew2a Oct 11, 2022
cbd1564
Minor GUI polish
devos50 Oct 11, 2022
52b122e
Merge pull request #4 from devos50/snippets_gui_polish
drew2a Oct 11, 2022
3a49a9e
Rename TagComponent to KnowledgeComponent
drew2a Oct 12, 2022
15fd1ed
Add migration
drew2a Oct 17, 2022
9d4b491
Change msg_id for the community
drew2a Oct 17, 2022
5a5d326
Refactor the Migration
drew2a Oct 17, 2022
4afd110
Add get_statements()
drew2a Oct 18, 2022
1c5d308
Renamed TagsEndpoint to KnowledgeEndpoint
devos50 Oct 18, 2022
d50e672
Updated REST API to process statements
devos50 Oct 18, 2022
8262795
Renamed /suggestions to /tag_suggestions
devos50 Oct 18, 2022
56e98ae
Merge pull request #5 from devos50/rename_tags_endpoint
drew2a Oct 18, 2022
901f4ee
Polish the code
drew2a Oct 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from tribler.core.components.ipv8.ipv8_component import INFINITE, Ipv8Component
from tribler.core.components.metadata_store.metadata_store_component import MetadataStoreComponent
from tribler.core.components.reporter.reporter_component import ReporterComponent
from tribler.core.components.tag.tag_component import TagComponent
from tribler.core.components.knowledge.knowledge_component import KnowledgeComponent


class GigaChannelComponent(Component):
Expand All @@ -25,7 +25,7 @@ async def run(self):

self._ipv8_component = await self.require_component(Ipv8Component)
metadata_store_component = await self.require_component(MetadataStoreComponent)
tag_component = await self.get_component(TagComponent)
knowledge_component = await self.get_component(KnowledgeComponent)

giga_channel_cls = GigaChannelTestnetCommunity if config.general.testnet else GigaChannelCommunity
community = giga_channel_cls(
Expand All @@ -37,7 +37,7 @@ async def run(self):
rqc_settings=config.remote_query_community,
metadata_store=metadata_store_component.mds,
max_peers=50,
tags_db=tag_component.tags_db if tag_component else None
knowledge_db=knowledge_component.knowledge_db if knowledge_component else None
)
self.community = community
self._ipv8_component.initialise_community_by_default(community, default_random_walk_max_peers=30)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import pytest

from tribler.core.components.session import Session
from tribler.core.components.gigachannel.gigachannel_component import GigaChannelComponent
from tribler.core.components.ipv8.ipv8_component import Ipv8Component
from tribler.core.components.key.key_component import KeyComponent
from tribler.core.components.knowledge.knowledge_component import KnowledgeComponent
from tribler.core.components.metadata_store.metadata_store_component import MetadataStoreComponent
from tribler.core.components.tag.tag_component import TagComponent
from tribler.core.components.session import Session


# pylint: disable=protected-access

Expand All @@ -14,7 +13,8 @@ async def test_giga_channel_component(tribler_config):
tribler_config.ipv8.enabled = True
tribler_config.libtorrent.enabled = True
tribler_config.chant.enabled = True
components = [TagComponent(), MetadataStoreComponent(), KeyComponent(), Ipv8Component(), GigaChannelComponent()]
components = [KnowledgeComponent(), MetadataStoreComponent(), KeyComponent(), Ipv8Component(),
GigaChannelComponent()]
async with Session(tribler_config, components) as session:
comp = session.get_instance(GigaChannelComponent)
assert comp.started_event.is_set() and not comp.failed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import pytest

from tribler.core.components.session import Session
from tribler.core.components.gigachannel_manager.gigachannel_manager_component import GigachannelManagerComponent
from tribler.core.components.ipv8.ipv8_component import Ipv8Component
from tribler.core.components.key.key_component import KeyComponent
from tribler.core.components.knowledge.knowledge_component import KnowledgeComponent
from tribler.core.components.libtorrent.libtorrent_component import LibtorrentComponent
from tribler.core.components.metadata_store.metadata_store_component import MetadataStoreComponent
from tribler.core.components.session import Session
from tribler.core.components.socks_servers.socks_servers_component import SocksServersComponent
from tribler.core.components.tag.tag_component import TagComponent


# pylint: disable=protected-access


async def test_gigachannel_manager_component(tribler_config):
components = [Ipv8Component(), TagComponent(), SocksServersComponent(), KeyComponent(), MetadataStoreComponent(),
components = [Ipv8Component(), KnowledgeComponent(), SocksServersComponent(), KeyComponent(),
MetadataStoreComponent(),
LibtorrentComponent(), GigachannelManagerComponent()]
async with Session(tribler_config, components) as session:
comp = session.get_instance(GigachannelManagerComponent)
Expand Down
136 changes: 136 additions & 0 deletions src/tribler/core/components/knowledge/community/knowledge_community.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import random
from binascii import unhexlify

from cryptography.exceptions import InvalidSignature
from ipv8.keyvault.private.libnaclkey import LibNaCLSK
from ipv8.lazy_community import lazy_wrapper
from ipv8.types import Key
from pony.orm import db_session

from tribler.core.components.ipv8.tribler_community import TriblerCommunity
from tribler.core.components.knowledge.community.knowledge_payload import (
RawStatementOperationMessage,
RequestStatementOperationMessage,
StatementOperation,
StatementOperationMessage,
StatementOperationSignature,
)
from tribler.core.components.knowledge.community.knowledge_validator import validate_operation, validate_resource, \
validate_resource_type
from tribler.core.components.knowledge.community.operations_requests import OperationsRequests, PeerValidationError
from tribler.core.components.knowledge.db.knowledge_db import KnowledgeDatabase

# Number of operations asked for in one request; also the upper bound on how many
# operations are sent back in response to a single incoming request.
REQUESTED_OPERATIONS_COUNT = 10

# Default interval of the periodic "request_operations" task.
REQUEST_INTERVAL = 5  # 5 sec
# Interval of the periodic "clear_requests" task.
CLEAR_ALL_REQUESTS_INTERVAL = 10 * 60  # 10 minutes
# Passed as `time_delta` when selecting operations for gossip.
TIME_DELTA_READY_TO_GOSSIP = {'minutes': 1}


class KnowledgeCommunity(TriblerCommunity):
    """ Community for disseminating knowledge statement operations across the network.

    The gossip is pull-based: this peer periodically asks a random peer for a batch of
    operations and answers the same kind of request from other peers.

    Only operations that are older than 1 minute will be gossiped.
    """

    community_id = unhexlify('d7f7bdc8bcd3d9ad23f06f25aa8aab6754eb23a0')

    def __init__(self, *args, db: KnowledgeDatabase, key: LibNaCLSK, request_interval=REQUEST_INTERVAL,
                 **kwargs):
        """Set up message handlers and the periodic tasks.

        :param db: the database operations are stored to and read from
        :param key: the private key used by `sign`
        :param request_interval: interval of the periodic "request_operations" task
        """
        super().__init__(*args, **kwargs)
        self.db = db
        self.key = key
        self.requests = OperationsRequests()

        self.add_message_handler(RawStatementOperationMessage, self.on_message)
        self.add_message_handler(RequestStatementOperationMessage, self.on_request)

        self.register_task("request_operations", self.request_operations, interval=request_interval)
        self.register_task("clear_requests", self.requests.clear_requests, interval=CLEAR_ALL_REQUESTS_INTERVAL)
        self.logger.info('Knowledge community initialized')

    def request_operations(self):
        """Ask one randomly chosen peer for REQUESTED_OPERATIONS_COUNT operations."""
        # Take a single snapshot of the peers, so the emptiness check and the random
        # choice operate on the same list.
        peers = self.get_peers()
        if not peers:
            return

        peer = random.choice(peers)
        # Register the request first: `on_message` validates the sender against `self.requests`.
        self.requests.register_peer(peer, REQUESTED_OPERATIONS_COUNT)
        self.logger.info(f'-> request {REQUESTED_OPERATIONS_COUNT} operations from peer {peer.mid.hex()}')
        self.ez_send(peer, RequestStatementOperationMessage(count=REQUESTED_OPERATIONS_COUNT))

    @lazy_wrapper(RawStatementOperationMessage)
    def on_message(self, peer, raw: RawStatementOperationMessage):
        """Handle an incoming operation: validate the peer, the signature and the content, then store it.

        Validation failures are logged as warnings and the operation is dropped.
        """
        operation, _ = self.serializer.unpack_serializable(StatementOperation, raw.operation)
        signature, _ = self.serializer.unpack_serializable(StatementOperationSignature, raw.signature)
        self.logger.debug(f'<- message received: {operation}')
        try:
            remote_key = self.crypto.key_from_public_bin(operation.creator_public_key)

            self.requests.validate_peer(peer)
            # The signature is checked against the raw (still packed) operation bytes.
            self.verify_signature(packed_message=raw.operation, key=remote_key, signature=signature.signature,
                                  operation=operation)
            self.validate_operation(operation)

            with db_session:
                is_added = self.db.add_operation(operation, signature.signature)
                if is_added:
                    s = f'+ operation added ({operation.object!r} "{operation.predicate}" {operation.subject!r})'
                    self.logger.info(s)

        except (
                PeerValidationError,  # peer has exhausted his response count
                ValueError,  # validation error
                InvalidSignature,  # signature verification error
        ) as e:
            self.logger.warning(e)

    @lazy_wrapper(RequestStatementOperationMessage)
    def on_request(self, peer, operation):
        """Answer a request by sending the peer operations selected from the database.

        :param peer: the requesting peer
        :param operation: the request message; only its `count` field is read
        """
        # Clamp the requested amount to [1..REQUESTED_OPERATIONS_COUNT].
        operations_count = min(max(1, operation.count), REQUESTED_OPERATIONS_COUNT)
        self.logger.info(f'<- peer {peer.mid.hex()} requested {operations_count} operations')

        with db_session:
            random_operations = self.db.get_operations_for_gossip(
                count=operations_count,
                time_delta=TIME_DELTA_READY_TO_GOSSIP
            )

            self.logger.debug(f'Response {len(random_operations)} operations')
            sent_operations = []
            for op in random_operations:
                try:
                    # A dedicated local name is used here so the `operation` parameter
                    # (the request message) is not overwritten.
                    statement_operation = StatementOperation(
                        subject_type=op.statement.subject.type,
                        subject=op.statement.subject.name,
                        predicate=op.statement.predicate,
                        object=op.statement.object.name,
                        operation=op.operation,
                        clock=op.clock,
                        creator_public_key=op.peer.public_key,
                    )
                    self.validate_operation(statement_operation)
                    signature = StatementOperationSignature(signature=op.signature)
                    self.ez_send(peer, StatementOperationMessage(operation=statement_operation,
                                                                 signature=signature))
                    sent_operations.append(statement_operation)
                except ValueError as e:  # validation error
                    self.logger.warning(e)
            if sent_operations:
                sent_operations_info = ", ".join(f"({t})" for t in sent_operations)
                self.logger.info(f'-> sent {len(sent_operations)} operations to peer: {peer.mid.hex()}')
                self.logger.debug(f'-> sent operations ({sent_operations_info}) to peer: {peer.mid.hex()}')

    @staticmethod
    def validate_operation(operation: StatementOperation):
        """Validate all fields of the operation. Raises ValueError if any of them is not valid."""
        validate_resource(operation.subject)
        validate_resource(operation.object)
        validate_operation(operation.operation)
        validate_resource_type(operation.subject_type)
        validate_resource_type(operation.predicate)

    def verify_signature(self, packed_message: bytes, key: Key, signature: bytes, operation: StatementOperation):
        """Raise InvalidSignature if `signature` is not a valid signature of `packed_message` for `key`.

        `operation` is only used for the error message.
        """
        if not self.crypto.is_valid_signature(key, packed_message, signature):
            raise InvalidSignature(f'Invalid signature for {operation}')

    def sign(self, operation: StatementOperation) -> bytes:
        """Pack the operation and return its signature created with this community's key."""
        packed = self.serializer.pack_serializable(operation)
        return self.crypto.create_signature(self.key, packed)
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from dataclasses import dataclass

from ipv8.messaging.payload_dataclass import overwrite_dataclass, type_from_format

# Rebind `dataclass` to the ipv8 wrapper; the payload classes below pass `msg_id` to it.
dataclass = overwrite_dataclass(dataclass)


@dataclass
class StatementOperation:
    """An operation applied to a (subject, predicate, object) statement by a peer.

    Do not change the format of the StatementOperation, because this will result in an invalid signature.
    """
    subject_type: int  # ResourceType enum
    subject: str
    predicate: int  # ResourceType enum
    object: str
    operation: int  # Operation enum
    clock: int  # Lamport-like clock that is unique for each quadruple {public_key, subject, predicate, object}
    creator_public_key: type_from_format('74s')

    def __str__(self):
        # Parentheses are balanced: only the statement triple is wrapped.
        return f'({self.subject} {self.predicate} {self.object}), o:{self.operation}, c:{self.clock}'


# Field type used for the already-packed parts of RawStatementOperationMessage.
RAW_DATA = type_from_format('varlenH')
# Message id shared by RawStatementOperationMessage and StatementOperationMessage.
STATEMENT_OPERATION_MESSAGE_ID = 2


@dataclass
class StatementOperationSignature:
    """Payload part that carries the signature of a StatementOperation."""
    signature: type_from_format('64s')


@dataclass(msg_id=STATEMENT_OPERATION_MESSAGE_ID)
class RawStatementOperationMessage:
    """ Receiving-side view of the statement operation message with both parts kept as raw data.

    The RAW payload class is used for reducing ipv8 unpacking operations.
    For more information take a look at: https://github.com/Tribler/tribler/pull/6396#discussion_r728334323
    """
    operation: RAW_DATA
    signature: RAW_DATA


@dataclass(msg_id=STATEMENT_OPERATION_MESSAGE_ID)
class StatementOperationMessage:
    """Message that carries a StatementOperation together with its signature."""
    operation: StatementOperation
    signature: StatementOperationSignature


@dataclass(msg_id=1)
class RequestStatementOperationMessage:
    """Request for statement operations; `count` is the number of operations asked for."""
    count: int
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from tribler.core.components.knowledge.db.knowledge_db import Operation, ResourceType
from tribler.core.components.knowledge.knowledge_constants import MAX_RESOURCE_LENGTH, MIN_RESOURCE_LENGTH


def validate_resource(resource: str):
    """Check the length of the resource.

    Raises ValueError when the length is outside [MIN_RESOURCE_LENGTH..MAX_RESOURCE_LENGTH].
    """
    if MIN_RESOURCE_LENGTH <= len(resource) <= MAX_RESOURCE_LENGTH:
        return
    raise ValueError(f'Tag length should be in range [{MIN_RESOURCE_LENGTH}..{MAX_RESOURCE_LENGTH}]')
drew2a marked this conversation as resolved.
Show resolved Hide resolved


def is_valid_resource(resource: str) -> bool:
    """Non-raising variant of `validate_resource`: True for a valid resource, False otherwise."""
    try:
        validate_resource(resource)
    except ValueError:
        return False
    else:
        return True


def validate_operation(operation: int):
    """Check that the incoming value is a member of the Operation enum.

    Raises ValueError, in the case the operation is not valid.
    """
    # Constructing the enum member is the validation; the result itself is not needed.
    Operation(operation)


def validate_resource_type(t: int):
    """Check that the value is a member of the ResourceType enum.

    Raises ValueError, in the case the type is not valid.
    """
    # Constructing the enum member is the validation; the result itself is not needed.
    ResourceType(t)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class PeerValidationError(ValueError):
...


class TagRequests:
class OperationsRequests:
""" This class is design for controlling requests during pull-based gossip.

The main idea:
Expand Down
Loading