GetRecords hangs #301
At first glance I think there might be an issue with the way the goroutines capture the loop variable. I know this can sometimes cause unexpected results and might be the cause of the expired iterators:

for i := 0; i < 3; i++ {
	go func() {
		fmt.Println("Got", i)
	}()
}

// outputs
// Got 3
// Got 3
// Got 3

http://play.golang.org/p/eqW58QXz5x

In a situation like this a new variable needs to be used within the loop:

for i := 0; i < 3; i++ {
	myIdx := i
	go func() {
		fmt.Println("Got", myIdx)
	}()
}

http://play.golang.org/p/E9e7g6NxMy

I'm not sure about the |
Sorry for taking a while to get back to you. Each worker "owns" a shard, so the workers aren't sharing anything like iterators. I'm fairly confident that the two issues are one and the same (e.g. a GetRecords request taking very long to finish, causing either the HTTP request to be reset by peer or the request to finish but take longer than the 5 minutes that is the maximum allowed age of an iterator). It is running on an m3.medium instance in EC2. It has no problems keeping up with the load (<20% CPU usage, plenty of memory left). I've tried a few different instance types (including much larger ones) but the problem persists. |
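As an aside, a minimal sketch of how a reader could detect the expired-iterator case and recover with a fresh iterator instead of tearing the worker down. This is illustrative only: it assumes the kc client, iterator, streamName and shardId variables plus the GetKinesisTrimHorizonIterator helper that appear in the debug program later in this thread, and the github.com/aws/aws-sdk-go/aws/awserr package; it is not code from this project.

```go
// Sketch: recover from an expired shard iterator (iterators are only valid
// for 5 minutes) instead of restarting the whole worker.
resp, err := kc.GetRecords(&kinesis.GetRecordsInput{ShardIterator: &iterator})
if err != nil {
	if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "ExpiredIteratorException" {
		// The previous call was stuck long enough for the iterator to age out.
		// Request a fresh iterator (ideally AFTER_SEQUENCE_NUMBER of the last
		// processed record; TRIM_HORIZON here only for brevity) and carry on.
		iterator, err = GetKinesisTrimHorizonIterator(streamName, shardId)
	}
	// any other error would still need handling here
} else {
	iterator = *resp.NextShardIterator
}
```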
Hi @fantyz have you been able to reproduce this issue with a simple single-goroutine test app? Also I'd be curious whether your SDK client is retrying the failed connection multiple times with exponential backoff before exhausting all retry attempts. This might shed some light on what is going on. I would suggest enabling SDK logging, for example:

svc := kinesis.New(&aws.Config{LogLevel: 1})
// ... use the service.
|
Hi @fantyz With our latest changes to the SDK's config, debug logging can be enabled like this:

svc := kinesis.New(aws.NewConfig().WithLogLevel(aws.LogDebug))
// ... use the service.
|
Thanks - I hope to find time to investigate soon. I'll be back! :) |
Hi @fantyz did you get a chance to gather log outputs when you were encountering this issue? I'm going to close this issue, please reopen if you are still encountering this problem. |
Yea, we still have it. I've just not had the chance yet to dig further into it. I'll be back :) |
@jasdel I don't have permissions to reopen the issue. I've had a chance to spend some time on this now. I've made a very simple Kinesis reader and had it running for a short while. As it is a transient problem, it's not easy to catch it happening; the closest has been a 30-second delay. As shown below, the actual delay is happening after the response from the Kinesis API has been received. Any ideas how to proceed from here?
I modified the SDK's logResponse handler to skip dumping the body of successful GetRecords responses so the logs stay readable:

func logResponse(r *request.Request) {
	var msg = "no response data"
	if r.HTTPResponse != nil && r.HTTPResponse.StatusCode == 200 {
		// Temporary hack to avoid outputting the body of a successful GetRecords
		msg = "<skipping body>"
	} else if r.HTTPResponse != nil {
		logBody := r.Config.LogLevel.Matches(aws.LogDebugWithHTTPBody)
		dumpedBody, _ := httputil.DumpResponse(r.HTTPResponse, logBody)
		msg = string(dumpedBody)
	} else if r.Error != nil {
		msg = r.Error.Error()
	}
	r.Config.Logger.Log(fmt.Sprintf(logRespMsg, r.ClientInfo.ServiceName, r.Operation.Name, msg))
}

My Kinesis reader application looks like this:

package main
import (
	"fmt"
	"log"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	// "github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

/*
var kc = kinesis.New(session.New(aws.NewConfig().
	WithRegion("us-east-1").
	WithLogLevel(aws.LogDebugWithRequestErrors | aws.LogDebugWithHTTPBody).
	WithCredentials(credentials.NewSharedCredentials("", "prod")),
))
*/

var kc = kinesis.New(session.New(aws.NewConfig().WithRegion("us-east-1").WithLogLevel(aws.LogDebugWithRequestErrors | aws.LogDebugWithHTTPBody)))

func GetKinesisTrimHorizonIterator(streamName, shardId string) (string, error) {
	getShardIteratorInput := &kinesis.GetShardIteratorInput{
		StreamName:        &streamName,
		ShardId:           &shardId,
		ShardIteratorType: aws.String("TRIM_HORIZON"),
		//ShardIteratorType: aws.String("LATEST"),
	}
	resp, err := kc.GetShardIterator(getShardIteratorInput)
	if err != nil {
		return "", fmt.Errorf("Failed to get kinesis shard iterator (TRIM_HORIZON) (streamName=%v, shardId=%v, err=%v)", streamName, shardId, err)
	}
	return *resp.ShardIterator, nil
}

func GetKinesisRecords(iterator string) ([]*kinesis.Record, string, error) {
	getRecordsInput := &kinesis.GetRecordsInput{
		ShardIterator: &iterator,
	}
	resp, err := kc.GetRecords(getRecordsInput)
	if err != nil {
		return nil, "", fmt.Errorf("Failed to get records from kinesis (iterator=%v, err=%v)", iterator, err)
	}
	return resp.Records, *resp.NextShardIterator, nil
}

func main() {
	minReadDuration := 1 * time.Second
	streamName := "<stream>"
	shardId := "<shard>"

outer:
	for {
		log.Printf("[DEBUGGER] Getting kinesis iterator")
		iterator, err := GetKinesisTrimHorizonIterator(streamName, shardId)
		if err != nil {
			log.Printf("ERROR [DEBUGGER] Unable to get kinesis shard iterator: %v", err)
			return
		}
		log.Printf("[DEBUGGER] Got iterator %v", iterator)

		var ts_start, ts_last, lag time.Time
		var records []*kinesis.Record
		ts_last = time.Now()
		for {
			// get records
			ts_start = time.Now()
			records, iterator, err = GetKinesisRecords(iterator)
			if err != nil {
				log.Printf("[DEBUGGER] Unable to get data (requestTime=%v, lastIterationRequestTime=%v, err=%v)", time.Since(ts_start), ts_start.Sub(ts_last), err)
				continue outer
			}
			ts_last = ts_start

			if len(records) > 0 {
				lag = *(records[len(records)-1]).ApproximateArrivalTimestamp
			}

			took := time.Since(ts_start)
			log.Printf("[DEBUGGER] Read %v records from Kinesis (took=%v, lag=%v)", len(records), took, lag)
			if took > 1*time.Minute {
				log.Printf("[DEBUGGER] Request took more than 1 minute!")
			}

			sleepDuration := minReadDuration - took
			if sleepDuration > 0 {
				log.Printf("[DEBUGGER] Sleeping for %v", sleepDuration)
				time.Sleep(sleepDuration)
			}
		}
	}
}
|
I left the debug application running overnight. It contains a handful of examples of the issue:
|
Hi, thanks for the updated information @fantyz, and the sample and output logs. The section that is marked as <skipping body> is from your local modification, correct? I'll try to reproduce this on my end. Are you using Kinesis Streams or Firehose? In addition, about how many records do you have in your stream, and about how large are the records? |
@jasdel yes, but it is the minor hack I did to the SDK shown in my reply above. It's putting the <skipping body> marker in place of the response body. I'm using Kinesis Streams. Our records vary a lot in size, averaging around 10 KB, but there are cases of 1 MB records (analytics coming in from clients that potentially could have been offline for a while, batching up a lot of analytics). Currently I think the throughput is around 10-15 MB/minute total on this system. |
Thanks for the update and additional info @fantyz. Updating this issue to a bug, because the SDK should at least provide more information about the error causing the retry. I'll work to see if I can reproduce this on my end using a similar setup to what you described. Copying from our gitter discussion:

{"__type":"ProvisionedThroughputExceededException","message":"Rate exceeded for shard shardId-000000000003 in stream brim-prod-analytics under account xxx."}

When your code receives a ProvisionedThroughputExceededException like this, the delay you're seeing with GetRecords may be related to a provisioned throughput set too low for your use case. |
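Purely as an illustration of the kind of handling being suggested, a sketch of backing off when this throttling error comes back. It is written as if inside the GetKinesisRecords helper shown earlier (so getRecordsInput, iterator and kc are in scope), it assumes the github.com/aws/aws-sdk-go/aws/awserr package, and the fixed 2-second backoff is an arbitrary choice, not code from this thread:

```go
resp, err := kc.GetRecords(getRecordsInput)
if err != nil {
	if awsErr, ok := err.(awserr.Error); ok &&
		awsErr.Code() == "ProvisionedThroughputExceededException" {
		// Throttled: wait before the caller retries with the same iterator.
		// A fixed sleep keeps the sketch short; exponential backoff is better.
		time.Sleep(2 * time.Second)
		return nil, iterator, nil
	}
	return nil, "", fmt.Errorf("Failed to get records from kinesis (iterator=%v, err=%v)", iterator, err)
}
return resp.Records, *resp.NextShardIterator, nil
```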
@jasdel these are two unrelated issues though! This issue is about the seemingly random hang I'm getting on GetRecords. The other issue you list popped up while trying to debug the first one. I think a new issue is in order for that one! :) |
In the case of the eventually successful request that was delayed, was retry logging enabled? If not, I'm thinking that the requests were just being throttled and retried multiple times. The alternative would be that Kinesis is holding the connection open and delaying a response, but this delay would be limited by the Go default HTTP timeouts of about a minute. If you can provide me with a request ID I can forward this on to the Kinesis service team. In addition it might be helpful to ask on the Kinesis AWS Forums, since others may have experienced issues similar to this. |
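For reference, a sketch of how retry and request-error logging could be switched on without dumping full HTTP bodies, using the SDK's standard log level flags (the region here is a placeholder assumption):

```go
// Combine log level flags so retries and request errors show up in the logs
// while response bodies are left out.
cfg := aws.NewConfig().
	WithRegion("us-east-1").
	WithLogLevel(aws.LogDebug | aws.LogDebugWithRequestRetries | aws.LogDebugWithRequestErrors)
kc := kinesis.New(session.New(cfg))
```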
According to the log timestamps the SDK has the full body at hand and is able to print it out (although I bypassed that to avoid flooding the logs with data) before the delay occurs. I don't think it has anything to do with the actual connection? You can see the jump from 17:03:41 to 17:15:46 is after getting the response.
|
Retry logging was not explicitly enabled, but does it have to be when request logging is? I've seen many examples where the retries are clearly visible from the requests being logged with the current settings - the individual requests are shown (including the failing ones). The above example only has one single request, which succeeds, but the SDK does not return from the function call for a significant amount of time afterwards. How do I find the request id? I'll dig it out tomorrow then, although I'm not sure what the Kinesis service team can do, as the GetRecords request is responded to in a timely manner (as shown by the logs). It looks to me like it is printing the body of the successful GetRecords API request around 12 minutes before returning from the function call, or am I missing something obvious here?
The debug responses you're seeing do not include the HTTP body, and LogDebugWithHTTPBody is not set as the log level, correct? The first thing that comes to mind is that the SDK is waiting for the response to download from the Kinesis endpoint. The GetRecords call won't be able to return until the full response body is downloaded and parsed. Are you able to see the request ID? It should be visible in the logged response's HTTP headers. With the request ID I can work with the Kinesis team to hopefully investigate why the request is taking so long to download. In addition, how many shards is your stream set for? |
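One way to surface the request ID programmatically rather than fishing it out of dumped headers is a completion handler on the client. This is only a sketch, assuming an SDK version that has the Handlers.Complete list and the request.Request.RequestID field, and that kc is the kinesis client from the debugger above:

```go
// Log the service request ID of every GetRecords call once it completes, so a
// slow call can be reported to the Kinesis team by ID.
kc.Handlers.Complete.PushBack(func(r *request.Request) {
	if r.Operation.Name == "GetRecords" {
		log.Printf("[DEBUGGER] %s.%s request id: %s",
			r.ClientInfo.ServiceName, r.Operation.Name, r.RequestID)
	}
})
```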
I'm looking into reproducing the delay you're seeing but have not yet been able to. I created scripts to put data into Kinesis and retrieve it. In my tests the retrievals are taking about 3s on average. I haven't seen the delay yet, but will let it run for a while. |
Hi @fantyz I let my test run overnight, but no luck reproducing the issue. The longest delay I found was 22s. |
How often it happens varies a lot, though normally not more than a couple of times a day. However, earlier today I updated my debugger to also log the status code and content length as well as the size of the records. I think it's been running for ~8 hours and has caught one occurrence. As shown, the jump in time happens after the printing of the response but before the function returns.
|
Thanks for the update @fantyz, I've forwarded this information on to the Kinesis team to get their insight. One thing that we could try is to add a custom reader that instruments the HTTP response body. I've updated the kinesisStream.go gist example to include this extra debug. It adds detailed logging to the GetRecords request and adds a DebugReadCloser wrapping the HTTP response's Body that will log each block of bytes that is read from the HTTP body stream. This will produce output like the following. All non-read times are relative to the time the GetRecords call was started.
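The gist itself isn't reproduced in this thread; the following is a minimal sketch of what such a read-logging wrapper can look like. The type name matches the DebugReadCloser mentioned above, but the fields, log format, and the choice of the Unmarshal handler for wiring it up are assumptions, not the gist's actual code:

```go
import (
	"io"
	"log"
	"time"

	"github.com/aws/aws-sdk-go/aws/request"
)

// DebugReadCloser wraps an HTTP response body and logs every Read and the
// final Close, with times relative to when the wrapper was installed.
type DebugReadCloser struct {
	rc    io.ReadCloser
	start time.Time
}

func (d *DebugReadCloser) Read(p []byte) (int, error) {
	n, err := d.rc.Read(p)
	log.Printf("[DEBUGGER] read %d bytes at +%v (err=%v)", n, time.Since(d.start), err)
	return n, err
}

func (d *DebugReadCloser) Close() error {
	log.Printf("[DEBUGGER] body closed at +%v", time.Since(d.start))
	return d.rc.Close()
}

// wrapResponseBody installs the wrapper before the SDK unmarshals the body,
// so every read performed while parsing GetRecords output is logged.
func wrapResponseBody(h *request.Handlers) {
	h.Unmarshal.PushFront(func(r *request.Request) {
		if r.HTTPResponse != nil && r.HTTPResponse.Body != nil {
			r.HTTPResponse.Body = &DebugReadCloser{rc: r.HTTPResponse.Body, start: time.Now()}
		}
	})
}
```

Hooked up with wrapResponseBody(&kc.Handlers) after creating the client, this produces one log line per read and should show whether the time is being spent waiting on the network.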
It's a lot of extra information in the log, but it might help identify where things are getting hung up. |
I used the following to find long running requests:

AWS_REGION=<region> go run kinesisStream.go <stream> > /tmp/kinesisStream.out
grep "records from shard" /tmp/kinesisStream.out | awk '{print $10}' | go run durSort.go
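durSort.go isn't included in the thread; presumably it is a small helper along these lines (a hypothetical sketch that sorts the duration strings fed to it on stdin, so the slowest GetRecords calls end up at the bottom):

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"sort"
	"time"
)

func main() {
	// Read one duration per line (e.g. "1.2s", "12m5s"), skipping anything
	// that does not parse, then print them in ascending order.
	var durs []time.Duration
	sc := bufio.NewScanner(os.Stdin)
	for sc.Scan() {
		if d, err := time.ParseDuration(sc.Text()); err == nil {
			durs = append(durs, d)
		}
	}
	sort.Slice(durs, func(i, j int) bool { return durs[i] < durs[j] })
	for _, d := range durs {
		fmt.Println(d)
	}
}
```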
|
I've done slight modifications to get your version to run in our system. You can see the source here. Modifications done:
To be continued once it hopefully catches the issue. |
Caught one!
|
Thanks for the update @fantyz. The 16k read taking 10 minutes, and the other multi-minute delays reading from the network, are surprising. I've included the request ID in my reach-out to the Kinesis team. This should help investigate this issue further on their end.
Thanks, let me know what they come up with! In the meantime I'm implementing client-side retries on the GetRecords call. |
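A rough sketch of one way to do this kind of client-side guard with a plain goroutine and select, so it works without any newer SDK features. The helper name, the 2-minute style timeout value, and the reuse of the GetKinesisRecords function from the debugger above are all assumptions for illustration, not the author's implementation:

```go
// getRecordsWithTimeout runs GetKinesisRecords in a goroutine and gives up if
// it has not returned within the deadline, so a hung call cannot stall the
// worker indefinitely. The abandoned call still finishes in the background;
// its result is simply discarded (the buffered channel avoids a goroutine leak).
func getRecordsWithTimeout(iterator string, timeout time.Duration) ([]*kinesis.Record, string, error) {
	type result struct {
		records []*kinesis.Record
		next    string
		err     error
	}
	ch := make(chan result, 1)
	go func() {
		records, next, err := GetKinesisRecords(iterator)
		ch <- result{records, next, err}
	}()
	select {
	case res := <-ch:
		return res.records, res.next, res.err
	case <-time.After(timeout):
		return nil, "", fmt.Errorf("GetRecords timed out after %v", timeout)
	}
}
```

The caller would treat the timeout like any other transient error: fetch a fresh iterator and try again.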
Good idea, I think this is the best workaround for the issue. I will get back to you when Kinesis has any additional information. I don't think there is very much we can do from the SDK side beyond the work you're doing to add retries on your side. |
Hi @fantyz I heard back from the service team. For the request ID you provided, the service finished processing and responding to the request quickly. This suggests the latency is being introduced client side or on the route between the server and the client. In addition, Kinesis agreed with using timeouts and retry logic for the rare cases where networking issues cause latency. |
Hi, we've merged in PR #1166, which adds retrying of connection-reset errors discovered while unmarshaling the response body, and read timeouts for Kinesis API Get operations. The read timeouts should prevent your application from hanging for significant amounts of time. |
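Independent of that SDK change, a blunt client-side mitigation is to cap total request time with a custom http.Client on the config. A sketch, with the 2-minute value and the region as arbitrary assumptions:

```go
// Hard-cap how long any single API call may take, including reading the body.
// This is coarser than the per-read timeouts mentioned above: it bounds the
// whole request rather than individual reads.
httpClient := &http.Client{Timeout: 2 * time.Minute}
kc := kinesis.New(session.New(aws.NewConfig().
	WithRegion("us-east-1").
	WithHTTPClient(httpClient)))
```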
Hi,
I'm having some problems with my application that is reading records from Kinesis. It seems to be randomly hanging on the GetRecords request until either Kinesis kills the connection or the subsequent GetRecords fails due to an expired iterator.
I have a stream with four shards. Total incoming data is around 500 KB/s. I have an application that starts a worker per shard; each worker is split into 5 parts, each communicating with the next over its own channel.
The KinesisReader is very straightforward: if it does not read any records it will go to sleep for 10 seconds, and it will kill the worker on any error and start a new one (hence it has failed 165 times in the last 12 hours).
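The actual KinesisReader code isn't included here; purely for illustration, a hypothetical sketch of the restart-on-error and sleep-when-idle behaviour just described, reusing the helper functions from the debug program earlier in the thread (this is not the author's code):

```go
// runShardWorker supervises one shard: it runs the reader and restarts it
// whenever it returns an error.
func runShardWorker(streamName, shardID string, out chan<- *kinesis.Record) {
	for {
		if err := readShard(streamName, shardID, out); err != nil {
			log.Printf("worker for shard %s failed, restarting: %v", shardID, err)
		}
	}
}

// readShard reads records in a loop, handing them to the next stage over its
// channel, sleeping 10s whenever a call returns no records, and returning on
// the first error so the supervisor can start a fresh worker.
func readShard(streamName, shardID string, out chan<- *kinesis.Record) error {
	iterator, err := GetKinesisTrimHorizonIterator(streamName, shardID)
	if err != nil {
		return err
	}
	for {
		records, next, err := GetKinesisRecords(iterator)
		if err != nil {
			return err
		}
		for _, r := range records {
			out <- r
		}
		if len(records) == 0 {
			time.Sleep(10 * time.Second)
		}
		iterator = next
	}
}
```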
The three other workers happily continue reading records while this is happening.
This results in the following log examples:
The second one is by far the most frequent example.
Am I doing something wrong here? Any suggestions as to what might be wrong?