diff --git a/extensions/common/sql/sql-core/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlEndToEndInstance.java b/extensions/common/sql/sql-core/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlEndToEndInstance.java index c8e449dd387..2294ad90a2a 100644 --- a/extensions/common/sql/sql-core/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlEndToEndInstance.java +++ b/extensions/common/sql/sql-core/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlEndToEndInstance.java @@ -48,7 +48,8 @@ static void createDatabase(String participantName) { "control-plane/store/sql/policy-definition-store-sql", "control-plane/store/sql/transfer-process-store-sql", "data-plane/store/sql/data-plane-store-sql", - "policy-monitor/store/sql/policy-monitor-store-sql" + "policy-monitor/store/sql/policy-monitor-store-sql", + "common/store/sql/edr-index-sql" ) .map(extensionsFolder::resolve) .map(it -> it.resolve("docs")) diff --git a/extensions/common/store/sql/edr-index-sql/build.gradle.kts b/extensions/common/store/sql/edr-index-sql/build.gradle.kts new file mode 100644 index 00000000000..62ecdce7cc6 --- /dev/null +++ b/extensions/common/store/sql/edr-index-sql/build.gradle.kts @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +plugins { + `java-library` +} + +dependencies { + api(project(":spi:common:core-spi")) + api(project(":spi:common:transaction-spi")) + + implementation(project(":extensions:common:sql:sql-core")) + implementation(project(":spi:common:edr-store-spi")) + implementation(project(":spi:common:transaction-datasource-spi")) + testImplementation(project(":core:common:junit")) + testImplementation(testFixtures(project(":extensions:common:sql:sql-core"))) + testImplementation(testFixtures(project(":spi:common:edr-store-spi"))) + +} \ No newline at end of file diff --git a/extensions/common/store/sql/edr-index-sql/docs/schema.sql b/extensions/common/store/sql/edr-index-sql/docs/schema.sql new file mode 100644 index 00000000000..3f3058569d2 --- /dev/null +++ b/extensions/common/store/sql/edr-index-sql/docs/schema.sql @@ -0,0 +1,11 @@ + +CREATE TABLE IF NOT EXISTS edc_edr_entry +( + transfer_process_id VARCHAR NOT NULL PRIMARY KEY, + agreement_id VARCHAR NOT NULL, + asset_id VARCHAR NOT NULL, + provider_id VARCHAR NOT NULL, + contract_negotiation_id VARCHAR, + created_at BIGINT NOT NULL +); + diff --git a/extensions/common/store/sql/edr-index-sql/src/main/java/org/eclipse/edc/connector/store/sql/edr/SqlEndpointDataReferenceEntryIndex.java b/extensions/common/store/sql/edr-index-sql/src/main/java/org/eclipse/edc/connector/store/sql/edr/SqlEndpointDataReferenceEntryIndex.java new file mode 100644 index 00000000000..21c4b848a5e --- /dev/null +++ b/extensions/common/store/sql/edr-index-sql/src/main/java/org/eclipse/edc/connector/store/sql/edr/SqlEndpointDataReferenceEntryIndex.java @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.store.sql.edr; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.edc.connector.store.sql.edr.schema.EndpointDataReferenceEntryStatements; +import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex; +import org.eclipse.edc.edr.spi.types.EndpointDataReferenceEntry; +import org.eclipse.edc.spi.persistence.EdcPersistenceException; +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.spi.result.StoreResult; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.sql.store.AbstractSqlStore; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; +import org.jetbrains.annotations.Nullable; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import static java.lang.String.format; + +public class SqlEndpointDataReferenceEntryIndex extends AbstractSqlStore implements EndpointDataReferenceEntryIndex { + + private final EndpointDataReferenceEntryStatements statements; + + public SqlEndpointDataReferenceEntryIndex(DataSourceRegistry dataSourceRegistry, String dataSourceName, TransactionContext transactionContext, + ObjectMapper objectMapper, EndpointDataReferenceEntryStatements statements, QueryExecutor queryExecutor) { + super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper, queryExecutor); + this.statements = statements; + } + + @Override + public @Nullable EndpointDataReferenceEntry findById(String transferProcessId) { + Objects.requireNonNull(transferProcessId); + return transactionContext.execute(() -> { + try (var connection = getConnection()) { + return findById(connection, transferProcessId); + } catch (Exception exception) { + throw new EdcPersistenceException(exception); + } + }); + } + + @Override + public StoreResult> query(QuerySpec querySpec) { + return transactionContext.execute(() -> { + Objects.requireNonNull(querySpec); + try { + var queryStmt = statements.createQuery(querySpec); + var results = queryExecutor.query(getConnection(), true, this::mapResultSet, queryStmt.getQueryAsString(), queryStmt.getParameters()) + .collect(Collectors.toList()); + return StoreResult.success(results); + } catch (SQLException exception) { + throw new EdcPersistenceException(exception); + } + }); + } + + @Override + public StoreResult save(EndpointDataReferenceEntry entry) { + return transactionContext.execute(() -> { + try (var connection = getConnection()) { + if (existsById(connection, entry.getTransferProcessId())) { + updateInternal(connection, entry); + } else { + insertInternal(connection, entry); + } + return StoreResult.success(); + } catch (Exception e) { + throw new EdcPersistenceException(e.getMessage(), e); + } + }); + } + + @Override + public StoreResult delete(String transferProcessId) { + Objects.requireNonNull(transferProcessId); + return transactionContext.execute(() -> { + try (var connection = getConnection()) { + var entity = findById(connection, transferProcessId); + if (entity != null) { + queryExecutor.execute(connection, statements.getDeleteByIdTemplate(), transferProcessId); + return StoreResult.success(entity); + } else { + return StoreResult.notFound(format(ENDPOINT_DATA_REFERENCE_ENTRY_FOUND, transferProcessId)); + } + } catch (Exception e) { + throw new EdcPersistenceException(e.getMessage(), e); + } + }); + } + + private EndpointDataReferenceEntry findById(Connection connection, String id) { + var sql = statements.getFindByTemplate(); + return queryExecutor.single(connection, false, this::mapResultSet, sql, id); + } + + private boolean existsById(Connection connection, String definitionId) { + var sql = statements.getCountTemplate(); + try (var stream = queryExecutor.query(connection, false, this::mapCount, sql, definitionId)) { + return stream.findFirst().orElse(0L) > 0; + } + } + + private long mapCount(ResultSet resultSet) throws SQLException { + return resultSet.getLong(1); + } + + private void insertInternal(Connection connection, EndpointDataReferenceEntry entry) { + transactionContext.execute(() -> { + queryExecutor.execute(connection, statements.getInsertTemplate(), + entry.getTransferProcessId(), + entry.getAssetId(), + entry.getProviderId(), + entry.getAgreementId(), + entry.getContractNegotiationId(), + entry.getCreatedAt()); + }); + } + + private void updateInternal(Connection connection, EndpointDataReferenceEntry entry) { + transactionContext.execute(() -> { + queryExecutor.execute(connection, statements.getUpdateTemplate(), + entry.getTransferProcessId(), + entry.getAssetId(), + entry.getProviderId(), + entry.getAgreementId(), + entry.getContractNegotiationId(), + entry.getCreatedAt(), + entry.getTransferProcessId()); + }); + } + + private EndpointDataReferenceEntry mapResultSet(ResultSet resultSet) throws Exception { + return EndpointDataReferenceEntry.Builder.newInstance() + .createdAt(resultSet.getLong(statements.getCreatedAtColumn())) + .assetId(resultSet.getString(statements.getAssetIdColumn())) + .transferProcessId(resultSet.getString(statements.getTransferProcessIdColumn())) + .agreementId(resultSet.getString(statements.getAgreementIdColumn())) + .providerId(resultSet.getString(statements.getProviderIdColumn())) + .contractNegotiationId(resultSet.getString(statements.getContractNegotiationIdColumn())) + .build(); + } +} diff --git a/extensions/common/store/sql/edr-index-sql/src/main/java/org/eclipse/edc/connector/store/sql/edr/SqlEndpointDataReferenceEntryIndexExtension.java b/extensions/common/store/sql/edr-index-sql/src/main/java/org/eclipse/edc/connector/store/sql/edr/SqlEndpointDataReferenceEntryIndexExtension.java new file mode 100644 index 00000000000..223856c9ff2 --- /dev/null +++ b/extensions/common/store/sql/edr-index-sql/src/main/java/org/eclipse/edc/connector/store/sql/edr/SqlEndpointDataReferenceEntryIndexExtension.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.store.sql.edr; + + +import org.eclipse.edc.connector.store.sql.edr.schema.EndpointDataReferenceEntryStatements; +import org.eclipse.edc.connector.store.sql.edr.schema.postgres.PostgresDialectStatements; +import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex; +import org.eclipse.edc.runtime.metamodel.annotation.Extension; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.runtime.metamodel.annotation.Provides; +import org.eclipse.edc.runtime.metamodel.annotation.Setting; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; + +@Provides({ EndpointDataReferenceEntryIndex.class }) +@Extension(value = "SQL edr entry store") +public class SqlEndpointDataReferenceEntryIndexExtension implements ServiceExtension { + + /** + * Name of the datasource to use for accessing edr entries. + */ + @Setting(required = true) + public static final String DATASOURCE_SETTING_NAME = "edc.datasource.edr.name"; + + @Inject + private DataSourceRegistry dataSourceRegistry; + + @Inject + private TransactionContext transactionContext; + + @Inject(required = false) + private EndpointDataReferenceEntryStatements statements; + + @Inject + private QueryExecutor queryExecutor; + + @Inject + private TypeManager typeManager; + + @Override + public void initialize(ServiceExtensionContext context) { + var dataSourceName = context.getConfig().getString(DATASOURCE_SETTING_NAME, DataSourceRegistry.DEFAULT_DATASOURCE); + + var sqlStore = new SqlEndpointDataReferenceEntryIndex(dataSourceRegistry, dataSourceName, transactionContext, typeManager.getMapper(), + getStatementImpl(), queryExecutor); + + context.registerService(EndpointDataReferenceEntryIndex.class, sqlStore); + } + + private EndpointDataReferenceEntryStatements getStatementImpl() { + return statements == null ? new PostgresDialectStatements() : statements; + } + +} diff --git a/extensions/common/store/sql/edr-index-sql/src/main/java/org/eclipse/edc/connector/store/sql/edr/schema/BaseSqlDialectStatements.java b/extensions/common/store/sql/edr-index-sql/src/main/java/org/eclipse/edc/connector/store/sql/edr/schema/BaseSqlDialectStatements.java new file mode 100644 index 00000000000..d316a5f37fe --- /dev/null +++ b/extensions/common/store/sql/edr-index-sql/src/main/java/org/eclipse/edc/connector/store/sql/edr/schema/BaseSqlDialectStatements.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.store.sql.edr.schema; + +import org.eclipse.edc.connector.store.sql.edr.schema.postgres.EndpointDataReferenceEntryMapping; +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.sql.translation.SqlOperatorTranslator; +import org.eclipse.edc.sql.translation.SqlQueryStatement; + +import static java.lang.String.format; + +public class BaseSqlDialectStatements implements EndpointDataReferenceEntryStatements { + + protected final SqlOperatorTranslator operatorTranslator; + + public BaseSqlDialectStatements(SqlOperatorTranslator operatorTranslator) { + this.operatorTranslator = operatorTranslator; + } + + @Override + public String getDeleteByIdTemplate() { + return executeStatement().delete(getEdrEntryTable(), getTransferProcessIdColumn()); + } + + @Override + public String getFindByTemplate() { + return format("SELECT * FROM %s WHERE %s = ?", getEdrEntryTable(), getTransferProcessIdColumn()); + } + + @Override + public String getInsertTemplate() { + return executeStatement() + .column(getTransferProcessIdColumn()) + .column(getAssetIdColumn()) + .column(getProviderIdColumn()) + .column(getAgreementIdColumn()) + .column(getContractNegotiationIdColumn()) + .column(getCreatedAtColumn()) + .insertInto(getEdrEntryTable()); + } + + @Override + public String getCountTemplate() { + return format("SELECT COUNT (%s) FROM %s WHERE %s = ?", + getTransferProcessIdColumn(), + getEdrEntryTable(), + getTransferProcessIdColumn()); + } + + @Override + public String getUpdateTemplate() { + return executeStatement() + .column(getTransferProcessIdColumn()) + .column(getAssetIdColumn()) + .column(getProviderIdColumn()) + .column(getAgreementIdColumn()) + .column(getContractNegotiationIdColumn()) + .column(getCreatedAtColumn()) + .update(getEdrEntryTable(), getTransferProcessIdColumn()); + + } + + @Override + public SqlQueryStatement createQuery(QuerySpec querySpec) { + var select = format("SELECT * FROM %s", getEdrEntryTable()); + return new SqlQueryStatement(select, querySpec, new EndpointDataReferenceEntryMapping(this), operatorTranslator); + } + +} diff --git a/extensions/common/store/sql/edr-index-sql/src/main/java/org/eclipse/edc/connector/store/sql/edr/schema/EndpointDataReferenceEntryStatements.java b/extensions/common/store/sql/edr-index-sql/src/main/java/org/eclipse/edc/connector/store/sql/edr/schema/EndpointDataReferenceEntryStatements.java new file mode 100644 index 00000000000..5ae583bc309 --- /dev/null +++ b/extensions/common/store/sql/edr-index-sql/src/main/java/org/eclipse/edc/connector/store/sql/edr/schema/EndpointDataReferenceEntryStatements.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.store.sql.edr.schema; + +import org.eclipse.edc.edr.spi.types.EndpointDataReferenceEntry; +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.sql.statement.SqlStatements; +import org.eclipse.edc.sql.translation.SqlQueryStatement; + +/** + * Defines all statements that are needed for the {@link EndpointDataReferenceEntry} store + */ +public interface EndpointDataReferenceEntryStatements extends SqlStatements { + default String getTransferProcessIdColumn() { + return "transfer_process_id"; + } + + default String getAgreementIdColumn() { + return "agreement_id"; + } + + default String getAssetIdColumn() { + return "asset_id"; + } + + default String getProviderIdColumn() { + return "provider_id"; + } + + default String getContractNegotiationIdColumn() { + return "contract_negotiation_id"; + } + + default String getEdrEntryTable() { + return "edc_edr_entry"; + } + + default String getCreatedAtColumn() { + return "created_at"; + } + + String getDeleteByIdTemplate(); + + String getFindByTemplate(); + + String getInsertTemplate(); + + String getCountTemplate(); + + String getUpdateTemplate(); + + SqlQueryStatement createQuery(QuerySpec querySpec); + +} diff --git a/extensions/common/store/sql/edr-index-sql/src/main/java/org/eclipse/edc/connector/store/sql/edr/schema/postgres/EndpointDataReferenceEntryMapping.java b/extensions/common/store/sql/edr-index-sql/src/main/java/org/eclipse/edc/connector/store/sql/edr/schema/postgres/EndpointDataReferenceEntryMapping.java new file mode 100644 index 00000000000..ddd787c028f --- /dev/null +++ b/extensions/common/store/sql/edr-index-sql/src/main/java/org/eclipse/edc/connector/store/sql/edr/schema/postgres/EndpointDataReferenceEntryMapping.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.store.sql.edr.schema.postgres; + +import org.eclipse.edc.connector.store.sql.edr.schema.EndpointDataReferenceEntryStatements; +import org.eclipse.edc.edr.spi.types.EndpointDataReferenceEntry; +import org.eclipse.edc.sql.translation.TranslationMapping; + +/** + * Maps fields of a {@link EndpointDataReferenceEntry} onto the + * corresponding SQL schema (= column names) + */ +public class EndpointDataReferenceEntryMapping extends TranslationMapping { + public EndpointDataReferenceEntryMapping(EndpointDataReferenceEntryStatements statements) { + add("assetId", statements.getAssetIdColumn()); + add("agreementId", statements.getAgreementIdColumn()); + add("transferProcessId", statements.getTransferProcessIdColumn()); + add("providerId", statements.getProviderIdColumn()); + add("contractNegotiationId", statements.getContractNegotiationIdColumn()); + } +} diff --git a/extensions/common/store/sql/edr-index-sql/src/main/java/org/eclipse/edc/connector/store/sql/edr/schema/postgres/PostgresDialectStatements.java b/extensions/common/store/sql/edr-index-sql/src/main/java/org/eclipse/edc/connector/store/sql/edr/schema/postgres/PostgresDialectStatements.java new file mode 100644 index 00000000000..5b42f92d169 --- /dev/null +++ b/extensions/common/store/sql/edr-index-sql/src/main/java/org/eclipse/edc/connector/store/sql/edr/schema/postgres/PostgresDialectStatements.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.store.sql.edr.schema.postgres; + +import org.eclipse.edc.connector.store.sql.edr.schema.BaseSqlDialectStatements; +import org.eclipse.edc.sql.dialect.PostgresDialect; +import org.eclipse.edc.sql.translation.PostgresqlOperatorTranslator; + +/** + * Contains Postgres-specific SQL statements + */ +public class PostgresDialectStatements extends BaseSqlDialectStatements { + + public PostgresDialectStatements() { + super(new PostgresqlOperatorTranslator()); + } + + @Override + public String getFormatAsJsonOperator() { + return PostgresDialect.getJsonCastOperator(); + } + +} diff --git a/extensions/common/store/sql/edr-index-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/extensions/common/store/sql/edr-index-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension new file mode 100644 index 00000000000..bcac987def1 --- /dev/null +++ b/extensions/common/store/sql/edr-index-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -0,0 +1,15 @@ +# +# Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +# +# This program and the accompanying materials are made available under the +# terms of the Apache License, Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# +# Contributors: +# Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation +# +# + +org.eclipse.edc.connector.store.sql.edr.SqlEndpointDataReferenceEntryIndexExtension \ No newline at end of file diff --git a/extensions/common/store/sql/edr-index-sql/src/test/java/org/eclipse/edc/connector/store/sql/edr/SqlEndpointDataReferenceEntryIndexExtensionTest.java b/extensions/common/store/sql/edr-index-sql/src/test/java/org/eclipse/edc/connector/store/sql/edr/SqlEndpointDataReferenceEntryIndexExtensionTest.java new file mode 100644 index 00000000000..a55f82b32e4 --- /dev/null +++ b/extensions/common/store/sql/edr-index-sql/src/test/java/org/eclipse/edc/connector/store/sql/edr/SqlEndpointDataReferenceEntryIndexExtensionTest.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.store.sql.edr; + +import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex; +import org.eclipse.edc.junit.extensions.DependencyInjectionExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.spi.system.configuration.Config; +import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.connector.store.sql.edr.SqlEndpointDataReferenceEntryIndexExtension.DATASOURCE_SETTING_NAME; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(DependencyInjectionExtension.class) +public class SqlEndpointDataReferenceEntryIndexExtensionTest { + + + @BeforeEach + void setUp(ServiceExtensionContext context) { + context.registerService(TypeManager.class, new TypeManager()); + } + + @Test + void shouldInitializeTheStore(SqlEndpointDataReferenceEntryIndexExtension extension, ServiceExtensionContext context) { + var config = mock(Config.class); + when(context.getConfig()).thenReturn(config); + when(config.getString(any(), any())).thenReturn("test"); + + extension.initialize(context); + + var service = context.getService(EndpointDataReferenceEntryIndex.class); + assertThat(service).isInstanceOf(SqlEndpointDataReferenceEntryIndex.class); + + verify(config).getString(DATASOURCE_SETTING_NAME, DataSourceRegistry.DEFAULT_DATASOURCE); + } +} diff --git a/extensions/common/store/sql/edr-index-sql/src/test/java/org/eclipse/edc/connector/store/sql/edr/SqlEndpointDataReferenceEntryIndexTest.java b/extensions/common/store/sql/edr-index-sql/src/test/java/org/eclipse/edc/connector/store/sql/edr/SqlEndpointDataReferenceEntryIndexTest.java new file mode 100644 index 00000000000..5da404ace6c --- /dev/null +++ b/extensions/common/store/sql/edr-index-sql/src/test/java/org/eclipse/edc/connector/store/sql/edr/SqlEndpointDataReferenceEntryIndexTest.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.store.sql.edr; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.edc.connector.store.sql.edr.schema.BaseSqlDialectStatements; +import org.eclipse.edc.connector.store.sql.edr.schema.postgres.PostgresDialectStatements; +import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex; +import org.eclipse.edc.junit.annotations.ComponentTest; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.sql.testfixtures.PostgresqlStoreSetupExtension; +import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceEntryIndexTestBase; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +@ComponentTest +@ExtendWith(PostgresqlStoreSetupExtension.class) +public class SqlEndpointDataReferenceEntryIndexTest extends EndpointDataReferenceEntryIndexTestBase { + + private final BaseSqlDialectStatements statements = new PostgresDialectStatements(); + + private SqlEndpointDataReferenceEntryIndex entryIndex; + + @BeforeEach + void setUp(PostgresqlStoreSetupExtension extension, QueryExecutor queryExecutor) throws IOException { + + entryIndex = new SqlEndpointDataReferenceEntryIndex(extension.getDataSourceRegistry(), extension.getDatasourceName(), + extension.getTransactionContext(), new ObjectMapper(), statements, queryExecutor); + var schema = Files.readString(Paths.get("./docs/schema.sql")); + extension.runQuery(schema); + } + + @AfterEach + void tearDown(PostgresqlStoreSetupExtension extension) { + extension.runQuery("DROP TABLE " + statements.getEdrEntryTable() + " CASCADE"); + } + + @Override + protected EndpointDataReferenceEntryIndex getStore() { + return entryIndex; + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 14f1553c366..4e80e86422d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -139,10 +139,12 @@ include(":extensions:common:validator:validator-data-address-http-data") include(":extensions:common:validator:validator-data-address-kafka") include(":extensions:common:vault:vault-filesystem") include(":extensions:common:vault:vault-hashicorp") +include(":extensions:common:store:sql:edr-index-sql") include(":extensions:common:api:control-api-configuration") include(":extensions:common:api:management-api-configuration") + include(":extensions:control-plane:api:control-plane-api") include(":extensions:control-plane:api:control-plane-api-client") include(":extensions:control-plane:api:management-api") diff --git a/spi/common/edr-store-spi/src/main/java/org/eclipse/edc/edr/spi/store/EndpointDataReferenceEntryIndex.java b/spi/common/edr-store-spi/src/main/java/org/eclipse/edc/edr/spi/store/EndpointDataReferenceEntryIndex.java index 4a8d3a64924..ba12f5f2f6c 100644 --- a/spi/common/edr-store-spi/src/main/java/org/eclipse/edc/edr/spi/store/EndpointDataReferenceEntryIndex.java +++ b/spi/common/edr-store-spi/src/main/java/org/eclipse/edc/edr/spi/store/EndpointDataReferenceEntryIndex.java @@ -29,6 +29,10 @@ @ExtensionPoint public interface EndpointDataReferenceEntryIndex { + + String ENDPOINT_DATA_REFERENCE_ENTRY_EXISTS = "Endpoint DataReference Entry with ID %s already exists"; + String ENDPOINT_DATA_REFERENCE_ENTRY_FOUND = "Endpoint DataReference Entry with ID %s not found"; + /** * Return a {@link EndpointDataReferenceEntry} associated with the transferProcessId in input * diff --git a/spi/common/edr-store-spi/src/testFixtures/java/org/eclipse/tractusx/edc/edr/spi/TestFunctions.java b/spi/common/edr-store-spi/src/testFixtures/java/org/eclipse/tractusx/edc/edr/spi/TestFunctions.java index 2f64ee3934b..07a6686ec99 100644 --- a/spi/common/edr-store-spi/src/testFixtures/java/org/eclipse/tractusx/edc/edr/spi/TestFunctions.java +++ b/spi/common/edr-store-spi/src/testFixtures/java/org/eclipse/tractusx/edc/edr/spi/TestFunctions.java @@ -22,7 +22,7 @@ public class TestFunctions { - public static EndpointDataReferenceEntry edrEntry(String assetId, String agreementId, String transferProcessId, String contractNegotiationId) { + public static EndpointDataReferenceEntry edrEntry(String assetId, String agreementId, String transferProcessId, String contractNegotiationId, String providerId) { return EndpointDataReferenceEntry.Builder.newInstance() .assetId(assetId) .agreementId(agreementId) @@ -32,6 +32,10 @@ public static EndpointDataReferenceEntry edrEntry(String assetId, String agreeme .build(); } + public static EndpointDataReferenceEntry edrEntry(String assetId, String agreementId, String transferProcessId, String contractNegotiationId) { + return edrEntry(assetId, agreementId, transferProcessId, contractNegotiationId, UUID.randomUUID().toString()); + } + public static EndpointDataReferenceEntry edrEntry() { return edrEntry("assetId", "agreementId", "transferProcessId", "contractNegotiationId"); } diff --git a/spi/common/edr-store-spi/src/testFixtures/java/org/eclipse/tractusx/edc/edr/spi/store/EndpointDataReferenceEntryIndexTestBase.java b/spi/common/edr-store-spi/src/testFixtures/java/org/eclipse/tractusx/edc/edr/spi/store/EndpointDataReferenceEntryIndexTestBase.java index 79615b3cc01..7f470166aa6 100644 --- a/spi/common/edr-store-spi/src/testFixtures/java/org/eclipse/tractusx/edc/edr/spi/store/EndpointDataReferenceEntryIndexTestBase.java +++ b/spi/common/edr-store-spi/src/testFixtures/java/org/eclipse/tractusx/edc/edr/spi/store/EndpointDataReferenceEntryIndexTestBase.java @@ -21,9 +21,16 @@ import org.eclipse.edc.spi.result.StoreFailure; import org.eclipse.edc.spi.result.StoreResult; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static java.util.UUID.randomUUID; import static org.assertj.core.api.Assertions.assertThat; @@ -44,55 +51,54 @@ void save() { getStore().save(entry); - var results = getStore().query(QuerySpec.max()); + var results = getStore().findById(entry.getTransferProcessId()); - assertThat(results.succeeded()).isTrue(); - assertThat(results.getContent()).hasSize(1) - .first() - .isNotNull() - .extracting(EndpointDataReferenceEntry::getTransferProcessId) - .isEqualTo(tpId); + assertThat(results).isNotNull().usingRecursiveComparison().isEqualTo(entry); } @Test - void query_noQuerySpec() { - var all = IntStream.range(0, 10) - .mapToObj(i -> edrEntry("assetId" + i, "agreementId" + i, "tpId" + i, "cnId" + i)) - .peek(entry -> getStore().save(entry)) - .collect(Collectors.toList()); + void update() { + + var tpId = "tp1"; + var assetId = "asset1"; + + var entry = edrEntry(assetId, randomUUID().toString(), tpId, randomUUID().toString()); + + getStore().save(entry); + + var dbEntry = getStore().findById(entry.getTransferProcessId()); + assertThat(dbEntry).isNotNull().usingRecursiveComparison().isEqualTo(entry); + + entry = edrEntry(assetId, randomUUID().toString(), tpId, randomUUID().toString()); + getStore().save(entry); + + dbEntry = getStore().findById(entry.getTransferProcessId()); + assertThat(dbEntry).isNotNull().usingRecursiveComparison().isEqualTo(entry); var results = getStore().query(QuerySpec.max()); assertThat(results.succeeded()).isTrue(); - assertThat(results.getContent()).containsExactlyInAnyOrderElementsOf(all); - + assertThat(results.getContent()).usingRecursiveFieldByFieldElementComparator().containsOnly(entry); } @Test - void query_assetIdQuerySpec() { - IntStream.range(0, 10) + void query_noQuerySpec() { + var all = IntStream.range(0, 10) .mapToObj(i -> edrEntry("assetId" + i, "agreementId" + i, "tpId" + i, "cnId" + i)) - .forEach(entry -> getStore().save(entry)); - - var entry = edrEntry("assetId", "agreementId", "tpId", "cnId"); - getStore().save(entry); - - var filter = Criterion.Builder.newInstance() - .operandLeft("assetId") - .operator("=") - .operandRight(entry.getAssetId()) - .build(); + .peek(entry -> getStore().save(entry)) + .collect(Collectors.toList()); - var results = getStore().query(QuerySpec.Builder.newInstance().filter(filter).build()); + var results = getStore().query(QuerySpec.max()); assertThat(results.succeeded()).isTrue(); - assertThat(results.getContent()).containsOnly(entry); + assertThat(results.getContent()).usingRecursiveFieldByFieldElementComparator().containsExactlyInAnyOrderElementsOf(all); } - @Test - void query_agreementIdQuerySpec() { + @ParameterizedTest + @ArgumentsSource(FilterArgumentProvider.class) + void query_withQuerySpec(String field, Function mapping) { IntStream.range(0, 10) .mapToObj(i -> edrEntry("assetId" + i, "agreementId" + i, "tpId" + i, "cnId" + i)) .forEach(entry -> getStore().save(entry)); @@ -102,18 +108,18 @@ void query_agreementIdQuerySpec() { getStore().save(entry); var filter = Criterion.Builder.newInstance() - .operandLeft("agreementId") + .operandLeft(field) .operator("=") - .operandRight(entry.getAgreementId()) + .operandRight(mapping.apply(entry)) .build(); var results = getStore().query(QuerySpec.Builder.newInstance().filter(filter).build()); assertThat(results.succeeded()).isTrue(); - assertThat(results.getContent()).containsOnly(entry); + assertThat(results.getContent()).usingRecursiveFieldByFieldElementComparator().containsOnly(entry); } - + @Test void delete_shouldDelete_WhenFound() { @@ -122,6 +128,7 @@ void delete_shouldDelete_WhenFound() { assertThat(getStore().delete(entry.getTransferProcessId())) .extracting(StoreResult::getContent) + .usingRecursiveComparison() .isEqualTo(entry); var results = getStore().query(QuerySpec.max()); @@ -140,4 +147,18 @@ void deleteX_shouldReturnError_whenNotFound() { protected abstract EndpointDataReferenceEntryIndex getStore(); + + static class FilterArgumentProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(ExtensionContext context) { + + return Stream.of( + Arguments.of("agreementId", (Function) EndpointDataReferenceEntry::getAgreementId), + Arguments.of("transferProcessId", (Function) EndpointDataReferenceEntry::getTransferProcessId), + Arguments.of("assetId", (Function) EndpointDataReferenceEntry::getAssetId), + Arguments.of("contractNegotiationId", (Function) EndpointDataReferenceEntry::getContractNegotiationId), + Arguments.of("providerId", (Function) EndpointDataReferenceEntry::getProviderId) + ); + } + } } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/PostgresSignalingRuntimes.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/PostgresSignalingRuntimes.java new file mode 100644 index 00000000000..9744a379dbe --- /dev/null +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/PostgresSignalingRuntimes.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e.signaling; + +import org.eclipse.edc.junit.extensions.EdcClassRuntimesExtension; +import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.HashMap; + +import static org.eclipse.edc.test.e2e.signaling.SignalingEndToEndTestBase.CONSUMER; +import static org.eclipse.edc.test.e2e.signaling.SignalingEndToEndTestBase.PROVIDER; + +public interface PostgresSignalingRuntimes { + + String[] CONTROL_PLANE_MODULES = new String[]{ + ":system-tests:e2e-transfer-test:control-plane", + ":core:common:edr-store-core", + ":extensions:common:store:sql:edr-index-sql", + ":extensions:common:sql:sql-pool:sql-pool-apache-commons", + ":extensions:common:transaction:transaction-local", + ":extensions:control-plane:transfer:transfer-data-plane-signaling", + ":extensions:control-plane:api:management-api:edr-cache-api", + ":extensions:control-plane:edr:edr-store-receiver", + ":extensions:data-plane:data-plane-signaling:data-plane-signaling-client", + ":extensions:control-plane:callback:callback-event-dispatcher", + ":extensions:control-plane:callback:callback-http-dispatcher" + }; + + String[] DATA_PLANE_MODULES = new String[]{ + ":system-tests:e2e-transfer-test:data-plane", + ":extensions:data-plane:data-plane-public-api-v2" + }; + + EdcRuntimeExtension DATA_PLANE = new EdcRuntimeExtension( + "provider-data-plane", + PROVIDER.dataPlaneConfiguration(), + DATA_PLANE_MODULES + ); + + @RegisterExtension + EdcClassRuntimesExtension RUNTIMES = new EdcClassRuntimesExtension( + new EdcRuntimeExtension( + "consumer-control-plane", + CONSUMER.controlPlanePostgresConfiguration(), + CONTROL_PLANE_MODULES + ), + new EdcRuntimeExtension( + ":system-tests:e2e-transfer-test:backend-service", + "consumer-backend-service", + new HashMap<>() { + { + put("web.http.port", String.valueOf(CONSUMER.backendService().getPort())); + } + } + ), + DATA_PLANE, + new EdcRuntimeExtension( + "provider-control-plane", + PROVIDER.controlPlanePostgresConfiguration(), + CONTROL_PLANE_MODULES + ), + new EdcRuntimeExtension( + ":system-tests:e2e-transfer-test:backend-service", + "provider-backend-service", + new HashMap<>() { + { + put("web.http.port", String.valueOf(PROVIDER.backendService().getPort())); + } + } + ) + ); + + +} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingEndToEndTransferTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingEndToEndTransferTest.java index ac7b5a684e9..a867418ce7e 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingEndToEndTransferTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingEndToEndTransferTest.java @@ -24,6 +24,7 @@ import jakarta.json.JsonObject; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted; import org.eclipse.edc.junit.annotations.EndToEndTest; +import org.eclipse.edc.junit.annotations.PostgresqlIntegrationTest; import org.eclipse.edc.spi.event.EventEnvelope; import org.eclipse.edc.spi.types.domain.DataAddress; import org.jetbrains.annotations.NotNull; @@ -31,6 +32,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.RegisterExtension; import org.mockserver.integration.ClientAndServer; import org.mockserver.model.HttpRequest; import org.mockserver.model.HttpResponse; @@ -54,6 +57,7 @@ import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; import static org.eclipse.edc.junit.testfixtures.TestUtils.getFreePort; import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; +import static org.eclipse.edc.sql.testfixtures.PostgresqlEndToEndInstance.createDatabase; import static org.eclipse.edc.test.system.utils.PolicyFixtures.inForceDatePolicy; import static org.eclipse.edc.test.system.utils.PolicyFixtures.noConstraintPolicy; import static org.hamcrest.CoreMatchers.equalTo; @@ -253,4 +257,15 @@ class InMemory extends Tests implements InMemorySignalingRuntimes { class EmbeddedDataPlane extends Tests implements EmbeddedDataPlaneSignalingRuntimes { } + + @Nested + @PostgresqlIntegrationTest + class Postgres extends Tests implements PostgresSignalingRuntimes { + + @RegisterExtension + static final BeforeAllCallback CREATE_DATABASES = context -> { + createDatabase(CONSUMER.getName()); + createDatabase(PROVIDER.getName()); + }; + } }