Documentation ¶
Index ¶
- Variables
- type ChannelLock
- type ChannelLocker
- type FifoQueue
- type HTTPWebhookHandler
- type Manager
- type NoopChannelLock
- type NoopChannelLocker
- type RedisChannelLock
- type RedisChannelLocker
- type RedisFifoQueue
- type RedisFifoQueueOption
- func WithClaimMinIdleTime(duration time.Duration) RedisFifoQueueOption
- func WithConsumerGroup(group string) RedisFifoQueueOption
- func WithKeyPrefix(prefix string) RedisFifoQueueOption
- func WithLockTTL(ttl time.Duration) RedisFifoQueueOption
- func WithMaxStreamLength(length int64) RedisFifoQueueOption
- func WithPollInterval(interval time.Duration) RedisFifoQueueOption
- func WithStreamInactivityTimeout(timeout time.Duration) RedisFifoQueueOption
- func WithStreamRefreshInterval(interval time.Duration) RedisFifoQueueOption
- type RedisFifoQueueOptions
- type RedisFifoQueueProducer
- type SlackClient
- type WebhookHandler
Constants ¶
This section is empty.
Variables ¶
ErrChannelLockUnavailable is returned when a channel lock cannot be obtained.
Functions ¶
This section is empty.
Types ¶
type ChannelLock ¶
type ChannelLock interface {
// Key returns the key associated with this lock.
Key() string
// Release releases the lock.
// It does not accept a context parameter because releasing a lock is a commitment
// that must complete regardless of the caller's context state. Each implementation
// is responsible for managing its own timeouts internally.
Release() error
}
ChannelLock represents a single lock on a channel.
type ChannelLocker ¶
type ChannelLocker interface {
// Obtain tries to obtain a lock for the given key (channel ID) with the specified TTL (time-to-live).
// It will retry obtaining the lock until the maxWait duration is reached.
// If the lock is successfully obtained, it returns a ChannelLock instance.
// If the lock cannot be obtained within the maxWait duration, it returns ErrChannelLockUnavailable.
Obtain(ctx context.Context, key string, ttl time.Duration, maxWait time.Duration) (ChannelLock, error)
}
ChannelLocker is an interface for obtaining distributed locks on channels. It is used to ensure that only one process can perform operations on a channel at a time. This is particularly useful in a distributed environment where multiple instances of the manager may be running. Use the RedisChannelLocker implementation for a production-ready solution, using Redis as the backing store. Use the NoopChannelLocker for testing or when locking is not required (e.g., in a single-instance setup).
type FifoQueue ¶
type FifoQueue interface {
// Name returns the name of the queue.
Name() string
// Send sends a single message to the queue.
//
// slackChannelID is the Slack channel to which the message belongs.
// A queue implementation should use this value to partition the queue (e.g. as the message group ID in an AWS SQS FIFO queue),
// but it is not required.
//
// dedupID is a unique identifier for the message.
// A queue implementation should use this value to deduplicate messages, but it is not required.
//
// body is the json formatted message body.
Send(ctx context.Context, slackChannelID, dedupID, body string) error
// Receive receives messages from the queue, until the context is cancelled.
// Messages are sent to the provided channel.
// The channel must be closed by the implementation before returning, typically when the context is cancelled or a fatal error occurs.
Receive(ctx context.Context, sinkCh chan<- *types.FifoQueueItem) error
}
FifoQueue is an interface for interacting with a FIFO queue.
type HTTPWebhookHandler ¶
type HTTPWebhookHandler struct {
// contains filtered or unexported fields
}
HTTPWebhookHandler is an implementation of the WebhookHandler interface that sends webhooks via HTTP. This is the default webhook handler when no other webhook handlers are configured.
func NewHTTPWebhookHandler ¶
func NewHTTPWebhookHandler(logger types.Logger) *HTTPWebhookHandler
NewHTTPWebhookHandler creates a new HTTPWebhookHandler.
func (*HTTPWebhookHandler) HandleWebhook ¶
func (h *HTTPWebhookHandler) HandleWebhook(ctx context.Context, target string, data *types.WebhookCallback, logger types.Logger) error
HandleWebhook sends the webhook data to the target URL via an HTTP POST request. It expects a successful HTTP status code (2xx) in response.
func (*HTTPWebhookHandler) ShouldHandleWebhook ¶
func (h *HTTPWebhookHandler) ShouldHandleWebhook(_ context.Context, target string) bool
ShouldHandleWebhook returns true if the target is an HTTP or HTTPS URL. If other webhook handlers should handle certain URLs (e.g. SQS queue URLs), they must be registered *before* this handler.
func (*HTTPWebhookHandler) WithRequestTimeout ¶
func (h *HTTPWebhookHandler) WithRequestTimeout(timeout time.Duration) *HTTPWebhookHandler
WithRequestTimeout sets the timeout for HTTP requests made by the webhook handler. The default timeout is 3 seconds.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func New ¶
func New(db types.DB, alertQueue FifoQueue, commandQueue FifoQueue, cacheStore store.StoreInterface, locker ChannelLocker, logger types.Logger, metrics types.Metrics, cfg *config.ManagerConfig, managerSettings *config.ManagerSettings) *Manager
func (*Manager) RegisterWebhookHandler ¶
func (m *Manager) RegisterWebhookHandler(handler WebhookHandler) *Manager
func (*Manager) UpdateSettings ¶
func (m *Manager) UpdateSettings(settings *config.ManagerSettings) error
type NoopChannelLock ¶
type NoopChannelLock struct {
// contains filtered or unexported fields
}
NoopChannelLock is a no-op implementation of the ChannelLock interface.
func (*NoopChannelLock) Key ¶
func (n *NoopChannelLock) Key() string
Key returns the key associated with the no-op channel lock.
func (*NoopChannelLock) Release ¶
func (n *NoopChannelLock) Release() error
Release is a no-op method that does nothing.
type NoopChannelLocker ¶
type NoopChannelLocker struct{}
NoopChannelLocker is a no-op implementation of the ChannelLocker interface. It does not perform any locking and is used for testing or when locking is not required. Locking is almost always required when running in a distributed environment, such as Kubernetes.
type RedisChannelLock ¶
type RedisChannelLock struct {
// contains filtered or unexported fields
}
RedisChannelLock is an implementation of the ChannelLock interface that uses Redis locks. It represents a lock on a specific channel. It is returned by the RedisChannelLocker when a lock is successfully obtained. The lock can be released using the Release method, which will remove the lock from Redis.
func (*RedisChannelLock) Key ¶
func (l *RedisChannelLock) Key() string
Key returns the key associated with the RedisChannelLock instance.
func (*RedisChannelLock) Release ¶
func (l *RedisChannelLock) Release() error
Release releases the lock held by the RedisChannelLock instance. It removes the lock from Redis, allowing other instances to obtain the lock. If the lock is already released, or not held, it returns nil. It uses a new timeout context to ensure the release operation completes within a reasonable time frame, even if the caller's context is cancelled.
type RedisChannelLocker ¶
type RedisChannelLocker struct {
// contains filtered or unexported fields
}
RedisChannelLocker is an implementation of the ChannelLocker interface that uses Redis for distributed locking. It allows multiple instances of the manager to coordinate access to channels, ensuring that only one instance can perform operations on a channel at a time. The locker uses a key prefix to avoid conflicts with other locks in Redis. It also supports retry backoff for obtaining locks, allowing for configurable retry strategies. The default retry backoff is 2 seconds, but it can be configured using the WithRetryBackoff method.
func NewRedisChannelLocker ¶
func NewRedisChannelLocker(client *redis.Client) *RedisChannelLocker
NewRedisChannelLocker creates a new RedisChannelLocker instance. It takes a Redis client as an argument, which is used to communicate with the Redis server. The client should be configured with the appropriate Redis server address and authentication details. The RedisChannelLocker can be configured with a custom retry backoff and key prefix using the WithRetryBackoff and WithKeyPrefix methods. If no custom values are provided, it defaults to a retry backoff of 2 seconds and a key prefix of "slack-manager:channel-lock:".
func (*RedisChannelLocker) Obtain ¶
func (r *RedisChannelLocker) Obtain(ctx context.Context, key string, ttl time.Duration, maxWait time.Duration) (ChannelLock, error)
Obtain tries to obtain a lock for the given key (channel ID) with a specified TTL (time to live). It uses a retry strategy based on the configured retry backoff and max wait duration. If the lock is successfully obtained, it returns a RedisChannelLock instance. If the lock cannot be obtained within the max wait duration, it returns ErrChannelLockUnavailable. The key is prefixed with the configured key prefix to avoid conflicts with other locks in Redis.
func (*RedisChannelLocker) WithKeyPrefix ¶
func (r *RedisChannelLocker) WithKeyPrefix(prefix string) *RedisChannelLocker
WithKeyPrefix sets the Redis key prefix for the locks. The default prefix is "slack-manager:channel-lock:". This prefix is used to avoid conflicts with other keys in Redis. An empty prefix means that the locks will have keys equal to the Slack channel ID.
func (*RedisChannelLocker) WithRetryBackoff ¶
func (r *RedisChannelLocker) WithRetryBackoff(backoff time.Duration) *RedisChannelLocker
WithRetryBackoff sets the retry backoff duration for obtaining locks. If the backoff is less than or equal to zero, it defaults to 2 seconds.
type RedisFifoQueue ¶
type RedisFifoQueue struct {
*RedisFifoQueueProducer // Embedded for Send() and Name()
// contains filtered or unexported fields
}
RedisFifoQueue implements the FifoQueue interface using Redis Streams. It uses one stream per Slack channel to ensure message ordering within a channel, mimicking the behavior of SQS FIFO message groups.
Multiple instances can consume from the same queue using Redis consumer groups, providing at-least-once delivery semantics. Messages for different channels can be processed in parallel across instances.
Strict per-channel ordering is enforced using distributed locks. When a consumer reads from a stream, it acquires a lock for that channel. Other consumers cannot read from that stream until the lock is released (after ack/nack). This ensures that messages for a single channel are processed strictly in order, even across multiple instances.
For write-only usage (e.g., API server), consider using RedisFifoQueueProducer instead.
func NewRedisFifoQueue ¶
func NewRedisFifoQueue(client redis.UniversalClient, locker ChannelLocker, name string, logger types.Logger, opts ...RedisFifoQueueOption) *RedisFifoQueue
NewRedisFifoQueue creates a new RedisFifoQueue instance with full send and receive capabilities. The client should be a configured Redis client (can be a single node, sentinel, or cluster client). The locker is used to ensure strict per-channel ordering across multiple instances. The name is used as part of the Redis key prefix and for identification.
For write-only usage (e.g., API server), consider using NewRedisFifoQueueProducer instead.
func (*RedisFifoQueue) Init ¶
func (q *RedisFifoQueue) Init() (*RedisFifoQueue, error)
Init initializes the RedisFifoQueue. It validates options and generates a unique consumer name for this instance.
func (*RedisFifoQueue) Receive ¶
func (q *RedisFifoQueue) Receive(ctx context.Context, sinkCh chan<- *types.FifoQueueItem) error
Receive receives messages from the queue until the context is cancelled. Messages are sent to the provided channel, which is closed when Receive returns. The receiver uses the shared knownStreams map which is updated by Send() for immediate discovery of new streams created by in-process senders.
func (*RedisFifoQueue) Send ¶
func (q *RedisFifoQueue) Send(ctx context.Context, slackChannelID, dedupID, body string) error
Send sends a message to the queue with additional support for in-process receivers. This overrides the embedded producer's Send to add:
- Consumer group creation for new streams (enables immediate consumption)
- Local stream tracking for fast discovery by in-process receiver
- Notification to wake up in-process receiver immediately
type RedisFifoQueueOption ¶
type RedisFifoQueueOption func(*RedisFifoQueueOptions)
RedisFifoQueueOption is a function that configures RedisFifoQueueOptions.
func WithClaimMinIdleTime ¶
func WithClaimMinIdleTime(duration time.Duration) RedisFifoQueueOption
WithClaimMinIdleTime sets the minimum idle time before a message can be claimed.
func WithConsumerGroup ¶
func WithConsumerGroup(group string) RedisFifoQueueOption
WithConsumerGroup sets the consumer group name.
func WithKeyPrefix ¶
func WithKeyPrefix(prefix string) RedisFifoQueueOption
WithKeyPrefix sets the Redis key prefix for the queue.
func WithLockTTL ¶
func WithLockTTL(ttl time.Duration) RedisFifoQueueOption
WithLockTTL sets the time-to-live for the per-stream lock.
func WithMaxStreamLength ¶
func WithMaxStreamLength(length int64) RedisFifoQueueOption
WithMaxStreamLength sets the approximate maximum length of each stream.
func WithPollInterval ¶
func WithPollInterval(interval time.Duration) RedisFifoQueueOption
WithPollInterval sets how long to wait between polling cycles when no messages are available.
func WithStreamInactivityTimeout ¶
func WithStreamInactivityTimeout(timeout time.Duration) RedisFifoQueueOption
WithStreamInactivityTimeout sets how long a stream can be inactive before cleanup. Set to 0 to disable automatic cleanup.
func WithStreamRefreshInterval ¶
func WithStreamRefreshInterval(interval time.Duration) RedisFifoQueueOption
WithStreamRefreshInterval sets how often to check for new streams.
type RedisFifoQueueOptions ¶
type RedisFifoQueueOptions struct {
// contains filtered or unexported fields
}
RedisFifoQueueOptions holds configuration for RedisFifoQueue.
type RedisFifoQueueProducer ¶
type RedisFifoQueueProducer struct {
// contains filtered or unexported fields
}
RedisFifoQueueProducer is a lightweight write-only queue producer. Use this when the service only needs to send messages (e.g., API server in a separate service). For full queue functionality including Receive(), use RedisFifoQueue instead.
This producer has minimal overhead: no consumer groups, no distributed locks, no in-process notification mechanism. It simply writes messages to Redis streams.
func NewRedisFifoQueueProducer ¶
func NewRedisFifoQueueProducer(client redis.UniversalClient, name string, logger types.Logger, opts ...RedisFifoQueueOption) *RedisFifoQueueProducer
NewRedisFifoQueueProducer creates a new lightweight write-only queue producer. Use this when the service only needs to send messages and doesn't need to receive. The client should be a configured Redis client (can be a single node, sentinel, or cluster client). The name is used as part of the Redis key prefix and for identification.
func (*RedisFifoQueueProducer) Init ¶
func (p *RedisFifoQueueProducer) Init() (*RedisFifoQueueProducer, error)
Init initializes the RedisFifoQueueProducer. It validates options and marks the producer as ready for use.
func (*RedisFifoQueueProducer) Name ¶
func (p *RedisFifoQueueProducer) Name() string
Name returns the name of the queue.
func (*RedisFifoQueueProducer) Send ¶
func (p *RedisFifoQueueProducer) Send(ctx context.Context, slackChannelID, _, body string) error
Send sends a message to the queue. The slackChannelID is used to route the message to a specific stream, ensuring ordering per channel. Note: The dedupID parameter is accepted for interface compatibility but is not used for deduplication. Redis Streams does not natively support message deduplication like SQS FIFO queues. Deduplication should be handled at the application level if needed.
type SlackClient ¶
type SlackClient interface {
GetChannelName(ctx context.Context, channelID string) string
IsAlertChannel(ctx context.Context, channelID string) (bool, string, error)
Update(ctx context.Context, channelID string, allChannelIssues []*models.Issue) error
UpdateSingleIssueWithThrottling(ctx context.Context, issue *models.Issue, reason string, issuesInChannel int) error
UpdateSingleIssue(ctx context.Context, issue *models.Issue, reason string) error
Delete(ctx context.Context, issue *models.Issue, reason string, updateIfMessageHasReplies bool, sem *semaphore.Weighted) error
DeletePost(ctx context.Context, channelID, ts string) error
}
type WebhookHandler ¶
type WebhookHandler interface {
// ShouldHandleWebhook returns true if the handler should handle the specified webhook target.
ShouldHandleWebhook(ctx context.Context, target string) bool
// HandleWebhook handles the webhook target, with the specified callback data.
// If successful, it should DEBUG log the send action and return nil.
// If unsuccessful, it should return the error (without logging it).
HandleWebhook(ctx context.Context, target string, data *types.WebhookCallback, logger types.Logger) error
}
WebhookHandler is an interface for handling alert webhooks (i.e. issue callbacks). The webhook may or may not be HTTP-based. More than one handler can be registered with a Manager. The first handler that returns true from ShouldHandleWebhook will be used to handle a given webhook.