Skip to content

Commit

Permalink
Change default OTLP/gRPC port number (#2104)
Browse files Browse the repository at this point in the history
This implements specification change open-telemetry/opentelemetry-specification#1221

To make transition to new port numbers less painful OTLP receiver will
also accept data on the legacy port numbers when it is configured to
use the default endpoint. Users who use the default Collector config
can continue sending data to the legacy ports and have a graceful period
to update their senders to start sending to the new ports.

Note that OTLP/HTTP continues using a separate port number from OTLP/gRPC.
There is separate work in progress to use one port for both.
  • Loading branch information
tigrannajaryan authored Nov 19, 2020
1 parent d885cb2 commit 29859f2
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 48 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
- Add config settings for component telemetry (#2148)
- Use net.SplitHostPort for IPv6 support in `prometheus` receiver (#2154)
- Add --log-format command line option (default to "console") #2177.
- Change default OTLP/gRPC port number to 4317, also continue receiving on legacy port
55680 during transition period (#2104).

## 🧰 Bug fixes 🧰

Expand Down
6 changes: 3 additions & 3 deletions receiver/otlpreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ receivers:
The following settings are configurable:
- `endpoint` (default = 0.0.0.0:55680): host:port to which the receiver is
going to receive data. The valid syntax is described at
https://github.com/grpc/grpc/blob/master/doc/naming.md.
- `endpoint` (default = 0.0.0.0:4317 for grpc protocol, 0.0.0.0:55681 http protocol):
host:port to which the receiver is going to receive data. The valid syntax is
described at https://github.com/grpc/grpc/blob/master/doc/naming.md.

## Advanced Configuration

Expand Down
6 changes: 3 additions & 3 deletions receiver/otlpreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestLoadConfig(t *testing.T) {
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "0.0.0.0:55680",
Endpoint: "0.0.0.0:4317",
Transport: "tcp",
},
ReadBufferSize: 512 * 1024,
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestLoadConfig(t *testing.T) {
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "0.0.0.0:55680",
Endpoint: "0.0.0.0:4317",
Transport: "tcp",
},
MaxRecvMsgSizeMiB: 32,
Expand All @@ -139,7 +139,7 @@ func TestLoadConfig(t *testing.T) {
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "0.0.0.0:55680",
Endpoint: "0.0.0.0:4317",
Transport: "tcp",
},
TLSSetting: &configtls.TLSServerSetting{
Expand Down
25 changes: 15 additions & 10 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"github.com/spf13/viper"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
Expand All @@ -37,6 +38,10 @@ const (
protoGRPC = "grpc"
protoHTTP = "http"
protocolsFieldName = "protocols"

defaultGRPCEndpoint = "0.0.0.0:4317"
defaultHTTPEndpoint = "0.0.0.0:55681"
legacyGRPCEndpoint = "0.0.0.0:55680"
)

func NewFactory() component.ReceiverFactory {
Expand All @@ -59,14 +64,14 @@ func createDefaultConfig() configmodels.Receiver {
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "0.0.0.0:55680",
Endpoint: defaultGRPCEndpoint,
Transport: "tcp",
},
// We almost write 0 bytes, so no need to tune WriteBufferSize.
ReadBufferSize: 512 * 1024,
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:55681",
Endpoint: defaultHTTPEndpoint,
},
},
}
Expand Down Expand Up @@ -117,11 +122,11 @@ func customUnmarshaler(componentViperSection *viper.Viper, intoCfg interface{})
// CreateTracesReceiver creates a trace receiver based on provided config.
func createTraceReceiver(
ctx context.Context,
_ component.ReceiverCreateParams,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
nextConsumer consumer.TracesConsumer,
) (component.TracesReceiver, error) {
r, err := createReceiver(cfg)
r, err := createReceiver(cfg, params.Logger)
if err != nil {
return nil, err
}
Expand All @@ -134,11 +139,11 @@ func createTraceReceiver(
// CreateMetricsReceiver creates a metrics receiver based on provided config.
func createMetricsReceiver(
ctx context.Context,
_ component.ReceiverCreateParams,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
consumer consumer.MetricsConsumer,
) (component.MetricsReceiver, error) {
r, err := createReceiver(cfg)
r, err := createReceiver(cfg, params.Logger)
if err != nil {
return nil, err
}
Expand All @@ -151,11 +156,11 @@ func createMetricsReceiver(
// CreateLogReceiver creates a log receiver based on provided config.
func createLogReceiver(
ctx context.Context,
_ component.ReceiverCreateParams,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
consumer consumer.LogsConsumer,
) (component.LogsReceiver, error) {
r, err := createReceiver(cfg)
r, err := createReceiver(cfg, params.Logger)
if err != nil {
return nil, err
}
Expand All @@ -165,7 +170,7 @@ func createLogReceiver(
return r, nil
}

func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) {
func createReceiver(cfg configmodels.Receiver, logger *zap.Logger) (*otlpReceiver, error) {
rCfg := cfg.(*Config)

// There must be one receiver for both metrics and traces. We maintain a map of
Expand All @@ -176,7 +181,7 @@ func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) {
if !ok {
var err error
// We don't have a receiver, so create one.
receiver, err = newOtlpReceiver(rCfg)
receiver, err = newOtlpReceiver(rCfg, logger)
if err != nil {
return nil, err
}
Expand Down
102 changes: 72 additions & 30 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"sync"

gatewayruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
"go.uber.org/zap"
"google.golang.org/grpc"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
collectorlog "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/logs/v1"
Expand All @@ -49,14 +51,17 @@ type otlpReceiver struct {

stopOnce sync.Once
startServerOnce sync.Once

logger *zap.Logger
}

// newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's
// responsibility to invoke the respective Start*Reception methods as well
// as the various Stop*Reception methods to end it.
func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) {
func newOtlpReceiver(cfg *Config, logger *zap.Logger) (*otlpReceiver, error) {
r := &otlpReceiver{
cfg: cfg,
cfg: cfg,
logger: logger,
}
if cfg.GRPC != nil {
opts, err := cfg.GRPC.ToServerOption()
Expand Down Expand Up @@ -84,6 +89,70 @@ func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) {
return r, nil
}

func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host component.Host) error {
r.logger.Info("Starting GRPC server on endpoint " + cfg.NetAddr.Endpoint)
var gln net.Listener
gln, err := cfg.ToListener()
if err != nil {
return err
}
go func() {
if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil {
host.ReportFatalError(errGrpc)
}
}()
return nil
}

func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host component.Host) error {
r.logger.Info("Starting HTTP server on endpoint " + cfg.Endpoint)
var hln net.Listener
hln, err := r.cfg.HTTP.ToListener()
if err != nil {
return err
}
go func() {
if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil {
host.ReportFatalError(errHTTP)
}
}()
return nil
}

func (r *otlpReceiver) startProtocolServers(host component.Host) error {
var err error
if r.cfg.GRPC != nil {
err = r.startGRPCServer(r.cfg.GRPC, host)
if err != nil {
return err
}
if r.cfg.GRPC.NetAddr.Endpoint == defaultGRPCEndpoint {
r.logger.Info("Setting up a second GRPC listener on legacy endpoint " + legacyGRPCEndpoint)

// Copy the config.
cfgLegacyGRPC := r.cfg.GRPC
// And use the legacy endpoint.
cfgLegacyGRPC.NetAddr.Endpoint = legacyGRPCEndpoint
err = r.startGRPCServer(cfgLegacyGRPC, host)
if err != nil {
return err
}
}
}
if r.cfg.HTTP != nil {
r.serverHTTP = r.cfg.HTTP.ToServer(
r.gatewayMux,
confighttp.WithErrorHandler(errorHandler),
)
err = r.startHTTPServer(r.cfg.HTTP, host)
if err != nil {
return err
}
}

return err
}

// Start runs the trace receiver on the gRPC server. Currently
// it also enables the metrics receiver too.
func (r *otlpReceiver) Start(_ context.Context, host component.Host) error {
Expand All @@ -93,34 +162,7 @@ func (r *otlpReceiver) Start(_ context.Context, host component.Host) error {

var err error
r.startServerOnce.Do(func() {
if r.cfg.GRPC != nil {
var gln net.Listener
gln, err = r.cfg.GRPC.ToListener()
if err != nil {
return
}
go func() {
if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil {
host.ReportFatalError(errGrpc)
}
}()
}
if r.cfg.HTTP != nil {
r.serverHTTP = r.cfg.HTTP.ToServer(
r.gatewayMux,
confighttp.WithErrorHandler(errorHandler),
)
var hln net.Listener
hln, err = r.cfg.HTTP.ToListener()
if err != nil {
return
}
go func() {
if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil {
host.ReportFatalError(errHTTP)
}
}()
}
err = r.startProtocolServers(host)
})
return err
}
Expand Down
5 changes: 3 additions & 2 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/gogo/protobuf/jsonpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -698,7 +699,7 @@ func TestGRPCInvalidTLSCredentials(t *testing.T) {
}

// TLS is resolved during Creation of the receiver for GRPC.
_, err := createReceiver(cfg)
_, err := createReceiver(cfg, zap.NewNop())
assert.EqualError(t, err,
`failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`)
}
Expand Down Expand Up @@ -745,7 +746,7 @@ func newHTTPReceiver(t *testing.T, endpoint string, tc consumer.TracesConsumer,
}

func newReceiver(t *testing.T, factory component.ReceiverFactory, cfg *Config, tc consumer.TracesConsumer, mc consumer.MetricsConsumer) *otlpReceiver {
r, err := createReceiver(cfg)
r, err := createReceiver(cfg, zap.NewNop())
require.NoError(t, err)
if tc != nil {
params := component.ReceiverCreateParams{}
Expand Down

0 comments on commit 29859f2

Please sign in to comment.