diff --git a/bulkerapp/app/batch_consumer.go b/bulkerapp/app/batch_consumer.go index 1cb0488..cf3ef6c 100644 --- a/bulkerapp/app/batch_consumer.go +++ b/bulkerapp/app/batch_consumer.go @@ -8,7 +8,6 @@ import ( "github.com/jitsucom/bulker/bulkerapp/metrics" bulker "github.com/jitsucom/bulker/bulkerlib" "github.com/jitsucom/bulker/bulkerlib/types" - "github.com/jitsucom/bulker/jitsubase/logging" "github.com/jitsucom/bulker/jitsubase/timestamp" jsoniter "github.com/json-iterator/go" "strconv" @@ -52,7 +51,7 @@ func (bc *BatchConsumerImpl) processBatchImpl(destination *Destination, batchNum counters = cnts if err2 != nil { bc.errorMetric("PROCESS_FAILED_ERROR") - logging.SystemError(err2) + bc.SystemErrorf(err2.Error()) } else if cnts.failed > 1 { // if we fail right on the first message - that probably means connection problems. No need to move further. // otherwise we can try to consume next batch @@ -204,8 +203,8 @@ func (bc *BatchConsumerImpl) processFailed(firstPosition *kafka.TopicPartition, if err != nil { //cleanup _ = producer.AbortTransaction(context.Background()) - _, err = bc.consumer.Load().SeekPartitions([]kafka.TopicPartition{*firstPosition}) - if err != nil { + _, err2 := bc.consumer.Load().SeekPartitions([]kafka.TopicPartition{*firstPosition}) + if err2 != nil { bc.errorMetric("SEEK_ERROR") } }