Skip to content

Commit

Permalink
plumb a polling option when listing tablets from the topo
Browse files Browse the repository at this point in the history
For the TopologyWatcher which repeatedly lists all tablets from the topo, pass
in a polling option into the service List method as a hint to implementations
that support increased performance with relaxed consistency.

Signed-off-by: Michael Demmer <[email protected]>
  • Loading branch information
demmer committed Nov 15, 2024
1 parent ef248b3 commit f93a9a1
Show file tree
Hide file tree
Showing 12 changed files with 20 additions and 15 deletions.
2 changes: 1 addition & 1 deletion go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC
}

func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) {
return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, &topo.GetTabletsByCellOptions{Concurrency: tw.concurrency})
return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, &topo.GetTabletsByCellOptions{Concurrency: tw.concurrency, Polling: true})
}

// Start starts the topology watcher.
Expand Down
4 changes: 3 additions & 1 deletion go/vt/topo/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ type Conn interface {
// List returns KV pairs, along with metadata like the version, for
// entries where the key contains the specified prefix.
// filePathPrefix is a path relative to the root directory of the cell.
// polling indicates if the caller will be repeatedly calling for results and
// therefore can accept relaxed consistency
// Can return ErrNoNode if there are no matches.
List(ctx context.Context, filePathPrefix string) ([]KVInfo, error)
List(ctx context.Context, filePathPrefix string, polling bool) ([]KVInfo, error)

// Delete deletes the provided file.
// If version is nil, it is an unconditional delete.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/consultopo/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *Server) Get(ctx context.Context, filePath string) ([]byte, topo.Version
}

// List is part of the topo.Conn interface.
func (s *Server) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
func (s *Server) List(ctx context.Context, filePathPrefix string, polling bool) ([]topo.KVInfo, error) {
nodePathPrefix := path.Join(s.root, filePathPrefix)

pairs, _, err := s.kv.List(nodePathPrefix, nil)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/etcd2topo/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (s *Server) Get(ctx context.Context, filePath string) ([]byte, topo.Version
}

// List is part of the topo.Conn interface.
func (s *Server) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
func (s *Server) List(ctx context.Context, filePathPrefix string, polling bool) ([]topo.KVInfo, error) {
nodePathPrefix := path.Join(s.root, filePathPrefix)

resp, err := s.cli.Get(ctx, nodePathPrefix, clientv3.WithPrefix())
Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/faketopo/faketopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (f *FakeConn) Get(ctx context.Context, filePath string) ([]byte, topo.Versi
}

// List is part of the topo.Conn interface.
func (f *FakeConn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
func (f *FakeConn) List(ctx context.Context, filePathPrefix string, polling bool) ([]topo.KVInfo, error) {
return nil, topo.NewError(topo.NoImplementation, "List not supported in fake topo")
}

Expand Down
4 changes: 2 additions & 2 deletions go/vt/topo/helpers/tee.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ func (c *TeeConn) Get(ctx context.Context, filePath string) ([]byte, topo.Versio
}

// List is part of the topo.Conn interface.
func (c *TeeConn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
return c.primary.List(ctx, filePathPrefix)
func (c *TeeConn) List(ctx context.Context, filePathPrefix string, polling bool) ([]topo.KVInfo, error) {
return c.primary.List(ctx, filePathPrefix, polling)
}

// Delete is part of the topo.Conn interface.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/memorytopo/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version,
}

// List is part of the topo.Conn interface.
func (c *Conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
func (c *Conn) List(ctx context.Context, filePathPrefix string, polling bool) ([]topo.KVInfo, error) {
if err := c.dial(ctx); err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/topo/stats_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ func (st *StatsConn) Get(ctx context.Context, filePath string) ([]byte, Version,
}

// List is part of the Conn interface
func (st *StatsConn) List(ctx context.Context, filePathPrefix string) ([]KVInfo, error) {
func (st *StatsConn) List(ctx context.Context, filePathPrefix string, polling bool) ([]KVInfo, error) {
startTime := time.Now()
statsKey := []string{"List", st.cell}
defer topoStatsConnTimings.Record(statsKey, startTime)
bytes, err := st.conn.List(ctx, filePathPrefix)
bytes, err := st.conn.List(ctx, filePathPrefix, polling)
if err != nil {
topoStatsConnErrors.Add(statsKey, int64(1))
return bytes, err
Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/stats_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (st *fakeConn) Get(ctx context.Context, filePath string) (bytes []byte, ver
}

// List is part of the Conn interface
func (st *fakeConn) List(ctx context.Context, filePathPrefix string) (bytes []KVInfo, err error) {
func (st *fakeConn) List(ctx context.Context, filePathPrefix string, polling bool) (bytes []KVInfo, err error) {
if filePathPrefix == "error" {
return bytes, fmt.Errorf("Dummy error")
}
Expand Down
5 changes: 4 additions & 1 deletion go/vt/topo/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ func (ts *Server) GetTabletAliasesByCell(ctx context.Context, cell string) ([]*t
type GetTabletsByCellOptions struct {
// Concurrency controls the maximum number of concurrent calls to GetTablet.
Concurrency int64
// Polling indicates if the caller is getting tablets in a polling loop and is
// used in some topo implementations to improve performance by relaxing consistency.
Polling bool
}

// GetTabletsByCell returns all the tablets in the cell.
Expand All @@ -300,7 +303,7 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *G
if err != nil {
return nil, err
}
listResults, err := cellConn.List(ctx, TabletsPath)
listResults, err := cellConn.List(ctx, TabletsPath, opt.Polling)
if err != nil || len(listResults) == 0 {
// Currently the ZooKeeper implementation does not support scans
// so we fall back to the more costly method of fetching the tablets one by one.
Expand Down
4 changes: 2 additions & 2 deletions go/vt/topo/test/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func checkList(t *testing.T, ctx context.Context, ts *topo.Server) {
t.Fatalf("Create('/myfile') failed: %v", err)
}

_, err = conn.List(ctx, "/")
_, err = conn.List(ctx, "/", false /*polling*/)
if topo.IsErrType(err, topo.NoImplementation) {
// If this is not supported, skip the test
t.Skipf("%T does not support List()", conn)
Expand All @@ -229,7 +229,7 @@ func checkList(t *testing.T, ctx context.Context, ts *topo.Server) {
}

for _, path := range []string{"/top", "/toplevel", "/toplevel/", "/toplevel/nes", "/toplevel/nested/myfile"} {
entries, err := conn.List(ctx, path)
entries, err := conn.List(ctx, path, false /*polling*/)
if err != nil {
t.Fatalf("List failed(path: %q): %v", path, err)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/zk2topo/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (zs *Server) Get(ctx context.Context, filePath string) ([]byte, topo.Versio
}

// List is part of the topo.Conn interface.
func (zs *Server) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
func (zs *Server) List(ctx context.Context, filePathPrefix string, polling bool) ([]topo.KVInfo, error) {
return nil, topo.NewError(topo.NoImplementation, "List not supported in ZK2 topo")
}

Expand Down

0 comments on commit f93a9a1

Please sign in to comment.