Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP-890 proto #599

Merged
merged 4 commits into from
Oct 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/lint-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
container: golang:1.20.3
services:
kafka:
image: bitnami/kafka:latest
image: bitnami/kafka:3.5
ports:
- 9092:9092
env:
Expand Down
7 changes: 3 additions & 4 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# We may as well allow multiple golangci-lint invocations at once.
run:
allow-parallel-runners: true
go: "1.19"
go: "1.21"

# golangci-lint by default ignores some staticcheck and vet raised issues that
# are actually important to catch. The following ensures that we do not ignore
Expand Down Expand Up @@ -68,15 +68,14 @@ linters-settings:
#
# https://github.com/mvdan/gofumpt/issues/137
gofumpt:
lang-version: "1.19"
lang-version: "1.21"
extra-rules: true

gosec:
excludes:
- G104 # unhandled errors, we exclude for the same reason we do not use errcheck
- G404 # we want math/rand


# Gocritic is a meta linter that has very good lints, and most of the
# experimental ones are very good too. We opt into everything, which helps
# us when we upgrade golangci-lint, and we specifically opt out of a batch.
Expand Down Expand Up @@ -164,5 +163,5 @@ linters-settings:
# contexts for beneficial reasons, and we disable the SSLv3 deprecation
# warning because this is solely for a debug log.
staticcheck:
go: "1.19"
go: "1.21"
checks: ["all", "-SA1012", "-SA1019"]
44 changes: 38 additions & 6 deletions generate/definitions/24_add_partitions_to_txn
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,60 @@
// partitions in the request. Before producing any records to a partition in
// the transaction, that partition must have been added to the transaction with
// this request.
AddPartitionsToTxnRequest => key 24, max version 3, flexible v3+, txn coordinator
//
// Versions 3 and below are exclusively used by clients and versions 4 and
// above are used by brokers.
//
// Version 4 adds VerifyOnly field to check if partitions are already in
// transaction and adds support to batch multiple transactions.
AddPartitionsToTxnRequest => key 24, max version 4, flexible v3+, txn coordinator
// TransactionalID is the transactional ID to use for this request.
TransactionalID: string
TransactionalID: string // v0-v3
// ProducerID is the producer ID of the client for this transactional ID
// as received from InitProducerID.
ProducerID: int64
ProducerID: int64 // v0-v3
// ProducerEpoch is the producer epoch of the client for this transactional ID
// as received from InitProducerID.
ProducerEpoch: int16
ProducerEpoch: int16 // v0-v3
// Topics are topics to add as part of the producer side of a transaction.
Topics: [=>]
Topics: [=>] // v0-v3
// Topic is a topic name.
Topic: string
// Partitions are partitions within a topic to add as part of the producer
// side of a transaction.
Partitions: [int32]
// The list of transactions to add partitions to, for v4+, for brokers only.
// The fields in this are batch broker requests that duplicate the above fields
// and thus are undocumented (except VerifyOnly, which is new).
Transactions: [=>] // v4+
TransactionalID: string
ProducerID: int64
ProducerEpoch: int16
// VerifyOnly signifies if we want to check if the partition is in the
// transaction rather than add it.
VerifyOnly: bool
Topics: [=>]
Topic: string
Partitions: [int32]

// AddPartitionsToTxnResponse is a response to an AddPartitionsToTxnRequest.
AddPartitionsToTxnResponse =>
ThrottleMillis(1)
// The response top level error code.
ErrorCode: int16 // v4+
// Results categorized by transactional ID, v4+ only, for brokers only.
// The fields duplicate v3 and below fields (except TransactionalID) and
// are left undocumented.
Transactions: [=>] // v4+
// The transactional id corresponding to the transaction.
TransactionalID: string
Topics: [=>]
Topic: string
Partitions: [=>]
Partition: int32
ErrorCode: int16
// Topics are responses to topics in the request.
Topics: [=>]
Topics: [=>] // v0-v3
// Topic is a topic being responded to.
Topic: string
// Partitions are responses to partitions in the request.
Expand Down
3 changes: 1 addition & 2 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ func (o Offset) String() string {
return fmt.Sprintf("{%d e%d ce%d}", o.at, o.epoch, o.currentEpoch)
} else if o.relative > 0 {
return fmt.Sprintf("{%d+%d e%d ce%d}", o.at, o.relative, o.epoch, o.currentEpoch)
} else {
return fmt.Sprintf("{%d-%d e%d ce%d}", o.at, -o.relative, o.epoch, o.currentEpoch)
}
return fmt.Sprintf("{%d-%d e%d ce%d}", o.at, -o.relative, o.epoch, o.currentEpoch)
}

// EpochOffset returns this offset as an EpochOffset, allowing visibility into
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func testLogger() Logger {
num := loggerNum.Add(1)
pfx := strconv.Itoa(int(num))
return BasicLogger(os.Stderr, testLogLevel, func() string {
return time.Now().Format("[15:04:05 ") + pfx + "]"
return time.Now().UTC().Format("[15:04:05.999 ") + pfx + "]"
})
}

Expand Down
11 changes: 5 additions & 6 deletions pkg/kgo/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,10 @@ func (r *ringBatchPromise) dropPeek() (next batchPromise, more bool) {
return next, false
}
return r.overflow[0], true
} else {
r.overflow = r.overflow[1:]
if len(r.overflow) > 0 {
return r.overflow[0], true
}
return next, false
}
r.overflow = r.overflow[1:]
if len(r.overflow) > 0 {
return r.overflow[0], true
}
return next, false
}
4 changes: 3 additions & 1 deletion pkg/kgo/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ func TestTxnEtl(t *testing.T) {
go func() {
cl, err := NewClient(
getSeedBrokers(),
WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)),
WithLogger(BasicLogger(os.Stderr, testLogLevel, func() string {
return time.Now().UTC().Format("15:04:05.999") + " "
})),
TransactionalID("p"+randsha()),
TransactionTimeout(2*time.Minute),
MaxBufferedRecords(10000),
Expand Down
Loading