feat: skipset and skipmap support optional array #70

Merged · 2 commits · Jul 12, 2021
60 changes: 60 additions & 0 deletions collection/skipmap/oparray.go
@@ -0,0 +1,60 @@
// Copyright 2021 ByteDance Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package skipmap

import (
	"sync/atomic"
	"unsafe"
)

const (
	op1 = 4
	op2 = maxLevel - op1
)

type optionalArray struct {
	base  [op1]unsafe.Pointer
	extra *([op2]unsafe.Pointer)
}

func (a *optionalArray) load(i int) unsafe.Pointer {
	if i < op1 {
		return a.base[i]
	}
	return a.extra[i-op1]
}

func (a *optionalArray) store(i int, p unsafe.Pointer) {
	if i < op1 {
		a.base[i] = p
		return
	}
	a.extra[i-op1] = p
}

func (a *optionalArray) atomicLoad(i int) unsafe.Pointer {
	if i < op1 {
		return atomic.LoadPointer(&a.base[i])
	}
	return atomic.LoadPointer(&a.extra[i-op1])
}

func (a *optionalArray) atomicStore(i int, p unsafe.Pointer) {
	if i < op1 {
		atomic.StorePointer(&a.base[i], p)
		return
	}
	atomic.StorePointer(&a.extra[i-op1], p)
}
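
Why this layout: a skip-list node of level L needs L next-pointers, and levels are geometrically distributed, so the vast majority of nodes are short. optionalArray therefore keeps the first op1 = 4 pointers inline in the node and allocates the remaining op2 = maxLevel - op1 slots only for the rare tall node; compared with the previous next []*int64Node, a short node also sheds the slice header and the separate backing-array allocation. A minimal standalone sketch of the saving (my illustration, not part of this PR; it assumes maxLevel = 16):

package main

import (
	"fmt"
	"unsafe"
)

const (
	maxLevel = 16 // assumed for this sketch; the real constant lives in package skipmap
	op1      = 4
	op2      = maxLevel - op1
)

type optionalArray struct {
	base  [op1]unsafe.Pointer
	extra *[op2]unsafe.Pointer
}

func main() {
	// A fixed array pays for maxLevel slots in every node:
	fmt.Println(unsafe.Sizeof([maxLevel]unsafe.Pointer{})) // 128 bytes on 64-bit
	// optionalArray pays for op1 slots plus one pointer:
	fmt.Println(unsafe.Sizeof(optionalArray{})) // 40 bytes on 64-bit

	// Only a node taller than op1 allocates the tail, mirroring newInt64Node below:
	var a optionalArray
	if level := 7; level > op1 {
		a.extra = new([op2]unsafe.Pointer)
	}
	fmt.Println(a.extra != nil) // true
}
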
64 changes: 64 additions & 0 deletions collection/skipmap/oparry_test.go
@@ -0,0 +1,64 @@
// Copyright 2021 ByteDance Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package skipmap

import (
	"testing"
	"unsafe"

	"github.com/bytedance/gopkg/lang/fastrand"
)

type dummy struct {
	data optionalArray
}

func TestOpArray(t *testing.T) {
	n := new(dummy)
	n.data.extra = new([op2]unsafe.Pointer)

	var array [maxLevel]unsafe.Pointer
	for i := 0; i < maxLevel; i++ {
		value := unsafe.Pointer(&dummy{})
		array[i] = value
		n.data.store(i, value)
	}

	for i := 0; i < maxLevel; i++ {
		if array[i] != n.data.load(i) || array[i] != n.data.atomicLoad(i) {
			t.Fatal(i, array[i], n.data.load(i))
		}
	}

	for i := 0; i < 1000; i++ {
		r := int(fastrand.Uint32n(maxLevel))
		value := unsafe.Pointer(&dummy{})
		if i%100 == 0 {
			value = nil
		}
		array[r] = value
		if fastrand.Uint32n(2) == 0 {
			n.data.store(r, value)
		} else {
			n.data.atomicStore(r, value)
		}
	}

	for i := 0; i < maxLevel; i++ {
		if array[i] != n.data.load(i) || array[i] != n.data.atomicLoad(i) {
			t.Fatal(i, array[i], n.data.load(i))
		}
	}
}
76 changes: 43 additions & 33 deletions collection/skipmap/skipmap.go
@@ -32,19 +32,23 @@ type Int64Map struct {
 
 type int64Node struct {
 	key   int64
-	value unsafe.Pointer
-	next  []*int64Node
+	value unsafe.Pointer // *interface{}
+	next  optionalArray  // [level]*int64Node
 	mu    sync.Mutex
 	flags bitflag
+	level uint32
 }
 
 func newInt64Node(key int64, value interface{}, level int) *int64Node {
-	n := &int64Node{
-		key:  key,
-		next: make([]*int64Node, level),
+	node := &int64Node{
+		key:   key,
+		level: uint32(level),
 	}
-	n.storeVal(value)
-	return n
+	node.storeVal(value)
+	if level > op1 {
+		node.next.extra = new([op2]unsafe.Pointer)
+	}
+	return node
 }
 
 func (n *int64Node) storeVal(value interface{}) {
@@ -55,14 +59,20 @@ func (n *int64Node) loadVal() interface{} {
 	return *(*interface{})(atomic.LoadPointer(&n.value))
 }
 
-// loadNext return `n.next[i]`(atomic)
 func (n *int64Node) loadNext(i int) *int64Node {
-	return (*int64Node)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&n.next[i]))))
+	return (*int64Node)(n.next.load(i))
 }
 
+func (n *int64Node) storeNext(i int, node *int64Node) {
+	n.next.store(i, unsafe.Pointer(node))
+}
+
-// storeNext same with `n.next[i] = value`(atomic)
-func (n *int64Node) storeNext(i int, value *int64Node) {
-	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&n.next[i])), unsafe.Pointer(value))
+func (n *int64Node) atomicLoadNext(i int) *int64Node {
+	return (*int64Node)(n.next.atomicLoad(i))
 }
 
+func (n *int64Node) atomicStoreNext(i int, node *int64Node) {
+	n.next.atomicStore(i, unsafe.Pointer(node))
+}
+
 func (n *int64Node) lessthan(key int64) bool {
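
A note on the four accessors this hunk introduces: the refactor splits next-pointer access into plain and atomic variants. The plain loadNext/storeNext are used where no write can race, i.e. reads of pred's pointers while pred's mutex is held and writes to a new node that has not been published yet; atomicLoadNext/atomicStoreNext are used wherever lock-free traversals can run concurrently. A standalone single-level sketch of that convention (my illustration, not code from this PR):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"unsafe"
)

// node is a simplified stand-in for int64Node with a single next pointer.
type node struct {
	mu   sync.Mutex
	next unsafe.Pointer // *node
}

func (n *node) loadNext() *node   { return (*node)(n.next) }
func (n *node) storeNext(p *node) { n.next = unsafe.Pointer(p) }

func (n *node) atomicLoadNext() *node {
	return (*node)(atomic.LoadPointer(&n.next))
}

func (n *node) atomicStoreNext(p *node) {
	atomic.StorePointer(&n.next, unsafe.Pointer(p))
}

func main() {
	a, b := new(node), new(node)
	a.mu.Lock()
	a.storeNext(b) // plain store: the lock excludes every competing writer
	a.mu.Unlock()
	fmt.Println(a.atomicLoadNext() == b) // lock-free readers go through atomics
}
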
@@ -89,10 +99,10 @@ func NewInt64() *Int64Map {
 func (s *Int64Map) findNode(key int64, preds *[maxLevel]*int64Node, succs *[maxLevel]*int64Node) *int64Node {
 	x := s.header
 	for i := int(atomic.LoadInt64(&s.highestLevel)) - 1; i >= 0; i-- {
-		succ := x.loadNext(i)
+		succ := x.atomicLoadNext(i)
 		for succ != nil && succ.lessthan(key) {
 			x = succ
-			succ = x.loadNext(i)
+			succ = x.atomicLoadNext(i)
 		}
 		preds[i] = x
 		succs[i] = succ
@@ -111,10 +121,10 @@ func (s *Int64Map) findNodeDelete(key int64, preds *[maxLevel]*int64Node, succs
 	// lFound represents the index of the first layer at which it found a node.
 	lFound, x := -1, s.header
 	for i := int(atomic.LoadInt64(&s.highestLevel)) - 1; i >= 0; i-- {
-		succ := x.loadNext(i)
+		succ := x.atomicLoadNext(i)
 		for succ != nil && succ.lessthan(key) {
 			x = succ
-			succ = x.loadNext(i)
+			succ = x.atomicLoadNext(i)
 		}
 		preds[i] = x
 		succs[i] = succ
@@ -173,7 +183,7 @@ func (s *Int64Map) Store(key int64, value interface{}) {
 			// It is valid if:
 			// 1. The previous node and next node both are not marked.
 			// 2. The previous node's next node is succ in this layer.
-			valid = !pred.flags.Get(marked) && (succ == nil || !succ.flags.Get(marked)) && pred.next[layer] == succ
+			valid = !pred.flags.Get(marked) && (succ == nil || !succ.flags.Get(marked)) && pred.loadNext(layer) == succ
 		}
 		if !valid {
 			unlockInt64(preds, highestLocked)
@@ -182,8 +192,8 @@
 
 		nn := newInt64Node(key, value, level)
 		for layer := 0; layer < level; layer++ {
-			nn.next[layer] = succs[layer]
-			preds[layer].storeNext(layer, nn)
+			nn.storeNext(layer, succs[layer])
+			preds[layer].atomicStoreNext(layer, nn)
 		}
 		nn.flags.SetTrue(fullyLinked)
 		unlockInt64(preds, highestLocked)
@@ -213,10 +223,10 @@ func (s *Int64Map) randomlevel() int {
 func (s *Int64Map) Load(key int64) (value interface{}, ok bool) {
 	x := s.header
 	for i := int(atomic.LoadInt64(&s.highestLevel)) - 1; i >= 0; i-- {
-		nex := x.loadNext(i)
+		nex := x.atomicLoadNext(i)
 		for nex != nil && nex.lessthan(key) {
 			x = nex
-			nex = x.loadNext(i)
+			nex = x.atomicLoadNext(i)
 		}
 
 		// Check if the key already in the skip list.
@@ -242,7 +252,7 @@ func (s *Int64Map) LoadAndDelete(key int64) (value interface{}, loaded bool) {
 	for {
 		lFound := s.findNodeDelete(key, &preds, &succs)
 		if isMarked || // this process mark this node or we can find this node in the skip list
-			lFound != -1 && succs[lFound].flags.MGet(fullyLinked|marked, fullyLinked) && (len(succs[lFound].next)-1) == lFound {
+			lFound != -1 && succs[lFound].flags.MGet(fullyLinked|marked, fullyLinked) && (int(succs[lFound].level)-1) == lFound {
 			if !isMarked { // we don't mark this node for now
 				nodeToDelete = succs[lFound]
 				topLayer = lFound
@@ -274,7 +284,7 @@ func (s *Int64Map) LoadAndDelete(key int64) (value interface{}, loaded bool) {
 			// It is valid if:
 			// 1. the previous node exists.
 			// 2. no another node has inserted into the skip list in this layer.
-			valid = !pred.flags.Get(marked) && pred.next[layer] == succ
+			valid = !pred.flags.Get(marked) && pred.loadNext(layer) == succ
 		}
 		if !valid {
 			unlockInt64(preds, highestLocked)
@@ -283,7 +293,7 @@ func (s *Int64Map) LoadAndDelete(key int64) (value interface{}, loaded bool) {
 		for i := topLayer; i >= 0; i-- {
 			// Now we own the `nodeToDelete`, no other goroutine will modify it.
 			// So we don't need `nodeToDelete.loadNext`
-			preds[i].storeNext(i, nodeToDelete.next[i])
+			preds[i].atomicStoreNext(i, nodeToDelete.loadNext(i))
 		}
 		nodeToDelete.mu.Unlock()
 		unlockInt64(preds, highestLocked)
@@ -331,7 +341,7 @@ func (s *Int64Map) LoadOrStore(key int64, value interface{}) (actual interface{}
 			// It is valid if:
 			// 1. The previous node and next node both are not marked.
 			// 2. The previous node's next node is succ in this layer.
-			valid = !pred.flags.Get(marked) && (succ == nil || !succ.flags.Get(marked)) && pred.next[layer] == succ
+			valid = !pred.flags.Get(marked) && (succ == nil || !succ.flags.Get(marked)) && pred.loadNext(layer) == succ
 		}
 		if !valid {
 			unlockInt64(preds, highestLocked)
@@ -340,8 +350,8 @@
 
 		nn := newInt64Node(key, value, level)
 		for layer := 0; layer < level; layer++ {
-			nn.next[layer] = succs[layer]
-			preds[layer].storeNext(layer, nn)
+			nn.storeNext(layer, succs[layer])
+			preds[layer].atomicStoreNext(layer, nn)
 		}
 		nn.flags.SetTrue(fullyLinked)
 		unlockInt64(preds, highestLocked)
Expand All @@ -361,7 +371,7 @@ func (s *Int64Map) Delete(key int64) bool {
for {
lFound := s.findNodeDelete(key, &preds, &succs)
if isMarked || // this process mark this node or we can find this node in the skip list
lFound != -1 && succs[lFound].flags.MGet(fullyLinked|marked, fullyLinked) && (len(succs[lFound].next)-1) == lFound {
lFound != -1 && succs[lFound].flags.MGet(fullyLinked|marked, fullyLinked) && (int(succs[lFound].level)-1) == lFound {
if !isMarked { // we don't mark this node for now
nodeToDelete = succs[lFound]
topLayer = lFound
@@ -393,7 +403,7 @@ func (s *Int64Map) Delete(key int64) bool {
 			// It is valid if:
 			// 1. the previous node exists.
 			// 2. no another node has inserted into the skip list in this layer.
-			valid = !pred.flags.Get(marked) && pred.loadNext(layer) == succ
+			valid = !pred.flags.Get(marked) && pred.atomicLoadNext(layer) == succ
 		}
 		if !valid {
 			unlockInt64(preds, highestLocked)
@@ -402,7 +412,7 @@ func (s *Int64Map) Delete(key int64) bool {
 		for i := topLayer; i >= 0; i-- {
 			// Now we own the `nodeToDelete`, no other goroutine will modify it.
 			// So we don't need `nodeToDelete.loadNext`
-			preds[i].storeNext(i, nodeToDelete.next[i])
+			preds[i].atomicStoreNext(i, nodeToDelete.loadNext(i))
 		}
 		nodeToDelete.mu.Unlock()
 		unlockInt64(preds, highestLocked)
Expand All @@ -421,16 +431,16 @@ func (s *Int64Map) Delete(key int64) bool {
// is stored or deleted concurrently, Range may reflect any mapping for that key
// from any point during the Range call.
func (s *Int64Map) Range(f func(key int64, value interface{}) bool) {
x := s.header.loadNext(0)
x := s.header.atomicLoadNext(0)
for x != nil {
if !x.flags.MGet(fullyLinked|marked, fullyLinked) {
x = x.loadNext(0)
x = x.atomicLoadNext(0)
continue
}
if !f(x.key, x.loadVal()) {
break
}
x = x.loadNext(0)
x = x.atomicLoadNext(0)
}
}

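
A closing note on the remaining substitutions in this file: with next no longer a slice, a node's height cannot be read back as len(node.next), which is why int64Node gained the explicit level field and the delete paths now test (int(succs[lFound].level)-1) == lFound instead of (len(succs[lFound].next)-1) == lFound. A trivial standalone sketch of the equivalence (illustration only):

package main

import "fmt"

func main() {
	const level = 7                  // a node's random height
	next := make([]*struct{}, level) // old: height implied by len(next)
	var lvl uint32 = level           // new: height stored explicitly
	fmt.Println(len(next)-1 == int(lvl)-1) // true: same top-layer index
}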