package puddle

import (
	
	
	
	
	

	
	
)

// Lifecycle states stored in Resource.status. resourceStatusConstructing is
// explicitly 0; the remaining constants take the value of iota at their
// position in this block, i.e. 1, 2 and 3.
const (
	resourceStatusConstructing = 0
	resourceStatusIdle         = iota
	resourceStatusAcquired     = iota
	resourceStatusHijacked     = iota
)

// ErrClosedPool occurs on an attempt to acquire or create a resource from a
// closed pool or a pool that is closed while the acquire is waiting.
var ErrClosedPool = errors.New("closed pool")

// ErrNotAvailable occurs on an attempt to acquire a resource without waiting
// when no resource is immediately available (including the case where a new
// resource has just been scheduled for background creation), or on an attempt
// to create a resource in a pool that is at maximum capacity.
var ErrNotAvailable = errors.New("resource not available")

// Constructor is a function called by the pool to construct a resource. It
// returns the new resource value, or a non-nil error if construction failed.
type Constructor[ any] func(ctx context.Context) (res , err error)

// Destructor is a function called by the pool to destroy a resource. It
// receives the resource value that was produced by the Constructor.
type Destructor[ any] func(res )

// Resource is the resource handle returned by acquiring from the pool.
type Resource[ any] struct {
	// value is the underlying resource value produced by the pool's constructor.
	value          
	// pool is the pool that created this resource.
	pool           *Pool[]
	// creationTime is the wall-clock time at which the pool created this handle.
	creationTime   time.Time
	// lastUsedNano is a nanotime() reading. It is initialized when the handle
	// is created and updated when the resource is released.
	lastUsedNano   int64
	// poolResetCount is the pool's resetCount at the time this handle was
	// created. It is compared against the pool's current resetCount on release.
	poolResetCount int
	// status is one of the resourceStatus* constants.
	status         byte
}

// Value returns the resource value. It panics if the resource is neither
// acquired nor hijacked.
func ( *Resource[]) ()  {
	if !(.status == resourceStatusAcquired || .status == resourceStatusHijacked) {
		panic("tried to access resource that is not acquired or hijacked")
	}
	return .value
}

// Release returns the resource to the pool and records the current nanotime as
// its last-used time. res must not be subsequently used. It panics if the
// resource is not currently acquired.
func ( *Resource[]) () {
	if .status != resourceStatusAcquired {
		panic("tried to release resource that is not acquired")
	}
	.pool.releaseAcquiredResource(, nanotime())
}

// ReleaseUnused returns the resource to the pool without updating when it was last used, i.e. LastUsedNanotime
// will not change. res must not be subsequently used. It panics if the resource is not currently acquired.
func ( *Resource[]) () {
	if .status != resourceStatusAcquired {
		panic("tried to release resource that is not acquired")
	}
	.pool.releaseAcquiredResource(, .lastUsedNano)
}

// Destroy returns the resource to the pool for destruction. res must not be
// subsequently used. It panics if the resource is not currently acquired.
//
// The destruction itself runs in a separate goroutine, so Destroy returns
// without waiting for it to finish.
func ( *Resource[]) () {
	if .status != resourceStatusAcquired {
		panic("tried to destroy resource that is not acquired")
	}
	go .pool.destroyAcquiredResource()
}

// Hijack assumes ownership of the resource from the pool. Caller is responsible
// for cleanup of resource value. It panics if the resource is not currently
// acquired.
func ( *Resource[]) () {
	if .status != resourceStatusAcquired {
		panic("tried to hijack resource that is not acquired")
	}
	.pool.hijackAcquiredResource()
}

// CreationTime returns when the resource was created by the pool. It panics if
// the resource is neither acquired nor hijacked.
func ( *Resource[]) () time.Time {
	if !(.status == resourceStatusAcquired || .status == resourceStatusHijacked) {
		panic("tried to access resource that is not acquired or hijacked")
	}
	return .creationTime
}

// LastUsedNanotime returns when Release was last called on the resource measured in nanoseconds from an arbitrary time
// (a monotonic time). Returns creation time if Release has never been called. This is only useful to compare with
// other calls to LastUsedNanotime. In almost all cases, IdleDuration should be used instead.
//
// It panics if the resource is neither acquired nor hijacked.
func ( *Resource[]) () int64 {
	if !(.status == resourceStatusAcquired || .status == resourceStatusHijacked) {
		panic("tried to access resource that is not acquired or hijacked")
	}

	return .lastUsedNano
}

// IdleDuration returns the duration since Release was last called on the resource. This is equivalent to subtracting
// LastUsedNanotime from the current nanotime.
//
// It panics if the resource is neither acquired nor hijacked.
func ( *Resource[]) () time.Duration {
	if !(.status == resourceStatusAcquired || .status == resourceStatusHijacked) {
		panic("tried to access resource that is not acquired or hijacked")
	}

	return time.Duration(nanotime() - .lastUsedNano)
}

// Pool is a concurrency-safe resource pool.
type Pool[ any] struct {
	// mux is the pool internal lock. Any modification of shared state of
	// the pool (except Acquires of acquireSem) must be performed only by
	// the holder of the lock. Long running operations are not allowed when
	// mux is held.
	mux sync.Mutex
	// acquireSem provides an allowance to acquire a resource.
	//
	// Releases are allowed only when caller holds mux. Acquires have to
	// happen before mux is locked (doesn't apply to semaphore.TryAcquire in
	// AcquireAllIdle).
	acquireSem *semaphore.Weighted
	// destructWG is incremented for every resource handle the pool creates
	// and decremented when that resource is destroyed, fails construction,
	// or is hijacked. Close waits on it.
	destructWG sync.WaitGroup

	// allResources holds every resource handle owned by the pool, including
	// those still being constructed.
	allResources  resList[]
	// idleResources holds the resources that are currently idle.
	idleResources *genstack.GenStack[*Resource[]]

	// constructor, destructor and maxSize are copied from the Config
	// passed when the pool is created.
	constructor Constructor[]
	destructor  Destructor[]
	maxSize     int32

	// Cumulative statistics reported by Stat. canceledAcquireCount is an
	// atomic because it is updated without mux being held.
	acquireCount         int64
	acquireDuration      time.Duration
	emptyAcquireCount    int64
	emptyAcquireWaitTime time.Duration
	canceledAcquireCount atomic.Int64

	// resetCount is incremented by every Reset. Resources created before
	// the most recent Reset are destroyed when they are released.
	resetCount int

	// baseAcquireCtx is canceled (via cancelBaseAcquireCtx) when the pool
	// is closed.
	baseAcquireCtx       context.Context
	cancelBaseAcquireCtx context.CancelFunc
	// closed is set by Close and never cleared.
	closed               bool
}

// Config holds the settings used to create a new pool.
type Config[ any] struct {
	// Constructor is called by the pool to construct a resource.
	Constructor Constructor[]
	// Destructor is called by the pool to destroy a resource.
	Destructor  Destructor[]
	// MaxSize is the maximum number of resources in the pool. It must be
	// at least 1.
	MaxSize     int32
}

// NewPool creates a new pool. Returns an error iff MaxSize is less than 1.
//
// The pool starts with no resources. Its acquire semaphore is sized to MaxSize.
func [ any]( *Config[]) (*Pool[], error) {
	if .MaxSize < 1 {
		return nil, errors.New("MaxSize must be >= 1")
	}

	// The base acquire context lives for as long as the pool is open; it
	// is canceled by Close.
	,  := context.WithCancel(context.Background())

	return &Pool[]{
		acquireSem:           semaphore.NewWeighted(int64(.MaxSize)),
		idleResources:        genstack.NewGenStack[*Resource[]](),
		maxSize:              .MaxSize,
		constructor:          .Constructor,
		destructor:           .Destructor,
		baseAcquireCtx:       ,
		cancelBaseAcquireCtx: ,
	}, nil
}

// Close destroys all resources in the pool and rejects future Acquire calls.
// Blocks until all resources are returned to pool and destroyed.
//
// Calling Close on an already closed pool does nothing except wait for any
// outstanding resources.
func ( *Pool[]) () {
	// Deferred first so that it runs last, after mux has been unlocked.
	defer .destructWG.Wait()

	.mux.Lock()
	defer .mux.Unlock()

	if .closed {
		return
	}
	.closed = true
	.cancelBaseAcquireCtx()

	// Drain the idle resources and destroy each one in its own goroutine so
	// that destructors do not run while mux is held.
	for ,  := .idleResources.Pop(); ; ,  = .idleResources.Pop() {
		.allResources.remove()
		go .destructResourceValue(.value)
	}
}

// Stat is a snapshot of Pool statistics. Its fields are read through the
// accessor methods.
type Stat struct {
	// Resource counts by status at the time of the snapshot.
	constructingResources int32
	acquiredResources     int32
	idleResources         int32
	// maxResources is the pool's maximum size.
	maxResources          int32
	// Cumulative counters copied from the pool.
	acquireCount          int64
	acquireDuration       time.Duration
	emptyAcquireCount     int64
	emptyAcquireWaitTime  time.Duration
	canceledAcquireCount  int64
}

// TotalResources returns the total number of resources that were in the pool
// when the snapshot was taken. The value is the sum of ConstructingResources,
// AcquiredResources, and IdleResources.
func ( *Stat) () int32 {
	return .constructingResources + .acquiredResources + .idleResources
}

// ConstructingResources returns the number of resources whose construction was
// in progress in the pool when the snapshot was taken.
func ( *Stat) () int32 {
	return .constructingResources
}

// AcquiredResources returns the number of resources that were acquired from
// the pool when the snapshot was taken.
func ( *Stat) () int32 {
	return .acquiredResources
}

// IdleResources returns the number of resources that were idle in the pool
// when the snapshot was taken.
func ( *Stat) () int32 {
	return .idleResources
}

// MaxResources returns the maximum size of the pool, i.e. the largest number
// of resources it may hold.
func ( *Stat) () int32 {
	return .maxResources
}

// AcquireCount returns the cumulative count of successful acquires from the
// pool as of the snapshot.
func ( *Stat) () int64 {
	return .acquireCount
}

// AcquireDuration returns the total duration of all successful acquires from
// the pool as of the snapshot.
func ( *Stat) () time.Duration {
	return .acquireDuration
}

// EmptyAcquireCount returns the cumulative count of successful acquires from
// the pool that waited for a resource to be released or constructed because
// the pool was empty.
func ( *Stat) () int64 {
	return .emptyAcquireCount
}

// EmptyAcquireWaitTime returns the cumulative time that successful acquires
// from the pool spent waiting for a resource to be released or constructed
// because the pool was empty.
func ( *Stat) () time.Duration {
	return .emptyAcquireWaitTime
}

// CanceledAcquireCount returns the cumulative count of acquires from the pool
// that were canceled by a context, as of the snapshot.
func ( *Stat) () int64 {
	return .canceledAcquireCount
}

// Stat returns the current pool statistics. The snapshot is taken while
// holding the pool mutex.
func ( *Pool[]) () *Stat {
	.mux.Lock()
	defer .mux.Unlock()

	 := &Stat{
		maxResources:         .maxSize,
		acquireCount:         .acquireCount,
		emptyAcquireCount:    .emptyAcquireCount,
		emptyAcquireWaitTime: .emptyAcquireWaitTime,
		canceledAcquireCount: .canceledAcquireCount.Load(),
		acquireDuration:      .acquireDuration,
	}

	// Count resources by status. Hijacked resources are not counted: they
	// are removed from allResources when they are hijacked.
	for ,  := range .allResources {
		switch .status {
		case resourceStatusConstructing:
			.constructingResources += 1
		case resourceStatusIdle:
			.idleResources += 1
		case resourceStatusAcquired:
			.acquiredResources += 1
		}
	}

	return 
}

// tryAcquireIdleResource checks if there is any idle resource. If there is
// one, this method pops it from the idle stack, marks it as acquired and
// returns it. If there is no idle resource, this method returns nil and
// doesn't modify idleResources.
//
// WARNING: Caller of this method must hold the pool mutex!
func ( *Pool[]) () *Resource[] {
	,  := .idleResources.Pop()
	if ! {
		return nil
	}

	.status = resourceStatusAcquired
	return 
}

// createNewResource creates a new resource handle in the constructing state
// and inserts it into the list of pool resources. It does not call the
// constructor; the handle's value is left unset. It also adds one to
// destructWG on behalf of the new resource.
//
// WARNING: Caller of this method must hold the pool mutex!
func ( *Pool[]) () *Resource[] {
	 := &Resource[]{
		pool:           ,
		creationTime:   time.Now(),
		lastUsedNano:   nanotime(),
		poolResetCount: .resetCount,
		status:         resourceStatusConstructing,
	}

	.allResources.append()
	.destructWG.Add(1)

	return 
}

// Acquire gets a resource from the pool. If no resources are available and the pool is not at maximum capacity it will
// create a new resource. If the pool is at maximum capacity it will block until a resource is available. ctx can be
// used to cancel the Acquire.
//
// If Acquire creates a new resource the resource constructor function will receive a context that delegates Value() to
// ctx. Canceling ctx will cause Acquire to return immediately but it will not cancel the resource creation. This avoids
// the problem of it being impossible to create resources when the time to create a resource is greater than any one
// caller of Acquire is willing to wait.
//
// If ctx is already done when Acquire is called, the canceled-acquire counter is incremented and ctx's error is
// returned without touching the pool.
func ( *Pool[]) ( context.Context) ( *Resource[],  error) {
	select {
	case <-.Done():
		.canceledAcquireCount.Add(1)
		return nil, .Err()
	default:
	}

	return .acquire()
}

// acquire is a continuation of Acquire function that doesn't check context
// validity.
//
// This function exists solely for benchmarking purposes.
//
// On success the caller holds one semaphore token for the returned resource;
// the token is given back when the resource is released, destroyed or
// hijacked.
func ( *Pool[]) ( context.Context) (*Resource[], error) {
	 := nanotime()

	// Fast path: take a token without blocking. Otherwise remember that we
	// had to wait so the empty-acquire statistics can be updated below.
	var  bool
	if !.acquireSem.TryAcquire(1) {
		 = true
		 := .acquireSem.Acquire(, 1)
		if  != nil {
			.canceledAcquireCount.Add(1)
			return nil, 
		}
	}

	.mux.Lock()
	if .closed {
		.acquireSem.Release(1)
		.mux.Unlock()
		return nil, ErrClosedPool
	}

	// If a resource is available in the pool.
	if  := .tryAcquireIdleResource();  != nil {
		 := time.Duration(nanotime() - )
		if  {
			.emptyAcquireCount += 1
			.emptyAcquireWaitTime += 
		}
		.acquireCount += 1
		.acquireDuration += 
		.mux.Unlock()
		return , nil
	}

	if len(.allResources) >= int(.maxSize) {
		// Unreachable code.
		panic("bug: semaphore allowed more acquires than pool allows")
	}

	// The resource is not idle, but there is enough space to create one.
	 := .createNewResource()
	.mux.Unlock()

	// The semaphore token is not released here on error: initResourceValue's
	// construction goroutine either releases it directly (construction
	// failed) or releases the finished resource back to the pool (caller
	// canceled).
	,  := .initResourceValue(, )
	if  != nil {
		return nil, 
	}

	.mux.Lock()
	defer .mux.Unlock()

	// Constructing a new resource always counts as an empty acquire.
	.emptyAcquireCount += 1
	.acquireCount += 1
	 := time.Duration(nanotime() - )
	.acquireDuration += 
	.emptyAcquireWaitTime += 

	return , nil
}

// initResourceValue runs the pool's constructor for a resource handle that was
// previously added to the pool in the constructing state. It returns the
// handle marked as acquired on success, the constructor's error on failure, or
// ctx's error if ctx is done before construction finishes. In the last case
// construction continues in the background.
func ( *Pool[]) ( context.Context,  *Resource[]) (*Resource[], error) {
	// Create the resource in a goroutine to immediately return from Acquire
	// if ctx is canceled without also canceling the constructor.
	//
	// See:
	// - https://github.com/jackc/pgx/issues/1287
	// - https://github.com/jackc/pgx/issues/1259
	 := make(chan error)
	go func() {
		// NOTE(review): newValueCancelCtx is defined elsewhere; presumably
		// it combines the caller's context values with baseAcquireCtx's
		// cancellation — confirm.
		 := newValueCancelCtx(, .baseAcquireCtx)
		,  := .constructor()
		if  != nil {
			.mux.Lock()
			.allResources.remove()
			.destructWG.Done()

			// The resource won't be acquired because its
			// construction failed. We have to allow someone else to
			// take that resource.
			.acquireSem.Release(1)
			.mux.Unlock()

			select {
			case  <- :
			case <-.Done():
				// The caller is cancelled, so no-one awaits the
				// error. This branch avoids a goroutine leak.
			}
			return
		}

		// The resource is already in p.allResources where it might be read. So we need to acquire the lock to update its
		// status.
		.mux.Lock()
		.value = 
		.status = resourceStatusAcquired
		.mux.Unlock()

		// This select works because the channel is unbuffered. If the
		// caller has gone away, the freshly constructed resource is
		// released to the pool without changing its last-used time.
		select {
		case  <- nil:
		case <-.Done():
			.releaseAcquiredResource(, .lastUsedNano)
		}
	}()

	select {
	case <-.Done():
		.canceledAcquireCount.Add(1)
		return nil, .Err()
	case  := <-:
		if  != nil {
			return nil, 
		}
		return , nil
	}
}

// TryAcquire gets a resource from the pool if one is immediately available. If not, it returns ErrNotAvailable. If no
// resources are available but the pool has room to grow, a resource will be created in the background. ctx is only
// used to cancel the background creation.
//
// It returns ErrClosedPool if the pool is closed. It never blocks waiting for a resource.
func ( *Pool[]) ( context.Context) (*Resource[], error) {
	if !.acquireSem.TryAcquire(1) {
		return nil, ErrNotAvailable
	}

	.mux.Lock()
	defer .mux.Unlock()

	if .closed {
		.acquireSem.Release(1)
		return nil, ErrClosedPool
	}

	// If a resource is available now
	if  := .tryAcquireIdleResource();  != nil {
		.acquireCount += 1
		return , nil
	}

	if len(.allResources) >= int(.maxSize) {
		// Unreachable code.
		panic("bug: semaphore allowed more acquires than pool allows")
	}

	// Nothing is idle but there is room: construct a resource in the
	// background. The semaphore token taken above is held by the goroutine
	// until construction finishes.
	 := .createNewResource()
	go func() {
		,  := .constructor()

		.mux.Lock()
		defer .mux.Unlock()
		// We have to create the resource and only then release the
		// semaphore - For the time being there is no resource that
		// someone could acquire.
		defer .acquireSem.Release(1)

		if  != nil {
			.allResources.remove()
			.destructWG.Done()
			return
		}

		// NOTE(review): unlike CreateResource, this path does not
		// re-check closed before pushing onto the idle stack — confirm
		// that a pool closed during construction is handled.
		.value = 
		.status = resourceStatusIdle
		.idleResources.Push()
	}()

	return nil, ErrNotAvailable
}

// acquireSemAll tries to acquire num free tokens from sem and returns the
// number of tokens actually acquired. This function is guaranteed to acquire
// at least the lowest number of tokens that has been available in the
// semaphore during runtime of this function.
//
// For the time being, semaphore doesn't allow to acquire all tokens atomically
// (see https://github.com/golang/sync/pull/19). We simulate this by trying all
// powers of 2 that are less or equal to num.
//
// For example, let's imagine we have 19 free tokens in the semaphore which in
// total has 24 tokens (i.e. the maxSize of the pool is 24 resources). Then if
// num is 24, the log2Int(24) is 4 and we try to acquire 16, 8, 4, 2 and 1
// tokens. Out of those, the acquire of 16, 2 and 1 tokens will succeed.
//
// Naturally, Acquires and Releases of the semaphore might take place
// concurrently. For this reason, it's not guaranteed that absolutely all free
// tokens in the semaphore will be acquired. But it's guaranteed that at least
// the minimal number of tokens that has been present over the whole process
// will be acquired. This is sufficient for the use-case we have in this
// package.
//
// TODO: Replace this with acquireSem.TryAcquireAll() if it gets to
// upstream. https://github.com/golang/sync/pull/19
func ( *semaphore.Weighted,  int) int {
	// Fast path: everything requested is available at once.
	if .TryAcquire(int64()) {
		return 
	}

	// Otherwise try decreasing powers of two and sum up what succeeds.
	var  int
	for  := int(log2Int());  >= 0; -- {
		 := 1 << 
		if .TryAcquire(int64()) {
			 += 
		}
	}

	return 
}

// AcquireAllIdle acquires all currently idle resources. Its intended use is for
// health check and keep-alive functionality. It does not update pool
// statistics.
//
// It returns nil if the pool is closed or has no idle resources. It may return
// fewer resources than were idle if other acquires are taking semaphore tokens
// concurrently.
func ( *Pool[]) () []*Resource[] {
	.mux.Lock()
	defer .mux.Unlock()

	if .closed {
		return nil
	}

	 := .idleResources.Len()
	if  == 0 {
		return nil
	}

	// In acquireSemAll we use only TryAcquire and not Acquire. Because
	// TryAcquire cannot block, the fact that we hold mutex locked and try
	// to acquire semaphore cannot result in dead-lock.
	//
	// Because the mutex is locked, no parallel Release can run. This
	// implies that the number of tokens can only decrease because some
	// Acquire/TryAcquire call can consume the semaphore token. Consequently
	// acquired is always less or equal to numIdle. Moreover if acquired <
	// numIdle, then there are some parallel Acquire/TryAcquire calls that
	// will take the remaining idle connections.
	 := acquireSemAll(.acquireSem, )

	// Pop one idle resource per token acquired and mark it as acquired.
	 := make([]*Resource[], )
	for  := range  {
		,  := .idleResources.Pop()
		.status = resourceStatusAcquired
		[] = 
	}

	// We have to bump the generation to ensure that Acquire/TryAcquire
	// calls running in parallel (those which caused acquired < numIdle)
	// will consume old connections and not freshly released connections
	// instead.
	.idleResources.NextGen()

	return 
}

// CreateResource constructs a new resource without acquiring it. It goes straight in the IdlePool. If the pool is full
// it returns an error. It can be useful to maintain warm resources under little load.
//
// It returns ErrNotAvailable if the pool is full, ErrClosedPool if the pool is closed before or during construction,
// or the constructor's error if construction fails. The constructor runs synchronously in the calling goroutine.
func ( *Pool[]) ( context.Context) error {
	if !.acquireSem.TryAcquire(1) {
		return ErrNotAvailable
	}

	.mux.Lock()
	if .closed {
		.acquireSem.Release(1)
		.mux.Unlock()
		return ErrClosedPool
	}

	if len(.allResources) >= int(.maxSize) {
		.acquireSem.Release(1)
		.mux.Unlock()
		return ErrNotAvailable
	}

	 := .createNewResource()
	.mux.Unlock()

	// Construct without holding mux; the semaphore token is held until this
	// function returns.
	,  := .constructor()
	.mux.Lock()
	defer .mux.Unlock()
	defer .acquireSem.Release(1)
	if  != nil {
		.allResources.remove()
		.destructWG.Done()
		return 
	}

	.value = 
	.status = resourceStatusIdle

	// If closed while constructing resource then destroy it and return an error
	//
	// NOTE(review): the resource is not removed from allResources on this
	// path — confirm whether that is intentional.
	if .closed {
		go .destructResourceValue(.value)
		return ErrClosedPool
	}

	.idleResources.Push()

	return nil
}

// Reset destroys all resources, but leaves the pool open. It is intended for use when an error is detected that would
// disrupt all resources (such as a network interruption or a server state change).
//
// It is safe to reset a pool while resources are checked out. Those resources will be destroyed when they are returned
// to the pool.
func ( *Pool[]) () {
	.mux.Lock()
	defer .mux.Unlock()

	// Bumping resetCount marks every existing resource as stale; stale
	// resources are destroyed instead of becoming idle when released.
	.resetCount++

	// Idle resources are destroyed right away, each in its own goroutine.
	for ,  := .idleResources.Pop(); ; ,  = .idleResources.Pop() {
		.allResources.remove()
		go .destructResourceValue(.value)
	}
}

// releaseAcquiredResource returns res to the pool and releases its semaphore
// token. If the pool is closed or has been reset since res was created, res is
// removed from the pool and destroyed in a separate goroutine. Otherwise its
// last-used time is set to the given nanotime value and it becomes idle.
func ( *Pool[]) ( *Resource[],  int64) {
	.mux.Lock()
	defer .mux.Unlock()
	defer .acquireSem.Release(1)

	if .closed || .poolResetCount != .resetCount {
		.allResources.remove()
		go .destructResourceValue(.value)
	} else {
		.lastUsedNano = 
		.status = resourceStatusIdle
		.idleResources.Push()
	}
}

// destroyAcquiredResource destroys res and removes it from the pool, then
// releases its semaphore token. The destructor runs synchronously, before the
// pool mutex is taken.
//
// NOTE(review): the previous comment stated that this panics if res is not
// part of the pool; that depends on allResources.remove, which is not visible
// here — confirm.
func ( *Pool[]) ( *Resource[]) {
	.destructResourceValue(.value)

	.mux.Lock()
	defer .mux.Unlock()
	defer .acquireSem.Release(1)

	.allResources.remove()
}

// hijackAcquiredResource removes res from the pool without destroying it,
// marks it as hijacked and releases its semaphore token.
func ( *Pool[]) ( *Resource[]) {
	.mux.Lock()
	defer .mux.Unlock()
	defer .acquireSem.Release(1)

	.allResources.remove()
	.status = resourceStatusHijacked
	.destructWG.Done() // not responsible for destructing hijacked resources
}

// destructResourceValue calls the pool's destructor on the given resource
// value and then marks one resource as done on destructWG. It does not take
// the pool mutex itself.
func ( *Pool[]) ( ) {
	.destructor()
	.destructWG.Done()
}