Skip to content

Commit

Permalink
Split model by signal type and move it to the new pdata module
Browse files Browse the repository at this point in the history
Split all `pdata` related code by type and move it from `model` to the new module `pdata`.

- `model/pdata` and `model/otlp` are moved to `pdata/plog`, `pdata/pmetric` and `pdata/ptrace`.
- `model/otlpgrpc` is moved to `pdata/plogotlp`, `pdata/pmetricotlp` and `pdata/ptraceotlp`.

Now all the API in `model` except for `model/semconv` is deprecated.
  • Loading branch information
dmitryax committed Apr 8, 2022
1 parent 620c88c commit e56b0f4
Show file tree
Hide file tree
Showing 202 changed files with 3,973 additions and 2,186 deletions.
4 changes: 4 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ updates:
directory: "/model"
schedule:
interval: "weekly"
- package-ecosystem: "gomod"
directory: "/pdata"
schedule:
interval: "weekly"
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@

### 🚩 Deprecations 🚩

- All pdata related APIs from model (model/pdata, model/otlp and model/otlpgrpc) are deprecated in
favor of packages in the new pdata module separated by telemetry signal type (#5168)
- `model/pdata`, `model/otlp` -> `pdata/pcommon`, `pdata/plog`, `pdata/pmetric`, `pdata/ptrace`
- `model/otlpgrpc` -> `pdata/plog/plogotlp`, `pdata/pmetric/pmetricotlp`, `pdata/ptrace/ptraceotlp`

### 💡 Enhancements 💡

- OTLP HTTP receiver will use HTTP/2 over TLS if client supports it (#5190)
Expand Down
11 changes: 6 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ include ./Makefile.Common
# This is the code that we want to run lint, etc.
ALL_SRC := $(shell find . -name '*.go' \
-not -path './internal/tools/*' \
-not -path './model/internal/data/protogen/*' \
-not -path './pdata/internal/data/protogen/*' \
-not -path './service/internal/zpages/tmplgen/*' \
-type f | sort)

Expand Down Expand Up @@ -226,7 +226,7 @@ gendependabot: $(eval SHELL:=/bin/bash)
# Definitions for ProtoBuf generation.

# The source directory for OTLP ProtoBufs.
OPENTELEMETRY_PROTO_SRC_DIR=model/internal/opentelemetry-proto
OPENTELEMETRY_PROTO_SRC_DIR=pdata/internal/opentelemetry-proto

# The SHA matching the current version of the proto to use
OPENTELEMETRY_PROTO_VERSION=v0.15.0
Expand All @@ -235,13 +235,13 @@ OPENTELEMETRY_PROTO_VERSION=v0.15.0
OPENTELEMETRY_PROTO_FILES := $(subst $(OPENTELEMETRY_PROTO_SRC_DIR)/,,$(wildcard $(OPENTELEMETRY_PROTO_SRC_DIR)/opentelemetry/proto/*/v1/*.proto $(OPENTELEMETRY_PROTO_SRC_DIR)/opentelemetry/proto/collector/*/v1/*.proto))

# Target directory to write generated files to.
PROTO_TARGET_GEN_DIR=model/internal/data/protogen
PROTO_TARGET_GEN_DIR=pdata/internal/data/protogen

# Go package name to use for generated files.
PROTO_PACKAGE=go.opentelemetry.io/collector/$(PROTO_TARGET_GEN_DIR)

# Intermediate directory used during generation.
PROTO_INTERMEDIATE_DIR=model/internal/.patched-otlp-proto
PROTO_INTERMEDIATE_DIR=pdata/internal/.patched-otlp-proto

DOCKER_PROTOBUF ?= otel/build-protobuf:0.9.0
PROTOC := docker run --rm -u ${shell id -u} -v${PWD}:${PWD} -w${PWD}/$(PROTO_INTERMEDIATE_DIR) ${DOCKER_PROTOBUF} --proto_path=${PWD}
Expand Down Expand Up @@ -289,7 +289,7 @@ genproto_sub:
# Generate structs, functions and tests for pdata package. Must be used after any changes
# to proto and after running `make genproto`
genpdata:
$(GOCMD) run model/internal/cmd/pdatagen/main.go
$(GOCMD) run pdata/internal/cmd/pdatagen/main.go
$(MAKE) fmt

# Generate semantic convention constants. Requires a clone of the opentelemetry-specification repo
Expand All @@ -306,6 +306,7 @@ gensemconv:
check-contrib:
@echo Setting contrib at $(CONTRIB_PATH) to use this core checkout
@$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector=$(CURDIR)"
@$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector/pdata=$(CURDIR)/pdata"
@$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector/model=$(CURDIR)/model"
@$(MAKE) -C $(CONTRIB_PATH) -j2 gotidy
@$(MAKE) -C $(CONTRIB_PATH) test
Expand Down
4 changes: 2 additions & 2 deletions client/doc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import (

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func Example_receiver() {
// Your receiver get a next consumer when it's constructed
var next consumer.Traces

// You'll convert the incoming data into pipeline data
td := pdata.NewTraces()
td := ptrace.NewTraces()

// You probably have a context with client metadata from your listener or
// scraper
Expand Down
1 change: 1 addition & 0 deletions cmd/otelcorecol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ processors:
replaces:
- go.opentelemetry.io/collector => ../../
- go.opentelemetry.io/collector/model => ../../model
- go.opentelemetry.io/collector/pdata => ../../pdata
3 changes: 3 additions & 0 deletions cmd/otelcorecol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/collector/model v0.48.0 // indirect
go.opentelemetry.io/collector/pdata v0.0.0-00010101000000-000000000000 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.31.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.31.0 // indirect
go.opentelemetry.io/contrib/zpages v0.31.0 // indirect
Expand All @@ -79,3 +80,5 @@ require (
replace go.opentelemetry.io/collector => ../../

replace go.opentelemetry.io/collector/model => ../../model

replace go.opentelemetry.io/collector/pdata => ../../pdata
10 changes: 6 additions & 4 deletions component/componenttest/nop_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestNewNopExporterFactory(t *testing.T) {
Expand All @@ -35,18 +37,18 @@ func TestNewNopExporterFactory(t *testing.T) {
traces, err := factory.CreateTracesExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NoError(t, traces.Start(context.Background(), NewNopHost()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), ptrace.NewTraces()))
assert.NoError(t, traces.Shutdown(context.Background()))

metrics, err := factory.CreateMetricsExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NoError(t, metrics.Start(context.Background(), NewNopHost()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pmetric.NewMetrics()))
assert.NoError(t, metrics.Shutdown(context.Background()))

logs, err := factory.CreateLogsExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NoError(t, logs.Start(context.Background(), NewNopHost()))
assert.NoError(t, logs.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.NoError(t, logs.ConsumeLogs(context.Background(), plog.NewLogs()))
assert.NoError(t, logs.Shutdown(context.Background()))
}
10 changes: 6 additions & 4 deletions component/componenttest/nop_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestNewNopProcessorFactory(t *testing.T) {
Expand All @@ -38,20 +40,20 @@ func TestNewNopProcessorFactory(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, consumer.Capabilities{MutatesData: false}, traces.Capabilities())
assert.NoError(t, traces.Start(context.Background(), NewNopHost()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), ptrace.NewTraces()))
assert.NoError(t, traces.Shutdown(context.Background()))

metrics, err := factory.CreateMetricsProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, consumer.Capabilities{MutatesData: false}, metrics.Capabilities())
assert.NoError(t, metrics.Start(context.Background(), NewNopHost()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pmetric.NewMetrics()))
assert.NoError(t, metrics.Shutdown(context.Background()))

logs, err := factory.CreateLogsProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, consumer.Capabilities{MutatesData: false}, logs.Capabilities())
assert.NoError(t, logs.Start(context.Background(), NewNopHost()))
assert.NoError(t, logs.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.NoError(t, logs.ConsumeLogs(context.Background(), plog.NewLogs()))
assert.NoError(t, logs.Shutdown(context.Background()))
}
6 changes: 3 additions & 3 deletions component/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Receiver interface {
// Its purpose is to translate data from any format to the collector's internal trace format.
// TracesReceiver feeds a consumer.Traces with data.
//
// For example it could be Zipkin data source which translates Zipkin spans into pdata.Traces.
// For example it could be Zipkin data source which translates Zipkin spans into ptrace.Traces.
type TracesReceiver interface {
Receiver
}
Expand All @@ -76,7 +76,7 @@ type TracesReceiver interface {
// Its purpose is to translate data from any format to the collector's internal metrics format.
// MetricsReceiver feeds a consumer.Metrics with data.
//
// For example it could be Prometheus data source which translates Prometheus metrics into pdata.Metrics.
// For example it could be Prometheus data source which translates Prometheus metrics into pmetric.Metrics.
type MetricsReceiver interface {
Receiver
}
Expand All @@ -85,7 +85,7 @@ type MetricsReceiver interface {
// Its purpose is to translate data from any format to the collector's internal logs data format.
// LogsReceiver feeds a consumer.Logs with data.
//
// For example a LogsReceiver can read syslogs and convert them into pdata.Logs.
// For example a LogsReceiver can read syslogs and convert them into plog.Logs.
type LogsReceiver interface {
Receiver
}
Expand Down
23 changes: 12 additions & 11 deletions config/configgrpc/configgrpc_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ import (
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/model/otlp"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func BenchmarkCompressors(b *testing.B) {
Expand Down Expand Up @@ -106,34 +107,34 @@ type marshaler interface {
}

type logMarshaler struct {
pdata.LogsMarshaler
plog.LogsMarshaler
}

func (m *logMarshaler) marshal(e interface{}) ([]byte, error) {
return m.MarshalLogs(e.(pdata.Logs))
return m.MarshalLogs(e.(plog.Logs))
}

type traceMarshaler struct {
pdata.TracesMarshaler
ptrace.TracesMarshaler
}

func (m *traceMarshaler) marshal(e interface{}) ([]byte, error) {
return m.MarshalTraces(e.(pdata.Traces))
return m.MarshalTraces(e.(ptrace.Traces))
}

type metricsMarshaler struct {
pdata.MetricsMarshaler
pmetric.MetricsMarshaler
}

func (m *metricsMarshaler) marshal(e interface{}) ([]byte, error) {
return m.MarshalMetrics(e.(pdata.Metrics))
return m.MarshalMetrics(e.(pmetric.Metrics))
}

func setupTestPayloads() []testPayload {
payloads := make([]testPayload, 0)

// log payloads
logMarshaler := &logMarshaler{otlp.NewProtobufLogsMarshaler()}
logMarshaler := &logMarshaler{plog.NewProtobufLogsMarshaler()}
payloads = append(payloads, testPayload{
name: "sm_log_request",
message: testdata.GenerateLogsOneLogRecord(),
Expand All @@ -148,7 +149,7 @@ func setupTestPayloads() []testPayload {
marshaler: logMarshaler})

// trace payloads
tracesMarshaler := &traceMarshaler{otlp.NewProtobufTracesMarshaler()}
tracesMarshaler := &traceMarshaler{ptrace.NewProtobufTracesMarshaler()}
payloads = append(payloads, testPayload{
name: "sm_trace_request",
message: testdata.GenerateTracesOneSpan(),
Expand All @@ -163,7 +164,7 @@ func setupTestPayloads() []testPayload {
marshaler: tracesMarshaler})

// metric payloads
metricsMarshaler := &metricsMarshaler{otlp.NewProtobufMetricsMarshaler()}
metricsMarshaler := &metricsMarshaler{pmetric.NewProtobufMetricsMarshaler()}
payloads = append(payloads, testPayload{
name: "sm_metric_request",
message: testdata.GenerateMetricsOneMetric(),
Expand Down
28 changes: 14 additions & 14 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import (
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
)

func TestDefaultGrpcClientSettings(t *testing.T) {
Expand Down Expand Up @@ -567,7 +567,7 @@ func TestHttpReception(t *testing.T) {
opts, err := gss.ToServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)
s := grpc.NewServer(opts...)
otlpgrpc.RegisterTracesServer(s, &grpcTraceServer{})
ptraceotlp.RegisterTracesServer(s, &grpcTraceServer{})

go func() {
_ = s.Serve(ln)
Expand All @@ -581,9 +581,9 @@ func TestHttpReception(t *testing.T) {
assert.NoError(t, errClient)
grpcClientConn, errDial := grpc.Dial(gcs.Endpoint, clientOpts...)
assert.NoError(t, errDial)
client := otlpgrpc.NewTracesClient(grpcClientConn)
client := ptraceotlp.NewTracesClient(grpcClientConn)
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
resp, errResp := client.Export(ctx, otlpgrpc.NewTracesRequest(), grpc.WaitForReady(true))
resp, errResp := client.Export(ctx, ptraceotlp.NewTracesRequest(), grpc.WaitForReady(true))
if test.hasError {
assert.Error(t, errResp)
} else {
Expand Down Expand Up @@ -616,7 +616,7 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {
opts, err := gss.ToServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)
s := grpc.NewServer(opts...)
otlpgrpc.RegisterTracesServer(s, &grpcTraceServer{})
ptraceotlp.RegisterTracesServer(s, &grpcTraceServer{})

go func() {
_ = s.Serve(ln)
Expand All @@ -632,9 +632,9 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {
assert.NoError(t, errClient)
grpcClientConn, errDial := grpc.Dial(gcs.Endpoint, clientOpts...)
assert.NoError(t, errDial)
client := otlpgrpc.NewTracesClient(grpcClientConn)
client := ptraceotlp.NewTracesClient(grpcClientConn)
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
resp, errResp := client.Export(ctx, otlpgrpc.NewTracesRequest(), grpc.WaitForReady(true))
resp, errResp := client.Export(ctx, ptraceotlp.NewTracesRequest(), grpc.WaitForReady(true))
assert.NoError(t, errResp)
assert.NotNil(t, resp)
cancelFunc()
Expand Down Expand Up @@ -783,14 +783,14 @@ func (ms *mockedStream) Context() context.Context {
func TestClientInfoInterceptors(t *testing.T) {
testCases := []struct {
desc string
tester func(context.Context, otlpgrpc.TracesClient)
tester func(context.Context, ptraceotlp.TracesClient)
}{
{
// we only have unary services, we don't have any clients we could use
// to test with streaming services
desc: "unary",
tester: func(ctx context.Context, cl otlpgrpc.TracesClient) {
resp, errResp := cl.Export(ctx, otlpgrpc.NewTracesRequest())
tester: func(ctx context.Context, cl ptraceotlp.TracesClient) {
resp, errResp := cl.Export(ctx, ptraceotlp.NewTracesRequest())
require.NoError(t, errResp)
require.NotNil(t, resp)
},
Expand All @@ -812,7 +812,7 @@ func TestClientInfoInterceptors(t *testing.T) {
opts, err := gss.ToServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
require.NoError(t, err)
srv := grpc.NewServer(opts...)
otlpgrpc.RegisterTracesServer(srv, mock)
ptraceotlp.RegisterTracesServer(srv, mock)

defer srv.Stop()

Expand Down Expand Up @@ -845,7 +845,7 @@ func TestClientInfoInterceptors(t *testing.T) {
grpcClientConn, errDial := grpc.Dial(gcs.Endpoint, clientOpts...)
require.NoError(t, errDial)

cl := otlpgrpc.NewTracesClient(grpcClientConn)
cl := ptraceotlp.NewTracesClient(grpcClientConn)
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFunc()

Expand Down Expand Up @@ -1025,9 +1025,9 @@ type grpcTraceServer struct {
recordedContext context.Context
}

func (gts *grpcTraceServer) Export(ctx context.Context, _ otlpgrpc.TracesRequest) (otlpgrpc.TracesResponse, error) {
func (gts *grpcTraceServer) Export(ctx context.Context, _ ptraceotlp.TracesRequest) (ptraceotlp.TracesResponse, error) {
gts.recordedContext = ctx
return otlpgrpc.NewTracesResponse(), nil
return ptraceotlp.NewTracesResponse(), nil
}

// tempSocketName provides a temporary Unix socket name for testing.
Expand Down
Loading

0 comments on commit e56b0f4

Please sign in to comment.