Source File
semaphore.go
Belonging Package
golang.org/x/sync/semaphore
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package semaphore provides a weighted semaphore implementation.
//
// The declarations below belong to package semaphore (import path
// "golang.org/x/sync/semaphore") and rely on the standard-library packages
// container/list, context and sync.

// waiter is one blocked Acquire call queued on a Weighted semaphore.
type waiter struct {
	n     int64           // Weight requested by the blocked caller.
	ready chan<- struct{} // Closed when semaphore acquired.
}

// NewWeighted creates a new weighted semaphore with the given
// maximum combined weight for concurrent access.
func NewWeighted(n int64) *Weighted {
	w := &Weighted{size: n}
	return w
}

// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
	size    int64      // Maximum combined weight.
	cur     int64      // Combined weight currently held.
	mu      sync.Mutex // Guards cur and waiters.
	waiters list.List  // FIFO queue of waiter values.
}

// Acquire acquires the semaphore with a weight of n, blocking until resources
// are available or ctx is done. On success, returns nil. On failure, returns
// ctx.Err() and leaves the semaphore unchanged.
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
	done := ctx.Done()

	s.mu.Lock()
	select {
	case <-done:
		// ctx becoming done has "happened before" acquiring the semaphore,
		// whether it became done before the call began or while we were
		// waiting for the mutex. We prefer to fail even if we could acquire
		// the mutex without blocking.
		s.mu.Unlock()
		return ctx.Err()
	default:
	}
	if s.size-s.cur >= n && s.waiters.Len() == 0 {
		// Since we hold s.mu and haven't synchronized since checking done, if
		// ctx becomes done before we return here, it becoming done must have
		// "happened concurrently" with this call - it cannot "happen before"
		// we return in this branch. So, we're ok to always acquire here.
		s.cur += n
		s.mu.Unlock()
		return nil
	}

	if n > s.size {
		// Don't make other Acquire calls block on one that's doomed to fail.
		s.mu.Unlock()
		<-done
		return ctx.Err()
	}

	// Queue behind any earlier waiters and block outside the lock.
	ready := make(chan struct{})
	w := waiter{n: n, ready: ready}
	elem := s.waiters.PushBack(w)
	s.mu.Unlock()

	select {
	case <-done:
		s.mu.Lock()
		select {
		case <-ready:
			// Acquired the semaphore after we were canceled.
			// Pretend we didn't and put the tokens back.
			s.cur -= n
			s.notifyWaiters()
		default:
			isFront := s.waiters.Front() == elem
			s.waiters.Remove(elem)
			// If we're at the front and there're extra tokens left, notify other waiters.
			if isFront && s.size > s.cur {
				s.notifyWaiters()
			}
		}
		s.mu.Unlock()
		return ctx.Err()

	case <-ready:
		// Acquired the semaphore. Check that ctx isn't already done.
		// We check the done channel instead of calling ctx.Err because we
		// already have the channel, and ctx.Err is O(n) with the nesting
		// depth of ctx.
		select {
		case <-done:
			s.Release(n)
			return ctx.Err()
		default:
		}
		return nil
	}
}

// TryAcquire acquires the semaphore with a weight of n without blocking.
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
func (s *Weighted) TryAcquire(n int64) bool {
	s.mu.Lock()
	// Succeed only when nobody is queued, so a non-blocking caller cannot
	// jump ahead of blocked Acquire calls.
	success := s.size-s.cur >= n && s.waiters.Len() == 0
	if success {
		s.cur += n
	}
	s.mu.Unlock()
	return success
}

// Release releases the semaphore with a weight of n.
// It panics if more weight is released than is currently held.
func (s *Weighted) Release(n int64) {
	s.mu.Lock()
	s.cur -= n
	if s.cur < 0 {
		s.mu.Unlock()
		panic("semaphore: released more than held")
	}
	s.notifyWaiters()
	s.mu.Unlock()
}

// notifyWaiters hands tokens to queued waiters in FIFO order, stopping at the
// first waiter whose request does not fit. Every caller in this file holds
// s.mu while calling it.
func (s *Weighted) notifyWaiters() {
	for {
		next := s.waiters.Front()
		if next == nil {
			break // No more waiters blocked.
		}

		w := next.Value.(waiter)
		if s.size-s.cur < w.n {
			// Not enough tokens for the next waiter. We could keep going (to try to
			// find a waiter with a smaller request), but under load that could cause
			// starvation for large requests; instead, we leave all remaining waiters
			// blocked.
			//
			// Consider a semaphore used as a read-write lock, with N tokens, N
			// readers, and one writer. Each reader can Acquire(1) to obtain a read
			// lock. The writer can Acquire(N) to obtain a write lock, excluding all
			// of the readers. If we allow the readers to jump ahead in the queue,
			// the writer will starve — there is always one token available for every
			// reader.
			break
		}

		s.cur += w.n
		s.waiters.Remove(next)
		close(w.ready)
	}
}
The pages are generated with Golds v0.7.6. (GOOS=linux GOARCH=amd64)