From 50a978d1bfbde86799acb8433cb542d0a9f9a028 Mon Sep 17 00:00:00 2001
From: elliVM <49@teragrep.com>
Date: Wed, 11 Oct 2023 12:58:13 +0300
Subject: [PATCH 1/3] use blf_01 Token inside buffer
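
Bump blf_01 from 1.1.2 to 2.0.0 and switch to the stream-based tokenizer
API: TokenBuffer now keys on blf_01 Token objects instead of Strings, the
aggregator feeds the raw column to Tokenizer.tokenize(InputStream), and
finish() converts the buffered tokens back to strings. The TokenBuffer
duplicate-key check moves into its own TokenBufferTest.

A minimal sketch of the new call pattern (limited to the API actually used
in the diff below; the sample input is hypothetical):

    import com.teragrep.blf_01.Tokenizer
    import java.io.ByteArrayInputStream
    import java.nio.charset.StandardCharsets

    val tokenizer = new Tokenizer                    // blf_01 2.0.0 instance API
    val stream = new ByteArrayInputStream(
      "key=value".getBytes(StandardCharsets.UTF_8))  // hypothetical sample input
    // tokenize(InputStream) yields Token objects; toString recovers the text
    tokenizer.tokenize(stream).forEach(token => println(token.toString))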
---
pom.xml | 2 +-
.../functions/dpf_03/TokenAggregator.scala | 12 ++-
.../functions/dpf_03/TokenBuffer.scala | 9 ++-
src/test/scala/TokenAggregatorTest.scala | 78 ++++++++++---------
src/test/scala/TokenBufferTest.scala | 72 +++++++++++++++++
5 files changed, 128 insertions(+), 45 deletions(-)
create mode 100644 src/test/scala/TokenBufferTest.scala
diff --git a/pom.xml b/pom.xml
index a6b793c..b84794d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,7 +47,7 @@
     <dependency>
       <groupId>com.teragrep</groupId>
       <artifactId>blf_01</artifactId>
-      <version>1.1.2</version>
+      <version>2.0.0</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git a/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala b/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
index bfbbb33..d6828d8 100644
--- a/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
+++ b/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
@@ -46,11 +46,13 @@
package com.teragrep.functions.dpf_03
-import java.io.Serializable
-import com.teragrep.blf_01.tokenizer.Tokenizer
+import java.io.{ByteArrayInputStream, Serializable}
+import com.teragrep.blf_01.{Token, Tokenizer}
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
+
+import java.nio.charset.StandardCharsets
import scala.reflect.ClassTag
class TokenAggregator(private final val columnName: String) extends Aggregator[Row, TokenBuffer, Set[String]]
@@ -59,8 +61,10 @@ class TokenAggregator(private final val columnName: String) extends Aggregator[R
override def zero(): TokenBuffer = new TokenBuffer()
override def reduce(b: TokenBuffer, a: Row): TokenBuffer = {
+ val tokenizer: Tokenizer = new Tokenizer;
val input: String = a.getAs(columnName).toString
- Tokenizer.tokenize(input).forEach(i => b.addKey(i))
+ val stream = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8))
+ tokenizer.tokenize(stream).forEach(token => b.addKey(token))
b
}
@@ -70,7 +74,7 @@ class TokenAggregator(private final val columnName: String) extends Aggregator[R
}
override def finish(reduction: TokenBuffer): Set[String] = {
- reduction.getBuffer.keySet.toSet
+ reduction.getBuffer.keySet.map(token => token.toString).toSet
}
override def bufferEncoder: Encoder[TokenBuffer] = customKryoEncoder[TokenBuffer]
diff --git a/src/main/scala/com/teragrep/functions/dpf_03/TokenBuffer.scala b/src/main/scala/com/teragrep/functions/dpf_03/TokenBuffer.scala
index 0b90235..2276870 100644
--- a/src/main/scala/com/teragrep/functions/dpf_03/TokenBuffer.scala
+++ b/src/main/scala/com/teragrep/functions/dpf_03/TokenBuffer.scala
@@ -47,20 +47,21 @@
package com.teragrep.functions.dpf_03
import scala.collection.mutable
+import com.teragrep.blf_01.Token
class TokenBuffer() {
- private var hashMap: mutable.HashMap[String, Int] = mutable.HashMap[String, Int]()
+ private var hashMap: mutable.HashMap[Token, Int] = mutable.HashMap[Token, Int]()
- def getBuffer: mutable.HashMap[String, Int] = hashMap
+ def getBuffer: mutable.HashMap[Token, Int] = hashMap
- def mergeBuffer(other: mutable.HashMap[String, Int]): Unit ={
+ def mergeBuffer(other: mutable.HashMap[Token, Int]): Unit ={
hashMap = hashMap ++ other
}
def getSize: Int = hashMap.size
- def addKey(key: String): Unit = {
+ def addKey(key: Token): Unit = {
hashMap.put(key, 1)
}
diff --git a/src/test/scala/TokenAggregatorTest.scala b/src/test/scala/TokenAggregatorTest.scala
index c2dc3b6..d37e8e9 100644
--- a/src/test/scala/TokenAggregatorTest.scala
+++ b/src/test/scala/TokenAggregatorTest.scala
@@ -49,14 +49,25 @@ import com.teragrep.functions.dpf_03.TokenBuffer
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
-import org.apache.spark.sql.{Dataset, Row, RowFactory, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, RowFactory, SparkSession}
import org.apache.spark.sql.types.{DataTypes, MetadataBuilder, StructField, StructType}
+import org.junit.Assert.assertEquals
import java.sql.Timestamp
import java.time.{Instant, LocalDateTime, ZoneOffset}
+import java.util
+import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
class TokenAggregatorTest {
+ val exampleString: String = "NetScreen row=[Root]system-notification-00257" +
+ "(traffic\uD83D\uDE41 start_time=\"2022-09-02 10:13:40\"" +
+ " duration=0 policy_id=320000 service=tcp/port:8151 proto=6" +
+ " src zone=Null dst zone=Null action=Deny sent=0 rcvd=40" +
+ " src=127.127.127.127 dst=127.0.0.1 src_port=52362" +
+ " dst_port=8151 session_id=0 reason=Traffic Denied"
+
+ val amount: Long = 10
val testSchema: StructType = new StructType(
Array[StructField]
@@ -75,9 +86,9 @@ class TokenAggregatorTest {
val sparkSession = SparkSession.builder.master("local[*]").getOrCreate
val sqlContext = sparkSession.sqlContext
sparkSession.sparkContext.setLogLevel("ERROR")
-
val encoder = RowEncoder.apply(testSchema)
val rowMemoryStream = new MemoryStream[Row](1,sqlContext)(encoder)
+
var rowDataset = rowMemoryStream.toDF
val tokenAggregator = new TokenAggregator("_raw")
@@ -94,59 +105,54 @@ class TokenAggregatorTest {
while (streamingQuery.isActive) {
val time = Timestamp.valueOf(LocalDateTime.ofInstant(Instant.now, ZoneOffset.UTC))
rowMemoryStream.addData(
- makeRows(time, String.valueOf(run), 10))
+ makeRows(time, String.valueOf(run)))
run += 1
if (run == 10) {
streamingQuery.processAllAvailable
- sparkSession.sql("SELECT * FROM TokenAggregatorQuery").show(100)
streamingQuery.stop
+ streamingQuery.awaitTermination()
}
}
- }
-
- @org.junit.jupiter.api.Test
- def testTokenBuffer(): Unit = {
- val buffer1 = new TokenBuffer
- val buffer2 = new TokenBuffer
-
- // Test no duplicates
- buffer1.addKey("one")
- buffer1.addKey("one")
-
- buffer2.addKey("two")
- buffer2.addKey("three")
-
- assert(buffer1.getSize == 1)
-
- buffer1.mergeBuffer(buffer2.getBuffer)
- assert(buffer1.getSize == 3)
+ val finalResult = sqlContext.sql("SELECT tokens FROM TokenAggregatorQuery").collectAsList()
+ println(finalResult.size())
+ println(finalResult)
}
- private def makeRows(time: Timestamp, partition: String, amount: Long): Seq[Row] = {
+ private def makeRows(time: Timestamp, partition: String): Seq[Row] = {
val rowList: ArrayBuffer[Row] = new ArrayBuffer[Row]
+ val rowData = generateRawData()
+
+ for (i <- 0 until amount.toInt) {
+ val row = RowFactory.create(time,
+ exampleString,
+ "topic",
+ "stream",
+ "host",
+ "input",
+ partition,
+ "0L")
- val row = RowFactory.create(
- time,
- "data Data",
- "topic",
- "stream",
- "host",
- "input",
- partition,
- "0L")
-
- var temp = amount
- while (temp > 0) {
rowList += row
- temp -= 1
}
rowList
}
+ private def generateRawData(): Array[String] = {
+ val testDataList = new Array[String](amount.toInt)
+
+ for (i <- testDataList.indices) {
+ val randomVal = Math.floor(Math.random() * 999)
+ val text = "ip=172.17.255."+randomVal+",port=8080,session_id=46889"
+ testDataList.update(i, text)
+
+ }
+ testDataList
+ }
+
private def startStream(rowDataset: Dataset[Row]): StreamingQuery =
rowDataset.writeStream.queryName("TokenAggregatorQuery")
.outputMode("complete")
diff --git a/src/test/scala/TokenBufferTest.scala b/src/test/scala/TokenBufferTest.scala
new file mode 100644
index 0000000..00429b7
--- /dev/null
+++ b/src/test/scala/TokenBufferTest.scala
@@ -0,0 +1,72 @@
+/*
+ * Teragrep Tokenizer DPF-03
+ * Copyright (C) 2019, 2020, 2021, 2022, 2023 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+
+import com.teragrep.blf_01.Tokenizer
+import com.teragrep.functions.dpf_03.TokenBuffer
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.nio.charset.StandardCharsets
+
+class TokenBufferTest {
+
+ @org.junit.jupiter.api.Test
+ def testNoDuplicateKeys(): Unit = {
+
+ val tokenizer: Tokenizer = new Tokenizer
+
+ val tokenBuffer: TokenBuffer = new TokenBuffer
+
+ val input: String = "one,one"
+
+ val is: InputStream = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8))
+
+ tokenizer.tokenize(is).forEach(token => tokenBuffer.addKey(token))
+
+ // "one" and ","
+ assert(tokenBuffer.getSize == 2)
+
+ }
+}
From dd70ea50449ad7121a44dd561a4ad832d5ddd7a6 Mon Sep 17 00:00:00 2001
From: elliVM <49@teragrep.com>
Date: Wed, 11 Oct 2023 13:06:44 +0300
Subject: [PATCH 2/3] remove unused import
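
TokenAggregator only passes tokens straight to TokenBuffer.addKey and never
names the Token type itself, so the Token import added in the previous
commit is unused and can be dropped.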
---
.../scala/com/teragrep/functions/dpf_03/TokenAggregator.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala b/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
index d6828d8..e20ccc4 100644
--- a/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
+++ b/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
@@ -47,7 +47,7 @@
package com.teragrep.functions.dpf_03
import java.io.{ByteArrayInputStream, Serializable}
-import com.teragrep.blf_01.{Token, Tokenizer}
+import com.teragrep.blf_01.Tokenizer
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
From f26ae739d711e850a48d9540c14319d21eecbc1a Mon Sep 17 00:00:00 2001
From: Motoko Kusanagi
Date: Wed, 11 Oct 2023 14:01:47 +0300
Subject: [PATCH 3/3] less new
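
Stop constructing a Tokenizer on every reduce() call. zero() now creates a
single Tokenizer(32) per aggregation buffer and stores it in an Option that
reduce() reuses; reduce() also reads the column as a String via
getAs[String] instead of toString.

Rough sketch of the reuse pattern (same names as in the diff below; shown
only to contrast with the per-row allocation it replaces):

    var tokenizer: Option[Tokenizer] = None

    override def zero(): TokenBuffer = {
      tokenizer = Some(new Tokenizer(32))   // one Tokenizer per buffer, not per row
      new TokenBuffer()
    }

    override def reduce(b: TokenBuffer, a: Row): TokenBuffer = {
      val input = a.getAs[String](columnName).getBytes(StandardCharsets.UTF_8)
      tokenizer.get.tokenize(new ByteArrayInputStream(input))
        .forEach(token => b.addKey(token))
      b
    }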
---
.../functions/dpf_03/TokenAggregator.scala | 15 ++++++++++-----
1 file changed, 10 insertions(+), 5 deletions(-)
diff --git a/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala b/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
index e20ccc4..a73207f 100644
--- a/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
+++ b/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
@@ -51,6 +51,7 @@ import com.teragrep.blf_01.Tokenizer
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.unsafe.types.UTF8String
import java.nio.charset.StandardCharsets
import scala.reflect.ClassTag
@@ -58,13 +59,17 @@ import scala.reflect.ClassTag
class TokenAggregator(private final val columnName: String) extends Aggregator[Row, TokenBuffer, Set[String]]
with Serializable {
- override def zero(): TokenBuffer = new TokenBuffer()
+ var tokenizer: Option[Tokenizer] = None
+
+ override def zero(): TokenBuffer = {
+ tokenizer = Some(new Tokenizer(32))
+ new TokenBuffer()
+ }
override def reduce(b: TokenBuffer, a: Row): TokenBuffer = {
- val tokenizer: Tokenizer = new Tokenizer;
- val input: String = a.getAs(columnName).toString
- val stream = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8))
- tokenizer.tokenize(stream).forEach(token => b.addKey(token))
+ val input = a.getAs[String](columnName).getBytes(StandardCharsets.UTF_8)
+ val stream = new ByteArrayInputStream(input)
+ tokenizer.get.tokenize(stream).forEach(token => b.addKey(token))
b
}