From 52a93374b34470918884bc26f3956a32037cc0ba Mon Sep 17 00:00:00 2001 From: brharrington Date: Wed, 28 Aug 2024 10:50:51 -0500 Subject: [PATCH] eval: add rewrite settings to reference.conf (#1691) Move the config settings to be explicitly in the config so it is easy to confirm when checking settings. --- atlas-eval/src/main/resources/reference.conf | 7 +++ .../eval/stream/DataSourceRewriter.scala | 21 ++++---- .../eval/stream/DataSourceRewriterSuite.scala | 51 ++++++++++--------- 3 files changed, 44 insertions(+), 35 deletions(-) diff --git a/atlas-eval/src/main/resources/reference.conf b/atlas-eval/src/main/resources/reference.conf index 90db7fa03..19c4785ef 100644 --- a/atlas-eval/src/main/resources/reference.conf +++ b/atlas-eval/src/main/resources/reference.conf @@ -36,6 +36,13 @@ atlas.eval { // Which version of the LWC server API to use lwcapi-version = 2 + + // Rewriting for data sources. Can be enabled to call an endpoint for performing rewrites + // on the URIs for a data source. + rewrite { + enabled = false + uri = "" + } } graph { diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala index 5bd74bb18..c319b74ce 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala @@ -53,15 +53,14 @@ class DataSourceRewriter( implicit val system: ActorSystem ) extends StrictLogging { - private val (enabled, rewriteUrl) = { - val enabled = config.hasPath("atlas.eval.stream.rewrite-url") - val url = if (enabled) config.getString("atlas.eval.stream.rewrite-url") else "" - if (enabled) { - logger.info(s"Rewriting enabled with url: ${url}") - } else { - logger.info("Rewriting is disabled") - } - (enabled, url) + private val rewriteConfig = config.getConfig("atlas.eval.stream.rewrite") + private val enabled = rewriteConfig.getBoolean("enabled") + private val rewriteUri = rewriteConfig.getString("uri") + + if (enabled) { + logger.info(s"rewriting enabled with uri: $rewriteUri") + } else { + logger.info(s"rewriting is disabled") } private val client = PekkoHttpClient @@ -215,7 +214,7 @@ class DataSourceRewriter( Some(new DataSource(ds.id, ds.step(), rewrite)) } } - DataSources.of(rewrites: _*) + DataSources.of(rewrites*) } private[stream] def constructRequest(dss: List[DataSource]): HttpRequest = { @@ -226,7 +225,7 @@ class DataSourceRewriter( json.writeEndArray() } HttpRequest( - uri = rewriteUrl, + uri = rewriteUri, method = HttpMethods.POST, entity = HttpEntity(ContentTypes.`application/json`, baos.toByteArray) ) diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/DataSourceRewriterSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/DataSourceRewriterSuite.scala index e9aafc407..e089ff7ad 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/DataSourceRewriterSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/DataSourceRewriterSuite.scala @@ -24,7 +24,6 @@ import com.netflix.atlas.pekko.PekkoHttpClient import com.netflix.spectator.api.NoopRegistry import com.typesafe.config.Config import com.typesafe.config.ConfigFactory -import com.typesafe.config.ConfigValueFactory import munit.FunSuite import org.apache.pekko.NotUsed import org.apache.pekko.actor.ActorSystem @@ -51,7 +50,7 @@ import scala.util.Try class DataSourceRewriterSuite extends FunSuite with TestKitBase { - val dss = DataSources.of( + private val dss = DataSources.of( new DataSource("foo", Duration.ofSeconds(60), "http://localhost/api/v1/graph?q=name,foo,:eq"), new DataSource( "bar", @@ -60,19 +59,21 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase { ) ) - var config: Config = _ - var logger: MockLogger = _ - var ctx: StreamContext = null + private var config: Config = _ + private var logger: MockLogger = _ + private var ctx: StreamContext = null override implicit def system: ActorSystem = ActorSystem("Test") override def beforeEach(context: BeforeEach): Unit = { config = ConfigFactory - .load() - .withValue( - "atlas.eval.stream.rewrite-url", - ConfigValueFactory.fromAnyRef("http://localhost/api/v1/rewrite") - ) + .parseString(""" + |atlas.eval.stream.rewrite { + | enabled = true + | uri = "http://localhost/api/v1/rewrite" + |} + |""".stripMargin) + .withFallback(ConfigFactory.load()) logger = new MockLogger() ctx = new StreamContext(config, Materializer(system), dsLogger = logger) } @@ -85,7 +86,7 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase { test("rewrite: OK") { val client = mockClient( - StatusCode.int2StatusCode(200), + StatusCodes.OK, okRewrite() ) val expected = DataSources.of( @@ -103,7 +104,7 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase { test("rewrite: Bad URI in datasources") { val client = mockClient( - StatusCode.int2StatusCode(200), + StatusCodes.OK, Map( "http://localhost/api/v1/graph?q=name,foo,:eq" -> Rewrite( "OK", @@ -129,7 +130,7 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase { test("rewrite: Malformed response JSON") { val client = mockClient( - StatusCode.int2StatusCode(200), + StatusCodes.OK, Map( "http://localhost/api/v1/graph?q=name,foo,:eq" -> Rewrite( "OK", @@ -153,7 +154,7 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase { test("rewrite: 500") { val client = mockClient( - StatusCode.int2StatusCode(500), + StatusCodes.InternalServerError, Map.empty ) val expected = DataSources.of() @@ -164,7 +165,7 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase { test("rewrite: Missing a rewrite") { val client = mockClient( - StatusCode.int2StatusCode(200), + StatusCodes.OK, Map( "http://localhost/api/v1/graph?q=name,foo,:eq" -> Rewrite( "OK", @@ -185,9 +186,9 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase { test("rewrite: source changes with good, bad, good") { val client = new MockClient( List( - StatusCode.int2StatusCode(200), - StatusCode.int2StatusCode(500), - StatusCode.int2StatusCode(200) + StatusCodes.OK, + StatusCodes.InternalServerError, + StatusCodes.OK ), List( okRewrite(true), @@ -224,9 +225,9 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase { test("rewrite: retry initial flow with 500s") { val client = new MockClient( List( - StatusCode.int2StatusCode(500), - StatusCode.int2StatusCode(500), - StatusCode.int2StatusCode(200) + StatusCodes.InternalServerError, + StatusCodes.InternalServerError, + StatusCodes.OK ), List( Map.empty, @@ -250,9 +251,9 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase { test("rewrite: retry initial flow with 500, exception, ok") { val client = new MockClient( List( - StatusCode.int2StatusCode(500), + StatusCodes.InternalServerError, StatusCodes.custom(0, "no conn", "no conn", false, true), - StatusCode.int2StatusCode(200) + StatusCodes.OK ), List( Map.empty, @@ -343,7 +344,9 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase { var called = 0 - override def singleRequest(request: HttpRequest): Future[HttpResponse] = ??? + override def singleRequest(request: HttpRequest): Future[HttpResponse] = { + Future.failed(new UnsupportedOperationException()) + } override def superPool[C]( config: PekkoHttpClient.ClientConfig