Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Demo updates #43

Merged
merged 24 commits into from
Jul 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 204 additions & 0 deletions demo/acme.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
import asyncio
import json
import logging
import os
import random
import sys

from .agent import DemoAgent, default_genesis_txns
from .utils import log_json, log_msg, log_status, log_timer, prompt, prompt_loop

LOGGER = logging.getLogger(__name__)

AGENT_PORT = int(sys.argv[1])

TIMING = False


class AcmeAgent(DemoAgent):
def __init__(self, http_port: int, admin_port: int, **kwargs):
super().__init__("Acme Agent", http_port, admin_port, prefix="Acme", **kwargs)
self.connection_id = None
self._connection_active = asyncio.Future()
self.cred_state = {}

async def detect_connection(self):
await self._connection_active

@property
def connection_active(self):
return self._connection_active.done() and self._connection_active.result()

async def handle_connections(self, message):
if message["connection_id"] == self.connection_id:
if message["state"] == "active" and not self._connection_active.done():
self.log("Connected")
self._connection_active.set_result(True)

async def handle_credentials(self, message):
state = message["state"]
credential_exchange_id = message["credential_exchange_id"]
prev_state = self.cred_state.get(credential_exchange_id)
if prev_state == state:
return # ignore
self.cred_state[credential_exchange_id] = state

self.log(
"Credential: state =",
state,
", credential_exchange_id =",
credential_exchange_id,
)

if state == "request_received":
# TODO handle received credential requests
pass

async def handle_presentations(self, message):
state = message["state"]

presentation_exchange_id = message["presentation_exchange_id"]
self.log(
"Presentation: state =",
state,
", presentation_exchange_id =",
presentation_exchange_id,
)

if state == "presentation_received":
# TODO handle received presentations
pass

async def handle_basicmessages(self, message):
self.log("Received message:", message["content"])


async def main():

genesis = await default_genesis_txns()
if not genesis:
print("Error retrieving ledger genesis transactions")
sys.exit(1)

agent = None
start_port = AGENT_PORT

try:
log_status("#1 Provision an agent and wallet, get back configuration details")
agent = AcmeAgent(start_port, start_port + 1, genesis_data=genesis)
await agent.listen_webhooks(start_port + 2)
await agent.register_did()

with log_timer("Startup duration:"):
await agent.start_process()
log_msg("Admin url is at:", agent.admin_url)
log_msg("Endpoint url is at:", agent.endpoint)

# Create a schema
log_status("#3 Create a new schema on the ledger")
with log_timer("Publish schema duration:"):
version = format(
"%d.%d.%d"
% (
random.randint(1, 101),
random.randint(1, 101),
random.randint(1, 101),
)
)
# TODO define schema
#schema_body = {
#}
#schema_response = await agent.admin_POST("/schemas", schema_body)
# log_json(json.dumps(schema_response), label="Schema:")
#schema_id = schema_response["schema_id"]
#log_msg("Schema ID:", schema_id)

# Create a cred def for the schema
# TODO define cred def
#log_status("#4 Create a new credential definition on the ledger")
#with log_timer("Publish credential definition duration:"):
# credential_definition_body = {"schema_id": schema_id}
# credential_definition_response = await agent.admin_POST(
# "/credential-definitions", credential_definition_body
# )
#credential_definition_id = credential_definition_response[
# "credential_definition_id"
#]
#log_msg("Cred def ID:", credential_definition_id)

with log_timer("Generate invitation duration:"):
# Generate an invitation
log_status(
"#5 Create a connection to alice and print out the invite details"
)
connection = await agent.admin_POST("/connections/create-invitation")

agent.connection_id = connection["connection_id"]
log_json(connection, label="Invitation response:")
log_msg("*****************")
log_msg(json.dumps(connection["invitation"]), label="Invitation:", color=None)
log_msg("*****************")

log_msg("Waiting for connection...")
await agent.detect_connection()

async for option in prompt_loop(
"(1) Issue Credential, (2) Send Proof Request, "
+ "(3) Send Message (X) Exit? [1/2/3/X] "
):
if option in "xX":
break

elif option == "1":
log_status("#13 Issue credential offer to X")
# TODO credential offers
#offer = {
# "credential_definition_id": credential_definition_id,
# "connection_id": agent.connection_id,
#}
#await agent.admin_POST("/credential_exchange/send-offer", offer)

elif option == "2":
log_status("#20 Request proof of degree from alice")
# TODO presentation requests
#proof_attrs = [
#]
#proof_predicates = []
#proof_request = {
#}
#await agent.admin_POST(
# "/presentation_exchange/send_request", proof_request
#)

elif option == "3":
msg = await prompt("Enter message: ")
await agent.admin_POST(
f"/connections/{agent.connection_id}/send-message", {"content": msg}
)

if TIMING:
timing = await agent.fetch_timing()
if timing:
for line in agent.format_timing(timing):
log_msg(line)

finally:
terminated = True
try:
if agent:
await agent.terminate()
except Exception:
LOGGER.exception("Error terminating agent:")
terminated = False

await asyncio.sleep(0.1)

if not terminated:
os._exit(1)


if __name__ == "__main__":
try:
asyncio.get_event_loop().run_until_complete(main())
except KeyboardInterrupt:
os._exit(1)
56 changes: 51 additions & 5 deletions demo/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,31 @@

RUN_MODE = os.getenv("RUNMODE")

GENESIS_URL = os.getenv("GENESIS_URL")
LEDGER_URL = os.getenv("LEDGER_URL")

if RUN_MODE == "docker":
DEFAULT_INTERNAL_HOST = os.getenv("DOCKERHOST") or "host.docker.internal"
DEFAULT_EXTERNAL_HOST = DEFAULT_INTERNAL_HOST
DEFAULT_BIN_PATH = "./bin"
DEFAULT_PYTHON_PATH = "."
elif RUN_MODE == "pwd":
#DEFAULT_INTERNAL_HOST =
DEFAULT_EXTERNAL_HOST = os.getenv("DOCKERHOST") or "host.docker.internal"
DEFAULT_BIN_PATH = "./bin"
DEFAULT_PYTHON_PATH = "."


async def default_genesis_txns():
genesis = None
try:
if RUN_MODE == "docker":
if GENESIS_URL:
async with ClientSession() as session:
async with session.get(
GENESIS_URL
) as resp:
genesis = await resp.text()
elif RUN_MODE == "docker":
async with ClientSession() as session:
async with session.get(
f"http://{DEFAULT_EXTERNAL_HOST}:9000/genesis"
Expand Down Expand Up @@ -74,8 +88,11 @@ def __init__(
self.postgres = DEFAULT_POSTGRES if postgres is None else postgres
self.extra_args = extra_args

self.endpoint = f"http://{self.external_host}:{http_port}"
self.admin_url = f"http://{self.external_host}:{admin_port}"
if RUN_MODE == 'pwd':
self.endpoint = f"http://{self.external_host}"
else:
self.endpoint = f"http://{self.external_host}:{http_port}"
self.admin_url = f"http://{self.internal_host}:{admin_port}"
self.webhook_port = None
self.webhook_url = None
self.webhook_site = None
Expand All @@ -94,6 +111,30 @@ def __init__(
self.wallet_key = params.get("wallet_key") or self.ident + rand_name
self.did = None

async def register_schema_and_creddef(self, schema_name, version, schema_attrs):
# Create a schema
schema_body = {
"schema_name": schema_name,
"schema_version": version,
"attributes": schema_attrs,
}
schema_response = await self.admin_POST("/schemas", schema_body)
# log_json(json.dumps(schema_response), label="Schema:")
schema_id = schema_response["schema_id"]
log_msg("Schema ID:", schema_id)

# Create a cred def for the schema
credential_definition_body = {"schema_id": schema_id}
credential_definition_response = await self.admin_POST(
"/credential-definitions", credential_definition_body
)
credential_definition_id = credential_definition_response[
"credential_definition_id"
]
log_msg("Cred def ID:", credential_definition_id)

return (schema_id, credential_definition_id)

def get_agent_args(self):
result = [
("--endpoint", self.endpoint),
Expand Down Expand Up @@ -159,6 +200,8 @@ def prefix_str(self):

async def register_did(self, ledger_url: str = None, alias: str = None):
self.log(f"Registering {self.ident} with seed {self.seed}")
if not ledger_url:
ledger_url = LEDGER_URL
if not ledger_url:
ledger_url = f"http://{self.external_host}:9000"
data = {"alias": alias or self.ident, "seed": self.seed, "role": "TRUST_ANCHOR"}
Expand Down Expand Up @@ -259,7 +302,10 @@ async def terminate(self):

async def listen_webhooks(self, webhook_port):
self.webhook_port = webhook_port
self.webhook_url = f"http://{self.external_host}:{str(webhook_port)}/webhooks"
if RUN_MODE == "pwd":
self.webhook_url = f"http://localhost:{str(webhook_port)}/webhooks"
else:
self.webhook_url = f"http://{self.external_host}:{str(webhook_port)}/webhooks"
app = web.Application()
app.add_routes([web.post("/webhooks/topic/{topic}/", self._receive_webhook)])
runner = web.AppRunner(app)
Expand Down Expand Up @@ -313,7 +359,7 @@ async def detect_process(self):
if resp.status == 200:
text = await resp.text()
break
except ClientError:
except ClientError as ce:
text = None
continue
if not text:
Expand Down
45 changes: 26 additions & 19 deletions demo/alice.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,26 @@ async def handle_basicmessages(self, message):
self.log("Received message:", message["content"])


async def input_invitation(agent):
async for details in prompt_loop("Invite details: "):
if details:
try:
json.loads(details)
break
except json.JSONDecodeError as e:
log_msg("Invalid JSON:", str(e))
pass

with log_timer("Connect duration:"):
connection = await agent.admin_POST(
"/connections/receive-invitation", details
)
agent.connection_id = connection["connection_id"]
log_json(connection, label="Invitation response:")

await agent.detect_connection()


async def main():

genesis = await default_genesis_txns()
Expand All @@ -151,33 +171,16 @@ async def main():
log_status("#7 Provision an agent and wallet, get back configuration details")
agent = AliceAgent(start_port, start_port + 1, genesis_data=genesis)
await agent.listen_webhooks(start_port + 2)
await agent.register_did()

with log_timer("Startup duration:"):
await agent.start_process()
log_msg("Admin url is at:", agent.admin_url)
log_msg("Endpoint url is at:", agent.endpoint)

log_status("#9 Input faber.py invitation details")
async for details in prompt_loop("Invite details: "):
if details:
try:
json.loads(details)
break
except json.JSONDecodeError as e:
log_msg("Invalid JSON:", str(e))
pass

with log_timer("Connect duration:"):
connection = await agent.admin_POST(
"/connections/receive-invitation", details
)
agent.connection_id = connection["connection_id"]
log_json(connection, label="Invitation response:")

await agent.detect_connection()
await input_invitation(agent)

async for option in prompt_loop("(3) Send Message (X) Exit? [3/X]: "):
async for option in prompt_loop("(3) Send Message (4) Input New Invitation (X) Exit? [3/4/X]: "):
if option in "xX":
break
elif option == "3":
Expand All @@ -187,6 +190,10 @@ async def main():
f"/connections/{agent.connection_id}/send-message",
{"content": msg},
)
elif option == "4":
# handle new invitation
log_status("Input new invitation details")
await input_invitation(agent)

if TIMING:
timing = await agent.fetch_timing()
Expand Down
Loading