Skip to content

Commit

Permalink
All Oracle delta streams are applied via proposals. (#2463)
Browse files Browse the repository at this point in the history
We should stop having leaders and followers receive transaction status updates from Zero directly and apply them to their state. This causes multiple race conditions and the behavior becomes indeterministic with edge cases. Instead, we have the leaders receive the updates, and propose them to their corresponding group. Proposals flow through Raft and get applied via `Node.Run` in a serial manner.

- Only print connection failure once, not every run.
- Directly propose OracleDelta, instead of proposing one TxnContext per entry in OracleDelta.
- Batch up multiple OracleDelta receives from Zero into one with smart batching into one proposal to the group. This would amortize the cost of proposals.
- Remove unnecessary trace: "In run mutation"
  • Loading branch information
manishrjain authored Jun 29, 2018
1 parent 0356bbd commit 30575a8
Show file tree
Hide file tree
Showing 12 changed files with 502 additions and 343 deletions.
22 changes: 13 additions & 9 deletions conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func NewPool(addr string) (*Pool, error) {
return nil, err
}
pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now()}
pl.UpdateHealthStatus()
pl.UpdateHealthStatus(true)

// Initialize ticker before running monitor health.
pl.ticker = time.NewTicker(echoDuration)
Expand All @@ -143,7 +143,7 @@ func (p *Pool) shutdown() {
p.conn.Close()
}

func (p *Pool) UpdateHealthStatus() {
func (p *Pool) UpdateHealthStatus(printError bool) error {
conn := p.Get()

query := new(api.Payload)
Expand All @@ -152,23 +152,27 @@ func (p *Pool) UpdateHealthStatus() {

c := intern.NewRaftClient(conn)
resp, err := c.Echo(context.Background(), query)
var lastEcho time.Time
if err == nil {
x.AssertTruef(bytes.Equal(resp.Data, query.Data),
"non-matching Echo response value from %v", p.Addr)
lastEcho = time.Now()
} else {
p.Lock()
p.lastEcho = time.Now()
p.Unlock()
} else if printError {
x.Printf("Echo error from %v. Err: %v\n", p.Addr, err)
}
p.Lock()
p.lastEcho = lastEcho
p.Unlock()
return err
}

// MonitorHealth monitors the health of the connection via Echo. This function blocks forever.
func (p *Pool) MonitorHealth() {
var lastErr error
for range p.ticker.C {
p.UpdateHealthStatus()
err := p.UpdateHealthStatus(lastErr == nil)
if lastErr != nil && err == nil {
x.Printf("Connection established with %v\n", p.Addr)
}
lastErr = err
}
}

Expand Down
2 changes: 1 addition & 1 deletion contrib/integration/acctupsert/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

var (
addr = flag.String("addr", "localhost:9080", "dgraph address")
concurr = flag.Int("c", 5, "number of concurrent upserts per account")
concurr = flag.Int("c", 3, "number of concurrent upserts per account")
)

var (
Expand Down
2 changes: 2 additions & 0 deletions dgraph/cmd/server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {

d := r.URL.Query().Get("debug")
ctx := context.WithValue(context.Background(), "debug", d)

// Core processing happens here.
resp, err := (&edgraph.Server{}).Query(ctx, &req)
if err != nil {
x.SetStatusWithData(w, x.ErrorInvalidRequest, err.Error())
Expand Down
8 changes: 7 additions & 1 deletion dgraph/cmd/server/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,13 @@ func alterSchema(s string) error {
}

func alterSchemaWithRetry(s string) error {
return alterSchema(s)
var err error
for i := 0; i < 3; i++ {
if err = alterSchema(s); err == nil {
return nil
}
}
return err
}

func dropAll() error {
Expand Down
24 changes: 12 additions & 12 deletions dgraph/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ services:
source: $GOPATH/bin
target: /gobin
read_only: true
- type: bind
source: "${DATA}"
target: /data
# - type: bind
# source: "${DATA}"
# target: /data
# zero2:
# image: debian:latest
# container_name: bank-dg0.2
Expand Down Expand Up @@ -47,9 +47,9 @@ services:
source: $GOPATH/bin
target: /gobin
read_only: true
- type: bind
source: "${DATA}"
target: /data
# - type: bind
# source: "${DATA}"
# target: /data
ports:
- 8180:8180
- 9180:9180
Expand All @@ -63,9 +63,9 @@ services:
source: $GOPATH/bin
target: /gobin
read_only: true
- type: bind
source: "${DATA}"
target: /data
# - type: bind
# source: "${DATA}"
# target: /data
ports:
- 8181:8181
- 9181:9181
Expand All @@ -80,9 +80,9 @@ services:
source: $GOPATH/bin
target: /gobin
read_only: true
- type: bind
source: "${DATA}"
target: /data
# - type: bind
# source: "${DATA}"
# target: /data
ports:
- 8182:8182
- 9182:9182
Expand Down
1 change: 1 addition & 0 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons
LinRead: req.LinRead,
}

// Core processing happens here.
var er query.ExecuteResult
if er, err = queryRequest.Process(ctx); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
Expand Down
3 changes: 3 additions & 0 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ func Txns() *transactions {
return txns
}

// This structure is useful to keep track of which keys were updated, and whether they should be
// used for conflict detection or not. When a txn is marked committed or aborted, this is what we
// use to go fetch the posting lists and update the txn status in them.
type delta struct {
key []byte
posting *intern.Posting
Expand Down
Loading

0 comments on commit 30575a8

Please sign in to comment.