Documentation ¶
    Index ¶
- Constants
- func Contain(obj interface{}, target interface{}) bool
- func GetLogCount(logGroupList *sls.LogGroupList) int
- func GetLogGroupCount(logGroupList *sls.LogGroupList) int
- func IntSliceReflectEqual(a, b []int) bool
- func Min(a, b int64) int64
- func Set(slc []int) []int
- func Subtract(a []int, b []int) (diffSlice []int)
- func TimeToSleepInMillsecond(intervalTime, lastCheckTime int64, condition bool)
- func TimeToSleepInSecond(intervalTime, lastCheckTime int64, condition bool)
- type CheckPointTracker
- type ConsumerClient
- type ConsumerHeartBeat
- type ConsumerWorker
- type DefaultCheckPointTracker
- func (tracker *DefaultCheckPointTracker) GetCheckPoint() string
- func (tracker *DefaultCheckPointTracker) GetCurrentCursor() string
- func (tracker *DefaultCheckPointTracker) GetNextCursor() string
- func (tracker *DefaultCheckPointTracker) GetShardId() int
- func (tracker *DefaultCheckPointTracker) SaveCheckPoint(force bool) error
 
- type LogHubConfig
- type ProcessFunc
- type Processor
- type ShardConsumerWorker
Constants ¶
const (
	BEGIN_CURSOR         = "BEGIN_CURSOR"
	END_CURSOR           = "END_CURSOR"
	SPECIAL_TIMER_CURSOR = "SPECIAL_TIMER_CURSOR"
)
const (
	INITIALIZING      = "INITIALIZING"
	PULLING           = "PULLING"
	PROCESSING        = "PROCESSING"
	SHUTTING_DOWN     = "SHUTTING_DOWN"
	SHUTDOWN_COMPLETE = "SHUTDOWN_COMPLETE"
)
Variables ¶
This section is empty.
Functions ¶
func Contain ¶
func Contain(obj interface{}, target interface{}) bool
    Contain reports whether obj is contained in target.
func GetLogCount ¶
func GetLogCount(logGroupList *sls.LogGroupList) int
    GetLogCount returns the total number of logs in logGroupList.
func GetLogGroupCount ¶
func GetLogGroupCount(logGroupList *sls.LogGroupList) int
    GetLogGroupCount returns the number of log groups in logGroupList.
func IntSliceReflectEqual ¶
func IntSliceReflectEqual(a, b []int) bool
    IntSliceReflectEqual reports whether two int slices are equal.
func Min ¶
func Min(a, b int64) int64
func Set ¶
func Set(slc []int) []int
func Subtract ¶
func Subtract(a []int, b []int) (diffSlice []int)
func TimeToSleepInMillsecond ¶
func TimeToSleepInMillsecond(intervalTime, lastCheckTime int64, condition bool)
func TimeToSleepInSecond ¶
func TimeToSleepInSecond(intervalTime, lastCheckTime int64, condition bool)
Types ¶
type CheckPointTracker ¶ added in v0.1.46
type CheckPointTracker interface {
	// GetCheckPoint returns the latest saved checkpoint.
	GetCheckPoint() string
	// SaveCheckPoint saves the next cursor as the checkpoint.
	SaveCheckPoint(force bool) error
	// GetCurrentCursor returns the cursor of the currently fetched data.
	GetCurrentCursor() string
	// GetNextCursor returns the cursor of the next fetch (this is also the next checkpoint to be saved).
	GetNextCursor() string
	// GetShardId returns the id of the tracked shard.
	GetShardId() int
}
    CheckPointTracker Generally you only need SaveCheckPoint; before using the other methods, make sure you understand what they do.
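A minimal sketch of a consume callback that uses the tracker. The import paths, the consumerLibrary alias, and the handler name are assumptions, and the force=false semantics are inferred from the parameter name and the AutoCommitIntervalInMS option rather than stated by this documentation.

package main

import (
	"fmt"

	sls "github.com/aliyun/aliyun-log-go-sdk"
	consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
)

// process handles one fetched batch for a shard and then records the
// checkpoint. Passing force=false is assumed to let the library flush the
// checkpoint on its own schedule; force=true is assumed to commit it
// immediately.
func process(shardId int, logGroupList *sls.LogGroupList, tracker consumerLibrary.CheckPointTracker) (string, error) {
	fmt.Printf("shard %d: %d logs at cursor %s\n",
		shardId, consumerLibrary.GetLogCount(logGroupList), tracker.GetCurrentCursor())
	if err := tracker.SaveCheckPoint(false); err != nil {
		return "", err
	}
	return "", nil
}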
type ConsumerClient ¶
type ConsumerClient struct {
	// contains filtered or unexported fields
}
    type ConsumerHeartBeat ¶ added in v0.1.46
type ConsumerHeartBeat struct {
	// contains filtered or unexported fields
}
    type ConsumerWorker ¶
func InitConsumerWorker ¶
func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) string) *ConsumerWorker
Deprecated: this older variant automatically saves checkpoints to memory and commits them at a fixed interval; we strongly recommend using InitConsumerWorkerWithCheckpointTracker instead.
func InitConsumerWorkerWithCheckpointTracker ¶ added in v0.1.46
func InitConsumerWorkerWithCheckpointTracker(option LogHubConfig, do func(int, *sls.LogGroupList, CheckPointTracker) (string, error)) *ConsumerWorker
InitConsumerWorkerWithCheckpointTracker please note that you should save the checkpoint only after processing has succeeded.
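A short sketch of wiring such a callback into a worker, assumed to run inside main. Here option is a LogHubConfig (a field-by-field sketch appears under type LogHubConfig below), process is the callback sketched under CheckPointTracker above, and the consumerLibrary import alias is an assumption.

// Create the worker; Start and StopAndWait (see below) drive its lifecycle.
worker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)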
func InitConsumerWorkerWithProcessor ¶ added in v0.1.46
func InitConsumerWorkerWithProcessor(option LogHubConfig, processor Processor) *ConsumerWorker
InitConsumerWorkerWithProcessor you need to save the checkpoint yourself, and the Processor's Shutdown hook lets you do work after the consumer shuts down.
func (*ConsumerWorker) Start ¶
func (consumerWorker *ConsumerWorker) Start()
func (*ConsumerWorker) StopAndWait ¶
func (consumerWorker *ConsumerWorker) StopAndWait()
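A common shutdown pattern, sketched under the assumption that the caller decides when to stop by blocking on SIGINT/SIGTERM; the import path and function name are placeholders.

import (
	"os"
	"os/signal"
	"syscall"

	consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
)

// runUntilSignalled starts the worker and blocks until the process receives
// SIGINT or SIGTERM, then stops the worker and waits for its shard consumers
// to finish shutting down.
func runUntilSignalled(worker *consumerLibrary.ConsumerWorker) {
	worker.Start()
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
	<-ch
	worker.StopAndWait()
}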
type DefaultCheckPointTracker ¶ added in v0.1.46
type DefaultCheckPointTracker struct {
	// contains filtered or unexported fields
}
    func (*DefaultCheckPointTracker) GetCheckPoint ¶ added in v0.1.46
func (tracker *DefaultCheckPointTracker) GetCheckPoint() string
func (*DefaultCheckPointTracker) GetCurrentCursor ¶ added in v0.1.46
func (tracker *DefaultCheckPointTracker) GetCurrentCursor() string
func (*DefaultCheckPointTracker) GetNextCursor ¶ added in v0.1.47
func (tracker *DefaultCheckPointTracker) GetNextCursor() string
func (*DefaultCheckPointTracker) GetShardId ¶ added in v0.1.47
func (tracker *DefaultCheckPointTracker) GetShardId() int
func (*DefaultCheckPointTracker) SaveCheckPoint ¶ added in v0.1.46
func (tracker *DefaultCheckPointTracker) SaveCheckPoint(force bool) error
type LogHubConfig ¶
type LogHubConfig struct {
	//:param Endpoint:
	//:param AccessKeyID:
	//:param AccessKeySecret:
	//:param SecurityToken: If you use an STS token to consume data, you must make sure the consumer is stopped before the token expires.
	//:param CredentialsProvider: CredentialsProvider that provides credentials (AccessKeyID, AccessKeySecret, StsToken).
	//:param Project:
	//:param Logstore:
	//:param Query: Filter rule for rule-based consumption; the corresponding rule must be set, for example *| where a = 'xxx'.
	//:param ConsumerGroupName:
	//:param ConsumerName:
	//:param CursorPosition: This option is only used for initialization and is ignored once the consumer group has been created and every shard has started to be consumed.
	//  There are three options: BEGIN_CURSOR, END_CURSOR and SPECIAL_TIMER_CURSOR; when you choose SPECIAL_TIMER_CURSOR, you also have to set the CursorStartTime parameter.
	//:param HeartbeatIntervalInSecond:
	// default 20. Once a client has not reported to the server for HeartbeatTimeoutInSecond seconds,
	// the server considers it offline and re-assigns its shards to another consumer.
	// Don't set the heartbeat interval too small when network bandwidth or consumption performance is poor.
	//:param DataFetchIntervalInMs: default 200 (milliseconds); don't configure it too small (< 100 milliseconds).
	//:param HeartbeatTimeoutInSecond:
	// default HeartbeatIntervalInSecond * 3. Once a client has not reported to the server for HeartbeatTimeoutInSecond seconds,
	// the server considers it offline and re-assigns its shards to another consumer.
	//:param MaxFetchLogGroupCount: default 1000, the fetch size of each request; normally keep the default. The maximum is 1000; lower values may improve memory efficiency.
	//:param CursorStartTime: used when CursorPosition is SPECIAL_TIMER_CURSOR; it is the log receiving time as a Unix timestamp. The unit of this parameter is seconds.
	//:param InOrder:
	// 	default false. When a shard is split during consumption, this controls whether the newly split shards
	// 	are consumed only after their parent shard (now read-only) has been fully consumed.
	// 	Keep it false unless you have a good reason to change it.
	//:param AllowLogLevel: default info; optional: debug, info, warn, error.
	//:param LogFileName: log file path, for example "/root/log/log_file.log", default
	//:param IsJsonType: whether the log output format is JSON, default false.
	//:param LogMaxSize: MaxSize is the maximum size in megabytes of the log file before it gets rotated. It defaults to 100 megabytes.
	//:param LogMaxBackups:
	// 	MaxBackups is the maximum number of old log files to retain. The default
	// 	is to retain all old log files (though MaxAge may still cause them to get
	// 	deleted).
	//:param LogCompass: Compress determines whether the rotated log files should be compressed using gzip.
	//:param CompressType: CompressType is the type of compression to use; default 0, which stands for lz4.
	//:param HTTPClient: custom http.Client used for sending data to SLS.
	//:param AutoCommitDisabled: whether to disable automatic checkpoint committing; default is false, which means checkpoints are committed automatically.
	//	  Note that when auto commit is disabled, you must use InitConsumerWorkerWithCheckpointTracker instead of InitConsumerWorker.
	//:param AutoCommitIntervalInMS: interval of automatic checkpoint commits; default is 30 seconds.
	//:param AuthVersion: signature algorithm version, default is sls.AuthV1.
	//:param Region: region of the SLS endpoint, e.g. cn-hangzhou; Region must be set if AuthVersion is sls.AuthV4.
	Endpoint                  string
	AccessKeyID               string
	AccessKeySecret           string
	CredentialsProvider       sls.CredentialsProvider
	Project                   string
	Logstore                  string
	Query                     string
	ConsumerGroupName         string
	ConsumerName              string
	CursorPosition            string
	HeartbeatIntervalInSecond int
	HeartbeatTimeoutInSecond  int
	DataFetchIntervalInMs     int64
	MaxFetchLogGroupCount     int
	CursorStartTime           int64 // Unix time stamp; Units are seconds.
	InOrder                   bool
	AllowLogLevel             string
	LogFileName               string
	IsJsonType                bool
	LogMaxSize                int
	LogMaxBackups             int
	LogCompass                bool
	CompressType              int
	HTTPClient                *http.Client
	SecurityToken             string
	AutoCommitDisabled        bool
	AutoCommitIntervalInMS    int64
	AuthVersion               sls.AuthVersionType
	Region                    string
}
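A minimal LogHubConfig sketch; the endpoint, credentials, and names are placeholders, only a few of the fields are shown, and the consumerLibrary alias is an assumption.

var option = consumerLibrary.LogHubConfig{
	Endpoint:          "cn-hangzhou.log.aliyuncs.com", // placeholder endpoint
	AccessKeyID:       "your-access-key-id",
	AccessKeySecret:   "your-access-key-secret",
	Project:           "your-project",
	Logstore:          "your-logstore",
	ConsumerGroupName: "my-consumer-group",
	ConsumerName:      "consumer-1",
	// Where to start the first time this consumer group runs; ignored once
	// every shard has a checkpoint.
	CursorPosition: consumerLibrary.BEGIN_CURSOR,
}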
    type ProcessFunc ¶ added in v0.1.46
type ProcessFunc func(int, *sls.LogGroupList, CheckPointTracker) (string, error)
func (ProcessFunc) Process ¶ added in v0.1.46
func (processor ProcessFunc) Process(shard int, lgList *sls.LogGroupList, checkpointTracker CheckPointTracker) (string, error)
func (ProcessFunc) Shutdown ¶ added in v0.1.46
func (processor ProcessFunc) Shutdown(checkpointTracker CheckPointTracker) error
type Processor ¶ added in v0.1.46
type Processor interface {
	Process(int, *sls.LogGroupList, CheckPointTracker) (string, error)
	Shutdown(CheckPointTracker) error
}
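A sketch of a custom Processor for use with InitConsumerWorkerWithProcessor; the type name, its behavior, and the choice to force a final checkpoint save in Shutdown are assumptions, and the imports follow the earlier sketch (fmt, sls, consumerLibrary).

// batchPrinter is a hypothetical Processor that logs each batch and saves
// checkpoints itself.
type batchPrinter struct{}

func (p *batchPrinter) Process(shardId int, lgList *sls.LogGroupList, tracker consumerLibrary.CheckPointTracker) (string, error) {
	fmt.Printf("shard %d: %d log groups\n", shardId, consumerLibrary.GetLogGroupCount(lgList))
	// Record the checkpoint only after the batch has been handled.
	return "", tracker.SaveCheckPoint(false)
}

func (p *batchPrinter) Shutdown(tracker consumerLibrary.CheckPointTracker) error {
	// Flush a final checkpoint when the consumer shuts down.
	return tracker.SaveCheckPoint(true)
}

Such a processor would be handed to a worker via InitConsumerWorkerWithProcessor(option, &batchPrinter{}).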
    type ShardConsumerWorker ¶
type ShardConsumerWorker struct {
	// contains filtered or unexported fields
}