Initial kinesis + tower sink #93
src/sinks/kinesis.rs (outdated)
```rust
.downcast_ref::<PutRecordsError>()
.unwrap()
{
    PutRecordsError::ProvisionedThroughputExceeded(_) => {
```
This should only retry on ProvisionedThroughput here, but we probably want to back off and delay.
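To illustrate the "back off and delay" part, here is a minimal std-only sketch of capped exponential backoff. The helper name and the base/cap values are made up for illustration; the real policy would plug a delay like this into its retry loop.

```rust
use std::time::Duration;

// Hypothetical helper (not in the PR): one way the retry policy could
// add a delay between attempts, using capped exponential backoff.
fn backoff_delay(attempt: u32) -> Duration {
    let base_ms: u64 = 100;
    let cap_ms: u64 = 10_000;
    // 100ms, 200ms, 400ms, ... never more than 10s.
    let ms = base_ms.saturating_mul(1u64 << attempt.min(16));
    Duration::from_millis(ms.min(cap_ms))
}

fn main() {
    for attempt in 0..5 {
        println!("attempt {} -> {:?}", attempt, backoff_delay(attempt));
    }
}
```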
Ok, lots to unpack here. Overall, the kinesis service looks good, just had a few smaller comments and questions there. For the rest of it, I'll try to split it up a bit:
Retry policy
The `Policy` system seems neat, but this doesn't feel ready to go yet. If we're going to ignore all the errors other than `PutRecordsError`, we should limit the scope of the policy to only operate over that service. But really it feels like there are a lot of other potential errors to deal with, and we should at least stub out actually dealing with them. I imagine things like timeouts won't be terribly uncommon.
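Stubbing out the other error classes could look something like the sketch below. The enum and function are illustrative stand-ins, not the real rusoto types; the point is that each failure mode gets an explicit retry decision instead of being ignored.

```rust
use std::time::Duration;

// Illustrative only: a hand-rolled enum standing in for the failure
// modes a Kinesis request can hit, including timeouts.
#[derive(Debug, PartialEq)]
enum RequestError {
    ProvisionedThroughputExceeded,
    Timeout,
    ServiceUnavailable,
    Validation(String),
}

// Deciding per variant keeps the retry policy's scope explicit:
// retryable errors get a delay, the rest fail fast.
fn retry_after(err: &RequestError) -> Option<Duration> {
    match err {
        RequestError::ProvisionedThroughputExceeded => Some(Duration::from_millis(500)),
        RequestError::Timeout | RequestError::ServiceUnavailable => Some(Duration::from_millis(100)),
        RequestError::Validation(_) => None, // not retryable
    }
}

fn main() {
    println!("{:?}", retry_after(&RequestError::Timeout));
}
```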
BatchSink
This is a lot of generic code that's only used in one place. I'd much prefer we start with something simpler and work up from there.
More specifically, this seam doesn't feel right. `Service<Vec<B::Item>>` is not really what we want for sinks like S3. I don't think we have a good idea yet for the best way to split up and compose batching, request building, retries, etc., so I'd rather spend a little time looking at all our sinks and figuring that out than starting with something big and generic.
Maybe instead we start with the absolute minimum thing that lets us use a `Service` as a `Sink`.
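To make "the absolute minimum thing" concrete, here is a deliberately synchronous sketch: batching lives in the sink, and the service only ever sees full batches. The `Service` trait and all names here are illustrative stand-ins, not the real tower interfaces, and the async machinery is left out entirely.

```rust
// Illustrative stand-in for tower's Service trait, synchronous on purpose.
trait Service<Request> {
    fn call(&mut self, req: Request);
}

// Minimal batching sink: accumulate items, hand the service a Vec when full.
struct BatchSink<S> {
    service: S,
    buffer: Vec<String>,
    size: usize,
}

impl<S: Service<Vec<String>>> BatchSink<S> {
    fn send(&mut self, item: String) {
        self.buffer.push(item);
        if self.buffer.len() >= self.size {
            self.flush();
        }
    }

    fn flush(&mut self) {
        if !self.buffer.is_empty() {
            let batch = std::mem::replace(&mut self.buffer, Vec::new());
            self.service.call(batch);
        }
    }
}

struct PrintService;
impl Service<Vec<String>> for PrintService {
    fn call(&mut self, batch: Vec<String>) {
        println!("sending batch of {}", batch.len());
    }
}

fn main() {
    let mut sink = BatchSink { service: PrintService, buffer: Vec::new(), size: 2 };
    sink.send("a".into());
    sink.send("b".into()); // triggers a flush of ["a", "b"]
    sink.send("c".into());
    sink.flush(); // flush the partial batch
}
```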
Tests
We should probably read the data back and make sure it all got there 😉
src/sinks/kinesis.rs (outdated)
```rust
let policy = RetryPolicy { attempts: 5 };

let service = Timeout::new(service, Duration::from_secs(5));
let service = Buffer::new(service, 1).unwrap();
```
What is this buffer doing for us? It seems like we'd have our own buffer impl, the batch sink, and then this buffer inside of it. I just want to be clear on what they're all doing.
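For context on the question, tower's `Buffer` roughly amounts to putting a bounded channel in front of the service and driving it from a single worker. A std-only sketch of that idea, with a closure-like sum standing in for the inner service (all names here are illustrative):

```rust
use std::sync::mpsc;
use std::thread;

// Rough sketch of what a Buffer layer provides: callers push requests
// onto a bounded channel; one worker drives the inner "service".
fn run_buffered(batches: Vec<Vec<u8>>) -> usize {
    // Bound of 1, analogous to Buffer::new(service, 1).
    let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(1);

    let worker = thread::spawn(move || {
        let mut total = 0;
        for batch in rx {
            total += batch.len(); // the inner service handles each request
        }
        total
    });

    for batch in batches {
        tx.send(batch).unwrap(); // blocks when the buffer is full
    }
    drop(tx); // closing the channel lets the worker finish

    worker.join().unwrap()
}

fn main() {
    println!("processed {} items", run_buffered(vec![vec![1, 2, 3], vec![4]]));
}
```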
src/sinks/kinesis.rs (outdated)
```rust
.source()
.unwrap()
.downcast_ref::<PutRecordsError>()
.unwrap()
```
This is confusing and the unwraps don't make me very confident. Is there a way we can rework this to avoid them? If we know what type the error is going to be, it'd be nice to encode that and let the type system do its thing.
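One way to "let the type system do its thing" is to keep the concrete error type in the signature instead of a boxed `dyn Error`. A self-contained sketch, with a stand-in enum defined locally so it compiles on its own (the real type is rusoto's):

```rust
use std::fmt;

// Stand-in for rusoto's PutRecordsError, defined here only so the
// example is self-contained.
#[derive(Debug)]
enum PutRecordsError {
    ProvisionedThroughputExceeded(String),
}

impl fmt::Display for PutRecordsError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            PutRecordsError::ProvisionedThroughputExceeded(msg) => {
                write!(f, "provisioned throughput exceeded: {}", msg)
            }
        }
    }
}

// With the concrete type in the signature, the
// source().unwrap().downcast_ref().unwrap() chain disappears and the
// compiler checks the match exhaustively.
fn should_retry(err: &PutRecordsError) -> bool {
    match err {
        PutRecordsError::ProvisionedThroughputExceeded(_) => true,
    }
}

fn main() {
    let err = PutRecordsError::ProvisionedThroughputExceeded("slow down".to_string());
    println!("{}: retry = {}", err, should_retry(&err));
}
```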
A few things to follow up on, but in general I think it's looking reasonable.
tests/kinesis.rs (outdated)
```rust
.block_on(fetch_records(STREAM_NAME.into(), timestamp))
.unwrap();

assert_eq!(records.len(), 11);
```
Could we have `input_records` and `output_records` and assert they're the same? We do that in most other sink tests.
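The suggested test pattern, sketched with stubs: compare what went in against what came back rather than asserting on a magic count. The `roundtrip` helper here is a hypothetical stand-in for "write to the stream, then fetch the records back".

```rust
// Generate deterministic input records for the test.
fn make_input_records(n: usize) -> Vec<String> {
    (0..n).map(|i| format!("record-{}", i)).collect()
}

// Hypothetical stub standing in for send-then-fetch against Kinesis.
fn roundtrip(records: &[String]) -> Vec<String> {
    records.to_vec()
}

fn main() {
    let input_records = make_input_records(11);
    let output_records = roundtrip(&input_records);
    // Assert contents match, not just the count.
    assert_eq!(input_records, output_records);
    println!("ok: {} records match", output_records.len());
}
```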
```rust
self.batcher.push(item.into());

if self.batcher.len() > self.size {
    self.poll_complete()?;
```
Is this correct? I feel like `start_send` should only `poll_complete` when it has to (i.e., lazily).
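A synchronous sketch of the lazy behaviour being suggested: `start_send` hands the item back when the batch is already full, instead of pushing and then eagerly flushing itself. Names echo the futures 0.1 `Sink` trait, but this is a simplified illustration, not the real interface.

```rust
// Simplified stand-ins for the Sink return values.
enum StartSend {
    Accepted,
    NotReady(String), // caller must poll_complete, then retry this item
}

struct Batcher {
    items: Vec<String>,
    size: usize,
}

impl Batcher {
    fn start_send(&mut self, item: String) -> StartSend {
        if self.items.len() >= self.size {
            return StartSend::NotReady(item); // flush only when forced to
        }
        self.items.push(item);
        StartSend::Accepted
    }

    fn poll_complete(&mut self) -> usize {
        let flushed = self.items.len();
        self.items.clear(); // pretend the batch was sent downstream
        flushed
    }
}

fn main() {
    let mut b = Batcher { items: Vec::new(), size: 2 };
    b.start_send("a".to_string());
    b.start_send("b".to_string());
    // The third item is refused until the caller flushes.
    if let StartSend::NotReady(item) = b.start_send("c".to_string()) {
        b.poll_complete();
        b.start_send(item);
    }
}
```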
```rust
continue;
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => panic!("Error sending request: {:?}", err),
```
Do we actually want to panic here? Or just log the error and move on?
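A sketch of the log-and-move-on alternative, with `Result<(), String>` standing in for the real request error type so the example is self-contained:

```rust
// Drain a batch of request results, logging failures instead of
// panicking, so one bad response doesn't take down the whole sink.
fn drain(results: Vec<Result<(), String>>) -> usize {
    let mut errors = 0;
    for res in results {
        match res {
            Ok(()) => continue,
            Err(err) => {
                // was: panic!("Error sending request: {:?}", err)
                eprintln!("Error sending request: {:?}", err);
                errors += 1;
            }
        }
    }
    errors
}

fn main() {
    let errors = drain(vec![Ok(()), Err("timeout".to_string()), Ok(())]);
    println!("{} errors logged", errors);
}
```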
tests/kinesis.rs (outdated)
```rust
.block_on(futures::lazy(|| {
    future::ok::<_, ()>(KinesisService::new(config))
}))
.unwrap();
```
Can't all this just be `KinesisService::new(config)`?
Yes, now it can.
```rust
}))
.unwrap();

let timestamp = chrono::Utc::now().timestamp_millis();
```
Let's inline this where it's actually used. No reason for it to be way up here.
LOG-2734: A test exists to verify there are no ring dependencies
Adds our initial `aws_kinesis_data_stream` sink.

External refs: fluent/fluent-bit#2485