Skip to content

Commit

Permalink
feat: optimize log postgresql table (#166)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored and flemzord committed May 12, 2023
1 parent b4e2f83 commit c8fb66b
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 69 deletions.
1 change: 1 addition & 0 deletions pkg/api/controllers/transaction_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func PostTransaction(w http.ResponseWriter, r *http.Request) {
Reference: payload.Reference,
Metadata: payload.Metadata,
}

res, err := l.CreateTransaction(r.Context(), preview, core.TxToScriptData(txData))
if err != nil {
apierrors.ResponseError(w, r, err)
Expand Down
29 changes: 22 additions & 7 deletions pkg/core/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,31 @@ import (
"strings"
)

// LogType discriminates the payload carried by a Log entry. It is an
// int16 because it is persisted in the logs_v2 table as a smallint, so
// the numeric values assigned below must stay stable across releases.
type LogType int16

const (
	// SetMetadataLogType records a metadata update ("SET_METADATA").
	SetMetadataLogType LogType = iota
	// NewTransactionLogType records a newly committed transaction ("NEW_TRANSACTION").
	NewTransactionLogType
	// RevertedTransactionLogType records the revert of a transaction ("REVERTED_TRANSACTION").
	RevertedTransactionLogType
)

// String returns the symbolic name of the log type, or the empty string
// for an unrecognized value (callers such as HydrateLog use it to build
// diagnostic messages).
func (l LogType) String() string {
	switch l {
	case SetMetadataLogType:
		return "SET_METADATA"
	case NewTransactionLogType:
		return "NEW_TRANSACTION"
	case RevertedTransactionLogType:
		return "REVERTED_TRANSACTION"
	}

	return ""
}

// TODO(polo): create Log struct and extended Log struct
type Log struct {
ID uint64 `json:"id"`
Type string `json:"type"`
Type LogType `json:"type"`
Data interface{} `json:"data"`
Hash string `json:"hash"`
Date Time `json:"date"`
Expand Down Expand Up @@ -125,7 +140,7 @@ func NewRevertedTransactionLog(at Time, revertedTxID uint64, tx Transaction) Log
}
}

func HydrateLog(_type string, data string) (interface{}, error) {
func HydrateLog(_type LogType, data []byte) (interface{}, error) {
var payload any
switch _type {
case NewTransactionLogType:
Expand All @@ -135,9 +150,9 @@ func HydrateLog(_type string, data string) (interface{}, error) {
case RevertedTransactionLogType:
payload = &RevertedTransactionLogPayload{}
default:
panic("unknown type " + _type)
panic("unknown type " + _type.String())
}
err := json.Unmarshal([]byte(data), &payload)
err := json.Unmarshal(data, &payload)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (l *Ledger) CreateTransaction(ctx context.Context, dryRun bool, script core
tx, log, err := l.runner.Execute(ctx, script, dryRun, func(expandedTx core.ExpandedTransaction, accountMetadata map[string]core.Metadata) core.Log {
return core.NewTransactionLog(expandedTx.Transaction, accountMetadata)
})
if err == nil {
if err == nil && !dryRun {
l.queryWorker.QueueLog(ctx, log, l.store)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type LedgerStore interface {
GetMigrationsDone(context.Context) ([]core.MigrationInfo, error)
ComputeAccount(ctx context.Context, account string) (*core.AccountWithVolumes, error)
ReadLogWithReference(ctx context.Context, reference string) (*core.Log, error)
ReadLastLogWithType(ctx context.Context, logType ...string) (*core.Log, error)
ReadLastLogWithType(ctx context.Context, logType ...core.LogType) (*core.Log, error)
}

type Driver interface {
Expand Down
108 changes: 60 additions & 48 deletions pkg/storage/sqlstorage/ledger/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@ import (
)

const (
LogTableName = "log"
LogTableName = "logs_v2"
LogIngestionTableName = "logs_ingestion"
)

type Log struct {
bun.BaseModel `bun:"log,alias:log"`
type LogsV2 struct {
bun.BaseModel `bun:"logs_v2,alias:logs_v2"`

ID uint64 `bun:"id,unique,type:bigint"`
Type string `bun:"type,type:varchar"`
Hash string `bun:"hash,type:varchar"`
Date core.Time `bun:"date,type:timestamptz"`
Data json.RawMessage `bun:"data,type:jsonb"`
Reference string `bun:"reference,type:varchar"`
ID uint64 `bun:"id,unique,type:bigint"`
Type int16 `bun:"type,type:smallint"`
Hash string `bun:"hash,type:varchar(256)"`
Date core.Time `bun:"date,type:timestamptz"`
Data []byte `bun:"data,type:bytea"`
Reference string `bun:"reference,type:text"`
}

type LogsIngestion struct {
Expand Down Expand Up @@ -68,14 +68,14 @@ func (s *Store) batchLogs(ctx context.Context, logs []*core.Log) error {
// Beware: COPY query is not supported by bun if the pgx driver is used.
stmt, err := txn.Prepare(pq.CopyInSchema(
s.schema.Name(),
"log",
"logs_v2",
"id", "type", "hash", "date", "data", "reference",
))
if err != nil {
return err
}

ls := make([]Log, len(logs))
ls := make([]LogsV2, len(logs))
for i, l := range logs {
data, err := json.Marshal(l.Data)
if err != nil {
Expand All @@ -90,7 +90,7 @@ func (s *Store) batchLogs(ctx context.Context, logs []*core.Log) error {
logs[i].Hash = core.Hash(previousLog, &logs[i])

ls[i].ID = id
ls[i].Type = l.Type
ls[i].Type = int16(l.Type)
ls[i].Hash = logs[i].Hash
ls[i].Date = l.Date
ls[i].Data = data
Expand Down Expand Up @@ -121,31 +121,36 @@ func (s *Store) AppendLog(ctx context.Context, log *core.Log) error {
}

func (s *Store) GetLastLog(ctx context.Context) (*core.Log, error) {
sb := s.schema.NewSelect(LogTableName).
Model((*Log)(nil)).
raw := &LogsV2{}
err := s.schema.NewSelect(LogTableName).
Model(raw).
Column("id", "type", "hash", "date", "data", "reference").
OrderExpr("id desc").
Limit(1)

l := core.Log{}
data := sql.NullString{}
row := s.schema.QueryRowContext(ctx, sb.String())
if err := row.Scan(&l.ID, &l.Type, &l.Hash, &l.Date, &data, &l.Reference); err != nil {
Limit(1).
Scan(ctx)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, errors.Wrap(err, "scanning log")

return nil, err
}
l.Date = l.Date.UTC()

var err error
l.Data, err = core.HydrateLog(l.Type, data.String)
payload, err := core.HydrateLog(core.LogType(raw.Type), raw.Data)
if err != nil {
return nil, errors.Wrap(err, "hydrating log")
}
l.Date = l.Date.UTC()

return &l, nil
l := &core.Log{
ID: raw.ID,
Type: core.LogType(raw.Type),
Data: payload,
Hash: raw.Hash,
Date: raw.Date.UTC(),
Reference: raw.Reference,
}

return l, nil
}

func (s *Store) GetLogs(ctx context.Context, q *storage.LogsQuery) (api.Cursor[core.Log], error) {
Expand All @@ -163,20 +168,26 @@ func (s *Store) GetLogs(ctx context.Context, q *storage.LogsQuery) (api.Cursor[c
}
defer rows.Close()

for rows.Next() {
l := core.Log{}
data := sql.NullString{}
if err := rows.Scan(&l.ID, &l.Type, &l.Hash, &l.Date, &data, &l.Reference); err != nil {
return api.Cursor[core.Log]{}, err
}
l.Date = l.Date.UTC()
rawLogsV2 := []LogsV2{}
err = s.schema.ScanRows(ctx, rows, &rawLogsV2)
if err != nil {
return api.Cursor[core.Log]{}, errors.Wrap(err, "scanning rows")
}

l.Data, err = core.HydrateLog(l.Type, data.String)
for _, raw := range rawLogsV2 {
payload, err := core.HydrateLog(core.LogType(raw.Type), raw.Data)
if err != nil {
return api.Cursor[core.Log]{}, errors.Wrap(err, "hydrating log")
}
l.Date = l.Date.UTC()
res = append(res, l)

res = append(res, core.Log{
ID: raw.ID,
Type: core.LogType(raw.Type),
Data: payload,
Hash: raw.Hash,
Date: raw.Date.UTC(),
Reference: raw.Reference,
})
}
if rows.Err() != nil {
return api.Cursor[core.Log]{}, s.error(rows.Err())
Expand Down Expand Up @@ -219,7 +230,7 @@ func (s *Store) GetLogs(ctx context.Context, q *storage.LogsQuery) (api.Cursor[c
func (s *Store) buildLogsQuery(q *storage.LogsQuery) (*bun.SelectQuery, LogsPaginationToken) {
t := LogsPaginationToken{}
sb := s.schema.NewSelect(LogTableName).
Model((*Log)(nil)).
Model((*LogsV2)(nil)).
Column("id", "type", "hash", "date", "data", "reference")

if !q.Filters.StartTime.IsZero() {
Expand Down Expand Up @@ -276,7 +287,7 @@ func (s *Store) readLogsStartingFromID(ctx context.Context, exec interface {
NewSelect(tableName string) *bun.SelectQuery
}, id uint64) ([]core.Log, error) {

rawLogs := make([]Log, 0)
rawLogs := make([]LogsV2, 0)
err := exec.
NewSelect(LogTableName).
Where("id >= ?", id).
Expand All @@ -287,13 +298,13 @@ func (s *Store) readLogsStartingFromID(ctx context.Context, exec interface {
}
logs := make([]core.Log, len(rawLogs))
for index, rawLog := range rawLogs {
payload, err := core.HydrateLog(rawLog.Type, string(rawLog.Data))
payload, err := core.HydrateLog(core.LogType(rawLog.Type), rawLog.Data)
if err != nil {
return nil, errors.Wrap(err, "hydrating log")
}
logs[index] = core.Log{
ID: rawLog.ID,
Type: rawLog.Type,
Type: core.LogType(rawLog.Type),
Hash: rawLog.Hash,
Date: rawLog.Date,
Data: payload,
Expand All @@ -317,7 +328,7 @@ func (s *Store) UpdateNextLogID(ctx context.Context, id uint64) error {
}

func (s *Store) ReadLogWithReference(ctx context.Context, reference string) (*core.Log, error) {
raw := &Log{}
raw := &LogsV2{}
err := s.schema.
NewSelect(LogTableName).
Where("reference = ?", reference).
Expand All @@ -327,41 +338,42 @@ func (s *Store) ReadLogWithReference(ctx context.Context, reference string) (*co
if err != nil {
return nil, err
}
payload, err := core.HydrateLog(raw.Type, string(raw.Data))
payload, err := core.HydrateLog(core.LogType(raw.Type), raw.Data)
if err != nil {
return nil, errors.Wrap(err, "hydrating log")
}
return &core.Log{
ID: raw.ID,
Type: raw.Type,
Type: core.LogType(raw.Type),
Data: payload,
Hash: raw.Hash,
Date: raw.Date,
Reference: raw.Reference,
}, nil
}

func (s *Store) ReadLastLogWithType(ctx context.Context, logType ...string) (*core.Log, error) {
raw := &Log{}
func (s *Store) ReadLastLogWithType(ctx context.Context, logTypes ...core.LogType) (*core.Log, error) {
raw := &LogsV2{}
err := s.schema.
NewSelect(LogTableName).
Where("type IN (?)", bun.In(logType)).
Where("type IN (?)", bun.In(logTypes)).
OrderExpr("date DESC").
Model(raw).
Limit(1).
Scan(ctx)

if err != nil {
return nil, err
}

payload, err := core.HydrateLog(raw.Type, string(raw.Data))
payload, err := core.HydrateLog(core.LogType(raw.Type), raw.Data)
if err != nil {
return nil, errors.Wrap(err, "hydrating log")
}

return &core.Log{
ID: raw.ID,
Type: raw.Type,
Type: core.LogType(raw.Type),
Data: payload,
Hash: raw.Hash,
Date: raw.Date,
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/sqlstorage/ledger/migrates/13-clean-logs/any_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestMigrate(t *testing.T) {
ls := []Log{
{
ID: 0,
Type: core.NewTransactionLogType,
Type: core.NewTransactionLogType.String(),
Hash: "",
Date: core.Now(),
Data: []byte(`{
Expand All @@ -60,7 +60,7 @@ func TestMigrate(t *testing.T) {
},
{
ID: 1,
Type: core.NewTransactionLogType,
Type: core.NewTransactionLogType.String(),
Hash: "",
Date: core.Now(),
Data: []byte(`{
Expand All @@ -72,7 +72,7 @@ func TestMigrate(t *testing.T) {
}`),
},
}
_, err = schema.NewInsert(ledgerstore.LogTableName).
_, err = schema.NewInsert("log").
Model(&ls).
Exec(context.Background())
require.NoError(t, err)
Expand All @@ -81,8 +81,8 @@ func TestMigrate(t *testing.T) {
require.NoError(t, err)
require.True(t, modified)

sb := schema.NewSelect(ledgerstore.LogTableName).
Model((*ledgerstore.Log)(nil)).
sb := schema.NewSelect("log").
Model((*Log)(nil)).
Column("data")

rows, err := schema.QueryContext(context.Background(), sb.String())
Expand Down
12 changes: 12 additions & 0 deletions pkg/storage/sqlstorage/ledger/migrates/18-v2/postgres.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,18 @@ create table if not exists "VAR_LEDGER_NAME".logs_ingestion (
onerow_id boolean PRIMARY KEY DEFAULT TRUE,
log_id bigint
);

--statement
-- Add a nullable reference column to the legacy log table.
alter table "VAR_LEDGER_NAME".log
add column reference varchar null;

--statement
-- New optimized log table: "type" is stored as a compact smallint
-- (mirrors core.LogType, an int16) and "data" as raw bytea rather than
-- jsonb, matching the LogsV2 bun model in pkg/storage/sqlstorage/ledger/logs.go.
-- NOTE(review): "id" is only UNIQUE, not a primary key — presumably ordering
-- and insertion are managed by the application; confirm no PK is intended.
create table if not exists "VAR_LEDGER_NAME".logs_v2 (
"id" bigint,
"type" smallint,
"hash" varchar(256),
"date" timestamp with time zone,
"data" bytea,
"reference" text,
UNIQUE ("id")
);
Loading

0 comments on commit c8fb66b

Please sign in to comment.