Skip to content

Commit

Permalink
fix(py-client-test): Read PluginClients In Order; fix shared_ticket R…
Browse files Browse the repository at this point in the history
…ef (#6155)

This is related to investigation in #5996. Swap the order of these tests to reduce
the probability of the py-grpc client from claiming the rpc failed.

One test was not testing the proper shared ticket; adjusted for correctness to test intended feature.

ObjectService change is to adopt the proper ExportObject dependency pattern. Otherwise 
correctness requires analysis/knowledge that the list of objects are all brand new server side 
exports.
  • Loading branch information
nbauernfeind authored Sep 30, 2024
1 parent ff1621a commit 09a7849
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 46 deletions.
48 changes: 24 additions & 24 deletions py/client/tests/test_plugin_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,27 @@ def test_publish_fetch_figure(self):
plugin_client = self.session.plugin_client(self.session.exportable_objects["plot3"])
self.assertIsNotNone(plugin_client)

with self.subTest("Fetchable in the Plugin object"):
payload, refs = next(plugin_client.resp_stream)
self.assertGreater(len(payload), 0)
self.assertGreater(len(refs), 0)
ref = refs[0]
self.assertEqual(ref.type, "Table")
fetched = ref.fetch()
self.assertIsNotNone(fetched)
self.assertEqual(fetched.size, 30)

# Publish the fetchable
tbl_shared_ticket = SharedTicket.random_ticket()
self.session.publish(ref, tbl_shared_ticket)

# Another session to use the shared fetchable
sub_session = Session()
sub_table = sub_session.fetch_table(tbl_shared_ticket)
self.assertIsNotNone(sub_table)
self.assertEqual(sub_table.size, 30)
sub_session.close()

with self.subTest("Plugin object"):
# First fetch the Plugin object, then publish it
export_plugin_client = self.session.fetch(plugin_client)
Expand All @@ -67,34 +88,13 @@ def test_publish_fetch_figure(self):
sub_plugin_client.close()
sub_session.close()

with self.subTest("Fetchable in the Plugin object"):
payload, refs = next(plugin_client.resp_stream)
self.assertGreater(len(payload), 0)
self.assertGreater(len(refs), 0)
ref = refs[0]
self.assertEqual(ref.type, "Table")
fetched = ref.fetch()
self.assertIsNotNone(fetched)
self.assertEqual(fetched.size, 30)

# Publish the fetchable
shared_ticket = SharedTicket.random_ticket()
self.session.publish(ref, shared_ticket)

# Another session to use the shared fetchable
sub_session = Session()
sub_table = sub_session.fetch_table(shared_ticket)
self.assertIsNotNone(sub_table)
self.assertEqual(sub_table.size, 30)
sub_session.close()

with self.subTest("released Plugin object"):
sub_session = Session()
server_obj = ServerObject(type="Figure", ticket=shared_ticket)
sub_plugin_client = sub_session.plugin_client(server_obj)
# close top-level session's export to disable the shared ticket
self.session.release(export_plugin_client)
server_obj = ServerObject(type="Figure", ticket=shared_ticket)
with self.assertRaises(Exception):
payload, refs = next(sub_plugin_client.resp_stream)
sub_plugin_client = sub_session.plugin_client(server_obj)
sub_session.close()

plugin_client.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,36 +78,35 @@ private final class SendMessageObserver implements StreamObserver<StreamRequest>

class EnqueuedStreamOperation {
private final StreamOperation wrapped;
private final List<ExportObject<?>> requirements;
private final SessionState.ExportBuilder<Object> nonExport;

EnqueuedStreamOperation(Collection<? extends ExportObject<?>> dependencies,
StreamOperation wrapped) {
this.wrapped = wrapped;
this.requirements = List.copyOf(dependencies);
this.nonExport = session.nonExport()
.onErrorHandler(SendMessageObserver.this::onError)
.require(List.copyOf(dependencies));
}

public void run() {
session.nonExport()
.onErrorHandler(SendMessageObserver.this::onError)
.require(requirements)
.submit(() -> {
if (runState.get() == EnqueuedState.CLOSED) {
return;
}
// Run the specified work. Note that we're not concerned about exceptions, the stream will
// be dead (via onError) and won't be used again.
try {
wrapped.run();
} catch (ObjectCommunicationException e) {
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT,
"Error performing MessageStream operation");
}
nonExport.submit(() -> {
if (runState.get() == EnqueuedState.CLOSED) {
return;
}
// Run the specified work. Note that we're not concerned about exceptions, the stream will
// be dead (via onError) and won't be used again.
try {
wrapped.run();
} catch (ObjectCommunicationException e) {
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT,
"Error performing MessageStream operation");
}

// Set state to WAITING if it is RUNNING so that any new work can race being added
if (runState.compareAndSet(EnqueuedState.RUNNING, EnqueuedState.WAITING)) {
doWork();
} // else the stream should be ended and no more work done
});
// Set state to WAITING if it is RUNNING so that any new work can race being added
if (runState.compareAndSet(EnqueuedState.RUNNING, EnqueuedState.WAITING)) {
doWork();
} // else the stream should be ended and no more work done
});
}
}

Expand Down

0 comments on commit 09a7849

Please sign in to comment.