Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring: remove old OC Collector code #149

Merged
merged 2 commits into from
Jul 11, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,6 @@ vet:
install-tools:
go install golang.org/x/lint/golint

.PHONY: collector
collector:
GO111MODULE=on CGO_ENABLED=0 go build -o ./bin/occollector_$(GOOS) $(BUILD_INFO) ./cmd/occollector

.PHONY: otelsvc
otelsvc:
GO111MODULE=on CGO_ENABLED=0 go build -o ./bin/otelsvc_$(GOOS) $(BUILD_INFO) ./cmd/otelsvc
Expand All @@ -113,7 +109,7 @@ docker-otelsvc:
COMPONENT=otelsvc $(MAKE) docker-component

.PHONY: binaries
binaries: collector otelsvc
binaries: otelsvc

.PHONY: binaries-all-sys
binaries-all-sys:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package collector handles the command-line, configuration, and runs the OC collector.
package collector
// Package application handles the command-line, configuration, and runs the
// OpenTelemetry Service.
package application
pjanotti marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
Expand All @@ -28,7 +29,7 @@ import (
"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-service/cmd/occollector/app/builder"
"github.com/open-telemetry/opentelemetry-service/application/builder"
"github.com/open-telemetry/opentelemetry-service/config"
"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/internal/config/viperutils"
Expand Down Expand Up @@ -193,65 +194,6 @@ func (app *Application) shutdownClosableComponents() {
}
}

func (app *Application) execute() {
app.logger.Info("Starting...", zap.Int("NumCPU", runtime.NumCPU()))

// Set memory ballast
ballast, ballastSizeBytes := app.createMemoryBallast()

app.asyncErrorChannel = make(chan error)

// Setup everything.
app.setupPProf()
app.setupHealthCheck()
app.processor, app.closeFns = startProcessor(app.v, app.logger)
app.setupZPages()
app.receivers = createReceivers(app.v, app.logger, app.processor, app)
app.setupTelemetry(ballastSizeBytes)

// Everything is ready, now run until an event requiring shutdown happens.
app.runAndWaitForShutdownEvent()

// Begin shutdown sequence.
runtime.KeepAlive(ballast)
app.healthCheck.Set(healthcheck.Unavailable)
app.logger.Info("Starting shutdown...")

// TODO: orderly shutdown: first receivers, then flushing pipelines giving
// senders a chance to send all their data. This may take time, the allowed
// time should be part of configuration.
app.shutdownReceivers()

app.shutdownClosableComponents()

AppTelemetry.shutdown()

app.logger.Info("Shutdown complete.")
}

// Start the application according to the command and configuration given
// by the user.
func (app *Application) Start() error {
rootCmd := &cobra.Command{
Use: "occollector",
Long: "OpenCensus Collector",
Run: func(cmd *cobra.Command, args []string) {
app.init()
app.execute()
},
}
viperutils.AddFlags(app.v, rootCmd,
telemetryFlags,
builder.Flags,
healthCheckFlags,
loggerFlags,
pprofserver.AddFlags,
zpagesserver.AddFlags,
)

return rootCmd.Execute()
}

func (app *Application) setupPipelines() {
app.logger.Info("Loading configuration...")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

// Package collector handles the command-line, configuration, and runs the OC collector.
package collector
package application

import (
"net"
Expand All @@ -25,43 +25,6 @@ import (
"github.com/open-telemetry/opentelemetry-service/internal/zpagesserver"
)

func TestApplication_Start(t *testing.T) {
App = newApp()

portArg := []string{
healthCheckHTTPPort, // Keep it as first since its address is used later.
zpagesserver.ZPagesHTTPPort,
"metrics-port",
"receivers.opencensus.port",
}
addresses := getMultipleAvailableLocalAddresses(t, uint(len(portArg)))
for i, addr := range addresses {
_, port, err := net.SplitHostPort(addr)
if err != nil {
t.Fatalf("failed to split host and port from %q: %v", addr, err)
}
App.v.Set(portArg[i], port)
}

// Without exporters the collector will start and just shutdown, no error is expected.
App.v.Set("logging-exporter", true)

appDone := make(chan struct{})
go func() {
defer close(appDone)
if err := App.Start(); err != nil {
t.Fatalf("App.Start() got %v, want nil", err)
}
}()

<-App.readyChan
if !isAppAvailable(t, "http://"+addresses[0]) {
t.Fatalf("App didn't reach ready state")
}
close(App.stopTestChan)
<-appDone
}

func TestApplication_StartUnified(t *testing.T) {

App = newApp()
Expand Down Expand Up @@ -131,7 +94,3 @@ func getMultipleAvailableLocalAddresses(t *testing.T, numAddresses uint) []strin
}
return addresses
}

func mibToBytes(mib int) uint64 {
return uint64(mib) * 1024 * 1024
}
45 changes: 45 additions & 0 deletions application/builder/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package builder

import (
"flag"
"fmt"
"github.com/spf13/viper"
)

const (
// flags
configCfg = "config"
memBallastFlag = "mem-ballast-size-mib"
)

// Flags adds flags related to basic building of the collector application to the given flagset.
func Flags(flags *flag.FlagSet) {
flags.String(configCfg, "", "Path to the config file")
flags.Uint(memBallastFlag, 0,
fmt.Sprintf("Flag to specify size of memory (MiB) ballast to set. Ballast is not used when this is not specified. "+
"default settings: 0"))
}

// GetConfigFile gets the config file from the config file flag.
func GetConfigFile(v *viper.Viper) string {
return v.GetString(configCfg)
}

// MemBallastSize returns the size of memory ballast to use in MBs
func MemBallastSize(v *viper.Viper) int {
return v.GetInt(memBallastFlag)
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,64 +24,6 @@ import (
"github.com/spf13/viper"
)

func TestReceiversEnabledByPresenceWithDefaultSettings(t *testing.T) {
v, err := loadViperFromFile("./testdata/receivers_enabled.yaml")
if err != nil {
t.Fatalf("Failed to load viper from test file: %v", err)
}

jaegerEnabled, opencensusEnabled, zipkinEnabled, scribeEnabled :=
JaegerReceiverEnabled(v), OpenCensusReceiverEnabled(v), ZipkinReceiverEnabled(v), ZipkinScribeReceiverEnabled(v)
if !jaegerEnabled || !opencensusEnabled || !zipkinEnabled || !scribeEnabled {
t.Fatalf("Some of the expected receivers were not enabled j:%v oc:%v z:%v scribe:%v", jaegerEnabled, opencensusEnabled, zipkinEnabled, scribeEnabled)
}

wj := NewDefaultJaegerReceiverCfg()
gj, err := wj.InitFromViper(v)
if err != nil {
t.Errorf("Failed to InitFromViper for Jaeger receiver: %v", err)
} else if !reflect.DeepEqual(wj, gj) {
t.Errorf("Incorrect config for Jaeger receiver, want %v got %v", wj, gj)
}

woc := NewDefaultOpenCensusReceiverCfg()
goc, err := woc.InitFromViper(v)
if err != nil {
t.Errorf("Failed to InitFromViper for OpenCensus receiver: %v", err)
} else if !reflect.DeepEqual(woc, goc) {
t.Errorf("Incorrect config for OpenCensus receiver, want %v got %v", woc, goc)
}

wz := NewDefaultZipkinReceiverCfg()
gz, err := wz.InitFromViper(v)
if err != nil {
t.Errorf("Failed to InitFromViper for Zipkin receiver: %v", err)
} else if !reflect.DeepEqual(wz, gz) {
t.Errorf("Incorrect config for Zipkin receiver, want %v got %v", wz, gz)
}

wscrb := NewDefaultZipkinScribeReceiverCfg()
gscrb, err := wscrb.InitFromViper(v)
if err != nil {
t.Errorf("Failed to InitFromViper for Zipkin Scribe receiver: %v", err)
} else if !reflect.DeepEqual(wscrb, gscrb) {
t.Errorf("Incorrect config for Zipkin Scribe receiver, want %v got %v", wscrb, gscrb)
}
}

func TestReceiversDisabledByPresenceWithDefaultSettings(t *testing.T) {
v, err := loadViperFromFile("./testdata/receivers_disabled.yaml")
if err != nil {
t.Fatalf("Failed to load viper from test file: %v", err)
}

jaegerEnabled, opencensusEnabled, zipkinEnabled, scribeEnabled :=
JaegerReceiverEnabled(v), OpenCensusReceiverEnabled(v), ZipkinReceiverEnabled(v), ZipkinScribeReceiverEnabled(v)
if jaegerEnabled || opencensusEnabled || zipkinEnabled {
t.Fatalf("Not all receivers were disabled j:%v oc:%v z:%v scribe:%v", jaegerEnabled, opencensusEnabled, zipkinEnabled, scribeEnabled)
}
}

func TestMultiAndQueuedSpanProcessorConfig(t *testing.T) {
v, err := loadViperFromFile("./testdata/queued_exporters.yaml")
if err != nil {
Expand Down Expand Up @@ -212,36 +154,6 @@ func TestTailSamplingConfig(t *testing.T) {
}
}

func TestOpencensusReceiverKeepaliveSettings(t *testing.T) {
v, err := loadViperFromFile("./testdata/oc_keepalive_config.yaml")
if err != nil {
t.Fatalf("Failed to load viper from test file: %v", err)
}

wCfg := NewDefaultOpenCensusReceiverCfg()
wCfg.Keepalive = &serverParametersAndEnforcementPolicy{
ServerParameters: &keepaliveServerParameters{
Time: 30 * time.Second,
Timeout: 5 * time.Second,
},
EnforcementPolicy: &keepaliveEnforcementPolicy{
MinTime: 10 * time.Second,
PermitWithoutStream: true,
},
}

gCfg, err := NewDefaultOpenCensusReceiverCfg().InitFromViper(v)
if err != nil {
t.Fatalf("got '%v', want nil", err)
}
if !reflect.DeepEqual(*gCfg.Keepalive.ServerParameters, *wCfg.Keepalive.ServerParameters) {
t.Fatalf("Wanted ServerParameters %+v but got %+v", *wCfg.Keepalive.ServerParameters, *gCfg.Keepalive.ServerParameters)
}
if !reflect.DeepEqual(*gCfg.Keepalive.EnforcementPolicy, *wCfg.Keepalive.EnforcementPolicy) {
t.Fatalf("Wanted EnforcementPolicy %+v but got %+v", *wCfg.Keepalive.EnforcementPolicy, *gCfg.Keepalive.EnforcementPolicy)
}
}

func loadViperFromFile(file string) (*viper.Viper, error) {
v := viper.New()
v.SetConfigFile(file)
Expand Down
File renamed without changes.
32 changes: 32 additions & 0 deletions application/factories_registration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package application

import (
// This is a temporary workaround to register all factories that are already
// implemented. This will be removed and factories will be directly registered
// via code.
_ "github.com/open-telemetry/opentelemetry-service/exporter/loggingexporter"
_ "github.com/open-telemetry/opentelemetry-service/exporter/opencensusexporter"
_ "github.com/open-telemetry/opentelemetry-service/exporter/prometheusexporter"
_ "github.com/open-telemetry/opentelemetry-service/internal/collector/processor/nodebatcher"
_ "github.com/open-telemetry/opentelemetry-service/internal/collector/processor/queued"
_ "github.com/open-telemetry/opentelemetry-service/processor/addattributesprocessor"
_ "github.com/open-telemetry/opentelemetry-service/receiver/jaegerreceiver"
_ "github.com/open-telemetry/opentelemetry-service/receiver/opencensusreceiver"
_ "github.com/open-telemetry/opentelemetry-service/receiver/prometheusreceiver"
_ "github.com/open-telemetry/opentelemetry-service/receiver/vmmetricsreceiver"
_ "github.com/open-telemetry/opentelemetry-service/receiver/zipkinreceiver"
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package collector
package application

import (
"flag"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package collector
package application

import (
"flag"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package collector
package application

import (
"flag"
Expand Down
8 changes: 0 additions & 8 deletions cmd/occollector/Dockerfile

This file was deleted.

Loading