From 586062e4546a61506b5561d8899856183598bf0c Mon Sep 17 00:00:00 2001 From: Ryan Faerman Date: Tue, 30 Jan 2024 23:37:24 -0500 Subject: [PATCH] migrate to json encoding for the events --- internal/dao/events.sql.go | 42 ++++++++++ .../migrations/0008_add_event_data_json.sql | 15 ++++ .../0009_migrate_events_data_to_json.go | 79 +++++++++++++++++++ .../0010_replace_event_data_with_json.sql | 17 ++++ internal/sql/migrations/migrations.go | 32 ++++++++ internal/sql/queries/events.sql | 6 ++ internal/sql/sql.go | 14 +++- 7 files changed, 203 insertions(+), 2 deletions(-) create mode 100644 internal/sql/migrations/0008_add_event_data_json.sql create mode 100644 internal/sql/migrations/0009_migrate_events_data_to_json.go create mode 100644 internal/sql/migrations/0010_replace_event_data_with_json.sql create mode 100644 internal/sql/migrations/migrations.go diff --git a/internal/dao/events.sql.go b/internal/dao/events.sql.go index fcf4d05..9462484 100644 --- a/internal/dao/events.sql.go +++ b/internal/dao/events.sql.go @@ -114,6 +114,48 @@ func (q *Queries) GetEvents(ctx context.Context, ids []int64) ([]Event, error) { return items, nil } +const getEventsForCallsign = `-- name: GetEventsForCallsign :many +SELECT id, created, stream_id, account_id, event_type, event_data +FROM events +WHERE event_type = ?1 +AND json_extract(event_data, '$.Callsign') = ?2 +` + +type GetEventsForCallsignParams struct { + EventType string + Callsign []byte +} + +func (q *Queries) GetEventsForCallsign(ctx context.Context, arg GetEventsForCallsignParams) ([]Event, error) { + rows, err := q.db.QueryContext(ctx, getEventsForCallsign, arg.EventType, arg.Callsign) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Event + for rows.Next() { + var i Event + if err := rows.Scan( + &i.ID, + &i.Created, + &i.StreamID, + &i.AccountID, + &i.EventType, + &i.EventData, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getEventsForStream = `-- name: GetEventsForStream :many SELECT id, created, stream_id, account_id, event_type, event_data FROM events WHERE stream_id = ?1 diff --git a/internal/sql/migrations/0008_add_event_data_json.sql b/internal/sql/migrations/0008_add_event_data_json.sql new file mode 100644 index 0000000..2194cce --- /dev/null +++ b/internal/sql/migrations/0008_add_event_data_json.sql @@ -0,0 +1,15 @@ +-- add_event_data_json +-- +goose Up +-- +goose StatementBegin + +ALTER TABLE events +ADD COLUMN event_data_json BLOB NOT NULL; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin + +ALTER TABLE events +DROP COLUMN event_data_json; +-- +goose StatementEnd + diff --git a/internal/sql/migrations/0009_migrate_events_data_to_json.go b/internal/sql/migrations/0009_migrate_events_data_to_json.go new file mode 100644 index 0000000..c233477 --- /dev/null +++ b/internal/sql/migrations/0009_migrate_events_data_to_json.go @@ -0,0 +1,79 @@ +package migrations + +import ( + "bytes" + "context" + "database/sql" + "encoding/gob" + "encoding/json" + "errors" + "fmt" +) + +func init() { + fmt.Println("hello world") + AddMigration(Up0009, Down0009) +} + +func Up0009(ctx context.Context, tx *sql.Tx) error { + l := Log.With("migration", "0009", "direction", "up") + + l.Debug("starting migration") + + rows, err := tx.QueryContext(ctx, `select id, event_data from events`) + if err != nil { + return err + } + defer rows.Close() + + var decoder *gob.Decoder + + for rows.Next() { + var ( + id int64 + data []byte + ) + if err := rows.Scan(&id, &data); err != nil { + return err + } + + l := l.With("event.id", id) + + l.Debug("decoding event data") + decoder = gob.NewDecoder(bytes.NewReader(data)) + var p any + if err := decoder.Decode(&p); err != nil { + l.Error("unable to decode event", "error", err) + return err + } + + l.Debug("marshaling event data to json") + jsonData, err := json.Marshal(p) + if err != nil { + return err + } + + l.Debug("updating event data json") + if _, err := tx.ExecContext( + ctx, + `update events set event_data_json = ? where id = ?`, + jsonData, id, + ); err != nil { + return err + } + l.Debug("updated event data json") + + } + if err := rows.Close(); err != nil { + return err + } + if err := rows.Err(); err != nil { + return err + } + + return nil +} + +func Down0009(ctx context.Context, tx *sql.Tx) error { + return errors.New("cannot downgrade") +} diff --git a/internal/sql/migrations/0010_replace_event_data_with_json.sql b/internal/sql/migrations/0010_replace_event_data_with_json.sql new file mode 100644 index 0000000..fbccd48 --- /dev/null +++ b/internal/sql/migrations/0010_replace_event_data_with_json.sql @@ -0,0 +1,17 @@ +-- replace_event_data_with_json +-- +goose Up +-- +goose StatementBegin + +ALTER TABLE events +DROP COLUMN event_data; + +ALTER TABLE events +RENAME COLUMN event_data_json TO event_data; + +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin + +-- +goose StatementEnd + diff --git a/internal/sql/migrations/migrations.go b/internal/sql/migrations/migrations.go new file mode 100644 index 0000000..b937774 --- /dev/null +++ b/internal/sql/migrations/migrations.go @@ -0,0 +1,32 @@ +package migrations + +import ( + "runtime" + + "github.com/pressly/goose/v3" + + "github.com/charmbracelet/log" +) + +// A migrations file is a Go file that perform a migration. It must start with +// a number, followed by an underscore and a description of the migration. +type Migration struct { + Up goose.GoMigrationContext + Down goose.GoMigrationContext + Name string +} + +var ( + Migrations []Migration + Log *log.Logger +) + +// Add a migration to the list of migrations to run +func AddMigration(up, down goose.GoMigrationContext) { + _, filename, _, _ := runtime.Caller(1) + Migrations = append(Migrations, Migration{ + Name: filename, + Up: up, + Down: down, + }) +} diff --git a/internal/sql/queries/events.sql b/internal/sql/queries/events.sql index 8ae0757..a9fb4f2 100644 --- a/internal/sql/queries/events.sql +++ b/internal/sql/queries/events.sql @@ -36,3 +36,9 @@ INSERT INTO events_recovery ( -- name: DeleteEventRecovery :exec DELETE FROM events_recovery WHERE id = ?1; + +-- name: GetEventsForCallsign :many +SELECT * +FROM events +WHERE event_type = ?1 +AND json_extract(event_data, '$.Callsign') = @callsign; diff --git a/internal/sql/sql.go b/internal/sql/sql.go index 7786eec..f3cd468 100644 --- a/internal/sql/sql.go +++ b/internal/sql/sql.go @@ -1,11 +1,15 @@ package sql import ( + "context" "database/sql" "embed" "github.com/charmbracelet/log" "github.com/pressly/goose/v3" + + _ "github.com/glebarez/go-sqlite" + goMigrations "github.com/ryanfaerman/netctl/internal/sql/migrations" ) //go:generate sqlc generate @@ -14,7 +18,8 @@ import ( var migrations embed.FS func RunMigrations(log *log.Logger, db *sql.DB) error { - l := log.With("pgk", "sql") + l := log.With("pkg", "sql") + goMigrations.Log = log.With("pkg", "go-migrations") goose.SetLogger(logAdapter{*l}) goose.SetBaseFS(migrations) @@ -23,5 +28,10 @@ func RunMigrations(log *log.Logger, db *sql.DB) error { return err } - return goose.Up(db, "migrations") + // Add these migrations manually + for _, migration := range goMigrations.Migrations { + goose.AddNamedMigrationContext(migration.Name, migration.Up, migration.Down) + } + + return goose.UpContext(context.Background(), db, "migrations") }