// Source file: pubsub.go
// Belonging package: google.golang.org/grpc/internal/grpcsync

/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpcsync
import (
	"context"
	"sync"
)
// Subscriber is the receiving side of a PubSub: any value that exposes an
// OnMessage callback for the PubSub to invoke whenever a message is
// published.
type Subscriber interface {
	// OnMessage hands one published message to the subscriber.
	// Implementations must return promptly and must not block.
	OnMessage(msg any)
}
// PubSub is a small one-to-many publish-subscribe primitive carrying messages
// of arbitrary type. Messages reach subscribers in the order in which they
// were published.
//
// A publisher calls Publish() to emit a message; interested parties register
// a Subscriber through Subscribe() to receive it.
//
// After a PubSub has been stopped no further messages can be published, but
// messages that were already published are still delivered. Done may be used
// to learn when every published message has been delivered.
type PubSub struct {
	// cs is the serializer on which subscriber callbacks are scheduled.
	cs *CallbackSerializer

	// mu protects the fields declared after it.
	mu          sync.Mutex
	msg         any                 // last value passed to Publish, nil if none
	subscribers map[Subscriber]bool // set of currently registered subscribers
}
// NewPubSub returns a new PubSub instance. Users should cancel the
// provided context to shutdown the PubSub.
func ( context.Context) *PubSub {
return &PubSub{
cs: NewCallbackSerializer(),
subscribers: map[Subscriber]bool{},
}
}
// Subscribe registers the provided Subscriber to the PubSub.
//
// If the PubSub contains a previously published message, the Subscriber's
// OnMessage() callback will be invoked asynchronously with the existing
// message to begin with, and subsequently for every newly published message.
//
// The caller is responsible for invoking the returned cancel function to
// unsubscribe itself from the PubSub.
func ( *PubSub) ( Subscriber) ( func()) {
.mu.Lock()
defer .mu.Unlock()
.subscribers[] = true
if .msg != nil {
:= .msg
.cs.TrySchedule(func(context.Context) {
.mu.Lock()
defer .mu.Unlock()
if !.subscribers[] {
return
}
.OnMessage()
})
}
return func() {
.mu.Lock()
defer .mu.Unlock()
delete(.subscribers, )
}
}
// Publish publishes the provided message to the PubSub, and invokes
// callbacks registered by subscribers asynchronously.
func ( *PubSub) ( any) {
.mu.Lock()
defer .mu.Unlock()
.msg =
for := range .subscribers {
:=
.cs.TrySchedule(func(context.Context) {
.mu.Lock()
defer .mu.Unlock()
if !.subscribers[] {
return
}
.OnMessage()
})
}
}
// Done returns a channel that is closed after the context passed to NewPubSub
// is canceled and all updates have been sent to subscribers.
func ( *PubSub) () <-chan struct{} {
return .cs.Done()
}
// The pages are generated with Golds v0.7.6. (GOOS=linux GOARCH=amd64)