Skip to content

Commit

Permalink
feat: implement module for pgx/v5 (#1364)
Browse files Browse the repository at this point in the history
* feat(batch): start implementing better batch tracing

* feat(apmpgxv5): start implementing batch tracing

* fix(apmpgxv5): make check if span is ended in TraceBatchEnd function to make setting values safe in runtime

* test(apmpgxv5): add failure case to batch test

* feat: add copy tracer

* test: add tests for copy tracer

* feat: add query tracer

* test: add tests for query tracer

* feat: add connect tracer and inject it into Tracer struct

* test: add tests for connect tracer

* docs: add doc.go file

* fix: fix go.mod file and add apmpgxv5 entry to Dockerfile-testing file

* refac: remove duplicate code with setting additional fields to span context and replace it with start and end span functions

* test: fix imports

* refac(span): move database type, name and destination service type to consts

* test: remove import with underscore and replace span name from connect to uppercase

* test: fix host setting

* chore: remove unnecessary set span start time, replace interface argument in endSpan function with error type and wrap sql queries using apmsql.QuerySignature

* fix: replace apmtest.WithTransaction with apmtest.WithUncompressedTransaction, make batch start span non-exit and batch query span exit

* test: replace apmtest.WithTransaction to apmtest.WithUncompressedTransaction in connect, copy and query tests

* fix(tracers): change return "nil" in "if !ok" section to return incoming context

* fixup: format files, fix gomod

Signed-off-by: Marc Lopez Rubio <[email protected]>

* ci: add build tags to build only on go1.18+, because it's pgx/v5 requirement + set go version to 1.18 in go.mod file

* chore: remove +build tag, because it's unnecessary

* Revert "chore: remove +build tag, because it's unnecessary"

This reverts commit 2f655ea.

* refac: remove build constraints from doc.go and set go.mod version to 1.15 due to CI requirements

Signed-off-by: Marc Lopez Rubio <[email protected]>
Co-authored-by: Marc Lopez Rubio <[email protected]>
  • Loading branch information
gvencadze and marclop authored Jan 25, 2023
1 parent fb87682 commit e7c4e6a
Show file tree
Hide file tree
Showing 14 changed files with 1,163 additions and 0 deletions.
60 changes: 60 additions & 0 deletions module/apmpgxv5/batch_tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build go1.18
// +build go1.18

package apmpgxv5 // import "go.elastic.co/apm/module/apmpgxv5/v2"

import (
"context"

"github.com/jackc/pgx/v5"

"go.elastic.co/apm/module/apmsql/v2"
"go.elastic.co/apm/v2"
)

// BatchTracer traces SendBatch
type BatchTracer struct{}

var _ pgx.BatchTracer = (*BatchTracer)(nil)

func (b BatchTracer) TraceBatchStart(ctx context.Context, conn *pgx.Conn, _ pgx.TraceBatchStartData) context.Context {
span, spanCtx, ok := startSpan(ctx, apmsql.QuerySignature("BATCH"), batchSpanType, conn.Config(),
apm.SpanOptions{})
if !ok {
return ctx
}

return apm.ContextWithSpan(spanCtx, span)
}

func (b BatchTracer) TraceBatchQuery(ctx context.Context, conn *pgx.Conn, data pgx.TraceBatchQueryData) {
span, _, ok := startSpan(ctx, apmsql.QuerySignature(data.SQL), querySpanType, conn.Config(), apm.SpanOptions{
ExitSpan: true,
})
if !ok {
return
}

defer span.End()
}

func (b BatchTracer) TraceBatchEnd(ctx context.Context, _ *pgx.Conn, data pgx.TraceBatchEndData) {
endSpan(ctx, data.Err)
}
169 changes: 169 additions & 0 deletions module/apmpgxv5/batch_tracer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build go1.18
// +build go1.18

package apmpgxv5_test

import (
"context"
"fmt"
"os"
"testing"

"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.elastic.co/apm/module/apmpgxv5/v2"
"go.elastic.co/apm/v2/apmtest"
"go.elastic.co/apm/v2/model"
)

type stmt struct {
query string
action string
}

func TestBatchTrace(t *testing.T) {
host := os.Getenv("PGHOST")
if host == "" {
t.Skipf("PGHOST not specified")
}

cfg, err := pgx.ParseConfig(fmt.Sprintf("postgres://postgres:hunter2@%s:5432/test_db", host))
require.NoError(t, err)

ctx := context.TODO()

apmpgxv5.Instrument(cfg)

conn, err := pgx.ConnectConfig(ctx, cfg)
require.NoError(t, err)

_, err = conn.Exec(ctx, "CREATE TABLE IF NOT EXISTS foo (bar INT)")
require.NoError(t, err)

testcases := []struct {
name string
expectErr bool
queryQueue []string
expStmt map[string]stmt
}{
{
name: "BATCH spans, success",
expectErr: false,
queryQueue: []string{
"SELECT * FROM foo WHERE bar = 1",
"SELECT bar FROM foo WHERE bar = 1",
},
expStmt: map[string]stmt{
"BATCH": {
query: "BATCH",
action: "batch",
},
"SELECT * FROM foo WHERE bar = 1": {
query: "SELECT FROM foo",
action: "query",
},
"SELECT bar FROM foo WHERE bar = 1": {
query: "SELECT FROM foo",
action: "query",
},
},
},
{
name: "BATCH spans, error",
expectErr: true,
queryQueue: []string{
"SELECT * FROM foo WHERE bar = 1",
"SELECT bar FROM foo2",
},
expStmt: map[string]stmt{
"BATCH": {
query: "BATCH",
action: "batch",
},
"SELECT * FROM foo WHERE bar = 1": {
query: "SELECT FROM foo",
action: "query",
},
"SELECT bar FROM foo2 WHERE bar = 1": {
query: "SELECT FROM foo",
action: "query",
},
},
},
}

for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
_, spans, errs := apmtest.WithUncompressedTransaction(func(ctx context.Context) {
batch := &pgx.Batch{}

for _, query := range tt.queryQueue {
batch.Queue(query)
}

br := conn.SendBatch(ctx, batch)
defer func() {
_ = br.Close()
}()
})

if tt.expectErr {
require.Len(t, errs, 2)
assert.Equal(t, "failure", spans[0].Outcome)
} else {
for i := range tt.queryQueue {
expectedStatement := tt.expStmt[tt.queryQueue[i]]

assert.Equal(t, "success", spans[i].Outcome)
assert.Equal(t, "db", spans[i].Type)
assert.Equal(t, "postgresql", spans[i].Subtype)
assert.Equal(t, expectedStatement.action, spans[i].Action)
assert.Equal(t, expectedStatement.query, spans[i].Name)

assert.Equal(t, &model.SpanContext{
Destination: &model.DestinationSpanContext{
Address: cfg.Host,
Port: int(cfg.Port),
Service: &model.DestinationServiceSpanContext{
Type: "db",
Name: "",
Resource: "postgresql",
},
},
Service: &model.ServiceSpanContext{
Target: &model.ServiceTargetSpanContext{
Type: "db",
Name: "postgresql",
},
},
Database: &model.DatabaseSpanContext{
Instance: cfg.Database,
Statement: expectedStatement.query,
Type: "sql",
User: cfg.User,
},
}, spans[i].Context)
}
}
})
}
}
51 changes: 51 additions & 0 deletions module/apmpgxv5/connect_tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build go1.18
// +build go1.18

package apmpgxv5 // import "go.elastic.co/apm/module/apmpgxv5/v2"

import (
"context"

"github.com/jackc/pgx/v5"

"go.elastic.co/apm/module/apmsql/v2"
"go.elastic.co/apm/v2"
)

// ConnectTracer traces Connect and ConnectConfig
type ConnectTracer struct{}

var _ pgx.ConnectTracer = (*ConnectTracer)(nil)

func (c ConnectTracer) TraceConnectStart(ctx context.Context, conn pgx.TraceConnectStartData) context.Context {
span, spanCtx, ok := startSpan(ctx, apmsql.QuerySignature("CONNECT"), connectSpanType, conn.ConnConfig,
apm.SpanOptions{
ExitSpan: false,
})
if !ok {
return ctx
}

return apm.ContextWithSpan(spanCtx, span)
}

func (c ConnectTracer) TraceConnectEnd(ctx context.Context, data pgx.TraceConnectEndData) {
endSpan(ctx, data.Err)
}
83 changes: 83 additions & 0 deletions module/apmpgxv5/connect_tracer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build go1.18
// +build go1.18

package apmpgxv5_test

import (
"context"
"fmt"
"os"
"testing"

"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.elastic.co/apm/module/apmpgxv5/v2"
"go.elastic.co/apm/v2/apmtest"
)

func Test_Connect(t *testing.T) {
host := os.Getenv("PGHOST")
if host == "" {
t.Skipf("PGHOST not specified")
}

testcases := []struct {
name string
dsn string
expectErr bool
}{
{
name: "CONNECT span, success",
expectErr: false,
dsn: "postgres://postgres:hunter2@%s:5432/test_db",
},
{
name: "CONNECT span, failure",
expectErr: true,
dsn: "postgres://postgres:hunter2@%s:5432/non_existing_db",
},
}

for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
cfg, err := pgx.ParseConfig(fmt.Sprintf(tt.dsn, host))
require.NoError(t, err)

apmpgxv5.Instrument(cfg)

_, spans, errs := apmtest.WithUncompressedTransaction(func(ctx context.Context) {
_, _ = pgx.ConnectConfig(ctx, cfg)
})

assert.NotNil(t, spans[0].ID)

if tt.expectErr {
require.Len(t, errs, 1)
assert.Equal(t, "failure", spans[0].Outcome)
assert.Equal(t, "CONNECT", spans[0].Name)
} else {
assert.Equal(t, "success", spans[0].Outcome)
assert.Equal(t, "CONNECT", spans[0].Name)
}
})
}
}
Loading

0 comments on commit e7c4e6a

Please sign in to comment.