Skip to content

Commit

Permalink
spanreceiver.SpanReceiver->spansink.Sink, metricsreceiver.MetricsRece…
Browse files Browse the repository at this point in the history
…iver->metricsink.Sink

* spanreceiver.SpanReceiver -> spansink.Sink
* metricsreceiver.MetricsReceiver -> metricsink.Sink

Fixes census-instrumentation#171
Follow-up of census-instrumentation#166
  • Loading branch information
odeke-em committed Nov 7, 2018
1 parent 48c14c7 commit 20d36a9
Show file tree
Hide file tree
Showing 15 changed files with 67 additions and 67 deletions.
12 changes: 6 additions & 6 deletions cmd/ocagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/receiver/octrace"
"github.com/census-instrumentation/opencensus-service/receiver/zipkin"
"github.com/census-instrumentation/opencensus-service/spanreceiver"
"github.com/census-instrumentation/opencensus-service/spansink"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/stats/view"
"go.opencensus.io/zpages"
Expand Down Expand Up @@ -68,10 +68,10 @@ func runOCAgent() {
ocReceiverAddr := agentConfig.ocReceiverAddress()

traceExporters, closeFns := exportersFromYAMLConfig(yamlBlob)
commonSpanReceiver := exporter.MultiTraceExporters(traceExporters...)
commonSpanSink := exporter.MultiTraceExporters(traceExporters...)

// Add other receivers here as they are implemented
ocReceiverDoneFn, err := runOCReceiver(ocReceiverAddr, commonSpanReceiver)
ocReceiverDoneFn, err := runOCReceiver(ocReceiverAddr, commonSpanSink)
if err != nil {
log.Fatal(err)
}
Expand All @@ -87,7 +87,7 @@ func runOCAgent() {
// If the Zipkin receiver is enabled, then run it
if agentConfig.zipkinReceiverEnabled() {
zipkinReceiverAddr := agentConfig.zipkinReceiverAddress()
zipkinReceiverDoneFn, err := runZipkinReceiver(zipkinReceiverAddr, commonSpanReceiver)
zipkinReceiverDoneFn, err := runZipkinReceiver(zipkinReceiverAddr, commonSpanSink)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -132,7 +132,7 @@ func runZPages(port int) func() error {
return srv.Close
}

func runOCReceiver(addr string, sr spanreceiver.SpanReceiver) (doneFn func() error, err error) {
func runOCReceiver(addr string, sr spansink.Sink) (doneFn func() error, err error) {
oci, err := octrace.New(sr, octrace.WithSpanBufferPeriod(800*time.Millisecond))
if err != nil {
return nil, fmt.Errorf("Failed to create the OpenCensus receiver: %v", err)
Expand Down Expand Up @@ -164,7 +164,7 @@ func runOCReceiver(addr string, sr spanreceiver.SpanReceiver) (doneFn func() err
return doneFn, nil
}

func runZipkinReceiver(addr string, sr spanreceiver.SpanReceiver) (doneFn func() error, err error) {
func runZipkinReceiver(addr string, sr spansink.Sink) (doneFn func() error, err error) {
zi, err := zipkinreceiver.New(sr)
if err != nil {
return nil, fmt.Errorf("Failed to create the Zipkin receiver: %v", err)
Expand Down
10 changes: 5 additions & 5 deletions cmd/occollector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/receiver/octrace"
"github.com/census-instrumentation/opencensus-service/spanreceiver"
"github.com/census-instrumentation/opencensus-service/spansink"

"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/stats/view"
Expand Down Expand Up @@ -87,7 +87,7 @@ func runOCServerWithReceiver(addr string, logger *zap.Logger) (func() error, err
return nil, fmt.Errorf("Cannot bind tcp listener to address %q: %v", addr, err)
}

sr := &fakeSpanReceiver{
sr := &fakeSpanSink{
logger: logger,
}

Expand All @@ -111,12 +111,12 @@ func runOCServerWithReceiver(addr string, logger *zap.Logger) (func() error, err
return closeFn, nil
}

type fakeSpanReceiver struct {
type fakeSpanSink struct {
logger *zap.Logger
}

func (sr *fakeSpanReceiver) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spanreceiver.Acknowledgement, error) {
ack := &spanreceiver.Acknowledgement{
func (sr *fakeSpanSink) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spansink.Acknowledgement, error) {
ack := &spansink.Acknowledgement{
SavedSpans: uint64(len(spans)),
}

Expand Down
6 changes: 3 additions & 3 deletions exporter/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ func runServer() (port uint16, closeFn func()) {
log.Fatalf("Failed to start Jaeger Trace exporter: %v", err)
}

// After each of the exporters have been created, create the common spanreceiver.
commonSpanReceiver := exporter.OCExportersToTraceExporter(sde, je)
// After each of the exporters have been created, create the common spansink.Sink.
commonSpanSink := exporter.OCExportersToTraceExporter(sde, je)

// Now run the octrace receiver which will receive traces from the client applications
// in the various languages instrumented with OpenCensus.
oci, err := octrace.New(commonSpanReceiver, octrace.WithSpanBufferPeriod(100*time.Millisecond))
oci, err := octrace.New(commonSpanSink, octrace.WithSpanBufferPeriod(100*time.Millisecond))
if err != nil {
log.Fatalf("Failed to create the OpenCensus receiver: %v", err)
}
Expand Down
24 changes: 12 additions & 12 deletions exporter/trace_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/spanreceiver"
"github.com/census-instrumentation/opencensus-service/spansink"
tracetranslator "github.com/census-instrumentation/opencensus-service/translator/trace"
)

Expand All @@ -37,24 +37,24 @@ type toOCExportersTransformer struct {
ocTraceExporters []trace.Exporter
}

// TraceExporterSpanReceiver is a interface connecting a spanreceiver.SpanReceiver and
// a exporter.TraceExporter. The receiver gets data in different serialization formats,
// TraceExporterSink is a interface connecting a spansink.Sink and
// an exporter.TraceExporter. The receiver gets data in different serialization formats,
// transforms it to OpenCensus in memory data and sends it to the exporter.
type TraceExporterSpanReceiver interface {
type TraceExporterSink interface {
TraceExporter
spanreceiver.SpanReceiver
spansink.Sink
}

// OCExportersToTraceExporter is a convenience function that transforms
// traditional OpenCensus trace.Exporter-s into an opencensus-service TraceExporter.
// The resulting TraceExporter ignores the passed in node. To make use of the node,
// please create a custom TraceExporter.
func OCExportersToTraceExporter(ocTraceExporters ...trace.Exporter) TraceExporterSpanReceiver {
func OCExportersToTraceExporter(ocTraceExporters ...trace.Exporter) TraceExporterSink {
return &toOCExportersTransformer{ocTraceExporters: ocTraceExporters}
}

var _ TraceExporter = (*toOCExportersTransformer)(nil)
var _ spanreceiver.SpanReceiver = (*toOCExportersTransformer)(nil)
var _ spansink.Sink = (*toOCExportersTransformer)(nil)

func (tse *toOCExportersTransformer) ExportSpanData(ctx context.Context, node *commonpb.Node, spanData ...*trace.SpanData) error {
for _, sd := range spanData {
Expand All @@ -65,7 +65,7 @@ func (tse *toOCExportersTransformer) ExportSpanData(ctx context.Context, node *c
return nil
}

func (tse *toOCExportersTransformer) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spanreceiver.Acknowledgement, error) {
func (tse *toOCExportersTransformer) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spansink.Acknowledgement, error) {
// Firstly transform them into spanData.
spanDataList := make([]*trace.SpanData, 0, len(spans))
for _, span := range spans {
Expand All @@ -81,15 +81,15 @@ func (tse *toOCExportersTransformer) ReceiveSpans(ctx context.Context, node *com
// Now invoke ExportSpanData.
err := tse.ExportSpanData(ctx, node, spanDataList...)
nSaved := len(spanDataList)
ack := &spanreceiver.Acknowledgement{
ack := &spansink.Acknowledgement{
SavedSpans: uint64(nSaved),
DroppedSpans: uint64(len(spans) - nSaved),
}
return ack, err
}

// MultiTraceExporters wraps multiple trace exporters in a single one.
func MultiTraceExporters(tes ...TraceExporter) TraceExporterSpanReceiver {
func MultiTraceExporters(tes ...TraceExporter) TraceExporterSink {
return traceExporters(tes)
}

Expand All @@ -105,7 +105,7 @@ func (tes traceExporters) ExportSpanData(ctx context.Context, node *commonpb.Nod

// ReceiveSpans receives the span data in the protobuf format, translates it, and forwards the transformed
// span data to all trace exporters wrapped by the current one.
func (tes traceExporters) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spanreceiver.Acknowledgement, error) {
func (tes traceExporters) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spansink.Acknowledgement, error) {
spanDataList := make([]*trace.SpanData, 0, len(spans))
for _, span := range spans {
spanData, _ := tracetranslator.ProtoSpanToOCSpanData(span)
Expand All @@ -116,7 +116,7 @@ func (tes traceExporters) ReceiveSpans(ctx context.Context, node *commonpb.Node,

err := tes.ExportSpanData(ctx, node, spanDataList...)
nSaved := len(spanDataList)
ack := &spanreceiver.Acknowledgement{
ack := &spansink.Acknowledgement{
SavedSpans: uint64(nSaved),
DroppedSpans: uint64(len(spans) - nSaved),
}
Expand Down
6 changes: 3 additions & 3 deletions metricsreceiver/metricsreceiver.go → metricsink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package metricsreceiver
package metricsink

import (
"context"
Expand All @@ -21,8 +21,8 @@ import (
metricpb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
)

// MetricsReceiver is an interface that receives metrics from a Node identifier.
type MetricsReceiver interface {
// Sink is an interface that receives metrics from a Node identifier.
type Sink interface {
ReceiveMetrics(ctx context.Context, node *commonpb.Node, metrics ...*metricpb.Metric) (*Acknowledgement, error)
}

Expand Down
12 changes: 6 additions & 6 deletions receiver/end_to_end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/receiver"
octrace "github.com/census-instrumentation/opencensus-service/receiver/octrace"
"github.com/census-instrumentation/opencensus-service/spanreceiver"
"github.com/census-instrumentation/opencensus-service/spansink"
)

func Example_endToEnd() {
Expand All @@ -44,7 +44,7 @@ func Example_endToEnd() {

// Once we have the span receiver which will connect to the
// various exporter pipeline i.e. *tracepb.Span->OpenCensus.SpanData
lsr := new(logSpanReceiver)
lsr := new(logSpanSink)
for _, tr := range trl {
if err := tr.StartTraceReception(context.Background(), lsr); err != nil {
log.Fatalf("Failed to start trace receiver: %v", err)
Expand Down Expand Up @@ -92,13 +92,13 @@ func Example_endToEnd() {
<-time.After(5 * time.Second)
}

type logSpanReceiver int
type logSpanSink int

var _ spanreceiver.SpanReceiver = (*logSpanReceiver)(nil)
var _ spansink.Sink = (*logSpanSink)(nil)

func (lsr *logSpanReceiver) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spanreceiver.Acknowledgement, error) {
func (lsr *logSpanSink) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spansink.Acknowledgement, error) {
spansBlob, _ := json.MarshalIndent(spans, " ", " ")
log.Printf("\n****\nNode: %#v\nSpans: %s\n****\n", node, spansBlob)

return &spanreceiver.Acknowledgement{SavedSpans: uint64(len(spans))}, nil
return &spansink.Acknowledgement{SavedSpans: uint64(len(spans))}, nil
}
2 changes: 1 addition & 1 deletion receiver/octrace/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@
// limitations under the License.

// Package octrace is the logic for receiving OpenCensus trace protobuf defined spans from
// already instrumented applications and then passing them onto a spanreceiver instance.
// already instrumented applications and then passing them onto a spansink.Sink instance.
package octrace
8 changes: 4 additions & 4 deletions receiver/octrace/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ import (
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/spanreceiver"
"github.com/census-instrumentation/opencensus-service/spansink"
)

// Receiver is the type used to handle spans from OpenCensus exporters.
type Receiver struct {
spanSink spanreceiver.SpanReceiver
spanSink spansink.Sink
spanBufferPeriod time.Duration
spanBufferCount int
}

// New creates a new opencensus.Receiver reference.
func New(sr spanreceiver.SpanReceiver, opts ...Option) (*Receiver, error) {
func New(sr spansink.Sink, opts ...Option) (*Receiver, error) {
if sr == nil {
return nil, errors.New("needs a non-nil spanReceiver")
return nil, errors.New("needs a non-nil spansink.Sink")
}
oci := &Receiver{spanSink: sr}
for _, opt := range opts {
Expand Down
10 changes: 5 additions & 5 deletions receiver/octrace/opencensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/receiver/octrace"
"github.com/census-instrumentation/opencensus-service/spanreceiver"
"github.com/census-instrumentation/opencensus-service/spansink"
"go.opencensus.io/trace"
"go.opencensus.io/trace/tracestate"
)
Expand Down Expand Up @@ -445,18 +445,18 @@ func newSpanAppender() *spanAppender {
return &spanAppender{spansPerNode: make(map[*commonpb.Node][]*tracepb.Span)}
}

var _ spanreceiver.SpanReceiver = (*spanAppender)(nil)
var _ spansink.Sink = (*spanAppender)(nil)

func (sa *spanAppender) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spanreceiver.Acknowledgement, error) {
func (sa *spanAppender) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spansink.Acknowledgement, error) {
sa.Lock()
defer sa.Unlock()

sa.spansPerNode[node] = append(sa.spansPerNode[node], spans...)

return &spanreceiver.Acknowledgement{SavedSpans: uint64(len(spans))}, nil
return &spansink.Acknowledgement{SavedSpans: uint64(len(spans))}, nil
}

func ocReceiverOnGRPCServer(t *testing.T, sr spanreceiver.SpanReceiver, opts ...octrace.Option) (oci *octrace.Receiver, port int, done func()) {
func ocReceiverOnGRPCServer(t *testing.T, sr spansink.Sink, opts ...octrace.Option) (oci *octrace.Receiver, port int, done func()) {
ln, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("Failed to find an available address to run the gRPC server: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions receiver/octrace/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (sfd *spanBufferPeriod) WithReceiver(oci *Receiver) {

// WithSpanBufferPeriod is an option that allows one to configure
// the period that spans are buffered for before the Receiver
// sends them to its SpanReceiver.
// sends them to its spansink.Sink.
func WithSpanBufferPeriod(period time.Duration) Option {
return &spanBufferPeriod{period: period}
}
Expand All @@ -50,7 +50,7 @@ func (spc spanBufferCount) WithReceiver(oci *Receiver) {

// WithSpanBufferCount is an option that allows one to configure
// the number of spans that are buffered before the Receiver
// send them to its SpanReceiver.
// send them to its spansink.Sink.
func WithSpanBufferCount(count int) Option {
return spanBufferCount(count)
}
6 changes: 3 additions & 3 deletions receiver/octrace/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
"github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/receiver"
"github.com/census-instrumentation/opencensus-service/spanreceiver"
"github.com/census-instrumentation/opencensus-service/spansink"
)

// NewTraceReceiver will create a handle that runs an OCReceiver at the provided port.
Expand Down Expand Up @@ -57,7 +57,7 @@ var _ receiver.TraceReceiver = (*ocReceiverHandler)(nil)
var errAlreadyStarted = errors.New("already started")

// StartTraceReception starts a gRPC server with an OpenCensus receiver running
func (ocih *ocReceiverHandler) StartTraceReception(ctx context.Context, sr spanreceiver.SpanReceiver) error {
func (ocih *ocReceiverHandler) StartTraceReception(ctx context.Context, sr spansink.Sink) error {
var err = errAlreadyStarted
ocih.startOnce.Do(func() {
err = ocih.startInternal(ctx, sr)
Expand All @@ -68,7 +68,7 @@ func (ocih *ocReceiverHandler) StartTraceReception(ctx context.Context, sr spanr

const defaultOCReceiverPort = 55678

func (ocih *ocReceiverHandler) startInternal(ctx context.Context, sr spanreceiver.SpanReceiver) error {
func (ocih *ocReceiverHandler) startInternal(ctx context.Context, sr spansink.Sink) error {
ocih.mu.Lock()
defer ocih.mu.Unlock()

Expand Down
12 changes: 6 additions & 6 deletions receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ package receiver
import (
"context"

"github.com/census-instrumentation/opencensus-service/metricsreceiver"
"github.com/census-instrumentation/opencensus-service/spanreceiver"
"github.com/census-instrumentation/opencensus-service/metricsink"
"github.com/census-instrumentation/opencensus-service/spansink"
)

// A TraceReceiver is an "arbitrary data"-to-"trace proto span" converter.
// Its purpose is to translate data from the wild into trace proto accompanied
// by a *commonpb.Node to uniquely identify where that data comes from.
// TraceReceiver feeds a spanreceiver.SpanReceiver with data.
// TraceReceiver feeds a spansink.Sink with data.
//
// For example it could be Zipkin data source which translates
// Zipkin spans into *tracepb.Span-s.
Expand All @@ -34,18 +34,18 @@ import (
// StopTraceReception tells the receiver that should stop reception,
// giving it a chance to perform any necessary clean-up.
type TraceReceiver interface {
StartTraceReception(ctx context.Context, destination spanreceiver.SpanReceiver) error
StartTraceReception(ctx context.Context, destination spansink.Sink) error
StopTraceReception(ctx context.Context) error
}

// A MetricsReceiver is an "arbitrary data"-to-"metric proto" converter.
// Its purpose is to translate data from the wild into metric proto accompanied
// by a *commonpb.Node to uniquely identify where that data comes from.
// MetricsReceiver feeds a metricsreceiver.MetricsReceiver with data.
// MetricsReceiver feeds a metricsink.Sink with data.
//
// For example it could be Prometheus data source which translates
// Prometheus metrics into *metricpb.Metric-s.
type MetricsReceiver interface {
StartMetricsReception(ctx context.Context, destination metricsreceiver.MetricsReceiver) error
StartMetricsReception(ctx context.Context, destination metricsink.Sink) error
StopMetricsReception(ctx context.Context) error
}
Loading

0 comments on commit 20d36a9

Please sign in to comment.