broadcast

package
v1.34.2
Published: Apr 21, 2026 License: MIT Imports: 4 Imported by: 9

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func WatchBroadcast added in v1.33.0

func WatchBroadcast[T comparable](
	ctx context.Context,
	bcast *Broadcast,
	snapshot func() T,
	send func(T) error,
) error

WatchBroadcast watches a broadcast for changes and sends snapshots.

snapshot is called under the broadcast lock to get the current value. send is called outside the lock to transmit the value. Sending is skipped when the value equals the previous one (compared with ==). Returns when ctx is canceled or send returns an error.
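The loop described above can be sketched roughly as follows. This is an illustration only, not the package's source: the Broadcast type here is a minimal stand-in so the block compiles on its own, and watch, runWatch, and errDone are hypothetical names. It also assumes that the first snapshot is always sent, which the documentation does not state.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Broadcast is a minimal stand-in for the package's type, written only so
// this sketch is self-contained. It is not the library's implementation.
type Broadcast struct {
	mtx sync.Mutex
	ch  chan struct{}
}

// HoldLock locks the mutex and calls cb with broadcast and getWaitCh helpers.
func (b *Broadcast) HoldLock(cb func(broadcast func(), getWaitCh func() <-chan struct{})) {
	b.mtx.Lock()
	defer b.mtx.Unlock()
	cb(func() {
		if b.ch != nil {
			close(b.ch)
			b.ch = nil
		}
	}, func() <-chan struct{} {
		if b.ch == nil {
			b.ch = make(chan struct{})
		}
		return b.ch
	})
}

// watch is a hypothetical re-creation of the documented loop.
func watch[T comparable](ctx context.Context, b *Broadcast, snapshot func() T, send func(T) error) error {
	var prev T
	first := true // assumption: the initial snapshot is always sent
	for {
		var cur T
		var waitCh <-chan struct{}
		b.HoldLock(func(_ func(), getWaitCh func() <-chan struct{}) {
			cur = snapshot() // read the value under the lock
			waitCh = getWaitCh()
		})
		if first || cur != prev {
			if err := send(cur); err != nil { // send outside the lock
				return err
			}
			prev, first = cur, false
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-waitCh: // value may have changed; take a new snapshot
		}
	}
}

var errDone = errors.New("done")

// runWatch writes 1, 1, 2, 3 to a guarded value and collects what watch sends.
func runWatch() ([]int, error) {
	var b Broadcast
	var value int
	go func() {
		for _, v := range []int{1, 1, 2, 3} {
			b.HoldLock(func(broadcast func(), _ func() <-chan struct{}) {
				value = v
				broadcast()
			})
		}
	}()
	var sent []int
	err := watch(context.Background(), &b, func() int { return value }, func(v int) error {
		sent = append(sent, v)
		if v == 3 {
			return errDone // stop the watcher once the final value arrives
		}
		return nil
	})
	return sent, err
}

func main() {
	sent, err := runWatch()
	fmt.Println(sent, err)
}
```

Which intermediate values are sent depends on scheduling, because a watcher only sees the value present when it takes a snapshot. The sketch guarantees only that consecutive duplicates are skipped and that the final value is observed.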

func WatchBroadcastVT added in v1.33.0

func WatchBroadcastVT[T proto.EqualVT[T]](
	ctx context.Context,
	bcast *Broadcast,
	snapshot func() T,
	send func(T) error,
) error

WatchBroadcastVT watches a broadcast for changes and sends snapshots.

Behaves like WatchBroadcast, but deduplicates with EqualVT instead of ==. Intended for VTProtobuf messages.
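One likely reason a separate variant is useful: protobuf messages are usually handled as pointers, and == on pointers compares identity rather than contents. The sketch below illustrates the difference. The equalVT constraint is a local stand-in that assumes the real constraint requires an EqualVT(other T) bool method, and Status and changed are hypothetical names.

```go
package main

import "fmt"

// equalVT is a local stand-in for the constraint used by WatchBroadcastVT.
// Its shape is an assumption made for this sketch.
type equalVT[T any] interface {
	EqualVT(other T) bool
}

// Status is a hypothetical message type.
type Status struct {
	Phase string
	Count int
}

// EqualVT reports whether two Status messages have equal contents.
func (s *Status) EqualVT(other *Status) bool {
	if s == nil || other == nil {
		return s == other
	}
	return s.Phase == other.Phase && s.Count == other.Count
}

// changed reports whether cur differs from prev and so should be sent.
func changed[T equalVT[T]](prev, cur T) bool {
	return !prev.EqualVT(cur)
}

func main() {
	a := &Status{Phase: "ready", Count: 1}
	b := &Status{Phase: "ready", Count: 1} // distinct pointer, same contents
	c := &Status{Phase: "ready", Count: 2}

	// Pointer equality says a and b differ; EqualVT says they do not.
	fmt.Println(a == b, changed(a, b), changed(a, c)) // false false true
}
```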

func WatchBroadcastWithEqual added in v1.33.0

func WatchBroadcastWithEqual[T comparable](
	ctx context.Context,
	bcast *Broadcast,
	snapshot func() T,
	send func(T) error,
	equal func(a, b T) bool,
) error

WatchBroadcastWithEqual watches a broadcast for changes and sends snapshots.

snapshot is called under the broadcast lock to get the current value. send is called outside the lock to transmit the value. equal is an optional comparator; if it is nil, == is used for deduplication. Returns when ctx is canceled or send returns an error.

Types

type Broadcast

type Broadcast struct {
	// contains filtered or unexported fields
}

Broadcast notifies waiters through a channel.

The zero-value of this struct is valid.

Example
// b guards currValue
var b Broadcast
var currValue int

go func() {
	// 0 to 9 inclusive
	for i := range 10 {
		<-time.After(time.Millisecond * 20)
		b.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
			currValue = i
			broadcast()
		})
	}
}()

var waitCh <-chan struct{}
var gotValue int
for {
	b.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
		gotValue = currValue
		waitCh = getWaitCh()
	})

	// last value
	if gotValue == 9 {
		// success
		break
	}

	// otherwise keep waiting
	<-waitCh
}

fmt.Printf("waited for value to increment: %v\n", gotValue)
Output:
waited for value to increment: 9

func (*Broadcast) HoldLock added in v1.6.0

func (c *Broadcast) HoldLock(cb func(broadcast func(), getWaitCh func() <-chan struct{}))

HoldLock locks the mutex and calls the callback.

broadcast closes the wait channel, if any. getWaitCh returns a channel that will be closed when broadcast is called.

func (*Broadcast) HoldLockMaybeAsync added in v1.11.1

func (c *Broadcast) HoldLockMaybeAsync(cb func(broadcast func(), getWaitCh func() <-chan struct{}))

HoldLockMaybeAsync locks the mutex and calls the callback if the mutex can be locked immediately. Otherwise, it starts a new goroutine that waits for the mutex and then calls the callback.
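A consequence of this behavior is that the callback may run after HoldLockMaybeAsync has already returned. The sketch below shows that ordering using a minimal stand-in Broadcast built on sync.Mutex.TryLock. The stand-in and the runOrder helper are assumptions for illustration, not the package's source.

```go
package main

import (
	"fmt"
	"sync"
)

// Broadcast is a minimal stand-in for the package's type, not its implementation.
type Broadcast struct {
	mtx sync.Mutex
	ch  chan struct{}
}

// broadcast closes the wait channel, if any. Call with the mutex held.
func (b *Broadcast) broadcast() {
	if b.ch != nil {
		close(b.ch)
		b.ch = nil
	}
}

// getWaitCh returns the current wait channel. Call with the mutex held.
func (b *Broadcast) getWaitCh() <-chan struct{} {
	if b.ch == nil {
		b.ch = make(chan struct{})
	}
	return b.ch
}

// HoldLock locks the mutex and calls cb.
func (b *Broadcast) HoldLock(cb func(broadcast func(), getWaitCh func() <-chan struct{})) {
	b.mtx.Lock()
	defer b.mtx.Unlock()
	cb(b.broadcast, b.getWaitCh)
}

// HoldLockMaybeAsync calls cb inline if the mutex is free, else in a goroutine.
func (b *Broadcast) HoldLockMaybeAsync(cb func(broadcast func(), getWaitCh func() <-chan struct{})) {
	run := func() {
		defer b.mtx.Unlock()
		cb(b.broadcast, b.getWaitCh)
	}
	if b.mtx.TryLock() {
		run()
		return
	}
	go func() {
		b.mtx.Lock() // wait for the current holder to finish
		run()
	}()
}

// runOrder calls HoldLockMaybeAsync while the lock is already held.
func runOrder() []string {
	var b Broadcast
	var order []string
	done := make(chan struct{})
	b.HoldLock(func(_ func(), _ func() <-chan struct{}) {
		// The mutex is held here, so this callback is deferred to a goroutine.
		b.HoldLockMaybeAsync(func(_ func(), _ func() <-chan struct{}) {
			order = append(order, "async")
			close(done)
		})
		order = append(order, "outer")
	})
	<-done
	return order
}

func main() {
	fmt.Println(runOrder()) // [outer async]
}
```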

func (*Broadcast) TryHoldLock added in v1.25.7

func (c *Broadcast) TryHoldLock(cb func(broadcast func(), getWaitCh func() <-chan struct{})) bool

TryHoldLock attempts to lock the mutex and call the callback. It returns true if the lock was acquired and the callback was called, false otherwise.
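The two return values can be illustrated with a minimal stand-in Broadcast built on sync.Mutex.TryLock. The stand-in and the runTry helper are assumptions made for this sketch and do not come from the package.

```go
package main

import (
	"fmt"
	"sync"
)

// Broadcast is a minimal stand-in for the package's type, not its implementation.
type Broadcast struct {
	mtx sync.Mutex
	ch  chan struct{}
}

// broadcast closes the wait channel, if any. Call with the mutex held.
func (b *Broadcast) broadcast() {
	if b.ch != nil {
		close(b.ch)
		b.ch = nil
	}
}

// getWaitCh returns the current wait channel. Call with the mutex held.
func (b *Broadcast) getWaitCh() <-chan struct{} {
	if b.ch == nil {
		b.ch = make(chan struct{})
	}
	return b.ch
}

// HoldLock locks the mutex and calls cb.
func (b *Broadcast) HoldLock(cb func(broadcast func(), getWaitCh func() <-chan struct{})) {
	b.mtx.Lock()
	defer b.mtx.Unlock()
	cb(b.broadcast, b.getWaitCh)
}

// TryHoldLock calls cb only if the mutex can be locked without blocking.
func (b *Broadcast) TryHoldLock(cb func(broadcast func(), getWaitCh func() <-chan struct{})) bool {
	if !b.mtx.TryLock() {
		return false
	}
	defer b.mtx.Unlock()
	cb(b.broadcast, b.getWaitCh)
	return true
}

// runTry attempts TryHoldLock once while the lock is held and once while free.
func runTry() (whileHeld, whileFree bool) {
	var b Broadcast
	noop := func(_ func(), _ func() <-chan struct{}) {}
	b.HoldLock(func(_ func(), _ func() <-chan struct{}) {
		whileHeld = b.TryHoldLock(noop) // mutex is busy: callback is not run
	})
	whileFree = b.TryHoldLock(noop) // mutex is free: callback runs
	return whileHeld, whileFree
}

func main() {
	fmt.Println(runTry()) // false true
}
```

A false result means the callback was not called at all, so callers that must apply an update need a fallback such as retrying or using HoldLock.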

func (*Broadcast) Wait added in v1.25.5

func (c *Broadcast) Wait(ctx context.Context, cb func(broadcast func(), getWaitCh func() <-chan struct{}) (bool, error)) error

Wait blocks until cb returns true or an error. Each time the broadcast fires, cb is called again to re-check the value. cb is called while the mutex is locked. Returns context.Canceled if ctx is canceled.

Example
// b guards currValue
var b Broadcast
var currValue int

go func() {
	// 0 to 9 inclusive
	for i := range 10 {
		<-time.After(time.Millisecond * 20)
		b.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
			currValue = i
			broadcast()
		})
	}
}()

ctx := context.Background()
var gotValue int
err := b.Wait(ctx, func(broadcast func(), getWaitCh func() <-chan struct{}) (bool, error) {
	gotValue = currValue
	return gotValue == 9, nil
})
if err != nil {
	fmt.Printf("failed to wait for value: %v\n", err)
	return
}

fmt.Printf("waited for value to increment: %v\n", gotValue)
Output:
waited for value to increment: 9
