
High-frequency write scenario: after executing prepareStatement.executeBatch() about 20 times, it blocks! #1374

Closed
xlvchao opened this issue May 30, 2023 · 11 comments

@xlvchao

xlvchao commented May 30, 2023

My question is:
In a high-frequency write scenario, after executing prepareStatement.executeBatch() about 20 times, it blocks for a while. Each batch is 1,000 records.

JDBC version: 0.3.1

I tried all the versions (from 0.3.1 to 0.4.6) and the problem is still there! Please help me!
I also tried the approach in #991, but it does not work.

@xlvchao xlvchao added the bug label May 30, 2023
@zhicwu
Contributor

zhicwu commented May 31, 2023

Hi @xlvchao, so you're reusing a prepared statement to execute multiple batches, right? I wrote a simple loop but couldn't reproduce the issue.

Could you share more details, like the table structure and the insert query? Moreover, is there any clue in the server log, system.query_log, or the output of SHOW PROCESSLIST?

Lastly, I'd like to understand how long a batch update takes at your end. If the connection is idle for too long and the network is unstable, you may run into ClickHouse/clickhouse-docs#1178.

@xlvchao
Author

xlvchao commented May 31, 2023

@zhicwu

1. DB script:
CREATE DATABASE IF NOT EXISTS aiops_local_test;
create table aiops_local_test.aiops_collect_1
(
product String,
service String,
itf String,
accountErrorCode String,
addn String,
aggregateDelay Nullable(Int64),
avgAggregateDelay Nullable(Int64),
avgLatency Nullable(Float64),
destinationId String,
errorDetail String,
fail Nullable(Int64),
ip String,
ipCity String,
ipCountry String,
ipProvince String,
itfGroup String,
latency Nullable(Int64),
nonAggregatedDelay Nullable(Int64),
returnCode String,
sourceId String,
success Nullable(Int64),
total Nullable(Int64),
totalInterfaceLogAggregateDelay Nullable(Int64),
type String,
sysTime DateTime,
time DateTime,
greyTag String,
minLatency Nullable(Int64) default latency,
maxLatency Nullable(Int64) default latency,
sumLatency Nullable(Int64) default latency,
latency_p95 Nullable(Float64) default NULL,
referenceTotal Nullable(Int64) default 0
)
engine = MergeTree()
PARTITION BY toYYYYMMDD(time)
PRIMARY KEY (product, service, itf, addn, time)
ORDER BY (product, service, itf, addn, time)
TTL time + toIntervalMonth(1)
SETTINGS index_granularity = 8192;

2. JDBC demo:
import com.alibaba.fastjson.JSONObject;
import com.clickhouse.jdbc.ClickHouseDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import static java.util.concurrent.TimeUnit.SECONDS;
public class BatchInsertTest {
public static final String USER = "username";
public static final String PASSWORD = "password";
public static final String URL = "jdbc:clickhouse://10.68.237.236:8123?collect_timeout=3600&max_execution_time=3600&socket_timeout=4800000";
public static final String sql = "INSERT INTO aiops_local_test.aiops_collect_1 (product,service,itf,accountErrorCode,addn,aggregateDelay,avgAggregateDelay,avgLatency,destinationId,errorDetail,fail,ip,ipCity,ipCountry,ipProvince,itfGroup,latency,nonAggregatedDelay,returnCode,sourceId,success,total,totalInterfaceLogAggregateDelay,type,sysTime,time,greyTag,minLatency,maxLatency,sumLatency,latency_p95,referenceTotal) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
public static final String[] fields = new String[]{"product","service","itf","accountErrorCode","addn","aggregateDelay","avgAggregateDelay","avgLatency","destinationId","errorDetail","fail","ip","ipCity","ipCountry","ipProvince","itfGroup","latency","nonAggregatedDelay","returnCode","sourceId","success","total","totalInterfaceLogAggregateDelay","type","sysTime","time","greyTag","minLatency","maxLatency","sumLatency","latency_p95","referenceTotal"};

public static final BlockingQueue<List<JSONObject>> commonQueue = new LinkedBlockingQueue<>(3000);

public static ClickHouseDataSource dataSource;
static {
    try {
        dataSource = new ClickHouseDataSource(URL);
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }
}

public static void main(String[] args) throws Exception {
    //producers
    for (int k = 0; k < 5; k++) {
        new Thread(() -> {
            List<JSONObject> localBuffer = new ArrayList<>();
            for (int i = 1; i <= 100000000; i++) {
                JSONObject jsonObject = JSONObject.parseObject("{\"accountErrorCode\":\"000\",\"addn\":\"no addn\",\"aggregateDelay\":1,\"avgAggregateDelay\":1," +
                        "\"avgLatency\":25.0,\"destinationId\":\"id-234\",\"errorDetail\":\"no error\",\"fail\":0,\"greyTag\":\"121\",\"id\":1,\"ip\":\"127.0.0.1\"," +
                        "\"ipCity\":\"深圳\",\"ipCountry\":\"中国\",\"ipProvince\":\"广东\",\"itf\":\"itf11\",\"itfGroup\":\"itfGroup\",\"latency\":23,\"maxLatency\":1," +
                        "\"minLatency\":1,\"nonAggregatedDelay\":0,\"product\":\"product3\",\"returnCode\":\"000\",\"service\":\"service3\",\"sourceId\":\"id-123\"," +
                        "\"success\":1,\"sumLatency\":1,\"sysTime\":\"2023-05-30 19:00:00\",\"time\":\"2023-05-30 19:00:00\",\"total\":100,\"totalInterfaceLogAggregateDelay\":100," +
                        "\"type\":\"cli\"}\n");
                localBuffer.add(jsonObject);
                if(localBuffer.size() >= 1000) {
                    try {
                        List<JSONObject> deepCopy = deepCopy(localBuffer);
                        localBuffer.clear();
                        commonQueue.put(deepCopy);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        }).start();
    }

    //consumers
    for (int i = 0; i < 3; i++) {
        new Thread(() -> {
            try {
                // keep polling; a single poll would let the queue fill up and block producers
                while (true) {
                    List<JSONObject> data = commonQueue.poll(3, SECONDS);
                    if (data != null && !data.isEmpty()) {
                        flushToClickHouse(data);
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).start();
    }
}

static void flushToClickHouse(List<JSONObject> data) {
    try (Connection conn = dataSource.getConnection(USER, PASSWORD);
         PreparedStatement prepareStatement = conn.prepareStatement(sql)) {

        long s = System.currentTimeMillis();
        conn.setAutoCommit(false);
        prepareParameters(prepareStatement, data);
        prepareStatement.executeBatch(); //Here is the point that caused the problem
        conn.commit();
        long e = System.currentTimeMillis();

        System.out.println(String.format("Successfully flushed data to ClickHouse, elapsed = %dms, batch size = %d", (e - s), data.size()));

    } catch (Exception e) {
        e.printStackTrace();
        System.err.println("Failed while flushing data to ClickHouse!");
    }
}

static void prepareParameters(PreparedStatement prepareStatement, List<JSONObject> data) throws SQLException {
    for (JSONObject d : data) {
        int idx = 1;
        for (String field : fields) {
            Object value = d.get(field);
            prepareStatement.setObject(idx++, value);
        }
        prepareStatement.addBatch();
    }
}

private static List<JSONObject> deepCopy(List<JSONObject> original) {
    return Collections.unmodifiableList(new ArrayList<>(original));
}

}

3. Problem: Full GC occurred frequently, triggering a series of brief stop-the-world pauses, which is why my program often blocks for a little while.
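A stall like this can come from backpressure as well as GC: commonQueue above is a bounded LinkedBlockingQueue, so put() blocks producers whenever consumers fall behind. A minimal, stdlib-only sketch of that effect (queue capacity shrunk to 2 so it is visible immediately; class name is illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BackpressureDemo {
    public static void main(String[] args) throws InterruptedException {
        // Same shape as commonQueue, but capacity 2 instead of 3000.
        BlockingQueue<List<String>> queue = new LinkedBlockingQueue<>(2);

        queue.put(new ArrayList<>()); // batch 1: accepted
        queue.put(new ArrayList<>()); // batch 2: accepted, queue is now full

        // A third put() would block indefinitely; offer() with a timeout shows the stall.
        boolean accepted = queue.offer(new ArrayList<>(), 100, TimeUnit.MILLISECONDS);
        System.out.println("queue full, batch accepted = " + accepted); // false

        queue.poll(); // a consumer drains one batch, freeing a slot
        accepted = queue.offer(new ArrayList<>(), 100, TimeUnit.MILLISECONDS);
        System.out.println("after drain, batch accepted = " + accepted); // true
    }
}
```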

@zhicwu
Contributor

zhicwu commented May 31, 2023

Thanks @xlvchao, I'm not sure how this is related to the JDBC driver. If you look at the heap usage and thread stacks, you'll see the JVM is occupied by fastjson, which I'm afraid has nothing to do with the driver.

By the way, ClickHouse supports the JSONEachRow format, which might be of use in your case. For instance:

// 'format RowBinary' is the hint to use streaming mode; you may use a different
// format like JSONEachRow as needed
sql = String.format("insert into %s format RowBinary", TABLE_NAME);
try (PreparedStatement ps = conn.prepareStatement(sql)) {
    // it's streaming, so there's only one parameter (could be one of String, byte[],
    // InputStream, File, ClickHouseWriter), and you don't have to process batch by batch
    ps.setObject(1, new ClickHouseWriter() {
        @Override
        public void write(ClickHouseOutputStream output) throws IOException {
            // this will be executed in a separate thread
            for (int i = 0; i < 1_000_000; i++) {
                output.writeUnicodeString("a-" + i);
                output.writeBoolean(false); // non-null
                output.writeUnicodeString("b-" + i);
            }
        }
    });
    ps.executeUpdate();
}

@xlvchao
Author

xlvchao commented Jun 1, 2023


@zhicwu It doesn't seem to work.

JDBC version:

0.4.6

DB script is here:

#1374 (comment)

My code:

import com.alibaba.fastjson.JSONObject;
import com.clickhouse.data.ClickHouseOutputStream;
import com.clickhouse.data.ClickHouseWriter;
import com.clickhouse.data.format.BinaryStreamUtils;
import com.clickhouse.jdbc.ClickHouseDataSource;
import com.test.xlc.utils.DateTimeUtil;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;

public class BatchInsertTest {
    public static final String USER = "default";
    public static final String PASSWORD = "xxx";
    public static final String URL = "jdbc:clickhouse://10.68.237.236:8123?collect_timeout=3600&max_execution_time=3600&socket_timeout=4800000";
    //public static final String sql = "INSERT INTO aiops_local_test.aiops_collect_1 (product,service,itf,accountErrorCode,addn,aggregateDelay,avgAggregateDelay,avgLatency,destinationId,errorDetail,fail,ip,ipCity,ipCountry,ipProvince,itfGroup,latency,nonAggregatedDelay,returnCode,sourceId,success,total,totalInterfaceLogAggregateDelay,type,sysTime,time,greyTag,minLatency,maxLatency,sumLatency,latency_p95,referenceTotal) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
    public static final String[] fields = new String[]{"product","service","itf","accountErrorCode","addn","aggregateDelay","avgAggregateDelay","avgLatency","destinationId","errorDetail","fail","ip","ipCity","ipCountry","ipProvince","itfGroup","latency","nonAggregatedDelay","returnCode","sourceId","success","total","totalInterfaceLogAggregateDelay","type","sysTime","time","greyTag","minLatency","maxLatency","sumLatency","latency_p95","referenceTotal"};
    public static Map<String, String> fieldTypeMap;
    public static ClickHouseDataSource dataSource;
    static {
        try {
            dataSource = new ClickHouseDataSource(URL);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }

        fieldTypeMap = new HashMap<>();
        fieldTypeMap.put("product", "String");
        fieldTypeMap.put("service", "String");
        fieldTypeMap.put("itf", "String");
        fieldTypeMap.put("accountErrorCode", "String");
        fieldTypeMap.put("addn", "String");
        fieldTypeMap.put("aggregateDelay", "Nullable(Int64)");
        fieldTypeMap.put("avgAggregateDelay", "Nullable(Int64)");
        fieldTypeMap.put("avgLatency", "Nullable(Float64)");
        fieldTypeMap.put("destinationId", "String");
        fieldTypeMap.put("errorDetail", "String");
        fieldTypeMap.put("fail", "Nullable(Int64)");
        fieldTypeMap.put("ip", "String");
        fieldTypeMap.put("ipCity", "String");
        fieldTypeMap.put("ipCountry", "String");
        fieldTypeMap.put("ipProvince", "String");
        fieldTypeMap.put("itfGroup", "String");
        fieldTypeMap.put("latency", "Nullable(Int64)");
        fieldTypeMap.put("nonAggregatedDelay", "Nullable(Int64)");
        fieldTypeMap.put("returnCode", "String");
        fieldTypeMap.put("sourceId", "String");
        fieldTypeMap.put("success", "Nullable(Int64)");
        fieldTypeMap.put("total", "Nullable(Int64)");
        fieldTypeMap.put("totalInterfaceLogAggregateDelay", "Nullable(Int64)");
        fieldTypeMap.put("type", "String");
        fieldTypeMap.put("sysTime", "DateTime");
        fieldTypeMap.put("time", "DateTime");
        fieldTypeMap.put("greyTag", "String");
        fieldTypeMap.put("minLatency", "Nullable(Int64)");
        fieldTypeMap.put("maxLatency", "Nullable(Int64)");
        fieldTypeMap.put("sumLatency", "Nullable(Int64)");
        fieldTypeMap.put("latency_p95", "Nullable(Float64)");
        fieldTypeMap.put("referenceTotal", "Nullable(Int64)");
    }

    public static void main(String[] args) throws Exception {
        for (int k = 0; k < 5; k++) {
            new Thread(() -> {
                List<JSONObject> localBuffer = new ArrayList<>();
                for (int i = 1; i <= 100; i++) {
                    JSONObject jsonObject = JSONObject.parseObject("{\"accountErrorCode\":\"000\",\"addn\":\"no addn\",\"aggregateDelay\":1,\"avgAggregateDelay\":1," +
                            "\"avgLatency\":25.0,\"destinationId\":\"id-234\",\"errorDetail\":\"no error\",\"fail\":0,\"greyTag\":\"121\",\"id\":1,\"ip\":\"127.0.0.1\"," +
                            "\"ipCity\":\"shenzhen\",\"ipCountry\":\"china\",\"ipProvince\":\"guangdong\",\"itf\":\"itf11\",\"itfGroup\":\"itfGroup\",\"latency\":23,\"maxLatency\":1," +
                            "\"minLatency\":1,\"nonAggregatedDelay\":0,\"product\":\"product3\",\"returnCode\":\"000\",\"service\":\"service3\",\"sourceId\":\"id-123\"," +
                            "\"success\":1,\"sumLatency\":1,\"sysTime\":\"2023-05-30 19:00:00\",\"time\":\"2023-05-30 19:00:00\",\"total\":100,\"totalInterfaceLogAggregateDelay\":100," +
                            "\"type\":\"cli\"}\n");
                    localBuffer.add(jsonObject);
                    if(localBuffer.size() >= 100) {
                        try {
                            flushToClickHouse(localBuffer);
                            localBuffer.clear();
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
            }).start();
        }
    }

    static void flushToClickHouse(List<JSONObject> dataList) {
        String sql = "insert into aiops_local_test.aiops_collect_1 format RowBinary";
        try (Connection conn = dataSource.getConnection(USER, PASSWORD);
             PreparedStatement prepareStatement = conn.prepareStatement(sql)) {

            long s = System.currentTimeMillis();
            prepareStatement.setObject(1, new ClickHouseWriter() {
                @Override
                public void write(ClickHouseOutputStream output) throws IOException {
                    for (JSONObject object : dataList) {
                        for (String field : fields) {
                            if(object.get(field) != null) {
                                String type = fieldTypeMap.get(field);
                                if("String".equals(type) || "Nullable(String)".equals(type)) {
                                    BinaryStreamUtils.writeString(output, object.getString(field));
                                } else if("Int32".equals(type) || "Nullable(Int32)".equals(type)) {
                                    BinaryStreamUtils.writeInt32(output, object.getIntValue(field));
                                } else if("Int64".equals(type) || "Nullable(Int64)".equals(type)) {
                                    BinaryStreamUtils.writeInt64(output, object.getLongValue(field));
                                } else if("Float32".equals(type) || "Nullable(Float32)".equals(type)) {
                                    BinaryStreamUtils.writeFloat32(output, object.getFloatValue(field));
                                } else if("Float64".equals(type) || "Nullable(Float64)".equals(type)) {
                                    BinaryStreamUtils.writeFloat64(output, object.getDoubleValue(field));
                                } else if("DateTime".equals(type) || "Nullable(DateTime)".equals(type)) {
                                    BinaryStreamUtils.writeDateTime(output, DateTimeUtil.parseLocalDateTime(object.getString(field)), TimeZone.getDefault());
                                }
                            } else {
                                BinaryStreamUtils.writeNull(output);
                            }
                        }
                    }
                }
            });
            prepareStatement.executeUpdate();
            long e = System.currentTimeMillis();
            System.out.println(String.format("Successfully flushed data to ClickHouse, elapsed = %dms, batch size = %d", (e - s), dataList.size()));
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println("Failed while flushing data to ClickHouse!");
        }
    }
}

The Exception:

java.sql.SQLException: Code: 33, e.displayText() = DB::Exception: Cannot read all data. Bytes read: 18987. Bytes expected: 13585893.: (at row 6)
 (version 21.8.10.19 (official build))
, server ClickHouseNode [uri=http://10.68.237.236:8123/default, options={collect_timeout=3600,max_execution_time=3600,socket_timeout=4800000}]@-98731198
	at com.clickhouse.jdbc.SqlExceptionUtils.handle(SqlExceptionUtils.java:85)
	at com.clickhouse.jdbc.SqlExceptionUtils.create(SqlExceptionUtils.java:31)
	at com.clickhouse.jdbc.SqlExceptionUtils.handle(SqlExceptionUtils.java:90)
	at com.clickhouse.jdbc.internal.ClickHouseStatementImpl.executeInsert(ClickHouseStatementImpl.java:322)
	at com.clickhouse.jdbc.internal.StreamBasedPreparedStatement.executeAny(StreamBasedPreparedStatement.java:97)
	at com.clickhouse.jdbc.internal.StreamBasedPreparedStatement.executeLargeUpdate(StreamBasedPreparedStatement.java:161)
	at com.clickhouse.jdbc.internal.AbstractPreparedStatement.executeUpdate(AbstractPreparedStatement.java:135)
	at com.test.xlc.clickhouse.BatchInsertTest.flushToClickHouse(BatchInsertTest.java:127)
	at com.test.xlc.clickhouse.BatchInsertTest.lambda$main$0(BatchInsertTest.java:83)
	at java.lang.Thread.run(Thread.java:748)
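For reference, the "Bytes expected" mismatch is consistent with RowBinary's Nullable framing: every Nullable(T) value is preceded by a flag byte (0x00 = value follows, 0x01 = NULL), so writing a present value without its flag byte shifts all subsequent fields. A hedged, stdlib-only sketch of that framing (the helper below is illustrative, not the driver's API; recent driver versions expose similar helpers in BinaryStreamUtils):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class NullableFraming {
    // RowBinary-style framing for a Nullable(Int64) column: one flag byte
    // (0 = value follows, 1 = NULL), then the little-endian value if present.
    static void writeNullableInt64(ByteArrayOutputStream out, Long v) throws IOException {
        if (v == null) {
            out.write(1); // NULL marker only, no value bytes
        } else {
            out.write(0); // non-null marker must come first
            out.write(ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(v).array());
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeNullableInt64(out, 42L);  // 1 flag byte + 8 value bytes
        writeNullableInt64(out, null); // 1 flag byte only
        System.out.println("encoded length = " + out.size()); // 10
    }
}
```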

@zhicwu
Contributor

zhicwu commented Jun 1, 2023

Apologies for the confusion. The example I provided was for RowBinary format. JSONEachRow, on the other hand, is a text-based format where you simply need to write a JSON string into the output stream for insertion, with each row on a separate line. For example:

// CREATE TABLE test_insert_with_format(i Int32, s String) ENGINE=Memory
try (PreparedStatement ps = conn
        .prepareStatement(
                "INSERT INTO test_insert_with_format(s,i) format JSONEachRow")) {
    ps.setObject(1, new ClickHouseWriter() {
        @Override
        public void write(ClickHouseOutputStream out) throws IOException {
            // actually line-break is optional
            out.write("{\"i\":2,\"s\":\"22\"}\n".getBytes());
            out.write("{\"i\":5,\"s\":\"55\"}\n".getBytes());
        }
    });
    ps.executeUpdate();
}

If you already have the JSON string from the start, there is no need to use fastjson to deserialize it back into a Java object and then serialize it again for the JDBC driver. You can simply pass the JSON string directly to the driver, as demonstrated above. ClickHouse will handle the rest of the process for you.
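To make the payload concrete: with JSONEachRow, the body the writer streams is simply the JSON strings joined by newlines. A self-contained sketch (toPayload is a hypothetical helper, not part of the driver):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class JsonEachRowPayload {
    // Newline-delimited JSON: one row object per line, which is what an
    // INSERT ... FORMAT JSONEachRow statement expects as its body.
    static byte[] toPayload(List<String> jsonRows) {
        return (String.join("\n", jsonRows) + "\n").getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = toPayload(Arrays.asList(
                "{\"i\":2,\"s\":\"22\"}",
                "{\"i\":5,\"s\":\"55\"}"));
        System.out.print(new String(payload, StandardCharsets.UTF_8));
    }
}
```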

@xlvchao
Author

xlvchao commented Jun 3, 2023


@zhicwu
I have implemented an efficient writing method based on JSONEachRow, but I have encountered a problem.

I have conducted multiple tests of high-concurrency writing and added tracking logs in the code for analysis.

The problem is that sometimes the amount of data in the database matches, while at other times a few thousand records are missing.

I can verify that no data is lost between the start of the test program and the point where it is written via out.write().

Therefore, I suspect that either our ClickHouse testing environment has some problems, or there is a problem with the underlying driver code.

Or do I need to modify or add some configuration parameters? Currently, I have added the connection parameter max_queued_buffers=0 to the JDBC URL.

@zhicwu
Contributor

zhicwu commented Jun 4, 2023

sometimes it is missing a few thousand records.

Have you checked written_rows in system.query_log? If you're using an InputStreamWriter etc., have you called the flush() method inside the write method?
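The flush() question matters because a buffered stream keeps bytes in memory until the buffer fills or is explicitly flushed; rows left in an unflushed buffer when write() returns can look exactly like missing records. A stdlib-only illustration:

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class FlushDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        BufferedOutputStream buffered = new BufferedOutputStream(sink, 8192);

        buffered.write("{\"i\":1}\n".getBytes()); // 8 bytes, well under the 8 KB buffer
        System.out.println("before flush: " + sink.size() + " bytes"); // 0: still buffered

        buffered.flush(); // push buffered bytes down to the sink
        System.out.println("after flush:  " + sink.size() + " bytes"); // 8
    }
}
```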

@xlvchao
Author

xlvchao commented Jun 7, 2023


@zhicwu It doesn't seem as fast as expected, and it's sometimes even slower!

DDL:

create table aiops_local_test.aiops_collect_123
(
    product          String,
    service          String,
    itf              String,
    addn             String,
    destinationId    String,
    errorDetail      String,
    ip               String,
    ipCity           String,
    ipCountry        String,
    ipProvince       String,
    itfGroup         String,
    latency          Int64,
    time             DateTime
)
engine = MergeTree()
PARTITION BY toYYYYMMDD(time)
PRIMARY KEY (product, service, itf, time)
ORDER BY (product, service, itf, time)
TTL time + toIntervalMonth(1)
SETTINGS index_granularity = 8192;

JDBC version:

0.4.6 or 0.3.x

Java demo:

import com.alibaba.fastjson.JSONObject;
import com.clickhouse.data.ClickHouseOutputStream;
import com.clickhouse.data.ClickHouseWriter;
import com.clickhouse.jdbc.ClickHouseDataSource;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;

public class BatchInsertTest {
    public static final String user = "default";
    public static final String password = "xxxxxxx";
    public static final String jdbcUrl = "jdbc:clickhouse://10.68.237.236:8123?collect_timeout=3600&max_execution_time=3600&socket_timeout=4800000";


    public static void main(String[] args) throws Exception {
        ClickHouseDataSource dataSource = new ClickHouseDataSource(jdbcUrl);
        String jsonString = "{\"addn\":\"no addn\",\"product\":\"product\",\"ipCity\":\"深圳\",\"ip\":\"127.0.0.1\",\"latency\":30,\"ipCountry\":\"中国\",\"destinationId\":\"id-234\",\"service\":\"service\",\"errorDetail\":\"no error\",\"ipProvince\":\"广东\",\"itf\":\"itf\",\"time\":\"2023-06-01 21:00:00\",\"itfGroup\":\"itfGroup\"}";

        for (int k = 0; k < 3; k++) {
            new Thread(() -> {
                List<String> localBuffer = new ArrayList<>();
                for (int i = 1; i <= 10000; i++) {
                    localBuffer.add(jsonString);
                    if(localBuffer.size() >= 1000) {
                        flushByJsonEachRow(dataSource, localBuffer);
                        localBuffer.clear();
                    }
                }
            }).start();
        }

        Thread.sleep(10000L);

        for (int k = 0; k < 3; k++) {
            new Thread(() -> {
                List<JSONObject> localBuffer = new ArrayList<>();
                for (int i = 1; i <= 10000; i++) {
                    localBuffer.add(JSONObject.parseObject(jsonString));
                    if(localBuffer.size() >= 1000) {
                        flushByStandardBatch(dataSource, localBuffer);
                        localBuffer.clear();
                    }
                }
            }).start();
        }


        Thread.sleep(10000000000000L);
    }

    static void flushByJsonEachRow(ClickHouseDataSource dataSource, List<String> dataList) {
        String sql = "insert into aiops_local_test.aiops_collect_123 format JSONEachRow";
        try (Connection conn = dataSource.getConnection(user, password);
             PreparedStatement prepareStatement = conn.prepareStatement(sql)) {

            prepareStatement.setObject(1, new ClickHouseWriter() {
                @Override
                public void write(ClickHouseOutputStream out) throws IOException {
                    for (String json : dataList) {
                        out.write(json.getBytes());
                    }
                    out.flush();
                }
            });

            long s = System.currentTimeMillis();
            prepareStatement.executeUpdate();
            long e = System.currentTimeMillis();
            System.err.println(String.format("Elapsed = %dms, batch size = %d", (e - s), dataList.size()));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    static void flushByStandardBatch(ClickHouseDataSource dataSource, List<JSONObject> dataList) {
        String sql = "INSERT INTO aiops_local_test.aiops_collect_123 (product,service,itf,addn,destinationId,errorDetail,ip,ipCity,ipCountry,ipProvince,itfGroup,latency,time) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)";
        String[] fields = new String[]{"product","service","itf","addn","destinationId","errorDetail","ip","ipCity","ipCountry","ipProvince","itfGroup","latency","time"};
        try (Connection conn = dataSource.getConnection(user, password);
             PreparedStatement prepareStatement = conn.prepareStatement(sql)) {

            for (JSONObject data : dataList) {
                int idx = 1;
                for (String field : fields) {
                    Object value = data.get(field);
                    prepareStatement.setObject(idx++, value);
                }
                prepareStatement.addBatch();
            }

            long s = System.currentTimeMillis();
            prepareStatement.executeBatch();
            long e = System.currentTimeMillis();
            System.out.println(String.format("Elapsed = %dms, batch size = %d", (e - s), dataList.size()));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

@zhicwu
Contributor

zhicwu commented Jun 7, 2023

@xlvchao, do you still see execution being blocked, OOM, or serialization errors?

As to performance, you're comparing insertions using different formats (JSONEachRow vs. RowBinary), so it's not surprising that the latter is faster. However, I'd suggest also factoring in the cost of JSON deserialization on the client side, as well as resource utilization on the server, for a fair comparison.

Lastly, if you're handling a small amount of data (as in the example above), it's not worth using multi-threading with multiple batches. Instead, one insert with all the data you have should be good enough in most cases. Simply put, it's faster to insert 1M rows in one go than to run 100 inserts of 10,000 rows each.
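The advice above can be sketched with a toy cost model: each INSERT pays a fixed per-request overhead plus a per-row cost, so fewer, larger batches amortize the fixed part. The overhead and per-row figures below are illustrative assumptions, not measurements:

```java
public class BatchCostModel {
    // total time = number of batches * per-request overhead + total rows * per-row cost
    static long totalMillis(long totalRows, long batchSize, long overheadMs, double perRowMs) {
        long batches = (totalRows + batchSize - 1) / batchSize; // ceiling division
        return Math.round(batches * overheadMs + totalRows * perRowMs);
    }

    public static void main(String[] args) {
        long rows = 1_000_000;
        // Assumed: 50 ms fixed cost per INSERT, 0.001 ms per row.
        System.out.println("100 inserts x 10k rows: " + totalMillis(rows, 10_000, 50, 0.001) + " ms");
        System.out.println("1 insert x 1M rows:     " + totalMillis(rows, 1_000_000, 50, 0.001) + " ms");
    }
}
```

With these assumed numbers the single big insert wins because the 50 ms request overhead is paid once instead of 100 times.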

@xlvchao
Author

xlvchao commented Jun 8, 2023

@zhicwu I adjusted the JVM parameters, and since then I have not seen execution being blocked, OOM, or serialization errors.

Our team inserts over 15 billion rows into ClickHouse every day, so if I only consider insertion performance, I should use clickhouse-client instead of clickhouse-jdbc. Am I right?

@zhicwu zhicwu added question and removed bug labels Jun 8, 2023
@zhicwu
Contributor

zhicwu commented Jun 8, 2023

@zhicwu I adjusted the JVM parameters, and since then I have not seen execution being blocked, OOM, or serialization errors.

Good to know.

Our team inserts over 15 billion pieces of data into Clickhouse every day, so if I only consider insertion performance, I should consider using clickhouse-client instead of clickhouse-jdbc. Am I right?

Yes, in general, clickhouse-client is faster than clickhouse-jdbc, especially when (de)serialization is involved. If you need to process data before ingesting it into ClickHouse, there won't be much difference; see the comparison among clickhouse-client, curl, and the Java client (dump and load) in #928. Apart from that, if the source of the data is messages from an MQ, you probably just need to use the built-in connector in ClickHouse for ingestion.

@xlvchao xlvchao closed this as completed Jun 9, 2023