Skip to content

Commit

Permalink
KAFKA-9338; Fetch session should cache request leader epoch (#7970)
Browse files Browse the repository at this point in the history
Since the leader epoch was not maintained in the fetch session cache, no validation would be done except for the initial (full) fetch request. This patch adds the leader epoch to the session cache and addresses the testing gaps.

Reviewers: Ismael Juma <[email protected]>, Colin Patrick McCabe <[email protected]>
  • Loading branch information
a0x8o committed Jan 18, 2020
1 parent 12fdf40 commit a03462a
Show file tree
Hide file tree
Showing 649 changed files with 29,868 additions and 13,861 deletions.
14 changes: 3 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,12 @@ Apache Kafka
=================
See our [web site](https://kafka.apache.org) for details on the project.

You need to have [Gradle](https://www.gradle.org/installation) and [Java](https://www.oracle.com/technetwork/java/javase/downloads/index.html) installed.

Kafka requires Gradle 5.0 or higher.
You need to have [Java](http://www.oracle.com/technetwork/java/javase/downloads/index.html) installed.

Java 8 should be used for building in order to support both Java 8 and Java 11 at runtime.

Scala 2.12 is used by default, see below for how to use a different Scala version or all of the supported Scala versions.

### First bootstrap and download the wrapper ###
cd kafka_source_dir
gradle

Now everything else will work.

### Build a jar and run it ###
./gradlew jar

Expand Down Expand Up @@ -77,7 +69,7 @@ The release file can be found inside `./core/build/distributions/`.
### Cleaning the build ###
./gradlew clean

### Running a task with one of the Scala versions available (2.11.x, 2.12.x or 2.13.x) ###
### Running a task with one of the Scala versions available (2.12.x or 2.13.x) ###
*Note that if building the jars with a version other than 2.12.x, you need to set the `SCALA_VERSION` variable or change it in `bin/kafka-run-class.sh` to run the quick start.*

You can pass either the major version (eg 2.12) or the full version (eg 2.12.7):
Expand Down Expand Up @@ -177,7 +169,7 @@ You can run checkstyle using:
./gradlew checkstyleMain checkstyleTest

The checkstyle warnings will be found in `reports/checkstyle/reports/main.html` and `reports/checkstyle/reports/test.html` files in the
subproject build directories. They are also are printed to the console. The build will fail if Checkstyle fails.
subproject build directories. They are also printed to the console. The build will fail if Checkstyle fails.

#### Spotbugs ####
Spotbugs uses static analysis to look for bugs in the code.
Expand Down
14 changes: 10 additions & 4 deletions bin/kafka-run-class.sh
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ done
if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
clients_lib_dir=$(dirname $0)/../clients/build/libs
streams_lib_dir=$(dirname $0)/../streams/build/libs
rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
streams_dependant_clients_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
else
clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
streams_lib_dir=$clients_lib_dir
rocksdb_lib_dir=$streams_lib_dir
streams_dependant_clients_lib_dir=$streams_lib_dir
fi


Expand Down Expand Up @@ -122,7 +122,12 @@ else
fi
fi

for file in "$rocksdb_lib_dir"/rocksdb*.jar;
for file in "$streams_dependant_clients_lib_dir"/rocksdb*.jar;
do
CLASSPATH="$CLASSPATH":"$file"
done

for file in "$streams_dependant_clients_lib_dir"/*hamcrest*.jar;
do
CLASSPATH="$CLASSPATH":"$file"
done
Expand Down Expand Up @@ -244,8 +249,9 @@ if [ -z "$KAFKA_HEAP_OPTS" ]; then
fi

# JVM performance options
# MaxInlineLevel=15 is the default since JDK 14 and can be removed once older JDKs are no longer supported
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true"
fi

while [ $# -gt 0 ]; do
Expand Down
145 changes: 66 additions & 79 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ buildscript {
classpath "com.github.jengelman.gradle.plugins:shadow:$versions.shadowPlugin"
classpath "org.owasp:dependency-check-gradle:$versions.owaspDepCheckPlugin"
classpath "com.diffplug.spotless:spotless-plugin-gradle:$versions.spotlessPlugin"
classpath "gradle.plugin.com.github.spotbugs:spotbugs-gradle-plugin:$versions.spotbugsPlugin"
classpath "com.github.spotbugs:spotbugs-gradle-plugin:$versions.spotbugsPlugin"
}
}

Expand All @@ -54,7 +54,7 @@ allprojects {
repositories {
mavenCentral()
}

apply plugin: 'idea'
apply plugin: 'org.owasp.dependencycheck'
apply plugin: 'com.github.ben-manes.versions'
Expand Down Expand Up @@ -130,13 +130,15 @@ if (file('.git').exists()) {
'PULL_REQUEST_TEMPLATE.md',
'gradlew',
'gradlew.bat',
'gradle/wrapper/gradle-wrapper.properties',
'TROGDOR.md',
'**/README.md',
'**/id_rsa',
'**/id_rsa.pub',
'checkstyle/suppressions.xml',
'streams/quickstart/java/src/test/resources/projects/basic/goal.txt',
'streams/streams-scala/logs/*'
'streams/streams-scala/logs/*',
'**/generated/**'
])
}
}
Expand Down Expand Up @@ -409,6 +411,7 @@ subprojects {
"-language:postfixOps",
"-language:implicitConversions",
"-language:existentials",
"-Xlint:constant",
"-Xlint:delayedinit-select",
"-Xlint:doc-detached",
"-Xlint:missing-interpolator",
Expand All @@ -419,35 +422,29 @@ subprojects {
"-Xlint:poly-implicit-overload",
"-Xlint:private-shadow",
"-Xlint:stars-align",
"-Xlint:type-parameter-shadow"
"-Xlint:type-parameter-shadow",
"-Xlint:unused"
]

if (versions.baseScala != '2.11') {
scalaCompileOptions.additionalParameters += [
"-Xlint:constant",
"-Xlint:unused"
]
// Inline more aggressively when compiling the `core` jar since it's not meant to be used as a library.
// More specifically, inline classes from the Scala library so that we can inline methods like `Option.exists`
// and avoid lambda allocations. This is only safe if the Scala library version is the same at compile time
// and runtime. We cannot guarantee this for libraries like kafka streams, so only inline classes from the
// Kafka project in that case.
List<String> inlineFrom
if (project.name.equals('core'))
inlineFrom = ["-opt-inline-from:scala.**", "-opt-inline-from:kafka.**", "-opt-inline-from:org.apache.kafka.**"]
else
inlineFrom = ["-opt-inline-from:org.apache.kafka.**"]

// Inline more aggressively when compiling the `core` jar since it's not meant to be used as a library.
// More specifically, inline classes from the Scala library so that we can inline methods like `Option.exists`
// and avoid lambda allocations. This is only safe if the Scala library version is the same at compile time
// and runtime. We cannot guarantee this for libraries like kafka streams, so only inline classes from the
// Kafka project in that case.
List<String> inlineFrom
if (project.name.equals('core'))
inlineFrom = ["-opt-inline-from:scala.**", "-opt-inline-from:kafka.**", "-opt-inline-from:org.apache.kafka.**"]
else
inlineFrom = ["-opt-inline-from:org.apache.kafka.**"]

// Somewhat confusingly, `-opt:l:inline` enables all optimizations. `inlineFrom` configures what can be inlined.
// See https://www.lightbend.com/blog/scala-inliner-optimizer for more information about the optimizer.
scalaCompileOptions.additionalParameters += ["-opt:l:inline"]
scalaCompileOptions.additionalParameters += inlineFrom
}
// Somewhat confusingly, `-opt:l:inline` enables all optimizations. `inlineFrom` configures what can be inlined.
// See https://www.lightbend.com/blog/scala-inliner-optimizer for more information about the optimizer.
scalaCompileOptions.additionalParameters += ["-opt:l:inline"]
scalaCompileOptions.additionalParameters += inlineFrom

// these options are valid for Scala versions < 2.13 only
// Scala 2.13 removes them, see https://github.com/scala/scala/pull/6502 and https://github.com/scala/scala/pull/5969
if (versions.baseScala in ['2.11','2.12']) {
// these options are valid for Scala versions < 2.13 only
// Scala 2.13 removes them, see https://github.com/scala/scala/pull/6502 and https://github.com/scala/scala/pull/5969
if (versions.baseScala == '2.12') {
scalaCompileOptions.additionalParameters += [
"-Xlint:by-name-right-associative",
"-Xlint:unsound-match"
Expand Down Expand Up @@ -519,6 +516,25 @@ subprojects {
def coverageGen = it.path == ':core' ? 'reportScoverage' : 'jacocoTestReport'
task reportCoverage(dependsOn: [coverageGen])

task determineCommitId {
def takeFromHash = 16
if (commitId) {
commitId = commitId.take(takeFromHash)
} else if (file("$rootDir/.git/HEAD").exists()) {
def headRef = file("$rootDir/.git/HEAD").text
if (headRef.contains('ref: ')) {
headRef = headRef.replaceAll('ref: ', '').trim()
if (file("$rootDir/.git/$headRef").exists()) {
commitId = file("$rootDir/.git/$headRef").text.trim().take(takeFromHash)
}
} else {
commitId = headRef.trim().take(takeFromHash)
}
} else {
commitId = "unknown"
}
}

}

gradle.taskGraph.whenReady { taskGraph ->
Expand All @@ -543,8 +559,8 @@ def fineTuneEclipseClasspathFile(eclipse, project) {
if (project.name.equals('core')) {
cp.entries.findAll { it.kind == "src" && it.path.equals("src/test/scala") }*.excludes = ["integration/", "other/", "unit/"]
}
/*
* Set all eclipse build output to go to 'build_eclipse' directory. This is to ensure that gradle and eclipse use different
/*
* Set all eclipse build output to go to 'build_eclipse' directory. This is to ensure that gradle and eclipse use different
* build output directories, and also avoid using the eclpise default of 'bin' which clashes with some of our script directories.
* https://discuss.gradle.org/t/eclipse-generated-files-should-be-put-in-the-same-place-as-the-gradle-generated-files/6986/2
*/
Expand All @@ -571,10 +587,10 @@ task jacocoRootReport(type: org.gradle.testing.jacoco.tasks.JacocoReport) {
description = 'Generates an aggregate report from all subprojects'
dependsOn(javaProjects.test)

additionalSourceDirs = files(javaProjects.sourceSets.main.allSource.srcDirs)
sourceDirectories = files(javaProjects.sourceSets.main.allSource.srcDirs)
classDirectories = files(javaProjects.sourceSets.main.output)
executionData = files(javaProjects.jacocoTestReport.executionData)
additionalSourceDirs.from = javaProjects.sourceSets.main.allSource.srcDirs
sourceDirectories.from = javaProjects.sourceSets.main.allSource.srcDirs
classDirectories.from = javaProjects.sourceSets.main.output
executionData.from = javaProjects.jacocoTestReport.executionData

reports {
html.enabled = true
Expand Down Expand Up @@ -733,19 +749,14 @@ project(':core') {
testCompile libs.scalatest
testCompile libs.slf4jlog4j
testCompile libs.jfreechart

scoverage libs.scoveragePlugin
scoverage libs.scoverageRuntime
}

scoverage {
scoverageVersion = "$versions.scoverage"
reportDir = file("${rootProject.buildDir}/scoverage")
highlighting = false
}
checkScoverage {
minimumRate = 0.0
}
checkScoverage.shouldRunAfter('test')

configurations {
// manually excludes some unnecessary dependencies
Expand Down Expand Up @@ -994,25 +1005,6 @@ project(':clients') {
testCompile libs.jacksonJaxrsJsonProvider
}

task determineCommitId {
def takeFromHash = 16
if (commitId) {
commitId = commitId.take(takeFromHash)
} else if (file("$rootDir/.git/HEAD").exists()) {
def headRef = file("$rootDir/.git/HEAD").text
if (headRef.contains('ref: ')) {
headRef = headRef.replaceAll('ref: ', '').trim()
if (file("$rootDir/.git/$headRef").exists()) {
commitId = file("$rootDir/.git/$headRef").text.trim().take(takeFromHash)
}
} else {
commitId = headRef.trim().take(takeFromHash)
}
} else {
commitId = "unknown"
}
}

task createVersionFile(dependsOn: determineCommitId) {
ext.receiptFile = file("$buildDir/kafka/$buildVersionFileName")
outputs.file receiptFile
Expand Down Expand Up @@ -1219,6 +1211,7 @@ project(':streams') {
from (configurations.testRuntime) {
include('slf4j-log4j12*')
include('log4j*jar')
include('*hamcrest*')
}
from (configurations.runtime) {
exclude('kafka-clients*')
Expand All @@ -1227,25 +1220,6 @@ project(':streams') {
duplicatesStrategy 'exclude'
}

task determineCommitId {
def takeFromHash = 16
if (commitId) {
commitId = commitId.take(takeFromHash)
} else if (file("$rootDir/.git/HEAD").exists()) {
def headRef = file("$rootDir/.git/HEAD").text
if (headRef.contains('ref: ')) {
headRef = headRef.replaceAll('ref: ', '').trim()
if (file("$rootDir/.git/$headRef").exists()) {
commitId = file("$rootDir/.git/$headRef").text.trim().take(takeFromHash)
}
} else {
commitId = headRef.trim().take(takeFromHash)
}
} else {
commitId = "unknown"
}
}

task createStreamsVersionFile(dependsOn: determineCommitId) {
ext.receiptFile = file("$buildDir/kafka/$buildStreamsVersionFileName")
outputs.file receiptFile
Expand Down Expand Up @@ -1515,6 +1489,18 @@ project(':streams:upgrade-system-tests-23') {
}
}

project(':streams:upgrade-system-tests-24') {
archivesBaseName = "kafka-streams-upgrade-system-tests-24"

dependencies {
testCompile libs.kafkaStreams_24
}

systemTestLibs {
dependsOn testJar
}
}

project(':jmh-benchmarks') {

apply plugin: 'com.github.johnrengelman.shadow'
Expand Down Expand Up @@ -1886,6 +1872,7 @@ project(':connect:mirror') {
compile libs.slf4jApi

testCompile libs.junit
testCompile libs.mockitoCore
testCompile project(':clients').sourceSets.test.output
testCompile project(':connect:runtime').sourceSets.test.output
testCompile project(':core')
Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@
<allow pkg="net.sourceforge.argparse4j" />
<!-- for tests -->
<allow pkg="org.apache.kafka.connect.integration" />
<allow pkg="org.apache.kafka.connect.mirror" />
</subpackage>

<subpackage name="runtime">
Expand Down
8 changes: 5 additions & 3 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
<suppress checks="CyclomaticComplexity|BooleanExpressionComplexity"
files="(SchemaGenerator|MessageDataGenerator|FieldSpec).java"/>
<suppress checks="NPathComplexity"
files="(FieldSpec).java"/>
files="(MessageDataGenerator|FieldSpec).java"/>
<suppress checks="JavaNCSS"
files="(ApiMessageType).java|MessageDataGenerator.java"/>
<suppress checks="MethodLength"
files="MessageDataGenerator.java"/>

<!-- Clients -->
<suppress checks="ClassFanOutComplexity"
Expand Down Expand Up @@ -214,15 +216,15 @@
<suppress checks="CyclomaticComplexity"
files="KStreamKStreamJoinTest.java"/>
<suppress checks="CyclomaticComplexity"
files="SmokeTestDriver.java"/>
files="RelationalSmokeTest.java|SmokeTestDriver.java"/>

<suppress checks="JavaNCSS"
files="KStreamKStreamJoinTest.java"/>
<suppress checks="JavaNCSS"
files="SmokeTestDriver.java"/>

<suppress checks="NPathComplexity"
files="EosTestDriver|KStreamKStreamJoinTest.java|SmokeTestDriver.java|KStreamKStreamLeftJoinTest.java|KTableKTableForeignKeyJoinIntegrationTest.java"/>
files="EosTestDriver|KStreamKStreamJoinTest.java|RelationalSmokeTest.java|SmokeTestDriver.java|KStreamKStreamLeftJoinTest.java|KTableKTableForeignKeyJoinIntegrationTest.java"/>

<suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/>
Expand Down
Loading

0 comments on commit a03462a

Please sign in to comment.