Skip to content

Commit

Permalink
feat: Ingest Points continuously with Akka Stream (#461)
Browse files Browse the repository at this point in the history
Co-authored-by: Gustavo De Micheli <[email protected]>
  • Loading branch information
nMoncho and Gustavo De Micheli authored Nov 2, 2022
1 parent 319c2ef commit 98da5f9
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
package com.influxdb.client.scala.internal

import akka.Done
import akka.stream.scaladsl.{Flow, Keep, Sink}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import com.influxdb.client.InfluxDBClientOptions
import com.influxdb.client.domain.WritePrecision
import com.influxdb.client.internal.{AbstractWriteBlockingClient, AbstractWriteClient}
Expand Down Expand Up @@ -53,13 +53,12 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
* @param org Specifies the destination organization for writes.
* The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization`
* if the `org` is not specified.
* @return the sink that accept the record specified in InfluxDB Line Protocol. The `record` is considered as one batch unit.
* @return the sink that accept the record specified in InfluxDB Line Protocol.
*/
override def writeRecord(precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[String, Future[Done]] = {
Flow[String]
.map(record => Seq(new AbstractWriteClient.BatchWriteDataRecord(record)))
.map(batch => writeHttp(precision, bucket, org, batch))
.toMat(Sink.head)(Keep.right)
.toMat(Sink.foreach(batch => writeHttp(precision, bucket, org, batch)))(Keep.right)
}

/**
Expand All @@ -83,13 +82,12 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
* Write Line Protocol records into specified bucket.
*
* @param parameters specify InfluxDB Write endpoint parameters
* @return the sink that accept the records specified in InfluxDB Line Protocol. The `records` are considered as one batch unit.
* @return the sink that accept the records specified in InfluxDB Line Protocol.
*/
override def writeRecords(parameters: WriteParameters): Sink[Seq[String], Future[Done]] = {
Flow[Seq[String]]
.map(records => records.map(record => new AbstractWriteClient.BatchWriteDataRecord(record)))
.map(batch => writeHttp(parameters, batch))
.toMat(Sink.head)(Keep.right)
.toMat(Sink.foreach(batch => writeHttp(parameters, batch)))(Keep.right)
}

/**
Expand All @@ -101,13 +99,12 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
* @param org Specifies the destination organization for writes.
* The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization`
* if the `org` is not specified.
* @return the sink that accept the Data points. The `point` is considered as one batch unit.
* @return the sink that accept the Data points.
*/
override def writePoint(bucket: Option[String], org: Option[String]): Sink[Point, Future[Done]] = {
Flow[Point]
.map(point => (point.getPrecision, Seq(new AbstractWriteClient.BatchWriteDataPoint(point, options))))
.map(batch => writeHttp(Some(batch._1), bucket, org, batch._2))
.toMat(Sink.head)(Keep.right)
.toMat(Sink.foreach(batch => writeHttp(Some(batch._1), bucket, org, batch._2)))(Keep.right)
}

/**
Expand All @@ -119,7 +116,7 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
* @param org Specifies the destination organization for writes.
* The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization`
* if the `org` is not specified.
* @return the sink that accept the Data points. The `points` are considered as one batch unit.
* @return the sink that accept the Data points.
*/
override def writePoints(bucket: Option[String], org: Option[String]): Sink[Seq[Point], Future[Done]] = {
writePoints(new WriteParameters(bucket.orNull, org.orNull, null, null))
Expand All @@ -129,7 +126,7 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
* Write Data points into specified bucket.
*
* @param parameters specify InfluxDB Write endpoint parameters
* @return the sink that accept the Data points. The `points` are considered as one batch unit.
* @return the sink that accept the Data points.
*/
override def writePoints(parameters: WriteParameters): Sink[Seq[Point], Future[Done]] = {
Flow[Seq[Point]]
Expand All @@ -138,9 +135,8 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
case (point, map) => map.updated(point.getPrecision, point +: map.getOrElse(point.getPrecision, Seq()))
}.toList.reverse)
.map(grouped => grouped.map(group => (group._1, group._2.map(point => new AbstractWriteClient.BatchWriteDataPoint(point, options)))))
.map(batches => batches.foreach(batch => writeHttp(parameters.copy(batch._1, options), batch._2)))
.map(_ => Done.done())
.toMat(Sink.head)(Keep.right)
.flatMapConcat(batches => Source(batches))
.toMat(Sink.foreach(batch => writeHttp(parameters.copy(batch._1, options), batch._2)))(Keep.right)
}

/**
Expand All @@ -155,16 +151,15 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
* The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization`
* if the `org` is not specified.
* @tparam M the type of the measurement (POJO)
* @return the sink that accept the measurement. The `measurement` is considered as one batch unit.
* @return the sink that accept the measurement.
*/
override def writeMeasurement[M](precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[M, Future[Done]] = {
Flow[M]
.map(measurement => {
val parameters = toWriteParameters(precision, bucket, org)
Seq(toMeasurementBatch(measurement, parameters.precisionSafe(options)))
})
.map(batch => writeHttp(precision, bucket, org, batch))
.toMat(Sink.head)(Keep.right)
.toMat(Sink.foreach(batch => writeHttp(precision, bucket, org, batch)))(Keep.right)
}

/**
Expand All @@ -179,7 +174,7 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
* The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization`
* if the `org` is not specified.
* @tparam M the type of the measurement (POJO)
* @return the sink that accept the measurements. The `measurements` are considered as one batch unit.
* @return the sink that accept the measurements.
*/
override def writeMeasurements[M](precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[Seq[M], Future[Done]] = {
writeMeasurements(toWriteParameters(precision, bucket, org))
Expand All @@ -190,13 +185,12 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
*
* @param parameters specify InfluxDB Write endpoint parameters
* @tparam M the type of the measurement (POJO)
* @return the sink that accept the measurements. The `measurements` are considered as one batch unit.
* @return the sink that accept the measurements.
*/
override def writeMeasurements[M](parameters: WriteParameters): Sink[Seq[M], Future[Done]] = {
Flow[Seq[M]]
.map(records => records.map(record => toMeasurementBatch(record, parameters.precisionSafe(options))))
.map(batch => writeHttp(parameters, batch))
.toMat(Sink.head)(Keep.right)
.toMat(Sink.foreach(batch => writeHttp(parameters, batch)))(Keep.right)
}

private def writeHttp(precision: Option[WritePrecision], bucket: Option[String], org: Option[String], batch: Seq[AbstractWriteClient.BatchWriteData]): Done = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,25 @@ class WriteScalaApiTest extends AnyFunSuite with Matchers with BeforeAndAfter wi
request.getRequestUrl.queryParameter("precision") should be("ns")
}

test("write record as stream") {

utils.serverMockResponse()

val source = Source(List("m2m,tag=a value=1i 1", "m2m,tag=a value=2i 2"))
val sink = client.getWriteScalaApi.writeRecord()
val materialized = source.toMat(sink)(Keep.right)

Await.ready(materialized.run(), Duration.Inf)

utils.getRequestCount should be(2)
val request = utils.serverTakeRequest()
// check request
request.getBody.readUtf8() should be("m2m,tag=a value=1i 1")
request.getRequestUrl.queryParameter("bucket") should be("my-bucket")
request.getRequestUrl.queryParameter("org") should be("my-org")
request.getRequestUrl.queryParameter("precision") should be("ns")
}

test("write records") {

utils.serverMockResponse()
Expand Down Expand Up @@ -142,6 +161,37 @@ class WriteScalaApiTest extends AnyFunSuite with Matchers with BeforeAndAfter wi
request.getRequestUrl.queryParameter("precision") should be("ns")
}

test("write point as stream") {

utils.serverMockResponse()

val point = Point
.measurement("h2o")
.addTag("location", "europe")
.addField("level", 1)
.time(1L, WritePrecision.NS)

val point2 = Point
.measurement("h2o")
.addTag("location", "europe")
.addField("level", 2)
.time(2L, WritePrecision.NS)

val source = Source(List(point, point2))
val sink = client.getWriteScalaApi.writePoint()
val materialized = source.toMat(sink)(Keep.right)

Await.ready(materialized.run(), Duration.Inf)

utils.getRequestCount should be(2)
val request = utils.serverTakeRequest()
// check request
request.getBody.readUtf8() should be("h2o,location=europe level=1i 1")
request.getRequestUrl.queryParameter("bucket") should be("my-bucket")
request.getRequestUrl.queryParameter("org") should be("my-org")
request.getRequestUrl.queryParameter("precision") should be("ns")
}

test("write points") {

utils.serverMockResponse()
Expand Down

0 comments on commit 98da5f9

Please sign in to comment.