Skip to content

Commit

Permalink
WIP: fix payment channel locking
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Jul 29, 2020
1 parent c3ff29c commit a2c541e
Show file tree
Hide file tree
Showing 12 changed files with 1,946 additions and 384 deletions.
2 changes: 1 addition & 1 deletion gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func main() {

err = gen.WriteMapEncodersToFile("./paychmgr/cbor_gen.go", "paychmgr",
paychmgr.VoucherInfo{},
paychmgr.ChannelInfo{},
paychmgr.ChannelInfoStorable{},
)
if err != nil {
fmt.Println(err)
Expand Down
67 changes: 67 additions & 0 deletions paychmgr/accessorcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package paychmgr

import "github.com/filecoin-project/go-address"

// accessorByFromTo gets a channel accessor for a given from / to pair.
// The channel accessor facilitates locking a channel so that operations
// must be performed sequentially on a channel (but can be performed at
// the same time on different channels).
func (pm *Manager) accessorByFromTo(from address.Address, to address.Address) (*channelAccessor, error) {
key := pm.accessorCacheKey(from, to)

// First take a read lock and check the cache
pm.lk.RLock()
ca, ok := pm.channels[key]
pm.lk.RUnlock()
if ok {
return ca, nil
}

// Not in cache, so take a write lock
pm.lk.Lock()
defer pm.lk.Unlock()

// Need to check cache again in case it was updated between releasing read
// lock and taking write lock
ca, ok = pm.channels[key]
if !ok {
// Not in cache, so create a new one and store in cache
ca = pm.addAccessorToCache(from, to)
}

return ca, nil
}

// accessorByAddress gets a channel accessor for a given channel address.
// The channel accessor facilitates locking a channel so that operations
// must be performed sequentially on a channel (but can be performed at
// the same time on different channels).
func (pm *Manager) accessorByAddress(ch address.Address) (*channelAccessor, error) {
// Get the channel from / to
pm.lk.RLock()
channelInfo, err := pm.store.ByAddress(ch)
pm.lk.RUnlock()
if err != nil {
return nil, err
}

// TODO: cache by channel address so we can get by address instead of using from / to
return pm.accessorByFromTo(channelInfo.Control, channelInfo.Target)
}

// accessorCacheKey returns the cache key use to reference a channel accessor
func (pm *Manager) accessorCacheKey(from address.Address, to address.Address) string {
return from.String() + "->" + to.String()
}

// addAccessorToCache adds a channel accessor to a cache. Note that channelInfo
// may be nil if the channel hasn't been created yet, but we still want to
// reference the same channel accessor for a given from/to, so that all
// attempts to access a channel use the same lock (the lock on the accessor)
func (pm *Manager) addAccessorToCache(from address.Address, to address.Address) *channelAccessor {
key := pm.accessorCacheKey(from, to)
ca := newChannelAccessor(pm)
// TODO: Use LRU
pm.channels[key] = ca
return ca
}
207 changes: 196 additions & 11 deletions paychmgr/cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions paychmgr/channellock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package paychmgr

import "sync"

type rwlock interface {
RLock()
RUnlock()
}

// channelLock manages locking for a specific channel.
// Some operations update the state of a single channel, and need to block
// other operations only on the same channel's state.
// Some operations update state that affects all channels, and need to block
// any operation against any channel.
type channelLock struct {
globalLock rwlock
chanLock sync.Mutex
}

func (l *channelLock) Lock() {
// Wait for other operations by this channel to finish.
// Exclusive per-channel (no other ops by this channel allowed).
l.chanLock.Lock()
// Wait for operations affecting all channels to finish.
// Allows ops by other channels in parallel, but blocks all operations
// if global lock is taken exclusively (eg when adding a channel)
l.globalLock.RLock()
}

func (l *channelLock) Unlock() {
l.globalLock.RUnlock()
l.chanLock.Unlock()
}
Loading

0 comments on commit a2c541e

Please sign in to comment.