manager

package
v0.2.3
Published: Feb 20, 2026 License: MIT Imports: 24 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrChannelLockUnavailable = errors.New("channel lock unavailable")

ErrChannelLockUnavailable is returned when a channel lock cannot be obtained.

Functions

This section is empty.

Types

type ChannelLock

type ChannelLock interface {
	// Key returns the key associated with this lock.
	Key() string

	// Release releases the lock.
	// It does not accept a context parameter because releasing a lock is a commitment
	// that must complete regardless of the caller's context state. Each implementation
	// is responsible for managing its own timeouts internally.
	Release() error
}

ChannelLock represents a single lock on a channel.

type ChannelLocker

type ChannelLocker interface {
	// Obtain tries to obtain a lock for the given key (channel ID) with the specified TTL (time-to-live).
	// It will retry obtaining the lock until the maxWait duration is reached.
	// If the lock is successfully obtained, it returns a ChannelLock instance.
	// If the lock cannot be obtained within the maxWait duration, it returns ErrChannelLockUnavailable.
	Obtain(ctx context.Context, key string, ttl time.Duration, maxWait time.Duration) (ChannelLock, error)
}

ChannelLocker is an interface for obtaining distributed locks on channels; each obtained lock is released through the returned ChannelLock. It ensures that only one process can perform operations on a channel at a time, which matters in a distributed environment where multiple instances of the manager may be running. Use RedisChannelLocker for a production-ready solution backed by Redis. Use NoopChannelLocker for testing or when locking is not required (e.g. in a single-instance setup).

type FifoQueue

type FifoQueue interface {
	// Name returns the name of the queue.
	Name() string

	// Send sends a single message to the queue.
	//
	// slackChannelID is the Slack channel to which the message belongs.
	// A queue implementation should use this value to partition the queue (e.g. as the message group ID in an AWS SQS FIFO queue),
	// but it is not required.
	//
	// dedupID is a unique identifier for the message.
	// A queue implementation should use this value to deduplicate messages, but it is not required.
	//
	// body is the JSON-formatted message body.
	Send(ctx context.Context, slackChannelID, dedupID, body string) error

	// Receive receives messages from the queue, until the context is cancelled.
	// Messages are sent to the provided channel.
	// The channel must be closed by the implementation before returning, typically when the context is cancelled or a fatal error occurs.
	Receive(ctx context.Context, sinkCh chan<- *types.FifoQueueItem) error
}

FifoQueue is an interface for interacting with a FIFO queue.

type HTTPWebhookHandler

type HTTPWebhookHandler struct {
	// contains filtered or unexported fields
}

HTTPWebhookHandler is an implementation of the WebhookHandler interface that sends webhooks via HTTP. This is the default webhook handler when no other webhook handlers are configured.

func NewHTTPWebhookHandler

func NewHTTPWebhookHandler(logger types.Logger) *HTTPWebhookHandler

NewHTTPWebhookHandler creates a new HTTPWebhookHandler.

func (*HTTPWebhookHandler) HandleWebhook

func (h *HTTPWebhookHandler) HandleWebhook(ctx context.Context, target string, data *types.WebhookCallback, logger types.Logger) error

HandleWebhook sends the webhook data to the target URL via an HTTP POST request. It expects a successful HTTP status code (2xx) in response.

func (*HTTPWebhookHandler) ShouldHandleWebhook

func (h *HTTPWebhookHandler) ShouldHandleWebhook(_ context.Context, target string) bool

ShouldHandleWebhook returns true if the target is an HTTP or HTTPS URL. If other webhook handlers should handle certain URLs (e.g. SQS queue URLs), they must be registered *before* this handler.

func (*HTTPWebhookHandler) WithRequestTimeout

func (h *HTTPWebhookHandler) WithRequestTimeout(timeout time.Duration) *HTTPWebhookHandler

WithRequestTimeout sets the timeout for HTTP requests made by the webhook handler. The default timeout is 3 seconds.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func New

func New(db types.DB, alertQueue FifoQueue, commandQueue FifoQueue, cacheStore store.StoreInterface, locker ChannelLocker, logger types.Logger, metrics types.Metrics, cfg *config.ManagerConfig, managerSettings *config.ManagerSettings) *Manager

func (*Manager) RegisterWebhookHandler

func (m *Manager) RegisterWebhookHandler(handler WebhookHandler) *Manager

func (*Manager) Run

func (m *Manager) Run(ctx context.Context) error

func (*Manager) UpdateSettings

func (m *Manager) UpdateSettings(settings *config.ManagerSettings) error

type NoopChannelLock

type NoopChannelLock struct {
	// contains filtered or unexported fields
}

NoopChannelLock is a no-op implementation of the ChannelLock interface.

func (*NoopChannelLock) Key

func (n *NoopChannelLock) Key() string

Key returns the key associated with the no-op channel lock.

func (*NoopChannelLock) Release

func (n *NoopChannelLock) Release() error

Release is a no-op method that does nothing.

type NoopChannelLocker

type NoopChannelLocker struct{}

NoopChannelLocker is a no-op implementation of the ChannelLocker interface. It does not perform any locking and is intended for testing or for setups where locking is not required. Locking is generally required when running in a distributed environment, such as multiple instances in Kubernetes.

func (*NoopChannelLocker) Obtain

Obtain returns a no-op channel lock that does nothing.

type RedisChannelLock

type RedisChannelLock struct {
	// contains filtered or unexported fields
}

RedisChannelLock is an implementation of the ChannelLock interface that uses Redis locks. It represents a lock on a specific channel. It is returned by the RedisChannelLocker when a lock is successfully obtained. The lock can be released using the Release method, which will remove the lock from Redis.

func (*RedisChannelLock) Key

func (l *RedisChannelLock) Key() string

Key returns the key associated with the RedisChannelLock instance.

func (*RedisChannelLock) Release

func (l *RedisChannelLock) Release() error

Release releases the lock held by the RedisChannelLock instance. It removes the lock from Redis, allowing other instances to obtain the lock. If the lock is already released, or not held, it returns nil. It uses a new timeout context to ensure the release operation completes within a reasonable time frame, even if the caller's context is cancelled.

type RedisChannelLocker

type RedisChannelLocker struct {
	// contains filtered or unexported fields
}

RedisChannelLocker is an implementation of the ChannelLocker interface that uses Redis for distributed locking. It allows multiple instances of the manager to coordinate access to channels, ensuring that only one instance can perform operations on a channel at a time. The locker uses a key prefix to avoid conflicts with other locks in Redis. It also supports retry backoff for obtaining locks, allowing for configurable retry strategies. The default retry backoff is 2 seconds, but it can be configured using the WithRetryBackoff method.

func NewRedisChannelLocker

func NewRedisChannelLocker(client *redis.Client) *RedisChannelLocker

NewRedisChannelLocker creates a new RedisChannelLocker instance. It takes a Redis client as an argument, which is used to communicate with the Redis server. The client should be configured with the appropriate Redis server address and authentication details. The RedisChannelLocker can be configured with a custom retry backoff and key prefix using the WithRetryBackoff and WithKeyPrefix methods. If no custom values are provided, it defaults to a retry backoff of 2 seconds and a key prefix of "slack-manager:channel-lock:".

func (*RedisChannelLocker) Obtain

func (r *RedisChannelLocker) Obtain(ctx context.Context, key string, ttl time.Duration, maxWait time.Duration) (ChannelLock, error)

Obtain tries to obtain a lock for the given key (channel ID) with a specified TTL (time to live). It uses a retry strategy based on the configured retry backoff and max wait duration. If the lock is successfully obtained, it returns a RedisChannelLock instance. If the lock cannot be obtained within the max wait duration, it returns ErrChannelLockUnavailable. The key is prefixed with the configured key prefix to avoid conflicts with other locks in Redis.

func (*RedisChannelLocker) WithKeyPrefix

func (r *RedisChannelLocker) WithKeyPrefix(prefix string) *RedisChannelLocker

WithKeyPrefix sets the Redis key prefix for the locks. The default prefix is "slack-manager:channel-lock:". This prefix is used to avoid conflicts with other keys in Redis. An empty prefix means that the locks will have keys equal to the Slack channel ID.

func (*RedisChannelLocker) WithRetryBackoff

func (r *RedisChannelLocker) WithRetryBackoff(backoff time.Duration) *RedisChannelLocker

WithRetryBackoff sets the retry backoff duration for obtaining locks. If the backoff is less than or equal to zero, it defaults to 2 seconds.

type RedisFifoQueue

type RedisFifoQueue struct {
	*RedisFifoQueueProducer // Embedded for Send() and Name()
	// contains filtered or unexported fields
}

RedisFifoQueue implements the FifoQueue interface using Redis Streams. It uses one stream per Slack channel to ensure message ordering within a channel, mimicking the behavior of SQS FIFO message groups.

Multiple instances can consume from the same queue using Redis consumer groups, providing at-least-once delivery semantics. Messages for different channels can be processed in parallel across instances.

Strict per-channel ordering is enforced using distributed locks. When a consumer reads from a stream, it acquires a lock for that channel. Other consumers cannot read from that stream until the lock is released (after ack/nack). This ensures that messages for a single channel are processed strictly in order, even across multiple instances.

For write-only usage (e.g., API server), consider using RedisFifoQueueProducer instead.

func NewRedisFifoQueue

func NewRedisFifoQueue(client redis.UniversalClient, locker ChannelLocker, name string, logger types.Logger, opts ...RedisFifoQueueOption) *RedisFifoQueue

NewRedisFifoQueue creates a new RedisFifoQueue instance with full send and receive capabilities. The client should be a configured Redis client (can be a single node, sentinel, or cluster client). The locker is used to ensure strict per-channel ordering across multiple instances. The name is used as part of the Redis key prefix and for identification.

For write-only usage (e.g., API server), consider using NewRedisFifoQueueProducer instead.

func (*RedisFifoQueue) Init

func (q *RedisFifoQueue) Init() (*RedisFifoQueue, error)

Init initializes the RedisFifoQueue. It validates options and generates a unique consumer name for this instance.

func (*RedisFifoQueue) Receive

func (q *RedisFifoQueue) Receive(ctx context.Context, sinkCh chan<- *types.FifoQueueItem) error

Receive receives messages from the queue until the context is cancelled. Messages are sent to the provided channel, which is closed when Receive returns. The receiver uses the shared knownStreams map, which Send() updates, so that new streams created by in-process senders are discovered immediately.

func (*RedisFifoQueue) Send

func (q *RedisFifoQueue) Send(ctx context.Context, slackChannelID, dedupID, body string) error

Send sends a message to the queue with additional support for in-process receivers. This overrides the embedded producer's Send to add:

  - Consumer group creation for new streams (enables immediate consumption)
  - Local stream tracking for fast discovery by in-process receiver
  - Notification to wake up in-process receiver immediately

type RedisFifoQueueOption

type RedisFifoQueueOption func(*RedisFifoQueueOptions)

RedisFifoQueueOption is a function that configures RedisFifoQueueOptions.

func WithClaimMinIdleTime

func WithClaimMinIdleTime(duration time.Duration) RedisFifoQueueOption

WithClaimMinIdleTime sets the minimum idle time before a message can be claimed.

func WithConsumerGroup

func WithConsumerGroup(group string) RedisFifoQueueOption

WithConsumerGroup sets the consumer group name.

func WithKeyPrefix

func WithKeyPrefix(prefix string) RedisFifoQueueOption

WithKeyPrefix sets the Redis key prefix for the queue.

func WithLockTTL

func WithLockTTL(ttl time.Duration) RedisFifoQueueOption

WithLockTTL sets the time-to-live for the per-stream lock.

func WithMaxStreamLength

func WithMaxStreamLength(length int64) RedisFifoQueueOption

WithMaxStreamLength sets the approximate maximum length of each stream.

func WithPollInterval

func WithPollInterval(interval time.Duration) RedisFifoQueueOption

WithPollInterval sets how long to wait between polling cycles when no messages are available.

func WithStreamInactivityTimeout

func WithStreamInactivityTimeout(timeout time.Duration) RedisFifoQueueOption

WithStreamInactivityTimeout sets how long a stream can be inactive before cleanup. Set to 0 to disable automatic cleanup.

func WithStreamRefreshInterval

func WithStreamRefreshInterval(interval time.Duration) RedisFifoQueueOption

WithStreamRefreshInterval sets how often to check for new streams.

type RedisFifoQueueOptions

type RedisFifoQueueOptions struct {
	// contains filtered or unexported fields
}

RedisFifoQueueOptions holds configuration for RedisFifoQueue.

type RedisFifoQueueProducer

type RedisFifoQueueProducer struct {
	// contains filtered or unexported fields
}

RedisFifoQueueProducer is a lightweight write-only queue producer. Use this when the service only needs to send messages (e.g., API server in a separate service). For full queue functionality including Receive(), use RedisFifoQueue instead.

This producer has minimal overhead: no consumer groups, no distributed locks, no in-process notification mechanism. It simply writes messages to Redis streams.

func NewRedisFifoQueueProducer

func NewRedisFifoQueueProducer(client redis.UniversalClient, name string, logger types.Logger, opts ...RedisFifoQueueOption) *RedisFifoQueueProducer

NewRedisFifoQueueProducer creates a new lightweight write-only queue producer. Use this when the service only needs to send messages and doesn't need to receive. The client should be a configured Redis client (can be a single node, sentinel, or cluster client). The name is used as part of the Redis key prefix and for identification.

func (*RedisFifoQueueProducer) Init

Init initializes the RedisFifoQueueProducer. It validates options and marks the producer as ready for use.

func (*RedisFifoQueueProducer) Name

func (p *RedisFifoQueueProducer) Name() string

Name returns the name of the queue.

func (*RedisFifoQueueProducer) Send

func (p *RedisFifoQueueProducer) Send(ctx context.Context, slackChannelID, _, body string) error

Send sends a message to the queue. The slackChannelID is used to route the message to a specific stream, ensuring ordering per channel. Note: The dedupID parameter is accepted for interface compatibility but is not used for deduplication. Redis Streams does not natively support message deduplication like SQS FIFO queues. Deduplication should be handled at the application level if needed.

type SlackClient

type SlackClient interface {
	GetChannelName(ctx context.Context, channelID string) string
	IsAlertChannel(ctx context.Context, channelID string) (bool, string, error)
	Update(ctx context.Context, channelID string, allChannelIssues []*models.Issue) error
	UpdateSingleIssueWithThrottling(ctx context.Context, issue *models.Issue, reason string, issuesInChannel int) error
	UpdateSingleIssue(ctx context.Context, issue *models.Issue, reason string) error
	Delete(ctx context.Context, issue *models.Issue, reason string, updateIfMessageHasReplies bool, sem *semaphore.Weighted) error
	DeletePost(ctx context.Context, channelID, ts string) error
}

type WebhookHandler

type WebhookHandler interface {
	// ShouldHandleWebhook returns true if the handler should handle the specified webhook target.
	ShouldHandleWebhook(ctx context.Context, target string) bool

	// HandleWebhook handles the webhook target, with the specified callback data.
	// If successful, it should DEBUG log the send action and return nil.
	// If unsuccessful, it should return the error (without logging it).
	HandleWebhook(ctx context.Context, target string, data *types.WebhookCallback, logger types.Logger) error
}

WebhookHandler is an interface for handling alert webhooks (i.e. issue callbacks). The webhook may or may not be HTTP-based. More than one handler can be registered with a Manager. Handlers are consulted in registration order, and the first one that returns true from ShouldHandleWebhook handles the webhook.

Directories

Path Synopsis
internal
