From ae9ff6a4c83b24b06f11ca59ce8b3a41a006aaaa Mon Sep 17 00:00:00 2001 From: Jose Pablo Fernandez Date: Thu, 31 Mar 2016 12:44:57 +0200 Subject: [PATCH] [DECISION-271] Deal with null fields in Avro serialization updated reference to kafka version used --- api/README.md | 2 +- .../api/StreamingAPIAsyncOperation.scala | 10 +- .../stratio/decision/commons/avro/Action.java | 4 +- .../decision/commons/avro/ColumnType.java | 6 +- .../decision/commons/avro/InsertMessage.java | 140 +++++++++--------- .../serializer/impl/JavaToAvroSerializer.java | 65 ++++---- 6 files changed, 120 insertions(+), 107 deletions(-) diff --git a/api/README.md b/api/README.md index 4d94d3f2..2ea1facd 100644 --- a/api/README.md +++ b/api/README.md @@ -23,7 +23,7 @@ Requirements * Scala 2.10.3 * sbt 0.13 * zookeeper - * kafka 0.8.1 + * kafka 0.8.2.1 How to start ============ diff --git a/api/src/main/scala/com/stratio/decision/api/StreamingAPIAsyncOperation.scala b/api/src/main/scala/com/stratio/decision/api/StreamingAPIAsyncOperation.scala index 8ce97f01..9c7da574 100644 --- a/api/src/main/scala/com/stratio/decision/api/StreamingAPIAsyncOperation.scala +++ b/api/src/main/scala/com/stratio/decision/api/StreamingAPIAsyncOperation.scala @@ -41,8 +41,14 @@ class StreamingAPIAsyncOperation(tableProducer: KafkaProducer) { var c : ColumnType = null for (messageColumn : ColumnNameTypeValue <- stratioStreamingMessage.getColumns){ - c = new ColumnType(messageColumn.getColumn, messageColumn.getValue.toString, messageColumn.getValue.getClass - .getName) + + if (messageColumn.getValue != null) { + c = new ColumnType(messageColumn.getColumn, messageColumn.getValue.toString, messageColumn.getValue.getClass + .getName) + } else { + + c = new ColumnType(messageColumn.getColumn, null, null); + } columns.add(c) } diff --git a/commons/src/main/java/com/stratio/decision/commons/avro/Action.java b/commons/src/main/java/com/stratio/decision/commons/avro/Action.java index 01144ab3..b25c32c1 100644 --- a/commons/src/main/java/com/stratio/decision/commons/avro/Action.java +++ b/commons/src/main/java/com/stratio/decision/commons/avro/Action.java @@ -13,10 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.stratio.decision.commons.avro; +package com.stratio.decision.commons.avro; @SuppressWarnings("all") @org.apache.avro.specific.AvroGenerated -public enum Action { +public enum Action { LISTEN, SAVE_TO_CASSANDRA, SAVE_TO_MONGO, SAVE_TO_SOLR, SAVE_TO_ELASTICSEARCH ; public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Action\",\"namespace\":\"com.stratio.decision.commons.avro\",\"symbols\":[\"LISTEN\",\"SAVE_TO_CASSANDRA\",\"SAVE_TO_MONGO\",\"SAVE_TO_SOLR\",\"SAVE_TO_ELASTICSEARCH\"]}"); public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } diff --git a/commons/src/main/java/com/stratio/decision/commons/avro/ColumnType.java b/commons/src/main/java/com/stratio/decision/commons/avro/ColumnType.java index 483d6f19..2959d6aa 100644 --- a/commons/src/main/java/com/stratio/decision/commons/avro/ColumnType.java +++ b/commons/src/main/java/com/stratio/decision/commons/avro/ColumnType.java @@ -17,8 +17,8 @@ @SuppressWarnings("all") @org.apache.avro.specific.AvroGenerated public class ColumnType extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - private static final long serialVersionUID = 6385653382887656308L; - public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"ColumnType\",\"namespace\":\"com.stratio.decision.commons.avro\",\"fields\":[{\"name\":\"column\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"string\"},{\"name\":\"type\",\"type\":\"string\"}]}"); + private static final long serialVersionUID = 5874033527197767193L; + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"ColumnType\",\"namespace\":\"com.stratio.decision.commons.avro\",\"fields\":[{\"name\":\"column\",\"type\":\"string\"},{\"name\":\"value\",\"type\":[\"null\",\"string\"],\"default\":\"null\"},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":\"null\"}]}"); public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } @Deprecated public java.lang.CharSequence column; @Deprecated public java.lang.CharSequence value; @@ -334,4 +334,4 @@ public ColumnType build() { READER$.read(this, org.apache.avro.specific.SpecificData.getDecoder(in)); } -} \ No newline at end of file +} diff --git a/commons/src/main/java/com/stratio/decision/commons/avro/InsertMessage.java b/commons/src/main/java/com/stratio/decision/commons/avro/InsertMessage.java index a09f85da..067b23a5 100644 --- a/commons/src/main/java/com/stratio/decision/commons/avro/InsertMessage.java +++ b/commons/src/main/java/com/stratio/decision/commons/avro/InsertMessage.java @@ -17,15 +17,15 @@ @SuppressWarnings("all") @org.apache.avro.specific.AvroGenerated public class InsertMessage extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - private static final long serialVersionUID = -692324677191135735L; - public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"InsertMessage\",\"namespace\":\"com.stratio.decision.commons.avro\",\"fields\":[{\"name\":\"operation\",\"type\":[\"null\",\"string\"],\"default\":\"null\"},{\"name\":\"streamName\",\"type\":\"string\"},{\"name\":\"sessionId\",\"type\":[\"null\",\"string\"],\"default\":\"null\"},{\"name\":\"timestamp\",\"type\":[\"null\",\"long\"],\"default\":\"null\"},{\"name\":\"data\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"ColumnType\",\"fields\":[{\"name\":\"column\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"string\"},{\"name\":\"type\",\"type\":\"string\"}]}}},{\"name\":\"actions\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"enum\",\"name\":\"Action\",\"symbols\":[\"LISTEN\",\"SAVE_TO_CASSANDRA\",\"SAVE_TO_MONGO\",\"SAVE_TO_SOLR\",\"SAVE_TO_ELASTICSEARCH\"]}}],\"default\":\"null\"}]}"); + private static final long serialVersionUID = 7664662872300091850L; + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"InsertMessage\",\"namespace\":\"com.stratio.decision.commons.avro\",\"fields\":[{\"name\":\"operation\",\"type\":[\"null\",\"string\"],\"default\":\"null\"},{\"name\":\"streamName\",\"type\":\"string\"},{\"name\":\"sessionId\",\"type\":[\"null\",\"string\"],\"default\":\"null\"},{\"name\":\"timestamp\",\"type\":[\"null\",\"long\"],\"default\":\"null\"},{\"name\":\"data\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"ColumnType\",\"fields\":[{\"name\":\"column\",\"type\":\"string\"},{\"name\":\"value\",\"type\":[\"null\",\"string\"],\"default\":\"null\"},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":\"null\"}]}}},{\"name\":\"actions\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"enum\",\"name\":\"Action\",\"symbols\":[\"LISTEN\",\"SAVE_TO_CASSANDRA\",\"SAVE_TO_MONGO\",\"SAVE_TO_SOLR\",\"SAVE_TO_ELASTICSEARCH\"]}}],\"default\":\"null\"}]}"); public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } - @Deprecated public CharSequence operation; - @Deprecated public CharSequence streamName; - @Deprecated public CharSequence sessionId; - @Deprecated public Long timestamp; - @Deprecated public java.util.List data; - @Deprecated public java.util.List actions; + @Deprecated public java.lang.CharSequence operation; + @Deprecated public java.lang.CharSequence streamName; + @Deprecated public java.lang.CharSequence sessionId; + @Deprecated public java.lang.Long timestamp; + @Deprecated public java.util.List data; + @Deprecated public java.util.List actions; /** * Default constructor. Note that this does not initialize fields @@ -37,7 +37,7 @@ public InsertMessage() {} /** * All-args constructor. */ - public InsertMessage(CharSequence operation, CharSequence streamName, CharSequence sessionId, Long timestamp, java.util.List data, java.util.List actions) { + public InsertMessage(java.lang.CharSequence operation, java.lang.CharSequence streamName, java.lang.CharSequence sessionId, java.lang.Long timestamp, java.util.List data, java.util.List actions) { this.operation = operation; this.streamName = streamName; this.sessionId = sessionId; @@ -48,7 +48,7 @@ public InsertMessage(CharSequence operation, CharSequence streamName, CharSequen public org.apache.avro.Schema getSchema() { return SCHEMA$; } // Used by DatumWriter. Applications should not call. - public Object get(int field$) { + public java.lang.Object get(int field$) { switch (field$) { case 0: return operation; case 1: return streamName; @@ -61,14 +61,14 @@ public Object get(int field$) { } // Used by DatumReader. Applications should not call. @SuppressWarnings(value="unchecked") - public void put(int field$, Object value$) { + public void put(int field$, java.lang.Object value$) { switch (field$) { - case 0: operation = (CharSequence)value$; break; - case 1: streamName = (CharSequence)value$; break; - case 2: sessionId = (CharSequence)value$; break; - case 3: timestamp = (Long)value$; break; - case 4: data = (java.util.List)value$; break; - case 5: actions = (java.util.List)value$; break; + case 0: operation = (java.lang.CharSequence)value$; break; + case 1: streamName = (java.lang.CharSequence)value$; break; + case 2: sessionId = (java.lang.CharSequence)value$; break; + case 3: timestamp = (java.lang.Long)value$; break; + case 4: data = (java.util.List)value$; break; + case 5: actions = (java.util.List)value$; break; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } @@ -76,7 +76,7 @@ public void put(int field$, Object value$) { /** * Gets the value of the 'operation' field. */ - public CharSequence getOperation() { + public java.lang.CharSequence getOperation() { return operation; } @@ -84,14 +84,14 @@ public CharSequence getOperation() { * Sets the value of the 'operation' field. * @param value the value to set. */ - public void setOperation(CharSequence value) { + public void setOperation(java.lang.CharSequence value) { this.operation = value; } /** * Gets the value of the 'streamName' field. */ - public CharSequence getStreamName() { + public java.lang.CharSequence getStreamName() { return streamName; } @@ -99,14 +99,14 @@ public CharSequence getStreamName() { * Sets the value of the 'streamName' field. * @param value the value to set. */ - public void setStreamName(CharSequence value) { + public void setStreamName(java.lang.CharSequence value) { this.streamName = value; } /** * Gets the value of the 'sessionId' field. */ - public CharSequence getSessionId() { + public java.lang.CharSequence getSessionId() { return sessionId; } @@ -114,14 +114,14 @@ public CharSequence getSessionId() { * Sets the value of the 'sessionId' field. * @param value the value to set. */ - public void setSessionId(CharSequence value) { + public void setSessionId(java.lang.CharSequence value) { this.sessionId = value; } /** * Gets the value of the 'timestamp' field. */ - public Long getTimestamp() { + public java.lang.Long getTimestamp() { return timestamp; } @@ -129,14 +129,14 @@ public Long getTimestamp() { * Sets the value of the 'timestamp' field. * @param value the value to set. */ - public void setTimestamp(Long value) { + public void setTimestamp(java.lang.Long value) { this.timestamp = value; } /** * Gets the value of the 'data' field. */ - public java.util.List getData() { + public java.util.List getData() { return data; } @@ -144,14 +144,14 @@ public java.util.List getData() { * Sets the value of the 'data' field. * @param value the value to set. */ - public void setData(java.util.List value) { + public void setData(java.util.List value) { this.data = value; } /** * Gets the value of the 'actions' field. */ - public java.util.List getActions() { + public java.util.List getActions() { return actions; } @@ -159,7 +159,7 @@ public java.util.List getActions() { * Sets the value of the 'actions' field. * @param value the value to set. */ - public void setActions(java.util.List value) { + public void setActions(java.util.List value) { this.actions = value; } @@ -167,8 +167,8 @@ public void setActions(java.util.List value) { * Creates a new InsertMessage RecordBuilder. * @return A new InsertMessage RecordBuilder */ - public static Builder newBuilder() { - return new Builder(); + public static com.stratio.decision.commons.avro.InsertMessage.Builder newBuilder() { + return new com.stratio.decision.commons.avro.InsertMessage.Builder(); } /** @@ -176,8 +176,8 @@ public static Builder newBuilder() { * @param other The existing builder to copy. * @return A new InsertMessage RecordBuilder */ - public static Builder newBuilder(Builder other) { - return new Builder(other); + public static com.stratio.decision.commons.avro.InsertMessage.Builder newBuilder(com.stratio.decision.commons.avro.InsertMessage.Builder other) { + return new com.stratio.decision.commons.avro.InsertMessage.Builder(other); } /** @@ -185,8 +185,8 @@ public static Builder newBuilder(Builder other) { * @param other The existing instance to copy. * @return A new InsertMessage RecordBuilder */ - public static Builder newBuilder(InsertMessage other) { - return new Builder(other); + public static com.stratio.decision.commons.avro.InsertMessage.Builder newBuilder(com.stratio.decision.commons.avro.InsertMessage other) { + return new com.stratio.decision.commons.avro.InsertMessage.Builder(other); } /** @@ -195,23 +195,23 @@ public static Builder newBuilder(InsertMessage other) { public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase implements org.apache.avro.data.RecordBuilder { - private CharSequence operation; - private CharSequence streamName; - private CharSequence sessionId; - private Long timestamp; - private java.util.List data; - private java.util.List actions; + private java.lang.CharSequence operation; + private java.lang.CharSequence streamName; + private java.lang.CharSequence sessionId; + private java.lang.Long timestamp; + private java.util.List data; + private java.util.List actions; /** Creates a new Builder */ private Builder() { - super(InsertMessage.SCHEMA$); + super(com.stratio.decision.commons.avro.InsertMessage.SCHEMA$); } /** * Creates a Builder by copying an existing Builder. * @param other The existing Builder to copy. */ - private Builder(Builder other) { + private Builder(com.stratio.decision.commons.avro.InsertMessage.Builder other) { super(other); if (isValidValue(fields()[0], other.operation)) { this.operation = data().deepCopy(fields()[0].schema(), other.operation); @@ -243,8 +243,8 @@ private Builder(Builder other) { * Creates a Builder by copying an existing InsertMessage instance * @param other The existing instance to copy. */ - private Builder(InsertMessage other) { - super(InsertMessage.SCHEMA$); + private Builder(com.stratio.decision.commons.avro.InsertMessage other) { + super(com.stratio.decision.commons.avro.InsertMessage.SCHEMA$); if (isValidValue(fields()[0], other.operation)) { this.operation = data().deepCopy(fields()[0].schema(), other.operation); fieldSetFlags()[0] = true; @@ -275,7 +275,7 @@ private Builder(InsertMessage other) { * Gets the value of the 'operation' field. * @return The value. */ - public CharSequence getOperation() { + public java.lang.CharSequence getOperation() { return operation; } @@ -284,7 +284,7 @@ public CharSequence getOperation() { * @param value The value of 'operation'. * @return This builder. */ - public Builder setOperation(CharSequence value) { + public com.stratio.decision.commons.avro.InsertMessage.Builder setOperation(java.lang.CharSequence value) { validate(fields()[0], value); this.operation = value; fieldSetFlags()[0] = true; @@ -304,7 +304,7 @@ public boolean hasOperation() { * Clears the value of the 'operation' field. * @return This builder. */ - public Builder clearOperation() { + public com.stratio.decision.commons.avro.InsertMessage.Builder clearOperation() { operation = null; fieldSetFlags()[0] = false; return this; @@ -314,7 +314,7 @@ public Builder clearOperation() { * Gets the value of the 'streamName' field. * @return The value. */ - public CharSequence getStreamName() { + public java.lang.CharSequence getStreamName() { return streamName; } @@ -323,7 +323,7 @@ public CharSequence getStreamName() { * @param value The value of 'streamName'. * @return This builder. */ - public Builder setStreamName(CharSequence value) { + public com.stratio.decision.commons.avro.InsertMessage.Builder setStreamName(java.lang.CharSequence value) { validate(fields()[1], value); this.streamName = value; fieldSetFlags()[1] = true; @@ -343,7 +343,7 @@ public boolean hasStreamName() { * Clears the value of the 'streamName' field. * @return This builder. */ - public Builder clearStreamName() { + public com.stratio.decision.commons.avro.InsertMessage.Builder clearStreamName() { streamName = null; fieldSetFlags()[1] = false; return this; @@ -353,7 +353,7 @@ public Builder clearStreamName() { * Gets the value of the 'sessionId' field. * @return The value. */ - public CharSequence getSessionId() { + public java.lang.CharSequence getSessionId() { return sessionId; } @@ -362,7 +362,7 @@ public CharSequence getSessionId() { * @param value The value of 'sessionId'. * @return This builder. */ - public Builder setSessionId(CharSequence value) { + public com.stratio.decision.commons.avro.InsertMessage.Builder setSessionId(java.lang.CharSequence value) { validate(fields()[2], value); this.sessionId = value; fieldSetFlags()[2] = true; @@ -382,7 +382,7 @@ public boolean hasSessionId() { * Clears the value of the 'sessionId' field. * @return This builder. */ - public Builder clearSessionId() { + public com.stratio.decision.commons.avro.InsertMessage.Builder clearSessionId() { sessionId = null; fieldSetFlags()[2] = false; return this; @@ -392,7 +392,7 @@ public Builder clearSessionId() { * Gets the value of the 'timestamp' field. * @return The value. */ - public Long getTimestamp() { + public java.lang.Long getTimestamp() { return timestamp; } @@ -401,7 +401,7 @@ public Long getTimestamp() { * @param value The value of 'timestamp'. * @return This builder. */ - public Builder setTimestamp(Long value) { + public com.stratio.decision.commons.avro.InsertMessage.Builder setTimestamp(java.lang.Long value) { validate(fields()[3], value); this.timestamp = value; fieldSetFlags()[3] = true; @@ -421,7 +421,7 @@ public boolean hasTimestamp() { * Clears the value of the 'timestamp' field. * @return This builder. */ - public Builder clearTimestamp() { + public com.stratio.decision.commons.avro.InsertMessage.Builder clearTimestamp() { timestamp = null; fieldSetFlags()[3] = false; return this; @@ -431,7 +431,7 @@ public Builder clearTimestamp() { * Gets the value of the 'data' field. * @return The value. */ - public java.util.List getData() { + public java.util.List getData() { return data; } @@ -440,7 +440,7 @@ public java.util.List getData() { * @param value The value of 'data'. * @return This builder. */ - public Builder setData(java.util.List value) { + public com.stratio.decision.commons.avro.InsertMessage.Builder setData(java.util.List value) { validate(fields()[4], value); this.data = value; fieldSetFlags()[4] = true; @@ -460,7 +460,7 @@ public boolean hasData() { * Clears the value of the 'data' field. * @return This builder. */ - public Builder clearData() { + public com.stratio.decision.commons.avro.InsertMessage.Builder clearData() { data = null; fieldSetFlags()[4] = false; return this; @@ -470,7 +470,7 @@ public Builder clearData() { * Gets the value of the 'actions' field. * @return The value. */ - public java.util.List getActions() { + public java.util.List getActions() { return actions; } @@ -479,7 +479,7 @@ public java.util.List getActions() { * @param value The value of 'actions'. * @return This builder. */ - public Builder setActions(java.util.List value) { + public com.stratio.decision.commons.avro.InsertMessage.Builder setActions(java.util.List value) { validate(fields()[5], value); this.actions = value; fieldSetFlags()[5] = true; @@ -499,7 +499,7 @@ public boolean hasActions() { * Clears the value of the 'actions' field. * @return This builder. */ - public Builder clearActions() { + public com.stratio.decision.commons.avro.InsertMessage.Builder clearActions() { actions = null; fieldSetFlags()[5] = false; return this; @@ -509,12 +509,12 @@ public Builder clearActions() { public InsertMessage build() { try { InsertMessage record = new InsertMessage(); - record.operation = fieldSetFlags()[0] ? this.operation : (CharSequence) defaultValue(fields()[0]); - record.streamName = fieldSetFlags()[1] ? this.streamName : (CharSequence) defaultValue(fields()[1]); - record.sessionId = fieldSetFlags()[2] ? this.sessionId : (CharSequence) defaultValue(fields()[2]); - record.timestamp = fieldSetFlags()[3] ? this.timestamp : (Long) defaultValue(fields()[3]); - record.data = fieldSetFlags()[4] ? this.data : (java.util.List) defaultValue(fields()[4]); - record.actions = fieldSetFlags()[5] ? this.actions : (java.util.List) defaultValue(fields()[5]); + record.operation = fieldSetFlags()[0] ? this.operation : (java.lang.CharSequence) defaultValue(fields()[0]); + record.streamName = fieldSetFlags()[1] ? this.streamName : (java.lang.CharSequence) defaultValue(fields()[1]); + record.sessionId = fieldSetFlags()[2] ? this.sessionId : (java.lang.CharSequence) defaultValue(fields()[2]); + record.timestamp = fieldSetFlags()[3] ? this.timestamp : (java.lang.Long) defaultValue(fields()[3]); + record.data = fieldSetFlags()[4] ? this.data : (java.util.List) defaultValue(fields()[4]); + record.actions = fieldSetFlags()[5] ? this.actions : (java.util.List) defaultValue(fields()[5]); return record; } catch (Exception e) { throw new org.apache.avro.AvroRuntimeException(e); @@ -538,4 +538,4 @@ public InsertMessage build() { READER$.read(this, org.apache.avro.specific.SpecificData.getDecoder(in)); } -} \ No newline at end of file +} diff --git a/engine/src/main/java/com/stratio/decision/serializer/impl/JavaToAvroSerializer.java b/engine/src/main/java/com/stratio/decision/serializer/impl/JavaToAvroSerializer.java index 9d9bb2cd..02c814d5 100644 --- a/engine/src/main/java/com/stratio/decision/serializer/impl/JavaToAvroSerializer.java +++ b/engine/src/main/java/com/stratio/decision/serializer/impl/JavaToAvroSerializer.java @@ -200,35 +200,42 @@ private StratioStreamingMessage convertMessage(InsertMessage insertMessage){ ColumnNameTypeValue columnNameTypeValue = new ColumnNameTypeValue(); columnNameTypeValue.setColumn(data.getColumn().toString()); - switch (data.getType().toString()) { - case "java.lang.Double" : - Double doubleData = new Double(data.getValue().toString()); - columnNameTypeValue.setValue(doubleData); - columnNameTypeValue.setType(com.stratio.decision.commons.constants.ColumnType.DOUBLE); - break; - case "java.lang.Float" : - Float floatData = new Float(data.getValue().toString()); - columnNameTypeValue.setValue(floatData); - columnNameTypeValue.setType(com.stratio.decision.commons.constants.ColumnType.FLOAT); - break; - case "java.lang.Integer" : - Integer integerData = new Integer(data.getValue().toString()); - columnNameTypeValue.setValue(integerData); - columnNameTypeValue.setType(com.stratio.decision.commons.constants.ColumnType.INTEGER); - break; - case "java.lang.Long" : - Long longData = new Long(data.getValue().toString()); - columnNameTypeValue.setValue(longData); - columnNameTypeValue.setType(com.stratio.decision.commons.constants.ColumnType.LONG); - break; - case "java.lang.Boolean" : - Boolean booleanData = new Boolean(data.getValue().toString()); - columnNameTypeValue.setValue(booleanData); - columnNameTypeValue.setType(com.stratio.decision.commons.constants.ColumnType.BOOLEAN); - break; - default: - columnNameTypeValue.setValue(data.getValue().toString()); - columnNameTypeValue.setType(com.stratio.decision.commons.constants.ColumnType.STRING); + if (data.getValue() == null){ + columnNameTypeValue.setValue(null); + columnNameTypeValue.setType(null); + } + else { + + switch (data.getType().toString()) { + case "java.lang.Double": + Double doubleData = new Double(data.getValue().toString()); + columnNameTypeValue.setValue(doubleData); + columnNameTypeValue.setType(com.stratio.decision.commons.constants.ColumnType.DOUBLE); + break; + case "java.lang.Float": + Float floatData = new Float(data.getValue().toString()); + columnNameTypeValue.setValue(floatData); + columnNameTypeValue.setType(com.stratio.decision.commons.constants.ColumnType.FLOAT); + break; + case "java.lang.Integer": + Integer integerData = new Integer(data.getValue().toString()); + columnNameTypeValue.setValue(integerData); + columnNameTypeValue.setType(com.stratio.decision.commons.constants.ColumnType.INTEGER); + break; + case "java.lang.Long": + Long longData = new Long(data.getValue().toString()); + columnNameTypeValue.setValue(longData); + columnNameTypeValue.setType(com.stratio.decision.commons.constants.ColumnType.LONG); + break; + case "java.lang.Boolean": + Boolean booleanData = new Boolean(data.getValue().toString()); + columnNameTypeValue.setValue(booleanData); + columnNameTypeValue.setType(com.stratio.decision.commons.constants.ColumnType.BOOLEAN); + break; + default: + columnNameTypeValue.setValue(data.getValue().toString()); + columnNameTypeValue.setType(com.stratio.decision.commons.constants.ColumnType.STRING); + } } stratioStreamingMessage.addColumn(columnNameTypeValue);