Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move away from ClientMiddleware and ClientAuthHandler in pydeephaven #5489

Merged
merged 41 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
3578908
For debugging the gRPC error seen at optiver
jmao-denver May 8, 2024
3553aeb
Move away from ClientMiddleware and ClientAuthHandler in pydeephaven.
jcferretti May 14, 2024
f6dadda
Cleanups.
jcferretti May 15, 2024
15cb8be
Cleanups.
jcferretti May 15, 2024
7d72046
Remove unused import.
jcferretti May 15, 2024
05ddb37
type annotation for _refresh_token return.
jcferretti May 15, 2024
fbf225e
Remove test_mt.
jcferretti May 15, 2024
6653f15
Fix issues with bi-streaming calls and typos.
jcferretti May 15, 2024
1b36a54
Followup to review comments from Jianfeng.
jcferretti May 15, 2024
7bd0874
Update _session_service.py after _auth_value rename.
jcferretti May 15, 2024
7e98271
Followup to Corey's comments.
jcferretti May 15, 2024
084ea10
Followup to Corey's comments.
jcferretti May 15, 2024
16baf19
Limit refresh retries.
jcferretti May 15, 2024
2108d19
Fixed forwarding.
jcferretti May 15, 2024
ec17ddb
Do something more sane for wrap_rpc with pre-existing metadata.
jcferretti May 15, 2024
b77a416
Again.
jcferretti May 15, 2024
04ecf83
Followup to review comments from Colin.
jcferretti May 16, 2024
2be4d76
Removed comment that didn't apply anymore.
jcferretti May 16, 2024
5046212
Followup to review comments.
jcferretti May 16, 2024
cb49b18
Removed file that I did not intend to add.
jcferretti May 16, 2024
67876bb
Followup to review comments.
jcferretti May 16, 2024
4d96bfb
Fix typos, thanks Jianfeng.
jcferretti May 16, 2024
5be9d10
Shorter.
jcferretti May 16, 2024
567e878
Followup to review comments.
jcferretti May 16, 2024
2a7ac15
Followup to Colin's comments.
jcferretti May 16, 2024
dcaeba5
Rework wrap.
jcferretti May 16, 2024
3b9d80a
Fix typo, hat tip Jianfeng.
jcferretti May 16, 2024
01e2cac
remove unused imports.
jcferretti May 16, 2024
d21f90a
Moved to _services_lock for service initialization.
jcferretti May 16, 2024
af4f656
Restore newline.
jcferretti May 16, 2024
915df90
remove newline
jcferretti May 16, 2024
e13e4d3
remove spurious whitespace
jcferretti May 16, 2024
0a27bad
Add newline
jcferretti May 16, 2024
cd8560f
Add newline.
jcferretti May 16, 2024
41b1a97
Something.
jcferretti May 16, 2024
b15473b
Ensure the program terminates if we run out of refresh attempts.
jcferretti May 16, 2024
c57f6dd
Tweak retry failure case.
jcferretti May 16, 2024
765f383
Spurious whitespace.
jcferretti May 16, 2024
a1b374d
Fix types on update_matadata.
jcferretti May 16, 2024
82f1049
Add type annotations
jcferretti May 16, 2024
dd8fc49
One more type hint.
jcferretti May 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions py/client/pydeephaven/_app_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ def __init__(self, session):
def list_fields(self) -> Any:
"""Fetches the current application fields."""
try:
fields = self._grpc_app_stub.ListFields(
application_pb2.ListFieldsRequest(),
metadata=self.session.grpc_metadata
fields = self.session.wrap_bidi_rpc(
self._grpc_app_stub.ListFields,
application_pb2.ListFieldsRequest()
)
return fields
except Exception as e:
Expand Down
18 changes: 11 additions & 7 deletions py/client/pydeephaven/_arrow_flight_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pyarrow as pa
import pyarrow.flight as paflight

from pyarrow.flight import FlightCallOptions
from pydeephaven._arrow import map_arrow_type
from pydeephaven.dherror import DHError
from pydeephaven.table import Table
Expand All @@ -26,10 +27,10 @@ def import_table(self, data: pa.Table) -> Table:
dh_fields.append(pa.field(name=f.name, type=f.type, metadata=map_arrow_type(f.type)))
dh_schema = pa.schema(dh_fields)

# No need to add headers/metadata here via the options argument;
# or middleware is already doing it for every call.
writer, reader = self._flight_client.do_put(
pa.flight.FlightDescriptor.for_path("export", str(ticket)), dh_schema)
pa.flight.FlightDescriptor.for_path("export", str(ticket)),
dh_schema,
FlightCallOptions(headers=self.session.grpc_metadata))
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
writer.write_table(data)
# Note that pyarrow's write_table completes the gRPC. If we send another gRPC close
# it is possible that by the time the request arrives at the server that it no longer
Expand All @@ -44,9 +45,10 @@ def do_get_table(self, table: Table) -> pa.Table:
"""Gets a snapshot of a Table via Flight do_get."""
try:
flight_ticket = paflight.Ticket(table.ticket.ticket)
# No need to add headers/metadata here via the options argument;
# or middleware is already doing it for every call.
reader = self._flight_client.do_get(flight_ticket)
reader = self._flight_client.do_get(
flight_ticket,
FlightCallOptions(headers=self.session.grpc_metadata))

return reader.read_all()
except Exception as e:
raise DHError("failed to perform a flight DoGet on the table.") from e
Expand All @@ -59,7 +61,9 @@ def do_exchange(self):
"""
try:
desc = pa.flight.FlightDescriptor.for_command(b"dphn")
writer, reader = self._flight_client.do_exchange(desc)
writer, reader = self._flight_client.do_exchange(
desc,
FlightCallOptions(headers=self.session.grpc_metadata))
return writer, reader

except Exception as e:
Expand Down
6 changes: 3 additions & 3 deletions py/client/pydeephaven/_config_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ def __init__(self, session):
def get_configuration_constants(self) -> Dict[str, Any]:
"""Fetches the server configuration as a dict."""
try:
response = self._grpc_app_stub.GetConfigurationConstants(config_pb2.ConfigurationConstantsRequest(),
metadata=self.session.grpc_metadata
)
response = self.session.wrap_rpc(
self._grpc_app_stub.GetConfigurationConstants,
config_pb2.ConfigurationConstantsRequest())
return dict(response.config_values)
except Exception as e:
raise DHError("failed to get the configuration constants.") from e
25 changes: 14 additions & 11 deletions py/client/pydeephaven/_console_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ def start_console(self):
if not self.console_id:
try:
result_id = self.session.make_ticket()
response = self._grpc_console_stub.StartConsole(
console_pb2.StartConsoleRequest(result_id=result_id, session_type=self.session._session_type),
metadata=self.session.grpc_metadata)
response = self.session.wrap_rpc(
self._grpc_console_stub.StartConsole,
console_pb2.StartConsoleRequest(
result_id=result_id,
session_type=self.session._session_type))
self.console_id = response.result_id
except Exception as e:
raise DHError("failed to start a console.") from e
Expand All @@ -35,11 +37,11 @@ def run_script(self, server_script: str) -> Any:
self.start_console()

try:
response = self._grpc_console_stub.ExecuteCommand(
response = self.session.wrap_rpc(
self._grpc_console_stub.ExecuteCommand,
console_pb2.ExecuteCommandRequest(
console_id=self.console_id,
code=server_script),
metadata=self.session.grpc_metadata)
code=server_script))
return response
except Exception as e:
raise DHError("failed to execute a command in the console.") from e
Expand All @@ -49,10 +51,11 @@ def bind_table(self, table: Table, variable_name: str):
if not table or not variable_name:
raise DHError("invalid table and/or variable_name values.")
try:
response = self._grpc_console_stub.BindTableToVariable(
console_pb2.BindTableToVariableRequest(console_id=self.console_id,
table_id=table.ticket,
variable_name=variable_name),
metadata=self.session.grpc_metadata)
self.session.wrap_rpc(
self._grpc_console_stub.BindTableToVariable,
console_pb2.BindTableToVariableRequest(
console_id=self.console_id,
table_id=table.ticket,
variable_name=variable_name))
except Exception as e:
raise DHError("failed to bind a table to a variable on the server.") from e
15 changes: 8 additions & 7 deletions py/client/pydeephaven/_input_table_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,20 @@ def __init__(self, session):
def add(self, input_table: InputTable, table: Table):
"""Adds a table to the InputTable."""
try:
response = self._grpc_input_table_stub.AddTableToInputTable(
self.session.wrap_rpc(
self._grpc_input_table_stub.AddTableToInputTable,
inputtable_pb2.AddTableRequest(input_table=input_table.ticket,
table_to_add=table.ticket),
metadata=self.session.grpc_metadata)
table_to_add=table.ticket))
except Exception as e:
raise DHError("failed to add to InputTable") from e

def delete(self, input_table: InputTable, table: Table):
"""Deletes a table from an InputTable."""
try:
response = self._grpc_input_table_stub.DeleteTableFromInputTable(
inputtable_pb2.DeleteTableRequest(input_table=input_table.ticket,
table_to_remove=table.ticket),
metadata=self.session.grpc_metadata)
self.session.wrap_rpc(
self._grpc_input_table_stub.DeleteTableFromInputTable,
inputtable_pb2.DeleteTableRequest(
input_table=input_table.ticket,
table_to_remove=table.ticket))
except Exception as e:
raise DHError("failed to delete from InputTable") from e
4 changes: 3 additions & 1 deletion py/client/pydeephaven/_plugin_obj_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ def __init__(self, session: 'pydeephaven.session.Session'):
def message_stream(self, req_stream: PluginRequestStream) -> Any:
"""Opens a connection to the server-side implementation of this plugin."""
try:
resp = self._grpc_app_stub.MessageStream(req_stream, metadata=self.session.grpc_metadata)
resp = self.session.wrap_bidi_rpc(
self._grpc_app_stub.MessageStream,
req_stream)
return resp
except Exception as e:
raise DHError("failed to establish bidirectional stream with the server.") from e
20 changes: 14 additions & 6 deletions py/client/pydeephaven/_session_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,20 @@ def connect(self) -> grpc.Channel:
def close(self):
"""Closes the gRPC connection."""
try:
self._grpc_session_stub.CloseSession(
session_pb2.HandshakeRequest(auth_protocol=0, payload=self.session._auth_token),
metadata=self.session.grpc_metadata)
self.session.wrap_rpc(
self._grpc_session_stub.CloseSession,
session_pb2.HandshakeRequest(
auth_protocol=0,
payload=self.session._auth_header_value))
except Exception as e:
raise DHError("failed to close the session.") from e

def release(self, ticket):
"""Releases an exported ticket."""
try:
self._grpc_session_stub.Release(session_pb2.ReleaseRequest(id=ticket), metadata=self.session.grpc_metadata)
self.session.wrap_rpc(
self._grpc_session_stub.Release,
session_pb2.ReleaseRequest(id=ticket))
except Exception as e:
raise DHError("failed to release a ticket.") from e

Expand All @@ -52,6 +56,10 @@ def publish(self, source_ticket: ticket_pb2.Ticket, result_ticket: ticket_pb2.Ti
result_ticket: The result ticket to publish to.
"""
try:
self._grpc_session_stub.PublishFromTicket(session_pb2.PublishRequest(source_id=source_ticket, result_id=result_ticket), metadata=self.session.grpc_metadata)
self.session.wrap_rpc(
self._grpc_session_stub.PublishFromTicket,
session_pb2.PublishRequest(
source_id=source_ticket,
result_id=result_ticket))
except Exception as e:
raise DHError("failed to publish a ticket.") from e
raise DHError("failed to publish a ticket.") from e
22 changes: 13 additions & 9 deletions py/client/pydeephaven/_table_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ def batch(self, ops: List[TableOp]) -> Table:
batch_ops = BatchOpAssembler(self.session, table_ops=ops).build_batch()

try:
response = self._grpc_table_stub.Batch(
table_pb2.BatchTableRequest(ops=batch_ops),
metadata=self.session.grpc_metadata)
response = self.session.wrap_bidi_rpc(
self._grpc_table_stub.Batch,
table_pb2.BatchTableRequest(ops=batch_ops))

exported_tables = []
for exported in response:
Expand All @@ -46,9 +46,11 @@ def grpc_table_op(self, table: Table, op: TableOp, table_class: type = Table) ->
else:
table_reference = None
stub_func = op.__class__.get_stub_func(self._grpc_table_stub)
response = stub_func(op.make_grpc_request(result_id=result_id, source_id=table_reference),
metadata=self.session.grpc_metadata)

response = self.session.wrap_rpc(
stub_func,
op.make_grpc_request(
result_id=result_id,
source_id=table_reference))
if response.success:
return table_class(self.session, ticket=response.result_id.ticket,
schema_header=response.schema_header,
Expand All @@ -61,11 +63,13 @@ def grpc_table_op(self, table: Table, op: TableOp, table_class: type = Table) ->

def fetch_etcr(self, ticket) -> Table:
"""Given a ticket, constructs a table around it, by fetching metadata from the server."""
response = self._grpc_table_stub.GetExportedTableCreationResponse(ticket, metadata=self.session.grpc_metadata)
response = self.session.wrap_rpc(
self._grpc_table_stub.GetExportedTableCreationResponse,
ticket)
if response.success:
return Table(self.session, ticket=response.result_id.ticket,
schema_header=response.schema_header,
size=response.size,
is_static=response.is_static)
else:
raise DHError(f"Server error received for ExportedTableCreationResponse: {response.error_info}")
raise DHError(
f"Server error received for ExportedTableCreationResponse: {response.error_info}")
Loading
Loading