Skip to content

Commit

Permalink
MPL-68: make run_schemas updates asynchronous
Browse files Browse the repository at this point in the history
  • Loading branch information
lampajr committed Apr 15, 2024
1 parent 95108a0 commit 5f0a4d8
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ public enum MessageBusChannels {
RUN_VALIDATED,
CHANGE_NEW,
EXPERIMENT_RESULT_NEW,
SCHEMA_SYNC,
FOOBAR
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,19 +187,24 @@ public Integer add(Schema schemaDTO){
.setParameter(1, schema.id).executeUpdate();
em.createNativeQuery("DELETE FROM dataset_schemas WHERE schema_id = ?1")
.setParameter(1, schema.id).executeUpdate();
mediator.newOrUpdatedSchema(schema);
newOrUpdatedSchema(schema);
}
}
else {
schema.id = null;
schema.persist();
em.flush();
mediator.newOrUpdatedSchema(schema);
newOrUpdatedSchema(schema);
}
log.debugf("Added schema %s (%d), URI %s", schema.name, schema.id, schema.uri);
return schema.id;
}

private void newOrUpdatedSchema(SchemaDAO schema) {
log.debugf("Push schema event for async run schemas update: %d (%s)", schema.id, schema.uri);
Util.registerTxSynchronization(tm, txStatus -> mediator.queueSchemaSync(schema.id));
}

private void verifyNewSchema(Schema schemaDTO) {
if (schemaDTO.uri == null || Arrays.stream(ALL_URNS).noneMatch(scheme -> schemaDTO.uri.startsWith(scheme + ":"))) {
throw ServiceException.badRequest("Please use URI starting with one of these schemes: " + Arrays.toString(ALL_URNS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.hyperfoil.tools.horreum.api.data.Test;
import io.hyperfoil.tools.horreum.api.services.ExperimentService;
import io.hyperfoil.tools.horreum.entity.data.ActionDAO;
import io.hyperfoil.tools.horreum.entity.data.SchemaDAO;
import io.hyperfoil.tools.horreum.events.DatasetChanges;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -76,6 +75,10 @@ public class ServiceMediator {
@Channel("run-recalc-out")
Emitter<Integer> runEmitter;

@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10000)
@Channel("schema-sync-out")
Emitter<Integer> schemaEmitter;

public ServiceMediator() {
}

Expand Down Expand Up @@ -158,6 +161,18 @@ void queueRunRecalculation(int runId) {
runEmitter.send(runId);
}

@Incoming("schema-sync-in")
@Blocking(ordered = false, value = "horreum.schema.pool")
@ActivateRequestContext
public void processSchemaSync(int schemaId) {
runService.onNewOrUpdatedSchema(schemaId);
}

@Transactional(Transactional.TxType.NOT_SUPPORTED)
void queueSchemaSync(int schemaId) {
schemaEmitter.send(schemaId);
}

void dataPointsProcessed(DataPoint.DatasetProcessedEvent event) {
experimentService.onDatapointsCreated(event);
}
Expand Down Expand Up @@ -206,9 +221,6 @@ void importTestToAll(TestExport test) {
subscriptionService.importSubscriptions(test);
}

public void newOrUpdatedSchema(SchemaDAO schema) {
runService.processNewOrUpdatedSchema(schema);
}
public void updateFingerprints(int testId) {
datasetService.updateFingerprints(testId);
}
Expand All @@ -222,4 +234,5 @@ public void validateDataset(Integer datasetId) {
public void validateSchema(int schemaId) {
schemaService.revalidateAll(schemaId);
}

}
17 changes: 15 additions & 2 deletions horreum-backend/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ mp.messaging.outgoing.run-recalc-out.address=run-recalc
mp.messaging.outgoing.run-recalc-out.durable=true
mp.messaging.outgoing.run-recalc-out.container-id=horreum-broker
mp.messaging.outgoing.run-recalc-out.link-name=run-recalc
# schema-sync incoming
mp.messaging.incoming.schema-sync-in.connector=smallrye-amqp
mp.messaging.incoming.schema-sync-in.address=schema-sync
mp.messaging.incoming.schema-sync-in.durable=true
mp.messaging.incoming.schema-sync-in.container-id=horreum-broker
mp.messaging.incoming.schema-sync-in.link-name=schema-sync
# schema-sync outgoing
mp.messaging.outgoing.schema-sync-out.connector=smallrye-amqp
mp.messaging.outgoing.schema-sync-out.address=schema-sync
mp.messaging.outgoing.schema-sync-out.durable=true
mp.messaging.outgoing.schema-sync-out.container-id=horreum-broker
mp.messaging.outgoing.schema-sync-out.link-name=schema-sync

## Datasource updated by Liquibase - the same as app but always with superuser credentials

Expand Down Expand Up @@ -74,8 +86,9 @@ horreum.test-mode=false
#quarkus.native.additional-build-args=

# thread pool sizes
smallrye.messaging.worker.horreum.dataset.pool.max-concurrency=10
smallrye.messaging.worker.horreum.run.pool.max-concurrency=10
smallrye.messaging.worker.horreum.dataset.pool.max-concurrency=7
smallrye.messaging.worker.horreum.run.pool.max-concurrency=7
smallrye.messaging.worker.horreum.schema.pool.max-concurrency=7


hibernate.jdbc.time_zone=UTC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import io.quarkus.test.junit.TestProfile;
import io.quarkus.test.oidc.server.OidcWiremockTestResource;
import io.restassured.common.mapper.TypeRef;
import jakarta.inject.Inject;
import jakarta.persistence.Tuple;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestInfo;

Expand Down Expand Up @@ -256,7 +258,7 @@ private void testExportImport(boolean wipe) {
}

@org.junit.jupiter.api.Test
public void testFindUsages(TestInfo info) throws InterruptedException {
public void testFindUsages() throws InterruptedException {
Test test = createTest(createExampleTest("nofilter"));
createComparisonSchema();
uploadExampleRuns(test);
Expand All @@ -268,12 +270,145 @@ public void testFindUsages(TestInfo info) throws InterruptedException {

assertNotEquals(0, report.data.size());

List<SchemaService.LabelLocation> usages = jsonRequest().get("/api/schema/findUsages?label=".concat("category"))
.then().statusCode(200).extract().body().as(List.class);
List<SchemaService.LabelLocation> usages =
jsonRequest().get("/api/schema/findUsages?label=".concat("category"))
.then().statusCode(200).extract().body().as(List.class);

assertNotNull(usages);
}

@org.junit.jupiter.api.Test
public void testCreateSchemaAfterRunWithArrayData() throws InterruptedException {
String schemaUri = "urn:unknown:schema";
Test test = createTest(createExampleTest("dummy-test"));

ArrayNode data = JsonNodeFactory.instance.arrayNode();
data.addObject().put("$schema", schemaUri).put("foo", "bar");
data.addObject().put("$schema", schemaUri).put("foo", "bar");
int runId = uploadRun(data.toString(), test.name);
assertTrue(runId > 0);

// no validation errors
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM run_validationerrors").getSingleResult());
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM dataset_validationerrors").getSingleResult());

List<?> runSchemasBefore = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList();
assertEquals(0, runSchemasBefore.size());

// create the schema afterward
Schema schema = createSchema("Unknown schema", schemaUri);
assertNotNull(schema);
assertTrue(schema.id > 0);

TestUtil.eventually(() -> {
Util.withTx(tm, () -> {
em.clear();
List<?> runSchemas = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList();
// two records as the run is an array of two objects, both referencing the same schema
assertEquals(2, runSchemas.size());
return null;
});
});
}

@org.junit.jupiter.api.Test
public void testCreateSchemaAfterRunWithMultipleSchemas() throws InterruptedException {
String firstSchemaUri = "urn:unknown1:schema";
String secondSchemaUri = "urn:unknown2:schema";
Test test = createTest(createExampleTest("dummy-test"));

ArrayNode data = JsonNodeFactory.instance.arrayNode();
data.addObject().put("$schema", firstSchemaUri).put("foo", "bar");
data.addObject().put("$schema", secondSchemaUri).put("foo", "zip");
int runId = uploadRun(data.toString(), test.name);
assertTrue(runId > 0);

// no validation errors
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM run_validationerrors").getSingleResult());
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM dataset_validationerrors").getSingleResult());

List<?> runSchemasBefore = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList();
assertEquals(0, runSchemasBefore.size());

// create the schema 1 afterward
Schema schema1 = createSchema("Unknown schema 1", firstSchemaUri);
assertNotNull(schema1);
assertTrue(schema1.id > 0);

TestUtil.eventually(() -> {
Util.withTx(tm, () -> {
em.clear();
List<Tuple> runSchemas = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1", Tuple.class).setParameter(1, runId).getResultList();
// 1 record as the run is an array of two objects referencing different schemas and only the first one is created
assertEquals(1, runSchemas.size());
assertEquals(schema1.id, (int)runSchemas.get(0).get(3));
return null;
});
});
}

@org.junit.jupiter.api.Test
public void testCreateSchemaAfterRunWithObjectData() throws InterruptedException {
String schemaUri = "urn:unknown:schema";
Test test = createTest(createExampleTest("dummy-test"));

ObjectNode data = JsonNodeFactory.instance.objectNode();
data.put("$schema", schemaUri).put("foo", "bar");
int runId = uploadRun(data.toString(), test.name);
assertTrue(runId > 0);

// no validation errors
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM run_validationerrors").getSingleResult());
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM dataset_validationerrors").getSingleResult());

List<?> runSchemasBefore = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList();
assertEquals(0, runSchemasBefore.size());

// create the schema afterward
Schema schema = createSchema("Unknown schema", schemaUri);
assertNotNull(schema);
assertTrue(schema.id > 0);

TestUtil.eventually(() -> {
Util.withTx(tm, () -> {
em.clear();
List<?> runSchemas = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList();
// run has single object data, thus referencing one schema
assertEquals(1, runSchemas.size());
return null;
});
});
}

@org.junit.jupiter.api.Test
public void testChangeUriForReferencedSchema() throws InterruptedException {
String schemaUri = "urn:dummy:schema";
Schema schema = createSchema("Dummy schema", schemaUri);
assertNotNull(schema);
assertTrue(schema.id > 0);

Test test = createTest(createExampleTest("dummy-test"));

ArrayNode data = JsonNodeFactory.instance.arrayNode();
data.addObject().put("$schema", schemaUri).put("foo", "bar");
data.addObject().put("$schema", schemaUri).put("foo", "bar");
int runId = uploadRun(data.toString(), test.name);
assertTrue(runId > 0);

// no validation errors
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM run_validationerrors").getSingleResult());
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM dataset_validationerrors").getSingleResult());

List<?> runSchemasBefore = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList();
assertEquals(2, runSchemasBefore.size());

// update the schema uri afterward
schema.uri = "urn:new-dummy:schema";
Schema updatedSchema = addOrUpdateSchema(schema);
assertNotNull(updatedSchema);
assertEquals(schema.id, updatedSchema.id);

List<?> runSchemasAfter = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList();
assertEquals(0, runSchemasAfter.size());
}
}
51 changes: 45 additions & 6 deletions horreum-web/src/Banner.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,58 @@ import { useEffect, useState } from "react"
import { Alert } from "@patternfly/react-core"
import {Banner as BannerData, bannerApi} from "./api"

function getAlertBanner(banner: BannerData) {
return (
<Alert variant={banner.severity as any} title={banner.title} isInline>
<div dangerouslySetInnerHTML={{ __html: banner.message || "" }}></div>
</Alert>
)
}

// 30 seconds
const DEFAULT_TIMEOUT = 30000
export type TimeoutBannerProps = {
bannerData: BannerData
timeout?: number
onTimeout?: () => void
}

export function TimeoutBanner({bannerData, timeout, onTimeout}: TimeoutBannerProps) {
timeout = timeout ?? DEFAULT_TIMEOUT
const [banner, setBanner] = useState<BannerData | undefined>(bannerData)

useEffect(() => {
if (banner) {
const timeoutId = setTimeout(() => {
setBanner(undefined)
if (onTimeout) {
onTimeout()
}
}, timeout)

return () => clearTimeout(timeoutId);
}
}, [])

if (!banner) {
return null
}

return getAlertBanner(banner)
}

export default function Banner() {
const [banner, setBanner] = useState<BannerData>()
const [updateCounter, setUpdateCounter] = useState(0)
useEffect(() => {
setTimeout(() => setUpdateCounter(updateCounter + 1), 60000)
const timeoutId = setTimeout(() => setUpdateCounter(updateCounter + 1), 60000)
bannerApi.get().then(setBanner)

return () => clearTimeout(timeoutId)
}, [updateCounter])
if (!banner) {
return null
}
return (
<Alert variant={banner.severity as any} title={banner.title} isInline>
<div dangerouslySetInnerHTML={{ __html: banner.message || "" }}></div>
</Alert>
)

return getAlertBanner(banner)
}
Loading

0 comments on commit 5f0a4d8

Please sign in to comment.