Skip to content

Commit

Permalink
Minimal delay argument to activation
Browse files Browse the repository at this point in the history
closes #86

---

Pull Request resolved: #88
commit_hash:60e84971346bb27b50fa089b95e9ec1be5d43432
  • Loading branch information
laskoviymishka authored and robot-piglet committed Nov 6, 2024
1 parent a55eb9f commit 146107d
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 7 deletions.
28 changes: 24 additions & 4 deletions cmd/trcli/activate/activate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,49 @@ import (

func ActivateCommand(cp *coordinator.Coordinator, rt abstract.Runtime, registry metrics.Registry) *cobra.Command {
var transferParams string
var activateDelay time.Duration
activationCommand := &cobra.Command{
Use: "activate",
Short: "Activate transfer locally",
Args: cobra.MatchAll(cobra.ExactArgs(0)),
RunE: activate(cp, rt, &transferParams, registry),
RunE: activate(cp, rt, &transferParams, registry, activateDelay),
}
activationCommand.Flags().StringVar(&transferParams, "transfer", "./transfer.yaml", "path to yaml file with transfer configuration")
activationCommand.Flags().DurationVar(&activateDelay, "min-delay", 10*time.Second, "minial delay for activation, use to ensure metrics got scrapped, default 10s")
return activationCommand
}

func activate(cp *coordinator.Coordinator, rt abstract.Runtime, transferYaml *string, registry metrics.Registry) func(cmd *cobra.Command, args []string) error {
func activate(
cp *coordinator.Coordinator,
rt abstract.Runtime,
transferYaml *string,
registry metrics.Registry,
delay time.Duration,
) func(cmd *cobra.Command, args []string) error {
return func(cmd *cobra.Command, args []string) error {
transfer, err := config.TransferFromYaml(transferYaml)
if err != nil {
return xerrors.Errorf("unable to load transfer: %w", err)
}
transfer.Runtime = rt
return RunActivate(*cp, transfer, registry)
return RunActivate(*cp, transfer, registry, delay)
}
}

func RunActivate(cp coordinator.Coordinator, transfer *model.Transfer, registry metrics.Registry) error {
func RunActivate(
cp coordinator.Coordinator,
transfer *model.Transfer,
registry metrics.Registry,
delay time.Duration,
) error {
st := time.Now()
defer func() {
if time.Since(st) < delay {
extraWait := delay.Truncate(time.Since(st))
logger.Log.Infof("activation done faster then minimal delay, wait for: %v", extraWait)
time.Sleep(extraWait)
}
}()
logger.Log.Infof("run activate with: %T", cp)
op := new(model.TransferOperation)
op.OperationID = transfer.ID + "/activation"
Expand Down
25 changes: 24 additions & 1 deletion cmd/trcli/activate/tests/pg2ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,31 @@ func TestActivate(t *testing.T) {
transfer.Src = src
transfer.Dst = dst

require.NoError(t, activate.RunActivate(coordinator.NewStatefulFakeClient(), transfer, solomon.NewRegistry(solomon.NewRegistryOpts())))
require.NoError(t, activate.RunActivate(coordinator.NewStatefulFakeClient(), transfer, solomon.NewRegistry(solomon.NewRegistryOpts()), 0))

require.NoError(t, helpers.WaitDestinationEqualRowsCount(dst.Database, "t2", helpers.GetSampleableStorageByModel(t, dst), 60*time.Second, 2))
require.NoError(t, helpers.WaitDestinationEqualRowsCount(dst.Database, "t3", helpers.GetSampleableStorageByModel(t, dst), 60*time.Second, 5))
}

func TestActivateWithDelay(t *testing.T) {
src := pgrecipe.RecipeSource(
pgrecipe.WithPrefix(""),
pgrecipe.WithFiles("dump/pg_init.sql"),
)

dst, err := chrecipe.Target(
chrecipe.WithInitFile("ch_init.sql"),
chrecipe.WithDatabase("trcli_activate_test_ch"),
)
require.NoError(t, err)

transfer, err := config.ParseTransfer(transferYaml)
require.NoError(t, err)

transfer.Src = src
transfer.Dst = dst

st := time.Now()
require.NoError(t, activate.RunActivate(coordinator.NewStatefulFakeClient(), transfer, solomon.NewRegistry(solomon.NewRegistryOpts()), 10*time.Second))
require.Less(t, 10*time.Second, time.Since(st))
}
2 changes: 1 addition & 1 deletion cmd/trcli/replicate/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func replicate(cp *coordinator.Coordinator, rt abstract.Runtime, transferYaml *s
}
transfer.Runtime = rt
if transfer.IncrementOnly() {
if err := activate.RunActivate(*cp, transfer, registry); err != nil {
if err := activate.RunActivate(*cp, transfer, registry, 0); err != nil {
return xerrors.Errorf("unable to activate transfer: %w", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/trcli/replicate/tests/pg2ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestReplicate(t *testing.T) {
transfer.Src = src
transfer.Dst = dst

require.NoError(t, activate.RunActivate(coordinator.NewStatefulFakeClient(), transfer, solomon.NewRegistry(solomon.NewRegistryOpts()))) // so that a replication slot is created for source
require.NoError(t, activate.RunActivate(coordinator.NewStatefulFakeClient(), transfer, solomon.NewRegistry(solomon.NewRegistryOpts()), 0)) // so that a replication slot is created for source

go func() {
require.NoError(t, replicate.RunReplication(coordinator.NewStatefulFakeClient(), transfer, solomon.NewRegistry(solomon.NewRegistryOpts())))
Expand Down

0 comments on commit 146107d

Please sign in to comment.