Move GOMAXPROCS adjustment to beatcmd
This is something that should run just once for
the lifetime of the process. I'm intending to
move the reload logic out of beater and into
beatcmd (or a subpackage).
axw committed Oct 1, 2022
1 parent ef92548 commit e44b61f
Showing 8 changed files with 251 additions and 167 deletions.
8 changes: 8 additions & 0 deletions internal/beatcmd/beat.go
@@ -382,6 +382,14 @@ func (b *Beat) Run(ctx context.Context) error {
        }
    }

    logger := logp.NewLogger("")
    done := make(chan struct{})
    go func() {
        defer close(done)
        adjustMaxProcs(ctx, 30*time.Second, diffInfof(logger), logger.Errorf)
    }()
    defer func() { <-done }()

    beater, err := b.createBeater(b.create)
    if err != nil {
        return err
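The added lines tie the refresh goroutine's lifetime to Beat.Run: adjustMaxProcs only returns once ctx is cancelled, and the deferred receive on done keeps Run from returning while the goroutine is still using the logger. A minimal, self-contained sketch of the same shutdown pattern, with illustrative names that are not taken from the repository:

package main

import (
    "context"
    "fmt"
    "time"
)

// refreshLoop stands in for adjustMaxProcs: it does periodic work until ctx is cancelled.
func refreshLoop(ctx context.Context, d time.Duration) error {
    ticker := time.NewTicker(d)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            fmt.Println("refresh")
        }
    }
}

func run(ctx context.Context) error {
    done := make(chan struct{})
    go func() {
        defer close(done)
        refreshLoop(ctx, 100*time.Millisecond)
    }()
    // Block until the background goroutine has exited before run returns.
    defer func() { <-done }()

    <-ctx.Done() // stand-in for the rest of run's work
    return nil
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
    defer cancel()
    _ = run(ctx)
}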
112 changes: 112 additions & 0 deletions internal/beatcmd/beat_test.go
@@ -0,0 +1,112 @@
package beatcmd

import (
    "context"
    "fmt"
    "strconv"
    "testing"
    "time"

    "github.com/elastic/beats/v7/libbeat/beat"
    "github.com/elastic/beats/v7/libbeat/common/reload"
    "github.com/elastic/elastic-agent-libs/config"
    "github.com/elastic/elastic-agent-libs/logp"
    "github.com/elastic/elastic-agent-libs/monitoring"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "golang.org/x/sync/errgroup"
)

// TestRunMaxProcs ensures Beat.Run calls the GOMAXPROCS adjustment code by looking for log messages.
func TestRunMaxProcs(t *testing.T) {
    for _, n := range []int{1, 2, 4} {
        t.Run(fmt.Sprintf("%d_GOMAXPROCS", n), func(t *testing.T) {
            t.Setenv("GOMAXPROCS", strconv.Itoa(n))
            beat, _ := newNopBeat(t, "output.console.enabled: true")

            // Capture logs for testing.
            logp.DevelopmentSetup(logp.ToObserverOutput())
            logs := logp.ObserverLogs()

            ctx, cancel := context.WithCancel(context.Background())
            defer cancel()
            g, ctx := errgroup.WithContext(ctx)
            defer g.Wait()
            g.Go(func() error { return beat.Run(ctx) })

            timeout := time.NewTimer(10 * time.Second)
            defer timeout.Stop()
            for {
                select {
                case <-timeout.C:
                    t.Error("timed out waiting for log message, total logs observed:", logs.Len())
                    for _, log := range logs.All() {
                        t.Log(log.LoggerName, log.Message)
                    }
                    return
                case <-time.After(10 * time.Millisecond):
                }

                logs := logs.FilterMessageSnippet(fmt.Sprintf(
                    `maxprocs: Honoring GOMAXPROCS="%d" as set in environment`, n,
                ))
                if logs.Len() > 0 {
                    break
                }
            }

            cancel()
            assert.NoError(t, g.Wait())
        })
    }
}

func newNopBeat(t testing.TB, configYAML string) (*Beat, *nopBeater) {
    resetGlobals()
    initCfgfile(t, configYAML)
    nopBeater := newNopBeater()
    beat, err := NewBeat(BeatParams{
        Create: func(b *beat.Beat, cfg *config.C) (beat.Beater, error) {
            return nopBeater, nil
        },
    })
    require.NoError(t, err)
    return beat, nopBeater
}

func resetGlobals() {
    // Clear monitoring registries to allow the new Beat to populate them.
    monitoring.GetNamespace("info").SetRegistry(nil)
    monitoring.GetNamespace("state").SetRegistry(nil)
    for _, name := range []string{"system", "beat", "libbeat"} {
        registry := monitoring.Default.GetRegistry(name)
        if registry != nil {
            registry.Clear()
        }
    }

    // Create a new reload registry, as the Beat.Run method will register with it.
    reload.Register = reload.NewRegistry()
}

type nopBeater struct {
    running chan struct{}
    done    chan struct{}
}

func newNopBeater() *nopBeater {
    return &nopBeater{
        running: make(chan struct{}),
        done:    make(chan struct{}),
    }
}

func (b *nopBeater) Run(*beat.Beat) error {
    close(b.running)
    <-b.done
    return nil
}

func (b *nopBeater) Stop() {
    close(b.done)
}
30 changes: 3 additions & 27 deletions internal/beatcmd/locker_test.go
@@ -33,18 +33,7 @@ import (
)

func TestLocker(t *testing.T) {
    initCfgfile(t, `output.console.enabled: true`)

    nopBeater := &nopBeater{
        running: make(chan struct{}),
        done:    make(chan struct{}),
    }
    beat1, err := NewBeat(BeatParams{
        Create: func(*beat.Beat, *config.C) (beat.Beater, error) {
            return nopBeater, nil
        },
    })
    require.NoError(t, err)
    beat1, nopBeater := newNopBeat(t, `output.console.enabled: true`)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
@@ -55,6 +44,8 @@ func TestLocker(t *testing.T) {
    // the lock should be held.
    <-nopBeater.running

    // Create another Beat using the same configuration and data directory;
    // its Run method should fail to acquire the lock while beat1 is running.
    beat2, err := NewBeat(BeatParams{
        Create: func(*beat.Beat, *config.C) (beat.Beater, error) {
            panic("should not be called")
@@ -67,18 +58,3 @@
    cancel()
    assert.NoError(t, g.Wait())
}

type nopBeater struct {
    running chan struct{}
    done    chan struct{}
}

func (b *nopBeater) Run(*beat.Beat) error {
    close(b.running)
    <-b.done
    return nil
}

func (b *nopBeater) Stop() {
    close(b.done)
}
56 changes: 56 additions & 0 deletions internal/beatcmd/maxprocs.go
@@ -0,0 +1,56 @@
package beatcmd

import (
    "context"
    "fmt"
    "time"

    "go.uber.org/automaxprocs/maxprocs"

    "github.com/elastic/elastic-agent-libs/logp"
)

type logf func(string, ...interface{})

// adjustMaxProcs uses `maxprocs` to change the GOMAXPROCS respecting any
// CFS quotas, if set.
//
// This is necessary since the Go runtime will default to the number of CPUs
// available in the machine it's running in, however, when running in a
// container or in a cgroup with resource limits, the disparity can be extreme.
//
// Having a significantly greater GOMAXPROCS set than the granted CFS quota
// results in a significant amount of time spent "throttling", essentially
// pausing the running OS threads for the throttled period.
// Since the quotas may be updated without restarting the process, the
// GOMAXPROCS are adjusted every 30s.
func adjustMaxProcs(ctx context.Context, d time.Duration, infof, errorf logf) error {
    setMaxProcs := func() {
        if _, err := maxprocs.Set(maxprocs.Logger(infof)); err != nil {
            errorf("failed to set GOMAXPROCS: %v", err)
        }
    }
    // set the gomaxprocs immediately.
    setMaxProcs()
    ticker := time.NewTicker(d)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            setMaxProcs()
        }
    }
}

func diffInfof(logger *logp.Logger) logf {
    var last string
    return func(format string, args ...interface{}) {
        msg := fmt.Sprintf(format, args...)
        if msg != last {
            logger.Info(msg)
            last = msg
        }
    }
}
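For context, adjustMaxProcs is a thin loop around go.uber.org/automaxprocs: maxprocs.Set inspects the Linux CFS quota (when one is set) and adjusts GOMAXPROCS accordingly, logging through whatever printf-style function it is given. A minimal standalone sketch of that underlying call, assuming the library's documented API:

package main

import (
    "log"
    "runtime"

    "go.uber.org/automaxprocs/maxprocs"
)

func main() {
    // Set GOMAXPROCS from the container CPU quota, if any; the returned
    // function restores the previous value.
    undo, err := maxprocs.Set(maxprocs.Logger(log.Printf))
    if err != nil {
        log.Printf("failed to set GOMAXPROCS: %v", err)
    }
    defer undo()

    log.Printf("GOMAXPROCS is now %d", runtime.GOMAXPROCS(0))
}

The commit re-runs this call every 30 seconds (the d parameter above) because cgroup quotas can change without the process restarting.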
72 changes: 72 additions & 0 deletions internal/beatcmd/maxprocs_test.go
@@ -0,0 +1,72 @@
package beatcmd

import (
    "context"
    "fmt"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
    "go.uber.org/zap/zaptest/observer"

    "github.com/elastic/elastic-agent-libs/logp"
)

func TestAdjustMaxProcsTickerRefresh(t *testing.T) {
    // This test asserts that the GOMAXPROCS adjustment is called multiple times,
    // respecting the time.Duration that is passed to the function.
    for _, maxP := range []int{2, 4, 8} {
        t.Run(fmt.Sprintf("%d_GOMAXPROCS", maxP), func(t *testing.T) {
            observedLogs := testAdjustMaxProcs(t, maxP, false)
            assert.GreaterOrEqual(t, observedLogs.Len(), 10)
        })
    }
}

func TestAdjustMaxProcsTickerRefreshDiffLogger(t *testing.T) {
    // This test asserts that the log messages aren't logged more than once.
    for _, maxP := range []int{2, 4, 8} {
        t.Run(fmt.Sprintf("%d_GOMAXPROCS", maxP), func(t *testing.T) {
            observedLogs := testAdjustMaxProcs(t, maxP, true)
            // Assert that only 1 message has been logged.
            assert.Equal(t, observedLogs.Len(), 1)
        })
    }
}

func testAdjustMaxProcs(t *testing.T, maxP int, diffCore bool) *observer.ObservedLogs {
    t.Setenv("GOMAXPROCS", fmt.Sprint(maxP))
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    core, observedLogs := observer.New(zapcore.DebugLevel)
    logger := logp.NewLogger("", zap.WrapCore(func(in zapcore.Core) zapcore.Core {
        return zapcore.NewTee(in, core)
    }))

    // Adjust maxprocs every 1ms.
    refreshDuration := time.Millisecond
    logFunc := logger.Infof
    if diffCore {
        logFunc = diffInfof(logger)
    }

    go adjustMaxProcs(ctx, refreshDuration, logFunc, logger.Errorf)

    filterMsg := fmt.Sprintf(`maxprocs: Honoring GOMAXPROCS="%d"`, maxP)
    for {
        select {
        // Wait for 50ms so adjustMaxProcs has had time to run a few times.
        case <-time.After(50 * refreshDuration):
            logs := observedLogs.FilterMessageSnippet(filterMsg)
            if logs.Len() >= 1 {
                return logs
            }
        case <-ctx.Done():
            t.Error(ctx.Err())
            return nil
        }
    }
}
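The diff-logger test relies on diffInfof suppressing consecutive duplicates, so the 1ms refresh loop logs only once while GOMAXPROCS stays unchanged. A self-contained sketch of that deduplication idea, using the standard library logger instead of logp (illustrative only, not the repository's code):

package main

import (
    "fmt"
    "log"
)

// dedupPrintf wraps a printf-style logger and drops a message if it is
// identical to the previous one, mirroring what diffInfof does.
func dedupPrintf(logf func(string, ...interface{})) func(string, ...interface{}) {
    var last string
    return func(format string, args ...interface{}) {
        msg := fmt.Sprintf(format, args...)
        if msg != last {
            logf("%s", msg)
            last = msg
        }
    }
}

func main() {
    infof := dedupPrintf(log.Printf)
    infof("maxprocs: Updating GOMAXPROCS=%v", 4) // logged
    infof("maxprocs: Updating GOMAXPROCS=%v", 4) // suppressed: identical to the previous message
    infof("maxprocs: Updating GOMAXPROCS=%v", 2) // logged: the message changed
}

Like diffInfof, the wrapper remembers only the last message and is not synchronized; in this commit it is only ever invoked from the single adjustMaxProcs goroutine.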
46 changes: 0 additions & 46 deletions internal/beater/beater.go
@@ -36,7 +36,6 @@ import (
    "go.elastic.co/apm/module/apmgrpc/v2"
    "go.elastic.co/apm/module/apmhttp/v2"
    "go.elastic.co/apm/v2"
    "go.uber.org/automaxprocs/maxprocs"
    "golang.org/x/sync/errgroup"
    "google.golang.org/grpc"

@@ -171,18 +170,6 @@ func (bt *beater) Run(b *beat.Beat) error {
}

func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b *beat.Beat) error {
    // Use `maxprocs` to change the GOMAXPROCS respecting any CFS quotas, if
    // set. This is necessary since the Go runtime will default to the number
    // of CPUs available in the machine it's running in, however, when running
    // in a container or in a cgroup with resource limits, the disparity can be
    // extreme.
    // Having a significantly greater GOMAXPROCS set than the granted CFS quota
    // results in a significant amount of time spent "throttling", essentially
    // pausing the running OS threads for the throttled period.
    // Since the quotas may be updated without restarting the process, the
    // GOMAXPROCS are adjusted every 30s.
    go adjustMaxProcs(ctx, 30*time.Second, diffInfof(bt.logger), bt.logger.Errorf)

    tracer, tracerServer, err := initTracing(b, bt.config, bt.logger)
    if err != nil {
        return err
@@ -1130,36 +1117,3 @@ func queryClusterUUID(ctx context.Context, esClient elasticsearch.Client) error
    s.Set(response.ClusterUUID)
    return nil
}

type logf func(string, ...interface{})

func adjustMaxProcs(ctx context.Context, d time.Duration, infof, errorf logf) error {
    setMaxProcs := func() {
        if _, err := maxprocs.Set(maxprocs.Logger(infof)); err != nil {
            errorf("failed to set GOMAXPROCS: %v", err)
        }
    }
    // set the gomaxprocs immediately.
    setMaxProcs()
    ticker := time.NewTicker(d)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            setMaxProcs()
        }
    }
}

func diffInfof(logger *logp.Logger) logf {
    var last string
    return func(format string, args ...interface{}) {
        msg := fmt.Sprintf(format, args...)
        if msg != last {
            logger.Info(msg)
            last = msg
        }
    }
}