Skip to content

Commit

Permalink
Merge pull request #43 from ianco/demo_updates
Browse files Browse the repository at this point in the history
Demo updates
  • Loading branch information
nrempel authored Jul 9, 2019
2 parents 26f0bbc + 4810962 commit 40818b1
Show file tree
Hide file tree
Showing 5 changed files with 322 additions and 50 deletions.
204 changes: 204 additions & 0 deletions demo/acme.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
import asyncio
import json
import logging
import os
import random
import sys

from .agent import DemoAgent, default_genesis_txns
from .utils import log_json, log_msg, log_status, log_timer, prompt, prompt_loop

LOGGER = logging.getLogger(__name__)

AGENT_PORT = int(sys.argv[1])

TIMING = False


class AcmeAgent(DemoAgent):
def __init__(self, http_port: int, admin_port: int, **kwargs):
super().__init__("Acme Agent", http_port, admin_port, prefix="Acme", **kwargs)
self.connection_id = None
self._connection_active = asyncio.Future()
self.cred_state = {}

async def detect_connection(self):
await self._connection_active

@property
def connection_active(self):
return self._connection_active.done() and self._connection_active.result()

async def handle_connections(self, message):
if message["connection_id"] == self.connection_id:
if message["state"] == "active" and not self._connection_active.done():
self.log("Connected")
self._connection_active.set_result(True)

async def handle_credentials(self, message):
state = message["state"]
credential_exchange_id = message["credential_exchange_id"]
prev_state = self.cred_state.get(credential_exchange_id)
if prev_state == state:
return # ignore
self.cred_state[credential_exchange_id] = state

self.log(
"Credential: state =",
state,
", credential_exchange_id =",
credential_exchange_id,
)

if state == "request_received":
# TODO handle received credential requests
pass

async def handle_presentations(self, message):
state = message["state"]

presentation_exchange_id = message["presentation_exchange_id"]
self.log(
"Presentation: state =",
state,
", presentation_exchange_id =",
presentation_exchange_id,
)

if state == "presentation_received":
# TODO handle received presentations
pass

async def handle_basicmessages(self, message):
self.log("Received message:", message["content"])


async def main():

genesis = await default_genesis_txns()
if not genesis:
print("Error retrieving ledger genesis transactions")
sys.exit(1)

agent = None
start_port = AGENT_PORT

try:
log_status("#1 Provision an agent and wallet, get back configuration details")
agent = AcmeAgent(start_port, start_port + 1, genesis_data=genesis)
await agent.listen_webhooks(start_port + 2)
await agent.register_did()

with log_timer("Startup duration:"):
await agent.start_process()
log_msg("Admin url is at:", agent.admin_url)
log_msg("Endpoint url is at:", agent.endpoint)

# Create a schema
log_status("#3 Create a new schema on the ledger")
with log_timer("Publish schema duration:"):
version = format(
"%d.%d.%d"
% (
random.randint(1, 101),
random.randint(1, 101),
random.randint(1, 101),
)
)
# TODO define schema
#schema_body = {
#}
#schema_response = await agent.admin_POST("/schemas", schema_body)
# log_json(json.dumps(schema_response), label="Schema:")
#schema_id = schema_response["schema_id"]
#log_msg("Schema ID:", schema_id)

# Create a cred def for the schema
# TODO define cred def
#log_status("#4 Create a new credential definition on the ledger")
#with log_timer("Publish credential definition duration:"):
# credential_definition_body = {"schema_id": schema_id}
# credential_definition_response = await agent.admin_POST(
# "/credential-definitions", credential_definition_body
# )
#credential_definition_id = credential_definition_response[
# "credential_definition_id"
#]
#log_msg("Cred def ID:", credential_definition_id)

with log_timer("Generate invitation duration:"):
# Generate an invitation
log_status(
"#5 Create a connection to alice and print out the invite details"
)
connection = await agent.admin_POST("/connections/create-invitation")

agent.connection_id = connection["connection_id"]
log_json(connection, label="Invitation response:")
log_msg("*****************")
log_msg(json.dumps(connection["invitation"]), label="Invitation:", color=None)
log_msg("*****************")

log_msg("Waiting for connection...")
await agent.detect_connection()

async for option in prompt_loop(
"(1) Issue Credential, (2) Send Proof Request, "
+ "(3) Send Message (X) Exit? [1/2/3/X] "
):
if option in "xX":
break

elif option == "1":
log_status("#13 Issue credential offer to X")
# TODO credential offers
#offer = {
# "credential_definition_id": credential_definition_id,
# "connection_id": agent.connection_id,
#}
#await agent.admin_POST("/credential_exchange/send-offer", offer)

elif option == "2":
log_status("#20 Request proof of degree from alice")
# TODO presentation requests
#proof_attrs = [
#]
#proof_predicates = []
#proof_request = {
#}
#await agent.admin_POST(
# "/presentation_exchange/send_request", proof_request
#)

elif option == "3":
msg = await prompt("Enter message: ")
await agent.admin_POST(
f"/connections/{agent.connection_id}/send-message", {"content": msg}
)

if TIMING:
timing = await agent.fetch_timing()
if timing:
for line in agent.format_timing(timing):
log_msg(line)

finally:
terminated = True
try:
if agent:
await agent.terminate()
except Exception:
LOGGER.exception("Error terminating agent:")
terminated = False

await asyncio.sleep(0.1)

if not terminated:
os._exit(1)


if __name__ == "__main__":
try:
asyncio.get_event_loop().run_until_complete(main())
except KeyboardInterrupt:
os._exit(1)
56 changes: 51 additions & 5 deletions demo/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,31 @@

RUN_MODE = os.getenv("RUNMODE")

GENESIS_URL = os.getenv("GENESIS_URL")
LEDGER_URL = os.getenv("LEDGER_URL")

if RUN_MODE == "docker":
DEFAULT_INTERNAL_HOST = os.getenv("DOCKERHOST") or "host.docker.internal"
DEFAULT_EXTERNAL_HOST = DEFAULT_INTERNAL_HOST
DEFAULT_BIN_PATH = "./bin"
DEFAULT_PYTHON_PATH = "."
elif RUN_MODE == "pwd":
#DEFAULT_INTERNAL_HOST =
DEFAULT_EXTERNAL_HOST = os.getenv("DOCKERHOST") or "host.docker.internal"
DEFAULT_BIN_PATH = "./bin"
DEFAULT_PYTHON_PATH = "."


async def default_genesis_txns():
genesis = None
try:
if RUN_MODE == "docker":
if GENESIS_URL:
async with ClientSession() as session:
async with session.get(
GENESIS_URL
) as resp:
genesis = await resp.text()
elif RUN_MODE == "docker":
async with ClientSession() as session:
async with session.get(
f"http://{DEFAULT_EXTERNAL_HOST}:9000/genesis"
Expand Down Expand Up @@ -74,8 +88,11 @@ def __init__(
self.postgres = DEFAULT_POSTGRES if postgres is None else postgres
self.extra_args = extra_args

self.endpoint = f"http://{self.external_host}:{http_port}"
self.admin_url = f"http://{self.external_host}:{admin_port}"
if RUN_MODE == 'pwd':
self.endpoint = f"http://{self.external_host}"
else:
self.endpoint = f"http://{self.external_host}:{http_port}"
self.admin_url = f"http://{self.internal_host}:{admin_port}"
self.webhook_port = None
self.webhook_url = None
self.webhook_site = None
Expand All @@ -94,6 +111,30 @@ def __init__(
self.wallet_key = params.get("wallet_key") or self.ident + rand_name
self.did = None

async def register_schema_and_creddef(self, schema_name, version, schema_attrs):
# Create a schema
schema_body = {
"schema_name": schema_name,
"schema_version": version,
"attributes": schema_attrs,
}
schema_response = await self.admin_POST("/schemas", schema_body)
# log_json(json.dumps(schema_response), label="Schema:")
schema_id = schema_response["schema_id"]
log_msg("Schema ID:", schema_id)

# Create a cred def for the schema
credential_definition_body = {"schema_id": schema_id}
credential_definition_response = await self.admin_POST(
"/credential-definitions", credential_definition_body
)
credential_definition_id = credential_definition_response[
"credential_definition_id"
]
log_msg("Cred def ID:", credential_definition_id)

return (schema_id, credential_definition_id)

def get_agent_args(self):
result = [
("--endpoint", self.endpoint),
Expand Down Expand Up @@ -159,6 +200,8 @@ def prefix_str(self):

async def register_did(self, ledger_url: str = None, alias: str = None):
self.log(f"Registering {self.ident} with seed {self.seed}")
if not ledger_url:
ledger_url = LEDGER_URL
if not ledger_url:
ledger_url = f"http://{self.external_host}:9000"
data = {"alias": alias or self.ident, "seed": self.seed, "role": "TRUST_ANCHOR"}
Expand Down Expand Up @@ -259,7 +302,10 @@ async def terminate(self):

async def listen_webhooks(self, webhook_port):
self.webhook_port = webhook_port
self.webhook_url = f"http://{self.external_host}:{str(webhook_port)}/webhooks"
if RUN_MODE == "pwd":
self.webhook_url = f"http://localhost:{str(webhook_port)}/webhooks"
else:
self.webhook_url = f"http://{self.external_host}:{str(webhook_port)}/webhooks"
app = web.Application()
app.add_routes([web.post("/webhooks/topic/{topic}/", self._receive_webhook)])
runner = web.AppRunner(app)
Expand Down Expand Up @@ -313,7 +359,7 @@ async def detect_process(self):
if resp.status == 200:
text = await resp.text()
break
except ClientError:
except ClientError as ce:
text = None
continue
if not text:
Expand Down
45 changes: 26 additions & 19 deletions demo/alice.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,26 @@ async def handle_basicmessages(self, message):
self.log("Received message:", message["content"])


async def input_invitation(agent):
async for details in prompt_loop("Invite details: "):
if details:
try:
json.loads(details)
break
except json.JSONDecodeError as e:
log_msg("Invalid JSON:", str(e))
pass

with log_timer("Connect duration:"):
connection = await agent.admin_POST(
"/connections/receive-invitation", details
)
agent.connection_id = connection["connection_id"]
log_json(connection, label="Invitation response:")

await agent.detect_connection()


async def main():

genesis = await default_genesis_txns()
Expand All @@ -151,33 +171,16 @@ async def main():
log_status("#7 Provision an agent and wallet, get back configuration details")
agent = AliceAgent(start_port, start_port + 1, genesis_data=genesis)
await agent.listen_webhooks(start_port + 2)
await agent.register_did()

with log_timer("Startup duration:"):
await agent.start_process()
log_msg("Admin url is at:", agent.admin_url)
log_msg("Endpoint url is at:", agent.endpoint)

log_status("#9 Input faber.py invitation details")
async for details in prompt_loop("Invite details: "):
if details:
try:
json.loads(details)
break
except json.JSONDecodeError as e:
log_msg("Invalid JSON:", str(e))
pass

with log_timer("Connect duration:"):
connection = await agent.admin_POST(
"/connections/receive-invitation", details
)
agent.connection_id = connection["connection_id"]
log_json(connection, label="Invitation response:")

await agent.detect_connection()
await input_invitation(agent)

async for option in prompt_loop("(3) Send Message (X) Exit? [3/X]: "):
async for option in prompt_loop("(3) Send Message (4) Input New Invitation (X) Exit? [3/4/X]: "):
if option in "xX":
break
elif option == "3":
Expand All @@ -187,6 +190,10 @@ async def main():
f"/connections/{agent.connection_id}/send-message",
{"content": msg},
)
elif option == "4":
# handle new invitation
log_status("Input new invitation details")
await input_invitation(agent)

if TIMING:
timing = await agent.fetch_timing()
Expand Down
Loading

0 comments on commit 40818b1

Please sign in to comment.