Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement module for pgx/v5 #1364

Merged
merged 32 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
01f6783
feat(batch): start implementing better batch tracing
gvencadze Oct 29, 2022
0a3f548
feat(apmpgxv5): start implementing batch tracing
gvencadze Oct 29, 2022
2b0e014
fix(apmpgxv5): make check if span is ended in TraceBatchEnd function …
gvencadze Oct 30, 2022
5526d63
test(apmpgxv5): add failure case to batch test
gvencadze Oct 30, 2022
813acc6
Merge remote-tracking branch 'origin/main'
gvencadze Oct 30, 2022
043560f
feat: add copy tracer
gvencadze Dec 18, 2022
679fd7c
test: add tests for copy tracer
gvencadze Dec 18, 2022
933b262
feat: add query tracer
gvencadze Dec 18, 2022
77b67b9
test: add tests for query tracer
gvencadze Dec 18, 2022
39390f3
feat: add connect tracer and inject it into Tracer struct
gvencadze Dec 18, 2022
7cde9df
test: add tests for connect tracer
gvencadze Dec 18, 2022
130b6be
docs: add doc.go file
gvencadze Dec 18, 2022
abe1ce5
Merge branch 'elastic:main' into main
gvencadze Dec 18, 2022
57a9ef9
fix: fix go.mod file and add apmpgxv5 entry to Dockerfile-testing file
gvencadze Dec 18, 2022
07a3595
Merge remote-tracking branch 'origin/main'
gvencadze Dec 18, 2022
aee63fb
refac: remove duplicate code with setting additional fields to span c…
gvencadze Dec 19, 2022
817c138
test: fix imports
gvencadze Dec 19, 2022
e787809
refac(span): move database type, name and destination service type to…
gvencadze Dec 19, 2022
2241c61
test: remove import with underscore and replace span name from connec…
gvencadze Dec 19, 2022
496a442
test: fix host setting
gvencadze Dec 19, 2022
9b09848
chore: remove unnecessary set span start time, replace interface argu…
gvencadze Jan 1, 2023
7d83c95
fix: replace apmtest.WithTransaction with apmtest.WithUncompressedTra…
gvencadze Jan 6, 2023
fdb5e5d
test: replace apmtest.WithTransaction to apmtest.WithUncompressedTran…
gvencadze Jan 6, 2023
bfb4e96
fix(tracers): change return "nil" in "if !ok" section to return incom…
gvencadze Jan 7, 2023
e54a7c9
Merge branch 'main' into main
gvencadze Jan 7, 2023
a2fa646
fixup: format files, fix gomod
marclop Jan 13, 2023
3f678ea
ci: add build tags to build only on go1.18+, because it's pgx/v5 requ…
gvencadze Jan 22, 2023
2f655ea
chore: remove +build tag, because it's unnecessary
gvencadze Jan 22, 2023
deae0aa
Merge branch 'main' into main
gvencadze Jan 22, 2023
2a56dc1
Revert "chore: remove +build tag, because it's unnecessary"
gvencadze Jan 24, 2023
c74262c
refac: remove build constraints from doc.go and set go.mod version to…
gvencadze Jan 24, 2023
b60fed8
Merge remote-tracking branch 'origin/main'
gvencadze Jan 24, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions module/apmpgxv5/batch_tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package apmpgxv5

import (
"context"
"github.com/jackc/pgx/v5"
gvencadze marked this conversation as resolved.
Show resolved Hide resolved
"go.elastic.co/apm/v2"
"time"
)

// BatchTracer traces SendBatch
type BatchTracer struct{}

var _ pgx.BatchTracer = (*BatchTracer)(nil)

const (
batchSpanType = "db.postgresql.batch"
)

func (b BatchTracer) TraceBatchStart(ctx context.Context, conn *pgx.Conn, _ pgx.TraceBatchStartData) context.Context {
span, apmCtx := apm.StartSpanOptions(ctx, "BATCH", batchSpanType, apm.SpanOptions{
Start: time.Now(),
gvencadze marked this conversation as resolved.
Show resolved Hide resolved
ExitSpan: false,
})

if span.Dropped() {
span.End()
return nil // todo: should be discussed
}
span.Action = "batch"
gvencadze marked this conversation as resolved.
Show resolved Hide resolved

return apmCtx
}

func (b BatchTracer) TraceBatchQuery(ctx context.Context, conn *pgx.Conn, data pgx.TraceBatchQueryData) {
span, apmCtx := apm.StartSpanOptions(ctx, data.SQL, querySpanType, apm.SpanOptions{
Start: time.Now(),
ExitSpan: false,
gvencadze marked this conversation as resolved.
Show resolved Hide resolved
})
defer span.End()
gvencadze marked this conversation as resolved.
Show resolved Hide resolved

span.Action = "batch"
span.Context.SetDatabase(apm.DatabaseSpanContext{
Instance: conn.Config().Database,
Statement: data.SQL,
Type: "sql",
User: conn.Config().User,
})
span.Context.SetDestinationAddress(conn.Config().Host, int(conn.Config().Port))

if apmErr := apm.CaptureError(apmCtx, data.Err); apmErr != nil {
apmErr.SetSpan(span)
apmErr.Send()
return
}
}

func (b BatchTracer) TraceBatchEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceBatchEndData) {
span := apm.SpanFromContext(ctx)
defer span.End()

if span.Dropped() {
span.End()
return
}

// check if span is ended or not
if span.SpanData != nil {
span.Context.SetDestinationAddress(conn.Config().Host, int(conn.Config().Port))
span.Action = "batch"
}

if apmErr := apm.CaptureError(ctx, data.Err); apmErr != nil {
apmErr.SetSpan(span)
apmErr.Send()
return
}
}
101 changes: 101 additions & 0 deletions module/apmpgxv5/batch_tracer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package apmpgxv5_test

import (
"context"
"fmt"
"github.com/gvencadze/apm-agent-go/module/apmpgxv5"
"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.elastic.co/apm/v2/apmtest"
"go.elastic.co/apm/v2/model"
_ "go.elastic.co/apm/v2/model"
"os"
"testing"
)

func TestBatchTrace(t *testing.T) {
host := os.Getenv("PGHOST")
gvencadze marked this conversation as resolved.
Show resolved Hide resolved
if host == "" {
t.Skipf("PGHOST not specified")
}

cfg, err := pgx.ParseConfig(fmt.Sprintf("postgres://postgres:hunter2@%s:5432/test_db", host))
require.NoError(t, err)

ctx := context.TODO()

apmpgxv5.Instrument(cfg)

conn, err := pgx.ConnectConfig(ctx, cfg)
require.NoError(t, err)

_, err = conn.Exec(ctx, "CREATE TABLE IF NOT EXISTS foo (bar INT)")
require.NoError(t, err)

testcases := []struct {
name string
expectErr bool
queryQueue []string
}{
{
name: "BATCH spans, success",
expectErr: false,
queryQueue: []string{
"SELECT * FROM foo WHERE bar = 1",
"SELECT bar FROM foo WHERE bar = 1",
},
},
{
name: "BATCH spans, error",
expectErr: true,
queryQueue: []string{
"SELECT * FROM foo WHERE bar = 1",
"SELECT bar FROM foo2",
},
},
}

for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
_, spans, errs := apmtest.WithTransaction(func(ctx context.Context) {
gvencadze marked this conversation as resolved.
Show resolved Hide resolved
batch := &pgx.Batch{}

for _, query := range tt.queryQueue {
batch.Queue(query)
}

br := conn.SendBatch(ctx, batch)
defer func() {
_ = br.Close()
}()
})

if tt.expectErr {
require.Len(t, errs, 2)
assert.Equal(t, "failure", spans[0].Outcome)
} else {
for i, expectedStmt := range tt.queryQueue {
assert.Equal(t, "success", spans[i].Outcome)
assert.Equal(t, "db", spans[i].Type)
assert.Equal(t, "postgresql", spans[i].Subtype)
assert.Equal(t, "batch", spans[i].Action)
assert.Equal(t, expectedStmt, spans[i].Name)

assert.Equal(t, &model.SpanContext{
Destination: &model.DestinationSpanContext{
Address: cfg.Host,
Port: int(cfg.Port),
},
Database: &model.DatabaseSpanContext{
Instance: cfg.Database,
Statement: expectedStmt,
Type: "sql",
User: cfg.User,
},
}, spans[i].Context)
}
}
})
}
}
103 changes: 103 additions & 0 deletions module/apmpgxv5/connect_tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package apmpgxv5

import (
"context"
"github.com/jackc/pgx/v5"
"go.elastic.co/apm/v2"
"time"
)

// ConnectTracer traces Connect and ConnectConfig
type ConnectTracer struct{}

var _ pgx.ConnectTracer = (*ConnectTracer)(nil)

const (
connectSpanType = "db.postgresql.connect"
)

type contextKey string

const (
dataContextKey contextKey = "data"
)

func (c ConnectTracer) TraceConnectStart(ctx context.Context, _ pgx.TraceConnectStartData) context.Context {
statement := "connect"

span, apmCtx := apm.StartSpanOptions(ctx, statement, connectSpanType, apm.SpanOptions{
ExitSpan: false,
})

newCtx := context.WithValue(apmCtx, dataContextKey, values{
start: time.Now(),
statement: statement,
})

return apm.ContextWithSpan(newCtx, span)
}

func (c ConnectTracer) TraceConnectEnd(ctx context.Context, data pgx.TraceConnectEndData) {
span := apm.SpanFromContext(ctx)
defer span.End()

var (
statement string
db string
user string
host string
port int
)

if d, ok := ctx.Value(dataContextKey).(values); ok {
statement = d.statement
}

// TODO: refactor
if data.Conn != nil {
db = data.Conn.Config().Database
user = data.Conn.Config().User
host = data.Conn.Config().Host
port = int(data.Conn.Config().Port)
}

if span.Dropped() {
return
}

span.Context.SetDatabase(apm.DatabaseSpanContext{
Instance: db,
Statement: statement,
Type: "sql",
User: user,
})
span.Context.SetDestinationAddress(host, port)
span.Context.SetServiceTarget(apm.ServiceTargetSpanContext{
Name: "postgresql",
gvencadze marked this conversation as resolved.
Show resolved Hide resolved
Type: "db",
})

if apmErr := apm.CaptureError(ctx, data.Err); apmErr != nil {
apmErr.SetSpan(span)
apmErr.Send()
}

return
}
62 changes: 62 additions & 0 deletions module/apmpgxv5/connect_tracer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package apmpgxv5_test

import (
"context"
"fmt"
"github.com/gvencadze/apm-agent-go/module/apmpgxv5"
"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.elastic.co/apm/v2/apmtest"
"os"
"testing"
)

// TODO: add tests for connect
func Test_Connect(t *testing.T) {
host := os.Getenv("PGHOST")
if host == "" {
t.Skipf("PGHOST not specified")
}

testcases := []struct {
name string
dsn string
expectErr bool
}{
{
name: "CONNECT span, success",
expectErr: false,
dsn: "postgres://postgres:hunter2@%s:5432/test_db",
},
{
name: "CONNECT span, failure",
expectErr: true,
dsn: "postgres://postgres:hunter2@%s:5432/non_existing_db",
},
}

for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe do parallel tests?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps, I don't think it matters in practice, they should be pretty fast.

cfg, err := pgx.ParseConfig(fmt.Sprintf(tt.dsn, host))
require.NoError(t, err)

apmpgxv5.Instrument(cfg)

_, spans, errs := apmtest.WithTransaction(func(ctx context.Context) {
gvencadze marked this conversation as resolved.
Show resolved Hide resolved
_, _ = pgx.ConnectConfig(ctx, cfg)
})

assert.NotNil(t, spans[0].ID)

if tt.expectErr {
require.Len(t, errs, 1)
assert.Equal(t, "failure", spans[0].Outcome)
assert.Equal(t, "connect", spans[0].Name)
} else {
assert.Equal(t, "success", spans[0].Outcome)
assert.Equal(t, "connect", spans[0].Name)
}
})
}
}
Loading