Skip to content

Commit

Permalink
bulker: improved logging
Browse files Browse the repository at this point in the history
all projects: upgraded to go 1.22
  • Loading branch information
absorbb committed Mar 21, 2024
1 parent f65ddf6 commit 876c7e2
Show file tree
Hide file tree
Showing 21 changed files with 347 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-build-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.21.5
go-version: 1.22.1
cache: false

- name: Bulker Test
Expand Down
2 changes: 1 addition & 1 deletion admin/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/jitsucom/bulker/admin

go 1.21
go 1.22

require (
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
Expand Down
1 change: 1 addition & 0 deletions bulkerapp/app/abstract_batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ func (bc *AbstractBatchConsumer) ConsumeAll() (counters BatchCounters, err error
bc.Debugf("Consumer should not consume. offsets: %d-%d", lowOffset, highOffset)
return BatchCounters{}, nil
}
bc.Infof("Starting consuming messages from topic. Messages in topic: ~%d. ", highOffset-lowOffset)
batchNumber := 1
for {
if bc.retired.Load() {
Expand Down
1 change: 1 addition & 0 deletions bulkerapp/app/batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func NewBatchConsumer(repository *Repository, destinationId string, batchPeriodS
}

func (bc *BatchConsumerImpl) processBatchImpl(destination *Destination, batchNum, batchSize, retryBatchSize int, highOffset int64) (counters BatchCounters, nextBatch bool, err error) {
bc.Infof("Processing batch #%d", batchNum)
counters.firstOffset = int64(kafka.OffsetBeginning)
startTime := time.Now()
var bulkerStream bulker.BulkerStream
Expand Down
2 changes: 1 addition & 1 deletion bulkerapp/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/jitsucom/bulker/bulkerapp

go 1.21
go 1.22

require (
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
Expand Down
2 changes: 1 addition & 1 deletion bulkerlib/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/jitsucom/bulker/bulkerlib

go 1.21
go 1.22

require (
cloud.google.com/go v0.112.0
Expand Down
19 changes: 18 additions & 1 deletion bulkerlib/implementations/file_storage/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type AbstractFileStorageStream struct {

state bulker.State
inited bool

startTime time.Time
}

func newAbstractFileStorageStream(id string, p implementations2.FileAdapter, filenameFunc func(ctx context.Context) string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (AbstractFileStorageStream, error) {
Expand All @@ -66,6 +68,7 @@ func newAbstractFileStorageStream(id string, p implementations2.FileAdapter, fil
}
ps.csvHeader = utils.NewSet[string]()
ps.state = bulker.State{Status: bulker.Active}
ps.startTime = time.Now()
return ps, nil
}

Expand Down Expand Up @@ -130,6 +133,8 @@ func (ps *AbstractFileStorageStream) postComplete(err error) (bulker.State, erro
ps.state.SetError(err)
ps.state.Status = bulker.Failed
} else {
sec := time.Since(ps.startTime).Seconds()
logging.Infof("[%s] Stream completed successfully in %.2f s. Avg Speed: %.2f events/sec.", ps.id, sec, float64(ps.state.SuccessfulRows)/sec)
ps.state.Status = bulker.Completed
}
return ps.state, err
Expand All @@ -154,6 +159,13 @@ func (ps *AbstractFileStorageStream) flushBatchFile(ctx context.Context) (err er
if err != nil {
return errorj.Decorate(err, "failed to sync batch file")
}
stat, _ := ps.batchFile.Stat()
var batchSizeMb float64
if stat != nil {
batchSizeMb = float64(stat.Size()) / 1024 / 1024
sec := time.Since(ps.startTime).Seconds()
logging.Infof("[%s] Flushed %d events to batch file. Size: %.2f mb in %.2f s. Speed: %.2f mb/s", ps.id, ps.eventsInBatch, batchSizeMb, sec, batchSizeMb/sec)
}
workingFile := ps.batchFile
needToConvert := false
convertStart := time.Now()
Expand Down Expand Up @@ -215,7 +227,12 @@ func (ps *AbstractFileStorageStream) flushBatchFile(ctx context.Context) (err er
workingFile.Sync()
}
if needToConvert {
logging.Infof("[%s] Converted batch file from %s to %s in %s", ps.id, ps.marshaller.Format(), ps.targetMarshaller.Format(), time.Now().Sub(convertStart))
stat, _ = workingFile.Stat()
var convertedSizeMb float64
if stat != nil {
convertedSizeMb = float64(stat.Size()) / 1024 / 1024
}
logging.Infof("[%s] Converted batch file from %s (%.2f mb) to %s (%.2f mb) in %.2f s.", ps.id, ps.marshaller.FileExtension(), batchSizeMb, ps.targetMarshaller.FileExtension(), convertedSizeMb, time.Since(convertStart).Seconds())
}
//create file reader for workingFile
_, err = workingFile.Seek(0, 0)
Expand Down
4 changes: 4 additions & 0 deletions bulkerlib/implementations/sql/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/jitsucom/bulker/jitsubase/logging"
"github.com/jitsucom/bulker/jitsubase/utils"
jsoniter "github.com/json-iterator/go"
"time"
)

// TODO: check whether COPY is transactional ?
Expand All @@ -31,6 +32,8 @@ type AbstractSQLStream struct {
customTypes types.SQLTypes
pkColumns []string
timestampColumn string

startTime time.Time
}

func newAbstractStream(id string, p SQLAdapter, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (*AbstractSQLStream, error) {
Expand All @@ -56,6 +59,7 @@ func newAbstractStream(id string, p SQLAdapter, tableName string, mode bulker.Bu
//TODO: max column?
ps.state = bulker.State{Status: bulker.Active}
ps.customTypes = customFields
ps.startTime = time.Now()
return &ps, nil
}

Expand Down
16 changes: 15 additions & 1 deletion bulkerlib/implementations/sql/abstract_transactional.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func (ps *AbstractTransactionalSQLStream) postComplete(ctx context.Context, err
_ = ps.tx.Rollback()
}
} else {
sec := time.Since(ps.startTime).Seconds()
logging.Infof("[%s] Stream completed successfully in %.2f s. Avg Speed: %.2f events/sec.", ps.id, sec, float64(ps.state.SuccessfulRows)/sec)
if ps.tx != nil {
if ps.tmpTable != nil {
_ = ps.tx.Drop(ctx, ps.tmpTable, true)
Expand Down Expand Up @@ -141,6 +143,13 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (s
if err != nil {
return nil, errorj.Decorate(err, "failed to sync batch file")
}
stat, _ := ps.batchFile.Stat()
var batchSizeMb float64
if stat != nil {
batchSizeMb = float64(stat.Size()) / 1024 / 1024
sec := time.Since(ps.startTime).Seconds()
logging.Infof("[%s] Flushed %d events to batch file. Size: %.2f mb in %.2f s. Speed: %.2f mb/s", ps.id, ps.eventsInBatch, batchSizeMb, sec, batchSizeMb/sec)
}
workingFile := ps.batchFile
needToConvert := false
convertStart := time.Now()
Expand Down Expand Up @@ -202,7 +211,12 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (s
workingFile.Sync()
}
if needToConvert {
logging.Infof("[%s] Converted batch file from %s(%s) to %s(%s) in %s", ps.id, ps.marshaller.Format(), ps.marshaller.Compression(), ps.targetMarshaller.Format(), ps.targetMarshaller.Compression(), time.Now().Sub(convertStart))
stat, _ = workingFile.Stat()
var convertedSizeMb float64
if stat != nil {
convertedSizeMb = float64(stat.Size()) / 1024 / 1024
}
logging.Infof("[%s] Converted batch file from %s (%.2f mb) to %s (%.2f mb) in %.2f s.", ps.id, ps.marshaller.FileExtension(), batchSizeMb, ps.targetMarshaller.FileExtension(), convertedSizeMb, time.Since(convertStart).Seconds())
}
if ps.s3 != nil {
s3Config := s3BatchFileOption.Get(&ps.options)
Expand Down
3 changes: 2 additions & 1 deletion bulkerlib/implementations/sql/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,7 @@ func (bq *BigQuery) RunJob(ctx context.Context, runner JobRunner, jobDescription
defer func() {
bq.logQuery(jobDescription, runner, err)
}()
startTime := time.Now()
state = &bulker.WarehouseState{}
var status *bigquery.JobStatus
var jobID string
Expand Down Expand Up @@ -1109,7 +1110,7 @@ func (bq *BigQuery) RunJob(ctx context.Context, runner JobRunner, jobDescription
}
return job, state, fmt.Errorf("Failed to %s.%s Completed with error: %v%s", jobDescription, jobID, err, bytesProcessed)
} else {
bq.Infof("Successfully %s.%s%s", jobDescription, jobID, bytesProcessed)
bq.Infof("Successfully %s.%s%s in %.2f s.", jobDescription, jobID, bytesProcessed, time.Since(startTime).Seconds())
return job, state, nil
}
}
2 changes: 1 addition & 1 deletion connectors/airbytecdk/go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/jitsucom/bulker/airbytecdk

go 1.21
go 1.22
2 changes: 1 addition & 1 deletion connectors/firebase/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/jitsucom/bulker/connectors/firebase

go 1.21
go 1.22

require (
cloud.google.com/go/firestore v1.14.0
Expand Down
2 changes: 1 addition & 1 deletion eventslog/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/jitsucom/bulker/eventslog

go 1.21
go 1.22

require (
github.com/ClickHouse/clickhouse-go/v2 v2.19.0
Expand Down
2 changes: 1 addition & 1 deletion go.work
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
go 1.21
go 1.22

use (
./bulkerapp
Expand Down
Loading

0 comments on commit 876c7e2

Please sign in to comment.