From 20d36a9dd97fad2b0be2de85707f59c06a67380f Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 7 Nov 2018 15:20:21 -0800 Subject: [PATCH] spanreceiver.SpanReceiver->spansink.Sink, metricsreceiver.MetricsReceiver->metricsink.Sink * spanreceiver.SpanReceiver -> spansink.Sink * metricsreceiver.MetricsReceiver -> metricsink.Sink Fixes #171 Follow-up of #166 --- cmd/ocagent/main.go | 12 +++++----- cmd/occollector/main.go | 10 ++++---- exporter/example_test.go | 6 ++--- exporter/trace_exporter.go | 24 +++++++++---------- .../metricsreceiver.go => metricsink/sink.go | 6 ++--- receiver/end_to_end_test.go | 12 +++++----- receiver/octrace/doc.go | 2 +- receiver/octrace/opencensus.go | 8 +++---- receiver/octrace/opencensus_test.go | 10 ++++---- receiver/octrace/options.go | 4 ++-- receiver/octrace/trace_receiver.go | 6 ++--- receiver/receiver.go | 12 +++++----- receiver/zipkin/trace_receiver.go | 10 ++++---- receiver/zipkin/trace_receiver_test.go | 6 ++--- .../spanreceiver.go => spansink/spansink.go | 6 ++--- 15 files changed, 67 insertions(+), 67 deletions(-) rename metricsreceiver/metricsreceiver.go => metricsink/sink.go (88%) rename spanreceiver/spanreceiver.go => spansink/spansink.go (89%) diff --git a/cmd/ocagent/main.go b/cmd/ocagent/main.go index 1fae51c7..c43247f6 100644 --- a/cmd/ocagent/main.go +++ b/cmd/ocagent/main.go @@ -31,7 +31,7 @@ import ( "github.com/census-instrumentation/opencensus-service/internal" "github.com/census-instrumentation/opencensus-service/receiver/octrace" "github.com/census-instrumentation/opencensus-service/receiver/zipkin" - "github.com/census-instrumentation/opencensus-service/spanreceiver" + "github.com/census-instrumentation/opencensus-service/spansink" "go.opencensus.io/plugin/ocgrpc" "go.opencensus.io/stats/view" "go.opencensus.io/zpages" @@ -68,10 +68,10 @@ func runOCAgent() { ocReceiverAddr := agentConfig.ocReceiverAddress() traceExporters, closeFns := exportersFromYAMLConfig(yamlBlob) - commonSpanReceiver := exporter.MultiTraceExporters(traceExporters...) + commonSpanSink := exporter.MultiTraceExporters(traceExporters...) // Add other receivers here as they are implemented - ocReceiverDoneFn, err := runOCReceiver(ocReceiverAddr, commonSpanReceiver) + ocReceiverDoneFn, err := runOCReceiver(ocReceiverAddr, commonSpanSink) if err != nil { log.Fatal(err) } @@ -87,7 +87,7 @@ func runOCAgent() { // If the Zipkin receiver is enabled, then run it if agentConfig.zipkinReceiverEnabled() { zipkinReceiverAddr := agentConfig.zipkinReceiverAddress() - zipkinReceiverDoneFn, err := runZipkinReceiver(zipkinReceiverAddr, commonSpanReceiver) + zipkinReceiverDoneFn, err := runZipkinReceiver(zipkinReceiverAddr, commonSpanSink) if err != nil { log.Fatal(err) } @@ -132,7 +132,7 @@ func runZPages(port int) func() error { return srv.Close } -func runOCReceiver(addr string, sr spanreceiver.SpanReceiver) (doneFn func() error, err error) { +func runOCReceiver(addr string, sr spansink.Sink) (doneFn func() error, err error) { oci, err := octrace.New(sr, octrace.WithSpanBufferPeriod(800*time.Millisecond)) if err != nil { return nil, fmt.Errorf("Failed to create the OpenCensus receiver: %v", err) @@ -164,7 +164,7 @@ func runOCReceiver(addr string, sr spanreceiver.SpanReceiver) (doneFn func() err return doneFn, nil } -func runZipkinReceiver(addr string, sr spanreceiver.SpanReceiver) (doneFn func() error, err error) { +func runZipkinReceiver(addr string, sr spansink.Sink) (doneFn func() error, err error) { zi, err := zipkinreceiver.New(sr) if err != nil { return nil, fmt.Errorf("Failed to create the Zipkin receiver: %v", err) diff --git a/cmd/occollector/main.go b/cmd/occollector/main.go index f6433293..84c2c7e1 100644 --- a/cmd/occollector/main.go +++ b/cmd/occollector/main.go @@ -33,7 +33,7 @@ import ( agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/census-instrumentation/opencensus-service/receiver/octrace" - "github.com/census-instrumentation/opencensus-service/spanreceiver" + "github.com/census-instrumentation/opencensus-service/spansink" "go.opencensus.io/plugin/ocgrpc" "go.opencensus.io/stats/view" @@ -87,7 +87,7 @@ func runOCServerWithReceiver(addr string, logger *zap.Logger) (func() error, err return nil, fmt.Errorf("Cannot bind tcp listener to address %q: %v", addr, err) } - sr := &fakeSpanReceiver{ + sr := &fakeSpanSink{ logger: logger, } @@ -111,12 +111,12 @@ func runOCServerWithReceiver(addr string, logger *zap.Logger) (func() error, err return closeFn, nil } -type fakeSpanReceiver struct { +type fakeSpanSink struct { logger *zap.Logger } -func (sr *fakeSpanReceiver) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spanreceiver.Acknowledgement, error) { - ack := &spanreceiver.Acknowledgement{ +func (sr *fakeSpanSink) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spansink.Acknowledgement, error) { + ack := &spansink.Acknowledgement{ SavedSpans: uint64(len(spans)), } diff --git a/exporter/example_test.go b/exporter/example_test.go index b9a19a8b..a4b0c8a8 100644 --- a/exporter/example_test.go +++ b/exporter/example_test.go @@ -106,12 +106,12 @@ func runServer() (port uint16, closeFn func()) { log.Fatalf("Failed to start Jaeger Trace exporter: %v", err) } - // After each of the exporters have been created, create the common spanreceiver. - commonSpanReceiver := exporter.OCExportersToTraceExporter(sde, je) + // After each of the exporters have been created, create the common spansink.Sink. + commonSpanSink := exporter.OCExportersToTraceExporter(sde, je) // Now run the octrace receiver which will receive traces from the client applications // in the various languages instrumented with OpenCensus. - oci, err := octrace.New(commonSpanReceiver, octrace.WithSpanBufferPeriod(100*time.Millisecond)) + oci, err := octrace.New(commonSpanSink, octrace.WithSpanBufferPeriod(100*time.Millisecond)) if err != nil { log.Fatalf("Failed to create the OpenCensus receiver: %v", err) } diff --git a/exporter/trace_exporter.go b/exporter/trace_exporter.go index ce45f0ad..e5dd2dab 100644 --- a/exporter/trace_exporter.go +++ b/exporter/trace_exporter.go @@ -21,7 +21,7 @@ import ( commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" - "github.com/census-instrumentation/opencensus-service/spanreceiver" + "github.com/census-instrumentation/opencensus-service/spansink" tracetranslator "github.com/census-instrumentation/opencensus-service/translator/trace" ) @@ -37,24 +37,24 @@ type toOCExportersTransformer struct { ocTraceExporters []trace.Exporter } -// TraceExporterSpanReceiver is a interface connecting a spanreceiver.SpanReceiver and -// a exporter.TraceExporter. The receiver gets data in different serialization formats, +// TraceExporterSink is a interface connecting a spansink.Sink and +// an exporter.TraceExporter. The receiver gets data in different serialization formats, // transforms it to OpenCensus in memory data and sends it to the exporter. -type TraceExporterSpanReceiver interface { +type TraceExporterSink interface { TraceExporter - spanreceiver.SpanReceiver + spansink.Sink } // OCExportersToTraceExporter is a convenience function that transforms // traditional OpenCensus trace.Exporter-s into an opencensus-service TraceExporter. // The resulting TraceExporter ignores the passed in node. To make use of the node, // please create a custom TraceExporter. -func OCExportersToTraceExporter(ocTraceExporters ...trace.Exporter) TraceExporterSpanReceiver { +func OCExportersToTraceExporter(ocTraceExporters ...trace.Exporter) TraceExporterSink { return &toOCExportersTransformer{ocTraceExporters: ocTraceExporters} } var _ TraceExporter = (*toOCExportersTransformer)(nil) -var _ spanreceiver.SpanReceiver = (*toOCExportersTransformer)(nil) +var _ spansink.Sink = (*toOCExportersTransformer)(nil) func (tse *toOCExportersTransformer) ExportSpanData(ctx context.Context, node *commonpb.Node, spanData ...*trace.SpanData) error { for _, sd := range spanData { @@ -65,7 +65,7 @@ func (tse *toOCExportersTransformer) ExportSpanData(ctx context.Context, node *c return nil } -func (tse *toOCExportersTransformer) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spanreceiver.Acknowledgement, error) { +func (tse *toOCExportersTransformer) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spansink.Acknowledgement, error) { // Firstly transform them into spanData. spanDataList := make([]*trace.SpanData, 0, len(spans)) for _, span := range spans { @@ -81,7 +81,7 @@ func (tse *toOCExportersTransformer) ReceiveSpans(ctx context.Context, node *com // Now invoke ExportSpanData. err := tse.ExportSpanData(ctx, node, spanDataList...) nSaved := len(spanDataList) - ack := &spanreceiver.Acknowledgement{ + ack := &spansink.Acknowledgement{ SavedSpans: uint64(nSaved), DroppedSpans: uint64(len(spans) - nSaved), } @@ -89,7 +89,7 @@ func (tse *toOCExportersTransformer) ReceiveSpans(ctx context.Context, node *com } // MultiTraceExporters wraps multiple trace exporters in a single one. -func MultiTraceExporters(tes ...TraceExporter) TraceExporterSpanReceiver { +func MultiTraceExporters(tes ...TraceExporter) TraceExporterSink { return traceExporters(tes) } @@ -105,7 +105,7 @@ func (tes traceExporters) ExportSpanData(ctx context.Context, node *commonpb.Nod // ReceiveSpans receives the span data in the protobuf format, translates it, and forwards the transformed // span data to all trace exporters wrapped by the current one. -func (tes traceExporters) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spanreceiver.Acknowledgement, error) { +func (tes traceExporters) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spansink.Acknowledgement, error) { spanDataList := make([]*trace.SpanData, 0, len(spans)) for _, span := range spans { spanData, _ := tracetranslator.ProtoSpanToOCSpanData(span) @@ -116,7 +116,7 @@ func (tes traceExporters) ReceiveSpans(ctx context.Context, node *commonpb.Node, err := tes.ExportSpanData(ctx, node, spanDataList...) nSaved := len(spanDataList) - ack := &spanreceiver.Acknowledgement{ + ack := &spansink.Acknowledgement{ SavedSpans: uint64(nSaved), DroppedSpans: uint64(len(spans) - nSaved), } diff --git a/metricsreceiver/metricsreceiver.go b/metricsink/sink.go similarity index 88% rename from metricsreceiver/metricsreceiver.go rename to metricsink/sink.go index 15b179aa..d772338d 100644 --- a/metricsreceiver/metricsreceiver.go +++ b/metricsink/sink.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package metricsreceiver +package metricsink import ( "context" @@ -21,8 +21,8 @@ import ( metricpb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" ) -// MetricsReceiver is an interface that receives metrics from a Node identifier. -type MetricsReceiver interface { +// Sink is an interface that receives metrics from a Node identifier. +type Sink interface { ReceiveMetrics(ctx context.Context, node *commonpb.Node, metrics ...*metricpb.Metric) (*Acknowledgement, error) } diff --git a/receiver/end_to_end_test.go b/receiver/end_to_end_test.go index b3251bb7..89f10017 100644 --- a/receiver/end_to_end_test.go +++ b/receiver/end_to_end_test.go @@ -27,7 +27,7 @@ import ( tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/census-instrumentation/opencensus-service/receiver" octrace "github.com/census-instrumentation/opencensus-service/receiver/octrace" - "github.com/census-instrumentation/opencensus-service/spanreceiver" + "github.com/census-instrumentation/opencensus-service/spansink" ) func Example_endToEnd() { @@ -44,7 +44,7 @@ func Example_endToEnd() { // Once we have the span receiver which will connect to the // various exporter pipeline i.e. *tracepb.Span->OpenCensus.SpanData - lsr := new(logSpanReceiver) + lsr := new(logSpanSink) for _, tr := range trl { if err := tr.StartTraceReception(context.Background(), lsr); err != nil { log.Fatalf("Failed to start trace receiver: %v", err) @@ -92,13 +92,13 @@ func Example_endToEnd() { <-time.After(5 * time.Second) } -type logSpanReceiver int +type logSpanSink int -var _ spanreceiver.SpanReceiver = (*logSpanReceiver)(nil) +var _ spansink.Sink = (*logSpanSink)(nil) -func (lsr *logSpanReceiver) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spanreceiver.Acknowledgement, error) { +func (lsr *logSpanSink) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spansink.Acknowledgement, error) { spansBlob, _ := json.MarshalIndent(spans, " ", " ") log.Printf("\n****\nNode: %#v\nSpans: %s\n****\n", node, spansBlob) - return &spanreceiver.Acknowledgement{SavedSpans: uint64(len(spans))}, nil + return &spansink.Acknowledgement{SavedSpans: uint64(len(spans))}, nil } diff --git a/receiver/octrace/doc.go b/receiver/octrace/doc.go index 4ea854dd..728653ab 100644 --- a/receiver/octrace/doc.go +++ b/receiver/octrace/doc.go @@ -13,5 +13,5 @@ // limitations under the License. // Package octrace is the logic for receiving OpenCensus trace protobuf defined spans from -// already instrumented applications and then passing them onto a spanreceiver instance. +// already instrumented applications and then passing them onto a spansink.Sink instance. package octrace diff --git a/receiver/octrace/opencensus.go b/receiver/octrace/opencensus.go index 650b825f..6aaa8a81 100644 --- a/receiver/octrace/opencensus.go +++ b/receiver/octrace/opencensus.go @@ -27,20 +27,20 @@ import ( agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/census-instrumentation/opencensus-service/internal" - "github.com/census-instrumentation/opencensus-service/spanreceiver" + "github.com/census-instrumentation/opencensus-service/spansink" ) // Receiver is the type used to handle spans from OpenCensus exporters. type Receiver struct { - spanSink spanreceiver.SpanReceiver + spanSink spansink.Sink spanBufferPeriod time.Duration spanBufferCount int } // New creates a new opencensus.Receiver reference. -func New(sr spanreceiver.SpanReceiver, opts ...Option) (*Receiver, error) { +func New(sr spansink.Sink, opts ...Option) (*Receiver, error) { if sr == nil { - return nil, errors.New("needs a non-nil spanReceiver") + return nil, errors.New("needs a non-nil spansink.Sink") } oci := &Receiver{spanSink: sr} for _, opt := range opts { diff --git a/receiver/octrace/opencensus_test.go b/receiver/octrace/opencensus_test.go index 612ba600..5f56c59e 100644 --- a/receiver/octrace/opencensus_test.go +++ b/receiver/octrace/opencensus_test.go @@ -38,7 +38,7 @@ import ( tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/census-instrumentation/opencensus-service/internal" "github.com/census-instrumentation/opencensus-service/receiver/octrace" - "github.com/census-instrumentation/opencensus-service/spanreceiver" + "github.com/census-instrumentation/opencensus-service/spansink" "go.opencensus.io/trace" "go.opencensus.io/trace/tracestate" ) @@ -445,18 +445,18 @@ func newSpanAppender() *spanAppender { return &spanAppender{spansPerNode: make(map[*commonpb.Node][]*tracepb.Span)} } -var _ spanreceiver.SpanReceiver = (*spanAppender)(nil) +var _ spansink.Sink = (*spanAppender)(nil) -func (sa *spanAppender) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spanreceiver.Acknowledgement, error) { +func (sa *spanAppender) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spansink.Acknowledgement, error) { sa.Lock() defer sa.Unlock() sa.spansPerNode[node] = append(sa.spansPerNode[node], spans...) - return &spanreceiver.Acknowledgement{SavedSpans: uint64(len(spans))}, nil + return &spansink.Acknowledgement{SavedSpans: uint64(len(spans))}, nil } -func ocReceiverOnGRPCServer(t *testing.T, sr spanreceiver.SpanReceiver, opts ...octrace.Option) (oci *octrace.Receiver, port int, done func()) { +func ocReceiverOnGRPCServer(t *testing.T, sr spansink.Sink, opts ...octrace.Option) (oci *octrace.Receiver, port int, done func()) { ln, err := net.Listen("tcp", ":0") if err != nil { t.Fatalf("Failed to find an available address to run the gRPC server: %v", err) diff --git a/receiver/octrace/options.go b/receiver/octrace/options.go index 5ea7485b..9a01decb 100644 --- a/receiver/octrace/options.go +++ b/receiver/octrace/options.go @@ -35,7 +35,7 @@ func (sfd *spanBufferPeriod) WithReceiver(oci *Receiver) { // WithSpanBufferPeriod is an option that allows one to configure // the period that spans are buffered for before the Receiver -// sends them to its SpanReceiver. +// sends them to its spansink.Sink. func WithSpanBufferPeriod(period time.Duration) Option { return &spanBufferPeriod{period: period} } @@ -50,7 +50,7 @@ func (spc spanBufferCount) WithReceiver(oci *Receiver) { // WithSpanBufferCount is an option that allows one to configure // the number of spans that are buffered before the Receiver -// send them to its SpanReceiver. +// send them to its spansink.Sink. func WithSpanBufferCount(count int) Option { return spanBufferCount(count) } diff --git a/receiver/octrace/trace_receiver.go b/receiver/octrace/trace_receiver.go index ba5a58bc..1db8de01 100644 --- a/receiver/octrace/trace_receiver.go +++ b/receiver/octrace/trace_receiver.go @@ -26,7 +26,7 @@ import ( agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" "github.com/census-instrumentation/opencensus-service/internal" "github.com/census-instrumentation/opencensus-service/receiver" - "github.com/census-instrumentation/opencensus-service/spanreceiver" + "github.com/census-instrumentation/opencensus-service/spansink" ) // NewTraceReceiver will create a handle that runs an OCReceiver at the provided port. @@ -57,7 +57,7 @@ var _ receiver.TraceReceiver = (*ocReceiverHandler)(nil) var errAlreadyStarted = errors.New("already started") // StartTraceReception starts a gRPC server with an OpenCensus receiver running -func (ocih *ocReceiverHandler) StartTraceReception(ctx context.Context, sr spanreceiver.SpanReceiver) error { +func (ocih *ocReceiverHandler) StartTraceReception(ctx context.Context, sr spansink.Sink) error { var err = errAlreadyStarted ocih.startOnce.Do(func() { err = ocih.startInternal(ctx, sr) @@ -68,7 +68,7 @@ func (ocih *ocReceiverHandler) StartTraceReception(ctx context.Context, sr spanr const defaultOCReceiverPort = 55678 -func (ocih *ocReceiverHandler) startInternal(ctx context.Context, sr spanreceiver.SpanReceiver) error { +func (ocih *ocReceiverHandler) startInternal(ctx context.Context, sr spansink.Sink) error { ocih.mu.Lock() defer ocih.mu.Unlock() diff --git a/receiver/receiver.go b/receiver/receiver.go index 3552adb6..6eb5d105 100644 --- a/receiver/receiver.go +++ b/receiver/receiver.go @@ -17,14 +17,14 @@ package receiver import ( "context" - "github.com/census-instrumentation/opencensus-service/metricsreceiver" - "github.com/census-instrumentation/opencensus-service/spanreceiver" + "github.com/census-instrumentation/opencensus-service/metricsink" + "github.com/census-instrumentation/opencensus-service/spansink" ) // A TraceReceiver is an "arbitrary data"-to-"trace proto span" converter. // Its purpose is to translate data from the wild into trace proto accompanied // by a *commonpb.Node to uniquely identify where that data comes from. -// TraceReceiver feeds a spanreceiver.SpanReceiver with data. +// TraceReceiver feeds a spansink.Sink with data. // // For example it could be Zipkin data source which translates // Zipkin spans into *tracepb.Span-s. @@ -34,18 +34,18 @@ import ( // StopTraceReception tells the receiver that should stop reception, // giving it a chance to perform any necessary clean-up. type TraceReceiver interface { - StartTraceReception(ctx context.Context, destination spanreceiver.SpanReceiver) error + StartTraceReception(ctx context.Context, destination spansink.Sink) error StopTraceReception(ctx context.Context) error } // A MetricsReceiver is an "arbitrary data"-to-"metric proto" converter. // Its purpose is to translate data from the wild into metric proto accompanied // by a *commonpb.Node to uniquely identify where that data comes from. -// MetricsReceiver feeds a metricsreceiver.MetricsReceiver with data. +// MetricsReceiver feeds a metricsink.Sink with data. // // For example it could be Prometheus data source which translates // Prometheus metrics into *metricpb.Metric-s. type MetricsReceiver interface { - StartMetricsReception(ctx context.Context, destination metricsreceiver.MetricsReceiver) error + StartMetricsReception(ctx context.Context, destination metricsink.Sink) error StopMetricsReception(ctx context.Context) error } diff --git a/receiver/zipkin/trace_receiver.go b/receiver/zipkin/trace_receiver.go index 33e5e9f7..83e24408 100644 --- a/receiver/zipkin/trace_receiver.go +++ b/receiver/zipkin/trace_receiver.go @@ -38,24 +38,24 @@ import ( tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/census-instrumentation/opencensus-service/internal" "github.com/census-instrumentation/opencensus-service/receiver" - "github.com/census-instrumentation/opencensus-service/spanreceiver" + "github.com/census-instrumentation/opencensus-service/spansink" ) // ZipkinReceiver type is used to handle spans received in the Zipkin format. type ZipkinReceiver struct { - spanSink spanreceiver.SpanReceiver + spanSink spansink.Sink } var _ receiver.TraceReceiver = (*ZipkinReceiver)(nil) var _ http.Handler = (*ZipkinReceiver)(nil) // New creates a new zipkingreceiver.ZipkinReceiver reference. -func New(sr spanreceiver.SpanReceiver) (*ZipkinReceiver, error) { +func New(sr spansink.Sink) (*ZipkinReceiver, error) { return &ZipkinReceiver{spanSink: sr}, nil } // StartTraceReception tells the receiver to start its processing. -func (zi *ZipkinReceiver) StartTraceReception(ctx context.Context, spanSink spanreceiver.SpanReceiver) error { +func (zi *ZipkinReceiver) StartTraceReception(ctx context.Context, spanSink spansink.Sink) error { zi.spanSink = spanSink return nil } @@ -166,7 +166,7 @@ func zlibUncompressedbody(r io.Reader) io.Reader { } // The ZipkinReceiver receives spans from endpoint /api/v2 as JSON, -// unmarshals them and sends them along to the spanreceiver. +// unmarshals them and sends them along to the spansink.Sink. func (zi *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Trace this method ctx, span := trace.StartSpan(context.Background(), "ZipkinReceiver.Export") diff --git a/receiver/zipkin/trace_receiver_test.go b/receiver/zipkin/trace_receiver_test.go index 59a6d223..229b4f99 100644 --- a/receiver/zipkin/trace_receiver_test.go +++ b/receiver/zipkin/trace_receiver_test.go @@ -35,7 +35,7 @@ import ( tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/census-instrumentation/opencensus-service/internal" "github.com/census-instrumentation/opencensus-service/internal/testutils" - "github.com/census-instrumentation/opencensus-service/spanreceiver" + "github.com/census-instrumentation/opencensus-service/spansink" "github.com/census-instrumentation/opencensus-service/translator/trace" ) @@ -397,8 +397,8 @@ func TestConversionRoundtrip(t *testing.T) { type noopSink int -var _ spanreceiver.SpanReceiver = (*noopSink)(nil) +var _ spansink.Sink = (*noopSink)(nil) -func (ns *noopSink) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spanreceiver.Acknowledgement, error) { +func (ns *noopSink) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spansink.Acknowledgement, error) { return nil, nil } diff --git a/spanreceiver/spanreceiver.go b/spansink/spansink.go similarity index 89% rename from spanreceiver/spanreceiver.go rename to spansink/spansink.go index 9ad0106b..a403b18b 100644 --- a/spanreceiver/spanreceiver.go +++ b/spansink/spansink.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package spanreceiver +package spansink import ( "context" @@ -21,8 +21,8 @@ import ( tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" ) -// SpanReceiver is an interface that receives spans from a Node identifier. -type SpanReceiver interface { +// Sink is an interface that receives spans from a Node identifier. +type Sink interface { ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*Acknowledgement, error) }