Skip to content

Commit

Permalink
bulker: fix possible hiding of error in batch consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Oct 22, 2023
1 parent b0ae986 commit 407daaf
Showing 1 changed file with 3 additions and 4 deletions.
7 changes: 3 additions & 4 deletions bulkerapp/app/batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/jitsucom/bulker/bulkerapp/metrics"
bulker "github.com/jitsucom/bulker/bulkerlib"
"github.com/jitsucom/bulker/bulkerlib/types"
"github.com/jitsucom/bulker/jitsubase/logging"
"github.com/jitsucom/bulker/jitsubase/timestamp"
jsoniter "github.com/json-iterator/go"
"strconv"
Expand Down Expand Up @@ -52,7 +51,7 @@ func (bc *BatchConsumerImpl) processBatchImpl(destination *Destination, batchNum
counters = cnts
if err2 != nil {
bc.errorMetric("PROCESS_FAILED_ERROR")
logging.SystemError(err2)
bc.SystemErrorf(err2.Error())
} else if cnts.failed > 1 {
// if we fail right on the first message - that probably means connection problems. No need to move further.
// otherwise we can try to consume next batch
Expand Down Expand Up @@ -204,8 +203,8 @@ func (bc *BatchConsumerImpl) processFailed(firstPosition *kafka.TopicPartition,
if err != nil {
//cleanup
_ = producer.AbortTransaction(context.Background())
_, err = bc.consumer.Load().SeekPartitions([]kafka.TopicPartition{*firstPosition})
if err != nil {
_, err2 := bc.consumer.Load().SeekPartitions([]kafka.TopicPartition{*firstPosition})
if err2 != nil {
bc.errorMetric("SEEK_ERROR")
}
}
Expand Down

0 comments on commit 407daaf

Please sign in to comment.