cluster

package
v1.0.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2026 License: MPL-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChannelCoordinator

type ChannelCoordinator interface {
	Start(ctx context.Context) error
	Stop()
	GetPeers() []PeerInfo
	InstanceID() string
	IsLeader() bool
	AcquireChannel(ctx context.Context, channelID string) (bool, error)
	RenewChannelLease(ctx context.Context, channelID string) error
	ReleaseChannel(ctx context.Context, channelID string) error
	OwnedChannels() []string
	ShouldAcquireChannel(channelID string, tags []string) bool
}

type ChannelHealth

type ChannelHealth struct {
	Running int `json:"running"`
	Stopped int `json:"stopped"`
	Errored int `json:"errored"`
}

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

func NewCoordinator

func NewCoordinator(cfg *config.ClusterConfig, logger *slog.Logger) *Coordinator

func (*Coordinator) AcquireChannel

func (c *Coordinator) AcquireChannel(_ context.Context, channelID string) (bool, error)

func (*Coordinator) GetPeers

func (c *Coordinator) GetPeers() []PeerInfo

func (*Coordinator) InstanceID

func (c *Coordinator) InstanceID() string

func (*Coordinator) IsLeader

func (c *Coordinator) IsLeader() bool

func (*Coordinator) OwnedChannels

func (c *Coordinator) OwnedChannels() []string

func (*Coordinator) ReleaseChannel

func (c *Coordinator) ReleaseChannel(_ context.Context, channelID string) error

func (*Coordinator) RenewChannelLease

func (c *Coordinator) RenewChannelLease(_ context.Context, _ string) error

func (*Coordinator) ShouldAcquireChannel

func (c *Coordinator) ShouldAcquireChannel(_ string, _ []string) bool

func (*Coordinator) Start

func (c *Coordinator) Start(ctx context.Context) error

func (*Coordinator) Stop

func (c *Coordinator) Stop()

type Deduplicator

type Deduplicator struct {
	// contains filtered or unexported fields
}

func NewDeduplicator

func NewDeduplicator(window time.Duration) *Deduplicator

func (*Deduplicator) IsDuplicate

func (d *Deduplicator) IsDuplicate(key string) bool

type HealthChecker

type HealthChecker struct {
	// contains filtered or unexported fields
}

func NewHealthChecker

func NewHealthChecker(cfg *config.HealthConfig, logger *slog.Logger) *HealthChecker

func (*HealthChecker) SetStatus

func (hc *HealthChecker) SetStatus(status HealthStatus)

func (*HealthChecker) Start

func (hc *HealthChecker) Start() error

func (*HealthChecker) UpdateChannels

func (hc *HealthChecker) UpdateChannels(running, stopped, errored int)

type HealthResponse

type HealthResponse struct {
	Status   HealthStatus  `json:"status"`
	Channels ChannelHealth `json:"channels"`
	Uptime   string        `json:"uptime"`
}

type HealthStatus

type HealthStatus string
const (
	StatusHealthy   HealthStatus = "healthy"
	StatusDegraded  HealthStatus = "degraded"
	StatusUnhealthy HealthStatus = "unhealthy"
)

type MessageDeduplicator

type MessageDeduplicator interface {
	IsDuplicate(key string) bool
}

type PeerInfo

type PeerInfo struct {
	InstanceID string    `json:"instance_id"`
	LastSeen   time.Time `json:"last_seen"`
	Channels   []string  `json:"channels"`
	Status     string    `json:"status"`
}

type RedisClient

type RedisClient struct {
	// contains filtered or unexported fields
}

func NewRedisClient

func NewRedisClient(cfg *config.RedisConfig) (*RedisClient, error)

func (*RedisClient) Client

func (rc *RedisClient) Client() *redis.Client

func (*RedisClient) Close

func (rc *RedisClient) Close() error

func (*RedisClient) Key

func (rc *RedisClient) Key(parts ...string) string

type RedisCoordinator

type RedisCoordinator struct {
	// contains filtered or unexported fields
}

func NewRedisCoordinator

func NewRedisCoordinator(client *RedisClient, cfg *config.ClusterConfig, logger *slog.Logger) *RedisCoordinator

func (*RedisCoordinator) AcquireChannel

func (rc *RedisCoordinator) AcquireChannel(ctx context.Context, channelID string) (bool, error)

func (*RedisCoordinator) GetPeers

func (rc *RedisCoordinator) GetPeers() []PeerInfo

func (*RedisCoordinator) InstanceID

func (rc *RedisCoordinator) InstanceID() string

func (*RedisCoordinator) IsLeader

func (rc *RedisCoordinator) IsLeader() bool

func (*RedisCoordinator) OwnedChannels

func (rc *RedisCoordinator) OwnedChannels() []string

func (*RedisCoordinator) ReleaseChannel

func (rc *RedisCoordinator) ReleaseChannel(ctx context.Context, channelID string) error

func (*RedisCoordinator) RenewChannelLease

func (rc *RedisCoordinator) RenewChannelLease(ctx context.Context, channelID string) error

func (*RedisCoordinator) ShouldAcquireChannel

func (rc *RedisCoordinator) ShouldAcquireChannel(channelID string, tags []string) bool

func (*RedisCoordinator) Start

func (rc *RedisCoordinator) Start(ctx context.Context) error

func (*RedisCoordinator) Stop

func (rc *RedisCoordinator) Stop()

type RedisDeduplicator

type RedisDeduplicator struct {
	// contains filtered or unexported fields
}

func NewRedisDeduplicator

func NewRedisDeduplicator(client *RedisClient, window time.Duration) *RedisDeduplicator

func (*RedisDeduplicator) IsDuplicate

func (rd *RedisDeduplicator) IsDuplicate(key string) bool

func (*RedisDeduplicator) IsDuplicateCtx

func (rd *RedisDeduplicator) IsDuplicateCtx(ctx context.Context, key string) (bool, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL