Merge pull request #1 from Sanketika-Obsrv/main
Obsrv beta release: Kafka Connector for Event Transfer to Pipeline's Entry Topic
manjudr authored Jun 7, 2023
2 parents be49ddc + ef34ae4 commit af0a912
Showing 6 changed files with 438 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .gitignore
@@ -0,0 +1,6 @@
**/target
dependency-reduced-pom.xml
*.iml
.idea
.classpath
*.DS_Store
232 changes: 232 additions & 0 deletions kafka-connector/pom.xml
@@ -0,0 +1,232 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.sunbird.obsrv</groupId>
        <artifactId>connectors</artifactId>
        <version>1.0</version>
    </parent>

    <groupId>org.sunbird.obsrv.connectors</groupId>
    <artifactId>kafka-connector</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>
    <name>Kafka Connector</name>
    <description>
        Reads data from source kafka topic(s) and writes them to a configurable topic
    </description>

    <properties>
        <encoding>UTF-8</encoding>
        <scoverage.plugin.version>1.4.0</scoverage.plugin.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.maj.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.sunbird.obsrv</groupId>
            <artifactId>dataset-registry</artifactId>
            <version>1.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.12.5</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-joda</artifactId>
            <version>2.12.7</version>
        </dependency>
        <dependency>
            <groupId>org.sunbird.obsrv</groupId>
            <artifactId>framework</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.sunbird.obsrv</groupId>
            <artifactId>framework</artifactId>
            <version>1.0.0</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
            <classifier>tests</classifier>
        </dependency>
        <dependency>
            <groupId>it.ozimov</groupId>
            <artifactId>embedded-redis</artifactId>
            <version>0.7.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
            <classifier>tests</classifier>
        </dependency>
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_2.12</artifactId>
            <version>3.0.6</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
            <version>3.3.3</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <release>11</release>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.sunbird.obsrv.kafkaconnector.task.KafkaConnectorStreamTask</mainClass>
                                </transformer>
                                <!-- append default configs -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.4.0</version>
                <configuration>
                    <source>${java.target.runtime}</source>
                    <target>${java.target.runtime}</target>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <checkMultipleScalaVersions>false</checkMultipleScalaVersions>
                </configuration>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.scalatest</groupId>
                <artifactId>scalatest-maven-plugin</artifactId>
                <version>1.0</version>
                <configuration>
                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
                    <junitxml>.</junitxml>
                    <filereports>dp-duplication-testsuite.txt</filereports>
                </configuration>
                <executions>
                    <execution>
                        <id>test</id>
                        <goals>
                            <goal>test</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.scoverage</groupId>
                <artifactId>scoverage-maven-plugin</artifactId>
                <version>${scoverage.plugin.version}</version>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <aggregate>true</aggregate>
                    <highlighting>true</highlighting>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
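Note on building: the shade goal is bound to the package phase (with KafkaConnectorStreamTask as the manifest Main-Class), and unit tests run through the scalatest-maven-plugin rather than surefire (which is skipped). So a plain `mvn clean package` from the kafka-connector directory should yield the runnable fat jar under target/.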
17 changes: 17 additions & 0 deletions kafka-connector/src/main/resources/kafka-connector.conf
@@ -0,0 +1,17 @@
include "baseconfig.conf"

kafka {
input.topic = ${job.env}".test"
// output.topic = ${job.env}".ingest"
output.failed.topic = ${job.env}".failed"
event.max.size = "1048576" # Max is only 1MB
groupId = ${job.env}"-kafkaconnector-group"
producer {
max-request-size = 5242880
}
}

task {
consumer.parallelism = 1
downstream.operators.parallelism = 1
}
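As a quick check of how the ${job.env} substitutions resolve, here is a minimal sketch using Typesafe Config. It assumes kafka-connector.conf is on the classpath and job.env is supplied (e.g. via -Djob.env=dev); the object name is illustrative, not part of this commit:

import com.typesafe.config.ConfigFactory

object ConfResolutionSketch {
  def main(args: Array[String]): Unit = {
    System.setProperty("job.env", "dev") // normally passed as -Djob.env=dev
    ConfigFactory.invalidateCaches()     // ensure the property is picked up
    val conf = ConfigFactory.load("kafka-connector.conf")
    println(conf.getString("kafka.input.topic"))      // dev.test
    println(conf.getString("kafka.groupId"))          // dev-kafkaconnector-group
    println(conf.getInt("task.consumer.parallelism")) // 1
  }
}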
24 changes: 24 additions & 0 deletions kafka-connector/src/main/scala/org/sunbird/obsrv/kafkaconnector/task/KafkaConnectorConfig.scala
@@ -0,0 +1,24 @@
package org.sunbird.obsrv.kafkaconnector.task

import com.typesafe.config.Config
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.scala.OutputTag
import org.sunbird.obsrv.core.streaming.BaseJobConfig

import scala.collection.mutable

class KafkaConnectorConfig(override val config: Config) extends BaseJobConfig[String](config, "KafkaConnectorJob") {

  private val serialVersionUID = 2905979435603791379L

  implicit val mapTypeInfo: TypeInformation[mutable.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[mutable.Map[String, AnyRef]])
  implicit val stringTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String])

  // Source topics and consumer groups are derived per dataset from the DatasetRegistry at runtime,
  // so the static overrides from BaseJobConfig are intentionally left blank
  override def inputTopic(): String = ""
  override def inputConsumer(): String = ""

  // The job writes straight to Kafka sinks and uses no side outputs, hence a dummy success tag
  private val DUMMY_OUTPUT_TAG = "dummy-events"
  override def successTag(): OutputTag[String] = OutputTag[String](DUMMY_OUTPUT_TAG)

}
72 changes: 72 additions & 0 deletions kafka-connector/src/main/scala/org/sunbird/obsrv/kafkaconnector/task/KafkaConnectorStreamTask.scala
@@ -0,0 +1,72 @@
package org.sunbird.obsrv.kafkaconnector.task

import com.typesafe.config.ConfigFactory
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.sunbird.obsrv.core.streaming.{BaseStreamTask, FlinkKafkaConnector}
import org.sunbird.obsrv.core.util.{FlinkUtil, JSONUtil}
import org.sunbird.obsrv.registry.DatasetRegistry
import org.joda.time.DateTime
import org.joda.time.DateTimeZone

import java.io.File
import scala.collection.mutable

class KafkaConnectorStreamTask(config: KafkaConnectorConfig, kafkaConnector: FlinkKafkaConnector) extends BaseStreamTask[String] {

  private val serialVersionUID = -7729362727131516112L

  // $COVERAGE-OFF$ Disabling scoverage as the below code can only be invoked within flink cluster
  def process(): Unit = {
    implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(config)

    val datasetSourceConfig = DatasetRegistry.getDatasetSourceConfig()
    datasetSourceConfig.map { configList =>
      // One Kafka source stream per dataset source config of type "kafka"
      configList.filter(_.connectorType.equalsIgnoreCase("kafka")).map { dataSourceConfig =>
        val dataStream: DataStream[String] =
          getStringDataStream(env, config, List(dataSourceConfig.connectorConfig.topic),
            config.kafkaConsumerProperties(kafkaBrokerServers = Some(dataSourceConfig.connectorConfig.kafkaBrokers),
              kafkaConsumerGroup = Some(s"kafka-${dataSourceConfig.connectorConfig.topic}-consumer")),
            consumerSourceName = s"kafka-${dataSourceConfig.connectorConfig.topic}", kafkaConnector)
        val datasetId = dataSourceConfig.datasetId
        val kafkaOutputTopic = DatasetRegistry.getDataset(datasetId).get.datasetConfig.entryTopic
        val resultMapStream: DataStream[String] = dataStream
          .filter { msg: String => JSONUtil.isJSON(msg) }.returns(classOf[String]) // TODO: Add a metric to capture invalid JSON messages
          .map { streamMap: String =>
            // Stamp each event with its dataset id and a UTC sync timestamp before forwarding
            val mutableMap = JSONUtil.deserialize[mutable.Map[String, AnyRef]](streamMap)
            mutableMap.put("dataset", datasetId)
            mutableMap.put("syncts", java.lang.Long.valueOf(new DateTime(DateTimeZone.UTC).getMillis))
            JSONUtil.serialize(mutableMap)
          }.returns(classOf[String])
        resultMapStream.sinkTo(kafkaConnector.kafkaStringSink(kafkaOutputTopic))
          .name(s"$datasetId-kafka-connector-sink").uid(s"$datasetId-kafka-connector-sink")
          .setParallelism(config.downstreamOperatorsParallelism)
      }
      env.execute(config.jobName)
    }
  }

  // This job builds its pipelines itself in process(), so the base single-stream hook is unused
  override def processStream(dataStream: DataStream[String]): DataStream[String] = {
    null
  }
  // $COVERAGE-ON$
}

// $COVERAGE-OFF$ Disabling scoverage as the below code can only be invoked within flink cluster
object KafkaConnectorStreamTask {

  def main(args: Array[String]): Unit = {
    val configFilePath = Option(ParameterTool.fromArgs(args).get("config.file.path"))
    val config = configFilePath.map { path =>
      ConfigFactory.parseFile(new File(path)).resolve()
    }.getOrElse(ConfigFactory.load("kafka-connector.conf").withFallback(ConfigFactory.systemEnvironment()))
    val kafkaConnectorConfig = new KafkaConnectorConfig(config)
    val kafkaUtil = new FlinkKafkaConnector(kafkaConnectorConfig)
    val task = new KafkaConnectorStreamTask(kafkaConnectorConfig, kafkaUtil)
    task.process()
  }
}
// $COVERAGE-ON$
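To see the per-event enrichment from process() in isolation: every valid JSON event is stamped with the dataset id and a UTC "syncts" before being forwarded to the dataset's entry topic. Below is a self-contained sketch of that step using plain Jackson in place of the Obsrv-internal JSONUtil (object and method names are illustrative, not part of this commit):

import com.fasterxml.jackson.databind.ObjectMapper
import org.joda.time.{DateTime, DateTimeZone}

object EnrichmentSketch {
  private val mapper = new ObjectMapper()

  // Mirrors the map operator: parse, stamp "dataset" and "syncts", serialize back
  def enrich(event: String, datasetId: String): String = {
    val map = mapper.readValue(event, classOf[java.util.Map[String, AnyRef]])
    map.put("dataset", datasetId)
    map.put("syncts", java.lang.Long.valueOf(new DateTime(DateTimeZone.UTC).getMillis))
    mapper.writeValueAsString(map)
  }

  def main(args: Array[String]): Unit = {
    println(enrich("""{"id":"e1"}""", "d1"))
    // => {"id":"e1","dataset":"d1","syncts":1686096000000} (timestamp will vary)
  }
}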
1 remaining file with 87 additions not shown.
