Skip to content

Commit

Permalink
Merge pull request #599 from twmb/kip-890-proto
Browse files Browse the repository at this point in the history
KIP-890 proto
  • Loading branch information
twmb authored Oct 21, 2023
2 parents ae169a1 + 7e83e62 commit 6a961da
Show file tree
Hide file tree
Showing 9 changed files with 601 additions and 39 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/lint-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
container: golang:1.20.3
services:
kafka:
image: bitnami/kafka:latest
image: bitnami/kafka:3.5
ports:
- 9092:9092
env:
Expand Down
7 changes: 3 additions & 4 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# We may as well allow multiple golangci-lint invocations at once.
run:
allow-parallel-runners: true
go: "1.19"
go: "1.21"

# golangci-lint by default ignores some staticcheck and vet raised issues that
# are actually important to catch. The following ensures that we do not ignore
Expand Down Expand Up @@ -68,15 +68,14 @@ linters-settings:
#
# https://github.com/mvdan/gofumpt/issues/137
gofumpt:
lang-version: "1.19"
lang-version: "1.21"
extra-rules: true

gosec:
excludes:
- G104 # unhandled errors, we exclude for the same reason we do not use errcheck
- G404 # we want math/rand


# Gocritic is a meta linter that has very good lints, and most of the
# experimental ones are very good too. We opt into everything, which helps
# us when we upgrade golangci-lint, and we specifically opt out of a batch.
Expand Down Expand Up @@ -164,5 +163,5 @@ linters-settings:
# contexts for beneficial reasons, and we disable the SSLv3 deprecation
# warning because this is solely for a debug log.
staticcheck:
go: "1.19"
go: "1.21"
checks: ["all", "-SA1012", "-SA1019"]
44 changes: 38 additions & 6 deletions generate/definitions/24_add_partitions_to_txn
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,60 @@
// partitions in the request. Before producing any records to a partition in
// the transaction, that partition must have been added to the transaction with
// this request.
AddPartitionsToTxnRequest => key 24, max version 3, flexible v3+, txn coordinator
//
// Versions 3 and below are exclusively used by clients and versions 4 and
// above are used by brokers.
//
// Version 4 adds VerifyOnly field to check if partitions are already in
// transaction and adds support to batch multiple transactions.
AddPartitionsToTxnRequest => key 24, max version 4, flexible v3+, txn coordinator
// TransactionalID is the transactional ID to use for this request.
TransactionalID: string
TransactionalID: string // v0-v3
// ProducerID is the producer ID of the client for this transactional ID
// as received from InitProducerID.
ProducerID: int64
ProducerID: int64 // v0-v3
// ProducerEpoch is the producer epoch of the client for this transactional ID
// as received from InitProducerID.
ProducerEpoch: int16
ProducerEpoch: int16 // v0-v3
// Topics are topics to add as part of the producer side of a transaction.
Topics: [=>]
Topics: [=>] // v0-v3
// Topic is a topic name.
Topic: string
// Partitions are partitions within a topic to add as part of the producer
// side of a transaction.
Partitions: [int32]
// The list of transactions to add partitions to, for v4+, for brokers only.
// The fields in this are batch broker requests that duplicate the above fields
// and thus are undocumented (except VerifyOnly, which is new).
Transactions: [=>] // v4+
TransactionalID: string
ProducerID: int64
ProducerEpoch: int16
// VerifyOnly signifies if we want to check if the partition is in the
// transaction rather than add it.
VerifyOnly: bool
Topics: [=>]
Topic: string
Partitions: [int32]

// AddPartitionsToTxnResponse is a response to an AddPartitionsToTxnRequest.
AddPartitionsToTxnResponse =>
ThrottleMillis(1)
// The response top level error code.
ErrorCode: int16 // v4+
// Results categorized by transactional ID, v4+ only, for brokers only.
// The fields duplicate v3 and below fields (except TransactionalID) and
// are left undocumented.
Transactions: [=>] // v4+
// The transactional id corresponding to the transaction.
TransactionalID: string
Topics: [=>]
Topic: string
Partitions: [=>]
Partition: int32
ErrorCode: int16
// Topics are responses to topics in the request.
Topics: [=>]
Topics: [=>] // v0-v3
// Topic is a topic being responded to.
Topic: string
// Partitions are responses to partitions in the request.
Expand Down
3 changes: 1 addition & 2 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ func (o Offset) String() string {
return fmt.Sprintf("{%d e%d ce%d}", o.at, o.epoch, o.currentEpoch)
} else if o.relative > 0 {
return fmt.Sprintf("{%d+%d e%d ce%d}", o.at, o.relative, o.epoch, o.currentEpoch)
} else {
return fmt.Sprintf("{%d-%d e%d ce%d}", o.at, -o.relative, o.epoch, o.currentEpoch)
}
return fmt.Sprintf("{%d-%d e%d ce%d}", o.at, -o.relative, o.epoch, o.currentEpoch)
}

// EpochOffset returns this offset as an EpochOffset, allowing visibility into
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func testLogger() Logger {
num := loggerNum.Add(1)
pfx := strconv.Itoa(int(num))
return BasicLogger(os.Stderr, testLogLevel, func() string {
return time.Now().Format("[15:04:05 ") + pfx + "]"
return time.Now().UTC().Format("[15:04:05.999 ") + pfx + "]"
})
}

Expand Down
11 changes: 5 additions & 6 deletions pkg/kgo/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,10 @@ func (r *ringBatchPromise) dropPeek() (next batchPromise, more bool) {
return next, false
}
return r.overflow[0], true
} else {
r.overflow = r.overflow[1:]
if len(r.overflow) > 0 {
return r.overflow[0], true
}
return next, false
}
r.overflow = r.overflow[1:]
if len(r.overflow) > 0 {
return r.overflow[0], true
}
return next, false
}
4 changes: 3 additions & 1 deletion pkg/kgo/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ func TestTxnEtl(t *testing.T) {
go func() {
cl, err := NewClient(
getSeedBrokers(),
WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)),
WithLogger(BasicLogger(os.Stderr, testLogLevel, func() string {
return time.Now().UTC().Format("15:04:05.999") + " "
})),
TransactionalID("p"+randsha()),
TransactionTimeout(2*time.Minute),
MaxBufferedRecords(10000),
Expand Down
Loading

0 comments on commit 6a961da

Please sign in to comment.