From c4fa299087a3c61c6e3a5721a1b1b21bed6d0d12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Fri, 21 Oct 2022 16:13:16 +0800 Subject: [PATCH 1/9] [Feature][Connector-V2] Add starrocks connector sink --- .../connector-starrocks/pom.xml | 71 ++++ .../starrocks/client/StarRocksFlushTuple.java | 21 ++ .../client/StarRocksSinkManager.java | 141 ++++++++ .../StarRocksStreamLoadFailedException.java | 33 ++ .../client/StarRocksStreamLoadVisitor.java | 308 ++++++++++++++++++ .../starrocks/config/SinkConfig.java | 127 ++++++++ .../serialize/StarRocksBaseSerializer.java | 39 +++ .../serialize/StarRocksCsvSerializer.java | 31 ++ .../serialize/StarRocksDelimiterParser.java | 55 ++++ .../serialize/StarRocksISerializer.java | 11 + .../serialize/StarRocksJsonSerializer.java | 37 +++ .../starrocks/sink/StarRocksSink.java | 61 ++++ .../starrocks/sink/StarRocksSinkWriter.java | 78 +++++ seatunnel-connectors-v2/pom.xml | 1 + 14 files changed, 1014 insertions(+) create mode 100644 seatunnel-connectors-v2/connector-starrocks/pom.xml create mode 100644 seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksFlushTuple.java create mode 100644 seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java create mode 100644 seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadFailedException.java create mode 100644 seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java create mode 100644 seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java create mode 100644 seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java create mode 100644 seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksCsvSerializer.java create mode 100644 seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java create mode 100644 seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksISerializer.java create mode 100644 seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java create mode 100644 seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java create mode 100644 seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java diff --git a/seatunnel-connectors-v2/connector-starrocks/pom.xml b/seatunnel-connectors-v2/connector-starrocks/pom.xml new file mode 100644 index 00000000000..9c1667855cd --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/pom.xml @@ -0,0 +1,71 @@ + + + + + seatunnel-connectors-v2 + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-starrocks + + + 4.5.13 + 4.4.4 + + + + + org.apache.seatunnel + seatunnel-api + ${project.version} + + + org.apache.seatunnel + connector-common + ${project.version} + + + 
org.apache.httpcomponents + httpclient + ${httpclient.version} + + + + org.apache.httpcomponents + httpcore + ${httpcore.version} + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksFlushTuple.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksFlushTuple.java new file mode 100644 index 00000000000..1c54689b481 --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksFlushTuple.java @@ -0,0 +1,21 @@ +package org.apache.seatunnel.connectors.seatunnel.starrocks.client; + +import java.util.List; + +public class StarRocksFlushTuple { + + private String label; + private Long bytes; + private List rows; + + public StarRocksFlushTuple(String label, Long bytes, List rows) { + this.label = label; + this.bytes = bytes; + this.rows = rows; + } + + public String getLabel() { return label; } + public void setLabel(String label) { this.label = label; } + public Long getBytes() { return bytes; } + public List getRows() { return rows; } +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java new file mode 100644 index 00000000000..353306e4003 --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.starrocks.client; + +import com.google.common.base.Strings; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig; + + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class StarRocksSinkManager { + + private final SinkConfig sinkConfig; + private final List batchList; + + private StarRocksStreamLoadVisitor starrocksStreamLoadVisitor; + private ScheduledExecutorService scheduler; + private ScheduledFuture scheduledFuture; + private volatile boolean initialize; + private volatile Exception flushException; + + public StarRocksSinkManager(SinkConfig sinkConfig, List fileNames) { + this.sinkConfig = sinkConfig; + this.batchList = new ArrayList<>(); + starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(sinkConfig, fileNames); + } + + private void tryInit() throws IOException { + if (initialize) { + return; + } + + if (sinkConfig.getBatchIntervalMs() != null) { + scheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("IoTDB-sink-output-%s").build()); + scheduledFuture = scheduler.scheduleAtFixedRate( + () -> { + try { + flush(); + } catch (IOException e) { + flushException = e; + } + }, + sinkConfig.getBatchIntervalMs(), + sinkConfig.getBatchIntervalMs(), + TimeUnit.MILLISECONDS); + } + initialize = true; + } + + public synchronized void write(String record) throws IOException { + tryInit(); + checkFlushException(); + + batchList.add(record); + if (sinkConfig.getBatchSize() > 0 + && batchList.size() >= sinkConfig.getBatchSize()) { + flush(); + } + } + + public synchronized void close() throws IOException { + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + scheduler.shutdown(); + } + + flush(); + } + + synchronized void flush() throws IOException { + checkFlushException(); + if (batchList.isEmpty()) { + return; + } + + for (int i = 0; i <= sinkConfig.getMaxRetries(); i++) { + try { + starrocksStreamLoadVisitor.doStreamLoad(null); + } catch (IoTDBConnectionException | StatementExecutionException e) { + log.error("Writing records to IoTDB failed, retry times = {}", i, e); + if (i >= sinkConfig.getMaxRetries()) { + throw new IOException("Writing records to IoTDB failed.", e); + } + + try { + long backoff = Math.min(sinkConfig.getRetryBackoffMultiplierMs() * i, + sinkConfig.getMaxRetryBackoffMs()); + Thread.sleep(backoff); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IOException( + "Unable to flush; interrupted while doing another attempt.", e); + } + } + } + + batchList.clear(); + } + + private void checkFlushException() { + if (flushException != null) { + throw new RuntimeException("Writing records to IoTDB failed.", flushException); + } + } + + public String createBatchLabel() { + StringBuilder sb = new StringBuilder(); + if (!Strings.isNullOrEmpty(sinkConfig.getLabelPrefix())) { + sb.append(sinkConfig.getLabelPrefix()); + } + return sb.append(UUID.randomUUID().toString()) + .toString(); + } +} diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadFailedException.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadFailedException.java new file mode 100644 index 00000000000..d32dc32e73e --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadFailedException.java @@ -0,0 +1,33 @@ +package org.apache.seatunnel.connectors.seatunnel.starrocks.client; + +import java.io.IOException; +import java.util.Map; + + +public class StarRocksStreamLoadFailedException extends IOException { + + static final long serialVersionUID = 1L; + + private final Map response; + private boolean reCreateLabel; + + public StarRocksStreamLoadFailedException(String message, Map response) { + super(message); + this.response = response; + } + + public StarRocksStreamLoadFailedException(String message, Map response, boolean reCreateLabel) { + super(message); + this.response = response; + this.reCreateLabel = reCreateLabel; + } + + public Map getFailedResponse() { + return response; + } + + public boolean needReCreateLabel() { + return reCreateLabel; + } + +} diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java new file mode 100644 index 00000000000..84f24e8dcea --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java @@ -0,0 +1,308 @@ +package org.apache.seatunnel.connectors.seatunnel.starrocks.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.codec.binary.Base64; +import org.apache.http.HttpEntity; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultRedirectStrategy; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig; +import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksDelimiterParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + + +public class StarRocksStreamLoadVisitor { + + private static final Logger LOG = LoggerFactory.getLogger(StarRocksStreamLoadVisitor.class); + + private final SinkConfig sinkConfig; + private long pos; + private static final String RESULT_FAILED = "Fail"; + private static final String RESULT_LABEL_EXISTED = "Label Already Exists"; + private static final String LAEBL_STATE_VISIBLE = "VISIBLE"; + 
private static final String LAEBL_STATE_COMMITTED = "COMMITTED"; + private static final String RESULT_LABEL_PREPARE = "PREPARE"; + private static final String RESULT_LABEL_ABORTED = "ABORTED"; + private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN"; + + private final ObjectMapper mapper = new ObjectMapper(); + + private List fieldNames; + + + public StarRocksStreamLoadVisitor(SinkConfig sinkConfig, List fieldNames) { + this.sinkConfig = sinkConfig; + this.fieldNames = fieldNames; + } + + public void doStreamLoad(StarRocksFlushTuple flushData) throws IOException { + String host = getAvailableHost(); + if (null == host) { + throw new IOException("None of the host in `load_url` could be connected."); + } + String loadUrl = new StringBuilder(host) + .append("/api/") + .append(sinkConfig.getDatabase()) + .append("/") + .append(sinkConfig.getTable()) + .append("/_stream_load") + .toString(); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel())); + } + Map loadResult = doHttpPut(loadUrl, flushData.getLabel(), joinRows(flushData.getRows(), flushData.getBytes().intValue())); + final String keyStatus = "Status"; + if (null == loadResult || !loadResult.containsKey(keyStatus)) { + LOG.error("unknown result status. {}", loadResult); + throw new IOException("Unable to flush data to StarRocks: unknown result status. " + loadResult); + } + if (LOG.isDebugEnabled()) { + LOG.debug(new StringBuilder("StreamLoad response:\n").append(mapper.writeValueAsString(loadResult)).toString()); + } + if (RESULT_FAILED.equals(loadResult.get(keyStatus))) { + StringBuilder errorBuilder = new StringBuilder("Failed to flush data to StarRocks.\n"); + if (loadResult.containsKey("Message")) { + errorBuilder.append(loadResult.get("Message")); + errorBuilder.append('\n'); + } + if (loadResult.containsKey("ErrorURL")) { + LOG.error("StreamLoad response: {}", loadResult); + try { + errorBuilder.append(doHttpGet(loadResult.get("ErrorURL").toString())); + errorBuilder.append('\n'); + } catch (IOException e) { + LOG.warn("Get Error URL failed. {} ", loadResult.get("ErrorURL"), e); + } + } else { + errorBuilder.append(mapper.writeValueAsString(loadResult)); + errorBuilder.append('\n'); + } + throw new IOException(errorBuilder.toString()); + } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) { + LOG.debug(new StringBuilder("StreamLoad response:\n").append(mapper.writeValueAsString(loadResult)).toString()); + // has to block-checking the state to get the final result + checkLabelState(host, flushData.getLabel()); + } + } + + private String getAvailableHost() { + List hostList = sinkConfig.getNodeUrls(); + long tmp = pos + hostList.size(); + for (; pos < tmp; pos++) { + String host = new StringBuilder("http://").append(hostList.get((int) (pos % hostList.size()))).toString(); + if (tryHttpConnection(host)) { + return host; + } + } + return null; + } + + private boolean tryHttpConnection(String host) { + try { + URL url = new URL(host); + HttpURLConnection co = (HttpURLConnection) url.openConnection(); + co.setConnectTimeout(1000); + co.connect(); + co.disconnect(); + return true; + } catch (Exception e1) { + LOG.warn("Failed to connect to address:{}", host, e1); + return false; + } + } + + private byte[] joinRows(List rows, int totalBytes) { + if (SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getFormat())) { + Map props = (sinkConfig.getStreamLoadProps() == null ? 
new HashMap<>() : sinkConfig.getStreamLoadProps()); + byte[] lineDelimiter = StarRocksDelimiterParser.parse((String)props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8); + ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length); + for (byte[] row : rows) { + bos.put(row); + bos.put(lineDelimiter); + } + return bos.array(); + } + + if (SinkConfig.StreamLoadFormat.JSON.equals(sinkConfig.getFormat())) { + ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1)); + bos.put("[".getBytes(StandardCharsets.UTF_8)); + byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8); + boolean isFirstElement = true; + for (byte[] row : rows) { + if (!isFirstElement) { + bos.put(jsonDelimiter); + } + bos.put(row); + isFirstElement = false; + } + bos.put("]".getBytes(StandardCharsets.UTF_8)); + return bos.array(); + } + throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:"); + } + + @SuppressWarnings("unchecked") + private void checkLabelState(String host, String label) throws IOException { + int idx = 0; + while(true) { + try { + TimeUnit.SECONDS.sleep(Math.min(++idx, 5)); + } catch (InterruptedException ex) { + break; + } + try (CloseableHttpClient httpclient = HttpClients.createDefault()) { + HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(sinkConfig.getDatabase()).append("/get_load_state?label=").append(label).toString()); + httpGet.setHeader("Authorization", getBasicAuthHeader(sinkConfig.getUsername(), sinkConfig.getPassword())); + httpGet.setHeader("Connection", "close"); + + try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { + HttpEntity respEntity = getHttpEntity(resp); + if (respEntity == null) { + throw new IOException(String.format("Failed to flush data to StarRocks, Error " + + "could not get the final state of label[%s].\n", label), null); + } + + Map result = mapper.readValue(EntityUtils.toString(respEntity), Map.class); + String labelState = (String)result.get("state"); + if (null == labelState) { + throw new IOException(String.format("Failed to flush data to StarRocks, Error " + + "could not get the final state of label[%s]. 
response[%s]\n", label, EntityUtils.toString(respEntity)), null); + } + LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState)); + switch(labelState) { + case LAEBL_STATE_VISIBLE: + case LAEBL_STATE_COMMITTED: + return; + case RESULT_LABEL_PREPARE: + continue; + case RESULT_LABEL_ABORTED: + throw new StarRocksStreamLoadFailedException(String.format("Failed to flush data to StarRocks, Error " + + "label[%s] state[%s]\n", label, labelState), null, true); + case RESULT_LABEL_UNKNOWN: + default: + throw new StarRocksStreamLoadFailedException(String.format("Failed to flush data to StarRocks, Error " + + "label[%s] state[%s]\n", label, labelState), null); + } + } + } + } + } + + @SuppressWarnings("unchecked") + private Map doHttpPut(String loadUrl, String label, byte[] data) throws IOException { + LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length)); + final HttpClientBuilder httpClientBuilder = HttpClients.custom() + .setRedirectStrategy(new DefaultRedirectStrategy() { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }); + try (CloseableHttpClient httpclient = httpClientBuilder.build()) { + HttpPut httpPut = new HttpPut(loadUrl); + if (null != fieldNames && !fieldNames.isEmpty() && SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getFormat())) { + httpPut.setHeader("columns", String.join(",", fieldNames.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList()))); + } + if (null != sinkConfig.getStreamLoadProps()) { + for (Map.Entry entry : sinkConfig.getStreamLoadProps().entrySet()) { + httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue())); + } + } + httpPut.setHeader("Expect", "100-continue"); + httpPut.setHeader("label", label); + httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded"); + httpPut.setHeader("Authorization", getBasicAuthHeader(sinkConfig.getUsername(), sinkConfig.getPassword())); + httpPut.setEntity(new ByteArrayEntity(data)); + httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build()); + try (CloseableHttpResponse resp = httpclient.execute(httpPut)) { + int code = resp.getStatusLine().getStatusCode(); + if (200 != code) { + String errorText; + try { + HttpEntity respEntity = resp.getEntity(); + errorText = EntityUtils.toString(respEntity); + } catch (Exception err) { + errorText = "find errorText failed: " + err.getMessage(); + } + LOG.warn("Request failed with code:{}, err:{}", code, errorText); + Map errorMap = new HashMap<>(); + errorMap.put("Status", "Fail"); + errorMap.put("Message", errorText); + return errorMap; + } + HttpEntity respEntity = resp.getEntity(); + if (null == respEntity) { + LOG.warn("Request failed with empty response."); + return null; + } + return mapper.readValue(EntityUtils.toString(respEntity), Map.class); + } + } + } + + private String getBasicAuthHeader(String username, String password) { + String auth = username + ":" + password; + byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8)); + return new StringBuilder("Basic ").append(new String(encodedAuth)).toString(); + } + + private HttpEntity getHttpEntity(CloseableHttpResponse resp) { + int code = resp.getStatusLine().getStatusCode(); + if (200 != code) { + LOG.warn("Request failed with code:{}", code); + return null; + } + HttpEntity respEntity = resp.getEntity(); + if (null == respEntity) { + LOG.warn("Request failed with empty response."); + return null; + } + return respEntity; + } + + private String 
doHttpGet(String getUrl) throws IOException { + LOG.info("Executing GET from {}.", getUrl); + try (CloseableHttpClient httpclient = buildHttpClient()) { + HttpGet httpGet = new HttpGet(getUrl); + try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { + HttpEntity respEntity = resp.getEntity(); + if (null == respEntity) { + LOG.warn("Request failed with empty response."); + return null; + } + return EntityUtils.toString(respEntity); + } + } + } + + private CloseableHttpClient buildHttpClient(){ + final HttpClientBuilder httpClientBuilder = HttpClients.custom() + .setRedirectStrategy(new DefaultRedirectStrategy() { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }); + return httpClientBuilder.build(); + } + +} diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java new file mode 100644 index 00000000000..da3172ebbaa --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.starrocks.config; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.apache.seatunnel.common.config.TypesafeConfigUtils; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +@Setter +@Getter +@ToString +public class SinkConfig { + + public static final String NODE_URLS = "nodeUrls"; + public static final String USERNAME = "username"; + public static final String PASSWORD = "password"; + public static final String LABEL_PREFIX = "labelPrefix"; + public static final String DATABASE = "database"; + public static final String TABLE = "table"; + public static final String STARROCKS_CONFIG_PREFIX = "starrocks."; + private static final String LOAD_FORMAT = "format"; + private static final String DEFAULT_LOAD_FORMAT = "JSON"; + private static final String COLUMN_SEPARATOR = "column_separator"; + public static final String BATCH_SIZE = "batch_size"; + public static final String BATCH_INTERVAL_MS = "batch_interval_ms"; + public static final String MAX_RETRIES = "max_retries"; + public static final String RETRY_BACKOFF_MULTIPLIER_MS = "retry_backoff_multiplier_ms"; + public static final String MAX_RETRY_BACKOFF_MS = "max_retry_backoff_ms"; + + public enum StreamLoadFormat { + CSV, JSON; + } + + private List nodeUrls; + private String username; + private String password; + private String database; + private String table; + private String labelPrefix; + private String columnSeparator; + + private String format = DEFAULT_LOAD_FORMAT; + private static final int DEFAULT_BATCH_SIZE = 1024; + + + private int batchSize = DEFAULT_BATCH_SIZE; + private Integer batchIntervalMs; + private int maxRetries; + private int retryBackoffMultiplierMs; + private int maxRetryBackoffMs; + + private final Map streamLoadProps = new HashMap<>(); + + + public static SinkConfig loadConfig(Config pluginConfig) { + SinkConfig sinkConfig = new SinkConfig(); + sinkConfig.setNodeUrls(pluginConfig.getStringList(NODE_URLS)); + sinkConfig.setDatabase(pluginConfig.getString(DATABASE)); + sinkConfig.setTable(pluginConfig.getString(TABLE)); + + if (pluginConfig.hasPath(USERNAME)) { + sinkConfig.setUsername(pluginConfig.getString(USERNAME)); + } + if (pluginConfig.hasPath(PASSWORD)) { + sinkConfig.setPassword(pluginConfig.getString(PASSWORD)); + } + if (pluginConfig.hasPath(LABEL_PREFIX)) { + sinkConfig.setLabelPrefix(pluginConfig.getString(LABEL_PREFIX)); + } + if (pluginConfig.hasPath(LOAD_FORMAT)) { + sinkConfig.setFormat(pluginConfig.getString(LOAD_FORMAT)); + } + if (pluginConfig.hasPath(COLUMN_SEPARATOR)) { + sinkConfig.setColumnSeparator(pluginConfig.getString(COLUMN_SEPARATOR)); + } + if (pluginConfig.hasPath(BATCH_SIZE)) { + sinkConfig.setBatchSize(pluginConfig.getInt(BATCH_SIZE)); + } + if (pluginConfig.hasPath(BATCH_INTERVAL_MS)) { + sinkConfig.setBatchIntervalMs(pluginConfig.getInt(BATCH_INTERVAL_MS)); + } + if (pluginConfig.hasPath(MAX_RETRIES)) { + sinkConfig.setMaxRetries(pluginConfig.getInt(MAX_RETRIES)); + } + if (pluginConfig.hasPath(RETRY_BACKOFF_MULTIPLIER_MS)) { + sinkConfig.setRetryBackoffMultiplierMs(pluginConfig.getInt(RETRY_BACKOFF_MULTIPLIER_MS)); + } + if (pluginConfig.hasPath(MAX_RETRY_BACKOFF_MS)) { + sinkConfig.setMaxRetryBackoffMs(pluginConfig.getInt(MAX_RETRY_BACKOFF_MS)); + } + parseSinkStreamLoadProperties(pluginConfig, sinkConfig); + return sinkConfig; + } + + private static void 
parseSinkStreamLoadProperties(Config pluginConfig, SinkConfig sinkConfig) { + Config starRocksConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig, + STARROCKS_CONFIG_PREFIX, false); + starRocksConfig.entrySet().forEach(entry -> { + sinkConfig.streamLoadProps.put(entry.getKey(), entry.getValue().unwrapped()); + }); + } + + + +} diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java new file mode 100644 index 00000000000..f5357f3320b --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java @@ -0,0 +1,39 @@ +package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; + +public class StarRocksBaseSerializer { + + protected static String convert(SeaTunnelDataType dataType, + Object val) { + if (val == null) { + return null; + } + switch (dataType.getSqlType()) { + case STRING: + return (String) val; + case TINYINT: + case SMALLINT: + return String.valueOf(((Number) val).shortValue()); + case INT: + return String.valueOf(((Number) val).intValue()); + case BIGINT: + return String.valueOf(((Number) val).longValue()); + case FLOAT: + return String.valueOf(((Number) val).floatValue()); + case DOUBLE: + return String.valueOf(((Number) val).doubleValue()); + case BOOLEAN: + return String.valueOf((Long) val); + case BYTES: + byte[] bts = (byte[]) val; + long value = 0; + for (int i = 0; i < bts.length; i++) { + value += (bts[bts.length - i - 1] & 0xffL) << (8 * i); + } + return String.valueOf(value); + default: + throw new UnsupportedOperationException("Unsupported dataType: " + dataType); + } + } +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksCsvSerializer.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksCsvSerializer.java new file mode 100644 index 00000000000..f4eed8d1ed9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksCsvSerializer.java @@ -0,0 +1,31 @@ +package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +public class StarRocksCsvSerializer extends StarRocksBaseSerializer implements StarRocksISerializer { + + private static final long serialVersionUID = 1L; + + private final String columnSeparator; + private final SeaTunnelRowType seaTunnelRowType; + + public StarRocksCsvSerializer(String sp, SeaTunnelRowType seaTunnelRowType) { + this.seaTunnelRowType = seaTunnelRowType; + this.columnSeparator = StarRocksDelimiterParser.parse(sp, "\t"); + } + + @Override + public String serialize(SeaTunnelRow row) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < row.getFields().length; i++) { + String value = convert(seaTunnelRowType.getFieldType(i), row.getField(i)); + sb.append(null == value ? 
"\\N" : value); + if (i < row.getFields().length - 1) { + sb.append(columnSeparator); + } + } + return sb.toString(); + } + +} diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java new file mode 100644 index 00000000000..3f09f8d457e --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java @@ -0,0 +1,55 @@ +package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize; + +import com.google.common.base.Strings; + +import java.io.StringWriter; + +public class StarRocksDelimiterParser { + + private static final String HEX_STRING = "0123456789ABCDEF"; + + public static String parse(String sp, String dSp) throws RuntimeException { + if (Strings.isNullOrEmpty(sp)) { + return dSp; + } + if (!sp.toUpperCase().startsWith("\\X")) { + return sp; + } + String hexStr = sp.substring(2); + // check hex str + if (hexStr.isEmpty()) { + throw new RuntimeException("Failed to parse delimiter: `Hex str is empty`"); + } + if (hexStr.length() % 2 != 0) { + throw new RuntimeException("Failed to parse delimiter: `Hex str length error`"); + } + for (char hexChar : hexStr.toUpperCase().toCharArray()) { + if (HEX_STRING.indexOf(hexChar) == -1) { + throw new RuntimeException("Failed to parse delimiter: `Hex str format error`"); + } + } + // transform to separator + StringWriter writer = new StringWriter(); + for (byte b : hexStrToBytes(hexStr)) { + writer.append((char) b); + } + return writer.toString(); + } + + private static byte[] hexStrToBytes(String hexStr) { + String upperHexStr = hexStr.toUpperCase(); + int length = upperHexStr.length() / 2; + char[] hexChars = upperHexStr.toCharArray(); + byte[] bytes = new byte[length]; + for (int i = 0; i < length; i++) { + int pos = i * 2; + bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1])); + } + return bytes; + } + + private static byte charToByte(char c) { + return (byte) HEX_STRING.indexOf(c); + } + +} diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksISerializer.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksISerializer.java new file mode 100644 index 00000000000..bd773df18b0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksISerializer.java @@ -0,0 +1,11 @@ +package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import java.io.Serializable; + +public interface StarRocksISerializer extends Serializable { + + String serialize(SeaTunnelRow seaTunnelRow); + +} diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java new file mode 100644 index 00000000000..3b75402297f --- /dev/null +++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java @@ -0,0 +1,37 @@ +package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class StarRocksJsonSerializer extends StarRocksBaseSerializer implements StarRocksISerializer { + + private static final long serialVersionUID = 1L; + + private final SeaTunnelRowType seaTunnelRowType; + private final ObjectMapper mapper = new ObjectMapper(); + + public StarRocksJsonSerializer(SeaTunnelRowType seaTunnelRowType) { + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public String serialize(SeaTunnelRow row) { + Map rowMap = new HashMap<>(row.getFields().length); + + for (int i = 0; i < row.getFields().length; i++) { + String value = convert(seaTunnelRowType.getFieldType(i), row.getField(i)); + rowMap.put(seaTunnelRowType.getFieldName(i), value); + } + try { + return mapper.writeValueAsString(rowMap); + } catch (JsonProcessingException e) { + throw new RuntimeException("serialize err", e); + } + } +} diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java new file mode 100644 index 00000000000..e37b5934583 --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.starrocks.sink; + +import com.google.auto.service.AutoService; +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +@AutoService(SeaTunnelSink.class) +public class StarRocksSink extends AbstractSimpleSink { + + private Config pluginConfig; + private SeaTunnelRowType seaTunnelRowType; + + @Override + public String getPluginName() { + return "StarRocks"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + this.pluginConfig = pluginConfig; + } + + @Override + public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public SeaTunnelDataType getConsumedType() { + return this.seaTunnelRowType; + } + + @Override + public AbstractSinkWriter createWriter(SinkWriter.Context context) { + return new StarRocksSinkWriter(pluginConfig, seaTunnelRowType); + } +} diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java new file mode 100644 index 00000000000..b0dcd8bd10f --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.starrocks.sink; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.starrocks.client.StarRocksSinkManager; +import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig; +import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksCsvSerializer; +import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksISerializer; +import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksJsonSerializer; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +@Slf4j +public class StarRocksSinkWriter extends AbstractSinkWriter { + + private final StarRocksISerializer serializer; + private final StarRocksSinkManager manager; + + public StarRocksSinkWriter(Config pluginConfig, + SeaTunnelRowType seaTunnelRowType) { + SinkConfig sinkConfig = SinkConfig.loadConfig(pluginConfig); + List fieldNames = Arrays.stream(seaTunnelRowType.getFieldNames()).collect(Collectors.toList()); + this.serializer = createSerializer(sinkConfig, seaTunnelRowType); + this.manager = new StarRocksSinkManager(sinkConfig, fieldNames); + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + String record = serializer.serialize(element); + manager.write(record); + } + + @SneakyThrows + @Override + public Optional prepareCommit() { + // Flush to storage before snapshot state is performed + return super.prepareCommit(); + } + + @Override + public void close() throws IOException { + } + + public static StarRocksISerializer createSerializer(SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType) { + if (SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getFormat())) { + return new StarRocksCsvSerializer(sinkConfig.getColumnSeparator(), seaTunnelRowType); + } + if (SinkConfig.StreamLoadFormat.JSON.equals(sinkConfig.getFormat())) { + return new StarRocksJsonSerializer(seaTunnelRowType); + } + throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties."); + } +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index 37dd5223c7e..b5a5147e6e1 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -56,6 +56,7 @@ connector-mongodb connector-iceberg connector-influxdb + connector-starrocks From 00e686c494ca33dcfdf23fd6d46dfc4e845cdc0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Sun, 23 Oct 2022 17:01:04 +0800 Subject: [PATCH 2/9] [Feature][Connector-V2] StarRocks sink connector (StarRocks stream load API) --- docs/en/connector-v2/sink/StarRocks.md | 120 +++++++ plugin-mapping.properties | 3 +- .../connector-starrocks/pom.xml | 12 - .../starrocks/client/StarRocksFlushTuple.java | 39 ++- .../client/StarRocksSinkManager.java | 47 ++- .../StarRocksStreamLoadFailedException.java | 18 +- .../client/StarRocksStreamLoadVisitor.java | 116 ++++--- .../starrocks/config/SinkConfig.java | 92 +++--- .../serialize/StarRocksBaseSerializer.java | 61 +++- .../serialize/StarRocksCsvSerializer.java | 18 +- .../serialize/StarRocksDelimiterParser.java | 22 +- 
 .../serialize/StarRocksISerializer.java       |  18 +-
 .../serialize/StarRocksJsonSerializer.java    |  30 +-
 .../starrocks/sink/StarRocksSink.java         |  15 +-
 .../starrocks/sink/StarRocksSinkWriter.java   |  12 +-
 seatunnel-dist/pom.xml                        |   6 +
 .../connector-starrocks-e2e/pom.xml           |  48 +++
 .../e2e/connector/starrocks/StarRocksIT.java  | 301 ++++++++++++++++++
 .../src/test/resources/log4j.properties       |  22 ++
 .../starrocks-jdbc-to-starrocks.conf          |  47 +++
 .../seatunnel-connector-v2-e2e/pom.xml        |   1 +
 21 files changed, 891 insertions(+), 157 deletions(-)
 create mode 100644 docs/en/connector-v2/sink/StarRocks.md
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/log4j.properties
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-jdbc-to-starrocks.conf

diff --git a/docs/en/connector-v2/sink/StarRocks.md b/docs/en/connector-v2/sink/StarRocks.md
new file mode 100644
index 00000000000..183cd8edbe0
--- /dev/null
+++ b/docs/en/connector-v2/sink/StarRocks.md
@@ -0,0 +1,120 @@
+# StarRocks
+
+> StarRocks sink connector
+
+## Description
+Used to send data to StarRocks. Both streaming and batch mode are supported.
+Internally, the StarRocks sink connector buffers rows and imports each batch into StarRocks via stream load.
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                        | type                         | required | default value   |
+|-----------------------------|------------------------------|----------|-----------------|
+| nodeUrls                    | list                         | yes      | -               |
+| username                    | string                       | yes      | -               |
+| password                    | string                       | yes      | -               |
+| database                    | string                       | yes      | -               |
+| table                       | string                       | no       | -               |
+| labelPrefix                 | string                       | no       | -               |
+| batch_max_rows              | long                         | no       | 1024            |
+| batch_max_bytes             | int                          | no       | 5 * 1024 * 1024 |
+| batch_interval_ms           | int                          | no       | -               |
+| max_retries                 | int                          | no       | -               |
+| retry_backoff_multiplier_ms | int                          | no       | -               |
+| max_retry_backoff_ms        | int                          | no       | -               |
+| sink.properties.*           | starrocks stream load config | no       | -               |
+
+### nodeUrls [list]
+
+`StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]`
+
+### username [string]
+
+The username of the `StarRocks` user
+
+### password [string]
+
+The password of the `StarRocks` user
+
+### database [string]
+
+The name of the StarRocks database
+
+### table [string]
+
+The name of the StarRocks table
+
+### labelPrefix [string]
+
+The prefix of the StarRocks stream load label
+
+### batch_max_rows [string]
+
+For batch writing, when the number of buffered rows reaches `batch_max_rows`, or the buffered bytes reach `batch_max_bytes`, or the elapsed time reaches `batch_interval_ms`, the buffered data will be flushed into StarRocks
+
+### batch_max_bytes [string]
+
+For batch writing, when the number of buffered rows reaches `batch_max_rows`, or the buffered bytes reach `batch_max_bytes`, or the elapsed time reaches `batch_interval_ms`, the buffered data will be flushed into StarRocks
+
+### batch_interval_ms [string]
+
+For batch writing, when the number of buffered rows reaches `batch_max_rows`, or the buffered bytes reach `batch_max_bytes`, or the elapsed time reaches `batch_interval_ms`, the buffered data will be flushed into StarRocks
+
+### max_retries [string]
+
+The number of retries for a failed flush
+
+### retry_backoff_multiplier_ms [string]
+
+Used as a multiplier for generating the next backoff delay between retries
+
+### max_retry_backoff_ms [string]
+
+The maximum amount of time to wait before retrying a request to `StarRocks`
+
+### sink.properties.* [starrocks stream load config]
+
+The parameters of the stream load `data_desc`.
+The way to specify a parameter is to add the prefix `sink.properties.` to the original stream load parameter name.
+For example, the way to specify `strip_outer_array` is: `sink.properties.strip_outer_array`.
+
+#### Supported import data formats
+
+The supported formats include CSV and JSON. Default value: CSV
+
+## Example
+
+Use JSON format to import data
+```
+sink {
+  StarRocks {
+    nodeUrls = ["e2e_starRocksdb:8030"]
+    username = root
+    password = ""
+    database = "test"
+    table = "e2e_table_sink"
+    batch_max_rows = 10
+    sink.properties.format = "JSON"
+    sink.properties.strip_outer_array = true
+  }
+}
+```
+
+Use CSV format to import data
+```
+sink {
+  StarRocks {
+    nodeUrls = ["e2e_starRocksdb:8030"]
+    username = root
+    password = ""
+    database = "test"
+    table = "e2e_table_sink"
+    batch_max_rows = 10
+    sink.properties.format = "CSV"
+    sink.properties.column_separator = "\\x01"
+    sink.properties.row_delimiter = "\\x02"
+  }
+}
+```
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 0b786891e48..d5114dd4eaf 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -130,4 +130,5 @@ seatunnel.sink.Sentry = connector-sentry
 seatunnel.source.MongoDB = connector-mongodb
 seatunnel.sink.MongoDB = connector-mongodb
 seatunnel.source.Iceberg = connector-iceberg
-seatunnel.source.influxdb = connector-influxdb
\ No newline at end of file
+seatunnel.source.influxdb = connector-influxdb
+seatunnel.sink.StarRocks = connector-starrocks
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-starrocks/pom.xml b/seatunnel-connectors-v2/connector-starrocks/pom.xml
index 9c1667855cd..927a2453163 100644
--- a/seatunnel-connectors-v2/connector-starrocks/pom.xml
+++ b/seatunnel-connectors-v2/connector-starrocks/pom.xml
@@ -55,17 +55,5 @@
             <artifactId>httpcore</artifactId>
             <version>${httpcore.version}</version>
         </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-annotations</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-        </dependency>
     </dependencies>
 </project>
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksFlushTuple.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksFlushTuple.java
index 1c54689b481..c70fbb3c90c 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksFlushTuple.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksFlushTuple.java
@@ -1,9 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.seatunnel.connectors.seatunnel.starrocks.client; import java.util.List; public class StarRocksFlushTuple { - private String label; private Long bytes; private List rows; @@ -14,8 +30,19 @@ public StarRocksFlushTuple(String label, Long bytes, List rows) { this.rows = rows; } - public String getLabel() { return label; } - public void setLabel(String label) { this.label = label; } - public Long getBytes() { return bytes; } - public List getRows() { return rows; } -} \ No newline at end of file + public String getLabel() { + return label; + } + + public void setLabel(String label) { + this.label = label; + } + + public Long getBytes() { + return bytes; + } + + public List getRows() { + return rows; + } +} diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java index 353306e4003..983f6905b20 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java @@ -17,14 +17,14 @@ package org.apache.seatunnel.connectors.seatunnel.starrocks.client; +import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig; + import com.google.common.base.Strings; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig; - import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -37,13 +37,15 @@ public class StarRocksSinkManager { private final SinkConfig sinkConfig; - private final List batchList; + private final List batchList; private StarRocksStreamLoadVisitor starrocksStreamLoadVisitor; private ScheduledExecutorService scheduler; private ScheduledFuture scheduledFuture; private volatile boolean initialize; private volatile Exception flushException; + private int batchRowCount = 0; + private long batchBytesSize = 0; public StarRocksSinkManager(SinkConfig sinkConfig, List fileNames) { this.sinkConfig = sinkConfig; @@ -58,7 +60,7 @@ private void tryInit() throws IOException { if (sinkConfig.getBatchIntervalMs() != null) { scheduler = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("IoTDB-sink-output-%s").build()); + new ThreadFactoryBuilder().setNameFormat("StarRocks-sink-output-%s").build()); scheduledFuture = scheduler.scheduleAtFixedRate( () -> { try { @@ -77,10 +79,11 @@ private void tryInit() throws IOException { public synchronized void write(String record) throws IOException { tryInit(); checkFlushException(); - - batchList.add(record); - if (sinkConfig.getBatchSize() > 0 - && batchList.size() >= 
sinkConfig.getBatchSize()) { + byte[] bts = record.getBytes(StandardCharsets.UTF_8); + batchList.add(bts); + batchRowCount++; + batchBytesSize += bts.length; + if (batchRowCount >= sinkConfig.getBatchMaxSize() || batchBytesSize >= sinkConfig.getBatchMaxBytes()) { flush(); } } @@ -94,19 +97,28 @@ public synchronized void close() throws IOException { flush(); } - synchronized void flush() throws IOException { + public synchronized void flush() throws IOException { checkFlushException(); if (batchList.isEmpty()) { return; } - + StarRocksFlushTuple tuple = null; for (int i = 0; i <= sinkConfig.getMaxRetries(); i++) { try { - starrocksStreamLoadVisitor.doStreamLoad(null); - } catch (IoTDBConnectionException | StatementExecutionException e) { - log.error("Writing records to IoTDB failed, retry times = {}", i, e); + String label = createBatchLabel(); + tuple = new StarRocksFlushTuple(label, batchBytesSize, new ArrayList<>(batchList)); + starrocksStreamLoadVisitor.doStreamLoad(tuple); + } catch (Exception e) { + + log.error("Writing records to StarRocks failed, retry times = {}", i, e); if (i >= sinkConfig.getMaxRetries()) { - throw new IOException("Writing records to IoTDB failed.", e); + throw new IOException("Writing records to StarRocks failed.", e); + } + + if (e instanceof StarRocksStreamLoadFailedException && ((StarRocksStreamLoadFailedException) e).needReCreateLabel()) { + String newLabel = createBatchLabel(); + log.warn(String.format("Batch label changed from [%s] to [%s]", tuple.getLabel(), newLabel)); + tuple.setLabel(newLabel); } try { @@ -120,13 +132,14 @@ synchronized void flush() throws IOException { } } } - batchList.clear(); + batchRowCount = 0; + batchBytesSize = 0; } private void checkFlushException() { if (flushException != null) { - throw new RuntimeException("Writing records to IoTDB failed.", flushException); + throw new RuntimeException("Writing records to StarRocks failed.", flushException); } } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadFailedException.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadFailedException.java index d32dc32e73e..626b38d3f00 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadFailedException.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadFailedException.java @@ -1,9 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.seatunnel.connectors.seatunnel.starrocks.client; import java.io.IOException; import java.util.Map; - public class StarRocksStreamLoadFailedException extends IOException { static final long serialVersionUID = 1L; diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java index 84f24e8dcea..8d314593914 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java @@ -1,8 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.seatunnel.connectors.seatunnel.starrocks.client; -import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig; +import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksDelimiterParser; + import org.apache.commons.codec.binary.Base64; import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; @@ -13,8 +34,6 @@ import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; -import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig; -import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksDelimiterParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,10 +48,11 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; - public class StarRocksStreamLoadVisitor { private static final Logger LOG = LoggerFactory.getLogger(StarRocksStreamLoadVisitor.class); + private static final int CONNECT_TIMEOUT = 1000000; + private static final int MAX_SLEEP_TIME = 5; private final SinkConfig sinkConfig; private long pos; @@ -44,11 +64,8 @@ public class StarRocksStreamLoadVisitor { private static final String RESULT_LABEL_ABORTED = "ABORTED"; private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN"; - private final ObjectMapper mapper = new ObjectMapper(); - private List fieldNames; - public StarRocksStreamLoadVisitor(SinkConfig sinkConfig, List fieldNames) { this.sinkConfig = sinkConfig; this.fieldNames = fieldNames; @@ 
-60,12 +77,12 @@ public void doStreamLoad(StarRocksFlushTuple flushData) throws IOException { throw new IOException("None of the host in `load_url` could be connected."); } String loadUrl = new StringBuilder(host) - .append("/api/") - .append(sinkConfig.getDatabase()) - .append("/") - .append(sinkConfig.getTable()) - .append("/_stream_load") - .toString(); + .append("/api/") + .append(sinkConfig.getDatabase()) + .append("/") + .append(sinkConfig.getTable()) + .append("/_stream_load") + .toString(); if (LOG.isDebugEnabled()) { LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel())); } @@ -76,7 +93,7 @@ public void doStreamLoad(StarRocksFlushTuple flushData) throws IOException { throw new IOException("Unable to flush data to StarRocks: unknown result status. " + loadResult); } if (LOG.isDebugEnabled()) { - LOG.debug(new StringBuilder("StreamLoad response:\n").append(mapper.writeValueAsString(loadResult)).toString()); + LOG.debug(new StringBuilder("StreamLoad response:\n").append(JsonUtils.toJsonString(loadResult)).toString()); } if (RESULT_FAILED.equals(loadResult.get(keyStatus))) { StringBuilder errorBuilder = new StringBuilder("Failed to flush data to StarRocks.\n"); @@ -93,12 +110,12 @@ public void doStreamLoad(StarRocksFlushTuple flushData) throws IOException { LOG.warn("Get Error URL failed. {} ", loadResult.get("ErrorURL"), e); } } else { - errorBuilder.append(mapper.writeValueAsString(loadResult)); + errorBuilder.append(JsonUtils.toJsonString(loadResult)); errorBuilder.append('\n'); } throw new IOException(errorBuilder.toString()); } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) { - LOG.debug(new StringBuilder("StreamLoad response:\n").append(mapper.writeValueAsString(loadResult)).toString()); + LOG.debug(new StringBuilder("StreamLoad response:\n").append(JsonUtils.toJsonString(loadResult)).toString()); // has to block-checking the state to get the final result checkLabelState(host, flushData.getLabel()); } @@ -117,10 +134,10 @@ private String getAvailableHost() { } private boolean tryHttpConnection(String host) { - try { + try { URL url = new URL(host); - HttpURLConnection co = (HttpURLConnection) url.openConnection(); - co.setConnectTimeout(1000); + HttpURLConnection co = (HttpURLConnection) url.openConnection(); + co.setConnectTimeout(CONNECT_TIMEOUT); co.connect(); co.disconnect(); return true; @@ -131,9 +148,9 @@ private boolean tryHttpConnection(String host) { } private byte[] joinRows(List rows, int totalBytes) { - if (SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getFormat())) { - Map props = (sinkConfig.getStreamLoadProps() == null ? 
new HashMap<>() : sinkConfig.getStreamLoadProps()); - byte[] lineDelimiter = StarRocksDelimiterParser.parse((String)props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8); + if (SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) { + Map props = sinkConfig.getStreamLoadProps(); + byte[] lineDelimiter = StarRocksDelimiterParser.parse((String) props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8); ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length); for (byte[] row : rows) { bos.put(row); @@ -141,8 +158,8 @@ private byte[] joinRows(List rows, int totalBytes) { } return bos.array(); } - - if (SinkConfig.StreamLoadFormat.JSON.equals(sinkConfig.getFormat())) { + + if (SinkConfig.StreamLoadFormat.JSON.equals(sinkConfig.getLoadFormat())) { ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1)); bos.put("[".getBytes(StandardCharsets.UTF_8)); byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8); @@ -163,9 +180,9 @@ private byte[] joinRows(List rows, int totalBytes) { @SuppressWarnings("unchecked") private void checkLabelState(String host, String label) throws IOException { int idx = 0; - while(true) { + while (true) { try { - TimeUnit.SECONDS.sleep(Math.min(++idx, 5)); + TimeUnit.SECONDS.sleep(Math.min(++idx, MAX_SLEEP_TIME)); } catch (InterruptedException ex) { break; } @@ -173,7 +190,6 @@ private void checkLabelState(String host, String label) throws IOException { HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(sinkConfig.getDatabase()).append("/get_load_state?label=").append(label).toString()); httpGet.setHeader("Authorization", getBasicAuthHeader(sinkConfig.getUsername(), sinkConfig.getPassword())); httpGet.setHeader("Connection", "close"); - try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { HttpEntity respEntity = getHttpEntity(resp); if (respEntity == null) { @@ -181,14 +197,14 @@ private void checkLabelState(String host, String label) throws IOException { "could not get the final state of label[%s].\n", label), null); } - Map result = mapper.readValue(EntityUtils.toString(respEntity), Map.class); - String labelState = (String)result.get("state"); + Map result = JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class); + String labelState = (String) result.get("state"); if (null == labelState) { throw new IOException(String.format("Failed to flush data to StarRocks, Error " + "could not get the final state of label[%s]. 
response[%s]\n", label, EntityUtils.toString(respEntity)), null); } LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState)); - switch(labelState) { + switch (labelState) { case LAEBL_STATE_VISIBLE: case LAEBL_STATE_COMMITTED: return; @@ -200,7 +216,7 @@ private void checkLabelState(String host, String label) throws IOException { case RESULT_LABEL_UNKNOWN: default: throw new StarRocksStreamLoadFailedException(String.format("Failed to flush data to StarRocks, Error " + - "label[%s] state[%s]\n", label, labelState), null); + "label[%s] state[%s]\n", label, labelState), null); } } } @@ -211,15 +227,15 @@ private void checkLabelState(String host, String label) throws IOException { private Map doHttpPut(String loadUrl, String label, byte[] data) throws IOException { LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length)); final HttpClientBuilder httpClientBuilder = HttpClients.custom() - .setRedirectStrategy(new DefaultRedirectStrategy() { - @Override - protected boolean isRedirectable(String method) { - return true; - } - }); + .setRedirectStrategy(new DefaultRedirectStrategy() { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }); try (CloseableHttpClient httpclient = httpClientBuilder.build()) { HttpPut httpPut = new HttpPut(loadUrl); - if (null != fieldNames && !fieldNames.isEmpty() && SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getFormat())) { + if (null != fieldNames && !fieldNames.isEmpty() && SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) { httpPut.setHeader("columns", String.join(",", fieldNames.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList()))); } if (null != sinkConfig.getStreamLoadProps()) { @@ -227,6 +243,7 @@ protected boolean isRedirectable(String method) { httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue())); } } + httpPut.setHeader("strip_outer_array", "true"); httpPut.setHeader("Expect", "100-continue"); httpPut.setHeader("label", label); httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded"); @@ -235,7 +252,7 @@ protected boolean isRedirectable(String method) { httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build()); try (CloseableHttpResponse resp = httpclient.execute(httpPut)) { int code = resp.getStatusLine().getStatusCode(); - if (200 != code) { + if (HttpStatus.SC_OK != code) { String errorText; try { HttpEntity respEntity = resp.getEntity(); @@ -254,7 +271,7 @@ protected boolean isRedirectable(String method) { LOG.warn("Request failed with empty response."); return null; } - return mapper.readValue(EntityUtils.toString(respEntity), Map.class); + return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class); } } } @@ -267,7 +284,7 @@ private String getBasicAuthHeader(String username, String password) { private HttpEntity getHttpEntity(CloseableHttpResponse resp) { int code = resp.getStatusLine().getStatusCode(); - if (200 != code) { + if (HttpStatus.SC_OK != code) { LOG.warn("Request failed with code:{}", code); return null; } @@ -278,7 +295,7 @@ private HttpEntity getHttpEntity(CloseableHttpResponse resp) { } return respEntity; } - + private String doHttpGet(String getUrl) throws IOException { LOG.info("Executing GET from {}.", getUrl); try (CloseableHttpClient httpclient = buildHttpClient()) { @@ -294,15 +311,14 @@ private String doHttpGet(String getUrl) throws IOException { } } - private CloseableHttpClient buildHttpClient(){ + private CloseableHttpClient 
buildHttpClient() { final HttpClientBuilder httpClientBuilder = HttpClients.custom() - .setRedirectStrategy(new DefaultRedirectStrategy() { - @Override - protected boolean isRedirectable(String method) { - return true; - } - }); + .setRedirectStrategy(new DefaultRedirectStrategy() { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }); return httpClientBuilder.build(); } - } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java index da3172ebbaa..0c04c372db0 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java @@ -17,16 +17,17 @@ package org.apache.seatunnel.connectors.seatunnel.starrocks.config; +import org.apache.seatunnel.common.config.TypesafeConfigUtils; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + import lombok.Getter; import lombok.Setter; import lombok.ToString; -import org.apache.seatunnel.common.config.TypesafeConfigUtils; -import org.apache.seatunnel.shade.com.typesafe.config.Config; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; @Setter @Getter @@ -39,11 +40,12 @@ public class SinkConfig { public static final String LABEL_PREFIX = "labelPrefix"; public static final String DATABASE = "database"; public static final String TABLE = "table"; - public static final String STARROCKS_CONFIG_PREFIX = "starrocks."; + public static final String STARROCKS_SINK_CONFIG_PREFIX = "sink.properties."; private static final String LOAD_FORMAT = "format"; - private static final String DEFAULT_LOAD_FORMAT = "JSON"; + private static final StreamLoadFormat DEFAULT_LOAD_FORMAT = StreamLoadFormat.CSV; private static final String COLUMN_SEPARATOR = "column_separator"; - public static final String BATCH_SIZE = "batch_size"; + public static final String BATCH_MAX_SIZE = "batch_max_rows"; + public static final String BATCH_MAX_BYTES = "batch_max_bytes"; public static final String BATCH_INTERVAL_MS = "batch_interval_ms"; public static final String MAX_RETRIES = "max_retries"; public static final String RETRY_BACKOFF_MULTIPLIER_MS = "retry_backoff_multiplier_ms"; @@ -51,21 +53,28 @@ public class SinkConfig { public enum StreamLoadFormat { CSV, JSON; + public static StreamLoadFormat parse(String format) { + if (StreamLoadFormat.JSON.name().equals(format)) { + return JSON; + } + return CSV; + } } - private List nodeUrls; - private String username; - private String password; - private String database; - private String table; - private String labelPrefix; - private String columnSeparator; + private List nodeUrls; + private String username; + private String password; + private String database; + private String table; + private String labelPrefix; + private String columnSeparator; + private StreamLoadFormat loadFormat = DEFAULT_LOAD_FORMAT; + private static final int DEFAULT_BATCH_MAX_SIZE = 1024; + private static final long DEFAULT_BATCH_BYTES = 5 * 1024 * 1024; - private String format = DEFAULT_LOAD_FORMAT; - private static final int DEFAULT_BATCH_SIZE = 1024; + private int batchMaxSize = DEFAULT_BATCH_MAX_SIZE; + private long batchMaxBytes = 
DEFAULT_BATCH_BYTES; - - private int batchSize = DEFAULT_BATCH_SIZE; private Integer batchIntervalMs; private int maxRetries; private int retryBackoffMultiplierMs; @@ -73,30 +82,29 @@ public enum StreamLoadFormat { private final Map streamLoadProps = new HashMap<>(); - public static SinkConfig loadConfig(Config pluginConfig) { - SinkConfig sinkConfig = new SinkConfig(); - sinkConfig.setNodeUrls(pluginConfig.getStringList(NODE_URLS)); - sinkConfig.setDatabase(pluginConfig.getString(DATABASE)); - sinkConfig.setTable(pluginConfig.getString(TABLE)); + SinkConfig sinkConfig = new SinkConfig(); + sinkConfig.setNodeUrls(pluginConfig.getStringList(NODE_URLS)); + sinkConfig.setDatabase(pluginConfig.getString(DATABASE)); + sinkConfig.setTable(pluginConfig.getString(TABLE)); - if (pluginConfig.hasPath(USERNAME)) { + if (pluginConfig.hasPath(USERNAME)) { sinkConfig.setUsername(pluginConfig.getString(USERNAME)); - } - if (pluginConfig.hasPath(PASSWORD)) { - sinkConfig.setPassword(pluginConfig.getString(PASSWORD)); - } - if (pluginConfig.hasPath(LABEL_PREFIX)) { + } + if (pluginConfig.hasPath(PASSWORD)) { + sinkConfig.setPassword(pluginConfig.getString(PASSWORD)); + } + if (pluginConfig.hasPath(LABEL_PREFIX)) { sinkConfig.setLabelPrefix(pluginConfig.getString(LABEL_PREFIX)); - } - if (pluginConfig.hasPath(LOAD_FORMAT)) { - sinkConfig.setFormat(pluginConfig.getString(LOAD_FORMAT)); - } + } if (pluginConfig.hasPath(COLUMN_SEPARATOR)) { sinkConfig.setColumnSeparator(pluginConfig.getString(COLUMN_SEPARATOR)); } - if (pluginConfig.hasPath(BATCH_SIZE)) { - sinkConfig.setBatchSize(pluginConfig.getInt(BATCH_SIZE)); + if (pluginConfig.hasPath(BATCH_MAX_SIZE)) { + sinkConfig.setBatchMaxSize(pluginConfig.getInt(BATCH_MAX_SIZE)); + } + if (pluginConfig.hasPath(BATCH_MAX_BYTES)) { + sinkConfig.setBatchMaxBytes(pluginConfig.getLong(BATCH_MAX_BYTES)); } if (pluginConfig.hasPath(BATCH_INTERVAL_MS)) { sinkConfig.setBatchIntervalMs(pluginConfig.getInt(BATCH_INTERVAL_MS)); @@ -111,17 +119,21 @@ public static SinkConfig loadConfig(Config pluginConfig) { sinkConfig.setMaxRetryBackoffMs(pluginConfig.getInt(MAX_RETRY_BACKOFF_MS)); } parseSinkStreamLoadProperties(pluginConfig, sinkConfig); - return sinkConfig; + if (sinkConfig.streamLoadProps.containsKey(COLUMN_SEPARATOR)) { + sinkConfig.setColumnSeparator((String) sinkConfig.streamLoadProps.get(COLUMN_SEPARATOR)); + } + if (sinkConfig.streamLoadProps.containsKey(LOAD_FORMAT)) { + sinkConfig.setLoadFormat(StreamLoadFormat.parse((String) sinkConfig.streamLoadProps.get(LOAD_FORMAT))); + } + return sinkConfig; } private static void parseSinkStreamLoadProperties(Config pluginConfig, SinkConfig sinkConfig) { Config starRocksConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig, - STARROCKS_CONFIG_PREFIX, false); + STARROCKS_SINK_CONFIG_PREFIX, false); starRocksConfig.entrySet().forEach(entry -> { - sinkConfig.streamLoadProps.put(entry.getKey(), entry.getValue().unwrapped()); + final String configKey = entry.getKey().toLowerCase(); + sinkConfig.streamLoadProps.put(configKey, entry.getValue().unwrapped()); }); } - - - } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java index f5357f3320b..190b18b95e9 100644 --- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java @@ -1,17 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.common.utils.DateTimeUtils; +import org.apache.seatunnel.common.utils.DateUtils; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.common.utils.TimeUtils; + +import lombok.Builder; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; public class StarRocksBaseSerializer { + @Builder.Default + private DateUtils.Formatter dateFormatter = DateUtils.Formatter.YYYY_MM_DD; + @Builder.Default + private DateTimeUtils.Formatter dateTimeFormatter = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS; + @Builder.Default + private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS; - protected static String convert(SeaTunnelDataType dataType, - Object val) { + protected String convert(SeaTunnelDataType dataType, Object val) { if (val == null) { return null; } switch (dataType.getSqlType()) { - case STRING: - return (String) val; case TINYINT: case SMALLINT: return String.valueOf(((Number) val).shortValue()); @@ -23,17 +53,24 @@ protected static String convert(SeaTunnelDataType dataType, return String.valueOf(((Number) val).floatValue()); case DOUBLE: return String.valueOf(((Number) val).doubleValue()); + case DECIMAL: case BOOLEAN: - return String.valueOf((Long) val); + return val.toString(); + case DATE: + return DateUtils.toString((LocalDate) val, dateFormatter); + case TIME: + return TimeUtils.toString((LocalTime) val, timeFormatter); + case TIMESTAMP: + return DateTimeUtils.toString((LocalDateTime) val, dateTimeFormatter); + case STRING: + return (String) val; + case ARRAY: + case MAP: + return JsonUtils.toJsonString(val); case BYTES: - byte[] bts = (byte[]) val; - long value = 0; - for (int i = 0; i < bts.length; i++) { - value += (bts[bts.length - i - 1] & 0xffL) << (8 * i); - } - return String.valueOf(value); + return new String((byte[]) val); default: throw new UnsupportedOperationException("Unsupported dataType: " + dataType); } } -} \ No newline at end of file +} diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksCsvSerializer.java 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksCsvSerializer.java index f4eed8d1ed9..191b615baed 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksCsvSerializer.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksCsvSerializer.java @@ -1,10 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; public class StarRocksCsvSerializer extends StarRocksBaseSerializer implements StarRocksISerializer { - private static final long serialVersionUID = 1L; private final String columnSeparator; diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java index 3f09f8d457e..1b7ea726f19 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize; import com.google.common.base.Strings; @@ -5,6 +22,7 @@ import java.io.StringWriter; public class StarRocksDelimiterParser { + private static final int SHIFT = 4; private static final String HEX_STRING = "0123456789ABCDEF"; @@ -43,7 +61,7 @@ private static byte[] hexStrToBytes(String hexStr) { byte[] bytes = new byte[length]; for (int i = 0; i < length; i++) { int pos = i * 2; - bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1])); + bytes[i] = (byte) (charToByte(hexChars[pos]) << SHIFT | charToByte(hexChars[pos + 1])); } return bytes; } @@ -51,5 +69,5 @@ private static byte[] hexStrToBytes(String hexStr) { private static byte charToByte(char c) { return (byte) HEX_STRING.indexOf(c); } - } + diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksISerializer.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksISerializer.java index bd773df18b0..bca39d20b92 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksISerializer.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksISerializer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -7,5 +24,4 @@ public interface StarRocksISerializer extends Serializable { String serialize(SeaTunnelRow seaTunnelRow); - } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java index 3b75402297f..5e5edaa271c 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java @@ -1,18 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.utils.JsonUtils; + +import com.fasterxml.jackson.databind.ObjectMapper; import java.util.HashMap; -import java.util.List; import java.util.Map; public class StarRocksJsonSerializer extends StarRocksBaseSerializer implements StarRocksISerializer { private static final long serialVersionUID = 1L; - private final SeaTunnelRowType seaTunnelRowType; private final ObjectMapper mapper = new ObjectMapper(); @@ -28,10 +44,6 @@ public String serialize(SeaTunnelRow row) { String value = convert(seaTunnelRowType.getFieldType(i), row.getField(i)); rowMap.put(seaTunnelRowType.getFieldName(i), value); } - try { - return mapper.writeValueAsString(rowMap); - } catch (JsonProcessingException e) { - throw new RuntimeException("serialize err", e); - } + return JsonUtils.toJsonString(rowMap); } } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java index e37b5934583..4a59139b851 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java @@ -17,17 +17,26 @@ package org.apache.seatunnel.connectors.seatunnel.starrocks.sink; -import com.google.auto.service.AutoService; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.DATABASE; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.NODE_URLS; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.TABLE; + import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; + import org.apache.seatunnel.shade.com.typesafe.config.Config; +import com.google.auto.service.AutoService; + 
@AutoService(SeaTunnelSink.class) public class StarRocksSink extends AbstractSimpleSink { @@ -42,6 +51,10 @@ public String getPluginName() { @Override public void prepare(Config pluginConfig) throws PrepareFailException { this.pluginConfig = pluginConfig; + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, NODE_URLS, DATABASE, TABLE); + if (!result.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); + } } @Override diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java index b0dcd8bd10f..7affd28896b 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java @@ -17,8 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.starrocks.sink; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; @@ -27,8 +25,12 @@ import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksCsvSerializer; import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksISerializer; import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksJsonSerializer; + import org.apache.seatunnel.shade.com.typesafe.config.Config; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + import java.io.IOException; import java.util.Arrays; import java.util.List; @@ -59,18 +61,20 @@ public void write(SeaTunnelRow element) throws IOException { @Override public Optional prepareCommit() { // Flush to storage before snapshot state is performed + manager.flush(); return super.prepareCommit(); } @Override public void close() throws IOException { + manager.close(); } public static StarRocksISerializer createSerializer(SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType) { - if (SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getFormat())) { + if (SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) { return new StarRocksCsvSerializer(sinkConfig.getColumnSeparator(), seaTunnelRowType); } - if (SinkConfig.StreamLoadFormat.JSON.equals(sinkConfig.getFormat())) { + if (SinkConfig.StreamLoadFormat.JSON.equals(sinkConfig.getLoadFormat())) { return new StarRocksJsonSerializer(seaTunnelRowType); } throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties."); diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index a0d0350d3eb..7784c1fdd36 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -272,6 +272,12 @@ ${project.version} provided + + org.apache.seatunnel + connector-starrocks + ${project.version} + provided + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml new file mode 100644 index 00000000000..684b7e33669 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml @@ -0,0 +1,48 @@ + + + + + 
org.apache.seatunnel + seatunnel-connector-v2-e2e + ${revision} + + 4.0.0 + + connector-starrocks-e2e + + + + + org.apache.seatunnel + connector-starrocks + ${project.version} + test + + + org.apache.seatunnel + connector-jdbc + ${project.version} + test + + + org.apache.seatunnel + connector-jdbc-e2e + ${project.version} + test + + + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java new file mode 100644 index 00000000000..c5d367e0be0 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.starrocks; + +import static org.awaitility.Awaitility.given; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; + +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.apache.commons.io.IOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.math.BigDecimal; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Slf4j +public class StarRocksIT extends TestSuiteBase implements TestResource { + private static final String DOCKER_IMAGE = 
"d87904488/starrocks-starter:2.2.1"; + private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + private static final String HOST = "e2e_starRocksdb"; + private static final int SR_PORT = 9030; + private static final String URL = "jdbc:mysql://%s:" + SR_PORT; + private static final String USERNAME = "root"; + private static final String PASSWORD = ""; + private static final String DATABASE = "test"; + private static final String SOURCE_TABLE = "e2e_table_source"; + private static final String SINK_TABLE = "e2e_table_sink"; + private static final String SR_DRIVER_JAR = "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar"; + private static final String COLUMN_STRING = "BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL"; + + private static final String DDL_SOURCE = "create table " + DATABASE + "." + SOURCE_TABLE + " (\n" + + " BIGINT_COL BIGINT,\n" + + " LARGEINT_COL LARGEINT,\n" + + " SMALLINT_COL SMALLINT,\n" + + " TINYINT_COL TINYINT,\n" + + " BOOLEAN_COL BOOLEAN,\n" + + " DECIMAL_COL DECIMAL,\n" + + " DOUBLE_COL DOUBLE,\n" + + " FLOAT_COL FLOAT,\n" + + " INT_COL INT,\n" + + " CHAR_COL CHAR,\n" + + " VARCHAR_11_COL VARCHAR(11),\n" + + " STRING_COL STRING,\n" + + " DATETIME_COL DATETIME,\n" + + " DATE_COL DATE\n" + + ")ENGINE=OLAP\n" + + "DUPLICATE KEY(`BIGINT_COL`)\n" + + "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\",\n" + + "\"in_memory\" = \"false\"," + + "\"in_memory\" = \"false\"," + + "\"storage_format\" = \"DEFAULT\"" + + ")"; + + + private static final String DDL_SINK = "create table " + DATABASE + "." + SINK_TABLE + " (\n" + + " BIGINT_COL BIGINT,\n" + + " LARGEINT_COL LARGEINT,\n" + + " SMALLINT_COL SMALLINT,\n" + + " TINYINT_COL TINYINT,\n" + + " BOOLEAN_COL BOOLEAN,\n" + + " DECIMAL_COL DECIMAL,\n" + + " DOUBLE_COL DOUBLE,\n" + + " FLOAT_COL FLOAT,\n" + + " INT_COL INT,\n" + + " CHAR_COL CHAR,\n" + + " VARCHAR_11_COL VARCHAR(11),\n" + + " STRING_COL STRING,\n" + + " DATETIME_COL DATETIME,\n" + + " DATE_COL DATE\n" + + ")ENGINE=OLAP\n" + + "DUPLICATE KEY(`BIGINT_COL`)\n" + + "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\",\n" + + "\"in_memory\" = \"false\"," + + "\"in_memory\" = \"false\"," + + "\"storage_format\" = \"DEFAULT\"" + + ")"; + + private static final String INIT_DATA_SQL = "insert into " + DATABASE + "." 
+ SOURCE_TABLE + " (\n" + + " BIGINT_COL,\n" + + " LARGEINT_COL,\n" + + " SMALLINT_COL,\n" + + " TINYINT_COL,\n" + + " BOOLEAN_COL,\n" + + " DECIMAL_COL,\n" + + " DOUBLE_COL,\n" + + " FLOAT_COL,\n" + + " INT_COL,\n" + + " CHAR_COL,\n" + + " VARCHAR_11_COL,\n" + + " STRING_COL,\n" + + " DATETIME_COL,\n" + + " DATE_COL\n" + + ")values(\n" + + "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" + + ")"; + private Connection jdbcConnection; + private GenericContainer starRocksServer; + private static final List TEST_DATASET = generateTestDataSet(); + @TestContainerExtension + private final ContainerExtendedFactory extendedFactory = container -> { + Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + SR_DRIVER_JAR); + Assertions.assertEquals(0, extraCommands.getExitCode()); + }; + + @BeforeAll + @Override + public void startUp() throws Exception { + starRocksServer = new GenericContainer<>(DOCKER_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(HOST) + .withLogConsumer(new Slf4jLogConsumer(log)); + starRocksServer.setPortBindings(Lists.newArrayList( + String.format("%s:%s", SR_PORT, SR_PORT))); + Startables.deepStart(Stream.of(starRocksServer)).join(); + log.info("StarRocks container started"); + // wait for starrocks fully start + given().ignoreExceptions() + .await() + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(this::initializeJdbcConnection); + initializeJdbcTable(); + batchInsertData(); + } + + private static List generateTestDataSet() { + + List rows = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + SeaTunnelRow row = new SeaTunnelRow( + new Object[]{ + Long.valueOf(i), + Long.valueOf(1123456), + Short.parseShort("1"), + Byte.parseByte("1"), + Boolean.FALSE, + BigDecimal.valueOf(2222243, 1), + Double.parseDouble("2222243.2222243"), + Float.parseFloat("222224"), + Integer.parseInt("1"), + "a", + "VARCHAR_COL", + "STRING_COL", + "2022-08-13 17:35:59", + "2022-08-13" + }); + rows.add(row); + } + return rows; + } + + @AfterAll + @Override + public void tearDown() throws Exception { + if (jdbcConnection != null) { + jdbcConnection.close(); + } + if (starRocksServer != null) { + starRocksServer.close(); + } + } + + @TestTemplate + public void testStarRocksSink(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/starrocks-jdbc-to-starrocks.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + try { + assertHasData(SINK_TABLE); + String sourceSql = String.format("select * from %s.%s", DATABASE, SOURCE_TABLE); + String sinkSql = String.format("select * from %s.%s", DATABASE, SINK_TABLE); + List columnList = Arrays.stream(COLUMN_STRING.split(",")).map(x -> x.trim()).collect(Collectors.toList()); + Statement sourceStatement = jdbcConnection.createStatement(); + Statement sinkStatement = jdbcConnection.createStatement(); + ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql); + ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql); + Assertions.assertEquals(sourceResultSet.getMetaData().getColumnCount(), sinkResultSet.getMetaData().getColumnCount()); + while (sourceResultSet.next()) { + if (sinkResultSet.next()) { + for (String column : columnList) { + Object source = sourceResultSet.getObject(column); + Object sink = sinkResultSet.getObject(column); + if (!Objects.deepEquals(source, sink)) { + InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column); + 
InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column); + String sourceValue = IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8); + String sinkValue = IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8); + Assertions.assertEquals(sourceValue, sinkValue); + } + Assertions.assertTrue(true); + } + } + } + } catch (Exception e) { + throw new RuntimeException("get starRocks connection error", e); + } + } + + private void initializeJdbcConnection() throws SQLException, ClassNotFoundException, MalformedURLException, InstantiationException, IllegalAccessException { + URLClassLoader urlClassLoader = new URLClassLoader(new URL[]{new URL(SR_DRIVER_JAR)}, StarRocksIT.class.getClassLoader()); + Thread.currentThread().setContextClassLoader(urlClassLoader); + Driver driver = (Driver) urlClassLoader.loadClass(DRIVER_CLASS).newInstance(); + Properties props = new Properties(); + props.put("user", USERNAME); + props.put("password", PASSWORD); + jdbcConnection = driver.connect(String.format(URL, starRocksServer.getHost()), props); + } + + private void initializeJdbcTable() { + try (Statement statement = jdbcConnection.createStatement()) { + // create databases + statement.execute("create database test"); + // create source table + statement.execute(DDL_SOURCE); + // create sink table + statement.execute(DDL_SINK); + } catch (SQLException e) { + throw new RuntimeException("Initializing table failed!", e); + } + } + + private void batchInsertData() { + List rows = TEST_DATASET; + try { + jdbcConnection.setAutoCommit(false); + try (PreparedStatement preparedStatement = jdbcConnection.prepareStatement(INIT_DATA_SQL)) { + for (int i = 0; i < rows.size(); i++) { + for (int index = 0; index < rows.get(i).getFields().length; index++) { + preparedStatement.setObject(index + 1, rows.get(i).getFields()[index]); + } + preparedStatement.addBatch(); + } + preparedStatement.executeBatch(); + } + jdbcConnection.commit(); + } catch (Exception exception) { + log.error(ExceptionUtils.getMessage(exception)); + throw new RuntimeException("get connection error", exception); + } + } + + private void assertHasData(String table) { + try (Statement statement = jdbcConnection.createStatement()) { + String sql = String.format("select * from %s.%s limit 1", DATABASE, table); + ResultSet source = statement.executeQuery(sql); + Assertions.assertTrue(source.next()); + } catch (Exception e) { + throw new RuntimeException("test starrocks server image error", e); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/log4j.properties new file mode 100644 index 00000000000..db5d9e51220 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/log4j.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Set everything to be logged to the console +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-jdbc-to-starrocks.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-jdbc-to-starrocks.conf new file mode 100644 index 00000000000..3f8dfa33a86 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-jdbc-to-starrocks.conf @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + Jdbc { + driver = com.mysql.cj.jdbc.Driver + url = "jdbc:mysql://e2e_starRocksdb:9030" + user = root + password = "" + query = "select BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL from `test`.`e2e_table_source`" + } +} + +transform { +} + +sink { + StarRocks { + nodeUrls = ["e2e_starRocksdb:8030"] + username = root + password = "" + database = "test" + table = "e2e_table_sink" + batch_max_rows = 10 + sink.properties.format = "JSON" + sink.properties.strip_outer_array = true + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index fbc2b214ac9..74ef7b53787 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -27,6 +27,7 @@ connector-assert-e2e connector-jdbc-e2e connector-redis-e2e + connector-starrocks-e2e seatunnel-connector-v2-e2e From 258aa56b4904658963cd26274965ca5d5af89ada Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Mon, 24 Oct 2022 15:37:39 +0800 Subject: [PATCH 3/9] fix code style --- docs/en/connector-v2/sink/StarRocks.md | 4 +- .../connector-starrocks/pom.xml | 1 - .../starrocks/client/HttpHelper.java | 163 +++++++++++++++ .../client/StarRocksSinkManager.java | 11 +- .../client/StarRocksStreamLoadVisitor.java | 197 +++++------------- .../starrocks/sink/StarRocksSink.java | 4 +- .../starrocks/sink/StarRocksSinkWriter.java | 9 +- .../e2e/connector/starrocks/StarRocksIT.java | 4 +- 8 files changed, 235 insertions(+), 158 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java diff --git a/docs/en/connector-v2/sink/StarRocks.md b/docs/en/connector-v2/sink/StarRocks.md index 183cd8edbe0..841a06d2d16 100644 --- a/docs/en/connector-v2/sink/StarRocks.md +++ b/docs/en/connector-v2/sink/StarRocks.md @@ -51,6 +51,7 @@ The name of StarRocks table ### labelPrefix [string] the prefix of StarRocks stream load label + ### batch_max_rows [string] For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks @@ -76,13 +77,14 @@ Using as a multiplier for generating the next delay for backoff The amount of time to wait before attempting to retry a request to `StarRocks` ### sink.properties.* [starrocks stream load config] + the parameter of the stream load `data_desc` The way to specify the parameter is to add the prefix `sink.properties.` to the original stream load parameter name. For example, the way to specify `strip_outer_array` is: `sink.properties.strip_outer_array`. #### Supported import data formats -The supported formats include CSV and JSON. Default value: CSV +The supported formats include CSV and JSON. 
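As a rough sketch of how the options above fit together (all values below are illustrative; the host, database and table names are assumptions borrowed from the e2e test config in this patch, not required values), a CSV-based sink block could look like this:

```hocon
sink {
  StarRocks {
    nodeUrls = ["fe_host:8030"]
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"

    # flush when either threshold is reached, or on the batch_interval_ms timer
    batch_max_rows = 1024
    batch_max_bytes = 5242880
    batch_interval_ms = 5000

    # retry failed stream loads, with a backoff delay between attempts
    max_retries = 3
    retry_backoff_multiplier_ms = 500
    max_retry_backoff_ms = 10000

    # stream load data_desc parameters, passed through via the `sink.properties.` prefix
    sink.properties.format = "CSV"
    # optional delimiters (row_delimiter defaults to \n)
    sink.properties.column_separator = "\\x01"
    sink.properties.row_delimiter = "\\x02"
  }
}
```

The choice of format only affects how rows are serialized before the stream load request; batching and retry behave the same way for CSV and JSON. If `sink.properties.format` is not set, the connector falls back to the default.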
Default value: CSV ## Example Use JSON format to import data diff --git a/seatunnel-connectors-v2/connector-starrocks/pom.xml b/seatunnel-connectors-v2/connector-starrocks/pom.xml index 927a2453163..f7992271b16 100644 --- a/seatunnel-connectors-v2/connector-starrocks/pom.xml +++ b/seatunnel-connectors-v2/connector-starrocks/pom.xml @@ -49,7 +49,6 @@ httpclient ${httpclient.version} - org.apache.httpcomponents httpcore diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java new file mode 100644 index 00000000000..1d2ad3fbab2 --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.starrocks.client; + +import org.apache.seatunnel.common.utils.JsonUtils; + +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultRedirectStrategy; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + +@Slf4j +public class HttpHelper { + private static final int DEFAULT_CONNECT_TIMEOUT = 1000000; + + public HttpEntity getHttpEntity(CloseableHttpResponse resp) { + int code = resp.getStatusLine().getStatusCode(); + if (HttpStatus.SC_OK != code) { + log.warn("Request failed with code:{}", code); + return null; + } + HttpEntity respEntity = resp.getEntity(); + if (null == respEntity) { + log.warn("Request failed with empty response."); + return null; + } + return respEntity; + } + + public String doHttpGet(String getUrl) throws IOException { + log.info("Executing GET from {}.", getUrl); + try (CloseableHttpClient httpclient = buildHttpClient()) { + HttpGet httpGet = new HttpGet(getUrl); + try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { + HttpEntity respEntity = resp.getEntity(); + if (null == respEntity) { + log.warn("Request failed with empty response."); + return null; + } + return 
EntityUtils.toString(respEntity); + } + } + } + + public Map doHttpGet(String getUrl, Map header) throws IOException { + log.info("Executing GET from {}.", getUrl); + try (CloseableHttpClient httpclient = HttpClients.createDefault()) { + HttpGet httpGet = new HttpGet(getUrl); + if (null != header) { + for (Map.Entry entry : header.entrySet()) { + httpGet.setHeader(entry.getKey(), String.valueOf(entry.getValue())); + } + } + try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { + HttpEntity respEntity = getHttpEntity(resp); + if (null == respEntity) { + log.warn("Request failed with empty response."); + return null; + } + return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class); + } + } + } + + @SuppressWarnings("unchecked") + public Map doHttpPut(String url, byte[] data, Map header) throws IOException { + final HttpClientBuilder httpClientBuilder = HttpClients.custom() + .setRedirectStrategy(new DefaultRedirectStrategy() { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }); + try (CloseableHttpClient httpclient = httpClientBuilder.build()) { + HttpPut httpPut = new HttpPut(url); + if (null != header) { + for (Map.Entry entry : header.entrySet()) { + httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue())); + } + } + httpPut.setEntity(new ByteArrayEntity(data)); + httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build()); + try (CloseableHttpResponse resp = httpclient.execute(httpPut)) { + int code = resp.getStatusLine().getStatusCode(); + if (HttpStatus.SC_OK != code) { + String errorText; + try { + HttpEntity respEntity = resp.getEntity(); + errorText = EntityUtils.toString(respEntity); + } catch (Exception err) { + errorText = "find errorText failed: " + err.getMessage(); + } + log.warn("Request failed with code:{}, err:{}", code, errorText); + Map errorMap = new HashMap<>(); + errorMap.put("Status", "Fail"); + errorMap.put("Message", errorText); + return errorMap; + } + HttpEntity respEntity = resp.getEntity(); + if (null == respEntity) { + log.warn("Request failed with empty response."); + return null; + } + return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class); + } + } + } + + private CloseableHttpClient buildHttpClient() { + final HttpClientBuilder httpClientBuilder = HttpClients.custom() + .setRedirectStrategy(new DefaultRedirectStrategy() { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }); + return httpClientBuilder.build(); + } + + public boolean tryHttpConnection(String host) { + try { + URL url = new URL(host); + HttpURLConnection co = (HttpURLConnection) url.openConnection(); + co.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT); + co.connect(); + co.disconnect(); + return true; + } catch (Exception e1) { + log.warn("Failed to connect to address:{}", host, e1); + return false; + } + } +} diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java index 983f6905b20..2f67e4fea93 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java @@ -47,9 +47,12 @@ 
public class StarRocksSinkManager { private int batchRowCount = 0; private long batchBytesSize = 0; + private Integer batchIntervalMs; + public StarRocksSinkManager(SinkConfig sinkConfig, List fileNames) { this.sinkConfig = sinkConfig; this.batchList = new ArrayList<>(); + this.batchIntervalMs = sinkConfig.getBatchIntervalMs(); starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(sinkConfig, fileNames); } @@ -57,8 +60,9 @@ private void tryInit() throws IOException { if (initialize) { return; } + initialize = true; - if (sinkConfig.getBatchIntervalMs() != null) { + if (batchIntervalMs != null) { scheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("StarRocks-sink-output-%s").build()); scheduledFuture = scheduler.scheduleAtFixedRate( @@ -69,11 +73,10 @@ private void tryInit() throws IOException { flushException = e; } }, - sinkConfig.getBatchIntervalMs(), - sinkConfig.getBatchIntervalMs(), + batchIntervalMs, + batchIntervalMs, TimeUnit.MILLISECONDS); } - initialize = true; } public synchronized void write(String record) throws IOException { diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java index 8d314593914..58efb872673 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java @@ -22,24 +22,10 @@ import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksDelimiterParser; import org.apache.commons.codec.binary.Base64; -import org.apache.http.HttpEntity; -import org.apache.http.HttpStatus; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPut; -import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.DefaultRedirectStrategy; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.HttpURLConnection; -import java.net.URL; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.HashMap; @@ -51,7 +37,8 @@ public class StarRocksStreamLoadVisitor { private static final Logger LOG = LoggerFactory.getLogger(StarRocksStreamLoadVisitor.class); - private static final int CONNECT_TIMEOUT = 1000000; + + private final HttpHelper httpHelper = new HttpHelper(); private static final int MAX_SLEEP_TIME = 5; private final SinkConfig sinkConfig; @@ -86,7 +73,7 @@ public void doStreamLoad(StarRocksFlushTuple flushData) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel())); } - Map loadResult = doHttpPut(loadUrl, flushData.getLabel(), joinRows(flushData.getRows(), flushData.getBytes().intValue())); + Map loadResult = 
httpHelper.doHttpPut(loadUrl, joinRows(flushData.getRows(), flushData.getBytes().intValue()), getStreamLoadHttpHeader(flushData.getLabel())); final String keyStatus = "Status"; if (null == loadResult || !loadResult.containsKey(keyStatus)) { LOG.error("unknown result status. {}", loadResult); @@ -104,7 +91,7 @@ public void doStreamLoad(StarRocksFlushTuple flushData) throws IOException { if (loadResult.containsKey("ErrorURL")) { LOG.error("StreamLoad response: {}", loadResult); try { - errorBuilder.append(doHttpGet(loadResult.get("ErrorURL").toString())); + errorBuilder.append(httpHelper.doHttpGet(loadResult.get("ErrorURL").toString())); errorBuilder.append('\n'); } catch (IOException e) { LOG.warn("Get Error URL failed. {} ", loadResult.get("ErrorURL"), e); @@ -126,27 +113,13 @@ private String getAvailableHost() { long tmp = pos + hostList.size(); for (; pos < tmp; pos++) { String host = new StringBuilder("http://").append(hostList.get((int) (pos % hostList.size()))).toString(); - if (tryHttpConnection(host)) { + if (httpHelper.tryHttpConnection(host)) { return host; } } return null; } - private boolean tryHttpConnection(String host) { - try { - URL url = new URL(host); - HttpURLConnection co = (HttpURLConnection) url.openConnection(); - co.setConnectTimeout(CONNECT_TIMEOUT); - co.connect(); - co.disconnect(); - return true; - } catch (Exception e1) { - LOG.warn("Failed to connect to address:{}", host, e1); - return false; - } - } - private byte[] joinRows(List rows, int totalBytes) { if (SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) { Map props = sinkConfig.getStreamLoadProps(); @@ -186,92 +159,35 @@ private void checkLabelState(String host, String label) throws IOException { } catch (InterruptedException ex) { break; } - try (CloseableHttpClient httpclient = HttpClients.createDefault()) { - HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(sinkConfig.getDatabase()).append("/get_load_state?label=").append(label).toString()); - httpGet.setHeader("Authorization", getBasicAuthHeader(sinkConfig.getUsername(), sinkConfig.getPassword())); - httpGet.setHeader("Connection", "close"); - try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { - HttpEntity respEntity = getHttpEntity(resp); - if (respEntity == null) { - throw new IOException(String.format("Failed to flush data to StarRocks, Error " + - "could not get the final state of label[%s].\n", label), null); - } - - Map result = JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class); - String labelState = (String) result.get("state"); - if (null == labelState) { - throw new IOException(String.format("Failed to flush data to StarRocks, Error " + - "could not get the final state of label[%s]. 
response[%s]\n", label, EntityUtils.toString(respEntity)), null); - } - LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState)); - switch (labelState) { - case LAEBL_STATE_VISIBLE: - case LAEBL_STATE_COMMITTED: - return; - case RESULT_LABEL_PREPARE: - continue; - case RESULT_LABEL_ABORTED: - throw new StarRocksStreamLoadFailedException(String.format("Failed to flush data to StarRocks, Error " + - "label[%s] state[%s]\n", label, labelState), null, true); - case RESULT_LABEL_UNKNOWN: - default: - throw new StarRocksStreamLoadFailedException(String.format("Failed to flush data to StarRocks, Error " + - "label[%s] state[%s]\n", label, labelState), null); - } - } - } - } - } - - @SuppressWarnings("unchecked") - private Map doHttpPut(String loadUrl, String label, byte[] data) throws IOException { - LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length)); - final HttpClientBuilder httpClientBuilder = HttpClients.custom() - .setRedirectStrategy(new DefaultRedirectStrategy() { - @Override - protected boolean isRedirectable(String method) { - return true; - } - }); - try (CloseableHttpClient httpclient = httpClientBuilder.build()) { - HttpPut httpPut = new HttpPut(loadUrl); - if (null != fieldNames && !fieldNames.isEmpty() && SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) { - httpPut.setHeader("columns", String.join(",", fieldNames.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList()))); - } - if (null != sinkConfig.getStreamLoadProps()) { - for (Map.Entry entry : sinkConfig.getStreamLoadProps().entrySet()) { - httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue())); + try { + String queryLoadStateUrl = new StringBuilder(host).append("/api/").append(sinkConfig.getDatabase()).append("/get_load_state?label=").append(label).toString(); + Map result = httpHelper.doHttpGet(queryLoadStateUrl, getLoadStateHttpHeader(label)); + if (result == null) { + throw new IOException(String.format("Failed to flush data to StarRocks, Error " + + "could not get the final state of label[%s].\n", label), null); } - } - httpPut.setHeader("strip_outer_array", "true"); - httpPut.setHeader("Expect", "100-continue"); - httpPut.setHeader("label", label); - httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded"); - httpPut.setHeader("Authorization", getBasicAuthHeader(sinkConfig.getUsername(), sinkConfig.getPassword())); - httpPut.setEntity(new ByteArrayEntity(data)); - httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build()); - try (CloseableHttpResponse resp = httpclient.execute(httpPut)) { - int code = resp.getStatusLine().getStatusCode(); - if (HttpStatus.SC_OK != code) { - String errorText; - try { - HttpEntity respEntity = resp.getEntity(); - errorText = EntityUtils.toString(respEntity); - } catch (Exception err) { - errorText = "find errorText failed: " + err.getMessage(); - } - LOG.warn("Request failed with code:{}, err:{}", code, errorText); - Map errorMap = new HashMap<>(); - errorMap.put("Status", "Fail"); - errorMap.put("Message", errorText); - return errorMap; + String labelState = (String) result.get("state"); + if (null == labelState) { + throw new IOException(String.format("Failed to flush data to StarRocks, Error " + + "could not get the final state of label[%s]. 
response[%s]\n", label, JsonUtils.toJsonString(result)), null); } - HttpEntity respEntity = resp.getEntity(); - if (null == respEntity) { - LOG.warn("Request failed with empty response."); - return null; + LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState)); + switch (labelState) { + case LAEBL_STATE_VISIBLE: + case LAEBL_STATE_COMMITTED: + return; + case RESULT_LABEL_PREPARE: + continue; + case RESULT_LABEL_ABORTED: + throw new StarRocksStreamLoadFailedException(String.format("Failed to flush data to StarRocks, Error " + + "label[%s] state[%s]\n", label, labelState), null, true); + case RESULT_LABEL_UNKNOWN: + default: + throw new StarRocksStreamLoadFailedException(String.format("Failed to flush data to StarRocks, Error " + + "label[%s] state[%s]\n", label, labelState), null); } - return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class); + } catch (IOException e) { + throw new IOException(e); } } } @@ -282,43 +198,28 @@ private String getBasicAuthHeader(String username, String password) { return new StringBuilder("Basic ").append(new String(encodedAuth)).toString(); } - private HttpEntity getHttpEntity(CloseableHttpResponse resp) { - int code = resp.getStatusLine().getStatusCode(); - if (HttpStatus.SC_OK != code) { - LOG.warn("Request failed with code:{}", code); - return null; + private Map getStreamLoadHttpHeader(String label) { + Map headerMap = new HashMap<>(); + if (null != fieldNames && !fieldNames.isEmpty() && SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) { + headerMap.put("columns", String.join(",", fieldNames.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList()))); } - HttpEntity respEntity = resp.getEntity(); - if (null == respEntity) { - LOG.warn("Request failed with empty response."); - return null; - } - return respEntity; - } - - private String doHttpGet(String getUrl) throws IOException { - LOG.info("Executing GET from {}.", getUrl); - try (CloseableHttpClient httpclient = buildHttpClient()) { - HttpGet httpGet = new HttpGet(getUrl); - try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { - HttpEntity respEntity = resp.getEntity(); - if (null == respEntity) { - LOG.warn("Request failed with empty response."); - return null; - } - return EntityUtils.toString(respEntity); + if (null != sinkConfig.getStreamLoadProps()) { + for (Map.Entry entry : sinkConfig.getStreamLoadProps().entrySet()) { + headerMap.put(entry.getKey(), String.valueOf(entry.getValue())); } } + headerMap.put("strip_outer_array", "true"); + headerMap.put("Expect", "100-continue"); + headerMap.put("label", label); + headerMap.put("Content-Type", "application/x-www-form-urlencoded"); + headerMap.put("Authorization", getBasicAuthHeader(sinkConfig.getUsername(), sinkConfig.getPassword())); + return headerMap; } - private CloseableHttpClient buildHttpClient() { - final HttpClientBuilder httpClientBuilder = HttpClients.custom() - .setRedirectStrategy(new DefaultRedirectStrategy() { - @Override - protected boolean isRedirectable(String method) { - return true; - } - }); - return httpClientBuilder.build(); + private Map getLoadStateHttpHeader(String label) { + Map headerMap = new HashMap<>(); + headerMap.put("Authorization", getBasicAuthHeader(sinkConfig.getUsername(), sinkConfig.getPassword())); + headerMap.put("Connection", "close"); + return headerMap; } } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java index 4a59139b851..cf9c1093f66 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java @@ -19,7 +19,9 @@ import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.DATABASE; import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.NODE_URLS; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.PASSWORD; import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.TABLE; +import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.USERNAME; import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.sink.SeaTunnelSink; @@ -51,7 +53,7 @@ public String getPluginName() { @Override public void prepare(Config pluginConfig) throws PrepareFailException { this.pluginConfig = pluginConfig; - CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, NODE_URLS, DATABASE, TABLE); + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, NODE_URLS, DATABASE, TABLE, USERNAME, PASSWORD); if (!result.isSuccess()) { throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java index 7affd28896b..441af029651 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java @@ -67,7 +67,14 @@ public Optional prepareCommit() { @Override public void close() throws IOException { - manager.close(); + try { + if (manager != null) { + manager.close(); + } + } catch (IOException e) { + log.error("Close starRocks manager failed.", e); + throw new IOException("Close starRocks manager failed.", e); + } } public static StarRocksISerializer createSerializer(SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java index c5d367e0be0..242924d21df 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java @@ -101,7 +101,6 @@ public class StarRocksIT extends TestSuiteBase implements TestResource { "\"storage_format\" = \"DEFAULT\"" + ")"; - private static final String DDL_SINK = "create table " + DATABASE + "." 
+ SINK_TABLE + " (\n" + " BIGINT_COL BIGINT,\n" + " LARGEINT_COL LARGEINT,\n" + @@ -145,6 +144,7 @@ public class StarRocksIT extends TestSuiteBase implements TestResource { ")values(\n" + "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" + ")"; + private Connection jdbcConnection; private GenericContainer starRocksServer; private static final List TEST_DATASET = generateTestDataSet(); @@ -217,6 +217,7 @@ public void testStarRocksSink(TestContainer container) throws IOException, Inter Assertions.assertEquals(0, execResult.getExitCode()); try { assertHasData(SINK_TABLE); + String sourceSql = String.format("select * from %s.%s", DATABASE, SOURCE_TABLE); String sinkSql = String.format("select * from %s.%s", DATABASE, SINK_TABLE); List columnList = Arrays.stream(COLUMN_STRING.split(",")).map(x -> x.trim()).collect(Collectors.toList()); @@ -237,7 +238,6 @@ public void testStarRocksSink(TestContainer container) throws IOException, Inter String sinkValue = IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8); Assertions.assertEquals(sourceValue, sinkValue); } - Assertions.assertTrue(true); } } } From da505670f9e1ab12e493d5bb8561f305c1382830 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Mon, 24 Oct 2022 16:36:22 +0800 Subject: [PATCH 4/9] improve StarRocksFlushTuple --- .../starrocks/client/StarRocksFlushTuple.java | 29 +++++-------------- 1 file changed, 7 insertions(+), 22 deletions(-) diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksFlushTuple.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksFlushTuple.java index c70fbb3c90c..66ae222b318 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksFlushTuple.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksFlushTuple.java @@ -17,32 +17,17 @@ package org.apache.seatunnel.connectors.seatunnel.starrocks.client; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + import java.util.List; +@AllArgsConstructor +@Getter +@Setter public class StarRocksFlushTuple { private String label; private Long bytes; private List rows; - - public StarRocksFlushTuple(String label, Long bytes, List rows) { - this.label = label; - this.bytes = bytes; - this.rows = rows; - } - - public String getLabel() { - return label; - } - - public void setLabel(String label) { - this.label = label; - } - - public Long getBytes() { - return bytes; - } - - public List getRows() { - return rows; - } } From de1da503b366ca9bcf7edcc40032b603dcc49aaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Fri, 28 Oct 2022 19:58:05 +0800 Subject: [PATCH 5/9] add Changelog --- docs/en/connector-v2/sink/StarRocks.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/StarRocks.md b/docs/en/connector-v2/sink/StarRocks.md index 841a06d2d16..2131cdc0d5d 100644 --- a/docs/en/connector-v2/sink/StarRocks.md +++ b/docs/en/connector-v2/sink/StarRocks.md @@ -119,4 +119,10 @@ sink { sink.properties.row_delimiter = "\\x02" } } -``` \ No newline at end of file +``` + +## Changelog + +### next version + +- Add StarRocks Sink Connector \ No newline at end of file From 72b5533ef30c2e6486a2f741b26b45dc776776fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= 
Date: Fri, 4 Nov 2022 18:23:20 +0800 Subject: [PATCH 6/9] merge dev --- .../seatunnel-connector-spark-hbase/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/pom.xml b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/pom.xml index c5de125c4f5..0ae94a4797b 100644 --- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/pom.xml +++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/pom.xml @@ -1,4 +1,4 @@ - +
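
For readers who want to exercise the Stream Load path that the `HttpHelper` / `StarRocksStreamLoadVisitor` classes in the patches above implement, the sketch below issues the same kind of redirect-following HTTP PUT with basic auth, a unique `label`, and JSON rows plus `strip_outer_array`. It is a minimal illustration, not part of the patch series: the host, database, table, credentials and sample row are placeholders, and the `/api/{db}/{table}/_stream_load` URL pattern is assumed from StarRocks' public Stream Load API rather than taken from this diff.

```java
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.UUID;

public class StreamLoadSketch {
    public static void main(String[] args) throws Exception {
        // Placeholders -- point these at a reachable FE node and real credentials.
        String host = "http://localhost:8030";
        String db = "test";
        String table = "e2e_table_sink";
        String auth = Base64.getEncoder()
                .encodeToString("root:".getBytes(StandardCharsets.UTF_8));
        // Assumed endpoint: StarRocks' documented Stream Load URL pattern.
        String loadUrl = String.format("%s/api/%s/%s/_stream_load", host, db, table);

        // Two JSON rows; strip_outer_array tells StarRocks to unwrap the array.
        byte[] body = "[{\"INT_COL\":1},{\"INT_COL\":2}]".getBytes(StandardCharsets.UTF_8);

        // The FE answers the PUT with a redirect to a BE, so redirects must be
        // allowed for non-GET methods, mirroring the DefaultRedirectStrategy
        // override used by the connector's HttpHelper.
        try (CloseableHttpClient client = HttpClients.custom()
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                }).build()) {
            HttpPut put = new HttpPut(loadUrl);
            put.setHeader("Authorization", "Basic " + auth);
            put.setHeader("Expect", "100-continue");
            put.setHeader("label", "sketch-" + UUID.randomUUID()); // must be unique per load
            put.setHeader("format", "json");
            put.setHeader("strip_outer_array", "true");
            put.setEntity(new ByteArrayEntity(body));
            try (CloseableHttpResponse resp = client.execute(put)) {
                // The response body carries Status/Message fields, the same map the
                // connector inspects as loadResult after a flush.
                System.out.println(EntityUtils.toString(resp.getEntity()));
            }
        }
    }
}
```

A load whose response reports `"Status": "Fail"` can then be investigated through the `ErrorURL` it returns, which is exactly what the connector does when it appends the fetched error text to the exception message.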