Skip to content

Commit

Permalink
Remove java execution logic during table.py module init (#2872)
Browse files Browse the repository at this point in the history
  • Loading branch information
devinrsmith authored Sep 20, 2022
1 parent 05976d6 commit 7f63d2c
Show file tree
Hide file tree
Showing 27 changed files with 145 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.deephaven.util.annotations.VisibleForTesting;

import java.util.Arrays;
import java.util.Objects;
import java.util.function.Supplier;

public class ExecutionContext {
Expand All @@ -18,17 +19,8 @@ public static Builder newBuilder() {
return new Builder();
}

public static ExecutionContext makeSystemicExecutionContext() {
final ExecutionContext context = getContext();
if (context.isSystemic) {
return context;
}
return ExecutionContext.newBuilder()
.captureQueryScope()
.captureQueryLibrary()
.captureQueryCompiler()
.markSystemic()
.build();
public static ExecutionContext makeExecutionContext(boolean isSystemic) {
return getContext().withSystemic(isSystemic);
}

@VisibleForTesting
Expand Down Expand Up @@ -104,9 +96,23 @@ private ExecutionContext(
final QueryScope queryScope,
final QueryCompiler queryCompiler) {
this.isSystemic = isSystemic;
this.queryLibrary = queryLibrary;
this.queryScope = queryScope;
this.queryCompiler = queryCompiler;
this.queryLibrary = Objects.requireNonNull(queryLibrary);
this.queryScope = Objects.requireNonNull(queryScope);
this.queryCompiler = Objects.requireNonNull(queryCompiler);
}

/**
* Returns, or creates, an execution context with the given value for {@code isSystemic} and existing values for the
* other members.
*
* @param isSystemic if the context should be systemic
* @return the execution context
*/
public ExecutionContext withSystemic(boolean isSystemic) {
if (isSystemic == this.isSystemic) {
return this;
}
return new ExecutionContext(isSystemic, queryLibrary, queryScope, queryCompiler);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,10 @@ public VariableProvider getVariableProvider() {
// ScriptSession-based QueryScope implementation, with no remote scope or object reflection support
// -----------------------------------------------------------------------------------------------------------------

private abstract static class ScriptSessionQueryScope extends QueryScope {
public abstract static class ScriptSessionQueryScope extends QueryScope {
final ScriptSession scriptSession;

private ScriptSessionQueryScope(ScriptSession scriptSession) {
public ScriptSessionQueryScope(ScriptSession scriptSession) {
this.scriptSession = scriptSession;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.deephaven.plugin.type.ObjectTypeLookup.NoOp;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.engine.updategraph.UpdateGraphProcessor;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.time.DateTime;
import io.deephaven.engine.util.TableTools;
import io.deephaven.engine.util.GroovyDeephavenSession;
Expand Down Expand Up @@ -336,7 +335,7 @@ private void validateBindingTables(GroovyDeephavenSession session, Map<String, O

private void validateBindingPartitionedTableConstituents(
GroovyDeephavenSession session, Map<String, Object> hardReferences) {
final ExecutionContext executionContext = ExecutionContext.makeSystemicExecutionContext();
final ExecutionContext executionContext = ExecutionContext.makeExecutionContext(true);
// noinspection unchecked
session.getBinding().getVariables().forEach((k, v) -> {
if (v instanceof PartitionedTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public void testTransformPartitionedTableThenMerge() {

final PartitionedTable partitionedTable = sourceTable.partitionBy("Key");

final ExecutionContext executionContext = ExecutionContext.makeSystemicExecutionContext();
final ExecutionContext executionContext = ExecutionContext.makeExecutionContext(true);
final EvalNuggetInterface[] en = new EvalNuggetInterface[] {
new EvalNugget() {
@Override
Expand Down Expand Up @@ -753,11 +753,7 @@ public void testMergeConstituentChanges() {
ExecutionContext.getContext().getQueryLibrary().importStatic(TableTools.class);

final Table underlying;
try (final SafeCloseable ignored = ExecutionContext.newBuilder()
.captureQueryLibrary()
.captureQueryCompiler()
.captureQueryScope()
.build().open()) {
try (final SafeCloseable ignored = ExecutionContext.makeExecutionContext(false).open()) {
underlying = base.update(
"Constituent=emptyTable(1000 * step.longValue()).update(\"JJ=ii * \" + II + \" * step.longValue()\")");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class TestConcurrentInstantiation extends QueryTableTestBase {
@Override
protected void setUp() throws Exception {
super.setUp();
final ExecutionContext executionContext = ExecutionContext.makeSystemicExecutionContext();
final ExecutionContext executionContext = ExecutionContext.makeExecutionContext(true);
final ThreadFactory threadFactory = runnable -> {
Thread thread = new Thread(() -> {
try (final SafeCloseable ignored = executionContext.open()) {
Expand Down
21 changes: 21 additions & 0 deletions py/server/deephaven/_jpy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#

""" This module is an internal module to simplify usage patterns around jpy.
"""
from __future__ import annotations

import jpy

from deephaven import DHError


def strict_cast(j_obj: jpy.JType, to_type: type) -> jpy.JType:
"""A convenience function around jpy.cast. Checks that j_obj is not None and that the result is not None."""
if not j_obj:
raise DHError(message=f"Unable to cast None into '{to_type}'")
j_obj_casted = jpy.cast(j_obj, to_type)
if not j_obj_casted:
raise DHError(message=f"Unable to cast '{j_obj.getClass()}' into '{to_type}'")
return j_obj_casted
5 changes: 5 additions & 0 deletions py/server/deephaven/dherror.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ class DHError(Exception):
"""

def __init__(self, cause=None, message=""):
if isinstance(cause, str) and message == "":
# Saves a lot of debugging headache when library code incorrectly creates something like:
# raise DHError("My error message here")
message = cause
cause = None
self._message = message
self._traceback = traceback.format_exc()
self._cause = cause
Expand Down
19 changes: 12 additions & 7 deletions py/server/deephaven/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import jpy

from deephaven import DHError, dtypes
from deephaven._jpy import strict_cast
from deephaven._wrapper import JObjectWrapper
from deephaven.agg import Aggregation
from deephaven.column import Column, ColumnType
Expand Down Expand Up @@ -47,12 +48,15 @@

# Dynamic Query Scope
_JExecutionContext = jpy.get_type("io.deephaven.engine.context.ExecutionContext")
_JQueryScope = jpy.get_type("io.deephaven.engine.context.QueryScope")
_JUnsynchronizedScriptSessionQueryScope = jpy.get_type(
"io.deephaven.engine.util.AbstractScriptSession$UnsynchronizedScriptSessionQueryScope")
_JScriptSessionQueryScope = jpy.get_type("io.deephaven.engine.util.AbstractScriptSession$ScriptSessionQueryScope")
_JPythonScriptSession = jpy.get_type("io.deephaven.integrations.python.PythonDeephavenSession")
_j_script_session = jpy.cast(_JExecutionContext.getContext().getQueryScope(), _JUnsynchronizedScriptSessionQueryScope).scriptSession()
_j_py_script_session = jpy.cast(_j_script_session, _JPythonScriptSession)


def _j_py_script_session() -> _JPythonScriptSession:
j_execution_context = _JExecutionContext.getContext()
j_query_scope = j_execution_context.getQueryScope()
j_script_session_query_scope = strict_cast(j_query_scope, _JScriptSessionQueryScope)
return strict_cast(j_script_session_query_scope.scriptSession(), _JPythonScriptSession)


@contextlib.contextmanager
Expand All @@ -72,11 +76,12 @@ def _query_scope_ctx():
if len(outer_frames) > i + 2 or function != "<module>":
scope_dict = caller_frame.f_globals.copy()
scope_dict.update(caller_frame.f_locals)
j_py_script_session = _j_py_script_session()
try:
_j_py_script_session.pushScope(scope_dict)
j_py_script_session.pushScope(scope_dict)
yield
finally:
_j_py_script_session.popScope()
j_py_script_session.popScope()
else:
# in the __main__ module, use the default main global scope
yield
Expand Down
5 changes: 5 additions & 0 deletions py/server/test_helper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@

from deephaven_internal import jvm


py_dh_session = None


def start_jvm(jvm_props: Dict[str, str] = None):
jvm.preload_jvm_dll()
import jpy
Expand Down Expand Up @@ -93,6 +97,7 @@ def start_jvm(jvm_props: Dict[str, str] = None):

# Set up a Deephaven Python session
py_scope_jpy = jpy.get_type("io.deephaven.engine.util.PythonScopeJpyImpl").ofMainGlobals()
global py_dh_session
py_dh_session = jpy.get_type("io.deephaven.integrations.python.PythonDeephavenSession")(py_scope_jpy)
py_dh_session.getExecutionContext().open()

Expand Down
4 changes: 4 additions & 0 deletions py/server/tests/test_calendar.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

class CalendarTestCase(BaseTestCase):
def setUp(self) -> None:
super().setUp()
self.test_calendar = BusinessCalendar("USNYSE")
self.b_day1 = "2022-01-03"
self.b_day = "2022-03-08"
Expand All @@ -22,6 +23,9 @@ def setUp(self) -> None:
self.last_b_day_month = "2022-03-31"
self.last_b_day_week = "2022-03-11"

def tearDown(self) -> None:
super().tearDown()

def test_calendar(self):
with self.subTest(msg="calendarNames() test"):
cal_names = calendar_names()
Expand Down
2 changes: 2 additions & 0 deletions py/server/tests/test_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@

class FilterTestCase(BaseTestCase):
def setUp(self):
super().setUp()
self.test_table = read_csv("tests/data/test_table.csv")

def tearDown(self) -> None:
self.test_table = None
super().tearDown()

def test_regex_filter(self):
new_test_table = self.test_table.update("X = String.valueOf(d)")
Expand Down
2 changes: 2 additions & 0 deletions py/server/tests/test_numpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class CustomClass:

class NumpyTestCase(BaseTestCase):
def setUp(self):
super().setUp()
j_array_list1 = j_array_list([1, -1])
j_array_list2 = j_array_list([2, -2])
input_cols = [
Expand Down Expand Up @@ -65,6 +66,7 @@ def setUp(self):

def tearDown(self) -> None:
self.test_table = None
super().tearDown()

def test_to_numpy(self):
for col in self.test_table.columns:
Expand Down
2 changes: 2 additions & 0 deletions py/server/tests/test_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class CustomClass:

class PandasTestCase(BaseTestCase):
def setUp(self):
super().setUp()
j_array_list1 = j_array_list([1, -1])
j_array_list2 = j_array_list([2, -2])
input_cols = [
Expand All @@ -49,6 +50,7 @@ def setUp(self):

def tearDown(self) -> None:
self.test_table = None
super().tearDown()

def test_to_pandas(self):
df = to_pandas(self.test_table)
Expand Down
49 changes: 19 additions & 30 deletions py/server/tests/test_partitioned_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#

import jpy
import unittest

from deephaven.agg import partition
Expand All @@ -11,7 +10,7 @@
from deephaven.filters import Filter

from deephaven import read_csv, DHError, new_table, ugp, time_table
from tests.testbase import BaseTestCase
from tests.testbase import BaseTestCase, make_user_exec_ctx


def transform_func(t: Table) -> Table:
Expand All @@ -35,12 +34,14 @@ def apply(self, t: Table, ot: Table) -> Table:

class PartitionedTableTestCase(BaseTestCase):
def setUp(self):
super().setUp()
self.test_table = read_csv("tests/data/test_table.csv").tail(num_rows=100)
self.partitioned_table = self.test_table.partition_by(by=["c", "e"])

def tearDown(self):
self.partitioned_table = None
self.test_table = None
super().tearDown()

def test_table(self):
self.assertIsNotNone(self.partitioned_table.table)
Expand Down Expand Up @@ -117,37 +118,25 @@ def test_constituents(self):
self.assertGreater(len(constituent_tables), 0)

def test_transform(self):
_JExecutionContext = jpy.get_type("io.deephaven.engine.context.ExecutionContext")
context = _JExecutionContext.newBuilder() \
.captureQueryCompiler() \
.captureQueryLibrary() \
.emptyQueryScope() \
.build().open()
pt = self.partitioned_table.transform(transform_func)
self.assertIn("f", [col.name for col in pt.constituent_table_columns])

pt = self.partitioned_table.transform(Transformer)
self.assertIn("f", [col.name for col in pt.constituent_table_columns])
with make_user_exec_ctx():
pt = self.partitioned_table.transform(transform_func)
self.assertIn("f", [col.name for col in pt.constituent_table_columns])

with self.assertRaises(DHError) as cm:
pt = self.partitioned_table.transform(lambda t, t1: t.join(t1))
self.assertRegex(str(cm.exception), r"missing .* argument")
context.close()
pt = self.partitioned_table.transform(Transformer)
self.assertIn("f", [col.name for col in pt.constituent_table_columns])

with self.assertRaises(DHError) as cm:
pt = self.partitioned_table.transform(lambda t, t1: t.join(t1))
self.assertRegex(str(cm.exception), r"missing .* argument")

def test_partitioned_transform(self):
_JExecutionContext = jpy.get_type("io.deephaven.engine.context.ExecutionContext")
context = _JExecutionContext.newBuilder() \
.captureQueryCompiler() \
.captureQueryLibrary() \
.emptyQueryScope() \
.build().open()
other_pt = self.partitioned_table.transform(transform_func)
pt = self.partitioned_table.partitioned_transform(other_pt, partitioned_transform_func)
self.assertIn("f", [col.name for col in pt.constituent_table_columns])

pt = self.partitioned_table.partitioned_transform(other_pt, PartitionedTransformer())
self.assertIn("f", [col.name for col in pt.constituent_table_columns])
context.close()
with make_user_exec_ctx():
other_pt = self.partitioned_table.transform(transform_func)
pt = self.partitioned_table.partitioned_transform(other_pt, partitioned_transform_func)
self.assertIn("f", [col.name for col in pt.constituent_table_columns])

pt = self.partitioned_table.partitioned_transform(other_pt, PartitionedTransformer())
self.assertIn("f", [col.name for col in pt.constituent_table_columns])

def test_partition_agg(self):
with ugp.shared_lock():
Expand Down
2 changes: 2 additions & 0 deletions py/server/tests/test_plot/test_color.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@

class ColorTestCase(BaseTestCase):
def setUp(self):
super().setUp()
self.test_table = read_csv("tests/data/test_table.csv")

def tearDown(self) -> None:
self.test_table = None
super().tearDown()

def test_color(self):
figure = Figure()
Expand Down
2 changes: 2 additions & 0 deletions py/server/tests/test_plot/test_figure.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@

class FigureTestCase(BaseTestCase):
def setUp(self):
super().setUp()
self.test_table = read_csv("tests/data/test_table.csv")

def tearDown(self) -> None:
self.test_table = None
super().tearDown()

def test_figure(self):
with self.subTest("Not supported yet."):
Expand Down
Loading

0 comments on commit 7f63d2c

Please sign in to comment.