-
Notifications
You must be signed in to change notification settings - Fork 104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix for CKAN issues: recvTimeTs and attrValue Integer 0 #1982
base: master
Are you sure you want to change the base?
Conversation
copy-dependencies.
changing it to string 0.
public long getRecvTimeTsValue(NGSIEvent cygnusEvent) { | ||
return cygnusEvent.getRecvTimeTs(); | ||
} | ||
|
||
/** | ||
* Returns a possible adapted value for attribute value. | ||
* @param attrValue The input | ||
* @return the adapted output. Default unchanged, same object. | ||
*/ | ||
public JsonElement adaptAttrValue(JsonElement attrValue) { | ||
// Default: No adaptation. | ||
return attrValue; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shouldn't be here. This data is very specific for this solution. This is a generic class which is used by many sinks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps, but the aggregate method is difficult to understand for non-insiders. By adding these idempotent method I was sure nothing to break and still able to inject a specific change. I do not see an alternative, even not after a fresh look.
cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java
Show resolved
Hide resolved
@@ -342,9 +343,30 @@ public void expirateRecords(long expirationTime) throws CygnusExpiratingError { | |||
} // try catch | |||
} // truncateByTime | |||
|
|||
private class NGSICKANRowAggregator extends NGSIGenericRowAggregator { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good idea. Maybe on this class you can override the aggregation method to loop all the events processed by it's super, then you can add the / 1000
operation to theNGSIConstants.RECV_TIME_TS
key of the LinkedHashMap.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see how to loop, it is too hard to understand what is happening.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you. This aggregation is a bit difficult to understand.
First of all, here you can find a guide to debug Cygnus with docker in order to understand better how it works.
About the implementation.
In this case (Row-storage) all events are processed by the NGSIGenericRowAggregator.aggregate
method on each sink. Exactly here for this sink. The output of this aggregation
method is a LinkedHashMap
. This LinkedHashMap
contains all events processed on the batch on the following way.
- The keys of the
LinkedHashMap
on Row aggregation are deffined as constants here. - The values for those keys are
ArrayList
collections, wich contains all of the values processed on the batch.
For this solution, you are interested on the RECV_TIME_TS constant. As mentioned before, a way to implement this, would be to get the key RECV_TIME_TS from the LinkedHashMap
after the aggregation ends and loop each one of the elements of the list to add the / 1000
operation. Then set this new list on the LinkedHashMap
for the key RECV_TIME_TS.
Hope this helps.
Implementation doesn't fit project conventions. Please take a look at the comments on the code and make the propper fixes in order to review again and eventually merge it. |
This pull request contains the fixes for two issues: #1947 and #1948.