diff --git a/py/client/tests/test_plugin_client.py b/py/client/tests/test_plugin_client.py index ae8b3041a73..ad990c26988 100644 --- a/py/client/tests/test_plugin_client.py +++ b/py/client/tests/test_plugin_client.py @@ -46,6 +46,27 @@ def test_publish_fetch_figure(self): plugin_client = self.session.plugin_client(self.session.exportable_objects["plot3"]) self.assertIsNotNone(plugin_client) + with self.subTest("Fetchable in the Plugin object"): + payload, refs = next(plugin_client.resp_stream) + self.assertGreater(len(payload), 0) + self.assertGreater(len(refs), 0) + ref = refs[0] + self.assertEqual(ref.type, "Table") + fetched = ref.fetch() + self.assertIsNotNone(fetched) + self.assertEqual(fetched.size, 30) + + # Publish the fetchable + tbl_shared_ticket = SharedTicket.random_ticket() + self.session.publish(ref, tbl_shared_ticket) + + # Another session to use the shared fetchable + sub_session = Session() + sub_table = sub_session.fetch_table(tbl_shared_ticket) + self.assertIsNotNone(sub_table) + self.assertEqual(sub_table.size, 30) + sub_session.close() + with self.subTest("Plugin object"): # First fetch the Plugin object, then publish it export_plugin_client = self.session.fetch(plugin_client) @@ -67,34 +88,13 @@ def test_publish_fetch_figure(self): sub_plugin_client.close() sub_session.close() - with self.subTest("Fetchable in the Plugin object"): - payload, refs = next(plugin_client.resp_stream) - self.assertGreater(len(payload), 0) - self.assertGreater(len(refs), 0) - ref = refs[0] - self.assertEqual(ref.type, "Table") - fetched = ref.fetch() - self.assertIsNotNone(fetched) - self.assertEqual(fetched.size, 30) - - # Publish the fetchable - shared_ticket = SharedTicket.random_ticket() - self.session.publish(ref, shared_ticket) - - # Another session to use the shared fetchable - sub_session = Session() - sub_table = sub_session.fetch_table(shared_ticket) - self.assertIsNotNone(sub_table) - self.assertEqual(sub_table.size, 30) - sub_session.close() - with self.subTest("released Plugin object"): sub_session = Session() - server_obj = ServerObject(type="Figure", ticket=shared_ticket) - sub_plugin_client = sub_session.plugin_client(server_obj) + # close top-level session's export to disable the shared ticket self.session.release(export_plugin_client) + server_obj = ServerObject(type="Figure", ticket=shared_ticket) with self.assertRaises(Exception): - payload, refs = next(sub_plugin_client.resp_stream) + sub_plugin_client = sub_session.plugin_client(server_obj) sub_session.close() plugin_client.close() diff --git a/server/src/main/java/io/deephaven/server/object/ObjectServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/object/ObjectServiceGrpcImpl.java index f2901dee6ff..60aa6283e17 100644 --- a/server/src/main/java/io/deephaven/server/object/ObjectServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/object/ObjectServiceGrpcImpl.java @@ -78,36 +78,35 @@ private final class SendMessageObserver implements StreamObserver class EnqueuedStreamOperation { private final StreamOperation wrapped; - private final List> requirements; + private final SessionState.ExportBuilder nonExport; EnqueuedStreamOperation(Collection> dependencies, StreamOperation wrapped) { this.wrapped = wrapped; - this.requirements = List.copyOf(dependencies); + this.nonExport = session.nonExport() + .onErrorHandler(SendMessageObserver.this::onError) + .require(List.copyOf(dependencies)); } public void run() { - session.nonExport() - .onErrorHandler(SendMessageObserver.this::onError) - .require(requirements) - .submit(() -> { - if (runState.get() == EnqueuedState.CLOSED) { - return; - } - // Run the specified work. Note that we're not concerned about exceptions, the stream will - // be dead (via onError) and won't be used again. - try { - wrapped.run(); - } catch (ObjectCommunicationException e) { - throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, - "Error performing MessageStream operation"); - } + nonExport.submit(() -> { + if (runState.get() == EnqueuedState.CLOSED) { + return; + } + // Run the specified work. Note that we're not concerned about exceptions, the stream will + // be dead (via onError) and won't be used again. + try { + wrapped.run(); + } catch (ObjectCommunicationException e) { + throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, + "Error performing MessageStream operation"); + } - // Set state to WAITING if it is RUNNING so that any new work can race being added - if (runState.compareAndSet(EnqueuedState.RUNNING, EnqueuedState.WAITING)) { - doWork(); - } // else the stream should be ended and no more work done - }); + // Set state to WAITING if it is RUNNING so that any new work can race being added + if (runState.compareAndSet(EnqueuedState.RUNNING, EnqueuedState.WAITING)) { + doWork(); + } // else the stream should be ended and no more work done + }); } }