Skip to content

Commit

Permalink
bulker: file_storage: fix for replace table and empty file
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Dec 3, 2024
1 parent e9480ae commit f791dbe
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 3 deletions.
6 changes: 4 additions & 2 deletions bulkerlib/implementations/file_storage/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,10 @@ func (ps *AbstractFileStorageStream) postConsume(err error) error {
}

func (ps *AbstractFileStorageStream) postComplete(err error) (bulker.State, error) {
_ = ps.batchFile.Close()
_ = os.Remove(ps.batchFile.Name())
if ps.batchFile != nil {
_ = ps.batchFile.Close()
_ = os.Remove(ps.batchFile.Name())
}
if err != nil {
ps.state.SetError(err)
ps.state.Status = bulker.Failed
Expand Down
11 changes: 10 additions & 1 deletion bulkerlib/implementations/file_storage/bulker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,14 @@ func TestBasics(t *testing.T) {
configIds: []string{implementations.S3BulkerTypeId + "_flat"},
streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithDeduplicate(), bulker.WithDiscriminatorField([]string{"nested", "int1"})},
},
{
name: "empty",
modes: []bulker.BulkMode{bulker.ReplaceTable, bulker.ReplacePartition},
dataFile: "test_data/empty.ndjson",
expectedRowsCount: 0,
expectPartitionId: true,
configIds: allBulkerConfigs,
},
}
for _, tt := range tests {
tt := tt
Expand Down Expand Up @@ -402,11 +410,12 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode)
//PostStep("state_lasterror", testConfig, mode, reqr, state.LastError)
if testConfig.expectedRowsCount != nil || testConfig.expectedRows != nil {
time.Sleep(1 * time.Second)
expectedRowCount := utils.Ternary(testConfig.expectedRows != nil, any(len(testConfig.expectedRows)), testConfig.expectedRowsCount).(int)
//Check rows count and rows data when provided
rowBytes, err := fileAdapter.Download(expectedFileName)
rows := []map[string]any{}
var reader io.Reader
if fileAdapter.Compression() == types.FileCompressionGZIP {
if expectedRowCount > 0 && fileAdapter.Compression() == types.FileCompressionGZIP {
reader, _ = gzip.NewReader(bytes.NewReader(rowBytes))
} else {
reader = bytes.NewReader(rowBytes)
Expand Down
Empty file.

0 comments on commit f791dbe

Please sign in to comment.