Documentation
¶
Index ¶
- Constants
- Variables
- type Configuration
- type ConnectionHandler
- type Consumer
- func (con *Consumer) Bury(id uint64, pri uint32) error
- func (con *Consumer) Close() error
- func (con *Consumer) ConnectionHandler() ConnectionHandler
- func (con *Consumer) Delete(id uint64) error
- func (con *Consumer) Init() error
- func (con *Consumer) ListTubes() ([]string, error)
- func (con *Consumer) OnEndConsume()
- func (con *Consumer) OnHeartbeat()
- func (con *Consumer) OnReserveTimeout()
- func (con *Consumer) OnStartConsume()
- func (con *Consumer) Release(id uint64, pri uint32, delay time.Duration) error
- func (con *Consumer) Reserve(timeout time.Duration) (id uint64, body []byte, err error)
- func (con *Consumer) SetEventHandler(handler EventHandler)
- func (con *Consumer) SetTaskPayloadHandler(handler common.TaskPayloadHandler)
- func (con *Consumer) StartConsumer() error
- func (con *Consumer) StopConsumer()
- func (con *Consumer) TaskEventChannel() chan<- *common.TaskProcessEvent
- func (con *Consumer) Touch(id uint64) error
- type EventHandler
- type Handler
Constants ¶
View Source
const ( //Channel size to allocate. It is important for task implementation to send event //asynchronously to avoid blocking the execution thread TaskEventChannelSize = 1 )
Variables ¶
View Source
var ErrTimeout = errors.New("timeout")
hard coded to avoid dependency on go-beanstalkd library only for one constant
View Source
var WireSet = wire.NewSet(NewConsumer, NewConfiguration, wire.Bind(new(Handler), new(*Consumer)))
Functions ¶
This section is empty.
Types ¶
type Configuration ¶
type Configuration struct {
//Waiting time for consumer reserve
WaitForConsumerReserve time.Duration
//Waiting time for quit signal timeout
Heartbeat time.Duration
ReleasePriority uint32
ReleaseDelay time.Duration
BuryPriority uint32
}
Configuration stores initialization data for worker server
func NewConfiguration ¶
func NewConfiguration() *Configuration
type ConnectionHandler ¶
type ConnectionHandler interface {
Reserve(timeout time.Duration) (id uint64, body []byte, err error)
Release(id uint64, pri uint32, delay time.Duration) error
Delete(id uint64) error
Bury(id uint64, pri uint32) error
Touch(id uint64) error
Put(body []byte, pri uint32, delay, ttr time.Duration) (id uint64, err error)
ListTubes() ([]string, error)
Close() error
}
type Consumer ¶
type Consumer struct {
*Configuration
// contains filtered or unexported fields
}
Consumer stores configuration for consumer activation
func NewConsumer ¶
func NewConsumer(config *Configuration, connectorHandler connector.Handler, connectionHandler ConnectionHandler) *Consumer
NewConsumer creates consumer instance with given Handler
func (*Consumer) ConnectionHandler ¶
func (con *Consumer) ConnectionHandler() ConnectionHandler
func (*Consumer) OnEndConsume ¶
func (con *Consumer) OnEndConsume()
func (*Consumer) OnHeartbeat ¶
func (con *Consumer) OnHeartbeat()
func (*Consumer) OnReserveTimeout ¶
func (con *Consumer) OnReserveTimeout()
func (*Consumer) OnStartConsume ¶
func (con *Consumer) OnStartConsume()
func (*Consumer) SetEventHandler ¶
func (con *Consumer) SetEventHandler(handler EventHandler)
func (*Consumer) SetTaskPayloadHandler ¶
func (con *Consumer) SetTaskPayloadHandler(handler common.TaskPayloadHandler)
func (*Consumer) StartConsumer ¶
StartConsumer starts consumer thread
func (*Consumer) StopConsumer ¶
func (con *Consumer) StopConsumer()
StopConsumer stops consumer thread
func (*Consumer) TaskEventChannel ¶
func (con *Consumer) TaskEventChannel() chan<- *common.TaskProcessEvent
type EventHandler ¶
type EventHandler interface {
OnStartConsume()
OnEndConsume()
OnReserveTimeout()
OnHeartbeat()
}
type Handler ¶
type Handler interface {
Init() error
Close() error
ConnectionHandler() ConnectionHandler
TaskEventChannel() chan<- *common.TaskProcessEvent
SetEventHandler(handler EventHandler)
SetTaskPayloadHandler(handler common.TaskPayloadHandler)
StartConsumer() error
StopConsumer()
}
Click to show internal directories.
Click to hide internal directories.