diff --git a/README.md b/README.md index e37ab8e..c086fc3 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,9 @@ # Klepto -[![](https://travis-ci.org/hellofresh/klepto.svg?branch=master)](https://travis-ci.org/hellofresh/klepto) +[![Build Status](https://travis-ci.org/hellofresh/klepto.svg?branch=master)](https://travis-ci.org/hellofresh/klepto) +[![Go Report Card](https://goreportcard.com/badge/github.com/hellofresh/klepto)](https://goreportcard.com/report/github.com/hellofresh/klepto) +[![Go Doc](https://godoc.org/github.com/hellofresh/klepto?status.svg)](https://godoc.org/github.com/hellofresh/klepto) > Klepto is a tool for copying and anonymising data @@ -121,8 +123,21 @@ We recommend to always set the following parameters: ## Configuration File Options -You can set a number of keys in the configuration file. - +You can set a number of keys in the configuration file. Here is the list of all configuration options: +- `Matchers` are variables that store filter data; you can declare a filter once and reuse it across tables. +- `Tables` is the list of klepto table definitions. + - `Name` is the table name. + - `IgnoreData`, if set to true, dumps the table structure without importing data. + - `Filter` represents the way you want to filter the results. + - `Match` is a condition field used to dump only a certain amount of data. + - `Limit` defines a limit of results to be fetched. + - `Sorts` is the sort condition for the table. + - `Anonymise` defines the columns to anonymise. + - `Relationships` represents the relationships between the table and referenced tables. + - `Table` is the table name. + - `ForeignKey` is the name of the table's foreign key. + - `ReferencedTable` is the referenced table name. + - `ReferencedKey` is the name of the referenced table's primary key. 
### Relationships diff --git a/cmd/steal.go b/cmd/steal.go index 703429b..636beda 100644 --- a/cmd/steal.go +++ b/cmd/steal.go @@ -18,8 +18,8 @@ import ( _ "github.com/hellofresh/klepto/pkg/reader/postgres" ) -// StealOptions represents the command options type ( + // StealOptions represents the command options StealOptions struct { from string to string diff --git a/cmd/update.go b/cmd/update.go index 81de03b..4fbf1f4 100644 --- a/cmd/update.go +++ b/cmd/update.go @@ -11,15 +11,16 @@ const ( githubRepo = "klepto" ) -// UpdateOptions are the command flags -type UpdateOptions struct { - token string -} +type ( + // UpdateOptions are the command flags + UpdateOptions struct { + token string + } +) // NewUpdateCmd creates a new update command func NewUpdateCmd() *cobra.Command { - opts := &UpdateOptions{} - + opts := new(UpdateOptions) cmd := &cobra.Command{ Use: "update", Short: "Check for new versions of kepto", diff --git a/pkg/anonymiser/anonymiser.go b/pkg/anonymiser/anonymiser.go index da00e77..52dde8a 100644 --- a/pkg/anonymiser/anonymiser.go +++ b/pkg/anonymiser/anonymiser.go @@ -22,18 +22,19 @@ const ( password = "Password" ) -// anonymiser is responsible for anonymising columns -type anonymiser struct { - reader.Reader - tables config.Tables -} +type ( + anonymiser struct { + reader.Reader + tables config.Tables + } +) -// NewAnonymiser returns an initialised instance of MySQLAnonymiser +// NewAnonymiser returns a new anonymiser reader. 
func NewAnonymiser(source reader.Reader, tables config.Tables) reader.Reader { return &anonymiser{source, tables} } -// ReadTable wraps reader.ReadTable method for anonymising rows published from the reader.Reader +// ReadTable decorates reader.ReadTable method for anonymising rows published from the reader.Reader func (a *anonymiser) ReadTable(tableName string, rowChan chan<- database.Row, opts reader.ReadTableOpt, matchers config.Matchers) error { logger := log.WithField("table", tableName) logger.Debug("Loading anonymiser config") diff --git a/pkg/config/config.go b/pkg/config/config.go index f747c9b..b9c66a3 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -3,44 +3,57 @@ package config import "errors" type ( - // Spec represents the global app configuration + // Spec represents the global app configuration. Spec struct { Matchers Tables } - // Matchers are variables to replace placeolders in filter + // Matchers are variables to store filter data, + // you can declare a filter once and reuse it among tables. Matchers map[string]string - // Tables are an array of table + // Tables are an array of table definitions. Tables []*Table - // Table represents a klepto table definition + // Table represents a klepto table definition. Table struct { - Name string - IgnoreData bool - Filter Filter - Anonymise map[string]string + // Name is the table name. + Name string + // IgnoreData if set to true, it will dump the table structure without importing data. + IgnoreData bool + // Filter represents the way you want to filter the results. + Filter + // Anonymise anonymise columns. + Anonymise map[string]string + // Relationship is an collection of relationship definitions. Relationships []*Relationship } - // Filter represents the way you want to filter the results + // Filter represents the way you want to filter the results. Filter struct { + // Match is a condition field to dump only certain amount data. 
Match string + // Limit defines a limit of results to be fetched. Limit uint64 + // Sorts is the sort condition for the table. Sorts map[string]string } - // Relationship represents a relationship definition + // Relationship represents the relationship between the table and referenced table. Relationship struct { - Table string - ForeignKey string + // Table is the table name. + Table string + // ForeignKey is the table name foreign key. + ForeignKey string + // ReferencedTable is the referenced table name. ReferencedTable string - ReferencedKey string + // ReferencedKey is the referenced table primary key name. + ReferencedKey string } ) -// FindByName filter a table by its name +// FindByName find a table by its name. func (t Tables) FindByName(name string) (*Table, error) { for _, table := range t { if table.Name == name { diff --git a/pkg/database/database.go b/pkg/database/database.go index a5e9fb5..deca7eb 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -1,5 +1,6 @@ package database type ( + // Row is the database column row. Row map[string]interface{} ) diff --git a/pkg/dsn/dsn.go b/pkg/dsn/dsn.go index 0d344e6..ef626e1 100644 --- a/pkg/dsn/dsn.go +++ b/pkg/dsn/dsn.go @@ -14,6 +14,14 @@ var ( ErrEmptyDsn = errors.New("Empty string provided for dsn") // ErrInvalidDsn defines error returned when the dsn is invalid ErrInvalidDsn = errors.New("Invalid dsn") + + // From https://github.com/go-sql-driver/mysql/blob/f4bf8e8e0aa93d4ead0c6473503ca2f5d5eb65a8/utils.go#L34 + regex = regexp.MustCompile( + `^(?:(?P.*?)?://)?` + // [type://] + `(?:(?P.*?)(?::(?P.*))?@)?` + // [username[:password]@] + `(?:(?P[^\(]*)(?:\((?P
[^\)]*)\))?)?` + // [protocol[(address)]] + `\/(?P.*?)` + // /datasource + `(?:\?(?P[^\?]*))?$`) // [?param1=value1] ) // DSN describes how a DSN looks like @@ -36,7 +44,7 @@ func Parse(s string) (*DSN, error) { return nil, ErrEmptyDsn } - dsn := &DSN{} + dsn := new(DSN) matches := regex.FindStringSubmatch(s) if len(matches) < 1 || len(matches) > 1 && matches[1] == "" { @@ -74,17 +82,7 @@ func Parse(s string) (*DSN, error) { return dsn, nil } -var ( - // From https://github.com/go-sql-driver/mysql/blob/f4bf8e8e0aa93d4ead0c6473503ca2f5d5eb65a8/utils.go#L34 - regex = regexp.MustCompile( - `^(?:(?P.*?)?://)?` + // [type://] - `(?:(?P.*?)(?::(?P.*))?@)?` + // [username[:password]@] - `(?:(?P[^\(]*)(?:\((?P
[^\)]*)\))?)?` + // [protocol[(address)]] - `\/(?P.*?)` + // /datasource - `(?:\?(?P[^\?]*))?$`) // [?param1=value1] -) - -// Converts a DSN struct into its string representation. +// String converts a DSN struct into its string representation. func (d DSN) String() string { str := "" diff --git a/pkg/dumper/dumper.go b/pkg/dumper/dumper.go index 71a6533..75a27f8 100644 --- a/pkg/dumper/dumper.go +++ b/pkg/dumper/dumper.go @@ -12,12 +12,15 @@ import ( type ( // Driver is a driver interface used to support multiple drivers Driver interface { + // IsSupported checks if the given dsn connection string is supported. IsSupported(dsn string) bool + // NewConnection creates a new database connection and retrieves a dumper implementation. NewConnection(ConnOpts, reader.Reader) (Dumper, error) } // A Dumper writes a database's stucture to the provided stream. Dumper interface { + // Dump executes the dump process. Dump(chan<- struct{}, *config.Spec, int) error // Close closes the dumper resources and releases them. Close() error @@ -25,11 +28,16 @@ type ( // ConnOpts are the options to create a connection ConnOpts struct { - DSN string - Timeout time.Duration + // DSN is the connection address. + DSN string + // Timeout is the timeout for dump operations. + Timeout time.Duration + // MaxConnLifetime is the maximum amount of time a connection may be reused on the read database. MaxConnLifetime time.Duration - MaxConns int - MaxIdleConns int + // MaxConns is the maximum number of open connections to the target database. + MaxConns int + // MaxIdleConns is the maximum number of connections in the idle connection pool for the write database. 
+ MaxIdleConns int } ) @@ -40,7 +48,7 @@ func NewDumper(opts ConnOpts, rdr reader.Reader) (dumper Dumper, err error) { if !ok || !driver.IsSupported(opts.DSN) { return true } - log.WithField("driver", key).Debug("Found driver") + log.WithField("driver", key).Debug("found driver") dumper, err = driver.NewConnection(opts, rdr) return false diff --git a/pkg/dumper/generic/sql.go b/pkg/dumper/engine/engine.go similarity index 58% rename from pkg/dumper/generic/sql.go rename to pkg/dumper/engine/engine.go index e597a5e..c0bd27c 100644 --- a/pkg/dumper/generic/sql.go +++ b/pkg/dumper/engine/engine.go @@ -1,4 +1,4 @@ -package generic +package engine import ( "sync" @@ -12,72 +12,77 @@ import ( ) type ( - sqlDumper struct { - SqlEngine - + // Engine is the engine which dispatches and orchestrates a dump. + Engine struct { + Dumper reader reader.Reader } - SqlEngine interface { + // Dumper is the dump engine. + Dumper interface { + // DumpStructure dumps database structure given a sql. DumpStructure(sql string) error - + // DumpTable dumps a table by name. DumpTable(tableName string, rowChan <-chan database.Row) error - // Close closes the dumper resources and releases them. Close() error } - SqlEngineAdvanced interface { + // Hooker are the actions you perform before or after a specified database operation. + Hooker interface { + // PreDumpTables performs a action before dumping tables before dumping tables. PreDumpTables([]string) error + // PostDumpTables performs a action after dumping tables before dumping tables. PostDumpTables([]string) error } ) -func NewSqlDumper(rdr reader.Reader, engine SqlEngine) dumper.Dumper { - return &sqlDumper{ - SqlEngine: engine, - reader: rdr, +// New creates a new engine given the reader and dumper. 
+func New(rdr reader.Reader, dumper Dumper) dumper.Dumper { + return &Engine{ + Dumper: dumper, + reader: rdr, } } -func (p *sqlDumper) Dump(done chan<- struct{}, spec *config.Spec, concurrency int) error { - if err := p.readAndDumpStructure(); err != nil { +// Dump executes the dump process. +func (e *Engine) Dump(done chan<- struct{}, spec *config.Spec, concurrency int) error { + if err := e.readAndDumpStructure(); err != nil { return err } - return p.readAndDumpTables(done, spec, concurrency) + return e.readAndDumpTables(done, spec, concurrency) } -func (p *sqlDumper) readAndDumpStructure() error { - log.Debug("Dumping structure...") - structureSQL, err := p.reader.GetStructure() +func (e *Engine) readAndDumpStructure() error { + log.Debug("dumping structure...") + sql, err := e.reader.GetStructure() if err != nil { return errors.Wrap(err, "failed to get structure") } - if err := p.DumpStructure(structureSQL); err != nil { + if err := e.DumpStructure(sql); err != nil { return errors.Wrap(err, "failed to dump structure") } - log.Debug("Structure dumped") + log.Debug("structure was dumped") return nil } -func (p *sqlDumper) readAndDumpTables(done chan<- struct{}, spec *config.Spec, concurrency int) error { - tables, err := p.reader.GetTables() +func (e *Engine) readAndDumpTables(done chan<- struct{}, spec *config.Spec, concurrency int) error { + tables, err := e.reader.GetTables() if err != nil { - return err + return errors.Wrap(err, "failed to read and dump tables") } // Trigger pre dump tables - if adv, ok := p.SqlEngine.(SqlEngineAdvanced); ok { + if adv, ok := e.Dumper.(Hooker); ok { if err := adv.PreDumpTables(tables); err != nil { - return err + return errors.Wrap(err, "failed to execute pre dump tables") } } semChan := make(chan struct{}, concurrency) - var wg sync.WaitGroup for _, tbl := range tables { logger := log.WithField("table", tbl) @@ -97,7 +102,7 @@ func (p *sqlDumper) readAndDumpTables(done chan<- struct{}, spec *config.Spec, c Match: 
tableConfig.Filter.Match, Sorts: tableConfig.Filter.Sorts, Limit: tableConfig.Filter.Limit, - Relationships: p.relationshipConfigToOptions(tableConfig.Relationships), + Relationships: e.relationshipConfigToOptions(tableConfig.Relationships), } } @@ -110,13 +115,13 @@ func (p *sqlDumper) readAndDumpTables(done chan<- struct{}, spec *config.Spec, c defer wg.Done() defer func(semChan <-chan struct{}) { <-semChan }(semChan) - if err := p.DumpTable(tableName, rowChan); err != nil { + if err := e.DumpTable(tableName, rowChan); err != nil { logger.WithError(err).Error("Failed to dump table") } }(tbl, rowChan, logger) go func(tableName string, opts reader.ReadTableOpt, rowChan chan<- database.Row, logger *log.Entry) { - if err := p.reader.ReadTable(tableName, rowChan, opts, spec.Matchers); err != nil { + if err := e.reader.ReadTable(tableName, rowChan, opts, spec.Matchers); err != nil { logger.WithError(err).Error("Failed to read table") } }(tbl, opts, rowChan, logger) @@ -128,9 +133,9 @@ func (p *sqlDumper) readAndDumpTables(done chan<- struct{}, spec *config.Spec, c close(semChan) // Trigger post dump tables - if adv, ok := p.SqlEngine.(SqlEngineAdvanced); ok { + if adv, ok := e.Dumper.(Hooker); ok { if err := adv.PostDumpTables(tables); err != nil { - log.WithError(err).Error("Post dump tables failed") + log.WithError(err).Error("post dump tables failed") } } @@ -140,7 +145,7 @@ func (p *sqlDumper) readAndDumpTables(done chan<- struct{}, spec *config.Spec, c return nil } -func (p *sqlDumper) relationshipConfigToOptions(relationshipsConfig []*config.Relationship) []*reader.RelationshipOpt { +func (e *Engine) relationshipConfigToOptions(relationshipsConfig []*config.Relationship) []*reader.RelationshipOpt { var opts []*reader.RelationshipOpt for _, r := range relationshipsConfig { diff --git a/pkg/dumper/mysql/dumper.go b/pkg/dumper/mysql/dumper.go index 0b8afe3..de24837 100644 --- a/pkg/dumper/mysql/dumper.go +++ b/pkg/dumper/mysql/dumper.go @@ -11,43 +11,54 @@ import ( 
"github.com/go-sql-driver/mysql" "github.com/hellofresh/klepto/pkg/database" "github.com/hellofresh/klepto/pkg/dumper" - "github.com/hellofresh/klepto/pkg/dumper/generic" + "github.com/hellofresh/klepto/pkg/dumper/engine" "github.com/hellofresh/klepto/pkg/reader" "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) -// myDumper dumps a database into a mysql db -type myDumper struct { - conn *sql.DB - reader reader.Reader -} +const ( + null = "NULL" +) +type ( + myDumper struct { + conn *sql.DB + reader reader.Reader + } +) + +// NewDumper returns a new mysql dumper. func NewDumper(conn *sql.DB, rdr reader.Reader) dumper.Dumper { - return generic.NewSqlDumper(rdr, &myDumper{ + return engine.New(rdr, &myDumper{ conn: conn, reader: rdr, }) } -func (p *myDumper) DumpStructure(sql string) error { - if _, err := p.conn.Exec(sql); err != nil { +// DumpStructure dump the mysql database structure. +func (d *myDumper) DumpStructure(sql string) error { + if _, err := d.conn.Exec(sql); err != nil { return err } return nil } -func (p *myDumper) DumpTable(tableName string, rowChan <-chan database.Row) error { - txn, err := p.conn.Begin() +// DumpTable dumps a mysql table. 
+func (d *myDumper) DumpTable(tableName string, rowChan <-chan database.Row) error { + txn, err := d.conn.Begin() if err != nil { return errors.Wrap(err, "failed to open transaction") } - insertedRows, err := p.insertIntoTable(txn, tableName, rowChan) + insertedRows, err := d.insertIntoTable(txn, tableName, rowChan) if err != nil { - txn.Rollback() - return errors.Wrap(err, "failed to insert rows") + err = errors.Wrap(err, "failed to insert rows") + if err := txn.Rollback(); err != nil { + return errors.Wrap(err, "failed to rollback transaction") + } + return err } log.WithFields(log.Fields{ @@ -62,25 +73,29 @@ func (p *myDumper) DumpTable(tableName string, rowChan <-chan database.Row) erro return nil } -func (p *myDumper) Close() error { - return p.conn.Close() +// Close closes the mysql database connection. +func (d *myDumper) Close() error { + err := d.conn.Close() + if err != nil { + return errors.Wrap(err, "failed to close mysql connection") + } + return nil } -func (p *myDumper) insertIntoTable(txn *sql.Tx, tableName string, rowChan <-chan database.Row) (int64, error) { - columns, err := p.reader.GetColumns(tableName) +func (d *myDumper) insertIntoTable(txn *sql.Tx, tableName string, rowChan <-chan database.Row) (int64, error) { + columns, err := d.reader.GetColumns(tableName) if err != nil { - return 0, err + return 0, errors.Wrap(err, "failed to get columns") } - // Create query columnsQuoted := make([]string, len(columns)) for i, column := range columns { - columnsQuoted[i] = p.quoteIdentifier(column) + columnsQuoted[i] = d.quoteIdentifier(column) } query := fmt.Sprintf( "LOAD DATA LOCAL INFILE 'Reader::%s' INTO TABLE %s FIELDS TERMINATED BY ',' ENCLOSED BY '\"' ESCAPED BY '\"' (%s)", tableName, - p.quoteIdentifier(tableName), + d.quoteIdentifier(tableName), strings.Join(columnsQuoted, ","), ) @@ -104,7 +119,7 @@ func (p *myDumper) insertIntoTable(txn *sql.Tx, tableName string, rowChan <-chan for i, col := range columns { switch v := row[col].(type) { case 
nil: - rowValues[i] = "NULL" + rowValues[i] = null case string: rowValues[i] = row[col].(string) case []uint8: @@ -124,20 +139,20 @@ func (p *myDumper) insertIntoTable(txn *sql.Tx, tableName string, rowChan <-chan }(rowWriter) // Register the reader for reading the csv - mysql.RegisterReaderHandler(tableName, func() io.Reader { - return rowReader - }) + mysql.RegisterReaderHandler(tableName, func() io.Reader { return rowReader }) defer mysql.DeregisterReaderHandler(tableName) - // Execute the query - txn.Exec("SET foreign_key_checks = 0;") + if _, err := txn.Exec("SET foreign_key_checks = 0;"); err != nil { + return 0, errors.Wrap(err, "failed to disable foreign key checks") + } + if _, err := txn.Exec(query); err != nil { - return 0, err + return 0, errors.Wrap(err, "failed to execute query") } return inserted, nil } -func (p *myDumper) quoteIdentifier(name string) string { +func (d *myDumper) quoteIdentifier(name string) string { return fmt.Sprintf("`%s`", strings.Replace(name, "`", "``", -1)) } diff --git a/pkg/dumper/mysql/mysql.go b/pkg/dumper/mysql/mysql.go index 61dfd1c..d48c1e6 100644 --- a/pkg/dumper/mysql/mysql.go +++ b/pkg/dumper/mysql/mysql.go @@ -6,11 +6,13 @@ import ( "github.com/go-sql-driver/mysql" "github.com/hellofresh/klepto/pkg/dumper" "github.com/hellofresh/klepto/pkg/reader" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) type driver struct{} +// IsSupported checks if the given dsn connection string is supported. func (m *driver) IsSupported(dsn string) bool { if dsn == "" { return false @@ -20,11 +22,13 @@ func (m *driver) IsSupported(dsn string) bool { return err == nil } +// NewConnection creates a new mysql connection and retrieves a new mysql dumper. 
func (m *driver) NewConnection(opts dumper.ConnOpts, rdr reader.Reader) (dumper.Dumper, error) { dsnCfg, err := mysql.ParseDSN(opts.DSN) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to parse mysql dsn") } + if !dsnCfg.MultiStatements { log.WithField("help", "https://github.com/go-sql-driver/mysql#multistatements"). Warning("MYSQL dumper forcing multistatements!") @@ -33,7 +37,7 @@ func (m *driver) NewConnection(opts dumper.ConnOpts, rdr reader.Reader) (dumper. conn, err := sql.Open("mysql", dsnCfg.FormatDSN()) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to open mysql connection") } conn.SetMaxOpenConns(opts.MaxConns) diff --git a/pkg/dumper/postgres/dumper.go b/pkg/dumper/postgres/dumper.go index 505d187..9e3e273 100644 --- a/pkg/dumper/postgres/dumper.go +++ b/pkg/dumper/postgres/dumper.go @@ -8,49 +8,55 @@ import ( "github.com/hellofresh/klepto/pkg/config" "github.com/hellofresh/klepto/pkg/database" "github.com/hellofresh/klepto/pkg/dumper" - "github.com/hellofresh/klepto/pkg/dumper/generic" + "github.com/hellofresh/klepto/pkg/dumper/engine" "github.com/hellofresh/klepto/pkg/reader" "github.com/lib/pq" "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) -// pgDumper dumps a database into a postgres db -type pgDumper struct { - conn *sql.DB - reader reader.Reader -} +type ( + pgDumper struct { + conn *sql.DB + reader reader.Reader + } +) +// NewDumper returns a new postgres dumper. func NewDumper(conn *sql.DB, rdr reader.Reader) dumper.Dumper { - return generic.NewSqlDumper( - rdr, - &pgDumper{ - conn: conn, - reader: rdr, - }, - ) + return engine.New(rdr, &pgDumper{ + conn: conn, + reader: rdr, + }) } -func (p *pgDumper) DumpStructure(sql string) error { - if _, err := p.conn.Exec(sql); err != nil { +// DumpStructure dump the mysql database structure. 
+func (d *pgDumper) DumpStructure(sql string) error { + if _, err := d.conn.Exec(sql); err != nil { return err } return nil } -func (p *pgDumper) DumpTable(tableName string, rowChan <-chan database.Row) error { - txn, err := p.conn.Begin() +// DumpTable dumps a postgres table. +func (d *pgDumper) DumpTable(tableName string, rowChan <-chan database.Row) error { + txn, err := d.conn.Begin() if err != nil { return errors.Wrap(err, "failed to open transaction") } - insertedRows, err := p.insertIntoTable(txn, tableName, rowChan) + insertedRows, err := d.insertIntoTable(txn, tableName, rowChan) if err != nil { - txn.Rollback() - return errors.Wrap(err, "failed to insert rows") + err = errors.Wrap(err, "failed to insert rows") + if err := txn.Rollback(); err != nil { + return errors.Wrap(err, "failed to rollback transaction") + } } - log.WithField("table", tableName).WithField("inserted", insertedRows).Debug("inserted rows") + log.WithFields(log.Fields{ + "table": tableName, + "inserted": insertedRows, + }).Debug("inserted rows") if err := txn.Commit(); err != nil { return errors.Wrap(err, "failed to commit transaction") @@ -60,11 +66,11 @@ func (p *pgDumper) DumpTable(tableName string, rowChan <-chan database.Row) erro } // PreDumpTables Disable triggers on all tables to avoid foreign key constraints -func (p *pgDumper) PreDumpTables(tables []string) error { +func (d *pgDumper) PreDumpTables(tables []string) error { // We can't use `SET session_replication_role = replica` because multiple connections and stuff for _, tbl := range tables { query := fmt.Sprintf("ALTER TABLE %s DISABLE TRIGGER ALL", strconv.Quote(tbl)) - if _, err := p.conn.Exec(query); err != nil { + if _, err := d.conn.Exec(query); err != nil { return errors.Wrapf(err, "Failed to disable triggers for %s", tbl) } } @@ -72,34 +78,39 @@ func (p *pgDumper) PreDumpTables(tables []string) error { return nil } -// PostDumpTables Enable triggers on all tables to enforce foreign key constraints -func (p *pgDumper) 
PostDumpTables(tables []string) error { +// PostDumpTables enable triggers on all tables to enforce foreign key constraints +func (d *pgDumper) PostDumpTables(tables []string) error { // We can't use `SET session_replication_role = DEFAULT` because multiple connections and stuff for _, tbl := range tables { query := fmt.Sprintf("ALTER TABLE %s ENABLE TRIGGER ALL", strconv.Quote(tbl)) - if _, err := p.conn.Exec(query); err != nil { - return errors.Wrap(err, "Failed to enable triggers") + if _, err := d.conn.Exec(query); err != nil { + return errors.Wrapf(err, "Failed to anble triggers for %s", tbl) } } return nil } -func (p *pgDumper) Close() error { - return p.conn.Close() +// Close closes the postgres database connection. +func (d *pgDumper) Close() error { + err := d.conn.Close() + if err != nil { + return errors.Wrap(err, "failed to close postgres connection") + } + return nil } -func (p *pgDumper) insertIntoTable(txn *sql.Tx, tableName string, rowChan <-chan database.Row) (int64, error) { - columns, err := p.reader.GetColumns(tableName) +func (d *pgDumper) insertIntoTable(txn *sql.Tx, tableName string, rowChan <-chan database.Row) (int64, error) { + columns, err := d.reader.GetColumns(tableName) if err != nil { - return 0, err + return 0, errors.Wrap(err, "failed to get columns") } logger := log.WithFields(log.Fields{ "table": tableName, "columns": columns, }) - logger.Debug("Preparing copy in") + logger.Debug("preparing copy in") stmt, err := txn.Prepare(pq.CopyIn(tableName, columns...)) if err != nil { @@ -141,7 +152,7 @@ func (p *pgDumper) insertIntoTable(txn *sql.Tx, tableName string, rowChan <-chan inserted++ } - logger.Debug("Executing copy in") + logger.Debug("executing copy in") if _, err := stmt.Exec(); err != nil { return 0, errors.Wrap(err, "failed to exec copy in") } @@ -149,7 +160,7 @@ func (p *pgDumper) insertIntoTable(txn *sql.Tx, tableName string, rowChan <-chan return inserted, nil } -func (p *pgDumper) 
relationshipConfigToOptions(relationshipsConfig []*config.Relationship) []*reader.RelationshipOpt { +func (d *pgDumper) relationshipConfigToOptions(relationshipsConfig []*config.Relationship) []*reader.RelationshipOpt { var opts []*reader.RelationshipOpt for _, r := range relationshipsConfig { diff --git a/pkg/dumper/query/dumper.go b/pkg/dumper/query/dumper.go index 6746bb4..ea89780 100644 --- a/pkg/dumper/query/dumper.go +++ b/pkg/dumper/query/dumper.go @@ -15,13 +15,14 @@ import ( log "github.com/sirupsen/logrus" ) -// textDumper dumps a database's structure to a stream -type textDumper struct { - reader reader.Reader - output io.Writer -} +type ( + textDumper struct { + reader reader.Reader + output io.Writer + } +) -// NewDumper is the constructor for MySQLDumper +// NewDumper returns a new text dumper implementation. func NewDumper(output io.Writer, rdr reader.Reader) dumper.Dumper { return &textDumper{ reader: rdr, @@ -29,10 +30,11 @@ func NewDumper(output io.Writer, rdr reader.Reader) dumper.Dumper { } } +// Dump executes the dump stream process. func (d *textDumper) Dump(done chan<- struct{}, spec *config.Spec, concurrency int) error { tables, err := d.reader.GetTables() if err != nil { - return errors.Wrap(err, "could not get tables") + return errors.Wrap(err, "failed to get tables") } structure, err := d.reader.GetStructure() @@ -86,13 +88,17 @@ func (d *textDumper) Dump(done chan<- struct{}, spec *config.Spec, concurrency i return nil } +// Close closes the output stream. 
func (d *textDumper) Close() error { closer, ok := d.output.(io.WriteCloser) if ok { - closer.Close() + if err := closer.Close(); err != nil { + return errors.Wrap(err, "failed to close output stream") + } + return nil } - return nil + return errors.New("unable to close output: wrong closer type") } func (d *textDumper) toSQLColumnMap(row database.Row) (map[string]interface{}, error) { diff --git a/pkg/dumper/query/dumper_test.go b/pkg/dumper/query/dumper_test.go deleted file mode 100644 index 77a65e1..0000000 --- a/pkg/dumper/query/dumper_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package query - -// storeStub's methods implements database.Reader() interface -type storeStub struct{} - -func (st *storeStub) getTables() (tables []string, err error) { - return -} - -func (st *storeStub) getStructure() (structure string, err error) { - return -} - -func (st *storeStub) getColumns() (columns []string, err error) { - return -} - -func (st *storeStub) getPreamble() (preamble string, err error) { - return -} - -type dumperTestPair struct { - structure string - err error -} - -var dumpTests = []dumperTestPair{ - {"some structure", nil}, - {"some other structure", nil}, -} - -// func TestDumpStructure(t *testing.T) { -// var st storeStub -// var structure string -// var err error -// for _, pair := range dumpTests { -// preamble, _ := st.getPreamble() -// tables, _ := st.getTables() -// var tableStructure string -// for _, table := range tables { -// tableStructure, err = st.getTableStructure(table) -// } -// structure = fmt.Sprintf("%s\n%s;\n\n", preamble, tableStructure) -// // Check that no error is returned -// if structure != pair.structure { -// t.Error( -// "For", pair.structure, -// "expected", pair.err, -// "got", err, -// ) -// } -// } -// } diff --git a/pkg/dumper/query/writer.go b/pkg/dumper/query/writer.go index 897dcae..2423822 100644 --- a/pkg/dumper/query/writer.go +++ b/pkg/dumper/query/writer.go @@ -30,5 +30,4 @@ func getOutputWriter(dsn string) (io.Writer, 
error) { default: return nil, fmt.Errorf("Unknown output writer type: %v", config.Type) } - } diff --git a/pkg/reader/generic/sql.go b/pkg/reader/engine/engine.go similarity index 67% rename from pkg/reader/generic/sql.go rename to pkg/reader/engine/engine.go index e004f7f..f0bd54d 100644 --- a/pkg/reader/generic/sql.go +++ b/pkg/reader/engine/engine.go @@ -1,4 +1,4 @@ -package generic +package engine import ( "context" @@ -15,10 +15,10 @@ import ( log "github.com/sirupsen/logrus" ) -// sqlReader is a base class for sql related readers type ( - SqlReader struct { - SqlEngine + // Engine is responsible for sql related read operations. + Engine struct { + Storage // tables is a cache variable for all tables in the db tables []string // columns is a cache variable for tables and there columns in the db @@ -27,94 +27,90 @@ type ( timeout time.Duration } - SqlEngine interface { - // GetConnection return the sql.DB connection - GetConnection() *sql.DB - + // Storage is the read storage database interface. + Storage interface { // GetStructure returns the SQL used to create the database tables GetStructure() (string, error) - // GetTables return a list of all database tables GetTables() ([]string, error) - // GetColumns return a list of all columns for a given table GetColumns(string) ([]string, error) - // QuoteIdentifier returns a quoted instance of a identifier (table, column etc.) QuoteIdentifier(string) string - - // Close closes the connection and other resources and releases them. + // Conn return the sql.DB connection + Conn() *sql.DB + // Close closes the reader resources and releases them. Close() error } ) -// NewSqlReader creates a new sql reader -func NewSqlReader(se SqlEngine, t time.Duration) *SqlReader { - return &SqlReader{SqlEngine: se, timeout: t} +// New creates a new sql reader engine. 
+func New(s Storage, timeout time.Duration) *Engine { + return &Engine{Storage: s, timeout: timeout} } // GetTables gets a list of all tables in the database -func (s *SqlReader) GetTables() ([]string, error) { - if s.tables == nil { - tables, err := s.SqlEngine.GetTables() +func (e *Engine) GetTables() ([]string, error) { + if e.tables == nil { + tables, err := e.Storage.GetTables() if err != nil { return nil, err } - s.tables = tables + e.tables = tables } - return s.tables, nil + return e.tables, nil } // GetColumns returns the columns in the specified database table -func (s *SqlReader) GetColumns(tableName string) ([]string, error) { - columns, ok := s.columns.Load(tableName) +func (e *Engine) GetColumns(tableName string) ([]string, error) { + columns, ok := e.columns.Load(tableName) if !ok { var err error - columns, err = s.SqlEngine.GetColumns(tableName) + columns, err = e.Storage.GetColumns(tableName) if err != nil { return nil, err } - s.columns.Store(tableName, columns) + e.columns.Store(tableName, columns) } return columns.([]string), nil } // ReadTable returns a list of all rows in a table -func (s *SqlReader) ReadTable(tableName string, rowChan chan<- database.Row, opts reader.ReadTableOpt, matchers config.Matchers) error { +func (e *Engine) ReadTable(tableName string, rowChan chan<- database.Row, opts reader.ReadTableOpt, matchers config.Matchers) error { defer close(rowChan) logger := log.WithField("table", tableName) logger.Debug("reading table data") if len(opts.Columns) == 0 { - columns, err := s.GetColumns(tableName) + columns, err := e.GetColumns(tableName) if err != nil { return errors.Wrap(err, "failed to get columns") } - opts.Columns = s.formatColumns(tableName, columns) + opts.Columns = e.formatColumns(tableName, columns) } var ( query sq.SelectBuilder err error ) - query, err = s.buildQuery(tableName, opts, matchers) + query, err = e.buildQuery(tableName, opts, matchers) if err != nil { return errors.Wrapf(err, "failed to build query for 
%s", tableName) } var rows *sql.Rows - ctx, cancel := context.WithTimeout(context.Background(), s.timeout) + ctx, cancel := context.WithTimeout(context.Background(), e.timeout) defer cancel() errchan := make(chan error) go func() { defer close(errchan) - rows, err = query.RunWith(s.GetConnection()).QueryContext(ctx) + rows, err = query.RunWith(e.Conn()).QueryContext(ctx) errchan <- err }() @@ -134,14 +130,14 @@ func (s *SqlReader) ReadTable(tableName string, rowChan chan<- database.Row, opt break } - return s.publishRows(rows, rowChan, tableName) + return e.publishRows(rows, rowChan, tableName) } // BuildQuery builds the query that will be used to read the table -func (s *SqlReader) buildQuery(tableName string, opts reader.ReadTableOpt, matchers map[string]string) (sq.SelectBuilder, error) { +func (e *Engine) buildQuery(tableName string, opts reader.ReadTableOpt, matchers map[string]string) (sq.SelectBuilder, error) { var query sq.SelectBuilder - query = sq.Select(opts.Columns...).From(s.QuoteIdentifier(tableName)) + query = sq.Select(opts.Columns...).From(e.QuoteIdentifier(tableName)) for _, r := range opts.Relationships { if r.Table == "" { r.Table = tableName @@ -176,21 +172,22 @@ func (s *SqlReader) buildQuery(tableName string, opts reader.ReadTableOpt, match } // FormatColumn returns a escaped table+column string -func (s *SqlReader) FormatColumn(tableName string, columnName string) string { +func (e *Engine) FormatColumn(tableName string, columnName string) string { return fmt.Sprintf( "%s.%s", - s.QuoteIdentifier(tableName), - s.QuoteIdentifier(columnName), + e.QuoteIdentifier(tableName), + e.QuoteIdentifier(columnName), ) } -func (s *SqlReader) publishRows(rows *sql.Rows, rowChan chan<- database.Row, tableName string) error { +func (e *Engine) publishRows(rows *sql.Rows, rowChan chan<- database.Row, tableName string) error { defer rows.Close() columnTypes, err := rows.ColumnTypes() if err != nil { - return err + return errors.Wrap(err, "failed to get column 
types") } + columnCount := len(columnTypes) columns := make([]string, columnCount) for i, col := range columnTypes { @@ -222,10 +219,10 @@ func (s *SqlReader) publishRows(rows *sql.Rows, rowChan chan<- database.Row, tab return nil } -func (s *SqlReader) formatColumns(tableName string, columns []string) []string { +func (e *Engine) formatColumns(tableName string, columns []string) []string { formatted := make([]string, len(columns)) for i, c := range columns { - formatted[i] = s.FormatColumn(tableName, c) + formatted[i] = e.FormatColumn(tableName, c) } return formatted diff --git a/pkg/reader/mysql/mysql.go b/pkg/reader/mysql/mysql.go index 2693a63..54670f3 100644 --- a/pkg/reader/mysql/mysql.go +++ b/pkg/reader/mysql/mysql.go @@ -9,6 +9,7 @@ import ( type driver struct{} +// IsSupported checks if the given dsn connection string is supported. func (m *driver) IsSupported(dsn string) bool { if dsn == "" { return false @@ -18,6 +19,7 @@ func (m *driver) IsSupported(dsn string) bool { return err == nil } +// NewConnection creates a new mysql connection and retrieves a new mysql reader. func (m *driver) NewConnection(opts reader.ConnOpts) (reader.Reader, error) { conn, err := sql.Open("mysql", opts.DSN) if err != nil { diff --git a/pkg/reader/mysql/reader.go b/pkg/reader/mysql/reader.go index 27af88a..27b639b 100644 --- a/pkg/reader/mysql/reader.go +++ b/pkg/reader/mysql/reader.go @@ -8,27 +8,31 @@ import ( "time" "github.com/hellofresh/klepto/pkg/reader" - "github.com/hellofresh/klepto/pkg/reader/generic" + "github.com/hellofresh/klepto/pkg/reader/engine" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) -type storage struct { - conn *sql.DB -} +const ( + baseTable = "BASE TABLE" +) -// NewStorage ... 
-func NewStorage(conn *sql.DB, timeout time.Duration) reader.Reader { - return generic.NewSqlReader(&storage{conn}, timeout) -} +type ( + storage struct { + conn *sql.DB + } +) -// GetConnection return the connection -func (s *storage) GetConnection() *sql.DB { - return s.conn +// NewStorage creates a new mysql reader. +func NewStorage(conn *sql.DB, timeout time.Duration) reader.Reader { + return engine.New(&storage{ + conn: conn, + }, timeout) } -// GetTables gets a list of all tables in the database +// GetTables gets a list of all tables in the database. func (s *storage) GetTables() ([]string, error) { - log.Debug("Fetching table list") + log.Debug("fetching table list") rows, err := s.conn.Query("SHOW FULL TABLES") if err != nil { @@ -42,12 +46,12 @@ func (s *storage) GetTables() ([]string, error) { if err := rows.Scan(&tableName, &tableType); err != nil { return nil, err } - if tableType == "BASE TABLE" { + if tableType == baseTable { tables = append(tables, tableName) } } - log.WithField("tables", tables).Debug("Fetched table list") + log.WithField("tables", tables).Debug("fetched table list") return tables, nil } @@ -76,7 +80,7 @@ func (s *storage) GetColumns(tableName string) ([]string, error) { return columns, nil } -// GetStructure returns the SQL used to create the database tables structure +// GetStructure dumps the mysql database structure. func (s *storage) GetStructure() (string, error) { tables, err := s.GetTables() if err != nil { @@ -106,14 +110,23 @@ func (s *storage) GetStructure() (string, error) { return buf.String(), nil } +// QuoteIdentifier wraps name in backticks, doubling any backtick it contains. func (s *storage) QuoteIdentifier(name string) string { return fmt.Sprintf("`%s`", strings.Replace(name, "`", "``", -1)) } +// Close closes the mysql database connection. 
func (s *storage) Close() error { - return s.conn.Close() + err := s.conn.Close() + if err != nil { + return errors.Wrap(err, "failed to close mysql reader database connection") + } + return nil } +// Conn retrieves the storage connection +func (s *storage) Conn() *sql.DB { return s.conn } + // getPreamble puts a big old comment at the top of the database dump. // Also acts as first query to check for errors. func (s *storage) getPreamble() (string, error) { diff --git a/pkg/reader/postgres/pg_dump.go b/pkg/reader/postgres/pg_dump.go index 83d485d..2474ddd 100644 --- a/pkg/reader/postgres/pg_dump.go +++ b/pkg/reader/postgres/pg_dump.go @@ -8,32 +8,29 @@ import ( ) type ( - PgDump interface { - GetStructure() (stmt string, err error) - } - - pgDump struct { + // PgDump is responsible for executing the pg dump command. + PgDump struct { command string dsn string } ) -func NewPgDump(dsn string) (PgDump, error) { - pgDumpPath, err := exec.LookPath("pg_dump") +// NewPgDump creates a new PgDump. +func NewPgDump(dsn string) (*PgDump, error) { + path, err := exec.LookPath("pg_dump") if err != nil { return nil, err } - return &pgDump{ - command: pgDumpPath, + return &PgDump{ + command: path, dsn: dsn, }, nil } -func (p *pgDump) GetStructure() (string, error) { - logger := log.WithFields(log.Fields{ - "command": p.command, - }) +// GetStructure executes the pg dump command. 
+func (p *PgDump) GetStructure() (string, error) { + logger := log.WithField("command", p.command) cmd := exec.Command( p.command, @@ -43,7 +40,7 @@ func (p *pgDump) GetStructure() (string, error) { "--no-owner", ) - logger.Debug("Loading schema for table") + logger.Debug("loading schema for table") cmdErr := logger.WriterLevel(log.WarnLevel) defer cmdErr.Close() @@ -54,7 +51,7 @@ func (p *pgDump) GetStructure() (string, error) { cmd.Stdout = buf if err := cmd.Run(); err != nil { - logger.Error("Failed to load schema for table") + logger.Error("failed to load schema for table") } return buf.String(), nil diff --git a/pkg/reader/postgres/postgres.go b/pkg/reader/postgres/postgres.go index 97f35a2..a26958b 100644 --- a/pkg/reader/postgres/postgres.go +++ b/pkg/reader/postgres/postgres.go @@ -10,10 +10,12 @@ import ( type driver struct{} +// IsSupported checks if the postgres driver is supported. func (m *driver) IsSupported(dsn string) bool { return strings.HasPrefix(strings.ToLower(dsn), "postgres://") } +// NewConnection takes the connection options and returns a new Reader. 
func (m *driver) NewConnection(opts reader.ConnOpts) (reader.Reader, error) { conn, err := sql.Open("postgres", opts.DSN) if err != nil { @@ -24,12 +26,12 @@ func (m *driver) NewConnection(opts reader.ConnOpts) (reader.Reader, error) { conn.SetMaxIdleConns(opts.MaxIdleConns) conn.SetConnMaxLifetime(opts.MaxConnLifetime) - dump, err := NewPgDump(opts.DSN) + dumper, err := NewPgDump(opts.DSN) if err != nil { return nil, err } - return NewStorage(conn, dump, opts.Timeout), nil + return NewStorage(conn, dumper, opts.Timeout), nil } func init() { diff --git a/pkg/reader/postgres/reader.go b/pkg/reader/postgres/reader.go index 026d291..1952ead 100644 --- a/pkg/reader/postgres/reader.go +++ b/pkg/reader/postgres/reader.go @@ -6,34 +6,35 @@ import ( "time" "github.com/hellofresh/klepto/pkg/reader" - "github.com/hellofresh/klepto/pkg/reader/generic" + "github.com/hellofresh/klepto/pkg/reader/engine" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) -// Storage ... -type storage struct { - PgDump - - connection *sql.DB -} +type ( + storage struct { + PgDumper + conn *sql.DB + } -// NewStorage ... -func NewStorage(conn *sql.DB, dumper PgDump, timeout time.Duration) reader.Reader { - s := &storage{ - PgDump: dumper, - connection: conn, + // PgDumper is the interface for retrieving the database structure statement. + PgDumper interface { + GetStructure() (stmt string, err error) } - return generic.NewSqlReader(s, timeout) -} +) -func (s *storage) GetConnection() *sql.DB { - return s.connection +// NewStorage creates a new postgres storage reader. 
+func NewStorage(conn *sql.DB, dumper PgDumper, timeout time.Duration) reader.Reader { + return engine.New(&storage{ + PgDumper: dumper, + conn: conn, + }, timeout) } // GetTables gets a list of all tables in the database func (s *storage) GetTables() ([]string, error) { - log.Debug("Fetching table list") - rows, err := s.connection.Query( + log.Debug("fetching table list") + rows, err := s.conn.Query( `SELECT table_name FROM information_schema.tables WHERE table_catalog=current_database() AND table_schema NOT IN ('pg_catalog', 'information_schema')`, ) @@ -52,15 +53,14 @@ func (s *storage) GetTables() ([]string, error) { tables = append(tables, tableName) } - log.WithField("tables", tables).Debug("Fetched table list") + log.WithField("tables", tables).Debug("fetched table list") return tables, nil } -// GetColumns returns the columns in the specified database table func (s *storage) GetColumns(table string) ([]string, error) { - log.WithField("table", table).Debug("Fetching table columns") - rows, err := s.connection.Query( + log.WithField("table", table).Debug("fetching table columns") + rows, err := s.conn.Query( "SELECT column_name FROM information_schema.columns WHERE table_catalog=current_database() AND table_name=$1", table, ) @@ -82,10 +82,18 @@ func (s *storage) GetColumns(table string) ([]string, error) { return columns, nil } +// QuoteIdentifier returns a double-quoted name. func (s *storage) QuoteIdentifier(name string) string { return strconv.Quote(name) } +// Close closes the postgres connection reader. func (s *storage) Close() error { - return s.connection.Close() + if err := s.conn.Close(); err != nil { + return errors.Wrap(err, "failed to close postgres connection reader") + } + return nil } + +// Conn retrieves the postgres reader connection. 
+func (s *storage) Conn() *sql.DB { return s.conn } diff --git a/pkg/reader/reader.go b/pkg/reader/reader.go index fdec048..ab12c8f 100644 --- a/pkg/reader/reader.go +++ b/pkg/reader/reader.go @@ -10,7 +10,9 @@ import ( type ( // Driver is a driver interface used to support multiple drivers Driver interface { + // IsSupported checks if the driver is supported. IsSupported(string) bool + // NewConnection takes the connection options and returns a new Reader. NewConnection(ConnOpts) (Reader, error) } @@ -38,7 +40,7 @@ type ( Match string // Sort the results Sorts map[string]string - // Defines a limit of results to be fetched + // Limit defines a limit of results to be fetched Limit uint64 // Relationships defines an slice of relationship definitions Relationships []*RelationshipOpt @@ -46,19 +48,28 @@ type ( // RelationshipOpt represents the relationships options RelationshipOpt struct { - Table string - ForeignKey string + // Table is the table name. + Table string + // ForeignKey is the table name foreign key. + ForeignKey string + // ReferencedTable is the referenced table name. ReferencedTable string - ReferencedKey string + // ReferencedKey is the referenced table primary key name. + ReferencedKey string } // ConnOpts are the options to create a connection ConnOpts struct { - DSN string - Timeout time.Duration + // DSN is the connection address. + DSN string + // Timeout is the timeout for read operations. + Timeout time.Duration + // MaxConnLifetime is the maximum amount of time a connection may be reused on the read database. MaxConnLifetime time.Duration - MaxConns int - MaxIdleConns int + // MaxConns is the maximum number of open connections to the read database. + MaxConns int + // MaxIdleConns is the maximum number of connections in the idle connection pool for the read database. + MaxIdleConns int } )