Skip to content

Commit

Permalink
Merge pull request #1960 from telefonicaid/fix/prepared_statement
Browse files Browse the repository at this point in the history
Fix/prepared statement
  • Loading branch information
fgalan authored Oct 15, 2020
2 parents 2bed57d + 17729c2 commit 953dc1c
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 190 deletions.
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[cygnus-ngsi, cygnus-common][PosgtgreSQLSink, PostgisSink, MySQLSQLSink] Remove PreparedStatement building, to use String query (walkaround for #1959)
[cygnus-ngsi, cygnus-common][PosgtgreSQLSink, PostgisSink, MySQLSQLSink] Log info about persisted data (#1939)
[cygnus-ngsi, cygnus-common][PosgtgreSQLSink, PostgisSink, MySQLSQLSink] Create upsert transaction (#1806, #1936)
[cygnus-common][NameMappingInterceptor] Catch and log Compile error namemapping (#1924)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,12 +582,21 @@ public void upsertTransaction (LinkedHashMap<String, ArrayList<JsonElement>> agg
String insertQuery = SQLQueryUtils.sqlInsertQuery(aggregation,
tableName,
sqlInstance,
destination).toString();
destination,
attrNativeTypes).toString();

PreparedStatement insertStatement = null;
PreparedStatement insertStatement;
insertStatement = connection.prepareStatement(insertQuery);
/*
FIXME https://github.com/telefonicaid/fiware-cygnus/issues/1959
Add SQLSafe values with native PreparedStatement methods
insertPreparedStatement = SQLQueryUtils.addJsonValues(insertStatement, aggregation, attrNativeTypes);
insertedRows = insertPreparedStatement.executeBatch();
*/
insertStatement.executeUpdate();

String upsertQuery = SQLQueryUtils.sqlUpsertQuery(aggregation,
lastData,
Expand All @@ -597,15 +606,25 @@ public void upsertTransaction (LinkedHashMap<String, ArrayList<JsonElement>> agg
timestampKey,
timestampFormat,
sqlInstance,
destination).toString();
destination,
attrNativeTypes).toString();

PreparedStatement upsertStatement = null;
PreparedStatement upsertStatement;
upsertStatement = connection.prepareStatement(upsertQuery);

/*
FIXME https://github.com/telefonicaid/fiware-cygnus/issues/1959
Add SQLSafe values with native PreparedStatement methods
upsertPreparedStatement = SQLQueryUtils.addJsonValues(upsertStatement, lastData, attrNativeTypes);
upsertPreparedStatement.executeBatch();
*/
upsertStatement.executeUpdate();

connection.commit();
LOGGER.info("Finished transaction: \n" + upsertPreparedStatement + "\n Also, " + insertedRows.length + " where Inserted. THE LAS ONE WAS: " + " " + insertQuery);
LOGGER.info("Finished transaction: \n" + upsertQuery + "\n Also, some where Inserted. QUERY: " + insertQuery);

} catch (SQLTimeoutException e) {
cygnusSQLRollback(connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ protected static StringBuffer sqlUpsertQuery(LinkedHashMap<String, ArrayList<Jso
String timestampKey,
String timestampFormat,
String sqlInstance,
String destination) {
String destination,
boolean attrNativeTypes) {

if (sqlInstance.equals("postgresql")) {
return postgreSqlUpsertQuery(aggregation,
Expand All @@ -72,7 +73,8 @@ protected static StringBuffer sqlUpsertQuery(LinkedHashMap<String, ArrayList<Jso
timestampKey,
timestampFormat,
sqlInstance,
destination);
destination,
attrNativeTypes);
} else if (sqlInstance.equals("mysql")) {
return mySqlUpsertQuery(aggregation,
lastData,
Expand All @@ -82,7 +84,8 @@ protected static StringBuffer sqlUpsertQuery(LinkedHashMap<String, ArrayList<Jso
timestampKey,
timestampFormat,
sqlInstance,
destination);
destination,
attrNativeTypes);
}
return null;
}
Expand All @@ -109,7 +112,8 @@ protected static StringBuffer postgreSqlUpsertQuery(LinkedHashMap<String, ArrayL
String timestampKey,
String timestampFormat,
String sqlInstance,
String destination) {
String destination,
boolean attrNativeTypes) {

StringBuffer updateSet = new StringBuffer();
StringBuffer postgisTempReference = new StringBuffer("EXCLUDED");
Expand All @@ -129,7 +133,8 @@ protected static StringBuffer postgreSqlUpsertQuery(LinkedHashMap<String, ArrayL
StringBuffer insertQuery = sqlInsertQuery(lastData,
tableName.concat(tableSuffix),
sqlInstance,
destination);
destination,
attrNativeTypes);

query.append(insertQuery).
append("ON CONFLICT ").append("(").append(uniqueKey).append(") ").
Expand Down Expand Up @@ -165,7 +170,8 @@ protected static StringBuffer mySqlUpsertQuery(LinkedHashMap<String, ArrayList<J
String timestampKey,
String timestampFormat,
String sqlInstance,
String destination) {
String destination,
boolean attrNativeTypes) {

StringBuffer updateSet = new StringBuffer();
StringBuffer query = new StringBuffer();
Expand Down Expand Up @@ -197,7 +203,8 @@ protected static StringBuffer mySqlUpsertQuery(LinkedHashMap<String, ArrayList<J
StringBuffer insertQuery = sqlInsertQuery(lastData,
tableName.concat(tableSuffix),
sqlInstance,
destination);
destination,
attrNativeTypes);

query.append(insertQuery).
append("ON DUPLICATE KEY ").
Expand Down Expand Up @@ -251,10 +258,21 @@ protected static StringBuffer mySQLUpdateRecordQuery(String key,
protected static StringBuffer sqlInsertQuery(LinkedHashMap<String, ArrayList<JsonElement>> aggregation,
String tableName,
String sqlInstance,
String destination) {
String destination,
boolean attrNativeTypes) {

StringBuffer fieldsForInsert;
/*
FIXME https://github.com/telefonicaid/fiware-cygnus/issues/1959
Add SQLSafe values with native PreparedStatement methods
StringBuffer valuesForInsert = sqlQuestionValues(aggregation.keySet());
*/
StringBuffer valuesForInsert = new StringBuffer(getValuesForInsert(aggregation, attrNativeTypes));

StringBuffer postgisDestination = new StringBuffer(destination).append(".").append(tableName);
StringBuffer query = new StringBuffer();

Expand Down Expand Up @@ -294,6 +312,9 @@ protected static StringBuffer sqlQuestionValues(Set<String> keyList) {
return questionValues;
}


// INSERT INTO database (recvtime, location) VALUES ('valor', )

/**
* Gets fields for insert.
*
Expand Down Expand Up @@ -361,8 +382,8 @@ protected static PreparedStatement addJsonValues (PreparedStatement previousStat
LOGGER.debug("[NGSISQLUtils.addJsonValues] " + "Added postgis Function " + stringValue + " as Object");
position++;
} else {
preparedStatement.setString(position, stringValue);
LOGGER.debug("[NGSISQLUtils.addJsonValues] " + "Added " + stringValue + " as String");
preparedStatement.setObject(position, stringValue);
LOGGER.debug("[NGSISQLUtils.addJsonValues] " + "Added " + stringValue + " as Object");
position++;
}
} // else
Expand All @@ -380,17 +401,17 @@ protected static PreparedStatement addJsonValues (PreparedStatement previousStat
LOGGER.debug("[NGSISQLUtils.addJsonValues] " + "Added postgis Function " + stringValue + " as Object");
position++;
} else {
preparedStatement.setString(position, stringValue);
preparedStatement.setObject(position, stringValue);
LOGGER.debug("[NGSISQLUtils.addJsonValues] " + "Added " + stringValue + " as String");
position++;
}
} else {
if (value == null){
preparedStatement.setString(position, "NULL");
preparedStatement.setObject(position, "NULL");
LOGGER.debug("[NGSISQLUtils.addJsonValues] " + "Added NULL as String");
position++;
} else {
preparedStatement.setString(position, value.toString());
preparedStatement.setObject(position, value.toString());
LOGGER.debug("[NGSISQLUtils.addJsonValues] " + "Added " + value.toString() + " as String");
position++;
}
Expand All @@ -411,7 +432,92 @@ protected static PreparedStatement addJsonValues (PreparedStatement previousStat
*/
protected static int collectionSizeOnLinkedHashMap(LinkedHashMap<String, ArrayList<JsonElement>> aggregation) {
ArrayList<ArrayList<JsonElement>> list = new ArrayList<>(aggregation.values());
return list.get(0).size();
if (list.size() > 0)
return list.get(0).size();
else
return 0;
}

/**
* Gets string value from json element.
*
* @param value the value to process
* @param quotationMark the quotation mark
* @param attrNativeTypes the attr native types
* @return the string value from json element
*/
protected static String getStringValueFromJsonElement(JsonElement value, String quotationMark, boolean attrNativeTypes) {
String stringValue;
if (attrNativeTypes) {
if (value == null || value.isJsonNull()) {
stringValue = "NULL";
} else if (value.isJsonPrimitive()) {
if (value.getAsJsonPrimitive().isBoolean()) {
stringValue = value.getAsString().toUpperCase();
} else if (value.getAsJsonPrimitive().isNumber()) {
stringValue = value.getAsString();
}else {
if (value.toString().contains("ST_GeomFromGeoJSON") || value.toString().contains("ST_SetSRID")) {
stringValue = value.getAsString().replace("\\", "");
} else {
stringValue = quotationMark + value.getAsString() + quotationMark;
}
}
} else {
stringValue = quotationMark + value.toString() + quotationMark;
}
} else {
if (value != null && value.isJsonPrimitive()) {
if (value.toString().contains("ST_GeomFromGeoJSON") || value.toString().contains("ST_SetSRID")) {
stringValue = value.getAsString().replace("\\", "");
} else {
stringValue = quotationMark + value.getAsString() + quotationMark;
}
} else {
if (value == null){
stringValue = quotationMark + "NULL" + quotationMark;
} else {
stringValue = quotationMark + value.toString() + quotationMark;
}
}
}
return stringValue;
}

/**
* Gets values for insert.
*
* @param aggregation the aggregation
* @param attrNativeTypes the attr native types
* @return a String with all VALUES in SQL query format.
*/
protected static String getValuesForInsert(LinkedHashMap<String, ArrayList<JsonElement>> aggregation, boolean attrNativeTypes) {
String valuesForInsert = "";
int numEvents = collectionSizeOnLinkedHashMap(aggregation);

for (int i = 0; i < numEvents; i++) {
if (i == 0) {
valuesForInsert += "(";
} else {
valuesForInsert += ",(";
} // if else
boolean first = true;
Iterator<String> it = aggregation.keySet().iterator();
while (it.hasNext()) {
String entry = (String) it.next();
ArrayList<JsonElement> values = (ArrayList<JsonElement>) aggregation.get(entry);
JsonElement value = values.get(i);
String stringValue = getStringValueFromJsonElement(value, "'", attrNativeTypes);
if (first) {
valuesForInsert += stringValue;
first = false;
} else {
valuesForInsert += "," + stringValue;
} // if else
} // while
valuesForInsert += ")";
} // for
return valuesForInsert;
} // getValuesForInsert

}
Loading

0 comments on commit 953dc1c

Please sign in to comment.