Setup Storage interface for log persistence #186
Conversation
}

// Read from the s3 client and write to the websocket.
buf := make([]byte, 4)
Should we increase the buffer size?
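For scale, a 4-byte buffer means one read call and one websocket frame per 4 bytes of log output. A minimal sketch of the loop with a larger buffer (copyToSocket, reader, and conn are hypothetical stand-ins for the handler's actual types, and gorilla/websocket is assumed for the connection):

import (
    "io"

    "github.com/gorilla/websocket"
)

// copyToSocket reads chunks from the s3 object body and forwards each one
// over the websocket until EOF.
func copyToSocket(reader io.Reader, conn *websocket.Conn) error {
    buf := make([]byte, 4096) // 4 KiB per read instead of 4 bytes
    for {
        n, err := reader.Read(buf)
        if n > 0 {
            if werr := conn.WriteMessage(websocket.BinaryMessage, buf[:n]); werr != nil {
                return werr
            }
        }
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
    }
}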
p.closeActiveChannels(jobID)

// Persist logs to storage backend
ok, err := p.storageBackend.Write(jobID, outputBuffer.Buffer)
Here, we persist the job synchronously. We could spin up new goroutines for this, but I thought it would be good to see how long it actually takes and how much it impacts our response time (once we have our instrumented implementation) before making premature optimizations!
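One low-effort way to get that number in the meantime, as a sketch against the StorageBackend interface from this PR (timedWrite is hypothetical, and a real instrumented implementation would presumably report to the stats backend rather than the log):

import (
    "log"
    "time"
)

// timedWrite wraps the synchronous persist and logs how long it took, so we
// can see the actual impact on response time before reaching for goroutines.
func timedWrite(sb StorageBackend, jobID string, logs []string) (bool, error) {
    start := time.Now()
    ok, err := sb.Write(jobID, logs)
    log.Printf("persisted job %s in %s (ok=%t err=%v)", jobID, time.Since(start), ok, err)
    return ok, err
}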
I'd like to see more encapsulation here, instead of having ProjectCommandOutputHandler doing a million things. It seems like this could be another struct, held as a field here, that abstracts usage of projectOutputBuffers as well as the StorageBackend, making the integration almost seamless.
One idea for this is as follows:
type JobStatus int

const (
    Processing JobStatus = iota
    Complete
)

type Job struct {
    Output []string
    Status JobStatus
}

type JobStore interface {
    // Gets the job from the in-memory buffers if available, and if not,
    // reaches out to the storage backend.
    Get(jobID string) Job

    // Appends a given string to the job's output. This won't accept any
    // more output once the status of the job is Complete.
    AppendOutput(jobID string, output string)

    // Sets a job's status and triggers any associated workflow, e.g. if the
    // status is Complete, the job is flushed to the associated storage
    // backend.
    SetJobStatus(jobID string, status JobStatus)
}
JobStore here basically abstracts away all the in-memory buffers and the storage backend so that the project command output handler doesn't have to know about them.
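To make the encapsulation concrete, a hypothetical sketch of how the handler could shrink once JobStore owns both the buffers and the backend (field and method names below are illustrative, not the actual implementation):

type ProjectCommandOutputHandler struct {
    store JobStore
}

func (p *ProjectCommandOutputHandler) handleLine(jobID string, line string) {
    p.store.AppendOutput(jobID, line)
}

func (p *ProjectCommandOutputHandler) complete(jobID string) {
    // The store decides when to flush to the storage backend, so the handler
    // never touches projectOutputBuffers or the StorageBackend directly.
    p.store.SetJobStatus(jobID, Complete)
}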
Yeah I like this approach better! I've made the changes accordingly.
Read(key string) io.ReadCloser

// Write logs to the storage backend
Write(key string, logs []string) (success bool, err error)
This returns success bool, which indicates whether the logs were successfully persisted. It makes the code path cleaner when log persistence is not configured: the NoopStorageBackend returns false, nil, meaning the logs were not persisted but no error occurred, so we leave the output buffer as it is instead of clearing it.
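In other words, the no-op path is just this (a sketch of the contract as described; the actual NoopStorageBackend in the PR may differ in detail):

type NoopStorageBackend struct{}

// Write reports that nothing was persisted, without raising an error, so the
// caller keeps the in-memory buffer instead of clearing it.
func (s *NoopStorageBackend) Write(key string, logs []string) (success bool, err error) {
    return false, nil
}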
Took a first pass and have some comments.
server/jobs/storage_backend.go
type StorageBackend interface {
    // Checks the backend storage for the specified key
    IsKeyExists(key string) bool

    // Read logs from the storage backend. Must close the reader
    Read(key string) io.ReadCloser

    // Write logs to the storage backend
    Write(key string, logs []string) (success bool, err error)
}
If you have an interface on the read side, you don't need this; it's just too much indirection.
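One reading of this suggestion, as a sketch (the error-based convention here is an assumption, not the PR's actual design), is to fold the existence check into Read so callers never make a separate IsKeyExists round trip:

type StorageBackend interface {
    // Read returns the logs for key, or an error if the key is absent or the
    // backend fails; absence no longer needs its own method.
    Read(key string) (io.ReadCloser, error)

    // Write logs to the storage backend
    Write(key string, logs []string) (success bool, err error)
}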
Looks SO much better! You're basically good to go, just have a couple things.
This PR updates the multiplexer to check for the job in the storage backend before registering with the partition registry. For persisting the job, the project output wrapper calls the completeJob() method, which handles closing active websocket connections, marking the operation complete, and clearing buffers if the job is successfully persisted.