diff --git a/Makefile b/Makefile index d64abadca0c..43a92b94505 100644 --- a/Makefile +++ b/Makefile @@ -87,10 +87,6 @@ vet: install-tools: go install golang.org/x/lint/golint -.PHONY: collector -collector: - GO111MODULE=on CGO_ENABLED=0 go build -o ./bin/occollector_$(GOOS) $(BUILD_INFO) ./cmd/occollector - .PHONY: otelsvc otelsvc: GO111MODULE=on CGO_ENABLED=0 go build -o ./bin/otelsvc_$(GOOS) $(BUILD_INFO) ./cmd/otelsvc @@ -113,7 +109,7 @@ docker-otelsvc: COMPONENT=otelsvc $(MAKE) docker-component .PHONY: binaries -binaries: collector otelsvc +binaries: otelsvc .PHONY: binaries-all-sys binaries-all-sys: diff --git a/cmd/occollector/Dockerfile b/cmd/occollector/Dockerfile deleted file mode 100644 index cbba7f949b1..00000000000 --- a/cmd/occollector/Dockerfile +++ /dev/null @@ -1,8 +0,0 @@ -FROM alpine:latest as certs -RUN apk --update add ca-certificates - -FROM scratch -COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt -COPY occollector_linux / -ENTRYPOINT ["/occollector_linux"] -EXPOSE 55678 diff --git a/cmd/occollector/app/builder/builder.go b/cmd/occollector/app/builder/builder.go deleted file mode 100644 index 741ef5a7dd8..00000000000 --- a/cmd/occollector/app/builder/builder.go +++ /dev/null @@ -1,258 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package builder - -import ( - "flag" - "fmt" - "strings" - "time" - - "github.com/open-telemetry/opentelemetry-service/internal/config" - "github.com/spf13/viper" -) - -const ( - receiversRoot = "receivers" - jaegerEntry = "jaeger" - opencensusEntry = "opencensus" - zipkinEntry = "zipkin" - zipkinScribeEntry = "zipkin-scribe" - - // flags - configCfg = "config" - jaegerReceiverFlg = "receive-jaeger" - ocReceiverFlg = "receive-oc-trace" - zipkinReceiverFlg = "receive-zipkin" - zipkinScribeReceiverFlg = "receive-zipkin-scribe" - loggingExporterFlg = "logging-exporter" - useTailSamplingAlwaysSample = "tail-sampling-always-sample" - memBallastFlag = "mem-ballast-size-mib" -) - -// Flags adds flags related to basic building of the collector application to the given flagset. -func Flags(flags *flag.FlagSet) { - flags.String(configCfg, "", "Path to the config file") - flags.Bool(jaegerReceiverFlg, false, - fmt.Sprintf("Flag to run the Jaeger receiver (i.e.: Jaeger Collector), default settings: %+v", *NewDefaultJaegerReceiverCfg())) - flags.Bool(ocReceiverFlg, true, - fmt.Sprintf("Flag to run the OpenCensus trace receiver, default settings: %+v", *NewDefaultOpenCensusReceiverCfg())) - flags.Bool(zipkinReceiverFlg, false, - fmt.Sprintf("Flag to run the Zipkin receiver, default settings: %+v", *NewDefaultZipkinReceiverCfg())) - flags.Bool(zipkinScribeReceiverFlg, false, - fmt.Sprintf("Flag to run the Zipkin Scribe receiver, default settings: %+v", *NewDefaultZipkinScribeReceiverCfg())) - flags.Bool(loggingExporterFlg, false, "Flag to add a logging exporter (combine with log level DEBUG to log incoming spans)") - flags.Bool(useTailSamplingAlwaysSample, false, "Flag to use a tail-based sampling processor with an always sample policy, "+ - "unless tail sampling setting is present on configuration file.") - flags.Uint(memBallastFlag, 0, - fmt.Sprintf("Flag to specify size of memory (MiB) ballast to set. Ballast is not used when this is not specified. "+ - "default settings: 0")) -} - -// GetConfigFile gets the config file from the config file flag. -func GetConfigFile(v *viper.Viper) string { - return v.GetString(configCfg) -} - -// LoggingExporterEnabled returns true if the debug processor is enabled, and false otherwise -func LoggingExporterEnabled(v *viper.Viper) bool { - return v.GetBool(loggingExporterFlg) -} - -// DebugTailSamplingEnabled returns true if the debug processor is enabled, and false otherwise -func DebugTailSamplingEnabled(v *viper.Viper) bool { - return v.GetBool(useTailSamplingAlwaysSample) -} - -// MemBallastSize returns the size of memory ballast to use in MBs -func MemBallastSize(v *viper.Viper) int { - return v.GetInt(memBallastFlag) -} - -// JaegerReceiverCfg holds configuration for Jaeger receivers. -type JaegerReceiverCfg struct { - // ThriftTChannelPort is the port that the relay receives on for jaeger thrift tchannel requests - ThriftTChannelPort int `mapstructure:"jaeger-thrift-tchannel-port"` - // ThriftHTTPPort is the port that the relay receives on for jaeger thrift http requests - ThriftHTTPPort int `mapstructure:"jaeger-thrift-http-port"` -} - -// JaegerReceiverEnabled checks if the Jaeger receiver is enabled, via a command-line flag, environment -// variable, or configuration file. -func JaegerReceiverEnabled(v *viper.Viper) bool { - return featureEnabled(v, jaegerReceiverFlg, receiversRoot, jaegerEntry) -} - -// NewDefaultJaegerReceiverCfg returns an instance of JaegerReceiverCfg with default values -func NewDefaultJaegerReceiverCfg() *JaegerReceiverCfg { - opts := &JaegerReceiverCfg{ - ThriftTChannelPort: 14267, - ThriftHTTPPort: 14268, - } - return opts -} - -// InitFromViper returns a JaegerReceiverCfg according to the configuration. -func (cfg *JaegerReceiverCfg) InitFromViper(v *viper.Viper) (*JaegerReceiverCfg, error) { - return cfg, initFromViper(cfg, v, receiversRoot, jaegerEntry) -} - -// OpenCensusReceiverCfg holds configuration for OpenCensus receiver. -type OpenCensusReceiverCfg struct { - // Port is the port that the receiver will use - Port int `mapstructure:"port"` - - // TLSCredentials is a (cert_file, key_file) configuration. - TLSCredentials *config.TLSCredentials `mapstructure:"tls_credentials"` - - // Keepalive anchor for all the settings related to keepalive. - Keepalive *serverParametersAndEnforcementPolicy `mapstructure:"keepalive,omitempty"` - - // MaxRecvMsgSizeMiB sets the maximum size (in MiB) of messages accepted by the server. - MaxRecvMsgSizeMiB uint64 `mapstructure:"max-recv-msg-size-mib"` - - // MaxConcurrentStreams sets the limit on the number of concurrent streams to each ServerTransport. - MaxConcurrentStreams uint32 `mapstructure:"max-concurrent-streams"` -} - -type serverParametersAndEnforcementPolicy struct { - ServerParameters *keepaliveServerParameters `mapstructure:"server-parameters,omitempty"` - EnforcementPolicy *keepaliveEnforcementPolicy `mapstructure:"enforcement-policy,omitempty"` -} - -// keepaliveServerParameters allow configuration of the keepalive.ServerParameters. -// See https://godoc.org/google.golang.org/grpc/keepalive#ServerParameters for details. -type keepaliveServerParameters struct { - MaxConnectionIdle time.Duration `mapstructure:"max-connection-idle,omitempty"` - MaxConnectionAge time.Duration `mapstructure:"max-connection-age,omitempty"` - MaxConnectionAgeGrace time.Duration `mapstructure:"max-connection-age-grace,omitempty"` - Time time.Duration `mapstructure:"time,omitempty"` - Timeout time.Duration `mapstructure:"timeout,omitempty"` -} - -// keepaliveEnforcementPolicy allow configuration of the keepalive.EnforcementPolicy. -// See https://godoc.org/google.golang.org/grpc/keepalive#EnforcementPolicy for details. -type keepaliveEnforcementPolicy struct { - MinTime time.Duration `mapstructure:"min-time,omitempty"` - PermitWithoutStream bool `mapstructure:"permit-without-stream,omitempty"` -} - -// OpenCensusReceiverEnabled checks if the OpenCensus receiver is enabled, via a command-line flag, environment -// variable, or configuration file. -func OpenCensusReceiverEnabled(v *viper.Viper) bool { - return featureEnabled(v, ocReceiverFlg, receiversRoot, opencensusEntry) -} - -// NewDefaultOpenCensusReceiverCfg returns an instance of OpenCensusReceiverCfg with default values -func NewDefaultOpenCensusReceiverCfg() *OpenCensusReceiverCfg { - opts := &OpenCensusReceiverCfg{ - Port: 55678, - } - return opts -} - -// InitFromViper returns a OpenCensusReceiverCfg according to the configuration. -func (cfg *OpenCensusReceiverCfg) InitFromViper(v *viper.Viper) (*OpenCensusReceiverCfg, error) { - return cfg, initFromViper(cfg, v, receiversRoot, opencensusEntry) -} - -// ZipkinReceiverCfg holds configuration for Zipkin receiver. -type ZipkinReceiverCfg struct { - // Port is the port that the receiver will use - Port int `mapstructure:"port"` -} - -// ZipkinReceiverEnabled checks if the Zipkin receiver is enabled, via a command-line flag, environment -// variable, or configuration file. -func ZipkinReceiverEnabled(v *viper.Viper) bool { - return featureEnabled(v, zipkinReceiverFlg, receiversRoot, zipkinEntry) -} - -// NewDefaultZipkinReceiverCfg returns an instance of ZipkinReceiverCfg with default values -func NewDefaultZipkinReceiverCfg() *ZipkinReceiverCfg { - opts := &ZipkinReceiverCfg{ - Port: 9411, - } - return opts -} - -// InitFromViper returns a ZipkinReceiverCfg according to the configuration. -func (cfg *ZipkinReceiverCfg) InitFromViper(v *viper.Viper) (*ZipkinReceiverCfg, error) { - return cfg, initFromViper(cfg, v, receiversRoot, zipkinEntry) -} - -// ScribeReceiverCfg carries the settings for the Zipkin Scribe receiver. -type ScribeReceiverCfg struct { - // Address is an IP address or a name that can be resolved to a local address. - // - // It can use a name, but this is not recommended, because it will create - // a listener for at most one of the host's IP addresses. - // - // The default value bind to all available interfaces on the local computer. - Address string `mapstructure:"address"` - Port uint16 `mapstructure:"port"` - // Category is the string that will be used to identify the scribe log messages - // that contain Zipkin spans. - Category string `mapstructure:"category"` -} - -// ZipkinScribeReceiverEnabled checks if the Zipkin Scribe receiver is enabled, via a command-line flag, environment -// variable, or configuration file. -func ZipkinScribeReceiverEnabled(v *viper.Viper) bool { - return featureEnabled(v, zipkinScribeReceiverFlg, receiversRoot, zipkinScribeEntry) -} - -// NewDefaultZipkinScribeReceiverCfg returns an instance of config.ScribeReceiverConfig with default values. -func NewDefaultZipkinScribeReceiverCfg() *ScribeReceiverCfg { - opts := &ScribeReceiverCfg{ - Port: 9410, - Category: "zipkin", - } - return opts -} - -// InitFromViper returns a ScribeReceiverCfg according to the configuration. -func (cfg *ScribeReceiverCfg) InitFromViper(v *viper.Viper) (*ScribeReceiverCfg, error) { - return cfg, initFromViper(cfg, v, receiversRoot, zipkinEntry) -} - -// Helper functions - -func initFromViper(cfg interface{}, v *viper.Viper, labels ...string) error { - v = getViperSub(v, labels...) - if v == nil { - return nil - } - if err := v.Unmarshal(cfg); err != nil { - return fmt.Errorf("Failed to read configuration for %s %v", strings.Join(labels, ": "), err) - } - - return nil -} - -func getViperSub(v *viper.Viper, labels ...string) *viper.Viper { - for _, label := range labels { - v = v.Sub(label) - if v == nil { - return nil - } - } - - return v -} - -func featureEnabled(v *viper.Viper, cmdFlag string, labels ...string) bool { - return v.GetBool(cmdFlag) || (getViperSub(v, labels...) != nil) -} diff --git a/cmd/occollector/app/collector/processors.go b/cmd/occollector/app/collector/processors.go deleted file mode 100644 index 8da29bf793f..00000000000 --- a/cmd/occollector/app/collector/processors.go +++ /dev/null @@ -1,362 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package collector - -import ( - "fmt" - "os" - "time" - - tchReporter "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" - "github.com/spf13/viper" - "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-service/cmd/occollector/app/builder" - "github.com/open-telemetry/opentelemetry-service/cmd/occollector/app/sender" - "github.com/open-telemetry/opentelemetry-service/consumer" - "github.com/open-telemetry/opentelemetry-service/exporter/loggingexporter" - "github.com/open-telemetry/opentelemetry-service/internal/collector/processor/nodebatcher" - "github.com/open-telemetry/opentelemetry-service/internal/collector/processor/queued" - "github.com/open-telemetry/opentelemetry-service/internal/collector/processor/tailsampling" - "github.com/open-telemetry/opentelemetry-service/internal/collector/sampling" - "github.com/open-telemetry/opentelemetry-service/internal/config" - "github.com/open-telemetry/opentelemetry-service/processor/addattributesprocessor" - "github.com/open-telemetry/opentelemetry-service/processor/attributekeyprocessor" - "github.com/open-telemetry/opentelemetry-service/processor/multiconsumer" - "github.com/open-telemetry/opentelemetry-service/processor/tracesamplerprocessor" -) - -func createExporters(v *viper.Viper, logger *zap.Logger) ([]func(), []consumer.TraceConsumer, []consumer.MetricsConsumer) { - // TODO: (@pjanotti) this is slightly modified from agent but in the end duplication, need to consolidate style and visibility. - traceExporters, metricsExporters, doneFns, err := config.ExportersFromViperConfig(logger, v) - if err != nil { - logger.Fatal("Failed to create config for exporters", zap.Error(err)) - } - - wrappedDoneFns := make([]func(), 0, len(doneFns)) - for _, doneFn := range doneFns { - wrapperFn := func() { - if err := doneFn(); err != nil { - logger.Warn("Error when closing exporters", zap.Error(err)) - } - } - - wrappedDoneFns = append(wrappedDoneFns, wrapperFn) - } - - return wrappedDoneFns, traceExporters, metricsExporters -} - -func buildQueuedSpanProcessor( - logger *zap.Logger, opts *builder.QueuedSpanProcessorCfg, -) (closeFns []func(), queuedSpanProcessor consumer.TraceConsumer, err error) { - logger.Info("Constructing queue processor with name", zap.String("name", opts.Name)) - - // build span batch sender from configured options - var spanSender consumer.TraceConsumer - switch opts.SenderType { - case builder.ThriftTChannelSenderType: - logger.Info("Initializing thrift-tChannel sender") - thriftTChannelSenderOpts := opts.SenderConfig.(*builder.JaegerThriftTChannelSenderCfg) - tchrepbuilder := &tchReporter.Builder{ - CollectorHostPorts: thriftTChannelSenderOpts.CollectorHostPorts, - DiscoveryMinPeers: thriftTChannelSenderOpts.DiscoveryMinPeers, - ConnCheckTimeout: thriftTChannelSenderOpts.DiscoveryConnCheckTimeout, - } - tchreporter, err := tchrepbuilder.CreateReporter(logger) - if err != nil { - logger.Fatal("Cannot create tchannel reporter.", zap.Error(err)) - return nil, nil, err - } - spanSender = sender.NewJaegerThriftTChannelSender(tchreporter, logger) - case builder.ThriftHTTPSenderType: - thriftHTTPSenderOpts := opts.SenderConfig.(*builder.JaegerThriftHTTPSenderCfg) - logger.Info("Initializing thrift-HTTP sender", - zap.String("url", thriftHTTPSenderOpts.CollectorEndpoint)) - spanSender = sender.NewJaegerThriftHTTPSender( - thriftHTTPSenderOpts.CollectorEndpoint, - thriftHTTPSenderOpts.Headers, - logger, - sender.HTTPTimeout(thriftHTTPSenderOpts.Timeout), - ) - case builder.ProtoGRPCSenderType: - protoGRPCSenderOpts := opts.SenderConfig.(*builder.JaegerProtoGRPCSenderCfg) - logger.Info("Initializing proto-GRPC sender", - zap.String("url", protoGRPCSenderOpts.CollectorEndpoint)) - spanSender = sender.NewJaegerProtoGRPCSender( - protoGRPCSenderOpts.CollectorEndpoint, - logger, - ) - } - doneFns, traceExporters, _ := createExporters(opts.RawConfig, logger) - - if spanSender == nil && len(traceExporters) == 0 { - if opts.SenderType != "" { - logger.Fatal("Unrecognized sender type", zap.String("SenderType", string(opts.SenderType))) - } - logger.Fatal("No senders or exporters configured.") - } - - allSendersAndExporters := make([]consumer.TraceConsumer, 0, 1+len(traceExporters)) - if spanSender != nil { - allSendersAndExporters = append(allSendersAndExporters, spanSender) - } - for _, traceExporter := range traceExporters { - allSendersAndExporters = append(allSendersAndExporters, traceExporter) - } - - var batchingOptions []nodebatcher.Option - if opts.BatchingConfig.Enable { - cfg := opts.BatchingConfig - if cfg.Timeout != nil { - batchingOptions = append(batchingOptions, nodebatcher.WithTimeout(*cfg.Timeout)) - } - if cfg.NumTickers > 0 { - batchingOptions = append( - batchingOptions, nodebatcher.WithNumTickers(cfg.NumTickers), - ) - } - if cfg.TickTime != nil { - batchingOptions = append( - batchingOptions, nodebatcher.WithTickTime(*cfg.TickTime), - ) - } - if cfg.SendBatchSize != nil { - batchingOptions = append( - batchingOptions, nodebatcher.WithSendBatchSize(*cfg.SendBatchSize), - ) - } - if cfg.RemoveAfterTicks != nil { - batchingOptions = append( - batchingOptions, nodebatcher.WithRemoveAfterTicks(*cfg.RemoveAfterTicks), - ) - } - } - - queuedConsumers := make([]consumer.TraceConsumer, 0, len(allSendersAndExporters)) - for _, senderOrExporter := range allSendersAndExporters { - // build queued span processor with underlying sender - queuedConsumers = append( - queuedConsumers, - queued.NewQueuedSpanProcessor( - senderOrExporter, - queued.Options.WithLogger(logger), - queued.Options.WithName(opts.Name), - queued.Options.WithNumWorkers(opts.NumWorkers), - queued.Options.WithQueueSize(opts.QueueSize), - queued.Options.WithRetryOnProcessingFailures(opts.RetryOnFailure), - queued.Options.WithBackoffDelay(opts.BackoffDelay), - queued.Options.WithBatching(opts.BatchingConfig.Enable), - queued.Options.WithBatchingOptions(batchingOptions...), - ), - ) - } - return doneFns, multiconsumer.NewTraceProcessor(queuedConsumers), nil -} - -func buildSamplingProcessor(cfg *builder.SamplingCfg, nameToTraceConsumer map[string]consumer.TraceConsumer, v *viper.Viper, logger *zap.Logger) (consumer.TraceConsumer, error) { - var policies []*tailsampling.Policy - seenExporter := make(map[string]bool) - for _, polCfg := range cfg.Policies { - policy := &tailsampling.Policy{ - Name: string(polCfg.Name), - } - - // As the number of sampling policies grow this should be changed to a map. - switch polCfg.Type { - case builder.AlwaysSample: - policy.Evaluator = sampling.NewAlwaysSample() - case builder.NumericAttributeFilter: - numAttributeFilterCfg := polCfg.Configuration.(*builder.NumericAttributeFilterCfg) - policy.Evaluator = sampling.NewNumericAttributeFilter(numAttributeFilterCfg.Key, numAttributeFilterCfg.MinValue, numAttributeFilterCfg.MaxValue) - case builder.StringAttributeFilter: - strAttributeFilterCfg := polCfg.Configuration.(*builder.StringAttributeFilterCfg) - policy.Evaluator = sampling.NewStringAttributeFilter(strAttributeFilterCfg.Key, strAttributeFilterCfg.Values) - case builder.RateLimiting: - rateLimitingCfg := polCfg.Configuration.(*builder.RateLimitingCfg) - policy.Evaluator = sampling.NewRateLimiting(rateLimitingCfg.SpansPerSecond) - default: - return nil, fmt.Errorf("unknown sampling policy %s", polCfg.Name) - } - - var policyProcessors []consumer.TraceConsumer - for _, exporter := range polCfg.Exporters { - if _, ok := seenExporter[exporter]; ok { - return nil, fmt.Errorf("multiple sampling polices pointing to exporter %q", exporter) - } - seenExporter[exporter] = true - - policyProcessor, ok := nameToTraceConsumer[exporter] - if !ok { - return nil, fmt.Errorf("invalid exporter %q for sampling policy %q", exporter, polCfg.Name) - } - - policyProcessors = append(policyProcessors, policyProcessor) - } - - numPolicyProcessors := len(policyProcessors) - switch { - case numPolicyProcessors == 1: - policy.Destination = policyProcessors[0] - case numPolicyProcessors > 1: - policy.Destination = multiconsumer.NewTraceProcessor(policyProcessors) - default: - return nil, fmt.Errorf("no exporters for sampling policy %q", polCfg.Name) - } - - policies = append(policies, policy) - } - - if len(policies) < 1 { - return nil, fmt.Errorf("no sampling policies were configured") - } - - tailCfg := builder.NewDefaultTailBasedCfg().InitFromViper(v) - tailSamplingProcessor, err := tailsampling.NewTailSamplingSpanProcessor( - policies, - tailCfg.NumTraces, - 128, - tailCfg.DecisionWait, - logger) - return tailSamplingProcessor, err -} - -func startProcessor(v *viper.Viper, logger *zap.Logger) (consumer.TraceConsumer, []func()) { - // Build pipeline from its end: 1st exporters, the OC-proto queue processor, and - // finally the receivers. - var closeFns []func() - var traceConsumers []consumer.TraceConsumer - nameToTraceConsumer := make(map[string]consumer.TraceConsumer) - exportersCloseFns, traceExporters, metricsExporters := createExporters(v, logger) - closeFns = append(closeFns, exportersCloseFns...) - if len(traceExporters) > 0 { - // Exporters need an extra hop from OC-proto to span data: to workaround that for now - // we will use a special processor that transforms the data to a format that they can consume. - // TODO: (@pjanotti) we should avoid this step in the long run, its an extra hop just to re-use - // the exporters: this can lose node information and it is not ideal for performance and delegates - // the retry/buffering to the exporters (that are designed to run within the tracing process). - traceExpProc := multiconsumer.NewTraceProcessor(traceExporters) - nameToTraceConsumer["exporters"] = traceExpProc - traceConsumers = append(traceConsumers, traceExpProc) - } - - // TODO: (@pjanotti) make use of metrics exporters - _ = metricsExporters - - if builder.LoggingExporterEnabled(v) { - dbgProc, _ := loggingexporter.NewTraceExporter(logger) - // TODO: Add this to the exporters list and avoid treating it specially. Don't know all the implications. - nameToTraceConsumer["debug"] = dbgProc - traceConsumers = append(traceConsumers, dbgProc) - } - - multiProcessorCfg := builder.NewDefaultMultiSpanProcessorCfg().InitFromViper(v) - for _, queuedJaegerProcessorCfg := range multiProcessorCfg.Processors { - logger.Info("Queued Jaeger Sender Enabled") - doneFns, queuedJaegerProcessor, err := buildQueuedSpanProcessor(logger, queuedJaegerProcessorCfg) - if err != nil { - logger.Error("Failed to build the queued span processor", zap.Error(err)) - os.Exit(1) - } - nameToTraceConsumer[queuedJaegerProcessorCfg.Name] = queuedJaegerProcessor - traceConsumers = append(traceConsumers, queuedJaegerProcessor) - closeFns = append(closeFns, doneFns...) - } - - if len(traceConsumers) == 0 { - logger.Warn("Nothing to do: no processor was enabled. Shutting down.") - os.Exit(1) - } - - var tailSamplingProcessor consumer.TraceConsumer - samplingProcessorCfg := builder.NewDefaultSamplingCfg().InitFromViper(v) - useHeadSamplingProcessor := false - if samplingProcessorCfg.Mode == builder.HeadSampling { - // Head-sampling should be the first processor in the pipeline to avoid global operations on data - // that is not going to be sampled, for now just set a flag to added the sampler later. - useHeadSamplingProcessor = true - } else if samplingProcessorCfg.Mode == builder.TailSampling { - var err error - tailSamplingProcessor, err = buildSamplingProcessor(samplingProcessorCfg, nameToTraceConsumer, v, logger) - if err != nil { - logger.Error("Falied to build the sampling processor", zap.Error(err)) - os.Exit(1) - } - } else if builder.DebugTailSamplingEnabled(v) { - policy := []*tailsampling.Policy{ - { - Name: "tail-always-sampling", - Evaluator: sampling.NewAlwaysSample(), - Destination: multiconsumer.NewTraceProcessor(traceConsumers), - }, - } - var err error - tailSamplingProcessor, err = tailsampling.NewTailSamplingSpanProcessor(policy, 50000, 128, 10*time.Second, logger) - if err != nil { - logger.Error("Falied to build the debug tail-sampling processor", zap.Error(err)) - os.Exit(1) - } - logger.Info("Debugging tail-sampling with always sample policy (num_traces: 50000; decision_wait: 10s)") - } - - if tailSamplingProcessor != nil { - // SpanProcessors are going to go all via the tail sampling processor. - traceConsumers = []consumer.TraceConsumer{tailSamplingProcessor} - } - - // Wraps processors in a single one to be connected to all enabled receivers. - tp := multiconsumer.NewTraceProcessor(traceConsumers) - if multiProcessorCfg.Global != nil && multiProcessorCfg.Global.Attributes != nil { - logger.Info( - "Found global attributes config", - zap.Bool("overwrite", multiProcessorCfg.Global.Attributes.Overwrite), - zap.Any("values", multiProcessorCfg.Global.Attributes.Values), - zap.Any("key-mapping", multiProcessorCfg.Global.Attributes.KeyReplacements), - ) - - if len(multiProcessorCfg.Global.Attributes.Values) > 0 { - tp, _ = addattributesprocessor.NewTraceProcessor( - tp, - addattributesprocessor.WithAttributes(multiProcessorCfg.Global.Attributes.Values), - addattributesprocessor.WithOverwrite(multiProcessorCfg.Global.Attributes.Overwrite), - ) - } - if len(multiProcessorCfg.Global.Attributes.KeyReplacements) > 0 { - tp, _ = attributekeyprocessor.NewTraceProcessor(tp, multiProcessorCfg.Global.Attributes.KeyReplacements...) - } - } - - if useHeadSamplingProcessor { - vTraceSampler := v.Sub("sampling.policies.probabilistic.configuration") - if vTraceSampler == nil { - logger.Error("Trace head-based sampling mode is enabled but there is no valid policy section defined") - os.Exit(1) - } - - cfg := &tracesamplerprocessor.TraceSamplerCfg{} - samplerCfg, err := cfg.InitFromViper(vTraceSampler) - if err != nil { - logger.Error("Trace head-based sampling configuration error", zap.Error(err)) - os.Exit(1) - } - logger.Info( - "Trace head-sampling enabled", - zap.Float32("sampling-percentage", samplerCfg.SamplingPercentage), - ) - tp, _ = tracesamplerprocessor.NewTraceProcessor(tp, *samplerCfg) - } - - return tp, closeFns -} diff --git a/cmd/occollector/app/collector/processors_test.go b/cmd/occollector/app/collector/processors_test.go deleted file mode 100644 index 73b34d26b69..00000000000 --- a/cmd/occollector/app/collector/processors_test.go +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package collector - -import ( - "reflect" - "testing" - - "github.com/spf13/viper" - "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-service/processor/addattributesprocessor" - "github.com/open-telemetry/opentelemetry-service/processor/attributekeyprocessor" - "github.com/open-telemetry/opentelemetry-service/processor/multiconsumer" - "github.com/open-telemetry/opentelemetry-service/processor/processortest" - "github.com/open-telemetry/opentelemetry-service/processor/tracesamplerprocessor" -) - -func Test_startProcessor(t *testing.T) { - tests := []struct { - name string - setupViperCfg func() *viper.Viper - wantExamplar func(t *testing.T) interface{} - }{ - { - name: "incomplete_global_attrib_config", - setupViperCfg: func() *viper.Viper { - v := viper.New() - v.Set("logging-exporter", true) - v.Set("global.attributes.overwrite", true) - return v - }, - wantExamplar: func(t *testing.T) interface{} { - return multiconsumer.NewTraceProcessor(nil) - }, - }, - { - name: "global_attrib_config_values", - setupViperCfg: func() *viper.Viper { - v := viper.New() - v.Set("logging-exporter", true) - v.Set("global.attributes.values", map[string]interface{}{"foo": "bar"}) - return v - }, - wantExamplar: func(t *testing.T) interface{} { - nopProcessor := processortest.NewNopTraceProcessor(nil) - addAttributesProcessor, err := addattributesprocessor.NewTraceProcessor(nopProcessor) - if err != nil { - t.Fatalf("addattributesprocessor.NewTraceProcessor() = %v", err) - } - return addAttributesProcessor - }, - }, - { - name: "global_attrib_config_key_mapping", - setupViperCfg: func() *viper.Viper { - v := viper.New() - v.Set("logging-exporter", true) - v.Set("global.attributes.key-mapping", - []map[string]interface{}{ - { - "key": "foo", - "replacement": "bar", - }, - }) - return v - }, - wantExamplar: func(t *testing.T) interface{} { - nopProcessor := processortest.NewNopTraceProcessor(nil) - attributeKeyProcessor, err := attributekeyprocessor.NewTraceProcessor(nopProcessor) - if err != nil { - t.Fatalf("attributekeyprocessor.NewTraceProcessor() = %v", err) - } - return attributeKeyProcessor - }, - }, - { - name: "sampling_config_trace_sampler", - setupViperCfg: func() *viper.Viper { - v := viper.New() - v.Set("logging-exporter", true) - v.Set("sampling.mode", "head") - v.Set("sampling.policies.probabilistic.configuration.sampling-percentage", 5) - return v - }, - wantExamplar: func(t *testing.T) interface{} { - nopProcessor := processortest.NewNopTraceProcessor(nil) - tracesamplerprocessor, err := tracesamplerprocessor.NewTraceProcessor(nopProcessor, tracesamplerprocessor.TraceSamplerCfg{}) - if err != nil { - t.Fatalf("tracesamplerprocessor.NewTraceProcessor() = %v", err) - } - return tracesamplerprocessor - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - consumer, closeFns := startProcessor(tt.setupViperCfg(), zap.NewNop()) - if consumer == nil { - t.Errorf("startProcessor() got nil consumer") - } - consumerExamplar := tt.wantExamplar(t) - if reflect.TypeOf(consumer) != reflect.TypeOf(consumerExamplar) { - t.Errorf("startProcessor() got consumer type %q want %q", - reflect.TypeOf(consumer), - reflect.TypeOf(consumerExamplar)) - } - for _, closeFn := range closeFns { - closeFn() - } - }) - } -} diff --git a/cmd/occollector/app/collector/receivers.go b/cmd/occollector/app/collector/receivers.go deleted file mode 100644 index f4cde99cef5..00000000000 --- a/cmd/occollector/app/collector/receivers.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package collector - -import ( - "os" - - "github.com/spf13/viper" - "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-service/cmd/occollector/app/builder" - "github.com/open-telemetry/opentelemetry-service/consumer" - jaegerreceiver "github.com/open-telemetry/opentelemetry-service/internal/collector/jaeger" - ocreceiver "github.com/open-telemetry/opentelemetry-service/internal/collector/opencensus" - zipkinreceiver "github.com/open-telemetry/opentelemetry-service/internal/collector/zipkin" - zipkinscribereceiver "github.com/open-telemetry/opentelemetry-service/internal/collector/zipkin/scribe" - "github.com/open-telemetry/opentelemetry-service/receiver" -) - -func createReceivers(v *viper.Viper, logger *zap.Logger, traceConsumers consumer.TraceConsumer, host receiver.Host) []receiver.TraceReceiver { - var someReceiverEnabled bool - receivers := []struct { - runFn func(*zap.Logger, *viper.Viper, consumer.TraceConsumer, receiver.Host) (receiver.TraceReceiver, error) - enabled bool - }{ - {jaegerreceiver.Start, builder.JaegerReceiverEnabled(v)}, - {ocreceiver.Start, builder.OpenCensusReceiverEnabled(v)}, - {zipkinreceiver.Start, builder.ZipkinReceiverEnabled(v)}, - {zipkinscribereceiver.Start, builder.ZipkinScribeReceiverEnabled(v)}, - } - - var startedTraceReceivers []receiver.TraceReceiver - for _, receiver := range receivers { - if receiver.enabled { - rec, err := receiver.runFn(logger, v, traceConsumers, host) - if err != nil { - // TODO: (@pjanotti) better shutdown, for now just try to stop any started receiver before terminating. - for _, startedTraceReceiver := range startedTraceReceivers { - startedTraceReceiver.StopTraceReception() - } - logger.Fatal("Cannot run receiver for "+rec.TraceSource(), zap.Error(err)) - } - startedTraceReceivers = append(startedTraceReceivers, rec) - someReceiverEnabled = true - } - } - - if !someReceiverEnabled { - logger.Warn("Nothing to do: no receiver was enabled. Shutting down.") - os.Exit(1) - } - - return startedTraceReceivers -} diff --git a/cmd/occollector/app/sender/doc.go b/cmd/occollector/app/sender/doc.go deleted file mode 100644 index 7ffe17809be..00000000000 --- a/cmd/occollector/app/sender/doc.go +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package sender contains specialized senders to different backends. Unlike -// exporters they do not buffer or attempt to resend that failed batches, all -// of that is delegated to the users of the senders. -package sender diff --git a/cmd/occollector/main.go b/cmd/occollector/main.go deleted file mode 100644 index ff045a0a63c..00000000000 --- a/cmd/occollector/main.go +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Program occollector receives stats and traces from multiple sources and -// batches them for appropriate forwarding to backends (e.g.: Jaeger or Zipkin) -// or other layers of occollector. The forwarding can be configured so -// buffer sizes, number of retries, backoff policy, etc can be ajusted according -// to specific needs of each deployment. -package main - -import ( - "log" - - "github.com/open-telemetry/opentelemetry-service/cmd/occollector/app/collector" -) - -func main() { - if err := collector.App.Start(); err != nil { - log.Fatalf("Failed to run the collector: %v", err) - } -} diff --git a/cmd/otelsvc/main.go b/cmd/otelsvc/main.go index 9f8329baaf7..25844d14aed 100644 --- a/cmd/otelsvc/main.go +++ b/cmd/otelsvc/main.go @@ -19,12 +19,12 @@ package main import ( "log" - "github.com/open-telemetry/opentelemetry-service/cmd/occollector/app/collector" _ "github.com/open-telemetry/opentelemetry-service/receiver/vmmetricsreceiver" + "github.com/open-telemetry/opentelemetry-service/service" ) func main() { - if err := collector.App.StartUnified(); err != nil { + if err := service.App.StartUnified(); err != nil { log.Fatalf("Failed to run the service: %v", err) } } diff --git a/exporter/jaegerexporter/jaegerexporter_test.go b/exporter/jaegerexporter/doc.go similarity index 74% rename from exporter/jaegerexporter/jaegerexporter_test.go rename to exporter/jaegerexporter/doc.go index b745b4b6f0d..0cae7fb1779 100644 --- a/exporter/jaegerexporter/jaegerexporter_test.go +++ b/exporter/jaegerexporter/doc.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package jaegerexporter contains a specialized Jaeger exporter. Unlike client +// library exporters they do not buffer or attempt to resend failed batches, all +// of that is delegated to the callers of the exporter. package jaegerexporter - -// TODO: Add tests. diff --git a/cmd/occollector/app/sender/empty_test.go b/exporter/jaegerexporter/empty_test.go similarity index 96% rename from cmd/occollector/app/sender/empty_test.go rename to exporter/jaegerexporter/empty_test.go index 1df66e1d46b..b3de29862f7 100644 --- a/cmd/occollector/app/sender/empty_test.go +++ b/exporter/jaegerexporter/empty_test.go @@ -12,6 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -package sender +package jaegerexporter // TODO: Delete me when tests are added. diff --git a/exporter/jaegerexporter/jaeger.go b/exporter/jaegerexporter/jaeger.go deleted file mode 100644 index 44a0f9997bd..00000000000 --- a/exporter/jaegerexporter/jaeger.go +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package jaegerexporter - -import ( - "github.com/spf13/viper" - - "contrib.go.opencensus.io/exporter/jaeger" - - "github.com/open-telemetry/opentelemetry-service/consumer" - "github.com/open-telemetry/opentelemetry-service/exporter/exporterwrapper" -) - -// Slight modified version of go/src/contrib.go.opencensus.io/exporter/jaeger/jaeger.go -type jaegerConfig struct { - CollectorEndpoint string `mapstructure:"collector_endpoint,omitempty"` - Username string `mapstructure:"username,omitempty"` - Password string `mapstructure:"password,omitempty"` - ServiceName string `mapstructure:"service_name,omitempty"` -} - -// JaegerExportersFromViper unmarshals the viper and returns exporter.TraceExporters targeting -// Jaeger according to the configuration settings. -func JaegerExportersFromViper(v *viper.Viper) (tps []consumer.TraceConsumer, mps []consumer.MetricsConsumer, doneFns []func() error, err error) { - var cfg struct { - Jaeger *jaegerConfig `mapstructure:"jaeger"` - } - if err := v.Unmarshal(&cfg); err != nil { - return nil, nil, nil, err - } - jc := cfg.Jaeger - if jc == nil { - return nil, nil, nil, nil - } - - // jaeger.NewExporter performs configurqtion validation - je, err := jaeger.NewExporter(jaeger.Options{ - CollectorEndpoint: jc.CollectorEndpoint, - Username: jc.Username, - Password: jc.Password, - Process: jaeger.Process{ - ServiceName: jc.ServiceName, - }, - }) - if err != nil { - return nil, nil, nil, err - } - - doneFns = append(doneFns, func() error { - je.Flush() - return nil - }) - - jte, err := exporterwrapper.NewExporterWrapper("jaeger", "ocservice.exporter.Jaeger.ConsumeTraceData", je) - if err != nil { - return nil, nil, nil, err - } - // TODO: Examine "contrib.go.opencensus.io/exporter/jaeger" to see - // if trace.ExportSpan was constraining and if perhaps the Jaeger - // upload can use the context and information from the Node. - tps = append(tps, jte) - return -} diff --git a/cmd/occollector/app/sender/jaeger_proto_grpc_sender.go b/exporter/jaegerexporter/jaeger_proto_grpc_sender.go similarity index 99% rename from cmd/occollector/app/sender/jaeger_proto_grpc_sender.go rename to exporter/jaegerexporter/jaeger_proto_grpc_sender.go index e083970238a..8800e9c71b4 100644 --- a/cmd/occollector/app/sender/jaeger_proto_grpc_sender.go +++ b/exporter/jaegerexporter/jaeger_proto_grpc_sender.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package sender +package jaegerexporter import ( "context" diff --git a/cmd/occollector/app/sender/jaeger_thrift_http_sender.go b/exporter/jaegerexporter/jaeger_thrift_http_sender.go similarity index 99% rename from cmd/occollector/app/sender/jaeger_thrift_http_sender.go rename to exporter/jaegerexporter/jaeger_thrift_http_sender.go index b2d3ecd282a..39ef5008ef4 100644 --- a/cmd/occollector/app/sender/jaeger_thrift_http_sender.go +++ b/exporter/jaegerexporter/jaeger_thrift_http_sender.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package sender +package jaegerexporter import ( "bytes" diff --git a/cmd/occollector/app/sender/jaeger_thrift_tchannel_sender.go b/exporter/jaegerexporter/jaeger_thrift_tchannel_sender.go similarity index 98% rename from cmd/occollector/app/sender/jaeger_thrift_tchannel_sender.go rename to exporter/jaegerexporter/jaeger_thrift_tchannel_sender.go index a657476ca31..1e1a4e7ee96 100644 --- a/cmd/occollector/app/sender/jaeger_thrift_tchannel_sender.go +++ b/exporter/jaegerexporter/jaeger_thrift_tchannel_sender.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package sender +package jaegerexporter import ( "context" diff --git a/internal/collector/jaeger/.nocover b/internal/collector/jaeger/.nocover deleted file mode 100644 index a6f2bfa9cc9..00000000000 --- a/internal/collector/jaeger/.nocover +++ /dev/null @@ -1 +0,0 @@ -FIXME: Move this to the receiver. diff --git a/internal/collector/jaeger/receiver.go b/internal/collector/jaeger/receiver.go deleted file mode 100644 index a736706e30c..00000000000 --- a/internal/collector/jaeger/receiver.go +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package jaegerreceiver wraps the functionality to start the end-point that -// receives Jaeger data sent by the jaeger-agent in jaeger.thrift format over -// TChannel and directly from clients in jaeger.thrift format over binary thrift -// protocol (HTTP transport). -// Note that the UDP transport is not supported since these protocol/transport -// are for task->jaeger-agent communication only and the receiver does not try to -// support jaeger-agent endpoints. -// TODO: add support for the jaeger proto endpoint released in jaeger 1.8package jaegerreceiver -package jaegerreceiver - -import ( - "context" - - "github.com/spf13/viper" - "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-service/cmd/occollector/app/builder" - "github.com/open-telemetry/opentelemetry-service/consumer" - "github.com/open-telemetry/opentelemetry-service/receiver" - "github.com/open-telemetry/opentelemetry-service/receiver/jaegerreceiver" -) - -// Start starts the Jaeger receiver endpoint. -func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsumer, host receiver.Host) (receiver.TraceReceiver, error) { - rOpts, err := builder.NewDefaultJaegerReceiverCfg().InitFromViper(v) - if err != nil { - return nil, err - } - - ctx := context.Background() - config := &jaegerreceiver.Configuration{ - CollectorThriftPort: rOpts.ThriftTChannelPort, - CollectorHTTPPort: rOpts.ThriftHTTPPort, - } - jtr, err := jaegerreceiver.New(ctx, config, traceConsumer) - if err != nil { - return nil, err - } - - if err := jtr.StartTraceReception(host); err != nil { - return nil, err - } - - logger.Info("Jaeger receiver is running.", - zap.Int("thrift-tchannel-port", rOpts.ThriftTChannelPort), - zap.Int("thrift-http-port", rOpts.ThriftHTTPPort)) - - return jtr, nil -} diff --git a/internal/collector/opencensus/receiver.go b/internal/collector/opencensus/receiver.go deleted file mode 100644 index c0afcfd739a..00000000000 --- a/internal/collector/opencensus/receiver.go +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package ocreceiver wraps the functionality to start the end-point that -// receives data directly in the OpenCensus format. -package ocreceiver - -import ( - "fmt" - "strconv" - - "github.com/spf13/viper" - "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/keepalive" - - "github.com/open-telemetry/opentelemetry-service/cmd/occollector/app/builder" - "github.com/open-telemetry/opentelemetry-service/consumer" - "github.com/open-telemetry/opentelemetry-service/receiver" - "github.com/open-telemetry/opentelemetry-service/receiver/opencensusreceiver" -) - -// Start starts the OpenCensus receiver endpoint. -func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsumer, host receiver.Host) (receiver.TraceReceiver, error) { - addr, opts, zapFields, err := receiverOptions(v) - if err != nil { - return nil, err - } - - ocr, err := opencensusreceiver.New(addr, traceConsumer, nil, opts...) - if err != nil { - return nil, fmt.Errorf("Failed to create the OpenCensus trace receiver: %v", err) - } - - if err := ocr.StartTraceReception(host); err != nil { - return nil, fmt.Errorf("Cannot bind Opencensus receiver to address %q: %v", addr, err) - } - - logger.Info("OpenCensus receiver is running.", zapFields...) - - return ocr, nil -} - -func receiverOptions(v *viper.Viper) (addr string, opts []opencensusreceiver.Option, zapFields []zap.Field, err error) { - rOpts, err := builder.NewDefaultOpenCensusReceiverCfg().InitFromViper(v) - if err != nil { - return addr, opts, zapFields, err - } - - tlsCredsOption, hasTLSCreds, err := rOpts.TLSCredentials.ToOpenCensusReceiverServerOption() - if err != nil { - return addr, opts, zapFields, fmt.Errorf("OpenCensus receiver TLS Credentials: %v", err) - } - if hasTLSCreds { - opts = append(opts, tlsCredsOption) - tlsCreds := rOpts.TLSCredentials - zapFields = append(zapFields, zap.String("cert_file", tlsCreds.CertFile), zap.String("key_file", tlsCreds.KeyFile)) - } - - grpcServerOptions, zapFields := grpcServerOptions(rOpts, zapFields) - if len(grpcServerOptions) > 0 { - opts = append(opts, opencensusreceiver.WithGRPCServerOptions(grpcServerOptions...)) - } - - addr = ":" + strconv.FormatInt(int64(rOpts.Port), 10) - zapFields = append(zapFields, zap.Int("port", rOpts.Port)) - - return addr, opts, zapFields, err -} - -func grpcServerOptions(rOpts *builder.OpenCensusReceiverCfg, zapFields []zap.Field) ([]grpc.ServerOption, []zap.Field) { - var grpcServerOptions []grpc.ServerOption - if rOpts.MaxRecvMsgSizeMiB > 0 { - grpcServerOptions = append(grpcServerOptions, grpc.MaxRecvMsgSize(int(rOpts.MaxRecvMsgSizeMiB*1024*1024))) - zapFields = append(zapFields, zap.Uint64("max-recv-msg-size-mib", rOpts.MaxRecvMsgSizeMiB)) - } - if rOpts.MaxConcurrentStreams > 0 { - grpcServerOptions = append(grpcServerOptions, grpc.MaxConcurrentStreams(rOpts.MaxConcurrentStreams)) - zapFields = append(zapFields, zap.Uint32("max-concurrent-streams", rOpts.MaxConcurrentStreams)) - } - if rOpts.Keepalive != nil { - if rOpts.Keepalive.ServerParameters != nil { - svrParams := rOpts.Keepalive.ServerParameters - grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveParams(keepalive.ServerParameters{ - MaxConnectionIdle: svrParams.MaxConnectionIdle, - MaxConnectionAge: svrParams.MaxConnectionAge, - MaxConnectionAgeGrace: svrParams.MaxConnectionAgeGrace, - Time: svrParams.Time, - Timeout: svrParams.Timeout, - })) - zapFields = append(zapFields, zap.Any("keepalive.server-parameters", rOpts.Keepalive.ServerParameters)) - } - if rOpts.Keepalive.EnforcementPolicy != nil { - enfPol := rOpts.Keepalive.EnforcementPolicy - grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ - MinTime: enfPol.MinTime, - PermitWithoutStream: enfPol.PermitWithoutStream, - })) - zapFields = append(zapFields, zap.Any("keepalive.enforcement-policy", rOpts.Keepalive.EnforcementPolicy)) - } - } - - return grpcServerOptions, zapFields -} diff --git a/internal/collector/opencensus/receiver_test.go b/internal/collector/opencensus/receiver_test.go deleted file mode 100644 index 6ce25a82583..00000000000 --- a/internal/collector/opencensus/receiver_test.go +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package ocreceiver wraps the functionality to start the end-point that -// receives data directly in the OpenCensus format. -package ocreceiver - -import ( - "testing" - "time" - - "github.com/spf13/viper" - "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-service/cmd/occollector/app/builder" - "github.com/open-telemetry/opentelemetry-service/processor/processortest" - "github.com/open-telemetry/opentelemetry-service/receiver/opencensusreceiver" - "github.com/open-telemetry/opentelemetry-service/receiver/receivertest" -) - -func TestStart(t *testing.T) { - tests := []struct { - name string - viperFn func() *viper.Viper - wantErr bool - }{ - { - name: "default_config", - viperFn: func() *viper.Viper { - v := viper.New() - v.Set("receivers.opencensus.{}", nil) - return v - }, - }, - { - name: "invalid_port", - viperFn: func() *viper.Viper { - v := viper.New() - v.Set("receivers.opencensus.port", -1) - return v - }, - wantErr: true, - }, - { - name: "missing_tls_files", - viperFn: func() *viper.Viper { - v := viper.New() - v.Set("receivers.opencensus.tls_credentials.cert_file", "foo") - return v - }, - wantErr: true, - }, - { - name: "grpc_settings", - viperFn: func() *viper.Viper { - v := viper.New() - v.Set("receivers.opencensus.port", 55678) - v.Set("receivers.opencensus.max-recv-msg-size-mib", 32) - v.Set("receivers.opencensus.max-concurrent-streams", 64) - v.Set("receivers.opencensus.keepalive.server-parameters.max-connection-age", 180*time.Second) - v.Set("receivers.opencensus.keepalive.server-parameters.max-connection-age-grace", 10*time.Second) - v.Set("receivers.opencensus.keepalive.enforcement-policy.min-time", 60*time.Second) - v.Set("receivers.opencensus.keepalive.enforcement-policy.permit-without-stream", true) - return v - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Enforce that all configurations are actually recognized. - v := tt.viperFn() - rOpts := builder.OpenCensusReceiverCfg{} - if err := v.Sub("receivers.opencensus").UnmarshalExact(&rOpts); err != nil { - t.Errorf("UnmarshalExact error: %v", err) - return - } - nopProcessor := processortest.NewNopTraceProcessor(nil) - mh := receivertest.NewMockHost() - got, err := Start(zap.NewNop(), v, nopProcessor, mh) - if (err != nil) != tt.wantErr { - t.Errorf("Start() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != nil { - // TODO: (@pjanotti) current StopTraceReception, stop the whole receiver. - // See https://github.com/census-instrumentation/opencensus-service/issues/559 - got.(*opencensusreceiver.Receiver).Stop() - } - }) - } -} diff --git a/internal/collector/zipkin/.nocover b/internal/collector/zipkin/.nocover deleted file mode 100644 index a6f2bfa9cc9..00000000000 --- a/internal/collector/zipkin/.nocover +++ /dev/null @@ -1 +0,0 @@ -FIXME: Move this to the receiver. diff --git a/internal/collector/zipkin/receiver.go b/internal/collector/zipkin/receiver.go deleted file mode 100644 index a6df0056400..00000000000 --- a/internal/collector/zipkin/receiver.go +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package zipkinreceiver wraps the functionality to start the end-point that -// receives Zipkin traces. -package zipkinreceiver - -import ( - "fmt" - "strconv" - - "github.com/spf13/viper" - "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-service/cmd/occollector/app/builder" - "github.com/open-telemetry/opentelemetry-service/consumer" - "github.com/open-telemetry/opentelemetry-service/receiver" - "github.com/open-telemetry/opentelemetry-service/receiver/zipkinreceiver" -) - -// Start starts the Zipkin receiver endpoint. -func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsumer, host receiver.Host) (receiver.TraceReceiver, error) { - rOpts, err := builder.NewDefaultZipkinReceiverCfg().InitFromViper(v) - if err != nil { - return nil, err - } - - addr := ":" + strconv.FormatInt(int64(rOpts.Port), 10) - zi, err := zipkinreceiver.New(addr, traceConsumer) - if err != nil { - return nil, fmt.Errorf("Failed to create the Zipkin receiver: %v", err) - } - - if err := zi.StartTraceReception(host); err != nil { - return nil, fmt.Errorf("Cannot start Zipkin receiver to address %q: %v", addr, err) - } - - logger.Info("Zipkin receiver is running.", zap.Int("port", rOpts.Port)) - - return zi, nil -} diff --git a/internal/collector/zipkin/scribe/.nocover b/internal/collector/zipkin/scribe/.nocover deleted file mode 100644 index a6f2bfa9cc9..00000000000 --- a/internal/collector/zipkin/scribe/.nocover +++ /dev/null @@ -1 +0,0 @@ -FIXME: Move this to the receiver. diff --git a/internal/collector/zipkin/scribe/receiver.go b/internal/collector/zipkin/scribe/receiver.go deleted file mode 100644 index b54fd65c924..00000000000 --- a/internal/collector/zipkin/scribe/receiver.go +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package zipkinscribereceiver wraps the functionality to start the end-point that -// receives Zipkin Scribe spans. -package zipkinscribereceiver - -import ( - "fmt" - - "github.com/spf13/viper" - "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-service/cmd/occollector/app/builder" - "github.com/open-telemetry/opentelemetry-service/consumer" - "github.com/open-telemetry/opentelemetry-service/receiver" - "github.com/open-telemetry/opentelemetry-service/receiver/zipkinreceiver/zipkinscribereceiver" -) - -// Start starts the Zipkin Scribe receiver endpoint. -func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsumer, host receiver.Host) (receiver.TraceReceiver, error) { - rOpts, err := builder.NewDefaultZipkinScribeReceiverCfg().InitFromViper(v) - if err != nil { - return nil, err - } - - sr, err := zipkinscribereceiver.NewReceiver(rOpts.Address, rOpts.Port, rOpts.Category, traceConsumer) - if err != nil { - return nil, fmt.Errorf("Failed to create the Zipkin Scribe receiver: %v", err) - } - - if err := sr.StartTraceReception(host); err != nil { - return nil, fmt.Errorf("Cannot start Zipkin Scribe receiver %+v: %v", rOpts, err) - } - - logger.Info("Zipkin Scribe receiver is running.", zap.Uint16("port", rOpts.Port), zap.String("category", rOpts.Category)) - - return sr, nil -} diff --git a/internal/config/config.go b/internal/config/config.go deleted file mode 100644 index 903d301379b..00000000000 --- a/internal/config/config.go +++ /dev/null @@ -1,488 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package config - -import ( - "fmt" - "net" - "net/url" - "reflect" - "strings" - - "github.com/spf13/viper" - "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - - "github.com/open-telemetry/opentelemetry-service/consumer" - "github.com/open-telemetry/opentelemetry-service/exporter/jaegerexporter" - "github.com/open-telemetry/opentelemetry-service/exporter/opencensusexporter" - "github.com/open-telemetry/opentelemetry-service/exporter/prometheusexporter" - "github.com/open-telemetry/opentelemetry-service/exporter/zipkinexporter" - "github.com/open-telemetry/opentelemetry-service/receiver/opencensusreceiver" - "github.com/open-telemetry/opentelemetry-service/receiver/prometheusreceiver" -) - -// We expect the configuration.yaml file to look like this: -// -// receivers: -// opencensus: -// port: -// -// zipkin: -// reporter:
-// -// prometheus: -// config: -// scrape_configs: -// - job_name: 'foo_service" -// scrape_interval: 5s -// static_configs: -// - targets: ['localhost:8889'] -// buffer_count: 10 -// buffer_period: 5s -// -// exporters: -// zipkin: -// endpoint: "http://localhost:9411/api/v2/spans" -// -// zpages: -// port: 55679 - -const ( - defaultOCReceiverAddress = ":55678" - defaultZPagesPort = 55679 -) - -var defaultOCReceiverCorsAllowedOrigins = []string{} - -var defaultScribeConfiguration = &ScribeReceiverConfig{ - Port: 9410, - Category: "zipkin", -} - -// Config denotes the configuration for the various elements of an agent, that is: -// * Receivers -// * ZPages -// * Exporters -type Config struct { - Receivers *Receivers `mapstructure:"receivers"` - ZPages *ZPagesConfig `mapstructure:"zpages"` - Exporters *Exporters `mapstructure:"exporters"` -} - -// Receivers denotes configurations for the various telemetry ingesters, such as: -// * Jaeger (traces) -// * OpenCensus (metrics and traces) -// * Prometheus (metrics) -// * Zipkin (traces) -type Receivers struct { - OpenCensus *ReceiverConfig `mapstructure:"opencensus"` - Zipkin *ReceiverConfig `mapstructure:"zipkin"` - Jaeger *ReceiverConfig `mapstructure:"jaeger"` - Scribe *ScribeReceiverConfig `mapstructure:"zipkin-scribe"` - VMMetrics *ReceiverConfig `mapstructure:"vmmetrics"` - - // Prometheus contains the Prometheus configurations. - // Such as: - // scrape_configs: - // - job_name: "agent" - // scrape_interval: 5s - // - // static_configs: - // - targets: ['localhost:9988'] - Prometheus *prometheusreceiver.Configuration `mapstructure:"prometheus"` -} - -// ReceiverConfig is the per-receiver configuration that identifies attributes -// that a receiver's configuration can have such as: -// * Address -// * Various ports -type ReceiverConfig struct { - // The address to which the OpenCensus receiver will be bound and run on. - Address string `mapstructure:"address"` - CollectorHTTPPort int `mapstructure:"collector_http_port"` - CollectorThriftPort int `mapstructure:"collector_thrift_port"` - - // The allowed CORS origins for HTTP/JSON requests the grpc-gateway adapter - // for the OpenCensus receiver. See github.com/rs/cors - // An empty list means that CORS is not enabled at all. A wildcard (*) can be - // used to match any origin or one or more characters of an origin. - CorsAllowedOrigins []string `mapstructure:"cors_allowed_origins"` - - // DisableTracing disables trace receiving and is only applicable to trace receivers. - DisableTracing bool `mapstructure:"disable_tracing"` - // DisableMetrics disables metrics receiving and is only applicable to metrics receivers. - DisableMetrics bool `mapstructure:"disable_metrics"` - - // TLSCredentials is a (cert_file, key_file) configuration. - TLSCredentials *TLSCredentials `mapstructure:"tls_credentials"` -} - -// ScribeReceiverConfig carries the settings for the Zipkin Scribe receiver. -type ScribeReceiverConfig struct { - // Address is an IP address or a name that can be resolved to a local address. - // - // It can use a name, but this is not recommended, because it will create - // a listener for at most one of the host's IP addresses. - // - // The default value bind to all available interfaces on the local computer. - Address string `mapstructure:"address" mapstructure:"address"` - Port uint16 `mapstructure:"port" mapstructure:"port"` - // Category is the string that will be used to identify the scribe log messages - // that contain Zipkin spans. - Category string `mapstructure:"category" mapstructure:"category"` -} - -// Exporters denotes the configurations for the various backends -// that this service exports observability signals to. -type Exporters struct { - Zipkin *zipkinexporter.ZipkinConfig `mapstructure:"zipkin"` -} - -// ZPagesConfig denotes the configuration that zPages will be run with. -type ZPagesConfig struct { - Disabled bool `mapstructure:"disabled"` - Port int `mapstructure:"port"` -} - -// OpenCensusReceiverAddress is a helper to safely retrieve the address -// that the OpenCensus receiver will be bound to. -// If Config is nil or the OpenCensus receiver's configuration is nil, it -// will return the default of ":55678" -func (c *Config) OpenCensusReceiverAddress() string { - if c == nil || c.Receivers == nil { - return defaultOCReceiverAddress - } - inCfg := c.Receivers - if inCfg.OpenCensus == nil || inCfg.OpenCensus.Address == "" { - return defaultOCReceiverAddress - } - return inCfg.OpenCensus.Address -} - -// OpenCensusReceiverCorsAllowedOrigins is a helper to safely retrieve the list -// of allowed CORS origins for HTTP/JSON requests to the grpc-gateway adapter. -func (c *Config) OpenCensusReceiverCorsAllowedOrigins() []string { - if c == nil || c.Receivers == nil { - return defaultOCReceiverCorsAllowedOrigins - } - inCfg := c.Receivers - if inCfg.OpenCensus == nil { - return defaultOCReceiverCorsAllowedOrigins - } - return inCfg.OpenCensus.CorsAllowedOrigins -} - -// CanRunOpenCensusTraceReceiver returns true if the configuration -// permits running the OpenCensus Trace receiver. -func (c *Config) CanRunOpenCensusTraceReceiver() bool { - return c.openCensusReceiverEnabled() && !c.Receivers.OpenCensus.DisableTracing -} - -// CanRunOpenCensusMetricsReceiver returns true if the configuration -// permits running the OpenCensus Metrics receiver. -func (c *Config) CanRunOpenCensusMetricsReceiver() bool { - return c.openCensusReceiverEnabled() && !c.Receivers.OpenCensus.DisableMetrics -} - -// openCensusReceiverEnabled returns true if both: -// Config.Receivers and Config.Receivers.OpenCensus -// are non-nil. -func (c *Config) openCensusReceiverEnabled() bool { - return c != nil && c.Receivers != nil && - c.Receivers.OpenCensus != nil -} - -// ZPagesDisabled returns true if zPages have not been enabled. -// It returns true if Config is nil or if ZPages are explicitly disabled. -func (c *Config) ZPagesDisabled() bool { - if c == nil { - return true - } - return c.ZPages != nil && c.ZPages.Disabled -} - -// ZPagesPort tries to dereference the port on which zPages will be -// served. -// If zPages are disabled, it returns (-1, false) -// Else if no port is set, it returns the default 55679 -func (c *Config) ZPagesPort() (int, bool) { - if c.ZPagesDisabled() { - return -1, false - } - port := defaultZPagesPort - if c != nil && c.ZPages != nil && c.ZPages.Port > 0 { - port = c.ZPages.Port - } - return port, true -} - -// ZipkinReceiverEnabled returns true if Config is non-nil -// and if the Zipkin receiver configuration is also non-nil. -func (c *Config) ZipkinReceiverEnabled() bool { - if c == nil { - return false - } - return c.Receivers != nil && c.Receivers.Zipkin != nil -} - -// ZipkinScribeReceiverEnabled returns true if Config is non-nil -// and if the Scribe receiver configuration is also non-nil. -func (c *Config) ZipkinScribeReceiverEnabled() bool { - if c == nil { - return false - } - return c.Receivers != nil && c.Receivers.Scribe != nil -} - -// JaegerReceiverEnabled returns true if Config is non-nil -// and if the Jaeger receiver configuration is also non-nil. -func (c *Config) JaegerReceiverEnabled() bool { - if c == nil { - return false - } - return c.Receivers != nil && c.Receivers.Jaeger != nil -} - -// PrometheusReceiverEnabled returns true if Config is non-nil -// and if the Jaeger receiver configuration is also non-nil. -func (c *Config) PrometheusReceiverEnabled() bool { - if c == nil { - return false - } - return c.Receivers != nil && c.Receivers.Prometheus != nil -} - -// PrometheusConfiguration deferences and returns the Prometheus configuration -// if non-nil. -func (c *Config) PrometheusConfiguration() *prometheusreceiver.Configuration { - if c == nil || c.Receivers == nil { - return nil - } - return c.Receivers.Prometheus -} - -// ZipkinReceiverAddress is a helper to safely retrieve the address -// that the Zipkin receiver will run on. -// If Config is nil or the Zipkin receiver's configuration is nil, it -// will return the default of "localhost:9411" -func (c *Config) ZipkinReceiverAddress() string { - if c == nil || c.Receivers == nil { - return zipkinexporter.DefaultZipkinEndpointHostPort - } - inCfg := c.Receivers - if inCfg.Zipkin == nil || inCfg.Zipkin.Address == "" { - return zipkinexporter.DefaultZipkinEndpointHostPort - } - return inCfg.Zipkin.Address -} - -// ZipkinScribeConfig is a helper to safely retrieve the Zipkin Scribe -// configuration. -func (c *Config) ZipkinScribeConfig() *ScribeReceiverConfig { - if c == nil || c.Receivers == nil || c.Receivers.Scribe == nil { - return defaultScribeConfiguration - } - cfg := c.Receivers.Scribe - if cfg.Port == 0 { - cfg.Port = defaultScribeConfiguration.Port - } - if cfg.Category == "" { - cfg.Category = defaultScribeConfiguration.Category - } - return cfg -} - -// JaegerReceiverPorts is a helper to safely retrieve the address -// that the Jaeger receiver will run on. -func (c *Config) JaegerReceiverPorts() (collectorPort, thriftPort int) { - if c == nil || c.Receivers == nil { - return 0, 0 - } - rCfg := c.Receivers - if rCfg.Jaeger == nil { - return 0, 0 - } - jc := rCfg.Jaeger - return jc.CollectorHTTPPort, jc.CollectorThriftPort -} - -// HasTLSCredentials returns true if TLSCredentials is non-nil -func (rCfg *ReceiverConfig) HasTLSCredentials() bool { - return rCfg != nil && rCfg.TLSCredentials != nil && rCfg.TLSCredentials.nonEmpty() -} - -// OpenCensusReceiverTLSServerCredentials retrieves the TLS credentials -// from this Config's OpenCensus receiver if any. -func (c *Config) OpenCensusReceiverTLSServerCredentials() *TLSCredentials { - if !c.openCensusReceiverEnabled() { - return nil - } - - ocrConfig := c.Receivers.OpenCensus - if !ocrConfig.HasTLSCredentials() { - return nil - } - return ocrConfig.TLSCredentials -} - -// ToOpenCensusReceiverServerOption checks if the TLS credentials -// in the form of a certificate file and a key file. If they aren't, -// it will return opencensusreceiver.WithNoopOption() and a nil error. -// Otherwise, it will try to retrieve gRPC transport credentials from the file combinations, -// and create a option, along with any errors encountered while retrieving the credentials. -func (tlsCreds *TLSCredentials) ToOpenCensusReceiverServerOption() (opt opencensusreceiver.Option, ok bool, err error) { - if tlsCreds == nil { - return opencensusreceiver.WithNoopOption(), false, nil - } - - transportCreds, err := credentials.NewServerTLSFromFile(tlsCreds.CertFile, tlsCreds.KeyFile) - if err != nil { - return nil, false, err - } - gRPCCredsOpt := grpc.Creds(transportCreds) - return opencensusreceiver.WithGRPCServerOptions(gRPCCredsOpt), true, nil -} - -// OpenCensusReceiverTLSCredentialsServerOption checks if the OpenCensus receiver's Configuration -// has TLS credentials in the form of a certificate file and a key file. If it doesn't -// have any, it will return opencensusreceiver.WithNoopOption() and a nil error. -// Otherwise, it will try to retrieve gRPC transport credentials from the file combinations, -// and create a option, along with any errors encountered while retrieving the credentials. -func (c *Config) OpenCensusReceiverTLSCredentialsServerOption() (opt opencensusreceiver.Option, ok bool, err error) { - tlsCreds := c.OpenCensusReceiverTLSServerCredentials() - return tlsCreds.ToOpenCensusReceiverServerOption() -} - -// VMMetricsReceiverEnabled returns true if Config is non-nil. -func (c *Config) VMMetricsReceiverEnabled() bool { - if c == nil { - return false - } - return c.Receivers != nil && c.Receivers.VMMetrics != nil -} - -// CheckLogicalConflicts serves to catch logical errors such as -// if the Zipkin receiver port conflicts with that of the exporter, -// lest we'll have a self DOS because spans will be exported "out" from -// the exporter, yet be received from the receiver, then sent back out -// and back in a never ending loop. -func (c *Config) CheckLogicalConflicts() error { - if c.Exporters == nil || c.Exporters.Zipkin == nil || !c.ZipkinReceiverEnabled() { - return nil - } - - zc := c.Exporters.Zipkin - - zExporterAddr := zc.EndpointURL() - zExporterURL, err := url.Parse(zExporterAddr) - if err != nil { - return fmt.Errorf("parsing ZipkinExporter address %q got error: %v", zExporterAddr, err) - } - - zReceiverHostPort := c.ZipkinReceiverAddress() - - zExporterHostPort := zExporterURL.Host - if zReceiverHostPort == zExporterHostPort { - return fmt.Errorf("ZipkinExporter address (%q) is the same as the receiver address (%q)", - zExporterHostPort, zReceiverHostPort) - } - zExpHost, zExpPort, _ := net.SplitHostPort(zExporterHostPort) - zReceiverHost, zReceiverPort, _ := net.SplitHostPort(zReceiverHostPort) - if eqHosts(zExpHost, zReceiverHost) && zExpPort == zReceiverPort { - return fmt.Errorf("ZipkinExporter address (%q) aka (%s on port %s)\nis the same as the receiver address (%q) aka (%s on port %s)", - zExporterHostPort, zExpHost, zExpPort, zReceiverHostPort, zReceiverHost, zReceiverPort) - } - - // Otherwise, now let's resolve the IPs and ensure that they aren't the same - zExpIPAddr, _ := net.ResolveIPAddr("ip", zExpHost) - zReceiverIPAddr, _ := net.ResolveIPAddr("ip", zReceiverHost) - if zExpIPAddr != nil && zReceiverIPAddr != nil && reflect.DeepEqual(zExpIPAddr, zReceiverIPAddr) && zExpPort == zReceiverPort { - return fmt.Errorf("ZipkinExporter address (%q) aka (%+v)\nis the same as the\nreceiver address (%q) aka (%+v)", - zExporterHostPort, zExpIPAddr, zReceiverHostPort, zReceiverIPAddr) - } - return nil -} - -func eqHosts(host1, host2 string) bool { - if host1 == host2 { - return true - } - return eqLocalHost(host1) && eqLocalHost(host2) -} - -func eqLocalHost(host string) bool { - switch strings.ToLower(host) { - case "", "localhost", "127.0.0.1": - return true - default: - return false - } -} - -// ExportersFromViperConfig uses the viper configuration payload to returns the respective exporters -// from: -// + zipkin -// + jaeger -// + opencensus -// + prometheus -func ExportersFromViperConfig(logger *zap.Logger, v *viper.Viper) ([]consumer.TraceConsumer, []consumer.MetricsConsumer, []func() error, error) { - parseFns := []struct { - name string - fn func(*viper.Viper) ([]consumer.TraceConsumer, []consumer.MetricsConsumer, []func() error, error) - }{ - {name: "zipkin", fn: zipkinexporter.ZipkinExportersFromViper}, - {name: "jaeger", fn: jaegerexporter.JaegerExportersFromViper}, - {name: "opencensus", fn: opencensusexporter.OpenCensusTraceExportersFromViper}, - {name: "prometheus", fn: prometheusexporter.PrometheusExportersFromViper}, - } - - var traceExporters []consumer.TraceConsumer - var metricsExporters []consumer.MetricsConsumer - var doneFns []func() error - exportersViper := v.Sub("exporters") - if exportersViper == nil { - return nil, nil, nil, nil - } - for _, cfg := range parseFns { - tes, mes, tesDoneFns, err := cfg.fn(exportersViper) - if err != nil { - err = fmt.Errorf("failed to create config for %q: %v", cfg.name, err) - return nil, nil, nil, err - } - - for _, te := range tes { - if te != nil { - traceExporters = append(traceExporters, te) - logger.Info("Trace Exporter enabled", zap.String("exporter", cfg.name)) - } - } - - for _, me := range mes { - if me != nil { - metricsExporters = append(metricsExporters, me) - logger.Info("Metrics Exporter enabled", zap.String("exporter", cfg.name)) - } - } - - for _, doneFn := range tesDoneFns { - if doneFn != nil { - doneFns = append(doneFns, doneFn) - } - } - } - return traceExporters, metricsExporters, doneFns, nil -} diff --git a/internal/config/config_test.go b/internal/config/config_test.go deleted file mode 100644 index 27916e44851..00000000000 --- a/internal/config/config_test.go +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package config_test - -import ( - "testing" - - "github.com/spf13/viper" - - "github.com/open-telemetry/opentelemetry-service/exporter/zipkinexporter" - "github.com/open-telemetry/opentelemetry-service/internal/config" - "github.com/open-telemetry/opentelemetry-service/internal/config/viperutils" -) - -// Issue #233: Zipkin receiver and exporter loopback detection -// would mistakenly report that "localhost:9410" and "localhost:9411" -// were equal, due to a mistake in parsing out their addresses, -// but also after IP resolution, the equivalence of ports was not being -// checked. -func TestZipkinReceiverExporterLogicalConflictChecks(t *testing.T) { - regressionYAML := []byte(` -receivers: - zipkin: - address: "localhost:9410" - -exporters: - zipkin: - endpoint: "http://localhost:9411/api/v2/spans" -`) - - v := viper.New() - err := viperutils.LoadYAMLBytes(v, regressionYAML) - if err != nil { - t.Fatalf("Unexpected YAML parse error: %v", err) - } - var cfg config.Config - err = v.Unmarshal(&cfg) - if err != nil { - t.Fatalf("Unexpected error unmarshaling viper: %s", err) - } - if err := cfg.CheckLogicalConflicts(); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - if g, w := cfg.Receivers.Zipkin.Address, "localhost:9410"; g != w { - t.Errorf("Receivers.Zipkin.EndpointURL mismatch\nGot: %s\nWant:%s", g, w) - } - - var ecfg struct { - Exporters *struct { - Zipkin *zipkinexporter.ZipkinConfig `mapstructure:"zipkin"` - } `mapstructure:"exporters"` - } - _ = v.Unmarshal(&ecfg) - if g, w := ecfg.Exporters.Zipkin.EndpointURL(), "http://localhost:9411/api/v2/spans"; g != w { - t.Errorf("Exporters.Zipkin.EndpointURL mismatch\nGot: %s\nWant:%s", g, w) - } -} - -// Issue #377: If Config.OpenCensus == nil, invoking -// CanRunOpenCensus{Metrics, Trace}Receiver() would crash. -func TestOpenCensusTraceReceiverEnabledNoCrash(t *testing.T) { - // 1. Test with an in-code struct. - cfg := &config.Config{ - Receivers: &config.Receivers{ - OpenCensus: nil, - }, - } - if cfg.CanRunOpenCensusTraceReceiver() { - t.Fatal("CanRunOpenCensusTraceReceiver: Unexpected True for a nil Receiver.OpenCensus") - } - if cfg.CanRunOpenCensusMetricsReceiver() { - t.Fatal("CanRunOpenCensusMetricsReceiver: Unexpected True for a nil Receiver.OpenCensus") - } - - // 2. Test with a struct unmarshalled from a configuration file's YAML. - regressionYAML := []byte(` -receivers: - zipkin: - address: "localhost:9410"`) - - v := viper.New() - err := viperutils.LoadYAMLBytes(v, regressionYAML) - if err != nil { - t.Fatalf("Unexpected YAML parse error: %v", err) - } - err = v.Unmarshal(cfg) - if err != nil { - t.Fatalf("Unexpected error unmarshaling viper: %s", err) - } - - if cfg.CanRunOpenCensusTraceReceiver() { - t.Fatal("yaml.CanRunOpenCensusTraceReceiver: Unexpected True for a nil Receiver.OpenCensus") - } - if cfg.CanRunOpenCensusMetricsReceiver() { - t.Fatal("yaml.CanRunOpenCensusMetricsReceiver: Unexpected True for a nil Receiver.OpenCensus") - } -} diff --git a/internal/config/tls_credentials.go b/internal/config/tls_credentials.go deleted file mode 100644 index aaef8089048..00000000000 --- a/internal/config/tls_credentials.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package config - -// TLSCredentials holds the fields for TLS credentials -// that are used for starting a server. -type TLSCredentials struct { - // CertFile is the file path containing the TLS certificate. - CertFile string `mapstructure:"cert_file"` - - // KeyFile is the file path containing the TLS key. - KeyFile string `mapstructure:"key_file"` -} - -// nonEmpty returns true if the TLSCredentials are non-nil and -// if either CertFile or KeyFile is non-empty. -func (tc *TLSCredentials) nonEmpty() bool { - return tc != nil && (tc.CertFile != "" || tc.KeyFile != "") -} diff --git a/internal/config/tls_credentials_test.go b/internal/config/tls_credentials_test.go deleted file mode 100644 index 3f2c95e43a4..00000000000 --- a/internal/config/tls_credentials_test.go +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package config - -import ( - "reflect" - "testing" - - "github.com/spf13/viper" - - "github.com/open-telemetry/opentelemetry-service/internal/config/viperutils" -) - -func TestTLSConfigByParsing(t *testing.T) { - configYAML := []byte(` -receivers: - opencensus: - tls_credentials: - cert_file: "foobar.crt" - key_file: "foobar.key" - `) - - v := viper.New() - err := viperutils.LoadYAMLBytes(v, configYAML) - if err != nil { - t.Fatalf("Unexpected YAML parse error: %v", err) - } - var cfg Config - err = v.Unmarshal(&cfg) - if err != nil { - t.Fatalf("Unexpected error unmarshaling viper: %s", err) - } - - tlsCreds := cfg.OpenCensusReceiverTLSServerCredentials() - if tlsCreds == nil { - t.Error("Surprisingly turned out nil TLS credentials") - } - - if !tlsCreds.nonEmpty() { - t.Error("nonEmpty returned false") - } - - want := &TLSCredentials{ - CertFile: "foobar.crt", - KeyFile: "foobar.key", - } - - if !reflect.DeepEqual(tlsCreds, want) { - t.Errorf("Got: %+v\nWant: %+v", cfg, want) - } -} - -func TestTLSConfigDereferencing(t *testing.T) { - var nilConfig *Config - if g := nilConfig.OpenCensusReceiverTLSServerCredentials(); g != nil { - t.Errorf("Retrieved non-nil TLSServerCredentials: %+v\n", g) - } - - if nilConfig.openCensusReceiverEnabled() { - t.Error("Somehow OpenCensus receiver is enabled on a nil Config") - } -} - -func TestTLSCredentials_nonEmptyChecks(t *testing.T) { - // TLSCredentials are considered "nonEmpty" if at least either - // of "cert_file" or "key_file" are non-empty. - combinations := []struct { - config string - want bool - }{ - {config: ``, want: false}, - { - config: ` -receivers: - opencensus: - tls_credentials: - cert_file: "foo" - `, want: true, - }, - { - config: ` -receivers: - opencensus: - tls_credentials: - key_file: "foo" - `, want: true, - }, - { - config: ` -receivers: - opencensus: - tls_credentials: - key_file: "" - cert_file: "" - `, want: false, - }, - } - - for i, tt := range combinations { - v := viper.New() - err := viperutils.LoadYAMLBytes(v, []byte(tt.config)) - if err != nil { - t.Fatalf("#%d: Unexpected YAML parse error: %v", i, err) - } - var cfg Config - err = v.Unmarshal(&cfg) - if err != nil { - t.Fatalf("#%d: Unexpected error unmarshaling viper: %s", i, err) - } - tlsCreds := cfg.OpenCensusReceiverTLSServerCredentials() - got, want := tlsCreds.nonEmpty(), tt.want - if got != want { - t.Errorf("#%d: got=%t want=%t\nConfig:\n%s", i, got, want, tt.config) - } - } -} diff --git a/receiver/zipkinreceiver/trace_receiver.go b/receiver/zipkinreceiver/trace_receiver.go index b374a20e7f4..f36d65bb5de 100644 --- a/receiver/zipkinreceiver/trace_receiver.go +++ b/receiver/zipkinreceiver/trace_receiver.go @@ -581,9 +581,3 @@ func zipkinTagsToTraceAttributes(tags map[string]string) *tracepb.Span_Attribute } return &tracepb.Span_Attributes{AttributeMap: amap} } - -func setIfNonEmpty(key, value string, dest map[string]string) { - if value != "" { - dest[key] = value - } -} diff --git a/service/builder/builder.go b/service/builder/builder.go new file mode 100644 index 00000000000..52af877c241 --- /dev/null +++ b/service/builder/builder.go @@ -0,0 +1,45 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builder + +import ( + "flag" + "fmt" + "github.com/spf13/viper" +) + +const ( + // flags + configCfg = "config" + memBallastFlag = "mem-ballast-size-mib" +) + +// Flags adds flags related to basic building of the collector application to the given flagset. +func Flags(flags *flag.FlagSet) { + flags.String(configCfg, "", "Path to the config file") + flags.Uint(memBallastFlag, 0, + fmt.Sprintf("Flag to specify size of memory (MiB) ballast to set. Ballast is not used when this is not specified. "+ + "default settings: 0")) +} + +// GetConfigFile gets the config file from the config file flag. +func GetConfigFile(v *viper.Viper) string { + return v.GetString(configCfg) +} + +// MemBallastSize returns the size of memory ballast to use in MBs +func MemBallastSize(v *viper.Viper) int { + return v.GetInt(memBallastFlag) +} diff --git a/cmd/occollector/app/builder/builder_test.go b/service/builder/builder_test.go similarity index 58% rename from cmd/occollector/app/builder/builder_test.go rename to service/builder/builder_test.go index 9a23daa2b6e..8135f3ac507 100644 --- a/cmd/occollector/app/builder/builder_test.go +++ b/service/builder/builder_test.go @@ -24,64 +24,6 @@ import ( "github.com/spf13/viper" ) -func TestReceiversEnabledByPresenceWithDefaultSettings(t *testing.T) { - v, err := loadViperFromFile("./testdata/receivers_enabled.yaml") - if err != nil { - t.Fatalf("Failed to load viper from test file: %v", err) - } - - jaegerEnabled, opencensusEnabled, zipkinEnabled, scribeEnabled := - JaegerReceiverEnabled(v), OpenCensusReceiverEnabled(v), ZipkinReceiverEnabled(v), ZipkinScribeReceiverEnabled(v) - if !jaegerEnabled || !opencensusEnabled || !zipkinEnabled || !scribeEnabled { - t.Fatalf("Some of the expected receivers were not enabled j:%v oc:%v z:%v scribe:%v", jaegerEnabled, opencensusEnabled, zipkinEnabled, scribeEnabled) - } - - wj := NewDefaultJaegerReceiverCfg() - gj, err := wj.InitFromViper(v) - if err != nil { - t.Errorf("Failed to InitFromViper for Jaeger receiver: %v", err) - } else if !reflect.DeepEqual(wj, gj) { - t.Errorf("Incorrect config for Jaeger receiver, want %v got %v", wj, gj) - } - - woc := NewDefaultOpenCensusReceiverCfg() - goc, err := woc.InitFromViper(v) - if err != nil { - t.Errorf("Failed to InitFromViper for OpenCensus receiver: %v", err) - } else if !reflect.DeepEqual(woc, goc) { - t.Errorf("Incorrect config for OpenCensus receiver, want %v got %v", woc, goc) - } - - wz := NewDefaultZipkinReceiverCfg() - gz, err := wz.InitFromViper(v) - if err != nil { - t.Errorf("Failed to InitFromViper for Zipkin receiver: %v", err) - } else if !reflect.DeepEqual(wz, gz) { - t.Errorf("Incorrect config for Zipkin receiver, want %v got %v", wz, gz) - } - - wscrb := NewDefaultZipkinScribeReceiverCfg() - gscrb, err := wscrb.InitFromViper(v) - if err != nil { - t.Errorf("Failed to InitFromViper for Zipkin Scribe receiver: %v", err) - } else if !reflect.DeepEqual(wscrb, gscrb) { - t.Errorf("Incorrect config for Zipkin Scribe receiver, want %v got %v", wscrb, gscrb) - } -} - -func TestReceiversDisabledByPresenceWithDefaultSettings(t *testing.T) { - v, err := loadViperFromFile("./testdata/receivers_disabled.yaml") - if err != nil { - t.Fatalf("Failed to load viper from test file: %v", err) - } - - jaegerEnabled, opencensusEnabled, zipkinEnabled, scribeEnabled := - JaegerReceiverEnabled(v), OpenCensusReceiverEnabled(v), ZipkinReceiverEnabled(v), ZipkinScribeReceiverEnabled(v) - if jaegerEnabled || opencensusEnabled || zipkinEnabled { - t.Fatalf("Not all receivers were disabled j:%v oc:%v z:%v scribe:%v", jaegerEnabled, opencensusEnabled, zipkinEnabled, scribeEnabled) - } -} - func TestMultiAndQueuedSpanProcessorConfig(t *testing.T) { v, err := loadViperFromFile("./testdata/queued_exporters.yaml") if err != nil { @@ -212,36 +154,6 @@ func TestTailSamplingConfig(t *testing.T) { } } -func TestOpencensusReceiverKeepaliveSettings(t *testing.T) { - v, err := loadViperFromFile("./testdata/oc_keepalive_config.yaml") - if err != nil { - t.Fatalf("Failed to load viper from test file: %v", err) - } - - wCfg := NewDefaultOpenCensusReceiverCfg() - wCfg.Keepalive = &serverParametersAndEnforcementPolicy{ - ServerParameters: &keepaliveServerParameters{ - Time: 30 * time.Second, - Timeout: 5 * time.Second, - }, - EnforcementPolicy: &keepaliveEnforcementPolicy{ - MinTime: 10 * time.Second, - PermitWithoutStream: true, - }, - } - - gCfg, err := NewDefaultOpenCensusReceiverCfg().InitFromViper(v) - if err != nil { - t.Fatalf("got '%v', want nil", err) - } - if !reflect.DeepEqual(*gCfg.Keepalive.ServerParameters, *wCfg.Keepalive.ServerParameters) { - t.Fatalf("Wanted ServerParameters %+v but got %+v", *wCfg.Keepalive.ServerParameters, *gCfg.Keepalive.ServerParameters) - } - if !reflect.DeepEqual(*gCfg.Keepalive.EnforcementPolicy, *wCfg.Keepalive.EnforcementPolicy) { - t.Fatalf("Wanted EnforcementPolicy %+v but got %+v", *wCfg.Keepalive.EnforcementPolicy, *gCfg.Keepalive.EnforcementPolicy) - } -} - func loadViperFromFile(file string) (*viper.Viper, error) { v := viper.New() v.SetConfigFile(file) diff --git a/cmd/occollector/app/builder/doc.go b/service/builder/doc.go similarity index 100% rename from cmd/occollector/app/builder/doc.go rename to service/builder/doc.go diff --git a/cmd/occollector/app/builder/exporters_builder.go b/service/builder/exporters_builder.go similarity index 100% rename from cmd/occollector/app/builder/exporters_builder.go rename to service/builder/exporters_builder.go diff --git a/cmd/occollector/app/builder/exporters_builder_test.go b/service/builder/exporters_builder_test.go similarity index 100% rename from cmd/occollector/app/builder/exporters_builder_test.go rename to service/builder/exporters_builder_test.go diff --git a/cmd/occollector/app/builder/pipelines_builder.go b/service/builder/pipelines_builder.go similarity index 100% rename from cmd/occollector/app/builder/pipelines_builder.go rename to service/builder/pipelines_builder.go diff --git a/cmd/occollector/app/builder/pipelines_builder_test.go b/service/builder/pipelines_builder_test.go similarity index 100% rename from cmd/occollector/app/builder/pipelines_builder_test.go rename to service/builder/pipelines_builder_test.go diff --git a/cmd/occollector/app/builder/processor_builder.go b/service/builder/processor_builder.go similarity index 100% rename from cmd/occollector/app/builder/processor_builder.go rename to service/builder/processor_builder.go diff --git a/cmd/occollector/app/builder/processor_builder_test.go b/service/builder/processor_builder_test.go similarity index 100% rename from cmd/occollector/app/builder/processor_builder_test.go rename to service/builder/processor_builder_test.go diff --git a/cmd/occollector/app/builder/receivers_builder.go b/service/builder/receivers_builder.go similarity index 100% rename from cmd/occollector/app/builder/receivers_builder.go rename to service/builder/receivers_builder.go diff --git a/cmd/occollector/app/builder/receivers_builder_test.go b/service/builder/receivers_builder_test.go similarity index 100% rename from cmd/occollector/app/builder/receivers_builder_test.go rename to service/builder/receivers_builder_test.go diff --git a/cmd/occollector/app/builder/sampling_builder.go b/service/builder/sampling_builder.go similarity index 100% rename from cmd/occollector/app/builder/sampling_builder.go rename to service/builder/sampling_builder.go diff --git a/cmd/occollector/app/builder/testdata/global_attributes_all.yaml b/service/builder/testdata/global_attributes_all.yaml similarity index 100% rename from cmd/occollector/app/builder/testdata/global_attributes_all.yaml rename to service/builder/testdata/global_attributes_all.yaml diff --git a/cmd/occollector/app/builder/testdata/global_attributes_key_mapping.yaml b/service/builder/testdata/global_attributes_key_mapping.yaml similarity index 100% rename from cmd/occollector/app/builder/testdata/global_attributes_key_mapping.yaml rename to service/builder/testdata/global_attributes_key_mapping.yaml diff --git a/cmd/occollector/app/builder/testdata/oc_keepalive_config.yaml b/service/builder/testdata/oc_keepalive_config.yaml similarity index 100% rename from cmd/occollector/app/builder/testdata/oc_keepalive_config.yaml rename to service/builder/testdata/oc_keepalive_config.yaml diff --git a/cmd/occollector/app/builder/testdata/pipelines_builder.yaml b/service/builder/testdata/pipelines_builder.yaml similarity index 100% rename from cmd/occollector/app/builder/testdata/pipelines_builder.yaml rename to service/builder/testdata/pipelines_builder.yaml diff --git a/cmd/occollector/app/builder/testdata/queued_exporters.yaml b/service/builder/testdata/queued_exporters.yaml similarity index 100% rename from cmd/occollector/app/builder/testdata/queued_exporters.yaml rename to service/builder/testdata/queued_exporters.yaml diff --git a/cmd/occollector/app/builder/testdata/receivers_disabled.yaml b/service/builder/testdata/receivers_disabled.yaml similarity index 100% rename from cmd/occollector/app/builder/testdata/receivers_disabled.yaml rename to service/builder/testdata/receivers_disabled.yaml diff --git a/cmd/occollector/app/builder/testdata/receivers_enabled.yaml b/service/builder/testdata/receivers_enabled.yaml similarity index 100% rename from cmd/occollector/app/builder/testdata/receivers_enabled.yaml rename to service/builder/testdata/receivers_enabled.yaml diff --git a/cmd/occollector/app/builder/testdata/sampling_config.yaml b/service/builder/testdata/sampling_config.yaml similarity index 100% rename from cmd/occollector/app/builder/testdata/sampling_config.yaml rename to service/builder/testdata/sampling_config.yaml diff --git a/service/factories_registration.go b/service/factories_registration.go new file mode 100644 index 00000000000..56ed6aad4ac --- /dev/null +++ b/service/factories_registration.go @@ -0,0 +1,32 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + // This is a temporary workaround to register all factories that are already + // implemented. This will be removed and factories will be directly registered + // via code. + _ "github.com/open-telemetry/opentelemetry-service/exporter/loggingexporter" + _ "github.com/open-telemetry/opentelemetry-service/exporter/opencensusexporter" + _ "github.com/open-telemetry/opentelemetry-service/exporter/prometheusexporter" + _ "github.com/open-telemetry/opentelemetry-service/internal/collector/processor/nodebatcher" + _ "github.com/open-telemetry/opentelemetry-service/internal/collector/processor/queued" + _ "github.com/open-telemetry/opentelemetry-service/processor/addattributesprocessor" + _ "github.com/open-telemetry/opentelemetry-service/receiver/jaegerreceiver" + _ "github.com/open-telemetry/opentelemetry-service/receiver/opencensusreceiver" + _ "github.com/open-telemetry/opentelemetry-service/receiver/prometheusreceiver" + _ "github.com/open-telemetry/opentelemetry-service/receiver/vmmetricsreceiver" + _ "github.com/open-telemetry/opentelemetry-service/receiver/zipkinreceiver" +) diff --git a/cmd/occollector/app/collector/healthcheck.go b/service/healthcheck.go similarity index 98% rename from cmd/occollector/app/collector/healthcheck.go rename to service/healthcheck.go index 5b77fde0bc4..0031b9e5ab4 100644 --- a/cmd/occollector/app/collector/healthcheck.go +++ b/service/healthcheck.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package collector +package service import ( "flag" diff --git a/cmd/occollector/app/collector/logger.go b/service/logger.go similarity index 98% rename from cmd/occollector/app/collector/logger.go rename to service/logger.go index ae1ee93a192..184f02149bb 100644 --- a/cmd/occollector/app/collector/logger.go +++ b/service/logger.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package collector +package service import ( "flag" diff --git a/cmd/occollector/app/collector/collector.go b/service/service.go similarity index 83% rename from cmd/occollector/app/collector/collector.go rename to service/service.go index a3a7597d363..4089673c486 100644 --- a/cmd/occollector/app/collector/collector.go +++ b/service/service.go @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package collector handles the command-line, configuration, and runs the OC collector. -package collector +// Package service handles the command-line, configuration, and runs the +// OpenTelemetry Service. +package service import ( "context" @@ -28,13 +29,13 @@ import ( "github.com/spf13/viper" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-service/cmd/occollector/app/builder" "github.com/open-telemetry/opentelemetry-service/config" "github.com/open-telemetry/opentelemetry-service/consumer" "github.com/open-telemetry/opentelemetry-service/internal/config/viperutils" "github.com/open-telemetry/opentelemetry-service/internal/pprofserver" "github.com/open-telemetry/opentelemetry-service/internal/zpagesserver" "github.com/open-telemetry/opentelemetry-service/receiver" + "github.com/open-telemetry/opentelemetry-service/service/builder" ) var ( @@ -193,65 +194,6 @@ func (app *Application) shutdownClosableComponents() { } } -func (app *Application) execute() { - app.logger.Info("Starting...", zap.Int("NumCPU", runtime.NumCPU())) - - // Set memory ballast - ballast, ballastSizeBytes := app.createMemoryBallast() - - app.asyncErrorChannel = make(chan error) - - // Setup everything. - app.setupPProf() - app.setupHealthCheck() - app.processor, app.closeFns = startProcessor(app.v, app.logger) - app.setupZPages() - app.receivers = createReceivers(app.v, app.logger, app.processor, app) - app.setupTelemetry(ballastSizeBytes) - - // Everything is ready, now run until an event requiring shutdown happens. - app.runAndWaitForShutdownEvent() - - // Begin shutdown sequence. - runtime.KeepAlive(ballast) - app.healthCheck.Set(healthcheck.Unavailable) - app.logger.Info("Starting shutdown...") - - // TODO: orderly shutdown: first receivers, then flushing pipelines giving - // senders a chance to send all their data. This may take time, the allowed - // time should be part of configuration. - app.shutdownReceivers() - - app.shutdownClosableComponents() - - AppTelemetry.shutdown() - - app.logger.Info("Shutdown complete.") -} - -// Start the application according to the command and configuration given -// by the user. -func (app *Application) Start() error { - rootCmd := &cobra.Command{ - Use: "occollector", - Long: "OpenCensus Collector", - Run: func(cmd *cobra.Command, args []string) { - app.init() - app.execute() - }, - } - viperutils.AddFlags(app.v, rootCmd, - telemetryFlags, - builder.Flags, - healthCheckFlags, - loggerFlags, - pprofserver.AddFlags, - zpagesserver.AddFlags, - ) - - return rootCmd.Execute() -} - func (app *Application) setupPipelines() { app.logger.Info("Loading configuration...") diff --git a/cmd/occollector/app/collector/collector_test.go b/service/service_test.go similarity index 73% rename from cmd/occollector/app/collector/collector_test.go rename to service/service_test.go index 77d732fe5f5..94e816ce9dc 100644 --- a/cmd/occollector/app/collector/collector_test.go +++ b/service/service_test.go @@ -13,7 +13,7 @@ // limitations under the License. // Package collector handles the command-line, configuration, and runs the OC collector. -package collector +package service import ( "net" @@ -25,43 +25,6 @@ import ( "github.com/open-telemetry/opentelemetry-service/internal/zpagesserver" ) -func TestApplication_Start(t *testing.T) { - App = newApp() - - portArg := []string{ - healthCheckHTTPPort, // Keep it as first since its address is used later. - zpagesserver.ZPagesHTTPPort, - "metrics-port", - "receivers.opencensus.port", - } - addresses := getMultipleAvailableLocalAddresses(t, uint(len(portArg))) - for i, addr := range addresses { - _, port, err := net.SplitHostPort(addr) - if err != nil { - t.Fatalf("failed to split host and port from %q: %v", addr, err) - } - App.v.Set(portArg[i], port) - } - - // Without exporters the collector will start and just shutdown, no error is expected. - App.v.Set("logging-exporter", true) - - appDone := make(chan struct{}) - go func() { - defer close(appDone) - if err := App.Start(); err != nil { - t.Fatalf("App.Start() got %v, want nil", err) - } - }() - - <-App.readyChan - if !isAppAvailable(t, "http://"+addresses[0]) { - t.Fatalf("App didn't reach ready state") - } - close(App.stopTestChan) - <-appDone -} - func TestApplication_StartUnified(t *testing.T) { App = newApp() @@ -131,7 +94,3 @@ func getMultipleAvailableLocalAddresses(t *testing.T, numAddresses uint) []strin } return addresses } - -func mibToBytes(mib int) uint64 { - return uint64(mib) * 1024 * 1024 -} diff --git a/cmd/occollector/app/collector/telemetry.go b/service/telemetry.go similarity index 99% rename from cmd/occollector/app/collector/telemetry.go rename to service/telemetry.go index 9e8395b07cc..dfa3ff397de 100644 --- a/cmd/occollector/app/collector/telemetry.go +++ b/service/telemetry.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package collector +package service import ( "flag" diff --git a/cmd/occollector/app/collector/testdata/otelsvc-config.yaml b/service/testdata/otelsvc-config.yaml similarity index 100% rename from cmd/occollector/app/collector/testdata/otelsvc-config.yaml rename to service/testdata/otelsvc-config.yaml diff --git a/translator/trace/jaeger/jaegerthrift_to_protospan_test.go b/translator/trace/jaeger/jaegerthrift_to_protospan_test.go index e57ab0abec8..eac21277050 100644 --- a/translator/trace/jaeger/jaegerthrift_to_protospan_test.go +++ b/translator/trace/jaeger/jaegerthrift_to_protospan_test.go @@ -601,8 +601,3 @@ func TestHTTPToGRPCStatusCode(t *testing.T) { } } } - -func jsonify(v interface{}) []byte { - jb, _ := json.MarshalIndent(v, "", " ") - return jb -}