Feature #19 Data agreement synchronisation between aries cloudagent and api backend
albinpa committed Jun 13, 2023
1 parent 7c4c06f commit 339445c
Showing 4 changed files with 76 additions and 4 deletions.
43 changes: 43 additions & 0 deletions mydata_did/v1_0/kafka_publisher.py
@@ -0,0 +1,43 @@
from confluent_kafka import Producer
import os
import json
from logging import Logger
from enum import Enum
# from mydata_did.v1_0.manager import ADAManager

class DataAgreementOperations(Enum):
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"

async def publish_event_to_kafka_topic(key: str, message: str, topic: str, logger: Logger):
    kafka_server_address = os.environ.get("KAFKA_SERVER_ADDRESS", 'localhost:9092')
    # Fetch iGrant.io config
    # igrantio_config = await ada_manager.fetch_igrantio_config_from_os_environ()
    igrantio_org_id = os.environ.get("IGRANTIO_ORG_ID")
    data = json.loads(message)
    data['igrantio_org_id'] = igrantio_org_id

    message_with_org_id = json.dumps(data)

    kafka_producer_configuration = {
        'bootstrap.servers': kafka_server_address,
    }
    kafka_producer = Producer(kafka_producer_configuration)

    def kafka_event_delivery_callback_handler(err: str, msg: str):
        if err is not None:
            log_message = f"Message delivery failed: {err}"
        else:
            log_message = f'Message delivered to {msg.topic()} [{msg.partition()}] partition'
        logger.debug(log_message)

    # Publish event to Kafka topic
    kafka_producer.produce(topic, key=key, value=message_with_org_id, callback=kafka_event_delivery_callback_handler)

    # Wait for the message to be delivered
    kafka_producer.flush()

async def publish_event_to_data_agreement_topic(key: str, message: str, logger: Logger):
    topic = "data_agreement"
    await publish_event_to_kafka_topic(key, message, topic, logger)
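
For reference, a minimal usage sketch (not part of this commit) of calling the new publisher directly. It assumes a broker reachable via KAFKA_SERVER_ADDRESS (default localhost:9092) and IGRANTIO_ORG_ID set in the environment; the record fields below are illustrative only.

import asyncio
import json
import logging

from mydata_did.v1_0.kafka_publisher import (
    DataAgreementOperations,
    publish_event_to_data_agreement_topic,
)

logger = logging.getLogger(__name__)

async def main():
    # Hypothetical serialized data agreement record; in the manager the real
    # payload comes from DataAgreementV1Record.serialize().
    data_agreement_record = {"data_agreement_id": "example-id", "state": "DRAFT"}

    # The key carries the operation ("create" here); the publisher adds
    # igrantio_org_id to the payload before producing to the
    # "data_agreement" topic.
    await publish_event_to_data_agreement_topic(
        DataAgreementOperations.CREATE.value,
        json.dumps(data_agreement_record),
        logger,
    )

asyncio.run(main())
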
28 changes: 25 additions & 3 deletions mydata_did/v1_0/manager.py
@@ -118,6 +118,7 @@
from ..patched_protocols.present_proof.v1_0.message_types import ATTACH_DECO_IDS, PRESENTATION_REQUEST
from ..patched_protocols.present_proof.v1_0.manager import PresentationManager
from mydata_did.v1_0.utils.jsonld import data_agreement
from .kafka_publisher import publish_event_to_data_agreement_topic, DataAgreementOperations


class ADAManagerError(BaseError):
@@ -555,7 +556,7 @@ async def process_read_did_message(self, read_did_message: ReadDIDMessage, recei
        create_did_message_to_did: DIDMyData = DIDMyData.from_public_key_b58(
            receipt.recipient_verkey, key_type=KeyType.ED25519)

        # From and To DIDs for the response messages
        # From and To DIDs for the response messages
        response_message_from_did = create_did_message_to_did
        response_message_to_did = create_did_message_from_did

@@ -1452,6 +1453,11 @@ async def delete_data_agreement_in_wallet(self, data_agreement_id: str):

            # Save the data agreement record
            await data_agreement_record.save(self.context)

            # Notify iGrant.io backend about data agreement deletion
            da_record_json_str = json.dumps(data_agreement_record.serialize())
            key = DataAgreementOperations.DELETE.value
            await publish_event_to_data_agreement_topic(key, da_record_json_str, self._logger)
        except StorageError as err:
            # Raise an error
            raise ADAManagerError(
@@ -4055,7 +4061,16 @@ def serialize_data_agreement_record(self, *, data_agreement_records: typing.List

        return data_agreement_record_list if is_list else data_agreement_record_list[0]

    async def create_data_agreement_and_personal_data_records(self, *, data_agreement: dict, existing_schema_id: str = None, draft: bool = False, existing_version: int = None, existing_data_agreement_id: str = None, update_ssi_payload: bool = True, existing_data_agreement_record: DataAgreementV1Record = None) -> typing.Tuple[DataAgreementV1Record, dict]:
    async def create_data_agreement_and_personal_data_records(self,
                                                              *,
                                                              data_agreement: dict,
                                                              existing_schema_id: str = None,
                                                              draft: bool = False,
                                                              existing_version: int = None,
                                                              existing_data_agreement_id: str = None,
                                                              update_ssi_payload: bool = True,
                                                              existing_data_agreement_record: DataAgreementV1Record = None,
                                                              is_update: bool = False) -> typing.Tuple[DataAgreementV1Record, dict]:
        """
        Create data agreement and personal data records.
@@ -4196,6 +4211,12 @@ async def create_data_agreement_and_personal_data_records(self, *, data_agreemen
        # Save the data agreement record
        await data_agreement_v1_record.save(self.context)

        # Notify the iGrant.io backend about the creation of data agreement.
        da_record_json_str = json.dumps(data_agreement_v1_record.serialize())
        key = DataAgreementOperations.UPDATE.value if is_update else DataAgreementOperations.CREATE.value
        await publish_event_to_data_agreement_topic(key, da_record_json_str, self._logger)

        return data_agreement_v1_record, self.serialize_data_agreement_record(data_agreement_records=[data_agreement_v1_record], is_list=False)

    async def update_data_agreement_and_personal_data_records(self, *, data_agreement_id: str, data_agreement: dict, existing_schema_id: str = None, draft: bool = False) -> typing.Tuple[DataAgreementV1Record, dict]:
@@ -4229,7 +4250,8 @@ async def update_data_agreement_and_personal_data_records(self, *, data_agreemen
            draft=draft,
            existing_version=existing_data_agreement_record.data_agreement.get(
                "template_version", None),
            existing_data_agreement_id=existing_data_agreement_dict.data_agreement_template_id
            existing_data_agreement_id=existing_data_agreement_dict.data_agreement_template_id,
            is_update=True
        )

        return new_data_agreement_record, new_data_agreement_dict
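
The published events are keyed by operation ("create", "update" or "delete") and carry the serialized data agreement record as JSON, enriched with "igrantio_org_id" by the publisher. A minimal sketch (not part of this commit) of how the api backend might consume them; the group id, offset policy and broker address are illustrative assumptions.

import json

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "api-backend-data-agreement-sync",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["data_agreement"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        operation = msg.key().decode("utf-8")  # "create" / "update" / "delete"
        record = json.loads(msg.value())       # serialized data agreement record
        print(f"{operation}: data agreement for org {record.get('igrantio_org_id')}")
finally:
    consumer.close()
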
6 changes: 6 additions & 0 deletions mydata_did/v1_0/routes.py
@@ -3554,10 +3554,12 @@ async def register(app: web.Application):
"/v1/data-agreements/didcomm/transactions/{da_crud_didcomm_tx_id}",
data_agreement_crud_didcomm_transaction_records_delete_by_id,
),
# TODO: Notify the backend that data agreement is created.
web.post(
"/v1/data-agreements",
create_and_store_data_agreement_in_wallet_v2,
),
# TODO: Notify the backend that data agreement is published.
web.post(
"/v1/data-agreements/{data_agreement_id}/publish",
publish_data_agreement_handler,
Expand All @@ -3567,10 +3569,12 @@ async def register(app: web.Application):
                query_data_agreements_in_wallet,
                allow_head=False
            ),
            # TODO: Notify the backend that data agreement is updated.
            web.put(
                "/v1/data-agreements/{data_agreement_id}",
                update_data_agreement_in_wallet_v2,
            ),
            # TODO: Notify the backend that data agreement is deleted.
            web.delete(
                "/v1/data-agreements/{data_agreement_id}",
                delete_data_agreement_in_wallet,
Expand All @@ -3585,10 +3589,12 @@ async def register(app: web.Application):
                query_da_personal_data_in_wallet,
                allow_head=False
            ),
            # TODO: Check if the backend needs to be notified.
            web.put(
                "/v1/data-agreements/personal-data/{attribute_id}",
                update_da_personal_data_in_wallet,
            ),
            # TODO: Check if the backend needs to be notified.
            web.delete(
                "/v1/data-agreements/personal-data/{attribute_id}",
                delete_da_personal_data_in_wallet,
3 changes: 2 additions & 1 deletion requirements.txt
@@ -56,4 +56,5 @@ wrapt==1.12.1
yarl==1.5.1
py-multibase==1.0.3
validators==0.18.2
semver==2.13.0
semver==2.13.0
confluent-kafka
