Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[0.12.x] MPL-68: make run_schemas updates asynchronous #1593

Merged
merged 1 commit into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ public enum MessageBusChannels {
RUN_VALIDATED,
CHANGE_NEW,
EXPERIMENT_RESULT_NEW,
SCHEMA_SYNC,
FOOBAR
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,19 +187,24 @@ public Integer add(Schema schemaDTO){
.setParameter(1, schema.id).executeUpdate();
em.createNativeQuery("DELETE FROM dataset_schemas WHERE schema_id = ?1")
.setParameter(1, schema.id).executeUpdate();
mediator.newOrUpdatedSchema(schema);
newOrUpdatedSchema(schema);
}
}
else {
schema.id = null;
schema.persist();
em.flush();
mediator.newOrUpdatedSchema(schema);
newOrUpdatedSchema(schema);
}
log.debugf("Added schema %s (%d), URI %s", schema.name, schema.id, schema.uri);
return schema.id;
}

private void newOrUpdatedSchema(SchemaDAO schema) {
log.debugf("Push schema event for async run schemas update: %d (%s)", schema.id, schema.uri);
Util.registerTxSynchronization(tm, txStatus -> mediator.queueSchemaSync(schema.id));
}

private void verifyNewSchema(Schema schemaDTO) {
if (schemaDTO.uri == null || Arrays.stream(ALL_URNS).noneMatch(scheme -> schemaDTO.uri.startsWith(scheme + ":"))) {
throw ServiceException.badRequest("Please use URI starting with one of these schemes: " + Arrays.toString(ALL_URNS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.hyperfoil.tools.horreum.api.data.Test;
import io.hyperfoil.tools.horreum.api.services.ExperimentService;
import io.hyperfoil.tools.horreum.entity.data.ActionDAO;
import io.hyperfoil.tools.horreum.entity.data.SchemaDAO;
import io.hyperfoil.tools.horreum.events.DatasetChanges;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -76,6 +75,10 @@ public class ServiceMediator {
@Channel("run-recalc-out")
Emitter<Integer> runEmitter;

@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10000)
@Channel("schema-sync-out")
Emitter<Integer> schemaEmitter;

public ServiceMediator() {
}

Expand Down Expand Up @@ -158,6 +161,18 @@ void queueRunRecalculation(int runId) {
runEmitter.send(runId);
}

@Incoming("schema-sync-in")
@Blocking(ordered = false, value = "horreum.schema.pool")
@ActivateRequestContext
public void processSchemaSync(int schemaId) {
runService.onNewOrUpdatedSchema(schemaId);
}

@Transactional(Transactional.TxType.NOT_SUPPORTED)
void queueSchemaSync(int schemaId) {
schemaEmitter.send(schemaId);
}

void dataPointsProcessed(DataPoint.DatasetProcessedEvent event) {
experimentService.onDatapointsCreated(event);
}
Expand Down Expand Up @@ -206,9 +221,6 @@ void importTestToAll(TestExport test) {
subscriptionService.importSubscriptions(test);
}

public void newOrUpdatedSchema(SchemaDAO schema) {
runService.processNewOrUpdatedSchema(schema);
}
public void updateFingerprints(int testId) {
datasetService.updateFingerprints(testId);
}
Expand All @@ -222,4 +234,5 @@ public void validateDataset(Integer datasetId) {
public void validateSchema(int schemaId) {
schemaService.revalidateAll(schemaId);
}

}
17 changes: 15 additions & 2 deletions horreum-backend/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ mp.messaging.outgoing.run-recalc-out.address=run-recalc
mp.messaging.outgoing.run-recalc-out.durable=true
mp.messaging.outgoing.run-recalc-out.container-id=horreum-broker
mp.messaging.outgoing.run-recalc-out.link-name=run-recalc
# schema-sync incoming
mp.messaging.incoming.schema-sync-in.connector=smallrye-amqp
mp.messaging.incoming.schema-sync-in.address=schema-sync
mp.messaging.incoming.schema-sync-in.durable=true
mp.messaging.incoming.schema-sync-in.container-id=horreum-broker
mp.messaging.incoming.schema-sync-in.link-name=schema-sync
# schema-sync outgoing
mp.messaging.outgoing.schema-sync-out.connector=smallrye-amqp
mp.messaging.outgoing.schema-sync-out.address=schema-sync
mp.messaging.outgoing.schema-sync-out.durable=true
mp.messaging.outgoing.schema-sync-out.container-id=horreum-broker
mp.messaging.outgoing.schema-sync-out.link-name=schema-sync

## Datasource updated by Liquibase - the same as app but always with superuser credentials

Expand Down Expand Up @@ -74,8 +86,9 @@ horreum.test-mode=false
#quarkus.native.additional-build-args=

# thread pool sizes
smallrye.messaging.worker.horreum.dataset.pool.max-concurrency=10
smallrye.messaging.worker.horreum.run.pool.max-concurrency=10
smallrye.messaging.worker.horreum.dataset.pool.max-concurrency=7
smallrye.messaging.worker.horreum.run.pool.max-concurrency=7
smallrye.messaging.worker.horreum.schema.pool.max-concurrency=7


hibernate.jdbc.time_zone=UTC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import io.quarkus.test.junit.TestProfile;
import io.quarkus.test.oidc.server.OidcWiremockTestResource;
import io.restassured.common.mapper.TypeRef;
import jakarta.inject.Inject;
import jakarta.persistence.Tuple;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestInfo;

Expand Down Expand Up @@ -256,7 +258,7 @@ private void testExportImport(boolean wipe) {
}

@org.junit.jupiter.api.Test
public void testFindUsages(TestInfo info) throws InterruptedException {
public void testFindUsages() throws InterruptedException {
Test test = createTest(createExampleTest("nofilter"));
createComparisonSchema();
uploadExampleRuns(test);
Expand All @@ -268,12 +270,145 @@ public void testFindUsages(TestInfo info) throws InterruptedException {

assertNotEquals(0, report.data.size());

List<SchemaService.LabelLocation> usages = jsonRequest().get("/api/schema/findUsages?label=".concat("category"))
.then().statusCode(200).extract().body().as(List.class);
List<SchemaService.LabelLocation> usages =
jsonRequest().get("/api/schema/findUsages?label=".concat("category"))
.then().statusCode(200).extract().body().as(List.class);

assertNotNull(usages);
}

@org.junit.jupiter.api.Test
public void testCreateSchemaAfterRunWithArrayData() throws InterruptedException {
String schemaUri = "urn:unknown:schema";
Test test = createTest(createExampleTest("dummy-test"));

ArrayNode data = JsonNodeFactory.instance.arrayNode();
data.addObject().put("$schema", schemaUri).put("foo", "bar");
data.addObject().put("$schema", schemaUri).put("foo", "bar");
int runId = uploadRun(data.toString(), test.name);
assertTrue(runId > 0);

// no validation errors
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM run_validationerrors").getSingleResult());
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM dataset_validationerrors").getSingleResult());

List<?> runSchemasBefore = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList();
assertEquals(0, runSchemasBefore.size());

// create the schema afterward
Schema schema = createSchema("Unknown schema", schemaUri);
assertNotNull(schema);
assertTrue(schema.id > 0);

TestUtil.eventually(() -> {
Util.withTx(tm, () -> {
em.clear();
List<?> runSchemas = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList();
// two records as the run is an array of two objects, both referencing the same schema
assertEquals(2, runSchemas.size());
return null;
});
});
}

@org.junit.jupiter.api.Test
public void testCreateSchemaAfterRunWithMultipleSchemas() throws InterruptedException {
String firstSchemaUri = "urn:unknown1:schema";
String secondSchemaUri = "urn:unknown2:schema";
Test test = createTest(createExampleTest("dummy-test"));

ArrayNode data = JsonNodeFactory.instance.arrayNode();
data.addObject().put("$schema", firstSchemaUri).put("foo", "bar");
data.addObject().put("$schema", secondSchemaUri).put("foo", "zip");
int runId = uploadRun(data.toString(), test.name);
assertTrue(runId > 0);

// no validation errors
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM run_validationerrors").getSingleResult());
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM dataset_validationerrors").getSingleResult());

List<?> runSchemasBefore = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList();
assertEquals(0, runSchemasBefore.size());

// create the schema 1 afterward
Schema schema1 = createSchema("Unknown schema 1", firstSchemaUri);
assertNotNull(schema1);
assertTrue(schema1.id > 0);

TestUtil.eventually(() -> {
Util.withTx(tm, () -> {
em.clear();
List<Tuple> runSchemas = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1", Tuple.class).setParameter(1, runId).getResultList();
// 1 record as the run is an array of two objects referencing different schemas and only the first one is created
assertEquals(1, runSchemas.size());
assertEquals(schema1.id, (int)runSchemas.get(0).get(3));
return null;
});
});
}

@org.junit.jupiter.api.Test
public void testCreateSchemaAfterRunWithObjectData() throws InterruptedException {
String schemaUri = "urn:unknown:schema";
Test test = createTest(createExampleTest("dummy-test"));

ObjectNode data = JsonNodeFactory.instance.objectNode();
data.put("$schema", schemaUri).put("foo", "bar");
int runId = uploadRun(data.toString(), test.name);
assertTrue(runId > 0);

// no validation errors
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM run_validationerrors").getSingleResult());
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM dataset_validationerrors").getSingleResult());

List<?> runSchemasBefore = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList();
assertEquals(0, runSchemasBefore.size());

// create the schema afterward
Schema schema = createSchema("Unknown schema", schemaUri);
assertNotNull(schema);
assertTrue(schema.id > 0);

TestUtil.eventually(() -> {
Util.withTx(tm, () -> {
em.clear();
List<?> runSchemas = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList();
// run has single object data, thus referencing one schema
assertEquals(1, runSchemas.size());
return null;
});
});
}

@org.junit.jupiter.api.Test
public void testChangeUriForReferencedSchema() throws InterruptedException {
String schemaUri = "urn:dummy:schema";
Schema schema = createSchema("Dummy schema", schemaUri);
assertNotNull(schema);
assertTrue(schema.id > 0);

Test test = createTest(createExampleTest("dummy-test"));

ArrayNode data = JsonNodeFactory.instance.arrayNode();
data.addObject().put("$schema", schemaUri).put("foo", "bar");
data.addObject().put("$schema", schemaUri).put("foo", "bar");
int runId = uploadRun(data.toString(), test.name);
assertTrue(runId > 0);

// no validation errors
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM run_validationerrors").getSingleResult());
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM dataset_validationerrors").getSingleResult());

List<?> runSchemasBefore = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList();
assertEquals(2, runSchemasBefore.size());

// update the schema uri afterward
schema.uri = "urn:new-dummy:schema";
Schema updatedSchema = addOrUpdateSchema(schema);
assertNotNull(updatedSchema);
assertEquals(schema.id, updatedSchema.id);

List<?> runSchemasAfter = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList();
assertEquals(0, runSchemasAfter.size());
}
}
51 changes: 45 additions & 6 deletions horreum-web/src/Banner.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,58 @@ import { useEffect, useState } from "react"
import { Alert } from "@patternfly/react-core"
import {Banner as BannerData, bannerApi} from "./api"

function getAlertBanner(banner: BannerData) {
return (
<Alert variant={banner.severity as any} title={banner.title} isInline>
<div dangerouslySetInnerHTML={{ __html: banner.message || "" }}></div>
</Alert>
)
}

// 30 seconds
const DEFAULT_TIMEOUT = 30000
export type TimeoutBannerProps = {
bannerData: BannerData
timeout?: number
onTimeout?: () => void
}

export function TimeoutBanner({bannerData, timeout, onTimeout}: TimeoutBannerProps) {
timeout = timeout ?? DEFAULT_TIMEOUT
const [banner, setBanner] = useState<BannerData | undefined>(bannerData)

useEffect(() => {
if (banner) {
const timeoutId = setTimeout(() => {
setBanner(undefined)
if (onTimeout) {
onTimeout()
}
}, timeout)

return () => clearTimeout(timeoutId);
}
}, [])

if (!banner) {
return null
}

return getAlertBanner(banner)
}

export default function Banner() {
const [banner, setBanner] = useState<BannerData>()
const [updateCounter, setUpdateCounter] = useState(0)
useEffect(() => {
setTimeout(() => setUpdateCounter(updateCounter + 1), 60000)
const timeoutId = setTimeout(() => setUpdateCounter(updateCounter + 1), 60000)
bannerApi.get().then(setBanner)

return () => clearTimeout(timeoutId)
}, [updateCounter])
if (!banner) {
return null
}
return (
<Alert variant={banner.severity as any} title={banner.title} isInline>
<div dangerouslySetInnerHTML={{ __html: banner.message || "" }}></div>
</Alert>
)

return getAlertBanner(banner)
}
Loading