Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sts_endpoint param to firehose plugin #31

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 24 additions & 19 deletions firehose/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,12 @@ type OutputPlugin struct {
}

// NewOutputPlugin creates an OutputPlugin object
func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, endpoint, timeKey, timeFmt string, pluginID int) (*OutputPlugin, error) {
sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
})
func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeFmt string, pluginID int) (*OutputPlugin, error) {
client, err := newPutRecordBatcher(roleARN, region, firehoseEndpoint, stsEndpoint)
if err != nil {
return nil, err
}

client := newPutRecordBatcher(roleARN, sess, endpoint)

records := make([]*firehose.Record, 0, maximumRecordsPerPut)

timer, err := plugins.NewTimeout(func(d time.Duration) {
Expand Down Expand Up @@ -112,29 +108,38 @@ func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, endpoint, timeKe
}, nil
}

func newPutRecordBatcher(roleARN string, sess *session.Session, endpoint string) *firehose.Firehose {
svcConfig := &aws.Config{}
if endpoint != "" {
defaultResolver := endpoints.DefaultResolver()
cwCustomResolverFn := func(service, region string, optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) {
if service == "firehose" {
return endpoints.ResolvedEndpoint{
URL: endpoint,
}, nil
}
return defaultResolver.EndpointFor(service, region, optFns...)
func newPutRecordBatcher(roleARN, region, firehoseEndpoint, stsEndpoint string) (*firehose.Firehose, error) {
customResolverFn := func(service, region string, optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) {
if service == endpoints.FirehoseServiceID && firehoseEndpoint != "" {
return endpoints.ResolvedEndpoint{
URL: firehoseEndpoint,
}, nil
} else if service == endpoints.StsServiceID && stsEndpoint != "" {
return endpoints.ResolvedEndpoint{
URL: stsEndpoint,
}, nil
}
svcConfig.EndpointResolver = endpoints.ResolverFunc(cwCustomResolverFn)
return endpoints.DefaultResolver().EndpointFor(service, region, optFns...)
}

sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
EndpointResolver: endpoints.ResolverFunc(customResolverFn),
CredentialsChainVerboseErrors: aws.Bool(true),
})
if err != nil {
return nil, err
}

svcConfig := &aws.Config{}
if roleARN != "" {
creds := stscreds.NewCredentials(sess, roleARN)
svcConfig.Credentials = creds
}

client := firehose.New(sess, svcConfig)
client.Handlers.Build.PushBackNamed(plugins.CustomUserAgentHandler())
return client
return client, nil
}

// AddRecord accepts a record and adds it to the buffer, flushing the buffer if it is full
Expand Down
8 changes: 5 additions & 3 deletions fluent-bit-firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ func newFirehoseOutput(ctx unsafe.Pointer, pluginID int) (*firehose.OutputPlugin
logrus.Infof("[firehose %d] plugin parameter data_keys = '%s'\n", pluginID, dataKeys)
roleARN := output.FLBPluginConfigKey(ctx, "role_arn")
logrus.Infof("[firehose %d] plugin parameter role_arn = '%s'\n", pluginID, roleARN)
endpoint := output.FLBPluginConfigKey(ctx, "endpoint")
logrus.Infof("[firehose %d] plugin parameter endpoint = '%s'\n", pluginID, endpoint)
firehoseEndpoint := output.FLBPluginConfigKey(ctx, "endpoint")
logrus.Infof("[firehose %d] plugin parameter endpoint = '%s'\n", pluginID, firehoseEndpoint)
stsEndpoint := output.FLBPluginConfigKey(ctx, "sts_endpoint")
logrus.Infof("[firehose %d] plugin parameter sts_endpoint = '%s'\n", pluginID, stsEndpoint)
timeKey := output.FLBPluginConfigKey(ctx, "time_key")
logrus.Infof("[firehose %d] plugin parameter time_key = '%s'\n", pluginID, timeKey)
timeKeyFmt := output.FLBPluginConfigKey(ctx, "time_key_format")
Expand All @@ -77,7 +79,7 @@ func newFirehoseOutput(ctx unsafe.Pointer, pluginID int) (*firehose.OutputPlugin
return nil, fmt.Errorf("[firehose %d] delivery_stream and region are required configuration parameters", pluginID)
}

return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, endpoint, timeKey, timeKeyFmt, pluginID)
return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeKeyFmt, pluginID)
}

//export FLBPluginInit
Expand Down