package pg
import (
)
// gopgChannel is the channel name the listener subscribes to when its
// notification channel is initialised, and on which the ping helper sends
// NOTIFY. Notifications arriving on it are not forwarded to the user channel.
const gopgChannel = "gopg:ping"
var (
// errListenerClosed is returned by listener operations once the closed flag
// has been set.
errListenerClosed = errors.New("pg: listener is closed")
// errPingTimeout is the error used for a reconnect triggered by the pinger
// goroutine when the ping itself returned no error.
errPingTimeout = errors.New("pg: ping timeout")
)
// Notification is the value delivered on the listener's notification channel.
// It is built positionally from the two strings returned by a receive call
// (presumably the channel name followed by the payload — confirm).
type Notification struct {
Channel string
Payload string
}
// Listener holds a dedicated connection used for LISTEN/UNLISTEN commands and
// for reading notifications, plus the state needed to re-establish it.
type Listener struct {
// db supplies the connection pool, connection initialisation, write timeout,
// formatter and context used by the methods below.
db *DB
// channels is the list of subscribed channel names; it is re-sent with
// LISTEN whenever a new connection is created.
channels []string
// mu is held around accesses to cn, channels and closed.
mu sync.Mutex
// cn is the current connection; nil until one is established and again
// after it has been closed.
cn *pool.Conn
// exit is closed when the listener is closed; the pinger goroutine returns
// once it becomes readable.
exit chan struct{}
// closed is set to true on close and makes further operations fail with
// errListenerClosed.
closed bool
// chOnce guards the one-time initialisation of ch and pingCh.
chOnce sync.Once
// ch is the buffered channel on which received notifications are delivered.
ch chan Notification
// pingCh (capacity 1) receives a non-blocking signal after every successful
// receive; the pinger goroutine reads it.
pingCh chan struct{}
}
// Returns a description of the form "Listener(<channels>)", where the
// subscribed channel names are joined with ", ". mu is held while channels is
// read.
//
// NOTE(review): the method and receiver names are missing in this copy; the
// `() string` signature suggests a Stringer-style method — confirm.
func ( *Listener) () string {
.mu.Lock()
defer .mu.Unlock()
return fmt.Sprintf("Listener(%s)", strings.Join(.channels, ", "))
}
// Allocates the exit channel. That channel is closed when the listener is
// closed and is what stops the pinger goroutine.
//
// NOTE(review): the method name is missing in this copy; presumably a one-time
// initialiser called by the constructor — confirm.
func ( *Listener) () {
.exit = make(chan struct{})
}
// Obtains the listener connection by calling conn while holding mu, then maps
// the outcome:
//   - nil:               the connection is returned;
//   - errListenerClosed: passed through;
//   - pool.ErrClosed:    the listener itself is closed (the Close result is
//     deliberately ignored) and errListenerClosed is returned;
//   - anything else:     logged with "pg: Listen failed" and returned.
//
// NOTE(review): identifiers are missing in this copy. The switch operand is
// presumably the error returned by conn, and this is presumably the method
// called as connWithLock elsewhere in the file — confirm.
func ( *Listener) ( context.Context) (*pool.Conn, error) {
.mu.Lock()
, := .conn()
.mu.Unlock()
switch {
case nil:
return , nil
case errListenerClosed:
return nil,
case pool.ErrClosed:
_ = .Close()
return nil, errListenerClosed
default:
internal.Logger.Printf(, "pg: Listen failed: %s", )
return nil,
}
}
// Returns the current connection, creating and preparing a new one if needed.
// It takes no lock itself; the visible callers hold mu around the call.
//
// Steps: fail with errListenerClosed if the listener is closed; reuse cn if it
// is set; otherwise take a new connection from db.pool, run db.initConn on it,
// call LockReader, and re-send LISTEN for every entry in channels. If initConn
// or the LISTEN step fails the new connection is closed through the pool and
// the error is returned. On success the connection is stored in cn.
//
// NOTE(review): identifiers are missing in this copy; presumably this is the
// method called as conn elsewhere in the file — confirm.
func ( *Listener) ( context.Context) (*pool.Conn, error) {
if .closed {
return nil, errListenerClosed
}
if .cn != nil {
return .cn, nil
}
, := .db.pool.NewConn()
if != nil {
return nil,
}
if := .db.initConn(, ); != nil {
// Close errors are ignored: the initialisation error is the one reported.
_ = .db.pool.CloseConn()
return nil,
}
.LockReader()
if len(.channels) > 0 {
// Restore the existing subscriptions on the fresh connection.
:= .listen(, , .channels...)
if != nil {
_ = .db.pool.CloseConn()
return nil,
}
}
.cn =
return , nil
}
// Under mu, compares the listener's current cn with a value and, when they
// match and isBadConn reports true, triggers reconnect.
//
// NOTE(review): identifiers are missing in this copy. Presumably cn is compared
// with the passed connection (so a connection that has already been replaced
// is left alone), isBadConn receives the passed error and bool, and this is
// the method called as releaseConn elsewhere in the file — confirm.
func ( *Listener) ( context.Context, *pool.Conn, error, bool) {
.mu.Lock()
if .cn == {
if , := isBadConn(, ); {
.reconnect(, )
}
}
.mu.Unlock()
}
// Closes the current connection via closeTheCn and immediately tries to
// establish a new one via conn. Both results are discarded, so a failure here
// only surfaces on the next operation that needs the connection. The visible
// callers hold mu around the call.
//
// NOTE(review): identifiers are missing in this copy; presumably this is the
// method called as reconnect elsewhere in the file — confirm.
func ( *Listener) ( context.Context, error) {
_ = .closeTheCn()
_, _ = .conn()
}
// Closes the current connection through the pool and clears cn. Returns nil
// when there is no connection. Unless the listener is already closed, a
// "discarding bad listener connection" message is logged first.
//
// NOTE(review): identifiers are missing in this copy; the value returned is
// presumably the error from CloseConn, and this is presumably the method
// called as closeTheCn elsewhere in the file — confirm.
func ( *Listener) ( error) error {
if .cn == nil {
return nil
}
if !.closed {
internal.Logger.Printf(.db.ctx, "pg: discarding bad listener connection: %s", )
}
:= .db.pool.CloseConn(.cn)
.cn = nil
return
}
// Closes the listener under mu: sets the closed flag, closes the exit channel
// (which stops the pinger goroutine) and closes the current connection.
// Returns errListenerClosed if the listener was already closed, so exit is
// closed only once.
//
// NOTE(review): identifiers are missing in this copy; presumably this is the
// method called as Close elsewhere in the file — confirm.
func ( *Listener) () error {
.mu.Lock()
defer .mu.Unlock()
if .closed {
return errListenerClosed
}
.closed = true
close(.exit)
return .closeTheCn(errListenerClosed)
}
// Subscribes to the given channels. The names are first merged into channels
// under mu (before any connection is obtained, so they are part of the list
// that conn re-sends on a new connection), then a connection is obtained via
// connWithLock and LISTEN is sent. If sending fails, releaseConn is called
// with false as its last argument and the error is returned.
//
// NOTE(review): identifiers are missing in this copy; presumably this is the
// method called as Listen elsewhere in the file — confirm.
func ( *Listener) ( context.Context, ...string) error {
.mu.Lock()
.channels = appendIfNotExists(.channels, ...)
.mu.Unlock()
, := .connWithLock()
if != nil {
return
}
if := .listen(, , ...); != nil {
.releaseConn(, , , false)
return
}
return nil
}
// Writes one "LISTEN ?" query message per iterated item, all inside a single
// WithWriter call that uses db.opt.WriteTimeout. Each item is wrapped in
// pgChan for formatting. The first write error aborts the loop.
//
// NOTE(review): identifiers are missing in this copy; presumably WithWriter is
// invoked on the passed connection, the loop ranges over the variadic channel
// names, and this is the method called as listen elsewhere — confirm.
func ( *Listener) ( context.Context, *pool.Conn, ...string) error {
:= .WithWriter(, .db.opt.WriteTimeout, func( *pool.WriteBuffer) error {
for , := range {
if := writeQueryMsg(, .db.fmter, "LISTEN ?", pgChan()); != nil {
return
}
}
return nil
})
return
}
// Unsubscribes from the given channels. Under mu the names are removed from
// channels and the connection is obtained via conn; then UNLISTEN is sent. If
// sending fails, releaseConn is called with false as its last argument and the
// error is returned.
//
// NOTE(review): identifiers are missing in this copy; presumably this is the
// exported counterpart of the LISTEN method — confirm.
func ( *Listener) ( context.Context, ...string) error {
.mu.Lock()
.channels = removeIfExists(.channels, ...)
, := .conn()
.mu.Unlock()
if != nil {
return
}
if := .unlisten(, , ...); != nil {
.releaseConn(, , , false)
return
}
return nil
}
// Writes one "UNLISTEN ?" query message per iterated item, all inside a single
// WithWriter call that uses db.opt.WriteTimeout. Each item is wrapped in
// pgChan for formatting. The first write error aborts the loop.
//
// NOTE(review): identifiers are missing in this copy; presumably WithWriter is
// invoked on the passed connection, the loop ranges over the variadic channel
// names, and this is the method called as unlisten elsewhere — confirm.
func ( *Listener) ( context.Context, *pool.Conn, ...string) error {
:= .WithWriter(, .db.opt.WriteTimeout, func( *pool.WriteBuffer) error {
for , := range {
if := writeQueryMsg(, .db.fmter, "UNLISTEN ?", pgChan()); != nil {
return
}
}
return nil
})
return
}
// Delegates to ReceiveTimeout with a timeout of 0 and returns its two strings
// and error unchanged.
//
// NOTE(review): identifiers are missing in this copy; presumably this is the
// method called as Receive elsewhere, and the strings are the channel name and
// payload — confirm.
func ( *Listener) ( context.Context) ( string, string, error) {
return .ReceiveTimeout(, 0)
}
// Obtains the connection via connWithLock and reads one notification with
// readNotification inside a WithReader call. On a read error releaseConn is
// called and empty strings are returned together with the error.
//
// NOTE(review): identifiers are missing in this copy. Presumably WithReader is
// given the duration parameter as its timeout, and the `> 0` expression passed
// to releaseConn is `timeout > 0` — confirm. Presumably this is the method
// called as ReceiveTimeout elsewhere in the file.
func ( *Listener) (
context.Context, time.Duration,
) (, string, error) {
, := .connWithLock()
if != nil {
return "", "",
}
= .WithReader(, , func( *pool.ReaderContext) error {
, , = readNotification()
return
})
if != nil {
.releaseConn(, , , > 0)
return "", "",
}
return , , nil
}
// Returns the receive-only notification channel, requesting a buffer size of
// 100 from channel.
//
// NOTE(review): the method name is missing in this copy — confirm against the
// original file.
func ( *Listener) () <-chan Notification {
return .channel(100)
}
// Returns the receive-only notification channel via channel, forwarding a
// value to it.
//
// NOTE(review): identifiers are missing in this copy; presumably the int
// parameter is the buffer size that is forwarded — confirm.
func ( *Listener) ( int) <-chan Notification {
return .channel()
}
// Lazily initialises the notification channel exactly once (via chOnce and
// initChannel) and returns it. Panics with "pg: Listener.Channel is called
// with different buffer size" when cap(ch) differs from the compared value.
//
// NOTE(review): identifiers are missing in this copy; presumably cap(ch) is
// compared with the int parameter, and this is the method called as channel
// elsewhere in the file — confirm.
func ( *Listener) ( int) <-chan Notification {
.chOnce.Do(func() {
.initChannel()
})
if cap(.ch) != {
:= fmt.Errorf("pg: Listener.Channel is called with different buffer size")
panic()
}
return .ch
}
// Sets up channel-based delivery: subscribes to gopgChannel (the Listen error
// is ignored), creates ch and pingCh, and starts two goroutines — a receiver
// that forwards notifications to ch and a pinger that reconnects when no
// traffic is observed.
//
// NOTE(review): identifiers are missing in this copy. Presumably the int
// parameter is the buffer size of ch, the two constants are the send timeout
// (one second) and the ping interval (one minute) passed to the timer Reset
// calls, and this is the method called as initChannel elsewhere — confirm.
func ( *Listener) ( int) {
const = time.Second
const = time.Minute
:= .db.ctx
_ = .Listen(, gopgChannel)
.ch = make(chan Notification, )
.pingCh = make(chan struct{}, 1)
// Receiver goroutine: reads notifications until the listener is closed.
go func() {
// The timer is created stopped and re-armed per send below.
:= time.NewTimer(time.Minute)
.Stop()
var int
for {
, , := .Receive()
if != nil {
if == errListenerClosed {
// The listener is closed: close ch so consumers see the end.
close(.ch)
return
}
// Presumably a consecutive-error counter: the first failure retries
// immediately, later ones back off for 500ms — confirm.
if > 0 {
time.Sleep(500 * time.Millisecond)
}
++
continue
}
= 0
// Signal the pinger that the connection is alive; never blocks because
// the send is dropped when pingCh is already full.
select {
case .pingCh <- struct{}{}:
default:
}
switch {
case gopgChannel:
// Internal ping notification: not forwarded to ch.
default:
.Reset()
select {
case .ch <- Notification{, }:
// Delivered in time: stop the timer, draining it if it already fired.
if !.Stop() {
<-.C
}
case <-.C:
// ch stayed full until the timer fired: the notification is dropped.
internal.Logger.Printf(
,
"pg: %s channel is full for %s (notification is dropped)",
,
,
)
}
}
}
}()
// Pinger goroutine: runs until exit is closed.
go func() {
:= time.NewTimer(time.Minute)
.Stop()
// Presumably a "connection seen healthy" flag, initially true — confirm.
:= true
for {
.Reset()
select {
case <-.pingCh:
// Traffic was received: mark healthy and disarm the timer.
= true
if !.Stop() {
<-.C
}
case <-.C:
// No traffic during the interval: send a ping. If the flag was set,
// clear it and wait another interval; otherwise reconnect under mu,
// using errPingTimeout when the ping itself returned nil.
:= .ping()
if {
= false
} else {
if == nil {
= errPingTimeout
}
.mu.Lock()
.reconnect(, )
.mu.Unlock()
}
case <-.exit:
return
}
}
}()
}
// Executes "NOTIFY ?" on gopgChannel through db.Exec (not through the
// listener's own connection) and returns an error value.
//
// NOTE(review): identifiers are missing in this copy; presumably the returned
// value is the error from Exec, and this is the method called as ping
// elsewhere in the file — confirm.
func ( *Listener) () error {
, := .db.Exec("NOTIFY ?", pgChan(gopgChannel))
return
}
// appendIfNotExists appends each element of es to ss unless an equal string is
// already present, and returns the resulting slice. Elements appended earlier
// in the same call count as present, so duplicates within es are added once.
// The relative order of existing and newly added elements is preserved.
func appendIfNotExists(ss []string, es ...string) []string {
next:
	for _, e := range es {
		for _, s := range ss {
			if s == e {
				// Already present: move on to the next candidate. The label is
				// required; a bare continue would only restart the inner loop
				// and the duplicate would still be appended.
				continue next
			}
		}
		ss = append(ss, e)
	}
	return ss
}
// removeIfExists removes the first occurrence of each element of es from ss
// and returns the shortened slice. Removal swaps the last element into the
// vacated position, so the order of the remaining elements is not preserved
// and the backing array of ss is modified in place.
func removeIfExists(ss []string, es ...string) []string {
	for _, e := range es {
		for i, s := range ss {
			if s == e {
				last := len(ss) - 1
				ss[i] = ss[last]
				ss = ss[:last]
				// Stop after the first match: ss was just re-sliced, so the
				// ranged snapshot is stale.
				break
			}
		}
	}
	return ss
}
// pgChan is a string type used to pass channel names as query parameters to
// the LISTEN, UNLISTEN and NOTIFY commands in this file.
type pgChan string
// Compile-time check that pgChan satisfies types.ValueAppender.
var _ types.ValueAppender = pgChan("")
// Appends a value to the given byte slice and returns the extended slice; the
// returned error is always nil. When the compared value is 0 the bytes are
// appended as-is. Otherwise the output is wrapped in double quotes and every
// embedded double quote is doubled.
//
// NOTE(review): identifiers are missing in this copy. Presumably the value
// appended is the receiver's string, the int parameter is a quoting flag, and
// this is the method that satisfies types.ValueAppender — confirm.
func ( pgChan) ( []byte, int) ([]byte, error) {
if == 0 {
return append(, ...), nil
}
= append(, '"')
for , := range []byte() {
if == '"' {
// Escape an embedded quote by doubling it.
= append(, '"', '"')
} else {
= append(, )
}
}
= append(, '"')
return , nil
}