refactor: cdp migration - processing pending tasks in bgQ #424
Could you share some more information about this PR? What work has been done and what has not been done?
Without this context, I am unsure what to review.
From a quick review: tests need to be added, the implementation is incomplete, and there are performance issues in this implementation.
I understand if more PRs will be made to address these concerns but with the information I have right now, I am unsure if that's true.
@levibostian Thanks for your feedback. I have updated the description with more detail; I hope it helps. To address the issue you highlighted with profileAttributes, I have updated the code accordingly. Let me know if you have any questions.
Leaving some suggestions to start with. I will probably have more suggestions once more of the implementation is written for this feature.
public func deleteProcessedTask(_ task: QueueTaskMetadata) {
    let storageId = task.taskPersistedId
    if !storage.delete(storageId: storageId) {
        logger.error("Failed to delete task with storage id: \(storageId).")
    }
}
Instead of deleting BQ tasks one by one after they have been migrated, I think it would be more performant to tell the OS to delete the entire directory where all BQ inventory tasks are stored.
That is one option I did think of, but it does not cover the scenario where a task isn't processed for some reason: deleting the entire directory would remove those tasks too. Does this make sense?
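A possible middle ground between the two approaches, as a minimal sketch: delete each task file only after it migrates successfully, and remove the whole directory only once nothing is left in it. The function name `cleanUpMigratedTasks`, the one-file-per-task layout, and the `migrate` closure are assumptions for illustration, not the SDK's actual storage API.

```swift
import Foundation

/// Hypothetical helper: migrates every task file found in `directory`.
/// Files whose migration fails stay on disk so they can be retried later.
/// Returns the number of task files that remain unprocessed.
@discardableResult
func cleanUpMigratedTasks(in directory: URL,
                          fileManager: FileManager = .default,
                          migrate: (URL) -> Bool) -> Int {
    guard let files = try? fileManager.contentsOfDirectory(at: directory,
                                                           includingPropertiesForKeys: nil) else {
        return 0 // Directory missing or unreadable: nothing to migrate.
    }

    var remaining = 0
    for file in files {
        if migrate(file) {
            try? fileManager.removeItem(at: file) // Processed: safe to delete.
        } else {
            remaining += 1 // Keep the file so the task is not lost.
        }
    }

    // Only drop the whole directory once every task has been migrated.
    if remaining == 0 {
        try? fileManager.removeItem(at: directory)
    }
    return remaining
}
```

With this shape, the cheap directory-level delete still happens in the common case, while a failed task is kept for a later attempt.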
Sources/Tracking/CustomerIO.swift
Outdated
// Check if any unprocessed tasks are pending in the background queue.
// If so, iterate over them and process each one.
if let allStoredTasks = implementation.getAllStoredTasks(), !allStoredTasks.isEmpty {
    allStoredTasks.forEach { task in
        implementation.getStoredTask(for: task)
    }
}
I think the SDK will experience a lot of performance problems if we have the BQ migration code existing where it is here. Instead, I think the BQ migration should run on a different thread.
Here is an example of where we have done background thread processing in the past. Could we use similar logic?
I also want to suggest that we encapsulate all of this BQ migration code into its own separate file, so we can write tests against it more easily instead of having the logic inside of CustomerIO.initialize.
I agree with you, Levi! I do believe this could create performance issues for bigger queues, and it is a good idea to move this processing to a separate background thread. FYI, as mentioned in the description, testing this implementation with large data is still a TODO.
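To make the two suggestions above concrete, here is a rough sketch of a standalone migrator that runs off the calling thread. `BackgroundQueueMigrator`, `fetchTasks`, and `process` are hypothetical names standing in for the real storage and pipeline calls; the queue label and QoS are also assumptions.

```swift
import Foundation

/// Hypothetical wrapper that keeps the BQ migration out of `CustomerIO.initialize`.
/// The closures are injected so the class can be unit tested with fakes.
final class BackgroundQueueMigrator<Item> {
    private let queue: DispatchQueue
    private let fetchTasks: () -> [Item]
    private let process: (Item) -> Bool

    init(queue: DispatchQueue = DispatchQueue(label: "bq-migration", qos: .utility),
         fetchTasks: @escaping () -> [Item],
         process: @escaping (Item) -> Bool) {
        self.queue = queue
        self.fetchTasks = fetchTasks
        self.process = process
    }

    /// Runs the migration on the background queue and reports
    /// how many tasks were processed successfully.
    func start(completion: @escaping (Int) -> Void) {
        let fetchTasks = self.fetchTasks
        let process = self.process
        queue.async {
            let migrated = fetchTasks().filter(process).count
            completion(migrated)
        }
    }
}
```

Because the work is dispatched asynchronously, `start` returns immediately and SDK initialization is not blocked by a large backlog.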
✅
// To process pending tasks in background queue
extension DataPipelineImplementation {
    func processIdentifyFromBGQ(identifier: String, body: [String: Any]?) {
Should we be adding the timestamp as well?
identify does not have a timestamp in the JSON that we get from the queue:
{"attributes_json_string":"null","identifier":"[email protected]"}
All event and screen tracks have a timestamp, and it has been added to the methods (refer to this and this).
These timestamps are from the metric model; we can get the timestamp for events from TaskMetaData. I added a comment where I think we can get and return that information.
That is createdAt, which I believe might differ from the actual timestamp. If that works for us, then I can use it and update all the methods.
Updated the code to use timestamp when it is available and, if not, createdAt from QueueTaskMetadata.
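The fallback described here can be sketched in a few lines. `resolveTimestamp` and its parameter names are hypothetical; the only assumption taken from the thread is that the payload timestamp is optional while createdAt is always present.

```swift
import Foundation

/// Hypothetical helper: prefer the timestamp stored in the task payload and
/// fall back to the task's `createdAt` when the payload has none
/// (as with identify tasks).
func resolveTimestamp(payloadTimestamp: Date?, createdAt: Date) -> Date {
    payloadTimestamp ?? createdAt
}

// Usage: an identify task carries no timestamp, so `createdAt` is used.
let createdAt = Date(timeIntervalSince1970: 1_700_000_000)
let identifyTime = resolveTimestamp(payloadTimestamp: nil, createdAt: createdAt)
```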
    CustomerIO.shared.identify(identifier: emailId)
    return
}
CustomerIO.shared.identify(identifier: emailId, body: data)
This fixes a crash that happens when this method sends nil data to identify, which the CDP method doesn't expect.
We should fix the identify method as well; even if it gets nil, there shouldn't be a crash. Thanks for this workaround.
Right. We must fix the identify method too in our SDK. This can be done in a separate PR as this is unrelated to this ticket.
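For that follow-up PR, one way the identify path could tolerate a missing body is sketched below. `IdentifyCall` and `makeIdentifyCall` are hypothetical stand-ins, not the SDK's real types, and the body is narrowed to `[String: String]` only to keep the example comparable.

```swift
/// Hypothetical value describing what would be sent to the pipeline.
struct IdentifyCall: Equatable {
    let identifier: String
    let traits: [String: String]?
}

/// Hypothetical stand-in for the identify entry point: a nil or empty body
/// takes the traits-free path instead of being forwarded as-is.
func makeIdentifyCall(identifier: String, body: [String: String]?) -> IdentifyCall {
    guard let body = body, !body.isEmpty else {
        return IdentifyCall(identifier: identifier, traits: nil)
    }
    return IdentifyCall(identifier: identifier, traits: body)
}
```

With the guard inside the method itself, callers such as the sample app would no longer need the workaround shown in the diff above.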
case .trackDeliveryMetric:
    // TODO: Segment doesn't provide this method by default needs to get added
    // Remove isProcessed when the method is added
    print("Track Delivery Metrics for in-app - Needs discussion")
It's the same method as track metric for push; we just need to exclude recipient:
let properties: [String: Any] = metaData.mergeWith([
"metric": event.rawValue,
"deliveryId": deliveryId,
])
The calls are automatically going to trackMetric. What I have been thinking about is the use case of trackDeliveryMetric, as I could not figure out a way to test this case. I added a "need discussion" comment because this is one of the possible cases in QueueTaskType, but I had a hard time trying to reproduce this one.
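If push and in-app metrics do share one code path, the only difference being the recipient, the property building could look roughly like this. `metricProperties` is a hypothetical function, the "recipient" key name is assumed from the comment above, and the standard library's `merging(_:uniquingKeysWith:)` is used in place of the SDK's own `mergeWith`, whose collision behavior may differ.

```swift
/// Hypothetical builder for the metric event properties.
/// Push metrics pass a `recipient` (device token); in-app metrics pass nil,
/// so the key is simply left out.
func metricProperties(metric: String,
                      deliveryId: String,
                      recipient: String?,
                      metaData: [String: Any] = [:]) -> [String: Any] {
    var base: [String: Any] = ["metric": metric, "deliveryId": deliveryId]
    if let recipient = recipient {
        base["recipient"] = recipient
    }
    // On key collisions the required fields win over caller-supplied metadata.
    return metaData.merging(base) { _, required in required }
}
```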
func processIdentifyFromBGQ(identifier: String, body: [String: Any]?)
func processScreenEventFromBGQ(identifier: String, name: String, timestamp: String?, properties: [String: Any])
func processEventFromBGQ(identifier: String, name: String, timestamp: String?, properties: [String: Any])
func processDeleteTokenFromBGQ(identifier: String, token: String)
func processRegisterDeviceFromBGQ(identifier: String, token: String, attributes: [String: Any]?)
func processPushMetricsFromBGQ(token: String, event: Metric, deliveryId: String, timestamp: String, metaData: [String: Any])
We should have timestamps in here; I am commenting on the method I think we can get that value from.
}

// TODO: Write test case
public func getTaskDetail(_ task: QueueTaskMetadata) -> (data: Data, taskType: QueueTaskType)? {
We can probably get the timestamp from here?
let createdAt = task.createdAt
Looks good, some suggestions to increase confidence in migration tasks.
extension DataPipelineImplementation {
    func processIdentifyFromBGQ(identifier: String, timestamp: String, body: [String: Any]?) {
        var identifyEvent = IdentifyEvent(userId: identifier, traits: nil)
        identifyEvent.timestamp = timestamp
Just make sure we have verified that the format of the timestamp we are adding from the BQ is the same as the one Analytics expects.
Updated all timestamps to ISO format.
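For reference, a conversion to ISO 8601 with Foundation might look like the sketch below. `iso8601String(from:)` is a hypothetical helper, and whether the analytics library wants fractional seconds is an assumption that should be checked against the format it actually emits.

```swift
import Foundation

/// Hypothetical helper: converts a task's `Date` into an ISO 8601 string.
/// The fractional-seconds option is an assumption to verify.
func iso8601String(from date: Date) -> String {
    let formatter = ISO8601DateFormatter()
    formatter.formatOptions = [.withInternetDateTime, .withFractionalSeconds]
    return formatter.string(from: date)
}
```

A round-trip check (format, then parse with the same options) is a cheap way to confirm the strings are well formed before they are attached to events.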
func test_givenBacklog_expectTaskProcessed() {
    var inventory: [QueueTaskMetadata] = []
    let givenType = QueueTaskType.identifyProfile
    let givenTask = IdentifyProfileQueueTaskData(identifier: String.random, attributesJsonString: "null")
We are only appending the same kind of task; it would be great to have an additional test case where we verify each type of task and make sure its values are being received accordingly.
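A test along those lines could iterate over every task type instead of repeating one. The sketch below uses a hypothetical `SampleTaskType` enum and `HandlerSpy` class; the real `QueueTaskType` may have different cases, and a real test would also assert on the decoded values for each type.

```swift
/// Hypothetical mirror of the queue's task types.
enum SampleTaskType: String, CaseIterable {
    case identifyProfile, trackEvent, screen
    case registerPushToken, deletePushToken
    case trackPushMetric, trackDeliveryMetric
}

/// Records the task types it was asked to process, in order.
final class HandlerSpy {
    private(set) var handled: [SampleTaskType] = []
    func process(_ type: SampleTaskType) { handled.append(type) }
}

/// Builds a backlog with one task of every type and checks that
/// each one is processed exactly once, in order.
func verifyEveryTaskTypeIsProcessed() -> Bool {
    let spy = HandlerSpy()
    let backlog = SampleTaskType.allCases
    backlog.forEach(spy.process)
    return spy.handled == backlog
}
```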
I do not think that will change the functionality in any way. Since it is a test case and not the actual implementation, I do not believe it will make any difference.
I would still suggest peer and group testing of this feature since it is a major one. I am confident that it works as far as I have tested, but I agree that dev testing is not always enough, especially when the feature is as important as this one. Once the team is back next week, we can do a test run of all the modules that have been completed. It can be a kind of UAT.
closes: https://github.com/customerio/issues/issues/11649
What does this PR do?
This pull request introduces a method that checks the Journeys SDK background queue for tasks that have not yet been executed. If such tasks exist, it fetches each task and executes them one by one. This ensures that all pending tasks are executed and sent to the CDP before the transition to the CDP. Once a task is successfully executed, it is removed from the background queue.
This PR currently handles:
Pending: