package http2
import (
	"fmt"
	"math"
	"sort"
)
// priorityDefaultWeight is the weight given to a node until a priority update
// assigns another one. Node weights are stored minus one so that the range
// 1-256 fits in a uint8, which makes this the encoding of the default weight
// of 16 from RFC 7540, Section 5.3.5.
const priorityDefaultWeight = 16 - 1
// PriorityWriteSchedulerConfig holds the tunables of the priority-based
// write scheduler.
type PriorityWriteSchedulerConfig struct {
	// MaxClosedNodesInTree is the maximum number of closed streams whose
	// nodes are retained in the priority tree. When it is not positive, a
	// node is removed from the tree as soon as its stream is closed.
	MaxClosedNodesInTree int

	// MaxIdleNodesInTree is the maximum number of idle streams whose nodes
	// are retained in the priority tree. When it is zero, priority updates
	// for streams that have no node yet are ignored.
	MaxIdleNodesInTree int

	// ThrottleOutOfOrderWrites enables write throttling: a stream that has
	// an open ancestor in the priority tree is only allowed a limited
	// number of bytes per scheduled frame.
	ThrottleOutOfOrderWrites bool
}
func ( *PriorityWriteSchedulerConfig) WriteScheduler {
if == nil {
= &PriorityWriteSchedulerConfig{
MaxClosedNodesInTree: 10,
MaxIdleNodesInTree: 10,
ThrottleOutOfOrderWrites: false,
}
}
:= &priorityWriteScheduler{
nodes: make(map[uint32]*priorityNode),
maxClosedNodesInTree: .MaxClosedNodesInTree,
maxIdleNodesInTree: .MaxIdleNodesInTree,
enableWriteThrottle: .ThrottleOutOfOrderWrites,
}
.nodes[0] = &.root
if .ThrottleOutOfOrderWrites {
.writeThrottleLimit = 1024
} else {
.writeThrottleLimit = math.MaxInt32
}
return
}
// priorityNodeState describes the lifecycle state of the stream that a
// priority tree node stands for.
type priorityNodeState int

const (
	// priorityNodeOpen is the state of a node whose stream has been opened.
	priorityNodeOpen priorityNodeState = iota

	// priorityNodeClosed is the state of a node whose stream has been closed.
	priorityNodeClosed

	// priorityNodeIdle is the state of a node that was created by a priority
	// update before its stream was opened.
	priorityNodeIdle
)
// priorityNode is one node of the HTTP/2 priority tree. Each node stands for
// a single stream id; id 0 is the root of the tree.
type priorityNode struct {
	q      writeQueue        // frames waiting to be written for this node
	id     uint32            // stream id; 0 for the root
	weight uint8             // stored as weight-1: the effective weight is weight+1
	state  priorityNodeState // open, closed or idle

	bytes        int64 // bytes accounted to this node; reset when the stream closes
	subtreeBytes int64 // sum of bytes over this node and all of its descendants

	// Tree links. The kids of a node form a doubly-linked list that starts
	// at kids and is chained through prev and next.
	parent *priorityNode
	kids   *priorityNode
	prev   *priorityNode
	next   *priorityNode
}
func ( *priorityNode) ( *priorityNode) {
if == {
panic("setParent to self")
}
if .parent == {
return
}
if := .parent; != nil {
if .prev == nil {
.kids = .next
} else {
.prev.next = .next
}
if .next != nil {
.next.prev = .prev
}
}
.parent =
if == nil {
.next = nil
.prev = nil
} else {
.next = .kids
.prev = nil
if .next != nil {
.next.prev =
}
.kids =
}
}
func ( *priorityNode) ( int64) {
.bytes +=
for ; != nil; = .parent {
.subtreeBytes +=
}
}
func ( *priorityNode) ( bool, *[]*priorityNode, func(*priorityNode, bool) bool) bool {
if !.q.empty() && (, ) {
return true
}
if .kids == nil {
return false
}
if .id != 0 {
= || (.state == priorityNodeOpen)
}
:= .kids.weight
:= false
for := .kids.next; != nil; = .next {
if .weight != {
= true
break
}
}
if ! {
for := .kids; != nil; = .next {
if .(, , ) {
return true
}
}
return false
}
* = (*)[:0]
for .kids != nil {
* = append(*, .kids)
.kids.setParent(nil)
}
sort.Sort(sortPriorityNodeSiblings(*))
for := len(*) - 1; >= 0; -- {
(*)[].setParent()
}
for := .kids; != nil; = .next {
if .(, , ) {
return true
}
}
return false
}
type sortPriorityNodeSiblings []*priorityNode
func ( sortPriorityNodeSiblings) () int { return len() }
func ( sortPriorityNodeSiblings) (, int) { [], [] = [], [] }
func ( sortPriorityNodeSiblings) (, int) bool {
, := float64([].weight+1), float64([].subtreeBytes)
, := float64([].weight+1), float64([].subtreeBytes)
if == 0 && == 0 {
return >=
}
if == 0 {
return false
}
return / <= /
}
// priorityWriteScheduler schedules frame writes by walking a tree of
// priorityNodes.
type priorityWriteScheduler struct {
	// root is the root of the priority tree. Frames that are not queued on
	// a stream's own node are queued here.
	root priorityNode

	// nodes maps stream ids to their nodes in the tree.
	nodes map[uint32]*priorityNode

	// maxID is the largest stream id seen so far.
	maxID uint32

	// closedNodes and idleNodes list the closed and idle nodes that are
	// still kept in the tree, oldest first. Their lengths are bounded by
	// maxClosedNodesInTree and maxIdleNodesInTree.
	closedNodes []*priorityNode
	idleNodes   []*priorityNode

	// Settings taken from the config.
	maxClosedNodesInTree int
	maxIdleNodesInTree   int
	writeThrottleLimit   int32
	enableWriteThrottle  bool

	// tmp is scratch space handed to priorityNode.walkReadyInOrder.
	tmp []*priorityNode

	// queuePool supplies and takes back the write queues of the nodes.
	queuePool writeQueuePool
}
func ( *priorityWriteScheduler) ( uint32, OpenStreamOptions) {
if := .nodes[]; != nil {
if .state != priorityNodeIdle {
panic(fmt.Sprintf("stream %d already opened", ))
}
.state = priorityNodeOpen
return
}
:= .nodes[.PusherID]
if == nil {
= &.root
}
:= &priorityNode{
q: *.queuePool.get(),
id: ,
weight: priorityDefaultWeight,
state: priorityNodeOpen,
}
.setParent()
.nodes[] =
if > .maxID {
.maxID =
}
}
func ( *priorityWriteScheduler) ( uint32) {
if == 0 {
panic("violation of WriteScheduler interface: cannot close stream 0")
}
if .nodes[] == nil {
panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", ))
}
if .nodes[].state != priorityNodeOpen {
panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", ))
}
:= .nodes[]
.state = priorityNodeClosed
.addBytes(-.bytes)
:= .q
.queuePool.put(&)
.q.s = nil
if .maxClosedNodesInTree > 0 {
.addClosedOrIdleNode(&.closedNodes, .maxClosedNodesInTree, )
} else {
.removeNode()
}
}
func ( *priorityWriteScheduler) ( uint32, PriorityParam) {
if == 0 {
panic("adjustPriority on root")
}
:= .nodes[]
if == nil {
if <= .maxID || .maxIdleNodesInTree == 0 {
return
}
.maxID =
= &priorityNode{
q: *.queuePool.get(),
id: ,
weight: priorityDefaultWeight,
state: priorityNodeIdle,
}
.setParent(&.root)
.nodes[] =
.addClosedOrIdleNode(&.idleNodes, .maxIdleNodesInTree, )
}
:= .nodes[.StreamDep]
if == nil {
.setParent(&.root)
.weight = priorityDefaultWeight
return
}
if == {
return
}
for := .parent; != nil; = .parent {
if == {
.setParent(.parent)
break
}
}
if .Exclusive {
:= .kids
for != nil {
:= .next
if != {
.setParent()
}
=
}
}
.setParent()
.weight = .Weight
}
func ( *priorityWriteScheduler) ( FrameWriteRequest) {
var *priorityNode
if .isControl() {
= &.root
} else {
:= .StreamID()
= .nodes[]
if == nil {
if .DataSize() > 0 {
panic("add DATA on non-open stream")
}
= &.root
}
}
.q.push()
}
func ( *priorityWriteScheduler) () ( FrameWriteRequest, bool) {
.root.walkReadyInOrder(false, &.tmp, func( *priorityNode, bool) bool {
:= int32(math.MaxInt32)
if {
= .writeThrottleLimit
}
, = .q.consume()
if ! {
return false
}
.addBytes(int64(.DataSize()))
if {
.writeThrottleLimit += 1024
if .writeThrottleLimit < 0 {
.writeThrottleLimit = math.MaxInt32
}
} else if .enableWriteThrottle {
.writeThrottleLimit = 1024
}
return true
})
return ,
}
func ( *priorityWriteScheduler) ( *[]*priorityNode, int, *priorityNode) {
if == 0 {
return
}
if len(*) == {
.removeNode((*)[0])
:= (*)[1:]
copy(*, )
* = (*)[:len()]
}
* = append(*, )
}
func ( *priorityWriteScheduler) ( *priorityNode) {
for := .kids; != nil; = .next {
.setParent(.parent)
}
.setParent(nil)
delete(.nodes, .id)
}