Skip to content

Commit

Permalink
bulker: improved logging
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Mar 21, 2024
1 parent 0b1b5a2 commit 1257d44
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 1 deletion.
1 change: 1 addition & 0 deletions bulkerlib/implementations/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var folderMacro = map[string]func() string{

type FileAdapter interface {
io.Closer
Type() string
UploadBytes(fileName string, fileBytes []byte) error
Upload(fileName string, fileReader io.ReadSeeker) error
Download(fileName string) ([]byte, error)
Expand Down
3 changes: 3 additions & 0 deletions bulkerlib/implementations/file_storage/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,12 @@ func (ps *AbstractFileStorageStream) flushBatchFile(ctx context.Context) (err er
ps.state.Representation = map[string]string{
"name": ps.fileAdapter.Path(fileName),
}
loadTime := time.Now()
err = ps.fileAdapter.Upload(fileName, workingFile)
if err != nil {
return errorj.Decorate(err, "failed to flush tmp file to the warehouse")
} else {
logging.Infof("[%s] Batch file loaded to %s in %.2f s.", ps.id, ps.fileAdapter.Type(), time.Since(loadTime).Seconds())
}
}
return nil
Expand Down
4 changes: 4 additions & 0 deletions bulkerlib/implementations/file_storage/gcs_bulker.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,7 @@ func (gcs *GCSBulker) CreateStream(id, tableName string, mode bulker.BulkMode, s
}
return nil, fmt.Errorf("unsupported bulk mode: %s", mode)
}

func (gcs *GCSBulker) Type() string {
return GCSBulkerTypeId
}
4 changes: 4 additions & 0 deletions bulkerlib/implementations/file_storage/s3_bulker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@ func (s3 *S3Bulker) CreateStream(id, tableName string, mode bulker.BulkMode, str
}
return nil, fmt.Errorf("unsupported bulk mode: %s", mode)
}

func (s3 *S3Bulker) Type() string {
return S3BulkerTypeId
}
7 changes: 7 additions & 0 deletions bulkerlib/implementations/sql/abstract_transactional.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (s
}
logging.Infof("[%s] Converted batch file from %s (%.2f mb) to %s (%.2f mb) in %.2f s.", ps.id, ps.marshaller.FileExtension(), batchSizeMb, ps.targetMarshaller.FileExtension(), convertedSizeMb, time.Since(convertStart).Seconds())
}
loadTime := time.Now()
if ps.s3 != nil {
s3Config := s3BatchFileOption.Get(&ps.options)
rFile, err := os.Open(workingFile.Name())
Expand All @@ -233,14 +234,20 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (s
return nil, errorj.Decorate(err, "failed to upload file to s3")
}
defer ps.s3.DeleteObject(s3FileName)
logging.Infof("[%s] Batch file uploaded to s3 in %.2f s.", ps.id, time.Since(loadTime).Seconds())
loadTime = time.Now()
state, err = ps.tx.LoadTable(ctx, table, &LoadSource{Type: AmazonS3, Path: s3FileName, Format: ps.sqlAdapter.GetBatchFileFormat(), S3Config: s3Config})
if err != nil {
return state, errorj.Decorate(err, "failed to flush tmp file to the warehouse")
} else {
logging.Infof("[%s] Batch file loaded to %s in %.2f s.", ps.id, ps.sqlAdapter.Type(), time.Since(loadTime).Seconds())
}
} else {
state, err = ps.tx.LoadTable(ctx, table, &LoadSource{Type: LocalFile, Path: workingFile.Name(), Format: ps.sqlAdapter.GetBatchFileFormat()})
if err != nil {
return state, errorj.Decorate(err, "failed to flush tmp file to the warehouse")
} else {
logging.Infof("[%s] Batch file loaded to %s in %.2f s.", ps.id, ps.sqlAdapter.Type(), time.Since(loadTime).Seconds())
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion bulkerlib/types/marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ func (jm *JSONMarshaller) Marshal(object ...Object) error {
return err
}
_, err = jm.writer.Write([]byte("\n"))
return err
if err != nil {
return err
}
}
return nil
}
Expand Down

0 comments on commit 1257d44

Please sign in to comment.