Skip to content

Commit

Permalink
GH-9481: Fix JdbcMetadataStore for PostgreSQL & MySQL
Browse files Browse the repository at this point in the history
Fixes: #9481
Issue link: #9481

MySQL throws `CannotAcquireLockException` in case of duplicate key failure.
PostgreSQL just rollbacks a transaction not letting us move on with a `SELECT`

* Include `TransientDataAccessException` to the catch block of the `INSERT`
to ignore it for the subsequent `SELECT`
* Add logic to determine `PostgreSQL` database vendor and include `ON CONFLICT DO NOTHING`
hint into the `INSERT` to not fail in case of duplicate key found on `putIfAbsent` operation

(cherry picked from commit 98d0266)
  • Loading branch information
artembilan authored and spring-builds committed Sep 17, 2024
1 parent d302804 commit 229ffb7
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import org.springframework.core.log.LogAccessor;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.dao.TransientDataAccessException;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.Transactional;
Expand Down Expand Up @@ -168,12 +170,18 @@ public void setLockHint(String lockHint) {

@Override
public void afterPropertiesSet() {
String dataBaseVendor =
this.jdbcTemplate.execute((ConnectionCallback<String>) connection ->
connection.getMetaData().getDatabaseProductName());
this.getValueQuery = String.format(this.getValueQuery, this.tablePrefix);
this.getValueForUpdateQuery = String.format(this.getValueForUpdateQuery, this.tablePrefix, this.lockHint);
this.replaceValueQuery = String.format(this.replaceValueQuery, this.tablePrefix);
this.replaceValueByKeyQuery = String.format(this.replaceValueByKeyQuery, this.tablePrefix);
this.removeValueQuery = String.format(this.removeValueQuery, this.tablePrefix);
this.putIfAbsentValueQuery = String.format(this.putIfAbsentValueQuery, this.tablePrefix, this.tablePrefix);
if ("PostgreSQL".equals(dataBaseVendor)) {
this.putIfAbsentValueQuery += " ON CONFLICT DO NOTHING";
}
this.countQuery = String.format(this.countQuery, this.tablePrefix);
}

Expand Down Expand Up @@ -247,7 +255,7 @@ private int tryToPutIfAbsent(String key, String value) {
ps.setString(5, this.region); // NOSONAR magic number
});
}
catch (DataIntegrityViolationException ex) {
catch (TransientDataAccessException | DataIntegrityViolationException ex) {
return 0;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.jdbc.mysql;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.sql.DataSource;

import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import static org.assertj.core.api.Assertions.assertThat;

/**
* @author Artem Bilan
*
* @since 6.4
*/
@SpringJUnitConfig
@DirtiesContext
class MySqlMetadataStoreTests implements MySqlContainerTest {

@Autowired
ConcurrentMetadataStore jdbcMetadataStore;

@Test
void verifyJdbcMetadataStoreConcurrency() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(100);
CountDownLatch successPutIfAbsents = new CountDownLatch(100);
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
this.jdbcMetadataStore.putIfAbsent("testKey", "testValue");
successPutIfAbsents.countDown();
});
}
assertThat(successPutIfAbsents.await(10, TimeUnit.SECONDS)).isTrue();
executorService.shutdown();
}

@Configuration(proxyBeanMethods = false)
@EnableTransactionManagement
static class TestConfiguration {

@Value("org/springframework/integration/jdbc/schema-mysql.sql")
Resource createSchemaScript;

@Bean
DataSource dataSource() {
return MySqlContainerTest.dataSource();
}

@Bean
DataSourceInitializer dataSourceInitializer(DataSource dataSource) {
DataSourceInitializer dataSourceInitializer = new DataSourceInitializer();
dataSourceInitializer.setDataSource(dataSource);
dataSourceInitializer.setDatabasePopulator(new ResourceDatabasePopulator(this.createSchemaScript));
return dataSourceInitializer;
}

@Bean
PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}

@Bean
JdbcMetadataStore jdbcMetadataStore(DataSource dataSource) {
return new JdbcMetadataStore(dataSource);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.springframework.integration.jdbc.channel;
package org.springframework.integration.jdbc.postgres;

import java.sql.DriverManager;
import java.sql.SQLException;
Expand Down Expand Up @@ -44,6 +44,9 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.jdbc.channel.PgConnectionSupplier;
import org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber;
import org.springframework.integration.jdbc.channel.PostgresSubscribableChannel;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.springframework.integration.jdbc.channel;
package org.springframework.integration.jdbc.postgres;

import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.PostgreSQLContainer;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.jdbc.postgres;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.sql.DataSource;

import org.apache.commons.dbcp2.BasicDataSource;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import static org.assertj.core.api.Assertions.assertThat;

/**
* @author Artem Bilan
*
* @since 6.2.9
*/
@SpringJUnitConfig
@DirtiesContext
class PostgresMetadataStoreTests implements PostgresContainerTest {

@Autowired
ConcurrentMetadataStore jdbcMetadataStore;

@Test
void verifyJdbcMetadataStoreConcurrency() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(100);
CountDownLatch successPutIfAbsents = new CountDownLatch(100);
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
this.jdbcMetadataStore.putIfAbsent("testKey", "testValue");
successPutIfAbsents.countDown();
});
}
assertThat(successPutIfAbsents.await(10, TimeUnit.SECONDS)).isTrue();
executorService.shutdown();
}

@Configuration(proxyBeanMethods = false)
@EnableTransactionManagement
static class TestConfiguration {

@Bean
DataSource dataSource() {
BasicDataSource dataSource = new BasicDataSource();
dataSource.setUrl(PostgresContainerTest.getJdbcUrl());
dataSource.setUsername(PostgresContainerTest.getUsername());
dataSource.setPassword(PostgresContainerTest.getPassword());
return dataSource;
}

@Bean
PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}

@Bean
JdbcMetadataStore jdbcMetadataStore(DataSource dataSource) {
return new JdbcMetadataStore(dataSource);
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,7 @@

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.channel.PostgresContainerTest;
import org.springframework.integration.jdbc.postgres.PostgresContainerTest;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.transaction.PlatformTransactionManager;
Expand Down

0 comments on commit 229ffb7

Please sign in to comment.