diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala b/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala
index c54a4cabe7c..344902b1da9 100644
--- a/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala
+++ b/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala
@@ -22,7 +22,7 @@ package com.influxdb.client.scala.internal
 
 import akka.Done
-import akka.stream.scaladsl.{Flow, Keep, Sink}
+import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
 import com.influxdb.client.InfluxDBClientOptions
 import com.influxdb.client.domain.WritePrecision
 import com.influxdb.client.internal.{AbstractWriteBlockingClient, AbstractWriteClient}
@@ -53,13 +53,12 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
    * @param org Specifies the destination organization for writes.
    *            The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization`
    *            if the `org` is not specified.
-   * @return the sink that accept the record specified in InfluxDB Line Protocol. The `record` is considered as one batch unit.
+   * @return the sink that accept the record specified in InfluxDB Line Protocol.
    */
   override def writeRecord(precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[String, Future[Done]] = {
     Flow[String]
       .map(record => Seq(new AbstractWriteClient.BatchWriteDataRecord(record)))
-      .map(batch => writeHttp(precision, bucket, org, batch))
-      .toMat(Sink.head)(Keep.right)
+      .toMat(Sink.foreach(batch => writeHttp(precision, bucket, org, batch)))(Keep.right)
   }
 
   /**
@@ -83,13 +82,12 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
    * Write Line Protocol records into specified bucket.
    *
    * @param parameters specify InfluxDB Write endpoint parameters
-   * @return the sink that accept the records specified in InfluxDB Line Protocol. The `records` are considered as one batch unit.
+   * @return the sink that accept the records specified in InfluxDB Line Protocol.
    */
   override def writeRecords(parameters: WriteParameters): Sink[Seq[String], Future[Done]] = {
     Flow[Seq[String]]
       .map(records => records.map(record => new AbstractWriteClient.BatchWriteDataRecord(record)))
-      .map(batch => writeHttp(parameters, batch))
-      .toMat(Sink.head)(Keep.right)
+      .toMat(Sink.foreach(batch => writeHttp(parameters, batch)))(Keep.right)
   }
 
   /**
@@ -101,13 +99,12 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
    * @param org Specifies the destination organization for writes.
    *            The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization`
    *            if the `org` is not specified.
-   * @return the sink that accept the Data points. The `point` is considered as one batch unit.
+   * @return the sink that accept the Data points.
    */
   override def writePoint(bucket: Option[String], org: Option[String]): Sink[Point, Future[Done]] = {
     Flow[Point]
       .map(point => (point.getPrecision, Seq(new AbstractWriteClient.BatchWriteDataPoint(point, options))))
-      .map(batch => writeHttp(Some(batch._1), bucket, org, batch._2))
-      .toMat(Sink.head)(Keep.right)
+      .toMat(Sink.foreach(batch => writeHttp(Some(batch._1), bucket, org, batch._2)))(Keep.right)
   }
 
   /**
@@ -119,7 +116,7 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
    * @param org Specifies the destination organization for writes.
    *            The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization`
    *            if the `org` is not specified.
-   * @return the sink that accept the Data points. The `points` are considered as one batch unit.
+   * @return the sink that accept the Data points.
    */
   override def writePoints(bucket: Option[String], org: Option[String]): Sink[Seq[Point], Future[Done]] = {
     writePoints(new WriteParameters(bucket.orNull, org.orNull, null, null))
@@ -129,7 +126,7 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
    * Write Data points into specified bucket.
    *
    * @param parameters specify InfluxDB Write endpoint parameters
-   * @return the sink that accept the Data points. The `points` are considered as one batch unit.
+   * @return the sink that accept the Data points.
    */
   override def writePoints(parameters: WriteParameters): Sink[Seq[Point], Future[Done]] = {
     Flow[Seq[Point]]
@@ -138,9 +135,8 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
         case (point, map) => map.updated(point.getPrecision, point +: map.getOrElse(point.getPrecision, Seq()))
       }.toList.reverse)
       .map(grouped => grouped.map(group => (group._1, group._2.map(point => new AbstractWriteClient.BatchWriteDataPoint(point, options)))))
-      .map(batches => batches.foreach(batch => writeHttp(parameters.copy(batch._1, options), batch._2)))
-      .map(_ => Done.done())
-      .toMat(Sink.head)(Keep.right)
+      .flatMapConcat(batches => Source(batches))
+      .toMat(Sink.foreach(batch => writeHttp(parameters.copy(batch._1, options), batch._2)))(Keep.right)
   }
 
   /**
@@ -155,7 +151,7 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
    *            The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization`
    *            if the `org` is not specified.
    * @tparam M the type of the measurement (POJO)
-   * @return the sink that accept the measurement. The `measurement` is considered as one batch unit.
+   * @return the sink that accept the measurement.
    */
   override def writeMeasurement[M](precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[M, Future[Done]] = {
     Flow[M]
@@ -163,8 +159,7 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
         val parameters = toWriteParameters(precision, bucket, org)
         Seq(toMeasurementBatch(measurement, parameters.precisionSafe(options)))
       })
-      .map(batch => writeHttp(precision, bucket, org, batch))
-      .toMat(Sink.head)(Keep.right)
+      .toMat(Sink.foreach(batch => writeHttp(precision, bucket, org, batch)))(Keep.right)
   }
 
   /**
@@ -179,7 +174,7 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
    *            The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization`
    *            if the `org` is not specified.
    * @tparam M the type of the measurement (POJO)
-   * @return the sink that accept the measurements. The `measurements` are considered as one batch unit.
+   * @return the sink that accept the measurements.
   */
   override def writeMeasurements[M](precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[Seq[M], Future[Done]] = {
     writeMeasurements(toWriteParameters(precision, bucket, org))
@@ -190,13 +185,12 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx
    *
    * @param parameters specify InfluxDB Write endpoint parameters
    * @tparam M the type of the measurement (POJO)
-   * @return the sink that accept the measurements. The `measurements` are considered as one batch unit.
+   * @return the sink that accept the measurements.
    */
   override def writeMeasurements[M](parameters: WriteParameters): Sink[Seq[M], Future[Done]] = {
     Flow[Seq[M]]
       .map(records => records.map(record => toMeasurementBatch(record, parameters.precisionSafe(options))))
-      .map(batch => writeHttp(parameters, batch))
-      .toMat(Sink.head)(Keep.right)
+      .toMat(Sink.foreach(batch => writeHttp(parameters, batch)))(Keep.right)
   }
 
   private def writeHttp(precision: Option[WritePrecision], bucket: Option[String], org: Option[String], batch: Seq[AbstractWriteClient.BatchWriteData]): Done = {
diff --git a/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala b/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala
index f5a785c03fe..fdda0bdebcc 100644
--- a/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala
+++ b/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala
@@ -72,6 +72,25 @@ class WriteScalaApiTest extends AnyFunSuite with Matchers with BeforeAndAfter wi
     request.getRequestUrl.queryParameter("precision") should be("ns")
   }
 
+  test("write record as stream") {
+
+    utils.serverMockResponse()
+
+    val source = Source(List("m2m,tag=a value=1i 1", "m2m,tag=a value=2i 2"))
+    val sink = client.getWriteScalaApi.writeRecord()
+    val materialized = source.toMat(sink)(Keep.right)
+
+    Await.ready(materialized.run(), Duration.Inf)
+
+    utils.getRequestCount should be(2)
+    val request = utils.serverTakeRequest()
+    // check request
+    request.getBody.readUtf8() should be("m2m,tag=a value=1i 1")
+    request.getRequestUrl.queryParameter("bucket") should be("my-bucket")
+    request.getRequestUrl.queryParameter("org") should be("my-org")
+    request.getRequestUrl.queryParameter("precision") should be("ns")
+  }
+
   test("write records") {
 
     utils.serverMockResponse()
@@ -142,6 +161,37 @@ class WriteScalaApiTest extends AnyFunSuite with Matchers with BeforeAndAfter wi
     request.getRequestUrl.queryParameter("precision") should be("ns")
   }
 
+  test("write point as stream") {
+
+    utils.serverMockResponse()
+
+    val point = Point
+      .measurement("h2o")
+      .addTag("location", "europe")
+      .addField("level", 1)
+      .time(1L, WritePrecision.NS)
+
+    val point2 = Point
+      .measurement("h2o")
+      .addTag("location", "europe")
+      .addField("level", 2)
+      .time(2L, WritePrecision.NS)
+
+    val source = Source(List(point, point2))
+    val sink = client.getWriteScalaApi.writePoint()
+    val materialized = source.toMat(sink)(Keep.right)
+
+    Await.ready(materialized.run(), Duration.Inf)
+
+    utils.getRequestCount should be(2)
+    val request = utils.serverTakeRequest()
+    // check request
+    request.getBody.readUtf8() should be("h2o,location=europe level=1i 1")
+    request.getRequestUrl.queryParameter("bucket") should be("my-bucket")
+    request.getRequestUrl.queryParameter("org") should be("my-org")
+    request.getRequestUrl.queryParameter("precision") should be("ns")
+  }
+
   test("write points") {
 
     utils.serverMockResponse()
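For context, a minimal application-level sketch of how the reworked sinks behave after this change: every upstream element now produces its own write request (the writePoints sink additionally splits a Seq[Point] into one request per precision group via flatMapConcat), and the materialized Future[Done] completes only when the whole stream has finished, rather than after the first element as with the previous Sink.head materialization. This sketch is not part of the patch; the connection URL, token, bucket and org values, and the InfluxDBClientScalaFactory.create call are illustrative placeholders.

import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Source}
import com.influxdb.client.domain.WritePrecision
import com.influxdb.client.scala.InfluxDBClientScalaFactory
import com.influxdb.client.write.Point

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

object WritePointsStreamExample extends App {

  implicit val system: ActorSystem = ActorSystem("write-points-example")

  // Placeholder connection values for this sketch.
  val client = InfluxDBClientScalaFactory
    .create("http://localhost:8086", "my-token".toCharArray, "my-org", "my-bucket")

  // Two points with different precisions inside a single upstream element:
  // writePoints groups them by precision, so this element yields two write requests.
  val seconds = Point.measurement("h2o").addTag("location", "europe").addField("level", 1).time(1L, WritePrecision.S)
  val nanos   = Point.measurement("h2o").addTag("location", "europe").addField("level", 2).time(2L, WritePrecision.NS)

  // The sink keeps accepting further elements until the source completes;
  // the materialized future completes once everything has been written.
  val done: Future[Done] = Source
    .single(Seq(seconds, nanos))
    .toMat(client.getWriteScalaApi.writePoints(None, None))(Keep.right)
    .run()

  Await.ready(done, Duration.Inf)

  client.close()
  system.terminate()
}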