Skip to content

Commit

Permalink
[Fix][File] Fix Multi-file with binary format synchronization failed (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
taohaozhi1129 authored Jan 17, 2025
1 parent 275db78 commit 6e4ee46
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public void read(String path, String tableId, Collector<SeaTunnelRow> output)
}
SeaTunnelRow row = new SeaTunnelRow(new Object[] {buffer, relativePath, partIndex});
buffer = new byte[1024];
row.setTableId(tableId);
output.collect(row);
partIndex++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public class LocalFileWithMultipleTableIT extends TestSuiteBase {
"/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
container);

ContainerUtil.copyFileIntoContainers(
"/binary/cat.png",
"/seatunnel/read/binary/name=tyrantlucifer/hobby=coding/cat.png",
container);

container.execInContainer("mkdir", "-p", "/tmp/fake_empty");
};

Expand Down Expand Up @@ -109,4 +114,11 @@ public void testLocalFileReadAndWriteInMultipleTableMode_text(TestContainer cont
TestHelper helper = new TestHelper(container);
helper.execute("/text/local_file_text_to_assert_with_multipletable.conf");
}

@TestTemplate
public void testLocalFileReadAndWriteInMultipleTableMode_binary(TestContainer container)
throws IOException, InterruptedException {
TestHelper helper = new TestHelper(container);
helper.execute("/binary/local_file_binary_to_local_file_binary_with_multipletable.conf");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"
}

source {
LocalFile {
tables_configs = [
{
schema {
table = "cat"
}
path = "/seatunnel/read/binary"
file_format_type = "binary"
},
{
schema {
table = "dog"
}
path = "/seatunnel/read/binary"
file_format_type = "binary"
}

]
}
}
sink {
Assert {
rules {
table-names = ["cat", "dog"]
}
}
}

0 comments on commit 6e4ee46

Please sign in to comment.