Documentation ¶
Index ¶
- type ChannelCoordinator
- type ChannelHealth
- type Coordinator
- func (c *Coordinator) AcquireChannel(_ context.Context, channelID string) (bool, error)
- func (c *Coordinator) GetPeers() []PeerInfo
- func (c *Coordinator) InstanceID() string
- func (c *Coordinator) IsLeader() bool
- func (c *Coordinator) OwnedChannels() []string
- func (c *Coordinator) ReleaseChannel(_ context.Context, channelID string) error
- func (c *Coordinator) RenewChannelLease(_ context.Context, _ string) error
- func (c *Coordinator) ShouldAcquireChannel(_ string, _ []string) bool
- func (c *Coordinator) Start(ctx context.Context) error
- func (c *Coordinator) Stop()
- type Deduplicator
- type HealthChecker
- type HealthResponse
- type HealthStatus
- type MessageDeduplicator
- type PeerInfo
- type RedisClient
- type RedisCoordinator
- func (rc *RedisCoordinator) AcquireChannel(ctx context.Context, channelID string) (bool, error)
- func (rc *RedisCoordinator) GetPeers() []PeerInfo
- func (rc *RedisCoordinator) InstanceID() string
- func (rc *RedisCoordinator) IsLeader() bool
- func (rc *RedisCoordinator) OwnedChannels() []string
- func (rc *RedisCoordinator) ReleaseChannel(ctx context.Context, channelID string) error
- func (rc *RedisCoordinator) RenewChannelLease(ctx context.Context, channelID string) error
- func (rc *RedisCoordinator) ShouldAcquireChannel(channelID string, tags []string) bool
- func (rc *RedisCoordinator) Start(ctx context.Context) error
- func (rc *RedisCoordinator) Stop()
- type RedisDeduplicator
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChannelCoordinator ¶
type ChannelCoordinator interface {
Start(ctx context.Context) error
Stop()
GetPeers() []PeerInfo
InstanceID() string
IsLeader() bool
AcquireChannel(ctx context.Context, channelID string) (bool, error)
RenewChannelLease(ctx context.Context, channelID string) error
ReleaseChannel(ctx context.Context, channelID string) error
OwnedChannels() []string
ShouldAcquireChannel(channelID string, tags []string) bool
}
type ChannelHealth ¶
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
func NewCoordinator ¶
func NewCoordinator(cfg *config.ClusterConfig, logger *slog.Logger) *Coordinator
func (*Coordinator) AcquireChannel ¶
func (c *Coordinator) AcquireChannel(_ context.Context, channelID string) (bool, error)
func (*Coordinator) GetPeers ¶
func (c *Coordinator) GetPeers() []PeerInfo
func (*Coordinator) InstanceID ¶
func (c *Coordinator) InstanceID() string
func (*Coordinator) IsLeader ¶
func (c *Coordinator) IsLeader() bool
func (*Coordinator) OwnedChannels ¶
func (c *Coordinator) OwnedChannels() []string
func (*Coordinator) ReleaseChannel ¶
func (c *Coordinator) ReleaseChannel(_ context.Context, channelID string) error
func (*Coordinator) RenewChannelLease ¶
func (c *Coordinator) RenewChannelLease(_ context.Context, _ string) error
func (*Coordinator) ShouldAcquireChannel ¶
func (c *Coordinator) ShouldAcquireChannel(_ string, _ []string) bool
func (*Coordinator) Stop ¶
func (c *Coordinator) Stop()
type Deduplicator ¶
type Deduplicator struct {
// contains filtered or unexported fields
}
func NewDeduplicator ¶
func NewDeduplicator(window time.Duration) *Deduplicator
func (*Deduplicator) IsDuplicate ¶
func (d *Deduplicator) IsDuplicate(key string) bool
type HealthChecker ¶
type HealthChecker struct {
// contains filtered or unexported fields
}
func NewHealthChecker ¶
func NewHealthChecker(cfg *config.HealthConfig, logger *slog.Logger) *HealthChecker
func (*HealthChecker) SetStatus ¶
func (hc *HealthChecker) SetStatus(status HealthStatus)
func (*HealthChecker) Start ¶
func (hc *HealthChecker) Start() error
func (*HealthChecker) UpdateChannels ¶
func (hc *HealthChecker) UpdateChannels(running, stopped, errored int)
type HealthResponse ¶
type HealthResponse struct {
Status HealthStatus `json:"status"`
Channels ChannelHealth `json:"channels"`
Uptime string `json:"uptime"`
}
type HealthStatus ¶
type HealthStatus string
const (
	StatusHealthy   HealthStatus = "healthy"
	StatusDegraded  HealthStatus = "degraded"
	StatusUnhealthy HealthStatus = "unhealthy"
)
type MessageDeduplicator ¶
type RedisClient ¶
type RedisClient struct {
// contains filtered or unexported fields
}
func NewRedisClient ¶
func NewRedisClient(cfg *config.RedisConfig) (*RedisClient, error)
func (*RedisClient) Client ¶
func (rc *RedisClient) Client() *redis.Client
func (*RedisClient) Close ¶
func (rc *RedisClient) Close() error
func (*RedisClient) Key ¶
func (rc *RedisClient) Key(parts ...string) string
type RedisCoordinator ¶
type RedisCoordinator struct {
// contains filtered or unexported fields
}
func NewRedisCoordinator ¶
func NewRedisCoordinator(client *RedisClient, cfg *config.ClusterConfig, logger *slog.Logger) *RedisCoordinator
func (*RedisCoordinator) AcquireChannel ¶
func (rc *RedisCoordinator) AcquireChannel(ctx context.Context, channelID string) (bool, error)
func (*RedisCoordinator) GetPeers ¶
func (rc *RedisCoordinator) GetPeers() []PeerInfo
func (*RedisCoordinator) InstanceID ¶
func (rc *RedisCoordinator) InstanceID() string
func (*RedisCoordinator) IsLeader ¶
func (rc *RedisCoordinator) IsLeader() bool
func (*RedisCoordinator) OwnedChannels ¶
func (rc *RedisCoordinator) OwnedChannels() []string
func (*RedisCoordinator) ReleaseChannel ¶
func (rc *RedisCoordinator) ReleaseChannel(ctx context.Context, channelID string) error
func (*RedisCoordinator) RenewChannelLease ¶
func (rc *RedisCoordinator) RenewChannelLease(ctx context.Context, channelID string) error
func (*RedisCoordinator) ShouldAcquireChannel ¶
func (rc *RedisCoordinator) ShouldAcquireChannel(channelID string, tags []string) bool
func (*RedisCoordinator) Stop ¶
func (rc *RedisCoordinator) Stop()
type RedisDeduplicator ¶
type RedisDeduplicator struct {
// contains filtered or unexported fields
}
func NewRedisDeduplicator ¶
func NewRedisDeduplicator(client *RedisClient, window time.Duration) *RedisDeduplicator
func (*RedisDeduplicator) IsDuplicate ¶
func (rd *RedisDeduplicator) IsDuplicate(key string) bool
func (*RedisDeduplicator) IsDuplicateCtx ¶
Click to show internal directories.
Click to hide internal directories.