diff --git a/databus-core/databus-core-impl/build.gradle b/databus-core/databus-core-impl/build.gradle index 9f14e788..bda1cb88 100644 --- a/databus-core/databus-core-impl/build.gradle +++ b/databus-core/databus-core-impl/build.gradle @@ -16,6 +16,7 @@ dependencies { compile externalDependency.json compile externalDependency.log4j compile externalDependency.netty + compile externalDependency.c3p0 testCompile externalDependency.testng testCompile externalDependency.easymock diff --git a/databus-core/databus-core-impl/src/main/java/com/linkedin/databus2/core/seq/MaxSCNReaderWriterConfig.java b/databus-core/databus-core-impl/src/main/java/com/linkedin/databus2/core/seq/MaxSCNReaderWriterConfig.java index b3093f62..3dc5fc3a 100644 --- a/databus-core/databus-core-impl/src/main/java/com/linkedin/databus2/core/seq/MaxSCNReaderWriterConfig.java +++ b/databus-core/databus-core-impl/src/main/java/com/linkedin/databus2/core/seq/MaxSCNReaderWriterConfig.java @@ -27,6 +27,7 @@ public class MaxSCNReaderWriterConfig implements ConfigBuilder + { + private String jdbcUrl= "jdbc:mysql://localhost:3306/databus"; + + private String scnTable = "databus_scn_store"; + + private String driverClass = "com.mysql.jdbc.Driver"; + + private String dbUser = "root"; + + private String dbPassword = ""; + + private Long flushItvl = 1L; + + private Long initVal = 0L; + + private String upsertSCNQuery = "insert into databus_scn_store (max_scn) values (?)"; + + private String getSCNQuery = "select max_scn from databus_scn_store order by updated_at desc limit 1"; + + private String scnColumnName = "max_scn"; + + public String getScnColumnName() { + return scnColumnName; + } + + public void setScnColumnName(String scnColumnName) { + this.scnColumnName = scnColumnName; + } + + public Long getInitVal() { + return initVal; + } + + public void setInitVal(Long initVal) { + this.initVal = initVal; + } + + public String getUpsertSCNQuery() { + return upsertSCNQuery; + } + + public void setUpsertSCNQuery(String upsertSCNQuery) { + this.upsertSCNQuery = upsertSCNQuery; + } + + public String getGetSCNQuery() { + return getSCNQuery; + } + + public void setGetSCNQuery(String getSCNQuery) { + this.getSCNQuery = getSCNQuery; + } + + public Long getFlushItvl() { + return flushItvl; + } + + public void setFlushItvl(Long flushItvl) { + this.flushItvl = flushItvl; + } + + public String getDbUser() { + return dbUser; + } + + public void setDbUser(String dbUser) { + this.dbUser = dbUser; + } + + public String getDbPassword() { + return dbPassword; + } + + public void setDbPassword(String dbPassword) { + this.dbPassword = dbPassword; + } + + public String getJdbcUrl() { + return jdbcUrl; + } + + public void setJdbcUrl(String jdbcUrl) { + this.jdbcUrl = jdbcUrl; + } + + public String getScnTable() { + return scnTable; + } + + public void setScnTable(String scnTable) { + this.scnTable = scnTable; + } + + public String getDriverClass() { + return driverClass; + } + + public void setDriverClass(String driverClass) { + this.driverClass = driverClass; + } + + @Override + public StaticConfig build() throws InvalidConfigException { + //TODO : verify + return new StaticConfig(jdbcUrl,scnTable, driverClass, dbUser, dbPassword, flushItvl, initVal , + upsertSCNQuery, getSCNQuery ,scnColumnName); + } + } + public static class StaticConfig{ + + private String driverClass = "com.mysql.jdbc.Driver"; + + private String jdbcUrl; + + private String scnTable; + + private String dbUser; + + private String dbPassword; + + private Long flushItvl; + + private Long initVal; + + private String upsertSCNQuery; + + private String getSCNQuery; + + private String scnColumnName; + + public String getScnColumnName() { + return scnColumnName; + } + + public Long getInitVal() { + return initVal; + } + + public String getUpsertSCNQuery() { + return upsertSCNQuery; + } + + public String getGetSCNQuery() { + return getSCNQuery; + } + + public Long getFlushItvl() { + return flushItvl; + } + + public String getDriverClass() { + return driverClass; + } + + public String getJdbcUrl() { + return jdbcUrl; + } + + public String getScnTable() { + return scnTable; + } + + public String getDbUser() { + return dbUser; + } + + public String getDbPassword() { + return dbPassword; + } + + public StaticConfig(String host, String table, String driverClass, String dbUser, String dbPassword, long flushItvl, long initVal, + String upsertSCNQuery, String getSCNQuery, String scnColumnName){ + this.jdbcUrl = host; + this.scnTable = table; + this.driverClass = driverClass; + this.dbUser = dbUser; + this.dbPassword = dbPassword; + this.flushItvl = flushItvl; + this.initVal = initVal; + this.upsertSCNQuery = upsertSCNQuery; + this.getSCNQuery = getSCNQuery; + this.scnColumnName = scnColumnName; + } + + } + +} + diff --git a/databus-core/databus-core-impl/src/main/java/com/linkedin/databus2/core/seq/MysqlMaxSCNHandlerFactory.java b/databus-core/databus-core-impl/src/main/java/com/linkedin/databus2/core/seq/MysqlMaxSCNHandlerFactory.java new file mode 100644 index 00000000..43a99d79 --- /dev/null +++ b/databus-core/databus-core-impl/src/main/java/com/linkedin/databus2/core/seq/MysqlMaxSCNHandlerFactory.java @@ -0,0 +1,26 @@ +package com.linkedin.databus2.core.seq; + +import com.linkedin.databus2.core.DatabusException; + +/** + * + */ +public class MysqlMaxSCNHandlerFactory implements SequenceNumberHandlerFactory { + private final MysqlMaxSCNHandler.Config _configBuilder; + + public MysqlMaxSCNHandlerFactory(MysqlMaxSCNHandler.Config configBuilder) + { + _configBuilder = configBuilder; + } + + @Override + public MaxSCNReaderWriter createHandler(String id) throws DatabusException { + MysqlMaxSCNHandler maxSCNHandler; + MysqlMaxSCNHandler.StaticConfig config; + synchronized (_configBuilder) { + config = _configBuilder.build(); + maxSCNHandler = MysqlMaxSCNHandler.create(config); + } + return maxSCNHandler; + } +} diff --git a/subprojects.gradle b/subprojects.gradle index 6a72a627..2eafa51a 100644 --- a/subprojects.gradle +++ b/subprojects.gradle @@ -47,7 +47,8 @@ ext.externalDependency = [ 'zookeeper': 'org.apache.zookeeper:zookeeper:3.3.3', 'ojdbc6': 'com.oracle:ojdbc6:11.2.0.2.0', 'helixCore': 'org.apache.helix:helix-core:0.6.2.0', - 'or': 'com.linkedin.dds-mysql:open-replicator-impl:1.0.63' + 'or': 'com.linkedin.dds-mysql:open-replicator-impl:1.0.63', + 'c3p0': 'com.mchange:c3p0:0.9.5' ]; if (isDefaultEnvironment) {