Skip to content

Commit

Permalink
fix: schema support
Browse files Browse the repository at this point in the history
  • Loading branch information
wesnick committed Mar 18, 2024
1 parent 44af0e9 commit 8288aad
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions pgsql_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"strings"
"time"

"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -32,6 +33,15 @@ func (database PgDatabase) connect() error {
return nil
}

func (database PgDatabase) GetChannelName() string {
// Symfony uses the format "schema.table" for channel name
if strings.Contains(database.TableName, ".") {
return fmt.Sprintf(`"%s"`, strings.Replace(database.TableName, `"`, "", -1))
}

return database.TableName
}

func (database PgDatabase) listen(fn process, message any, sec int) error {
err := database.connect()

Expand All @@ -45,12 +55,12 @@ func (database PgDatabase) listen(fn process, message any, sec int) error {

log.Printf("Successfully connected to the database!")

_, err = pool.Exec(context.Background(), fmt.Sprintf("LISTEN %s", database.TableName))
_, err = pool.Exec(context.Background(), fmt.Sprintf("LISTEN %s", database.GetChannelName()))
if err != nil {
return err
}

defer pool.Exec(context.Background(), fmt.Sprintf("UNLISTEN %s", database.TableName))
defer pool.Exec(context.Background(), fmt.Sprintf("UNLISTEN %s", database.GetChannelName()))

conn, err := pool.Acquire(context.Background())
if err != nil {
Expand All @@ -77,7 +87,7 @@ func (database PgDatabase) listenEvery(seconds int, fn process, message any) {

go func() error {
for {
<- time.After(delay)
<-time.After(delay)
_ = database.processMessage(fn, message)
}
}()
Expand Down

0 comments on commit 8288aad

Please sign in to comment.