S3 relay interface #833
Conversation
Signed-off-by: Cody Littley <[email protected]>
Looks great, some initial comments
go.mod
Outdated
@@ -6,6 +6,7 @@ toolchain go1.21.1

require (
	github.com/Layr-Labs/eigensdk-go v0.1.7-0.20240507215523-7e4891d5099a
	github.com/aws/aws-sdk-go v1.55.5
Is it possible to do it with just the v2 library?
Yes, refactored to only use the v2 API.
relay/dataplane/config.go
Outdated
	PrefixChars int
	// This framework utilizes a pool of workers to help upload/download files. This value specifies the number of
	// workers to use. Default is 32.
	Parallelism int
Should we set the default to the number of CPUs on the machine? E.g. by using GOMAXPROCS?
I've turned this into two parameters: `ParallelismFactor` and `ParallelismConstant`. The total number of workers is set equal to the formula `ParallelismFactor * numCores + ParallelismConstant`.
We will want to have a lot more workers than cores, since most of the time the workers are blocked on IO tasks. This allows us to set a sane default that uses a good number of workers as the number of cores grows.
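For illustration, a minimal sketch of how that formula could be computed; the helper function is hypothetical, and only the two parameter names come from the discussion above:

```go
package dataplane

import "runtime"

// workerCount derives the worker pool size from the two parameters
// discussed above: a per-core factor plus a flat constant. This is a
// hypothetical sketch, not the actual implementation.
func workerCount(parallelismFactor int, parallelismConstant int) int {
	return parallelismFactor*runtime.NumCPU() + parallelismConstant
}
```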
Looks good! A few comments.
relay/dataplane/client.go
Outdated
// that are consumed by utilities that are not aware of the multipart upload/download process.
//
// Implementations of this interface are required to be thread-safe.
type S3Client interface {
Why don't we implement new methods that support fragments in the existing s3 client (`common/aws/s3`)?
Change made.
relay/dataplane/fragment.go
Outdated
	if fragmentCount-1 == index {
		postfix = "f"
	}
Should we also validate that `index < fragmentCount`?
Checks added.
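For context, a sketch of how such a bounds check might sit alongside the postfix logic shown in the diff above; the function name and signature are assumptions, not the actual code:

```go
package dataplane

import "fmt"

// getFragmentKey builds the S3 key for a single fragment, marking the
// final fragment with an "f" postfix as in the diff above. Hypothetical
// sketch; the real signature may differ.
func getFragmentKey(fileKey string, index int, fragmentCount int) (string, error) {
	if index >= fragmentCount {
		return "", fmt.Errorf("index %d out of bounds, fragment count is %d", index, fragmentCount)
	}
	postfix := ""
	if fragmentCount-1 == index {
		postfix = "f"
	}
	return fmt.Sprintf("%s-%d%s", fileKey, index, postfix), nil
}
```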
relay/dataplane/s3_client.go
Outdated
	// tasks are placed into this channel to be processed by workers.
	tasks chan func()
	// this wait group is completed when all workers have finished.
	wg *sync.WaitGroup
I think what you're doing here can be simplified & abstracted out by using `workerpool` (ex. https://github.com/Layr-Labs/eigenda/blob/master/api/clients/retrieval_client.go#L181).
This is a neat little library, good to know about. Done.
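For reference, a minimal sketch of the pattern, assuming the `github.com/gammazero/workerpool` package (the library used at the link above); the task body is purely illustrative:

```go
package main

import (
	"fmt"

	"github.com/gammazero/workerpool"
)

func main() {
	// Create a pool with a fixed number of workers; Submit queues a task
	// and a free worker picks it up.
	pool := workerpool.New(32)
	for i := 0; i < 8; i++ {
		i := i // capture the loop variable for the closure (pre-Go 1.22 idiom)
		pool.Submit(func() {
			fmt.Printf("processing fragment %d\n", i)
		})
	}
	// StopWait blocks until all submitted tasks have completed.
	pool.StopWait()
}
```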
Signed-off-by: Cody Littley <[email protected]>
lgtm!
Please resolve the test failures before merging
common/aws/cli.go
Outdated
)

type ClientConfig struct {
	// The region to use when interacting with S3. Default is "us-east-2".
	Region    string
	AccessKey string
Nit: start the comment with the name of the field.
Fixed.
common/aws/cli.go
Outdated
	// This framework utilizes a pool of workers to help upload/download files. A non-zero value for this parameter
	// adds a number of workers equal to the number of cores times this value. Default is 8. In general, the number
	// of workers here can be a lot larger than the number of cores because the workers will be blocked on I/O most
	// of the time.
	FragmentParallelismFactor int
	// This framework utilizes a pool of workers to help upload/download files. A non-zero value for this parameter
	// adds a constant number of workers. Default is 0.
	FragmentParallelismConstant int
Why not just use a single field to explicitly define the number of workers?
This is based on @dmanc's request to scale the number of workers with the number of cores on the machine. The pattern here allows the user to specify either a fixed number of workers or a number that varies with the number of cores.
If you think this is overcomplicated, I'm willing to go back to having a constant number of worker threads.
	for _, fragment := range fragments {
		fragmentCapture := fragment
		s.pool.Submit(func() {
This pool is shared across all ongoing uploads and downloads. Which means when there are many uploads/downloads in flight, it can build backpressure. Is that problematic?
My previous iteration had a config parameter that allowed the client to have a configurably sized work queue (i.e. when I was using my own implementation of `workerpool`). Unfortunately, the `workerpool` library does not allow us to override the channel used to send data to the workers.
From the implementation:
`workerQueue: make(chan func())`
By default, channels have a buffer size of 0. This means that if the number of read/write tasks exceeds the number of available workers, the caller will block until all tasks are accepted by a worker. This provides back pressure if more work is scheduled than there are workers to handle it.
Are you ok with the way this is configured by default? If not, we will probably need our own implementation of `workerpool`.
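A tiny self-contained illustration of the blocking behavior described above (plain Go, not the library's code): sends on an unbuffered channel block until a worker is ready to receive, which is what produces the backpressure.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Unbuffered, like workerQueue in the snippet above: a send blocks
	// until a worker is ready to receive.
	tasks := make(chan func())

	// One slow worker.
	go func() {
		for task := range tasks {
			task()
			time.Sleep(100 * time.Millisecond)
		}
	}()

	for i := 0; i < 3; i++ {
		i := i
		start := time.Now()
		tasks <- func() { fmt.Printf("task %d running\n", i) }
		// After the first task, each send waits for the worker: backpressure.
		fmt.Printf("task %d accepted after %v\n", i, time.Since(start))
	}
	close(tasks)
	time.Sleep(200 * time.Millisecond) // let the last task finish
}
```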
Signed-off-by: Cody Littley <[email protected]>
This reverts commit aca3040.
Why are these changes needed?
This PR adds utilities for uploading and downloading files from S3 in a way that supports breaking files into smaller ones.
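As a rough sketch of how a caller might use the resulting client; the method names, parameters, and interface shape here are assumptions based on this discussion, not confirmed signatures:

```go
package main

import (
	"context"
	"log"
)

// FragmentedClient is a hypothetical view of the interface added in this
// PR; the real method names and signatures may differ.
type FragmentedClient interface {
	FragmentedUploadObject(ctx context.Context, bucket string, key string, data []byte, fragmentSize int) error
	FragmentedDownloadObject(ctx context.Context, bucket string, key string, fileSize int, fragmentSize int) ([]byte, error)
}

func roundTrip(ctx context.Context, client FragmentedClient, bucket, key string) {
	data := make([]byte, 10*1024*1024) // a 10 MiB blob
	fragmentSize := 1024 * 1024        // split into ~1 MiB fragments

	// Upload: the client splits the blob into fragments and writes them
	// in parallel via the worker pool discussed above.
	if err := client.FragmentedUploadObject(ctx, bucket, key, data, fragmentSize); err != nil {
		log.Fatalf("upload failed: %v", err)
	}

	// Download: the client fetches the fragments in parallel and
	// reassembles the original blob.
	downloaded, err := client.FragmentedDownloadObject(ctx, bucket, key, len(data), fragmentSize)
	if err != nil {
		log.Fatalf("download failed: %v", err)
	}
	_ = downloaded
}
```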
Checks