diff --git a/.chloggen/awss3receiver_notifications.yaml b/.chloggen/awss3receiver_notifications.yaml new file mode 100644 index 000000000000..bab4d623b6b3 --- /dev/null +++ b/.chloggen/awss3receiver_notifications.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awss3receiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: 'Add support for monitoring the progress of ingesting data from an S3 bucket via OpAMP custom messages.' + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30750] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/awss3receiver/README.md b/receiver/awss3receiver/README.md index 402cb8882bf4..8af1ec924267 100644 --- a/receiver/awss3receiver/README.md +++ b/receiver/awss3receiver/README.md @@ -29,6 +29,8 @@ The following exporter configuration parameters are supported. | `endpoint` | overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | | Optional | | `endpoint_partition_id` | partition id to use if `endpoint` is specified. | "aws" | Optional | | `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false | Optional | +| `notifications:` | | | | +| `opamp` | Name of the OpAMP Extension to use to send notifications of ingest progress. | | | ### Time format for `starttime` and `endtime` The `starttime` and `endtime` fields are used to specify the time range for which to retrieve data. @@ -46,4 +48,23 @@ receivers: s3_bucket: "mybucket" s3_prefix: "trace" s3_partition: "minute" -``` \ No newline at end of file +``` + +## Notifications +The receiver can send notifications of ingest progress to an OpAmp server using the custom message capability of +"org.opentelemetry.collector.receiver.awss3" and message type "TimeBasedIngestStatus". +The format of the notifications is a JSON object with the following fields: + +| Field | Description | +|:------------------|:--------------------------------------------------------------------------------| +| `telemetry_type` | The type of telemetry being ingested. One of "traces", "metrics", or "logs". | +| `ingest_status` | The status of the data ingestion. One of "ingesting", "failed", or "completed". | +| `start_time` | The time to start retrieving data in RFC3339 format. | +| `stop_time` | The time to stop retrieving data in RFC3339 format. | +| `ingest_time` | The time of the data currently being ingested in RFC3339 format. | +| `failure_message` | Error message if `ingest_status` is "failed". | + +The "ingesting" status is sent at the beginning of the ingest process before data has been retrieved for the specified time. +If during the processing of the data an error occurs a status message with `ingest_status` set to "failed" status with +the time of the data being ingested when the failure occurred. +If the ingest process completes successfully a status message with `ingest_status` set to "completed" is sent. \ No newline at end of file diff --git a/receiver/awss3receiver/config.go b/receiver/awss3receiver/config.go index 60c954acc7d2..2564eaf96f14 100644 --- a/receiver/awss3receiver/config.go +++ b/receiver/awss3receiver/config.go @@ -26,11 +26,16 @@ type S3DownloaderConfig struct { S3ForcePathStyle bool `mapstructure:"s3_force_path_style"` } +type Notifications struct { + OpAMP *component.ID `mapstructure:"opamp"` +} + // Config defines the configuration for the file receiver. type Config struct { - S3Downloader S3DownloaderConfig `mapstructure:"s3downloader"` - StartTime string `mapstructure:"starttime"` - EndTime string `mapstructure:"endtime"` + S3Downloader S3DownloaderConfig `mapstructure:"s3downloader"` + StartTime string `mapstructure:"starttime"` + EndTime string `mapstructure:"endtime"` + Notifications Notifications `mapstructure:"notifications"` } const ( diff --git a/receiver/awss3receiver/examples/notifications/Makefile b/receiver/awss3receiver/examples/notifications/Makefile new file mode 100644 index 000000000000..a51dd235cf47 --- /dev/null +++ b/receiver/awss3receiver/examples/notifications/Makefile @@ -0,0 +1 @@ +include ../../../../Makefile.Common \ No newline at end of file diff --git a/receiver/awss3receiver/examples/notifications/go.mod b/receiver/awss3receiver/examples/notifications/go.mod new file mode 100644 index 000000000000..82a6be1092f3 --- /dev/null +++ b/receiver/awss3receiver/examples/notifications/go.mod @@ -0,0 +1,11 @@ +module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awss3receiver/examples/notifications + +go 1.21.0 + +require github.com/open-telemetry/opamp-go v0.14.0 + +require ( + github.com/gorilla/websocket v1.5.1 // indirect + golang.org/x/net v0.17.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect +) diff --git a/receiver/awss3receiver/examples/notifications/go.sum b/receiver/awss3receiver/examples/notifications/go.sum new file mode 100644 index 000000000000..c5a911258636 --- /dev/null +++ b/receiver/awss3receiver/examples/notifications/go.sum @@ -0,0 +1,20 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +github.com/open-telemetry/opamp-go v0.14.0 h1:KoziIK+wsFojhUXNTkCSTnCPf0eCMqFAaccOs0HrWIY= +github.com/open-telemetry/opamp-go v0.14.0/go.mod h1:XOGCigljsLSTZ8FfLwvat0M1QDj3conIIgRa77BWrKs= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/receiver/awss3receiver/examples/notifications/main.go b/receiver/awss3receiver/examples/notifications/main.go new file mode 100644 index 000000000000..08fbe184acd6 --- /dev/null +++ b/receiver/awss3receiver/examples/notifications/main.go @@ -0,0 +1,134 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package main + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "os/signal" + "time" + + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/open-telemetry/opamp-go/server" + "github.com/open-telemetry/opamp-go/server/types" +) + +const ( + openTelemetryCollectorReceiverAWSS3 = "org.opentelemetry.collector.receiver.awss3" +) + +type TimeBasedIngestStatus struct { + TelemetryType string `json:"telemetry_type"` + IngestStatus string `json:"ingest_status"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + IngestTime time.Time `json:"ingest_time"` + FailureMessage string `json:"failure_message,omitempty"` +} + +type ProgressServer struct { + server server.OpAMPServer +} + +func main() { + progressServer := newProgressServer() + err := progressServer.Start() + if err != nil { + panic(err) + } + + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + <-interrupt + + progressServer.Stop() +} + +func newProgressServer() *ProgressServer { + return &ProgressServer{ + server: server.New(nil), + } +} + +func (p *ProgressServer) Start() error { + return p.server.Start(server.StartSettings{ + Settings: server.Settings{ + Callbacks: server.CallbacksStruct{ + OnConnectingFunc: p.onConnecting, + }, + + EnableCompression: false, + CustomCapabilities: []string{openTelemetryCollectorReceiverAWSS3}, + }, + ListenEndpoint: "localhost:8080", + ListenPath: "", + TLSConfig: nil, + HTTPMiddleware: nil, + }) +} + +func (p *ProgressServer) Stop() { + p.server.Stop(context.Background()) +} + +func (p *ProgressServer) onConnecting(request *http.Request) types.ConnectionResponse { + fmt.Println("OnConnecting") + return types.ConnectionResponse{ + Accept: true, + ConnectionCallbacks: server.ConnectionCallbacksStruct{ + OnMessageFunc: p.onMessage, + }, + } +} + +func (p *ProgressServer) onMessage(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + response := &protobufs.ServerToAgent{ + InstanceUid: message.InstanceUid, + } + + if message.CustomCapabilities != nil { + capabilities := make([]string, 0) + for _, capability := range message.CustomCapabilities.Capabilities { + if capability == "org.opentelemetry.collector.receiver.awss3" { + + capabilities = append(capabilities, capability) + } + } + if len(capabilities) == 0 { + fmt.Println("🛑 - Agent does not support AWS S3 receiver progress") + } else { + fmt.Println("✅ - Agent supports AWS S3 receiver progress") + } + + response.CustomCapabilities = &protobufs.CustomCapabilities{ + Capabilities: capabilities, + } + } + + if message.CustomMessage != nil && message.CustomMessage.Capability == openTelemetryCollectorReceiverAWSS3 { + if message.CustomMessage.Type == "TimeBasedIngestStatus" { + status := &TimeBasedIngestStatus{} + err := json.Unmarshal(message.CustomMessage.Data, status) + if err != nil { + fmt.Println("💣 - Error unmarshalling custom message data", err) + } else { + switch status.IngestStatus { + case "complete": + fmt.Println("🎉 - Ingest complete") + case "failed": + fmt.Println("🚨 - Ingest failed:", status.FailureMessage) + case "ingesting": + done := status.IngestTime.Sub(status.StartTime) + left := status.EndTime.Sub(status.IngestTime) + fmt.Printf("🚀 - Ingesting %s done, %s left (current %s)\n", done, left, status.IngestTime) + } + } + } + } + + return response +} diff --git a/receiver/awss3receiver/go.mod b/receiver/awss3receiver/go.mod index 9e4c2b4425ad..393f107489b7 100644 --- a/receiver/awss3receiver/go.mod +++ b/receiver/awss3receiver/go.mod @@ -7,6 +7,8 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.27.13 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.17 github.com/aws/aws-sdk-go-v2/service/s3 v1.53.2 + github.com/open-telemetry/opamp-go v0.14.0 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.0.0-20240513080536-a133a8efefbe github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.100.1-0.20240509190532-c555005fcc80 go.opentelemetry.io/collector/confmap v0.100.1-0.20240509190532-c555005fcc80 @@ -72,3 +74,5 @@ require ( google.golang.org/protobuf v1.34.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages => ../../extension/opampcustommessages diff --git a/receiver/awss3receiver/go.sum b/receiver/awss3receiver/go.sum index 794d99f4a3df..adb71af5a199 100644 --- a/receiver/awss3receiver/go.sum +++ b/receiver/awss3receiver/go.sum @@ -84,6 +84,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opamp-go v0.14.0 h1:KoziIK+wsFojhUXNTkCSTnCPf0eCMqFAaccOs0HrWIY= +github.com/open-telemetry/opamp-go v0.14.0/go.mod h1:XOGCigljsLSTZ8FfLwvat0M1QDj3conIIgRa77BWrKs= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= diff --git a/receiver/awss3receiver/internal/metadata/generated_telemetry_test.go b/receiver/awss3receiver/internal/metadata/generated_telemetry_test.go index 4cfb246d99d5..b0b86b8a3abb 100644 --- a/receiver/awss3receiver/internal/metadata/generated_telemetry_test.go +++ b/receiver/awss3receiver/internal/metadata/generated_telemetry_test.go @@ -6,14 +6,13 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/otel/metric" embeddedmetric "go.opentelemetry.io/otel/metric/embedded" noopmetric "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" embeddedtrace "go.opentelemetry.io/otel/trace/embedded" nooptrace "go.opentelemetry.io/otel/trace/noop" - - "go.opentelemetry.io/collector/component" ) type mockMeter struct { diff --git a/receiver/awss3receiver/notifications.go b/receiver/awss3receiver/notifications.go new file mode 100644 index 000000000000..1798f545f366 --- /dev/null +++ b/receiver/awss3receiver/notifications.go @@ -0,0 +1,91 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awss3receiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awss3receiver" + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/open-telemetry/opamp-go/client/types" + "go.opentelemetry.io/collector/component" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages" +) + +const ( + IngestStatusCompleted = "completed" + IngestStatusFailed = "failed" + IngestStatusIngesting = "ingesting" + CustomCapability = "org.opentelemetry.collector.receiver.awss3" +) + +type StatusNotification struct { + TelemetryType string `json:"telemetry_type"` + IngestStatus string `json:"ingest_status"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + IngestTime time.Time `json:"ingest_time"` + FailureMessage string `json:"failure_message,omitempty"` +} + +type statusNotifier interface { + Start(ctx context.Context, host component.Host) error + Shutdown(ctx context.Context) error + SendStatus(ctx context.Context, message StatusNotification) +} + +type opampNotifier struct { + opampExtensionID component.ID + handler opampcustommessages.CustomCapabilityHandler +} + +func newNotifier(config *Config) statusNotifier { + if config.Notifications.OpAMP != nil { + return &opampNotifier{opampExtensionID: *config.Notifications.OpAMP} + } + return nil +} + +func (n *opampNotifier) Start(_ context.Context, host component.Host) error { + ext, ok := host.GetExtensions()[n.opampExtensionID] + if !ok { + return fmt.Errorf("extension %q does not exist", n.opampExtensionID) + } + + registry, ok := ext.(opampcustommessages.CustomCapabilityRegistry) + if !ok { + return fmt.Errorf("extension %q is not a custom message registry", n.opampExtensionID) + } + + handler, err := registry.Register(CustomCapability) + if err != nil { + return fmt.Errorf("failed to register custom capability: %w", err) + } + n.handler = handler + return nil +} + +func (n *opampNotifier) Shutdown(_ context.Context) error { + n.handler.Unregister() + return nil +} + +func (n *opampNotifier) SendStatus(_ context.Context, message StatusNotification) { + bytes, err := json.Marshal(message) + if err != nil { + return + } + sendingChan, err := n.handler.SendMessage("TimeBasedIngestStatus", bytes) + switch { + case err == nil: + break + case errors.Is(err, types.ErrCustomMessagePending): + <-sendingChan + default: + return + } +} diff --git a/receiver/awss3receiver/notifications_test.go b/receiver/awss3receiver/notifications_test.go new file mode 100644 index 000000000000..4a31da15e523 --- /dev/null +++ b/receiver/awss3receiver/notifications_test.go @@ -0,0 +1,186 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awss3receiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awss3receiver" + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + + "github.com/open-telemetry/opamp-go/client/types" + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages" +) + +type mockCustomCapabilityRegistry struct { + component.Component + + shouldFailRegister bool + shouldReturnPending bool + + pendingChannel chan struct{} + unregisterCalled bool + sentMessages []customMessage +} + +type customMessage struct { + messageType string + message []byte +} + +type hostWithExtensions struct { + extension *mockCustomCapabilityRegistry +} + +func (h hostWithExtensions) Start(context.Context, component.Host) error { + panic("unsupported") +} + +func (h hostWithExtensions) Shutdown(context.Context) error { + panic("unsupported") +} + +func (h hostWithExtensions) GetFactory(_ component.Kind, _ component.Type) component.Factory { + panic("unsupported") +} + +func (h hostWithExtensions) GetExtensions() map[component.ID]component.Component { + return map[component.ID]component.Component{ + component.MustNewID("foo"): h.extension, + } +} + +func (h hostWithExtensions) GetExporters() map[component.DataType]map[component.ID]component.Component { + panic("unsupported") +} + +func (m *mockCustomCapabilityRegistry) Register(_ string, _ ...opampcustommessages.CustomCapabilityRegisterOption) (handler opampcustommessages.CustomCapabilityHandler, err error) { + if m.shouldFailRegister { + return nil, fmt.Errorf("register failed") + } + return m, nil +} + +func (m *mockCustomCapabilityRegistry) Message() <-chan *protobufs.CustomMessage { + panic("unsupported") +} + +func (m *mockCustomCapabilityRegistry) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) { + if m.unregisterCalled { + return nil, fmt.Errorf("unregister called") + } + m.sentMessages = append(m.sentMessages, customMessage{messageType: messageType, message: message}) + if m.shouldReturnPending { + return m.pendingChannel, types.ErrCustomMessagePending + } + return nil, nil +} + +func (m *mockCustomCapabilityRegistry) Unregister() { + m.unregisterCalled = true +} + +func Test_opampNotifier_Start(t *testing.T) { + id := component.MustNewID("foo") + + tests := []struct { + name string + host component.Host + wantErr bool + }{ + { + name: "success", + host: hostWithExtensions{ + extension: &mockCustomCapabilityRegistry{}, + }, + wantErr: false, + }, + { + name: "extension not found", + host: componenttest.NewNopHost(), + wantErr: true, + }, + { + name: "register failed", + host: hostWithExtensions{ + extension: &mockCustomCapabilityRegistry{ + shouldFailRegister: true, + }, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + notifier := &opampNotifier{opampExtensionID: id} + err := notifier.Start(context.Background(), tt.host) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func Test_opampNotifier_Shutdown(t *testing.T) { + registry := mockCustomCapabilityRegistry{} + notifier := &opampNotifier{handler: ®istry} + err := notifier.Shutdown(context.Background()) + require.NoError(t, err) + require.True(t, registry.unregisterCalled) +} + +func Test_opampNotifier_SendStatus(t *testing.T) { + registry := mockCustomCapabilityRegistry{} + notifier := &opampNotifier{handler: ®istry} + toSend := StatusNotification{ + TelemetryType: "telemetry", + IngestStatus: IngestStatusIngesting, + IngestTime: time.Time{}, + } + notifier.SendStatus(context.Background(), toSend) + require.Len(t, registry.sentMessages, 1) + require.Equal(t, "TimeBasedIngestStatus", registry.sentMessages[0].messageType) + got := StatusNotification{} + err := json.Unmarshal(registry.sentMessages[0].message, &got) + require.NoError(t, err) + require.Equal(t, toSend, got) +} + +func Test_opampNotifier_SendStatus_MessagePending(t *testing.T) { + registry := mockCustomCapabilityRegistry{ + shouldReturnPending: true, + pendingChannel: make(chan struct{}), + } + notifier := &opampNotifier{handler: ®istry} + toSend := StatusNotification{ + TelemetryType: "telemetry", + IngestStatus: IngestStatusIngesting, + IngestTime: time.Time{}, + } + + var completionTime time.Time + now := time.Now() + go func() { + time.Sleep(10 * time.Millisecond) + registry.pendingChannel <- struct{}{} + }() + notifier.SendStatus(context.Background(), toSend) + completionTime = time.Now() + require.True(t, completionTime.After(now)) + require.Len(t, registry.sentMessages, 1) + require.Equal(t, "TimeBasedIngestStatus", registry.sentMessages[0].messageType) + got := StatusNotification{} + err := json.Unmarshal(registry.sentMessages[0].message, &got) + require.NoError(t, err) + require.Equal(t, toSend, got) +} diff --git a/receiver/awss3receiver/receiver.go b/receiver/awss3receiver/receiver.go index 06dc1ea7d678..71cff16115a9 100644 --- a/receiver/awss3receiver/receiver.go +++ b/receiver/awss3receiver/receiver.go @@ -21,10 +21,12 @@ type awss3TraceReceiver struct { consumer consumer.Traces logger *zap.Logger cancel context.CancelFunc + notifier statusNotifier } func newAWSS3TraceReceiver(ctx context.Context, cfg *Config, traces consumer.Traces, logger *zap.Logger) (*awss3TraceReceiver, error) { - reader, err := newS3Reader(ctx, cfg) + notifier := newNotifier(cfg) + reader, err := newS3Reader(ctx, notifier, cfg) if err != nil { return nil, err } @@ -33,19 +35,31 @@ func newAWSS3TraceReceiver(ctx context.Context, cfg *Config, traces consumer.Tra consumer: traces, logger: logger, cancel: nil, + notifier: notifier, }, nil } -func (r *awss3TraceReceiver) Start(_ context.Context, _ component.Host) error { - var ctx context.Context - ctx, r.cancel = context.WithCancel(context.Background()) +func (r *awss3TraceReceiver) Start(ctx context.Context, host component.Host) error { + if r.notifier != nil { + if err := r.notifier.Start(ctx, host); err != nil { + return err + } + } + var ingestCtx context.Context + ingestCtx, r.cancel = context.WithCancel(context.Background()) go func() { - _ = r.s3Reader.readAll(ctx, "traces", r.receiveBytes) + _ = r.s3Reader.readAll(ingestCtx, "traces", r.receiveBytes) }() return nil } -func (r *awss3TraceReceiver) Shutdown(_ context.Context) error { +func (r *awss3TraceReceiver) Shutdown(ctx context.Context) error { + if r.notifier != nil { + if err := r.notifier.Shutdown(ctx); err != nil { + return err + } + } + if r.cancel != nil { r.cancel() } diff --git a/receiver/awss3receiver/s3reader.go b/receiver/awss3receiver/s3reader.go index 1733cdbee5d7..55cf2336ee85 100644 --- a/receiver/awss3receiver/s3reader.go +++ b/receiver/awss3receiver/s3reader.go @@ -22,11 +22,12 @@ type s3Reader struct { filePrefix string startTime time.Time endTime time.Time + notifier statusNotifier } type s3ReaderDataCallback func(context.Context, string, []byte) error -func newS3Reader(ctx context.Context, cfg *Config) (*s3Reader, error) { +func newS3Reader(ctx context.Context, notifier statusNotifier, cfg *Config) (*s3Reader, error) { listObjectsClient, getObjectClient, err := newS3Client(ctx, cfg.S3Downloader) if err != nil { return nil, err @@ -52,9 +53,11 @@ func newS3Reader(ctx context.Context, cfg *Config) (*s3Reader, error) { s3Partition: cfg.S3Downloader.S3Partition, startTime: startTime, endTime: endTime, + notifier: notifier, }, nil } +//nolint:golint,unparam func (s3Reader *s3Reader) readAll(ctx context.Context, telemetryType string, dataCallback s3ReaderDataCallback) error { var timeStep time.Duration if s3Reader.s3Partition == "hour" { @@ -64,15 +67,53 @@ func (s3Reader *s3Reader) readAll(ctx context.Context, telemetryType string, dat } for currentTime := s3Reader.startTime; currentTime.Before(s3Reader.endTime); currentTime = currentTime.Add(timeStep) { + if s3Reader.notifier != nil { + s3Reader.notifier.SendStatus(ctx, StatusNotification{ + TelemetryType: telemetryType, + IngestStatus: IngestStatusIngesting, + StartTime: s3Reader.startTime, + EndTime: s3Reader.endTime, + IngestTime: currentTime, + }) + } + select { case <-ctx.Done(): + if s3Reader.notifier != nil { + s3Reader.notifier.SendStatus(ctx, StatusNotification{ + TelemetryType: telemetryType, + IngestStatus: IngestStatusCompleted, + StartTime: s3Reader.startTime, + EndTime: s3Reader.endTime, + IngestTime: currentTime, + }) + } return nil default: if err := s3Reader.readTelemetryForTime(ctx, currentTime, telemetryType, dataCallback); err != nil { + if s3Reader.notifier != nil { + s3Reader.notifier.SendStatus(ctx, StatusNotification{ + TelemetryType: telemetryType, + IngestStatus: IngestStatusFailed, + StartTime: s3Reader.startTime, + EndTime: s3Reader.endTime, + IngestTime: currentTime, + FailureMessage: err.Error(), + }) + } return err } } } + if s3Reader.notifier != nil { + s3Reader.notifier.SendStatus(ctx, StatusNotification{ + TelemetryType: telemetryType, + IngestStatus: IngestStatusCompleted, + StartTime: s3Reader.startTime, + EndTime: s3Reader.endTime, + IngestTime: s3Reader.endTime, + }) + } return nil } diff --git a/receiver/awss3receiver/s3reader_test.go b/receiver/awss3receiver/s3reader_test.go index 7b329735cc62..9d497fb80d16 100644 --- a/receiver/awss3receiver/s3reader_test.go +++ b/receiver/awss3receiver/s3reader_test.go @@ -15,6 +15,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" ) var testTime = time.Date(2021, 02, 01, 17, 32, 00, 00, time.UTC) @@ -310,6 +311,20 @@ func Test_readTelemetryForTime_NextPageError(t *testing.T) { require.Error(t, err) } +type mockNotifier struct { + messages []StatusNotification +} + +func (m *mockNotifier) Start(_ context.Context, _ component.Host) error { + return nil +} +func (m *mockNotifier) Shutdown(_ context.Context) error { + return nil +} +func (m *mockNotifier) SendStatus(_ context.Context, notification StatusNotification) { + m.messages = append(m.messages, notification) +} + func Test_readAll(t *testing.T) { reader := s3Reader{ listObjectsClient: mockListObjectsAPI(func(params *s3.ListObjectsV2Input) ListObjectsV2Pager { @@ -356,7 +371,77 @@ func Test_readAll(t *testing.T) { require.Contains(t, dataCallbackKeys, "year=2021/month=02/day=01/hour=17/minute=33/traces_1") } +func Test_readAll_StatusMessages(t *testing.T) { + notifier := mockNotifier{} + reader := s3Reader{ + listObjectsClient: mockListObjectsAPI(func(params *s3.ListObjectsV2Input) ListObjectsV2Pager { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + key := fmt.Sprintf("%s%s", *params.Prefix, "1") + return &mockListObjectsV2Pager{ + Pages: []*s3.ListObjectsV2Output{ + { + Contents: []types.Object{ + { + Key: &key, + }, + }, + }, + }, + } + }), + getObjectClient: mockGetObjectAPI(func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), + }, nil + }), + s3Bucket: "bucket", + s3Prefix: "", + s3Partition: "minute", + filePrefix: "", + startTime: testTime, + endTime: testTime.Add(time.Minute * 2), + notifier: ¬ifier, + } + + dataCallbackKeys := make([]string, 0) + + err := reader.readAll(context.Background(), "traces", func(_ context.Context, key string, data []byte) error { + t.Helper() + require.Equal(t, "this is the body of the object", string(data)) + dataCallbackKeys = append(dataCallbackKeys, key) + return nil + }) + require.NoError(t, err) + require.Contains(t, dataCallbackKeys, "year=2021/month=02/day=01/hour=17/minute=32/traces_1") + require.Contains(t, dataCallbackKeys, "year=2021/month=02/day=01/hour=17/minute=33/traces_1") + require.Equal(t, []StatusNotification{ + { + TelemetryType: "traces", + IngestStatus: IngestStatusIngesting, + StartTime: testTime, + EndTime: testTime.Add(time.Minute * 2), + IngestTime: testTime, + }, { + TelemetryType: "traces", + IngestStatus: IngestStatusIngesting, + StartTime: testTime, + EndTime: testTime.Add(time.Minute * 2), + IngestTime: testTime.Add(time.Minute), + }, { + TelemetryType: "traces", + IngestStatus: IngestStatusCompleted, + StartTime: testTime, + EndTime: testTime.Add(time.Minute * 2), + IngestTime: testTime.Add(time.Minute * 2), + }, + }, notifier.messages) +} + func Test_readAll_ContextDone(t *testing.T) { + notifier := mockNotifier{} reader := s3Reader{ listObjectsClient: mockListObjectsAPI(func(params *s3.ListObjectsV2Input) ListObjectsV2Pager { t.Helper() @@ -387,6 +472,7 @@ func Test_readAll_ContextDone(t *testing.T) { filePrefix: "", startTime: testTime, endTime: testTime.Add(time.Minute * 2), + notifier: ¬ifier, } dataCallbackKeys := make([]string, 0) @@ -399,4 +485,19 @@ func Test_readAll_ContextDone(t *testing.T) { }) require.NoError(t, err) require.Len(t, dataCallbackKeys, 0) + require.Equal(t, []StatusNotification{ + { + TelemetryType: "traces", + IngestStatus: IngestStatusIngesting, + StartTime: testTime, + EndTime: testTime.Add(time.Minute * 2), + IngestTime: testTime, + }, { + TelemetryType: "traces", + IngestStatus: IngestStatusCompleted, + StartTime: testTime, + EndTime: testTime.Add(time.Minute * 2), + IngestTime: testTime, + }, + }, notifier.messages) }