Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PT-6909 Extracted spec loading to config and added some tests #111

Merged
merged 4 commits into from
Apr 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@ klepto
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
/dist
debug
.klepto.toml
/*.klepto.toml
coverage.txt

# Vendor stuff
.glide/
vendor/

*.coverprofile
13 changes: 8 additions & 5 deletions cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,23 @@ func NewInitCmd() *cobra.Command {

// RunInit runs the init command
func RunInit() error {
log.Infof("Initializing %s", configFileName)
log.Infof("Initializing %s", config.DefaultConfigFileName)

_, err := os.Stat(configFileName)
_, err := os.Stat(config.DefaultConfigFileName)
if !os.IsNotExist(err) {
log.Fatal("Config file already exists, refusing to overwrite")
}

f, err := os.Create(configFileName)
f, err := os.Create(config.DefaultConfigFileName)
if err != nil {
return wErrors.Wrap(err, "could not create file")
}

e := toml.NewEncoder(bufio.NewWriter(f))
err = e.Encode(config.Spec{
Matchers: map[string]string{
"ActiveUsers": "users.active = TRUE",
},
Tables: []*config.Table{
{
Name: "users",
Expand All @@ -54,7 +57,7 @@ func RunInit() error {
{
Name: "orders",
Filter: config.Filter{
Match: "users.active = TRUE",
Match: "ActiveUsers",
Limit: 10,
},
Relationships: []*config.Relationship{
Expand All @@ -75,7 +78,7 @@ func RunInit() error {
return wErrors.Wrap(err, "could not encode config")
}

log.Infof("Created %s!", configFileName)
log.Infof("Created %s!", config.DefaultConfigFileName)

return nil
}
53 changes: 9 additions & 44 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,14 @@ package cmd
import (
"os"

"github.com/hellofresh/klepto/pkg/config"
"github.com/hellofresh/klepto/pkg/formatter"
wErrors "github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/hellofresh/klepto/pkg/formatter"
)

var (
globalConfig *config.Spec
configFile string
configFileName = ".klepto.toml"
verbose bool
verbose bool

// RootCmd steals and anonymises databases
RootCmd = &cobra.Command{
Expand All @@ -32,48 +27,18 @@ var (
)

func init() {
RootCmd.PersistentFlags().StringVarP(&configFile, "config", "c", "", "Path to config file (default is ./.klepto)")
RootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "Make the operation more talkative")
RootCmd.PersistentPreRun = func(cmd *cobra.Command, args []string) {
if verbose {
log.SetLevel(log.DebugLevel)
}
}

RootCmd.AddCommand(NewStealCmd())
RootCmd.AddCommand(NewVersionCmd())
RootCmd.AddCommand(NewUpdateCmd())
RootCmd.AddCommand(NewInitCmd())
RootCmd.AddCommand(NewStealCmd())

log.SetOutput(os.Stderr)
log.SetFormatter(&formatter.CliFormatter{})
}

func initConfig(c *cobra.Command, args []string) error {
if verbose {
log.SetLevel(log.DebugLevel)
}

log.Debugf("Reading config from %s...", configFileName)

if configFile != "" {
// Use config file from the flag.
viper.SetConfigFile(configFile)
} else {
cwd, err := os.Getwd()
if err != nil {
return wErrors.Wrap(err, "can't find current working directory")
}

viper.SetConfigName(".klepto")
viper.AddConfigPath(cwd)
viper.AddConfigPath(".")
}

err := viper.ReadInConfig()
if err != nil {
return wErrors.Wrap(err, "could not read configurations")
}

err = viper.Unmarshal(&globalConfig)
if err != nil {
return wErrors.Wrap(err, "could not unmarshal config file")
}

return nil
}
99 changes: 52 additions & 47 deletions cmd/steal.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"runtime"
"time"

"github.com/hellofresh/klepto/pkg/anonymiser"
"github.com/hellofresh/klepto/pkg/dumper"
"github.com/hellofresh/klepto/pkg/reader"
wErrors "github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/hellofresh/klepto/pkg/anonymiser"
"github.com/hellofresh/klepto/pkg/config"
"github.com/hellofresh/klepto/pkg/dumper"
"github.com/hellofresh/klepto/pkg/reader"

// imports dumpers and readers
_ "github.com/hellofresh/klepto/pkg/dumper/mysql"
_ "github.com/hellofresh/klepto/pkg/dumper/postgres"
Expand All @@ -22,15 +24,18 @@ import (
type (
// StealOptions represents the command options
StealOptions struct {
configPath string
cfgSpec *config.Spec

from string
to string
concurrency int
readOpts connOpts
writeOpts connOpts
}
connOpts struct {
timeout string
maxConnLifetime string
timeout time.Duration
maxConnLifetime time.Duration
maxConns int
maxIdleConns int
}
Expand All @@ -40,81 +45,81 @@ type (
func NewStealCmd() *cobra.Command {
opts := new(StealOptions)
cmd := &cobra.Command{
Use: "steal",
Short: "Steals and anonymises databases",
PreRunE: initConfig,
Use: "steal",
Short: "Steals and anonymises databases",
PreRunE: func(cmd *cobra.Command, args []string) error {
var err error
opts.cfgSpec, err = config.LoadSpecFromFile(opts.configPath)
if err != nil {
return err
}

return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
return RunSteal(opts)
},
}

cmd.PersistentFlags().StringVarP(&opts.from, "from", "f", "root:root@tcp(localhost:3306)/klepto", "Database dsn to steal from")
cmd.PersistentFlags().StringVarP(&opts.to, "to", "t", "os://stdout/", "Database to output to (default writes to stdOut)")
cmd.PersistentFlags().IntVar(&opts.concurrency, "concurrency", runtime.NumCPU(), "Sets the amount of dumps to be performed concurrently")
cmd.PersistentFlags().StringVar(&opts.readOpts.timeout, "read-timeout", "5m", "Sets the timeout for read operations")
cmd.PersistentFlags().StringVar(&opts.writeOpts.timeout, "write-timeout", "30s", "Sets the timeout for write operations")
cmd.PersistentFlags().StringVar(&opts.readOpts.maxConnLifetime, "read-conn-lifetime", "0", "Sets the maximum amount of time a connection may be reused on the read database")
cmd.PersistentFlags().IntVar(&opts.readOpts.maxConns, "read-max-conns", 5, "Sets the maximum number of open connections to the read database")
cmd.PersistentFlags().IntVar(&opts.readOpts.maxIdleConns, "read-max-idle-conns", 0, "Sets the maximum number of connections in the idle connection pool for the read database")
cmd.PersistentFlags().StringVar(&opts.writeOpts.maxConnLifetime, "write-conn-lifetime", "0", "Sets the maximum amount of time a connection may be reused on the write database")
cmd.PersistentFlags().IntVar(&opts.writeOpts.maxConns, "write-max-conns", 5, "Sets the maximum number of open connections to the write database")
cmd.PersistentFlags().IntVar(&opts.writeOpts.maxIdleConns, "write-max-idle-conns", 0, "Sets the maximum number of connections in the idle connection pool for the write database")
persistentFlags := cmd.PersistentFlags()
persistentFlags.StringVarP(&opts.configPath, "config", "c", config.DefaultConfigFileName, "Path to config file")
persistentFlags.StringVarP(&opts.from, "from", "f", "mysql://root:root@tcp(localhost:3306)/klepto", "Database dsn to steal from")
persistentFlags.StringVarP(&opts.to, "to", "t", "os://stdout/", "Database to output to (default writes to stdOut)")
persistentFlags.IntVar(&opts.concurrency, "concurrency", runtime.NumCPU(), "Sets the amount of dumps to be performed concurrently")
persistentFlags.DurationVar(&opts.readOpts.timeout, "read-timeout", 5*time.Minute, "Sets the timeout for read operations")
persistentFlags.DurationVar(&opts.readOpts.maxConnLifetime, "read-conn-lifetime", 0, "Sets the maximum amount of time a connection may be reused on the read database")
persistentFlags.IntVar(&opts.readOpts.maxConns, "read-max-conns", 5, "Sets the maximum number of open connections to the read database")
persistentFlags.IntVar(&opts.readOpts.maxIdleConns, "read-max-idle-conns", 0, "Sets the maximum number of connections in the idle connection pool for the read database")
persistentFlags.DurationVar(&opts.writeOpts.timeout, "write-timeout", 30*time.Second, "Sets the timeout for write operations")
persistentFlags.DurationVar(&opts.writeOpts.maxConnLifetime, "write-conn-lifetime", 0, "Sets the maximum amount of time a connection may be reused on the write database")
persistentFlags.IntVar(&opts.writeOpts.maxConns, "write-max-conns", 5, "Sets the maximum number of open connections to the write database")
persistentFlags.IntVar(&opts.writeOpts.maxIdleConns, "write-max-idle-conns", 0, "Sets the maximum number of connections in the idle connection pool for the write database")

return cmd
}

// RunSteal is the handler for the rootCmd.
func RunSteal(opts *StealOptions) (err error) {
readTimeout, err := time.ParseDuration(opts.readOpts.timeout)
if err != nil {
return wErrors.Wrap(err, "Failed to parse read timeout duration")
}

writeTimeout, err := time.ParseDuration(opts.readOpts.timeout)
if err != nil {
return wErrors.Wrap(err, "Failed to parse write timeout duration")
}

readMaxConnLifetime, err := time.ParseDuration(opts.readOpts.maxConnLifetime)
if err != nil {
return wErrors.Wrap(err, "Failed to parse write timeout duration")
}

writeMaxConnLifetime, err := time.ParseDuration(opts.writeOpts.maxConnLifetime)
if err != nil {
return wErrors.Wrap(err, "Failed to parse the timeout duration")
}

source, err := reader.Connect(reader.ConnOpts{
DSN: opts.from,
Timeout: readTimeout,
MaxConnLifetime: readMaxConnLifetime,
Timeout: opts.readOpts.timeout,
MaxConnLifetime: opts.readOpts.maxConnLifetime,
MaxConns: opts.readOpts.maxConns,
MaxIdleConns: opts.readOpts.maxIdleConns,
})
if err != nil {
return wErrors.Wrap(err, "Could not connecting to reader")
}
defer source.Close()
defer func() {
if err := source.Close(); err != nil {
log.WithError(err).Error("Something is not ok with closing source connection")
}
}()

source = anonymiser.NewAnonymiser(source, globalConfig.Tables)
source = anonymiser.NewAnonymiser(source, opts.cfgSpec.Tables)
target, err := dumper.NewDumper(dumper.ConnOpts{
DSN: opts.to,
Timeout: writeTimeout,
MaxConnLifetime: writeMaxConnLifetime,
Timeout: opts.writeOpts.timeout,
MaxConnLifetime: opts.writeOpts.maxConnLifetime,
MaxConns: opts.writeOpts.maxConns,
MaxIdleConns: opts.writeOpts.maxIdleConns,
}, source)
if err != nil {
return wErrors.Wrap(err, "Error creating dumper")
}
defer target.Close()
defer func() {
if err := target.Close(); err != nil {
log.WithError(err).Error("Something is not ok with closing target connection")
}
}()

log.Info("Stealing...")

done := make(chan struct{})
defer close(done)

start := time.Now()
if err := target.Dump(done, globalConfig, opts.concurrency); err != nil {
if err := target.Dump(done, opts.cfgSpec, opts.concurrency); err != nil {
return wErrors.Wrap(err, "Error while dumping")
}

Expand Down
34 changes: 34 additions & 0 deletions fixtures/.klepto.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[Matchers]
ActiveUsers = "users.active = TRUE"

[[Tables]]
Name = "users"
IgnoreData = false
[Tables.Filter]
Match = "users.active = TRUE"
Limit = 100
[Tables.Filter.Sorts]
"user.id" = "asc"
[Tables.Anonymise]
email = "EmailAddress"
firstName = "FirstName"

[[Tables]]
Name = "orders"
IgnoreData = false
[Tables.Filter]
Match = "ActiveUsers"
Limit = 10

[[Tables.Relationships]]
Table = ""
ForeignKey = "user_id"
ReferencedTable = "users"
ReferencedKey = "id"

[[Tables]]
Name = "logs"
IgnoreData = true
[Tables.Filter]
Match = ""
Limit = 0
6 changes: 3 additions & 3 deletions pkg/anonymiser/anonymiser.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ func NewAnonymiser(source reader.Reader, tables config.Tables) reader.Reader {
func (a *anonymiser) ReadTable(tableName string, rowChan chan<- database.Row, opts reader.ReadTableOpt, matchers config.Matchers) error {
logger := log.WithField("table", tableName)
logger.Debug("Loading anonymiser config")
table, err := a.tables.FindByName(tableName)
if err != nil {
logger.WithError(err).Debug("the table is not configured to be anonymised")
table := a.tables.FindByName(tableName)
if table == nil {
logger.Debug("the table is not configured to be anonymised")
return a.Reader.ReadTable(tableName, rowChan, opts, matchers)
}

Expand Down
Loading