[Feature][Connector-V2][Elasticsearch] Support Elasticsearch source #2821

Merged on Nov 4, 2022 (26 commits)
Showing changes from 10 commits
Commits
a96b361
[Feature][Connector-V2]improve elasticseach connector directory struc…
Aug 28, 2022
c7050d7
[Feature][Connector-V2]new connector of Elasticsearch source(#2553)
Sep 20, 2022
66f9ad9
[Feature][Connector-V2]improve elasticseach connector directory struc…
Aug 28, 2022
b6688c2
[Feature][Connector-V2]new connector of Elasticsearch source(#2553)
Sep 20, 2022
a8f96fe
Merge branch 'dev-20220919' of github.com:iture123/incubator-seatunne…
Sep 22, 2022
3e4538e
Merge branch 'dev' of github.com:iture123/incubator-seatunnel into de…
Sep 24, 2022
c878685
[Feature][Connector-V2]resolve conflict(#2553)
Sep 24, 2022
5cba46f
[Feature][Connector-V2]remove checkstyle:MagicNumber(#2553)
Sep 27, 2022
83e288b
[Feature][Connector-V2]use lombok to simple code(#2553)
Sep 28, 2022
c70f2af
[Feature][Connector-V2]add 'synchronized (output.getCheckpointLock())…
Sep 28, 2022
5a02fc9
[Feature][Connector-V2][Elasticsearce]simplify Config.class path(#2553)
Oct 15, 2022
ad71a2a
[Feature][Connector-V2][Elasticsearce]improve elasticsearch record de…
Oct 15, 2022
d39c5cb
[Feature][Connector-V2][Elasticsearce]EsRestClient#getFieldTypeMappin…
Oct 16, 2022
ed2ba50
[Feature][Connector-V2][Elasticsearce]improve restore split(#2553)
Oct 16, 2022
a184fac
[Feature][Connector-V2][Elasticsearce]improve Elasticsearc.md(#2553)
Oct 16, 2022
dbb44ad
[Feature][Connector-V2][Elasticsearce]fix NPP when ES version <= 6.x(…
Oct 16, 2022
62f299f
[Feature][Connector-V2][Elasticsearce]improve elasticsearch source(#2…
Oct 22, 2022
d6f9282
Merge branch 'dev' of github.com:iture123/incubator-seatunnel into de…
Oct 22, 2022
4a3b560
[Feature][Connector-V2][Elasticsearce]fix jackson conflict in spark a…
Oct 22, 2022
388226b
[Feature][Connector-V2][Elasticsearce]pom.xml remove dependency seatu…
Oct 30, 2022
d53b879
[Feature][Connector-V2][Elasticsearce]improve ElasticsearchSinkWriter…
Oct 30, 2022
b51b058
[Feature][Connector-V2][Elasticsearce]improve ElasticsearchSinkWriter…
Oct 30, 2022
b6c318d
[Feature][Connector-V2][Elasticsearce]improve ElasticsearchIT
Oct 31, 2022
6eb559f
[Feature][Connector-V2][Elasticsearce]improve ElasticsearchSourceReader
Oct 31, 2022
0ccdca0
[Feature][Connector-V2][Elasticsearce]improve ElasticsearchSourceSpli…
Oct 31, 2022
4232854
[Feature][Connector-V2][Elasticsearce]ElasticsearchSourceSplit use lo…
Oct 31, 2022
64 changes: 64 additions & 0 deletions docs/en/connector-v2/source/Elasticsearch.md
@@ -0,0 +1,64 @@
# Elasticsearch

> Elasticsearch source connector

## Description

Used to read data from Elasticsearch.

Supports Elasticsearch versions >= 2.x and < 8.x.

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
Member:

Is stream read supported?

Contributor Author:

No, it is not supported. I will revise this.

- [x] [exactly-once](../../concept/connector-v2-features.md)
Member:

How is a split restored (exactly-once)?

Reference: restore split, #2917

- [ ] [schema projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|-------------|--------| -------- |---------------|
| hosts | array | yes | - |
| username | string | no | - |
| password | string | no | - |
| index | string | yes | - |
| source | array | yes | - |
| scroll_time | string | no | 1m |
| scroll_size | int | no | 100 |



### hosts [array]
Elasticsearch cluster HTTP address in the format `host:port`. Multiple hosts can be specified, for example `["host1:9200", "host2:9200"]`.
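
To make the `host:port` format concrete, here is a minimal plain-Java sketch of validating such entries before handing them to a client. The class `HostPortExample` and its nested `HostPort` holder are hypothetical names used only for illustration; the connector itself may parse hosts differently.

```java
import java.util.ArrayList;
import java.util.List;

final class HostPortExample {

    /** Hypothetical holder for one parsed entry; not a connector class. */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Splits "host:port" at the last colon and validates the port range. */
    static HostPort parse(String address) {
        int idx = address.lastIndexOf(':');
        if (idx <= 0 || idx == address.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + address);
        }
        int port = Integer.parseInt(address.substring(idx + 1));
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return new HostPort(address.substring(0, idx), port);
    }

    /** Parses every entry of a `hosts` array in order. */
    static List<HostPort> parseAll(List<String> addresses) {
        List<HostPort> result = new ArrayList<>();
        for (String address : addresses) {
            result.add(parse(address));
        }
        return result;
    }
}
```

Calling `HostPortExample.parseAll` on the two entries from the example above yields two `HostPort` values, while an entry without a port is rejected with an `IllegalArgumentException`.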

### username [string]
x-pack username.

### password [string]
x-pack password.

### index [string]
Elasticsearch index name. The `*` wildcard is supported for fuzzy matching.

### source [array]
The fields of the index to read.
You can get the document ID by specifying the field `_id`. If you sink `_id` to another index, you need to specify an alias for `_id` due to an Elasticsearch limitation.

### scroll_time [string]
Amount of time Elasticsearch will keep the search context alive for scroll requests.

### scroll_size [int]
Maximum number of hits to be returned with each Elasticsearch scroll request.
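
The interaction of `scroll_time` and `scroll_size` can be sketched as a paging loop: the first request opens a search context and returns at most `scroll_size` hits, each follow-up request passes the previous scroll ID, and reading stops when a page comes back empty. The sketch below runs against an in-memory fake; `ScrollLoopSketch`, `Page`, `ScrollClient`, and `InMemoryClient` are hypothetical names and are not the connector's reader or client classes.

```java
import java.util.ArrayList;
import java.util.List;

final class ScrollLoopSketch {

    /** Hypothetical page: the scroll ID to send next plus the hits of this page. */
    static final class Page {
        final String scrollId;
        final List<String> docs;

        Page(String scrollId, List<String> docs) {
            this.scrollId = scrollId;
            this.docs = docs;
        }
    }

    /** Hypothetical stand-in for the "open scroll" and "continue scroll" calls. */
    interface ScrollClient {
        Page open(int scrollSize);

        Page next(String scrollId);
    }

    /** In-memory fake that serves a fixed list in pages of at most scrollSize items. */
    static final class InMemoryClient implements ScrollClient {
        private final List<String> all;
        private int size;
        private int offset;

        InMemoryClient(List<String> all) {
            this.all = all;
        }

        @Override
        public Page open(int scrollSize) {
            if (scrollSize <= 0) {
                throw new IllegalArgumentException("scrollSize must be positive");
            }
            this.size = scrollSize;
            this.offset = 0;
            return nextPage();
        }

        @Override
        public Page next(String scrollId) {
            return nextPage();
        }

        private Page nextPage() {
            int end = Math.min(offset + size, all.size());
            List<String> docs = new ArrayList<>(all.subList(offset, end));
            offset = end;
            return new Page("scroll-" + offset, docs);
        }
    }

    /** Reads every document by following scroll IDs until an empty page arrives. */
    static List<String> readAll(ScrollClient client, int scrollSize) {
        List<String> out = new ArrayList<>();
        Page page = client.open(scrollSize);
        while (!page.docs.isEmpty()) {
            out.addAll(page.docs);
            page = client.next(page.scrollId);
        }
        return out;
    }
}
```

A larger `scroll_size` means fewer round trips per index, while `scroll_time` only needs to cover the time between two consecutive requests, since each scroll request renews the keep-alive.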

## Examples
simple
Member:

Please add a complex example.

```hocon
Elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-*"
source = ["_id","name","age"]
}
```
3 changes: 2 additions & 1 deletion plugin-mapping.properties
@@ -116,7 +116,8 @@ seatunnel.sink.OssFile = connector-file-oss
 seatunnel.source.Pulsar = connector-pulsar
 seatunnel.source.Hudi = connector-hudi
 seatunnel.sink.DingTalk = connector-dingtalk
-seatunnel.sink.elasticsearch = connector-elasticsearch
+seatunnel.source.Elasticsearch = connector-elasticsearch
+seatunnel.sink.Elasticsearch = connector-elasticsearch
 seatunnel.source.IoTDB = connector-iotdb
 seatunnel.sink.IoTDB = connector-iotdb
 seatunnel.sink.Neo4j = connector-neo4j
@@ -17,38 +17,75 @@

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client;

import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetIndexDocsCountException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ScrollRequestException;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.util.Asserts;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class EsRestClient {

-private static EsRestClient ES_REST_CLIENT;
-private static RestClient REST_CLIENT;
+private static final int CONNECTION_REQUEST_TIMEOUT = 10 * 1000;

+private static final int SOCKET_TIMEOUT = 5 * 60 * 1000;

-private EsRestClient() {
+private final RestClient restClient;

+private final ObjectMapper mapper = new ObjectMapper();

+private EsRestClient(RestClient restClient) {
+this.restClient = restClient;
}

public static EsRestClient createInstance(Config pluginConfig) {
List<String> hosts = pluginConfig.getStringList(EsClusterConnectionConfig.HOSTS);
String username = null;
String password = null;
if (pluginConfig.hasPath(EsClusterConnectionConfig.USERNAME)) {
username = pluginConfig.getString(EsClusterConnectionConfig.USERNAME);
if (pluginConfig.hasPath(EsClusterConnectionConfig.PASSWORD)) {
password = pluginConfig.getString(EsClusterConnectionConfig.PASSWORD);
}
}
return createInstance(hosts, username, password);
}

public static EsRestClient createInstance(List<String> hosts, String username, String password) {
RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, username, password);
return new EsRestClient(restClientBuilder.build());
}

@SuppressWarnings("checkstyle:MagicNumber")
private static RestClientBuilder getRestClientBuilder(List<String> hosts, String username, String password) {
HttpHost[] httpHosts = new HttpHost[hosts.size()];
for (int i = 0; i < hosts.size(); i++) {
@@ -58,8 +95,8 @@ private static RestClientBuilder getRestClientBuilder(List<String> hosts, String

 RestClientBuilder builder = RestClient.builder(httpHosts)
 .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
-.setConnectionRequestTimeout(10 * 1000)
-.setSocketTimeout(5 * 60 * 1000));
+.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT)
+.setSocketTimeout(SOCKET_TIMEOUT));

if (StringUtils.isNotEmpty(username)) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
@@ -69,20 +106,11 @@ private static RestClientBuilder getRestClientBuilder(List<String> hosts, String
 return builder;
 }

-public static EsRestClient getInstance(List<String> hosts, String username, String password) {
-if (REST_CLIENT == null) {
-RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, username, password);
-REST_CLIENT = restClientBuilder.build();
-ES_REST_CLIENT = new EsRestClient();
-}
-return ES_REST_CLIENT;
-}
-
 public BulkResponse bulk(String requestBody) {
 Request request = new Request("POST", "_bulk");
 request.setJsonEntity(requestBody);
 try {
-Response response = REST_CLIENT.performRequest(request);
+Response response = restClient.performRequest(request);
if (response == null) {
throw new BulkElasticsearchException("bulk es Response is null");
}
@@ -104,10 +132,10 @@ public BulkResponse bulk(String requestBody) {
 /**
 * @return version.number, example:2.0.0
 */
-public static String getClusterVersion() {
+public String getClusterVersion() {
 Request request = new Request("GET", "/");
 try {
-Response response = REST_CLIENT.performRequest(request);
+Response response = restClient.performRequest(request);
String result = EntityUtils.toString(response.getEntity());
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(result);
@@ -119,7 +147,114 @@ public static String getClusterVersion() {
 }

 public void close() throws IOException {
-REST_CLIENT.close();
+restClient.close();
}

/**
* first time to request search documents by scroll
* call /${index}/_search?scroll=${scroll}
*
* @param index index name
* @param source select fields
* @param scrollTime such as:1m
* @param scrollSize fetch documents count in one request
*/
public ScrollResult searchByScroll(String index, List<String> source, String scrollTime, int scrollSize) {
Map<String, Object> param = new HashMap<>();
Map<String, Object> query = new HashMap<>();
query.put("match_all", new HashMap<String, String>());
param.put("query", query);
param.put("_source", source);
param.put("sort", new String[]{"_doc"});
param.put("size", scrollSize);
String endpoint = index + "/_search?scroll=" + scrollTime;
ScrollResult scrollResult = getDocsFromScrollRequest(endpoint, JsonUtils.toJsonString(param));
return scrollResult;
}

/**
* scroll to get result
* call _search/scroll
*
* @param scrollId the scroll id of the last request
* @param scrollTime such as:1m
*/
public ScrollResult searchWithScrollId(String scrollId, String scrollTime) {
Map<String, String> param = new HashMap<>();
param.put("scroll_id", scrollId);
param.put("scroll", scrollTime);
ScrollResult scrollResult = getDocsFromScrollRequest("_search/scroll", JsonUtils.toJsonString(param));
return scrollResult;
}

private ScrollResult getDocsFromScrollRequest(String endpoint, String requestBody) {
Request request = new Request("POST", endpoint);
request.setJsonEntity(requestBody);
try {
Response response = restClient.performRequest(request);
if (response == null) {
throw new ScrollRequestException("POST " + endpoint + " response null");
}
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String entity = EntityUtils.toString(response.getEntity());
ObjectNode responseJson = JsonUtils.parseObject(entity);

JsonNode shards = responseJson.get("_shards");
int totalShards = shards.get("total").intValue();
int successful = shards.get("successful").intValue();
Asserts.check(totalShards == successful, String.format("POST %s,total shards(%d)!= successful shards(%d)", endpoint, totalShards, successful));

ScrollResult scrollResult = getDocsFromScrollResponse(responseJson);
return scrollResult;
} else {
throw new ScrollRequestException(String.format("POST %s response status code=%d,request body=%s", endpoint, response.getStatusLine().getStatusCode(), requestBody));
}
} catch (IOException e) {
throw new ScrollRequestException(String.format("POST %s error,request body=%s", endpoint, requestBody), e);

}
}

private ScrollResult getDocsFromScrollResponse(ObjectNode responseJson) {
ScrollResult scrollResult = new ScrollResult();
String scrollId = responseJson.get("_scroll_id").asText();
scrollResult.setScrollId(scrollId);

JsonNode hitsNode = responseJson.get("hits").get("hits");
List<Map<String, Object>> docs = new ArrayList<>(hitsNode.size());
scrollResult.setDocs(docs);

Iterator<JsonNode> iter = hitsNode.iterator();
while (iter.hasNext()) {
Map<String, Object> doc = new HashMap<>();
JsonNode hitNode = iter.next();
doc.put("_index", hitNode.get("_index").textValue());
doc.put("_id", hitNode.get("_id").textValue());
Map<String, Object> source = mapper.convertValue(hitNode.get("_source"), new TypeReference<Map<String, Object>>(){});
doc.putAll(source);
docs.add(doc);
}
return scrollResult;
}

public List<IndexDocsCount> getIndexDocsCount(String index) {
String endpoint = String.format("_cat/indices/%s?h=index,docsCount&format=json", index);
Request request = new Request("GET", endpoint);
try {
Response response = restClient.performRequest(request);
if (response == null) {
throw new GetIndexDocsCountException("GET " + endpoint + " response null");
}
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String entity = EntityUtils.toString(response.getEntity());
List<IndexDocsCount> indexDocsCounts = JsonUtils.toList(entity, IndexDocsCount.class);
return indexDocsCounts;
} else {
throw new GetIndexDocsCountException(String.format("GET %s response status code=%d", endpoint, response.getStatusLine().getStatusCode()));
}
} catch (IOException ex) {
throw new GetIndexDocsCountException(ex);
}
}

}
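
The record shape produced by `getDocsFromScrollResponse` above can be illustrated without Jackson. The following is a minimal sketch that assumes a hit is already available as a plain `Map`; `HitFlattenSketch` is a hypothetical name and not part of this PR. It copies `_index` and `_id` first and then merges the `_source` fields into the same flat map, in the same order as the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class HitFlattenSketch {

    /**
     * Flattens one search hit into a single map: metadata keys first,
     * then every field found under "_source".
     */
    static Map<String, Object> flatten(Map<String, Object> hit) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("_index", hit.get("_index"));
        doc.put("_id", hit.get("_id"));
        Object source = hit.get("_source");
        if (source instanceof Map) {
            // Source fields are written after the metadata, so a source key
            // with the same name as a metadata key would replace it.
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) source).entrySet()) {
                doc.put(String.valueOf(entry.getKey()), entry.getValue());
            }
        }
        return doc;
    }
}
```

For a hit with `_index` set to `seatunnel-1`, `_id` set to `42`, and a `_source` holding `name` and `age`, the result is one flat map with four keys, which is why `_id` can be selected in the `source` option like any other field.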
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;

public class EsClusterConnectionConfig {

public static final String HOSTS = "hosts";

public static final String USERNAME = "username";

public static final String PASSWORD = "password";

}
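
One consequence of how `createInstance(Config)` reads these keys is that `password` is only consulted when `username` is present. The sketch below reproduces that nesting with a plain `Map` standing in for the plugin config; `CredentialsSketch` and its `read` method are hypothetical and exist only to illustrate the behaviour.

```java
import java.util.Map;

final class CredentialsSketch {

    static final String USERNAME = "username";
    static final String PASSWORD = "password";

    /** Returns {username, password}; either element may be null. */
    static String[] read(Map<String, String> config) {
        String username = null;
        String password = null;
        if (config.containsKey(USERNAME)) {
            username = config.get(USERNAME);
            // The password lookup is nested, so it is skipped without a username.
            if (config.containsKey(PASSWORD)) {
                password = config.get(PASSWORD);
            }
        }
        return new String[] {username, password};
    }
}
```

With this structure, a config that sets only `password` yields no credentials at all, which may be worth stating in the option documentation.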
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source;

public class SourceConfig {

public static final String INDEX = "index";

public static final String SOURCE = "source";

public static final String SCROLL_TIME = "scroll_time";

public static final String SCROLL_SIZE = "scroll_size";

}