Skip to content

Optimize payload queue for write path #320

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: chunk-tracker
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions association.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ type Association struct {
myMaxNumInboundStreams uint16
myMaxNumOutboundStreams uint16
myCookie *paramStateCookie
payloadQueue *payloadQueue
payloadQueue *receivedChunkTracker
inflightQueue *payloadQueue
pendingQueue *pendingQueue
controlQueue *controlQueue
Expand Down Expand Up @@ -318,7 +318,7 @@ func createAssociation(config Config) *Association {
myMaxNumOutboundStreams: math.MaxUint16,
myMaxNumInboundStreams: math.MaxUint16,

payloadQueue: newPayloadQueue(),
payloadQueue: newReceivedPacketTracker(),
inflightQueue: newPayloadQueue(),
pendingQueue: newPendingQueue(),
controlQueue: newControlQueue(),
Expand Down Expand Up @@ -1378,7 +1378,7 @@ func (a *Association) handleData(d *chunkPayloadData) []*packet {
a.name, d.tsn, d.immediateSack, len(d.userData))
a.stats.incDATAs()

canPush := a.payloadQueue.canPush(d, a.peerLastTSN)
canPush := a.payloadQueue.canPush(d.tsn, a.peerLastTSN)
if canPush {
s := a.getOrCreateStream(d.streamIdentifier, true, PayloadTypeUnknown)
if s == nil {
Expand All @@ -1390,14 +1390,14 @@ func (a *Association) handleData(d *chunkPayloadData) []*packet {

if a.getMyReceiverWindowCredit() > 0 {
// Pass the new chunk to stream level as soon as it arrives
a.payloadQueue.push(d, a.peerLastTSN)
a.payloadQueue.push(d.tsn, a.peerLastTSN)
s.handleData(d)
} else {
// Receive buffer is full
lastTSN, ok := a.payloadQueue.getLastTSNReceived()
if ok && sna32LT(d.tsn, lastTSN) {
a.log.Debugf("[%s] receive buffer full, but accepted as this is a missing chunk with tsn=%d ssn=%d", a.name, d.tsn, d.streamSequenceNumber)
a.payloadQueue.push(d, a.peerLastTSN)
a.payloadQueue.push(d.tsn, a.peerLastTSN)
s.handleData(d)
} else {
a.log.Debugf("[%s] receive buffer full. dropping DATA with tsn=%d ssn=%d", a.name, d.tsn, d.streamSequenceNumber)
Expand All @@ -1421,7 +1421,7 @@ func (a *Association) handlePeerLastTSNAndAcknowledgement(sackImmediately bool)
// Meaning, if peerLastTSN+1 points to a chunk that is received,
// advance peerLastTSN until peerLastTSN+1 points to unreceived chunk.
for {
if _, popOk := a.payloadQueue.pop(a.peerLastTSN + 1); !popOk {
if popOk := a.payloadQueue.pop(a.peerLastTSN + 1); !popOk {
break
}
a.peerLastTSN++
Expand Down Expand Up @@ -2184,7 +2184,7 @@ func (a *Association) movePendingDataChunkToInflightQueue(c *chunkPayloadData) {
a.log.Tracef("[%s] sending ppi=%d tsn=%d ssn=%d sent=%d len=%d (%v,%v)",
a.name, c.payloadType, c.tsn, c.streamSequenceNumber, c.nSent, len(c.userData), c.beginningFragment, c.endingFragment)

a.inflightQueue.pushNoCheck(c)
a.inflightQueue.push(c)
}

// popPendingDataChunksToSend pops chunks from the pending queues as many as
Expand Down
26 changes: 6 additions & 20 deletions association_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,7 +1191,7 @@ func TestCreateForwardTSN(t *testing.T) {

a.cumulativeTSNAckPoint = 9
a.advancedPeerTSNAckPoint = 10
a.inflightQueue.pushNoCheck(&chunkPayloadData{
a.inflightQueue.push(&chunkPayloadData{
beginningFragment: true,
endingFragment: true,
tsn: 10,
Expand All @@ -1218,7 +1218,7 @@ func TestCreateForwardTSN(t *testing.T) {

a.cumulativeTSNAckPoint = 9
a.advancedPeerTSNAckPoint = 12
a.inflightQueue.pushNoCheck(&chunkPayloadData{
a.inflightQueue.push(&chunkPayloadData{
beginningFragment: true,
endingFragment: true,
tsn: 10,
Expand All @@ -1228,7 +1228,7 @@ func TestCreateForwardTSN(t *testing.T) {
nSent: 1,
_abandoned: true,
})
a.inflightQueue.pushNoCheck(&chunkPayloadData{
a.inflightQueue.push(&chunkPayloadData{
beginningFragment: true,
endingFragment: true,
tsn: 11,
Expand All @@ -1238,7 +1238,7 @@ func TestCreateForwardTSN(t *testing.T) {
nSent: 1,
_abandoned: true,
})
a.inflightQueue.pushNoCheck(&chunkPayloadData{
a.inflightQueue.push(&chunkPayloadData{
beginningFragment: true,
endingFragment: true,
tsn: 12,
Expand Down Expand Up @@ -1310,14 +1310,7 @@ func TestHandleForwardTSN(t *testing.T) {
prevTSN := a.peerLastTSN

// this chunk is blocked by the missing chunk at tsn=1
a.payloadQueue.push(&chunkPayloadData{
beginningFragment: true,
endingFragment: true,
tsn: a.peerLastTSN + 2,
streamIdentifier: 0,
streamSequenceNumber: 1,
userData: []byte("ABC"),
}, a.peerLastTSN)
a.payloadQueue.push(a.peerLastTSN+2, a.peerLastTSN)

fwdtsn := &chunkForwardTSN{
newCumulativeTSN: a.peerLastTSN + 1,
Expand Down Expand Up @@ -1347,14 +1340,7 @@ func TestHandleForwardTSN(t *testing.T) {
prevTSN := a.peerLastTSN

// this chunk is blocked by the missing chunk at tsn=1
a.payloadQueue.push(&chunkPayloadData{
beginningFragment: true,
endingFragment: true,
tsn: a.peerLastTSN + 3,
streamIdentifier: 0,
streamSequenceNumber: 1,
userData: []byte("ABC"),
}, a.peerLastTSN)
a.payloadQueue.push(a.peerLastTSN+3, a.peerLastTSN)

fwdtsn := &chunkForwardTSN{
newCumulativeTSN: a.peerLastTSN + 1,
Expand Down
140 changes: 20 additions & 120 deletions payload_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,83 +3,45 @@

package sctp

import (
"fmt"
"sort"
)

type payloadQueue struct {
chunkMap map[uint32]*chunkPayloadData
sorted []uint32
dupTSN []uint32
tsns []uint32
nBytes int
}

func newPayloadQueue() *payloadQueue {
return &payloadQueue{chunkMap: map[uint32]*chunkPayloadData{}}
}

func (q *payloadQueue) updateSortedKeys() {
if q.sorted != nil {
func (q *payloadQueue) push(p *chunkPayloadData) {
if _, ok := q.chunkMap[p.tsn]; ok {
return
}

q.sorted = make([]uint32, len(q.chunkMap))
i := 0
for k := range q.chunkMap {
q.sorted[i] = k
i++
}

sort.Slice(q.sorted, func(i, j int) bool {
return sna32LT(q.sorted[i], q.sorted[j])
})
}

func (q *payloadQueue) canPush(p *chunkPayloadData, cumulativeTSN uint32) bool {
_, ok := q.chunkMap[p.tsn]
if ok || sna32LTE(p.tsn, cumulativeTSN) {
return false
}
return true
}

func (q *payloadQueue) pushNoCheck(p *chunkPayloadData) {
q.chunkMap[p.tsn] = p
q.nBytes += len(p.userData)
q.sorted = nil
}

// push pushes a payload data. If the payload data is already in our queue or
// older than our cumulativeTSN marker, it will be recored as duplications,
// which can later be retrieved using popDuplicates.
func (q *payloadQueue) push(p *chunkPayloadData, cumulativeTSN uint32) bool {
_, ok := q.chunkMap[p.tsn]
if ok || sna32LTE(p.tsn, cumulativeTSN) {
// Found the packet, log in dups
q.dupTSN = append(q.dupTSN, p.tsn)
return false
var pos int
for pos = len(q.tsns) - 1; pos >= 0; pos-- {
if q.tsns[pos] < p.tsn {
break
}
}

q.chunkMap[p.tsn] = p
q.nBytes += len(p.userData)
q.sorted = nil
return true
pos++
q.tsns = append(q.tsns, 0)
copy(q.tsns[pos+1:], q.tsns[pos:])
q.tsns[pos] = p.tsn
}

// pop pops only if the oldest chunk's TSN matches the given TSN.
func (q *payloadQueue) pop(tsn uint32) (*chunkPayloadData, bool) {
q.updateSortedKeys()

if len(q.chunkMap) > 0 && tsn == q.sorted[0] {
q.sorted = q.sorted[1:]
if c, ok := q.chunkMap[tsn]; ok {
delete(q.chunkMap, tsn)
q.nBytes -= len(c.userData)
return c, true
}
if len(q.tsns) == 0 || q.tsns[0] != tsn {
return nil, false
}
q.tsns = q.tsns[1:]
if c, ok := q.chunkMap[tsn]; ok {
delete(q.chunkMap, tsn)
q.nBytes -= len(c.userData)
return c, true
}

return nil, false
}

Expand All @@ -89,58 +51,6 @@ func (q *payloadQueue) get(tsn uint32) (*chunkPayloadData, bool) {
return c, ok
}

// popDuplicates returns an array of TSN values that were found duplicate.
func (q *payloadQueue) popDuplicates() []uint32 {
dups := q.dupTSN
q.dupTSN = []uint32{}
return dups
}

func (q *payloadQueue) getGapAckBlocks(cumulativeTSN uint32) (gapAckBlocks []gapAckBlock) {
var b gapAckBlock

if len(q.chunkMap) == 0 {
return []gapAckBlock{}
}

q.updateSortedKeys()

for i, tsn := range q.sorted {
if i == 0 {
b.start = uint16(tsn - cumulativeTSN)
b.end = b.start
continue
}
diff := uint16(tsn - cumulativeTSN)
if b.end+1 == diff {
b.end++
} else {
gapAckBlocks = append(gapAckBlocks, gapAckBlock{
start: b.start,
end: b.end,
})
b.start = diff
b.end = diff
}
}

gapAckBlocks = append(gapAckBlocks, gapAckBlock{
start: b.start,
end: b.end,
})

return gapAckBlocks
}

func (q *payloadQueue) getGapAckBlocksString(cumulativeTSN uint32) string {
gapAckBlocks := q.getGapAckBlocks(cumulativeTSN)
str := fmt.Sprintf("cumTSN=%d", cumulativeTSN)
for _, b := range gapAckBlocks {
str += fmt.Sprintf(",%d-%d", b.start, b.end)
}
return str
}

func (q *payloadQueue) markAsAcked(tsn uint32) int {
var nBytesAcked int
if c, ok := q.chunkMap[tsn]; ok {
Expand All @@ -154,16 +64,6 @@ func (q *payloadQueue) markAsAcked(tsn uint32) int {
return nBytesAcked
}

func (q *payloadQueue) getLastTSNReceived() (uint32, bool) {
q.updateSortedKeys()

qlen := len(q.sorted)
if qlen == 0 {
return 0, false
}
return q.sorted[qlen-1], true
}

func (q *payloadQueue) markAllToRetrasmit() {
for _, c := range q.chunkMap {
if c.acked || c.abandoned() {
Expand Down
Loading