package pgconn

import (
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	

	
	
	
	
	
	
)

const (
	connStatusUninitialized = iota
	connStatusConnecting
	connStatusClosed
	connStatusIdle
	connStatusBusy
)

// Notice represents a notice response message reported by the PostgreSQL server. Be aware that this is distinct from
// LISTEN/NOTIFY notification.
type Notice PgError

// Notification is a message received from the PostgreSQL LISTEN/NOTIFY system
type Notification struct {
	PID     uint32 // backend pid that sent the notification
	Channel string // channel from which notification was received
	Payload string
}

// DialFunc is a function that can be used to connect to a PostgreSQL server.
type DialFunc func(ctx context.Context, network, addr string) (net.Conn, error)

// LookupFunc is a function that can be used to lookup IPs addrs from host. Optionally an ip:port combination can be
// returned in order to override the connection string's port.
type LookupFunc func(ctx context.Context, host string) (addrs []string, err error)

// BuildFrontendFunc is a function that can be used to create Frontend implementation for connection.
type BuildFrontendFunc func(r io.Reader, w io.Writer) *pgproto3.Frontend

// PgErrorHandler is a function that handles errors returned from Postgres. This function must return true to keep
// the connection open. Returning false will cause the connection to be closed immediately. You should return
// false on any FATAL-severity errors. This will not receive network errors. The *PgConn is provided so the handler is
// aware of the origin of the error, but it must not invoke any query method.
type PgErrorHandler func(*PgConn, *PgError) bool

// NoticeHandler is a function that can handle notices received from the PostgreSQL server. Notices can be received at
// any time, usually during handling of a query response. The *PgConn is provided so the handler is aware of the origin
// of the notice, but it must not invoke any query method. Be aware that this is distinct from LISTEN/NOTIFY
// notification.
type NoticeHandler func(*PgConn, *Notice)

// NotificationHandler is a function that can handle notifications received from the PostgreSQL server. Notifications
// can be received at any time, usually during handling of a query response. The *PgConn is provided so the handler is
// aware of the origin of the notice, but it must not invoke any query method. Be aware that this is distinct from a
// notice event.
type NotificationHandler func(*PgConn, *Notification)

// PgConn is a low-level PostgreSQL connection handle. It is not safe for concurrent usage.
type PgConn struct {
	conn              net.Conn
	pid               uint32            // backend pid
	secretKey         []byte            // key to use to send a cancel query message to the server
	parameterStatuses map[string]string // parameters that have been reported by the server
	txStatus          byte
	frontend          *pgproto3.Frontend
	bgReader          *bgreader.BGReader
	slowWriteTimer    *time.Timer
	bgReaderStarted   chan struct{}

	customData map[string]any

	config *Config

	status byte // One of connStatus* constants

	bufferingReceive    bool
	bufferingReceiveMux sync.Mutex
	bufferingReceiveMsg pgproto3.BackendMessage
	bufferingReceiveErr error

	peekedMsg pgproto3.BackendMessage

	// Reusable / preallocated resources
	resultReader      ResultReader
	multiResultReader MultiResultReader
	pipeline          Pipeline
	contextWatcher    *ctxwatch.ContextWatcher
	fieldDescriptions [16]FieldDescription

	cleanupDone chan struct{}
}

// Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or keyword/value
// format) to provide configuration. See documentation for [ParseConfig] for details. ctx can be used to cancel a
// connect attempt.
func ( context.Context,  string) (*PgConn, error) {
	,  := ParseConfig()
	if  != nil {
		return nil, 
	}

	return ConnectConfig(, )
}

// Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or keyword/value
// format) and ParseConfigOptions to provide additional configuration. See documentation for [ParseConfig] for details.
// ctx can be used to cancel a connect attempt.
func ( context.Context,  string,  ParseConfigOptions) (*PgConn, error) {
	,  := ParseConfigWithOptions(, )
	if  != nil {
		return nil, 
	}

	return ConnectConfig(, )
}

// Connect establishes a connection to a PostgreSQL server using config. config must have been constructed with
// [ParseConfig]. ctx can be used to cancel a connect attempt.
//
// If config.Fallbacks are present they will sequentially be tried in case of error establishing network connection. An
// authentication error will terminate the chain of attempts (like libpq:
// https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-MULTIPLE-HOSTS) and be returned as the error.
func ( context.Context,  *Config) (*PgConn, error) {
	// Default values are set in ParseConfig. Enforce initial creation by ParseConfig rather than setting defaults from
	// zero values.
	if !.createdByParseConfig {
		panic("config must be created by ParseConfig")
	}

	var  []error

	,  := buildConnectOneConfigs(, )
	if len() > 0 {
		 = append(, ...)
	}

	if len() == 0 {
		return nil, &ConnectError{Config: , err: fmt.Errorf("hostname resolving error: %w", errors.Join(...))}
	}

	,  := connectPreferred(, , )
	if len() > 0 {
		 = append(, ...)
		return nil, &ConnectError{Config: , err: errors.Join(...)}
	}

	if .AfterConnect != nil {
		 := .AfterConnect(, )
		if  != nil {
			.conn.Close()
			return nil, &ConnectError{Config: , err: fmt.Errorf("AfterConnect error: %w", )}
		}
	}

	return , nil
}

// buildConnectOneConfigs resolves hostnames and builds a list of connectOneConfigs to try connecting to. It returns a
// slice of successfully resolved connectOneConfigs and a slice of errors. It is possible for both slices to contain
// values if some hosts were successfully resolved and others were not.
func ( context.Context,  *Config) ([]*connectOneConfig, []error) {
	// Simplify usage by treating primary config and fallbacks the same.
	 := []*FallbackConfig{
		{
			Host:      .Host,
			Port:      .Port,
			TLSConfig: .TLSConfig,
		},
	}
	 = append(, .Fallbacks...)

	var  []*connectOneConfig

	var  []error

	for ,  := range  {
		// skip resolve for unix sockets
		if isAbsolutePath(.Host) {
			,  := NetworkAddress(.Host, .Port)
			 = append(, &connectOneConfig{
				network:          ,
				address:          ,
				originalHostname: .Host,
				tlsConfig:        .TLSConfig,
			})

			continue
		}

		,  := .LookupFunc(, .Host)
		if  != nil {
			 = append(, )
			continue
		}

		for ,  := range  {
			, ,  := net.SplitHostPort()
			if  == nil {
				,  := strconv.ParseUint(, 10, 16)
				if  != nil {
					return nil, []error{fmt.Errorf("error parsing port (%s) from lookup: %w", , )}
				}
				,  := NetworkAddress(, uint16())
				 = append(, &connectOneConfig{
					network:          ,
					address:          ,
					originalHostname: .Host,
					tlsConfig:        .TLSConfig,
				})
			} else {
				,  := NetworkAddress(, .Port)
				 = append(, &connectOneConfig{
					network:          ,
					address:          ,
					originalHostname: .Host,
					tlsConfig:        .TLSConfig,
				})
			}
		}
	}

	return , 
}

// connectPreferred attempts to connect to the preferred host from connectOneConfigs. The connections are attempted in
// order. If a connection is successful it is returned. If no connection is successful then all errors are returned. If
// a connection attempt returns a [NotPreferredError], then that host will be used if no other hosts are successful.
func ( context.Context,  *Config,  []*connectOneConfig) (*PgConn, []error) {
	 := 
	var  []error

	var  *connectOneConfig
	for ,  := range  {
		// ConnectTimeout restricts the whole connection process.
		if .ConnectTimeout != 0 {
			// create new context first time or when previous host was different
			if  == 0 || ([].address != [-1].address) {
				var  context.CancelFunc
				,  = context.WithTimeout(, .ConnectTimeout)
				defer ()
			}
		} else {
			 = 
		}

		,  := connectOne(, , , false)
		if  != nil {
			return , nil
		}

		 = append(, )

		var  *PgError
		if errors.As(, &) {
			// pgx will try next host even if libpq does not in certain cases (see #2246)
			// consider change for the next major version

			const  = "28P01"
			const  = "3D000"   // db does not exist
			const  = "42501" // missing connect privilege

			// auth failed due to invalid password, db does not exist or user has no permission
			if .Code ==  ||
				.Code ==  ||
				.Code ==  {
				return nil, 
			}
		}

		var  *NotPreferredError
		if errors.As(, &) {
			 = 
		}
	}

	if  != nil {
		,  := connectOne(, , , true)
		if  == nil {
			return , nil
		}
		 = append(, )
	}

	return nil, 
}

// connectOne makes one connection attempt to a single host.
func ( context.Context,  *Config,  *connectOneConfig,
	 bool,
) (*PgConn, error) {
	 := new(PgConn)
	.config = 
	.cleanupDone = make(chan struct{})
	.customData = make(map[string]any)

	var  error

	 := func( string,  error) *perDialConnectError {
		 = normalizeTimeoutError(, )
		 := &perDialConnectError{address: .address, originalHostname: .originalHostname, err: fmt.Errorf("%s: %w", , )}
		return 
	}

	,  := parseProtocolVersion(.MaxProtocolVersion)
	if  != nil {
		return nil, ("invalid max_protocol_version", )
	}
	,  := parseProtocolVersion(.MinProtocolVersion)
	if  != nil {
		return nil, ("invalid min_protocol_version", )
	}

	.conn,  = .DialFunc(, .network, .address)
	if  != nil {
		return nil, ("dial error", )
	}

	if .tlsConfig != nil {
		.contextWatcher = ctxwatch.NewContextWatcher(&DeadlineContextWatcherHandler{Conn: .conn})
		.contextWatcher.Watch()
		var (
			 net.Conn
			     error
		)
		if .SSLNegotiation == "direct" {
			 = tls.Client(.conn, .tlsConfig)
		} else {
			,  = startTLS(.conn, .tlsConfig)
		}
		.contextWatcher.Unwatch() // Always unwatch `netConn` after TLS.
		if  != nil {
			.conn.Close()
			return nil, ("tls error", )
		}

		.conn = 
	}

	if .AfterNetConnect != nil {
		.conn,  = .AfterNetConnect(, , .conn)
		if  != nil {
			.conn.Close()
			return nil, ("AfterNetConnect failed", )
		}
	}

	.contextWatcher = ctxwatch.NewContextWatcher(.BuildContextWatcherHandler())
	.contextWatcher.Watch()
	defer .contextWatcher.Unwatch()

	.parameterStatuses = make(map[string]string)
	.status = connStatusConnecting
	.bgReader = bgreader.New(.conn)
	.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64),
		func() {
			.bgReader.Start()
			.bgReaderStarted <- struct{}{}
		},
	)
	.slowWriteTimer.Stop()
	.bgReaderStarted = make(chan struct{})
	.frontend = .BuildFrontend(.bgReader, .conn)

	 := pgproto3.StartupMessage{
		ProtocolVersion: ,
		Parameters:      make(map[string]string),
	}

	// Copy default run-time params
	maps.Copy(.Parameters, .RuntimeParams)

	.Parameters["user"] = .User
	if .Database != "" {
		.Parameters["database"] = .Database
	}

	.frontend.Send(&)
	if  := .flushWithPotentialWriteReadDeadlock();  != nil {
		.conn.Close()
		return nil, ("failed to write startup message", )
	}

	for {
		,  := .receiveMessage()
		if  != nil {
			.conn.Close()
			if ,  := .(*PgError);  {
				return nil, ("server error", )
			}
			return nil, ("failed to receive message", )
		}

		switch msg := .(type) {
		case *pgproto3.BackendKeyData:
			.pid = .ProcessID
			.secretKey = .SecretKey

		case *pgproto3.AuthenticationOk:
		case *pgproto3.AuthenticationCleartextPassword:
			 = .txPasswordMessage(.config.Password)
			if  != nil {
				.conn.Close()
				return nil, ("failed to write password message", )
			}
		case *pgproto3.AuthenticationMD5Password:
			 := "md5" + hexMD5(hexMD5(.config.Password+.config.User)+string(.Salt[:]))
			 = .txPasswordMessage()
			if  != nil {
				.conn.Close()
				return nil, ("failed to write password message", )
			}
		case *pgproto3.AuthenticationSASL:
			// Check if OAUTHBEARER is supported
			 := false
			for ,  := range .AuthMechanisms {
				if  == "OAUTHBEARER" {
					 = true
					break
				}
			}

			if  && .config.OAuthTokenProvider != nil {
				 = .oauthAuth()
			} else {
				 = .scramAuth(.AuthMechanisms)
			}
			if  != nil {
				.conn.Close()
				return nil, ("failed SASL auth", )
			}
		case *pgproto3.AuthenticationGSS:
			 = .gssAuth()
			if  != nil {
				.conn.Close()
				return nil, ("failed GSS auth", )
			}
		case *pgproto3.ReadyForQuery:
			.status = connStatusIdle
			if .ValidateConnect != nil {
				// ValidateConnect may execute commands that cause the context to be watched again. Unwatch first to avoid
				// the watch already in progress panic. This is that last thing done by this method so there is no need to
				// restart the watch after ValidateConnect returns.
				//
				// See https://github.com/jackc/pgconn/issues/40.
				.contextWatcher.Unwatch()

				 := .ValidateConnect(, )
				if  != nil {
					if ,  := .(*NotPreferredError);  &&  {
						return , nil
					}
					.conn.Close()
					return nil, ("ValidateConnect failed", )
				}
			}
			return , nil
		case *pgproto3.ParameterStatus, *pgproto3.NoticeResponse:
			// handled by ReceiveMessage
		case *pgproto3.NegotiateProtocolVersion:
			 := pgproto3.ProtocolVersion30&0xFFFF0000 | uint32(.NewestMinorProtocol)
			if  <  {
				.conn.Close()
				return nil, ("server protocol version too low", nil)
			}
		case *pgproto3.ErrorResponse:
			.conn.Close()
			return nil, ("server error", ErrorResponseToPgError())
		default:
			.conn.Close()
			return nil, ("received unexpected message", )
		}
	}
}

func ( net.Conn,  *tls.Config) (net.Conn, error) {
	 := binary.Write(, binary.BigEndian, []int32{8, 80877103})
	if  != nil {
		return nil, 
	}

	 := make([]byte, 1)
	if _,  = io.ReadFull(, );  != nil {
		return nil, 
	}

	if [0] != 'S' {
		return nil, errors.New("server refused TLS connection")
	}

	return tls.Client(, ), nil
}

func ( *PgConn) ( string) ( error) {
	.frontend.Send(&pgproto3.PasswordMessage{Password: })
	return .flushWithPotentialWriteReadDeadlock()
}

func ( string) string {
	 := md5.New()
	io.WriteString(, )
	return hex.EncodeToString(.Sum(nil))
}

func ( *PgConn) () chan struct{} {
	if .bufferingReceive {
		panic("BUG: signalMessage when already in progress")
	}

	.bufferingReceive = true
	.bufferingReceiveMux.Lock()

	 := make(chan struct{})
	go func() {
		.bufferingReceiveMsg, .bufferingReceiveErr = .frontend.Receive()
		.bufferingReceiveMux.Unlock()
		close()
	}()

	return 
}

// ReceiveMessage receives one wire protocol message from the PostgreSQL server. It must only be used when the
// connection is not busy. e.g. It is an error to call ReceiveMessage while reading the result of a query. The messages
// are still handled by the core pgconn message handling system so receiving a NotificationResponse will still trigger
// the OnNotification callback.
//
// This is a very low level method that requires deep understanding of the PostgreSQL wire protocol to use correctly.
// See https://www.postgresql.org/docs/current/protocol.html.
func ( *PgConn) ( context.Context) (pgproto3.BackendMessage, error) {
	if  := .lock();  != nil {
		return nil, 
	}
	defer .unlock()

	if  != context.Background() {
		select {
		case <-.Done():
			return nil, newContextAlreadyDoneError()
		default:
		}
		.contextWatcher.Watch()
		defer .contextWatcher.Unwatch()
	}

	,  := .receiveMessage()
	if  != nil {
		 = &pgconnError{
			msg:         "receive message failed",
			err:         normalizeTimeoutError(, ),
			safeToRetry: true,
		}
	}
	return , 
}

// peekMessage peeks at the next message without setting up context cancellation.
func ( *PgConn) () (pgproto3.BackendMessage, error) {
	if .peekedMsg != nil {
		return .peekedMsg, nil
	}

	var  pgproto3.BackendMessage
	var  error
	if .bufferingReceive {
		.bufferingReceiveMux.Lock()
		 = .bufferingReceiveMsg
		 = .bufferingReceiveErr
		.bufferingReceiveMux.Unlock()
		.bufferingReceive = false

		// If a timeout error happened in the background try the read again.
		var  net.Error
		if errors.As(, &) && .Timeout() {
			,  = .frontend.Receive()
		}
	} else {
		,  = .frontend.Receive()
	}

	if  != nil {
		// Close on anything other than timeout error - everything else is fatal
		var  net.Error
		 := errors.As(, &)
		if !( && .Timeout()) {
			.asyncClose()
		}

		return nil, 
	}

	.peekedMsg = 
	return , nil
}

// receiveMessage receives a message without setting up context cancellation
func ( *PgConn) () (pgproto3.BackendMessage, error) {
	if .status == connStatusClosed {
		return nil, &connLockError{status: "conn closed"}
	}

	,  := .peekMessage()
	if  != nil {
		return nil, 
	}
	.peekedMsg = nil

	switch msg := .(type) {
	case *pgproto3.ReadyForQuery:
		.txStatus = .TxStatus
	case *pgproto3.ParameterStatus:
		.parameterStatuses[.Name] = .Value
	case *pgproto3.ErrorResponse:
		 := ErrorResponseToPgError()
		if .config.OnPgError != nil && !.config.OnPgError(, ) {
			.status = connStatusClosed
			.conn.Close() // Ignore error as the connection is already broken and there is already an error to return.
			close(.cleanupDone)
			return nil, 
		}
	case *pgproto3.NoticeResponse:
		if .config.OnNotice != nil {
			.config.OnNotice(, noticeResponseToNotice())
		}
	case *pgproto3.NotificationResponse:
		if .config.OnNotification != nil {
			.config.OnNotification(, &Notification{PID: .PID, Channel: .Channel, Payload: .Payload})
		}
	}

	return , nil
}

// Conn returns the underlying net.Conn. This rarely necessary. If the connection will be directly used for reading or
// writing then SyncConn should usually be called before Conn.
func ( *PgConn) () net.Conn {
	return .conn
}

// PID returns the backend PID.
func ( *PgConn) () uint32 {
	return .pid
}

// TxStatus returns the current TxStatus as reported by the server in the ReadyForQuery message.
//
// Possible return values:
//
//	'I' - idle / not in transaction
//	'T' - in a transaction
//	'E' - in a failed transaction
//
// See https://www.postgresql.org/docs/current/protocol-message-formats.html.
func ( *PgConn) () byte {
	return .txStatus
}

// SecretKey returns the backend secret key used to send a cancel query message to the server.
func ( *PgConn) () []byte {
	return .secretKey
}

// Frontend returns the underlying *pgproto3.Frontend. This rarely necessary.
func ( *PgConn) () *pgproto3.Frontend {
	return .frontend
}

// Close closes a connection. It is safe to call Close on an already closed connection. Close attempts a clean close by
// sending the exit message to PostgreSQL. However, this could block so ctx is available to limit the time to wait. The
// underlying net.Conn.Close() will always be called regardless of any other errors.
func ( *PgConn) ( context.Context) error {
	if .status == connStatusClosed {
		return nil
	}
	.status = connStatusClosed

	defer close(.cleanupDone)
	defer .conn.Close()

	if  != context.Background() {
		// Close may be called while a cancellable query is in progress. This will most often be triggered by panic when
		// a defer closes the connection (possibly indirectly via a transaction or a connection pool). Unwatch to end any
		// previous watch. It is safe to Unwatch regardless of whether a watch is already is progress.
		//
		// See https://github.com/jackc/pgconn/issues/29
		.contextWatcher.Unwatch()

		.contextWatcher.Watch()
		defer .contextWatcher.Unwatch()
	}

	// Ignore any errors sending Terminate message and waiting for server to close connection.
	// This mimics the behavior of libpq PQfinish. It calls closePGconn which calls sendTerminateConn which purposefully
	// ignores errors.
	//
	// See https://github.com/jackc/pgx/issues/637
	.frontend.Send(&pgproto3.Terminate{})
	.flushWithPotentialWriteReadDeadlock()

	return .conn.Close()
}

// asyncClose marks the connection as closed and asynchronously sends a cancel query message and closes the underlying
// connection.
func ( *PgConn) () {
	if .status == connStatusClosed {
		return
	}
	.status = connStatusClosed

	go func() {
		defer close(.cleanupDone)
		defer .conn.Close()

		 := time.Now().Add(time.Second * 15)

		,  := context.WithDeadline(context.Background(), )
		defer ()

		.CancelRequest()

		.conn.SetDeadline()

		.frontend.Send(&pgproto3.Terminate{})
		.flushWithPotentialWriteReadDeadlock()
	}()
}

// CleanupDone returns a channel that will be closed after all underlying resources have been cleaned up. A closed
// connection is no longer usable, but underlying resources, in particular the net.Conn, may not have finished closing
// yet. This is because certain errors such as a context cancellation require that the interrupted function call return
// immediately, but the error may also cause the connection to be closed. In these cases the underlying resources are
// closed asynchronously.
//
// This is only likely to be useful to connection pools. It gives them a way avoid establishing a new connection while
// an old connection is still being cleaned up and thereby exceeding the maximum pool size.
func ( *PgConn) () chan (struct{}) {
	return .cleanupDone
}

// IsClosed reports if the connection has been closed.
//
// CleanupDone() can be used to determine if all cleanup has been completed.
func ( *PgConn) () bool {
	return .status < connStatusIdle
}

// IsBusy reports if the connection is busy.
func ( *PgConn) () bool {
	return .status == connStatusBusy
}

// lock locks the connection.
func ( *PgConn) () error {
	switch .status {
	case connStatusBusy:
		return &connLockError{status: "conn busy"} // This only should be possible in case of an application bug.
	case connStatusClosed:
		return &connLockError{status: "conn closed"}
	case connStatusUninitialized:
		return &connLockError{status: "conn uninitialized"}
	}
	.status = connStatusBusy
	return nil
}

func ( *PgConn) () {
	switch .status {
	case connStatusBusy:
		.status = connStatusIdle
	case connStatusClosed:
	default:
		panic("BUG: cannot unlock unlocked connection") // This should only be possible if there is a bug in this package.
	}
}

// ParameterStatus returns the value of a parameter reported by the server (e.g.
// server_version). Returns an empty string for unknown parameters.
func ( *PgConn) ( string) string {
	return .parameterStatuses[]
}

// CommandTag is the status text returned by PostgreSQL for a query.
type CommandTag struct {
	s string
}

// NewCommandTag makes a CommandTag from s.
func ( string) CommandTag {
	return CommandTag{s: }
}

// RowsAffected returns the number of rows affected. If the CommandTag was not
// for a row affecting command (e.g. "CREATE TABLE") then it returns 0.
func ( CommandTag) () int64 {
	// Parse the number from the end in a single pass.
	var  int64
	var  int64 = 1

	for  := len(.s) - 1;  >= 0; -- {
		 := .s[]
		if  >= '0' &&  <= '9' {
			 += int64(-'0') * 
			 *= 10
		} else {
			break
		}
	}

	return 
}

func ( CommandTag) () string {
	return .s
}

// Insert is true if the command tag starts with "INSERT".
func ( CommandTag) () bool {
	return strings.HasPrefix(.s, "INSERT")
}

// Update is true if the command tag starts with "UPDATE".
func ( CommandTag) () bool {
	return strings.HasPrefix(.s, "UPDATE")
}

// Delete is true if the command tag starts with "DELETE".
func ( CommandTag) () bool {
	return strings.HasPrefix(.s, "DELETE")
}

// Select is true if the command tag starts with "SELECT".
func ( CommandTag) () bool {
	return strings.HasPrefix(.s, "SELECT")
}

type FieldDescription struct {
	Name                 string
	TableOID             uint32
	TableAttributeNumber uint16
	DataTypeOID          uint32
	DataTypeSize         int16
	TypeModifier         int32
	Format               int16
}

func ( *PgConn) ( int) []FieldDescription {
	if cap(.fieldDescriptions) >=  {
		return .fieldDescriptions[::]
	} else {
		return make([]FieldDescription, )
	}
}

func ( []FieldDescription,  *pgproto3.RowDescription) {
	for  := range .Fields {
		[].Name = string(.Fields[].Name)
		[].TableOID = .Fields[].TableOID
		[].TableAttributeNumber = .Fields[].TableAttributeNumber
		[].DataTypeOID = .Fields[].DataTypeOID
		[].DataTypeSize = .Fields[].DataTypeSize
		[].TypeModifier = .Fields[].TypeModifier
		[].Format = .Fields[].Format
	}
}

type StatementDescription struct {
	Name      string
	SQL       string
	ParamOIDs []uint32
	Fields    []FieldDescription
}

// Prepare creates a prepared statement. If the name is empty, the anonymous prepared statement will be used. This
// allows Prepare to also to describe statements without creating a server-side prepared statement.
//
// Prepare does not send a PREPARE statement to the server. It uses the PostgreSQL Parse and Describe protocol messages
// directly.
//
// In extremely rare cases, Prepare may fail after the Parse is successful, but before the Describe is complete. In this
// case, the returned error will be an error where errors.As with a *PrepareError succeeds and the *PrepareError has
// ParseComplete set to true.
func ( *PgConn) ( context.Context, ,  string,  []uint32) (*StatementDescription, error) {
	if  := .lock();  != nil {
		return nil, 
	}
	defer .unlock()

	if  != context.Background() {
		select {
		case <-.Done():
			return nil, newContextAlreadyDoneError()
		default:
		}
		.contextWatcher.Watch()
		defer .contextWatcher.Unwatch()
	}

	.frontend.SendParse(&pgproto3.Parse{Name: , Query: , ParameterOIDs: })
	.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'S', Name: })
	.frontend.SendSync(&pgproto3.Sync{})
	 := .flushWithPotentialWriteReadDeadlock()
	if  != nil {
		.asyncClose()
		return nil, 
	}

	 := &StatementDescription{Name: , SQL: }

	var  bool
	var  *PgError

:
	for {
		,  := .receiveMessage()
		if  != nil {
			.asyncClose()
			return nil, normalizeTimeoutError(, )
		}

		switch msg := .(type) {
		case *pgproto3.ParseComplete:
			 = true
		case *pgproto3.ParameterDescription:
			.ParamOIDs = make([]uint32, len(.ParameterOIDs))
			copy(.ParamOIDs, .ParameterOIDs)
		case *pgproto3.RowDescription:
			.Fields = make([]FieldDescription, len(.Fields))
			convertRowDescription(.Fields, )
		case *pgproto3.ErrorResponse:
			 = ErrorResponseToPgError()
		case *pgproto3.ReadyForQuery:
			break 
		}
	}

	if  != nil {
		return nil, &PrepareError{err: , ParseComplete: }
	}
	return , nil
}

// Deallocate deallocates a prepared statement.
//
// Deallocate does not send a DEALLOCATE statement to the server. It uses the PostgreSQL Close protocol message
// directly. This has slightly different behavior than executing DEALLOCATE statement.
//   - Deallocate can succeed in an aborted transaction.
//   - Deallocating a non-existent prepared statement is not an error.
func ( *PgConn) ( context.Context,  string) error {
	if  := .lock();  != nil {
		return 
	}
	defer .unlock()

	if  != context.Background() {
		select {
		case <-.Done():
			return newContextAlreadyDoneError()
		default:
		}
		.contextWatcher.Watch()
		defer .contextWatcher.Unwatch()
	}

	.frontend.SendClose(&pgproto3.Close{ObjectType: 'S', Name: })
	.frontend.SendSync(&pgproto3.Sync{})
	 := .flushWithPotentialWriteReadDeadlock()
	if  != nil {
		.asyncClose()
		return 
	}

	for {
		,  := .receiveMessage()
		if  != nil {
			.asyncClose()
			return normalizeTimeoutError(, )
		}

		switch msg := .(type) {
		case *pgproto3.ErrorResponse:
			return ErrorResponseToPgError()
		case *pgproto3.ReadyForQuery:
			return nil
		}
	}
}

// ErrorResponseToPgError converts a wire protocol error message to a *PgError.
func ( *pgproto3.ErrorResponse) *PgError {
	return &PgError{
		Severity:            .Severity,
		SeverityUnlocalized: .SeverityUnlocalized,
		Code:                string(.Code),
		Message:             string(.Message),
		Detail:              string(.Detail),
		Hint:                .Hint,
		Position:            .Position,
		InternalPosition:    .InternalPosition,
		InternalQuery:       string(.InternalQuery),
		Where:               string(.Where),
		SchemaName:          string(.SchemaName),
		TableName:           string(.TableName),
		ColumnName:          string(.ColumnName),
		DataTypeName:        string(.DataTypeName),
		ConstraintName:      .ConstraintName,
		File:                string(.File),
		Line:                .Line,
		Routine:             string(.Routine),
	}
}

func ( *pgproto3.NoticeResponse) *Notice {
	 := ErrorResponseToPgError((*pgproto3.ErrorResponse)())
	return (*Notice)()
}

// CancelRequest sends a cancel request to the PostgreSQL server. It returns an error if unable to deliver the cancel
// request, but lack of an error does not ensure that the query was canceled. As specified in the documentation, there
// is no way to be sure a query was canceled.
// See https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-CANCELING-REQUESTS
func ( *PgConn) ( context.Context) error {
	// Open a cancellation request to the same server. The address is taken from the net.Conn directly instead of reusing
	// the connection config. This is important in high availability configurations where fallback connections may be
	// specified or DNS may be used to load balance.
	 := .conn.RemoteAddr()
	var  string
	var  string
	if .Network() == "unix" {
		// for unix sockets, RemoteAddr() calls getpeername() which returns the name the
		// server passed to bind(). For Postgres, this is always a relative path "./.s.PGSQL.5432"
		// so connecting to it will fail. Fall back to the config's value
		,  = NetworkAddress(.config.Host, .config.Port)
	} else {
		,  = .Network(), .String()
	}
	,  := .config.DialFunc(, , )
	if  != nil {
		// In case of unix sockets, RemoteAddr() returns only the file part of the path. If the
		// first connect failed, try the config.
		if .Network() != "unix" {
			return 
		}
		,  := NetworkAddress(.config.Host, .config.Port)
		,  = .config.DialFunc(, , )
		if  != nil {
			return 
		}
	}
	defer .Close()

	if  != context.Background() {
		 := ctxwatch.NewContextWatcher(&DeadlineContextWatcherHandler{Conn: })
		.Watch()
		defer .Unwatch()
	}

	 := make([]byte, 12+len(.secretKey))
	binary.BigEndian.PutUint32([0:4], uint32(len()))
	binary.BigEndian.PutUint32([4:8], 80877102)
	binary.BigEndian.PutUint32([8:12], .pid)
	copy([12:], .secretKey)

	if ,  := .Write();  != nil {
		return fmt.Errorf("write to connection for cancellation: %w", )
	}

	// Wait for the cancel request to be acknowledged by the server.
	// It copies the behavior of the libpq: https://github.com/postgres/postgres/blob/REL_16_0/src/interfaces/libpq/fe-connect.c#L4946-L4960
	_, _ = .Read()

	return nil
}

// WaitForNotification waits for a LISTEN/NOTIFY message to be received. It returns an error if a notification was not
// received.
func ( *PgConn) ( context.Context) error {
	if  := .lock();  != nil {
		return 
	}
	defer .unlock()

	if  != context.Background() {
		select {
		case <-.Done():
			return newContextAlreadyDoneError()
		default:
		}

		.contextWatcher.Watch()
		defer .contextWatcher.Unwatch()
	}

	for {
		,  := .receiveMessage()
		if  != nil {
			return normalizeTimeoutError(, )
		}

		switch .(type) {
		case *pgproto3.NotificationResponse:
			return nil
		}
	}
}

// Exec executes SQL via the PostgreSQL simple query protocol. SQL may contain multiple queries. Execution is
// implicitly wrapped in a transaction unless a transaction is already in progress or SQL contains transaction control
// statements.
//
// Prefer ExecParams unless executing arbitrary SQL that may contain multiple queries.
func ( *PgConn) ( context.Context,  string) *MultiResultReader {
	if  := .lock();  != nil {
		return &MultiResultReader{
			closed: true,
			err:    ,
		}
	}

	.multiResultReader = MultiResultReader{
		pgConn: ,
		ctx:    ,
	}
	 := &.multiResultReader
	if  != context.Background() {
		select {
		case <-.Done():
			.closed = true
			.err = newContextAlreadyDoneError()
			.unlock()
			return 
		default:
		}
		.contextWatcher.Watch()
	}

	.frontend.SendQuery(&pgproto3.Query{String: })
	 := .flushWithPotentialWriteReadDeadlock()
	if  != nil {
		.asyncClose()
		.contextWatcher.Unwatch()
		.closed = true
		.err = 
		.unlock()
		return 
	}

	return 
}

// ExecParams executes a command via the PostgreSQL extended query protocol.
//
// sql is a SQL command string. It may only contain one query. Parameter substitution is positional using $1, $2, $3,
// etc.
//
// paramValues are the parameter values. It must be encoded in the format given by paramFormats.
//
// paramOIDs is a slice of data type OIDs for paramValues. If paramOIDs is nil, the server will infer the data type for
// all parameters. Any paramOID element that is 0 that will cause the server to infer the data type for that parameter.
// ExecParams will panic if len(paramOIDs) is not 0, 1, or len(paramValues).
//
// paramFormats is a slice of format codes determining for each paramValue column whether it is encoded in text or
// binary format. If paramFormats is nil all params are text format. ExecParams will panic if
// len(paramFormats) is not 0, 1, or len(paramValues).
//
// resultFormats is a slice of format codes determining for each result column whether it is encoded in text or
// binary format. If resultFormats is nil all results will be in text format.
//
// ResultReader must be closed before PgConn can be used again.
func ( *PgConn) ( context.Context,  string,  [][]byte,  []uint32, ,  []int16) *ResultReader {
	 := .execExtendedPrefix(, )
	if .closed {
		return 
	}

	.frontend.SendParse(&pgproto3.Parse{Query: , ParameterOIDs: })
	.frontend.SendBind(&pgproto3.Bind{ParameterFormatCodes: , Parameters: , ResultFormatCodes: })

	.execExtendedSuffix(, nil, nil)

	return 
}

// ExecPrepared enqueues the execution of a prepared statement via the PostgreSQL extended query protocol.
//
// paramValues are the parameter values. It must be encoded in the format given by paramFormats.
//
// paramFormats is a slice of format codes determining for each paramValue column whether it is encoded in text or
// binary format. If paramFormats is nil all params are text format. ExecPrepared will panic if
// len(paramFormats) is not 0, 1, or len(paramValues).
//
// resultFormats is a slice of format codes determining for each result column whether it is encoded in text or
// binary format. If resultFormats is nil all results will be in text format.
//
// ResultReader must be closed before PgConn can be used again.
func ( *PgConn) ( context.Context,  string,  [][]byte, ,  []int16) *ResultReader {
	 := .execExtendedPrefix(, )
	if .closed {
		return 
	}

	.frontend.SendBind(&pgproto3.Bind{PreparedStatement: , ParameterFormatCodes: , Parameters: , ResultFormatCodes: })

	.execExtendedSuffix(, nil, nil)

	return 
}

// ExecStatement enqueues the execution of a prepared statement via the PostgreSQL extended query protocol.
//
// This differs from ExecPrepared in that it takes a *StatementDescription instead of the prepared statement name.
// Because it has the *StatementDescription it can avoid the Describe Portal message that ExecPrepared must send to get
// the result column descriptions.
//
// paramValues are the parameter values. It must be encoded in the format given by paramFormats.
//
// paramFormats is a slice of format codes determining for each paramValue column whether it is encoded in text or
// binary format. If paramFormats is nil all params are text format. ExecPrepared will panic if len(paramFormats) is not
// 0, 1, or len(paramValues).
//
// resultFormats is a slice of format codes determining for each result column whether it is encoded in text or binary
// format. If resultFormats is nil all results will be in text format.
//
// ResultReader must be closed before PgConn can be used again.
func ( *PgConn) ( context.Context,  *StatementDescription,  [][]byte, ,  []int16) *ResultReader {
	 := .execExtendedPrefix(, )
	if .closed {
		return 
	}

	.frontend.SendBind(&pgproto3.Bind{PreparedStatement: .Name, ParameterFormatCodes: , Parameters: , ResultFormatCodes: })

	.execExtendedSuffix(, , )

	return 
}

func ( *PgConn) ( context.Context,  [][]byte) *ResultReader {
	.resultReader = ResultReader{
		pgConn: ,
		ctx:    ,
	}
	 := &.resultReader

	if  := .lock();  != nil {
		.concludeCommand(CommandTag{}, )
		.closed = true
		return 
	}

	if len() > math.MaxUint16 {
		.concludeCommand(CommandTag{}, fmt.Errorf("extended protocol limited to %v parameters", math.MaxUint16))
		.closed = true
		.unlock()
		return 
	}

	if  != context.Background() {
		select {
		case <-.Done():
			.concludeCommand(CommandTag{}, newContextAlreadyDoneError())
			.closed = true
			.unlock()
			return 
		default:
		}
		.contextWatcher.Watch()
	}

	return 
}

func ( *PgConn) ( *ResultReader,  *StatementDescription,  []int16) {
	if  == nil {
		.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'})
	}
	.frontend.SendExecute(&pgproto3.Execute{})
	.frontend.SendSync(&pgproto3.Sync{})

	 := .flushWithPotentialWriteReadDeadlock()
	if  != nil {
		.asyncClose()
		.concludeCommand(CommandTag{}, )
		.contextWatcher.Unwatch()
		.closed = true
		.unlock()
		return
	}

	.readUntilRowDescription(, )
}

// CopyTo executes the copy command sql and copies the results to w.
func ( *PgConn) ( context.Context,  io.Writer,  string) (CommandTag, error) {
	if  := .lock();  != nil {
		return CommandTag{}, 
	}

	if  != context.Background() {
		select {
		case <-.Done():
			.unlock()
			return CommandTag{}, newContextAlreadyDoneError()
		default:
		}
		.contextWatcher.Watch()
		defer .contextWatcher.Unwatch()
	}

	// Send copy to command
	.frontend.SendQuery(&pgproto3.Query{String: })

	 := .flushWithPotentialWriteReadDeadlock()
	if  != nil {
		.asyncClose()
		.unlock()
		return CommandTag{}, 
	}

	// Read results
	var  CommandTag
	var  error
	for {
		,  := .receiveMessage()
		if  != nil {
			.asyncClose()
			return CommandTag{}, normalizeTimeoutError(, )
		}

		switch msg := .(type) {
		case *pgproto3.CopyDone:
		case *pgproto3.CopyData:
			,  := .Write(.Data)
			if  != nil {
				.asyncClose()
				return CommandTag{}, 
			}
		case *pgproto3.ReadyForQuery:
			.unlock()
			return , 
		case *pgproto3.CommandComplete:
			 = .makeCommandTag(.CommandTag)
		case *pgproto3.ErrorResponse:
			 = ErrorResponseToPgError()
		}
	}
}

// CopyFrom executes the copy command sql and copies all of r to the PostgreSQL server.
//
// Note: context cancellation will only interrupt operations on the underlying PostgreSQL network connection. Reads on r
// could still block.
func ( *PgConn) ( context.Context,  io.Reader,  string) (CommandTag, error) {
	if  := .lock();  != nil {
		return CommandTag{}, 
	}
	defer .unlock()

	if  != context.Background() {
		select {
		case <-.Done():
			return CommandTag{}, newContextAlreadyDoneError()
		default:
		}
		.contextWatcher.Watch()
		defer .contextWatcher.Unwatch()
	}

	// Send copy from query
	.frontend.SendQuery(&pgproto3.Query{String: })
	 := .flushWithPotentialWriteReadDeadlock()
	if  != nil {
		.asyncClose()
		return CommandTag{}, 
	}

	// Send copy data
	 := make(chan struct{})
	 := make(chan error, 1)
	 := .signalMessage()
	var  sync.WaitGroup
	.Go(func() {
		 := iobufpool.Get(65536)
		defer iobufpool.Put()
		(*)[0] = 'd'

		for {
			,  := .Read((*)[5:cap(*)])
			if  > 0 {
				* = (*)[0 : +5]
				pgio.SetInt32((*)[1:], int32(+4))

				 := .frontend.SendUnbufferedEncodedCopyData(*)
				if  != nil {
					// Write errors are always fatal, but we can't use asyncClose because we are in a different goroutine. Not
					// setting pgConn.status or closing pgConn.cleanupDone for the same reason.
					.conn.Close()

					 <- 
					return
				}
			}
			if  != nil {
				 <- 
				return
			}

			select {
			case <-:
				return
			default:
			}
		}
	})

	var  error
	var  error
	for  == nil &&  == nil {
		select {
		case  = <-:
		case <-:
			// If pgConn.receiveMessage encounters an error it will call pgConn.asyncClose. But that is a race condition with
			// the goroutine. So instead check pgConn.bufferingReceiveErr which will have been set by the signalMessage. If an
			// error is found then forcibly close the connection without sending the Terminate message.
			if  := .bufferingReceiveErr;  != nil {
				.status = connStatusClosed
				.conn.Close()
				close(.cleanupDone)
				return CommandTag{}, normalizeTimeoutError(, )
			}
			// peekMessage never returns err in the bufferingReceive mode - it only forwards the bufferingReceive variables.
			// Therefore, the only case for receiveMessage to return err is during handling of the ErrorResponse message type
			// and using pgOnError handler to determine the connection is no longer valid (and thus closing the conn).
			,  := .receiveMessage()
			if  != nil {
				close()
				return CommandTag{}, 
			}

			switch msg := .(type) {
			case *pgproto3.ErrorResponse:
				 = ErrorResponseToPgError()
			default:
				 = .signalMessage()
			}
		}
	}
	close()
	// Make sure io goroutine finishes before writing.
	.Wait()

	if  == io.EOF ||  != nil {
		.frontend.Send(&pgproto3.CopyDone{})
	} else {
		.frontend.Send(&pgproto3.CopyFail{Message: .Error()})
	}
	 = .flushWithPotentialWriteReadDeadlock()
	if  != nil {
		.asyncClose()
		return CommandTag{}, 
	}

	// Read results
	var  CommandTag
	for {
		,  := .receiveMessage()
		if  != nil {
			.asyncClose()
			return CommandTag{}, normalizeTimeoutError(, )
		}

		switch msg := .(type) {
		case *pgproto3.ReadyForQuery:
			return , 
		case *pgproto3.CommandComplete:
			 = .makeCommandTag(.CommandTag)
		case *pgproto3.ErrorResponse:
			 = ErrorResponseToPgError()
		}
	}
}

// MultiResultReader is a reader for a command that could return multiple results such as Exec or ExecBatch.
type MultiResultReader struct {
	pgConn *PgConn
	ctx    context.Context

	rr *ResultReader

	// Data from when the batch was queued.
	statementDescriptions []*StatementDescription
	resultFormats         [][]int16

	closed bool
	err    error
}

// ReadAll reads all available results. Calling ReadAll is mutually exclusive with all other MultiResultReader methods.
func ( *MultiResultReader) () ([]*Result, error) {
	var  []*Result

	for .NextResult() {
		 = append(, .ResultReader().Read())
	}
	 := .Close()

	return , 
}

func ( *MultiResultReader) () (pgproto3.BackendMessage, error) {
	,  := .pgConn.receiveMessage()
	if  != nil {
		.pgConn.contextWatcher.Unwatch()
		.err = normalizeTimeoutError(.ctx, )
		.closed = true
		.pgConn.asyncClose()
		return nil, .err
	}

	switch msg := .(type) {
	case *pgproto3.ReadyForQuery:
		.closed = true
		.pgConn.contextWatcher.Unwatch()
		.pgConn.unlock()
	case *pgproto3.ErrorResponse:
		.err = ErrorResponseToPgError()
	}

	return , nil
}

// NextResult returns advances the MultiResultReader to the next result and returns true if a result is available.
func ( *MultiResultReader) () bool {
	for !.closed && .err == nil {
		,  := .pgConn.peekMessage()
		if ,  := .(*pgproto3.DataRow);  {
			if len(.statementDescriptions) > 0 {
				 := ResultReader{
					pgConn:            .pgConn,
					multiResultReader: ,
					ctx:               .ctx,
				}

				// This result corresponds to a prepared statement description that was provided when queuing the batch.
				 := .statementDescriptions[0]
				.statementDescriptions = .statementDescriptions[1:]

				 := .resultFormats[0]
				.resultFormats = .resultFormats[1:]

				 := .Fields
				.fieldDescriptions = .pgConn.getFieldDescriptionSlice(len())

				 := combineFieldDescriptionsAndResultFormats(.fieldDescriptions, , )
				if  != nil {
					.concludeCommand(CommandTag{}, )
				}

				.pgConn.resultReader = 
				.rr = &.pgConn.resultReader
				return true
			}

			.err = fmt.Errorf("unexpected DataRow message without preceding RowDescription")
			return false
		}

		,  := .receiveMessage()
		if  != nil {
			return false
		}

		switch msg := .(type) {
		case *pgproto3.RowDescription:
			.pgConn.resultReader = ResultReader{
				pgConn:            .pgConn,
				multiResultReader: ,
				ctx:               .ctx,
				fieldDescriptions: .pgConn.getFieldDescriptionSlice(len(.Fields)),
			}
			convertRowDescription(.pgConn.resultReader.fieldDescriptions, )

			.rr = &.pgConn.resultReader
			return true
		case *pgproto3.CommandComplete:
			.pgConn.resultReader = ResultReader{
				commandTag:       .pgConn.makeCommandTag(.CommandTag),
				commandConcluded: true,
				closed:           true,
			}
			.rr = &.pgConn.resultReader
			return true
		case *pgproto3.EmptyQueryResponse:
			.pgConn.resultReader = ResultReader{
				commandConcluded: true,
				closed:           true,
			}
			.rr = &.pgConn.resultReader
			return true
		}
	}

	return false
}

// ResultReader returns the current ResultReader.
func ( *MultiResultReader) () *ResultReader {
	return .rr
}

// Close closes the MultiResultReader and returns the first error that occurred during the MultiResultReader's use.
func ( *MultiResultReader) () error {
	for !.closed {
		,  := .receiveMessage()
		if  != nil {
			return .err
		}
	}

	return .err
}

// ResultReader is a reader for the result of a single query.
type ResultReader struct {
	pgConn            *PgConn
	multiResultReader *MultiResultReader
	pipeline          *Pipeline
	ctx               context.Context

	fieldDescriptions []FieldDescription
	rowValues         [][]byte
	commandTag        CommandTag
	preloaded         bool
	commandConcluded  bool
	closed            bool
	err               error
}

// Result is the saved query response that is returned by calling Read on a ResultReader.
type Result struct {
	FieldDescriptions []FieldDescription
	Rows              [][][]byte
	CommandTag        CommandTag
	Err               error
}

// Read saves the query response to a Result.
func ( *ResultReader) () *Result {
	 := &Result{}

	for .NextRow() {
		if .FieldDescriptions == nil {
			.FieldDescriptions = make([]FieldDescription, len(.FieldDescriptions()))
			copy(.FieldDescriptions, .FieldDescriptions())
		}

		 := .Values()
		 := make([][]byte, len())
		for  := range  {
			if [] != nil {
				[] = make([]byte, len([]))
				copy([], [])
			}
		}
		.Rows = append(.Rows, )
	}

	.CommandTag, .Err = .Close()

	return 
}

// NextRow advances the ResultReader to the next row and returns true if a row is available.
func ( *ResultReader) () bool {
	if .preloaded {
		.preloaded = false
		return true
	}

	for !.commandConcluded {
		,  := .receiveMessage()
		if  != nil {
			return false
		}

		switch msg := .(type) {
		case *pgproto3.DataRow:
			.rowValues = .Values
			return true
		}
	}

	return false
}

func ( *ResultReader) ( [][]byte) {
	.rowValues = 
	.preloaded = true
}

// FieldDescriptions returns the field descriptions for the current result set. The returned slice is only valid until
// the ResultReader is closed. It may return nil (for example, if the query did not return a result set or an error was
// encountered.)
func ( *ResultReader) () []FieldDescription {
	return .fieldDescriptions
}

// Values returns the current row data. NextRow must have been previously been called. The returned [][]byte is only
// valid until the next NextRow call or the ResultReader is closed.
func ( *ResultReader) () [][]byte {
	return .rowValues
}

// Close consumes any remaining result data and returns the command tag or
// error.
func ( *ResultReader) () (CommandTag, error) {
	if .closed {
		return .commandTag, .err
	}
	.closed = true

	for !.commandConcluded {
		,  := .receiveMessage()
		if  != nil {
			return CommandTag{}, .err
		}
	}

	if .multiResultReader == nil && .pipeline == nil {
		for {
			,  := .receiveMessage()
			if  != nil {
				return CommandTag{}, .err
			}

			switch msg := .(type) {
			// Detect a deferred constraint violation where the ErrorResponse is sent after CommandComplete.
			case *pgproto3.ErrorResponse:
				.err = ErrorResponseToPgError()
			case *pgproto3.ReadyForQuery:
				.pgConn.contextWatcher.Unwatch()
				.pgConn.unlock()
				return .commandTag, .err
			}
		}
	}

	return .commandTag, .err
}

// readUntilRowDescription ensures the ResultReader's fieldDescriptions are loaded. It does not return an error as any
// error will be stored in the ResultReader.
func ( *ResultReader) ( *StatementDescription,  []int16) {
	for !.commandConcluded {
		,  := .receiveMessage()
		switch msg := .(type) {
		case *pgproto3.RowDescription:
			return
		case *pgproto3.DataRow:
			.preloadRowValues(.Values)
			if  != nil {
				 := .Fields
				.fieldDescriptions = .pgConn.getFieldDescriptionSlice(len())

				 := combineFieldDescriptionsAndResultFormats(.fieldDescriptions, , )
				if  != nil {
					.concludeCommand(CommandTag{}, )
				}
			}
			return
		case *pgproto3.CommandComplete:
			if  != nil {
				 := .Fields
				.fieldDescriptions = .pgConn.getFieldDescriptionSlice(len())

				 := combineFieldDescriptionsAndResultFormats(.fieldDescriptions, , )
				if  != nil {
					.concludeCommand(CommandTag{}, )
				}
			}
			return
		}
	}
}

func ( *ResultReader) () ( pgproto3.BackendMessage,  error) {
	if .multiResultReader == nil {
		,  = .pgConn.receiveMessage()
	} else {
		,  = .multiResultReader.receiveMessage()
	}

	if  != nil {
		 = normalizeTimeoutError(.ctx, )
		.concludeCommand(CommandTag{}, )
		.pgConn.contextWatcher.Unwatch()
		.closed = true
		if .multiResultReader == nil {
			.pgConn.asyncClose()
		}

		return nil, .err
	}

	switch msg := .(type) {
	case *pgproto3.RowDescription:
		.fieldDescriptions = .pgConn.getFieldDescriptionSlice(len(.Fields))
		convertRowDescription(.fieldDescriptions, )
	case *pgproto3.CommandComplete:
		.concludeCommand(.pgConn.makeCommandTag(.CommandTag), nil)
	case *pgproto3.EmptyQueryResponse:
		.concludeCommand(CommandTag{}, nil)
	case *pgproto3.ErrorResponse:
		 := ErrorResponseToPgError()
		if .pipeline != nil {
			.pipeline.state.HandleError()
		}
		.concludeCommand(CommandTag{}, )
	}

	return , nil
}

func ( *ResultReader) ( CommandTag,  error) {
	// Keep the first error that is recorded. Store the error before checking if the command is already concluded to
	// allow for receiving an error after CommandComplete but before ReadyForQuery.
	if  != nil && .err == nil {
		.err = 
	}

	if .commandConcluded {
		return
	}

	.commandTag = 
	.rowValues = nil
	.commandConcluded = true
}

// Batch is a collection of queries that can be sent to the PostgreSQL server in a single round-trip.
type Batch struct {
	buf                   []byte
	statementDescriptions []*StatementDescription
	resultFormats         [][]int16
	err                   error
}

// ExecParams appends an ExecParams command to the batch. See PgConn.ExecParams for parameter descriptions.
func ( *Batch) ( string,  [][]byte,  []uint32, ,  []int16) {
	if .err != nil {
		return
	}

	.buf, .err = (&pgproto3.Parse{Query: , ParameterOIDs: }).Encode(.buf)
	if .err != nil {
		return
	}
	.ExecPrepared("", , , )
}

// ExecPrepared appends an ExecPrepared e command to the batch. See PgConn.ExecPrepared for parameter descriptions.
func ( *Batch) ( string,  [][]byte, ,  []int16) {
	if .err != nil {
		return
	}

	.buf, .err = (&pgproto3.Bind{PreparedStatement: , ParameterFormatCodes: , Parameters: , ResultFormatCodes: }).Encode(.buf)
	if .err != nil {
		return
	}

	.buf, .err = (&pgproto3.Describe{ObjectType: 'P'}).Encode(.buf)
	if .err != nil {
		return
	}

	.buf, .err = (&pgproto3.Execute{}).Encode(.buf)
	if .err != nil {
		return
	}
}

// ExecStatement appends an ExecStatement command to the batch. See PgConn.ExecPrepared for parameter descriptions.
//
// This differs from ExecPrepared in that it takes a *StatementDescription instead of just the prepared statement name.
// Because it has the *StatementDescription it can avoid the Describe Portal message that ExecPrepared must send to get
// the result column descriptions.
func ( *Batch) ( *StatementDescription,  [][]byte, ,  []int16) {
	if .err != nil {
		return
	}

	.buf, .err = (&pgproto3.Bind{PreparedStatement: .Name, ParameterFormatCodes: , Parameters: , ResultFormatCodes: }).Encode(.buf)
	if .err != nil {
		return
	}

	.statementDescriptions = append(.statementDescriptions, )
	.resultFormats = append(.resultFormats, )

	.buf, .err = (&pgproto3.Execute{}).Encode(.buf)
	if .err != nil {
		return
	}
}

// ExecBatch executes all the queries in batch in a single round-trip. Execution is implicitly transactional unless a
// transaction is already in progress or SQL contains transaction control statements. This is a simpler way of executing
// multiple queries in a single round trip than using pipeline mode.
func ( *PgConn) ( context.Context,  *Batch) *MultiResultReader {
	if .err != nil {
		return &MultiResultReader{
			closed: true,
			err:    .err,
		}
	}

	if  := .lock();  != nil {
		return &MultiResultReader{
			closed: true,
			err:    ,
		}
	}

	.multiResultReader = MultiResultReader{
		pgConn:                ,
		ctx:                   ,
		statementDescriptions: .statementDescriptions,
		resultFormats:         .resultFormats,
	}
	 := &.multiResultReader

	if  != context.Background() {
		select {
		case <-.Done():
			.closed = true
			.err = newContextAlreadyDoneError()
			.unlock()
			return 
		default:
		}
		.contextWatcher.Watch()
	}

	.buf, .err = (&pgproto3.Sync{}).Encode(.buf)
	if .err != nil {
		.contextWatcher.Unwatch()
		.err = normalizeTimeoutError(.ctx, .err)
		.closed = true
		.asyncClose()
		return 
	}

	,  := func( []byte) (int, error) {
		.enterPotentialWriteReadDeadlock()
		defer .exitPotentialWriteReadDeadlock()
		return .conn.Write()
	}(.buf)
	if  != nil {
		.contextWatcher.Unwatch()
		.err = normalizeTimeoutError(.ctx, )
		.closed = true
		.asyncClose()
		return 
	}

	return 
}

// EscapeString escapes a string such that it can safely be interpolated into a SQL command string. It does not include
// the surrounding single quotes.
//
// The current implementation requires that standard_conforming_strings=on and client_encoding="UTF8". If these
// conditions are not met an error will be returned. It is possible these restrictions will be lifted in the future.
func ( *PgConn) ( string) (string, error) {
	if .ParameterStatus("standard_conforming_strings") != "on" {
		return "", errors.New("EscapeString must be run with standard_conforming_strings=on")
	}

	if .ParameterStatus("client_encoding") != "UTF8" {
		return "", errors.New("EscapeString must be run with client_encoding=UTF8")
	}

	return strings.Replace(, "'", "''", -1), nil
}

// CheckConn checks the underlying connection without writing any bytes. This is currently implemented by doing a read
// with a very short deadline. This can be useful because a TCP connection can be broken such that a write will appear
// to succeed even though it will never actually reach the server. Reading immediately before a write will detect this
// condition. If this is done immediately before sending a query it reduces the chances a query will be sent that fails
// without the client knowing whether the server received it or not.
//
// Deprecated: CheckConn is deprecated in favor of Ping. CheckConn cannot detect all types of broken connections where
// the write would still appear to succeed. Prefer Ping unless on a high latency connection.
func ( *PgConn) () error {
	,  := context.WithTimeout(context.Background(), 1*time.Millisecond)
	defer ()

	,  := .ReceiveMessage()
	if  != nil {
		if !Timeout() {
			return 
		}
	}

	return nil
}

// Ping pings the server. This can be useful because a TCP connection can be broken such that a write will appear to
// succeed even though it will never actually reach the server. Pinging immediately before sending a query reduces the
// chances a query will be sent that fails without the client knowing whether the server received it or not.
func ( *PgConn) ( context.Context) error {
	return .Exec(, "-- ping").Close()
}

// makeCommandTag makes a CommandTag. It does not retain a reference to buf or buf's underlying memory.
func ( *PgConn) ( []byte) CommandTag {
	return CommandTag{s: string()}
}

// enterPotentialWriteReadDeadlock must be called before a write that could deadlock if the server is simultaneously
// blocked writing to us.
func ( *PgConn) () {
	// The time to wait is somewhat arbitrary. A Write should only take as long as the syscall and memcpy to the OS
	// outbound network buffer unless the buffer is full (which potentially is a block). It needs to be long enough for
	// the normal case, but short enough not to kill performance if a block occurs.
	//
	// In addition, on Windows the default timer resolution is 15.6ms. So setting the timer to less than that is
	// ineffective.
	if .slowWriteTimer.Reset(15 * time.Millisecond) {
		panic("BUG: slow write timer already active")
	}
}

// exitPotentialWriteReadDeadlock must be called after a call to enterPotentialWriteReadDeadlock.
func ( *PgConn) () {
	if !.slowWriteTimer.Stop() {
		// The timer starts its function in a separate goroutine. It is necessary to ensure the background reader has
		// started before calling Stop. Otherwise, the background reader may not be stopped. That on its own is not a
		// serious problem. But what is a serious problem is that the background reader may start at an inopportune time in
		// a subsequent query. For example, if a subsequent query was canceled then a deadline may be set on the net.Conn to
		// interrupt an in-progress read. After the read is interrupted, but before the deadline is cleared, the background
		// reader could start and read a deadline error. Then the next query would receive the an unexpected deadline error.
		<-.bgReaderStarted
		.bgReader.Stop()
	}
}

func ( *PgConn) () error {
	.enterPotentialWriteReadDeadlock()
	defer .exitPotentialWriteReadDeadlock()
	 := .frontend.Flush()
	return 
}

// SyncConn prepares the underlying net.Conn for direct use. PgConn may internally buffer reads or use goroutines for
// background IO. This means that any direct use of the underlying net.Conn may be corrupted if a read is already
// buffered or a read is in progress. SyncConn drains read buffers and stops background IO. In some cases this may
// require sending a ping to the server. ctx can be used to cancel this operation. This should be called before any
// operation that will use the underlying net.Conn directly. e.g. Before Conn() or Hijack().
//
// This should not be confused with the PostgreSQL protocol Sync message.
func ( *PgConn) ( context.Context) error {
	for range 10 {
		if .bgReader.Status() == bgreader.StatusStopped && .frontend.ReadBufferLen() == 0 {
			return nil
		}

		 := .Ping()
		if  != nil {
			return fmt.Errorf("SyncConn: Ping failed while syncing conn: %w", )
		}
	}

	// This should never happen. Only way I can imagine this occurring is if the server is constantly sending data such as
	// LISTEN/NOTIFY or log notifications such that we never can get an empty buffer.
	return errors.New("SyncConn: conn never synchronized")
}

// CustomData returns a map that can be used to associate custom data with the connection.
func ( *PgConn) () map[string]any {
	return .customData
}

// HijackedConn is the result of hijacking a connection.
//
// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning
// compatibility.
type HijackedConn struct {
	Conn              net.Conn
	PID               uint32            // backend pid
	SecretKey         []byte            // key to use to send a cancel query message to the server
	ParameterStatuses map[string]string // parameters that have been reported by the server
	TxStatus          byte
	Frontend          *pgproto3.Frontend
	Config            *Config
	CustomData        map[string]any
}

// Hijack extracts the internal connection data. pgConn must be in an idle state. SyncConn should be called immediately
// before Hijack. pgConn is unusable after hijacking. Hijacking is typically only useful when using pgconn to establish
// a connection, but taking complete control of the raw connection after that (e.g. a load balancer or proxy).
//
// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning
// compatibility.
func ( *PgConn) () (*HijackedConn, error) {
	if  := .lock();  != nil {
		return nil, 
	}
	.status = connStatusClosed

	return &HijackedConn{
		Conn:              .conn,
		PID:               .pid,
		SecretKey:         .secretKey,
		ParameterStatuses: .parameterStatuses,
		TxStatus:          .txStatus,
		Frontend:          .frontend,
		Config:            .config,
		CustomData:        .customData,
	}, nil
}

// Construct created a PgConn from an already established connection to a PostgreSQL server. This is the inverse of
// PgConn.Hijack. The connection must be in an idle state.
//
// hc.Frontend is replaced by a new pgproto3.Frontend built by hc.Config.BuildFrontend.
//
// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning
// compatibility.
func ( *HijackedConn) (*PgConn, error) {
	 := &PgConn{
		conn:              .Conn,
		pid:               .PID,
		secretKey:         .SecretKey,
		parameterStatuses: .ParameterStatuses,
		txStatus:          .TxStatus,
		frontend:          .Frontend,
		config:            .Config,
		customData:        .CustomData,

		status: connStatusIdle,

		cleanupDone: make(chan struct{}),
	}

	.contextWatcher = ctxwatch.NewContextWatcher(.Config.BuildContextWatcherHandler())
	.bgReader = bgreader.New(.conn)
	.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64),
		func() {
			.bgReader.Start()
			.bgReaderStarted <- struct{}{}
		},
	)
	.slowWriteTimer.Stop()
	.bgReaderStarted = make(chan struct{})
	.frontend = .Config.BuildFrontend(.bgReader, .conn)

	return , nil
}

// Pipeline represents a connection in pipeline mode.
//
// SendPrepare, SendQueryParams, SendQueryPrepared, and SendQueryStatement queue requests to the server. These requests
// are not written until pipeline is flushed by Flush or Sync. Sync must be called after the last request is queued.
// Requests between synchronization points are implicitly transactional unless explicit transaction control statements
// have been issued.
//
// The context the pipeline was started with is in effect for the entire life of the Pipeline.
//
// For a deeper understanding of pipeline mode see the PostgreSQL documentation for the extended query protocol
// (https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY) and the libpq pipeline mode
// (https://www.postgresql.org/docs/current/libpq-pipeline-mode.html).
type Pipeline struct {
	conn *PgConn
	ctx  context.Context

	state  pipelineState
	err    error
	closed bool
}

// PipelineSync is returned by GetResults when a ReadyForQuery message is received.
type PipelineSync struct{}

// CloseComplete is returned by GetResults when a CloseComplete message is received.
type CloseComplete struct{}

type pipelineRequestType int

const (
	pipelineNil pipelineRequestType = iota
	pipelinePrepare
	pipelineQueryParams
	pipelineQueryPrepared
	pipelineQueryStatement
	pipelineDeallocate
	pipelineSyncRequest
	pipelineFlushRequest
)

type pipelineRequestEvent struct {
	RequestType       pipelineRequestType
	WasSentToServer   bool
	BeforeFlushOrSync bool
}

type pipelineState struct {
	requestEventQueue          list.List
	statementDescriptionsQueue list.List
	resultFormatsQueue         list.List
	lastRequestType            pipelineRequestType
	pgErr                      *PgError
	expectedReadyForQueryCount int
}

func ( *pipelineState) () {
	.requestEventQueue.Init()
	.statementDescriptionsQueue.Init()
	.resultFormatsQueue.Init()
	.lastRequestType = pipelineNil
}

func ( *pipelineState) () {
	for  := .requestEventQueue.Back();  != nil;  = .Prev() {
		 := .Value.(pipelineRequestEvent)
		if .WasSentToServer {
			return
		}
		.WasSentToServer = true
		.Value = 
	}
}

func ( *pipelineState) () {
	for  := .requestEventQueue.Back();  != nil;  = .Prev() {
		 := .Value.(pipelineRequestEvent)
		if .BeforeFlushOrSync {
			return
		}
		.BeforeFlushOrSync = true
		.Value = 
	}
}

func ( *pipelineState) ( pipelineRequestType) {
	if  == pipelineNil {
		return
	}

	if  != pipelineFlushRequest {
		.requestEventQueue.PushBack(pipelineRequestEvent{RequestType: })
	}
	if  == pipelineFlushRequest ||  == pipelineSyncRequest {
		.registerFlushingBufferOnServer()
	}
	.lastRequestType = 

	if  == pipelineSyncRequest {
		.expectedReadyForQueryCount++
	}
}

func ( *pipelineState) () pipelineRequestType {
	for {
		 := .requestEventQueue.Front()
		if  == nil {
			return pipelineNil
		}
		 := .Value.(pipelineRequestEvent)
		if !(.WasSentToServer && .BeforeFlushOrSync) {
			return pipelineNil
		}

		.requestEventQueue.Remove()
		if .RequestType == pipelineSyncRequest {
			.pgErr = nil
		}
		if .pgErr == nil {
			return .RequestType
		}
	}
}

func ( *pipelineState) ( *StatementDescription,  []int16) {
	.statementDescriptionsQueue.PushBack()
	.resultFormatsQueue.PushBack()
}

func ( *pipelineState) () (*StatementDescription, []int16) {
	 := .statementDescriptionsQueue.Front()
	var  *StatementDescription
	if  != nil {
		.statementDescriptionsQueue.Remove()
		 = .Value.(*StatementDescription)
	}

	 := .resultFormatsQueue.Front()
	var  []int16
	if  != nil {
		.resultFormatsQueue.Remove()
		 = .Value.([]int16)
	}

	return , 
}

func ( *pipelineState) ( *PgError) {
	.pgErr = 
}

func ( *pipelineState) () {
	.expectedReadyForQueryCount--
}

func ( *pipelineState) () bool {
	var  bool

	if  := .requestEventQueue.Back();  != nil {
		 := .Value.(pipelineRequestEvent)
		 = (.RequestType == pipelineSyncRequest) && .WasSentToServer
	} else {
		 = (.lastRequestType == pipelineSyncRequest) || (.lastRequestType == pipelineNil)
	}

	return !
}

func ( *pipelineState) () int {
	return .expectedReadyForQueryCount
}

// StartPipeline switches the connection to pipeline mode and returns a *Pipeline. In pipeline mode requests can be sent
// to the server without waiting for a response. Close must be called on the returned *Pipeline to return the connection
// to normal mode. While in pipeline mode, no methods that communicate with the server may be called except
// CancelRequest and Close. ctx is in effect for entire life of the *Pipeline.
//
// Prefer ExecBatch when only sending one group of queries at once.
func ( *PgConn) ( context.Context) *Pipeline {
	if  := .lock();  != nil {
		 := &Pipeline{
			closed: true,
			err:    ,
		}
		.state.Init()

		return 
	}

	.resultReader = ResultReader{closed: true}

	.pipeline = Pipeline{
		conn: ,
		ctx:  ,
	}
	.pipeline.state.Init()

	 := &.pipeline

	if  != context.Background() {
		select {
		case <-.Done():
			.closed = true
			.err = newContextAlreadyDoneError()
			.unlock()
			return 
		default:
		}
		.contextWatcher.Watch()
	}

	return 
}

// SendPrepare is the pipeline version of *PgConn.Prepare.
func ( *Pipeline) (,  string,  []uint32) {
	if .closed {
		return
	}

	.conn.frontend.SendParse(&pgproto3.Parse{Name: , Query: , ParameterOIDs: })
	.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'S', Name: })
	.state.PushBackRequestType(pipelinePrepare)
}

// SendDeallocate deallocates a prepared statement.
func ( *Pipeline) ( string) {
	if .closed {
		return
	}

	.conn.frontend.SendClose(&pgproto3.Close{ObjectType: 'S', Name: })
	.state.PushBackRequestType(pipelineDeallocate)
}

// SendQueryParams is the pipeline version of *PgConn.ExecParams.
func ( *Pipeline) ( string,  [][]byte,  []uint32, ,  []int16) {
	if .closed {
		return
	}

	.conn.frontend.SendParse(&pgproto3.Parse{Query: , ParameterOIDs: })
	.conn.frontend.SendBind(&pgproto3.Bind{ParameterFormatCodes: , Parameters: , ResultFormatCodes: })
	.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'})
	.conn.frontend.SendExecute(&pgproto3.Execute{})
	.state.PushBackRequestType(pipelineQueryParams)
}

// SendQueryPrepared is the pipeline version of *PgConn.ExecPrepared.
func ( *Pipeline) ( string,  [][]byte, ,  []int16) {
	if .closed {
		return
	}

	.conn.frontend.SendBind(&pgproto3.Bind{PreparedStatement: , ParameterFormatCodes: , Parameters: , ResultFormatCodes: })
	.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'})
	.conn.frontend.SendExecute(&pgproto3.Execute{})
	.state.PushBackRequestType(pipelineQueryPrepared)
}

// SendQueryStatement is the pipeline version of *PgConn.ExecStatement.
func ( *Pipeline) ( *StatementDescription,  [][]byte, ,  []int16) {
	if .closed {
		return
	}

	.conn.frontend.SendBind(&pgproto3.Bind{PreparedStatement: .Name, ParameterFormatCodes: , Parameters: , ResultFormatCodes: })
	.conn.frontend.SendExecute(&pgproto3.Execute{})
	.state.PushBackRequestType(pipelineQueryStatement)
	.state.PushBackStatementData(, )
}

// SendFlushRequest sends a request for the server to flush its output buffer.
//
// The server flushes its output buffer automatically as a result of Sync being called,
// or on any request when not in pipeline mode; this function is useful to cause the server
// to flush its output buffer in pipeline mode without establishing a synchronization point.
// Note that the request is not itself flushed to the server automatically; use Flush if
// necessary. This copies the behavior of libpq PQsendFlushRequest.
func ( *Pipeline) () {
	if .closed {
		return
	}

	.conn.frontend.Send(&pgproto3.Flush{})
	.state.PushBackRequestType(pipelineFlushRequest)
}

// SendPipelineSync marks a synchronization point in a pipeline by sending a sync message
// without flushing the send buffer. This serves as the delimiter of an implicit
// transaction and an error recovery point.
//
// Note that the request is not itself flushed to the server automatically; use Flush if
// necessary. This copies the behavior of libpq PQsendPipelineSync.
func ( *Pipeline) () {
	if .closed {
		return
	}

	.conn.frontend.SendSync(&pgproto3.Sync{})
	.state.PushBackRequestType(pipelineSyncRequest)
}

// Flush flushes the queued requests without establishing a synchronization point.
func ( *Pipeline) () error {
	if .closed {
		if .err != nil {
			return .err
		}
		return errors.New("pipeline closed")
	}

	 := .conn.flushWithPotentialWriteReadDeadlock()
	if  != nil {
		 = normalizeTimeoutError(.ctx, )

		.conn.asyncClose()

		.conn.contextWatcher.Unwatch()
		.conn.unlock()
		.closed = true
		.err = 
		return 
	}

	.state.RegisterSendingToServer()
	return nil
}

// Sync establishes a synchronization point and flushes the queued requests.
func ( *Pipeline) () error {
	.SendPipelineSync()
	return .Flush()
}

// GetResults gets the next results. If results are present, results may be a *ResultReader, *StatementDescription, or
// *PipelineSync. If an ErrorResponse is received from the server, results will be nil and err will be a *PgError. If no
// results are available, results and err will both be nil.
func ( *Pipeline) () ( any,  error) {
	if .closed {
		if .err != nil {
			return nil, .err
		}
		return nil, errors.New("pipeline closed")
	}

	return .getResults()
}

func ( *Pipeline) () ( any,  error) {
	if !.conn.resultReader.closed {
		,  := .conn.resultReader.Close()
		if  != nil {
			return nil, 
		}
	}

	 := .state.ExtractFrontRequestType()
	switch  {
	case pipelineNil:
		return nil, nil
	case pipelinePrepare:
		return .getResultsPrepare()
	case pipelineQueryParams:
		return .getResultsQueryParams()
	case pipelineQueryPrepared:
		return .getResultsQueryPrepared()
	case pipelineQueryStatement:
		return .getResultsQueryStatement()
	case pipelineDeallocate:
		return .getResultsDeallocate()
	case pipelineSyncRequest:
		return .getResultsSync()
	case pipelineFlushRequest:
		return nil, errors.New("BUG: pipelineFlushRequest should not be in request queue")
	default:
		return nil, errors.New("BUG: unknown pipeline request type")
	}
}

func ( *Pipeline) () (*StatementDescription, error) {
	 := .receiveParseComplete("Prepare")
	if  != nil {
		return nil, 
	}

	 := &StatementDescription{}

	,  := .receiveMessage()
	if  != nil {
		return nil, 
	}

	switch msg := .(type) {
	case *pgproto3.ParameterDescription:
		.ParamOIDs = make([]uint32, len(.ParameterOIDs))
		copy(.ParamOIDs, .ParameterOIDs)
	case *pgproto3.ErrorResponse:
		 := ErrorResponseToPgError()
		.state.HandleError()
		return nil, 
	default:
		return nil, .handleUnexpectedMessage("Prepare ParameterDescription", )
	}

	,  = .receiveMessage()
	if  != nil {
		return nil, 
	}

	switch msg := .(type) {
	case *pgproto3.RowDescription:
		.Fields = make([]FieldDescription, len(.Fields))
		convertRowDescription(.Fields, )
		return , nil

	// NoData is returned instead of RowDescription when there is no expected result. e.g. An INSERT without a RETURNING
	// clause.
	case *pgproto3.NoData:
		return , nil

	case *pgproto3.ErrorResponse:
		 := ErrorResponseToPgError()
		.state.HandleError()
		return nil, 
	default:
		return nil, .handleUnexpectedMessage("Prepare RowDescription", )
	}
}

func ( *Pipeline) () (*ResultReader, error) {
	 := .receiveParseComplete("QueryParams")
	if  != nil {
		return nil, 
	}

	 = .receiveBindComplete("QueryParams")
	if  != nil {
		return nil, 
	}

	return .receiveDescribedResultReader("QueryParams")
}

func ( *Pipeline) () (*ResultReader, error) {
	 := .receiveBindComplete("QueryPrepared")
	if  != nil {
		return nil, 
	}

	return .receiveDescribedResultReader("QueryPrepared")
}

func ( *Pipeline) () (*ResultReader, error) {
	 := .receiveBindComplete("QueryStatement")
	if  != nil {
		return nil, 
	}

	,  := .receiveMessage()
	if  != nil {
		return nil, 
	}

	,  := .state.ExtractFrontStatementData()
	if  == nil {
		return nil, errors.New("BUG: missing statement description or result formats for QueryStatement")
	}
	 := .Fields
	 := .conn.getFieldDescriptionSlice(len())
	 = combineFieldDescriptionsAndResultFormats(, , )
	if  != nil {
		return nil, 
	}

	switch msg := .(type) {
	case *pgproto3.DataRow:
		 := ResultReader{
			pgConn:            .conn,
			pipeline:          ,
			ctx:               .ctx,
			fieldDescriptions: ,
		}
		.preloadRowValues(.Values)
		.conn.resultReader = 
		return &.conn.resultReader, nil
	case *pgproto3.CommandComplete:
		.conn.resultReader = ResultReader{
			commandTag:        .conn.makeCommandTag(.CommandTag),
			commandConcluded:  true,
			closed:            true,
			fieldDescriptions: ,
		}
		return &.conn.resultReader, nil
	case *pgproto3.ErrorResponse:
		 := ErrorResponseToPgError()
		.state.HandleError()
		.conn.resultReader.closed = true
		return nil, 
	default:
		return nil, .handleUnexpectedMessage("QueryStatement", )
	}
}

func ( *Pipeline) () (*CloseComplete, error) {
	,  := .receiveMessage()
	if  != nil {
		return nil, 
	}

	switch msg := .(type) {
	case *pgproto3.CloseComplete:
		return &CloseComplete{}, nil
	case *pgproto3.ErrorResponse:
		 := ErrorResponseToPgError()
		.state.HandleError()
		.conn.resultReader.closed = true
		return nil, 
	default:
		return nil, .handleUnexpectedMessage("Deallocate", )
	}
}

func ( *Pipeline) () (*PipelineSync, error) {
	,  := .receiveMessage()
	if  != nil {
		return nil, 
	}

	switch msg := .(type) {
	case *pgproto3.ReadyForQuery:
		.state.HandleReadyForQuery()
		return &PipelineSync{}, nil
	case *pgproto3.ErrorResponse:
		// Error message that is received while expecting a Sync message still consumes the expected Sync. Put it back.
		.state.requestEventQueue.PushFront(pipelineRequestEvent{RequestType: pipelineSyncRequest, WasSentToServer: true, BeforeFlushOrSync: true})

		 := ErrorResponseToPgError()
		.state.HandleError()
		.conn.resultReader.closed = true
		return nil, 
	default:
		return nil, .handleUnexpectedMessage("Sync", )
	}
}

func ( *Pipeline) ( string) error {
	,  := .receiveMessage()
	if  != nil {
		return 
	}

	switch msg := .(type) {
	case *pgproto3.ParseComplete:
		return nil
	case *pgproto3.ErrorResponse:
		 := ErrorResponseToPgError()
		.state.HandleError()
		return 
	default:
		return .handleUnexpectedMessage(fmt.Sprintf("%s Parse", ), )
	}
}

func ( *Pipeline) ( string) error {
	,  := .receiveMessage()
	if  != nil {
		return 
	}

	switch msg := .(type) {
	case *pgproto3.BindComplete:
		return nil
	case *pgproto3.ErrorResponse:
		 := ErrorResponseToPgError()
		.state.HandleError()
		return 
	default:
		return .handleUnexpectedMessage(fmt.Sprintf("%s Bind", ), )
	}
}

func ( *Pipeline) ( string) (*ResultReader, error) {
	,  := .receiveMessage()
	if  != nil {
		return nil, 
	}

	switch msg := .(type) {
	case *pgproto3.RowDescription:
		.conn.resultReader = ResultReader{
			pgConn:            .conn,
			pipeline:          ,
			ctx:               .ctx,
			fieldDescriptions: .conn.getFieldDescriptionSlice(len(.Fields)),
		}
		convertRowDescription(.conn.resultReader.fieldDescriptions, )
		return &.conn.resultReader, nil
	case *pgproto3.NoData:
	case *pgproto3.ErrorResponse:
		 := ErrorResponseToPgError()
		.state.HandleError()
		.conn.resultReader.closed = true
		return nil, 
	default:
		return nil, .handleUnexpectedMessage(fmt.Sprintf("%s RowDescription or NoData", ), )
	}

	,  = .receiveMessage()
	if  != nil {
		return nil, 
	}

	switch msg := .(type) {
	case *pgproto3.CommandComplete:
		.conn.resultReader = ResultReader{
			commandTag:       .conn.makeCommandTag(.CommandTag),
			commandConcluded: true,
			closed:           true,
		}
		return &.conn.resultReader, nil
	case *pgproto3.ErrorResponse:
		 := ErrorResponseToPgError()
		.state.HandleError()
		.conn.resultReader.closed = true
		return nil, 
	default:
		return nil, .handleUnexpectedMessage(fmt.Sprintf("%s CommandComplete", ), )
	}
}

func ( *Pipeline) () (pgproto3.BackendMessage, error) {
	for {
		,  := .conn.receiveMessage()
		if  != nil {
			.err = 
			.conn.asyncClose()
			return nil, normalizeTimeoutError(.ctx, )
		}

		switch msg := .(type) {
		case *pgproto3.ParameterStatus, *pgproto3.NoticeResponse, *pgproto3.NotificationResponse:
			// Filter these message types out in pipeline mode. The normal processing is handled by PgConn.receiveMessage.
		default:
			return , nil
		}
	}
}

func ( *Pipeline) ( string,  pgproto3.BackendMessage) error {
	.err = fmt.Errorf("pipeline: %s: received unexpected message type %T", , )
	.conn.asyncClose()
	return .err
}

// Close closes the pipeline and returns the connection to normal mode.
func ( *Pipeline) () error {
	if .closed {
		return .err
	}

	.closed = true

	if .state.PendingSync() {
		.conn.asyncClose()
		.err = errors.New("pipeline has unsynced requests")
		.conn.contextWatcher.Unwatch()
		.conn.unlock()

		return .err
	}

	for .state.ExpectedReadyForQuery() > 0 {
		,  := .getResults()
		if  != nil {
			.err = 
			var  *PgError
			if !errors.As(, &) {
				.conn.asyncClose()
				break
			}
		} else if  == nil {
			// getResults returns (nil, nil) when the request queue is exhausted but
			// ExpectedReadyForQuery is still > 0. This can happen when FATAL errors consume
			// queued request slots without the server ever sending ReadyForQuery.
			.conn.asyncClose()
			if .err == nil {
				.err = errors.New("pipeline: no more results but expected ReadyForQuery")
			}
			break
		}
	}

	.conn.contextWatcher.Unwatch()
	.conn.unlock()

	return .err
}

// DeadlineContextWatcherHandler handles canceled contexts by setting a deadline on a net.Conn.
type DeadlineContextWatcherHandler struct {
	Conn net.Conn

	// DeadlineDelay is the delay to set on the deadline set on net.Conn when the context is canceled.
	DeadlineDelay time.Duration
}

func ( *DeadlineContextWatcherHandler) ( context.Context) {
	.Conn.SetDeadline(time.Now().Add(.DeadlineDelay))
}

func ( *DeadlineContextWatcherHandler) () {
	.Conn.SetDeadline(time.Time{})
}

// CancelRequestContextWatcherHandler handles canceled contexts by sending a cancel request to the server. It also sets
// a deadline on a net.Conn as a fallback.
type CancelRequestContextWatcherHandler struct {
	Conn *PgConn

	// CancelRequestDelay is the delay before sending the cancel request to the server.
	CancelRequestDelay time.Duration

	// DeadlineDelay is the delay to set on the deadline set on net.Conn when the context is canceled.
	DeadlineDelay time.Duration

	cancelFinishedChan             chan struct{}
	handleUnwatchAfterCancelCalled func()
}

func ( *CancelRequestContextWatcherHandler) (context.Context) {
	.cancelFinishedChan = make(chan struct{})
	var  context.Context
	, .handleUnwatchAfterCancelCalled = context.WithCancel(context.Background())

	 := time.Now().Add(.DeadlineDelay)
	.Conn.conn.SetDeadline()

	go func() {
		defer close(.cancelFinishedChan)

		select {
		case <-.Done():
			return
		case <-time.After(.CancelRequestDelay):
		}

		,  := context.WithDeadline(, )
		defer ()
		.Conn.CancelRequest()

		// CancelRequest is inherently racy. Even though the cancel request has been received by the server at this point,
		// it hasn't necessarily been delivered to the other connection. If we immediately return and the connection is
		// immediately used then it is possible the CancelRequest will actually cancel our next query. The
		// TestCancelRequestContextWatcherHandler Stress test can produce this error without the sleep below. The sleep time
		// is arbitrary, but should be sufficient to prevent this error case.
		time.Sleep(100 * time.Millisecond)
	}()
}

func ( *CancelRequestContextWatcherHandler) () {
	.handleUnwatchAfterCancelCalled()
	<-.cancelFinishedChan

	.Conn.conn.SetDeadline(time.Time{})
}

func (,  []FieldDescription,  []int16) error {
	switch {
	case len() == 0:
		// No format codes provided means text format for all columns.
		for  := range  {
			[] = []
			[].Format = pgtype.TextFormatCode
		}
	case len() == 1:
		// Single format code applies to all columns.
		 := [0]
		for  := range  {
			[] = []
			[].Format = 
		}
	case len() == len():
		// One format code per column.
		for  := range  {
			[] = []
			[].Format = []
		}
	default:
		// This should not occur if Bind validation is correct, but handle gracefully
		return fmt.Errorf("result format codes length %d does not match field count %d", len(), len())
	}

	return nil
}