Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/jaegerreceiver] Remove jaeger remote sampling from receiver #12940

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 4 additions & 24 deletions receiver/jaegerreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,10 @@ const (
protoThriftCompact = "thrift_compact"

// Default endpoints to bind to.
defaultGRPCBindEndpoint = "0.0.0.0:14250"
defaultHTTPBindEndpoint = "0.0.0.0:14268"
defaultThriftCompactBindEndpoint = "0.0.0.0:6831"
defaultThriftBinaryBindEndpoint = "0.0.0.0:6832"
defaultAgentRemoteSamplingHTTPEndpoint = "0.0.0.0:5778"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I should have been clearer on the original issue: we should remove this, but we need to first deprecate for at least a couple of versions. So, at this point, we need to only detect whether this is being used and log a info/warn-level message, stating that this port will disappear by version X (0.60.0 is the earliest at this point, I believe).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i am addressing this in #13408 and closing the pr until we finally remove the endpoint.

defaultGRPCBindEndpoint = "0.0.0.0:14250"
defaultHTTPBindEndpoint = "0.0.0.0:14268"
defaultThriftCompactBindEndpoint = "0.0.0.0:6831"
defaultThriftBinaryBindEndpoint = "0.0.0.0:6832"
)

// NewFactory creates a new Jaeger receiver factory.
Expand Down Expand Up @@ -92,7 +91,6 @@ func createTracesReceiver(
// Error handling for the conversion is done in the Validate function from the Config object itself.

rCfg := cfg.(*Config)
remoteSamplingConfig := rCfg.RemoteSampling

var config configuration
// Set ports
Expand All @@ -112,24 +110,6 @@ func createTracesReceiver(
config.AgentCompactThrift = *rCfg.ThriftCompact
}

if remoteSamplingConfig != nil {
config.RemoteSamplingClientSettings = remoteSamplingConfig.GRPCClientSettings
if config.RemoteSamplingClientSettings.Endpoint == "" {
config.RemoteSamplingClientSettings.Endpoint = defaultGRPCBindEndpoint
}

config.AgentHTTPEndpoint = remoteSamplingConfig.HostEndpoint
if config.AgentHTTPEndpoint == "" {
config.AgentHTTPEndpoint = defaultAgentRemoteSamplingHTTPEndpoint
}

// strategies are served over grpc so if grpc is not enabled and strategies are present return an error
if len(remoteSamplingConfig.StrategyFile) != 0 {
config.RemoteSamplingStrategyFile = remoteSamplingConfig.StrategyFile
config.RemoteSamplingStrategyFileReloadInterval = remoteSamplingConfig.StrategyFileReloadInterval
}
}

// Create the receiver.
return newJaegerReceiver(rCfg.ID(), &config, nextConsumer, set), nil
}
69 changes: 0 additions & 69 deletions receiver/jaegerreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,72 +187,3 @@ func TestCreateInvalidThriftCompactEndpoint(t *testing.T) {
assert.NoError(t, err, "unexpected error creating receiver")
assert.Equal(t, defaultThriftCompactBindEndpoint, r.(*jReceiver).config.AgentCompactThrift.Endpoint, "thrift port should be default")
}

func TestDefaultAgentRemoteSamplingEndpointAndPort(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
rCfg := cfg.(*Config)

rCfg.Protocols.ThriftCompact = &ProtocolUDP{
Endpoint: defaultThriftCompactBindEndpoint,
}
rCfg.RemoteSampling = &RemoteSamplingConfig{}
set := componenttest.NewNopReceiverCreateSettings()
r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)

assert.NoError(t, err, "create trace receiver should not error")
assert.Equal(t, defaultGRPCBindEndpoint, r.(*jReceiver).config.RemoteSamplingClientSettings.Endpoint)
assert.Equal(t, defaultAgentRemoteSamplingHTTPEndpoint, r.(*jReceiver).config.AgentHTTPEndpoint, "agent http port should be default")
}

func TestAgentRemoteSamplingEndpoint(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
rCfg := cfg.(*Config)

endpoint := "localhost:1234"
rCfg.Protocols.ThriftCompact = &ProtocolUDP{
Endpoint: defaultThriftCompactBindEndpoint,
}
rCfg.RemoteSampling = &RemoteSamplingConfig{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: endpoint,
},
}
set := componenttest.NewNopReceiverCreateSettings()
r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)

assert.NoError(t, err, "create trace receiver should not error")
assert.Equal(t, endpoint, r.(*jReceiver).config.RemoteSamplingClientSettings.Endpoint)
assert.Equal(t, defaultAgentRemoteSamplingHTTPEndpoint, r.(*jReceiver).config.AgentHTTPEndpoint, "agent http port should be default")
}

func TestRemoteSamplingConfigPropagation(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
rCfg := cfg.(*Config)

hostEndpoint := "localhost:5778"
endpoint := "localhost:1234"
strategyFile := "strategies.json"
rCfg.Protocols.GRPC = &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: defaultGRPCBindEndpoint,
Transport: "tcp",
},
}
rCfg.RemoteSampling = &RemoteSamplingConfig{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: endpoint,
},
HostEndpoint: hostEndpoint,
StrategyFile: strategyFile,
}
set := componenttest.NewNopReceiverCreateSettings()
r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)

assert.NoError(t, err, "create trace receiver should not error")
assert.Equal(t, endpoint, r.(*jReceiver).config.RemoteSamplingClientSettings.Endpoint)
assert.Equal(t, hostEndpoint, r.(*jReceiver).config.AgentHTTPEndpoint, "agent http port should be configured value")
assert.Equal(t, strategyFile, r.(*jReceiver).config.RemoteSamplingStrategyFile)
}
26 changes: 4 additions & 22 deletions receiver/jaegerreceiver/jaeger_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -141,20 +139,14 @@ func (*mockSamplingHandler) GetSamplingStrategy(context.Context, *api_v2.Samplin
}

func TestJaegerHTTP(t *testing.T) {
s, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
s, _ := initializeGRPCTestServer(t, func(s *grpc.Server) {
api_v2.RegisterSamplingManagerServer(s, &mockSamplingHandler{})
})
defer s.GracefulStop()

endpoint := testutil.GetAvailableLocalAddress(t)
config := &configuration{
AgentHTTPEndpoint: endpoint,
RemoteSamplingClientSettings: configgrpc.GRPCClientSettings{
Endpoint: addr.String(),
TLSSetting: configtls.TLSClientSetting{
Insecure: true,
},
},
}
set := componenttest.NewNopReceiverCreateSettings()
jr := newJaegerReceiver(jaegerAgent, config, nil, set)
Expand All @@ -175,20 +167,10 @@ func TestJaegerHTTP(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("http://%s/sampling?service=test", endpoint))
assert.NoError(t, err, "should not have failed to make request")
if resp != nil {
assert.Equal(t, 200, resp.StatusCode, "should have returned 200")
}

resp, err = http.Get(fmt.Sprintf("http://%s/sampling?service=test", endpoint))
assert.NoError(t, err, "should not have failed to make request")
if resp != nil {
assert.Equal(t, 200, resp.StatusCode, "should have returned 200")
}

resp, err = http.Get(fmt.Sprintf("http://%s/baggageRestrictions?service=test", endpoint))
assert.NoError(t, err, "should not have failed to make request")
if resp != nil {
assert.Equal(t, 200, resp.StatusCode, "should have returned 200")
assert.Equal(t, 500, resp.StatusCode, "should have returned 200")
return
}
t.Fail()
}

func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *configuration) {
Expand Down
77 changes: 19 additions & 58 deletions receiver/jaegerreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,17 @@ import (
"mime"
"net/http"
"sync"
"time"

apacheThrift "github.com/apache/thrift/lib/go/thrift"
"github.com/gorilla/mux"
"github.com/jaegertracing/jaeger/cmd/agent/app/configmanager"
jSamplingConfig "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/grpc"
"github.com/jaegertracing/jaeger/cmd/agent/app/httpserver"
"github.com/jaegertracing/jaeger/cmd/agent/app/processors"
"github.com/jaegertracing/jaeger/cmd/agent/app/servers"
"github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp"
"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
collectorSampling "github.com/jaegertracing/jaeger/cmd/collector/app/sampling"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/metrics"
staticStrategyStore "github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/agent"
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
Expand All @@ -51,7 +47,6 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/obsreport"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc"

jaegertranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
Expand All @@ -63,12 +58,9 @@ type configuration struct {
CollectorHTTPSettings confighttp.HTTPServerSettings
CollectorGRPCServerSettings configgrpc.GRPCServerSettings

AgentCompactThrift ProtocolUDP
AgentBinaryThrift ProtocolUDP
AgentHTTPEndpoint string
RemoteSamplingClientSettings configgrpc.GRPCClientSettings
RemoteSamplingStrategyFile string
RemoteSamplingStrategyFileReloadInterval time.Duration
AgentCompactThrift ProtocolUDP
AgentBinaryThrift ProtocolUDP
AgentHTTPEndpoint string
}

// Receiver type is used to receive spans that were originally intended to be sent to Jaeger.
Expand All @@ -82,9 +74,8 @@ type jReceiver struct {
grpc *grpc.Server
collectorServer *http.Server

agentSamplingManager *jSamplingConfig.SamplingManager
agentProcessors []processors.Processor
agentServer *http.Server
agentProcessors []processors.Processor
agentServer *http.Server

goroutines sync.WaitGroup

Expand Down Expand Up @@ -183,7 +174,19 @@ func consumeTraces(ctx context.Context, batch *jaeger.Batch, consumer consumer.T

var _ agent.Agent = (*agentHandler)(nil)
var _ api_v2.CollectorServiceServer = (*jReceiver)(nil)
var _ configmanager.ClientConfigManager = (*jReceiver)(nil)
var _ configmanager.ClientConfigManager = (*notImplementedConfigManager)(nil)

var errNotImplemented = fmt.Errorf("not implemented")

type notImplementedConfigManager struct{}

func (notImplementedConfigManager) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
return nil, errNotImplemented
}

func (notImplementedConfigManager) GetBaggageRestrictions(ctx context.Context, serviceName string) ([]*baggage.BaggageRestriction, error) {
return nil, errNotImplemented
}

type agentHandler struct {
nextConsumer consumer.Traces
Expand All @@ -204,21 +207,6 @@ func (h *agentHandler) EmitBatch(ctx context.Context, batch *jaeger.Batch) error
return err
}

func (jr *jReceiver) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
return jr.agentSamplingManager.GetSamplingStrategy(ctx, serviceName)
}

func (jr *jReceiver) GetBaggageRestrictions(ctx context.Context, serviceName string) ([]*baggage.BaggageRestriction, error) {
br, err := jr.agentSamplingManager.GetBaggageRestrictions(ctx, serviceName)
if err != nil {
// Baggage restrictions are not yet implemented - refer to - https://github.com/jaegertracing/jaeger/issues/373
// As of today, GetBaggageRestrictions() always returns an error.
// However, we `return nil, nil` here in order to serve a valid `200 OK` response.
return nil, nil
}
return br, nil
}

func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
ctx = jr.grpcObsrecv.StartTracesOp(ctx)

Expand Down Expand Up @@ -283,25 +271,8 @@ func (jr *jReceiver) startAgent(host component.Host) error {
}(processor)
}

// Start upstream grpc client before serving sampling endpoints over HTTP
if jr.config.RemoteSamplingClientSettings.Endpoint != "" {
grpcOpts, err := jr.config.RemoteSamplingClientSettings.ToDialOptions(host, jr.settings.TelemetrySettings)
if err != nil {
jr.settings.Logger.Error("Error creating grpc dial options for remote sampling endpoint", zap.Error(err))
return err
}
jr.config.RemoteSamplingClientSettings.SanitizedEndpoint()
conn, err := grpc.Dial(jr.config.RemoteSamplingClientSettings.Endpoint, grpcOpts...)
if err != nil {
jr.settings.Logger.Error("Error creating grpc connection to jaeger remote sampling endpoint", zap.String("endpoint", jr.config.RemoteSamplingClientSettings.Endpoint))
return err
}

jr.agentSamplingManager = jSamplingConfig.NewConfigManager(conn)
}

if jr.config.AgentHTTPEndpoint != "" {
jr.agentServer = httpserver.NewHTTPServer(jr.config.AgentHTTPEndpoint, jr, metrics.NullFactory, jr.settings.Logger)
jr.agentServer = httpserver.NewHTTPServer(jr.config.AgentHTTPEndpoint, &notImplementedConfigManager{}, metrics.NullFactory, jr.settings.Logger)

jr.goroutines.Add(1)
go func() {
Expand Down Expand Up @@ -434,16 +405,6 @@ func (jr *jReceiver) startCollector(host component.Host) error {

api_v2.RegisterCollectorServiceServer(jr.grpc, jr)

// init and register sampling strategy store
ss, err := staticStrategyStore.NewStrategyStore(staticStrategyStore.Options{
StrategiesFile: jr.config.RemoteSamplingStrategyFile,
ReloadInterval: jr.config.RemoteSamplingStrategyFileReloadInterval,
}, jr.settings.Logger)
if err != nil {
return fmt.Errorf("failed to create collector strategy store: %w", err)
}
api_v2.RegisterSamplingManagerServer(jr.grpc, collectorSampling.NewGRPCHandler(ss))

jr.goroutines.Add(1)
go func() {
defer jr.goroutines.Done()
Expand Down
Loading