package puddle
import (
)
// Lifecycle states recorded in a Resource's status field. The first entry is
// the literal 0 and the remaining entries use iota, so the values are 0, 1, 2
// and 3 in declaration order.
const (
// Resource has been registered with the pool but its value is not yet built.
resourceStatusConstructing = 0
// Resource is sitting in the pool's idle stack.
resourceStatusIdle = iota
// Resource is currently handed out to a caller.
resourceStatusAcquired = iota
// Resource has been detached from the pool by a hijack.
resourceStatusHijacked = iota
)
// ErrClosedPool is returned by pool operations that find the pool's closed
// flag set.
var ErrClosedPool = errors.New("closed pool")
// ErrNotAvailable is returned by the non-blocking pool operations when no
// resource can be handed out (or created) immediately.
var ErrNotAvailable = errors.New("resource not available")
// Constructor is a generic function type that receives a context and returns
// a newly built value together with an error. The pool invokes it whenever it
// needs to create a resource value.
// NOTE(review): the type parameter name is missing from this text.
type Constructor[ any] func(ctx context.Context) (res , err error)
// Destructor is a generic function type that receives a single value and
// returns nothing. The pool invokes it when a resource value is torn down.
// NOTE(review): the type parameter name is missing from this text.
type Destructor[ any] func(res )
// Resource is the pool's bookkeeping wrapper around one constructed value.
// NOTE(review): the type parameter name is missing from this text.
type Resource[ any] struct {
// value is the object produced by the pool's constructor.
value
// pool is the owning pool; release/destroy/hijack are delegated to it.
pool *Pool[]
// creationTime is the wall-clock time captured when the resource was registered.
creationTime time.Time
// lastUsedNano is a nanotime() reading: set at creation and overwritten when
// the resource is released back to idle.
lastUsedNano int64
// poolResetCount is the pool's resetCount at creation time; a mismatch on
// release causes the resource to be destroyed rather than reused.
poolResetCount int
// status holds one of the resourceStatus* constants.
status byte
}
// Accessor that returns the resource's value field.
// Panics unless the resource's status is acquired or hijacked.
// NOTE(review): the receiver, method name and return type are missing from
// this text; restore them before compiling.
func ( *Resource[]) () {
if !(.status == resourceStatusAcquired || .status == resourceStatusHijacked) {
panic("tried to access resource that is not acquired or hijacked")
}
return .value
}
// Hands an acquired resource back to its pool, stamping it with the current
// nanotime() as its last-used time.
// Panics if the resource's status is not acquired.
// NOTE(review): the receiver and method name are missing from this text.
func ( *Resource[]) () {
if .status != resourceStatusAcquired {
panic("tried to release resource that is not acquired")
}
.pool.releaseAcquiredResource(, nanotime())
}
// Hands an acquired resource back to its pool while passing along the
// existing lastUsedNano, so the last-used timestamp is left unchanged.
// Panics if the resource's status is not acquired.
// NOTE(review): the receiver and method name are missing from this text.
func ( *Resource[]) () {
if .status != resourceStatusAcquired {
panic("tried to release resource that is not acquired")
}
.pool.releaseAcquiredResource(, .lastUsedNano)
}
// Asks the pool to destroy an acquired resource.
// Panics if the resource's status is not acquired.
// NOTE(review): the receiver and method name are missing from this text.
func ( *Resource[]) () {
if .status != resourceStatusAcquired {
panic("tried to destroy resource that is not acquired")
}
// The teardown runs in its own goroutine, so this call returns without
// waiting for it to finish.
go .pool.destroyAcquiredResource()
}
// Asks the pool to hijack an acquired resource (delegates to the pool's
// hijackAcquiredResource).
// Panics if the resource's status is not acquired.
// NOTE(review): the receiver and method name are missing from this text.
func ( *Resource[]) () {
if .status != resourceStatusAcquired {
panic("tried to hijack resource that is not acquired")
}
.pool.hijackAcquiredResource()
}
// Accessor that returns the resource's creationTime.
// Panics unless the resource's status is acquired or hijacked.
// NOTE(review): the receiver and method name are missing from this text.
func ( *Resource[]) () time.Time {
if !(.status == resourceStatusAcquired || .status == resourceStatusHijacked) {
panic("tried to access resource that is not acquired or hijacked")
}
return .creationTime
}
// Accessor that returns the raw lastUsedNano reading. The value comes from
// nanotime(), so it is only meaningful relative to other nanotime() readings.
// Panics unless the resource's status is acquired or hijacked.
// NOTE(review): the receiver and method name are missing from this text.
func ( *Resource[]) () int64 {
if !(.status == resourceStatusAcquired || .status == resourceStatusHijacked) {
panic("tried to access resource that is not acquired or hijacked")
}
return .lastUsedNano
}
// Returns the time elapsed between lastUsedNano and the current nanotime()
// reading, as a time.Duration.
// Panics unless the resource's status is acquired or hijacked.
// NOTE(review): the receiver and method name are missing from this text.
func ( *Resource[]) () time.Duration {
if !(.status == resourceStatusAcquired || .status == resourceStatusHijacked) {
panic("tried to access resource that is not acquired or hijacked")
}
return time.Duration(nanotime() - .lastUsedNano)
}
// Pool is a generic, size-bounded pool of Resources.
// NOTE(review): the type parameter name is missing from this text.
type Pool[ any] struct {
// mux guards the resource lists, the counters below and the closed flag.
mux sync.Mutex
// acquireSem is a weighted semaphore sized to the configured maximum; one
// permit is held per outstanding or in-construction resource.
acquireSem *semaphore.Weighted
// destructWG gets one Add per created resource and one Done when that
// resource is destructed, hijacked or fails construction; closing the pool
// waits on it.
destructWG sync.WaitGroup
// allResources tracks every resource the pool currently owns, in any state.
allResources resList[]
// idleResources is the stack of resources available for reuse.
idleResources *genstack.GenStack[*Resource[]]
constructor Constructor[]
destructor Destructor[]
maxSize int32
// Acquire statistics, reported through the Stat snapshot.
acquireCount int64
acquireDuration time.Duration
emptyAcquireCount int64
emptyAcquireWaitTime time.Duration
// canceledAcquireCount is atomic because it is incremented on paths that do
// not hold mux.
canceledAcquireCount atomic.Int64
// resetCount is incremented by the reset operation; resources stamped with
// an older count are destroyed on release.
resetCount int
// baseAcquireCtx is cancelled (via cancelBaseAcquireCtx) when the pool is
// closed and is handed to the context used for resource construction.
baseAcquireCtx context.Context
cancelBaseAcquireCtx context.CancelFunc
closed bool
}
// Config carries the settings used to build a Pool.
// NOTE(review): the type parameter name is missing from this text.
type Config[ any] struct {
// Constructor builds new resource values.
Constructor Constructor[]
// Destructor tears down resource values.
Destructor Destructor[]
// MaxSize is the upper bound on pool size; the pool constructor rejects
// values below 1.
MaxSize int32
}
// Builds a Pool from a Config.
//
// Returns an error when MaxSize is below 1. Otherwise it creates a cancellable
// context derived from context.Background(), a semaphore with MaxSize permits
// and an empty idle stack, and copies the constructor, destructor and size
// from the config.
// NOTE(review): the function name, type parameter and local names are missing
// from this text. Constructor and Destructor are not checked for nil here.
func [ any]( *Config[]) (*Pool[], error) {
if .MaxSize < 1 {
return nil, errors.New("MaxSize must be >= 1")
}
, := context.WithCancel(context.Background())
return &Pool[]{
acquireSem: semaphore.NewWeighted(int64(.MaxSize)),
idleResources: genstack.NewGenStack[*Resource[]](),
maxSize: .MaxSize,
constructor: .Constructor,
destructor: .Destructor,
baseAcquireCtx: ,
cancelBaseAcquireCtx: ,
}, nil
}
// Shuts the pool down: marks it closed, cancels the base acquire context and
// destructs every idle resource. Calling it on an already-closed pool only
// waits for outstanding destructions.
// NOTE(review): the receiver, method name and loop variables (including the
// loop condition) are missing from this text.
func ( *Pool[]) () {
// Deferred first, so it runs last: the wait happens after mux is unlocked,
// letting release/destroy paths that need mux make progress.
defer .destructWG.Wait()
.mux.Lock()
defer .mux.Unlock()
if .closed {
return
}
.closed = true
.cancelBaseAcquireCtx()
// Drain the idle stack; each popped resource is unregistered and its value is
// destructed in a separate goroutine.
for , := .idleResources.Pop(); ; , = .idleResources.Pop() {
.allResources.remove()
go .destructResourceValue(.value)
}
}
// Stat is a point-in-time snapshot of pool counters.
type Stat struct {
// Number of resources found in each status when the snapshot was taken.
constructingResources int32
acquiredResources int32
idleResources int32
// Copy of the pool's maxSize.
maxResources int32
// Copies of the pool's cumulative acquire statistics.
acquireCount int64
acquireDuration time.Duration
emptyAcquireCount int64
emptyAcquireWaitTime time.Duration
canceledAcquireCount int64
}
// NOTE(review): the receiver and method names of the Stat accessors below are
// missing from this text; each comment describes what the body returns.

// Returns the sum of constructing, acquired and idle resources.
func ( *Stat) () int32 {
return .constructingResources + .acquiredResources + .idleResources
}
// Returns the number of resources that were under construction.
func ( *Stat) () int32 {
return .constructingResources
}
// Returns the number of resources that were acquired.
func ( *Stat) () int32 {
return .acquiredResources
}
// Returns the number of resources that were idle.
func ( *Stat) () int32 {
return .idleResources
}
// Returns the pool's maximum size.
func ( *Stat) () int32 {
return .maxResources
}
// Returns the cumulative acquire count.
func ( *Stat) () int64 {
return .acquireCount
}
// Returns the cumulative time spent in acquires.
func ( *Stat) () time.Duration {
return .acquireDuration
}
// Returns the cumulative count of acquires recorded as "empty".
func ( *Stat) () int64 {
return .emptyAcquireCount
}
// Returns the cumulative wait time recorded for "empty" acquires.
func ( *Stat) () time.Duration {
return .emptyAcquireWaitTime
}
// Returns the cumulative count of cancelled acquires.
func ( *Stat) () int64 {
return .canceledAcquireCount
}
// Builds a Stat snapshot while holding mux: copies the pool's counters and
// tallies the resources in allResources by status.
// NOTE(review): the receiver, method name and local names are missing from
// this text.
func ( *Pool[]) () *Stat {
.mux.Lock()
defer .mux.Unlock()
:= &Stat{
maxResources: .maxSize,
acquireCount: .acquireCount,
emptyAcquireCount: .emptyAcquireCount,
emptyAcquireWaitTime: .emptyAcquireWaitTime,
canceledAcquireCount: .canceledAcquireCount.Load(),
acquireDuration: .acquireDuration,
}
// Only constructing, idle and acquired statuses are counted; any other status
// falls through the switch without being tallied.
for , := range .allResources {
switch .status {
case resourceStatusConstructing:
.constructingResources += 1
case resourceStatusIdle:
.idleResources += 1
case resourceStatusAcquired:
.acquiredResources += 1
}
}
return
}
// Pops one resource from the idle stack and marks it acquired. Returns nil
// when the idle stack is empty.
// This helper takes no lock itself; the visible call sites invoke it while
// holding mux.
// NOTE(review): the receiver, function name and local names are missing from
// this text; presumably this is tryAcquireIdleResource, matching the call
// sites' signature.
func ( *Pool[]) () *Resource[] {
, := .idleResources.Pop()
if ! {
return nil
}
.status = resourceStatusAcquired
return
}
// Registers a placeholder resource in the constructing state: stamps it with
// the current wall-clock time, nanotime() and the pool's resetCount, appends
// it to allResources and adds one to destructWG. The value itself is filled in
// later by the caller.
// This helper takes no lock itself; the visible call sites invoke it while
// holding mux.
// NOTE(review): the receiver, function name and local names are missing from
// this text; presumably this is createNewResource, matching the call sites.
func ( *Pool[]) () *Resource[] {
:= &Resource[]{
pool: ,
creationTime: time.Now(),
lastUsedNano: nanotime(),
poolResetCount: .resetCount,
status: resourceStatusConstructing,
}
.allResources.append()
// Balanced by a Done when the resource is destructed, hijacked or fails
// construction.
.destructWG.Add(1)
return
}
// Entry point for a blocking acquire. If the supplied context is already done
// it counts a cancelled acquire and returns the context's error without
// touching the pool; otherwise it delegates to acquire.
// NOTE(review): the receiver, method name and parameter names are missing from
// this text.
func ( *Pool[]) ( context.Context) ( *Resource[], error) {
// Non-blocking check: the default case falls through immediately when the
// context is still live.
select {
case <-.Done():
.canceledAcquireCount.Add(1)
return nil, .Err()
default:
}
return .acquire()
}
// Core of the blocking acquire: obtains a semaphore permit, then either reuses
// an idle resource or constructs a new one, updating the acquire statistics.
//
// Returns ErrClosedPool if the pool is closed once the permit is held, the
// semaphore's error if waiting for a permit fails, or the construction
// helper's error if building a new resource fails.
// NOTE(review): the receiver, function name and local names are missing from
// this text; presumably this is the acquire helper called by the entry point.
func ( *Pool[]) ( context.Context) (*Resource[], error) {
// Start time for the duration statistics.
:= nanotime()
// Flag recording whether the fast, non-blocking permit attempt failed and the
// call had to wait.
var bool
if !.acquireSem.TryAcquire(1) {
= true
:= .acquireSem.Acquire(, 1)
if != nil {
.canceledAcquireCount.Add(1)
return nil,
}
}
.mux.Lock()
if .closed {
// Give the permit back before bailing out.
.acquireSem.Release(1)
.mux.Unlock()
return nil, ErrClosedPool
}
// Reuse path: an idle resource is available.
if := .tryAcquireIdleResource(); != nil {
:= time.Duration(nanotime() - )
// Empty-acquire statistics are only charged when the call had to wait for a
// permit.
if {
.emptyAcquireCount += 1
.emptyAcquireWaitTime +=
}
.acquireCount += 1
.acquireDuration +=
.mux.Unlock()
return , nil
}
// Holding a permit with no idle resource should imply there is room to
// create one; anything else is an internal invariant violation.
if len(.allResources) >= int(.maxSize) {
panic("bug: semaphore allowed more acquires than pool allows")
}
:= .createNewResource()
// Construction happens without mux held.
.mux.Unlock()
, := .initResourceValue(, )
if != nil {
// NOTE(review): no permit release or unregistering happens here; that
// cleanup appears to be owned by the construction helper — confirm.
return nil,
}
.mux.Lock()
defer .mux.Unlock()
// A freshly constructed resource always counts as an empty acquire, with the
// whole elapsed time charged as wait time.
.emptyAcquireCount += 1
.acquireCount += 1
:= time.Duration(nanotime() - )
.acquireDuration +=
.emptyAcquireWaitTime +=
return , nil
}
// Runs the pool's constructor for a placeholder resource in a background
// goroutine and waits for either its result or cancellation of the caller's
// context.
//
// Returns the resource on success, the constructor's error on failure, or the
// context's error (after counting a cancelled acquire) if the context is done
// first. In the cancellation case the goroutine keeps running and cleans up
// after itself.
// NOTE(review): the receiver, function name, local names and the channel
// operands of the send statements are missing from this text; presumably this
// is initResourceValue, matching the call in acquire.
func ( *Pool[]) ( context.Context, *Resource[]) (*Resource[], error) {
// Unbuffered: a send only completes if the caller is still receiving, hence
// the select-with-Done around each hand-off below.
:= make(chan error)
go func() {
// Context built by newValueCancelCtx from the caller's context and the
// pool's base acquire context.
:= newValueCancelCtx(, .baseAcquireCtx)
, := .constructor()
if != nil {
// Construction failed: unregister the placeholder, balance the WaitGroup
// and give the permit back.
.mux.Lock()
.allResources.remove()
.destructWG.Done()
.acquireSem.Release(1)
.mux.Unlock()
// Hand off the error unless the caller has already gone away.
select {
case <- :
case <-.Done():
}
return
}
.mux.Lock()
.value =
.status = resourceStatusAcquired
.mux.Unlock()
select {
case <- nil:
case <-.Done():
// Nobody is waiting any more: return the new resource to the pool, keeping
// its original lastUsedNano.
.releaseAcquiredResource(, .lastUsedNano)
}
}()
select {
case <-.Done():
.canceledAcquireCount.Add(1)
return nil, .Err()
case := <-:
if != nil {
return nil,
}
return , nil
}
}
// Non-blocking acquire. Returns an idle resource if a permit and an idle
// resource are both immediately available. Otherwise returns ErrNotAvailable;
// when a permit was available but no idle resource existed, it also starts
// constructing a new resource in the background so a later call can find it
// idle. Returns ErrClosedPool if the pool is closed.
// NOTE(review): the receiver, method name, parameter and local names are
// missing from this text. This path bumps acquireCount but not
// acquireDuration.
func ( *Pool[]) ( context.Context) (*Resource[], error) {
if !.acquireSem.TryAcquire(1) {
return nil, ErrNotAvailable
}
.mux.Lock()
defer .mux.Unlock()
if .closed {
.acquireSem.Release(1)
return nil, ErrClosedPool
}
if := .tryAcquireIdleResource(); != nil {
.acquireCount += 1
return , nil
}
// Holding a permit with no idle resource should imply there is room to
// create one; anything else is an internal invariant violation.
if len(.allResources) >= int(.maxSize) {
panic("bug: semaphore allowed more acquires than pool allows")
}
:= .createNewResource()
// Background construction: the permit obtained above stays held until the
// goroutine finishes.
go func() {
// The constructor runs before mux is taken, so the pool is not blocked
// while the value is being built.
, := .constructor()
.mux.Lock()
defer .mux.Unlock()
defer .acquireSem.Release(1)
if != nil {
// The error is dropped; the placeholder is simply unregistered.
.allResources.remove()
.destructWG.Done()
return
}
.value =
.status = resourceStatusIdle
.idleResources.Push()
}()
return nil, ErrNotAvailable
}
// Grabs as many permits as it can from the semaphore, up to the requested
// count, using only non-blocking TryAcquire calls, and returns how many it
// obtained.
// NOTE(review): the function name, parameter and local names are missing from
// this text; presumably this is acquireSemAll, matching the visible call site.
func ( *semaphore.Weighted, int) int {
// Fast path: everything in one shot.
if .TryAcquire(int64()) {
return
}
var int
// Fallback: try power-of-two sized chunks from the largest exponent given by
// log2Int down to 1, summing whichever chunks succeed.
for := int(log2Int()); >= 0; -- {
:= 1 <<
if .TryAcquire(int64()) {
+=
}
}
return
}
// Acquires idle resources in bulk and returns them as a slice. Returns nil if
// the pool is closed or has no idle resources.
// NOTE(review): the receiver, method name and local names are missing from
// this text.
func ( *Pool[]) () []*Resource[] {
.mux.Lock()
defer .mux.Unlock()
if .closed {
return nil
}
:= .idleResources.Len()
if == 0 {
return nil
}
// Permits are taken with non-blocking calls only; the number actually
// obtained determines how many resources are handed out.
:= acquireSemAll(.acquireSem, )
:= make([]*Resource[], )
for := range {
// The second result of Pop is discarded here.
, := .idleResources.Pop()
.status = resourceStatusAcquired
[] =
}
.idleResources.NextGen()
return
}
// Synchronously constructs one new resource and places it in the idle stack.
//
// Returns ErrNotAvailable if no permit is immediately available or the pool
// already holds maxSize resources, ErrClosedPool if the pool is closed before
// or during construction, or the constructor's error if construction fails.
// NOTE(review): the receiver, method name, parameter and local names are
// missing from this text.
func ( *Pool[]) ( context.Context) error {
if !.acquireSem.TryAcquire(1) {
return ErrNotAvailable
}
.mux.Lock()
if .closed {
.acquireSem.Release(1)
.mux.Unlock()
return ErrClosedPool
}
if len(.allResources) >= int(.maxSize) {
.acquireSem.Release(1)
.mux.Unlock()
return ErrNotAvailable
}
:= .createNewResource()
// The constructor runs without mux held.
.mux.Unlock()
, := .constructor()
.mux.Lock()
defer .mux.Unlock()
// The permit is always returned on the way out of the paths below.
defer .acquireSem.Release(1)
if != nil {
.allResources.remove()
.destructWG.Done()
return
}
.value =
.status = resourceStatusIdle
// The pool may have been closed while the constructor was running.
// NOTE(review): this branch destructs the value but does not remove the
// resource from allResources — confirm that is intended.
if .closed {
go .destructResourceValue(.value)
return ErrClosedPool
}
.idleResources.Push()
return nil
}
// Invalidates the pool's current resources: increments resetCount and
// destructs every idle resource. Resources that are currently checked out
// carry the old count and are destroyed when they are released.
// NOTE(review): the receiver, method name and loop variables (including the
// loop condition) are missing from this text.
func ( *Pool[]) () {
.mux.Lock()
defer .mux.Unlock()
.resetCount++
// Drain the idle stack; each popped resource is unregistered and its value is
// destructed in a separate goroutine.
for , := .idleResources.Pop(); ; , = .idleResources.Pop() {
.allResources.remove()
go .destructResourceValue(.value)
}
}
// Takes back an acquired resource and returns its semaphore permit. If the
// pool is closed or the resource predates the latest reset, the resource is
// unregistered and its value destructed in a separate goroutine; otherwise it
// is stamped with the supplied last-used time, marked idle and pushed onto the
// idle stack.
// NOTE(review): the receiver, function name and parameter names are missing
// from this text; presumably this is releaseAcquiredResource, matching the
// call sites' (resource, int64) signature.
func ( *Pool[]) ( *Resource[], int64) {
.mux.Lock()
defer .mux.Unlock()
defer .acquireSem.Release(1)
if .closed || .poolResetCount != .resetCount {
.allResources.remove()
go .destructResourceValue(.value)
} else {
.lastUsedNano =
.status = resourceStatusIdle
.idleResources.Push()
}
}
// Destroys an acquired resource: destructs its value first, without holding
// mux, and then, under mux, unregisters the resource and returns its semaphore
// permit.
// NOTE(review): the receiver, function name and parameter name are missing
// from this text; presumably this is destroyAcquiredResource, since its body
// differs from the other (resource)-only helper by calling the destructor.
func ( *Pool[]) ( *Resource[]) {
.destructResourceValue(.value)
.mux.Lock()
defer .mux.Unlock()
defer .acquireSem.Release(1)
.allResources.remove()
}
// Detaches an acquired resource from the pool without destructing its value:
// unregisters it, marks it hijacked, balances the WaitGroup and returns its
// semaphore permit.
// NOTE(review): the receiver, function name and parameter name are missing
// from this text; presumably this is hijackAcquiredResource, since it sets the
// hijacked status.
func ( *Pool[]) ( *Resource[]) {
.mux.Lock()
defer .mux.Unlock()
defer .acquireSem.Release(1)
.allResources.remove()
.status = resourceStatusHijacked
// The destructor will never run for this resource, so the Add made at
// creation is balanced here.
.destructWG.Done()
}
// Runs the pool's destructor and then marks one unit of destructWG as done,
// balancing the Add made when the resource was created.
// NOTE(review): the receiver, function name and parameter are missing from
// this text; presumably this is destructResourceValue, matching the call
// sites.
func ( *Pool[]) ( ) {
.destructor()
.destructWG.Done()
}