Documentation ¶
Index ¶
- Constants
- Variables
- type Broker
- func (b *Broker) Acknowledge(ctx context.Context, ...) (bool, error)
- func (b *Broker) AcknowledgeMessages(ctx context.Context, ...) error
- func (b *Broker) Bootstrap() error
- func (b *Broker) Commit(ctx context.Context, ...) (bool, error)
- func (b *Broker) Conf() *Config
- func (b *Broker) Consume(ctx context.Context, topicName, partition, consumerGroup, consumerName string, ...) error
- func (b *Broker) CreateTopic(ctx context.Context, params *sgproto.CreateTopicParams) error
- func (b *Broker) FetchFromSync(topicName, partition string, from []byte, fn func(msg *sgproto.Message) error) error
- func (b *Broker) FetchRange(ctx context.Context, topicName, partition string, from, to sandflake.ID, ...) error
- func (b *Broker) Get(ctx context.Context, topicName string, partition string, key []byte) (*sgproto.Message, error)
- func (b *Broker) GetController() *sandglass.Node
- func (b *Broker) GetTopic(name string) *topic.Topic
- func (b *Broker) HasKey(ctx context.Context, topicName string, partition string, key []byte) (bool, error)
- func (b *Broker) IsController() bool
- func (b *Broker) Join(clusterAddrs ...string) error
- func (b *Broker) LastOffset(ctx context.Context, ...) (sandflake.ID, error)
- func (b *Broker) LaunchWatchers() error
- func (b *Broker) Members() []*sandglass.Node
- func (b *Broker) Name() string
- func (b *Broker) PublishMessage(ctx context.Context, msg *sgproto.Message) (*sandflake.ID, error)
- func (b *Broker) PublishMessages(ctx context.Context, msgs []*sgproto.Message) error
- func (b *Broker) Stop(ctx context.Context) error
- func (b *Broker) StoreMessageLocally(msg *sgproto.Message) error
- func (b *Broker) StoreMessages(msgs []*sgproto.Message) error
- func (b *Broker) Topics() []*topic.Topic
- func (b *Broker) TriggerSyncRequest() error
- func (b *Broker) WaitForIt() error
- type Config
- type ConsumerGroup
Constants ¶
View Source
const (
ConsumerOffsetTopicName = "consumer_offsets"
)
Variables ¶
View Source
var (
ErrTopicAlreadyExist = errors.New("ErrTopicAlreadyExist")
ErrInvalidTopicName = errors.New("ErrInvalidTopicName")
ErrUnableToSelectReplicas = errors.New("ErrUnableToSelectReplicas")
ErrTopicNotFound = errors.New("ErrTopicNotFound")
ErrNoPartitionSet = errors.New("ErrNoPartitionSet")
ErrNoControllerSet = errors.New("ErrNoControllerSet")
ErrNoLeaderFound = errors.New("ErrNoLeaderFound")
ErrNoConsumerFound = errors.New("ErrNoConsumerFound")
)
View Source
var DefaultStateCheckInterval = 1 * time.Second
View Source
var (
ErrNoKeySet = errors.New("ErrNoKeySet")
)
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker struct {
logy.Logger
ShutdownCh chan struct{}
// contains filtered or unexported fields
}
func (*Broker) Acknowledge ¶
func (*Broker) AcknowledgeMessages ¶
func (*Broker) CreateTopic ¶
func (*Broker) FetchFromSync ¶
func (*Broker) FetchRange ¶
func (*Broker) GetController ¶
func (*Broker) IsController ¶
func (*Broker) LastOffset ¶
func (*Broker) LaunchWatchers ¶
func (*Broker) PublishMessage ¶
func (*Broker) PublishMessages ¶
func (*Broker) StoreMessageLocally ¶
func (*Broker) TriggerSyncRequest ¶
type Config ¶
type Config struct {
Name string `yaml:"name,omitempty"`
DCName string `yaml:"dc_name,omitempty"`
BindAddr string `yaml:"bind_addr,omitempty"`
AdvertiseAddr string `yaml:"advertise_addr,omitempty"`
DBPath string `yaml:"db_path,omitempty"`
GossipPort string `yaml:"gossip_port,omitempty"`
HTTPPort string `yaml:"http_port,omitempty"`
GRPCPort string `yaml:"grpc_port,omitempty"`
RaftPort string `yaml:"raft_port,omitempty"`
InitialPeers []string `yaml:"initial_peers,omitempty"`
BootstrapRaft bool `yaml:"bootstrap_raft,omitempty"`
LoggingLevel *logy.Level `yaml:"-"`
}
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
func NewConsumerGroup ¶
Click to show internal directories.
Click to hide internal directories.