From 7ee443d1b869d8ddc4746850f7425d0a9ccd012b Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Mon, 22 Nov 2021 12:31:57 +0200 Subject: [PATCH] feat: add pgdriver.Notify --- driver/pgdriver/listener.go | 6 +++++ example/pg-listen/README.md | 9 ++++++++ example/pg-listen/go.mod | 19 +++++++++++++++ example/pg-listen/go.sum | 46 +++++++++++++++++++++++++++++++++++++ example/pg-listen/main.go | 40 ++++++++++++++++++++++++++++++++ 5 files changed, 120 insertions(+) create mode 100644 example/pg-listen/README.md create mode 100644 example/pg-listen/go.mod create mode 100644 example/pg-listen/go.sum create mode 100644 example/pg-listen/main.go diff --git a/driver/pgdriver/listener.go b/driver/pgdriver/listener.go index e493575ee..cd204de0c 100644 --- a/driver/pgdriver/listener.go +++ b/driver/pgdriver/listener.go @@ -17,6 +17,12 @@ var ( errPingTimeout = errors.New("bun: ping timeout") ) +// Notify sends a notification on the channel using `NOTIFY` command. +func Notify(ctx context.Context, db *bun.DB, channel, payload string) error { + _, err := db.ExecContext(ctx, "NOTIFY ?, ?", bun.Ident(channel), payload) + return err +} + type Listener struct { db *bun.DB driver *Connector diff --git a/example/pg-listen/README.md b/example/pg-listen/README.md new file mode 100644 index 000000000..772f22841 --- /dev/null +++ b/example/pg-listen/README.md @@ -0,0 +1,9 @@ +# Example for PostgreSQL listen/notify + +To run this example: + +```shell +go run . +``` + +See [documentation](https://bun.uptrace.dev/postgres/listen-notify.html#listen-notify) for details. diff --git a/example/pg-listen/go.mod b/example/pg-listen/go.mod new file mode 100644 index 000000000..05d7c30aa --- /dev/null +++ b/example/pg-listen/go.mod @@ -0,0 +1,19 @@ +module github.com/uptrace/bun/example/pg-listen + +go 1.16 + +replace github.com/uptrace/bun => ../.. + +replace github.com/uptrace/bun/extra/bundebug => ../../extra/bundebug + +replace github.com/uptrace/bun/driver/pgdriver => ../../driver/pgdriver + +replace github.com/uptrace/bun/dialect/pgdialect => ../../dialect/pgdialect + +require ( + github.com/uptrace/bun v1.0.17 + github.com/uptrace/bun/dialect/pgdialect v1.0.17 + github.com/uptrace/bun/driver/pgdriver v1.0.17 + github.com/uptrace/bun/extra/bundebug v1.0.17 + gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect +) diff --git a/example/pg-listen/go.sum b/example/pg-listen/go.sum new file mode 100644 index 000000000..2b3291407 --- /dev/null +++ b/example/pg-listen/go.sum @@ -0,0 +1,46 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= +github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-colorable v0.1.11 h1:nQ+aFkoE2TMGc0b68U2OKSexC+eq46+XwZzWXHRmPYs= +github.com/mattn/go-colorable v0.1.11/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= +github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc h1:9lRDQMhESg+zvGYmW5DyG0UqvY96Bu5QYsTLvCHdrgo= +github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc/go.mod h1:bciPuU6GHm1iF1pBvUfxfsH0Wmnc2VbpgvbI9ZWuIRs= +github.com/vmihailenco/msgpack/v5 v5.3.4 h1:qMKAwOV+meBw2Y8k9cVwAy7qErtYCwBzZ2ellBfvnqc= +github.com/vmihailenco/msgpack/v5 v5.3.4/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +golang.org/x/crypto v0.0.0-20180910181607-0e37d006457b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac h1:oN6lz7iLW/YC7un8pq+9bOLyXrprv2+DKfkJY+2LJJw= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +mellium.im/sasl v0.2.1 h1:nspKSRg7/SyO0cRGY71OkfHab8tf9kCts6a6oTDut0w= +mellium.im/sasl v0.2.1/go.mod h1:ROaEDLQNuf9vjKqE1SrAfnsobm2YKXT1gnN1uDp1PjQ= diff --git a/example/pg-listen/main.go b/example/pg-listen/main.go new file mode 100644 index 000000000..1654f8215 --- /dev/null +++ b/example/pg-listen/main.go @@ -0,0 +1,40 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/uptrace/bun" + "github.com/uptrace/bun/dialect/pgdialect" + "github.com/uptrace/bun/driver/pgdriver" + "github.com/uptrace/bun/extra/bundebug" +) + +func main() { + ctx := context.Background() + + dsn := "postgres://postgres:@localhost:5432/postgres?sslmode=disable" + sqldb := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(dsn))) + + db := bun.NewDB(sqldb, pgdialect.New()) + db.AddQueryHook(bundebug.NewQueryHook(bundebug.WithVerbose(true))) + + go func() { + for tm := range time.Tick(time.Second) { + if err := pgdriver.Notify(ctx, db, "mychan1", tm.Format(time.RFC3339)); err != nil { + panic(err) + } + } + }() + + ln := pgdriver.NewListener(db) + if err := ln.Listen(ctx, "mychan1", "mychan2"); err != nil { + panic(err) + } + + for notif := range ln.Channel() { + fmt.Println(notif.Channel, notif.Payload) + } +}