package pool
import (
)
var (
ErrClosed = errors.New("pg: database is closed")
ErrPoolTimeout = errors.New("pg: connection pool timeout")
)
var timers = sync.Pool{
New: func() interface{} {
:= time.NewTimer(time.Hour)
.Stop()
return
},
}
type Stats struct {
Hits uint32
Misses uint32
Timeouts uint32
TotalConns uint32
IdleConns uint32
StaleConns uint32
}
type Pooler interface {
NewConn(context.Context) (*Conn, error)
CloseConn(*Conn) error
Get(context.Context) (*Conn, error)
Put(context.Context, *Conn)
Remove(context.Context, *Conn, error)
Len() int
IdleLen() int
Stats() *Stats
Close() error
}
type Options struct {
Dialer func(context.Context) (net.Conn, error)
OnClose func(*Conn) error
PoolSize int
MinIdleConns int
MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
}
type ConnPool struct {
opt *Options
dialErrorsNum uint32
_closed uint32
lastDialErrorMu sync.RWMutex
lastDialError error
queue chan struct{}
stats Stats
connsMu sync.Mutex
conns []*Conn
idleConns []*Conn
poolSize int
idleConnsLen int
}
var _ Pooler = (*ConnPool)(nil)
func ( *Options) *ConnPool {
:= &ConnPool{
opt: ,
queue: make(chan struct{}, .PoolSize),
conns: make([]*Conn, 0, .PoolSize),
idleConns: make([]*Conn, 0, .PoolSize),
}
.connsMu.Lock()
.checkMinIdleConns()
.connsMu.Unlock()
if .IdleTimeout > 0 && .IdleCheckFrequency > 0 {
go .reaper(.IdleCheckFrequency)
}
return
}
func ( *ConnPool) () {
if .opt.MinIdleConns == 0 {
return
}
for .poolSize < .opt.PoolSize && .idleConnsLen < .opt.MinIdleConns {
.poolSize++
.idleConnsLen++
go func() {
:= .addIdleConn()
if != nil {
.connsMu.Lock()
.poolSize--
.idleConnsLen--
.connsMu.Unlock()
}
}()
}
}
func ( *ConnPool) () error {
, := .dialConn(context.TODO(), true)
if != nil {
return
}
.connsMu.Lock()
.conns = append(.conns, )
.idleConns = append(.idleConns, )
.connsMu.Unlock()
return nil
}
func ( *ConnPool) ( context.Context) (*Conn, error) {
return .newConn(, false)
}
func ( *ConnPool) ( context.Context, bool) (*Conn, error) {
, := .dialConn(, )
if != nil {
return nil,
}
.connsMu.Lock()
.conns = append(.conns, )
if {
if .poolSize >= .opt.PoolSize {
.pooled = false
} else {
.poolSize++
}
}
.connsMu.Unlock()
return , nil
}
func ( *ConnPool) ( context.Context, bool) (*Conn, error) {
if .closed() {
return nil, ErrClosed
}
if atomic.LoadUint32(&.dialErrorsNum) >= uint32(.opt.PoolSize) {
return nil, .getLastDialError()
}
, := .opt.Dialer()
if != nil {
.setLastDialError()
if atomic.AddUint32(&.dialErrorsNum, 1) == uint32(.opt.PoolSize) {
go .tryDial()
}
return nil,
}
:= NewConn()
.pooled =
return , nil
}
func ( *ConnPool) () {
for {
if .closed() {
return
}
, := .opt.Dialer(context.TODO())
if != nil {
.setLastDialError()
time.Sleep(time.Second)
continue
}
atomic.StoreUint32(&.dialErrorsNum, 0)
_ = .Close()
return
}
}
func ( *ConnPool) ( error) {
.lastDialErrorMu.Lock()
.lastDialError =
.lastDialErrorMu.Unlock()
}
func ( *ConnPool) () error {
.lastDialErrorMu.RLock()
:= .lastDialError
.lastDialErrorMu.RUnlock()
return
}
func ( *ConnPool) ( context.Context) (*Conn, error) {
if .closed() {
return nil, ErrClosed
}
:= .waitTurn()
if != nil {
return nil,
}
for {
.connsMu.Lock()
:= .popIdle()
.connsMu.Unlock()
if == nil {
break
}
if .isStaleConn() {
_ = .CloseConn()
continue
}
atomic.AddUint32(&.stats.Hits, 1)
return , nil
}
atomic.AddUint32(&.stats.Misses, 1)
, := .newConn(, true)
if != nil {
.freeTurn()
return nil,
}
return , nil
}
func ( *ConnPool) () {
.queue <- struct{}{}
}
func ( *ConnPool) ( context.Context) error {
select {
case <-.Done():
return .Err()
default:
}
select {
case .queue <- struct{}{}:
return nil
default:
}
:= timers.Get().(*time.Timer)
.Reset(.opt.PoolTimeout)
select {
case <-.Done():
if !.Stop() {
<-.C
}
timers.Put()
return .Err()
case .queue <- struct{}{}:
if !.Stop() {
<-.C
}
timers.Put()
return nil
case <-.C:
timers.Put()
atomic.AddUint32(&.stats.Timeouts, 1)
return ErrPoolTimeout
}
}
func ( *ConnPool) () {
<-.queue
}
func ( *ConnPool) () *Conn {
if len(.idleConns) == 0 {
return nil
}
:= len(.idleConns) - 1
:= .idleConns[]
.idleConns = .idleConns[:]
.idleConnsLen--
.checkMinIdleConns()
return
}
func ( *ConnPool) ( context.Context, *Conn) {
if !.pooled {
.Remove(, , nil)
return
}
.connsMu.Lock()
.idleConns = append(.idleConns, )
.idleConnsLen++
.connsMu.Unlock()
.freeTurn()
}
func ( *ConnPool) ( context.Context, *Conn, error) {
.removeConnWithLock()
.freeTurn()
_ = .closeConn()
}
func ( *ConnPool) ( *Conn) error {
.removeConnWithLock()
return .closeConn()
}
func ( *ConnPool) ( *Conn) {
.connsMu.Lock()
.removeConn()
.connsMu.Unlock()
}
func ( *ConnPool) ( *Conn) {
for , := range .conns {
if == {
.conns = append(.conns[:], .conns[+1:]...)
if .pooled {
.poolSize--
.checkMinIdleConns()
}
return
}
}
}
func ( *ConnPool) ( *Conn) error {
if .opt.OnClose != nil {
_ = .opt.OnClose()
}
return .Close()
}
func ( *ConnPool) () int {
.connsMu.Lock()
:= len(.conns)
.connsMu.Unlock()
return
}
func ( *ConnPool) () int {
.connsMu.Lock()
:= .idleConnsLen
.connsMu.Unlock()
return
}
func ( *ConnPool) () *Stats {
:= .IdleLen()
return &Stats{
Hits: atomic.LoadUint32(&.stats.Hits),
Misses: atomic.LoadUint32(&.stats.Misses),
Timeouts: atomic.LoadUint32(&.stats.Timeouts),
TotalConns: uint32(.Len()),
IdleConns: uint32(),
StaleConns: atomic.LoadUint32(&.stats.StaleConns),
}
}
func ( *ConnPool) () bool {
return atomic.LoadUint32(&._closed) == 1
}
func ( *ConnPool) ( func(*Conn) bool) error {
var error
.connsMu.Lock()
for , := range .conns {
if () {
if := .closeConn(); != nil && == nil {
=
}
}
}
.connsMu.Unlock()
return
}
func ( *ConnPool) () error {
if !atomic.CompareAndSwapUint32(&._closed, 0, 1) {
return ErrClosed
}
var error
.connsMu.Lock()
for , := range .conns {
if := .closeConn(); != nil && == nil {
=
}
}
.conns = nil
.poolSize = 0
.idleConns = nil
.idleConnsLen = 0
.connsMu.Unlock()
return
}
func ( *ConnPool) ( time.Duration) {
:= time.NewTicker()
defer .Stop()
for range .C {
if .closed() {
break
}
, := .ReapStaleConns()
if != nil {
internal.Logger.Printf(context.TODO(), "ReapStaleConns failed: %s", )
continue
}
atomic.AddUint32(&.stats.StaleConns, uint32())
}
}
func ( *ConnPool) () (int, error) {
var int
for {
.getTurn()
.connsMu.Lock()
:= .reapStaleConn()
.connsMu.Unlock()
.freeTurn()
if != nil {
_ = .closeConn()
++
} else {
break
}
}
return , nil
}
func ( *ConnPool) () *Conn {
if len(.idleConns) == 0 {
return nil
}
:= .idleConns[0]
if !.isStaleConn() {
return nil
}
.idleConns = append(.idleConns[:0], .idleConns[1:]...)
.idleConnsLen--
.removeConn()
return
}
func ( *ConnPool) ( *Conn) bool {
if .opt.IdleTimeout == 0 && .opt.MaxConnAge == 0 {
return false
}
:= time.Now()
if .opt.IdleTimeout > 0 && .Sub(.UsedAt()) >= .opt.IdleTimeout {
return true
}
if .opt.MaxConnAge > 0 && .Sub(.createdAt) >= .opt.MaxConnAge {
return true
}
return false
}