Skip to content

Commit

Permalink
ETCD/Redis supports "BatchPut"
Browse files Browse the repository at this point in the history
  • Loading branch information
DuodenumL committed Dec 29, 2021
1 parent d903237 commit c3c6b81
Show file tree
Hide file tree
Showing 20 changed files with 82 additions and 19 deletions.
2 changes: 1 addition & 1 deletion 3rdmocks/ServerStream.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cluster/mocks/Cluster.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/docker/mocks/APIClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions engine/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func validateEngine(ctx context.Context, engine engine.API, timeout time.Duratio

// GetEngineFromCache .
func GetEngineFromCache(ctx context.Context, config types.Config, endpoint, ca, cert, key string) engine.API {
client := engineCache.Get(getEngineCacheKey(endpoint, ca, cert, key))
cacheKey := getEngineCacheKey(endpoint, ca, cert, key)
client := engineCache.Get(cacheKey)
if client == nil {
return nil
}
Expand All @@ -57,7 +58,9 @@ func GetEngineFromCache(ctx context.Context, config types.Config, endpoint, ca,

// RemoveEngineFromCache .
func RemoveEngineFromCache(endpoint, ca, cert, key string) {
engineCache.Delete(getEngineCacheKey(endpoint, ca, cert, key))
cacheKey := getEngineCacheKey(endpoint, ca, cert, key)
log.Infof(context.TODO(), "[RemoveEngineFromCache] remove %v, key %v", endpoint, cacheKey)
engineCache.Delete(cacheKey)
}

// GetEngine get engine
Expand All @@ -68,7 +71,9 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,

defer func() {
if err == nil && client != nil {
engineCache.Set(getEngineCacheKey(endpoint, ca, cert, key), client)
cacheKey := getEngineCacheKey(endpoint, ca, cert, key)
log.Infof(ctx, "[GetEngine] store engine of %v in cache, key: %v", endpoint, cacheKey)
engineCache.Set(cacheKey, client)
}
}()

Expand Down
2 changes: 1 addition & 1 deletion engine/mocks/API.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion lock/mocks/DistributedLock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rpc/mocks/CoreRPC_RunAndWaitServer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion scheduler/mocks/Scheduler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion source/mocks/Source.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions store/etcdv3/meta/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ func (e *ETCD) BatchUpdate(ctx context.Context, data map[string]string, opts ...
return e.batchUpdate(ctx, data, opts...)
}

// BatchPut .
func (e *ETCD) BatchPut(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
return e.batchPut(ctx, data, nil, opts...)
}

// isTTLChanged returns true if there is a lease with a different ttl bound to the key
func (e *ETCD) isTTLChanged(ctx context.Context, key string, ttl int64) (bool, error) {
resp, err := e.GetOne(ctx, key)
Expand Down
1 change: 1 addition & 0 deletions store/etcdv3/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type KV interface {
BatchCreate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error)
BatchUpdate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error)
BatchDelete(ctx context.Context, keys []string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error)
BatchPut(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error)

StartEphemeral(ctx context.Context, path string, heartbeat time.Duration) (<-chan struct{}, func(), error)
CreateLock(key string, ttl time.Duration) (lock.DistributedLock, error)
Expand Down
2 changes: 1 addition & 1 deletion store/etcdv3/meta/mocks/ETCDClientV3.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 31 additions & 1 deletion store/etcdv3/meta/mocks/KV.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion store/etcdv3/meta/mocks/Txn.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion store/etcdv3/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (m *Mercury) UpdateNodes(ctx context.Context, nodes ...*types.Node) error {
enginefactory.RemoveEngineFromCache(node.Endpoint, node.Ca, node.Cert, node.Key)
}

resp, err := m.BatchUpdate(ctx, data)
resp, err := m.BatchPut(ctx, data)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion store/etcdv3/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func TestUpdateNode(t *testing.T) {
Key: "hh",
},
}
assert.Error(t, m.UpdateNodes(ctx, fakeNode))
assert.NoError(t, m.UpdateNodes(ctx, fakeNode))
assert.NoError(t, m.UpdateNodes(ctx, node))
node.Available = false
assert.NoError(t, m.UpdateNodes(ctx, node))
Expand Down
2 changes: 1 addition & 1 deletion store/mocks/Store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion store/redis/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (r *Rediaron) UpdateNodes(ctx context.Context, nodes ...*types.Node) error
addIfNotEmpty(fmt.Sprintf(nodeCertKey, node.Name), node.Cert)
addIfNotEmpty(fmt.Sprintf(nodeKeyKey, node.Name), node.Key)
}
return errors.WithStack(r.BatchUpdate(ctx, data))
return errors.WithStack(r.BatchPut(ctx, data))
}

// UpdateNodeResource update cpu and memory on a node, either add or subtract
Expand Down
2 changes: 1 addition & 1 deletion store/redis/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (s *RediaronTestSuite) TestUpdateNode() {
Key: "hh",
},
}
s.Error(s.rediaron.UpdateNodes(ctx, fakeNode))
s.NoError(s.rediaron.UpdateNodes(ctx, fakeNode))
s.NoError(s.rediaron.UpdateNodes(ctx, node))
}

Expand Down
22 changes: 22 additions & 0 deletions store/redis/rediaron.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,28 @@ func (r *Rediaron) BatchCreate(ctx context.Context, data map[string]string) erro
return nil
}

// BatchPut is wrapper to adapt etcd batch replace
func (r *Rediaron) BatchPut(ctx context.Context, data map[string]string) error {
replace := func(pipe redis.Pipeliner) error {
for key, value := range data {
pipe.Set(ctx, key, value, 0)
}
return nil
}

cmds, err := r.cli.TxPipelined(ctx, replace)
if err != nil {
return err
}

for _, cmd := range cmds {
if err := cmd.Err(); err != nil {
return err
}
}
return nil
}

// BatchCreateAndDecr decr processing and add workload
func (r *Rediaron) BatchCreateAndDecr(ctx context.Context, data map[string]string, decrKey string) (err error) {
batchCreateAndDecr := func(pipe redis.Pipeliner) error {
Expand Down

0 comments on commit c3c6b81

Please sign in to comment.