From b6688c26d9b2c955251829a6e928238276453ff9 Mon Sep 17 00:00:00 2001 From: iture123 Date: Tue, 20 Sep 2022 22:53:05 +0800 Subject: [PATCH] [Feature][Connector-V2]new connector of Elasticsearch source(#2553) --- docs/en/connector-v2/source/Elasticsearch.md | 64 +++++++ plugin-mapping.properties | 3 +- .../elasticsearch/client/EsRestClient.java | 173 ++++++++++++++++-- .../config/EsClusterConnectionConfig.java | 28 +++ .../config/source/SourceConfig.java | 30 +++ .../source/SourceConfigDeaultConstant.java | 26 +++ .../dto/source/IndexDocsCount.java | 43 +++++ .../dto/source/ScrollResult.java | 43 +++++ .../dto/source/SourceIndexInfo.java | 67 +++++++ .../exception/GetIndexDocsCountException.java | 30 +++ .../exception/ScrollRequestException.java | 30 +++ .../elasticsearch/sink/ElasticsearchSink.java | 4 +- .../sink/ElasticsearchSinkWriter.java | 28 +-- .../source/ElasticsearchSource.java | 87 +++++++++ .../source/ElasticsearchSourceReader.java | 121 ++++++++++++ .../source/ElasticsearchSourceSplit.java | 53 ++++++ .../ElasticsearchSourceSplitEnumerator.java | 116 ++++++++++++ .../source/ElasticsearchSourceState.java | 23 +++ .../connector-elasticsearch-flink-e2e/pom.xml | 59 ++++++ .../ElasticsearchSourceToConsoleIT.java | 87 +++++++++ .../FakeSourceToElasticsearchIT.java | 62 +++++++ .../elasticsearch_to_console.conf | 69 +++++++ .../fakesource_to_elasticsearch.conf | 62 +++++++ .../src/test/resources/log4j.properties | 22 +++ .../connector-elasticsearch-spark-e2e/pom.xml | 61 ++++++ .../ElasticsearchSourceToConsoleIT.java | 88 +++++++++ .../FakeSourceToElasticsearchIT.java | 66 +++++++ .../elasticsearch_to_console.conf | 72 ++++++++ .../fakesource_to_elasticsearch.conf | 65 +++++++ .../src/test/resources/log4j.properties | 22 +++ 30 files changed, 1661 insertions(+), 43 deletions(-) create mode 100644 docs/en/connector-v2/source/Elasticsearch.md create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfig.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfigDeaultConstant.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/IndexDocsCount.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/ScrollResult.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetIndexDocsCountException.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ScrollRequestException.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceState.java create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/pom.xml create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/ElasticsearchSourceToConsoleIT.java create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/FakeSourceToElasticsearchIT.java create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/elasticsearch/elasticsearch_to_console.conf create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch.conf create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/log4j.properties create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/pom.xml create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/ElasticsearchSourceToConsoleIT.java create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/FakeSourceToElasticsearchIT.java create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/elasticsearch/elasticsearch_to_console.conf create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch.conf create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/log4j.properties diff --git a/docs/en/connector-v2/source/Elasticsearch.md b/docs/en/connector-v2/source/Elasticsearch.md new file mode 100644 index 00000000000..b5301148359 --- /dev/null +++ b/docs/en/connector-v2/source/Elasticsearch.md @@ -0,0 +1,64 @@ +# Elasticsearch + +> Elasticsearch source connector + +## Description + +Used to read data from Elasticsearch. + +support version >= 2.x and < 8.x. + +## Key features + +- [x] [batch](../../concept/connector-v2-features.md) +- [x] [stream](../../concept/connector-v2-features.md) +- [x] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [schema projection](../../concept/connector-v2-features.md) +- [ ] [parallelism](../../concept/connector-v2-features.md) +- [ ] [support user-defined split](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|-------------|--------| -------- |---------------| +| hosts | array | yes | - | +| username | string | no | - | +| password | string | no | - | +| index | string | yes | - | +| source | array | yes | - | +| scroll_time | string | no | 1m | +| scroll_size | int | no | 100 | + + + +### hosts [array] +Elasticsearch cluster http address, the format is `host:port`, allowing multiple hosts to be specified. Such as `["host1:9200", "host2:9200"]`. + +### username [string] +x-pack username. + +### password [string] +x-pack password. + +### index [string] +Elasticsearch index name, support * fuzzy matching. + +### source [array] +The fields of index. +You can get the document id by specifying the field `_id`.If sink _id to other index,you need specify an alias for _id due to the Elasticsearch limit. + +### scroll_time [String] +Amount of time Elasticsearch will keep the search context alive for scroll requests. + +### scroll_size [int] +Maximum number of hits to be returned with each Elasticsearch scroll request. + +## Examples +simple +```hocon +Elasticsearch { + hosts = ["localhost:9200"] + index = "seatunnel-*" + source = ["_id","name","age"] +} +``` diff --git a/plugin-mapping.properties b/plugin-mapping.properties index bf4b0ce5f9a..4324c85b868 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -116,7 +116,8 @@ seatunnel.sink.OssFile = connector-file-oss seatunnel.source.Pulsar = connector-pulsar seatunnel.source.Hudi = connector-hudi seatunnel.sink.DingTalk = connector-dingtalk -seatunnel.sink.elasticsearch = connector-elasticsearch +seatunnel.source.Elasticsearch = connector-elasticsearch +seatunnel.sink.Elasticsearch = connector-elasticsearch seatunnel.source.IoTDB = connector-iotdb seatunnel.sink.IoTDB = connector-iotdb seatunnel.sink.Neo4j = connector-neo4j diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java index 661d5b7ca6c..3cf0807484d 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java @@ -17,12 +17,22 @@ package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetIndexDocsCountException; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ScrollRequestException; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHost; import org.apache.http.HttpStatus; @@ -30,6 +40,7 @@ import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.util.Asserts; import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; @@ -37,18 +48,44 @@ import org.elasticsearch.client.RestClientBuilder; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; public class EsRestClient { - private static EsRestClient ES_REST_CLIENT; - private static RestClient REST_CLIENT; + private static final int CONNECTION_REQUEST_TIMEOUT = 10 * 1000; + + private static final int SOCKET_TIMEOUT = 5 * 60 * 1000; - private EsRestClient() { + private final RestClient restClient; + private final ObjectMapper mapper = new ObjectMapper(); + + private EsRestClient(RestClient restClient) { + this.restClient = restClient; + } + + public static EsRestClient createInstance(Config pluginConfig) { + List hosts = pluginConfig.getStringList(EsClusterConnectionConfig.HOSTS); + String username = null; + String password = null; + if (pluginConfig.hasPath(EsClusterConnectionConfig.USERNAME)) { + username = pluginConfig.getString(EsClusterConnectionConfig.USERNAME); + if (pluginConfig.hasPath(EsClusterConnectionConfig.PASSWORD)) { + password = pluginConfig.getString(EsClusterConnectionConfig.PASSWORD); + } + } + return createInstance(hosts, username, password); + } + + public static EsRestClient createInstance(List hosts, String username, String password) { + RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, username, password); + return new EsRestClient(restClientBuilder.build()); } - @SuppressWarnings("checkstyle:MagicNumber") private static RestClientBuilder getRestClientBuilder(List hosts, String username, String password) { HttpHost[] httpHosts = new HttpHost[hosts.size()]; for (int i = 0; i < hosts.size(); i++) { @@ -58,8 +95,8 @@ private static RestClientBuilder getRestClientBuilder(List hosts, String RestClientBuilder builder = RestClient.builder(httpHosts) .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder - .setConnectionRequestTimeout(10 * 1000) - .setSocketTimeout(5 * 60 * 1000)); + .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT) + .setSocketTimeout(SOCKET_TIMEOUT)); if (StringUtils.isNotEmpty(username)) { CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); @@ -69,20 +106,11 @@ private static RestClientBuilder getRestClientBuilder(List hosts, String return builder; } - public static EsRestClient getInstance(List hosts, String username, String password) { - if (REST_CLIENT == null) { - RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, username, password); - REST_CLIENT = restClientBuilder.build(); - ES_REST_CLIENT = new EsRestClient(); - } - return ES_REST_CLIENT; - } - public BulkResponse bulk(String requestBody) { Request request = new Request("POST", "_bulk"); request.setJsonEntity(requestBody); try { - Response response = REST_CLIENT.performRequest(request); + Response response = restClient.performRequest(request); if (response == null) { throw new BulkElasticsearchException("bulk es Response is null"); } @@ -104,10 +132,10 @@ public BulkResponse bulk(String requestBody) { /** * @return version.number, example:2.0.0 */ - public static String getClusterVersion() { + public String getClusterVersion() { Request request = new Request("GET", "/"); try { - Response response = REST_CLIENT.performRequest(request); + Response response = restClient.performRequest(request); String result = EntityUtils.toString(response.getEntity()); ObjectMapper objectMapper = new ObjectMapper(); JsonNode jsonNode = objectMapper.readTree(result); @@ -119,7 +147,114 @@ public static String getClusterVersion() { } public void close() throws IOException { - REST_CLIENT.close(); + restClient.close(); + } + + /** + * first time to request search documents by scroll + * call /${index}/_search?scroll=${scroll} + * + * @param index index name + * @param source select fields + * @param scrollTime such as:1m + * @param scrollSize fetch documents count in one request + */ + public ScrollResult searchByScroll(String index, List source, String scrollTime, int scrollSize) { + Map param = new HashMap<>(); + Map query = new HashMap<>(); + query.put("match_all", new HashMap()); + param.put("query", query); + param.put("_source", source); + param.put("sort", new String[]{"_doc"}); + param.put("size", scrollSize); + String endpoint = index + "/_search?scroll=" + scrollTime; + ScrollResult scrollResult = getDocsFromScrollRequest(endpoint, JsonUtils.toJsonString(param)); + return scrollResult; + } + + /** + * scroll to get result + * call _search/scroll + * + * @param scrollId the scroll id of the last request + * @param scrollTime such as:1m + */ + public ScrollResult searchWithScrollId(String scrollId, String scrollTime) { + Map param = new HashMap<>(); + param.put("scroll_id", scrollId); + param.put("scroll", scrollTime); + ScrollResult scrollResult = getDocsFromScrollRequest("_search/scroll", JsonUtils.toJsonString(param)); + return scrollResult; + } + + private ScrollResult getDocsFromScrollRequest(String endpoint, String requestBody) { + Request request = new Request("POST", endpoint); + request.setJsonEntity(requestBody); + try { + Response response = restClient.performRequest(request); + if (response == null) { + throw new ScrollRequestException("POST " + endpoint + " response null"); + } + if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + String entity = EntityUtils.toString(response.getEntity()); + ObjectNode responseJson = JsonUtils.parseObject(entity); + + JsonNode shards = responseJson.get("_shards"); + int totalShards = shards.get("total").intValue(); + int successful = shards.get("successful").intValue(); + Asserts.check(totalShards == successful, String.format("POST %s,total shards(%d)!= successful shards(%d)", endpoint, totalShards, successful)); + + ScrollResult scrollResult = getDocsFromScrollResponse(responseJson); + return scrollResult; + } else { + throw new ScrollRequestException(String.format("POST %s response status code=%d,request boy=%s", endpoint, response.getStatusLine().getStatusCode(), requestBody)); + } + } catch (IOException e) { + throw new ScrollRequestException(String.format("POST %s error,request boy=%s", endpoint, requestBody), e); + + } + } + + private ScrollResult getDocsFromScrollResponse(ObjectNode responseJson) { + ScrollResult scrollResult = new ScrollResult(); + String scrollId = responseJson.get("_scroll_id").asText(); + scrollResult.setScrollId(scrollId); + + JsonNode hitsNode = responseJson.get("hits").get("hits"); + List> docs = new ArrayList<>(hitsNode.size()); + scrollResult.setDocs(docs); + + Iterator iter = hitsNode.iterator(); + while (iter.hasNext()) { + Map doc = new HashMap<>(); + JsonNode hitNode = iter.next(); + doc.put("_index", hitNode.get("_index").textValue()); + doc.put("_id", hitNode.get("_id").textValue()); + Map source = mapper.convertValue(hitNode.get("_source"), new TypeReference>(){}); + doc.putAll(source); + docs.add(doc); + } + return scrollResult; + } + + public List getIndexDocsCount(String index) { + String endpoint = String.format("_cat/indices/%s?h=index,docsCount&format=json", index); + Request request = new Request("GET", endpoint); + try { + Response response = restClient.performRequest(request); + if (response == null) { + throw new GetIndexDocsCountException("POST " + endpoint + " response null"); + } + if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + String entity = EntityUtils.toString(response.getEntity()); + List indexDocsCounts = JsonUtils.toList(entity, IndexDocsCount.class); + return indexDocsCounts; + } else { + throw new GetIndexDocsCountException(String.format("POST %s response status code=%d", endpoint, response.getStatusLine().getStatusCode())); + } + } catch (IOException ex) { + throw new GetIndexDocsCountException(ex); + } } } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java new file mode 100644 index 00000000000..fd482db5efd --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config; + +public class EsClusterConnectionConfig { + + public static final String HOSTS = "hosts"; + + public static final String USERNAME = "username"; + + public static final String PASSWORD = "password"; + +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfig.java new file mode 100644 index 00000000000..8040803079b --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfig.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source; + +public class SourceConfig { + + public static final String INDEX = "index"; + + public static final String SOURCE = "source"; + + public static final String SCROLL_TIME = "scroll_time"; + + public static final String SCROLL_SIZE = "scroll_size"; + +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfigDeaultConstant.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfigDeaultConstant.java new file mode 100644 index 00000000000..035b556b6ff --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfigDeaultConstant.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source; + +public class SourceConfigDeaultConstant { + + public static final String SCROLLL_TIME = "1m"; + + public static final int SCROLLL_SIZE = 100; + +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/IndexDocsCount.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/IndexDocsCount.java new file mode 100644 index 00000000000..314c1ed4867 --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/IndexDocsCount.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source; + +public class IndexDocsCount { + + private String index; + /** + * index docs count + */ + private Long docsCount; + + public String getIndex() { + return index; + } + + public void setIndex(String index) { + this.index = index; + } + + public Long getDocsCount() { + return docsCount; + } + + public void setDocsCount(Long docsCount) { + this.docsCount = docsCount; + } +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/ScrollResult.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/ScrollResult.java new file mode 100644 index 00000000000..7608f3a24bc --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/ScrollResult.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source; + +import java.util.List; +import java.util.Map; + +public class ScrollResult { + + private String scrollId; + private List> docs; + + public String getScrollId() { + return scrollId; + } + + public void setScrollId(String scrollId) { + this.scrollId = scrollId; + } + + public List> getDocs() { + return docs; + } + + public void setDocs(List> docs) { + this.docs = docs; + } +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java new file mode 100644 index 00000000000..0f1d018b5b2 --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source; + +import java.io.Serializable; +import java.util.List; + +public class SourceIndexInfo implements Serializable { + private String index; + private List source; + private String scrollTime; + private int scrollSize; + + public SourceIndexInfo(String index, List source, String scrollTime, int scrollSize) { + this.index = index; + this.source = source; + this.scrollTime = scrollTime; + this.scrollSize = scrollSize; + } + + public String getIndex() { + return index; + } + + public void setIndex(String index) { + this.index = index; + } + + public List getSource() { + return source; + } + + public void setSource(List source) { + this.source = source; + } + + public String getScrollTime() { + return scrollTime; + } + + public void setScrollTime(String scrollTime) { + this.scrollTime = scrollTime; + } + + public int getScrollSize() { + return scrollSize; + } + + public void setScrollSize(int scrollSize) { + this.scrollSize = scrollSize; + } +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetIndexDocsCountException.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetIndexDocsCountException.java new file mode 100644 index 00000000000..19925a966f4 --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetIndexDocsCountException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception; + +public class GetIndexDocsCountException extends RuntimeException { + + public GetIndexDocsCountException(String message) { + super(message); + } + + public GetIndexDocsCountException(Throwable cause) { + super(cause); + } + +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ScrollRequestException.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ScrollRequestException.java new file mode 100644 index 00000000000..f0948341790 --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ScrollRequestException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception; + +public class ScrollRequestException extends RuntimeException { + + public ScrollRequestException(String message, Throwable cause) { + super(message, cause); + } + + public ScrollRequestException(String message) { + super(message); + } + +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java index e0f7630cf9a..6864e2aa872 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java @@ -45,8 +45,7 @@ public String getPluginName() { } @Override - public void prepare(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) throws - PrepareFailException { + public void prepare(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) throws PrepareFailException { this.pluginConfig = pluginConfig; SinkConfig.setValue(pluginConfig); } @@ -65,4 +64,5 @@ public SeaTunnelDataType getConsumedType() { public SinkWriter createWriter(SinkWriter.Context context) { return new ElasticsearchSinkWriter(context, seaTunnelRowType, pluginConfig, Collections.emptyList()); } + } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java index d527d90d58c..615f303ba44 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java @@ -21,7 +21,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient; -import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.BulkConfig; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse; @@ -30,6 +29,7 @@ import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.ElasticsearchRowSerializer; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.SeaTunnelRowSerializer; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -44,45 +44,31 @@ /** * ElasticsearchSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Elasticsearch. */ -public class ElasticsearchSinkWriter implements SinkWriter { +public class ElasticsearchSinkWriter implements SinkWriter { - private final Context context; + private final SinkWriter.Context context; private final SeaTunnelRowSerializer seaTunnelRowSerializer; private final List requestEsList; private EsRestClient esRestClient; - private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchSinkWriter.class); public ElasticsearchSinkWriter( - Context context, + SinkWriter.Context context, SeaTunnelRowType seaTunnelRowType, Config pluginConfig, - List elasticsearchStates) { + List elasticsearchStates) { this.context = context; IndexInfo indexInfo = new IndexInfo(pluginConfig); - initRestClient(pluginConfig); - ElasticsearchVersion elasticsearchVersion = ElasticsearchVersion.get(EsRestClient.getClusterVersion()); + esRestClient = EsRestClient.createInstance(pluginConfig); + ElasticsearchVersion elasticsearchVersion = ElasticsearchVersion.get(esRestClient.getClusterVersion()); this.seaTunnelRowSerializer = new ElasticsearchRowSerializer(elasticsearchVersion, indexInfo, seaTunnelRowType); this.requestEsList = new ArrayList<>(BulkConfig.MAX_BATCH_SIZE); } - private void initRestClient(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) { - List hosts = pluginConfig.getStringList(SinkConfig.HOSTS); - String username = null; - String password = null; - if (pluginConfig.hasPath(SinkConfig.USERNAME)) { - username = pluginConfig.getString(SinkConfig.USERNAME); - if (pluginConfig.hasPath(SinkConfig.PASSWORD)) { - password = pluginConfig.getString(SinkConfig.PASSWORD); - } - } - esRestClient = EsRestClient.getInstance(hosts, username, password); - } - @Override public void write(SeaTunnelRow element) { String indexRequestRow = seaTunnelRowSerializer.serializeRow(element); diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java new file mode 100644 index 00000000000..1f639f83687 --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source.SourceConfig; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +import java.util.List; + +@AutoService(SeaTunnelSource.class) +public class ElasticsearchSource implements SeaTunnelSource { + + + private Config pluginConfig; + + private SeaTunnelRowType rowTypeInfo; + + @Override + public String getPluginName() { + return "Elasticsearch"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + this.pluginConfig = pluginConfig; + List source = pluginConfig.getStringList(SourceConfig.SOURCE); + SeaTunnelDataType[] fieldTypes = new SeaTunnelDataType[source.size()]; + for (int i = 0; i < source.size(); i++) { + fieldTypes[i] = BasicType.STRING_TYPE; + } + rowTypeInfo = new SeaTunnelRowType(source.toArray(new String[source.size()]), fieldTypes); + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SeaTunnelDataType getProducedType() { + return this.rowTypeInfo; + } + + @Override + public SourceReader createReader(SourceReader.Context readerContext) { + return new ElasticsearchSourceReader(readerContext, pluginConfig); + } + + @Override + public SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) { + return new ElasticsearchSourceSplitEnumerator(enumeratorContext, pluginConfig); + } + + @Override + public SourceSplitEnumerator restoreEnumerator(SourceSplitEnumerator.Context enumeratorContext, ElasticsearchSourceState checkpointState) throws Exception { + return new ElasticsearchSourceSplitEnumerator(enumeratorContext, pluginConfig); + } + +} + diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java new file mode 100644 index 00000000000..63bdd78172e --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.SourceIndexInfo; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +public class ElasticsearchSourceReader implements SourceReader { + + protected static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSourceReader.class); + + SourceReader.Context context; + + private Config pluginConfig; + + private EsRestClient esRestClient; + + Deque splits = new LinkedList<>(); + boolean noMoreSplit; + + private final long pollNextWaitTime = 1000L; + + public ElasticsearchSourceReader(SourceReader.Context context, Config pluginConfig) { + this.context = context; + this.pluginConfig = pluginConfig; + } + + @Override + public void open() { + esRestClient = EsRestClient.createInstance(this.pluginConfig); + } + + @Override + public void close() throws IOException { + esRestClient.close(); + } + + @Override + public void pollNext(Collector output) throws Exception { + ElasticsearchSourceSplit split = splits.poll(); + if (null != split) { + SourceIndexInfo sourceIndexInfo = split.getSourceIndexInfo(); + + ScrollResult scrollResult = esRestClient.searchByScroll(sourceIndexInfo.getIndex(), sourceIndexInfo.getSource(), sourceIndexInfo.getScrollTime(), sourceIndexInfo.getScrollSize()); + outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output); + while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) { + scrollResult = esRestClient.searchWithScrollId(scrollResult.getScrollId(), sourceIndexInfo.getScrollTime()); + outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output); + } + } else if (noMoreSplit) { + // signal to the source that we have reached the end of the data. + LOG.info("Closed the bounded ELasticsearch source"); + context.signalNoMoreElement(); + } else { + Thread.sleep(pollNextWaitTime); + } + } + + private void outputFromScrollResult(ScrollResult scrollResult, List source, Collector output) { + int sourceSize = source.size(); + for (Map doc : scrollResult.getDocs()) { + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(sourceSize); + for (int i = 0; i < sourceSize; i++) { + Object value = doc.get(source.get(i)); + seaTunnelRow.setField(i, String.valueOf(value)); + } + output.collect(seaTunnelRow); + } + } + + @Override + public List snapshotState(long checkpointId) throws Exception { + return new ArrayList<>(splits); + } + + @Override + public void addSplits(List splits) { + this.splits.addAll(splits); + } + + @Override + public void handleNoMoreSplits() { + noMoreSplit = true; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + + } +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java new file mode 100644 index 00000000000..33626f1ed82 --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source; + +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.SourceIndexInfo; + +public class ElasticsearchSourceSplit implements SourceSplit { + + private SourceIndexInfo sourceIndexInfo; + private int splitId; + + public ElasticsearchSourceSplit(SourceIndexInfo sourceIndexInfo, int splitId) { + this.sourceIndexInfo = sourceIndexInfo; + this.splitId = splitId; + } + + @Override + public String splitId() { + return String.valueOf(splitId); + } + + public SourceIndexInfo getSourceIndexInfo() { + return sourceIndexInfo; + } + + public void setSourceIndexInfo(SourceIndexInfo sourceIndexInfo) { + this.sourceIndexInfo = sourceIndexInfo; + } + + public int getSplitId() { + return splitId; + } + + public void setSplitId(int splitId) { + this.splitId = splitId; + } +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java new file mode 100644 index 00000000000..becef4ff7fd --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source; + +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source.SourceConfig; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source.SourceConfigDeaultConstant; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.SourceIndexInfo; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +public class ElasticsearchSourceSplitEnumerator implements SourceSplitEnumerator { + + private SourceSplitEnumerator.Context enumeratorContext; + + private Config pluginConfig; + + private EsRestClient esRestClient; + + public ElasticsearchSourceSplitEnumerator(SourceSplitEnumerator.Context enumeratorContext, Config pluginConfig) { + this.enumeratorContext = enumeratorContext; + this.pluginConfig = pluginConfig; + } + + @Override + public void open() { + esRestClient = EsRestClient.createInstance(pluginConfig); + } + + @Override + public void run() throws Exception { + + } + + @Override + public void close() throws IOException { + esRestClient.close(); + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + + } + + @Override + public int currentUnassignedSplitSize() { + return 0; + } + + @Override + public void handleSplitRequest(int subtaskId) { + + } + + @Override + public void registerReader(int subtaskId) { + String scrolllTime = SourceConfigDeaultConstant.SCROLLL_TIME; + if (pluginConfig.hasPath(SourceConfig.SCROLL_TIME)) { + scrolllTime = pluginConfig.getString(SourceConfig.SCROLL_TIME); + } + int scrollSize = SourceConfigDeaultConstant.SCROLLL_SIZE; + if (pluginConfig.hasPath(SourceConfig.SCROLL_SIZE)) { + scrollSize = pluginConfig.getInt(SourceConfig.SCROLL_SIZE); + } + + List indexDocsCounts = esRestClient.getIndexDocsCount(pluginConfig.getString(SourceConfig.INDEX)); + indexDocsCounts = indexDocsCounts.stream().filter(x -> x.getDocsCount() != null && x.getDocsCount() > 0) + .sorted(Comparator.comparingLong(IndexDocsCount::getDocsCount)).collect(Collectors.toList()); + List splits = new ArrayList<>(); + int parallelism = enumeratorContext.currentParallelism(); + List sources = pluginConfig.getStringList(SourceConfig.SOURCE); + + for (int i = 0; i < indexDocsCounts.size(); i++) { + IndexDocsCount indexDocsCount = indexDocsCounts.get(i); + if (i % parallelism == subtaskId) { + splits.add(new ElasticsearchSourceSplit(new SourceIndexInfo(indexDocsCount.getIndex(), sources, scrolllTime, scrollSize), subtaskId)); + } + } + + enumeratorContext.assignSplit(subtaskId, splits); + enumeratorContext.signalNoMoreSplits(subtaskId); + } + + @Override + public ElasticsearchSourceState snapshotState(long checkpointId) throws Exception { + return null; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + + } +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceState.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceState.java new file mode 100644 index 00000000000..0bb3b12d00f --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceState.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source; + +import java.io.Serializable; + +public class ElasticsearchSourceState implements Serializable { +} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/pom.xml new file mode 100644 index 00000000000..e1c1e4841a8 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/pom.xml @@ -0,0 +1,59 @@ + + + + + org.apache.seatunnel + seatunnel-flink-connector-v2-e2e + ${revision} + + 4.0.0 + + connector-elasticsearch-flink-e2e + + + + org.apache.seatunnel + connector-flink-e2e-base + ${project.version} + tests + test-jar + test + + + + + org.apache.seatunnel + connector-fake + ${project.version} + test + + + org.apache.seatunnel + connector-elasticsearch + ${project.version} + test + + + + org.testcontainers + elasticsearch + 1.17.3 + test + + + \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/ElasticsearchSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/ElasticsearchSourceToConsoleIT.java new file mode 100644 index 00000000000..10e0e8c0a2b --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/ElasticsearchSourceToConsoleIT.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.flink.v2.elasticsearch; + +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient; +import org.apache.seatunnel.e2e.flink.FlinkContainer; + +import com.google.common.collect.Lists; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; + +public class ElasticsearchSourceToConsoleIT extends FlinkContainer { + + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchSourceToConsoleIT.class); + + private ElasticsearchContainer container; + + @SuppressWarnings({"checkstyle:MagicNumber", "checkstyle:Indentation"}) + @BeforeEach + public void startElasticsearchContainer() throws InterruptedException { + container = new ElasticsearchContainer(DockerImageName.parse("elasticsearch:6.8.23").asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch")) + .withNetwork(NETWORK) + .withNetworkAliases("elasticsearch") + .withLogConsumer(new Slf4jLogConsumer(LOGGER)); + container.start(); + LOGGER.info("Elasticsearch container started"); + Thread.sleep(5000L); + createIndexDocs(); + } + + /** + * create a index,and bulk some documents + */ + private void createIndexDocs() { + EsRestClient esRestClient = EsRestClient.createInstance(Lists.newArrayList(container.getHttpHostAddress()), "", ""); + String requestBody = "{\"index\":{\"_index\":\"st_index\",\"_type\":\"st\"}}\n" + + "{\"name\":\"EbvYoFkXtS\",\"age\":18}\n" + + "{\"index\":{\"_index\":\"st_index\",\"_type\":\"st\"}}\n" + + "{\"name\":\"LjFMprGLJZ\",\"age\":19}\n" + + "{\"index\":{\"_index\":\"st_index\",\"_type\":\"st\"}}\n" + + "{\"name\":\"uJTtAVuSyI\",\"age\":20}\n"; + esRestClient.bulk(requestBody); + try { + esRestClient.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testElasticsearchSourceToConsoleSink() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelFlinkJob("/elasticsearch/elasticsearch_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + + @AfterEach + public void closeContainer() { + if (container != null) { + container.stop(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/FakeSourceToElasticsearchIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/FakeSourceToElasticsearchIT.java new file mode 100644 index 00000000000..a5d864a02cf --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/FakeSourceToElasticsearchIT.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.flink.v2.elasticsearch; + +import org.apache.seatunnel.e2e.flink.FlinkContainer; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; + +public class FakeSourceToElasticsearchIT extends FlinkContainer { + private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToElasticsearchIT.class); + + private ElasticsearchContainer container; + + @SuppressWarnings({"checkstyle:MagicNumber", "checkstyle:Indentation"}) + @BeforeEach + public void startElasticsearchContainer() throws InterruptedException { + container = new ElasticsearchContainer(DockerImageName.parse("elasticsearch:6.8.23").asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch")).withNetwork(NETWORK).withNetworkAliases("elasticsearch").withLogConsumer(new Slf4jLogConsumer(LOGGER)); + container.start(); + LOGGER.info("Elasticsearch container started"); + Thread.sleep(5000L); + + } + + @Test + public void testFakeSourceToELasticsearchSink() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelFlinkJob("/elasticsearch/fakesource_to_elasticsearch.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + + @AfterEach + public void closeContainer() { + if (container != null) { + container.stop(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/elasticsearch/elasticsearch_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/elasticsearch/elasticsearch_to_console.conf new file mode 100644 index 00000000000..9a13c8ec135 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/elasticsearch/elasticsearch_to_console.conf @@ -0,0 +1,69 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Elasticsearch { + hosts = ["elasticsearch:9200"] + index = "st_index*" + source = ["_id","name","age"] + result_table_name = "fake" + scroll_size = 100 + scroll_time = "1m" + } + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource +} + +transform { + sql { + sql = "select _id as doc_id,name,age from fake" + } + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Console {} + Assert { + rules = [ + { + field_name = name + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/category/sink-v2 +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch.conf new file mode 100644 index 00000000000..e8c2b8ac3e8 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch.conf @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake" + schema = { + fields { + name = "string" + age = "int" + } + } + } + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource +} + +transform { + sql { + sql = "select name,age from fake" + } + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Elasticsearch{ + hosts = ["elasticsearch:9200"] + index = "st_index" + index_type = "st" + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/category/sink-v2 +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/log4j.properties new file mode 100644 index 00000000000..db5d9e51220 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/log4j.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Set everything to be logged to the console +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/pom.xml new file mode 100644 index 00000000000..9f7a1a72ac3 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/pom.xml @@ -0,0 +1,61 @@ + + + + + org.apache.seatunnel + seatunnel-spark-connector-v2-e2e + ${revision} + + 4.0.0 + + connector-elasticsearch-spark-e2e + + + + org.apache.seatunnel + connector-spark-e2e-base + ${project.version} + tests + test-jar + test + + + + + org.apache.seatunnel + connector-fake + ${project.version} + test + + + org.apache.seatunnel + connector-elasticsearch + ${project.version} + test + + + + + org.testcontainers + elasticsearch + 1.17.3 + test + + + + \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/ElasticsearchSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/ElasticsearchSourceToConsoleIT.java new file mode 100644 index 00000000000..ae541039e4c --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/ElasticsearchSourceToConsoleIT.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.spark.v2.elasticsearch; + +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient; +import org.apache.seatunnel.e2e.spark.SparkContainer; + +import com.google.common.collect.Lists; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; + +/** + * This test case is used to verify that the elasticsearch source is able to send data to the console. + * Make sure the SeaTunnel job can submit successfully on spark engine. + */ +public class ElasticsearchSourceToConsoleIT extends SparkContainer { + + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchSourceToConsoleIT.class); + + private ElasticsearchContainer container; + + @SuppressWarnings({"checkstyle:MagicNumber", "checkstyle:Indentation"}) + @BeforeEach + public void startElasticsearchContainer() throws InterruptedException { + container = new ElasticsearchContainer(DockerImageName.parse("elasticsearch:6.8.23").asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch")).withNetwork(NETWORK).withNetworkAliases("elasticsearch").withLogConsumer(new Slf4jLogConsumer(LOGGER)); + container.start(); + LOGGER.info("Elasticsearch container started"); + Thread.sleep(5000L); + createIndexDocs(); + } + + /** + * create a index,and bulk some documents + */ + private void createIndexDocs() { + EsRestClient esRestClient = EsRestClient.createInstance(Lists.newArrayList(container.getHttpHostAddress()), "", ""); + String requestBody = "{\"index\":{\"_index\":\"st_index\",\"_type\":\"st\"}}\n" + + "{\"name\":\"EbvYoFkXtS\",\"age\":18}\n" + + "{\"index\":{\"_index\":\"st_index\",\"_type\":\"st\"}}\n" + + "{\"name\":\"LjFMprGLJZ\",\"age\":19}\n" + + "{\"index\":{\"_index\":\"st_index\",\"_type\":\"st\"}}\n" + + "{\"name\":\"uJTtAVuSyI\",\"age\":20}\n"; + esRestClient.bulk(requestBody); + try { + esRestClient.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testElasticsearchSourceToConsoleSink() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelSparkJob("/elasticsearch/elasticsearch_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + + @AfterEach + public void closeContainer() { + if (container != null) { + container.stop(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/FakeSourceToElasticsearchIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/FakeSourceToElasticsearchIT.java new file mode 100644 index 00000000000..33aebc39441 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/FakeSourceToElasticsearchIT.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.spark.v2.elasticsearch; + +import org.apache.seatunnel.e2e.spark.SparkContainer; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; + +/** + * This test case is used to verify that the fake source is able to send data to the elasticsearch. + * Make sure the SeaTunnel job can submit successfully on spark engine. + */ +public class FakeSourceToElasticsearchIT extends SparkContainer { + + private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToElasticsearchIT.class); + + private ElasticsearchContainer container; + + @SuppressWarnings({"checkstyle:MagicNumber", "checkstyle:Indentation"}) + @BeforeEach + public void startElasticsearchContainer() throws InterruptedException { + container = new ElasticsearchContainer(DockerImageName.parse("elasticsearch:6.8.23").asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch")).withNetwork(NETWORK).withNetworkAliases("elasticsearch").withLogConsumer(new Slf4jLogConsumer(LOGGER)); + container.start(); + LOGGER.info("Elasticsearch container started"); + Thread.sleep(5000L); + } + + @Test + public void testFakeSourceToElasticsearchSink() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelSparkJob("/elasticsearch/fakesource_to_elasticsearch.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + + @AfterEach + public void closeContainer() { + if (container != null) { + container.stop(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/elasticsearch/elasticsearch_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/elasticsearch/elasticsearch_to_console.conf new file mode 100644 index 00000000000..174f60110a7 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/elasticsearch/elasticsearch_to_console.conf @@ -0,0 +1,72 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set spark configuration here + # see available properties defined by spark: https://spark.apache.org/docs/latest/configuration.html#available-properties + job.mode = "BATCH" + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Elasticsearch { + hosts = ["elasticsearch:9200"] + index = "st_index*" + source = ["_id","name","age"] + result_table_name = "fake" + scroll_size = 100 + scroll_time = "1m" + } + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource +} + +transform { + sql { + sql = "select _id as doc_id,name,age from fake" + } + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Console {} + Assert { + rules = [ + { + field_name = name + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/category/sink-v2 +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch.conf new file mode 100644 index 00000000000..ec706d89a87 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch.conf @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set spark configuration here + # see available properties defined by spark: https://spark.apache.org/docs/latest/configuration.html#available-properties + job.mode = "BATCH" + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake" + schema = { + fields { + name = "string" + age = "int" + } + } + } + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource +} + +transform { + sql { + sql = "select name,age from fake" + } + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Elasticsearch{ + hosts = ["elasticsearch:9200"] + index = "st_index" + index_type = "st" + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/category/sink-v2 +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/log4j.properties new file mode 100644 index 00000000000..db5d9e51220 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/log4j.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Set everything to be logged to the console +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n