Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
Report event_count metric in Broker Ingress for invalid events
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexandraRoatis committed Oct 5, 2020
1 parent 6bdb0a0 commit 59c5a8b
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 3 deletions.
7 changes: 4 additions & 3 deletions pkg/broker/ingress/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func (h *Handler) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Re
httpStatus = nethttp.StatusRequestEntityTooLarge
}
nethttp.Error(response, err.Error(), httpStatus)
h.reportMetrics(ctx, "_invalid_cloud_event_", httpStatus)
return
}

Expand All @@ -186,7 +187,7 @@ func (h *Handler) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Re
statusCode := nethttp.StatusAccepted
ctx, cancel := context.WithTimeout(ctx, decoupleSinkTimeout)
defer cancel()
defer func() { h.reportMetrics(ctx, event, statusCode) }()
defer func() { h.reportMetrics(ctx, event.Type(), statusCode) }()
if res := h.decouple.Send(ctx, broker, *event); !cev2.IsACK(res) {
logging.FromContext(ctx).Error("Error publishing to PubSub", zap.Error(res))
statusCode = nethttp.StatusInternalServerError
Expand Down Expand Up @@ -230,9 +231,9 @@ func (h *Handler) toEvent(ctx context.Context, request *nethttp.Request) (*cev2.
return event, nil
}

func (h *Handler) reportMetrics(ctx context.Context, event *cev2.Event, statusCode int) {
func (h *Handler) reportMetrics(ctx context.Context, eventType string, statusCode int) {
args := metrics.IngressReportArgs{
EventType: event.Type(),
EventType: eventType,
ResponseCode: statusCode,
}
if err := h.reporter.ReportEventCount(ctx, args); err != nil {
Expand Down
16 changes: 16 additions & 0 deletions pkg/broker/ingress/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,14 @@ func TestHandler(t *testing.T) {
contentLength: ptr.Int64(-1),
wantCode: nethttp.StatusRequestEntityTooLarge,
timeout: 10 * time.Second,
wantEventCount: 1,
wantMetricTags: map[string]string{
metricskey.LabelEventType: "_invalid_cloud_event_",
metricskey.LabelResponseCode: "413",
metricskey.LabelResponseCodeClass: "4xx",
metricskey.PodName: pod,
metricskey.ContainerName: container,
},
},
{
name: "malformed path",
Expand All @@ -203,6 +211,14 @@ func TestHandler(t *testing.T) {
path: "/ns1/broker1",
wantCode: nethttp.StatusBadRequest,
header: nethttp.Header{},
wantEventCount: 1,
wantMetricTags: map[string]string{
metricskey.LabelEventType: "_invalid_cloud_event_",
metricskey.LabelResponseCode: "400",
metricskey.LabelResponseCodeClass: "4xx",
metricskey.PodName: pod,
metricskey.ContainerName: container,
},
},
{
name: "wrong path - broker doesn't exist in given namespace",
Expand Down
1 change: 1 addition & 0 deletions pkg/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ var (
allowedEventTypes = map[string]struct{}{
"e2e-dummy-event-type": {},
"e2e-testing-resp-event-type-dummy": {},
"_invalid_cloud_event_": {},
}
)

Expand Down
3 changes: 3 additions & 0 deletions pkg/metrics/tags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func TestEventTypeMetricValue(t *testing.T) {
{"some.custom.event", "custom"},
{"", "custom"},

// Used to mark invalid cloud events
{"_invalid_cloud_event_", "_invalid_cloud_event_"},

// Used in E2E tests
{"e2e-dummy-event-type", "e2e-dummy-event-type"},
{"e2e-testing-resp-event-type-dummy", "e2e-testing-resp-event-type-dummy"},
Expand Down

0 comments on commit 59c5a8b

Please sign in to comment.