
When using Flink to sink into ClickHouse, an error occurs: java.lang.IllegalArgumentException: Only singleton array is allowed, but we got: ["E5", "E6"] #896

Closed
Aload opened this issue Apr 14, 2022 · 8 comments · Fixed by #932
Comments


Aload commented Apr 14, 2022

When I use v0.3.2, the following code fails:

val array = pst.getConnection
  .createArrayOf(ClickHouseDataType.String.name(),
    ClickHouseArrayValue.of(intoMsg.withArrayJsonNode.elements().asScala.toArray).asArray())
pst.setArray(20, array) // alarms
pst.setInt(21, intoMsg.get("isInterpolate").asInt) // interpolation flag (0 = normal, 1 = interpolated)

The driver implements createArrayOf, and the update it calls, like this:


   default ClickHouseArray createArrayOf(String typeName, Object[] elements) throws SQLException {
        ClickHouseConfig config = this.getConfig();
        ClickHouseColumn column = ClickHouseColumn.of("", ClickHouseDataType.Array, false, new ClickHouseColumn[]{ClickHouseColumn.of("", typeName)});
        ClickHouseValue v = ClickHouseValues.newValue(config, column).update(elements);
        ClickHouseResultSet rs = new ClickHouseResultSet("", "", this.createStatement(), ClickHouseSimpleResponse.of(config, Collections.singletonList(column), new Object[][]{{v.asObject()}}));
        rs.next();
        return new ClickHouseArray(rs, 1);
    }

    default ClickHouseValue update(Object[] value) {
        if (value != null && value.length != 0) {
            if (value.length != 1) {
                throw new IllegalArgumentException("Only singleton array is allowed, but we got: " + Arrays.toString(value));
            } else {
                return this.update(value[0]);
            }
        } else {
            return this.resetToNullOrEmpty();
        }
    }

Must the array passed to this update have size 0 or 1? How is a multi-element array supposed to get through?

Caused by: java.lang.RuntimeException: Writing records to JDBC failed.
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.checkFlushException(JdbcBatchingOutputFormat.java:153)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:179)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:229)
... 11 more
Caused by: java.lang.IllegalArgumentException: Only singleton array is allowed, but we got: ["E5", "E6"]
at com.clickhouse.client.ClickHouseValue.update(ClickHouseValue.java:1148)
at com.clickhouse.jdbc.ClickHouseConnection.createArrayOf(ClickHouseConnection.java:40)
at com.clickhouse.jdbc.ClickHouseConnection.createArrayOf(ClickHouseConnection.java:23)
at com.anso.process.function.JdbcCkStatementBuilder.accept(JdbcCkStatementBuilder.scala:55)
at com.anso.process.function.JdbcCkStatementBuilder.accept(JdbcCkStatementBuilder.scala:21)
at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:70)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:213)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:183)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:127)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
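
For reference, a standalone sketch of the contract that the default update(Object[]) above enforces (illustrative names only, not the driver's API). It shows why a two-element array like the one in the trace is rejected, while an array wrapped as a single element passes:

import java.util.Arrays;

public class SingletonContract {
    // Mirrors the default update(Object[]) shown above: a null or empty
    // array resets the value, a singleton is unwrapped, anything longer throws.
    static Object unwrapSingleton(Object[] value) {
        if (value == null || value.length == 0) {
            return null; // the driver calls resetToNullOrEmpty() here
        }
        if (value.length != 1) {
            throw new IllegalArgumentException(
                    "Only singleton array is allowed, but we got: " + Arrays.toString(value));
        }
        return value[0];
    }

    public static void main(String[] args) {
        // One top-level element that is itself an array: accepted and unwrapped.
        Object[] wrapped = { new String[] { "E5", "E6" } };
        System.out.println(Arrays.toString((String[]) unwrapSingleton(wrapped)));

        // Two top-level elements: throws IllegalArgumentException, as in the trace.
        unwrapSingleton(new Object[] { "E5", "E6" });
    }
}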


Aload commented Apr 14, 2022

When I use v0.3.2-patch7, a different error surfaces, e.g.:

Caused by: java.lang.RuntimeException: Writing records to JDBC failed.
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.checkFlushException(JdbcBatchingOutputFormat.java:153)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:179)
at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.snapshotState(GenericJdbcSinkFunction.java:62)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:218)
... 23 more
Caused by: java.lang.NullPointerException
at com.clickhouse.client.data.BinaryStreamUtils.writeString(BinaryStreamUtils.java:1661)
at com.clickhouse.client.data.ClickHouseRowBinaryProcessor$MappedFunctions.lambda$buildMappingsForDataTypes$65(ClickHouseRowBinaryProcessor.java:338)
at com.clickhouse.client.data.ClickHouseRowBinaryProcessor$MappedFunctions.serialize(ClickHouseRowBinaryProcessor.java:485)
at com.clickhouse.jdbc.internal.InputBasedPreparedStatement.addBatch(InputBasedPreparedStatement.java:295)
at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:71)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:213)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:183)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:127)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more


zhicwu commented Apr 14, 2022

Hi @Aload, the issue happens when you're trying to update a non-array column using an array, for example: trying to update a String column using new String[] {"E5", "E6"}.

Would you mind sharing the table structure (especially the column you failed to update using an array)? It would also be very helpful if you could provide a code snippet that reproduces the issue.

Update: please refer to a test case like this if you need to deal with arrays, or maybe this one for complex data types (e.g. array of map, etc.).
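
In short, the pattern those tests follow is plain JDBC. A minimal sketch, assuming a table t with an Array(String) column a (table name, column, and URL are placeholders, adjust to your setup):

import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class ArrayInsertSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/default");
             PreparedStatement ps = conn.prepareStatement("insert into t(a) values (?)")) {
            // createArrayOf takes the ClickHouse element type name plus the elements;
            // the whole Java array becomes one Array(String) value.
            Array arr = conn.createArrayOf("String", new String[] { "E5", "E6" });
            ps.setArray(1, arr);
            ps.executeUpdate();
        }
    }
}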


Aload commented Apr 14, 2022

Got it. But when I code it in Scala, I can't achieve the desired effect:
pst.setArray(3, pst.getConnection.createArrayOf("String", Array[String]("3", null, "1")))


Aload commented Apr 14, 2022

> Hi @Aload, the issue happens when you're trying to update a non-array column using an array, for example: trying to update a String column using new String[] {"E5", "E6"}.
>
> Would you mind sharing the table structure (especially the column you failed to update using an array)? It would also be very helpful if you could provide a code snippet that reproduces the issue.
>
> Update: please refer to a test case like this if you need to deal with arrays, or maybe this one for complex data types (e.g. array of map, etc.).

When I code it in Scala I can't achieve the desired effect; the same problem still occurs when I change the call to what you suggested:
pst.setArray(20, pst.getConnection().createArrayOf("String", new String[]{"3", null, "1"}));

Caused by: java.lang.RuntimeException: Writing records to JDBC failed.
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.checkFlushException(JdbcBatchingOutputFormat.java:153)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:179)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:229)
... 11 more
Caused by: java.lang.IllegalArgumentException: Only singleton array is allowed, but we got: [3, null, 1]
at com.clickhouse.client.ClickHouseValue.update(ClickHouseValue.java:1099)
at com.clickhouse.jdbc.ClickHouseConnection.createArrayOf(ClickHouseConnection.java:40)
at com.clickhouse.jdbc.ClickHouseConnection.createArrayOf(ClickHouseConnection.java:23)
at com.anso.process.function.JdbcCkStatementBuilders.accept(JdbcCkStatementBuilders.java:37)
at com.anso.process.function.JdbcCkStatementBuilders.accept(JdbcCkStatementBuilders.java:14)
at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:70)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:213)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:183)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:127)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)


zhicwu commented Apr 15, 2022


This exception is unrelated to your initial question about arrays. It looks like you passed null to a non-nullable String column:

https://github.com/ClickHouse/clickhouse-jdbc/blob/058596195ce1dc24fed2cf20a669459c8eec43d0/clickhouse-client/src/main/java/com/clickhouse/client/data/BinaryStreamUtils.java#L1661
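
Put differently: with Array(String) the element type is non-nullable, so a null element cannot be serialized. A sketch of the two obvious workarounds (schema and URL are illustrative assumptions, not from this thread):

import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;

public class NullElementSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/default")) {
            // Workaround 1: keep the column as Array(String) and strip nulls
            // from the payload before building the Array.
            Array noNulls = conn.createArrayOf("String", new String[] { "3", "1" });

            // Workaround 2: declare the column as Array(Nullable(String)) on the
            // ClickHouse side; only then are null elements representable.
            Array withNulls = conn.createArrayOf("String", new String[] { "3", null, "1" });
        }
    }
}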


mafiore commented May 11, 2022

I have run into the same error since version 0.3.2-patch6; up to version 0.3.2-patch5 everything works fine.

I poked around a bit and found that the JDBC driver has problems with empty strings: it seems to handle empty strings like null values.

Before patch6 everything worked fine, and the behavior still exists in patch9.
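
To make the report concrete, a minimal repro sketch (hypothetical table t with a non-nullable String column s; URL is a placeholder). On patch6 through patch9, the empty string below goes down the same NullPointerException path as an actual null:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class EmptyStringRepro {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/default");
             PreparedStatement ps = conn.prepareStatement("insert into t(s) values (?)")) {
            ps.setString(1, ""); // an empty string, not null
            ps.addBatch();       // fails on patch6..patch9 as if null had been passed
            ps.executeBatch();
        }
    }
}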

zhicwu linked a pull request May 11, 2022 that will close this issue
zhicwu added a commit that referenced this issue May 11, 2022

zhicwu commented Jun 26, 2022

The nullAsDefault option was added in 0.3.2-patch10 - see the comments here.
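
A sketch of turning the option on through connection properties. The value "2" (replace null with the column's default) is an assumption here; the linked comments describe the exact accepted values:

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class NullAsDefaultSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumption: "2" tells the driver to replace null with the column's
        // default value; see the linked comments for the exact semantics.
        props.setProperty("nullAsDefault", "2");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:clickhouse://localhost:8123/default", props)) {
            // inserting null into a non-nullable column no longer throws
        }
    }
}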

zhicwu closed this as completed Jun 26, 2022

Aload commented Oct 11, 2022 via email
