diff --git a/xds/internal/balancer/outlierdetection/balancer.go b/xds/internal/balancer/outlierdetection/balancer.go index 1b0149e309c6..66ca672fea52 100644 --- a/xds/internal/balancer/outlierdetection/balancer.go +++ b/xds/internal/balancer/outlierdetection/balancer.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" + estats "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/internal/balancer/gracefulswitch" "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/internal/channelz" @@ -52,6 +53,26 @@ var ( // Name is the name of the outlier detection balancer. const Name = "outlier_detection_experimental" +var ( + ejectionsEnforcedMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "grpc.lb.outlier_detection.ejections_enforced", + Description: "EXPERIMENTAL. Number of outlier ejections enforced by detection method", + Unit: "ejection", + Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method"}, + OptionalLabels: []string{"grpc.lb.backend_service"}, + Default: false, + }) + + ejectionsUnenforcedMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "grpc.lb.outlier_detection.ejections_unenforced", + Description: "EXPERIMENTAL. 
Number of unenforced outlier ejections due to either max ejection percentage or enforcement_percentage", + Unit: "ejection", + Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method", "grpc.lb.outlier_detection.unenforced_reason"}, + OptionalLabels: []string{"grpc.lb.backend_service"}, + Default: false, + }) +) + func init() { balancer.Register(bb{}) } @@ -60,14 +81,16 @@ type bb struct{} func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { b := &outlierDetectionBalancer{ - ClientConn: cc, - closed: grpcsync.NewEvent(), - done: grpcsync.NewEvent(), - addrs: make(map[string]*endpointInfo), - scUpdateCh: buffer.NewUnbounded(), - pickerUpdateCh: buffer.NewUnbounded(), - channelzParent: bOpts.ChannelzParent, - endpoints: resolver.NewEndpointMap[*endpointInfo](), + ClientConn: cc, + closed: grpcsync.NewEvent(), + done: grpcsync.NewEvent(), + addrs: make(map[string]*endpointInfo), + scUpdateCh: buffer.NewUnbounded(), + pickerUpdateCh: buffer.NewUnbounded(), + channelzParent: bOpts.ChannelzParent, + endpoints: resolver.NewEndpointMap[*endpointInfo](), + metricsRecorder: cc.MetricsRecorder(), + target: bOpts.Target.String(), } b.logger = prefixLogger(b) b.logger.Infof("Created") @@ -134,6 +157,15 @@ func (bb) Name() string { return Name } +// extractBackendService is meant to return the backend service name carried in the resolver state attributes. +// It is currently a placeholder: it ignores its argument and always returns the empty string, +// so callers receive "" until the real extraction logic is implemented (see the TODO in the body). +func extractBackendService(resolver.State) string { + // TODO: Implement backend service extraction from resolver attributes per A89 and A75 + // For now, return empty string as this is optional + return "" +} + // scUpdate wraps a subConn update to be sent to the child balancer. type scUpdate struct { scw *subConnWrapper @@ -169,10 +201,13 @@ type outlierDetectionBalancer struct { // to suppress redundant picker updates. 
recentPickerNoop bool - closed *grpcsync.Event - done *grpcsync.Event - logger *grpclog.PrefixLogger - channelzParent channelz.Identifier + closed *grpcsync.Event + done *grpcsync.Event + logger *grpclog.PrefixLogger + channelzParent channelz.Identifier + metricsRecorder estats.MetricsRecorder + target string + backendService string child synchronizingBalancerWrapper @@ -294,6 +329,7 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt b.inhibitPickerUpdates = true b.updateUnconditionally = false b.cfg = lbCfg + b.backendService = extractBackendService(s.ResolverState) newEndpoints := resolver.NewEndpointMap[bool]() for _, ep := range s.ResolverState.Endpoints { @@ -791,15 +827,21 @@ func (b *outlierDetectionBalancer) successRateAlgorithm() { for _, epInfo := range endpointsToConsider { bucket := epInfo.callCounter.inactiveBucket ejectionCfg := b.cfg.SuccessRateEjection - if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) { - return - } successRate := float64(bucket.numSuccesses) / float64(bucket.numSuccesses+bucket.numFailures) requiredSuccessRate := mean - stddev*(float64(ejectionCfg.StdevFactor)/1000) if successRate < requiredSuccessRate { channelz.Infof(logger, b.channelzParent, "SuccessRate algorithm detected outlier: %s. 
Parameters: successRate=%f, mean=%f, stddev=%f, requiredSuccessRate=%f", epInfo, successRate, mean, stddev, requiredSuccessRate) + // Check if max ejection percentage would prevent ejection + if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) { + // Record unenforced ejection due to max ejection percentage + ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "success_rate", "max_ejection_overflow", b.backendService) + continue + } if uint32(rand.Int32N(100)) < ejectionCfg.EnforcementPercentage { - b.ejectEndpoint(epInfo) + b.ejectEndpoint(epInfo, "success_rate") + } else { + // Record unenforced ejection due to enforcement percentage + ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "success_rate", "enforcement_percentage", b.backendService) } } } @@ -819,21 +861,27 @@ func (b *outlierDetectionBalancer) failurePercentageAlgorithm() { for _, epInfo := range endpointsToConsider { bucket := epInfo.callCounter.inactiveBucket ejectionCfg := b.cfg.FailurePercentageEjection - if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) { - return - } failurePercentage := (float64(bucket.numFailures) / float64(bucket.numSuccesses+bucket.numFailures)) * 100 if failurePercentage > float64(b.cfg.FailurePercentageEjection.Threshold) { channelz.Infof(logger, b.channelzParent, "FailurePercentage algorithm detected outlier: %s, failurePercentage=%f", epInfo, failurePercentage) + // Check if max ejection percentage would prevent ejection + if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) { + // Record unenforced ejection due to max ejection percentage + ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "failure_percentage", "max_ejection_overflow", b.backendService) + continue + } if uint32(rand.Int32N(100)) < ejectionCfg.EnforcementPercentage { - b.ejectEndpoint(epInfo) + 
b.ejectEndpoint(epInfo, "failure_percentage") + } else { + // Record unenforced ejection due to enforcement percentage + ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "failure_percentage", "enforcement_percentage", b.backendService) } } } } // Caller must hold b.mu. -func (b *outlierDetectionBalancer) ejectEndpoint(epInfo *endpointInfo) { +func (b *outlierDetectionBalancer) ejectEndpoint(epInfo *endpointInfo, detectionMethod string) { b.numEndpointsEjected++ epInfo.latestEjectionTimestamp = b.timerStartTime epInfo.ejectionTimeMultiplier++ @@ -842,6 +890,8 @@ func (b *outlierDetectionBalancer) ejectEndpoint(epInfo *endpointInfo) { channelz.Infof(logger, b.channelzParent, "Subchannel ejected: %s", sbw) } + // Record the enforced ejection metric + ejectionsEnforcedMetric.Record(b.metricsRecorder, 1, b.target, detectionMethod, b.backendService) } // Caller must hold b.mu. diff --git a/xds/internal/balancer/outlierdetection/balancer_test.go b/xds/internal/balancer/outlierdetection/balancer_test.go index 25b6a22152f9..fc81a7f4b205 100644 --- a/xds/internal/balancer/outlierdetection/balancer_test.go +++ b/xds/internal/balancer/outlierdetection/balancer_test.go @@ -46,6 +46,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/roundrobin" + "google.golang.org/grpc/internal/testutils/stats" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -1159,6 +1160,9 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { } } + // Create test metrics recorder + tmr := stats.NewTestMetricsRecorder() + od.metricsRecorder = tmr od.intervalTimerAlgorithm() // verify no StateListener() call on the child, as no addresses got @@ -1168,6 +1172,12 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { if _, err := scsCh.Receive(sCtx); err == nil { t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)") } + 
if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 0", got) + } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want 0", got) + } // Since no addresses are ejected, a SubConn update should forward down // to the child. @@ -1234,6 +1244,12 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { if _, err := scsCh.Receive(sCtx); err == nil { t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)") } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 1 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 1", got) + } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want 0", got) + } // Now that an address is ejected, SubConn updates for SubConns using // that address should not be forwarded downward. 
These SubConn updates @@ -1414,6 +1430,8 @@ func (s) TestEjectFailureRate(t *testing.T) { pi.Done(balancer.DoneInfo{}) } } + tmr := stats.NewTestMetricsRecorder() + od.metricsRecorder = tmr od.intervalTimerAlgorithm() sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) @@ -1421,6 +1439,12 @@ func (s) TestEjectFailureRate(t *testing.T) { if _, err := scsCh.Receive(sCtx); err == nil { t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)") } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 0", got) + } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want 0", got) + } // Set two upstream addresses to have five successes each, and one // upstream address to have five failures. This should cause the address @@ -1465,6 +1489,12 @@ func (s) TestEjectFailureRate(t *testing.T) { if _, err := scsCh.Receive(sCtx); err == nil { t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)") } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 1 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 1", got) + } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want 0", got) + } // upon the Outlier Detection balancer being reconfigured with a noop // configuration, every ejected SubConn should be unejected.