package pg

import (
	
	
	
	
	
	

	
	
	
)

// gopgChannel is the channel the listener subscribes to for itself and
// notifies on as a ping. Notifications received on it are not forwarded
// to the Go channel handed out to users.
const gopgChannel = "gopg:ping"

var (
	// errListenerClosed is returned by listener operations once the
	// listener has been closed.
	errListenerClosed = errors.New("pg: listener is closed")
	// errPingTimeout is used as the reconnect reason when a ping itself
	// returned no error but no notification arrived in time.
	errPingTimeout    = errors.New("pg: ping timeout")
)

// Notification which is received with LISTEN command.
type Notification struct {
	// Channel is the name of the channel the notification arrived on.
	Channel string
	// Payload is the string that was sent along with the notification.
	Payload string
}

// Listener listens for notifications sent with NOTIFY command.
// It's NOT safe for concurrent use by multiple goroutines
// except the Channel API.
type Listener struct {
	// db provides the connection pool, the connection options and the
	// context used by the listener.
	db *DB

	// channels holds the subscribed channel names. They are re-sent with
	// LISTEN whenever a new connection is established.
	channels []string

	// mu guards channels, cn and closed.
	mu     sync.Mutex
	// cn is the current connection; nil until one is created and again
	// after the connection has been discarded.
	cn     *pool.Conn
	// exit is closed by Close to stop the ping goroutine.
	exit   chan struct{}
	// closed is set by Close; once set, no new connection is created.
	closed bool

	// chOnce makes sure the Channel machinery is initialized only once.
	chOnce sync.Once
	// ch is the Go channel returned by Channel and ChannelSize.
	ch     chan Notification
	// pingCh (capacity 1) is signalled whenever any notification is
	// received, telling the ping goroutine the connection is alive.
	pingCh chan struct{}
}

func ( *Listener) () string {
	.mu.Lock()
	defer .mu.Unlock()

	return fmt.Sprintf("Listener(%s)", strings.Join(.channels, ", "))
}

func ( *Listener) () {
	.exit = make(chan struct{})
}

func ( *Listener) ( context.Context) (*pool.Conn, error) {
	.mu.Lock()
	,  := .conn()
	.mu.Unlock()

	switch  {
	case nil:
		return , nil
	case errListenerClosed:
		return nil, 
	case pool.ErrClosed:
		_ = .Close()
		return nil, errListenerClosed
	default:
		internal.Logger.Printf(, "pg: Listen failed: %s", )
		return nil, 
	}
}

func ( *Listener) ( context.Context) (*pool.Conn, error) {
	if .closed {
		return nil, errListenerClosed
	}

	if .cn != nil {
		return .cn, nil
	}

	,  := .db.pool.NewConn()
	if  != nil {
		return nil, 
	}

	if  := .db.initConn(, );  != nil {
		_ = .db.pool.CloseConn()
		return nil, 
	}

	.LockReader()

	if len(.channels) > 0 {
		 := .listen(, , .channels...)
		if  != nil {
			_ = .db.pool.CloseConn()
			return nil, 
		}
	}

	.cn = 
	return , nil
}

func ( *Listener) ( context.Context,  *pool.Conn,  error,  bool) {
	.mu.Lock()
	if .cn ==  {
		if ,  := isBadConn(, );  {
			.reconnect(, )
		}
	}
	.mu.Unlock()
}

func ( *Listener) ( context.Context,  error) {
	_ = .closeTheCn()
	_, _ = .conn()
}

func ( *Listener) ( error) error {
	if .cn == nil {
		return nil
	}
	if !.closed {
		internal.Logger.Printf(.db.ctx, "pg: discarding bad listener connection: %s", )
	}

	 := .db.pool.CloseConn(.cn)
	.cn = nil
	return 
}

// Close closes the listener, releasing any open resources.
func ( *Listener) () error {
	.mu.Lock()
	defer .mu.Unlock()

	if .closed {
		return errListenerClosed
	}
	.closed = true
	close(.exit)

	return .closeTheCn(errListenerClosed)
}

// Listen starts listening for notifications on channels.
func ( *Listener) ( context.Context,  ...string) error {
	// Always append channels so DB.Listen works correctly.
	.mu.Lock()
	.channels = appendIfNotExists(.channels, ...)
	.mu.Unlock()

	,  := .connWithLock()
	if  != nil {
		return 
	}

	if  := .listen(, , ...);  != nil {
		.releaseConn(, , , false)
		return 
	}

	return nil
}

func ( *Listener) ( context.Context,  *pool.Conn,  ...string) error {
	 := .WithWriter(, .db.opt.WriteTimeout, func( *pool.WriteBuffer) error {
		for ,  := range  {
			if  := writeQueryMsg(, .db.fmter, "LISTEN ?", pgChan());  != nil {
				return 
			}
		}
		return nil
	})
	return 
}

// Unlisten stops listening for notifications on channels.
func ( *Listener) ( context.Context,  ...string) error {
	.mu.Lock()
	.channels = removeIfExists(.channels, ...)

	,  := .conn()
	// I don't want to defer this unlock as the mutex is re-acquired in the `.releaseConn` function. But it is safe to
	// unlock here regardless of an error.
	.mu.Unlock()
	if  != nil {
		return 
	}

	if  := .unlisten(, , ...);  != nil {
		.releaseConn(, , , false)
		return 
	}

	return nil
}

func ( *Listener) ( context.Context,  *pool.Conn,  ...string) error {
	 := .WithWriter(, .db.opt.WriteTimeout, func( *pool.WriteBuffer) error {
		for ,  := range  {
			if  := writeQueryMsg(, .db.fmter, "UNLISTEN ?", pgChan());  != nil {
				return 
			}
		}
		return nil
	})
	return 
}

// Receive indefinitely waits for a notification. This is low-level API
// and in most cases Channel should be used instead.
func ( *Listener) ( context.Context) ( string,  string,  error) {
	return .ReceiveTimeout(, 0)
}

// ReceiveTimeout waits for a notification until timeout is reached.
// This is low-level API and in most cases Channel should be used instead.
func ( *Listener) (
	 context.Context,  time.Duration,
) (,  string,  error) {
	,  := .connWithLock()
	if  != nil {
		return "", "", 
	}

	 = .WithReader(, , func( *pool.ReaderContext) error {
		, ,  = readNotification()
		return 
	})
	if  != nil {
		.releaseConn(, , ,  > 0)
		return "", "", 
	}

	return , , nil
}

// Channel returns a channel for concurrently receiving notifications.
// It periodically sends Ping notification to test connection health.
//
// The channel is closed with Listener. Receive* APIs can not be used
// after channel is created.
func ( *Listener) () <-chan Notification {
	return .channel(100)
}

// ChannelSize is like Channel, but creates a Go channel
// with specified buffer size.
func ( *Listener) ( int) <-chan Notification {
	return .channel()
}

func ( *Listener) ( int) <-chan Notification {
	.chOnce.Do(func() {
		.initChannel()
	})
	if cap(.ch) !=  {
		 := fmt.Errorf("pg: Listener.Channel is called with different buffer size")
		panic()
	}
	return .ch
}

func ( *Listener) ( int) {
	const  = time.Second
	const  = time.Minute

	 := .db.ctx
	_ = .Listen(, gopgChannel)

	.ch = make(chan Notification, )
	.pingCh = make(chan struct{}, 1)

	go func() {
		 := time.NewTimer(time.Minute)
		.Stop()

		var  int
		for {
			, ,  := .Receive()
			if  != nil {
				if  == errListenerClosed {
					close(.ch)
					return
				}

				if  > 0 {
					time.Sleep(500 * time.Millisecond)
				}
				++

				continue
			}

			 = 0

			// Any notification is as good as a ping.
			select {
			case .pingCh <- struct{}{}:
			default:
			}

			switch  {
			case gopgChannel:
				// ignore
			default:
				.Reset()
				select {
				case .ch <- Notification{, }:
					if !.Stop() {
						<-.C
					}
				case <-.C:
					internal.Logger.Printf(
						,
						"pg: %s channel is full for %s (notification is dropped)",
						,
						,
					)
				}
			}
		}
	}()

	go func() {
		 := time.NewTimer(time.Minute)
		.Stop()

		 := true
		for {
			.Reset()
			select {
			case <-.pingCh:
				 = true
				if !.Stop() {
					<-.C
				}
			case <-.C:
				 := .ping()
				if  {
					 = false
				} else {
					if  == nil {
						 = errPingTimeout
					}
					.mu.Lock()
					.reconnect(, )
					.mu.Unlock()
				}
			case <-.exit:
				return
			}
		}
	}()
}

func ( *Listener) () error {
	,  := .db.Exec("NOTIFY ?", pgChan(gopgChannel))
	return 
}

// appendIfNotExists appends to ss every element of es that ss does not
// contain yet and returns the resulting slice. Duplicates within es are
// added only once, because each element is checked against the slice as
// it grows.
func appendIfNotExists(ss []string, es ...string) []string {
loop:
	for _, e := range es {
		for _, s := range ss {
			if s == e {
				continue loop
			}
		}
		ss = append(ss, e)
	}
	return ss
}

// removeIfExists removes the first occurrence of every element of es from
// ss and returns the shortened slice. Removal swaps the last element into
// the freed position, so the order of ss is not preserved and the backing
// array of ss is modified in place.
func removeIfExists(ss []string, es ...string) []string {
	for _, e := range es {
		for i, s := range ss {
			if s == e {
				last := len(ss) - 1
				ss[i] = ss[last]
				ss = ss[:last]
				break
			}
		}
	}
	return ss
}

// pgChan is a channel name used as a query parameter in the LISTEN,
// UNLISTEN and NOTIFY statements built in this file.
type pgChan string

// Compile-time check that pgChan implements types.ValueAppender.
var _ types.ValueAppender = pgChan("")

func ( pgChan) ( []byte,  int) ([]byte, error) {
	if  == 0 {
		return append(, ...), nil
	}

	 = append(, '"')
	for ,  := range []byte() {
		if  == '"' {
			 = append(, '"', '"')
		} else {
			 = append(, )
		}
	}
	 = append(, '"')

	return , nil
}