Documentation
¶
Index ¶
- Constants
- Variables
- func ValidateTimeZone(tz string) error
- type Cond
- type CronPolicy
- type EventExtra
- type GetTimerOption
- type Hook
- type HookFactory
- type ManualRequest
- type Operator
- type OperatorTp
- type OptionalVal
- type PreSchedEventResult
- type SchedEventPolicy
- type SchedEventStatus
- type SchedIntervalPolicy
- type SchedPolicyType
- type TimerClient
- type TimerCond
- type TimerRecord
- type TimerShedEvent
- type TimerSpec
- type TimerStore
- type TimerStoreCore
- type TimerUpdate
- type TimerWatchEventNotifier
- type UpdateTimerOption
- func WithSetEnable(enable bool) UpdateTimerOption
- func WithSetSchedExpr(tp SchedPolicyType, expr string) UpdateTimerOption
- func WithSetSummaryData(summary []byte) UpdateTimerOption
- func WithSetTags(tags []string) UpdateTimerOption
- func WithSetTimeZone(name string) UpdateTimerOption
- func WithSetWatermark(watermark time.Time) UpdateTimerOption
- type WatchTimerChan
- type WatchTimerEvent
- type WatchTimerEventType
- type WatchTimerResponse
Constants ¶
const DefaultStoreNamespace = "default"
DefaultStoreNamespace is the default namespace.
Variables ¶
var ErrEventIDNotMatch = errors.New("timer event id not match")
ErrEventIDNotMatch indicates that the timer's event id not match.
var ErrTimerExists = errors.New("timer already exists")
ErrTimerExists indicates that the specified timer already exits.
var ErrTimerNotExist = errors.New("timer not exist")
ErrTimerNotExist indicates that the specified timer not exist.
var ErrVersionNotMatch = errors.New("timer version not match")
ErrVersionNotMatch indicates that the timer's version not match.
Functions ¶
func ValidateTimeZone ¶
ValidateTimeZone validates the TimeZone field.
Types ¶
type Cond ¶
type Cond interface {
// Match returns whether a record match the condition.
Match(timer *TimerRecord) bool
}
Cond is an interface to match a timer record.
type CronPolicy ¶
type CronPolicy struct {
// contains filtered or unexported fields
}
CronPolicy implements SchedEventPolicy, it is the policy of type `SchedEventCron`.
func NewCronPolicy ¶
func NewCronPolicy(expr string) (*CronPolicy, error)
NewCronPolicy creates a new CronPolicy.
func (*CronPolicy) NextEventTime ¶
NextEventTime returns the next time of the timer event.
type EventExtra ¶
type EventExtra struct {
// EventManualRequestID is the related request id of the manual trigger.
// If current event is not triggered manually, it is empty.
EventManualRequestID string
// EventWatermark is the watermark when event triggers.
EventWatermark time.Time
}
EventExtra stores some extra attributes for event.
type GetTimerOption ¶
type GetTimerOption func(*TimerCond)
GetTimerOption is the option to get timers.
func WithID ¶
func WithID(id string) GetTimerOption
WithID indicates to get a timer with the specified id.
func WithKey ¶
func WithKey(key string) GetTimerOption
WithKey indicates to get a timer with the specified key.
func WithKeyPrefix ¶
func WithKeyPrefix(keyPrefix string) GetTimerOption
WithKeyPrefix to get timers with the indicated key prefix.
func WithTag ¶
func WithTag(tags ...string) GetTimerOption
WithTag indicates to get a timer with the specified tags.
type Hook ¶
type Hook interface {
// Start starts the hook.
Start()
// Stop stops the hook. When it is called, this means the framework is shutting down.
Stop()
// OnPreSchedEvent will be called before triggering a new event. It's return value tells the next action of the triggering.
// For example, if `TimerShedEvent.Delay` is a non-zero value, the event triggering will be postponed.
// Notice that `event.Timer().EventID` will be empty because the current event is not actually triggered,
// use `event.EventID()` to get the event id instead.
OnPreSchedEvent(ctx context.Context, event TimerShedEvent) (PreSchedEventResult, error)
// OnSchedEvent will be called when a new event is triggered.
OnSchedEvent(ctx context.Context, event TimerShedEvent) error
}
Hook is an interface which should be implemented by user to tell framework how to trigger an event. Several timers with a same hook class can share one hook in a runtime.
type HookFactory ¶
type HookFactory func(hookClass string, cli TimerClient) Hook
HookFactory is the factory function to construct a new Hook object with `hookClass`.
type ManualRequest ¶
type ManualRequest struct {
// ManualRequestID is the id of manual request.
ManualRequestID string
// ManualRequestTime is the request time.
ManualRequestTime time.Time
// ManualTimeout is the timeout for the request, if the timer is not triggered after timeout, processed will be set to true
// with empty event id.
ManualTimeout time.Duration
// ManualProcessed indicates the request is processed (triggered or timeout).
ManualProcessed bool
// ManualEventID means the triggered event id for the current request.
ManualEventID string
}
ManualRequest is the request info to trigger timer manually.
func (*ManualRequest) IsManualRequesting ¶
func (r *ManualRequest) IsManualRequesting() bool
IsManualRequesting indicates that whether this timer is requesting to trigger event manually.
func (*ManualRequest) SetProcessed ¶
func (r *ManualRequest) SetProcessed(eventID string) ManualRequest
SetProcessed sets the timer's manual request to processed.
type Operator ¶
type Operator struct {
// Op indicates the operator type.
Op OperatorTp
// Not indicates whether to apply 'NOT' to the condition.
Not bool
// Children is the children of the operator.
Children []Cond
}
Operator implements Cond.
func (*Operator) Match ¶
func (c *Operator) Match(t *TimerRecord) bool
Match will return whether the condition match the timer record.
type OperatorTp ¶
type OperatorTp int8
OperatorTp is the operator type of the condition.
const ( // OperatorAnd means 'AND' operator. OperatorAnd OperatorTp = iota // OperatorOr means 'OR' operator. OperatorOr )
type OptionalVal ¶
type OptionalVal[T any] struct { // contains filtered or unexported fields }
OptionalVal is used by `TimerCond` and `TimerUpdate` to indicate. whether some field's condition has been set or whether is field should be updated.
func NewOptionalVal ¶
func NewOptionalVal[T any](val T) (o OptionalVal[T])
NewOptionalVal creates a new OptionalVal.
func (*OptionalVal[T]) Clear ¶
func (o *OptionalVal[T]) Clear()
Clear clears the value. The `Present()` will return false after `Clear` is called.
func (*OptionalVal[T]) Get ¶
func (o *OptionalVal[T]) Get() (v T, present bool)
Get returns the current value, if the second return value is false, this means it is not set.
func (*OptionalVal[T]) Present ¶
func (o *OptionalVal[T]) Present() bool
Present indicates whether the field value is set.
type PreSchedEventResult ¶
type PreSchedEventResult struct {
// Delay indicates to delay the event after a while.
// If `Delay` is 0, it means no delay, and then `OnSchedEvent` will be called.
// Otherwise, after a while according to `Delay`, `OnPreSchedEvent` will be called again to
// check whether to trigger the event.
Delay time.Duration
// EventData indicates the data should be passed to the event that should be triggered.
// EventData can be used to store some pre-computed configurations of the next event.
EventData []byte
}
PreSchedEventResult is the result of `OnPreSchedEvent`.
type SchedEventPolicy ¶
type SchedEventPolicy interface {
// NextEventTime returns the time to schedule the next timer event. If the second return value is true,
// it means we have a next event schedule after `watermark`. Otherwise, it means there is no more event after `watermark`.
NextEventTime(watermark time.Time) (time.Time, bool)
}
SchedEventPolicy is an interface to tell the runtime how to schedule a timer's events.
func CreateSchedEventPolicy ¶
func CreateSchedEventPolicy(tp SchedPolicyType, expr string) (SchedEventPolicy, error)
CreateSchedEventPolicy creates a SchedEventPolicy according to `SchedPolicyType` and `SchedPolicyExpr`.
type SchedEventStatus ¶
type SchedEventStatus string
SchedEventStatus is the current schedule status of timer's event.
const ( // SchedEventIdle means the timer is not in trigger state currently. SchedEventIdle SchedEventStatus = "IDLE" // SchedEventTrigger means the timer is in trigger state. SchedEventTrigger SchedEventStatus = "TRIGGER" )
type SchedIntervalPolicy ¶
type SchedIntervalPolicy struct {
// contains filtered or unexported fields
}
SchedIntervalPolicy implements SchedEventPolicy, it is the policy of type `SchedEventInterval`.
func NewSchedIntervalPolicy ¶
func NewSchedIntervalPolicy(expr string) (*SchedIntervalPolicy, error)
NewSchedIntervalPolicy creates a new SchedIntervalPolicy.
func (*SchedIntervalPolicy) NextEventTime ¶
NextEventTime returns the next time of the timer event. A next event should be triggered after a time indicated by `interval` after watermark.
type SchedPolicyType ¶
type SchedPolicyType string
SchedPolicyType is the type of the event schedule policy.
const ( // SchedEventInterval indicates to schedule events every fixed interval. SchedEventInterval SchedPolicyType = "INTERVAL" // SchedEventCron indicates to schedule events by cron expression. SchedEventCron SchedPolicyType = "CRON" )
type TimerClient ¶
type TimerClient interface {
// GetDefaultNamespace returns the default namespace of this client.
GetDefaultNamespace() string
// CreateTimer creates a new timer.
CreateTimer(ctx context.Context, spec TimerSpec) (*TimerRecord, error)
// GetTimerByID queries the timer by ID.
GetTimerByID(ctx context.Context, timerID string) (*TimerRecord, error)
// GetTimerByKey queries the timer by key.
GetTimerByKey(ctx context.Context, key string) (*TimerRecord, error)
// GetTimers queries timers by options.
GetTimers(ctx context.Context, opts ...GetTimerOption) ([]*TimerRecord, error)
// UpdateTimer updates a timer.
UpdateTimer(ctx context.Context, timerID string, opts ...UpdateTimerOption) error
// ManualTriggerEvent triggers event manually.
ManualTriggerEvent(ctx context.Context, timerID string) (string, error)
// CloseTimerEvent closes the triggering event of a timer.
CloseTimerEvent(ctx context.Context, timerID string, eventID string, opts ...UpdateTimerOption) error
// DeleteTimer deletes a timer.
DeleteTimer(ctx context.Context, timerID string) (bool, error)
}
TimerClient is an interface exposed to user to manage timers.
func NewDefaultTimerClient ¶
func NewDefaultTimerClient(store *TimerStore) TimerClient
NewDefaultTimerClient creates a new defaultTimerClient.
type TimerCond ¶
type TimerCond struct {
// ID indicates to filter the timer record with ID.
ID OptionalVal[string]
// Namespace indicates to filter the timer by Namespace.
Namespace OptionalVal[string]
// Key indicates to filter the timer record with ID.
// The filter behavior is defined by `KeyPrefix`.
Key OptionalVal[string]
// KeyPrefix indicates how to filter with timer's key if `Key` is set.
// If `KeyPrefix is` true, it will check whether the timer's key is prefixed with `TimerCond.Key`.
// Otherwise, it will check whether the timer's key equals `TimerCond.Key`.
KeyPrefix bool
// Tags indicates to filter the timer record with specified tags.
Tags OptionalVal[[]string]
}
TimerCond is the condition to filter a timer record.
func (*TimerCond) Match ¶
func (c *TimerCond) Match(t *TimerRecord) bool
Match will return whether the condition match the timer record.
type TimerRecord ¶
type TimerRecord struct {
TimerSpec
// ID is the id of timer, it is unique and auto assigned by the store when created.
ID string
// ManualRequest is the request to trigger timer event manually.
ManualRequest
// EventStatus indicates the current schedule status of the timer's event.
EventStatus SchedEventStatus
// EventID indicates the id of current triggered event.
// If the `EventStatus` is `IDLE`, this value should be empty.
EventID string
// EventData indicates the data of current triggered event.
// If the `EventStatus` is `IDLE`, this value should be empty.
EventData []byte
// EventStart indicates the start time of current triggered event.
// If the `EventStatus` is `IDLE`, `EventStart.IsZero()` should returns true.
EventStart time.Time
// EventExtra stores some extra attributes for event
EventExtra
// SummaryData is a binary which is used to store some summary information of the timer.
// User can update it when closing a timer's event to update the summary.
SummaryData []byte
// CreateTime is the creation time of the timer.
CreateTime time.Time
// Version is the version of the record, when the record updated, version will be increased.
Version uint64
// Location is used to get the alias of TiDB timezone.
Location *time.Location
}
TimerRecord is the timer record saved in the timer store.
func (*TimerRecord) Clone ¶
func (r *TimerRecord) Clone() *TimerRecord
Clone returns a cloned TimerRecord.
func (*TimerRecord) NextEventTime ¶
func (r *TimerRecord) NextEventTime() (tm time.Time, _ bool, _ error)
NextEventTime returns the next time for timer to schedule
type TimerShedEvent ¶
type TimerShedEvent interface {
// EventID returns the event ID the current event.
EventID() string
// Timer returns the timer record object of the current event.
Timer() *TimerRecord
}
TimerShedEvent is an interface which gives the timer's schedule event's information.
type TimerSpec ¶
type TimerSpec struct {
// Namespace is the namespace of the timer.
Namespace string
// Key is the key of the timer. Key is unique in each namespace.
Key string
// Tags is used to tag a timer.
Tags []string
// Data is a binary which is defined by user.
Data []byte
// TimeZone is the time zone name of the timer to evaluate the schedule policy.
// If TimeZone is empty, it means to use the tidb cluster's time zone.
TimeZone string
// SchedPolicyType is the type of the event schedule policy.
SchedPolicyType SchedPolicyType
// SchedPolicyExpr is the expression of event schedule policy with the type specified by SchedPolicyType.
SchedPolicyExpr string
// HookClass is the class of the hook.
HookClass string
// Watermark indicates the progress the timer's event schedule.
Watermark time.Time
// Enable indicated whether the timer is enabled.
// If it is false, the new timer event will not be scheduled even it is up to time.
Enable bool
}
TimerSpec is the specification of a timer without any runtime status.
func (*TimerSpec) CreateSchedEventPolicy ¶
func (t *TimerSpec) CreateSchedEventPolicy() (SchedEventPolicy, error)
CreateSchedEventPolicy creates a SchedEventPolicy according to `SchedPolicyType` and `SchedPolicyExpr`.
type TimerStore ¶
type TimerStore struct {
TimerStoreCore
}
TimerStore extends TimerStoreCore to provide some extra methods for timer operations.
func NewMemoryTimerStore ¶
func NewMemoryTimerStore() *TimerStore
NewMemoryTimerStore creates a memory store for timers
func (*TimerStore) GetByID ¶
func (s *TimerStore) GetByID(ctx context.Context, timerID string) (record *TimerRecord, err error)
GetByID gets a timer by ID. If the timer with the specified ID not exists, an error `ErrTimerNotExist` will be returned.
func (*TimerStore) GetByKey ¶
func (s *TimerStore) GetByKey(ctx context.Context, namespace, key string) (record *TimerRecord, err error)
GetByKey gets a timer by key. If the timer with the specified key not exists in the namespace, an error `ErrTimerNotExist` will be returned.
type TimerStoreCore ¶
type TimerStoreCore interface {
// Create creates a new record. If `record.ID` is empty, an id will be assigned automatically.
// The first return value is the final id of the timer.
Create(ctx context.Context, record *TimerRecord) (string, error)
// List lists the timers that match the condition.
List(ctx context.Context, cond Cond) ([]*TimerRecord, error)
// Update updates a timer.
Update(ctx context.Context, timerID string, update *TimerUpdate) error
// Delete deletes a timer.
Delete(ctx context.Context, timerID string) (bool, error)
// WatchSupported indicates whether watch supported for this store.
WatchSupported() bool
// Watch watches all changes of the store. A chan will be returned to receive `WatchTimerResponse`
// The returned chan be closed when the context in the argument is done.
Watch(ctx context.Context) WatchTimerChan
// Close closes the store
Close()
}
TimerStoreCore is an interface, it contains several core methods of store.
type TimerUpdate ¶
type TimerUpdate struct {
// Tags indicates to set all tags for a timer.
Tags OptionalVal[[]string]
// Enable indicates to set the timer's `Enable` field.
Enable OptionalVal[bool]
// TimeZone indicates to set the timer's `TimeZone` field.
TimeZone OptionalVal[string]
// SchedPolicyType indicates to set the timer's `SchedPolicyType` field.
SchedPolicyType OptionalVal[SchedPolicyType]
// SchedPolicyExpr indicates to set the timer's `SchedPolicyExpr` field.
SchedPolicyExpr OptionalVal[string]
// ManualRequest indicates to set the timer's manual request.
ManualRequest OptionalVal[ManualRequest]
// EventStatus indicates the event status.
EventStatus OptionalVal[SchedEventStatus]
// EventID indicates to set the timer event id.
EventID OptionalVal[string]
// EventData indicates to set the timer event data.
EventData OptionalVal[[]byte]
// EventStart indicates the start time of event.
EventStart OptionalVal[time.Time]
// EventExtra indicates to set the `EventExtra` field.
EventExtra OptionalVal[EventExtra]
// Watermark indicates to set the timer's `Watermark` field.
Watermark OptionalVal[time.Time]
// SummaryData indicates to set the timer's `Summary` field.
SummaryData OptionalVal[[]byte]
// CheckVersion indicates to check the timer's version when updated.
CheckVersion OptionalVal[uint64]
// CheckEventID indicates to check the timer's eventID.
CheckEventID OptionalVal[string]
}
TimerUpdate indicates how to update a timer.
type TimerWatchEventNotifier ¶
type TimerWatchEventNotifier interface {
// Watch watches all changes of the store. A chan will be returned to receive `WatchTimerResponse`
// The returned chan be closed when the context in the argument is done.
Watch(ctx context.Context) WatchTimerChan
// Notify sends the event to watchers.
Notify(tp WatchTimerEventType, timerID string)
// Close closes the notifier.
Close()
}
TimerWatchEventNotifier is used to notify timer watch events.
func NewMemTimerWatchEventNotifier ¶
func NewMemTimerWatchEventNotifier() TimerWatchEventNotifier
NewMemTimerWatchEventNotifier creates a notifier with memory implement
type UpdateTimerOption ¶
type UpdateTimerOption func(*TimerUpdate)
UpdateTimerOption is the option to update the timer.
func WithSetEnable ¶
func WithSetEnable(enable bool) UpdateTimerOption
WithSetEnable indicates to set the timer's `Enable` field.
func WithSetSchedExpr ¶
func WithSetSchedExpr(tp SchedPolicyType, expr string) UpdateTimerOption
WithSetSchedExpr indicates to set the timer's schedule policy.
func WithSetSummaryData ¶
func WithSetSummaryData(summary []byte) UpdateTimerOption
WithSetSummaryData indicates to set the timer's summary.
func WithSetTags ¶
func WithSetTags(tags []string) UpdateTimerOption
WithSetTags indicates to set the timer's tags.
func WithSetTimeZone ¶
func WithSetTimeZone(name string) UpdateTimerOption
WithSetTimeZone sets the timezone of the timer
func WithSetWatermark ¶
func WithSetWatermark(watermark time.Time) UpdateTimerOption
WithSetWatermark indicates to set the timer's watermark.
type WatchTimerChan ¶
type WatchTimerChan <-chan WatchTimerResponse
WatchTimerChan is the chan of the watch timer.
type WatchTimerEvent ¶
type WatchTimerEvent struct {
// Tp indicates the event type.
Tp WatchTimerEventType
// TimerID indicates the timer id for the event.
TimerID string
}
WatchTimerEvent is the watch event object.
type WatchTimerEventType ¶
type WatchTimerEventType int8
WatchTimerEventType is the type of the watch event.
const ( // WatchTimerEventCreate indicates that a new timer is created. WatchTimerEventCreate WatchTimerEventType = 1 << iota // WatchTimerEventUpdate indicates that a timer is updated. WatchTimerEventUpdate // WatchTimerEventDelete indicates that a timer is deleted. WatchTimerEventDelete )
type WatchTimerResponse ¶
type WatchTimerResponse struct {
// Events contains all events in the response.
Events []*WatchTimerEvent
}
WatchTimerResponse is the response of watch.