Documentation
¶
Index ¶
- Variables
- func PrintSampleConfig()
- type Config
- type Hub
- func (m *Hub) DB() *sql.DB
- func (m *Hub) ForceGC(streamName string) error
- func (m *Hub) GetStreamNames() ([]string, error)
- func (m *Hub) MessagesSinceOffset(streamName string, offset Offset) ([]Message, error)
- func (m *Hub) MinMaxID(streamName string) (int64, int64, error)
- func (m *Hub) PollStat(streamName string) map[string]interface{}
- func (m *Hub) Publish(streamName string, msg *Message) error
- func (m *Hub) Subscribe(streamName string, subscriberID string) (<-chan Message, error)
- func (m *Hub) Unsubscribe(streamName string, subscriberID string)
- type Message
- type Offset
- type PollWorker
- type Stat
- type Store
- type Stream
- type TiDBStore
- func (s *TiDBStore) CreateStream(streamName string) error
- func (s *TiDBStore) DB() *sql.DB
- func (s *TiDBStore) FetchMessages(streamName string, idOffset Offset, limit int) ([]Message, Offset, error)
- func (s *TiDBStore) GetStreamNames() ([]string, error)
- func (s *TiDBStore) Init() error
- func (s *TiDBStore) MinMaxID(streamName string) (int64, int64, error)
- func (s *TiDBStore) PutMessages(streamName string, messages []*Message) error
Constants ¶
This section is empty.
Variables ¶
View Source
var (
ErrStreamNotFound error = errors.New("stream not found")
)
Functions ¶
func PrintSampleConfig ¶
func PrintSampleConfig()
Types ¶
type Config ¶
type Config struct {
// DSN is the data source name.
DSN string `toml:"dsn" env:"DSN" env-default:"root:@tcp(localhost:4000)/test"`
// MaxBatchSize is the maximum number of messages to batch in a single transaction.
MaxBatchSize int `toml:"max_batch_size" env:"MAX_BATCH_SIZE" env-default:"1000"`
// PollIntervalInMs is the interval to poll the database.
PollIntervalInMs int `toml:"poll_interval_in_ms" env:"POLL_INTERVAL_IN_MS" env-default:"100"`
// GCIntervalInSec is the interval to run garbage collection.
GCIntervalInSec int `toml:"gc_interval_in_sec" env:"GC_INTERVAL_IN_SEC" env-default:"600"`
// GCKeepItems is the number of items to keep in the cache.
GCKeepItems int `toml:"gc_keep_items" env:"GC_KEEP_ITEMS" env-default:"10000"`
}
func DefaultConfig ¶
func DefaultConfig() *Config
func MustLoadConfig ¶
MustLoadConfig loads config from environment variables.
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
func (*Hub) GetStreamNames ¶
func (*Hub) MessagesSinceOffset ¶
func (*Hub) Unsubscribe ¶
type Message ¶
type PollWorker ¶
type PollWorker struct {
// contains filtered or unexported fields
}
PollWorker is a worker that polls messages from a stream.
func (*PollWorker) Stat ¶
func (pw *PollWorker) Stat() map[string]interface{}
func (*PollWorker) Stop ¶
func (pw *PollWorker) Stop()
type Stat ¶
type Stat struct {
// contains filtered or unexported fields
}
Stat is not yet implemented; it is just a placeholder.
type Store ¶
type Store interface {
// Init initializes the store; call it after creating the store.
Init() error
// CreateStream creates a stream
CreateStream(streamName string) error
// PutMessages puts messages into a stream
PutMessages(streamName string, messages []*Message) error
// FetchMessages fetches messages from a stream
FetchMessages(streamName string, offset Offset, limit int) ([]Message, Offset, error)
// MinMaxID returns the minimum and maximum offsets of a stream.
MinMaxID(streamName string) (int64, int64, error)
// GetStreamNames returns the names of all streams
GetStreamNames() ([]string, error)
// DB returns the underlying database
DB() *sql.DB
}
Store is the interface for message storage.
type TiDBStore ¶
type TiDBStore struct {
// contains filtered or unexported fields
}
func NewTiDBStore ¶
func (*TiDBStore) CreateStream ¶
CreateStream creates a stream; every stream is a table in the database.
func (*TiDBStore) FetchMessages ¶
func (*TiDBStore) GetStreamNames ¶
Source Files
¶
Click to show internal directories.
Click to hide internal directories.