Documentation
¶
Index ¶
- Constants
- func Contain(obj interface{}, target interface{}) bool
- func GetLogCount(logGroupList *sls.LogGroupList) int
- func GetLogGroupCount(logGroupList *sls.LogGroupList) int
- func IntSliceReflectEqual(a, b []int) bool
- func Min(a, b int64) int64
- func Set(slc []int) []int
- func Subtract(a []int, b []int) (diffSlice []int)
- func TimeToSleepInMillsecond(intervalTime, lastCheckTime int64, condition bool)
- func TimeToSleepInSecond(intervalTime, lastCheckTime int64, condition bool)
- type CheckPointTracker
- type ConsumerClient
- type ConsumerHeartBeat
- type ConsumerWorker
- type DefaultCheckPointTracker
- func (tracker *DefaultCheckPointTracker) GetCheckPoint() string
- func (tracker *DefaultCheckPointTracker) GetCurrentCursor() string
- func (tracker *DefaultCheckPointTracker) GetNextCursor() string
- func (tracker *DefaultCheckPointTracker) GetShardId() int
- func (tracker *DefaultCheckPointTracker) SaveCheckPoint(force bool) error
- type LogHubConfig
- type ProcessFunc
- type Processor
- type ShardConsumerWorker
Constants ¶
View Source
const ( BEGIN_CURSOR = "BEGIN_CURSOR" END_CURSOR = "END_CURSOR" SPECIAL_TIMER_CURSOR = "SPECIAL_TIMER_CURSOR" )
View Source
const ( INITIALIZING = "INITIALIZING" PULLING = "PULLING" PROCESSING = "PROCESSING" SHUTTING_DOWN = "SHUTTING_DOWN" SHUTDOWN_COMPLETE = "SHUTDOWN_COMPLETE" )
Variables ¶
This section is empty.
Functions ¶
func Contain ¶
func Contain(obj interface{}, target interface{}) bool
Determine whether obj is in target object
func GetLogCount ¶
func GetLogCount(logGroupList *sls.LogGroupList) int
Get the total number of logs
func GetLogGroupCount ¶
func GetLogGroupCount(logGroupList *sls.LogGroupList) int
func IntSliceReflectEqual ¶
Determine whether two lists are equal
func TimeToSleepInMillsecond ¶
func TimeToSleepInSecond ¶
Types ¶
type CheckPointTracker ¶ added in v0.1.46
type CheckPointTracker interface {
// GetCheckPoint get lastest saved check point
GetCheckPoint() string
// SaveCheckPoint, save next cursor to checkpoint
SaveCheckPoint(force bool) error
// GetCurrentCursor get current fetched data cursor
GetCurrentCursor() string
// GetNextCursor get next fetched data cursor(this is also the next checkpoint to be saved)
GetNextCursor() string
// GetShardId, return the id of shard tracked
GetShardId() int
}
CheckPointTracker Generally, you just need SaveCheckPoint, if you use more funcs, make sure you understand these
type ConsumerClient ¶
type ConsumerClient struct {
// contains filtered or unexported fields
}
type ConsumerHeartBeat ¶ added in v0.1.46
type ConsumerHeartBeat struct {
// contains filtered or unexported fields
}
type ConsumerWorker ¶
func InitConsumerWorker ¶
func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) string) *ConsumerWorker
depreciated: this old logic is to automatically save to memory, and then commit at a fixed time we highly recommend you to use InitConsumerWorkerWithCheckpointTracker
func InitConsumerWorkerWithCheckpointTracker ¶ added in v0.1.46
func InitConsumerWorkerWithCheckpointTracker(option LogHubConfig, do func(int, *sls.LogGroupList, CheckPointTracker) (string, error)) *ConsumerWorker
InitConsumerWorkerWithCheckpointTracker please note that you need to save after the process is successful,
func InitConsumerWorkerWithProcessor ¶ added in v0.1.46
func InitConsumerWorkerWithProcessor(option LogHubConfig, processor Processor) *ConsumerWorker
InitConsumerWorkerWithProcessor you need save checkpoint by yourself and can do something after consumer shutdown
func (*ConsumerWorker) Start ¶
func (consumerWorker *ConsumerWorker) Start()
func (*ConsumerWorker) StopAndWait ¶
func (consumerWorker *ConsumerWorker) StopAndWait()
type DefaultCheckPointTracker ¶ added in v0.1.46
type DefaultCheckPointTracker struct {
// contains filtered or unexported fields
}
func (*DefaultCheckPointTracker) GetCheckPoint ¶ added in v0.1.46
func (tracker *DefaultCheckPointTracker) GetCheckPoint() string
func (*DefaultCheckPointTracker) GetCurrentCursor ¶ added in v0.1.46
func (tracker *DefaultCheckPointTracker) GetCurrentCursor() string
func (*DefaultCheckPointTracker) GetNextCursor ¶ added in v0.1.47
func (tracker *DefaultCheckPointTracker) GetNextCursor() string
func (*DefaultCheckPointTracker) GetShardId ¶ added in v0.1.47
func (tracker *DefaultCheckPointTracker) GetShardId() int
func (*DefaultCheckPointTracker) SaveCheckPoint ¶ added in v0.1.46
func (tracker *DefaultCheckPointTracker) SaveCheckPoint(force bool) error
type LogHubConfig ¶
type LogHubConfig struct {
Endpoint string
AccessKeyID string
AccessKeySecret string
Project string
Logstore string
ConsumerGroupName string
ConsumerName string
CursorPosition string
HeartbeatIntervalInSecond int
DataFetchIntervalInMs int64
MaxFetchLogGroupCount int
CursorStartTime int64 // Unix time stamp; Units are seconds.
InOrder bool
AllowLogLevel string
LogFileName string
IsJsonType bool
LogMaxSize int
LogMaxBackups int
LogCompass bool
HTTPClient *http.Client
SecurityToken string
AutoCommitDisabled bool
AutoCommitIntervalInMS int64
}
type ProcessFunc ¶ added in v0.1.46
type ProcessFunc func(int, *sls.LogGroupList, CheckPointTracker) (string, error)
func (ProcessFunc) Process ¶ added in v0.1.46
func (processor ProcessFunc) Process(shard int, lgList *sls.LogGroupList, checkpointTracker CheckPointTracker) (string, error)
func (ProcessFunc) Shutdown ¶ added in v0.1.46
func (processor ProcessFunc) Shutdown(checkpointTracker CheckPointTracker) error
type Processor ¶ added in v0.1.46
type Processor interface {
Process(int, *sls.LogGroupList, CheckPointTracker) (string, error)
Shutdown(CheckPointTracker) error
}
type ShardConsumerWorker ¶
type ShardConsumerWorker struct {
// contains filtered or unexported fields
}
Click to show internal directories.
Click to hide internal directories.