Skip to content

Commit

Permalink
Allow all exporters to be batched and queued (census-instrumentation#376
Browse files Browse the repository at this point in the history
)

* Add batching to all exporters

Testing Done: unit tests
  • Loading branch information
Steven Karis authored Feb 14, 2019
1 parent 375ef3e commit 2469ddc
Show file tree
Hide file tree
Showing 17 changed files with 190 additions and 176 deletions.
8 changes: 7 additions & 1 deletion cmd/ocagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/census-instrumentation/opencensus-service/exporter"
"github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/internal/config"
"github.com/census-instrumentation/opencensus-service/internal/config/viperutils"
"github.com/census-instrumentation/opencensus-service/receiver"
"github.com/census-instrumentation/opencensus-service/receiver/jaeger"
"github.com/census-instrumentation/opencensus-service/receiver/opencensus"
Expand Down Expand Up @@ -87,7 +88,12 @@ func runOCAgent() {
log.Fatalf("Could not instantiate logger: %v", err)
}

traceExporters, metricsExporters, closeFns, err := config.ExportersFromYAMLConfig(logger, yamlBlob)
// TODO(skaris): move the rest of the configs to use viper
v, err := viperutils.ViperFromYAMLBytes([]byte(yamlBlob))
if err != nil {
log.Fatalf("Config: failed to create viper from YAML: %v", err)
}
traceExporters, metricsExporters, closeFns, err := config.ExportersFromViperConfig(logger, v)
if err != nil {
log.Fatalf("Config: failed to create exporters from YAML: %v", err)
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/occollector/app/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func TestMultiAndQueuedSpanProcessorConfig(t *testing.T) {
DiscoveryMinPeers: 7,
DiscoveryConnCheckTimeout: time.Second * 7,
}
fst.RawConfig = v.Sub(queuedExportersConfigKey).Sub("proc-tchannel")
snd := NewDefaultQueuedSpanProcessorCfg()
snd.Name = "proc-http"
snd.RetryOnFailure = false
Expand All @@ -108,6 +109,7 @@ func TestMultiAndQueuedSpanProcessorConfig(t *testing.T) {
Headers: map[string]string{"x-header-key": "00000000-0000-0000-0000-000000000001"},
Timeout: time.Second * 5,
}
snd.RawConfig = v.Sub(queuedExportersConfigKey).Sub("proc-http")

wCfg := &MultiSpanProcessorCfg{
Processors: []*QueuedSpanProcessorCfg{fst, snd},
Expand Down
2 changes: 2 additions & 0 deletions cmd/occollector/app/builder/processor_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type QueuedSpanProcessorCfg struct {
SenderConfig interface{}
// BatchingConfig sets config parameters related to batching
BatchingConfig BatchingConfig `mapstructure:"batching"`
RawConfig *viper.Viper
}

// AttributesCfg holds configuration for attributes that can be added to all spans
Expand Down Expand Up @@ -155,6 +156,7 @@ func (qOpts *QueuedSpanProcessorCfg) InitFromViper(v *viper.Viper) *QueuedSpanPr
}
qOpts.SenderConfig = thsOpts
}
qOpts.RawConfig = v
return qOpts
}

Expand Down
64 changes: 36 additions & 28 deletions cmd/occollector/app/collector/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package collector

import (
"fmt"
"io/ioutil"
"os"
"time"

Expand All @@ -38,18 +37,7 @@ import (

func createExporters(v *viper.Viper, logger *zap.Logger) ([]func(), []exporter.TraceExporter, []exporter.MetricsExporter) {
// TODO: (@pjanotti) this is slightly modified from agent but in the end duplication, need to consolidate style and visibility.
cfg := builder.GetConfigFile(v)
if cfg == "" {
logger.Info("No config file, exporters can be only configured via the file.")
return nil, nil, nil
}

cfgBlob, err := ioutil.ReadFile(cfg)
if err != nil {
logger.Fatal("Cannot read config file for exporters", zap.Error(err))
}

traceExporters, metricsExporters, doneFns, err := config.ExportersFromYAMLConfig(logger, cfgBlob)
traceExporters, metricsExporters, doneFns, err := config.ExportersFromViperConfig(logger, v)
if err != nil {
logger.Fatal("Failed to create config for exporters", zap.Error(err))
}
Expand Down Expand Up @@ -101,9 +89,23 @@ func buildQueuedSpanProcessor(
sender.HTTPTimeout(thriftHTTPSenderOpts.Timeout),
)
}
doneFns, traceExporters, _ := createExporters(opts.RawConfig, logger)

if spanSender == nil {
logger.Fatal("Unrecognized sender type or no exporters configured", zap.String("SenderType", string(opts.SenderType)))
if spanSender == nil && len(traceExporters) == 0 {
if opts.SenderType != "" {
logger.Fatal("Unrecognized sender type", zap.String("SenderType", string(opts.SenderType)))
}
logger.Fatal("No senders or exporters configured.")
}

allSendersAndExporters := make([]processor.SpanProcessor, 0, 1+len(traceExporters))
if spanSender != nil {
allSendersAndExporters = append(allSendersAndExporters, spanSender)
}
for _, traceExporter := range traceExporters {
allSendersAndExporters = append(
allSendersAndExporters, processor.NewTraceExporterProcessor(traceExporter),
)
}

var batchingOptions []nodebatcher.Option
Expand Down Expand Up @@ -134,19 +136,25 @@ func buildQueuedSpanProcessor(
}
}

// build queued span processor with underlying sender
queuedSpanProcessor = queued.NewQueuedSpanProcessor(
spanSender,
queued.Options.WithLogger(logger),
queued.Options.WithName(opts.Name),
queued.Options.WithNumWorkers(opts.NumWorkers),
queued.Options.WithQueueSize(opts.QueueSize),
queued.Options.WithRetryOnProcessingFailures(opts.RetryOnFailure),
queued.Options.WithBackoffDelay(opts.BackoffDelay),
queued.Options.WithBatching(opts.BatchingConfig.Enable),
queued.Options.WithBatchingOptions(batchingOptions...),
)
return nil, queuedSpanProcessor, nil
queuedProcessors := make([]processor.SpanProcessor, 0, len(allSendersAndExporters))
for _, senderOrExporter := range allSendersAndExporters {
// build queued span processor with underlying sender
queuedProcessors = append(
queuedProcessors,
queued.NewQueuedSpanProcessor(
senderOrExporter,
queued.Options.WithLogger(logger),
queued.Options.WithName(opts.Name),
queued.Options.WithNumWorkers(opts.NumWorkers),
queued.Options.WithQueueSize(opts.QueueSize),
queued.Options.WithRetryOnProcessingFailures(opts.RetryOnFailure),
queued.Options.WithBackoffDelay(opts.BackoffDelay),
queued.Options.WithBatching(opts.BatchingConfig.Enable),
queued.Options.WithBatchingOptions(batchingOptions...),
),
)
}
return doneFns, processor.NewMultiSpanProcessor(queuedProcessors), nil
}

func buildSamplingProcessor(cfg *builder.SamplingCfg, nameToSpanProcessor map[string]processor.SpanProcessor, v *viper.Viper, logger *zap.Logger) (processor.SpanProcessor, error) {
Expand Down
5 changes: 5 additions & 0 deletions exporter/exporterparser/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ queued-exporters:
collector-endpoint: "https://ingest.omnition.io"
headers: { "x-omnition-api-key": "00000000-0000-0000-0000-000000000001" }
timeout: 5s
# Non-sender exporters can now also be used by setting the exporters section in queued-exporters.
exporters:
opencensus:
endpoint: "127.0.0.1:55566"
compression: "gzip"
my-org-jaeger: # A second processor with its own configuration options
num-workers: 2
queue-size: 100
Expand Down
28 changes: 12 additions & 16 deletions exporter/exporterparser/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,50 +18,46 @@ import (
"context"

datadog "github.com/DataDog/opencensus-go-exporter-datadog"
"github.com/spf13/viper"

"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/exporter"
)

type datadogConfig struct {
// Namespace specifies the namespaces to which metric keys are appended.
Namespace string `yaml:"namespace,omitempty"`
Namespace string `mapstructure:"namespace,omitempty"`

// TraceAddr specifies the host[:port] address of the Datadog Trace Agent.
// It defaults to localhost:8126.
TraceAddr string `yaml:"trace_addr,omitempty"`
TraceAddr string `mapstructure:"trace_addr,omitempty"`

// MetricsAddr specifies the host[:port] address for DogStatsD. It defaults
// to localhost:8125.
MetricsAddr string `yaml:"metrics_addr,omitempty"`
MetricsAddr string `mapstructure:"metrics_addr,omitempty"`

// Tags specifies a set of global tags to attach to each metric.
Tags []string `yaml:"tags,omitempty"`
Tags []string `mapstructure:"tags,omitempty"`

EnableTracing bool `yaml:"enable_tracing,omitempty"`
EnableMetrics bool `yaml:"enable_metrics,omitempty"`
EnableTracing bool `mapstructure:"enable_tracing,omitempty"`
EnableMetrics bool `mapstructure:"enable_metrics,omitempty"`
}

type datadogExporter struct {
exporter *datadog.Exporter
}

// DatadogTraceExportersFromYAML parses the yaml bytes and returns an exporter.TraceExporter targeting
// DatadogTraceExportersFromViper unmarshals the viper and returns an exporter.TraceExporter targeting
// Datadog according to the configuration settings.
func DatadogTraceExportersFromYAML(config []byte) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
func DatadogTraceExportersFromViper(v *viper.Viper) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
var cfg struct {
Exporters *struct {
Datadog *datadogConfig `yaml:"datadog"`
} `yaml:"exporters"`
Datadog *datadogConfig `mapstructure:"datadog,omitempty"`
}
if err := yamlUnmarshal(config, &cfg); err != nil {
if err := v.Unmarshal(&cfg); err != nil {
return nil, nil, nil, err
}
if cfg.Exporters == nil {
return nil, nil, nil, nil
}

dc := cfg.Exporters.Datadog
dc := cfg.Datadog
if dc == nil {
return nil, nil, nil, nil
}
Expand Down
11 changes: 1 addition & 10 deletions exporter/exporterparser/exparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

// Package exporterparser provides support for parsing and creating the
// respective exporters given a YAML configuration payload.
// respective exporters given a viper configuration.
// For now it currently only provides statically imported OpenCensus
// exporters like:
// * Stackdriver Tracing and Monitoring
Expand All @@ -23,10 +23,8 @@ package exporterparser

import (
"context"
"fmt"

"go.opencensus.io/trace"
yaml "gopkg.in/yaml.v2"

tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/data"
Expand All @@ -53,10 +51,3 @@ func exportSpans(ctx context.Context, exporterName string, te trace.Exporter, td

return internal.CombineErrors(errs)
}

func yamlUnmarshal(yamlBlob []byte, dest interface{}) error {
if err := yaml.Unmarshal(yamlBlob, dest); err != nil {
return fmt.Errorf("Cannot YAML unmarshal data: %v", err)
}
return nil
}
27 changes: 12 additions & 15 deletions exporter/exporterparser/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,35 @@ package exporterparser
import (
"context"

"github.com/spf13/viper"
"go.opencensus.io/exporter/jaeger"

"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/exporter"
"go.opencensus.io/exporter/jaeger"
)

// Slight modified version of go/src/go.opencensus.io/exporter/jaeger/jaeger.go
type jaegerConfig struct {
CollectorEndpoint string `yaml:"collector_endpoint,omitempty"`
Username string `yaml:"username,omitempty"`
Password string `yaml:"password,omitempty"`
ServiceName string `yaml:"service_name,omitempty"`
CollectorEndpoint string `mapstructure:"collector_endpoint,omitempty"`
Username string `mapstructure:"username,omitempty"`
Password string `mapstructure:"password,omitempty"`
ServiceName string `mapstructure:"service_name,omitempty"`
}

type jaegerExporter struct {
exporter *jaeger.Exporter
}

// JaegerExportersFromYAML parses the yaml bytes and returns exporter.TraceExporters targeting
// JaegerExportersFromViper unmarshals the viper and returns exporter.TraceExporters targeting
// Jaeger according to the configuration settings.
func JaegerExportersFromYAML(config []byte) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
func JaegerExportersFromViper(v *viper.Viper) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
var cfg struct {
Exporters *struct {
Jaeger *jaegerConfig `yaml:"jaeger"`
} `yaml:"exporters"`
Jaeger *jaegerConfig `mapstructure:"jaeger"`
}
if err := yamlUnmarshal(config, &cfg); err != nil {
if err := v.Unmarshal(&cfg); err != nil {
return nil, nil, nil, err
}
if cfg.Exporters == nil {
return nil, nil, nil, nil
}
jc := cfg.Exporters.Jaeger
jc := cfg.Jaeger
if jc == nil {
return nil, nil, nil, nil
}
Expand Down
20 changes: 8 additions & 12 deletions exporter/exporterparser/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ import (
"context"
"fmt"

"github.com/spf13/viper"
"github.com/yancl/opencensus-go-exporter-kafka"

"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/exporter"
)

type kafkaConfig struct {
Brokers []string `yaml:"brokers,omitempty"`
Topic string `yaml:"topic,omitempty"`
Brokers []string `mapstructure:"brokers,omitempty"`
Topic string `mapstructure:"topic,omitempty"`
}

type kafkaExporter struct {
Expand All @@ -35,22 +36,17 @@ type kafkaExporter struct {

var _ exporter.TraceExporter = (*kafkaExporter)(nil)

// KafkaExportersFromYAML parses the yaml bytes and returns an exporter.TraceExporter targeting
// KafkaExportersFromViper unmarshals the viper and returns an exporter.TraceExporter targeting
// Kafka according to the configuration settings.
func KafkaExportersFromYAML(config []byte) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
func KafkaExportersFromViper(v *viper.Viper) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
var cfg struct {
Exporters *struct {
Kafka *kafkaConfig `yaml:"kafka"`
} `yaml:"exporters"`
Kafka *kafkaConfig `mapstructure:"kafka"`
}

if err := yamlUnmarshal(config, &cfg); err != nil {
if err := v.Unmarshal(&cfg); err != nil {
return nil, nil, nil, err
}
if cfg.Exporters == nil {
return nil, nil, nil, nil
}
kc := cfg.Exporters.Kafka
kc := cfg.Kafka
if kc == nil {
return nil, nil, nil, nil
}
Expand Down
20 changes: 8 additions & 12 deletions exporter/exporterparser/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"contrib.go.opencensus.io/exporter/ocagent"
"github.com/spf13/viper"

agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
"github.com/census-instrumentation/opencensus-service/data"
Expand All @@ -29,8 +30,8 @@ import (
)

type opencensusConfig struct {
Endpoint string `yaml:"endpoint,omitempty"`
Compression string `yaml:"compression,omitempty"`
Endpoint string `mapstructure:"endpoint,omitempty"`
Compression string `mapstructure:"compression,omitempty"`
// TODO: add insecure, service name options.
}

Expand All @@ -40,21 +41,16 @@ type ocagentExporter struct {

var _ exporter.TraceExporter = (*ocagentExporter)(nil)

// OpenCensusTraceExportersFromYAML parses the yaml bytes and returns an exporter.TraceExporter targeting
// OpenCensusTraceExportersFromViper unmarshals the viper and returns an exporter.TraceExporter targeting
// OpenCensus Agent/Collector according to the configuration settings.
func OpenCensusTraceExportersFromYAML(config []byte) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
func OpenCensusTraceExportersFromViper(v *viper.Viper) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
var cfg struct {
Exporters *struct {
OpenCensus *opencensusConfig `yaml:"opencensus"`
} `yaml:"exporters"`
OpenCensus *opencensusConfig `mapstructure:"opencensus"`
}
if err := yamlUnmarshal(config, &cfg); err != nil {
if err := v.Unmarshal(&cfg); err != nil {
return nil, nil, nil, err
}
if cfg.Exporters == nil {
return nil, nil, nil, nil
}
ocac := cfg.Exporters.OpenCensus
ocac := cfg.OpenCensus
if ocac == nil {
return nil, nil, nil, nil
}
Expand Down
Loading

0 comments on commit 2469ddc

Please sign in to comment.