From 6e4ee468a5a37a78c70e948b18dcbf18ce06cdeb Mon Sep 17 00:00:00 2001 From: THZ <57392019+taohaozhi1129@users.noreply.github.com> Date: Fri, 17 Jan 2025 20:33:20 +0800 Subject: [PATCH] [Fix][File] Fix Multi-file with binary format synchronization failed (#8546) --- .../source/reader/BinaryReadStrategy.java | 1 + .../local/LocalFileWithMultipleTableIT.java | 12 +++++ ..._local_file_binary_with_multipletable.conf | 50 +++++++++++++++++++ 3 files changed, 63 insertions(+) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_to_local_file_binary_with_multipletable.conf diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java index 7849415b32d..66f1a262834 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java @@ -77,6 +77,7 @@ public void read(String path, String tableId, Collector output) } SeaTunnelRow row = new SeaTunnelRow(new Object[] {buffer, relativePath, partIndex}); buffer = new byte[1024]; + row.setTableId(tableId); output.collect(row); partIndex++; } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java index 4c63b7e3357..5303d4a6629 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java @@ -65,6 +65,11 @@ public class LocalFileWithMultipleTableIT extends TestSuiteBase { "/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt", container); + ContainerUtil.copyFileIntoContainers( + "/binary/cat.png", + "/seatunnel/read/binary/name=tyrantlucifer/hobby=coding/cat.png", + container); + container.execInContainer("mkdir", "-p", "/tmp/fake_empty"); }; @@ -109,4 +114,11 @@ public void testLocalFileReadAndWriteInMultipleTableMode_text(TestContainer cont TestHelper helper = new TestHelper(container); helper.execute("/text/local_file_text_to_assert_with_multipletable.conf"); } + + @TestTemplate + public void testLocalFileReadAndWriteInMultipleTableMode_binary(TestContainer container) + throws IOException, InterruptedException { + TestHelper helper = new TestHelper(container); + helper.execute("/binary/local_file_binary_to_local_file_binary_with_multipletable.conf"); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_to_local_file_binary_with_multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_to_local_file_binary_with_multipletable.conf new file mode 100644 index 00000000000..c97dd83340f --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_to_local_file_binary_with_multipletable.conf @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + LocalFile { + tables_configs = [ + { + schema { + table = "cat" + } + path = "/seatunnel/read/binary" + file_format_type = "binary" + }, + { + schema { + table = "dog" + } + path = "/seatunnel/read/binary" + file_format_type = "binary" + } + + ] + } +} +sink { + Assert { + rules { + table-names = ["cat", "dog"] + } + } +} \ No newline at end of file