-
Notifications
You must be signed in to change notification settings - Fork 32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Added ODPEventManager implementation #487
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks good overall. A few suggestions.
private static final Logger logger = LoggerFactory.getLogger(ODPEventManager.class); | ||
private static final int DEFAULT_BATCH_SIZE = 10; | ||
private static final int DEFAULT_QUEUE_SIZE = 10000; | ||
private static final int FLUSH_INTERVAL = 1000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make the default interval to 10secs for all SDKs consistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
private final List<ODPEvent> currentBatch = new ArrayList<>(); | ||
private final BlockingQueue<ODPEvent> eventQueue = new LinkedBlockingQueue<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thread-safety for these - currentBatch and eventQueue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Moved
currentBatch
to the thread instance itself which runs in a loop which means there will be NO simultaneous access on this at all. eventQueue
is an instance ofLinkedBlockingQueue
which itself is thread safe
public void run() { | ||
while (true) { | ||
try { | ||
ODPEvent nextEvent = eventQueue.poll(FLUSH_INTERVAL, TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we use this fixed interval between events (instead of overrall interval between flush), the max gap between flushes can be increased up to (batchSize * FLUSH_INTERVAL). Is this what we have for logx event batching?
Some ODP events can be time-sensitive (relatively more than logx events), so it'll be good if we have more tight control of the extra delay.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed this whole thing. Made this timeout dynamic based on how much time elapsed since last flush.
} | ||
|
||
public void updateSettings(ODPConfig odpConfig) { | ||
this.odpConfig = odpConfig; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this replacement thread-safe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made odpConfig
volatile to make it thread safe.
numAttempts ++; | ||
} while (numAttempts < MAX_RETRIES && statusCode != null && (statusCode == 0 || statusCode >= 500)); | ||
} else { | ||
logger.warn("ODPConfig not ready, discarding event batch"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be debug level? When ODP is not integrated, we do not want dump many warning messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
|
||
if (!eventQueue.offer(event)) { | ||
logger.warn("Failed to Process ODP Event. Event Queue is not accepting any more events"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In what case, event is discarded here? It should be logger.error in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The usual case would be when queue is at capacity but we are already checking that before this. This check is just there because offer
can return false in case of an unexpected error. Changing log level to error
return data; | ||
} | ||
|
||
private void processEvent(ODPEvent event) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can consider filtering out silently events when apiKey/host is null. I see you discards them when flushing below. They don't need to be queued and discarded if ODP is not integrated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
@jaeopt I incorporated all the changes you requested. Unit tests are pending. Take a look when you have time. |
@jaeopt I incorporated all the feedback and added unit tests. Please take a look when you can. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the changes and new tests look good.
I found one more potential issue with the event queue batch interval. See my comments.
Also a few more tests suggested.
|
||
private Boolean isRunning = false; | ||
private ODPConfig odpConfig; | ||
private volatile ODPConfig odpConfig; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a comment why this should be volatile?
@@ -26,8 +27,8 @@ public class ODPEvent { | |||
public ODPEvent(String type, String action, Map<String, String> identifiers, Map<String, Object> data) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about adding @Nullable, @Nonnull annotations to all params?
this(odpConfig, apiManager); | ||
this.batchSize = batchSize; | ||
this.queueSize = queueSize; | ||
public ODPEventManager(ODPConfig odpConfig, ODPApiManager apiManager, Integer batchSize, Integer queueSize, Integer flushInterval) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here. @Nonnull, @Nullable will be helpful.
for (ODPEvent event: events) { | ||
sendEvent(event); | ||
} | ||
public void identifyUser(String vuid, String userId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
vuid should be @Nullable, good for Android-SDK support only. We need to filter it out for other server-sdk usage.
private EventDispatcherThread eventDispatcherThread; | ||
|
||
private final ODPApiManager apiManager; | ||
private final List<ODPEvent> currentBatch = new ArrayList<>(); | ||
private final BlockingQueue<ODPEvent> eventQueue = new LinkedBlockingQueue<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A comment about thready-safety requirement will be helpful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@Override | ||
public void run() { | ||
while (true) { | ||
try { | ||
ODPEvent nextEvent = eventQueue.poll(FLUSH_INTERVAL, TimeUnit.MILLISECONDS); | ||
long nextFlushMillis = Math.max(0, flushInterval - (new Date().getTime() - lastFlushTime)); | ||
ODPEvent nextEvent = eventQueue.poll(nextFlushMillis, TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we do not get events for a while (after 1sec), this will make it busy-waiting all the time instead of blocking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a check to see if the batch has events, then adding a timeout otherwise doing a no argument poll which indefinitely blocks till the next event arrives.
@Override | ||
public void run() { | ||
while (true) { | ||
try { | ||
ODPEvent nextEvent = eventQueue.poll(FLUSH_INTERVAL, TimeUnit.MILLISECONDS); | ||
long nextFlushMillis = Math.max(0, flushInterval - (new Date().getTime() - lastFlushTime)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If nextFlushTime depends on the lastFlushTime, the event arriving after a gap can be fired without waiting enough for batching. Example -
(Event1) (Event2) (flush. lastFlushTime updated) (990ms gap) (Event3) (10ms gap) (flush on nextFlushTime)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed it! Now the nextFlushTime is determined when a new batch is started.
eventManager.start(); | ||
ODPEvent event = new ODPEvent("test-type", "test-action", Collections.emptyMap(), Collections.emptyMap()); | ||
eventManager.sendEvent(event); | ||
logbackVerifier.expectMessage(Level.DEBUG, "Unable to Process Event. ODPConfig is not ready."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logbackVerifier.expectMessage(Level.DEBUG, "Unable to Process Event. ODPConfig is not ready."); | |
logbackVerifier.expectMessage(Level.DEBUG, "Unable to Process ODP Event. ODPConfig is not ready."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Summary
Added
ODPEventManager
implementation.Test plan
Jira
OASIS-8386