Skip to content

Commit

Permalink
fix #1565
Browse files Browse the repository at this point in the history
  • Loading branch information
mathieuancelin committed May 9, 2023
1 parent 80e344b commit 2dd5e11
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 22 deletions.
84 changes: 62 additions & 22 deletions otoroshi/app/events/OtoroshiEventsActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,7 @@ import akka.actor.{Actor, Props}
import akka.http.scaladsl.model.{ContentType, ContentTypes}
import akka.http.scaladsl.util.FastFuture
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.alpakka.s3.{
ApiVersion,
ListBucketResultContents,
MemoryBufferType,
MetaHeaders,
S3Attributes,
S3Settings
}
import akka.stream.alpakka.s3.{ApiVersion, ListBucketResultContents, MemoryBufferType, MetaHeaders, S3Attributes, S3Settings}
import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}
import akka.stream.{Attributes, OverflowStrategy, QueueOfferResult}
import com.sksamuel.pulsar4s.Producer
Expand All @@ -27,34 +20,24 @@ import otoroshi.models._
import org.joda.time.DateTime
import otoroshi.models.{DataExporterConfig, Exporter, ExporterRef, FileSettings}
import otoroshi.next.events.TrafficCaptureEvent
import otoroshi.next.plugins.FakeWasmContext
import otoroshi.next.plugins.api.NgPluginCategory
import otoroshi.script._
import otoroshi.security.IdGenerator
import otoroshi.storage.drivers.inmemory.S3Configuration
import otoroshi.utils.TypedMap
import otoroshi.utils.cache.types.LegitTrieMap
import otoroshi.utils.json.JsonOperationsHelper
import otoroshi.utils.mailer.{EmailLocation, MailerSettings}
import play.api.Logger
import play.api.libs.json.{
Format,
JsArray,
JsBoolean,
JsError,
JsNull,
JsNumber,
JsObject,
JsResult,
JsString,
JsSuccess,
JsValue,
Json
}
import play.api.libs.json.{Format, JsArray, JsBoolean, JsError, JsNull, JsNumber, JsObject, JsResult, JsString, JsSuccess, JsValue, Json}

import scala.collection.concurrent.TrieMap
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}
import otoroshi.utils.syntax.implicits._
import otoroshi.wasm.{WasmConfig, WasmUtils}
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.regions.providers.AwsRegionProvider
Expand Down Expand Up @@ -1204,6 +1187,63 @@ object Exporters {
}
}

case class WasmExporterSettings(params: JsObject, wasmRef: Option[String])
extends Exporter {
def json: JsValue = WasmExporterSettings.format.writes(this)
def toJson: JsValue = json
}

object WasmExporterSettings {
val format = new Format[WasmExporterSettings] {
override def reads(json: JsValue): JsResult[WasmExporterSettings] = Try {
WasmExporterSettings(
params = json.select("params").asOpt[JsObject].getOrElse(Json.obj()),
wasmRef = json.select("wasm_ref").asOpt[String].filter(_.trim.nonEmpty),
)
} match {
case Failure(e) => JsError(e.getMessage)
case Success(e) => JsSuccess(e)
}

override def writes(o: WasmExporterSettings): JsValue = Json.obj(
"params" -> o.params,
"wasm_ref" -> o.wasmRef.map(JsString.apply).getOrElse(JsNull).asValue
)
}
}

class WasmExporter(_config : DataExporterConfig)(implicit ec: ExecutionContext, env: Env)
extends DefaultDataExporter(_config)(ec, env) {
override def send(events: Seq[JsValue]): Future[ExportResult] = {
implicit val mat = env.analyticsMaterializer
exporter[WasmExporterSettings].flatMap { exporterConfig =>
exporterConfig.wasmRef
.flatMap(id => env.proxyState.wasmPlugin(id))
.map { plugin =>
val attrs = TypedMap.empty.some
val ctx = FakeWasmContext(exporterConfig.params).some
val input = Json.obj(
"params" -> exporterConfig.params,
"config" -> configUnsafe.json,
)
// println(s"call send: ${events.size}")
WasmUtils.execute(plugin.config, "export_events", input ++ Json.obj("events" -> JsArray(events)), ctx, attrs)
.map {
case Left(err) => ExportResult.ExportResultFailure(err.stringify)
case Right(res) => res.parseJson.select("error").asOpt[JsValue] match {
case None => ExportResult.ExportResultSuccess
case Some(error) => ExportResult.ExportResultFailure(error.stringify)
}
}
.recover {
case e =>
e.printStackTrace()
ExportResult.ExportResultFailure(e.getMessage)
}
}
}.getOrElse(ExportResult.ExportResultSuccess.vfuture)
}
}
}

class DataExporterUpdateJob extends Job {
Expand Down
7 changes: 7 additions & 0 deletions otoroshi/app/models/dataExporter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ object DataExporterConfig {
case "console" => ConsoleSettings()
case "metrics" => MetricsSettings((json \ "config" \ "labels").as[Map[String, String]])
case "custommetrics" => CustomMetricsSettings.format.reads((json \ "config").as[JsObject]).get
case "wasm" => WasmExporterSettings.format.reads((json \ "config").as[JsObject]).get
case _ => throw new RuntimeException("Bad config type")
}
)
Expand Down Expand Up @@ -299,6 +300,10 @@ object DataExporterConfigType {
def name: String = "custommetrics"
}

case object Wasm extends DataExporterConfigType {
def name: String = "wasm"
}

def parse(str: String): DataExporterConfigType = {
str.toLowerCase() match {
case "kafka" => Kafka
Expand All @@ -315,6 +320,7 @@ object DataExporterConfigType {
case "console" => Console
case "metrics" => Metrics
case "custommetrics" => CustomMetrics
case "wasm" => Wasm
case _ => None
}
}
Expand Down Expand Up @@ -371,6 +377,7 @@ case class DataExporterConfig(
case c: ConsoleSettings => new ConsoleExporter(this)
case c: MetricsSettings => new MetricsExporter(this)
case c: CustomMetricsSettings => new CustomMetricsExporter(this)
case c: WasmExporterSettings => new WasmExporter(this)
case _ => throw new RuntimeException("unsupported exporter type")
}
}
Expand Down
26 changes: 26 additions & 0 deletions otoroshi/javascript/src/pages/DataExportersPage.js
Original file line number Diff line number Diff line change
Expand Up @@ -1611,4 +1611,30 @@ const possibleExporterConfigFormValues = {
},
},
},
wasm: {
flow: ['wasm_ref', 'params'],
schema: {
wasm_ref: {
type: 'select',
props: {
label: 'Wasm plugin',
valuesFrom: `/bo/api/proxy/apis/plugins.otoroshi.io/v1/wasm-plugins`,
transformer: i => ({ label: i.name, value: i.id })
},
},
one_by_one: {
type: 'bool',
props: {
label: 'One by one'
}
},
params: {
type: 'jsonobjectcode',
props: {
label: 'Exporter config.'
},
},
},
},
};

0 comments on commit 2dd5e11

Please sign in to comment.