Documentation
Constants ¶
This section is empty.
Variables ¶
var ErrClosed = fmt.Errorf("Redis PSUBSCRIBE stream closed due to storage shutdown: %w", context.Canceled)
ErrClosed indicates that the dispatcher has been closed. It wraps context.Canceled.
Functions ¶
This section is empty.
Types ¶
type AwaitCancelFunc ¶
type AwaitCancelFunc func(err error)
AwaitCancelFunc is a function that cancels an awaiter.
type DispatcherParams ¶
type DispatcherParams struct {
	ReconcileInterval        time.Duration
	ReconcileRetryInterval   time.Duration
	ReconcileMinimumInterval time.Duration
}
DispatcherParams tunes the behavior of a Dispatcher.
type RedisChannelID ¶
type RedisChannelID string
RedisChannelID represents the ID of a Redis Pub/Sub channel.
type RedisPubSubAwaiter ¶
type RedisPubSubAwaiter interface {
	// Chan returns a channel that is closed when a new message is received or an error occurs (fulfilled).
	Chan() chan interface{}
	// Err returns the error, if one occurred. Call it after the channel returned by Chan() has been closed.
	Err() error
}
RedisPubSubAwaiter is a Promise-like object that represents waiting for a Pub/Sub message.
type RedisPubSubDispatcher ¶
type RedisPubSubDispatcher interface {
	Await(ctx context.Context, channel RedisChannelID) (RedisPubSubAwaiter, AwaitCancelFunc)
	Shutdown(ctx context.Context)
}
RedisPubSubDispatcher subscribes to Redis Pub/Sub with PSUBSCRIBE (wildcard subscription), then broadcasts each received message to the RedisPubSubAwaiter instances waiting on it. Because go-redis opens and closes the underlying TCP connection for each subscription, issuing a SUBSCRIBE on every Storage.FetchMessage call would leave a large number of TCP connections in the CLOSE_WAIT state.
func NewDispatcher ¶
func NewDispatcher(ctx context.Context, deps deps.StorageDeps, params DispatcherParams, psubscribe RedisSubscribeRawFunc, pattern RedisChannelID) RedisPubSubDispatcher
NewDispatcher creates a RedisPubSubDispatcher instance.
type RedisPubSubPromise ¶
type RedisPubSubPromise interface {
	Resolve()
	Reject(err error)
	Chan() chan interface{}
	Err() error
}
RedisPubSubPromise supports the Resolve and Reject operations in addition to those of RedisPubSubAwaiter.
type RedisRawPubSub ¶
type RedisRawPubSub interface {
	Receive(context.Context) (interface{}, error)
	Ping(context.Context, ...string) error
	ChannelSize(int) <-chan *redis.Message
	Close() error
}
RedisRawPubSub is a subset of the methods of *redis.PubSub.
type RedisSubscribeRawFunc ¶
type RedisSubscribeRawFunc func(ctx context.Context, channel RedisChannelID) RedisRawPubSub
RedisSubscribeRawFunc represents an implementation of the (P)SUBSCRIBE command.