feat: add retries for enqueuing graphile jobs #11561
I know what's up with the test errors, will address tomorrow. Just some tests that weren't updated.
This is a disappointment with regard to Aurora. I remember we chose it just because it's a scalable cloud database that could scale up if we get a sudden spike of jobs. If instead it's a scalable database, except when it scales (probably as we're coming into a spike), then what's the point 🤦
Come to think about it - I wonder if part of the problem is that the graphile-worker lib internally is trying to reuse a connection and doesn't even try to establish a new one after a disconnect?
a9a9636 to 0b85c06
await instrument(
    this.pluginsServer.statsd,
    {
        metricName: jobName === JobName.PLUGIN_JOB ? 'vm.enqueuePluginJob' : 'vm.enqueueBufferJob',
So this is confusing.
- It makes an implicit assertion that there's only two different types of jobs. Which might be true for now, but will it be long-term?
- It prefixes the metric with `vm.`, which is just a lie
Suggestion:
metricName: jobName === JobName.PLUGIN_JOB ? 'vm.enqueuePluginJob' : 'vm.enqueueBufferJob',
metricName: `job_queues_enqueue`
And add jobName as a tag instead.
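A minimal sketch of that suggestion: one fixed metric name, with the job name carried as a tag. The `MetricsClient` interface and the `InMemoryMetrics` class are hypothetical stand-ins so the example runs without a StatsD server; they are not the plugin server's actual client.

```typescript
// Hypothetical minimal stand-in for a StatsD-style client; only the shape matters here.
type Tags = Record<string, string>

interface MetricsClient {
    increment(metric: string, tags?: Tags): void
}

// Records calls in memory so the example runs without a metrics backend.
class InMemoryMetrics implements MetricsClient {
    calls: Array<{ metric: string; tags: Tags }> = []

    increment(metric: string, tags: Tags = {}): void {
        this.calls.push({ metric, tags })
    }
}

// One fixed, greppable metric name. The job name travels as a tag,
// so adding a third job type needs no change to the metric name logic.
function recordEnqueue(metrics: MetricsClient, jobName: string): void {
    metrics.increment('job_queues_enqueue', { jobName })
}

const metrics = new InMemoryMetrics()
recordEnqueue(metrics, 'pluginJob')
recordEnqueue(metrics, 'bufferJob')
```

With this shape the metric name can be found by a plain text search, and the ternary on `jobName` disappears.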
I was making sure we kept the metric that was already established before, but can get rid of that in favor of this new metric
Let's break compatibility if it adds complexity to the code. Our metric retention period is relatively short anyways.
metricName: jobName === JobName.PLUGIN_JOB ? 'vm.enqueuePluginJob' : 'vm.enqueueBufferJob',
key: instrumentationContext?.key ?? '?',
tag: instrumentationContext?.tag ?? '?',
tags: { pluginServerMode, type: jobType },
There's no reason to add pluginServerMode tag - https://github.com/PostHog/posthog/blob/master/plugin-server/src/utils/db/hub.ts#L82-L84
    pluginServerMode,
},
tryFn: async () => this._enqueue(jobName, job),
catchFn: () => status.error('🔴', 'Exhausted attempts to enqueue job.'),
Problems:
- The log message isn't actionable or even clear. Suggestion: `Exhausted attempts to enqueue job, job was dropped.`
- We should log the `jobName` and `job` objects as well.
Q:
- This is a `catchFn`, not a `finallyFn`? Does this swallow all errors, or will Sentry receive an error as well?
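To make the points above concrete, here is a rough sketch of a retry loop whose `catchFn` logs the job name and job, then rethrows so the failure is not swallowed. Everything here is an assumption for illustration: `retry`, `enqueueWithRetries`, `nextRetryMs`, the `Job` shape, the defaults, and the exponential schedule are hypothetical and are not the signatures in `plugin-server/src/utils/retries.ts`. It also retries on any error, whereas the PR's code checks for `RetryError`, and whether the real `catchFn` path rethrows is exactly the open question in this thread.

```typescript
// Hypothetical job shape and option names; the PR's real signatures may differ.
interface Job {
    type: string
    payload?: Record<string, unknown>
}

type Sleep = (ms: number) => Promise<void>

const realSleep: Sleep = (ms) => new Promise<void>((resolve) => setTimeout(resolve, ms))

interface RetryOptions<T> {
    tryFn: () => Promise<T>
    catchFn?: (error: unknown) => void
    maxAttempts?: number
    retryBaseMs?: number
    retryMultiplier?: number
    sleep?: Sleep // injectable so tests do not wait on real timers
}

// Assumed exponential schedule: base, base * multiplier, base * multiplier^2, ...
function nextRetryMs(baseMs: number, multiplier: number, attempt: number): number {
    return baseMs * Math.pow(multiplier, attempt - 1)
}

async function retry<T>(options: RetryOptions<T>): Promise<T> {
    const { tryFn, catchFn, maxAttempts = 3, retryBaseMs = 100, retryMultiplier = 2, sleep = realSleep } = options
    for (let attempt = 1; ; attempt++) {
        try {
            return await tryFn()
        } catch (error) {
            if (attempt >= maxAttempts) {
                if (catchFn) {
                    catchFn(error)
                }
                // Rethrow so the caller (and any error tracker) still sees the failure.
                throw error
            }
            await sleep(nextRetryMs(retryBaseMs, retryMultiplier, attempt))
        }
    }
}

async function enqueueWithRetries(
    jobName: string,
    job: Job,
    enqueue: (jobName: string, job: Job) => Promise<void>,
    log: (message: string, context: object) => void,
    sleep: Sleep = realSleep
): Promise<void> {
    await retry({
        tryFn: () => enqueue(jobName, job),
        // Log enough context to identify which job was lost.
        catchFn: (error) =>
            log('Exhausted attempts to enqueue job, job was dropped.', { jobName, job, error: String(error) }),
        sleep,
    })
}
```

Passing `sleep` in as a parameter keeps a test of the exhausted-retries path fast, since no real delay is needed.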
plugin-server/src/utils/retries.ts
Outdated
const nextRetryMs = getNextRetryMs(retryBaseMs, retryMultiplier, attempt)
hub.statsd?.increment(`${metricPrefix}.${metricName}.RETRY`, {
    ...metricTags,
    attempt: attempt.toString(),
We should be really conservative with tags. What value does attempt tag give us?
I believe you set this up in the past, happy to remove now
plugin-server/src/utils/retries.ts
Outdated
}
if (error instanceof RetryError && attempt < maxAttempts) {
    const nextRetryMs = getNextRetryMs(retryBaseMs, retryMultiplier, attempt)
    hub.statsd?.increment(`${metricPrefix}.${metricName}.RETRY`, {
Note: Reverse-engineering actual metrics from code in cases of interpolation like this is hell. Do we actually need this? Could we just use `metricName` and kill the `metricPrefix`?
Also what's with the weird capitalization?
Understand it was hard to parse the new changes from old, but this is effectively following what was already there.
description: metricTags.plugin || '?',
data: {
    metricName,
    payload,
Does nesting objects like this work nicely in sentry?
Unsure - again, this was here before, just it was named "event"
it('calls runRetriableFunction with the correct parameters', async () => {
    await jobQueueManager.enqueue(JobName.PLUGIN_JOB, { type: 'foo' } as EnqueuedJob)
    expect(runRetriableFunction).toHaveBeenCalled()
    const runRetriableFunctionArgs = jest.mocked(runRetriableFunction).mock.calls[0][0]
Checking calls this way is a code smell.
Do you mean to do something like this:
expect(runRetriableFunction).toHaveBeenCalledWith({
metricPrefix: 'enqueueJob',
//...
})
Note you can use `expect.any()` and equivalents within the object if there are problematic arguments.
In my experience that actually doesn't work very well. See e.g. https://stackoverflow.com/questions/50676554/jest-expect-any-not-working-as-expected
`toMatchObject` has also not worked great for me in the past
I have had zero problems and we use both `expect.any()` and `expect.objectContaining()` extensively. `toMatchObject` much less so though.
describe('_enqueue()', () => {
    it('enqueues jobs to the first available job queue', async () => {
        jobQueueManager.jobQueues[0].enqueue = jest.fn()
Code nit:
Use `jest.spyOn()` here and below; you can also use `mockImplementation` on it.
describe('JobQueueManager', () => {
    let jobQueueManager: JobQueueManager
    beforeEach(() => {
        jest.clearAllMocks()
This is not needed.
import { JobQueueManager } from './../src/main/job-queues/job-queue-manager'
import { EnqueuedJob, Hub, JobName } from './../src/types'

jest.mock('../src/utils/retries')
Does this test log? If so, please also mock /status util - tests should not be noisy.
    jobQueueManager = new JobQueueManager(mockHub)
})

describe('enqueue()', () => {
Missing a test for error handling.
There are tests for `_enqueue` error handling and for error handling in the retries flow. This function just calls both of those, so what do you mean?
@@ -1272,23 +1283,28 @@ describe('vm tests', () => {
 }
 await delay(1010)

-expect(fetch).toHaveBeenCalledTimes(15)
+expect(fetch).toHaveBeenCalledTimes(20)
What is this change caused by? Sadly all the retry code changes are unreviewable.
We stopped checking type errors on test files, so I noticed this file had a bunch and fixed them. One of them was that I added the missing `uuid` to all events. That causes the payload to be larger, making this test for buffering based on bytes flush more often.
Also took me a while to figure this out, was very confused, super annoying.
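The effect described above can be reproduced with a toy byte-limited buffer. This is only an illustration under assumptions: `ByteLimitedBuffer`, `countFlushes`, the 200-byte limit, and the event shapes are all hypothetical, and size is approximated as the length of the JSON string rather than however the plugin server measures it.

```typescript
// Hypothetical byte-limited buffer: flushes before an item would push it past maxBytes.
class ByteLimitedBuffer<T> {
    private items: T[] = []
    private bytes = 0
    private readonly maxBytes: number
    flushCount = 0

    constructor(maxBytes: number) {
        this.maxBytes = maxBytes
    }

    add(item: T): void {
        // JSON string length approximates byte size for ASCII-only payloads.
        const size = JSON.stringify(item).length
        if (this.items.length > 0 && this.bytes + size > this.maxBytes) {
            this.flush()
        }
        this.items.push(item)
        this.bytes += size
    }

    flush(): void {
        if (this.items.length === 0) {
            return
        }
        this.items = []
        this.bytes = 0
        this.flushCount++
    }
}

// Feeds all events through a fresh buffer and returns how many flushes happened.
function countFlushes(events: object[], maxBytes: number): number {
    const buffer = new ByteLimitedBuffer<object>(maxBytes)
    for (const event of events) {
        buffer.add(event)
    }
    buffer.flush() // flush whatever is left at the end
    return buffer.flushCount
}

const withoutUuid = Array.from({ length: 20 }, (_, i) => ({ event: 'e', distinct_id: `u${i % 10}` }))
const withUuid = withoutUuid.map((e) => ({ ...e, uuid: '00000000-0000-0000-0000-000000000000' }))

const flushesWithoutUuid = countFlushes(withoutUuid, 200)
const flushesWithUuid = countFlushes(withUuid, 200)
```

The same number of events produces more flushes once each event carries a `uuid`, because fewer events fit under the byte limit. Any assertion on the number of downstream calls therefore has to change when the payload grows.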
Will address some more follow ups tomorrow
Problem
We're losing jobs when Aurora has small periods of unavailability due to scaling.
https://sentry.io/organizations/posthog2/issues/3477565970/?query=is%3Aunresolved+no+jobqueue&statsPeriod=14d
Changes
How did you test this code?
Not in scope