diff --git a/firehose/firehose.go b/firehose/firehose.go index f2578cf..e122b2b 100644 --- a/firehose/firehose.go +++ b/firehose/firehose.go @@ -65,16 +65,12 @@ type OutputPlugin struct { } // NewOutputPlugin creates an OutputPlugin object -func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, endpoint, timeKey, timeFmt string, pluginID int) (*OutputPlugin, error) { - sess, err := session.NewSession(&aws.Config{ - Region: aws.String(region), - }) +func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeFmt string, pluginID int) (*OutputPlugin, error) { + client, err := newPutRecordBatcher(roleARN, region, firehoseEndpoint, stsEndpoint) if err != nil { return nil, err } - client := newPutRecordBatcher(roleARN, sess, endpoint) - records := make([]*firehose.Record, 0, maximumRecordsPerPut) timer, err := plugins.NewTimeout(func(d time.Duration) { @@ -112,21 +108,30 @@ func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, endpoint, timeKe }, nil } -func newPutRecordBatcher(roleARN string, sess *session.Session, endpoint string) *firehose.Firehose { - svcConfig := &aws.Config{} - if endpoint != "" { - defaultResolver := endpoints.DefaultResolver() - cwCustomResolverFn := func(service, region string, optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) { - if service == "firehose" { - return endpoints.ResolvedEndpoint{ - URL: endpoint, - }, nil - } - return defaultResolver.EndpointFor(service, region, optFns...) +func newPutRecordBatcher(roleARN, region, firehoseEndpoint, stsEndpoint string) (*firehose.Firehose, error) { + customResolverFn := func(service, region string, optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) { + if service == endpoints.FirehoseServiceID && firehoseEndpoint != "" { + return endpoints.ResolvedEndpoint{ + URL: firehoseEndpoint, + }, nil + } else if service == endpoints.StsServiceID && stsEndpoint != "" { + return endpoints.ResolvedEndpoint{ + URL: stsEndpoint, + }, nil } - svcConfig.EndpointResolver = endpoints.ResolverFunc(cwCustomResolverFn) + return endpoints.DefaultResolver().EndpointFor(service, region, optFns...) + } + + sess, err := session.NewSession(&aws.Config{ + Region: aws.String(region), + EndpointResolver: endpoints.ResolverFunc(customResolverFn), + CredentialsChainVerboseErrors: aws.Bool(true), + }) + if err != nil { + return nil, err } + svcConfig := &aws.Config{} if roleARN != "" { creds := stscreds.NewCredentials(sess, roleARN) svcConfig.Credentials = creds @@ -134,7 +139,7 @@ func newPutRecordBatcher(roleARN string, sess *session.Session, endpoint string) client := firehose.New(sess, svcConfig) client.Handlers.Build.PushBackNamed(plugins.CustomUserAgentHandler()) - return client + return client, nil } // AddRecord accepts a record and adds it to the buffer, flushing the buffer if it is full diff --git a/fluent-bit-firehose.go b/fluent-bit-firehose.go index e5ae2dc..df74670 100644 --- a/fluent-bit-firehose.go +++ b/fluent-bit-firehose.go @@ -66,8 +66,10 @@ func newFirehoseOutput(ctx unsafe.Pointer, pluginID int) (*firehose.OutputPlugin logrus.Infof("[firehose %d] plugin parameter data_keys = '%s'\n", pluginID, dataKeys) roleARN := output.FLBPluginConfigKey(ctx, "role_arn") logrus.Infof("[firehose %d] plugin parameter role_arn = '%s'\n", pluginID, roleARN) - endpoint := output.FLBPluginConfigKey(ctx, "endpoint") - logrus.Infof("[firehose %d] plugin parameter endpoint = '%s'\n", pluginID, endpoint) + firehoseEndpoint := output.FLBPluginConfigKey(ctx, "endpoint") + logrus.Infof("[firehose %d] plugin parameter endpoint = '%s'\n", pluginID, firehoseEndpoint) + stsEndpoint := output.FLBPluginConfigKey(ctx, "sts_endpoint") + logrus.Infof("[firehose %d] plugin parameter sts_endpoint = '%s'\n", pluginID, stsEndpoint) timeKey := output.FLBPluginConfigKey(ctx, "time_key") logrus.Infof("[firehose %d] plugin parameter time_key = '%s'\n", pluginID, timeKey) timeKeyFmt := output.FLBPluginConfigKey(ctx, "time_key_format") @@ -77,7 +79,7 @@ func newFirehoseOutput(ctx unsafe.Pointer, pluginID int) (*firehose.OutputPlugin return nil, fmt.Errorf("[firehose %d] delivery_stream and region are required configuration parameters", pluginID) } - return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, endpoint, timeKey, timeKeyFmt, pluginID) + return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeKeyFmt, pluginID) } //export FLBPluginInit