package pool
import (
)
const (
stateDefault = 0
stateInited = 1
stateClosed = 2
)
type BadConnError struct {
wrapped error
}
var _ error = (*BadConnError)(nil)
func ( BadConnError) () string {
:= "pg: Conn is in a bad state"
if .wrapped != nil {
+= ": " + .wrapped.Error()
}
return
}
func ( BadConnError) () error {
return .wrapped
}
type StickyConnPool struct {
pool Pooler
shared int32
state uint32
ch chan *Conn
_badConnError atomic.Value
}
var _ Pooler = (*StickyConnPool)(nil)
func ( Pooler) *StickyConnPool {
, := .(*StickyConnPool)
if ! {
= &StickyConnPool{
pool: ,
ch: make(chan *Conn, 1),
}
}
atomic.AddInt32(&.shared, 1)
return
}
func ( *StickyConnPool) ( context.Context) (*Conn, error) {
return .pool.NewConn()
}
func ( *StickyConnPool) ( *Conn) error {
return .pool.CloseConn()
}
func ( *StickyConnPool) ( context.Context) (*Conn, error) {
for := 0; < 1000; ++ {
switch atomic.LoadUint32(&.state) {
case stateDefault:
, := .pool.Get()
if != nil {
return nil,
}
if atomic.CompareAndSwapUint32(&.state, stateDefault, stateInited) {
return , nil
}
.pool.Remove(, , ErrClosed)
case stateInited:
if := .badConnError(); != nil {
return nil,
}
, := <-.ch
if ! {
return nil, ErrClosed
}
return , nil
case stateClosed:
return nil, ErrClosed
default:
panic("not reached")
}
}
return nil, fmt.Errorf("pg: StickyConnPool.Get: infinite loop")
}
func ( *StickyConnPool) ( context.Context, *Conn) {
defer func() {
if recover() != nil {
.freeConn(, )
}
}()
.ch <-
}
func ( *StickyConnPool) ( context.Context, *Conn) {
if := .badConnError(); != nil {
.pool.Remove(, , )
} else {
.pool.Put(, )
}
}
func ( *StickyConnPool) ( context.Context, *Conn, error) {
defer func() {
if recover() != nil {
.pool.Remove(, , ErrClosed)
}
}()
._badConnError.Store(BadConnError{wrapped: })
.ch <-
}
func ( *StickyConnPool) () error {
if := atomic.AddInt32(&.shared, -1); > 0 {
return nil
}
for := 0; < 1000; ++ {
:= atomic.LoadUint32(&.state)
if == stateClosed {
return ErrClosed
}
if atomic.CompareAndSwapUint32(&.state, , stateClosed) {
close(.ch)
, := <-.ch
if {
.freeConn(context.TODO(), )
}
return nil
}
}
return errors.New("pg: StickyConnPool.Close: infinite loop")
}
func ( *StickyConnPool) ( context.Context) error {
if .badConnError() == nil {
return nil
}
select {
case , := <-.ch:
if ! {
return ErrClosed
}
.pool.Remove(, , ErrClosed)
._badConnError.Store(BadConnError{wrapped: nil})
default:
return errors.New("pg: StickyConnPool does not have a Conn")
}
if !atomic.CompareAndSwapUint32(&.state, stateInited, stateDefault) {
:= atomic.LoadUint32(&.state)
return fmt.Errorf("pg: invalid StickyConnPool state: %d", )
}
return nil
}
func ( *StickyConnPool) () error {
if := ._badConnError.Load(); != nil {
:= .(BadConnError)
if .wrapped != nil {
return
}
}
return nil
}
func ( *StickyConnPool) () int {
switch atomic.LoadUint32(&.state) {
case stateDefault:
return 0
case stateInited:
return 1
case stateClosed:
return 0
default:
panic("not reached")
}
}
func ( *StickyConnPool) () int {
return len(.ch)
}
func ( *StickyConnPool) () *Stats {
return &Stats{}
}