Skip to content

Commit

Permalink
refactor: Store should implement the interface
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Castell committed Apr 25, 2019
1 parent 1b7fe49 commit 08765f0
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 22 deletions.
4 changes: 2 additions & 2 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Agent struct {
ProcessorPlugins map[string]ExecutionProcessor
ExecutorPlugins map[string]Executor
HTTPTransport Transport
Store *Store
Store Storage
GRPCServer DkronGRPCServer
GRPCClient DkronGRPCClient

Expand Down Expand Up @@ -343,7 +343,7 @@ func (a *Agent) StartServer() {
}

func (a *Agent) participate() {
a.candidate = leadership.NewCandidate(a.Store.Client, a.Store.LeaderKey(), a.config.NodeName, defaultLeaderTTL)
a.candidate = leadership.NewCandidate(a.Store.Client(), a.Store.LeaderKey(), a.config.NodeName, defaultLeaderTTL)

go func() {
for {
Expand Down
4 changes: 2 additions & 2 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,9 @@ func (j *Job) Lock() error {
return ErrNoAgent
}

lockKey := fmt.Sprintf("%s/job_locks/%s", j.Agent.Store.keyspace, j.Name)
lockKey := fmt.Sprintf("%s/job_locks/%s", j.Agent.Config().Keyspace, j.Name)
// TODO: LockOptions empty is a temporary fix until https://github.com/docker/libkv/pull/99 is fixed
l, err := j.Agent.Store.Client.NewLock(lockKey, &store.LockOptions{RenewLock: make(chan (struct{}))})
l, err := j.Agent.Store.Client().NewLock(lockKey, &store.LockOptions{RenewLock: make(chan (struct{}))})
if err != nil {
return err
}
Expand Down
41 changes: 23 additions & 18 deletions dkron/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ import (
const MaxExecutions = 100

type Storage interface {
SetJob(job *Job) error
SetJob(job *Job, copyDependentJobs bool) error
AtomicJobPut(job *Job, prevJobKVPair *store.KVPair) (bool, error)
SetJobDependencyTree(job *Job, previousJob *Job) error
GetJobs() ([]*Job, error)
GetJobs(options *JobOptions) ([]*Job, error)
GetJob(name string, options *JobOptions) (*Job, error)
GetJobWithKVPair(name string, options *JobOptions) (*Job, *store.KVPair, error)
DeleteJob(name string) (*Job, error)
Expand All @@ -37,10 +36,12 @@ type Storage interface {
DeleteExecutions(jobName string) error
GetLeader() []byte
LeaderKey() string
Healthy() error
Client() store.Store
}

type Store struct {
Client store.Store
client store.Store
agent *Agent
keyspace string
backend store.Backend
Expand Down Expand Up @@ -73,11 +74,15 @@ func NewStore(backend store.Backend, machines []string, a *Agent, keyspace strin
"keyspace": keyspace,
}).Debug("store: Backend config")

return &Store{Client: s, agent: a, keyspace: keyspace, backend: backend}
return &Store{client: s, agent: a, keyspace: keyspace, backend: backend}
}

func (s *Store) Client() store.Store {
return s.client
}

func (s *Store) Healthy() error {
_, err := s.Client.List(s.keyspace, nil)
_, err := s.client.List(s.keyspace, nil)
if err != store.ErrKeyNotFound && err != nil {
return err
}
Expand Down Expand Up @@ -131,7 +136,7 @@ func (s *Store) SetJob(job *Job, copyDependentJobs bool) error {
"json": string(jobJSON),
}).Debug("store: Setting job")

if err := s.Client.Put(jobKey, jobJSON, nil); err != nil {
if err := s.client.Put(jobKey, jobJSON, nil); err != nil {
return err
}

Expand Down Expand Up @@ -198,7 +203,7 @@ func (s *Store) AtomicJobPut(job *Job, prevJobKVPair *store.KVPair) (bool, error
jobKey := fmt.Sprintf("%s/jobs/%s", s.keyspace, job.Name)
jobJSON, _ := json.Marshal(job)

ok, _, err := s.Client.AtomicPut(jobKey, jobJSON, prevJobKVPair, nil)
ok, _, err := s.client.AtomicPut(jobKey, jobJSON, prevJobKVPair, nil)

return ok, err
}
Expand Down Expand Up @@ -250,7 +255,7 @@ func (s *Store) jobHasTags(job *Job, tags map[string]string) bool {

// GetJobs returns all jobs
func (s *Store) GetJobs(options *JobOptions) ([]*Job, error) {
res, err := s.Client.List(s.keyspace+"/jobs/", nil)
res, err := s.client.List(s.keyspace+"/jobs/", nil)
if err != nil {
if err == store.ErrKeyNotFound {
log.Debug("store: No jobs found")
Expand Down Expand Up @@ -294,7 +299,7 @@ func (s *Store) GetJob(name string, options *JobOptions) (*Job, error) {
}

func (s *Store) GetJobWithKVPair(name string, options *JobOptions) (*Job, *store.KVPair, error) {
res, err := s.Client.Get(s.keyspace+"/jobs/"+name, nil)
res, err := s.client.Get(s.keyspace+"/jobs/"+name, nil)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -334,7 +339,7 @@ func (s *Store) DeleteJob(name string) (*Job, error) {
}
}

if err := s.Client.Delete(s.keyspace + "/jobs/" + name); err != nil {
if err := s.client.Delete(s.keyspace + "/jobs/" + name); err != nil {
return nil, err
}

Expand All @@ -343,7 +348,7 @@ func (s *Store) DeleteJob(name string) (*Job, error) {

func (s *Store) GetExecutions(jobName string) ([]*Execution, error) {
prefix := fmt.Sprintf("%s/executions/%s", s.keyspace, jobName)
res, err := s.Client.List(prefix, nil)
res, err := s.client.List(prefix, nil)
if err != nil {
return nil, err
}
Expand All @@ -352,7 +357,7 @@ func (s *Store) GetExecutions(jobName string) ([]*Execution, error) {
}

func (s *Store) GetLastExecutionGroup(jobName string) ([]*Execution, error) {
res, err := s.Client.List(fmt.Sprintf("%s/executions/%s", s.keyspace, jobName), nil)
res, err := s.client.List(fmt.Sprintf("%s/executions/%s", s.keyspace, jobName), nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -381,7 +386,7 @@ func (s *Store) GetLastExecutionGroup(jobName string) ([]*Execution, error) {
}

func (s *Store) GetExecutionGroup(execution *Execution) ([]*Execution, error) {
res, err := s.Client.List(fmt.Sprintf("%s/executions/%s", s.keyspace, execution.JobName), nil)
res, err := s.client.List(fmt.Sprintf("%s/executions/%s", s.keyspace, execution.JobName), nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -433,7 +438,7 @@ func (s *Store) SetExecution(execution *Execution) (string, error) {
"execution": key,
}).Debug("store: Setting key")

err := s.Client.Put(fmt.Sprintf("%s/executions/%s/%s", s.keyspace, execution.JobName, key), exJson, nil)
err := s.client.Put(fmt.Sprintf("%s/executions/%s/%s", s.keyspace, execution.JobName, key), exJson, nil)
if err != nil {
log.WithFields(logrus.Fields{
"job": execution.JobName,
Expand All @@ -459,7 +464,7 @@ func (s *Store) SetExecution(execution *Execution) (string, error) {
"job": execs[i].JobName,
"execution": execs[i].Key(),
}).Debug("store: to detele key")
err := s.Client.Delete(fmt.Sprintf("%s/executions/%s/%s", s.keyspace, execs[i].JobName, execs[i].Key()))
err := s.client.Delete(fmt.Sprintf("%s/executions/%s/%s", s.keyspace, execs[i].JobName, execs[i].Key()))
if err != nil {
log.WithError(err).
WithField("execution", execs[i].Key()).
Expand Down Expand Up @@ -493,12 +498,12 @@ func (s *Store) unmarshalExecutions(res []*store.KVPair, stopWord string) ([]*Ex

// Removes all executions of a job
func (s *Store) DeleteExecutions(jobName string) error {
return s.Client.DeleteTree(fmt.Sprintf("%s/executions/%s", s.keyspace, jobName))
return s.client.DeleteTree(fmt.Sprintf("%s/executions/%s", s.keyspace, jobName))
}

// Retrieve the leader from the store
func (s *Store) GetLeader() []byte {
res, err := s.Client.Get(s.LeaderKey(), nil)
res, err := s.client.Get(s.LeaderKey(), nil)
if err != nil {
if err == store.ErrNotReachable {
log.Fatal("store: Store not reachable, be sure you have an existing key-value store running is running and is reachable.")
Expand Down

0 comments on commit 08765f0

Please sign in to comment.