From 210c0923b3b0990642b8605a3d278f1644ddfc70 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 16 Dec 2024 20:00:59 +0100 Subject: [PATCH] Ensure all topo read calls consider `--topo_read_concurrency` (#17276) Signed-off-by: Tim Vaillancourt --- go/flags/endtoend/vtbackup.txt | 1 + go/flags/endtoend/vtcombo.txt | 2 +- go/flags/endtoend/vtctld.txt | 2 +- go/flags/endtoend/vtgate.txt | 2 +- go/flags/endtoend/vtorc.txt | 2 +- go/flags/endtoend/vttablet.txt | 1 + go/vt/discovery/healthcheck.go | 2 +- go/vt/discovery/topology_watcher.go | 29 +++++++--------- go/vt/discovery/topology_watcher_test.go | 10 +++--- go/vt/topo/keyspace.go | 28 +++------------ go/vt/topo/server.go | 13 +++++-- go/vt/topo/shard.go | 9 ----- go/vt/topo/stats_conn.go | 29 +++++++++++++++- go/vt/topo/stats_conn_test.go | 44 +++++++++++++++++++----- go/vt/topo/tablet.go | 32 +++-------------- go/vt/topo/tablet_test.go | 5 --- go/vt/vtorc/logic/tablet_discovery.go | 2 +- 17 files changed, 107 insertions(+), 106 deletions(-) diff --git a/go/flags/endtoend/vtbackup.txt b/go/flags/endtoend/vtbackup.txt index 25730ab892f..151487d5522 100644 --- a/go/flags/endtoend/vtbackup.txt +++ b/go/flags/endtoend/vtbackup.txt @@ -235,6 +235,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use + --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 126b073c6b5..4336385a5f4 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -379,7 +379,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use - --topo_read_concurrency int Concurrency of topo reads. (default 32) + --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vtctld.txt b/go/flags/endtoend/vtctld.txt index 325c4ae30cd..6ef6e4685db 100644 --- a/go/flags/endtoend/vtctld.txt +++ b/go/flags/endtoend/vtctld.txt @@ -169,7 +169,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use - --topo_read_concurrency int Concurrency of topo reads. (default 32) + --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 07353a0beaa..a56a6893934 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -227,7 +227,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use - --topo_read_concurrency int Concurrency of topo reads. (default 32) + --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index 5fbfe95dc49..654a0fdc544 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -107,7 +107,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use - --topo_read_concurrency int Concurrency of topo reads. (default 32) + --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 502b81f1f6a..89f15a24239 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -381,6 +381,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use + --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 0531a8aee13..010cfa9eab3 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -381,7 +381,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur if c == "" { continue } - topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency)) + topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets)) } hc.topoWatchers = topoWatchers diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 64346d524ad..d1e358e1aa5 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -26,16 +26,13 @@ import ( "sync" "time" - "vitess.io/vitess/go/vt/topo/topoproto" - - "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/stats" "vitess.io/vitess/go/trace" - + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/proto/topodata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" ) const ( @@ -56,7 +53,7 @@ var ( // tabletInfo is used internally by the TopologyWatcher struct. type tabletInfo struct { alias string - tablet *topodata.Tablet + tablet *topodatapb.Tablet } // TopologyWatcher polls the topology periodically for changes to @@ -70,7 +67,6 @@ type TopologyWatcher struct { cell string refreshInterval time.Duration refreshKnownTablets bool - concurrency int ctx context.Context cancelFunc context.CancelFunc // wg keeps track of all launched Go routines. @@ -92,7 +88,7 @@ type TopologyWatcher struct { // NewTopologyWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and reloads them as needed. -func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { +func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool) *TopologyWatcher { tw := &TopologyWatcher{ topoServer: topoServer, healthcheck: hc, @@ -100,7 +96,6 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC cell: cell, refreshInterval: refreshInterval, refreshKnownTablets: refreshKnownTablets, - concurrency: topoReadConcurrency, tablets: make(map[string]*tabletInfo), } tw.firstLoadChan = make(chan struct{}) @@ -112,7 +107,7 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC } func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) { - return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, &topo.GetTabletsByCellOptions{Concurrency: tw.concurrency}) + return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, nil) } // Start starts the topology watcher. @@ -271,14 +266,14 @@ func (tw *TopologyWatcher) TopoChecksum() uint32 { // to be applied as an additional filter on the list of tablets returned by its getTablets function. type TabletFilter interface { // IsIncluded returns whether tablet is included in this filter - IsIncluded(tablet *topodata.Tablet) bool + IsIncluded(tablet *topodatapb.Tablet) bool } // TabletFilters contains filters for tablets. type TabletFilters []TabletFilter // IsIncluded returns true if a tablet passes all filters. -func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool { +func (tf TabletFilters) IsIncluded(tablet *topodatapb.Tablet) bool { for _, filter := range tf { if !filter.IsIncluded(tablet) { return false @@ -299,7 +294,7 @@ type FilterByShard struct { type filterShard struct { keyspace string shard string - keyRange *topodata.KeyRange // only set if shard is also a KeyRange + keyRange *topodatapb.KeyRange // only set if shard is also a KeyRange } // NewFilterByShard creates a new FilterByShard for use by a @@ -344,7 +339,7 @@ func NewFilterByShard(filters []string) (*FilterByShard, error) { } // IsIncluded returns true iff the tablet's keyspace and shard match what we have. -func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool { +func (fbs *FilterByShard) IsIncluded(tablet *topodatapb.Tablet) bool { canonical, kr, err := topo.ValidateShardName(tablet.Shard) if err != nil { log.Errorf("Error parsing shard name %v, will ignore tablet: %v", tablet.Shard, err) @@ -384,7 +379,7 @@ func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace { } // IsIncluded returns true if the tablet's keyspace matches what we have. -func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool { +func (fbk *FilterByKeyspace) IsIncluded(tablet *topodatapb.Tablet) bool { _, exist := fbk.keyspaces[tablet.Keyspace] return exist } @@ -403,7 +398,7 @@ func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags { } // IsIncluded returns true if the tablet's tags match what we expect. -func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool { +func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodatapb.Tablet) bool { if fbtg.tags == nil { return true } diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 834fdcb1afe..b84347d38ce 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -67,7 +67,7 @@ func TestStartAndCloseTopoWatcher(t *testing.T) { fhc := NewFakeHealthCheck(nil) defer fhc.Close() topologyWatcherOperations.ZeroAll() - tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true) done := make(chan bool, 3) result := make(chan bool, 1) @@ -127,7 +127,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) { logger := logutil.NewMemoryLogger() topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() - tw := NewTopologyWatcher(context.Background(), ts, fhc, filter, "aa", 10*time.Minute, refreshKnownTablets, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, filter, "aa", 10*time.Minute, refreshKnownTablets) counts = checkOpCounts(t, counts, map[string]int64{}) checkChecksum(t, tw, 0) @@ -421,7 +421,7 @@ func TestFilterByKeyspace(t *testing.T) { f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)} ts := memorytopo.NewServer(ctx, testCell) defer ts.Close() - tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5) + tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true) for _, test := range testFilterByKeyspace { // Add a new tablet to the topology. @@ -502,7 +502,7 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) { topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)} - tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/) counts = checkOpCounts(t, counts, map[string]int64{}) checkChecksum(t, tw, 0) @@ -639,7 +639,7 @@ func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) { defer fhc.Close() topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() - tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true) defer tw.Stop() // Force fallback to getting tablets individually. diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index 08b1c59f4cc..4e1d19ec79a 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -22,44 +22,26 @@ import ( "sort" "sync" - "github.com/spf13/pflag" "golang.org/x/sync/errgroup" "vitess.io/vitess/go/constants/sidecar" + "vitess.io/vitess/go/event" "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/vt/servenv" - "vitess.io/vitess/go/vt/vterrors" - - "vitess.io/vitess/go/event" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/topo/events" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/topo/events" + "vitess.io/vitess/go/vt/vterrors" ) // This file contains keyspace utility functions. -// Default concurrency to use in order to avoid overhwelming the topo server. -var DefaultConcurrency = 32 - // shardKeySuffix is the suffix of a shard key. // The full key looks like this: // /vitess/global/keyspaces/customer/shards/80-/Shard const shardKeySuffix = "Shard" -func registerFlags(fs *pflag.FlagSet) { - fs.IntVar(&DefaultConcurrency, "topo_read_concurrency", DefaultConcurrency, "Concurrency of topo reads.") -} - -func init() { - servenv.OnParseFor("vtcombo", registerFlags) - servenv.OnParseFor("vtctld", registerFlags) - servenv.OnParseFor("vtgate", registerFlags) - servenv.OnParseFor("vtorc", registerFlags) -} - // KeyspaceInfo is a meta struct that contains metadata to give the // data more context and convenience. This is the main way we interact // with a keyspace. @@ -198,7 +180,7 @@ func (ts *Server) UpdateKeyspace(ctx context.Context, ki *KeyspaceInfo) error { type FindAllShardsInKeyspaceOptions struct { // Concurrency controls the maximum number of concurrent calls to GetShard. // If <= 0, Concurrency is set to 1. - Concurrency int + Concurrency int64 } // FindAllShardsInKeyspace reads and returns all the existing shards in a @@ -212,7 +194,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, opt = &FindAllShardsInKeyspaceOptions{} } if opt.Concurrency <= 0 { - opt.Concurrency = DefaultConcurrency + opt.Concurrency = DefaultReadConcurrency } // Unescape the keyspace name as this can e.g. come from the VSchema where diff --git a/go/vt/topo/server.go b/go/vt/topo/server.go index 1995e8b6ec4..b9bf46a1c3d 100644 --- a/go/vt/topo/server.go +++ b/go/vt/topo/server.go @@ -48,6 +48,7 @@ import ( "sync" "github.com/spf13/pflag" + "golang.org/x/sync/semaphore" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/proto/topodata" @@ -175,6 +176,9 @@ var ( FlagBinaries = []string{"vttablet", "vtctl", "vtctld", "vtcombo", "vtgate", "vtorc", "vtbackup"} + + // Default read concurrency to use in order to avoid overhwelming the topo server. + DefaultReadConcurrency int64 = 32 ) func init() { @@ -187,6 +191,7 @@ func registerTopoFlags(fs *pflag.FlagSet) { fs.StringVar(&topoImplementation, "topo_implementation", topoImplementation, "the topology implementation to use") fs.StringVar(&topoGlobalServerAddress, "topo_global_server_address", topoGlobalServerAddress, "the address of the global topology server") fs.StringVar(&topoGlobalRoot, "topo_global_root", topoGlobalRoot, "the path of the global topology data in the global topology server") + fs.Int64Var(&DefaultReadConcurrency, "topo_read_concurrency", DefaultReadConcurrency, "Maximum concurrency of topo reads per global or local cell.") } // RegisterFactory registers a Factory for an implementation for a Server. @@ -202,11 +207,12 @@ func RegisterFactory(name string, factory Factory) { // NewWithFactory creates a new Server based on the given Factory. // It also opens the global cell connection. func NewWithFactory(factory Factory, serverAddress, root string) (*Server, error) { + globalReadSem := semaphore.NewWeighted(DefaultReadConcurrency) conn, err := factory.Create(GlobalCell, serverAddress, root) if err != nil { return nil, err } - conn = NewStatsConn(GlobalCell, conn) + conn = NewStatsConn(GlobalCell, conn, globalReadSem) var connReadOnly Conn if factory.HasGlobalReadOnlyCell(serverAddress, root) { @@ -214,7 +220,7 @@ func NewWithFactory(factory Factory, serverAddress, root string) (*Server, error if err != nil { return nil, err } - connReadOnly = NewStatsConn(GlobalReadOnlyCell, connReadOnly) + connReadOnly = NewStatsConn(GlobalReadOnlyCell, connReadOnly, globalReadSem) } else { connReadOnly = conn } @@ -295,7 +301,8 @@ func (ts *Server) ConnForCell(ctx context.Context, cell string) (Conn, error) { conn, err := ts.factory.Create(cell, ci.ServerAddress, ci.Root) switch { case err == nil: - conn = NewStatsConn(cell, conn) + cellReadSem := semaphore.NewWeighted(DefaultReadConcurrency) + conn = NewStatsConn(cell, conn, cellReadSem) ts.cellConns[cell] = cellConn{ci, conn} return conn, nil case IsErrType(err, NoNode): diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index 220510e71f5..613a9f265ba 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -671,16 +671,8 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str } } - // Divide the concurrency limit by the number of cells. If there are more - // cells than the limit, default to concurrency of 1. - cellConcurrency := 1 - if len(cells) < DefaultConcurrency { - cellConcurrency = DefaultConcurrency / len(cells) - } - mu := sync.Mutex{} eg, ctx := errgroup.WithContext(ctx) - eg.SetLimit(DefaultConcurrency) tablets := make([]*TabletInfo, 0, len(cells)) var kss *KeyspaceShard @@ -691,7 +683,6 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str } } options := &GetTabletsByCellOptions{ - Concurrency: cellConcurrency, KeyspaceShard: kss, } for _, cell := range cells { diff --git a/go/vt/topo/stats_conn.go b/go/vt/topo/stats_conn.go index 08f44c0f75e..65c2b8b479a 100644 --- a/go/vt/topo/stats_conn.go +++ b/go/vt/topo/stats_conn.go @@ -20,6 +20,8 @@ import ( "context" "time" + "golang.org/x/sync/semaphore" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" @@ -37,6 +39,11 @@ var ( "TopologyConnErrors", "TopologyConnErrors errors per operation", []string{"Operation", "Cell"}) + + topoStatsConnReadWaitTimings = stats.NewMultiTimings( + "TopologyConnReadWaits", + "TopologyConnReadWait timings", + []string{"Operation", "Cell"}) ) const readOnlyErrorStrFormat = "cannot perform %s on %s as the topology server connection is read-only" @@ -46,14 +53,16 @@ type StatsConn struct { cell string conn Conn readOnly bool + readSem *semaphore.Weighted } // NewStatsConn returns a StatsConn -func NewStatsConn(cell string, conn Conn) *StatsConn { +func NewStatsConn(cell string, conn Conn, readSem *semaphore.Weighted) *StatsConn { return &StatsConn{ cell: cell, conn: conn, readOnly: false, + readSem: readSem, } } @@ -61,6 +70,12 @@ func NewStatsConn(cell string, conn Conn) *StatsConn { func (st *StatsConn) ListDir(ctx context.Context, dirPath string, full bool) ([]DirEntry, error) { startTime := time.Now() statsKey := []string{"ListDir", st.cell} + if err := st.readSem.Acquire(ctx, 1); err != nil { + return nil, err + } + defer st.readSem.Release(1) + topoStatsConnReadWaitTimings.Record(statsKey, startTime) + startTime = time.Now() // reset defer topoStatsConnTimings.Record(statsKey, startTime) res, err := st.conn.ListDir(ctx, dirPath, full) if err != nil { @@ -106,6 +121,12 @@ func (st *StatsConn) Update(ctx context.Context, filePath string, contents []byt func (st *StatsConn) Get(ctx context.Context, filePath string) ([]byte, Version, error) { startTime := time.Now() statsKey := []string{"Get", st.cell} + if err := st.readSem.Acquire(ctx, 1); err != nil { + return nil, nil, err + } + defer st.readSem.Release(1) + topoStatsConnReadWaitTimings.Record(statsKey, startTime) + startTime = time.Now() // reset defer topoStatsConnTimings.Record(statsKey, startTime) bytes, version, err := st.conn.Get(ctx, filePath) if err != nil { @@ -119,6 +140,12 @@ func (st *StatsConn) Get(ctx context.Context, filePath string) ([]byte, Version, func (st *StatsConn) List(ctx context.Context, filePathPrefix string) ([]KVInfo, error) { startTime := time.Now() statsKey := []string{"List", st.cell} + if err := st.readSem.Acquire(ctx, 1); err != nil { + return nil, err + } + defer st.readSem.Release(1) + topoStatsConnReadWaitTimings.Record(statsKey, startTime) + startTime = time.Now() // reset defer topoStatsConnTimings.Record(statsKey, startTime) bytes, err := st.conn.List(ctx, filePathPrefix) if err != nil { diff --git a/go/vt/topo/stats_conn_test.go b/go/vt/topo/stats_conn_test.go index e26e8c97f31..730a2fc1613 100644 --- a/go/vt/topo/stats_conn_test.go +++ b/go/vt/topo/stats_conn_test.go @@ -20,11 +20,16 @@ import ( "context" "fmt" "testing" + "time" + + "golang.org/x/sync/semaphore" "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" ) +var testStatsConnReadSem = semaphore.NewWeighted(1) + // The fakeConn is a wrapper for a Conn that emits stats for every operation type fakeConn struct { v Version @@ -150,18 +155,33 @@ func (st *fakeConn) IsReadOnly() bool { return st.readOnly } +// createTestReadSemaphoreContention simulates semaphore contention on the test read semaphore. +func createTestReadSemaphoreContention(ctx context.Context, duration time.Duration) { + if err := testStatsConnReadSem.Acquire(ctx, 1); err != nil { + panic(err) + } + defer testStatsConnReadSem.Release(1) + time.Sleep(duration) +} + // TestStatsConnTopoListDir emits stats on ListDir func TestStatsConnTopoListDir(t *testing.T) { conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() + go createTestReadSemaphoreContention(ctx, 100*time.Millisecond) statsConn.ListDir(ctx, "", true) timingCounts := topoStatsConnTimings.Counts()["ListDir.global"] if got, want := timingCounts, int64(1); got != want { t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) } + waitTimingsCounts := topoStatsConnReadWaitTimings.Counts()["ListDir.global"] + if got := waitTimingsCounts; got != 1 { + t.Errorf("stats were not properly recorded: got = %d, want = 1", got) + } + // error is zero before getting an error errorCount := topoStatsConnErrors.Counts()["ListDir.global"] if got, want := errorCount, int64(0); got != want { @@ -180,7 +200,7 @@ func TestStatsConnTopoListDir(t *testing.T) { // TestStatsConnTopoCreate emits stats on Create func TestStatsConnTopoCreate(t *testing.T) { conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Create(ctx, "", []byte{}) @@ -207,7 +227,7 @@ func TestStatsConnTopoCreate(t *testing.T) { // TestStatsConnTopoUpdate emits stats on Update func TestStatsConnTopoUpdate(t *testing.T) { conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Update(ctx, "", []byte{}, conn.v) @@ -234,15 +254,21 @@ func TestStatsConnTopoUpdate(t *testing.T) { // TestStatsConnTopoGet emits stats on Get func TestStatsConnTopoGet(t *testing.T) { conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() + go createTestReadSemaphoreContention(ctx, time.Millisecond*100) statsConn.Get(ctx, "") timingCounts := topoStatsConnTimings.Counts()["Get.global"] if got, want := timingCounts, int64(1); got != want { t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) } + waitTimingsCounts := topoStatsConnReadWaitTimings.Counts()["Get.global"] + if got := waitTimingsCounts; got != 1 { + t.Errorf("stats were not properly recorded: got = %d, want = 1", got) + } + // error is zero before getting an error errorCount := topoStatsConnErrors.Counts()["Get.global"] if got, want := errorCount, int64(0); got != want { @@ -261,7 +287,7 @@ func TestStatsConnTopoGet(t *testing.T) { // TestStatsConnTopoDelete emits stats on Delete func TestStatsConnTopoDelete(t *testing.T) { conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Delete(ctx, "", conn.v) @@ -288,7 +314,7 @@ func TestStatsConnTopoDelete(t *testing.T) { // TestStatsConnTopoLock emits stats on Lock func TestStatsConnTopoLock(t *testing.T) { conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Lock(ctx, "", "") @@ -315,7 +341,7 @@ func TestStatsConnTopoLock(t *testing.T) { // TestStatsConnTopoWatch emits stats on Watch func TestStatsConnTopoWatch(t *testing.T) { conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Watch(ctx, "") @@ -329,7 +355,7 @@ func TestStatsConnTopoWatch(t *testing.T) { // TestStatsConnTopoNewLeaderParticipation emits stats on NewLeaderParticipation func TestStatsConnTopoNewLeaderParticipation(t *testing.T) { conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) _, _ = statsConn.NewLeaderParticipation("", "") timingCounts := topoStatsConnTimings.Counts()["NewLeaderParticipation.global"] @@ -355,7 +381,7 @@ func TestStatsConnTopoNewLeaderParticipation(t *testing.T) { // TestStatsConnTopoClose emits stats on Close func TestStatsConnTopoClose(t *testing.T) { conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) statsConn.Close() timingCounts := topoStatsConnTimings.Counts()["Close.global"] diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go index 7dccf3bf183..ffc21153a7a 100644 --- a/go/vt/topo/tablet.go +++ b/go/vt/topo/tablet.go @@ -24,21 +24,17 @@ import ( "sync" "time" - "golang.org/x/sync/semaphore" - - "vitess.io/vitess/go/protoutil" - "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/event" "vitess.io/vitess/go/netutil" + "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/trace" + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vterrors" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/topo/events" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" ) // IsTrivialTypeChange returns if this db type be trivially reassigned @@ -287,8 +283,6 @@ func (ts *Server) GetTabletAliasesByCell(ctx context.Context, cell string) ([]*t // GetTabletsByCellOptions controls the behavior of // Server.FindAllShardsInKeyspace. type GetTabletsByCellOptions struct { - // Concurrency controls the maximum number of concurrent calls to GetTablet. - Concurrency int // KeyspaceShard is the optional keyspace/shard that tablets must match. // An empty shard value will match all shards in the keyspace. KeyspaceShard *KeyspaceShard @@ -550,29 +544,11 @@ func (ts *Server) GetTabletMap(ctx context.Context, tabletAliases []*topodatapb. returnErr error ) - concurrency := DefaultConcurrency - if opt != nil && opt.Concurrency > 0 { - concurrency = opt.Concurrency - } - var sem = semaphore.NewWeighted(int64(concurrency)) - for _, tabletAlias := range tabletAliases { wg.Add(1) go func(tabletAlias *topodatapb.TabletAlias) { defer wg.Done() - if err := sem.Acquire(ctx, 1); err != nil { - // Only happens if context is cancelled. - mu.Lock() - defer mu.Unlock() - log.Warningf("%v: %v", tabletAlias, err) - // We only need to set this on the first error. - if returnErr == nil { - returnErr = NewError(PartialResult, tabletAlias.GetCell()) - } - return - } tabletInfo, err := ts.GetTablet(ctx, tabletAlias) - sem.Release(1) mu.Lock() defer mu.Unlock() if err != nil { diff --git a/go/vt/topo/tablet_test.go b/go/vt/topo/tablet_test.go index e659a0d01b9..1c242e8778b 100644 --- a/go/vt/topo/tablet_test.go +++ b/go/vt/topo/tablet_test.go @@ -69,7 +69,6 @@ func TestServerGetTabletsByCell(t *testing.T) { }, }, // Ensure this doesn't panic. - opt: &topo.GetTabletsByCellOptions{Concurrency: -1}, }, { name: "single", @@ -151,7 +150,6 @@ func TestServerGetTabletsByCell(t *testing.T) { Shard: shard, }, }, - opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, }, { name: "multiple with list error", @@ -210,7 +208,6 @@ func TestServerGetTabletsByCell(t *testing.T) { Shard: shard, }, }, - opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, listError: topo.NewError(topo.ResourceExhausted, ""), }, { @@ -249,7 +246,6 @@ func TestServerGetTabletsByCell(t *testing.T) { }, }, opt: &topo.GetTabletsByCellOptions{ - Concurrency: 1, KeyspaceShard: &topo.KeyspaceShard{ Keyspace: keyspace, Shard: shard, @@ -317,7 +313,6 @@ func TestServerGetTabletsByCell(t *testing.T) { }, }, opt: &topo.GetTabletsByCellOptions{ - Concurrency: 1, KeyspaceShard: &topo.KeyspaceShard{ Keyspace: keyspace, Shard: "", diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 73cd61676ca..f59efcaaff4 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -202,7 +202,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) { } func refreshTabletsInCell(ctx context.Context, cell string, loader func(tabletAlias string), forceRefresh bool) { - tablets, err := ts.GetTabletsByCell(ctx, cell, &topo.GetTabletsByCellOptions{Concurrency: topo.DefaultConcurrency}) + tablets, err := ts.GetTabletsByCell(ctx, cell, nil) if err != nil { log.Errorf("Error fetching topo info for cell %v: %v", cell, err) return