redis

package
v0.7.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2025 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateStores

func CreateStores(client *redis.Client, prefix string) (
	roomStore any,
	channelStore any,
	messageStore any,
	presenceStore any,
	typingStore any,
	distributed any,
)

CreateStores creates all Redis-backed stores from a single client.

func NewChannelStore

func NewChannelStore(client *redis.Client, prefix string) streaming.ChannelStore

NewChannelStore creates a new Redis channel store.

func NewClient

func NewClient(config ClientConfig) (*redis.Client, error)

NewClient creates a new Redis client from configuration.

func NewDistributedBackend

func NewDistributedBackend(client *redis.Client, nodeID, prefix string) streaming.DistributedBackend

NewDistributedBackend creates a new Redis distributed backend.

func NewMessageStore

func NewMessageStore(client *redis.Client, prefix string) streaming.MessageStore

NewMessageStore creates a new Redis message store.

func NewPresenceStore

func NewPresenceStore(client *redis.Client, prefix string) streaming.PresenceStore

NewPresenceStore creates a new Redis presence store.

func NewRoomStore

func NewRoomStore(client *redis.Client, prefix string) streaming.RoomStore

NewRoomStore creates a new Redis room store.

func NewTypingStore

func NewTypingStore(client *redis.Client, prefix string) streaming.TypingStore

NewTypingStore creates a new Redis typing store.

Types

type ChannelStore

type ChannelStore struct {
	// contains filtered or unexported fields
}

ChannelStore implements streaming.ChannelStore with Redis backend.

func (*ChannelStore) AddSubscription

func (s *ChannelStore) AddSubscription(ctx context.Context, channelID string, sub streaming.Subscription) error

func (*ChannelStore) Connect

func (s *ChannelStore) Connect(ctx context.Context) error

func (*ChannelStore) Create

func (s *ChannelStore) Create(ctx context.Context, channel streaming.Channel) error

func (*ChannelStore) Delete

func (s *ChannelStore) Delete(ctx context.Context, channelID string) error

func (*ChannelStore) Disconnect

func (s *ChannelStore) Disconnect(ctx context.Context) error

func (*ChannelStore) Exists

func (s *ChannelStore) Exists(ctx context.Context, channelID string) (bool, error)

func (*ChannelStore) Get

func (s *ChannelStore) Get(ctx context.Context, channelID string) (streaming.Channel, error)

func (*ChannelStore) GetSubscriberCount

func (s *ChannelStore) GetSubscriberCount(ctx context.Context, channelID string) (int, error)

func (*ChannelStore) GetSubscriptions

func (s *ChannelStore) GetSubscriptions(ctx context.Context, channelID string) ([]streaming.Subscription, error)

func (*ChannelStore) GetUserChannels

func (s *ChannelStore) GetUserChannels(ctx context.Context, userID string) ([]streaming.Channel, error)

func (*ChannelStore) IsSubscribed

func (s *ChannelStore) IsSubscribed(ctx context.Context, channelID, connID string) (bool, error)

func (*ChannelStore) List

func (s *ChannelStore) List(ctx context.Context) ([]streaming.Channel, error)

func (*ChannelStore) Ping

func (s *ChannelStore) Ping(ctx context.Context) error

func (*ChannelStore) Publish

func (s *ChannelStore) Publish(ctx context.Context, channelID string, message *streaming.Message) error

func (*ChannelStore) RemoveSubscription

func (s *ChannelStore) RemoveSubscription(ctx context.Context, channelID, connID string) error

type ClientConfig

type ClientConfig struct {
	URLs     []string
	Username string
	Password string
	DB       int

	// TLS
	TLSEnabled  bool
	TLSCertFile string
	TLSKeyFile  string
	TLSCAFile   string

	// Prefix for all keys
	Prefix string
}

ClientConfig holds Redis client configuration.

type DistributedBackend

type DistributedBackend struct {
	// contains filtered or unexported fields
}

DistributedBackend implements streaming.DistributedBackend with Redis pub/sub.

func (*DistributedBackend) AcquireLock

func (d *DistributedBackend) AcquireLock(ctx context.Context, key string, ttl time.Duration) (streaming.Lock, error)

func (*DistributedBackend) Connect

func (d *DistributedBackend) Connect(ctx context.Context) error

func (*DistributedBackend) Decrement

func (d *DistributedBackend) Decrement(ctx context.Context, key string) (int64, error)

func (*DistributedBackend) Disconnect

func (d *DistributedBackend) Disconnect(ctx context.Context) error

func (*DistributedBackend) DiscoverNodes

func (d *DistributedBackend) DiscoverNodes(ctx context.Context) ([]streaming.NodeInfo, error)

func (*DistributedBackend) GetCounter

func (d *DistributedBackend) GetCounter(ctx context.Context, key string) (int64, error)

func (*DistributedBackend) GetNode

func (d *DistributedBackend) GetNode(ctx context.Context, nodeID string) (*streaming.NodeInfo, error)

func (*DistributedBackend) GetNodes

func (d *DistributedBackend) GetNodes(ctx context.Context) ([]streaming.NodeInfo, error)

func (*DistributedBackend) GetOnlineUsers

func (d *DistributedBackend) GetOnlineUsers(ctx context.Context) ([]string, error)

func (*DistributedBackend) GetPresence

func (d *DistributedBackend) GetPresence(ctx context.Context, userID string) (*streaming.UserPresence, error)

func (*DistributedBackend) Heartbeat

func (d *DistributedBackend) Heartbeat(ctx context.Context, nodeID string) error

func (*DistributedBackend) Increment

func (d *DistributedBackend) Increment(ctx context.Context, key string) (int64, error)

func (*DistributedBackend) Ping

func (d *DistributedBackend) Ping(ctx context.Context) error

func (*DistributedBackend) Publish

func (d *DistributedBackend) Publish(ctx context.Context, channel string, message *streaming.Message) error

func (*DistributedBackend) RegisterNode

func (d *DistributedBackend) RegisterNode(ctx context.Context, nodeID string, metadata map[string]any) error

func (*DistributedBackend) ReleaseLock

func (d *DistributedBackend) ReleaseLock(ctx context.Context, lock streaming.Lock) error

func (*DistributedBackend) SetPresence

func (d *DistributedBackend) SetPresence(ctx context.Context, userID, status string, ttl time.Duration) error

func (*DistributedBackend) Subscribe

func (d *DistributedBackend) Subscribe(ctx context.Context, channel string, handler streaming.MessageHandler) error

func (*DistributedBackend) UnregisterNode

func (d *DistributedBackend) UnregisterNode(ctx context.Context, nodeID string) error

func (*DistributedBackend) Unsubscribe

func (d *DistributedBackend) Unsubscribe(ctx context.Context, channel string) error

func (*DistributedBackend) WatchNodes

func (d *DistributedBackend) WatchNodes(ctx context.Context, handler streaming.NodeChangeHandler) error

type MessageStore

type MessageStore struct {
	// contains filtered or unexported fields
}

MessageStore implements streaming.MessageStore with Redis backend using Redis Streams.

func (*MessageStore) Connect

func (s *MessageStore) Connect(ctx context.Context) error

func (*MessageStore) Delete

func (s *MessageStore) Delete(ctx context.Context, messageID string) error

func (*MessageStore) DeleteByRoom

func (s *MessageStore) DeleteByRoom(ctx context.Context, roomID string) error

func (*MessageStore) DeleteByUser

func (s *MessageStore) DeleteByUser(ctx context.Context, userID string) error

func (*MessageStore) DeleteOld

func (s *MessageStore) DeleteOld(ctx context.Context, olderThan time.Duration) error

func (*MessageStore) Disconnect

func (s *MessageStore) Disconnect(ctx context.Context) error

func (*MessageStore) Get

func (s *MessageStore) Get(ctx context.Context, messageID string) (*streaming.Message, error)

func (*MessageStore) GetHistory

func (s *MessageStore) GetHistory(ctx context.Context, roomID string, query streaming.HistoryQuery) ([]*streaming.Message, error)

func (*MessageStore) GetMessageCount

func (s *MessageStore) GetMessageCount(ctx context.Context, roomID string) (int64, error)

func (*MessageStore) GetMessageCountByUser

func (s *MessageStore) GetMessageCountByUser(ctx context.Context, roomID, userID string) (int64, error)

func (*MessageStore) GetThreadHistory

func (s *MessageStore) GetThreadHistory(ctx context.Context, roomID, threadID string, query streaming.HistoryQuery) ([]*streaming.Message, error)

func (*MessageStore) GetUserMessages

func (s *MessageStore) GetUserMessages(ctx context.Context, userID string, query streaming.HistoryQuery) ([]*streaming.Message, error)

func (*MessageStore) Ping

func (s *MessageStore) Ping(ctx context.Context) error

func (*MessageStore) Save

func (s *MessageStore) Save(ctx context.Context, message *streaming.Message) error

func (*MessageStore) SaveBatch

func (s *MessageStore) SaveBatch(ctx context.Context, messages []*streaming.Message) error

func (*MessageStore) Search

func (s *MessageStore) Search(ctx context.Context, roomID, searchTerm string, query streaming.HistoryQuery) ([]*streaming.Message, error)

type PresenceStore

type PresenceStore struct {
	// contains filtered or unexported fields
}

PresenceStore implements streaming.PresenceStore with Redis backend.

func (*PresenceStore) CleanupExpired

func (s *PresenceStore) CleanupExpired(ctx context.Context, olderThan time.Duration) error

func (*PresenceStore) Connect

func (s *PresenceStore) Connect(ctx context.Context) error

func (*PresenceStore) CountByStatus

func (s *PresenceStore) CountByStatus(ctx context.Context) (map[string]int, error)

CountByStatus returns count of users by status.

func (*PresenceStore) Delete

func (s *PresenceStore) Delete(ctx context.Context, userID string) error

func (*PresenceStore) DeleteMultiple

func (s *PresenceStore) DeleteMultiple(ctx context.Context, userIDs []string) error

DeleteMultiple deletes multiple presence records in a single operation.

func (*PresenceStore) Disconnect

func (s *PresenceStore) Disconnect(ctx context.Context) error

func (*PresenceStore) Get

func (*PresenceStore) GetActiveCount

func (s *PresenceStore) GetActiveCount(ctx context.Context, since time.Duration) (int, error)

GetActiveCount returns count of users active since the specified duration.

func (*PresenceStore) GetByStatus

func (s *PresenceStore) GetByStatus(ctx context.Context, status string) ([]*streaming.UserPresence, error)

GetByStatus returns all presence records with the specified status.

func (*PresenceStore) GetDevices

func (s *PresenceStore) GetDevices(ctx context.Context, userID string) ([]streaming.DeviceInfo, error)

GetDevices returns all devices for a user.

func (*PresenceStore) GetHistory

func (s *PresenceStore) GetHistory(ctx context.Context, userID string, limit int) ([]*streaming.PresenceEvent, error)

GetHistory returns presence history for a user.

func (*PresenceStore) GetHistorySince

func (s *PresenceStore) GetHistorySince(ctx context.Context, userID string, since time.Time) ([]*streaming.PresenceEvent, error)

GetHistorySince returns presence history for a user since the specified time.

func (*PresenceStore) GetLastActivity

func (s *PresenceStore) GetLastActivity(ctx context.Context, userID string) (time.Time, error)

func (*PresenceStore) GetMultiple

func (s *PresenceStore) GetMultiple(ctx context.Context, userIDs []string) ([]*streaming.UserPresence, error)

func (*PresenceStore) GetOnline

func (s *PresenceStore) GetOnline(ctx context.Context) ([]string, error)

func (*PresenceStore) GetRecent

func (s *PresenceStore) GetRecent(ctx context.Context, status string, since time.Duration) ([]*streaming.UserPresence, error)

GetRecent returns presence records with the specified status updated since the given duration.

func (*PresenceStore) GetWithFilters

func (s *PresenceStore) GetWithFilters(ctx context.Context, filters streaming.PresenceFilters) ([]*streaming.UserPresence, error)

GetWithFilters returns presence records matching the specified filters.

func (*PresenceStore) IsOnline

func (s *PresenceStore) IsOnline(ctx context.Context, userID string) (bool, error)

func (*PresenceStore) Ping

func (s *PresenceStore) Ping(ctx context.Context) error

func (*PresenceStore) RemoveDevice

func (s *PresenceStore) RemoveDevice(ctx context.Context, userID, deviceID string) error

RemoveDevice removes a device for a user.

func (*PresenceStore) SaveHistory

func (s *PresenceStore) SaveHistory(ctx context.Context, userID string, event *streaming.PresenceEvent) error

SaveHistory saves a presence event to history.

func (*PresenceStore) Set

func (s *PresenceStore) Set(ctx context.Context, userID string, presence *streaming.UserPresence) error

func (*PresenceStore) SetDevice

func (s *PresenceStore) SetDevice(ctx context.Context, userID, deviceID string, device streaming.DeviceInfo) error

SetDevice sets device information for a user.

func (*PresenceStore) SetMultiple

func (s *PresenceStore) SetMultiple(ctx context.Context, presences map[string]*streaming.UserPresence) error

SetMultiple sets multiple presence records in a single operation.

func (*PresenceStore) SetOffline

func (s *PresenceStore) SetOffline(ctx context.Context, userID string) error

func (*PresenceStore) SetOnline

func (s *PresenceStore) SetOnline(ctx context.Context, userID string, ttl time.Duration) error

func (*PresenceStore) UpdateActivity

func (s *PresenceStore) UpdateActivity(ctx context.Context, userID string, timestamp time.Time) error

type RoomStore

type RoomStore struct {
	// contains filtered or unexported fields
}

RoomStore implements streaming.RoomStore with Redis backend.

func (*RoomStore) AddMember

func (s *RoomStore) AddMember(ctx context.Context, roomID string, member streaming.Member) error

func (*RoomStore) BanMember

func (s *RoomStore) BanMember(ctx context.Context, roomID, userID string, ban streaming.RoomBan) error

BanMember bans a member from a room.

func (*RoomStore) Connect

func (s *RoomStore) Connect(ctx context.Context) error

func (*RoomStore) Create

func (s *RoomStore) Create(ctx context.Context, room streaming.Room) error

func (*RoomStore) CreateMany

func (s *RoomStore) CreateMany(ctx context.Context, rooms []streaming.Room) error

CreateMany creates multiple rooms in a batch.

func (*RoomStore) Delete

func (s *RoomStore) Delete(ctx context.Context, roomID string) error

func (*RoomStore) DeleteInvite

func (s *RoomStore) DeleteInvite(ctx context.Context, inviteCode string) error

DeleteInvite deletes an invite by its code.

func (*RoomStore) DeleteMany

func (s *RoomStore) DeleteMany(ctx context.Context, roomIDs []string) error

DeleteMany deletes multiple rooms by their IDs.

func (*RoomStore) Disconnect

func (s *RoomStore) Disconnect(ctx context.Context) error

func (*RoomStore) Exists

func (s *RoomStore) Exists(ctx context.Context, roomID string) (bool, error)

func (*RoomStore) FindByCategory

func (s *RoomStore) FindByCategory(ctx context.Context, category string) ([]streaming.Room, error)

FindByCategory finds rooms in a specific category.

func (*RoomStore) FindByTag

func (s *RoomStore) FindByTag(ctx context.Context, tag string) ([]streaming.Room, error)

FindByTag finds rooms with a specific tag.

func (*RoomStore) Get

func (s *RoomStore) Get(ctx context.Context, roomID string) (streaming.Room, error)

func (*RoomStore) GetArchivedRooms

func (s *RoomStore) GetArchivedRooms(ctx context.Context, userID string) ([]streaming.Room, error)

GetArchivedRooms gets archived rooms for a user.

func (*RoomStore) GetBans

func (s *RoomStore) GetBans(ctx context.Context, roomID string) ([]streaming.RoomBan, error)

GetBans gets all bans for a room.

func (*RoomStore) GetCommonRooms

func (s *RoomStore) GetCommonRooms(ctx context.Context, userID1, userID2 string) ([]streaming.Room, error)

GetCommonRooms gets rooms that both users are members of.

func (*RoomStore) GetInvite

func (s *RoomStore) GetInvite(ctx context.Context, inviteCode string) (*streaming.Invite, error)

GetInvite gets an invite by its code.

func (*RoomStore) GetMember

func (s *RoomStore) GetMember(ctx context.Context, roomID, userID string) (streaming.Member, error)

func (*RoomStore) GetMembers

func (s *RoomStore) GetMembers(ctx context.Context, roomID string) ([]streaming.Member, error)

func (*RoomStore) GetPublicRooms

func (s *RoomStore) GetPublicRooms(ctx context.Context, limit int) ([]streaming.Room, error)

GetPublicRooms gets public rooms up to the specified limit.

func (*RoomStore) GetRoomCount

func (s *RoomStore) GetRoomCount(ctx context.Context) (int, error)

GetRoomCount gets the total number of rooms.

func (*RoomStore) GetTotalMembers

func (s *RoomStore) GetTotalMembers(ctx context.Context) (int, error)

GetTotalMembers gets the total number of members across all rooms.

func (*RoomStore) GetUserRooms

func (s *RoomStore) GetUserRooms(ctx context.Context, userID string) ([]streaming.Room, error)

func (*RoomStore) GetUserRoomsByRole

func (s *RoomStore) GetUserRoomsByRole(ctx context.Context, userID, role string) ([]streaming.Room, error)

GetUserRoomsByRole gets all rooms where a user has a specific role.

func (*RoomStore) IsBanned

func (s *RoomStore) IsBanned(ctx context.Context, roomID, userID string) (bool, error)

IsBanned checks if a user is banned from a room.

func (*RoomStore) IsMember

func (s *RoomStore) IsMember(ctx context.Context, roomID, userID string) (bool, error)

func (*RoomStore) List

func (s *RoomStore) List(ctx context.Context, filters map[string]any) ([]streaming.Room, error)

func (*RoomStore) ListInvites

func (s *RoomStore) ListInvites(ctx context.Context, roomID string) ([]*streaming.Invite, error)

ListInvites lists all invites for a room.

func (*RoomStore) MemberCount

func (s *RoomStore) MemberCount(ctx context.Context, roomID string) (int, error)

func (*RoomStore) Ping

func (s *RoomStore) Ping(ctx context.Context) error

func (*RoomStore) RemoveMember

func (s *RoomStore) RemoveMember(ctx context.Context, roomID, userID string) error

func (*RoomStore) SaveInvite

func (s *RoomStore) SaveInvite(ctx context.Context, roomID string, invite *streaming.Invite) error

SaveInvite saves an invite for a room.

func (*RoomStore) Search

func (s *RoomStore) Search(ctx context.Context, query string, filters map[string]any) ([]streaming.Room, error)

Search searches for rooms by query and filters.

func (*RoomStore) UnbanMember

func (s *RoomStore) UnbanMember(ctx context.Context, roomID, userID string) error

UnbanMember removes a ban from a member.

func (*RoomStore) Update

func (s *RoomStore) Update(ctx context.Context, roomID string, updates map[string]any) error

type TypingStore

type TypingStore struct {
	// contains filtered or unexported fields
}

TypingStore implements streaming.TypingStore with Redis backend.

func (*TypingStore) CleanupExpired

func (s *TypingStore) CleanupExpired(ctx context.Context) error

func (*TypingStore) Connect

func (s *TypingStore) Connect(ctx context.Context) error

func (*TypingStore) Disconnect

func (s *TypingStore) Disconnect(ctx context.Context) error

func (*TypingStore) GetTypingUsers

func (s *TypingStore) GetTypingUsers(ctx context.Context, roomID string) ([]string, error)

func (*TypingStore) IsTyping

func (s *TypingStore) IsTyping(ctx context.Context, userID, roomID string) (bool, error)

func (*TypingStore) Ping

func (s *TypingStore) Ping(ctx context.Context) error

func (*TypingStore) RemoveTyping

func (s *TypingStore) RemoveTyping(ctx context.Context, userID, roomID string) error

func (*TypingStore) SetTyping

func (s *TypingStore) SetTyping(ctx context.Context, userID, roomID string, expiresAt time.Time) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL