Documentation
¶
Index ¶
- func CreateStores(client *redis.Client, prefix string) (roomStore any, channelStore any, messageStore any, presenceStore any, ...)
- func NewChannelStore(client *redis.Client, prefix string) streaming.ChannelStore
- func NewClient(config ClientConfig) (*redis.Client, error)
- func NewDistributedBackend(client *redis.Client, nodeID, prefix string) streaming.DistributedBackend
- func NewMessageStore(client *redis.Client, prefix string) streaming.MessageStore
- func NewPresenceStore(client *redis.Client, prefix string) streaming.PresenceStore
- func NewRoomStore(client *redis.Client, prefix string) streaming.RoomStore
- func NewTypingStore(client *redis.Client, prefix string) streaming.TypingStore
- type ChannelStore
- func (s *ChannelStore) AddSubscription(ctx context.Context, channelID string, sub streaming.Subscription) error
- func (s *ChannelStore) Connect(ctx context.Context) error
- func (s *ChannelStore) Create(ctx context.Context, channel streaming.Channel) error
- func (s *ChannelStore) Delete(ctx context.Context, channelID string) error
- func (s *ChannelStore) Disconnect(ctx context.Context) error
- func (s *ChannelStore) Exists(ctx context.Context, channelID string) (bool, error)
- func (s *ChannelStore) Get(ctx context.Context, channelID string) (streaming.Channel, error)
- func (s *ChannelStore) GetSubscriberCount(ctx context.Context, channelID string) (int, error)
- func (s *ChannelStore) GetSubscriptions(ctx context.Context, channelID string) ([]streaming.Subscription, error)
- func (s *ChannelStore) GetUserChannels(ctx context.Context, userID string) ([]streaming.Channel, error)
- func (s *ChannelStore) IsSubscribed(ctx context.Context, channelID, connID string) (bool, error)
- func (s *ChannelStore) List(ctx context.Context) ([]streaming.Channel, error)
- func (s *ChannelStore) Ping(ctx context.Context) error
- func (s *ChannelStore) Publish(ctx context.Context, channelID string, message *streaming.Message) error
- func (s *ChannelStore) RemoveSubscription(ctx context.Context, channelID, connID string) error
- type ClientConfig
- type DistributedBackend
- func (d *DistributedBackend) AcquireLock(ctx context.Context, key string, ttl time.Duration) (streaming.Lock, error)
- func (d *DistributedBackend) Connect(ctx context.Context) error
- func (d *DistributedBackend) Decrement(ctx context.Context, key string) (int64, error)
- func (d *DistributedBackend) Disconnect(ctx context.Context) error
- func (d *DistributedBackend) DiscoverNodes(ctx context.Context) ([]streaming.NodeInfo, error)
- func (d *DistributedBackend) GetCounter(ctx context.Context, key string) (int64, error)
- func (d *DistributedBackend) GetNode(ctx context.Context, nodeID string) (*streaming.NodeInfo, error)
- func (d *DistributedBackend) GetNodes(ctx context.Context) ([]streaming.NodeInfo, error)
- func (d *DistributedBackend) GetOnlineUsers(ctx context.Context) ([]string, error)
- func (d *DistributedBackend) GetPresence(ctx context.Context, userID string) (*streaming.UserPresence, error)
- func (d *DistributedBackend) Heartbeat(ctx context.Context, nodeID string) error
- func (d *DistributedBackend) Increment(ctx context.Context, key string) (int64, error)
- func (d *DistributedBackend) Ping(ctx context.Context) error
- func (d *DistributedBackend) Publish(ctx context.Context, channel string, message *streaming.Message) error
- func (d *DistributedBackend) RegisterNode(ctx context.Context, nodeID string, metadata map[string]any) error
- func (d *DistributedBackend) ReleaseLock(ctx context.Context, lock streaming.Lock) error
- func (d *DistributedBackend) SetPresence(ctx context.Context, userID, status string, ttl time.Duration) error
- func (d *DistributedBackend) Subscribe(ctx context.Context, channel string, handler streaming.MessageHandler) error
- func (d *DistributedBackend) UnregisterNode(ctx context.Context, nodeID string) error
- func (d *DistributedBackend) Unsubscribe(ctx context.Context, channel string) error
- func (d *DistributedBackend) WatchNodes(ctx context.Context, handler streaming.NodeChangeHandler) error
- type MessageStore
- func (s *MessageStore) Connect(ctx context.Context) error
- func (s *MessageStore) Delete(ctx context.Context, messageID string) error
- func (s *MessageStore) DeleteByRoom(ctx context.Context, roomID string) error
- func (s *MessageStore) DeleteByUser(ctx context.Context, userID string) error
- func (s *MessageStore) DeleteOld(ctx context.Context, olderThan time.Duration) error
- func (s *MessageStore) Disconnect(ctx context.Context) error
- func (s *MessageStore) Get(ctx context.Context, messageID string) (*streaming.Message, error)
- func (s *MessageStore) GetHistory(ctx context.Context, roomID string, query streaming.HistoryQuery) ([]*streaming.Message, error)
- func (s *MessageStore) GetMessageCount(ctx context.Context, roomID string) (int64, error)
- func (s *MessageStore) GetMessageCountByUser(ctx context.Context, roomID, userID string) (int64, error)
- func (s *MessageStore) GetThreadHistory(ctx context.Context, roomID, threadID string, query streaming.HistoryQuery) ([]*streaming.Message, error)
- func (s *MessageStore) GetUserMessages(ctx context.Context, userID string, query streaming.HistoryQuery) ([]*streaming.Message, error)
- func (s *MessageStore) Ping(ctx context.Context) error
- func (s *MessageStore) Save(ctx context.Context, message *streaming.Message) error
- func (s *MessageStore) SaveBatch(ctx context.Context, messages []*streaming.Message) error
- func (s *MessageStore) Search(ctx context.Context, roomID, searchTerm string, query streaming.HistoryQuery) ([]*streaming.Message, error)
- type PresenceStore
- func (s *PresenceStore) CleanupExpired(ctx context.Context, olderThan time.Duration) error
- func (s *PresenceStore) Connect(ctx context.Context) error
- func (s *PresenceStore) CountByStatus(ctx context.Context) (map[string]int, error)
- func (s *PresenceStore) Delete(ctx context.Context, userID string) error
- func (s *PresenceStore) DeleteMultiple(ctx context.Context, userIDs []string) error
- func (s *PresenceStore) Disconnect(ctx context.Context) error
- func (s *PresenceStore) Get(ctx context.Context, userID string) (*streaming.UserPresence, error)
- func (s *PresenceStore) GetActiveCount(ctx context.Context, since time.Duration) (int, error)
- func (s *PresenceStore) GetByStatus(ctx context.Context, status string) ([]*streaming.UserPresence, error)
- func (s *PresenceStore) GetDevices(ctx context.Context, userID string) ([]streaming.DeviceInfo, error)
- func (s *PresenceStore) GetHistory(ctx context.Context, userID string, limit int) ([]*streaming.PresenceEvent, error)
- func (s *PresenceStore) GetHistorySince(ctx context.Context, userID string, since time.Time) ([]*streaming.PresenceEvent, error)
- func (s *PresenceStore) GetLastActivity(ctx context.Context, userID string) (time.Time, error)
- func (s *PresenceStore) GetMultiple(ctx context.Context, userIDs []string) ([]*streaming.UserPresence, error)
- func (s *PresenceStore) GetOnline(ctx context.Context) ([]string, error)
- func (s *PresenceStore) GetRecent(ctx context.Context, status string, since time.Duration) ([]*streaming.UserPresence, error)
- func (s *PresenceStore) GetWithFilters(ctx context.Context, filters streaming.PresenceFilters) ([]*streaming.UserPresence, error)
- func (s *PresenceStore) IsOnline(ctx context.Context, userID string) (bool, error)
- func (s *PresenceStore) Ping(ctx context.Context) error
- func (s *PresenceStore) RemoveDevice(ctx context.Context, userID, deviceID string) error
- func (s *PresenceStore) SaveHistory(ctx context.Context, userID string, event *streaming.PresenceEvent) error
- func (s *PresenceStore) Set(ctx context.Context, userID string, presence *streaming.UserPresence) error
- func (s *PresenceStore) SetDevice(ctx context.Context, userID, deviceID string, device streaming.DeviceInfo) error
- func (s *PresenceStore) SetMultiple(ctx context.Context, presences map[string]*streaming.UserPresence) error
- func (s *PresenceStore) SetOffline(ctx context.Context, userID string) error
- func (s *PresenceStore) SetOnline(ctx context.Context, userID string, ttl time.Duration) error
- func (s *PresenceStore) UpdateActivity(ctx context.Context, userID string, timestamp time.Time) error
- type RoomStore
- func (s *RoomStore) AddMember(ctx context.Context, roomID string, member streaming.Member) error
- func (s *RoomStore) BanMember(ctx context.Context, roomID, userID string, ban streaming.RoomBan) error
- func (s *RoomStore) Connect(ctx context.Context) error
- func (s *RoomStore) Create(ctx context.Context, room streaming.Room) error
- func (s *RoomStore) CreateMany(ctx context.Context, rooms []streaming.Room) error
- func (s *RoomStore) Delete(ctx context.Context, roomID string) error
- func (s *RoomStore) DeleteInvite(ctx context.Context, inviteCode string) error
- func (s *RoomStore) DeleteMany(ctx context.Context, roomIDs []string) error
- func (s *RoomStore) Disconnect(ctx context.Context) error
- func (s *RoomStore) Exists(ctx context.Context, roomID string) (bool, error)
- func (s *RoomStore) FindByCategory(ctx context.Context, category string) ([]streaming.Room, error)
- func (s *RoomStore) FindByTag(ctx context.Context, tag string) ([]streaming.Room, error)
- func (s *RoomStore) Get(ctx context.Context, roomID string) (streaming.Room, error)
- func (s *RoomStore) GetArchivedRooms(ctx context.Context, userID string) ([]streaming.Room, error)
- func (s *RoomStore) GetBans(ctx context.Context, roomID string) ([]streaming.RoomBan, error)
- func (s *RoomStore) GetCommonRooms(ctx context.Context, userID1, userID2 string) ([]streaming.Room, error)
- func (s *RoomStore) GetInvite(ctx context.Context, inviteCode string) (*streaming.Invite, error)
- func (s *RoomStore) GetMember(ctx context.Context, roomID, userID string) (streaming.Member, error)
- func (s *RoomStore) GetMembers(ctx context.Context, roomID string) ([]streaming.Member, error)
- func (s *RoomStore) GetPublicRooms(ctx context.Context, limit int) ([]streaming.Room, error)
- func (s *RoomStore) GetRoomCount(ctx context.Context) (int, error)
- func (s *RoomStore) GetTotalMembers(ctx context.Context) (int, error)
- func (s *RoomStore) GetUserRooms(ctx context.Context, userID string) ([]streaming.Room, error)
- func (s *RoomStore) GetUserRoomsByRole(ctx context.Context, userID, role string) ([]streaming.Room, error)
- func (s *RoomStore) IsBanned(ctx context.Context, roomID, userID string) (bool, error)
- func (s *RoomStore) IsMember(ctx context.Context, roomID, userID string) (bool, error)
- func (s *RoomStore) List(ctx context.Context, filters map[string]any) ([]streaming.Room, error)
- func (s *RoomStore) ListInvites(ctx context.Context, roomID string) ([]*streaming.Invite, error)
- func (s *RoomStore) MemberCount(ctx context.Context, roomID string) (int, error)
- func (s *RoomStore) Ping(ctx context.Context) error
- func (s *RoomStore) RemoveMember(ctx context.Context, roomID, userID string) error
- func (s *RoomStore) SaveInvite(ctx context.Context, roomID string, invite *streaming.Invite) error
- func (s *RoomStore) Search(ctx context.Context, query string, filters map[string]any) ([]streaming.Room, error)
- func (s *RoomStore) UnbanMember(ctx context.Context, roomID, userID string) error
- func (s *RoomStore) Update(ctx context.Context, roomID string, updates map[string]any) error
- type TypingStore
- func (s *TypingStore) CleanupExpired(ctx context.Context) error
- func (s *TypingStore) Connect(ctx context.Context) error
- func (s *TypingStore) Disconnect(ctx context.Context) error
- func (s *TypingStore) GetTypingUsers(ctx context.Context, roomID string) ([]string, error)
- func (s *TypingStore) IsTyping(ctx context.Context, userID, roomID string) (bool, error)
- func (s *TypingStore) Ping(ctx context.Context) error
- func (s *TypingStore) RemoveTyping(ctx context.Context, userID, roomID string) error
- func (s *TypingStore) SetTyping(ctx context.Context, userID, roomID string, expiresAt time.Time) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateStores ¶
func CreateStores(client *redis.Client, prefix string) ( roomStore any, channelStore any, messageStore any, presenceStore any, typingStore any, distributed any, )
CreateStores creates all Redis-backed stores from a single client.
func NewChannelStore ¶
func NewChannelStore(client *redis.Client, prefix string) streaming.ChannelStore
NewChannelStore creates a new Redis channel store.
func NewClient ¶
func NewClient(config ClientConfig) (*redis.Client, error)
NewClient creates a new Redis client from configuration.
func NewDistributedBackend ¶
func NewDistributedBackend(client *redis.Client, nodeID, prefix string) streaming.DistributedBackend
NewDistributedBackend creates a new Redis distributed backend.
func NewMessageStore ¶
func NewMessageStore(client *redis.Client, prefix string) streaming.MessageStore
NewMessageStore creates a new Redis message store.
func NewPresenceStore ¶
func NewPresenceStore(client *redis.Client, prefix string) streaming.PresenceStore
NewPresenceStore creates a new Redis presence store.
func NewRoomStore ¶
NewRoomStore creates a new Redis room store.
func NewTypingStore ¶
func NewTypingStore(client *redis.Client, prefix string) streaming.TypingStore
NewTypingStore creates a new Redis typing store.
Types ¶
type ChannelStore ¶
type ChannelStore struct {
// contains filtered or unexported fields
}
ChannelStore implements streaming.ChannelStore with Redis backend.
func (*ChannelStore) AddSubscription ¶
func (s *ChannelStore) AddSubscription(ctx context.Context, channelID string, sub streaming.Subscription) error
func (*ChannelStore) Delete ¶
func (s *ChannelStore) Delete(ctx context.Context, channelID string) error
func (*ChannelStore) Disconnect ¶
func (s *ChannelStore) Disconnect(ctx context.Context) error
func (*ChannelStore) GetSubscriberCount ¶
func (*ChannelStore) GetSubscriptions ¶
func (s *ChannelStore) GetSubscriptions(ctx context.Context, channelID string) ([]streaming.Subscription, error)
func (*ChannelStore) GetUserChannels ¶
func (*ChannelStore) IsSubscribed ¶
func (*ChannelStore) RemoveSubscription ¶
func (s *ChannelStore) RemoveSubscription(ctx context.Context, channelID, connID string) error
type ClientConfig ¶
type ClientConfig struct {
URLs []string
Username string
Password string
DB int
// TLS
TLSEnabled bool
TLSCertFile string
TLSKeyFile string
TLSCAFile string
// Prefix for all keys
Prefix string
}
ClientConfig holds Redis client configuration.
type DistributedBackend ¶
type DistributedBackend struct {
// contains filtered or unexported fields
}
DistributedBackend implements streaming.DistributedBackend with Redis pub/sub.
func (*DistributedBackend) AcquireLock ¶
func (*DistributedBackend) Connect ¶
func (d *DistributedBackend) Connect(ctx context.Context) error
func (*DistributedBackend) Disconnect ¶
func (d *DistributedBackend) Disconnect(ctx context.Context) error
func (*DistributedBackend) DiscoverNodes ¶
func (*DistributedBackend) GetCounter ¶
func (*DistributedBackend) GetOnlineUsers ¶
func (d *DistributedBackend) GetOnlineUsers(ctx context.Context) ([]string, error)
func (*DistributedBackend) GetPresence ¶
func (d *DistributedBackend) GetPresence(ctx context.Context, userID string) (*streaming.UserPresence, error)
func (*DistributedBackend) Heartbeat ¶
func (d *DistributedBackend) Heartbeat(ctx context.Context, nodeID string) error
func (*DistributedBackend) RegisterNode ¶
func (*DistributedBackend) ReleaseLock ¶
func (*DistributedBackend) SetPresence ¶
func (*DistributedBackend) Subscribe ¶
func (d *DistributedBackend) Subscribe(ctx context.Context, channel string, handler streaming.MessageHandler) error
func (*DistributedBackend) UnregisterNode ¶
func (d *DistributedBackend) UnregisterNode(ctx context.Context, nodeID string) error
func (*DistributedBackend) Unsubscribe ¶
func (d *DistributedBackend) Unsubscribe(ctx context.Context, channel string) error
func (*DistributedBackend) WatchNodes ¶
func (d *DistributedBackend) WatchNodes(ctx context.Context, handler streaming.NodeChangeHandler) error
type MessageStore ¶
type MessageStore struct {
// contains filtered or unexported fields
}
MessageStore implements streaming.MessageStore with Redis backend using Redis Streams.
func (*MessageStore) Delete ¶
func (s *MessageStore) Delete(ctx context.Context, messageID string) error
func (*MessageStore) DeleteByRoom ¶
func (s *MessageStore) DeleteByRoom(ctx context.Context, roomID string) error
func (*MessageStore) DeleteByUser ¶
func (s *MessageStore) DeleteByUser(ctx context.Context, userID string) error
func (*MessageStore) Disconnect ¶
func (s *MessageStore) Disconnect(ctx context.Context) error
func (*MessageStore) GetHistory ¶
func (s *MessageStore) GetHistory(ctx context.Context, roomID string, query streaming.HistoryQuery) ([]*streaming.Message, error)
func (*MessageStore) GetMessageCount ¶
func (*MessageStore) GetMessageCountByUser ¶
func (*MessageStore) GetThreadHistory ¶
func (s *MessageStore) GetThreadHistory(ctx context.Context, roomID, threadID string, query streaming.HistoryQuery) ([]*streaming.Message, error)
func (*MessageStore) GetUserMessages ¶
func (s *MessageStore) GetUserMessages(ctx context.Context, userID string, query streaming.HistoryQuery) ([]*streaming.Message, error)
type PresenceStore ¶
type PresenceStore struct {
// contains filtered or unexported fields
}
PresenceStore implements streaming.PresenceStore with Redis backend.
func (*PresenceStore) CleanupExpired ¶
func (*PresenceStore) CountByStatus ¶
CountByStatus returns count of users by status.
func (*PresenceStore) Delete ¶
func (s *PresenceStore) Delete(ctx context.Context, userID string) error
func (*PresenceStore) DeleteMultiple ¶
func (s *PresenceStore) DeleteMultiple(ctx context.Context, userIDs []string) error
DeleteMultiple deletes multiple presence records in a single operation.
func (*PresenceStore) Disconnect ¶
func (s *PresenceStore) Disconnect(ctx context.Context) error
func (*PresenceStore) Get ¶
func (s *PresenceStore) Get(ctx context.Context, userID string) (*streaming.UserPresence, error)
func (*PresenceStore) GetActiveCount ¶
GetActiveCount returns count of users active since the specified duration.
func (*PresenceStore) GetByStatus ¶
func (s *PresenceStore) GetByStatus(ctx context.Context, status string) ([]*streaming.UserPresence, error)
GetByStatus returns all presence records with the specified status.
func (*PresenceStore) GetDevices ¶
func (s *PresenceStore) GetDevices(ctx context.Context, userID string) ([]streaming.DeviceInfo, error)
GetDevices returns all devices for a user.
func (*PresenceStore) GetHistory ¶
func (s *PresenceStore) GetHistory(ctx context.Context, userID string, limit int) ([]*streaming.PresenceEvent, error)
GetHistory returns presence history for a user.
func (*PresenceStore) GetHistorySince ¶
func (s *PresenceStore) GetHistorySince(ctx context.Context, userID string, since time.Time) ([]*streaming.PresenceEvent, error)
GetHistorySince returns presence history for a user since the specified time.
func (*PresenceStore) GetLastActivity ¶
func (*PresenceStore) GetMultiple ¶
func (s *PresenceStore) GetMultiple(ctx context.Context, userIDs []string) ([]*streaming.UserPresence, error)
func (*PresenceStore) GetOnline ¶
func (s *PresenceStore) GetOnline(ctx context.Context) ([]string, error)
func (*PresenceStore) GetRecent ¶
func (s *PresenceStore) GetRecent(ctx context.Context, status string, since time.Duration) ([]*streaming.UserPresence, error)
GetRecent returns presence records with the specified status updated since the given duration.
func (*PresenceStore) GetWithFilters ¶
func (s *PresenceStore) GetWithFilters(ctx context.Context, filters streaming.PresenceFilters) ([]*streaming.UserPresence, error)
GetWithFilters returns presence records matching the specified filters.
func (*PresenceStore) RemoveDevice ¶
func (s *PresenceStore) RemoveDevice(ctx context.Context, userID, deviceID string) error
RemoveDevice removes a device for a user.
func (*PresenceStore) SaveHistory ¶
func (s *PresenceStore) SaveHistory(ctx context.Context, userID string, event *streaming.PresenceEvent) error
SaveHistory saves a presence event to history.
func (*PresenceStore) Set ¶
func (s *PresenceStore) Set(ctx context.Context, userID string, presence *streaming.UserPresence) error
func (*PresenceStore) SetDevice ¶
func (s *PresenceStore) SetDevice(ctx context.Context, userID, deviceID string, device streaming.DeviceInfo) error
SetDevice sets device information for a user.
func (*PresenceStore) SetMultiple ¶
func (s *PresenceStore) SetMultiple(ctx context.Context, presences map[string]*streaming.UserPresence) error
SetMultiple sets multiple presence records in a single operation.
func (*PresenceStore) SetOffline ¶
func (s *PresenceStore) SetOffline(ctx context.Context, userID string) error
func (*PresenceStore) UpdateActivity ¶
type RoomStore ¶
type RoomStore struct {
// contains filtered or unexported fields
}
RoomStore implements streaming.RoomStore with Redis backend.
func (*RoomStore) BanMember ¶
func (s *RoomStore) BanMember(ctx context.Context, roomID, userID string, ban streaming.RoomBan) error
BanMember bans a member from a room.
func (*RoomStore) CreateMany ¶
CreateMany creates multiple rooms in a batch.
func (*RoomStore) DeleteInvite ¶
DeleteInvite deletes an invite by its code.
func (*RoomStore) DeleteMany ¶
DeleteMany deletes multiple rooms by their IDs.
func (*RoomStore) FindByCategory ¶
FindByCategory finds rooms in a specific category.
func (*RoomStore) GetArchivedRooms ¶
GetArchivedRooms gets archived rooms for a user.
func (*RoomStore) GetCommonRooms ¶
func (s *RoomStore) GetCommonRooms(ctx context.Context, userID1, userID2 string) ([]streaming.Room, error)
GetCommonRooms gets rooms that both users are members of.
func (*RoomStore) GetMembers ¶
func (*RoomStore) GetPublicRooms ¶
GetPublicRooms gets public rooms up to the specified limit.
func (*RoomStore) GetRoomCount ¶
GetRoomCount gets the total number of rooms.
func (*RoomStore) GetTotalMembers ¶
GetTotalMembers gets the total number of members across all rooms.
func (*RoomStore) GetUserRooms ¶
func (*RoomStore) GetUserRoomsByRole ¶
func (s *RoomStore) GetUserRoomsByRole(ctx context.Context, userID, role string) ([]streaming.Room, error)
GetUserRoomsByRole gets all rooms where a user has a specific role.
func (*RoomStore) ListInvites ¶
ListInvites lists all invites for a room.
func (*RoomStore) MemberCount ¶
func (*RoomStore) RemoveMember ¶
func (*RoomStore) SaveInvite ¶
SaveInvite saves an invite for a room.
func (*RoomStore) Search ¶
func (s *RoomStore) Search(ctx context.Context, query string, filters map[string]any) ([]streaming.Room, error)
Search searches for rooms by query and filters.
func (*RoomStore) UnbanMember ¶
UnbanMember removes a ban from a member.
type TypingStore ¶
type TypingStore struct {
// contains filtered or unexported fields
}
TypingStore implements streaming.TypingStore with Redis backend.
func (*TypingStore) CleanupExpired ¶
func (s *TypingStore) CleanupExpired(ctx context.Context) error
func (*TypingStore) Disconnect ¶
func (s *TypingStore) Disconnect(ctx context.Context) error
func (*TypingStore) GetTypingUsers ¶
func (*TypingStore) RemoveTyping ¶
func (s *TypingStore) RemoveTyping(ctx context.Context, userID, roomID string) error