diff --git a/cmd/icingadb-migrate/convert.go b/cmd/icingadb-migrate/convert.go index b239804d2..175347b39 100644 --- a/cmd/icingadb-migrate/convert.go +++ b/cmd/icingadb-migrate/convert.go @@ -211,9 +211,12 @@ func convertCommentRows( } } - stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{ - commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck, - }}} + stages = []icingaDbOutputStage{ + {insert: commentHistory}, + {insert: acknowledgementHistory}, + {insert: allHistoryComment}, + {insert: allHistoryAck}, + } return } @@ -367,7 +370,11 @@ func convertDowntimeRows( sla = append(sla, s) } - stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{downtimeHistory, allHistory, sla}}} + stages = []icingaDbOutputStage{ + {insert: downtimeHistory}, + {insert: allHistory}, + {insert: sla}, + } return } @@ -521,11 +528,9 @@ func convertFlappingRows( } stages = []icingaDbOutputStage{ - { - inserts: [][]contracts.Entity{flappingHistory}, - upserts: [][]contracts.Entity{flappingHistoryUpserts}, - }, - {inserts: [][]contracts.Entity{allHistory}}, + {insert: flappingHistory}, + {upsert: flappingHistoryUpserts}, + {insert: allHistory}, } return } @@ -688,9 +693,11 @@ func convertNotificationRows( } } - stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{ - notificationHistory, userNotificationHistory, allHistory, - }}} + stages = []icingaDbOutputStage{ + {insert: notificationHistory}, + {insert: userNotificationHistory}, + {insert: allHistory}, + } return } @@ -848,6 +855,10 @@ func convertStateRows( } } - stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{stateHistory, allHistory, sla}}} + stages = []icingaDbOutputStage{ + {insert: stateHistory}, + {insert: allHistory}, + {insert: sla}, + } return } diff --git a/cmd/icingadb-migrate/main.go b/cmd/icingadb-migrate/main.go index b407dfa95..c089e78a7 100644 --- a/cmd/icingadb-migrate/main.go +++ b/cmd/icingadb-migrate/main.go @@ -9,7 +9,6 @@ import ( "github.com/creasty/defaults" "github.com/goccy/go-yaml" "github.com/icinga/icingadb/pkg/config" - "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" icingadbTypes "github.com/icinga/icingadb/pkg/types" @@ -428,30 +427,21 @@ func migrateOneType[IdoRow any]( // ... and insert them: for _, stage := range stages { - for _, op := range []struct { - kind string - data [][]contracts.Entity - streamer func(context.Context, <-chan contracts.Entity, ...icingadb.OnSuccess[contracts.Entity]) error - }{ - {"INSERT IGNORE", stage.inserts, idb.CreateIgnoreStreamed}, - {"UPSERT", stage.upserts, idb.UpsertStreamed}, - } { - for _, table := range op.data { - if len(table) < 1 { - continue - } - - ch := make(chan contracts.Entity, len(table)) - for _, row := range table { - ch <- row - } - - close(ch) - - if err := op.streamer(context.Background(), ch); err != nil { - log.With("backend", "Icinga DB", "op", op.kind, "table", utils.TableName(table[0])). - Fatalf("%+v", errors.Wrap(err, "can't perform DML")) - } + if len(stage.insert) > 0 { + ch := utils.ChanFromSlice(stage.insert) + + if err := idb.CreateIgnoreStreamed(context.Background(), ch); err != nil { + log.With("backend", "Icinga DB", "op", "INSERT IGNORE", "table", utils.TableName(stage.insert[0])). + Fatalf("%+v", errors.Wrap(err, "can't perform DML")) + } + } + + if len(stage.upsert) > 0 { + ch := utils.ChanFromSlice(stage.upsert) + + if err := idb.UpsertStreamed(context.Background(), ch); err != nil { + log.With("backend", "Icinga DB", "op", "UPSERT", "table", utils.TableName(stage.upsert[0])). + Fatalf("%+v", errors.Wrap(err, "can't perform DML")) } } } diff --git a/cmd/icingadb-migrate/misc.go b/cmd/icingadb-migrate/misc.go index c1130a975..f1db20cbe 100644 --- a/cmd/icingadb-migrate/misc.go +++ b/cmd/icingadb-migrate/misc.go @@ -238,7 +238,7 @@ func (hts historyTypes) forEach(f func(*historyType)) { } type icingaDbOutputStage struct { - inserts, upserts [][]contracts.Entity + insert, upsert []contracts.Entity } var types = historyTypes{ diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index b5a963f9a..8ccf29b33 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -216,3 +216,16 @@ func JoinHostPort(host string, port int) string { return net.JoinHostPort(host, fmt.Sprint(port)) } + +// ChanFromSlice takes a slice of values and returns a channel from which these values can be received. +// This channel is closed after the last value was sent. +func ChanFromSlice[T any](values []T) <-chan T { + ch := make(chan T, len(values)) + for _, value := range values { + ch <- value + } + + close(ch) + + return ch +} diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go new file mode 100644 index 000000000..40f3d773c --- /dev/null +++ b/pkg/utils/utils_test.go @@ -0,0 +1,54 @@ +package utils + +import ( + "github.com/stretchr/testify/require" + "testing" +) + +func TestChanFromSlice(t *testing.T) { + // requireReceive is a helper function to check if a value can immediately be received from a channel. + requireReceive := func(t *testing.T, ch <-chan int, expected int) { + t.Helper() + + select { + case v, ok := <-ch: + require.True(t, ok, "receiving should return a value") + require.Equal(t, expected, v) + default: + require.Fail(t, "receiving should not block") + } + } + + // requireReceive is a helper function to check if the channel is closed and empty. + requireClosedEmpty := func(t *testing.T, ch <-chan int) { + t.Helper() + + select { + case _, ok := <-ch: + require.False(t, ok, "receiving from channel should not return anything") + default: + require.Fail(t, "receiving should not block") + } + } + + t.Run("Nil", func(t *testing.T) { + ch := ChanFromSlice[int](nil) + require.NotNil(t, ch) + requireClosedEmpty(t, ch) + }) + + t.Run("Empty", func(t *testing.T) { + ch := ChanFromSlice([]int{}) + require.NotNil(t, ch) + requireClosedEmpty(t, ch) + }) + + t.Run("NonEmpty", func(t *testing.T) { + ch := ChanFromSlice([]int{42, 23, 1337}) + require.NotNil(t, ch) + requireReceive(t, ch, 42) + requireReceive(t, ch, 23) + requireReceive(t, ch, 1337) + requireClosedEmpty(t, ch) + }) +}