From d453d5ec401b34a0cf66daf1e22a299e67650530 Mon Sep 17 00:00:00 2001 From: Giuseppe Villani Date: Thu, 12 Dec 2024 09:55:25 +0100 Subject: [PATCH 1/5] Revert "Removed org.apache.commons.collections from OpenAI (#4304)" (#4305) This reverts commit 7e3bfb7874cc6cd49662d4c269305fa9b5234c12. --- extended/src/main/java/apoc/ml/OpenAI.java | 4 ++-- extended/src/main/java/apoc/util/ExtendedMapUtils.java | 4 ---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/extended/src/main/java/apoc/ml/OpenAI.java b/extended/src/main/java/apoc/ml/OpenAI.java index 1a659f21ef..4bc4814b33 100644 --- a/extended/src/main/java/apoc/ml/OpenAI.java +++ b/extended/src/main/java/apoc/ml/OpenAI.java @@ -3,11 +3,11 @@ import apoc.ApocConfig; import apoc.Extended; import apoc.result.MapResult; -import apoc.util.ExtendedMapUtils; import apoc.util.ExtendedUtil; import apoc.util.JsonUtil; import apoc.util.Util; import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.neo4j.graphdb.security.URLAccessChecker; import org.neo4j.procedure.Context; @@ -225,7 +225,7 @@ public Stream completion(@Name("prompt") String prompt, @Name("api_ke public Stream chatCompletion(@Name("messages") List> messages, @Name("api_key") String apiKey, @Name(value = "configuration", defaultValue = "{}") Map configuration) throws Exception { boolean failOnError = isFailOnError(configuration); if (checkNullInput(messages, failOnError)) return Stream.empty(); - messages = messages.stream().filter(ExtendedMapUtils::isNotEmpty).toList(); + messages = messages.stream().filter(MapUtils::isNotEmpty).toList(); if (checkEmptyInput(messages, failOnError)) return Stream.empty(); configuration.putIfAbsent("model", GPT_4O_MODEL); return executeRequest(apiKey, configuration, "chat/completions", (String) configuration.get("model"), "messages", messages, "$", apocConfig, urlAccessChecker) diff --git a/extended/src/main/java/apoc/util/ExtendedMapUtils.java b/extended/src/main/java/apoc/util/ExtendedMapUtils.java index 06b8ce9c6e..c6b388b07d 100644 --- a/extended/src/main/java/apoc/util/ExtendedMapUtils.java +++ b/extended/src/main/java/apoc/util/ExtendedMapUtils.java @@ -11,8 +11,4 @@ public static int size(final Map map) { public static boolean isEmpty(final Map map) { return map == null || map.isEmpty(); } - - public static boolean isNotEmpty(final Map map) { - return !isEmpty(map); - } } From 66c8bd4fcc84ea44c5323fab56131246cd63458c Mon Sep 17 00:00:00 2001 From: Giuseppe Villani Date: Mon, 16 Dec 2024 12:50:51 +0100 Subject: [PATCH 2/5] Fixes #4244: Make apoc.dv.* procedures work in clusters (#4281) * Fixes #4244: Make apoc.dv.* procedures work in clusters * added procs to extended*.txt * fix tests * cleanup --- .../overview/apoc.dv/apoc.dv.catalog.add.adoc | 4 +- .../apoc.dv/apoc.dv.catalog.list.adoc | 4 +- .../apoc.dv/apoc.dv.catalog.remove.adoc | 4 +- .../ROOT/pages/virtual-resource/index.adoc | 17 +- .../modules/ROOT/partials/dv/deprecated.adoc | 19 + ...rtualizationCatalogClusterRoutingTest.java | 155 ++++++++ ...VirtualizationCatalogNewProcedureTest.java | 220 ++++++++++ .../dv/DataVirtualizationCatalogTest.java | 277 +++---------- .../dv/DataVirtualizationCatalogTestUtil.java | 376 ++++++++++++------ .../apoc/dv/DataVirtualizationCatalog.java | 37 +- .../dv/DataVirtualizationCatalogHandler.java | 1 - ...ualizationCatalogHandlerNewProcedures.java | 61 +++ ...ataVirtualizationCatalogNewProcedures.java | 74 ++++ .../src/main/resources/extendedCypher25.txt | 3 + .../src/main/resources/extendedCypher5.txt | 3 + .../apoc/util/ExtendedTestContainerUtil.java | 2 +- 16 files changed, 898 insertions(+), 359 deletions(-) create mode 100644 docs/asciidoc/modules/ROOT/partials/dv/deprecated.adoc create mode 100644 extended-it/src/test/java/apoc/dv/DataVirtualizationCatalogClusterRoutingTest.java create mode 100644 extended-it/src/test/java/apoc/dv/DataVirtualizationCatalogNewProcedureTest.java create mode 100644 extended/src/main/java/apoc/dv/DataVirtualizationCatalogHandlerNewProcedures.java create mode 100644 extended/src/main/java/apoc/dv/DataVirtualizationCatalogNewProcedures.java diff --git a/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.add.adoc b/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.add.adoc index 947147749c..1f6d36705b 100644 --- a/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.add.adoc +++ b/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.add.adoc @@ -5,7 +5,7 @@ This file is generated by DocsTest, so don't change it! = apoc.dv.catalog.add :description: This section contains reference documentation for the apoc.dv.catalog.add procedure. -label:procedure[] label:apoc-extended[] +label:procedure[] label:apoc-extended[] label:deprecated[] [.emphasis] Add a virtualized resource configuration @@ -17,6 +17,8 @@ Add a virtualized resource configuration apoc.dv.catalog.add(name :: STRING?, config = {} :: MAP?) :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?) ---- +include::partial$/dv/deprecated.adoc[] + [WARNING] ==== This procedure is not intended to be used in a cluster environment, and may act unpredictably. diff --git a/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.list.adoc b/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.list.adoc index 3102370188..add2402127 100644 --- a/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.list.adoc +++ b/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.list.adoc @@ -5,7 +5,7 @@ This file is generated by DocsTest, so don't change it! = apoc.dv.catalog.list :description: This section contains reference documentation for the apoc.dv.catalog.list procedure. -label:procedure[] label:apoc-extended[] +label:procedure[] label:apoc-extended[] label:deprecated[] [.emphasis] List all virtualized resource configuration @@ -17,6 +17,8 @@ List all virtualized resource configuration apoc.dv.catalog.list() :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?) ---- +include::partial$/dv/deprecated.adoc[] + == Output parameters [.procedures, opts=header] |=== diff --git a/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.remove.adoc b/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.remove.adoc index 774b34d7cd..c346b66b3d 100644 --- a/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.remove.adoc +++ b/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.remove.adoc @@ -5,7 +5,7 @@ This file is generated by DocsTest, so don't change it! = apoc.dv.catalog.remove :description: This section contains reference documentation for the apoc.dv.catalog.remove procedure. -label:procedure[] label:apoc-extended[] +label:procedure[] label:apoc-extended[] label:deprecated[] [.emphasis] Remove a virtualized resource config by name @@ -17,6 +17,8 @@ Remove a virtualized resource config by name apoc.dv.catalog.remove(name :: STRING?) :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?) ---- +include::partial$/dv/deprecated.adoc[] + [WARNING] ==== This procedure is not intended to be used in a cluster environment, and may act unpredictably. diff --git a/docs/asciidoc/modules/ROOT/pages/virtual-resource/index.adoc b/docs/asciidoc/modules/ROOT/pages/virtual-resource/index.adoc index 253664b034..a9c5da0708 100644 --- a/docs/asciidoc/modules/ROOT/pages/virtual-resource/index.adoc +++ b/docs/asciidoc/modules/ROOT/pages/virtual-resource/index.adoc @@ -2,6 +2,8 @@ = Virtual Resource :description: This chapter describes how to handle external data sources as virtual resource without persisting them in the database +include::partial$systemdbonly.note.adoc[] + [NOTE] ==== There are situations where we would like to enrich/complement the results of a cypher query in a Neo4j graph with additional @@ -40,10 +42,11 @@ image::apoc.dv.imported-graph-from-RDB.png[scaledwidth="100%"] == Managing a Virtualized Resource via JDBC === Creating a Virtualized Resource (JDBC) -Before we can query a Virtualized Resource, we need to define it. We do this using the `apoc.dv.catalog.add` procedure. -The procedure takes two parameters: +Before we can query a Virtualized Resource, we need to define it. We do this using the `apoc.dv.catalog.install` procedure. +The procedure takes three parameters: * a name that uniquely identifies the virtualized resource and can be used to query that resource +* the database name where we want to use the resource (default is `'neo4j'`) * a set of parameters indicating the type of the resource (type), the access point (url), the parameterised query that will be run on the access point (query) and the labels that will be applied to the generated virtual nodes (labels). @@ -56,7 +59,7 @@ Here is the cypher that creates such virtualized resource: [source,cypher] ---- -CALL apoc.dv.catalog.add("fr-towns-by-dept", { +CALL apoc.dv.catalog.install("fr-towns-by-dept", "neo4j", { type: "JDBC", url: "jdbc:postgresql://localhost/communes?user=jb&password=jb", labels: ["Town","PopulatedPlace"], @@ -124,11 +127,11 @@ RETURN path ---- === Listing the Virtualized Resource Catalog -The apoc.dv.catalog.list procedure returns a list with all the existing Virtualized resources and their descriptions. It takes no parameters. +The apoc.dv.catalog.list procedure returns a list with all the existing Virtualized resources and their descriptions. It accepts one parameter: i.e. the database name where we want to use the resource (default is 'neo4j'). [source,cypher] ---- -CALL apoc.dv.catalog.list() +CALL apoc.dv.catalog.show() ---- === Removing Virtualized Resources from the Catalog @@ -136,7 +139,7 @@ When a Virtualized Resource is no longer needed it can be removed from the catal [source,cypher] ---- -CALL apoc.dv.catalog.remove("vr-name") +CALL apoc.dv.catalog.drop("vr-name", ) ---- === Export metadata @@ -165,7 +168,7 @@ Here is the cypher that creates such virtualized resource: [source,cypher] ---- -CALL apoc.dv.catalog.add("prod-details-by-id", { +CALL apoc.dv.catalog.install("prod-details-by-id", "neo4j", { type: "CSV", url: "http://data.neo4j.com/northwind/products.csv", labels: ["ProductDetails"], diff --git a/docs/asciidoc/modules/ROOT/partials/dv/deprecated.adoc b/docs/asciidoc/modules/ROOT/partials/dv/deprecated.adoc new file mode 100644 index 0000000000..be2f21d8c6 --- /dev/null +++ b/docs/asciidoc/modules/ROOT/partials/dv/deprecated.adoc @@ -0,0 +1,19 @@ +[WARNING] +==== +Please note that this procedure is deprecated. + +Use the following ones instead, which allow for better support in a cluster: + +[opts="header"] +|=== +| deprecated procedure | new procedure +| `apoc.dv.catalog.add(, $config)` | `apoc.dv.catalog.install('', '', $config)` +| `apoc.dv.catalog.remove('')` | `apoc.dv.catalog.drop('', '')` +| `apoc.dv.catalog.list()` | `apoc.dv.catalog.show('')` +|=== + +where `` is the database where we want to execute the procedure + +xref::virtual-resource/index.adoc[See here for more info]. + +==== \ No newline at end of file diff --git a/extended-it/src/test/java/apoc/dv/DataVirtualizationCatalogClusterRoutingTest.java b/extended-it/src/test/java/apoc/dv/DataVirtualizationCatalogClusterRoutingTest.java new file mode 100644 index 0000000000..5bb383b2d9 --- /dev/null +++ b/extended-it/src/test/java/apoc/dv/DataVirtualizationCatalogClusterRoutingTest.java @@ -0,0 +1,155 @@ +package apoc.dv; + +import apoc.util.Neo4jContainerExtension; +import apoc.util.TestContainerUtil; +import apoc.util.TestcontainersCausalCluster; +import org.apache.commons.io.FileUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.neo4j.driver.Driver; +import org.neo4j.driver.Result; +import org.neo4j.driver.Session; +import org.neo4j.driver.SessionConfig; +import org.neo4j.driver.types.Node; +import org.neo4j.driver.types.Path; +import org.neo4j.driver.types.Relationship; + +import java.io.File; +import java.net.URI; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +import static apoc.dv.DataVirtualizationCatalogTestUtil.*; +import static apoc.util.ExtendedTestContainerUtil.dbIsWriter; +import static apoc.util.ExtendedTestContainerUtil.getBoltAddress; +import static apoc.util.ExtendedTestContainerUtil.getDriverIfNotReplica; +import static apoc.util.MapUtil.map; +import static apoc.util.SystemDbUtil.PROCEDURE_NOT_ROUTED_ERROR; +import static apoc.util.TestContainerUtil.importFolder; +import static apoc.util.TestContainerUtil.testCall; +import static apoc.util.TestContainerUtil.testCallEmpty; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.neo4j.configuration.GraphDatabaseSettings.SYSTEM_DATABASE_NAME; + + +public class DataVirtualizationCatalogClusterRoutingTest { + private static final int NUM_CORES = 3; + private static TestcontainersCausalCluster cluster; + private static Session clusterSession; + private static List members; + + @BeforeClass + public static void setupCluster() throws Exception { + cluster = TestContainerUtil + .createEnterpriseCluster(List.of(TestContainerUtil.ApocPackage.EXTENDED, TestContainerUtil.ApocPackage.CORE), NUM_CORES, 0, + Collections.emptyMap(), + Map.of("NEO4J_dbms_routing_enabled", "true") + ); + clusterSession = cluster.getSession(); + members = cluster.getClusterMembers(); + FileUtils.copyFileToDirectory(new File(new URI(FILE_URL).toURL().getPath()), importFolder); + assertEquals(NUM_CORES, members.size()); + } + + @AfterClass + public static void bringDownCluster() { + cluster.close(); + } + + @Test + public void testVirtualizeCSV() { + dvInSysLeaderMemberCommon(PROCEDURE_NOT_ROUTED_ERROR, SYSTEM_DATABASE_NAME, + (session) -> testCall(session, APOC_DV_INSTALL_QUERY, + APOC_DV_INSTALL_PARAMS, + (row) -> assertCatalogContent(row, CSV_TEST_FILE)), APOC_DV_INSTALL_PARAMS + ); + + clusterSession.executeRead(tx -> { + final Result result = tx.run(APOC_DV_QUERY, + Map.of(NAME_KEY, CSV_NAME_VALUE, + APOC_DV_QUERY_PARAMS_KEY, APOC_DV_QUERY_PARAMS, + CONFIG_KEY, CONFIG_VALUE) + ); + + Node node = result.single().get(NODE_KEY).asNode(); + assertEquals(NAME_VALUE, node.get(NAME_KEY).asString()); + assertEquals(AGE_VALUE, node.get(AGE_KEY).asString()); + assertEquals(List.of(LABELS_VALUE), node.labels()); + + return result.consume(); + } + ); + + clusterSession.executeWrite(tx -> tx.run(CREATE_HOOK_QUERY, CREATE_HOOK_PARAMS).consume()); + + clusterSession.executeRead(tx -> { + final Result result = tx.run(APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, CSV_NAME_VALUE, APOC_DV_QUERY_PARAMS_KEY, APOC_DV_QUERY_PARAMS, RELTYPE_KEY, RELTYPE_VALUE, CONFIG_KEY, CONFIG_VALUE) + ); + + Path path = result.single().get("path").asPath(); + Node node = path.end(); + assertEquals(NAME_VALUE, node.get(NAME_KEY).asString()); + assertEquals(AGE_VALUE, node.get(AGE_KEY).asString()); + assertEquals(List.of(LABELS_VALUE), node.labels()); + + Node hook = path.start(); + assertEquals(HOOK_NODE_NAME_VALUE, hook.get(NAME_KEY).asString()); + assertEquals(List.of("Hook"), hook.labels()); + + Relationship relationship = path.relationships().iterator().next(); + assertEquals(hook.elementId(), relationship.startNodeElementId()); + assertEquals(node.elementId(), relationship.endNodeElementId()); + assertEquals(RELTYPE_VALUE, relationship.type()); + + return result.consume(); + } + ); + + dvInSysLeaderMemberCommon(PROCEDURE_NOT_ROUTED_ERROR, SYSTEM_DATABASE_NAME, + (session) -> testCallEmpty(session, APOC_DV_DROP_QUERY, + APOC_DV_DROP_PARAMS), APOC_DV_DROP_PARAMS + ); + + } + + private static void dvInSysLeaderMemberCommon(String uuidNotRoutedError, String dbName, Consumer testDv, Map params) { + dvInSysLeaderMemberCommon(uuidNotRoutedError, dbName, testDv, false, params); + } + + private static void dvInSysLeaderMemberCommon(String uuidNotRoutedError, String dbName, Consumer testDv, boolean readOnlyOperation, Map params) { + final List members = cluster.getClusterMembers(); + assertEquals(NUM_CORES, members.size()); + boolean writeExecuted = false; + for (Neo4jContainerExtension container: members) { + // we skip READ_REPLICA members with write operations + // instead, we consider all members with a read only operations + final Driver driver = readOnlyOperation + ? container.getDriver() + : getDriverIfNotReplica(container); + if (driver == null) { + continue; + } + Session session = driver.session(SessionConfig.forDatabase(dbName)); + boolean isWriter = dbIsWriter(dbName, session, getBoltAddress(container)); + if (isWriter) { + testDv.accept(session); + writeExecuted = true; + } else { + try { + testDv.accept(session); + fail("Should fail because of non leader Data Virtualization addition"); + } catch (Exception e) { + String errorMsg = e.getMessage(); + assertTrue("The actual message is: " + errorMsg, errorMsg.contains(uuidNotRoutedError)); + } + } + } + assertTrue(writeExecuted); + } +} diff --git a/extended-it/src/test/java/apoc/dv/DataVirtualizationCatalogNewProcedureTest.java b/extended-it/src/test/java/apoc/dv/DataVirtualizationCatalogNewProcedureTest.java new file mode 100644 index 0000000000..143b1db3e9 --- /dev/null +++ b/extended-it/src/test/java/apoc/dv/DataVirtualizationCatalogNewProcedureTest.java @@ -0,0 +1,220 @@ +package apoc.dv; + +import apoc.create.Create; +import apoc.load.Jdbc; +import apoc.load.LoadCsv; +import apoc.util.TestUtil; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.neo4j.configuration.GraphDatabaseSettings; +import org.neo4j.dbms.api.DatabaseManagementService; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Result; +import org.neo4j.test.TestDatabaseManagementServiceBuilder; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.MySQLContainer; + +import java.io.File; +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static apoc.ApocConfig.APOC_IMPORT_FILE_ENABLED; +import static apoc.ApocConfig.apocConfig; +import static apoc.dv.DataVirtualizationCatalog.DIRECTION_CONF_KEY; +import static apoc.dv.DataVirtualizationCatalogTestUtil.*; + +import static apoc.util.MapUtil.map; +import static apoc.util.TestUtil.testCall; +import static apoc.util.TestUtil.testCallCount; + +import static apoc.util.TestUtil.testResult; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class DataVirtualizationCatalogNewProcedureTest { + private static final String DATABASE_NAME = "databaseName"; + private static GraphDatabaseService sysDb; + private static GraphDatabaseService db; + private static DatabaseManagementService databaseManagementService; + + public static JdbcDatabaseContainer mysql; + + @Rule + public TemporaryFolder storeDir = new TemporaryFolder(); + + @Before + public void setUp() throws Exception { + databaseManagementService = new TestDatabaseManagementServiceBuilder(storeDir.getRoot().toPath()) + .build(); + db = databaseManagementService.database(GraphDatabaseSettings.DEFAULT_DATABASE_NAME); + sysDb = databaseManagementService.database(GraphDatabaseSettings.SYSTEM_DATABASE_NAME); + + FileUtils.copyFile(new File(new URI(FILE_URL).toURL().getPath()), new File(storeDir.getRoot(), CSV_TEST_FILE)); + + TestUtil.registerProcedure(sysDb, DataVirtualizationCatalogNewProcedures.class); + TestUtil.registerProcedure(db, DataVirtualizationCatalog.class, Jdbc.class, LoadCsv.class, Create.class); + apocConfig().setProperty(APOC_IMPORT_FILE_ENABLED, true); + } + + @BeforeClass + public static void setUpContainer() { + mysql = new MySQLContainer().withInitScript("init_mysql.sql"); + mysql.start(); + } + + @AfterClass + public static void tearDownContainer() { + mysql.stop(); + } + + @Test + public void testVirtualizeCSV() { + getVirtualizeCSVCommonResult(db, + APOC_DV_INSTALL_QUERY, APOC_DV_SHOW_QUERY, CSV_TEST_FILE, sysDb); + + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, CSV_NAME_VALUE, APOC_DV_QUERY_PARAMS_KEY, APOC_DV_QUERY_PARAMS, RELTYPE_KEY, RELTYPE_VALUE, CONFIG_KEY, CONFIG_VALUE), + DataVirtualizationCatalogTestUtil::assertVirtualizeCSVQueryAndLinkContent); + } + + @Test + public void testVirtualizeCSVWithCustomDirectionIN() { + getVirtualizeCSVCommonResult(db, + APOC_DV_INSTALL_QUERY, APOC_DV_SHOW_QUERY, CSV_TEST_FILE, sysDb); + + Map config = new HashMap<>(CONFIG_VALUE); + config.put(DIRECTION_CONF_KEY, DataVirtualizationCatalog.Direction.IN.name()); + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, CSV_NAME_VALUE, APOC_DV_QUERY_PARAMS_KEY, APOC_DV_QUERY_PARAMS, RELTYPE_KEY, RELTYPE_VALUE, CONFIG_KEY, config), + DataVirtualizationCatalogTestUtil::assertVirtualizeCSVQueryAndLinkContentDirectionIN); + } + + @Test + public void testVirtualizeJDBC() { + + getVirtualizeJDBCCommonResult(db, mysql, + APOC_DV_INSTALL_QUERY, sysDb); + + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, VIRTUALIZE_JDBC_APOC_PARAMS, RELTYPE_KEY, VIRTUALIZE_JDBC_WITH_PARAMS_RELTYPE, + CONFIG_KEY, map(CREDENTIALS_KEY, getJdbcCredentials(mysql))), + DataVirtualizationCatalogTestUtil::assertVirtualizeJDBCQueryAndLinkContent); + } + + @Test + public void testVirtualizeJDBCWithCustomDirectionIN() { + + getVirtualizeJDBCCommonResult(db, mysql, + APOC_DV_INSTALL_QUERY, sysDb); + + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, VIRTUALIZE_JDBC_APOC_PARAMS, RELTYPE_KEY, VIRTUALIZE_JDBC_WITH_PARAMS_RELTYPE, + CONFIG_KEY, map( + DIRECTION_CONF_KEY, DataVirtualizationCatalog.Direction.IN.name(), + CREDENTIALS_KEY, getJdbcCredentials(mysql) + )), + DataVirtualizationCatalogTestUtil::assertVirtualizeJDBCQueryAndLinkContentDirectionIN); + } + + @Test + public void testVirtualizeJDBCWithParameterMap() { + + getVirtualizeJDBCWithParamsCommonResult(db, mysql, APOC_DV_INSTALL_QUERY, sysDb); + + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, VIRTUALIZE_JDBC_QUERY_PARAMS, RELTYPE_KEY, VIRTUALIZE_JDBC_WITH_PARAMS_RELTYPE, + CONFIG_KEY, map( + CREDENTIALS_KEY, getJdbcCredentials(mysql) + )), + DataVirtualizationCatalogTestUtil::assertVirtualizeJDBCQueryAndLinkContent); + } + + @Test + public void testVirtualizeJDBCWithParameterMapAndDirectionIN() { + + getVirtualizeJDBCWithParamsCommonResult(db, mysql, APOC_DV_INSTALL_QUERY, sysDb); + + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, VIRTUALIZE_JDBC_QUERY_PARAMS, RELTYPE_KEY, VIRTUALIZE_JDBC_WITH_PARAMS_RELTYPE, + CONFIG_KEY, map( + DIRECTION_CONF_KEY, DataVirtualizationCatalog.Direction.IN.name(), + CREDENTIALS_KEY, getJdbcCredentials(mysql) + )), + DataVirtualizationCatalogTestUtil::assertVirtualizeJDBCQueryAndLinkContentDirectionIN); + } + + @Test + public void testRemove() { + sysDb.executeTransactionally(APOC_DV_INSTALL_QUERY, + map(DATABASE_NAME, GraphDatabaseSettings.DEFAULT_DATABASE_NAME,NAME_KEY, JDBC_NAME, "map", getVirtualizeJDBCParameterMap(mysql, JDBC_SELECT_QUERY))); + + testCallCount(sysDb, APOC_DV_DROP_QUERY, map(DATABASE_NAME, GraphDatabaseSettings.DEFAULT_DATABASE_NAME,NAME_KEY, JDBC_NAME), 0); + } + + @Test + public void testNameAsKey() { + Map params = map( + DATABASE_NAME, GraphDatabaseSettings.DEFAULT_DATABASE_NAME, + NAME_KEY, JDBC_NAME, "map", getVirtualizeJDBCParameterMap(mysql, JDBC_SELECT_QUERY) + ); + + sysDb.executeTransactionally(APOC_DV_INSTALL_QUERY, params); + sysDb.executeTransactionally(APOC_DV_INSTALL_QUERY, params); + testResult(sysDb, APOC_DV_SHOW_QUERY, + (result) -> assertEquals(1, result.stream().count())); + } + + @Test + public void testJDBCQueryWithMixedParamsTypes() { + try { + sysDb.executeTransactionally(APOC_DV_INSTALL_QUERY, + map( + DATABASE_NAME, GraphDatabaseSettings.DEFAULT_DATABASE_NAME,NAME_KEY, JDBC_NAME, + "map", getVirtualizeJDBCParameterMap(mysql, JDBC_SELECT_QUERY_WITH_PARAM) + ) + ); + Assert.fail("Exception is expected"); + } catch (Exception e) { + final Throwable rootCause = ExceptionUtils.getRootCause(e); + assertTrue(rootCause instanceof IllegalArgumentException); + assertEquals("The query is mixing parameters with `$` and `?` please use just one notation", rootCause.getMessage()); + } + } + + @Test + public void testVirtualizeJDBCWithDifferentParameterMap() { + final String url = getVirtualizeJDBCUrl(mysql); + final List expectedParams = List.of("$name", "$head_of_state", "$CODE2"); + final List sortedExpectedParams = expectedParams.stream() + .sorted() + .toList(); + testCall(sysDb, APOC_DV_INSTALL_QUERY, + map(DATABASE_NAME, GraphDatabaseSettings.DEFAULT_DATABASE_NAME,NAME_KEY, JDBC_NAME, "map", getVirtualizeJDBCParameterMap(mysql, VIRTUALIZE_JDBC_WITH_PARAMS_QUERY)), + (row) -> assertDvCatalogAddOrInstall(row, url)); + + try { + db.executeTransactionally(APOC_DV_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, VIRTUALIZE_JDBC_QUERY_WRONG_PARAMS, + CONFIG_KEY, map(CREDENTIALS_KEY, getJdbcCredentials(mysql))), + Result::resultAsString); + Assert.fail("Exception is expected"); + } catch (Exception e) { + final Throwable rootCause = ExceptionUtils.getRootCause(e); + assertTrue(rootCause instanceof IllegalArgumentException); + final List actualParams = VIRTUALIZE_JDBC_QUERY_WRONG_PARAMS.keySet().stream() + .map(s -> "$" + s) + .sorted() + .toList(); + assertEquals(String.format("Expected query parameters are %s, actual are %s", sortedExpectedParams, actualParams), rootCause.getMessage()); + } + } +} diff --git a/extended-it/src/test/java/apoc/dv/DataVirtualizationCatalogTest.java b/extended-it/src/test/java/apoc/dv/DataVirtualizationCatalogTest.java index 85c53f8d22..4dc1c44392 100644 --- a/extended-it/src/test/java/apoc/dv/DataVirtualizationCatalogTest.java +++ b/extended-it/src/test/java/apoc/dv/DataVirtualizationCatalogTest.java @@ -11,10 +11,6 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; -import org.neo4j.graphdb.Label; -import org.neo4j.graphdb.Node; -import org.neo4j.graphdb.Path; -import org.neo4j.graphdb.Relationship; import org.neo4j.graphdb.Result; import org.neo4j.test.rule.DbmsRule; import org.neo4j.test.rule.ImpermanentDbmsRule; @@ -22,13 +18,16 @@ import org.testcontainers.containers.JdbcDatabaseContainer; import org.testcontainers.containers.MySQLContainer; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static apoc.ApocConfig.APOC_IMPORT_FILE_ENABLED; import static apoc.ApocConfig.apocConfig; +import static apoc.dv.DataVirtualizationCatalog.DIRECTION_CONF_KEY; import static apoc.dv.DataVirtualizationCatalogTestUtil.*; +import static apoc.util.MapUtil.map; +import static apoc.util.TestUtil.getUrlFileName; import static apoc.util.TestUtil.testCall; import static apoc.util.TestUtil.testCallEmpty; import static apoc.util.TestUtil.testResult; @@ -61,228 +60,102 @@ public static void tearDownContainer() { @Test public void testVirtualizeCSV() { - CsvTestResult result = getCsvCommonResult(db); + final String url = getUrlFileName("test.csv").toString(); + getVirtualizeCSVCommonResult(db, APOC_DV_ADD_QUERY, APOC_DV_LIST, url, db); - final String relType = "LINKED_TO"; - testCall(db, "MATCH (hook:Hook) WITH hook " + - "CALL apoc.dv.queryAndLink(hook, $relType, $name, $queryParams, $config) yield path " + - "RETURN path ", - Map.of("name", result.name(), "queryParams", result.queryParams(), "relType", relType, "config", Map.of("header", true)), - (row) -> { - Path path = (Path) row.get("path"); - Node node = path.endNode(); - assertEquals(result.personName(), node.getProperty("name")); - assertEquals(result.personAge(), node.getProperty("age")); - assertEquals(List.of(Label.label("Person")), node.getLabels()); - - Node hook = path.startNode(); - assertEquals(result.hookNodeName(), hook.getProperty("name")); - assertEquals(List.of(Label.label("Hook")), hook.getLabels()); - - Relationship relationship = path.lastRelationship(); - assertEquals(hook, relationship.getStartNode()); - assertEquals(node, relationship.getEndNode()); - assertEquals(relType, relationship.getType().name()); - }); + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, CSV_NAME_VALUE, APOC_DV_QUERY_PARAMS_KEY, APOC_DV_QUERY_PARAMS, RELTYPE_KEY, RELTYPE_VALUE, CONFIG_KEY, CONFIG_VALUE), + DataVirtualizationCatalogTestUtil::assertVirtualizeCSVQueryAndLinkContent); } - + @Test public void testVirtualizeCSVWithCustomDirectionIN() { - CsvTestResult result = getCsvCommonResult(db); - - final String relType = "LINKED_TO"; - testCall(db, "MATCH (hook:Hook) WITH hook " + - "CALL apoc.dv.queryAndLink(hook, $relType, $name, $queryParams, $config) yield path " + - "RETURN path ", - Map.of("name", result.name(), "queryParams", result.queryParams(), "relType", relType, "config", Map.of("header", true, "direction", "IN")), - (row) -> { - Path path = (Path) row.get("path"); - Node hook = path.endNode(); - assertEquals(result.hookNodeName(), hook.getProperty("name")); - assertEquals(List.of(Label.label("Hook")), hook.getLabels()); - Node node = path.startNode(); - - assertEquals(result.personName(), node.getProperty("name")); - assertEquals(result.personAge(), node.getProperty("age")); - assertEquals(List.of(Label.label("Person")), node.getLabels()); - - Relationship relationship = path.lastRelationship(); - assertEquals(node, relationship.getStartNode()); - assertEquals(hook, relationship.getEndNode()); - assertEquals(relType, relationship.getType().name()); - }); + final String url = getUrlFileName("test.csv").toString(); + getVirtualizeCSVCommonResult(db, APOC_DV_ADD_QUERY, APOC_DV_LIST, url, db); + Map config = new HashMap<>(CONFIG_VALUE); + config.put(DIRECTION_CONF_KEY, DataVirtualizationCatalog.Direction.IN.name()); + + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, CSV_NAME_VALUE, APOC_DV_QUERY_PARAMS_KEY, APOC_DV_QUERY_PARAMS, RELTYPE_KEY, RELTYPE_VALUE, CONFIG_KEY, config), + DataVirtualizationCatalogTestUtil::assertVirtualizeCSVQueryAndLinkContentDirectionIN); } @Test public void testVirtualizeJDBC() { - VirtualizeJdbcResult result = getVirtualizeJdbcCommonResult(db, mysql); - - final String relType = "LINKED_TO_NEW"; - testCall(db, "MATCH (hook:Hook) WITH hook " + - "CALL apoc.dv.queryAndLink(hook, $relType, $name, $queryParams, $config) yield path " + - "RETURN path ", - Map.of("name", result.name(), "queryParams", result.queryParams(), "relType", relType, - "config", Map.of("credentials", Map.of("user", mysql.getUsername(), "password", mysql.getPassword()))), - (row) -> { - Path path = (Path) row.get("path"); - Node node = path.endNode(); - assertEquals(result.country(), node.getProperty("Name")); - assertEquals(result.labels(), node.getLabels()); - - Node hook = path.startNode(); - assertEquals(result.hookNodeName(), hook.getProperty("name")); - assertEquals(List.of(Label.label("Hook")), hook.getLabels()); + getVirtualizeJDBCCommonResult(db, mysql, APOC_DV_ADD_QUERY, db); - Relationship relationship = path.lastRelationship(); - assertEquals(hook, relationship.getStartNode()); - assertEquals(node, relationship.getEndNode()); - assertEquals(relType, relationship.getType().name()); - }); + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, VIRTUALIZE_JDBC_APOC_PARAMS, RELTYPE_KEY, VIRTUALIZE_JDBC_WITH_PARAMS_RELTYPE, + CONFIG_KEY, map(CREDENTIALS_KEY, getJdbcCredentials(mysql))), + DataVirtualizationCatalogTestUtil::assertVirtualizeJDBCQueryAndLinkContent); } @Test public void testVirtualizeJDBCWithCustomDirectionIN() { - VirtualizeJdbcResult result = getVirtualizeJdbcCommonResult(db, mysql); + getVirtualizeJDBCCommonResult(db, mysql, APOC_DV_ADD_QUERY, db); + - final String relType = "LINKED_TO_NEW"; - testCall(db, "MATCH (hook:Hook) WITH hook " + - "CALL apoc.dv.queryAndLink(hook, $relType, $name, $queryParams, $config) yield path " + - "RETURN path ", - Map.of("name", result.name(), "queryParams", result.queryParams(), "relType", relType, - "config", Map.of( - "credentials", Map.of("user", mysql.getUsername(), "password", mysql.getPassword()), - "direction", "IN" + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, VIRTUALIZE_JDBC_APOC_PARAMS, RELTYPE_KEY, VIRTUALIZE_JDBC_WITH_PARAMS_RELTYPE, + CONFIG_KEY, map( + DIRECTION_CONF_KEY, DataVirtualizationCatalog.Direction.IN.name(), + CREDENTIALS_KEY, getJdbcCredentials(mysql) )), - (row) -> { - Path path = (Path) row.get("path"); - Node hook = path.endNode(); - assertEquals(result.hookNodeName(), hook.getProperty("name")); - assertEquals(List.of(Label.label("Hook")), hook.getLabels()); - - Node node = path.startNode(); - assertEquals(result.country(), node.getProperty("Name")); - assertEquals(result.labels(), node.getLabels()); - - Relationship relationship = path.lastRelationship(); - assertEquals(node, relationship.getStartNode()); - assertEquals(hook, relationship.getEndNode()); - assertEquals(relType, relationship.getType().name()); - }); + DataVirtualizationCatalogTestUtil::assertVirtualizeJDBCQueryAndLinkContentDirectionIN); } @Test public void testVirtualizeJDBCWithParameterMap() { - VirtualizeJdbcWithParameterResult result = getVirtualizeJdbcWithParamsCommonResult(db, mysql); - - final String relType = "LINKED_TO_NEW"; - testCall(db, "MATCH (hook:Hook) WITH hook " + - "CALL apoc.dv.queryAndLink(hook, $relType, $name, $queryParams, $config) yield path " + - "RETURN path ", - Map.of("name", result.name(), "queryParams", result.queryParams(), "relType", relType, - "config", Map.of("credentials", Map.of("user", mysql.getUsername(), "password", mysql.getPassword()))), - (row) -> { - Path path = (Path) row.get("path"); - Node node = path.endNode(); - assertEquals(result.country(), node.getProperty("Name")); - assertEquals(result.labels(), node.getLabels()); - - Node hook = path.startNode(); - assertEquals(result.hookNodeName(), hook.getProperty("name")); - assertEquals(List.of(Label.label("Hook")), hook.getLabels()); + getVirtualizeJDBCWithParamsCommonResult(db, mysql, APOC_DV_ADD_QUERY, db); - Relationship relationship = path.lastRelationship(); - assertEquals(hook, relationship.getStartNode()); - assertEquals(node, relationship.getEndNode()); - assertEquals(relType, relationship.getType().name()); - }); + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, VIRTUALIZE_JDBC_QUERY_PARAMS, RELTYPE_KEY, VIRTUALIZE_JDBC_WITH_PARAMS_RELTYPE, + CONFIG_KEY, map(CREDENTIALS_KEY, getJdbcCredentials(mysql)) + ), + DataVirtualizationCatalogTestUtil::assertVirtualizeJDBCQueryAndLinkContent); } - @Test public void testVirtualizeJDBCWithParameterMapAndDirectionIN() { - VirtualizeJdbcWithParameterResult result = getVirtualizeJdbcWithParamsCommonResult(db, mysql); + getVirtualizeJDBCWithParamsCommonResult(db, mysql, APOC_DV_ADD_QUERY, db); - final String relType = "LINKED_TO_NEW"; - testCall(db, "MATCH (hook:Hook) WITH hook " + - "CALL apoc.dv.queryAndLink(hook, $relType, $name, $queryParams, $config) yield path " + - "RETURN path ", - Map.of("name", result.name(), "queryParams", result.queryParams(), "relType", relType, - "config", Map.of("credentials", Map.of("user", mysql.getUsername(), "password", mysql.getPassword()), - "direction", "IN" + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, VIRTUALIZE_JDBC_QUERY_PARAMS, RELTYPE_KEY, VIRTUALIZE_JDBC_WITH_PARAMS_RELTYPE, + CONFIG_KEY, map( + DIRECTION_CONF_KEY, DataVirtualizationCatalog.Direction.IN.name(), + CREDENTIALS_KEY, getJdbcCredentials(mysql) )), - (row) -> { - Path path = (Path) row.get("path"); - Node hook = path.endNode(); - assertEquals(result.hookNodeName(), hook.getProperty("name")); - assertEquals(List.of(Label.label("Hook")), hook.getLabels()); - - Node node = path.startNode(); - assertEquals(result.country(), node.getProperty("Name")); - assertEquals(result.labels(), node.getLabels()); - - Relationship relationship = path.lastRelationship(); - assertEquals(node, relationship.getStartNode()); - assertEquals(hook, relationship.getEndNode()); - assertEquals(relType, relationship.getType().name()); - }); + DataVirtualizationCatalogTestUtil::assertVirtualizeJDBCQueryAndLinkContentDirectionIN); } @Test public void testRemove() { - String name = "jdbc_vr"; - String desc = "country details"; - List labelsAsString = List.of("Country"); - final String query = "SELECT * FROM country WHERE Name = $name"; - final String url = mysql.getJdbcUrl() + "?useSSL=false"; - Map map = Map.of("type", "JDBC", - "url", url, "query", query, - "desc", desc, - "labels", labelsAsString); - - db.executeTransactionally("CALL apoc.dv.catalog.add($name, $map)", - Map.of("name", name, "map", map)); + db.executeTransactionally(APOC_DV_ADD_QUERY, + map("name", JDBC_NAME, "map", getVirtualizeJDBCParameterMap(mysql, JDBC_SELECT_QUERY))); - testCallEmpty(db, "CALL apoc.dv.catalog.remove($name)", Map.of("name", name)); + testCallEmpty(db, "CALL apoc.dv.catalog.remove($name)", map("name", JDBC_NAME)); } @Test public void testNameAsKey() { - String name = "jdbc_vr"; - String desc = "country details"; - List labelsAsString = List.of("Country"); - final String query = "SELECT * FROM country WHERE Name = $name"; - final String url = mysql.getJdbcUrl() + "?useSSL=false"; - Map map = Map.of("type", "JDBC", - "url", url, "query", query, - "desc", desc, - "labels", labelsAsString); + Map params = map( + NAME_KEY, JDBC_NAME, "map", getVirtualizeJDBCParameterMap(mysql, JDBC_SELECT_QUERY) + ); - db.executeTransactionally("CALL apoc.dv.catalog.add($name, $map)", - Map.of("name", name, "map", map)); - db.executeTransactionally("CALL apoc.dv.catalog.add($name, $map)", - Map.of("name", name, "map", map)); + db.executeTransactionally(APOC_DV_ADD_QUERY, params); + db.executeTransactionally(APOC_DV_ADD_QUERY, params); testResult(db, "CALL apoc.dv.catalog.list()", - Map.of(), + map(), (result) -> assertEquals(1, result.stream().count())); } @Test public void testJDBCQueryWithMixedParamsTypes() { try { - String name = "jdbc_vr"; - String desc = "country details"; - List labelsAsString = List.of("Country"); - final String query = "SELECT * FROM country WHERE Name = $name AND param_with_question_mark = ? "; - final String url = mysql.getJdbcUrl() + "?useSSL=false"; - Map map = Map.of("type", "JDBC", - "url", url, "query", query, - "desc", desc, - "labels", labelsAsString); - - db.executeTransactionally("CALL apoc.dv.catalog.add($name, $map)", - Map.of("name", name, "map", map)); + db.executeTransactionally(APOC_DV_ADD_QUERY, + map("name", JDBC_NAME, "map", getVirtualizeJDBCParameterMap(mysql, JDBC_SELECT_QUERY_WITH_PARAM))); Assert.fail("Exception is expected"); } catch (Exception e) { final Throwable rootCause = ExceptionUtils.getRootCause(e); @@ -293,41 +166,20 @@ public void testJDBCQueryWithMixedParamsTypes() { @Test public void testVirtualizeJDBCWithDifferentParameterMap() { - String name = "jdbc_vr"; - String desc = "country details"; - List