package bgreader
import (
)
// Lifecycle states stored in BGReader.status.
const (
// StatusStopped is 0 (iota), so a BGReader whose status was never assigned
// is in this state.
StatusStopped = iota
// StatusRunning is the state set when background reading is requested.
StatusRunning
// StatusStopping is the state set when a stop is requested while running.
// The background loop changes it to StatusStopped after it next queues a
// read result.
StatusStopping
)
// BGReader wraps an io.Reader and can queue the results of reads performed by
// a background goroutine so they can be consumed later, in order.
type BGReader struct {
// r is the underlying reader that all data ultimately comes from.
r io.Reader
// cond.L is held around the accesses to status and readResults visible in
// this file; cond is broadcast after each background result is queued.
cond *sync.Cond
// status is one of StatusStopped, StatusRunning, StatusStopping.
status int32
// readResults is a FIFO of queued background reads: entries are appended at
// the end and consumed starting from index 0.
readResults []readResult
}
// readResult is one entry in BGReader.readResults.
type readResult struct {
// buf points at the bytes of this entry that have not been consumed yet.
// NOTE(review): presumably a buffer obtained from iobufpool — confirm.
buf *[]byte
// err is handed to the consumer once buf has been fully copied out.
// NOTE(review): presumably the error returned by the read that filled buf.
err error
}
// Requests background reading. While holding cond.L it applies this status
// transition:
//   - StatusStopped: becomes StatusRunning and bgRead is launched in a new
//     goroutine.
//   - StatusRunning: no change.
//   - StatusStopping: becomes StatusRunning again; no goroutine is started
//     here.
//
// NOTE(review): the receiver and method identifiers are missing in this copy
// of the file, so the declaration below does not parse as-is; restore them
// from the authoritative source before building.
func ( *BGReader) () {
.cond.L.Lock()
defer .cond.L.Unlock()
switch .status {
case StatusStopped:
.status = StatusRunning
go .bgRead()
case StatusRunning:
// Already running; nothing to do.
case StatusStopping:
// A stop was requested but has not taken effect yet; withdraw it.
.status = StatusRunning
}
}
// Requests that background reading stop. While holding cond.L it moves
// StatusRunning to StatusStopping; StatusStopped and StatusStopping are left
// unchanged. This method does not wait for anything: the status only becomes
// StatusStopped later, when the background loop observes StatusStopping after
// queuing a read result.
//
// NOTE(review): the receiver and method identifiers are missing in this copy
// of the file, so the declaration below does not parse as-is; restore them
// from the authoritative source before building.
func ( *BGReader) () {
.cond.L.Lock()
defer .cond.L.Unlock()
switch .status {
case StatusStopped:
// Nothing is running; nothing to do.
case StatusRunning:
.status = StatusStopping
case StatusStopping:
// A stop is already pending.
}
}
// Returns the current status (StatusStopped, StatusRunning or StatusStopping),
// read while holding cond.L.
//
// NOTE(review): the receiver and method identifiers are missing in this copy
// of the file, so the declaration below does not parse as-is; restore them
// from the authoritative source before building.
func ( *BGReader) () int32 {
.cond.L.Lock()
defer .cond.L.Unlock()
return .status
}
// Background read loop; this file launches it with `go .bgRead()` when status
// moves from StatusStopped to StatusRunning. Each iteration obtains a buffer
// via iobufpool.Get(8192), reads from the underlying reader without holding
// cond.L, then locks, appends a readResult, decides whether to stop, unlocks,
// and broadcasts on cond.
//
// NOTE(review): the receiver, method name, and every local identifier are
// missing in this copy of the file, so the body does not parse as-is. Comments
// on the stripped operands below are assumptions to confirm against the
// authoritative source.
func ( *BGReader) () {
// Presumably the "keep looping" flag that is cleared further down.
:= true
// NOTE(review): no loop condition is visible here; presumably the loop tests
// the flag above. As literally written it would never terminate — confirm.
for {
:= iobufpool.Get(8192)
// The read on the underlying reader happens before the lock is taken.
, := .r.Read(*)
// Presumably truncates the buffer to the number of bytes read — confirm.
* = (*)[:]
.cond.L.Lock()
// The result is queued even when the stop condition below is met.
.readResults = append(.readResults, readResult{buf: , err: })
// Stop when a stop was requested or (presumably) the read returned an error.
if .status == StatusStopping || != nil {
.status = StatusStopped
= false
}
.cond.L.Unlock()
// Wake any goroutine blocked in cond.Wait so it re-checks readResults.
.cond.Broadcast()
}
}
// Read-style entry point taking a []byte and returning (int, error). cond.L is
// held for the whole call, and released only while blocked in cond.Wait. It
// proceeds in this order:
//  1. If readResults is non-empty, the result comes from readFromReadResults.
//  2. Otherwise, if status is StatusStopped, it calls Read on the underlying
//     reader directly, with the lock still held.
//  3. Otherwise it waits on cond until readResults is non-empty, then returns
//     via readFromReadResults.
//
// NOTE(review): the receiver, method and parameter identifiers are missing in
// this copy of the file, so the declaration does not parse as-is; the argument
// passed on in each call is presumably the []byte parameter — confirm.
func ( *BGReader) ( []byte) (int, error) {
.cond.L.Lock()
defer .cond.L.Unlock()
// Queued background results are always drained before reading directly.
if len(.readResults) > 0 {
return .readFromReadResults()
}
// Nothing queued and no background loop active: read synchronously.
if .status == StatusStopped {
return .r.Read()
}
// Wait re-acquires cond.L before returning, so the length check is race-free.
for len(.readResults) == 0 {
.cond.Wait()
}
return .readFromReadResults()
}
// Serves one read from the head of readResults; it is called as
// readFromReadResults elsewhere in this file, with cond.L already held. It
// indexes readResults[0] unconditionally, so the queue must be non-empty.
//
// It copies from the head entry's buffer. If the copy count equals the
// buffer's length, the entry's err is returned alongside the count, the buffer
// goes back to iobufpool, and the entry is removed from the queue. Otherwise
// the buffer is re-sliced and stored back in readResults[0], and the returned
// error is nil.
//
// NOTE(review): the receiver, method, parameter and local identifiers are
// missing in this copy of the file, so the body does not parse as-is. Comments
// on stripped operands are assumptions to confirm.
func ( *BGReader) ( []byte) (int, error) {
:= .readResults[0].buf
var error
// Presumably copies into the caller's slice from the queued buffer.
:= copy(, *)
if == len(*) {
// The queued error is only reported on the call that empties the buffer.
= .readResults[0].err
iobufpool.Put()
if len(.readResults) == 1 {
// Drop the slice entirely rather than keeping an empty re-slice.
.readResults = nil
} else {
.readResults = .readResults[1:]
}
} else {
// Presumably advances the buffer past the bytes just copied — confirm.
* = (*)[:]
.readResults[0].buf =
}
return ,
}
// Constructs a *BGReader around the given io.Reader. The returned value has a
// sync.Cond backed by a fresh sync.Mutex; status and readResults are left at
// their zero values, so it starts in StatusStopped with nothing queued and no
// goroutine is started here.
//
// NOTE(review): the function and parameter identifiers are missing in this
// copy of the file, so the declaration does not parse as-is; the value
// assigned to r is presumably the io.Reader parameter — confirm.
func ( io.Reader) *BGReader {
return &BGReader{
r: ,
cond: &sync.Cond{
L: &sync.Mutex{},
},
}
}