Documentation
¶
Index ¶
- type Broadcaster
- type Broker
- type Cacheable
- type Config
- type LegacyBroker
- func (LegacyBroker) Announce() string
- func (LegacyBroker) CommitSession(sid string, session Cacheable) error
- func (LegacyBroker) FinishSession(sid string) error
- func (b *LegacyBroker) HandleBroadcast(msg *common.StreamMessage)
- func (b *LegacyBroker) HandleCommand(msg *common.RemoteCommandMessage)
- func (LegacyBroker) HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error)
- func (LegacyBroker) HistorySince(stream string, ts int64) ([]common.StreamMessage, error)
- func (LegacyBroker) RestoreSession(from string) ([]byte, error)
- func (LegacyBroker) Shutdown(ctx context.Context) error
- func (LegacyBroker) Start(done chan (error)) error
- func (b *LegacyBroker) Subscribe(stream string) string
- func (b *LegacyBroker) Unsubscribe(stream string) string
- type LocalBroker
- type Memory
- func (b *Memory) Announce() string
- func (b *Memory) CommitSession(sid string, session Cacheable) error
- func (b *Memory) FinishSession(sid string) error
- func (b *Memory) GetEpoch() string
- func (b *Memory) HandleBroadcast(msg *common.StreamMessage)
- func (b *Memory) HandleCommand(msg *common.RemoteCommandMessage)
- func (b *Memory) HistoryFrom(name string, epoch string, offset uint64) ([]common.StreamMessage, error)
- func (b *Memory) HistorySince(name string, ts int64) ([]common.StreamMessage, error)
- func (b *Memory) RestoreSession(from string) ([]byte, error)
- func (b *Memory) SetEpoch(v string)
- func (b *Memory) Shutdown(ctx context.Context) error
- func (b *Memory) Start(done chan (error)) error
- func (b *Memory) Store(name string, data []byte, offset uint64, ts time.Time) (uint64, error)
- func (b *Memory) Subscribe(stream string) string
- func (b *Memory) Unsubscribe(stream string) string
- type NATS
- func (n *NATS) Announce() string
- func (n *NATS) CommitSession(sid string, session Cacheable) error
- func (n *NATS) Epoch() string
- func (n *NATS) FinishSession(sid string) error
- func (n *NATS) HandleBroadcast(msg *common.StreamMessage)
- func (n *NATS) HandleCommand(msg *common.RemoteCommandMessage)
- func (n *NATS) HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error)
- func (n *NATS) HistorySince(stream string, since int64) ([]common.StreamMessage, error)
- func (n *NATS) Ready(timeout ...time.Duration) error
- func (n *NATS) Reset() error
- func (n *NATS) RestoreSession(sid string) ([]byte, error)
- func (n *NATS) SetEpoch(epoch string) error
- func (n *NATS) Shutdown(ctx context.Context) error
- func (n *NATS) Start(done chan (error)) error
- func (n *NATS) Subscribe(stream string) string
- func (n *NATS) Unsubscribe(stream string) string
- type NATSOption
- type StreamsTracker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Broadcaster ¶
type Broadcaster interface {
Broadcast(msg *common.StreamMessage)
BroadcastCommand(msg *common.RemoteCommandMessage)
Subscribe(stream string)
Unsubscribe(stream string)
}
Broadcaster is responsible for fanning out messages to the stream clients and other nodes.
type Broker ¶
type Broker interface {
Start(done chan (error)) error
Shutdown(ctx context.Context) error
Announce() string
HandleBroadcast(msg *common.StreamMessage)
HandleCommand(msg *common.RemoteCommandMessage)
// Registers the stream and returns its (short) unique identifier
Subscribe(stream string) string
// (Maybe) unregisters the stream and returns its unique identifier
Unsubscribe(stream string) string
// Retrieves stream messages from history from the specified offset within the specified epoch
HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error)
// Retrieves stream messages from history from the specified timestamp
HistorySince(stream string, ts int64) ([]common.StreamMessage, error)
// Saves session's state in cache
CommitSession(sid string, session Cacheable) error
// Fetches session's state from cache (by session id)
RestoreSession(from string) ([]byte, error)
// Marks session as finished (for cache expiration)
FinishSession(sid string) error
}
Broker is responsible for managing streams history, keeping client states for recovery, and distributing broadcasts across nodes.
type Cacheable ¶
Cacheable is an interface which a session object must implement to be stored in cache. We use an interface rather than requiring a string cache entry to be passed, to avoid unnecessary dumping when the broker doesn't support storing sessions.
type Config ¶
type Config struct {
// Adapter name
Adapter string `toml:"adapter"`
// For how long to keep history in seconds
HistoryTTL int64 `toml:"history_ttl"`
// Max size of messages to keep in the history per stream
HistoryLimit int `toml:"history_limit"`
// Sessions cache TTL in seconds (after disconnect)
SessionsTTL int64 `toml:"sessions_ttl"`
}
type LegacyBroker ¶
type LegacyBroker struct {
// contains filtered or unexported fields
}
LegacyBroker preserves the v1 behaviour while implementing the Broker APIs. Thus, we can use it without breaking the older behaviour.
func NewLegacyBroker ¶
func NewLegacyBroker(broadcaster Broadcaster) *LegacyBroker
func (LegacyBroker) Announce ¶
func (LegacyBroker) Announce() string
func (LegacyBroker) CommitSession ¶
func (LegacyBroker) CommitSession(sid string, session Cacheable) error
func (LegacyBroker) FinishSession ¶
func (LegacyBroker) FinishSession(sid string) error
func (*LegacyBroker) HandleBroadcast ¶
func (b *LegacyBroker) HandleBroadcast(msg *common.StreamMessage)
func (*LegacyBroker) HandleCommand ¶
func (b *LegacyBroker) HandleCommand(msg *common.RemoteCommandMessage)
func (LegacyBroker) HistoryFrom ¶
func (LegacyBroker) HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error)
func (LegacyBroker) HistorySince ¶
func (LegacyBroker) HistorySince(stream string, ts int64) ([]common.StreamMessage, error)
func (LegacyBroker) RestoreSession ¶
func (LegacyBroker) RestoreSession(from string) ([]byte, error)
func (LegacyBroker) Start ¶
func (LegacyBroker) Start(done chan (error)) error
func (*LegacyBroker) Subscribe ¶
func (b *LegacyBroker) Subscribe(stream string) string
Registering streams (for granular pub/sub)
func (*LegacyBroker) Unsubscribe ¶
func (b *LegacyBroker) Unsubscribe(stream string) string
type LocalBroker ¶ added in v1.4.7
type LocalBroker interface {
Start(done chan (error)) error
Shutdown(ctx context.Context) error
SetEpoch(epoch string)
GetEpoch() string
HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error)
HistorySince(stream string, ts int64) ([]common.StreamMessage, error)
Store(stream string, msg []byte, seq uint64, ts time.Time) (uint64, error)
}
LocalBroker is a single-node broker that can be used to store stream data locally.
type Memory ¶
type Memory struct {
// contains filtered or unexported fields
}
func NewMemoryBroker ¶
func NewMemoryBroker(node Broadcaster, config *Config) *Memory
func (*Memory) FinishSession ¶
func (*Memory) HandleBroadcast ¶
func (b *Memory) HandleBroadcast(msg *common.StreamMessage)
func (*Memory) HandleCommand ¶
func (b *Memory) HandleCommand(msg *common.RemoteCommandMessage)
func (*Memory) HistoryFrom ¶
func (*Memory) HistorySince ¶
func (*Memory) Unsubscribe ¶
type NATS ¶ added in v1.4.7
type NATS struct {
// contains filtered or unexported fields
}
func NewNATSBroker ¶ added in v1.4.7
func NewNATSBroker(broadcaster Broadcaster, c *Config, nc *natsconfig.NATSConfig, l *slog.Logger, opts ...NATSOption) *NATS
func (*NATS) CommitSession ¶ added in v1.4.7
func (*NATS) FinishSession ¶ added in v1.4.7
func (*NATS) HandleBroadcast ¶ added in v1.4.7
func (n *NATS) HandleBroadcast(msg *common.StreamMessage)
func (*NATS) HandleCommand ¶ added in v1.4.7
func (n *NATS) HandleCommand(msg *common.RemoteCommandMessage)
func (*NATS) HistoryFrom ¶ added in v1.4.7
func (*NATS) HistorySince ¶ added in v1.4.7
func (*NATS) RestoreSession ¶ added in v1.4.7
func (*NATS) Unsubscribe ¶ added in v1.4.7
type NATSOption ¶ added in v1.4.7
type NATSOption func(*NATS)
func WithNATSLocalBroker ¶ added in v1.4.7
func WithNATSLocalBroker(b LocalBroker) NATSOption
type StreamsTracker ¶
type StreamsTracker struct {
// contains filtered or unexported fields
}
func NewStreamsTracker ¶
func NewStreamsTracker() *StreamsTracker
func (*StreamsTracker) Add ¶
func (s *StreamsTracker) Add(name string) (isNew bool)
func (*StreamsTracker) Has ¶
func (s *StreamsTracker) Has(name string) bool
func (*StreamsTracker) Remove ¶
func (s *StreamsTracker) Remove(name string) (isLast bool)