package pg
import (
)
// baseDB groups the pieces the methods in this file operate on: an orm.DB,
// the connection Options, a connection pool, a query formatter and a list of
// query hooks.
type baseDB struct {
// db is the orm.DB handed to query hooks and to the orm query constructors;
// its Context() is used by the methods that take no explicit context.
db orm.DB
// opt holds timeouts, retry settings, credentials, TLS config and the
// optional OnConnect callback read by the methods below.
opt *Options
// pool is the source of connections (Get/Put/Remove/NewConn/CloseConn/Close).
pool pool.Pooler
// fmter formats queries and stores named params (WithParam/Param).
fmter *orm.Formatter
// queryHooks is duplicated via copyQueryHooks whenever the struct is cloned.
queryHooks []QueryHook
}
// PoolStats is a named type whose underlying type is pool.Stats.
type PoolStats pool.Stats
// Returns the statistics reported by the pool's Stats method, converted to
// *PoolStats.
func ( *baseDB) () *PoolStats {
:= .pool.Stats()
return (*PoolStats)()
}
// Returns a new baseDB carrying the same db, opt, pool and fmter values as
// the receiver. Only queryHooks goes through copyQueryHooks; the other fields
// are assigned directly, so pointer and interface values are shared with the
// original.
func ( *baseDB) () *baseDB {
return &baseDB{
db: .db,
opt: .opt,
pool: .pool,
fmter: .fmter,
queryHooks: copyQueryHooks(.queryHooks),
}
}
// Returns a clone of the receiver whose pool field has been reassigned
// (presumably to the pool.Pooler argument — the assigned identifier is not
// visible here). The receiver itself is not modified; the work happens on the
// value returned by clone.
func ( *baseDB) ( pool.Pooler) *baseDB {
:= .clone()
.pool =
return
}
// Returns a clone of the receiver that uses its own copy of the options with
// ReadTimeout and WriteTimeout overwritten (presumably both with the
// time.Duration argument — confirm against the full source).
func ( *baseDB) ( time.Duration) *baseDB {
// Dereference to copy the Options value so the shared *Options is not mutated.
:= *.opt
.ReadTimeout =
.WriteTimeout =
:= .clone()
// Point the clone at the private copy of the options.
.opt = &
return
}
// Returns a clone of the receiver whose formatter is replaced by the result
// of fmter.WithParam, called with the string and interface{} arguments.
func ( *baseDB) ( string, interface{}) *baseDB {
:= .clone()
.fmter = .fmter.WithParam(, )
return
}
// Returns whatever the formatter's Param method yields for the given string.
func ( *baseDB) ( string) interface{} {
return .fmter.Param()
}
// Returns the delay computed by internal.RetryBackoff for the given int
// argument, bounded by the MinRetryBackoff and MaxRetryBackoff options.
func ( *baseDB) ( int) time.Duration {
return internal.RetryBackoff(, .opt.MinRetryBackoff, .opt.MaxRetryBackoff)
}
// Obtains a connection from the pool and makes sure it has been initialized
// before handing it to the caller.
//
// An error from pool.Get is returned as-is. A connection whose Inited flag is
// already set is returned without further work. When initConn fails, the
// connection is removed from the pool and nil plus an error is returned.
func ( *baseDB) ( context.Context) (*pool.Conn, error) {
, := .pool.Get()
if != nil {
return nil,
}
if .Inited {
return , nil
}
if := .initConn(, ); != nil {
.pool.Remove(, , )
// When the pool is a *pool.StickyConnPool it is additionally reset; the
// Reset error is explicitly discarded.
if , := .pool.(*pool.StickyConnPool); {
_ = .Reset()
}
// Prefer the value produced by internal.Unwrap when it is non-nil.
if := internal.Unwrap(); != nil {
return nil,
}
return nil,
}
return , nil
}
// Performs one-time setup of a pooled connection: optional TLS upgrade,
// startup with the configured credentials, and the optional OnConnect
// callback. Returns nil immediately when the connection is already marked
// Inited; otherwise returns the first error from those steps.
func ( *baseDB) ( context.Context, *pool.Conn) error {
if .Inited {
return nil
}
// The flag is set before any setup step runs and is not cleared on failure
// here. NOTE(review): presumably this keeps the OnConnect callback below
// from re-entering initialization on the same connection — confirm.
.Inited = true
if .opt.TLSConfig != nil {
:= .enableSSL(, , .opt.TLSConfig)
if != nil {
return
}
}
:= .startup(, , .opt.User, .opt.Password, .opt.Database, .opt.ApplicationName)
if != nil {
return
}
if .opt.OnConnect != nil {
// The callback receives a conn built by newConn on top of a
// pool.NewSingleConnPool created from the current pool.
:= pool.NewSingleConnPool(.pool, )
return .opt.OnConnect(, newConn(, .withPool()))
}
return nil
}
// Gives a connection back after use. Depending on what isBadConn reports for
// the error argument, the connection is either removed from the pool or put
// back for reuse.
func ( *baseDB) ( context.Context, *pool.Conn, error) {
if , := isBadConn(, false); {
// "25P02" is compared against the first isBadConn result; in PostgreSQL
// that SQLSTATE is in_failed_sql_transaction. For every other value a
// cancel request is sent using the connection's ProcessID and SecretKey.
if != "25P02" {
:= .cancelRequest(.ProcessID, .SecretKey)
if != nil {
// A failed cancel is only logged; the connection is removed regardless.
internal.Logger.Printf(, "cancelRequest failed: %s", )
}
}
.pool.Remove(, , )
} else {
.pool.Put(, )
}
}
// Acquires a connection via getConn, runs the supplied callback with it and
// returns the callback's error. An error from getConn is returned before the
// callback is invoked.
//
// When the context is non-nil and has a non-nil Done channel, a watcher
// goroutine is started that issues cancelRequest if the context finishes
// first. The deferred function then decides between putting the connection
// through releaseConn and removing it from the pool outright.
//
// NOTE(review): the channel identifiers are not visible in this view; the
// comments below assume every bare `<-` refers to the single chan struct{}
// declared in this function — confirm against the full source.
func ( *baseDB) (
context.Context, func(context.Context, *pool.Conn) error,
) error {
, := .getConn()
if != nil {
return
}
// Stays nil when no cancellation watcher is started.
var chan struct{}
if != nil && .Done() != nil {
= make(chan struct{})
go func() {
select {
// The deferred function signalled that the callback finished: nothing to cancel.
case <-:
// The context finished first: ask the server to cancel, using the
// connection's ProcessID and SecretKey.
case <-.Done():
:= .cancelRequest(.ProcessID, .SecretKey)
if != nil {
internal.Logger.Printf(, "cancelRequest failed: %s", )
}
// Tell the deferred function that the cancel attempt has completed.
<- struct{}{}
}
}()
}
defer func() {
// No watcher goroutine was started: plain release.
if == nil {
.releaseConn(, , )
return
}
select {
// The watcher reported a cancel attempt: the connection is removed from
// the pool instead of being reused.
case <-:
.pool.Remove(, , )
// The watcher accepted the finish signal without cancelling: normal release.
case <- struct{}{}:
.releaseConn(, , )
}
}()
= (, )
return
}
// Reports whether an operation that ended with the given error is eligible
// for another attempt.
//
//   - io.EOF and io.ErrUnexpectedEOF: true.
//   - nil, context.Canceled and context.DeadlineExceeded: false.
//   - A value implementing Error: decided by its 'C' field (see below).
//   - A value implementing timeoutError: true.
//   - Anything else: false.
func ( *baseDB) ( error) bool {
switch {
case io.EOF, io.ErrUnexpectedEOF:
return true
case nil, context.Canceled, context.DeadlineExceeded:
return false
}
if , := .(Error); {
// Field('C') is matched against PostgreSQL SQLSTATE codes: 40001
// serialization_failure, 53300 too_many_connections, 55000
// object_not_in_prerequisite_state.
switch .Field('C') {
case "40001",
"53300",
"55000":
return true
// 57014 is query_canceled; retrying it is controlled by the
// RetryStatementTimeout option.
case "57014":
return .opt.RetryStatementTimeout
default:
return false
}
}
if , := .(timeoutError); {
return true
}
return false
}
// Closes the underlying pool and returns the error from pool.Close.
func ( *baseDB) () error {
return .pool.Close()
}
// Convenience wrapper around exec that uses the context returned by
// db.Context() and forwards the query and its variadic params unchanged.
func ( *baseDB) ( interface{}, ...interface{}) ( Result, error) {
return .exec(.db.Context(), , ...)
}
// Wrapper around exec that takes the context from the caller and forwards
// the query and its variadic params unchanged.
func ( *baseDB) ( context.Context, interface{}, ...interface{}) (Result, error) {
return .exec(, , ...)
}
// Formats the query into a pooled write buffer, runs the beforeQuery hooks,
// sends the query with simpleQuery (retrying while shouldRetry allows it)
// and finally runs the afterQuery hooks.
//
// Returns a nil Result and the error if formatting, beforeQuery, a backoff
// sleep or afterQuery fails; otherwise returns the result and error of the
// last attempt.
func ( *baseDB) ( context.Context, interface{}, ...interface{}) (Result, error) {
// The buffer is borrowed from the pool and handed back when this function returns.
:= pool.GetWriteBuffer()
defer pool.PutWriteBuffer()
if := writeQueryMsg(, .fmter, , ...); != nil {
return nil,
}
// nil is passed in the third argument position here, where the query
// function elsewhere in this file passes a value.
, , := .beforeQuery(, .db, nil, , , .Query())
if != nil {
return nil,
}
var Result
var error
// One initial attempt plus up to opt.MaxRetries further attempts.
for := 0; <= .opt.MaxRetries; ++ {
if > 0 {
// Back off before every attempt except the first.
if := internal.Sleep(, .retryBackoff(-1)); != nil {
return nil,
}
}
= .withConn(, func( context.Context, *pool.Conn) error {
, = .simpleQuery(, , )
return
})
if !.shouldRetry() {
break
}
}
if := .afterQuery(, , , ); != nil {
return nil,
}
return ,
}
// Convenience wrapper around execOne that uses the context returned by
// db.Context() and forwards the query and its variadic params unchanged.
func ( *baseDB) ( interface{}, ...interface{}) (Result, error) {
return .execOne(.db.Context(), , ...)
}
// Wrapper around execOne that takes the context from the caller and forwards
// the query and its variadic params unchanged.
func ( *baseDB) ( context.Context, interface{}, ...interface{}) (Result, error) {
return .execOne(, , ...)
}
// Runs the query through ExecContext and then checks the affected-row count
// with internal.AssertOneRow. Returns a nil Result together with the error
// when either step fails.
func ( *baseDB) ( context.Context, interface{}, ...interface{}) (Result, error) {
, := .ExecContext(, , ...)
if != nil {
return nil,
}
if := internal.AssertOneRow(.RowsAffected()); != nil {
return nil,
}
return , nil
}
// Convenience wrapper around query that uses the context returned by
// db.Context() and forwards the model, the query and its variadic params
// unchanged.
func ( *baseDB) (, interface{}, ...interface{}) ( Result, error) {
return .query(.db.Context(), , , ...)
}
// Wrapper around query that takes the context from the caller and forwards
// the model, the query and its variadic params unchanged.
func ( *baseDB) ( context.Context, , interface{}, ...interface{}) (Result, error) {
return .query(, , , ...)
}
// Formats the query into a pooled write buffer, runs the beforeQuery hooks,
// sends the query with simpleQueryData (retrying while shouldRetry allows
// it) and finally runs the afterQuery hooks.
//
// Returns a nil Result and the error if formatting, beforeQuery, a backoff
// sleep or afterQuery fails; otherwise returns the result and error of the
// last attempt.
func ( *baseDB) ( context.Context, , interface{}, ...interface{}) (Result, error) {
// The buffer is borrowed from the pool and handed back when this function returns.
:= pool.GetWriteBuffer()
defer pool.PutWriteBuffer()
if := writeQueryMsg(, .fmter, , ...); != nil {
return nil,
}
, , := .beforeQuery(, .db, , , , .Query())
if != nil {
return nil,
}
var Result
var error
// One initial attempt plus up to opt.MaxRetries further attempts.
for := 0; <= .opt.MaxRetries; ++ {
if > 0 {
// Back off before every attempt except the first.
if := internal.Sleep(, .retryBackoff(-1)); != nil {
return nil,
}
}
= .withConn(, func( context.Context, *pool.Conn) error {
, = .simpleQueryData(, , , )
return
})
if !.shouldRetry() {
break
}
}
if := .afterQuery(, , , ); != nil {
return nil,
}
return ,
}
// Convenience wrapper around queryOne that uses the context returned by
// db.Context() and forwards the model, the query and its variadic params
// unchanged.
func ( *baseDB) (, interface{}, ...interface{}) (Result, error) {
return .queryOne(.db.Context(), , , ...)
}
// Wrapper around queryOne that takes the context from the caller and
// forwards the model, the query and its variadic params unchanged.
func ( *baseDB) (
context.Context, , interface{}, ...interface{},
) (Result, error) {
return .queryOne(, , , ...)
}
// Runs the query through QueryContext and then checks the row count reported
// by RowsAffected with internal.AssertOneRow. Returns a nil Result together
// with the error when either step fails.
func ( *baseDB) ( context.Context, , interface{}, ...interface{}) (Result, error) {
, := .QueryContext(, , , ...)
if != nil {
return nil,
}
if := internal.AssertOneRow(.RowsAffected()); != nil {
return nil,
}
return , nil
}
// Runs copyFrom on a connection obtained through withConn, using the context
// returned by db.Context(). The io.Reader, the query and its variadic params
// are forwarded; the results are delivered through the named return values.
func ( *baseDB) ( io.Reader, interface{}, ...interface{}) ( Result, error) {
:= .db.Context()
= .withConn(, func( context.Context, *pool.Conn) error {
, = .copyFrom(, , , , ...)
return
})
return ,
}
// Executes a copy-in exchange on the given connection: sends the query,
// reads the copy-in response, streams data from the io.Reader, sends the
// copy-done message and reads the final ready-for-query reply.
//
// Every failing step returns a nil Result and the error. The deferred
// afterQuery call may replace the named error result with its own error.
func ( *baseDB) (
context.Context, *pool.Conn, io.Reader, interface{}, ...interface{},
) ( Result, error) {
var *QueryEvent
// The query is first rendered into a pooled buffer; that buffer's Query()
// output is what beforeQuery receives below.
:= pool.GetWriteBuffer()
defer pool.PutWriteBuffer()
if := writeQueryMsg(, .fmter, , ...); != nil {
return nil,
}
var interface{}
if len() > 0 {
// Takes the last element of the indexed slice (presumably the variadic
// params — identifier not visible) when it is an orm.TableModel; the
// assertion's ok result is ignored.
, _ = [len()-1].(orm.TableModel)
}
, , = .beforeQuery(, .db, , , , .Query())
if != nil {
return nil,
}
defer func() {
// An afterQuery failure overrides the error being returned.
if := .afterQuery(, , , ); != nil {
=
}
}()
// The query is rendered a second time, this time onto the connection.
= .WithWriter(, .opt.WriteTimeout, func( *pool.WriteBuffer) error {
return writeQueryMsg(, .fmter, , ...)
})
if != nil {
return nil,
}
= .WithReader(, .opt.ReadTimeout, readCopyInResponse)
if != nil {
return nil,
}
// Write copy-data messages until writeCopyData surfaces io.EOF; any other
// error aborts the copy.
for {
= .WithWriter(, .opt.WriteTimeout, func( *pool.WriteBuffer) error {
return writeCopyData(, )
})
if != nil {
if == io.EOF {
break
}
return nil,
}
}
= .WithWriter(, .opt.WriteTimeout, func( *pool.WriteBuffer) error {
writeCopyDone()
return nil
})
if != nil {
return nil,
}
= .WithReader(, .opt.ReadTimeout, func( *pool.ReaderContext) error {
, = readReadyForQuery()
return
})
if != nil {
return nil,
}
return , nil
}
// Runs copyTo on a connection obtained through withConn, using the context
// returned by db.Context(). The io.Writer, the query and its variadic params
// are forwarded; the results are delivered through the named return values.
func ( *baseDB) ( io.Writer, interface{}, ...interface{}) ( Result, error) {
:= .db.Context()
= .withConn(, func( context.Context, *pool.Conn) error {
, = .copyTo(, , , , ...)
return
})
return ,
}
// Executes a copy-out exchange on the given connection: sends the query,
// reads the copy-out response and then reads the copy data via readCopyData,
// which is handed the io.Writer.
//
// Every failing step returns a nil Result and the error. The deferred
// afterQuery call may replace the named error result with its own error.
func ( *baseDB) (
context.Context, *pool.Conn, io.Writer, interface{}, ...interface{},
) ( Result, error) {
var *QueryEvent
// The query is first rendered into a pooled buffer; that buffer's Query()
// output is what beforeQuery receives below.
:= pool.GetWriteBuffer()
defer pool.PutWriteBuffer()
if := writeQueryMsg(, .fmter, , ...); != nil {
return nil,
}
var interface{}
if len() > 0 {
// Takes the last element of the indexed slice (presumably the variadic
// params — identifier not visible) when it is an orm.TableModel; the
// assertion's ok result is ignored.
, _ = [len()-1].(orm.TableModel)
}
, , = .beforeQuery(, .db, , , , .Query())
if != nil {
return nil,
}
defer func() {
// An afterQuery failure overrides the error being returned.
if := .afterQuery(, , , ); != nil {
=
}
}()
// The query is rendered a second time, this time onto the connection.
= .WithWriter(, .opt.WriteTimeout, func( *pool.WriteBuffer) error {
return writeQueryMsg(, .fmter, , ...)
})
if != nil {
return nil,
}
// Both the copy-out response and the data are read inside a single
// WithReader call.
= .WithReader(, .opt.ReadTimeout, func( *pool.ReaderContext) error {
:= readCopyOutResponse()
if != nil {
return
}
, = readCopyData(, )
return
})
if != nil {
return nil,
}
return , nil
}
// Runs "SELECT 1" through ExecContext with the given context and returns
// only the resulting error; the Result is discarded.
func ( *baseDB) ( context.Context) error {
, := .ExecContext(, "SELECT 1")
return
}
// Returns the query built by orm.NewQuery from the db field and the variadic
// arguments.
func ( *baseDB) ( ...interface{}) *Query {
return orm.NewQuery(.db, ...)
}
// Returns the query built by orm.NewQueryContext from the given context, the
// db field and the variadic arguments.
func ( *baseDB) ( context.Context, ...interface{}) *Query {
return orm.NewQueryContext(, .db, ...)
}
// Returns the fmter field as an orm.QueryFormatter.
func ( *baseDB) () orm.QueryFormatter {
return .fmter
}
// Sends a cancel-request message built from the two int32 arguments (callers
// in this file pass a connection's ProcessID and SecretKey).
//
// The message goes out on a fresh connection created with pool.NewConn, not
// on a pooled one, and that connection is closed before returning. Returns
// the error from NewConn or from writing the message.
func ( *baseDB) (, int32) error {
// No caller context is accepted; a context.TODO() is used for the call.
:= context.TODO()
, := .pool.NewConn()
if != nil {
return
}
defer func() {
// The CloseConn error is explicitly discarded.
_ = .pool.CloseConn()
}()
return .WithWriter(, .opt.WriteTimeout, func( *pool.WriteBuffer) error {
writeCancelRequestMsg(, , )
return nil
})
}
// Writes an already-populated write buffer to the connection (bounded by the
// WriteTimeout option) and reads the reply with readSimpleQuery (bounded by
// the ReadTimeout option). Returns a nil *result and the error when either
// the write or the read fails.
func ( *baseDB) (
context.Context, *pool.Conn, *pool.WriteBuffer,
) (*result, error) {
if := .WriteBuffer(, .opt.WriteTimeout, ); != nil {
return nil,
}
// Declared outside the reader callback so the callback can populate it.
var *result
if := .WithReader(, .opt.ReadTimeout, func( *pool.ReaderContext) error {
var error
, = readSimpleQuery()
return
}); != nil {
return nil,
}
return , nil
}
// Writes an already-populated write buffer to the connection (bounded by the
// WriteTimeout option) and reads the reply with readSimpleQueryData (bounded
// by the ReadTimeout option). The interface{} parameter is presumably the
// model forwarded to readSimpleQueryData — confirm. Returns a nil *result
// and the error when either the write or the read fails.
func ( *baseDB) (
context.Context, *pool.Conn, interface{}, *pool.WriteBuffer,
) (*result, error) {
if := .WriteBuffer(, .opt.WriteTimeout, ); != nil {
return nil,
}
// Declared outside the reader callback so the callback can populate it.
var *result
if := .WithReader(, .opt.ReadTimeout, func( *pool.ReaderContext) error {
var error
, = readSimpleQueryData(, , )
return
}); != nil {
return nil,
}
return , nil
}
// Creates a statement with prepareStmt for the given string, using a clone
// of the receiver whose pool is a pool.NewStickyConnPool wrapping the
// current pool.
func ( *baseDB) ( string) (*Stmt, error) {
return prepareStmt(.withPool(pool.NewStickyConnPool(.pool)), )
}
// Sends a parse/describe/sync message via writeParseDescribeSyncMsg on the
// given connection, using an identifier obtained from NextID, and reads the
// reply with readParseDescribeSync.
//
// Returns a string (presumably the NextID value — confirm) and the column
// infos on success, or "", nil and the error when the write or read fails.
func ( *baseDB) (
context.Context, *pool.Conn, string,
) (string, []types.ColumnInfo, error) {
:= .NextID()
:= .WithWriter(, .opt.WriteTimeout, func( *pool.WriteBuffer) error {
writeParseDescribeSyncMsg(, , )
return nil
})
if != nil {
return "", nil,
}
// Declared outside the reader callback so the callback can populate it.
var []types.ColumnInfo
= .WithReader(, .opt.ReadTimeout, func( *pool.ReaderContext) error {
, = readParseDescribeSync()
return
})
if != nil {
return "", nil,
}
return , , nil
}
// Writes a close message followed by a flush message on the given connection
// and then reads the reply with readCloseCompleteMsg. Returns the write
// error if the write fails, otherwise the result of the read.
func ( *baseDB) ( context.Context, *pool.Conn, string) error {
:= .WithWriter(, .opt.WriteTimeout, func( *pool.WriteBuffer) error {
writeCloseMsg(, )
writeFlushMsg()
return nil
})
if != nil {
return
}
= .WithReader(, .opt.ReadTimeout, readCloseCompleteMsg)
return
}