diff --git a/otoroshi/app/events/OtoroshiEventsActor.scala b/otoroshi/app/events/OtoroshiEventsActor.scala index 08669964cb..246218954e 100644 --- a/otoroshi/app/events/OtoroshiEventsActor.scala +++ b/otoroshi/app/events/OtoroshiEventsActor.scala @@ -8,14 +8,7 @@ import akka.actor.{Actor, Props} import akka.http.scaladsl.model.{ContentType, ContentTypes} import akka.http.scaladsl.util.FastFuture import akka.stream.alpakka.s3.scaladsl.S3 -import akka.stream.alpakka.s3.{ - ApiVersion, - ListBucketResultContents, - MemoryBufferType, - MetaHeaders, - S3Attributes, - S3Settings -} +import akka.stream.alpakka.s3.{ApiVersion, ListBucketResultContents, MemoryBufferType, MetaHeaders, S3Attributes, S3Settings} import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete} import akka.stream.{Attributes, OverflowStrategy, QueueOfferResult} import com.sksamuel.pulsar4s.Producer @@ -27,34 +20,24 @@ import otoroshi.models._ import org.joda.time.DateTime import otoroshi.models.{DataExporterConfig, Exporter, ExporterRef, FileSettings} import otoroshi.next.events.TrafficCaptureEvent +import otoroshi.next.plugins.FakeWasmContext import otoroshi.next.plugins.api.NgPluginCategory import otoroshi.script._ import otoroshi.security.IdGenerator import otoroshi.storage.drivers.inmemory.S3Configuration +import otoroshi.utils.TypedMap import otoroshi.utils.cache.types.LegitTrieMap import otoroshi.utils.json.JsonOperationsHelper import otoroshi.utils.mailer.{EmailLocation, MailerSettings} import play.api.Logger -import play.api.libs.json.{ - Format, - JsArray, - JsBoolean, - JsError, - JsNull, - JsNumber, - JsObject, - JsResult, - JsString, - JsSuccess, - JsValue, - Json -} +import play.api.libs.json.{Format, JsArray, JsBoolean, JsError, JsNull, JsNumber, JsObject, JsResult, JsString, JsSuccess, JsValue, Json} import scala.collection.concurrent.TrieMap import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Failure, Success, Try} import otoroshi.utils.syntax.implicits._ +import otoroshi.wasm.{WasmConfig, WasmUtils} import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider} import software.amazon.awssdk.regions.Region import software.amazon.awssdk.regions.providers.AwsRegionProvider @@ -1204,6 +1187,63 @@ object Exporters { } } + case class WasmExporterSettings(params: JsObject, wasmRef: Option[String]) + extends Exporter { + def json: JsValue = WasmExporterSettings.format.writes(this) + def toJson: JsValue = json + } + + object WasmExporterSettings { + val format = new Format[WasmExporterSettings] { + override def reads(json: JsValue): JsResult[WasmExporterSettings] = Try { + WasmExporterSettings( + params = json.select("params").asOpt[JsObject].getOrElse(Json.obj()), + wasmRef = json.select("wasm_ref").asOpt[String].filter(_.trim.nonEmpty), + ) + } match { + case Failure(e) => JsError(e.getMessage) + case Success(e) => JsSuccess(e) + } + + override def writes(o: WasmExporterSettings): JsValue = Json.obj( + "params" -> o.params, + "wasm_ref" -> o.wasmRef.map(JsString.apply).getOrElse(JsNull).asValue + ) + } + } + + class WasmExporter(_config : DataExporterConfig)(implicit ec: ExecutionContext, env: Env) + extends DefaultDataExporter(_config)(ec, env) { + override def send(events: Seq[JsValue]): Future[ExportResult] = { + implicit val mat = env.analyticsMaterializer + exporter[WasmExporterSettings].flatMap { exporterConfig => + exporterConfig.wasmRef + .flatMap(id => env.proxyState.wasmPlugin(id)) + .map { plugin => + val attrs = TypedMap.empty.some + val ctx = FakeWasmContext(exporterConfig.params).some + val input = Json.obj( + "params" -> exporterConfig.params, + "config" -> configUnsafe.json, + ) + // println(s"call send: ${events.size}") + WasmUtils.execute(plugin.config, "export_events", input ++ Json.obj("events" -> JsArray(events)), ctx, attrs) + .map { + case Left(err) => ExportResult.ExportResultFailure(err.stringify) + case Right(res) => res.parseJson.select("error").asOpt[JsValue] match { + case None => ExportResult.ExportResultSuccess + case Some(error) => ExportResult.ExportResultFailure(error.stringify) + } + } + .recover { + case e => + e.printStackTrace() + ExportResult.ExportResultFailure(e.getMessage) + } + } + }.getOrElse(ExportResult.ExportResultSuccess.vfuture) + } + } } class DataExporterUpdateJob extends Job { diff --git a/otoroshi/app/models/dataExporter.scala b/otoroshi/app/models/dataExporter.scala index 7adf5d1bf3..120f77e52d 100644 --- a/otoroshi/app/models/dataExporter.scala +++ b/otoroshi/app/models/dataExporter.scala @@ -227,6 +227,7 @@ object DataExporterConfig { case "console" => ConsoleSettings() case "metrics" => MetricsSettings((json \ "config" \ "labels").as[Map[String, String]]) case "custommetrics" => CustomMetricsSettings.format.reads((json \ "config").as[JsObject]).get + case "wasm" => WasmExporterSettings.format.reads((json \ "config").as[JsObject]).get case _ => throw new RuntimeException("Bad config type") } ) @@ -299,6 +300,10 @@ object DataExporterConfigType { def name: String = "custommetrics" } + case object Wasm extends DataExporterConfigType { + def name: String = "wasm" + } + def parse(str: String): DataExporterConfigType = { str.toLowerCase() match { case "kafka" => Kafka @@ -315,6 +320,7 @@ object DataExporterConfigType { case "console" => Console case "metrics" => Metrics case "custommetrics" => CustomMetrics + case "wasm" => Wasm case _ => None } } @@ -371,6 +377,7 @@ case class DataExporterConfig( case c: ConsoleSettings => new ConsoleExporter(this) case c: MetricsSettings => new MetricsExporter(this) case c: CustomMetricsSettings => new CustomMetricsExporter(this) + case c: WasmExporterSettings => new WasmExporter(this) case _ => throw new RuntimeException("unsupported exporter type") } } diff --git a/otoroshi/javascript/src/pages/DataExportersPage.js b/otoroshi/javascript/src/pages/DataExportersPage.js index aff5d24199..e91a35f08e 100644 --- a/otoroshi/javascript/src/pages/DataExportersPage.js +++ b/otoroshi/javascript/src/pages/DataExportersPage.js @@ -1611,4 +1611,30 @@ const possibleExporterConfigFormValues = { }, }, }, + wasm: { + flow: ['wasm_ref', 'params'], + schema: { + wasm_ref: { + type: 'select', + props: { + label: 'Wasm plugin', + valuesFrom: `/bo/api/proxy/apis/plugins.otoroshi.io/v1/wasm-plugins`, + transformer: i => ({ label: i.name, value: i.id }) + }, + }, + one_by_one: { + type: 'bool', + props: { + label: 'One by one' + } + }, + params: { + type: 'jsonobjectcode', + props: { + label: 'Exporter config.' + }, + }, + }, + }, }; +