Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify Snapshots #2475

Merged
merged 8 commits into from
Jul 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions contrib/scripts/functions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ function runCluster {
go build . && go install . && md5sum dgraph $GOPATH/bin/dgraph
DATA="/tmp/dg" docker-compose up --force-recreate --remove-orphans --detach
popd
$basedir/contrib/wait-for-it.sh localhost:6080
$basedir/contrib/wait-for-it.sh localhost:9180
$basedir/contrib/wait-for-it.sh -t 60 localhost:6080
$basedir/contrib/wait-for-it.sh -t 60 localhost:9180
sleep 10 # Sleep 10 seconds to get things ready.
}
23 changes: 11 additions & 12 deletions dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ type Oracle struct {
x.SafeMutex
commits map[uint64]uint64 // startTs -> commitTs
// TODO: Check if we need LRU.
rowCommit map[string]uint64 // fp(key) -> commitTs. Used to detect conflict.
aborts map[uint64]struct{} // key is startTs
maxPending uint64 // max transaction startTs given out by us.
rowCommit map[string]uint64 // fp(key) -> commitTs. Used to detect conflict.
aborts map[uint64]struct{} // key is startTs
maxAssigned uint64 // max transaction assigned by us.

// timestamp at the time of start of server or when it became leader. Used to detect conflicts.
tmax uint64
Expand Down Expand Up @@ -131,7 +131,7 @@ func (o *Oracle) currentState() *intern.OracleDelta {
for abort := range o.aborts {
resp.Aborts = append(resp.Aborts, abort)
}
resp.MaxPending = o.maxPending
resp.MaxAssigned = o.maxAssigned
return resp
}

Expand Down Expand Up @@ -169,8 +169,8 @@ func (o *Oracle) sendDeltasToSubscribers() {
slurp_loop:
for {
// Consume tctx.
if update.MaxPending > delta.MaxPending {
delta.MaxPending = update.MaxPending
if update.MaxAssigned > delta.MaxAssigned {
delta.MaxAssigned = update.MaxAssigned
}
for _, startTs := range update.Aborts {
delta.Aborts = append(delta.Aborts, startTs)
Expand Down Expand Up @@ -244,19 +244,19 @@ func (o *Oracle) storePending(ids *api.AssignedIds) {
// Wait to finish up processing everything before start id.
o.doneUntil.WaitForMark(context.Background(), ids.EndId)
// Now send it out to updates.
o.updates <- &intern.OracleDelta{MaxPending: ids.EndId}
o.updates <- &intern.OracleDelta{MaxAssigned: ids.EndId}
o.Lock()
defer o.Unlock()
max := ids.EndId
if o.maxPending < max {
o.maxPending = max
if o.maxAssigned < max {
o.maxAssigned = max
}
}

func (o *Oracle) MaxPending() uint64 {
o.RLock()
defer o.RUnlock()
return o.maxPending
return o.maxAssigned
}

var errConflict = errors.New("Transaction conflict")
Expand Down Expand Up @@ -317,8 +317,7 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
// map to be keyed by uint64, which would be cheaper. But, unsure about the repurcussions of
// that. It would save some memory. So, worth a try.

var num intern.Num
num.Val = 1
num := intern.Num{Val: 1}
assigned, err := s.lease(ctx, &num, true)
if err != nil {
return err
Expand Down
137 changes: 69 additions & 68 deletions dgraph/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,42 @@ services:
# - type: bind
# source: "${DATA}"
# target: /data
zero2:
image: debian:latest
container_name: bank-dg0.2
command: /gobin/dgraph zero --my=zero2:5080 --replicas 3 --peer=zero1:5080 --idx 2
volumes:
- type: bind
source: $GOPATH/bin
target: /gobin
read_only: true
zero3:
image: debian:latest
container_name: bank-dg0.3
command: /gobin/dgraph zero --my=zero3:5080 --replicas 3 --peer=zero1:5080 --idx 3
volumes:
- type: bind
source: $GOPATH/bin
target: /gobin
read_only: true
zero4:
image: debian:latest
container_name: bank-dg0.4
command: /gobin/dgraph zero --my=zero4:5080 --replicas 3 --peer=zero1:5080 --idx 4
volumes:
- type: bind
source: $GOPATH/bin
target: /gobin
read_only: true
zero5:
image: debian:latest
container_name: bank-dg0.5
command: /gobin/dgraph zero --my=zero5:5080 --replicas 3 --peer=zero1:5080 --idx 5
volumes:
- type: bind
source: $GOPATH/bin
target: /gobin
read_only: true
# zero2:
# image: debian:latest
# container_name: bank-dg0.2
# command: /gobin/dgraph zero --my=zero2:5080 --replicas 3 --peer=zero1:5080 --idx 2
# volumes:
# - type: bind
# source: $GOPATH/bin
# target: /gobin
# read_only: true
# zero3:
# image: debian:latest
# container_name: bank-dg0.3
# command: /gobin/dgraph zero --my=zero3:5080 --replicas 3 --peer=zero1:5080 --idx 3
# volumes:
# - type: bind
# source: $GOPATH/bin
# target: /gobin
# read_only: true
# zero4:
# image: debian:latest
# container_name: bank-dg0.4
# command: /gobin/dgraph zero --my=zero4:5080 --replicas 3 --peer=zero1:5080 --idx 4
# volumes:
# - type: bind
# source: $GOPATH/bin
# target: /gobin
# read_only: true
# zero5:
# image: debian:latest
# container_name: bank-dg0.5
# command: /gobin/dgraph zero --my=zero5:5080 --replicas 3 --peer=zero1:5080 --idx 5
# volumes:
# - type: bind
# source: $GOPATH/bin
# target: /gobin
# read_only: true

dg1:
image: debian:latest
Expand All @@ -73,6 +73,7 @@ services:
- 8180:8180
- 9180:9180
command: /gobin/dgraph server --my=dg1:7180 --lru_mb=1024 --zero=zero1:5080 -o 100 --expose_trace --trace 1.0 --profile_mode block --block_rate 10

dg2:
image: debian:latest
container_name: bank-dg2
Expand Down Expand Up @@ -108,37 +109,37 @@ services:
command: /gobin/dgraph server --my=dg3:7183 --lru_mb=1024 --zero=zero1:5080 -o 103 --expose_trace --trace 1.0 --profile_mode block --block_rate 10


dg4:
image: debian:latest
container_name: bank-dg4
working_dir: /data/dg4
volumes:
- type: bind
source: $GOPATH/bin
target: /gobin
read_only: true
# - type: bind
# source: "${DATA}"
# target: /data
ports:
- 8184:8184
- 9184:9184
command: /gobin/dgraph server --my=dg4:7184 --lru_mb=1024 --zero=zero1:5080 -o 104 --expose_trace --trace 1.0 --profile_mode block --block_rate 10
# dg4:
# image: debian:latest
# container_name: bank-dg4
# working_dir: /data/dg4
# volumes:
# - type: bind
# source: $GOPATH/bin
# target: /gobin
# read_only: true
# # - type: bind
# # source: "${DATA}"
# # target: /data
# ports:
# - 8184:8184
# - 9184:9184
# command: /gobin/dgraph server --my=dg4:7184 --lru_mb=1024 --zero=zero1:5080 -o 104 --expose_trace --trace 1.0 --profile_mode block --block_rate 10

dg5:
image: debian:latest
container_name: bank-dg5
working_dir: /data/dg5
volumes:
- type: bind
source: $GOPATH/bin
target: /gobin
read_only: true
# - type: bind
# source: "${DATA}"
# target: /data
ports:
- 8185:8185
- 9185:9185
command: /gobin/dgraph server --my=dg5:7185 --lru_mb=1024 --zero=zero1:5080 -o 105 --expose_trace --trace 1.0 --profile_mode block --block_rate 10
# dg5:
# image: debian:latest
# container_name: bank-dg5
# working_dir: /data/dg5
# volumes:
# - type: bind
# source: $GOPATH/bin
# target: /gobin
# read_only: true
# # - type: bind
# # source: "${DATA}"
# # target: /data
# ports:
# - 8185:8185
# - 9185:9185
# command: /gobin/dgraph server --my=dg5:7185 --lru_mb=1024 --zero=zero1:5080 -o 105 --expose_trace --trace 1.0 --profile_mode block --block_rate 10

51 changes: 8 additions & 43 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,15 @@ var (
txnMarks *x.WaterMark // Used to find out till what RAFT index we can snapshot entries.
)

type transactions struct {
x.SafeMutex
m map[uint64]*Txn
}

func init() {
txns = new(transactions)
txns.m = make(map[uint64]*Txn)
txnMarks = &x.WaterMark{Name: "Transaction watermark"}
txnMarks.Init()
}

func TxnMarks() *x.WaterMark {
return txnMarks
}

func Txns() *transactions {
return txns
}
Expand All @@ -52,6 +50,8 @@ type delta struct {
posting *intern.Posting
checkConflict bool // Check conflict detection.
}

// TODO: This structure can be merged back into Oracle.
type Txn struct {
StartTs uint64

Expand All @@ -66,11 +66,7 @@ type Txn struct {
nextKeyIdx int
}

type transactions struct {
x.SafeMutex
m map[uint64]*Txn
}

// TODO: What is this for?
func (t *transactions) MinTs() uint64 {
t.Lock()
var minTs uint64
Expand All @@ -94,28 +90,9 @@ func (t *transactions) MinTs() uint64 {
return minTs
}

func (t *transactions) TxnsSinceSnapshot(pending uint64) []uint64 {
lastSnapshotIdx := TxnMarks().DoneUntil()
var timestamps []uint64
t.Lock()
defer t.Unlock()
var oldest float64 = 0.2 * float64(pending)
for _, txn := range t.m {
index := txn.startIdx()
// We abort oldest 20% of the transactions.
if index-lastSnapshotIdx <= uint64(oldest) {
timestamps = append(timestamps, txn.StartTs)
}
}
return timestamps
}

func (t *transactions) Reset() {
t.Lock()
defer t.Unlock()
for _, txn := range t.m {
txn.done()
}
t.m = make(map[uint64]*Txn)
}

Expand Down Expand Up @@ -158,21 +135,9 @@ func (t *transactions) Get(startTs uint64) *Txn {
func (t *transactions) Done(startTs uint64) {
t.Lock()
defer t.Unlock()
txn, ok := t.m[startTs]
if !ok {
return
}
txn.done()
delete(t.m, startTs)
}

func (t *Txn) done() {
t.Lock()
defer t.Unlock()
// All indices should have been added by now.
TxnMarks().DoneMany(t.Indices)
}

// LastIndex returns the index of last prewrite proposal associated with
// the transaction.
func (t *Txn) LastIndex() uint64 {
Expand Down
Loading