From 7a66a86d0d7de501a16e0c1562691bbd612cc14f Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 11 Dec 2019 10:38:57 +0100 Subject: [PATCH 1/3] feat: block on trim 1. Perform all trims in the background goroutine. 2. Make `TrimOpenConns` return early when canceled. 3. Make multiple concurrent calls to `TrimOpenConns` block until the trim finishes. Note: It already _may_ block one caller, this just ensures that it always behaves the same way. Returning a signal channel may be a nicer solution but this is less breaking. --- connmgr.go | 90 +++++++++++++++++++++++++++++++++++++------------ connmgr_test.go | 85 +++++++++++++++++++++++++++++++++++++++++++--- go.mod | 2 ++ go.sum | 19 ++--------- 4 files changed, 153 insertions(+), 43 deletions(-) diff --git a/connmgr.go b/connmgr.go index 804730d..fb94fd8 100644 --- a/connmgr.go +++ b/connmgr.go @@ -36,8 +36,9 @@ type BasicConnMgr struct { plk sync.RWMutex protected map[peer.ID]map[string]struct{} - // channel-based semaphore that enforces only a single trim is in progress - trimRunningCh chan struct{} + trimMu sync.RWMutex + trimTrigger chan struct{} + trimSignal chan struct{} lastTrim time.Time silencePeriod time.Duration @@ -96,7 +97,8 @@ func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr { highWater: hi, lowWater: low, gracePeriod: grace, - trimRunningCh: make(chan struct{}, 1), + trimTrigger: make(chan struct{}), + trimSignal: make(chan struct{}), protected: make(map[peer.ID]map[string]struct{}, 16), silencePeriod: SilencePeriod, ctx: ctx, @@ -167,25 +169,28 @@ type peerInfo struct { // TODO: error return value so we can cleanly signal we are aborting because: // (a) there's another trim in progress, or (b) the silence period is in effect. func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) { + cm.trimMu.RLock() + trimSignal := cm.trimSignal + cm.trimMu.RUnlock() + + // Trigger a trim. select { - case cm.trimRunningCh <- struct{}{}: - default: - return - } - defer func() { <-cm.trimRunningCh }() - if time.Since(cm.lastTrim) < cm.silencePeriod { - // skip this attempt to trim as the last one just took place. + case cm.trimTrigger <- struct{}{}: + case <-trimSignal: + // Someone else just trimmed. return + case <-cm.ctx.Done(): + case <-ctx.Done(): + // TODO: return an error? } - defer log.EventBegin(ctx, "connCleanup").Done() - for _, c := range cm.getConnsToClose(ctx) { - log.Info("closing conn: ", c.RemotePeer()) - log.Event(ctx, "closeConn", c.RemotePeer()) - c.Close() + // Wait for the trim. + select { + case <-trimSignal: + case <-cm.ctx.Done(): + case <-ctx.Done(): + // TODO: return an error? } - - cm.lastTrim = time.Now() } func (cm *BasicConnMgr) background() { @@ -195,19 +200,56 @@ func (cm *BasicConnMgr) background() { for { select { case <-ticker.C: - if atomic.LoadInt32(&cm.connCount) > int32(cm.highWater) { - cm.TrimOpenConns(cm.ctx) + if atomic.LoadInt32(&cm.connCount) < int32(cm.highWater) { + // Below high water, skip. + continue } - + case <-cm.trimTrigger: case <-cm.ctx.Done(): return } + cm.trim() + } +} + +func (cm *BasicConnMgr) trim() { + cm.trimMu.Lock() + + // read the last trim time under the lock + lastTrim := cm.lastTrim + + // swap out the trim signal + trimSignal := cm.trimSignal + cm.trimSignal = make(chan struct{}) + + cm.trimMu.Unlock() + + // always signal a trim, even if we _don't_ trim, to unblock anyone + // waiting. + defer close(trimSignal) + + // skip this attempt to trim if the last one just took place. + if time.Since(lastTrim) < cm.silencePeriod { + return } + + // do the actual trim. + defer log.EventBegin(cm.ctx, "connCleanup").Done() + for _, c := range cm.getConnsToClose() { + log.Info("closing conn: ", c.RemotePeer()) + log.Event(cm.ctx, "closeConn", c.RemotePeer()) + c.Close() + } + + // finally, update the last trim time. + cm.trimMu.Lock() + cm.lastTrim = time.Now() + cm.trimMu.Unlock() } // getConnsToClose runs the heuristics described in TrimOpenConns and returns the // connections to close. -func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []network.Conn { +func (cm *BasicConnMgr) getConnsToClose() []network.Conn { if cm.lowWater == 0 || cm.highWater == 0 { // disabled return nil @@ -386,10 +428,14 @@ type CMInfo struct { // GetInfo returns the configuration and status data for this connection manager. func (cm *BasicConnMgr) GetInfo() CMInfo { + cm.trimMu.RLock() + lastTrim := cm.lastTrim + cm.trimMu.RUnlock() + return CMInfo{ HighWater: cm.highWater, LowWater: cm.lowWater, - LastTrim: cm.lastTrim, + LastTrim: lastTrim, GracePeriod: cm.gracePeriod, ConnCount: int(atomic.LoadInt32(&cm.connCount)), } diff --git a/connmgr_test.go b/connmgr_test.go index ad78786..705fa77 100644 --- a/connmgr_test.go +++ b/connmgr_test.go @@ -2,6 +2,7 @@ package connmgr import ( "context" + "sync" "testing" "time" @@ -47,6 +48,82 @@ func randConn(t testing.TB, discNotify func(network.Network, network.Conn)) netw return &tconn{peer: pid, disconnectNotify: discNotify} } +// Make sure multiple trim calls block. +func TestTrimBlocks(t *testing.T) { + cm := NewConnManager(200, 300, 0) + + cm.trimMu.RLock() + + doneCh := make(chan struct{}, 2) + go func() { + cm.TrimOpenConns(context.Background()) + doneCh <- struct{}{} + }() + go func() { + cm.TrimOpenConns(context.Background()) + doneCh <- struct{}{} + }() + time.Sleep(time.Millisecond) + select { + case <-doneCh: + cm.trimMu.RUnlock() + t.Fatal("expected trim to block") + default: + cm.trimMu.RUnlock() + } + <-doneCh + <-doneCh +} + +// Make sure we return from trim when the context is canceled. +func TestTrimCancels(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cm := NewConnManager(200, 300, 0) + + cm.trimMu.RLock() + defer cm.trimMu.RUnlock() + + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + cm.TrimOpenConns(ctx) + }() + time.Sleep(time.Millisecond) + cancel() + <-doneCh +} + +// Make sure trim returns when closed. +func TestTrimClosed(t *testing.T) { + cm := NewConnManager(200, 300, 0) + cm.Close() + cm.TrimOpenConns(context.Background()) +} + +// Make sure joining an existing trim works. +func TestTrimJoin(t *testing.T) { + cm := NewConnManager(200, 300, 0) + cm.trimMu.RLock() + var wg sync.WaitGroup + wg.Add(3) + go func() { + defer wg.Done() + cm.TrimOpenConns(context.Background()) + }() + time.Sleep(time.Millisecond) + go func() { + defer wg.Done() + cm.TrimOpenConns(context.Background()) + }() + go func() { + defer wg.Done() + cm.TrimOpenConns(context.Background()) + }() + time.Sleep(time.Millisecond) + cm.trimMu.RUnlock() + wg.Wait() +} + func TestConnTrimming(t *testing.T) { cm := NewConnManager(200, 300, 0) not := cm.Notifee() @@ -86,19 +163,19 @@ func TestConnTrimming(t *testing.T) { func TestConnsToClose(t *testing.T) { cm := NewConnManager(0, 10, 0) - conns := cm.getConnsToClose(context.Background()) + conns := cm.getConnsToClose() if conns != nil { t.Fatal("expected no connections") } cm = NewConnManager(10, 0, 0) - conns = cm.getConnsToClose(context.Background()) + conns = cm.getConnsToClose() if conns != nil { t.Fatal("expected no connections") } cm = NewConnManager(1, 1, 0) - conns = cm.getConnsToClose(context.Background()) + conns = cm.getConnsToClose() if conns != nil { t.Fatal("expected no connections") } @@ -109,7 +186,7 @@ func TestConnsToClose(t *testing.T) { conn := randConn(t, nil) not.Connected(nil, conn) } - conns = cm.getConnsToClose(context.Background()) + conns = cm.getConnsToClose() if len(conns) != 0 { t.Fatal("expected no connections") } diff --git a/go.mod b/go.mod index d7c3c51..2f31fc1 100644 --- a/go.mod +++ b/go.mod @@ -6,3 +6,5 @@ require ( github.com/libp2p/go-libp2p-core v0.2.5 github.com/multiformats/go-multiaddr v0.1.2 ) + +go 1.12 diff --git a/go.sum b/go.sum index 28694f0..6ac5056 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,9 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= -github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78= -github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8= github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3 h1:A/EVblehb75cUgXA5njHPn0kLAsykn6mJGz7rnmW5W0= github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= -github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg= github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY= @@ -18,6 +15,7 @@ github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= @@ -35,15 +33,12 @@ github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfm github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc= github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= github.com/ipfs/go-log v0.0.1 h1:9XTUN/rW64BCG1YhPK9Hoy3q8nr4gOmHHBpgFdfw6Lc= github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= -github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8 h1:bspPhN+oKYFk5fcGNuQzp6IGzYQSenLEgH3s6jkXrWw= -github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY= github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10= github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= @@ -58,10 +53,7 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8= github.com/libp2p/go-flow-metrics v0.0.2/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= -github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8/DT/1I= -github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco= github.com/libp2p/go-libp2p-core v0.2.5 h1:iP1PIiIrlRrGbE1fYq2918yBc5NlCH3pFuIPSWU9hds= github.com/libp2p/go-libp2p-core v0.2.5/go.mod h1:6+5zJmKhsf7yHn1RbmYDu08qDUpIUxGdqHuEZckmZOA= github.com/libp2p/go-openssl v0.0.3 h1:wjlG7HvQkt4Fq4cfH33Ivpwp0omaElYEi9z26qaIkIk= @@ -84,13 +76,9 @@ github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKU github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= github.com/mr-tron/base58 v1.1.0 h1:Y51FGVJ91WBqCEabAi5OPUz38eAx8DakuAm5svLcsfQ= github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= -github.com/mr-tron/base58 v1.1.1 h1:OJIdWOWYe2l5PQNgimGtuwHY8nDskvJ5vvs//YnzRLs= -github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.2 h1:ZEw4I2EgPKDJ2iEw0cNmLB3ROrEmkOtXIkaG7wZg+78= github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA= -github.com/multiformats/go-multiaddr v0.0.2 h1:RBysRCv5rv3FWlhKWKoXv8tnsCUpEpIZpCmqAGZos2s= -github.com/multiformats/go-multiaddr v0.0.2/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= github.com/multiformats/go-multiaddr v0.1.1 h1:rVAztJYMhCQ7vEFr8FvxW3mS+HF2eY/oPbOMeS0ZDnE= github.com/multiformats/go-multiaddr v0.1.1/go.mod h1:aMKBKNEYmzmDmxfX88/vz+J5IU55txyt0p4aiWVohjo= github.com/multiformats/go-multiaddr v0.1.2 h1:HWYHNSyyllbQopmVIF5K7JKJugiah+L9/kuZKHbmNdQ= @@ -112,8 +100,6 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/smola/gocompat v0.2.0 h1:6b1oIMlUXIpz//VKEDzPVBK8KG7beVwmHIUEBIs/Pns= github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY= -github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a h1:/eS3yfGjQKG+9kayBkj0ip1BGhq6zJ3eaVksphxAaek= -github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= @@ -131,8 +117,6 @@ go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b h1:+/WWzjwW6gidDJnMKWLKLX1gxn7irUTF1fLpQovfQ5M= -golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8= @@ -149,6 +133,7 @@ golang.org/x/net v0.0.0-20190227160552-c95aed5357e7 h1:C2F/nMkR/9sfUTpvR3QrjBuTd golang.org/x/net v0.0.0-20190227160552-c95aed5357e7/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= From b1719703a652fdb0da0b722f9cbc5824bb27eb24 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 11 Dec 2019 14:50:07 +0100 Subject: [PATCH 2/3] fix: simplify trim logic Pro: It's a lot simpler. Con: We lose the ordering guarantee. Before, TrimOpenConns would only return once a trim that started _after_ calling TrimOpenConns finished. Now, it returns as soon as it observes _any_ trim finishing (even if the trim started earlier). --- connmgr.go | 66 +++++++++++++++++++++++++------------------------ connmgr_test.go | 14 +++++------ 2 files changed, 41 insertions(+), 39 deletions(-) diff --git a/connmgr.go b/connmgr.go index fb94fd8..3de5789 100644 --- a/connmgr.go +++ b/connmgr.go @@ -36,10 +36,11 @@ type BasicConnMgr struct { plk sync.RWMutex protected map[peer.ID]map[string]struct{} - trimMu sync.RWMutex - trimTrigger chan struct{} - trimSignal chan struct{} - lastTrim time.Time + trimTrigger chan chan<- struct{} + + lastTrimMu sync.RWMutex + lastTrim time.Time + silencePeriod time.Duration ctx context.Context @@ -97,8 +98,7 @@ func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr { highWater: hi, lowWater: low, gracePeriod: grace, - trimTrigger: make(chan struct{}), - trimSignal: make(chan struct{}), + trimTrigger: make(chan chan<- struct{}), protected: make(map[peer.ID]map[string]struct{}, 16), silencePeriod: SilencePeriod, ctx: ctx, @@ -169,16 +169,10 @@ type peerInfo struct { // TODO: error return value so we can cleanly signal we are aborting because: // (a) there's another trim in progress, or (b) the silence period is in effect. func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) { - cm.trimMu.RLock() - trimSignal := cm.trimSignal - cm.trimMu.RUnlock() - // Trigger a trim. + ch := make(chan struct{}) select { - case cm.trimTrigger <- struct{}{}: - case <-trimSignal: - // Someone else just trimmed. - return + case cm.trimTrigger <- ch: case <-cm.ctx.Done(): case <-ctx.Done(): // TODO: return an error? @@ -186,7 +180,7 @@ func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) { // Wait for the trim. select { - case <-trimSignal: + case <-ch: case <-cm.ctx.Done(): case <-ctx.Done(): // TODO: return an error? @@ -198,35 +192,43 @@ func (cm *BasicConnMgr) background() { defer ticker.Stop() for { + var waiting chan<- struct{} select { case <-ticker.C: if atomic.LoadInt32(&cm.connCount) < int32(cm.highWater) { // Below high water, skip. continue } - case <-cm.trimTrigger: + case waiting = <-cm.trimTrigger: case <-cm.ctx.Done(): return } cm.trim() + + // Notify anyone waiting on this trim. + if waiting != nil { + close(waiting) + } + + for { + select { + case waiting = <-cm.trimTrigger: + if waiting != nil { + close(waiting) + } + continue + default: + } + break + } } } func (cm *BasicConnMgr) trim() { - cm.trimMu.Lock() - + cm.lastTrimMu.RLock() // read the last trim time under the lock lastTrim := cm.lastTrim - - // swap out the trim signal - trimSignal := cm.trimSignal - cm.trimSignal = make(chan struct{}) - - cm.trimMu.Unlock() - - // always signal a trim, even if we _don't_ trim, to unblock anyone - // waiting. - defer close(trimSignal) + cm.lastTrimMu.RUnlock() // skip this attempt to trim if the last one just took place. if time.Since(lastTrim) < cm.silencePeriod { @@ -242,9 +244,9 @@ func (cm *BasicConnMgr) trim() { } // finally, update the last trim time. - cm.trimMu.Lock() + cm.lastTrimMu.Lock() cm.lastTrim = time.Now() - cm.trimMu.Unlock() + cm.lastTrimMu.Unlock() } // getConnsToClose runs the heuristics described in TrimOpenConns and returns the @@ -428,9 +430,9 @@ type CMInfo struct { // GetInfo returns the configuration and status data for this connection manager. func (cm *BasicConnMgr) GetInfo() CMInfo { - cm.trimMu.RLock() + cm.lastTrimMu.RLock() lastTrim := cm.lastTrim - cm.trimMu.RUnlock() + cm.lastTrimMu.RUnlock() return CMInfo{ HighWater: cm.highWater, diff --git a/connmgr_test.go b/connmgr_test.go index 705fa77..8e1b4b1 100644 --- a/connmgr_test.go +++ b/connmgr_test.go @@ -52,7 +52,7 @@ func randConn(t testing.TB, discNotify func(network.Network, network.Conn)) netw func TestTrimBlocks(t *testing.T) { cm := NewConnManager(200, 300, 0) - cm.trimMu.RLock() + cm.lastTrimMu.RLock() doneCh := make(chan struct{}, 2) go func() { @@ -66,10 +66,10 @@ func TestTrimBlocks(t *testing.T) { time.Sleep(time.Millisecond) select { case <-doneCh: - cm.trimMu.RUnlock() + cm.lastTrimMu.RUnlock() t.Fatal("expected trim to block") default: - cm.trimMu.RUnlock() + cm.lastTrimMu.RUnlock() } <-doneCh <-doneCh @@ -80,8 +80,8 @@ func TestTrimCancels(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cm := NewConnManager(200, 300, 0) - cm.trimMu.RLock() - defer cm.trimMu.RUnlock() + cm.lastTrimMu.RLock() + defer cm.lastTrimMu.RUnlock() doneCh := make(chan struct{}) go func() { @@ -103,7 +103,7 @@ func TestTrimClosed(t *testing.T) { // Make sure joining an existing trim works. func TestTrimJoin(t *testing.T) { cm := NewConnManager(200, 300, 0) - cm.trimMu.RLock() + cm.lastTrimMu.RLock() var wg sync.WaitGroup wg.Add(3) go func() { @@ -120,7 +120,7 @@ func TestTrimJoin(t *testing.T) { cm.TrimOpenConns(context.Background()) }() time.Sleep(time.Millisecond) - cm.trimMu.RUnlock() + cm.lastTrimMu.RUnlock() wg.Wait() } From 18a536ee7e315234137ab6d4efe9f41740adf0e8 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 11 Dec 2019 15:26:38 +0100 Subject: [PATCH 3/3] docs: document TrimOpenConns MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Raúl Kripalani --- connmgr.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/connmgr.go b/connmgr.go index 3de5789..b385533 100644 --- a/connmgr.go +++ b/connmgr.go @@ -166,9 +166,13 @@ type peerInfo struct { // pruning those peers with the lowest scores first, as long as they are not within their // grace period. // -// TODO: error return value so we can cleanly signal we are aborting because: -// (a) there's another trim in progress, or (b) the silence period is in effect. +// This function blocks until a trim is completed. If a trim is underway, a new +// one won't be started, and instead it'll wait until that one is completed before +// returning. func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) { + // TODO: error return value so we can cleanly signal we are aborting because: + // (a) there's another trim in progress, or (b) the silence period is in effect. + // Trigger a trim. ch := make(chan struct{}) select {