-
Notifications
You must be signed in to change notification settings - Fork 288
/
Copy pathcloud_adapter.py
199 lines (169 loc) · 7.08 KB
/
cloud_adapter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from typing import Awaitable, Callable, Optional
from aiohttp.web import (
Request,
Response,
json_response,
WebSocketResponse,
HTTPBadRequest,
HTTPMethodNotAllowed,
HTTPUnauthorized,
HTTPUnsupportedMediaType,
)
from botbuilder.core import (
Bot,
CloudAdapterBase,
InvokeResponse,
TurnContext,
)
from botbuilder.core.streaming import (
StreamingActivityProcessor,
StreamingHttpDriver,
StreamingRequestHandler,
)
from botbuilder.schema import Activity
from botbuilder.integration.aiohttp.streaming import AiohttpWebSocket
from botframework.connector import AsyncBfPipeline, BotFrameworkConnectorConfiguration
from botframework.connector.aio import ConnectorClient
from botframework.connector.auth import (
AuthenticateRequestResult,
BotFrameworkAuthentication,
BotFrameworkAuthenticationFactory,
ConnectorFactory,
MicrosoftAppCredentials,
)
from .bot_framework_http_adapter_integration_base import (
BotFrameworkHttpAdapterIntegrationBase,
)
class CloudAdapter(CloudAdapterBase, BotFrameworkHttpAdapterIntegrationBase):
    """
    aiohttp entry point for a bot: turns inbound HTTP POST requests into
    activities processed by the bot, and upgrades eligible GET requests to a
    web socket handled by a streaming activity processor.
    """

    def __init__(
        self, bot_framework_authentication: Optional[BotFrameworkAuthentication] = None
    ):
        """
        Initializes a new instance of the CloudAdapter class.

        :param bot_framework_authentication: Optional BotFrameworkAuthentication instance.
            When omitted, one is created with ``BotFrameworkAuthenticationFactory.create()``.
        """
        # pylint: disable=invalid-name
        if not bot_framework_authentication:
            bot_framework_authentication = BotFrameworkAuthenticationFactory.create()

        # Header names read from the web socket upgrade request in _connect.
        self._AUTH_HEADER_NAME = "authorization"
        self._CHANNEL_ID_HEADER_NAME = "channelid"

        super().__init__(bot_framework_authentication)

    async def process(
        self, request: Request, bot: Bot, ws_response: WebSocketResponse = None
    ) -> Optional[Response]:
        """
        Handles one inbound aiohttp request on behalf of the bot.

        :param request: The incoming aiohttp request.
        :param bot: The bot whose ``on_turn`` handles a POSTed activity.
        :param ws_response: Optional web socket response; when supplied and the
            GET request can be upgraded, the connection is switched to streaming.
        :return: A JSON response carrying the invoke response body and status when
            the turn produced one, an empty 201 response for other POSTs, and
            ``None`` once a web socket session has finished.
        :raises TypeError: If ``request`` or ``bot`` is ``None``.
        :raises HTTPUnsupportedMediaType: If a POST is not ``application/json``.
        :raises HTTPBadRequest: If the POST body is not valid JSON or the
            activity has no type.
        :raises HTTPMethodNotAllowed: For any other method, or a GET that cannot
            be upgraded to a web socket.
        :raises HTTPUnauthorized: If processing raised ``HTTPUnauthorized`` or
            ``PermissionError``.
        """
        if request is None:
            raise TypeError("request can't be None")
        if bot is None:
            raise TypeError("bot can't be None")

        try:
            # Only GET requests for web socket connects are allowed.
            # ws_response is compared against None explicitly (as _connect does)
            # so the decision does not depend on the response object's truthiness.
            if (
                request.method == "GET"
                and ws_response is not None
                and ws_response.can_prepare(request)
            ):
                # All socket communication will be handled by the internal
                # streaming-specific processor.
                await self._connect(bot, request, ws_response)
                return None

            if request.method == "POST":
                # A missing Content-Type header is treated like a wrong one (415)
                # instead of surfacing as a KeyError.
                content_type = request.headers.get("Content-Type", "")
                if "application/json" not in content_type:
                    raise HTTPUnsupportedMediaType()

                # Deserialize the incoming Activity; an unparsable body is a
                # client error, not a server fault.
                try:
                    body = await request.json()
                except ValueError as error:
                    raise HTTPBadRequest() from error
                activity: Activity = Activity().deserialize(body)

                # A POST request must contain an Activity
                if not activity.type:
                    raise HTTPBadRequest()

                # Grab the auth header from the inbound http request
                auth_header = request.headers.get("Authorization", "")

                # Process the inbound activity with the bot
                invoke_response = await self.process_activity(
                    auth_header, activity, bot.on_turn
                )

                # Write the response, serializing the InvokeResponse
                if invoke_response:
                    return json_response(
                        data=invoke_response.body, status=invoke_response.status
                    )
                return Response(status=201)

            # HTTPMethodNotAllowed requires the offending method and the allowed
            # methods; raising the bare class would fail with a TypeError.
            raise HTTPMethodNotAllowed(request.method, ["GET", "POST"])
        except (HTTPUnauthorized, PermissionError) as error:
            # Both failure kinds are reported to the caller as a plain 401;
            # the original exception is kept as the cause for diagnostics.
            raise HTTPUnauthorized() from error

    async def _connect(
        self, bot: Bot, request: Request, ws_response: WebSocketResponse
    ):
        """
        Authenticates a streaming request, upgrades it to a web socket and
        listens on that socket until the streaming processor returns.

        :param bot: The bot that receives activities arriving over the socket.
        :param request: The GET request to be upgraded.
        :param ws_response: The web socket response used for the upgrade.
        :raises TypeError: If ``ws_response`` is ``None``.
        """
        if ws_response is None:
            raise TypeError("ws_response can't be None")

        # Grab the auth header from the inbound http request
        auth_header = request.headers.get(self._AUTH_HEADER_NAME)
        # Grab the channelId which should be in the http headers
        channel_id = request.headers.get(self._CHANNEL_ID_HEADER_NAME)

        # Authentication is awaited before the socket is prepared, so a failure
        # here propagates while the request is still plain HTTP.
        authentication_request_result = (
            await self.bot_framework_authentication.authenticate_streaming_request(
                auth_header, channel_id
            )
        )

        # Transition the request to a WebSocket connection
        await ws_response.prepare(request)
        bf_web_socket = AiohttpWebSocket(ws_response)

        streaming_activity_processor = _StreamingActivityProcessor(
            authentication_request_result, self, bot, bf_web_socket
        )

        await streaming_activity_processor.listen()
class _StreamingActivityProcessor(StreamingActivityProcessor):
    """
    Binds one authenticated web socket connection to a CloudAdapter: activities
    arriving through the internal StreamingRequestHandler are forwarded to the
    adapter together with the authentication result captured at connect time.
    """

    def __init__(
        self,
        authenticate_request_result: AuthenticateRequestResult,
        adapter: CloudAdapter,
        bot: Bot,
        web_socket: AiohttpWebSocket = None,
    ) -> None:
        """
        :param authenticate_request_result: Result of authenticating the streaming
            request; its ``connector_factory`` is replaced by this constructor.
        :param adapter: The adapter whose ``process_activity`` runs each turn.
        :param bot: The bot handed to the StreamingRequestHandler.
        :param web_socket: The socket handed to the StreamingRequestHandler.
        """
        self._authenticate_request_result = authenticate_request_result
        self._adapter = adapter

        # The existing StreamingRequestHandler class is reused internally; it is
        # given this processor as its activity processor.
        handler = StreamingRequestHandler(bot, self, web_socket)
        self._request_handler = handler

        # Swap in a connector factory tied to the handler so that connectors
        # created from it send over this connection.
        streaming_factory = _StreamingConnectorFactory(handler)
        self._authenticate_request_result.connector_factory = streaming_factory

    async def listen(self):
        """Delegates to the request handler's ``listen`` and awaits it."""
        await self._request_handler.listen()

    async def process_streaming_activity(
        self,
        activity: Activity,
        bot_callback_handler: Callable[[TurnContext], Awaitable],
    ) -> InvokeResponse:
        """
        Runs one activity through the adapter using the stored authentication result.

        :param activity: The activity to process.
        :param bot_callback_handler: The turn handler passed on to the adapter.
        :return: Whatever the adapter's ``process_activity`` returns.
        """
        outcome = await self._adapter.process_activity(
            self._authenticate_request_result, activity, bot_callback_handler
        )
        return outcome
class _StreamingConnectorFactory(ConnectorFactory):
    """
    Connector factory whose clients are driven by a StreamingHttpDriver built on
    a single StreamingRequestHandler. Every client it creates must target the
    first service url the factory was asked for.
    """

    def __init__(self, request_handler: StreamingRequestHandler) -> None:
        """
        :param request_handler: The handler each created client's driver wraps.
        """
        self._request_handler = request_handler
        # Remembered on the first create() call; later calls must match it.
        self._service_url = None

    async def create(
        self, service_url: str, audience: str  # pylint: disable=unused-argument
    ) -> ConnectorClient:
        """
        Builds a ConnectorClient configured with a streaming driver.

        :param service_url: Url for the client; recorded on first use.
        :param audience: Unused.
        :return: A ConnectorClient using the streaming configuration.
        :raises RuntimeError: If ``service_url`` differs from the recorded url.
        """
        known_url = self._service_url
        if known_url and service_url != known_url:
            raise RuntimeError(
                "This is a streaming scenario, all connectors from this factory must all be for the same url."
            )
        if not known_url:
            self._service_url = service_url

        # TODO: investigate if Driver and pipeline should be moved here
        streaming_driver = StreamingHttpDriver(self._request_handler)
        config = BotFrameworkConnectorConfiguration(
            MicrosoftAppCredentials.empty(),
            service_url,
            pipeline_type=AsyncBfPipeline,
            driver=streaming_driver,
        )
        # The driver holds a reference back to the configuration that uses it.
        streaming_driver.config = config

        return ConnectorClient(None, custom_configuration=config)