Skip to content

Commit

Permalink
Split model by signal type and move it to the new pdata module (#…
Browse files Browse the repository at this point in the history
…5168)

Split all `pdata` related code by type and move it from `model` to the new module `pdata`.

- `model/pdata` and `model/otlp` are moved to `pdata/plog`, `pdata/pmetric` and `pdata/ptrace`.
- `model/otlpgrpc` is moved to `pdata/plogotlp`, `pdata/pmetricotlp` and `pdata/ptraceotlp`.

Now all the API in `model` except for `model/semconv` is deprecated.
  • Loading branch information
dmitryax authored Apr 11, 2022
1 parent 02ce41b commit 734e25e
Show file tree
Hide file tree
Showing 203 changed files with 4,057 additions and 2,271 deletions.
2 changes: 1 addition & 1 deletion .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ coverage:
target: 95%

ignore:
- "model/internal/data/protogen/**/*"
- "pdata/internal/data/protogen/**/*"
4 changes: 4 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ updates:
directory: "/model"
schedule:
interval: "weekly"
- package-ecosystem: "gomod"
directory: "/pdata"
schedule:
interval: "weekly"
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

### 🚩 Deprecations 🚩

- All pdata related APIs from model (model/pdata, model/otlp and model/otlpgrpc) are deprecated in
favor of packages in the new pdata module separated by telemetry signal type (#5168)
- `model/pdata`, `model/otlp` -> `pdata/pcommon`, `pdata/plog`, `pdata/pmetric`, `pdata/ptrace`
- `model/otlpgrpc` -> `pdata/plog/plogotlp`, `pdata/pmetric/pmetricotlp`, `pdata/ptrace/ptraceotlp`
- Deprecate configmapprovider package, replace with mapconverter (#5167)
- Deprecate `service.MustNewConfigProvider` and `service.MustNewDefaultConfigProvider`in favor of `service.NewConfigProvider` (#4762)

Expand Down
11 changes: 6 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ include ./Makefile.Common
# This is the code that we want to run lint, etc.
ALL_SRC := $(shell find . -name '*.go' \
-not -path './internal/tools/*' \
-not -path './model/internal/data/protogen/*' \
-not -path './pdata/internal/data/protogen/*' \
-not -path './service/internal/zpages/tmplgen/*' \
-type f | sort)

Expand Down Expand Up @@ -226,7 +226,7 @@ gendependabot: $(eval SHELL:=/bin/bash)
# Definitions for ProtoBuf generation.

# The source directory for OTLP ProtoBufs.
OPENTELEMETRY_PROTO_SRC_DIR=model/internal/opentelemetry-proto
OPENTELEMETRY_PROTO_SRC_DIR=pdata/internal/opentelemetry-proto

# The SHA matching the current version of the proto to use
OPENTELEMETRY_PROTO_VERSION=v0.15.0
Expand All @@ -235,13 +235,13 @@ OPENTELEMETRY_PROTO_VERSION=v0.15.0
OPENTELEMETRY_PROTO_FILES := $(subst $(OPENTELEMETRY_PROTO_SRC_DIR)/,,$(wildcard $(OPENTELEMETRY_PROTO_SRC_DIR)/opentelemetry/proto/*/v1/*.proto $(OPENTELEMETRY_PROTO_SRC_DIR)/opentelemetry/proto/collector/*/v1/*.proto))

# Target directory to write generated files to.
PROTO_TARGET_GEN_DIR=model/internal/data/protogen
PROTO_TARGET_GEN_DIR=pdata/internal/data/protogen

# Go package name to use for generated files.
PROTO_PACKAGE=go.opentelemetry.io/collector/$(PROTO_TARGET_GEN_DIR)

# Intermediate directory used during generation.
PROTO_INTERMEDIATE_DIR=model/internal/.patched-otlp-proto
PROTO_INTERMEDIATE_DIR=pdata/internal/.patched-otlp-proto

DOCKER_PROTOBUF ?= otel/build-protobuf:0.9.0
PROTOC := docker run --rm -u ${shell id -u} -v${PWD}:${PWD} -w${PWD}/$(PROTO_INTERMEDIATE_DIR) ${DOCKER_PROTOBUF} --proto_path=${PWD}
Expand Down Expand Up @@ -289,7 +289,7 @@ genproto_sub:
# Generate structs, functions and tests for pdata package. Must be used after any changes
# to proto and after running `make genproto`
genpdata:
$(GOCMD) run model/internal/cmd/pdatagen/main.go
$(GOCMD) run pdata/internal/cmd/pdatagen/main.go
$(MAKE) fmt

# Generate semantic convention constants. Requires a clone of the opentelemetry-specification repo
Expand All @@ -306,6 +306,7 @@ gensemconv:
check-contrib:
@echo Setting contrib at $(CONTRIB_PATH) to use this core checkout
@$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector=$(CURDIR)"
@$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector/pdata=$(CURDIR)/pdata"
@$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector/model=$(CURDIR)/model"
@$(MAKE) -C $(CONTRIB_PATH) -j2 gotidy
@$(MAKE) -C $(CONTRIB_PATH) test
Expand Down
4 changes: 2 additions & 2 deletions client/doc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import (

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func Example_receiver() {
// Your receiver get a next consumer when it's constructed
var next consumer.Traces

// You'll convert the incoming data into pipeline data
td := pdata.NewTraces()
td := ptrace.NewTraces()

// You probably have a context with client metadata from your listener or
// scraper
Expand Down
1 change: 1 addition & 0 deletions cmd/otelcorecol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ processors:
replaces:
- go.opentelemetry.io/collector => ../../
- go.opentelemetry.io/collector/model => ../../model
- go.opentelemetry.io/collector/pdata => ../../pdata
3 changes: 3 additions & 0 deletions cmd/otelcorecol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/collector/model v0.48.0 // indirect
go.opentelemetry.io/collector/pdata v0.0.0-00010101000000-000000000000 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.31.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.31.0 // indirect
go.opentelemetry.io/contrib/zpages v0.31.0 // indirect
Expand All @@ -77,3 +78,5 @@ require (
replace go.opentelemetry.io/collector => ../../

replace go.opentelemetry.io/collector/model => ../../model

replace go.opentelemetry.io/collector/pdata => ../../pdata
10 changes: 6 additions & 4 deletions component/componenttest/nop_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestNewNopExporterFactory(t *testing.T) {
Expand All @@ -35,18 +37,18 @@ func TestNewNopExporterFactory(t *testing.T) {
traces, err := factory.CreateTracesExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NoError(t, traces.Start(context.Background(), NewNopHost()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), ptrace.NewTraces()))
assert.NoError(t, traces.Shutdown(context.Background()))

metrics, err := factory.CreateMetricsExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NoError(t, metrics.Start(context.Background(), NewNopHost()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pmetric.NewMetrics()))
assert.NoError(t, metrics.Shutdown(context.Background()))

logs, err := factory.CreateLogsExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NoError(t, logs.Start(context.Background(), NewNopHost()))
assert.NoError(t, logs.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.NoError(t, logs.ConsumeLogs(context.Background(), plog.NewLogs()))
assert.NoError(t, logs.Shutdown(context.Background()))
}
10 changes: 6 additions & 4 deletions component/componenttest/nop_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestNewNopProcessorFactory(t *testing.T) {
Expand All @@ -38,20 +40,20 @@ func TestNewNopProcessorFactory(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, consumer.Capabilities{MutatesData: false}, traces.Capabilities())
assert.NoError(t, traces.Start(context.Background(), NewNopHost()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), ptrace.NewTraces()))
assert.NoError(t, traces.Shutdown(context.Background()))

metrics, err := factory.CreateMetricsProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, consumer.Capabilities{MutatesData: false}, metrics.Capabilities())
assert.NoError(t, metrics.Start(context.Background(), NewNopHost()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pmetric.NewMetrics()))
assert.NoError(t, metrics.Shutdown(context.Background()))

logs, err := factory.CreateLogsProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, consumer.Capabilities{MutatesData: false}, logs.Capabilities())
assert.NoError(t, logs.Start(context.Background(), NewNopHost()))
assert.NoError(t, logs.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.NoError(t, logs.ConsumeLogs(context.Background(), plog.NewLogs()))
assert.NoError(t, logs.Shutdown(context.Background()))
}
6 changes: 3 additions & 3 deletions component/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Receiver interface {
// Its purpose is to translate data from any format to the collector's internal trace format.
// TracesReceiver feeds a consumer.Traces with data.
//
// For example it could be Zipkin data source which translates Zipkin spans into pdata.Traces.
// For example it could be Zipkin data source which translates Zipkin spans into ptrace.Traces.
type TracesReceiver interface {
Receiver
}
Expand All @@ -76,7 +76,7 @@ type TracesReceiver interface {
// Its purpose is to translate data from any format to the collector's internal metrics format.
// MetricsReceiver feeds a consumer.Metrics with data.
//
// For example it could be Prometheus data source which translates Prometheus metrics into pdata.Metrics.
// For example it could be Prometheus data source which translates Prometheus metrics into pmetric.Metrics.
type MetricsReceiver interface {
Receiver
}
Expand All @@ -85,7 +85,7 @@ type MetricsReceiver interface {
// Its purpose is to translate data from any format to the collector's internal logs data format.
// LogsReceiver feeds a consumer.Logs with data.
//
// For example a LogsReceiver can read syslogs and convert them into pdata.Logs.
// For example a LogsReceiver can read syslogs and convert them into plog.Logs.
type LogsReceiver interface {
Receiver
}
Expand Down
23 changes: 12 additions & 11 deletions config/configgrpc/configgrpc_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ import (
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/model/otlp"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func BenchmarkCompressors(b *testing.B) {
Expand Down Expand Up @@ -106,34 +107,34 @@ type marshaler interface {
}

type logMarshaler struct {
pdata.LogsMarshaler
plog.Marshaler
}

func (m *logMarshaler) marshal(e interface{}) ([]byte, error) {
return m.MarshalLogs(e.(pdata.Logs))
return m.MarshalLogs(e.(plog.Logs))
}

type traceMarshaler struct {
pdata.TracesMarshaler
ptrace.Marshaler
}

func (m *traceMarshaler) marshal(e interface{}) ([]byte, error) {
return m.MarshalTraces(e.(pdata.Traces))
return m.MarshalTraces(e.(ptrace.Traces))
}

type metricsMarshaler struct {
pdata.MetricsMarshaler
pmetric.Marshaler
}

func (m *metricsMarshaler) marshal(e interface{}) ([]byte, error) {
return m.MarshalMetrics(e.(pdata.Metrics))
return m.MarshalMetrics(e.(pmetric.Metrics))
}

func setupTestPayloads() []testPayload {
payloads := make([]testPayload, 0)

// log payloads
logMarshaler := &logMarshaler{otlp.NewProtobufLogsMarshaler()}
logMarshaler := &logMarshaler{plog.NewProtoMarshaler()}
payloads = append(payloads, testPayload{
name: "sm_log_request",
message: testdata.GenerateLogsOneLogRecord(),
Expand All @@ -148,7 +149,7 @@ func setupTestPayloads() []testPayload {
marshaler: logMarshaler})

// trace payloads
tracesMarshaler := &traceMarshaler{otlp.NewProtobufTracesMarshaler()}
tracesMarshaler := &traceMarshaler{ptrace.NewProtoMarshaler()}
payloads = append(payloads, testPayload{
name: "sm_trace_request",
message: testdata.GenerateTracesOneSpan(),
Expand All @@ -163,7 +164,7 @@ func setupTestPayloads() []testPayload {
marshaler: tracesMarshaler})

// metric payloads
metricsMarshaler := &metricsMarshaler{otlp.NewProtobufMetricsMarshaler()}
metricsMarshaler := &metricsMarshaler{pmetric.NewProtoMarshaler()}
payloads = append(payloads, testPayload{
name: "sm_metric_request",
message: testdata.GenerateMetricsOneMetric(),
Expand Down
28 changes: 14 additions & 14 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import (
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
)

func TestDefaultGrpcClientSettings(t *testing.T) {
Expand Down Expand Up @@ -567,7 +567,7 @@ func TestHttpReception(t *testing.T) {
opts, err := gss.ToServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)
s := grpc.NewServer(opts...)
otlpgrpc.RegisterTracesServer(s, &grpcTraceServer{})
ptraceotlp.RegisterServer(s, &grpcTraceServer{})

go func() {
_ = s.Serve(ln)
Expand All @@ -581,9 +581,9 @@ func TestHttpReception(t *testing.T) {
assert.NoError(t, errClient)
grpcClientConn, errDial := grpc.Dial(gcs.Endpoint, clientOpts...)
assert.NoError(t, errDial)
client := otlpgrpc.NewTracesClient(grpcClientConn)
client := ptraceotlp.NewClient(grpcClientConn)
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
resp, errResp := client.Export(ctx, otlpgrpc.NewTracesRequest(), grpc.WaitForReady(true))
resp, errResp := client.Export(ctx, ptraceotlp.NewRequest(), grpc.WaitForReady(true))
if test.hasError {
assert.Error(t, errResp)
} else {
Expand Down Expand Up @@ -616,7 +616,7 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {
opts, err := gss.ToServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)
s := grpc.NewServer(opts...)
otlpgrpc.RegisterTracesServer(s, &grpcTraceServer{})
ptraceotlp.RegisterServer(s, &grpcTraceServer{})

go func() {
_ = s.Serve(ln)
Expand All @@ -632,9 +632,9 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {
assert.NoError(t, errClient)
grpcClientConn, errDial := grpc.Dial(gcs.Endpoint, clientOpts...)
assert.NoError(t, errDial)
client := otlpgrpc.NewTracesClient(grpcClientConn)
client := ptraceotlp.NewClient(grpcClientConn)
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
resp, errResp := client.Export(ctx, otlpgrpc.NewTracesRequest(), grpc.WaitForReady(true))
resp, errResp := client.Export(ctx, ptraceotlp.NewRequest(), grpc.WaitForReady(true))
assert.NoError(t, errResp)
assert.NotNil(t, resp)
cancelFunc()
Expand Down Expand Up @@ -783,14 +783,14 @@ func (ms *mockedStream) Context() context.Context {
func TestClientInfoInterceptors(t *testing.T) {
testCases := []struct {
desc string
tester func(context.Context, otlpgrpc.TracesClient)
tester func(context.Context, ptraceotlp.Client)
}{
{
// we only have unary services, we don't have any clients we could use
// to test with streaming services
desc: "unary",
tester: func(ctx context.Context, cl otlpgrpc.TracesClient) {
resp, errResp := cl.Export(ctx, otlpgrpc.NewTracesRequest())
tester: func(ctx context.Context, cl ptraceotlp.Client) {
resp, errResp := cl.Export(ctx, ptraceotlp.NewRequest())
require.NoError(t, errResp)
require.NotNil(t, resp)
},
Expand All @@ -812,7 +812,7 @@ func TestClientInfoInterceptors(t *testing.T) {
opts, err := gss.ToServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
require.NoError(t, err)
srv := grpc.NewServer(opts...)
otlpgrpc.RegisterTracesServer(srv, mock)
ptraceotlp.RegisterServer(srv, mock)

defer srv.Stop()

Expand Down Expand Up @@ -845,7 +845,7 @@ func TestClientInfoInterceptors(t *testing.T) {
grpcClientConn, errDial := grpc.Dial(gcs.Endpoint, clientOpts...)
require.NoError(t, errDial)

cl := otlpgrpc.NewTracesClient(grpcClientConn)
cl := ptraceotlp.NewClient(grpcClientConn)
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFunc()

Expand Down Expand Up @@ -1025,9 +1025,9 @@ type grpcTraceServer struct {
recordedContext context.Context
}

func (gts *grpcTraceServer) Export(ctx context.Context, _ otlpgrpc.TracesRequest) (otlpgrpc.TracesResponse, error) {
func (gts *grpcTraceServer) Export(ctx context.Context, _ ptraceotlp.Request) (ptraceotlp.Response, error) {
gts.recordedContext = ctx
return otlpgrpc.NewTracesResponse(), nil
return ptraceotlp.NewResponse(), nil
}

// tempSocketName provides a temporary Unix socket name for testing.
Expand Down
Loading

0 comments on commit 734e25e

Please sign in to comment.