Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task/add config option persist errors #1930

Merged
merged 11 commits into from
Sep 15, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
[cygnus-common][SQLBackendImpl] Add configuration option to persist errors in SQL sinks (#1928)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the existing functionality for PG?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just make it optional by config

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. NTC

[cygnus-ngsi-ld] Creation of the new PostGIS sink for persisting NGSI-LD notifications (#1905)

Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class SQLBackendImpl implements SQLBackend{
private SQLBackendImpl.SQLDriver driver;
private final SQLCache cache;
private final String sqlInstance;
private final boolean persistErrors;

/**
* Constructor.
Expand All @@ -56,9 +57,10 @@ public class SQLBackendImpl implements SQLBackend{
* @param sqlInstance
* @param sqlDriverName
* @param defaultSQLDataBase
* @param persistErrors
*/
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, String sqlInstance, String sqlDriverName, String defaultSQLDataBase) {
this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, sqlInstance, sqlDriverName, defaultSQLDataBase, null);
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, String sqlInstance, String sqlDriverName, String defaultSQLDataBase, boolean persistErrors) {
this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, sqlInstance, sqlDriverName, defaultSQLDataBase, null, persistErrors);
} // SQLBackendImpl

/**
Expand All @@ -75,9 +77,28 @@ public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String
* @param sqlOptions
*/
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, String sqlInstance, String sqlDriverName, String defaultSQLDataBase, String sqlOptions) {
this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, sqlInstance, sqlDriverName, defaultSQLDataBase, sqlOptions, false);
} // SQLBackendImpl

/**
* Constructor.
*
* @param sqlHost
* @param sqlPort
* @param sqlUsername
* @param sqlPassword
* @param maxPoolSize
* @param sqlInstance
* @param sqlDriverName
* @param defaultSQLDataBase
* @param sqlOptions
* @param persistErrors
*/
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, String sqlInstance, String sqlDriverName, String defaultSQLDataBase, String sqlOptions, boolean persistErrors) {
driver = new SQLBackendImpl.SQLDriver(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, sqlInstance, sqlDriverName, defaultSQLDataBase, sqlOptions);
cache = new SQLCache();
this.sqlInstance = sqlInstance;
this.persistErrors = persistErrors;
} // SQLBackendImpl

/**
Expand Down Expand Up @@ -528,8 +549,10 @@ private void insertErrorLog(String destination, String errorQuery, Exception exc

private void persistError(String destination, String query, Exception exception) throws CygnusPersistenceError, CygnusRuntimeError {
try {
createErrorTable(destination);
insertErrorLog(destination, query, exception);
if (persistErrors) {
createErrorTable(destination);
insertErrorLog(destination, query, exception);
}
return;
} catch (CygnusBadContextData cygnusBadContextData) {
LOGGER.debug(sqlInstance.toUpperCase() + " failed to persist error on database/scheme " + destination + "_error_log" + cygnusBadContextData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class SQLBackendImplTest {
private final String fieldNames2 = "c text, d text";
private static final String MYSQL_DRIVER_NAME = "com.mysql.jdbc.Driver";
private static final String MYSQL_INSTANCE_NAME = "mysql";
private final Boolean persistErrors = true;

// True: real test, False: Mock test
private final boolean runRealTest = false;
Expand All @@ -79,7 +80,7 @@ public class SQLBackendImplTest {
@Before
public void setUp() throws Exception {
// set up the instance of the tested class
backend = new SQLBackendImpl(host, port, user, password, maxPoolSize, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, null);
backend = new SQLBackendImpl(host, port, user, password, maxPoolSize, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, null, persistErrors);

// set up the behaviour of the mocked classes
when(mockDriverDbCreate.getConnection(Mockito.anyString())).thenReturn(mockConnection);
Expand Down Expand Up @@ -196,7 +197,7 @@ public void testJDBCUrlMySQL() {
String sqlDriverName = "com.mysql.jdbc.Driver";
String destination = "dest";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, null);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, null, persistErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest");
Expand All @@ -212,7 +213,7 @@ public void testJDBCUrlPostgreSQL() {
String destination = "dest";
String defaultDataBase = "default";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, defaultDataBase);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, defaultDataBase, persistErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default");
Expand All @@ -228,31 +229,31 @@ public void testJDBCUrlMySQLWithOptions() {
String destination = "dest";
String sqlOptions = "useSSL=true&requireSSL=false";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, null, sqlOptions);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, null, sqlOptions, persistErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest?useSSL=true&requireSSL=false");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:mysql, options:<white spaces>)");
sqlOptions = " \t";

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, null, sqlOptions);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, null, sqlOptions, persistErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:mysql, options:<empty>)");
sqlOptions = "";

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, null, sqlOptions);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, null, sqlOptions, persistErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:mysql, options:<null>)");
sqlOptions = null;

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, null, sqlOptions);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, null, sqlOptions, persistErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest");
Expand All @@ -269,31 +270,31 @@ public void testJDBCUrlPostgreSQLWithOptions() {
String defaultDataBase = "default";
String sqlOptions = "sslmode=require";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, defaultDataBase, sqlOptions);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, defaultDataBase, sqlOptions, persistErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default?sslmode=require");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:postgresql, options:<white spaces)");
sqlOptions = " \t";

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, defaultDataBase, sqlOptions);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, defaultDataBase, sqlOptions, persistErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:postgresql, options:<empty>)");
sqlOptions = "";

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, defaultDataBase, sqlOptions);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, defaultDataBase, sqlOptions, persistErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:postgresql, options:<null>)");
sqlOptions = null;

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, defaultDataBase, sqlOptions);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, defaultDataBase, sqlOptions, persistErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default");
Expand Down
7 changes: 6 additions & 1 deletion cygnus-ngsi/conf/agent_ngsi.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ cygnus-ngsi.sinks.postgresql-sink.type = com.telefonica.iot.cygnus.sinks.NGSIPos
#cygnus-ngsi.sinks.postgresql-sink.batch_timeout = 30
# number of retries upon persistence error
#cygnus-ngsi.sinks.postgresql-sink.batch_ttl = 10

# true if errors are persisted for this sink, false otherwise
#cygnus-ngsi.sinks.postgresql-sink.persist_errors = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are per-sink docu tables with the allowed parameters. Maybe persiste_errors should be added to all them.


# ============================================
# NGSIPostgisSink configuration
Expand Down Expand Up @@ -233,6 +234,8 @@ cygnus-ngsi.sinks.postgis-sink.type = com.telefonica.iot.cygnus.sinks.NGSIPostgi
#cygnus-ngsi.sinks.postgis-sink.batch_timeout = 30
# number of retries upon persistence error
#cygnus-ngsi.sinks.postgis-sink.batch_ttl = 10
# true if errors are persisted for this sink, false otherwise
#cygnus-ngsi.sinks.postgis-sink.persist_errors = true

# ============================================
# NGSIMySQLSink configuration
Expand Down Expand Up @@ -268,6 +271,8 @@ cygnus-ngsi.sinks.mysql-sink.type = com.telefonica.iot.cygnus.sinks.NGSIMySQLSin
#cygnus-ngsi.sinks.mysql-sink.batch_ttl = 10
# true enables cache, false disables cache
#cygnus-ngsi.sinks.mysql-sink.backend.enable_cache = false
# true if errors are persisted for this sink, false otherwise
#cygnus-ngsi.sinks.mysql-sink.persist_errors = true

# ============================================
# NGSIMongoSink configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class NGSIMySQLSink extends NGSISink {
private boolean attrNativeTypes;
private boolean attrMetadataStore;
private String mysqlOptions;
private boolean persistErrors;

/**
* Constructor.
Expand Down Expand Up @@ -213,13 +214,25 @@ public void configure(Context context) {
mysqlOptions = context.getString("mysql_options", null);
LOGGER.debug("[" + this.getName() + "] Reading configuration (mysql_options=" + mysqlOptions + ")");

String persistErrorsStr = context.getString("persist_errors", "true");

if (persistErrorsStr.equals("true") || persistErrorsStr.equals("false")) {
persistErrors = Boolean.parseBoolean(persistErrorsStr);
LOGGER.debug("[" + this.getName() + "] Reading configuration (persist_errors="
+ persistErrors + ")");
} else {
invalidConfiguration = true;
LOGGER.debug("[" + this.getName() + "] Invalid configuration (persist_errors="
+ persistErrorsStr + ") -- Must be 'true' or 'false'");
} // if else

super.configure(context);
} // configure

@Override
public void start() {
try {
createPersistenceBackend(mysqlHost, mysqlPort, mysqlUsername, mysqlPassword, maxPoolSize, mysqlOptions);
createPersistenceBackend(mysqlHost, mysqlPort, mysqlUsername, mysqlPassword, maxPoolSize, mysqlOptions, persistErrors);
LOGGER.debug("[" + this.getName() + "] MySQL persistence backend created");
} catch (Exception e) {
LOGGER.error("Error while creating the MySQL persistence backend. Details="
Expand All @@ -238,9 +251,9 @@ public void stop() {
/**
* Initialices a lazy singleton to share among instances on JVM
*/
private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, String sqlOptions) {
private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, String sqlOptions, boolean persistErrors) {
if (mySQLPersistenceBackend == null) {
mySQLPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, null, sqlOptions);
mySQLPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, null, sqlOptions, persistErrors);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class NGSIPostgisSink extends NGSISink {
private boolean attrNativeTypes;
private boolean attrMetadataStore;
private String postgisOptions;
private boolean persistErrors;

/**
* Constructor.
Expand Down Expand Up @@ -257,6 +258,18 @@ public void configure(Context context) {
postgisOptions = context.getString("postgis_options", null);
LOGGER.debug("[" + this.getName() + "] Reading configuration (postgis_options=" + postgisOptions + ")");

String persistErrorsStr = context.getString("persist_errors", "true");

if (persistErrorsStr.equals("true") || persistErrorsStr.equals("false")) {
persistErrors = Boolean.parseBoolean(persistErrorsStr);
LOGGER.debug("[" + this.getName() + "] Reading configuration (persist_errors="
+ persistErrors + ")");
} else {
invalidConfiguration = true;
LOGGER.debug("[" + this.getName() + "] Invalid configuration (persist_errors="
+ persistErrorsStr + ") -- Must be 'true' or 'false'");
} // if else

} // configure

@Override
Expand All @@ -269,7 +282,7 @@ public void stop() {
public void start() {
try {
if (buildDBName(null) != null) {
createPersistenceBackend(postgisHost, postgisPort, postgisUsername, postgisPassword, maxPoolSize, buildDBName(null), postgisOptions);
createPersistenceBackend(postgisHost, postgisPort, postgisUsername, postgisPassword, maxPoolSize, buildDBName(null), postgisOptions, persistErrors);
}
} catch (Exception e) {
LOGGER.error("Error while creating the Postgis persistence backend. Details="
Expand All @@ -283,9 +296,9 @@ public void start() {
/**
* Initialices a lazy singleton to share among instances on JVM
*/
private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, String defaultSQLDataBase, String sqlOptions) {
private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, String defaultSQLDataBase, String sqlOptions, boolean persistErrors) {
if (postgisPersistenceBackend == null) {
postgisPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, POSTGIS_INSTANCE_NAME, POSTGIS_DRIVER_NAME, defaultSQLDataBase, sqlOptions);
postgisPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, POSTGIS_INSTANCE_NAME, POSTGIS_DRIVER_NAME, defaultSQLDataBase, sqlOptions, persistErrors);
} else {
LOGGER.info("The database name will be created on runtime, so if there is an specified database on the agent properties and you expect it to be read on startup, then you shoul look for the data model you are using. Maybe it's not the correct one");
}
Expand Down Expand Up @@ -376,7 +389,7 @@ private void persistAggregation(NGSIGenericAggregator aggregator) throws CygnusP
}

if (postgisPersistenceBackend == null) {
createPersistenceBackend(postgisHost, postgisPort, postgisUsername, postgisPassword, maxPoolSize, aggregator.getDbName(enableLowercase), postgisOptions);
createPersistenceBackend(postgisHost, postgisPort, postgisUsername, postgisPassword, maxPoolSize, aggregator.getDbName(enableLowercase), postgisOptions, persistErrors);
}

LOGGER.info("[" + this.getName() + "] Persisting data at NGSIPostgisSink. Schema ("
Expand Down
Loading