Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: run background queue tasks #78

Merged
merged 21 commits into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions Sources/MessagingPush/MessagingPushImplementation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,6 @@ internal class MessagingPushImplementation: MessagingPushInstance {
Delete the currently registered device token
*/
public func deleteDeviceToken(onComplete: @escaping (Result<Void, CustomerIOError>) -> Void) {
guard let bodyData = jsonAdapter.toJson(DeleteDeviceRequest()) else {
return onComplete(.failure(.http(.noRequestMade(nil))))
}

guard let deviceToken = self.deviceToken else {
// no device token, delete has already happened or is not needed
return onComplete(Result.success(()))
Expand All @@ -96,7 +92,7 @@ internal class MessagingPushImplementation: MessagingPushInstance {
let httpRequestParameters =
HttpRequestParams(endpoint: .deleteDevice(identifier: identifier,
deviceToken: deviceToken),
headers: nil, body: bodyData)
headers: nil, body: nil)

httpClient
.request(httpRequestParameters) { [weak self] result in
Expand Down

This file was deleted.

122 changes: 117 additions & 5 deletions Sources/Tracking/Background Queue/Queue.swift
Original file line number Diff line number Diff line change
@@ -1,20 +1,132 @@
import Foundation

public protocol Queue {
func addTask(type: QueueTaskType, data: Data) -> (success: Bool, queueStatus: QueueStatus)
/**
A background queue to perform actions in the background (probably network requests).
A queue exists to make our public facing SDK functions synchronous:
`CustomerIO.shared.trackEvent("foo")` where we perform the API call sometime in the future
and handle errors/retry so customers don't have to.

The queue is designed with the main purpose of performing API calls. Let's show an example of
how it's recommended you use the queue and how *not* to use the queue.

Let's say you're identifying a profile.
```
// recommended use of the background queue
func identifyProfile(identifier: String, data: Encodable) {
// perform all operations here, first before touching the queue
keyValueStorage.save(identifier)

if deviceTokenSavedToDifferentProfile {
// it's OK to add tasks to the queue here before we identify the new profile
queue.addTask(.deleteDeviceToken, identifier: oldProfileIdentifier)
keyValueStorage.delete(deviceToken)
}

// then, add a background queue task
queue.addTask(.identifyProfile, identifier: identifier)
}

// then later on in the code, the background queue task runs
// *all* of our logic for identifying a new profile.
levibostian marked this conversation as resolved.
Show resolved Hide resolved
func runQueueTask() {
httpClient.identifyProfile(newProfileIdentifier)
}
```

Not recommended way of using the background queue:
levibostian marked this conversation as resolved.
Show resolved Hide resolved
```
func identifyProfile(identifier: String, data: Encodable) {
queue.addTask(.identifyProfile, identifier: identifier, oldProfileIdentifier)
}

// then later on in the code, the background queue task runs
// *all* of our logic for identifying a new profile.
func runQueueTask() {
let newProfileIdentifier = ...

keyValueStorage.save(identifier)

if deviceTokenSavedToDifferentProfile {
httpClient.delete(deviceTokenFromOldProfile)

keyValueStorage.delete(deviceToken)
}

httpClient.identifyProfile(newProfileIdentifier)
}
```
*/
public protocol Queue: AutoMockable {
/**
Add a task to the queue to be performed sometime in the future.

`data` - Probably a struct that contains "a snapshot" of the data needed to perform the
background task (probably a network request).

Returns a tuple: `success` says whether the task was added to the queue and
`queueStatus` describes the queue (including the number of tasks in it).
In `CioQueue`, `success` is `false` when `data` cannot be encoded to JSON.

NOTE(review): the `sourcery:` comments on the `data` parameter are presumably directives
for the mock generator (see `AutoMockable`) and not regular comments - confirm before editing them.
*/
func addTask<TaskData: Codable>(
type: QueueTaskType,
// sourcery:Type=AnyEncodable
// sourcery:TypeCast="AnyEncodable(data)"
data: TaskData
) -> (success: Bool, queueStatus: QueueStatus)
/**
Request that the queue runs its tasks now.

`onComplete` - Called when the run request is done running.
*/
func run(onComplete: @escaping () -> Void)
}

// sourcery: InjectRegister = "Queue"
public class CioQueue: Queue {
private let storage: QueueStorage
private let siteId: SiteId
private let runRequest: QueueRunRequest
private let jsonAdapter: JsonAdapter
private let logger: Logger
private let sdkConfigStore: SdkConfigStore

init(siteId: SiteId, storage: QueueStorage) {
init(
siteId: SiteId,
storage: QueueStorage,
runRequest: QueueRunRequest,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this runner + runRequest + Manager pattern feels like it could all be flattened into just the runner - is there some specific benefit you're aiming for by breaking them apart?

Copy link
Contributor Author

@levibostian levibostian Nov 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

I did try to add comments to each of the queue classes (queue, run request, runner, storage, manager) explaining the use case of each.

Breaking the queue logic into separate files are for

  1. readability of the code. encapsulation of logic.
  2. scalability (since the runner will grow more and more with each queue task type that we add)
  3. memory safety. I try to limit our use of singletons so I encapsulated all of the singleton logic into just the QueueRequestManager class to keep singletons as small as possible.

I want to avoid the queue or queue runner being the singleton since the runner can have many dependencies that it would have a strong reference to. It needs strong references since the runner performs async operations and weak references would be GCed before the onComplete callback is called in the runner.

Now that I think about it, if we were to keep the runner as it is today where it's just a place where HTTP requests are performed, then we may not have to worry about the runner having many dependencies because it's just the HttpClient it's having a dependency on. My concern was that as the SDK grows, we may break out that runner HTTP logic of the runner into using a Repository or other design pattern. When we would do this, the dependencies list can grow greatly as the repository may have lots of children dependencies themselves.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

found it a little hard to differentiate between them (re: readability) and understand their purpose. Of the three reasons for breaking, memory safety though is a trump card - assuming that doing it in a flat way would increase likelihood that we could become more of a mem 🐷 - let's get a second opinion on the strategy here from @ami-aman

re: keeping the runner as a pure http client, I think the question ends up being whether there are any things we've discussed adding to our SDK that would require different behaviour than that? I haven't seen anything, and would err on the side of handling our current known & planned needs, vs worrying too much about long term. If we need to do a major refactor in future to handle some fancier use case, we can definitely do that, not worth encoding complexity now for an unknown future

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though there is nothing wrong with modulating the files into Manager, runner and handler. @levibostian has focussed on keeping the code segregated / modulated + clean considering the future of the SDK and also I believe, testability since testing gets easier and reliable with modulated code. I appreciate Levi's efforts in this.

But at this moment, as @hownowstephen said, when we are introducing Alphas and Betas and we are not even sure what exactly is the customer needs OR how would our releases be welcomed or accepted I think we should focus more on getting the work done and bringing out new features and understand what next would the customer want. As we introduce new features in our SDKs, code will be re-written sometimes or might (at least) need major/minor tweaks. So, we will have time to re-consider the code not modulated. And revisiting the code base later will also help us find out our own mistakes (or can call it as scope of improvement) which we, the developers usually miss out while development.

Copy link
Contributor Author

@levibostian levibostian Nov 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just took another glance at the files that make up the queue. Here they are:

  • QueueRequestManager - the small singleton
  • QueueRunner - contains 1 function that gets called to run each of the queue tasks.
  • QueueRunRequest - The logic of the queue.
  • QueueStorage - wrapper around the file system to read/write data to files.
  • Queue - a delegate that is the entry point in the SDK to communicate with the SDK and all of it's features.

Thinking about the above list, I don't see any value in combining any of the classes. Every scenario I have thought through in my head I am hesitant because of 1 of the 3 issues I pointed out previously.

I can see something like combining QueueRunRequest and QueueRunner perhaps if we were able to think of a better replacement to the way `QueueRunner.runTask()` works today (Maybe there is a way to reduce the copy/paste networking code that is in the runner today?).

Are we on the same page or am I talking about something completely off?

I did add comments explaining the value of the QueueRequestManager being small.

I am bias because I wrote the code, haha! but would another solution to this problem be to write some high level overview docs about the background queue and how it works? Would that help to make this code quicker to read in the future? I find it a red flag when we come up with the idea of docs explaining code over just making the code more readable in the first place, but I currently dont have major ideas for making the code more readable?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll defer to @ami-aman and possibly @Shahroz16 on the readability side, as I'm still quite new to Swift and am definitely not the target audience here!

jsonAdapter: JsonAdapter,
logger: Logger,
sdkConfigStore: SdkConfigStore
) {
self.storage = storage
self.siteId = siteId
self.runRequest = runRequest
self.jsonAdapter = jsonAdapter
self.logger = logger
self.sdkConfigStore = sdkConfigStore
}

/// Encodes `data` to JSON, persists it as a new queue task and then inspects the
/// resulting queue status to decide if the queue should be run.
///
/// - Returns: The result reported by storage. If `data` cannot be encoded,
///   `success` is `false` and the status reflects the current inventory size.
public func addTask<T: Codable>(type: QueueTaskType, data: T) -> (success: Bool, queueStatus: QueueStatus) {
    if let encodedTaskData = jsonAdapter.toJson(data, encoder: nil) {
        let createResult = storage.create(type: type, data: encodedTaskData)
        processQueueStatus(createResult.queueStatus)

        return createResult
    }

    // Encoding failed: nothing was stored, report the queue as it currently is.
    let tasksInQueue = storage.getInventory().count
    return (success: false, queueStatus: QueueStatus(queueId: siteId, numTasksInQueue: tasksInQueue))
}

public func addTask(type: QueueTaskType, data: Data) -> (success: Bool, queueStatus: QueueStatus) {
return storage.create(type: type, data: data)
/// Runs the background queue on demand by handing `onComplete` to the run request.
public func run(onComplete: @escaping () -> Void) {
    let manualRunMessage = "Manually running background queue"
    logger.verbose(manualRunMessage)

    runRequest.start(onComplete: onComplete)
}

private func processQueueStatus(_ status: QueueStatus) {
logger.verbose("Processing queue status \(status).")
let isManyTasksInQueue = status.numTasksInQueue >= sdkConfigStore.config.backgroundQueueMinNumberOfTasks
hownowstephen marked this conversation as resolved.
Show resolved Hide resolved

let runQueue = isManyTasksInQueue

if runQueue {
logger.verbose("Automatically running background queue")

runRequest.start { [weak self] in
self?.logger.verbose("Automatic running background queue completed")
}
}
}
}
49 changes: 49 additions & 0 deletions Sources/Tracking/Background Queue/QueueRequestManager.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import Foundation

/**
Simple singleton that helps assert that the background queue is only running
one run request at one time (1 run request per site id since each site id can have its own background queue).

When the background queue wants to run its tasks, it's important that the queue only have 1 concurrent runner
running at one time to prevent race conditions and tasks running multiple times.
*/
public protocol QueueRequestManager: AutoMockable {
levibostian marked this conversation as resolved.
Show resolved Hide resolved
/// call when a runner run request is complete
func requestComplete()
/// call when a new run request is requested. adds callback to list of callbacks
/// to call when run request is done running.
/// returns whether an existing run request is currently running or not.
func startRequest(onComplete: @escaping () -> Void) -> Bool
}

// sourcery: InjectRegister = "QueueRequestManager"
// sourcery: InjectSingleton
/// Tracks whether a queue run request is in progress and collects the completion
/// callbacks of every caller that asked for a run while one was in progress.
public class CioQueueRequestManager: QueueRequestManager {
    @Atomic internal var isRunningRequest = false
    @Atomic internal var callbacks: [() -> Void] = []

    /// Serializes the compound read-modify-write sequences below.
    /// NOTE(review): `@Atomic` presumably only makes each individual get/set safe. That is not
    /// enough for "read flag, then set flag" or "read callbacks, then clear callbacks", where
    /// two threads could interleave and both start a runner, or a callback could be dropped.
    private let lock = NSLock()

    /// Marks the current run request as finished and notifies every registered callback.
    public func requestComplete() {
        lock.lock()
        let existingCallbacks = callbacks
        callbacks = []
        isRunningRequest = false
        lock.unlock()

        // Invoke callbacks after releasing the lock so a callback is free to call
        // `startRequest` (or `requestComplete`) again without deadlocking.
        existingCallbacks.forEach { callback in
            callback()
        }
    }

    /// Registers `onComplete` to be called when the run request is done and marks
    /// a request as running.
    ///
    /// - Returns: `true` if a run request was already running before this call,
    ///   `false` if the caller is the one that should start the run.
    public func startRequest(onComplete: @escaping () -> Void) -> Bool {
        lock.lock()
        defer { lock.unlock() }

        // capture the value before modification or we will
        // *always* return true (since we modify to true or it already is true)
        let isQueueRunningARequest = isRunningRequest

        callbacks.append(onComplete)
        isRunningRequest = true

        return isQueueRunningARequest
    }
}
93 changes: 93 additions & 0 deletions Sources/Tracking/Background Queue/QueueRunRequest.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import Foundation

/**
A request to run the tasks of the background queue.

In `CioQueueRunRequest`, a new run is only started when the `QueueRequestManager`
reports that no run request is currently running.
*/
public protocol QueueRunRequest: AutoMockable {
/// Ask for the queue to run. `onComplete` is called when the run request is done running.
func start(onComplete: @escaping () -> Void)
}

// sourcery: InjectRegister = "QueueRunRequest"
public class CioQueueRunRequest: QueueRunRequest {
private let runner: QueueRunner
private let storage: QueueStorage
private let requestManger: QueueRequestManager
levibostian marked this conversation as resolved.
Show resolved Hide resolved
private let logger: Logger

/// Stores the collaborators used to run queue tasks.
init(runner: QueueRunner, storage: QueueStorage, requestManger: QueueRequestManager, logger: Logger) {
    self.logger = logger
    self.requestManger = requestManger
    self.storage = storage
    self.runner = runner
}

/// Registers `onComplete` with the request manager and starts a new run,
/// unless the manager reports that a run request is already running.
public func start(onComplete: @escaping () -> Void) {
    let alreadyRunning = requestManger.startRequest(onComplete: onComplete)
    guard !alreadyRunning else { return }

    startNewRequestRun()
}

/// Reads the current inventory from storage and runs the tasks listed in it.
private func startNewRequestRun() {
    runTasks(query: storage.getInventory())
}

private func runTasks(query: [QueueTaskMetadata]) {
let goToNextTask: () -> Void = {
var newQuery = query
newQuery.removeFirst()
self.runTasks(query: newQuery)
}

if query.isEmpty { // we hit the end of the current inventory. Done!
logger.verbose("background queue out of tasks.")

requestManger.requestComplete()

return
}

let nextTaskToRunInventoryItem = query[0]
let nextTaskStorageId = nextTaskToRunInventoryItem.taskPersistedId
guard let nextTaskToRun = storage.get(storageId: nextTaskStorageId) else {
// delete task from inventory since we can't find it in storage so we don't want to run it.
// ignore result because if it's successful or not, all we can do is try and delete and move on.
_ = storage.delete(storageId: nextTaskStorageId)

// log error. this scenario shouldn't happen where task can't be found.
logger.error("Tried to get queue task with storage id: \(nextTaskStorageId), but storage couldn't find it.")

return goToNextTask()
}

logger
.verbose("background queue next task \(nextTaskStorageId). query tasks remaining: \(query.count)")
logger.debug("next background queue task to run: \(nextTaskToRunInventoryItem) => \(nextTaskToRun)")

// we are not using [weak self] because if the task is currently running,
// we dont want the result handler to get garbage collected which could
// make the task run again when it shouldn't.
//
// if we wanted to use [weak self] then we should allow running a task to cancel
// while executing which would then allow this to use [weak self].
runner.runTask(nextTaskToRun) { result in
switch result {
case .success:
self.logger.verbose("background queue task \(nextTaskStorageId) success")

_ = self.storage.delete(storageId: nextTaskToRunInventoryItem.taskPersistedId)
case .failure(let error):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should be differentiating between issues in CIO (50x) and issues from the customer (40x)

self.logger.verbose("background queue task \(nextTaskStorageId) fail - \(error.localizedDescription)")

let executedTaskPreviousRunResults = nextTaskToRun.runResults
let newRunResults = executedTaskPreviousRunResults
.totalRunsSet(executedTaskPreviousRunResults.totalRuns + 1)

_ = self.storage.update(storageId: nextTaskToRunInventoryItem.taskPersistedId,
hownowstephen marked this conversation as resolved.
Show resolved Hide resolved
runResults: newRunResults)
}

return goToNextTask()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a bit of an interesting questions as to what we should do if a request fails, I think it's probably fine to just continue in the queue due to how we handle customer creation, but worth having a think about the use case where there are two items in the queue:

request1 -> initial identify
request2 -> add device
request3 -> send event

if request1 fails and defers to later then the profile could in theory be empty and just contain a device until the queue is run again. Likely a small window, but it does raise the question as to whether the initial identify should be given a blocking behaviour 🤔 so that until it gets a non 50x response we don't continue. Or possibly any 50x stops the queue?

}
}
}
77 changes: 77 additions & 0 deletions Sources/Tracking/Background Queue/QueueRunner.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import Foundation

/**
Where queue tasks are executed asynchronously.

To keep this class testable, try to keep it small. So, the class's job is to take a
task type and generic `Data` for task data and call some other code to perform the
actual work on executing the task.
*/
public protocol QueueRunner: AutoMockable {
/// Execute a single queue task.
/// `onComplete` receives `.success` when the task ran successfully, or `.failure` with a
/// `CustomerIOError` otherwise. In `CioQueueRunner`, task data that cannot be decoded is
/// reported as `.failure(.http(.noRequestMade(nil)))`.
func runTask(_ task: QueueTask, onComplete: @escaping (Result<Void, CustomerIOError>) -> Void)
}

// sourcery: InjectRegister = "QueueRunner"
public class CioQueueRunner: QueueRunner {
private let jsonAdapter: JsonAdapter
private let siteId: SiteId
private let logger: Logger
private let httpClient: HttpClient

/// Stores the collaborators used to decode task data and perform HTTP requests.
init(siteId: SiteId, jsonAdapter: JsonAdapter, logger: Logger, httpClient: HttpClient) {
    self.httpClient = httpClient
    self.logger = logger
    self.jsonAdapter = jsonAdapter
    self.siteId = siteId
}

public func runTask(_ task: QueueTask, onComplete: @escaping (Result<Void, CustomerIOError>) -> Void) {
let failureIfDontDecodeTaskData: Result<Void, CustomerIOError> = .failure(.http(.noRequestMade(nil)))

switch task.type {
case .identifyProfile:
guard let taskData = getTaskData(task, type: IdentifyProfileQueueTaskData.self) else {
levibostian marked this conversation as resolved.
Show resolved Hide resolved
return onComplete(failureIfDontDecodeTaskData)
}

let httpParams = HttpRequestParams(endpoint: .identifyCustomer(identifier: taskData.identifier),
headers: nil, body: taskData.attributesJsonString?.data)

performHttpRequest(params: httpParams, onComplete: onComplete)
case .trackEvent:
guard let taskData = getTaskData(task, type: TrackEventQueueTaskData.self) else {
return onComplete(failureIfDontDecodeTaskData)
}

let httpParams = HttpRequestParams(endpoint: .trackCustomerEvent(identifier: taskData.identifier),
headers: nil, body: taskData.attributesJsonString.data)

performHttpRequest(params: httpParams, onComplete: onComplete)
}
}

/// Decodes the JSON data of `task` into `type`.
/// Exists so `runTask` needs less decoding code and so decoding failures are logged in one place.
///
/// - Returns: The decoded value, or `nil` (after logging an error) if decoding fails.
private func getTaskData<T: Decodable>(_ task: QueueTask, type: T.Type) -> T? {
    guard let decoded: T = jsonAdapter.fromJson(task.data, decoder: nil) else {
        // Logged as an error because it is a developer error: the SDK encoded the
        // task data in the first place, so decoding it should always work.
        logger.error("Failure decoding: \(task.data.string ?? "()") to \(type)")

        return nil
    }

    return decoded
}

/// Performs the HTTP request described by `params` and reports the outcome to `onComplete`,
/// discarding any success payload and wrapping a failure in `CustomerIOError.http`.
private func performHttpRequest(
    params: HttpRequestParams,
    onComplete: @escaping (Result<Void, CustomerIOError>) -> Void
) {
    httpClient.request(params) { response in
        if case .failure(let httpError) = response {
            return onComplete(.failure(.http(httpError)))
        }

        onComplete(.success(()))
    }
}
}
Loading