[Feature][Connector-v2] Neo4j source connector #2777

Merged
merged 39 commits into from
Oct 21, 2022

getChan
Contributor

@getChan getChan commented Sep 18, 2022

Purpose of this pull request

To add a Neo4j source connector.

Check list

@CalvinKirs
Member

Hi, thanks for your contribution. Could you add an e2e test for this PR?

https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-e2e

Comment on lines 29 to 40
public static final String PLUGIN_NAME = "Neo4j";

public static final String KEY_NEO4J_URI = "uri";
public static final String KEY_USERNAME = "username";
public static final String KEY_PASSWORD = "password";
public static final String KEY_BEARER_TOKEN = "bearer_token";
public static final String KEY_KERBEROS_TICKET = "kerberos_ticket"; // Base64 encoded

public static final String KEY_DATABASE = "database";
public static final String KEY_QUERY = "query";
public static final String KEY_MAX_TRANSACTION_RETRY_TIME = "max_transaction_retry_time";
public static final String KEY_MAX_CONNECTION_TIMEOUT = "max_connection_timeout";
Member

Extract common configuration to Neo4jCommonConfig?
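
One way to read this suggestion, as a minimal sketch: the keys from the snippet under review move into a shared base class that the source and sink configs both extend. Only the name Neo4jCommonConfig comes from the comment; the class layout and the two subclass names are assumptions made for illustration, not the code that was merged.

```java
// Hypothetical layout: only the name Neo4jCommonConfig comes from the review comment.
abstract class Neo4jCommonConfig {
    static final String PLUGIN_NAME = "Neo4j";

    // Connection and authentication keys, copied from the snippet under review.
    static final String KEY_NEO4J_URI = "uri";
    static final String KEY_USERNAME = "username";
    static final String KEY_PASSWORD = "password";
    static final String KEY_BEARER_TOKEN = "bearer_token";
    static final String KEY_KERBEROS_TICKET = "kerberos_ticket"; // Base64 encoded

    static final String KEY_DATABASE = "database";
    static final String KEY_QUERY = "query";
    static final String KEY_MAX_TRANSACTION_RETRY_TIME = "max_transaction_retry_time";
    static final String KEY_MAX_CONNECTION_TIMEOUT = "max_connection_timeout";
}

// Assumed subclass names: each side inherits the shared keys and would add only its own.
final class Neo4jSourceConfigSketch extends Neo4jCommonConfig {
}

final class Neo4jSinkConfigSketch extends Neo4jCommonConfig {
}
```

With this shape, a key such as Neo4jSourceConfigSketch.KEY_NEO4J_URI resolves to the single definition in the base class, so the source and sink cannot drift apart.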

Comment on lines 93 to 122
if (dataType.equals(BasicType.STRING_TYPE)) {
    return value.asString();
} else if (dataType.equals(BasicType.BOOLEAN_TYPE)) {
    return value.asBoolean();
} else if (dataType.equals(BasicType.LONG_TYPE)) {
    return value.asLong();
} else if (dataType.equals(BasicType.DOUBLE_TYPE)) {
    return value.asDouble();
} else if (dataType.equals(BasicType.VOID_TYPE)) {
    return null;
} else if (dataType.equals(PrimitiveByteArrayType.INSTANCE)) {
    return value.asByteArray();
} else if (dataType.equals(LocalTimeType.LOCAL_DATE_TYPE)) {
    return value.asLocalDate();
} else if (dataType.equals(LocalTimeType.LOCAL_TIME_TYPE)) {
    return value.asLocalTime();
} else if (dataType.equals(LocalTimeType.LOCAL_DATE_TIME_TYPE)) {
    return value.asLocalDateTime();
} else if (dataType instanceof MapType) {
    if (!((MapType<?, ?>) dataType).getKeyType().equals(BasicType.STRING_TYPE)) {
        throw new IllegalArgumentException("Key Type of MapType must String type");
    }
    return value.asMap();
} else if (dataType.equals(BasicType.INT_TYPE)) {
    return value.asInt();
} else if (dataType.equals(BasicType.FLOAT_TYPE)) {
    return value.asFloat();
} else {
    throw new IllegalArgumentException("not supported data type: " + dataType);
}
Member

Use switch(dataType.getSqlType()) ?
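
For illustration, the shape being asked for might look like the sketch below: a single switch over an enum tag replaces the chain of equals checks. To stay self-contained it uses stand-ins, which are assumptions: SqlTypeSketch is a cut-down substitute for SeaTunnel's SqlType, and a plain Java Object takes the place of the Neo4j driver's Value.

```java
import java.util.Map;

// Stand-in for SeaTunnel's SqlType enum; only the tags this sketch needs.
enum SqlTypeSketch { STRING, BOOLEAN, BIGINT, DOUBLE, INT, FLOAT, MAP, NULL }

final class ValueConverterSketch {
    // raw is a plain Java object standing in for the Neo4j driver's Value.
    static Object convert(SqlTypeSketch type, Object raw) {
        switch (type) {
            case STRING:
                return String.valueOf(raw);
            case BOOLEAN:
                return (Boolean) raw;
            case BIGINT:
                return ((Number) raw).longValue();
            case DOUBLE:
                return ((Number) raw).doubleValue();
            case INT:
                return ((Number) raw).intValue();
            case FLOAT:
                return ((Number) raw).floatValue();
            case MAP:
                return (Map<?, ?>) raw;
            case NULL:
                return null;
            default:
                // Reached only if a tag is added to the enum without a matching case.
                throw new IllegalArgumentException("not supported data type: " + type);
        }
    }
}
```

Compared with the if/else chain, each supported type is one case label, and an unhandled tag falls through to a single default branch.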

@getChan
Contributor Author

getChan commented Sep 22, 2022

@hailin0
Thanks for your review. I fixed it and added an e2e test.

}
neo4jSourceConfig.setQuery(pluginConfig.getString(KEY_QUERY));

final CheckResult schemaConfigCheck = CheckConfigUtil.checkAllExists(pluginConfig, SeaTunnelSchema.SCHEMA);
Member

Why not use CheckConfigUtil.checkAllExists(pluginConfig, KEY_QUERY, SeaTunnelSchema.SCHEMA)?
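
The point of the comment is that one call can validate several required keys at once. A rough sketch of that idea follows, with loudly labeled stand-ins: missingKeys is a hypothetical helper, and a plain Map replaces the real plugin config object. It is not the actual CheckConfigUtil implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ConfigCheckSketch {
    // Hypothetical helper: returns every required key that is absent or null,
    // so a single call covers all keys instead of one check per key.
    static List<String> missingKeys(Map<String, ?> config, String... requiredKeys) {
        List<String> missing = new ArrayList<>();
        for (String key : requiredKeys) {
            if (config.get(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

Calling missingKeys(config, "query", "schema") reports both keys in one pass, which is the consolidation the reviewer is suggesting for KEY_QUERY and SeaTunnelSchema.SCHEMA.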

Objects.requireNonNull(value);

switch (dataType.getSqlType()) {
case STRING:
Member

Can you add a case that maps a Neo4j List to SqlType.ARRAY?
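
As a sketch of what the list-to-array request could involve: a list read from Neo4j is copied into a Java array of the declared element type. The helper name toTypedArray and the use of a plain java.util.List in place of the driver's list value are assumptions for illustration.

```java
import java.lang.reflect.Array;
import java.util.List;

final class ListToArraySketch {
    // Hypothetical helper: copies a list into an array of the requested element type.
    @SuppressWarnings("unchecked")
    static <T> T[] toTypedArray(List<?> values, Class<T> elementType) {
        T[] out = (T[]) Array.newInstance(elementType, values.size());
        for (int i = 0; i < values.size(); i++) {
            // cast throws ClassCastException if an element does not match the declared type.
            out[i] = elementType.cast(values.get(i));
        }
        return out;
    }
}
```

Failing fast on a mismatched element keeps a mixed-type list from silently producing an array that disagrees with the declared schema.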

// then
Assertions.assertEquals(0, execResult.getExitCode());

final Result result = neo4jSession.run("MATCH (a:Person) RETURN a.name, a.age");
Member

Please test all of the data types.

@Test
public void testSource() throws IOException, InterruptedException {
// given
neo4jSession.run("CREATE (a:Person {name: 'foo', age: 10})");
Member

Same as above.

Container.ExecResult execResult = executeSeaTunnelSparkJob("/neo4j/fake_to_neo4j.conf");

// then
Assertions.assertEquals(0, execResult.getExitCode());
Member

Same as the flink e2e.

Comment on lines 35 to 36
name=STRING
age=INT
Member

query = "MATCH (a:Person) RETURN a.name, a.age"

schema {
fields {
Member

FakeSource {
result_table_name = "fake"
schema = {
fields = {
Member

query = "MATCH (a:Person) RETURN a.name, a.age"

schema {
fields {
Member

}

@Test
public void testSink() throws IOException, InterruptedException {
Member

@getChan
Contributor Author

getChan commented Oct 9, 2022

@EricJoy2048 @hailin0
I applied your review comments, except for the TIME data types on Spark, which Spark does not support.

@EricJoy2048
Member

@EricJoy2048 @hailin0 I applied your review comments, except for the TIME data types on Spark, which Spark does not support.

Yes, I know.

EricJoy2048
EricJoy2048 previously approved these changes Oct 13, 2022
Member

@EricJoy2048 EricJoy2048 left a comment

LGTM

Member

@hailin0 hailin0 left a comment

LGTM

Comment on lines +94 to +95
a.age=INT
a.name=STRING
Member

Better not to include a dot (.) in field names.

e.g.

xxx_age = INT
xxx_name = STRING

Contributor Author

@hailin0
I think the dot (.) notation is familiar to Neo4j users, because a dot references a node's property.

Is dot naming in field names forbidden in SeaTunnel?

Member

@hailin0 PTAL

Member

Is dot naming in field names forbidden in SeaTunnel?

You can use it; SeaTunnel currently has no restrictions on field names.

…-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/neo4j/Neo4jIT.java

Co-authored-by: hailin0 <[email protected]>
@EricJoy2048
Member

Hi @getChan, thanks for your contribution. Please resolve the conflicts.

getChan and others added 2 commits October 16, 2022 21:44
# Conflicts:
#	seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
#	seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
Member

@hailin0 hailin0 left a comment

LGTM

Member

@EricJoy2048 EricJoy2048 left a comment

+1

@EricJoy2048 EricJoy2048 merged commit 38b0daf into apache:dev Oct 21, 2022
@CalvinKirs
Member

Thanks:)

Carl-Zhou-CN pushed a commit to Carl-Zhou-CN/incubator-seatunnel that referenced this pull request Oct 27, 2022
Carl-Zhou-CN pushed a commit to Carl-Zhou-CN/incubator-seatunnel that referenced this pull request Oct 31, 2022
4 participants