Skip to content

Commit f9f7916

Browse files
committed
webrtc: fix memory leak with udpmux.muxedConnection context (#3243)
This context wasn't being cancelled on all code paths. In particular, contexts for connections that didn't complete negotiation were not being cancelled. The change arranges for either `udpmux.muxedConnection.Close` or `RemoveConnByUfrag` to call the other. Fixes: #3223
1 parent 1b3b1ed commit f9f7916

File tree

3 files changed

+55
-21
lines changed

3 files changed

+55
-21
lines changed

p2p/transport/webrtc/udpmux/mux.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,12 +271,13 @@ func (mux *UDPMux) RemoveConnByUfrag(ufrag string) {
271271

272272
for _, isIPv6 := range [...]bool{true, false} {
273273
key := ufragConnKey{ufrag: ufrag, isIPv6: isIPv6}
274-
if _, ok := mux.ufragMap[key]; ok {
274+
if conn, ok := mux.ufragMap[key]; ok {
275275
delete(mux.ufragMap, key)
276276
for _, addr := range mux.ufragAddrMap[key] {
277277
delete(mux.addrMap, addr.String())
278278
}
279279
delete(mux.ufragAddrMap, key)
280+
conn.close()
280281
}
281282
}
282283
}
@@ -293,7 +294,7 @@ func (mux *UDPMux) getOrCreateConn(ufrag string, isIPv6 bool, _ *UDPMux, addr ne
293294
return false, conn
294295
}
295296

296-
conn := newMuxedConnection(mux, func() { mux.RemoveConnByUfrag(ufrag) })
297+
conn := newMuxedConnection(mux, ufrag)
297298
mux.ufragMap[key] = conn
298299
mux.addrMap[addr.String()] = conn
299300
mux.ufragAddrMap[key] = append(mux.ufragAddrMap[key], addr)

p2p/transport/webrtc/udpmux/mux_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"github.com/pion/stun"
11+
"github.com/stretchr/testify/assert"
1112
"github.com/stretchr/testify/require"
1213
)
1314

@@ -246,3 +247,28 @@ func TestMuxedConnection(t *testing.T) {
246247
}
247248
require.Empty(t, addrUfragMap)
248249
}
250+
251+
func TestRemovingUfragClosesConn(t *testing.T) {
252+
c := newPacketConn(t)
253+
m := NewUDPMux(c)
254+
m.Start()
255+
defer m.Close()
256+
remoteAddr := &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 1234}
257+
conn, err := m.GetConn("a", remoteAddr)
258+
require.NoError(t, err)
259+
defer conn.Close()
260+
261+
connClosed := make(chan bool)
262+
go func() {
263+
_, _, err := conn.ReadFrom(make([]byte, 100))
264+
assert.ErrorIs(t, err, context.Canceled)
265+
close(connClosed)
266+
}()
267+
require.NoError(t, err)
268+
m.RemoveConnByUfrag("a")
269+
select {
270+
case <-connClosed:
271+
case <-time.After(1 * time.Second):
272+
t.Fatalf("expected the connection to be closed")
273+
}
274+
}

p2p/transport/webrtc/udpmux/muxed_connection.go

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,31 +23,29 @@ const queueLen = 128
2323
// from which this connection (indexed by ufrag) received
2424
// data.
2525
type muxedConnection struct {
26-
ctx context.Context
27-
cancel context.CancelFunc
28-
onClose func()
29-
queue chan packet
30-
mux *UDPMux
26+
ctx context.Context
27+
cancel context.CancelFunc
28+
queue chan packet
29+
mux *UDPMux
30+
ufrag string
3131
}
3232

3333
var _ net.PacketConn = &muxedConnection{}
3434

35-
func newMuxedConnection(mux *UDPMux, onClose func()) *muxedConnection {
35+
func newMuxedConnection(mux *UDPMux, ufrag string) *muxedConnection {
3636
ctx, cancel := context.WithCancel(mux.ctx)
3737
return &muxedConnection{
38-
ctx: ctx,
39-
cancel: cancel,
40-
queue: make(chan packet, queueLen),
41-
onClose: onClose,
42-
mux: mux,
38+
ctx: ctx,
39+
cancel: cancel,
40+
queue: make(chan packet, queueLen),
41+
mux: mux,
42+
ufrag: ufrag,
4343
}
4444
}
4545

4646
func (c *muxedConnection) Push(buf []byte, addr net.Addr) error {
47-
select {
48-
case <-c.ctx.Done():
47+
if c.ctx.Err() != nil {
4948
return errors.New("closed")
50-
default:
5149
}
5250
select {
5351
case c.queue <- packet{buf: buf, addr: addr}:
@@ -76,20 +74,29 @@ func (c *muxedConnection) WriteTo(p []byte, addr net.Addr) (n int, err error) {
7674
}
7775

7876
func (c *muxedConnection) Close() error {
79-
select {
80-
case <-c.ctx.Done():
77+
if c.ctx.Err() != nil {
8178
return nil
82-
default:
8379
}
84-
c.onClose()
80+
// mux calls close to actually close the connection
81+
//
82+
// Removing the connection from the mux or closing the connection
83+
// must trigger the other.
84+
// Doing this here ensures we don't need to call both RemoveConnByUfrag
85+
// and close on all code paths.
86+
c.mux.RemoveConnByUfrag(c.ufrag)
87+
return nil
88+
}
89+
90+
// closes the connection. Must only be called by the mux.
91+
func (c *muxedConnection) close() {
8592
c.cancel()
8693
// drain the packet queue
8794
for {
8895
select {
8996
case p := <-c.queue:
9097
pool.Put(p.buf)
9198
default:
92-
return nil
99+
return
93100
}
94101
}
95102
}

0 commit comments

Comments
 (0)