Documentation
¶
Index ¶
- Constants
- Variables
- type Dummy
- func (d *Dummy) AddEvent(namespace, id, status string, entity *Event) error
- func (d *Dummy) AppendTaskLog(taskID string, now time.Time, system, message, level string) error
- func (d *Dummy) Close() error
- func (d *Dummy) CreateTask(sourceID, collection string, task *Task, createdAt time.Time) error
- func (d *Dummy) DeleteSignature(sourceID, collection string) error
- func (d *Dummy) GetAllTaskIDs(sourceID, collection string, descendingOrder bool) ([]string, error)
- func (d *Dummy) GetAllTasks(sourceID, collection string, from, to time.Time, limit int) ([]Task, error)
- func (d *Dummy) GetAllTasksForInitialHeartbeat(runningStatus, scheduledStatus string, lastActivityThreshold time.Duration) ([]string, error)
- func (d *Dummy) GetAllTasksHeartBeat() (map[string]string, error)
- func (d *Dummy) GetEvents(namespace, id, status string, limit int) ([]Event, error)
- func (d *Dummy) GetEventsWithGranularity(namespace, status, eventType string, ids []string, start, end time.Time, ...) ([]EventsPerTime, error)
- func (d *Dummy) GetLastTask(sourceID, collection string, offset int) (*Task, error)
- func (d *Dummy) GetOrCreateClusterID() string
- func (d *Dummy) GetProjectDestinationIDs(projectID string) ([]string, error)
- func (d *Dummy) GetProjectPushSourceIDs(projectID string) ([]string, error)
- func (d *Dummy) GetProjectSourceIDs(projectID string) ([]string, error)
- func (d *Dummy) GetSignature(sourceID, collection, interval string) (string, error)
- func (d *Dummy) GetTask(taskID string) (*Task, error)
- func (d *Dummy) GetTaskLogs(taskID string, from, to time.Time) ([]TaskLogRecord, error)
- func (d *Dummy) GetTotalEvents(namespace, id, status string) (int, error)
- func (d *Dummy) IncrementEventsCount(id, namespace, eventType, status string, now time.Time, value int64) error
- func (d *Dummy) PollTask() (*Task, error)
- func (d *Dummy) PushTask(task *Task) error
- func (d *Dummy) RemoveTaskFromHeartBeat(taskID string) error
- func (d *Dummy) RemoveTasks(sourceID, collection string, taskIDs ...string) (int, error)
- func (d *Dummy) SaveSignature(sourceID, collection, interval, signature string) error
- func (d *Dummy) TaskHeartBeat(taskID string) error
- func (d *Dummy) TrimEvents(namespace, id, status string, capacity int) error
- func (d *Dummy) Type() string
- func (d *Dummy) UpdateFinishedTask(taskID, status string) error
- func (d *Dummy) UpdateStartedTask(taskID, status string) error
- type ErrorMetrics
- type Event
- type EventsPerTime
- type Granularity
- type Options
- type Redis
- func (r *Redis) AddEvent(namespace, id, status string, entity *Event) error
- func (r *Redis) AppendTaskLog(taskID string, now time.Time, system, message, level string) error
- func (r *Redis) Close() error
- func (r *Redis) CreateTask(sourceID, collection string, task *Task, createdAt time.Time) error
- func (r *Redis) DeleteSignature(sourceID, collection string) error
- func (r *Redis) GetAllTaskIDs(sourceID, collection string, descendingOrder bool) ([]string, error)
- func (r *Redis) GetAllTasks(sourceID, collection string, start, end time.Time, limit int) ([]Task, error)
- func (r *Redis) GetAllTasksForInitialHeartbeat(runningStatus, scheduledStatus string, lastActivityThreshold time.Duration) ([]string, error)
- func (r *Redis) GetAllTasksHeartBeat() (map[string]string, error)
- func (r *Redis) GetEvents(namespace, id, status string, limit int) ([]Event, error)
- func (r *Redis) GetEventsWithGranularity(namespace, status, eventType string, ids []string, start, end time.Time, ...) ([]EventsPerTime, error)
- func (r *Redis) GetLastTask(sourceID, collection string, offset int) (*Task, error)
- func (r *Redis) GetOrCreateClusterID() string
- func (r *Redis) GetProjectDestinationIDs(projectID string) ([]string, error)
- func (r *Redis) GetProjectPushSourceIDs(projectID string) ([]string, error)
- func (r *Redis) GetProjectSourceIDs(projectID string) ([]string, error)
- func (r *Redis) GetSignature(sourceID, collection, interval string) (string, error)
- func (r *Redis) GetTask(taskID string) (*Task, error)
- func (r *Redis) GetTaskLogs(taskID string, start, end time.Time) ([]TaskLogRecord, error)
- func (r *Redis) GetTotalEvents(namespace, id, status string) (int, error)
- func (r *Redis) IncrementEventsCount(id, namespace, eventType, status string, now time.Time, value int64) error
- func (r *Redis) PollTask() (*Task, error)
- func (r *Redis) PushTask(task *Task) error
- func (r *Redis) RemoveTaskFromHeartBeat(taskID string) error
- func (r *Redis) RemoveTasks(sourceID, collection string, taskIDs ...string) (int, error)
- func (r *Redis) SaveSignature(sourceID, collection, interval, signature string) error
- func (r *Redis) TaskHeartBeat(taskID string) error
- func (r *Redis) TrimEvents(namespace, id, status string, capacity int) error
- func (r *Redis) Type() string
- func (r *Redis) UpdateFinishedTask(taskID, status string) error
- func (r *Redis) UpdateStartedTask(taskID, status string) error
- type RedisPool
- type RedisPoolFactory
- type Storage
- type Task
- type TaskLogRecord
Constants ¶
const ( DestinationNamespace = "destination" SourceNamespace = "source" //536-issue DEPRECATED //instead of this name - all sources will be in SourceNamespace and for push/pull events different keys will be selected PushSourceNamespace = "push_source" PushEventType = "push" PullEventType = "pull" SuccessStatus = "success" ErrorStatus = "errors" SkipStatus = "skip" ConfigPrefix = "config#" SystemKey = "system" EventsTokenNamespace = "token" EventsDestinationNamespace = "destination" EventsErrorStatus = "error" EventsPureStatus = "" )
const ( DummyType = "Dummy" RedisType = "Redis" )
Variables ¶
var DefaultOptions = Options{ DefaultDialConnectTimeout: 10 * time.Second, DefaultDialReadTimeout: 10 * time.Second, DefaultDialWriteTimeout: 10 * time.Second, MaxIdle: 10, MaxActive: 600, IdleTimeout: 240 * time.Second, PingTimeout: 30 * time.Second, }
DefaultOptions for Redis Pool
var (
ErrTaskNotFound = errors.New("Sync task wasn't found")
)
Functions ¶
This section is empty.
Types ¶
type Dummy ¶
type Dummy struct {
}
func (*Dummy) AppendTaskLog ¶
func (*Dummy) CreateTask ¶
func (*Dummy) DeleteSignature ¶
func (*Dummy) GetAllTaskIDs ¶
func (*Dummy) GetAllTasks ¶
func (*Dummy) GetAllTasksForInitialHeartbeat ¶
func (*Dummy) GetAllTasksHeartBeat ¶
func (*Dummy) GetEventsWithGranularity ¶
func (d *Dummy) GetEventsWithGranularity(namespace, status, eventType string, ids []string, start, end time.Time, granularity Granularity) ([]EventsPerTime, error)
func (*Dummy) GetLastTask ¶
func (*Dummy) GetOrCreateClusterID ¶
func (*Dummy) GetProjectDestinationIDs ¶
func (*Dummy) GetProjectPushSourceIDs ¶
func (*Dummy) GetProjectSourceIDs ¶
func (*Dummy) GetSignature ¶
func (*Dummy) GetTaskLogs ¶
func (*Dummy) GetTotalEvents ¶
func (*Dummy) IncrementEventsCount ¶
func (*Dummy) RemoveTaskFromHeartBeat ¶
func (*Dummy) RemoveTasks ¶
func (*Dummy) SaveSignature ¶
func (*Dummy) TaskHeartBeat ¶
func (*Dummy) TrimEvents ¶
func (*Dummy) UpdateFinishedTask ¶
func (*Dummy) UpdateStartedTask ¶
type ErrorMetrics ¶
type ErrorMetrics struct {
// contains filtered or unexported fields
}
func NewErrorMetrics ¶
func NewErrorMetrics(metricFunc func(string)) *ErrorMetrics
func (*ErrorMetrics) NoticeError ¶
func (em *ErrorMetrics) NoticeError(err error)
type Event ¶
type Event struct {
Malformed string `json:"malformed,omitempty" redis:"malformed"`
Original string `json:"original,omitempty" redis:"original"`
Success string `json:"success,omitempty" redis:"success"`
Error string `json:"error,omitempty" redis:"error"`
Skip string `json:"skip,omitempty" redis:"skip"`
Timestamp string `json:"timestamp,omitempty" redis:"timestamp"`
UID string `json:"uid,omitempty" redis:"uid"`
DestinationID string `json:"destination_id,omitempty" redis:"destination_id"`
TokenID string `json:"token_id,omitempty" redis:"token_id"`
}
type EventsPerTime ¶
type Granularity ¶
type Granularity string
Granularity is used for gathering statistics
const ( UNKNOWN Granularity = "" DAY Granularity = "day" HOUR Granularity = "hour" )
func GranularityFromString ¶
func GranularityFromString(value string) (Granularity, error)
func (Granularity) String ¶
func (g Granularity) String() string
type Options ¶
type Options struct {
DefaultDialConnectTimeout time.Duration
DefaultDialReadTimeout time.Duration
DefaultDialWriteTimeout time.Duration
MaxIdle int
MaxActive int
IdleTimeout time.Duration
PingTimeout time.Duration
}
Options for Redis Pool
type Redis ¶
type Redis struct {
// contains filtered or unexported fields
}
func (*Redis) AppendTaskLog ¶
AppendTaskLog appends log record into task logs sorted set
func (*Redis) CreateTask ¶
CreateTask saves task into Redis and add Task ID in index
func (*Redis) DeleteSignature ¶
DeleteSignature deletes source collection signature from Redis
func (*Redis) GetAllTaskIDs ¶
GetAllTaskIDs returns all source's tasks ids by collection
func (*Redis) GetAllTasks ¶
func (r *Redis) GetAllTasks(sourceID, collection string, start, end time.Time, limit int) ([]Task, error)
GetAllTasks returns all source's tasks by collection and time criteria
func (*Redis) GetAllTasksForInitialHeartbeat ¶
func (r *Redis) GetAllTasksForInitialHeartbeat(runningStatus, scheduledStatus string, lastActivityThreshold time.Duration) ([]string, error)
GetAllTasksForInitialHeartbeat returns all task IDs where: 1. task is RUNNING and last log time more than last activity threshold 2. task is SCHEDULED and task creation time more than last activity threshold
func (*Redis) GetAllTasksHeartBeat ¶
GetAllTasksHeartBeat returns map with taskID-last heartbeat timestamp pairs
func (*Redis) GetEventsWithGranularity ¶
func (r *Redis) GetEventsWithGranularity(namespace, status, eventType string, ids []string, start, end time.Time, granularity Granularity) ([]EventsPerTime, error)
GetEventsWithGranularity returns events amount with time criteria by granularity, status and sources/destination ids
func (*Redis) GetLastTask ¶
GetLastTask returns last sync task
func (*Redis) GetOrCreateClusterID ¶
GetOrCreateClusterID returns clusterID from Redis or save input one
func (*Redis) GetProjectDestinationIDs ¶
GetProjectDestinationIDs returns project's destination ids
func (*Redis) GetProjectPushSourceIDs ¶
GetProjectPushSourceIDs returns project's pushed sources ids (api keys)
func (*Redis) GetProjectSourceIDs ¶
GetProjectSourceIDs returns project's sources ids
func (*Redis) GetSignature ¶
GetSignature returns sync interval signature from Redis
func (*Redis) GetTaskLogs ¶
GetTaskLogs returns task logs with time criteria
func (*Redis) GetTotalEvents ¶
GetTotalEvents returns total of cached events
func (*Redis) IncrementEventsCount ¶
func (r *Redis) IncrementEventsCount(id, namespace, eventType, status string, now time.Time, value int64) error
IncrementEventsCount increment events counter namespaces: [destination, source] eventType: [push, pull] status: [success, error, skip]
func (*Redis) RemoveTaskFromHeartBeat ¶
RemoveTaskFromHeartBeat removes taskID current timestamp from heartbeat key
func (*Redis) RemoveTasks ¶
RemoveTasks tasks with provided taskIds from specified source's collections. All task logs removed as well
func (*Redis) SaveSignature ¶
SaveSignature saves sync interval signature in Redis
func (*Redis) TaskHeartBeat ¶
TaskHeartBeat sets current timestamp into heartbeat key
func (*Redis) TrimEvents ¶
TrimEvents keeps only last capacity events in Redis list key with trim function
func (*Redis) UpdateFinishedTask ¶
UpdateFinishedTask updates only status and finished_at field in the task
func (*Redis) UpdateStartedTask ¶
UpdateStartedTask updates only status and started_at field in the task
type RedisPool ¶
type RedisPool struct {
// contains filtered or unexported fields
}
RedisPool is a wrapper for keeping redis.Pool and sentinel.Sentinel and close them
func (*RedisPool) GetContext ¶
type RedisPoolFactory ¶
type RedisPoolFactory struct {
// contains filtered or unexported fields
}
RedisPoolFactory is a factory for creating RedisPool supports creating RedisPool from URLs: redis://, rediss://, sentinel:// and from config parameters like host,port, etc
func NewRedisPoolFactory ¶
func NewRedisPoolFactory(host string, port int, password string, database int, tlsSkipVerify bool, sentinelMasterMame string) *RedisPoolFactory
NewRedisPoolFactory returns filled RedisPoolFactory and removes quotes in host
func (*RedisPoolFactory) CheckAndSetDefaultPort ¶
func (rpf *RedisPoolFactory) CheckAndSetDefaultPort() (int, bool)
CheckAndSetDefaultPort checks if port isn't set - put defaultRedisPort, if sentinel mode put defaultSentinelPort
func (*RedisPoolFactory) Create ¶
func (rpf *RedisPoolFactory) Create() (*RedisPool, error)
Create returns configured RedisPool or err if ping failed host might be URLS: 1. redis://:password@host:port 2. rediss://:password@host:port 3. sentinel://master_name:password@node1:port,node2:port 4. plain host
func (*RedisPoolFactory) Details ¶
func (rpf *RedisPoolFactory) Details() string
Details returns host:port or host if host is a URL with sentinel information
func (*RedisPoolFactory) GetOptions ¶
func (rpf *RedisPoolFactory) GetOptions() Options
func (*RedisPoolFactory) WithOptions ¶
func (rpf *RedisPoolFactory) WithOptions(options Options) *RedisPoolFactory
WithOptions overrides options
type Storage ¶
type Storage interface {
io.Closer
//** Sources **
//signatures
GetSignature(sourceID, collection, interval string) (string, error)
SaveSignature(sourceID, collection, interval, signature string) error
DeleteSignature(sourceID, collection string) error
//** Counters **
//events counters
IncrementEventsCount(id, namespace, eventType, status string, now time.Time, value int64) error
GetProjectSourceIDs(projectID string) ([]string, error)
GetProjectDestinationIDs(projectID string) ([]string, error)
//536-issue DEPRECATED instead of it all project sources will be selected
GetProjectPushSourceIDs(projectID string) ([]string, error)
GetEventsWithGranularity(namespace, status, eventType string, ids []string, start, end time.Time, granularity Granularity) ([]EventsPerTime, error)
//** Events Cache **
//events caching
AddEvent(namespace, id, status string, entity *Event) error
TrimEvents(namespace, id, status string, capacity int) error
GetEvents(namespace, id, status string, limit int) ([]Event, error)
GetTotalEvents(namespace, id, status string) (int, error)
// ** Sync Tasks **
CreateTask(sourceID, collection string, task *Task, createdAt time.Time) error
GetAllTasks(sourceID, collection string, start, end time.Time, limit int) ([]Task, error)
GetLastTask(sourceID, collection string, offset int) (*Task, error)
GetTask(taskID string) (*Task, error)
GetAllTaskIDs(sourceID, collection string, descendingOrder bool) ([]string, error)
RemoveTasks(sourceID, collection string, taskIDs ...string) (int, error)
UpdateStartedTask(taskID, status string) error
UpdateFinishedTask(taskID, status string) error
//heartbeat
TaskHeartBeat(taskID string) error
RemoveTaskFromHeartBeat(taskID string) error
GetAllTasksHeartBeat() (map[string]string, error)
GetAllTasksForInitialHeartbeat(runningStatus, scheduledStatus string, lastActivityThreshold time.Duration) ([]string, error)
//task logs
AppendTaskLog(taskID string, now time.Time, system, message, level string) error
GetTaskLogs(taskID string, start, end time.Time) ([]TaskLogRecord, error)
//task queue
PushTask(task *Task) error
PollTask() (*Task, error)
//system
GetOrCreateClusterID() string
Type() string
}
type Task ¶
type Task struct {
ID string `json:"id,omitempty" redis:"id"`
SourceType string `json:"source_type" redis:"source_type"`
Source string `json:"source,omitempty" redis:"source"`
Collection string `json:"collection,omitempty" redis:"collection"`
Priority int64 `json:"priority,omitempty" redis:"priority"`
CreatedAt string `json:"created_at,omitempty" redis:"created_at"`
StartedAt string `json:"started_at,omitempty" redis:"started_at"`
FinishedAt string `json:"finished_at,omitempty" redis:"finished_at"`
Status string `json:"status,omitempty" redis:"status"`
}
Task is a Redis entity some fields are updated using names in Storage (like status updating)
type TaskLogRecord ¶
type TaskLogRecord struct {
Time string `json:"time,omitempty" redis:"time"`
System string `json:"system,omitempty" redis:"system"`
Message string `json:"message,omitempty" redis:"message"`
Level string `json:"level,omitempty" redis:"level"`
}
TaskLogRecord is a Redis entity
func (*TaskLogRecord) Marshal ¶
func (tlr *TaskLogRecord) Marshal() string
Marshal returns serialized JSON object string