Skip to content

Commit

Permalink
improving godoc
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaeljesus committed Mar 10, 2018
1 parent bbca864 commit e0b790c
Show file tree
Hide file tree
Showing 21 changed files with 352 additions and 313 deletions.
2 changes: 1 addition & 1 deletion cmd/steal.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
_ "github.com/hellofresh/klepto/pkg/reader/postgres"
)

// StealOptions represents the command options
type (
// StealOptions represents the command options
StealOptions struct {
from string
to string
Expand Down
13 changes: 7 additions & 6 deletions cmd/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ const (
githubRepo = "klepto"
)

// UpdateOptions are the command flags
type UpdateOptions struct {
token string
}
type (
// UpdateOptions are the command flags
UpdateOptions struct {
token string
}
)

// NewUpdateCmd creates a new update command
func NewUpdateCmd() *cobra.Command {
opts := &UpdateOptions{}

opts := new(UpdateOptions)
cmd := &cobra.Command{
Use: "update",
Short: "Check for new versions of kepto",
Expand Down
15 changes: 8 additions & 7 deletions pkg/anonymiser/anonymiser.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,19 @@ const (
password = "Password"
)

// anonymiser is responsible for anonymising columns
type anonymiser struct {
reader.Reader
tables config.Tables
}
type (
anonymiser struct {
reader.Reader
tables config.Tables
}
)

// NewAnonymiser returns an initialised instance of MySQLAnonymiser
// NewAnonymiser returns a new anonymiser reader.
func NewAnonymiser(source reader.Reader, tables config.Tables) reader.Reader {
return &anonymiser{source, tables}
}

// ReadTable wraps reader.ReadTable method for anonymising rows published from the reader.Reader
// ReadTable decorates reader.ReadTable method for anonymising rows published from the reader.Reader
func (a *anonymiser) ReadTable(tableName string, rowChan chan<- database.Row, opts reader.ReadTableOpt, matchers config.Matchers) error {
logger := log.WithField("table", tableName)
logger.Debug("Loading anonymiser config")
Expand Down
41 changes: 27 additions & 14 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,57 @@ package config
import "errors"

type (
// Spec represents the global app configuration
// Spec represents the global app configuration.
Spec struct {
Matchers
Tables
}

// Matchers are variables to replace placeolders in filter
// Matchers are variables to store filter data,
// you can declare a filter once and reuse it among tables.
Matchers map[string]string

// Tables are an array of table
// Tables are an array of table definitions.
Tables []*Table

// Table represents a klepto table definition
// Table represents a klepto table definition.
Table struct {
Name string
IgnoreData bool
Filter Filter
Anonymise map[string]string
// Name is the table name.
Name string
// IgnoreData if set to true, it will dump the table structure without importing data.
IgnoreData bool
// Filter represents the way you want to filter the results.
Filter
// Anonymise anonymise columns.
Anonymise map[string]string
// Relationship is an collection of relationship definitions.
Relationships []*Relationship
}

// Filter represents the way you want to filter the results
// Filter represents the way you want to filter the results.
Filter struct {
// Match is a condition field to dump only certain amount data.
Match string
// Limit defines a limit of results to be fetched.
Limit uint64
// Sorts is the sort condition for the table.
Sorts map[string]string
}

// Relationship represents a relationship definition
// Relationship represents the relationship between the table and referenced table.
Relationship struct {
Table string
ForeignKey string
// Table is the table name.
Table string
// ForeignKey is the table name foreign key.
ForeignKey string
// ReferencedTable is the referenced table name.
ReferencedTable string
ReferencedKey string
// ReferencedKey is the referenced table primary key name.
ReferencedKey string
}
)

// FindByName filter a table by its name
// FindByName find a table by its name.
func (t Tables) FindByName(name string) (*Table, error) {
for _, table := range t {
if table.Name == name {
Expand Down
1 change: 1 addition & 0 deletions pkg/database/database.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package database

type (
// Row is the database column row.
Row map[string]interface{}
)
22 changes: 10 additions & 12 deletions pkg/dsn/dsn.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ var (
ErrEmptyDsn = errors.New("Empty string provided for dsn")
// ErrInvalidDsn defines error returned when the dsn is invalid
ErrInvalidDsn = errors.New("Invalid dsn")

// From https://github.com/go-sql-driver/mysql/blob/f4bf8e8e0aa93d4ead0c6473503ca2f5d5eb65a8/utils.go#L34
regex = regexp.MustCompile(
`^(?:(?P<Type>.*?)?://)?` + // [type://]
`(?:(?P<Username>.*?)(?::(?P<Password>.*))?@)?` + // [username[:password]@]
`(?:(?P<Protocol>[^\(]*)(?:\((?P<Address>[^\)]*)\))?)?` + // [protocol[(address)]]
`\/(?P<DataSource>.*?)` + // /datasource
`(?:\?(?P<Params>[^\?]*))?$`) // [?param1=value1]
)

// DSN describes how a DSN looks like
Expand All @@ -36,7 +44,7 @@ func Parse(s string) (*DSN, error) {
return nil, ErrEmptyDsn
}

dsn := &DSN{}
dsn := new(DSN)

matches := regex.FindStringSubmatch(s)
if len(matches) < 1 || len(matches) > 1 && matches[1] == "" {
Expand Down Expand Up @@ -74,17 +82,7 @@ func Parse(s string) (*DSN, error) {
return dsn, nil
}

var (
// From https://github.com/go-sql-driver/mysql/blob/f4bf8e8e0aa93d4ead0c6473503ca2f5d5eb65a8/utils.go#L34
regex = regexp.MustCompile(
`^(?:(?P<Type>.*?)?://)?` + // [type://]
`(?:(?P<Username>.*?)(?::(?P<Password>.*))?@)?` + // [username[:password]@]
`(?:(?P<Protocol>[^\(]*)(?:\((?P<Address>[^\)]*)\))?)?` + // [protocol[(address)]]
`\/(?P<DataSource>.*?)` + // /datasource
`(?:\?(?P<Params>[^\?]*))?$`) // [?param1=value1]
)

// Converts a DSN struct into its string representation.
// String converts a DSN struct into its string representation.
func (d DSN) String() string {
str := ""

Expand Down
18 changes: 13 additions & 5 deletions pkg/dumper/dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,32 @@ import (
type (
// Driver is a driver interface used to support multiple drivers
Driver interface {
// IsSupported checks if the given dsn connection string is supported.
IsSupported(dsn string) bool
// NewConnection creates a new database connection and retrieves a dumper implementation.
NewConnection(ConnOpts, reader.Reader) (Dumper, error)
}

// A Dumper writes a database's stucture to the provided stream.
Dumper interface {
// Dump executes the dump process.
Dump(chan<- struct{}, *config.Spec, int) error
// Close closes the dumper resources and releases them.
Close() error
}

// ConnOpts are the options to create a connection
ConnOpts struct {
DSN string
Timeout time.Duration
// DSN is the connection address.
DSN string
// Timeout is the timeout for dump operations.
Timeout time.Duration
// MaxConnLifetime is the maximum amount of time a connection may be reused on the read database.
MaxConnLifetime time.Duration
MaxConns int
MaxIdleConns int
// MaxConns is the maximum number of open connections to the target database.
MaxConns int
// MaxIdleConns is the maximum number of connections in the idle connection pool for the write database.
MaxIdleConns int
}
)

Expand All @@ -40,7 +48,7 @@ func NewDumper(opts ConnOpts, rdr reader.Reader) (dumper Dumper, err error) {
if !ok || !driver.IsSupported(opts.DSN) {
return true
}
log.WithField("driver", key).Debug("Found driver")
log.WithField("driver", key).Debug("found driver")

dumper, err = driver.NewConnection(opts, rdr)
return false
Expand Down
69 changes: 37 additions & 32 deletions pkg/dumper/generic/sql.go → pkg/dumper/engine/engine.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package generic
package engine

import (
"sync"
Expand All @@ -12,72 +12,77 @@ import (
)

type (
sqlDumper struct {
SqlEngine

// Engine is the engine which dispatches and orchestrates a dump.
Engine struct {
Dumper
reader reader.Reader
}

SqlEngine interface {
// Dumper is the dump engine.
Dumper interface {
// DumpStructure dumps database structure given a sql.
DumpStructure(sql string) error

// DumpTable dumps a table by name.
DumpTable(tableName string, rowChan <-chan database.Row) error

// Close closes the dumper resources and releases them.
Close() error
}

SqlEngineAdvanced interface {
// Hooker are the actions you perform before or after a specified database operation.
Hooker interface {
// PreDumpTables performs a action before dumping tables before dumping tables.
PreDumpTables([]string) error
// PostDumpTables performs a action after dumping tables before dumping tables.
PostDumpTables([]string) error
}
)

func NewSqlDumper(rdr reader.Reader, engine SqlEngine) dumper.Dumper {
return &sqlDumper{
SqlEngine: engine,
reader: rdr,
// New creates a new engine given the reader and dumper.
func New(rdr reader.Reader, dumper Dumper) dumper.Dumper {
return &Engine{
Dumper: dumper,
reader: rdr,
}
}

func (p *sqlDumper) Dump(done chan<- struct{}, spec *config.Spec, concurrency int) error {
if err := p.readAndDumpStructure(); err != nil {
// Dump executes the dump process.
func (e *Engine) Dump(done chan<- struct{}, spec *config.Spec, concurrency int) error {
if err := e.readAndDumpStructure(); err != nil {
return err
}

return p.readAndDumpTables(done, spec, concurrency)
return e.readAndDumpTables(done, spec, concurrency)
}

func (p *sqlDumper) readAndDumpStructure() error {
log.Debug("Dumping structure...")
structureSQL, err := p.reader.GetStructure()
func (e *Engine) readAndDumpStructure() error {
log.Debug("dumping structure...")
sql, err := e.reader.GetStructure()
if err != nil {
return errors.Wrap(err, "failed to get structure")
}

if err := p.DumpStructure(structureSQL); err != nil {
if err := e.DumpStructure(sql); err != nil {
return errors.Wrap(err, "failed to dump structure")
}

log.Debug("Structure dumped")
log.Debug("structure was dumped")
return nil
}

func (p *sqlDumper) readAndDumpTables(done chan<- struct{}, spec *config.Spec, concurrency int) error {
tables, err := p.reader.GetTables()
func (e *Engine) readAndDumpTables(done chan<- struct{}, spec *config.Spec, concurrency int) error {
tables, err := e.reader.GetTables()
if err != nil {
return err
return errors.Wrap(err, "failed to read and dump tables")
}

// Trigger pre dump tables
if adv, ok := p.SqlEngine.(SqlEngineAdvanced); ok {
if adv, ok := e.Dumper.(Hooker); ok {
if err := adv.PreDumpTables(tables); err != nil {
return err
return errors.Wrap(err, "failed to execute pre dump tables")
}
}

semChan := make(chan struct{}, concurrency)

var wg sync.WaitGroup
for _, tbl := range tables {
logger := log.WithField("table", tbl)
Expand All @@ -97,7 +102,7 @@ func (p *sqlDumper) readAndDumpTables(done chan<- struct{}, spec *config.Spec, c
Match: tableConfig.Filter.Match,
Sorts: tableConfig.Filter.Sorts,
Limit: tableConfig.Filter.Limit,
Relationships: p.relationshipConfigToOptions(tableConfig.Relationships),
Relationships: e.relationshipConfigToOptions(tableConfig.Relationships),
}
}

Expand All @@ -110,13 +115,13 @@ func (p *sqlDumper) readAndDumpTables(done chan<- struct{}, spec *config.Spec, c
defer wg.Done()
defer func(semChan <-chan struct{}) { <-semChan }(semChan)

if err := p.DumpTable(tableName, rowChan); err != nil {
if err := e.DumpTable(tableName, rowChan); err != nil {
logger.WithError(err).Error("Failed to dump table")
}
}(tbl, rowChan, logger)

go func(tableName string, opts reader.ReadTableOpt, rowChan chan<- database.Row, logger *log.Entry) {
if err := p.reader.ReadTable(tableName, rowChan, opts, spec.Matchers); err != nil {
if err := e.reader.ReadTable(tableName, rowChan, opts, spec.Matchers); err != nil {
logger.WithError(err).Error("Failed to read table")
}
}(tbl, opts, rowChan, logger)
Expand All @@ -128,9 +133,9 @@ func (p *sqlDumper) readAndDumpTables(done chan<- struct{}, spec *config.Spec, c
close(semChan)

// Trigger post dump tables
if adv, ok := p.SqlEngine.(SqlEngineAdvanced); ok {
if adv, ok := e.Dumper.(Hooker); ok {
if err := adv.PostDumpTables(tables); err != nil {
log.WithError(err).Error("Post dump tables failed")
log.WithError(err).Error("post dump tables failed")
}
}

Expand All @@ -140,7 +145,7 @@ func (p *sqlDumper) readAndDumpTables(done chan<- struct{}, spec *config.Spec, c
return nil
}

func (p *sqlDumper) relationshipConfigToOptions(relationshipsConfig []*config.Relationship) []*reader.RelationshipOpt {
func (e *Engine) relationshipConfigToOptions(relationshipsConfig []*config.Relationship) []*reader.RelationshipOpt {
var opts []*reader.RelationshipOpt

for _, r := range relationshipsConfig {
Expand Down
Loading

0 comments on commit e0b790c

Please sign in to comment.