Documentation
¶
Index ¶
- Constants
- Variables
- func IsPositionStoreEvent(schemaName string, tableName string) bool
- func StringDecoder(s string) (interface{}, error)
- func StringEncoder(v interface{}) (string, error)
- type MongoPosition
- type MongoPositionRet
- type Position
- type PositionCacheInterface
- type PositionEntity
- type PositionMeta
- type PositionRepo
- type PositionValueDecoder
- type PositionValueEncoder
- type PositionWrapper
Constants ¶
View Source
const (
Version = "1.0"
)
Variables ¶
View Source
var DefaultFlushPeriod = 5 * time.Second
Functions ¶
func IsPositionStoreEvent ¶
func StringDecoder ¶ added in v0.9.19
func StringEncoder ¶ added in v0.9.19
Types ¶
type MongoPosition ¶
type MongoPosition struct {
StartPosition config.MongoPosition `json:"start_position" bson:"start_position"`
CurrentPosition config.MongoPosition `json:"current_position" bson:"current_position"`
}
MongoPosition and PositionEntity are kept for backward compatibility with the previous position format.
type MongoPositionRet ¶ added in v0.9.19
type MongoPositionRet struct {
Version string `json:"version" bson:"version"`
Name string `json:"name" bson:"name"`
Stage string `json:"stage" bson:"stage"`
Value string `json:"value" bson:"value"`
LastUpdate string `json:"last_update" bson:"last_update"`
}
MongoPositionRet is the new position format.
type Position ¶
type Position struct {
PositionMeta
Value interface{} `bson:"-" json:"-"`
}
type PositionCacheInterface ¶ added in v0.9.17
type PositionCacheInterface interface {
Start() error
Close()
Put(position Position) error
// Get returns the value from the cache; if the cache holds no value,
// it tries to get it from the position repo.
Get() (position Position, exist bool, err error)
GetEncodedPersistentPosition() (position PositionMeta, v string, exist bool, err error)
Flush() error
Clear() error
}
func NewPositionCache ¶ added in v0.9.17
func NewPositionCache(pipelineName string, repo PositionRepo, encoder PositionValueEncoder, decoder PositionValueDecoder, flushDuration time.Duration) (PositionCacheInterface, error)
type PositionEntity ¶ added in v0.9.17
type PositionEntity struct {
Name string `json:"name" bson:"name"`
Stage string `json:"stage" bson:"stage"`
MongoPosition `json:",inline" bson:",inline"`
LastUpdate string `json:"last_update" bson:"last_update"`
}
PositionEntity is the old format and will be deprecated.
type PositionMeta ¶ added in v0.9.19
type PositionMeta struct {
// Version is the schema version of position
Version string `bson:"version" json:"version"`
// Name is the unique name of a pipeline
Name string
Stage config.InputMode
UpdateTime time.Time
}
func (PositionMeta) Validate ¶ added in v0.9.19
func (meta PositionMeta) Validate() error
type PositionRepo ¶ added in v0.9.17
type PositionRepo interface {
Get(pipelineName string) (PositionMeta, string, bool, error)
Put(pipelineName string, positionMeta PositionMeta, v string) error
Delete(pipelineName string) error
Close() error
}
func NewMemoRepo ¶ added in v0.9.17
func NewMemoRepo() PositionRepo
func NewMongoPositionRepo ¶ added in v0.9.17
func NewMongoPositionRepo(session *mgo.Session) (PositionRepo, error)
func NewMySQLRepo ¶ added in v0.9.17
func NewMySQLRepo(dbConfig *utils.DBConfig, annotation string) (PositionRepo, error)
type PositionValueDecoder ¶ added in v0.9.19
type PositionValueEncoder ¶ added in v0.9.19
type PositionWrapper ¶ added in v0.9.19
type PositionWrapper struct {
PositionMeta
MongoValue string `bson:"value" json:"value"`
}
Click to show internal directories.
Click to hide internal directories.