package pgxpool
import (
)
var (
defaultMaxConns = int32(4)
defaultMinConns = int32(0)
defaultMinIdleConns = int32(0)
defaultMaxConnLifetime = time.Hour
defaultMaxConnIdleTime = time.Minute * 30
defaultHealthCheckPeriod = time.Minute
)
type connResource struct {
conn *pgx.Conn
conns []Conn
poolRows []poolRow
poolRowss []poolRows
maxAgeTime time.Time
}
func ( *connResource) ( *Pool, *puddle.Resource[*connResource]) *Conn {
if len(.conns) == 0 {
.conns = make([]Conn, 128)
}
:= &.conns[len(.conns)-1]
.conns = .conns[0 : len(.conns)-1]
.res =
.p =
return
}
func ( *connResource) ( *Conn, pgx.Row) *poolRow {
if len(.poolRows) == 0 {
.poolRows = make([]poolRow, 128)
}
:= &.poolRows[len(.poolRows)-1]
.poolRows = .poolRows[0 : len(.poolRows)-1]
.c =
.r =
return
}
func ( *connResource) ( *Conn, pgx.Rows) *poolRows {
if len(.poolRowss) == 0 {
.poolRowss = make([]poolRows, 128)
}
:= &.poolRowss[len(.poolRowss)-1]
.poolRowss = .poolRowss[0 : len(.poolRowss)-1]
.c =
.r =
return
}
type Pool struct {
newConnsCount int64
lifetimeDestroyCount int64
idleDestroyCount int64
p *puddle.Pool[*connResource]
config *Config
beforeConnect func(context.Context, *pgx.ConnConfig) error
afterConnect func(context.Context, *pgx.Conn) error
prepareConn func(context.Context, *pgx.Conn) (bool, error)
afterRelease func(*pgx.Conn) bool
beforeClose func(*pgx.Conn)
shouldPing func(context.Context, ShouldPingParams) bool
minConns int32
minIdleConns int32
maxConns int32
maxConnLifetime time.Duration
maxConnLifetimeJitter time.Duration
maxConnIdleTime time.Duration
healthCheckPeriod time.Duration
pingTimeout time.Duration
healthCheckMu sync.Mutex
healthCheckTimer *time.Timer
healthCheckChan chan struct{}
acquireTracer AcquireTracer
releaseTracer ReleaseTracer
closeOnce sync.Once
closeChan chan struct{}
}
type ShouldPingParams struct {
Conn *pgx.Conn
IdleDuration time.Duration
}
type Config struct {
ConnConfig *pgx.ConnConfig
BeforeConnect func(context.Context, *pgx.ConnConfig) error
AfterConnect func(context.Context, *pgx.Conn) error
BeforeAcquire func(context.Context, *pgx.Conn) bool
PrepareConn func(context.Context, *pgx.Conn) (bool, error)
AfterRelease func(*pgx.Conn) bool
BeforeClose func(*pgx.Conn)
ShouldPing func(context.Context, ShouldPingParams) bool
MaxConnLifetime time.Duration
MaxConnLifetimeJitter time.Duration
MaxConnIdleTime time.Duration
PingTimeout time.Duration
MaxConns int32
MinConns int32
MinIdleConns int32
HealthCheckPeriod time.Duration
createdByParseConfig bool
}
func ( *Config) () *Config {
:= new(Config)
* = *
.ConnConfig = .ConnConfig.Copy()
return
}
func ( *Config) () string { return .ConnConfig.ConnString() }
func ( context.Context, string) (*Pool, error) {
, := ParseConfig()
if != nil {
return nil,
}
return NewWithConfig(, )
}
func ( context.Context, *Config) (*Pool, error) {
if !.createdByParseConfig {
panic("config must be created by ParseConfig")
}
:= .PrepareConn
if == nil && .BeforeAcquire != nil {
= func( context.Context, *pgx.Conn) (bool, error) {
return .BeforeAcquire(, ), nil
}
}
:= &Pool{
config: ,
beforeConnect: .BeforeConnect,
afterConnect: .AfterConnect,
prepareConn: ,
afterRelease: .AfterRelease,
beforeClose: .BeforeClose,
minConns: .MinConns,
minIdleConns: .MinIdleConns,
maxConns: .MaxConns,
maxConnLifetime: .MaxConnLifetime,
maxConnLifetimeJitter: .MaxConnLifetimeJitter,
maxConnIdleTime: .MaxConnIdleTime,
pingTimeout: .PingTimeout,
healthCheckPeriod: .HealthCheckPeriod,
healthCheckChan: make(chan struct{}, 1),
closeChan: make(chan struct{}),
}
if , := .ConnConfig.Tracer.(AcquireTracer); {
.acquireTracer =
}
if , := .ConnConfig.Tracer.(ReleaseTracer); {
.releaseTracer =
}
if .ShouldPing != nil {
.shouldPing = .ShouldPing
} else {
.shouldPing = func( context.Context, ShouldPingParams) bool {
return .IdleDuration > time.Second
}
}
var error
.p, = puddle.NewPool(
&puddle.Config[*connResource]{
Constructor: func( context.Context) (*connResource, error) {
atomic.AddInt64(&.newConnsCount, 1)
:= .config.ConnConfig.Copy()
if .ConnectTimeout <= 0 {
.ConnectTimeout = 2 * time.Minute
}
if .beforeConnect != nil {
if := .beforeConnect(, ); != nil {
return nil,
}
}
, := pgx.ConnectConfig(, )
if != nil {
return nil,
}
if .afterConnect != nil {
= .afterConnect(, )
if != nil {
.Close()
return nil,
}
}
:= rand.Float64() * .MaxConnLifetimeJitter.Seconds()
:= time.Now().Add(.MaxConnLifetime).Add(time.Duration() * time.Second)
:= &connResource{
conn: ,
conns: make([]Conn, 64),
poolRows: make([]poolRow, 64),
poolRowss: make([]poolRows, 64),
maxAgeTime: ,
}
return , nil
},
Destructor: func( *connResource) {
, := context.WithTimeout(context.Background(), 15*time.Second)
:= .conn
if .beforeClose != nil {
.beforeClose()
}
.Close()
select {
case <-.PgConn().CleanupDone():
case <-.Done():
}
()
},
MaxSize: .MaxConns,
},
)
if != nil {
return nil,
}
go func() {
:= max(int(.minConns), int(.minIdleConns))
.createIdleResources(, )
.backgroundHealthCheck()
}()
return , nil
}
func ( string) (*Config, error) {
, := pgx.ParseConfig()
if != nil {
return nil,
}
:= &Config{
ConnConfig: ,
createdByParseConfig: true,
}
if , := .ConnConfig.Config.RuntimeParams["pool_max_conns"]; {
delete(.Config.RuntimeParams, "pool_max_conns")
, := strconv.ParseInt(, 10, 32)
if != nil {
return nil, pgconn.NewParseConfigError(, "cannot parse pool_max_conns", )
}
if < 1 {
return nil, pgconn.NewParseConfigError(, "pool_max_conns too small", )
}
.MaxConns = int32()
} else {
.MaxConns = defaultMaxConns
if := int32(runtime.NumCPU()); > .MaxConns {
.MaxConns =
}
}
if , := .ConnConfig.Config.RuntimeParams["pool_min_conns"]; {
delete(.Config.RuntimeParams, "pool_min_conns")
, := strconv.ParseInt(, 10, 32)
if != nil {
return nil, pgconn.NewParseConfigError(, "cannot parse pool_min_conns", )
}
.MinConns = int32()
} else {
.MinConns = defaultMinConns
}
if , := .ConnConfig.Config.RuntimeParams["pool_min_idle_conns"]; {
delete(.Config.RuntimeParams, "pool_min_idle_conns")
, := strconv.ParseInt(, 10, 32)
if != nil {
return nil, pgconn.NewParseConfigError(, "cannot parse pool_min_idle_conns", )
}
.MinIdleConns = int32()
} else {
.MinIdleConns = defaultMinIdleConns
}
if , := .ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime"]; {
delete(.Config.RuntimeParams, "pool_max_conn_lifetime")
, := time.ParseDuration()
if != nil {
return nil, pgconn.NewParseConfigError(, "cannot parse pool_max_conn_lifetime", )
}
.MaxConnLifetime =
} else {
.MaxConnLifetime = defaultMaxConnLifetime
}
if , := .ConnConfig.Config.RuntimeParams["pool_max_conn_idle_time"]; {
delete(.Config.RuntimeParams, "pool_max_conn_idle_time")
, := time.ParseDuration()
if != nil {
return nil, pgconn.NewParseConfigError(, "cannot parse pool_max_conn_idle_time", )
}
.MaxConnIdleTime =
} else {
.MaxConnIdleTime = defaultMaxConnIdleTime
}
if , := .ConnConfig.Config.RuntimeParams["pool_health_check_period"]; {
delete(.Config.RuntimeParams, "pool_health_check_period")
, := time.ParseDuration()
if != nil {
return nil, pgconn.NewParseConfigError(, "cannot parse pool_health_check_period", )
}
.HealthCheckPeriod =
} else {
.HealthCheckPeriod = defaultHealthCheckPeriod
}
if , := .ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime_jitter"]; {
delete(.Config.RuntimeParams, "pool_max_conn_lifetime_jitter")
, := time.ParseDuration()
if != nil {
return nil, pgconn.NewParseConfigError(, "cannot parse pool_max_conn_lifetime_jitter", )
}
.MaxConnLifetimeJitter =
}
return , nil
}
func ( *Pool) () {
.closeOnce.Do(func() {
close(.closeChan)
.p.Close()
})
}
func ( *Pool) ( *puddle.Resource[*connResource]) bool {
return time.Now().After(.Value().maxAgeTime)
}
func ( *Pool) () {
const = 500 * time.Millisecond
.healthCheckMu.Lock()
defer .healthCheckMu.Unlock()
if .healthCheckTimer == nil {
.healthCheckTimer = time.AfterFunc(, func() {
select {
case <-.closeChan:
case .healthCheckChan <- struct{}{}:
default:
}
})
return
}
.healthCheckTimer.Reset()
}
func ( *Pool) () {
:= time.NewTicker(.healthCheckPeriod)
defer .Stop()
for {
select {
case <-.closeChan:
return
case <-.healthCheckChan:
.checkHealth()
case <-.C:
.checkHealth()
}
}
}
func ( *Pool) () {
for {
if := .checkMinConns(); != nil {
break
}
if !.checkConnsHealth() {
break
}
select {
case <-.closeChan:
return
case <-time.After(500 * time.Millisecond):
}
}
}
func ( *Pool) () bool {
var bool
:= .Stat().TotalConns()
:= .p.AcquireAllIdle()
for , := range {
if .isExpired() && >= .minConns {
atomic.AddInt64(&.lifetimeDestroyCount, 1)
.Destroy()
= true
--
} else if .IdleDuration() > .maxConnIdleTime && > .minConns {
atomic.AddInt64(&.idleDestroyCount, 1)
.Destroy()
= true
--
} else {
.ReleaseUnused()
}
}
return
}
func ( *Pool) () error {
:= .Stat()
:= max(.minConns-.TotalConns(), .minIdleConns-.IdleConns())
if > 0 {
return .createIdleResources(context.Background(), int())
}
return nil
}
func ( *Pool) ( context.Context, int) error {
, := context.WithCancel()
defer ()
:= make(chan error, )
for range {
go func() {
:= .p.CreateResource()
if == puddle.ErrNotAvailable {
= nil
}
<-
}()
}
var error
for range {
:= <-
if != nil && == nil {
()
=
}
}
return
}
func ( *Pool) ( context.Context) ( *Conn, error) {
if .acquireTracer != nil {
= .acquireTracer.TraceAcquireStart(, , TraceAcquireStartData{})
defer func() {
var *pgx.Conn
if != nil {
= .Conn()
}
.acquireTracer.TraceAcquireEnd(, , TraceAcquireEndData{Conn: , Err: })
}()
}
for range int(.maxConns) + 1 {
, := .p.Acquire()
if != nil {
return nil,
}
:= .Value()
:= ShouldPingParams{Conn: .conn, IdleDuration: .IdleDuration()}
if .shouldPing(, ) {
:= func() error {
:=
if .pingTimeout > 0 {
var context.CancelFunc
, = context.WithTimeout(, .pingTimeout)
defer ()
}
return .conn.Ping()
}()
if != nil {
.Destroy()
continue
}
}
if .prepareConn != nil {
, := .prepareConn(, .conn)
if ! {
.Destroy()
}
if != nil {
if {
.Release()
}
return nil,
}
if ! {
continue
}
}
return .getConn(, ), nil
}
return nil, errors.New("pgxpool: too many failed attempts acquiring connection; likely bug in PrepareConn, BeforeAcquire, or ShouldPing hook")
}
func ( *Pool) ( context.Context, func(*Conn) error) error {
, := .Acquire()
if != nil {
return
}
defer .Release()
return ()
}
func ( *Pool) ( context.Context) []*Conn {
:= .p.AcquireAllIdle()
:= make([]*Conn, 0, len())
for , := range {
:= .Value()
if .prepareConn != nil {
, := .prepareConn(, .conn)
if ! || != nil {
.Destroy()
continue
}
}
= append(, .getConn(, ))
}
return
}
func ( *Pool) () {
.p.Reset()
}
func ( *Pool) () *Config { return .config.Copy() }
func ( *Pool) () *Stat {
return &Stat{
s: .p.Stat(),
newConnsCount: atomic.LoadInt64(&.newConnsCount),
lifetimeDestroyCount: atomic.LoadInt64(&.lifetimeDestroyCount),
idleDestroyCount: atomic.LoadInt64(&.idleDestroyCount),
}
}
func ( *Pool) ( context.Context, string, ...any) (pgconn.CommandTag, error) {
, := .Acquire()
if != nil {
return pgconn.CommandTag{},
}
defer .Release()
return .Exec(, , ...)
}
func ( *Pool) ( context.Context, string, ...any) (pgx.Rows, error) {
, := .Acquire()
if != nil {
return errRows{err: },
}
, := .Query(, , ...)
if != nil {
.Release()
return errRows{err: },
}
return .getPoolRows(), nil
}
func ( *Pool) ( context.Context, string, ...any) pgx.Row {
, := .Acquire()
if != nil {
return errRow{err: }
}
:= .QueryRow(, , ...)
return .getPoolRow()
}
func ( *Pool) ( context.Context, *pgx.Batch) pgx.BatchResults {
, := .Acquire()
if != nil {
return errBatchResults{err: }
}
:= .SendBatch(, )
return &poolBatchResults{br: , c: }
}
func ( *Pool) ( context.Context) (pgx.Tx, error) {
return .BeginTx(, pgx.TxOptions{})
}
func ( *Pool) ( context.Context, pgx.TxOptions) (pgx.Tx, error) {
, := .Acquire()
if != nil {
return nil,
}
, := .BeginTx(, )
if != nil {
.Release()
return nil,
}
return &Tx{t: , c: }, nil
}
func ( *Pool) ( context.Context, pgx.Identifier, []string, pgx.CopyFromSource) (int64, error) {
, := .Acquire()
if != nil {
return 0,
}
defer .Release()
return .Conn().CopyFrom(, , , )
}
func ( *Pool) ( context.Context) error {
, := .Acquire()
if != nil {
return
}
defer .Release()
return .Ping()
}