Skip to content

Commit

Permalink
Remove reflection from local_db and limit it in kv/db.
Browse files Browse the repository at this point in the history
Summary: Limit use of reflection

Test Plan: go test

Reviewers: mrtracy, bdarnell

Reviewed By: mrtracy, bdarnell

Subscribers: mrtracy, team

Differential Revision: http://phabricator.cockroachdb.org/D130
  • Loading branch information
Spencer Kimball committed Aug 5, 2014
1 parent c3919a5 commit d9c07b3
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 102 deletions.
94 changes: 55 additions & 39 deletions kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,19 +341,25 @@ func (db *DistDB) sendRPC(replicas []storage.Replica, method string, args storag
return rpc.Send(argsMap, method, replyChanI, rpcOpts)
}

// sendErrorReply instantiates a new reply value according to the
// inner element type of replyChan and sets its ResponseHeader
// error to err before sending the new reply on the channel.
func sendErrorReply(err error, replyChan interface{}) {
replyVal := reflect.New(reflect.TypeOf(replyChan).Elem().Elem())
replyVal.Interface().(storage.Response).Header().Error = err
reflect.ValueOf(replyChan).Send(replyVal)
}

// routeRPC verifies permissions and looks up the appropriate range
// based on the supplied key and sends the RPC according to the
// specified options. routeRPC sends asynchronously and returns a
// channel which receives the reply struct when the call is
// complete. Returns a channel of the same type as "reply".
func (db *DistDB) routeRPC(method string, args storage.Request, reply storage.Response) interface{} {
chanVal := reflect.MakeChan(reflect.ChanOf(reflect.BothDir, reflect.TypeOf(reply)), 1)

// response value on the replyChan channel when the call is
// complete.
func (db *DistDB) routeRPC(method string, args storage.Request, replyChan interface{}) {
// Verify permissions.
if err := db.verifyPermissions(method, args.Header()); err != nil {
reply.Header().Error = err
chanVal.Send(reflect.ValueOf(reply))
return chanVal.Interface()
sendErrorReply(err, replyChan)
return
}

// Retry logic for lookup of range by key and RPCs to range replicas.
Expand All @@ -368,7 +374,7 @@ func (db *DistDB) routeRPC(method string, args storage.Request, reply storage.Re
err := util.RetryWithBackoff(retryOpts, func() (bool, error) {
rangeMeta, err := db.rangeCache.LookupRangeMetadata(args.Header().Key)
if err == nil {
err = db.sendRPC(rangeMeta.Replicas, method, args, chanVal.Interface())
err = db.sendRPC(rangeMeta.Replicas, method, args, replyChan)
}
if err != nil {
// Range metadata might be out of date - evict it.
Expand All @@ -385,78 +391,85 @@ func (db *DistDB) routeRPC(method string, args storage.Request, reply storage.Re
return true, err
})
if err != nil {
reply.Header().Error = err
chanVal.Send(reflect.ValueOf(reply))
sendErrorReply(err, replyChan)
}
}()

return chanVal.Interface()
}

// Contains checks for the existence of a key.
func (db *DistDB) Contains(args *storage.ContainsRequest) <-chan *storage.ContainsResponse {
return db.routeRPC("Node.Contains",
args, &storage.ContainsResponse{}).(chan *storage.ContainsResponse)
replyChan := make(chan *storage.ContainsResponse, 1)
db.routeRPC("Node.Contains", args, replyChan)
return replyChan
}

// Get .
func (db *DistDB) Get(args *storage.GetRequest) <-chan *storage.GetResponse {
return db.routeRPC("Node.Get",
args, &storage.GetResponse{}).(chan *storage.GetResponse)
replyChan := make(chan *storage.GetResponse, 1)
db.routeRPC("Node.Get", args, replyChan)
return replyChan
}

// Put .
func (db *DistDB) Put(args *storage.PutRequest) <-chan *storage.PutResponse {
return db.routeRPC("Node.Put",
args, &storage.PutResponse{}).(chan *storage.PutResponse)
replyChan := make(chan *storage.PutResponse, 1)
db.routeRPC("Node.Put", args, replyChan)
return replyChan
}

// ConditionalPut .
func (db *DistDB) ConditionalPut(args *storage.ConditionalPutRequest) <-chan *storage.ConditionalPutResponse {
return db.routeRPC("Node.ConditionalPut",
args, &storage.ConditionalPutResponse{}).(chan *storage.ConditionalPutResponse)
replyChan := make(chan *storage.ConditionalPutResponse, 1)
db.routeRPC("Node.ConditionalPut", args, replyChan)
return replyChan
}

// Increment .
func (db *DistDB) Increment(args *storage.IncrementRequest) <-chan *storage.IncrementResponse {
return db.routeRPC("Node.Increment",
args, &storage.IncrementResponse{}).(chan *storage.IncrementResponse)
replyChan := make(chan *storage.IncrementResponse, 1)
db.routeRPC("Node.Increment", args, replyChan)
return replyChan
}

// Delete .
func (db *DistDB) Delete(args *storage.DeleteRequest) <-chan *storage.DeleteResponse {
return db.routeRPC("Node.Delete",
args, &storage.DeleteResponse{}).(chan *storage.DeleteResponse)
replyChan := make(chan *storage.DeleteResponse, 1)
db.routeRPC("Node.Delete", args, replyChan)
return replyChan
}

// DeleteRange .
func (db *DistDB) DeleteRange(args *storage.DeleteRangeRequest) <-chan *storage.DeleteRangeResponse {
// TODO(spencer): range of keys.
return db.routeRPC("Node.DeleteRange",
args, &storage.DeleteRangeResponse{}).(chan *storage.DeleteRangeResponse)
replyChan := make(chan *storage.DeleteRangeResponse, 1)
db.routeRPC("Node.DeleteRange", args, replyChan)
return replyChan
}

// Scan .
func (db *DistDB) Scan(args *storage.ScanRequest) <-chan *storage.ScanResponse {
// TODO(spencer): range of keys.
return db.routeRPC("Node.Scan",
args, &storage.ScanResponse{}).(chan *storage.ScanResponse)
replyChan := make(chan *storage.ScanResponse, 1)
db.routeRPC("Node.Scan", args, replyChan)
return replyChan
}

// EndTransaction .
func (db *DistDB) EndTransaction(args *storage.EndTransactionRequest) <-chan *storage.EndTransactionResponse {
// TODO(spencer): multiple keys here...
return db.routeRPC("Node.EndTransaction",
args, &storage.EndTransactionResponse{}).(chan *storage.EndTransactionResponse)
replyChan := make(chan *storage.EndTransactionResponse, 1)
db.routeRPC("Node.EndTransaction", args, replyChan)
return replyChan
}

// AccumulateTS is used to efficiently accumulate a time series of
// int64 quantities representing discrete subtimes. For example, a
// key/value might represent a minute of data. Each would contain 60
// int64 counts, each representing a second.
func (db *DistDB) AccumulateTS(args *storage.AccumulateTSRequest) <-chan *storage.AccumulateTSResponse {
return db.routeRPC("Node.AccumulateTS",
args, &storage.AccumulateTSResponse{}).(chan *storage.AccumulateTSResponse)
replyChan := make(chan *storage.AccumulateTSResponse, 1)
db.routeRPC("Node.AccumulateTS", args, replyChan)
return replyChan
}

// ReapQueue scans and deletes messages from a recipient message
Expand All @@ -465,18 +478,21 @@ func (db *DistDB) AccumulateTS(args *storage.AccumulateTSRequest) <-chan *storag
// the requested maximum. If fewer than the maximum were returned,
// then the queue is empty.
func (db *DistDB) ReapQueue(args *storage.ReapQueueRequest) <-chan *storage.ReapQueueResponse {
return db.routeRPC("Node.ReapQueue",
args, &storage.ReapQueueResponse{}).(chan *storage.ReapQueueResponse)
replyChan := make(chan *storage.ReapQueueResponse, 1)
db.routeRPC("Node.ReapQueue", args, replyChan)
return replyChan
}

// EnqueueUpdate enqueues an update for eventual execution.
func (db *DistDB) EnqueueUpdate(args *storage.EnqueueUpdateRequest) <-chan *storage.EnqueueUpdateResponse {
return db.routeRPC("Node.EnqueueUpdate",
args, &storage.EnqueueUpdateResponse{}).(chan *storage.EnqueueUpdateResponse)
replyChan := make(chan *storage.EnqueueUpdateResponse, 1)
db.routeRPC("Node.EnqueueUpdate", args, replyChan)
return replyChan
}

// EnqueueMessage enqueues a message for delivery to an inbox.
func (db *DistDB) EnqueueMessage(args *storage.EnqueueMessageRequest) <-chan *storage.EnqueueMessageResponse {
return db.routeRPC("Node.EnqueueMessage",
args, &storage.EnqueueMessageResponse{}).(chan *storage.EnqueueMessageResponse)
replyChan := make(chan *storage.EnqueueMessageResponse, 1)
db.routeRPC("Node.EnqueueMessage", args, replyChan)
return replyChan
}
Loading

0 comments on commit d9c07b3

Please sign in to comment.