Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[Transaction] KoP transaction stage0 #295

Merged
merged 30 commits into from
Jan 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: kop mvn build check and ut
name: kop mvn build check and kafka-impl test

on:
pull_request:
Expand Down Expand Up @@ -33,11 +33,8 @@ jobs:
- name: Spotbugs check
run: mvn spotbugs:check

- name: KafkaIntegrationTest
run: mvn test '-Dtest=KafkaIntegration*Test' -pl tests

- name: test after build
run: mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegration*Test,!DistributedClusterTest'
- name: kafka-impl test after build
run: mvn test -DfailIfNoTests=false -pl kafka-impl

# The DistributedClusterTest is hard to pass in CI tests environment, we disable it first
# the track issue: https://github.com/streamnative/kop/issues/184
Expand Down
43 changes: 43 additions & 0 deletions .github/workflows/pr-integration-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
name: kop integration tests

on:
pull_request:
branches:
- master
push:
branches:
- master
- branch-*

jobs:
build:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v1
- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
java-version: 1.8

- name: Build with Maven skipTests
run: mvn clean install -DskipTests

- name: kop integration test
run: mvn test '-Dtest=KafkaIntegration*Test' -pl tests

- name: package surefire artifacts
if: failure()
run: |
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
zip -r artifacts.zip artifacts

- uses: actions/upload-artifact@master
name: upload surefire-artifacts
if: failure()
with:
name: surefire-artifacts
path: artifacts.zip
43 changes: 43 additions & 0 deletions .github/workflows/pr-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
name: kop tests

on:
pull_request:
branches:
- master
push:
branches:
- master
- branch-*

jobs:
build:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v1
- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
java-version: 1.8

- name: Build with Maven skipTests
run: mvn clean install -DskipTests

- name: tests module
run: mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegration*Test,!DistributedClusterTest' -pl tests

- name: package surefire artifacts
if: failure()
run: |
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
zip -r artifacts.zip artifacts

- uses: actions/upload-artifact@master
name: upload surefire-artifacts
if: failure()
with:
name: surefire-artifacts
path: artifacts.zip
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.ssl.SslHandler;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
import io.streamnative.pulsar.handlers.kop.utils.ssl.SSLUtils;
import lombok.Getter;
import org.apache.pulsar.broker.PulsarService;
Expand All @@ -40,6 +41,8 @@ public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
@Getter
private final GroupCoordinator groupCoordinator;
@Getter
private final TransactionCoordinator transactionCoordinator;
@Getter
private final boolean enableTls;
@Getter
private final EndPoint advertisedEndPoint;
Expand All @@ -49,12 +52,14 @@ public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
public KafkaChannelInitializer(PulsarService pulsarService,
KafkaServiceConfiguration kafkaConfig,
GroupCoordinator groupCoordinator,
TransactionCoordinator transactionCoordinator,
boolean enableTLS,
EndPoint advertisedEndPoint) {
super();
this.pulsarService = pulsarService;
this.kafkaConfig = kafkaConfig;
this.groupCoordinator = groupCoordinator;
this.transactionCoordinator = transactionCoordinator;
this.enableTls = enableTLS;
this.advertisedEndPoint = advertisedEndPoint;

Expand All @@ -74,7 +79,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
ch.pipeline().addLast("handler",
new KafkaRequestHandler(pulsarService, kafkaConfig, groupCoordinator, enableTls, advertisedEndPoint));
new KafkaRequestHandler(pulsarService, kafkaConfig,
groupCoordinator, transactionCoordinator, enableTls, advertisedEndPoint));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,24 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
case CREATE_TOPICS:
handleCreateTopics(kafkaHeaderAndRequest, responseFuture);
break;
case INIT_PRODUCER_ID:
handleInitProducerId(kafkaHeaderAndRequest, responseFuture);
break;
case ADD_PARTITIONS_TO_TXN:
handleAddPartitionsToTxn(kafkaHeaderAndRequest, responseFuture);
break;
case ADD_OFFSETS_TO_TXN:
handleAddOffsetsToTxn(kafkaHeaderAndRequest, responseFuture);
break;
case TXN_OFFSET_COMMIT:
handleTxnOffsetCommit(kafkaHeaderAndRequest, responseFuture);
break;
case END_TXN:
handleEndTxn(kafkaHeaderAndRequest, responseFuture);
break;
case WRITE_TXN_MARKERS:
handleWriteTxnMarkers(kafkaHeaderAndRequest, responseFuture);
break;
case DESCRIBE_CONFIGS:
handleDescribeConfigs(kafkaHeaderAndRequest, responseFuture);
break;
Expand Down Expand Up @@ -372,6 +390,24 @@ protected void writeAndFlushWhenInactiveChannel(Channel channel) {
protected abstract void
handleDescribeConfigs(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

protected abstract void
handleInitProducerId(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

protected abstract void
handleAddPartitionsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

protected abstract void
handleAddOffsetsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

protected abstract void
handleTxnOffsetCommit(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

protected abstract void
handleEndTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

protected abstract void
handleWriteTxnMarkers(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

static class KafkaHeaderAndRequest implements Closeable {

private static final String DEFAULT_CLIENT_HOST = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils;
Expand Down Expand Up @@ -189,6 +191,8 @@ public enum ListenerType {
@Getter
private GroupCoordinator groupCoordinator;
@Getter
private TransactionCoordinator transactionCoordinator;
@Getter
private String bindAddress;


Expand Down Expand Up @@ -252,6 +256,13 @@ public void start(BrokerService service) {
log.error("initGroupCoordinator failed with", e);
}
}
if (kafkaConfig.isEnableTransactionCoordinator()) {
try {
initTransactionCoordinator();
} catch (Exception e) {
log.error("Initialized transaction coordinator failed.", e);
}
}
}

// this is called after initialize, and with kafkaConfig, brokerService all set.
Expand Down Expand Up @@ -279,12 +290,12 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
case PLAINTEXT:
case SASL_PLAINTEXT:
builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(),
kafkaConfig, groupCoordinator, false, advertisedEndPoint));
kafkaConfig, groupCoordinator, transactionCoordinator, false, advertisedEndPoint));
break;
case SSL:
case SASL_SSL:
builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(),
kafkaConfig, groupCoordinator, true, advertisedEndPoint));
kafkaConfig, groupCoordinator, transactionCoordinator, true, advertisedEndPoint));
break;
}
});
Expand Down Expand Up @@ -349,6 +360,11 @@ public void startGroupCoordinator() throws Exception {
}
}

public void initTransactionCoordinator() throws Exception {
TransactionConfig transactionConfig = TransactionConfig.builder().build();
this.transactionCoordinator = TransactionCoordinator.of(transactionConfig);
}

/**
* This method discovers ownership of offset topic partitions and attempts to load offset topics
* assigned to this broker.
Expand Down
Loading