Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Kafka Otel instrumentation #129

Merged
merged 3 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/ONSdigital/dp-cantabular-csv-exporter/config"
"github.com/ONSdigital/dp-cantabular-csv-exporter/event"
"github.com/ONSdigital/dp-cantabular-csv-exporter/schema"
kafka "github.com/ONSdigital/dp-kafka/v3"
kafka "github.com/ONSdigital/dp-kafka/v4"
"github.com/ONSdigital/log.go/v2/log"
)

Expand Down Expand Up @@ -66,7 +66,7 @@ func main() {
// Send bytes to Output channel, after calling Initialise just in case it is not initialised.
// Wait for producer to be initialised
<-kafkaProducer.Channels().Initialised
kafkaProducer.Channels().Output <- bytes
kafkaProducer.Channels().Output <- kafka.BytesMessage{Value: bytes, Context: ctx}
}
}

Expand Down
2 changes: 1 addition & 1 deletion features/steps/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/ONSdigital/dp-cantabular-csv-exporter/config"
"github.com/ONSdigital/dp-cantabular-csv-exporter/service"
componenttest "github.com/ONSdigital/dp-component-test"
kafka "github.com/ONSdigital/dp-kafka/v3"
kafka "github.com/ONSdigital/dp-kafka/v4"
"github.com/ONSdigital/log.go/v2/log"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
Expand Down
3 changes: 2 additions & 1 deletion features/steps/steps.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package steps

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -296,7 +297,7 @@ func (c *Component) thisExportStartEventIsQueued(input *godog.DocString) error {
log.Info(c.ctx, "event to send for testing: ", log.Data{
"event": testEvent,
})
if err := c.producer.Send(schema.ExportStart, testEvent); err != nil {
if err := c.producer.Send(context.Background(), schema.ExportStart, testEvent); err != nil {
return fmt.Errorf("failed to send event for testing: %w", err)
}
return nil
Expand Down
11 changes: 6 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/ONSdigital/dp-assistdog v0.0.1
github.com/ONSdigital/dp-component-test v0.9.2
github.com/ONSdigital/dp-healthcheck v1.6.1
github.com/ONSdigital/dp-kafka/v3 v3.10.0
github.com/ONSdigital/dp-kafka/v4 v4.0.0
github.com/ONSdigital/dp-net/v2 v2.9.1
github.com/ONSdigital/dp-otel-go v0.0.6
github.com/ONSdigital/dp-s3/v2 v2.0.0-beta.5
Expand Down Expand Up @@ -101,6 +101,7 @@ require (
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
go.mongodb.org/mongo-driver v1.11.4 // indirect
go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.43.0 // indirect
go.opentelemetry.io/contrib/propagators/autoprop v0.45.0 // indirect
go.opentelemetry.io/contrib/propagators/aws v1.21.0 // indirect
go.opentelemetry.io/contrib/propagators/b3 v1.20.0 // indirect
Expand All @@ -114,16 +115,16 @@ require (
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
)
18 changes: 10 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ github.com/ONSdigital/dp-component-test v0.9.2 h1:Ua+YOA+K9n1SY8CTCHnXmxLXivToz2
github.com/ONSdigital/dp-component-test v0.9.2/go.mod h1:u/XsI2ldXAHy5lTRaao/WTVlN/r8dQEIiHPmz0B+9FE=
github.com/ONSdigital/dp-healthcheck v1.6.1 h1:YDAnxE2fI3G2hhGC42mKI/fRhAhIYmFZGQwQ/8M65M0=
github.com/ONSdigital/dp-healthcheck v1.6.1/go.mod h1:FURB2RUJHw3lssamKtsGsrbu31ar9yhMSDYzG9vgSIo=
github.com/ONSdigital/dp-kafka/v3 v3.10.0 h1:ScfhAwH4X9L4vaavh0YR3ECHpztP0hDL4RCiBKDqghA=
github.com/ONSdigital/dp-kafka/v3 v3.10.0/go.mod h1:o5/dgPOv9tFjL+Vf6ke5yS68uFD40AE0mfjUxHQ/B/o=
github.com/ONSdigital/dp-kafka/v4 v4.0.0 h1:qskIqgiV+JQOeSgaCa52b9Jdx+ymHBijYZb/NVTYHAo=
github.com/ONSdigital/dp-kafka/v4 v4.0.0/go.mod h1:s6qWDRUgGagnPJl5+fzzxQqr2ji+z8eBxU6MKxr7Rl0=
github.com/ONSdigital/dp-mongodb-in-memory v1.6.0 h1:kJN/oY+Gu7XZ+ZRx9WseA8nWhy+728iGebn9sF8v9VY=
github.com/ONSdigital/dp-mongodb-in-memory v1.6.0/go.mod h1:rSsF/w/frWb/wmaj9eMwE8mJGdxvYgpRCcp2A+RK8wE=
github.com/ONSdigital/dp-net/v2 v2.9.1 h1:2hGa0ArL0m2pMT9cFxMPcSOgvJ0zstH3NxWeykU9Elg=
Expand Down Expand Up @@ -387,6 +387,8 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.43.0 h1:/RxdhdIi0HrKSzdWHLjureinjnGL5YQEYevaC/EAg1k=
go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.43.0/go.mod h1:BKzh9a9EE+vHuq99EwD2cEa+T+Ts1fQ6W3ovO80mjkY=
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.46.1 h1:Ifzy1lucGMQJh6wPRxusde8bWaDhYjSNOqDyn6Hb4TM=
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.46.1/go.mod h1:YfFNem80G9UZ/mL5zd5GGXZSy95eXK+RhzIWBkLjLSc=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 h1:aFJWCqJMNjENlcleuuOkGAPH82y0yULBScfXcIEdS24=
Expand Down Expand Up @@ -429,8 +431,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA=
golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -501,8 +503,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg=
golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -586,8 +588,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
8 changes: 4 additions & 4 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/ONSdigital/dp-cantabular-csv-exporter/config"
"github.com/ONSdigital/dp-cantabular-csv-exporter/event"
"github.com/ONSdigital/dp-cantabular-csv-exporter/schema"
kafka "github.com/ONSdigital/dp-kafka/v3"
kafka "github.com/ONSdigital/dp-kafka/v4"
"github.com/ONSdigital/log.go/v2/log"

"github.com/aws/aws-sdk-go/service/s3/s3manager"
Expand Down Expand Up @@ -134,7 +134,7 @@ func (h *InstanceComplete) Handle(ctx context.Context, _ int, msg kafka.Message)
log.Info(ctx, "producing event")

f := strings.Replace(filename, "datasets/", "", 1)
if err := h.ProduceExportCompleteEvent(e, rowCount, f); err != nil {
if err := h.ProduceExportCompleteEvent(ctx, e, rowCount, f); err != nil {
return fmt.Errorf("failed to produce export complete kafka message: %w", err)
}

Expand Down Expand Up @@ -437,8 +437,8 @@ func (h *InstanceComplete) UpdateInstance(ctx context.Context, e *event.ExportSt
}

// ProduceExportCompleteEvent sends the final kafka message signifying the export complete
func (h *InstanceComplete) ProduceExportCompleteEvent(e *event.ExportStart, rowCount int32, fileName string) error {
if err := h.producer.Send(schema.CSVCreated, &event.CSVCreated{
func (h *InstanceComplete) ProduceExportCompleteEvent(ctx context.Context, e *event.ExportStart, rowCount int32, fileName string) error {
if err := h.producer.Send(ctx, schema.CSVCreated, &event.CSVCreated{
InstanceID: e.InstanceID,
DatasetID: e.DatasetID,
Edition: e.Edition,
Expand Down
8 changes: 4 additions & 4 deletions handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/ONSdigital/dp-cantabular-csv-exporter/handler"
"github.com/ONSdigital/dp-cantabular-csv-exporter/handler/mock"
"github.com/ONSdigital/dp-cantabular-csv-exporter/schema"
kafka "github.com/ONSdigital/dp-kafka/v3"
"github.com/ONSdigital/dp-kafka/v3/kafkatest"
kafka "github.com/ONSdigital/dp-kafka/v4"
"github.com/ONSdigital/dp-kafka/v4/kafkatest"
"github.com/ONSdigital/log.go/v2/log"
)

Expand Down Expand Up @@ -690,7 +690,7 @@ func TestProduceExportCompleteEvent(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
err := eventHandler.ProduceExportCompleteEvent(testExportStartEvent, testRowCount, "")
err := eventHandler.ProduceExportCompleteEvent(ctx, testExportStartEvent, testRowCount, "")
c.So(err, ShouldBeNil)
}()

Expand Down Expand Up @@ -719,7 +719,7 @@ func TestProduceExportCompleteEvent(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
err := eventHandler.ProduceExportCompleteEvent(testExportStartEvent, testRowCount, "")
err := eventHandler.ProduceExportCompleteEvent(ctx, testExportStartEvent, testRowCount, "")
c.So(err, ShouldBeNil)
}()

Expand Down
2 changes: 1 addition & 1 deletion schema/schema.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package schema

import (
"github.com/ONSdigital/dp-kafka/v3/avro"
"github.com/ONSdigital/dp-kafka/v4/avro"
)

// Filter ID could be null, but opted for empty string representing part of publishing journey.the avro unmarshal lib does not support pointers to strings
Expand Down
2 changes: 1 addition & 1 deletion service/initialise.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/ONSdigital/dp-cantabular-csv-exporter/config"
"github.com/ONSdigital/dp-cantabular-csv-exporter/generator"
"github.com/ONSdigital/dp-healthcheck/healthcheck"
kafka "github.com/ONSdigital/dp-kafka/v3"
kafka "github.com/ONSdigital/dp-kafka/v4"
dphttp "github.com/ONSdigital/dp-net/v2/http"
dps3 "github.com/ONSdigital/dp-s3/v2"
vault "github.com/ONSdigital/dp-vault"
Expand Down
2 changes: 1 addition & 1 deletion service/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/ONSdigital/dp-api-clients-go/v2/filter"
"github.com/ONSdigital/dp-cantabular-csv-exporter/config"
"github.com/ONSdigital/dp-healthcheck/healthcheck"
kafka "github.com/ONSdigital/dp-kafka/v3"
kafka "github.com/ONSdigital/dp-kafka/v4"
"github.com/aws/aws-sdk-go/aws/session"

"github.com/aws/aws-sdk-go/service/s3"
Expand Down
2 changes: 1 addition & 1 deletion service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/ONSdigital/dp-cantabular-csv-exporter/config"
"github.com/ONSdigital/dp-cantabular-csv-exporter/handler"
"github.com/ONSdigital/dp-healthcheck/healthcheck"
kafka "github.com/ONSdigital/dp-kafka/v3"
kafka "github.com/ONSdigital/dp-kafka/v4"
"github.com/ONSdigital/log.go/v2/log"

"github.com/gorilla/mux"
Expand Down
4 changes: 2 additions & 2 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/ONSdigital/dp-cantabular-csv-exporter/service"
serviceMock "github.com/ONSdigital/dp-cantabular-csv-exporter/service/mock"
"github.com/ONSdigital/dp-healthcheck/healthcheck"
kafka "github.com/ONSdigital/dp-kafka/v3"
"github.com/ONSdigital/dp-kafka/v3/kafkatest"
kafka "github.com/ONSdigital/dp-kafka/v4"
"github.com/ONSdigital/dp-kafka/v4/kafkatest"

. "github.com/smartystreets/goconvey/convey"
)
Expand Down