Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][receiver/loki] follow receiver contract #35327

Merged
merged 13 commits into from
Oct 18, 2024
25 changes: 25 additions & 0 deletions internal/coreinternal/errorutil/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package errorutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/errorutil"

import (
"go.opentelemetry.io/collector/consumer/consumererror"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func GrpcError(err error) error {
s, ok := status.FromError(err)
if !ok {
// Default to a retryable error
// https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures
code := codes.Unavailable
if consumererror.IsPermanent(err) {
// non-retryable error
code = codes.Unknown
}
s = status.New(code, err.Error())
}
return s.Err()
}
23 changes: 23 additions & 0 deletions internal/coreinternal/errorutil/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package errorutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/errorutil"

import (
"net/http"

"go.opentelemetry.io/collector/consumer/consumererror"
)

func HttpError(w http.ResponseWriter, err error) {
if err == nil {
return
}
// non-retryable status
status := http.StatusBadRequest
if !consumererror.IsPermanent(err) {
// retryable status
status = http.StatusServiceUnavailable
}
http.Error(w, err.Error(), status)
}
2 changes: 1 addition & 1 deletion internal/coreinternal/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/text v0.18.0
google.golang.org/grpc v1.66.2
)

require (
Expand Down Expand Up @@ -85,7 +86,6 @@ require (
golang.org/x/sys v0.25.0 // indirect
golang.org/x/tools v0.23.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
google.golang.org/grpc v1.66.2 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
2 changes: 1 addition & 1 deletion receiver/lokireceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/grafana/loki/pkg/push v0.0.0-20240514112848-a1b1eeb09583
github.com/json-iterator/go v1.1.12
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.109.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.109.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki v0.109.0
Expand Down
12 changes: 11 additions & 1 deletion receiver/lokireceiver/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/errorutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/lokireceiver/internal"
)
Expand Down Expand Up @@ -157,12 +160,15 @@ func (r *lokiReceiver) Push(ctx context.Context, pushRequest *push.PushRequest)
logs, err := loki.PushRequestToLogs(pushRequest, r.conf.KeepTimestamp)
if err != nil {
r.settings.Logger.Warn(ErrAtLeastOneEntryFailedToProcess, zap.Error(err))
return &push.PushResponse{}, err
return &push.PushResponse{}, status.New(codes.InvalidArgument, err.Error()).Err()
}
ctx = r.obsrepGRPC.StartLogsOp(ctx)
logRecordCount := logs.LogRecordCount()
err = r.nextConsumer.ConsumeLogs(ctx, logs)
r.obsrepGRPC.EndLogsOp(ctx, "protobuf", logRecordCount, err)
if err != nil {
return &push.PushResponse{}, errorutil.GrpcError(err)
}
return &push.PushResponse{}, nil
}

Expand Down Expand Up @@ -219,6 +225,10 @@ func handleLogs(resp http.ResponseWriter, req *http.Request, r *lokiReceiver) {
logRecordCount := logs.LogRecordCount()
err = r.nextConsumer.ConsumeLogs(ctx, logs)
r.obsrepHTTP.EndLogsOp(ctx, "json", logRecordCount, err)
if err != nil {
errorutil.HttpError(resp, err)
return
}

resp.WriteHeader(http.StatusNoContent)
}
76 changes: 76 additions & 0 deletions receiver/lokireceiver/loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"compress/gzip"
"compress/zlib"
"context"
"errors"
"fmt"
"net"
"net/http"
Expand All @@ -23,6 +24,7 @@ import (
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
Expand Down Expand Up @@ -362,6 +364,80 @@ func TestSendingPushRequestToGRPCEndpoint(t *testing.T) {
}
}

func TestExpectedStatus(t *testing.T) {

testcases := []struct {
name string
err error
expectedGrpcError string
expectedHttpError string
}{
{
name: "permanent-error",
err: consumererror.NewPermanent(errors.New("permanent")),
expectedGrpcError: "rpc error: code = Internal desc = Permanent error: permanent",
expectedHttpError: "failed to upload logs; HTTP status code: 400",
},
{
name: "non-permanent-error",
err: errors.New("non-permanent"),
expectedGrpcError: "rpc error: code = Unavailable desc = non-permanent",
expectedHttpError: "failed to upload logs; HTTP status code: 503",
},
}
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
httpAddr := testutil.GetAvailableLocalAddress(t)
config := &Config{
Protocols: Protocols{
GRPC: &configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: testutil.GetAvailableLocalAddress(t),
Transport: confignet.TransportTypeTCP,
},
},
HTTP: &confighttp.ServerConfig{
Endpoint: httpAddr,
},
},
KeepTimestamp: true,
}

consumer := consumertest.NewErr(tt.err)
lr, err := newLokiReceiver(config, consumer, receivertest.NewNopSettings())
require.NoError(t, err)

require.NoError(t, lr.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() { require.NoError(t, lr.Shutdown(context.Background())) })
conn, err := grpc.NewClient(config.GRPC.NetAddr.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer conn.Close()
grpcClient := push.NewPusherClient(conn)

body := &push.PushRequest{
Streams: []push.Stream{
{
Labels: "{foo=\"bar\"}",
Entries: []push.Entry{
{
Timestamp: time.Unix(0, 1676888496000000000),
Line: "logline 1",
},
},
},
},
}

_, err = grpcClient.Push(context.Background(), body)
require.EqualError(t, err, tt.expectedGrpcError)

_, port, _ := net.SplitHostPort(httpAddr)
collectorAddr := fmt.Sprintf("http://localhost:%s/loki/api/v1/push", port)
require.EqualError(t, sendToCollector(collectorAddr, "application/json", "", []byte(`{"streams": [{"stream": {"foo": "bar"},"values": [[ "1676888496000000000", "logline 1" ]]}]}`)), tt.expectedHttpError)
})
}
}

type Log struct {
Timestamp int64
Body pcommon.Value
Expand Down
Loading