package pg
import (
)
// Wire-protocol constants used throughout this file.
//
// The single-character values are message type tags: they are what the
// switch statements compare the result of readMessageType against, and what
// StartMessage is called with when a message is written. Several names share
// the same byte ('C', 'E', 'S', 'D', 'H', 'p' and 'R' each appear more than
// once), so a tag is only meaningful together with the direction in which
// the message travels.
const (
commandCompleteMsg = 'C'
errorResponseMsg = 'E'
noticeResponseMsg = 'N'
parameterStatusMsg = 'S'
authenticationOKMsg = 'R'
backendKeyDataMsg = 'K'
noDataMsg = 'n'
passwordMessageMsg = 'p'
terminateMsg = 'X'
// The SASL exchange reuses the 'p' and 'R' tags declared above.
saslInitialResponseMsg = 'p'
authenticationSASLContinueMsg = 'R'
saslResponseMsg = 'p'
authenticationSASLFinalMsg = 'R'
// Authentication codes: the int32 read from the body of an 'R' message and
// dispatched on by the authentication handler in this file.
authenticationOK = 0
authenticationCleartextPassword = 3
authenticationMD5Password = 5
authenticationSASL = 10
notificationResponseMsg = 'A'
describeMsg = 'D'
parameterDescriptionMsg = 't'
queryMsg = 'Q'
readyForQueryMsg = 'Z'
emptyQueryResponseMsg = 'I'
rowDescriptionMsg = 'T'
dataRowMsg = 'D'
// Tags used by the parse / bind / execute / close writers and their replies.
parseMsg = 'P'
parseCompleteMsg = '1'
bindMsg = 'B'
bindCompleteMsg = '2'
executeMsg = 'E'
syncMsg = 'S'
flushMsg = 'H'
closeMsg = 'C'
closeCompleteMsg = '3'
// Tags used by the COPY readers and writers.
copyInResponseMsg = 'G'
copyOutResponseMsg = 'H'
copyDataMsg = 'd'
copyDoneMsg = 'c'
)
// errEmptyQuery is the error the query readers in this file record when they
// receive an emptyQueryResponseMsg.
var errEmptyQuery = internal.Errorf("pg: query is empty")
// Connection startup handshake (a method on *baseDB).
//
// NOTE(review): the method name, receiver name, parameter names and every
// local identifier are blank in this copy of the file — restore them before
// building. The description below relies only on the surviving tokens.
//
// The method first writes the packet produced by writeStartupMsg inside a
// WithWriter callback bounded by opt.WriteTimeout. It then reads messages
// inside a WithReader callback bounded by opt.ReadTimeout, looping until a
// readyForQueryMsg arrives (success), an errorResponseMsg arrives (the parsed
// server error is returned), or an unknown tag is seen.
func ( *baseDB) (
context.Context, *pool.Conn, , , , string,
) error {
:= .WithWriter(, .opt.WriteTimeout, func( *pool.WriteBuffer) error {
writeStartupMsg(, , , )
return nil
})
if != nil {
return
}
return .WithReader(, .opt.ReadTimeout, func( *pool.ReaderContext) error {
for {
, , := readMessageType()
if != nil {
return
}
switch {
case backendKeyDataMsg:
// Two int32 values are read and stored in the ProcessID and SecretKey
// fields (presumably of the connection — the target identifier is blank).
, := readInt32()
if != nil {
return
}
, := readInt32()
if != nil {
return
}
.ProcessID =
.SecretKey =
case parameterStatusMsg:
if := logParameterStatus(, ); != nil {
return
}
case authenticationOKMsg:
// 'R' is shared by every authentication message; the auth method reads
// the code that follows and runs the matching exchange.
:= .auth(, , , , )
if != nil {
return
}
case readyForQueryMsg:
// Consume the message body and finish the handshake.
, := .ReadN()
return
case noticeResponseMsg:
if := .logStartupNotice(); != nil {
return
}
case errorResponseMsg:
, := readError()
if != nil {
return
}
return
default:
return fmt.Errorf("pg: unknown startup message response: %q", )
}
}
})
}
// TLS upgrade of a connection (a method on *baseDB taking a context, a
// *pool.Conn and a *tls.Config).
//
// NOTE(review): the method name and all local identifiers are blank in this
// copy of the file.
//
// It writes the request built by writeSSLMsg, reads a single reply byte, and
// fails with "pg: SSL is not enabled on the server" unless that byte is 'S'.
// On success the connection's NetConn is wrapped with tls.Client and
// installed through SetNetConn.
func ( *baseDB) ( context.Context, *pool.Conn, *tls.Config) error {
:= .WithWriter(, .opt.WriteTimeout, func( *pool.WriteBuffer) error {
writeSSLMsg()
return nil
})
if != nil {
return
}
= .WithReader(, .opt.ReadTimeout, func( *pool.ReaderContext) error {
// The reply is one raw byte, not a framed message, so ReadByte is used
// instead of readMessageType.
, := .ReadByte()
if != nil {
return
}
if != 'S' {
return errors.New("pg: SSL is not enabled on the server")
}
return nil
})
if != nil {
return
}
.SetNetConn(tls.Client(.NetConn(), ))
return nil
}
// Authentication dispatcher (a method on *baseDB; the startup loop calls a
// method named auth with five arguments, which matches this signature).
//
// NOTE(review): the method name and all local identifiers are blank in this
// copy of the file.
//
// It reads one int32 and selects the handler for that authentication code:
// authenticationOK returns immediately, the other known codes delegate to
// authCleartext, authMD5 or authSASL, and anything else is an error.
func ( *baseDB) (
context.Context, *pool.Conn, *pool.ReaderContext, , string,
) error {
, := readInt32()
if != nil {
return
}
switch {
case authenticationOK:
return nil
case authenticationCleartextPassword:
return .authCleartext(, , , )
case authenticationMD5Password:
return .authMD5(, , , , )
case authenticationSASL:
return .authSASL(, , , , )
default:
// NOTE(review): presumably the int32 code is what gets formatted here; %q
// renders an integer as a quoted character literal, so %d may be intended.
return fmt.Errorf("pg: unknown authentication message response: %q", )
}
}
// Notice logger used during startup (a method on *baseDB taking a
// *pool.ReaderContext; the startup loop calls logStartupNotice with one
// argument, which matches this signature).
//
// NOTE(review): the method name and all local identifiers are blank in this
// copy of the file.
//
// The body is read as a sequence of (one byte, NUL-terminated string) pairs
// ending at a zero byte. Each pair is rendered as "<byte>: <string>" and the
// joined list is written to internal.Warn. A read failure is returned;
// otherwise the result is always nil.
func ( *baseDB) (
*pool.ReaderContext,
) error {
:= make([]string, 0)
for {
, := .ReadByte()
if != nil {
return
}
// A zero byte terminates the field list.
if == 0 {
break
}
, := readString()
if != nil {
return
}
= append(, fmt.Sprintf("%s: %s", string(), ))
}
internal.Warn.Printf("notice during startup: %s", strings.Join(, ", "))
return nil
}
// Cleartext password authentication (a method on *baseDB; the four-argument
// signature matches the authCleartext call in the authentication dispatcher).
//
// NOTE(review): the method name and all local identifiers are blank in this
// copy of the file.
//
// It sends a password message via writePasswordMsg and then waits for the
// server's verdict with readAuthOK.
func ( *baseDB) (
context.Context, *pool.Conn, *pool.ReaderContext, string,
) error {
:= .WithWriter(, .opt.WriteTimeout, func( *pool.WriteBuffer) error {
writePasswordMsg(, )
return nil
})
if != nil {
return
}
return readAuthOK()
}
// MD5 password authentication (a method on *baseDB; the five-argument
// signature matches the authMD5 call in the authentication dispatcher).
//
// NOTE(review): the method name and all local identifiers are blank in this
// copy of the file.
//
// It reads four bytes from the server (presumably the salt), builds the
// response "md5" + md5s(md5s(a+b) + those four bytes), sends it with
// writePasswordMsg and waits for the verdict with readAuthOK. Which two
// strings are concatenated as a+b cannot be told from this copy.
func ( *baseDB) (
context.Context, *pool.Conn, *pool.ReaderContext, , string,
) error {
, := .ReadN(4)
if != nil {
return
}
:= "md5" + md5s(md5s(+)+string())
= .WithWriter(, .opt.WriteTimeout, func( *pool.WriteBuffer) error {
writePasswordMsg(, )
return nil
})
if != nil {
return
}
return readAuthOK()
}
// Reads the server's reply to a password message (presumably readAuthOK,
// which the password and SASL handlers call; the name is blank in this copy).
//
// An authentication message must carry the code 0; any other code is an
// error. An errorResponseMsg is parsed with readError and returned. Any other
// tag is reported as an unknown password message response.
func ( *pool.ReaderContext) error {
, , := readMessageType()
if != nil {
return
}
switch {
case authenticationOKMsg:
, := readInt32()
if != nil {
return
}
if != 0 {
return fmt.Errorf("pg: unexpected authentication code: %q", )
}
return nil
case errorResponseMsg:
, := readError()
if != nil {
return
}
return
default:
return fmt.Errorf("pg: unknown password message response: %q", )
}
}
// SASL authentication (a method on *baseDB; the five-argument signature
// matches the authSASL call in the authentication dispatcher).
//
// NOTE(review): the method name, the loop label and all local identifiers
// are blank in this copy of the file.
//
// Steps visible in the code:
//  1. Read NUL-terminated mechanism names until an empty string. Only
//     sasl.ScramSha256 is selected; sasl.ScramSha256Plus is recognised but
//     ignored; any other name is an error.
//  2. Build a sasl client from the credentials and take the first Step.
//  3. Send a saslInitialResponseMsg: mechanism name, int32 payload length,
//     payload.
//  4. Expect an authentication message with code 11, feed the rest of its
//     body to Step, send the result as a saslResponseMsg, and finish with
//     readAuthSASLFinal.
func ( *baseDB) (
context.Context, *pool.Conn, *pool.ReaderContext, , string,
) error {
var sasl.Mechanism
:
for {
, := readString()
if != nil {
return
}
switch {
case "":
// NOTE(review): as written this break carries no label and would only
// leave the switch; the label name was presumably stripped along with
// the other identifiers — confirm against the original.
break
case sasl.ScramSha256.Name:
= sasl.ScramSha256
case sasl.ScramSha256Plus.Name:
// Recognised but deliberately not selected.
default:
return fmt.Errorf("got %q, wanted %q", , sasl.ScramSha256.Name)
}
}
:= sasl.Credentials(func() (, , []byte) {
return []byte(), []byte(), nil
})
:= sasl.NewClient(, )
, , := .Step(nil)
if != nil {
return
}
= .WithWriter(, .opt.WriteTimeout, func( *pool.WriteBuffer) error {
.StartMessage(saslInitialResponseMsg)
.WriteString(.Name)
.WriteInt32(int32(len()))
, := .Write()
if != nil {
return
}
.FinishMessage()
return nil
})
if != nil {
return
}
, , := readMessageType()
if != nil {
return
}
switch {
case authenticationSASLContinueMsg:
, := readInt32()
if != nil {
return
}
// 11 is the only code accepted at this point of the exchange.
if != 11 {
return fmt.Errorf("pg: SASL: got %q, wanted %q", , 11)
}
// The 4 bytes of the code were already consumed, hence the "- 4".
, := .ReadN( - 4)
if != nil {
return
}
_, , = .Step()
if != nil {
return
}
= .WithWriter(, .opt.WriteTimeout, func( *pool.WriteBuffer) error {
.StartMessage(saslResponseMsg)
, := .Write()
if != nil {
return
}
.FinishMessage()
return nil
})
if != nil {
return
}
return readAuthSASLFinal(, )
case errorResponseMsg:
, := readError()
if != nil {
return
}
return
default:
return fmt.Errorf(
"pg: SASL: got %q, wanted %q", , authenticationSASLContinueMsg)
}
}
// Final step of the SASL exchange (presumably readAuthSASLFinal, which the
// SASL handler calls with a reader and a negotiator; the name is blank in
// this copy).
//
// It expects an authentication message with code 12, feeds the rest of the
// body to the negotiator's Step, and requires the negotiator to end in the
// sasl.ValidServerResponse state. On success it waits for the final verdict
// with readAuthOK. An errorResponseMsg is parsed with readError and returned.
func ( *pool.ReaderContext, *sasl.Negotiator) error {
, , := readMessageType()
if != nil {
return
}
switch {
case authenticationSASLFinalMsg:
, := readInt32()
if != nil {
return
}
if != 12 {
return fmt.Errorf("pg: SASL: got %q, wanted %q", , 12)
}
// The 4 bytes of the code were already consumed, hence the "- 4".
, := .ReadN( - 4)
if != nil {
return
}
_, _, = .Step()
if != nil {
return
}
if .State() != sasl.ValidServerResponse {
return fmt.Errorf("pg: SASL: state=%q, wanted %q",
.State(), sasl.ValidServerResponse)
}
case errorResponseMsg:
, := readError()
if != nil {
return
}
return
default:
return fmt.Errorf(
"pg: SASL: got %q, wanted %q", , authenticationSASLFinalMsg)
}
return readAuthOK()
}
// Returns the hex-encoded MD5 digest of a string (presumably md5s, which the
// MD5 password handler calls; the name is blank in this copy).
func ( string) string {
:= md5.Sum([]byte())
return hex.EncodeToString([:])
}
// Writes the startup packet (presumably writeStartupMsg, which the startup
// handshake calls with a buffer and three strings; the name is blank in this
// copy).
//
// Layout as written: an untyped message (StartMessage(0)), the int32 196608
// (3<<16; presumably protocol version 3.0), then key/value strings "user",
// "database" and, only when the third string is non-empty,
// "application_name", terminated by an empty string.
func ( *pool.WriteBuffer, , , string) {
.StartMessage(0)
.WriteInt32(196608)
.WriteString("user")
.WriteString()
.WriteString("database")
.WriteString()
if != "" {
.WriteString("application_name")
.WriteString()
}
// An empty string ends the parameter list.
.WriteString("")
.FinishMessage()
}
// Writes the SSL request packet (presumably writeSSLMsg, which the TLS
// upgrade calls; the name is blank in this copy): an untyped message whose
// body is the int32 80877103 (1234<<16 | 5679).
func ( *pool.WriteBuffer) {
.StartMessage(0)
.WriteInt32(80877103)
.FinishMessage()
}
// Writes a passwordMessageMsg whose body is the given string (presumably
// writePasswordMsg, which the password handlers call; the name is blank in
// this copy).
func ( *pool.WriteBuffer, string) {
.StartMessage(passwordMessageMsg)
.WriteString()
.FinishMessage()
}
// Writes an empty flushMsg to the buffer. The function name is blank in this
// copy of the file.
func ( *pool.WriteBuffer) {
.StartMessage(flushMsg)
.FinishMessage()
}
// Writes an untyped message containing the int32 80877102 (1234<<16 | 5678)
// followed by the two int32 arguments. This is presumably a cancel request
// carrying the process ID and secret key captured at startup — the function
// and parameter names are blank in this copy, so confirm against the caller.
func ( *pool.WriteBuffer, , int32) {
.StartMessage(0)
.WriteInt32(80877102)
.WriteInt32()
.WriteInt32()
.FinishMessage()
}
// Writes a queryMsg: the formatted query text followed by a terminating zero
// byte. The function and parameter names are blank in this copy of the file.
//
// The text is produced by appendQuery, which appends directly to the
// buffer's Bytes slice; the grown slice is stored back before the terminator
// is written. An error from appendQuery or WriteByte is returned and the
// message is left unfinished.
func (
*pool.WriteBuffer,
orm.QueryFormatter,
interface{},
...interface{},
) error {
.StartMessage(queryMsg)
, := appendQuery(, .Bytes, , ...)
if != nil {
return
}
.Bytes =
= .WriteByte(0x0)
if != nil {
return
}
.FinishMessage()
return nil
}
// Appends the textual form of a query to a byte slice (presumably
// appendQuery, which the query writer calls; the name is blank in this copy).
//
// Two query kinds are supported:
//   - orm.QueryAppender: the value appends itself via AppendQuery. When the
//     formatter is an *orm.Formatter it is first rebound with WithModel.
//   - string: formatted with FormatQuery and the variadic parameters. When
//     the last parameter is an orm.TableModel and the formatter is an
//     *orm.Formatter, the formatter is rebound with WithTableModel and that
//     last parameter is dropped from the list.
//
// Any other type yields the error "pg: can't append %T".
func ( orm.QueryFormatter, []byte, interface{}, ...interface{}) ([]byte, error) {
switch query := .(type) {
case orm.QueryAppender:
if , := .(*orm.Formatter); {
= .WithModel()
}
return .AppendQuery(, )
case string:
if len() > 0 {
, := [len()-1].(orm.TableModel)
if {
if , := .(*orm.Formatter); {
= .WithTableModel()
= [:len()-1]
}
}
}
return .FormatQuery(, , ...), nil
default:
return nil, fmt.Errorf("pg: can't append %T", )
}
}
// Writes an empty syncMsg to the buffer (presumably writeSyncMsg, which the
// parse and bind writers call last; the name is blank in this copy).
func ( *pool.WriteBuffer) {
.StartMessage(syncMsg)
.FinishMessage()
}
// Writes three messages back to back: a parseMsg (two strings followed by an
// int16 zero), a describeMsg ('S' followed by a string) and a sync message.
// The function and parameter names are blank in this copy; presumably the
// strings are the statement name and the query text — confirm.
func ( *pool.WriteBuffer, , string) {
.StartMessage(parseMsg)
.WriteString()
.WriteString()
.WriteInt16(0)
.FinishMessage()
.StartMessage(describeMsg)
.WriteByte('S')
.WriteString()
.FinishMessage()
writeSyncMsg()
}
// Reads the replies to a parse/describe/sync sequence and returns the column
// descriptions (named readParseDescribeSync in the error text below; the
// declared name and locals are blank in this copy).
//
// The loop runs until readyForQueryMsg. A server error seen earlier does not
// stop the loop; it is remembered and returned once readyForQueryMsg arrives,
// so the stream is left at a message boundary. I/O errors and unexpected tags
// abort immediately.
func ( *pool.ReaderContext) ([]types.ColumnInfo, error) {
var []types.ColumnInfo
var error
for {
, , := readMessageType()
if != nil {
return nil,
}
switch {
case parseCompleteMsg:
_, = .ReadN()
if != nil {
return nil,
}
case rowDescriptionMsg:
// Column metadata is parsed with a fresh allocator.
, = readRowDescription(, pool.NewColumnAlloc())
if != nil {
return nil,
}
case parameterDescriptionMsg:
// Body is consumed and ignored.
, := .ReadN()
if != nil {
return nil,
}
case noDataMsg:
, := .ReadN()
if != nil {
return nil,
}
case readyForQueryMsg:
, := .ReadN()
if != nil {
return nil,
}
if != nil {
return nil,
}
return ,
case errorResponseMsg:
, := readError()
if != nil {
return nil,
}
// Presumably only the first server error is kept.
if == nil {
=
}
case noticeResponseMsg:
if := logNotice(, ); != nil {
return nil,
}
case parameterStatusMsg:
if := logParameterStatus(, ); != nil {
return nil,
}
default:
return nil, fmt.Errorf("pg: readParseDescribeSync: unexpected message %q", )
}
}
}
// Writes a bindMsg, an executeMsg and a sync message for a statement. The
// function and parameter names are blank in this copy of the file.
//
// Bind body as written: an empty string, the given string, an int16 zero,
// the parameter count as int16, each parameter, and a trailing int16 zero.
// Each parameter is encoded with types.Append; a nil result is written as a
// null parameter. Execute body: an empty string and an int32 zero. The
// function always returns nil.
func ( *pool.WriteBuffer, string, ...interface{}) error {
.StartMessage(bindMsg)
.WriteString("")
.WriteString()
.WriteInt16(0)
.WriteInt16(int16(len()))
for , := range {
.StartParam()
:= types.Append(.Bytes, , 0)
// A nil result from types.Append is written as a null parameter.
if != nil {
.Bytes =
.FinishParam()
} else {
.FinishNullParam()
}
}
.WriteInt16(0)
.FinishMessage()
.StartMessage(executeMsg)
.WriteString("")
.WriteInt32(0)
.FinishMessage()
writeSyncMsg()
return nil
}
// Writes a closeMsg whose body is the byte 'S' followed by the given string
// (presumably a statement name). The function and parameter names are blank
// in this copy of the file.
func ( *pool.WriteBuffer, string) {
.StartMessage(closeMsg)
.WriteByte('S')
.WriteString()
.FinishMessage()
}
// Waits for the reply to a close message (named readCloseCompleteMsg in the
// error text below; the declared name and locals are blank in this copy).
//
// Notices and parameter-status messages are logged and skipped. The function
// returns after consuming a closeCompleteMsg, after parsing an
// errorResponseMsg, or on the first unexpected tag or read failure.
func ( *pool.ReaderContext) error {
for {
, , := readMessageType()
if != nil {
return
}
switch {
case closeCompleteMsg:
, := .ReadN()
return
case errorResponseMsg:
, := readError()
if != nil {
return
}
return
case noticeResponseMsg:
if := logNotice(, ); != nil {
return
}
case parameterStatusMsg:
if := logParameterStatus(, ); != nil {
return
}
default:
return fmt.Errorf("pg: readCloseCompleteMsg: unexpected message %q", )
}
}
}
// Reads the replies to a query whose rows are not wanted (named
// readSimpleQuery in the error text below; the declared name and locals are
// blank in this copy).
//
// Row descriptions are consumed, data rows are discarded and counted in the
// result's returned field, and command-complete bodies are handed to the
// result's parse method. Server errors, parse errors and an empty-query
// response are remembered rather than returned at once; the function returns
// only at readyForQueryMsg, yielding the remembered error or the result.
func ( *pool.ReaderContext) (*result, error) {
var result
var error
for {
, , := readMessageType()
if != nil {
return nil,
}
switch {
case commandCompleteMsg:
, := .ReadN()
if != nil {
return nil,
}
if := .parse(); != nil && == nil {
=
}
case readyForQueryMsg:
, := .ReadN()
if != nil {
return nil,
}
if != nil {
return nil,
}
return &, nil
case rowDescriptionMsg:
, := .ReadN()
if != nil {
return nil,
}
case dataRowMsg:
// Row payload is skipped without being buffered.
if , := .Discard(); != nil {
return nil,
}
.returned++
case errorResponseMsg:
, := readError()
if != nil {
return nil,
}
if == nil {
=
}
case emptyQueryResponseMsg:
if == nil {
= errEmptyQuery
}
case noticeResponseMsg:
if := logNotice(, ); != nil {
return nil,
}
case parameterStatusMsg:
if := logParameterStatus(, ); != nil {
return nil,
}
default:
return nil, fmt.Errorf("pg: readSimpleQuery: unexpected message %q", )
}
}
}
// Reads the replies to a bind/execute sequence whose rows are not wanted
// (named readExtQuery in the error text below; the declared name and locals
// are blank in this copy).
//
// bindCompleteMsg bodies are consumed, data rows are read, dropped and
// counted in the result's returned field, and command-complete bodies are
// handed to the result's parse method. Server errors, parse errors and an
// empty-query response are remembered; the function returns only at
// readyForQueryMsg, yielding the remembered error or the result.
func ( *pool.ReaderContext) (*result, error) {
var result
var error
for {
, , := readMessageType()
if != nil {
return nil,
}
switch {
case bindCompleteMsg:
, := .ReadN()
if != nil {
return nil,
}
case dataRowMsg:
, := .ReadN()
if != nil {
return nil,
}
.returned++
case commandCompleteMsg:
, := .ReadN()
if != nil {
return nil,
}
if := .parse(); != nil && == nil {
=
}
case readyForQueryMsg:
, := .ReadN()
if != nil {
return nil,
}
if != nil {
return nil,
}
return &, nil
case errorResponseMsg:
, := readError()
if != nil {
return nil,
}
if == nil {
=
}
case emptyQueryResponseMsg:
if == nil {
= errEmptyQuery
}
case noticeResponseMsg:
if := logNotice(, ); != nil {
return nil,
}
case parameterStatusMsg:
if := logParameterStatus(, ); != nil {
return nil,
}
default:
return nil, fmt.Errorf("pg: readExtQuery: unexpected message %q", )
}
}
}
// Parses the body of a row description into column metadata (presumably
// readRowDescription, which the describe and query readers call with a
// reader and a column allocator; the name is blank in this copy).
//
// The body starts with an int16 column count. For every column the code
// reads a NUL-terminated name, skips 6 bytes, reads an int32 that becomes the
// column's DataType, and skips 8 more bytes. Columns are created through the
// allocator's New method and the allocator's Columns() is returned.
func (
*pool.ReaderContext, *pool.ColumnAlloc,
) ([]types.ColumnInfo, error) {
, := readInt16()
if != nil {
return nil,
}
for := 0; < int(); ++ {
, := .ReadSlice(0)
if != nil {
return nil,
}
// The slice includes the terminating zero byte; it is trimmed here.
:= .New(int16(), [:len()-1])
// 6 bytes between the name and the type value are skipped.
if , := .ReadN(6); != nil {
return nil,
}
, := readInt32()
if != nil {
return nil,
}
.DataType =
// The remaining 8 bytes of the field description are skipped.
if , := .ReadN(8); != nil {
return nil,
}
}
return .Columns(), nil
}
// Reads one data row and feeds each column to an orm.ColumnScanner
// (presumably readDataRow, which the row-returning readers call with a
// context, reader, columns and scanner; the name is blank in this copy).
//
// Flow: read the int16 column count; call BeforeScan if the scanner
// implements orm.BeforeScanHook; for each column read its int32 length and
// call ScanColumn; call AfterScan if the scanner implements
// orm.AfterScanHook. Scan errors do not stop the row from being consumed:
// the first one is remembered (re-wrapped with internal.Errorf) and returned
// at the end. Hook errors and read errors return immediately.
func (
context.Context,
*pool.ReaderContext,
[]types.ColumnInfo,
orm.ColumnScanner,
) error {
, := readInt16()
if != nil {
return
}
if , := .(orm.BeforeScanHook); {
if := .BeforeScan(); != nil {
return
}
}
var error
for := int16(0); < ; ++ {
, := readInt32()
if != nil {
return
}
var types.Reader
// If the whole value is already buffered, a bytes reader over it is used;
// otherwise the read is limited via SetAvailable (presumably the shared
// reader itself then serves as the column reader — identifiers are blank).
if int() <= .Buffered() {
= .BytesReader(int())
} else {
.SetAvailable(int())
=
}
:= []
if := .ScanColumn(, , int()); != nil && == nil {
= internal.Errorf(.Error())
}
// Presumably the streaming case: discard whatever the scanner left unread
// and lift the limit again with SetAvailable(-1).
if == {
if .Available() > 0 {
if , := .Discard(.Available()); != nil && == nil {
=
}
}
.SetAvailable(-1)
}
}
if , := .(orm.AfterScanHook); {
if := .AfterScan(); != nil {
return
}
}
return
}
// Builds an orm.Model from an arbitrary value with orm.NewModel and returns
// it together with the result of its Init method (presumably newModel, which
// the row-returning readers call; the name is blank in this copy).
func ( interface{}) (orm.Model, error) {
, := orm.NewModel()
if != nil {
return nil,
}
return , .Init()
}
// Reads the replies to a query and scans the returned rows into a model
// (named readSimpleQueryData in the error text below; the declared name and
// locals are blank in this copy).
//
// The row description is parsed using the reader's ColumnAlloc, and the
// result's model is created at that point if it is still nil; if model
// creation fails the error is remembered and the Discard model is used so
// rows can still be consumed. Each data row is scanned with readDataRow and
// then added to the model. Errors from scanning, the server, command parsing
// or an empty query are remembered (first one wins) and returned at
// readyForQueryMsg; I/O errors and unexpected tags abort immediately.
func (
context.Context, *pool.ReaderContext, interface{},
) (*result, error) {
var []types.ColumnInfo
var result
var error
for {
, , := readMessageType()
if != nil {
return nil,
}
switch {
case rowDescriptionMsg:
, = readRowDescription(, .ColumnAlloc)
if != nil {
return nil,
}
if .model == nil {
var error
.model, = newModel()
if != nil {
if == nil {
=
}
// Fall back to a sink so the remaining rows are still read.
.model = Discard
}
}
case dataRowMsg:
:= .model.NextColumnScanner()
if := readDataRow(, , , ); != nil {
if == nil {
=
}
} else if := .model.AddColumnScanner(); != nil {
if == nil {
=
}
}
.returned++
case commandCompleteMsg:
, := .ReadN()
if != nil {
return nil,
}
if := .parse(); != nil && == nil {
=
}
case readyForQueryMsg:
, := .ReadN()
if != nil {
return nil,
}
if != nil {
return nil,
}
return &, nil
case errorResponseMsg:
, := readError()
if != nil {
return nil,
}
if == nil {
=
}
case emptyQueryResponseMsg:
if == nil {
= errEmptyQuery
}
case noticeResponseMsg:
if := logNotice(, ); != nil {
return nil,
}
case parameterStatusMsg:
if := logParameterStatus(, ); != nil {
return nil,
}
default:
return nil, fmt.Errorf("pg: readSimpleQueryData: unexpected message %q", )
}
}
}
// Reads the replies to a bind/execute sequence and scans the returned rows
// into a model (named readExtQueryData in the error text below; the declared
// name and locals are blank in this copy).
//
// Column metadata is passed in by the caller, so no row description is
// handled here. The result's model is created when the first data row
// arrives; if creation fails the error is remembered and the Discard model
// is used so rows can still be consumed. Errors from scanning, the server or
// command parsing are remembered (first one wins) and returned at
// readyForQueryMsg. Unlike the simple-query variant there is no
// emptyQueryResponseMsg case, so that tag hits the default branch.
func (
context.Context, *pool.ReaderContext, interface{}, []types.ColumnInfo,
) (*result, error) {
var result
var error
for {
, , := readMessageType()
if != nil {
return nil,
}
switch {
case bindCompleteMsg:
, := .ReadN()
if != nil {
return nil,
}
case dataRowMsg:
if .model == nil {
var error
.model, = newModel()
if != nil {
if == nil {
=
}
// Fall back to a sink so the remaining rows are still read.
.model = Discard
}
}
:= .model.NextColumnScanner()
if := readDataRow(, , , ); != nil {
if == nil {
=
}
} else if := .model.AddColumnScanner(); != nil {
if == nil {
=
}
}
.returned++
case commandCompleteMsg:
, := .ReadN()
if != nil {
return nil,
}
if := .parse(); != nil && == nil {
=
}
case readyForQueryMsg:
, := .ReadN()
if != nil {
return nil,
}
if != nil {
return nil,
}
return &, nil
case errorResponseMsg:
, := readError()
if != nil {
return nil,
}
if == nil {
=
}
case noticeResponseMsg:
if := logNotice(, ); != nil {
return nil,
}
case parameterStatusMsg:
if := logParameterStatus(, ); != nil {
return nil,
}
default:
return nil, fmt.Errorf("pg: readExtQueryData: unexpected message %q", )
}
}
}
// Waits for the server to accept a COPY ... FROM STDIN (named
// readCopyInResponse in the error text below; the declared name and locals
// are blank in this copy).
//
// The function returns as soon as a copyInResponseMsg body has been
// consumed. A server error is remembered and the loop continues until
// readyForQueryMsg, at which point the function returns (presumably with the
// remembered error — the returned identifier is blank).
func ( *pool.ReaderContext) error {
var error
for {
, , := readMessageType()
if != nil {
return
}
switch {
case copyInResponseMsg:
, := .ReadN()
return
case errorResponseMsg:
, := readError()
if != nil {
return
}
if == nil {
=
}
case readyForQueryMsg:
, := .ReadN()
if != nil {
return
}
return
case noticeResponseMsg:
if := logNotice(, ); != nil {
return
}
case parameterStatusMsg:
if := logParameterStatus(, ); != nil {
return
}
default:
return fmt.Errorf("pg: readCopyInResponse: unexpected message %q", )
}
}
}
// Waits for the server to start a COPY ... TO STDOUT (named
// readCopyOutResponse in the error text below; the declared name and locals
// are blank in this copy).
//
// The function returns as soon as a copyOutResponseMsg body has been
// consumed. A server error is remembered and the loop continues until
// readyForQueryMsg, at which point the function returns (presumably with the
// remembered error — the returned identifier is blank).
func ( *pool.ReaderContext) error {
var error
for {
, , := readMessageType()
if != nil {
return
}
switch {
case copyOutResponseMsg:
, := .ReadN()
return
case errorResponseMsg:
, := readError()
if != nil {
return
}
if == nil {
=
}
case readyForQueryMsg:
, := .ReadN()
if != nil {
return
}
return
case noticeResponseMsg:
if := logNotice(, ); != nil {
return
}
case parameterStatusMsg:
if := logParameterStatus(, ); != nil {
return
}
default:
return fmt.Errorf("pg: readCopyOutResponse: unexpected message %q", )
}
}
}
// Streams COPY output from the server into an io.Writer (named readCopyData
// in the error text below; the declared name and locals are blank in this
// copy).
//
// Each copyDataMsg body is forwarded to the writer in as many chunks as the
// reader hands back. copyDoneMsg is consumed; command-complete bodies are
// handed to the result's parse method, and a parse error is remembered until
// readyForQueryMsg. Unlike the query readers in this file, an
// errorResponseMsg returns at once instead of waiting for readyForQueryMsg.
func ( *pool.ReaderContext, io.Writer) (*result, error) {
var result
var error
for {
, , := readMessageType()
if != nil {
return nil,
}
switch {
case copyDataMsg:
for > 0 {
, := .ReadN()
// bufio.ErrBufferFull is tolerated: the bytes returned so far are
// written and the loop continues with the remainder.
if != nil && != bufio.ErrBufferFull {
return nil,
}
_, = .Write()
if != nil {
return nil,
}
-= len()
}
case copyDoneMsg:
, := .ReadN()
if != nil {
return nil,
}
case commandCompleteMsg:
, := .ReadN()
if != nil {
return nil,
}
if := .parse(); != nil && == nil {
=
}
case readyForQueryMsg:
, := .ReadN()
if != nil {
return nil,
}
if != nil {
return nil,
}
return &, nil
case errorResponseMsg:
, := readError()
if != nil {
return nil,
}
return nil,
case noticeResponseMsg:
if := logNotice(, ); != nil {
return nil,
}
case parameterStatusMsg:
if := logParameterStatus(, ); != nil {
return nil,
}
default:
return nil, fmt.Errorf("pg: readCopyData: unexpected message %q", )
}
}
}
// Writes one copyDataMsg whose body is filled by the buffer's ReadFrom on the
// given io.Reader. The message is finished even when ReadFrom fails; the
// function returns a value taken from the ReadFrom call (presumably its
// error). The function name and locals are blank in this copy of the file.
func ( *pool.WriteBuffer, io.Reader) error {
.StartMessage(copyDataMsg)
, := .ReadFrom()
.FinishMessage()
return
}
// Writes an empty copyDoneMsg to the buffer. The function name is blank in
// this copy of the file.
func ( *pool.WriteBuffer) {
.StartMessage(copyDoneMsg)
.FinishMessage()
}
// Drains replies until the server is ready for the next query (named
// readReadyForQueryOrError in the error text below; the declared name and
// locals are blank in this copy).
//
// Command-complete bodies are handed to the result's parse method. Server
// errors and parse errors are remembered (first one wins) and returned at
// readyForQueryMsg; otherwise the result is returned. I/O errors and
// unexpected tags abort immediately.
func ( *pool.ReaderContext) (*result, error) {
var result
var error
for {
, , := readMessageType()
if != nil {
return nil,
}
switch {
case commandCompleteMsg:
, := .ReadN()
if != nil {
return nil,
}
if := .parse(); != nil && == nil {
=
}
case readyForQueryMsg:
, := .ReadN()
if != nil {
return nil,
}
if != nil {
return nil,
}
return &, nil
case errorResponseMsg:
, := readError()
if != nil {
return nil,
}
if == nil {
=
}
case noticeResponseMsg:
if := logNotice(, ); != nil {
return nil,
}
case parameterStatusMsg:
if := logParameterStatus(, ); != nil {
return nil,
}
default:
return nil, fmt.Errorf("pg: readReadyForQueryOrError: unexpected message %q", )
}
}
}
// Blocks until a notificationResponseMsg arrives and returns the two strings
// it carries (named readNotification in the error text below; the declared
// name, result names and locals are blank in this copy).
//
// Command-complete and ready-for-query messages are consumed and skipped,
// notices are logged, and an errorResponseMsg is parsed and returned. In a
// notification the leading int32 is read but not returned (presumably the
// notifying process ID); the two strings are presumably channel and payload.
func ( *pool.ReaderContext) (, string, error) {
for {
, , := readMessageType()
if != nil {
return "", "",
}
switch {
case commandCompleteMsg:
, := .ReadN()
if != nil {
return "", "",
}
case readyForQueryMsg:
, := .ReadN()
if != nil {
return "", "",
}
case errorResponseMsg:
, := readError()
if != nil {
return "", "",
}
return "", "",
case noticeResponseMsg:
if := logNotice(, ); != nil {
return "", "",
}
case notificationResponseMsg:
, := readInt32()
if != nil {
return "", "",
}
, = readString()
if != nil {
return "", "",
}
, = readString()
if != nil {
return "", "",
}
return , , nil
default:
return "", "", fmt.Errorf("pg: readNotification: unexpected message %q", )
}
}
}
// terminateMessage is a pre-encoded message: the terminateMsg tag followed by
// the four bytes 0, 0, 0, 4 (the big-endian value 4, i.e. a length that
// covers only the length field itself).
var terminateMessage = []byte{terminateMsg, 0, 0, 0, 4}
// Writes terminateMessage straight to the connection's NetConn and returns a
// value taken from that Write call (presumably its error). The function name
// and locals are blank in this copy of the file.
func ( *pool.Conn) error {
, := .NetConn().Write(terminateMessage)
return
}
// Consumes a message body of the given length with ReadN and returns a value
// taken from that call (presumably its error); the bytes read are not used.
// The name is blank in this copy; this is presumably one of logNotice or
// logParameterStatus, both of which are called with a reader and a length.
func ( *pool.ReaderContext, int) error {
, := .ReadN()
return
}
// Consumes a message body of the given length with ReadN and returns a value
// taken from that call (presumably its error); the bytes read are not used.
// The name is blank in this copy; this is presumably one of logNotice or
// logParameterStatus, both of which are called with a reader and a length.
func ( *pool.ReaderContext, int) error {
, := .ReadN()
return
}
// Reads 2 bytes and decodes them as a big-endian int16 (presumably
// readInt16; the name is blank in this copy). Returns 0 and the read error
// on failure.
func ( *pool.ReaderContext) (int16, error) {
, := .ReadN(2)
if != nil {
return 0,
}
return int16(binary.BigEndian.Uint16()), nil
}
// Reads 4 bytes and decodes them as a big-endian int32 (presumably
// readInt32; the name is blank in this copy). Returns 0 and the read error
// on failure.
func ( *pool.ReaderContext) (int32, error) {
, := .ReadN(4)
if != nil {
return 0,
}
return int32(binary.BigEndian.Uint32()), nil
}
// Reads up to and including the next zero byte and returns the bytes before
// it as a string (presumably readString; the name is blank in this copy).
// The string conversion copies the data out of the slice ReadSlice returned.
func ( *pool.ReaderContext) (string, error) {
, := .ReadSlice(0)
if != nil {
return "",
}
// Drop the trailing zero byte that ReadSlice includes.
return string([:len()-1]), nil
}
// Parses the body of an error response (presumably readError; the name is
// blank in this copy).
//
// The body is a sequence of (one-byte field code, NUL-terminated string)
// pairs ending at a zero byte. The pairs are collected into a map and wrapped
// with internal.NewPGError.
//
// Returns two errors: the first is the parsed server error, the second is a
// failure to read the body. When the second is non-nil the first is nil.
func ( *pool.ReaderContext) (error, error) {
:= make(map[byte]string)
for {
, := .ReadByte()
if != nil {
return nil,
}
// A zero byte terminates the field list.
if == 0 {
break
}
, := readString()
if != nil {
return nil,
}
[] =
}
return internal.NewPGError(), nil
}
// Reads a message header: one tag byte followed by an int32 length
// (presumably readMessageType; the name is blank in this copy).
//
// Returns the tag and the length minus 4 — the length field counts its own
// four bytes, so the second result is the number of body bytes still to be
// read. On a read failure both values are zero.
func ( *pool.ReaderContext) (byte, int, error) {
, := .ReadByte()
if != nil {
return 0, 0,
}
, := readInt32()
if != nil {
return 0, 0,
}
return , int() - 4, nil
}