From 5f0a4d8df250a54b7253907b9ef0b39ee8b7214f Mon Sep 17 00:00:00 2001 From: Andrea Lamparelli Date: Mon, 8 Apr 2024 19:27:30 +0200 Subject: [PATCH] MPL-68: make run_schemas updates asynchronous --- .../tools/horreum/bus/MessageBusChannels.java | 1 + .../tools/horreum/svc/SchemaServiceImpl.java | 9 +- .../tools/horreum/svc/ServiceMediator.java | 21 ++- .../src/main/resources/application.properties | 17 ++- .../tools/horreum/svc/SchemaServiceTest.java | 141 +++++++++++++++++- horreum-web/src/Banner.tsx | 51 ++++++- horreum-web/src/domain/schemas/Schema.tsx | 24 ++- 7 files changed, 243 insertions(+), 21 deletions(-) diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/bus/MessageBusChannels.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/bus/MessageBusChannels.java index 8c13211c4..3ee2f4dfe 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/bus/MessageBusChannels.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/bus/MessageBusChannels.java @@ -15,5 +15,6 @@ public enum MessageBusChannels { RUN_VALIDATED, CHANGE_NEW, EXPERIMENT_RESULT_NEW, + SCHEMA_SYNC, FOOBAR } diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/SchemaServiceImpl.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/SchemaServiceImpl.java index 76ee86e62..2b1a30d1a 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/SchemaServiceImpl.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/SchemaServiceImpl.java @@ -187,19 +187,24 @@ public Integer add(Schema schemaDTO){ .setParameter(1, schema.id).executeUpdate(); em.createNativeQuery("DELETE FROM dataset_schemas WHERE schema_id = ?1") .setParameter(1, schema.id).executeUpdate(); - mediator.newOrUpdatedSchema(schema); + newOrUpdatedSchema(schema); } } else { schema.id = null; schema.persist(); em.flush(); - mediator.newOrUpdatedSchema(schema); + newOrUpdatedSchema(schema); } log.debugf("Added schema %s (%d), URI %s", schema.name, schema.id, schema.uri); return schema.id; } + private void newOrUpdatedSchema(SchemaDAO schema) { + log.debugf("Push schema event for async run schemas update: %d (%s)", schema.id, schema.uri); + Util.registerTxSynchronization(tm, txStatus -> mediator.queueSchemaSync(schema.id)); + } + private void verifyNewSchema(Schema schemaDTO) { if (schemaDTO.uri == null || Arrays.stream(ALL_URNS).noneMatch(scheme -> schemaDTO.uri.startsWith(scheme + ":"))) { throw ServiceException.badRequest("Please use URI starting with one of these schemes: " + Arrays.toString(ALL_URNS)); diff --git a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ServiceMediator.java b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ServiceMediator.java index 5514a1801..8a4c8a2e4 100644 --- a/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ServiceMediator.java +++ b/horreum-backend/src/main/java/io/hyperfoil/tools/horreum/svc/ServiceMediator.java @@ -9,7 +9,6 @@ import io.hyperfoil.tools.horreum.api.data.Test; import io.hyperfoil.tools.horreum.api.services.ExperimentService; import io.hyperfoil.tools.horreum.entity.data.ActionDAO; -import io.hyperfoil.tools.horreum.entity.data.SchemaDAO; import io.hyperfoil.tools.horreum.events.DatasetChanges; import io.smallrye.reactive.messaging.annotations.Blocking; import io.vertx.core.Vertx; @@ -76,6 +75,10 @@ public class ServiceMediator { @Channel("run-recalc-out") Emitter runEmitter; + @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10000) + @Channel("schema-sync-out") + Emitter schemaEmitter; + public ServiceMediator() { } @@ -158,6 +161,18 @@ void queueRunRecalculation(int runId) { runEmitter.send(runId); } + @Incoming("schema-sync-in") + @Blocking(ordered = false, value = "horreum.schema.pool") + @ActivateRequestContext + public void processSchemaSync(int schemaId) { + runService.onNewOrUpdatedSchema(schemaId); + } + + @Transactional(Transactional.TxType.NOT_SUPPORTED) + void queueSchemaSync(int schemaId) { + schemaEmitter.send(schemaId); + } + void dataPointsProcessed(DataPoint.DatasetProcessedEvent event) { experimentService.onDatapointsCreated(event); } @@ -206,9 +221,6 @@ void importTestToAll(TestExport test) { subscriptionService.importSubscriptions(test); } - public void newOrUpdatedSchema(SchemaDAO schema) { - runService.processNewOrUpdatedSchema(schema); - } public void updateFingerprints(int testId) { datasetService.updateFingerprints(testId); } @@ -222,4 +234,5 @@ public void validateDataset(Integer datasetId) { public void validateSchema(int schemaId) { schemaService.revalidateAll(schemaId); } + } diff --git a/horreum-backend/src/main/resources/application.properties b/horreum-backend/src/main/resources/application.properties index 881deee9c..826d79a9c 100644 --- a/horreum-backend/src/main/resources/application.properties +++ b/horreum-backend/src/main/resources/application.properties @@ -46,6 +46,18 @@ mp.messaging.outgoing.run-recalc-out.address=run-recalc mp.messaging.outgoing.run-recalc-out.durable=true mp.messaging.outgoing.run-recalc-out.container-id=horreum-broker mp.messaging.outgoing.run-recalc-out.link-name=run-recalc +# schema-sync incoming +mp.messaging.incoming.schema-sync-in.connector=smallrye-amqp +mp.messaging.incoming.schema-sync-in.address=schema-sync +mp.messaging.incoming.schema-sync-in.durable=true +mp.messaging.incoming.schema-sync-in.container-id=horreum-broker +mp.messaging.incoming.schema-sync-in.link-name=schema-sync +# schema-sync outgoing +mp.messaging.outgoing.schema-sync-out.connector=smallrye-amqp +mp.messaging.outgoing.schema-sync-out.address=schema-sync +mp.messaging.outgoing.schema-sync-out.durable=true +mp.messaging.outgoing.schema-sync-out.container-id=horreum-broker +mp.messaging.outgoing.schema-sync-out.link-name=schema-sync ## Datasource updated by Liquibase - the same as app but always with superuser credentials @@ -74,8 +86,9 @@ horreum.test-mode=false #quarkus.native.additional-build-args= # thread pool sizes -smallrye.messaging.worker.horreum.dataset.pool.max-concurrency=10 -smallrye.messaging.worker.horreum.run.pool.max-concurrency=10 +smallrye.messaging.worker.horreum.dataset.pool.max-concurrency=7 +smallrye.messaging.worker.horreum.run.pool.max-concurrency=7 +smallrye.messaging.worker.horreum.schema.pool.max-concurrency=7 hibernate.jdbc.time_zone=UTC diff --git a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/SchemaServiceTest.java b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/SchemaServiceTest.java index fb6a378ce..d1861eafa 100644 --- a/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/SchemaServiceTest.java +++ b/horreum-backend/src/test/java/io/hyperfoil/tools/horreum/svc/SchemaServiceTest.java @@ -30,6 +30,8 @@ import io.quarkus.test.junit.TestProfile; import io.quarkus.test.oidc.server.OidcWiremockTestResource; import io.restassured.common.mapper.TypeRef; +import jakarta.inject.Inject; +import jakarta.persistence.Tuple; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.TestInfo; @@ -256,7 +258,7 @@ private void testExportImport(boolean wipe) { } @org.junit.jupiter.api.Test - public void testFindUsages(TestInfo info) throws InterruptedException { + public void testFindUsages() throws InterruptedException { Test test = createTest(createExampleTest("nofilter")); createComparisonSchema(); uploadExampleRuns(test); @@ -268,12 +270,145 @@ public void testFindUsages(TestInfo info) throws InterruptedException { assertNotEquals(0, report.data.size()); - List usages = jsonRequest().get("/api/schema/findUsages?label=".concat("category")) - .then().statusCode(200).extract().body().as(List.class); + List usages = + jsonRequest().get("/api/schema/findUsages?label=".concat("category")) + .then().statusCode(200).extract().body().as(List.class); assertNotNull(usages); + } + + @org.junit.jupiter.api.Test + public void testCreateSchemaAfterRunWithArrayData() throws InterruptedException { + String schemaUri = "urn:unknown:schema"; + Test test = createTest(createExampleTest("dummy-test")); + + ArrayNode data = JsonNodeFactory.instance.arrayNode(); + data.addObject().put("$schema", schemaUri).put("foo", "bar"); + data.addObject().put("$schema", schemaUri).put("foo", "bar"); + int runId = uploadRun(data.toString(), test.name); + assertTrue(runId > 0); + + // no validation errors + assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM run_validationerrors").getSingleResult()); + assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM dataset_validationerrors").getSingleResult()); + + List runSchemasBefore = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList(); + assertEquals(0, runSchemasBefore.size()); + + // create the schema afterward + Schema schema = createSchema("Unknown schema", schemaUri); + assertNotNull(schema); + assertTrue(schema.id > 0); + + TestUtil.eventually(() -> { + Util.withTx(tm, () -> { + em.clear(); + List runSchemas = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList(); + // two records as the run is an array of two objects, both referencing the same schema + assertEquals(2, runSchemas.size()); + return null; + }); + }); + } + + @org.junit.jupiter.api.Test + public void testCreateSchemaAfterRunWithMultipleSchemas() throws InterruptedException { + String firstSchemaUri = "urn:unknown1:schema"; + String secondSchemaUri = "urn:unknown2:schema"; + Test test = createTest(createExampleTest("dummy-test")); + ArrayNode data = JsonNodeFactory.instance.arrayNode(); + data.addObject().put("$schema", firstSchemaUri).put("foo", "bar"); + data.addObject().put("$schema", secondSchemaUri).put("foo", "zip"); + int runId = uploadRun(data.toString(), test.name); + assertTrue(runId > 0); + + // no validation errors + assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM run_validationerrors").getSingleResult()); + assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM dataset_validationerrors").getSingleResult()); + + List runSchemasBefore = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList(); + assertEquals(0, runSchemasBefore.size()); + + // create the schema 1 afterward + Schema schema1 = createSchema("Unknown schema 1", firstSchemaUri); + assertNotNull(schema1); + assertTrue(schema1.id > 0); + + TestUtil.eventually(() -> { + Util.withTx(tm, () -> { + em.clear(); + List runSchemas = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1", Tuple.class).setParameter(1, runId).getResultList(); + // 1 record as the run is an array of two objects referencing different schemas and only the first one is created + assertEquals(1, runSchemas.size()); + assertEquals(schema1.id, (int)runSchemas.get(0).get(3)); + return null; + }); + }); } + @org.junit.jupiter.api.Test + public void testCreateSchemaAfterRunWithObjectData() throws InterruptedException { + String schemaUri = "urn:unknown:schema"; + Test test = createTest(createExampleTest("dummy-test")); + ObjectNode data = JsonNodeFactory.instance.objectNode(); + data.put("$schema", schemaUri).put("foo", "bar"); + int runId = uploadRun(data.toString(), test.name); + assertTrue(runId > 0); + + // no validation errors + assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM run_validationerrors").getSingleResult()); + assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM dataset_validationerrors").getSingleResult()); + + List runSchemasBefore = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList(); + assertEquals(0, runSchemasBefore.size()); + + // create the schema afterward + Schema schema = createSchema("Unknown schema", schemaUri); + assertNotNull(schema); + assertTrue(schema.id > 0); + + TestUtil.eventually(() -> { + Util.withTx(tm, () -> { + em.clear(); + List runSchemas = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList(); + // run has single object data, thus referencing one schema + assertEquals(1, runSchemas.size()); + return null; + }); + }); + } + + @org.junit.jupiter.api.Test + public void testChangeUriForReferencedSchema() throws InterruptedException { + String schemaUri = "urn:dummy:schema"; + Schema schema = createSchema("Dummy schema", schemaUri); + assertNotNull(schema); + assertTrue(schema.id > 0); + + Test test = createTest(createExampleTest("dummy-test")); + + ArrayNode data = JsonNodeFactory.instance.arrayNode(); + data.addObject().put("$schema", schemaUri).put("foo", "bar"); + data.addObject().put("$schema", schemaUri).put("foo", "bar"); + int runId = uploadRun(data.toString(), test.name); + assertTrue(runId > 0); + + // no validation errors + assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM run_validationerrors").getSingleResult()); + assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM dataset_validationerrors").getSingleResult()); + + List runSchemasBefore = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList(); + assertEquals(2, runSchemasBefore.size()); + + // update the schema uri afterward + schema.uri = "urn:new-dummy:schema"; + Schema updatedSchema = addOrUpdateSchema(schema); + assertNotNull(updatedSchema); + assertEquals(schema.id, updatedSchema.id); + + List runSchemasAfter = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList(); + assertEquals(0, runSchemasAfter.size()); + } } diff --git a/horreum-web/src/Banner.tsx b/horreum-web/src/Banner.tsx index cbf96c549..cf5e5ec82 100644 --- a/horreum-web/src/Banner.tsx +++ b/horreum-web/src/Banner.tsx @@ -3,19 +3,58 @@ import { useEffect, useState } from "react" import { Alert } from "@patternfly/react-core" import {Banner as BannerData, bannerApi} from "./api" +function getAlertBanner(banner: BannerData) { + return ( + +
+
+ ) +} + +// 30 seconds +const DEFAULT_TIMEOUT = 30000 +export type TimeoutBannerProps = { + bannerData: BannerData + timeout?: number + onTimeout?: () => void +} + +export function TimeoutBanner({bannerData, timeout, onTimeout}: TimeoutBannerProps) { + timeout = timeout ?? DEFAULT_TIMEOUT + const [banner, setBanner] = useState(bannerData) + + useEffect(() => { + if (banner) { + const timeoutId = setTimeout(() => { + setBanner(undefined) + if (onTimeout) { + onTimeout() + } + }, timeout) + + return () => clearTimeout(timeoutId); + } + }, []) + + if (!banner) { + return null + } + + return getAlertBanner(banner) +} + export default function Banner() { const [banner, setBanner] = useState() const [updateCounter, setUpdateCounter] = useState(0) useEffect(() => { - setTimeout(() => setUpdateCounter(updateCounter + 1), 60000) + const timeoutId = setTimeout(() => setUpdateCounter(updateCounter + 1), 60000) bannerApi.get().then(setBanner) + + return () => clearTimeout(timeoutId) }, [updateCounter]) if (!banner) { return null } - return ( - -
-
- ) + + return getAlertBanner(banner) } diff --git a/horreum-web/src/domain/schemas/Schema.tsx b/horreum-web/src/domain/schemas/Schema.tsx index d87f99ddb..736952263 100644 --- a/horreum-web/src/domain/schemas/Schema.tsx +++ b/horreum-web/src/domain/schemas/Schema.tsx @@ -37,10 +37,11 @@ import SavedTabs, { SavedTab, TabFunctions, modifiedFunc, resetFunc, saveFunc } import TeamSelect from "../../components/TeamSelect" import Transformers from "./Transformers" import Labels from "./Labels" -import {Access, getSchema, Schema as SchemaDef, schemaApi} from "../../api" +import {Access, getSchema, Schema as SchemaDef, schemaApi, Banner as BannerData} from "../../api" import SchemaExportImport from "./SchemaExportImport" import {AppContext} from "../../context/appContext"; import {AppContextType} from "../../context/@types/appContextTypes"; +import { TimeoutBanner, TimeoutBannerProps } from "../../Banner" type SchemaParams = { schemaId: string @@ -212,6 +213,7 @@ export default function Schema() { const [editorSchema, setEditorSchema] = useState(schema?.schema ? toString(schema?.schema) : undefined) const [modifiedSchema, setModifiedSchema] = useState(schema) const [modified, setModified] = useState(false) + const [showMessageBanner, setShowMessageBanner] = useState(false) // any tester can save to add new labels/transformers const isTester = useTester() @@ -246,8 +248,6 @@ export default function Schema() { return schemaUri || undefined } - - const save = () => { if (!modified) { return Promise.resolve(schemaIdVal) @@ -261,13 +261,29 @@ export default function Schema() { return schemaApi.add(newSchema) .then(id=> id, error => alerting.dispatchError(error, "SAVE_SCHEMA", "Failed to save schema") - ).then(id => setSchemaIdVal(id)) + ).then(id => { + setSchemaIdVal(id) + setShowMessageBanner(true) + }) } const transformersFuncsRef = useRef() const labelsFuncsRef = useRef() + const bannerProps = { + bannerData: { + title: "Schema sync alert", + message: "Horreum schema changes is processed asynchronously. As a result, changes in runs and schemas links may not be reflected immediately.", + severity: "info", + active: true, + }, + onTimeout: () => setShowMessageBanner(false) + } as TimeoutBannerProps + return ( + {showMessageBanner && ( + + )} {loading && (