Documentation ¶
Index ¶
- Constants
- type ArtifactCollectionState
- func (s *ArtifactCollectionState) Clear(timeRange CollectionTimeRange)
- func (s *ArtifactCollectionState) Compare(want *ArtifactCollectionState) (bool, string)
- func (s *ArtifactCollectionState) GetFromTime() time.Time
- func (s *ArtifactCollectionState) GetToTime() time.Time
- func (s *ArtifactCollectionState) Init(collectionTimeRange CollectionTimeRange, granularity time.Duration)
- func (s *ArtifactCollectionState) IsEmpty() bool
- func (s *ArtifactCollectionState) MigrateFromLegacyState(bytes []byte) error
- func (s *ArtifactCollectionState) OnCollected(id string, timestamp time.Time) error
- func (s *ArtifactCollectionState) OnCollectionComplete() error
- func (s *ArtifactCollectionState) RegisterPath(path string, metadata map[string]string)
- func (s *ArtifactCollectionState) ShouldCollect(id string, timestamp time.Time) bool
- func (s *ArtifactCollectionState) String() any
- func (s *ArtifactCollectionState) Validate() error
- type ArtifactCollectionStateLegacy
- type CollectionOrder
- type CollectionState
- type CollectionStateWithPaths
- type CollectionTimeRange
- type ReverseOrderCollectionStateLegacy
- type SaveableCollectionState
- func (s *SaveableCollectionState) GetFromTime() time.Time
- func (s *SaveableCollectionState) GetGranularity() time.Duration
- func (s *SaveableCollectionState) GetToTime() time.Time
- func (s *SaveableCollectionState) Init(collectionTimeRange CollectionTimeRange, recollect bool, ...) error
- func (s *SaveableCollectionState) IsEmpty() bool
- func (s *SaveableCollectionState) LoadFromFile(path string) error
- func (s *SaveableCollectionState) OnCollected(id string, timestamp time.Time) error
- func (s *SaveableCollectionState) OnCollectionComplete() error
- func (s *SaveableCollectionState) RegisterPath(path string, metadata map[string]string)
- func (s *SaveableCollectionState) Save() error
- func (s *SaveableCollectionState) ShouldCollect(id string, timestamp time.Time) bool
- type TimeRangeCollectionState
- func (t *TimeRangeCollectionState) Clear(clearRange CollectionTimeRange)
- func (t *TimeRangeCollectionState) Compare(want *TimeRangeCollectionState) (bool, string)
- func (t *TimeRangeCollectionState) GetFromTime() time.Time
- func (t *TimeRangeCollectionState) GetGranularity() time.Duration
- func (t *TimeRangeCollectionState) GetToTime() time.Time
- func (t *TimeRangeCollectionState) Init(collectionTimeRange CollectionTimeRange, granularity time.Duration)
- func (t *TimeRangeCollectionState) IsEmpty() bool
- func (t *TimeRangeCollectionState) MigrateFromLegacyState(bytes []byte) error
- func (t *TimeRangeCollectionState) OnCollected(id string, timestamp time.Time) error
- func (t *TimeRangeCollectionState) OnCollectionComplete() error
- func (t *TimeRangeCollectionState) ShouldCollect(id string, timestamp time.Time) bool
- func (t *TimeRangeCollectionState) String() string
- func (t *TimeRangeCollectionState) Validate() error
- type TimeRangeCollectionStateLegacy
- type TimeRangeObjectState
- func (s *TimeRangeObjectState) Clone() *TimeRangeObjectState
- func (s *TimeRangeObjectState) Compare(other *TimeRangeObjectState) (bool, string)
- func (s *TimeRangeObjectState) GetFromTime() time.Time
- func (s *TimeRangeObjectState) GetGranularity() time.Duration
- func (s *TimeRangeObjectState) GetToTime() time.Time
- func (s *TimeRangeObjectState) IsEmpty() bool
- func (s *TimeRangeObjectState) OnCollected(id string, timestamp time.Time) error
- func (s *TimeRangeObjectState) ShouldCollect(id string, timestamp time.Time) bool
- func (s *TimeRangeObjectState) String() string
- func (s *TimeRangeObjectState) Validate() error
Constants ¶
const CollectionStateStructVersion = 20250618
const MinArtifactGranularity = time.Hour * 24
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ArtifactCollectionState ¶
type ArtifactCollectionState struct {
// map of trunk paths to collection state for that trunk
// a trunk is a path segment that does not contain any time metadata
// for example if the path is s3://bucket/folder1/folder2/2021/01/01/file.txt then the trunk is s3://bucket/folder1/folder2
TrunkStates map[string]*TimeRangeCollectionState `json:"trunk_states,omitempty"`
// contains filtered or unexported fields
}
ArtifactCollectionState is a collection state implementation for artifact sources. It tracks the collection state for each trunk (a path segment that does not contain any time metadata). NOTE: in use, this will be wrapped in a SaveableCollectionState to allow saving to disk. The wrapper also implements locking for the OnCollected and ShouldCollect methods, so we do not need to do that here.
func (*ArtifactCollectionState) Clear ¶ added in v0.9.0
func (s *ArtifactCollectionState) Clear(timeRange CollectionTimeRange)
func (*ArtifactCollectionState) Compare ¶ added in v0.9.0
func (s *ArtifactCollectionState) Compare(want *ArtifactCollectionState) (bool, string)
Compare compares the current state with another ArtifactCollectionState and returns whether they are equal and a message describing any differences
func (*ArtifactCollectionState) GetFromTime ¶ added in v0.9.0
func (s *ArtifactCollectionState) GetFromTime() time.Time
func (*ArtifactCollectionState) GetToTime ¶ added in v0.9.0
func (s *ArtifactCollectionState) GetToTime() time.Time
GetToTime returns the time up to which we know we have collected ALL data (we may have collected some data after this, within the granularity period). It returns the earliest end time of all the trunk states.
func (*ArtifactCollectionState) Init ¶ added in v0.9.0
func (s *ArtifactCollectionState) Init(collectionTimeRange CollectionTimeRange, granularity time.Duration)
Init sets the filepath of the collection state and loads the state from the file if it exists
func (*ArtifactCollectionState) IsEmpty ¶ added in v0.9.0
func (s *ArtifactCollectionState) IsEmpty() bool
IsEmpty returns whether the collection state is empty
func (*ArtifactCollectionState) MigrateFromLegacyState ¶ added in v0.9.0
func (s *ArtifactCollectionState) MigrateFromLegacyState(bytes []byte) error
MigrateFromLegacyState attempts to migrate from a legacy collection state
func (*ArtifactCollectionState) OnCollected ¶ added in v0.9.0
func (s *ArtifactCollectionState) OnCollected(id string, timestamp time.Time) error
OnCollected is called when an object has been collected - update our end time and end objects if needed
func (*ArtifactCollectionState) OnCollectionComplete ¶ added in v0.9.0
func (s *ArtifactCollectionState) OnCollectionComplete() error
OnCollectionComplete sets the end time for the collection state, updating all trunk states. This is called after a successful collection to set the collection state end time to the 'to' time of the collection.
func (*ArtifactCollectionState) RegisterPath ¶
func (s *ArtifactCollectionState) RegisterPath(path string, metadata map[string]string)
RegisterPath registers a path with the collection state - we determine whether this is a potential trunk (i.e. a path segment with no time metadata for which we need to track collection state separately) and if so, add it to the map of trunk states
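As a rough illustration of what a trunk is, the sketch below derives the trunk for the example path given in the ArtifactCollectionState field comment. It is only a sketch under stated assumptions: the helper trunkOf, the metadata key names (year, month, day, hour), and the rule "cut at the first path segment equal to a time metadata value" are all hypothetical, and this documentation does not show how RegisterPath actually decides.

```go
package main

import (
	"fmt"
	"strings"
)

// timeKeys lists the metadata keys this sketch treats as time metadata.
// These key names are assumptions made for illustration only.
var timeKeys = []string{"year", "month", "day", "hour"}

// trunkOf is a hypothetical helper: it returns the part of path that precedes
// the first segment matching a time metadata value. The boolean is false when
// no segment matches, in which case the path is returned unchanged.
func trunkOf(path string, metadata map[string]string) (string, bool) {
	timeValues := make(map[string]struct{})
	for _, k := range timeKeys {
		if v, ok := metadata[k]; ok && v != "" {
			timeValues[v] = struct{}{}
		}
	}
	segments := strings.Split(path, "/")
	for i, seg := range segments {
		if _, isTime := timeValues[seg]; isTime {
			return strings.Join(segments[:i], "/"), true
		}
	}
	return path, false
}

func main() {
	metadata := map[string]string{"year": "2021", "month": "01", "day": "01"}
	trunk, ok := trunkOf("s3://bucket/folder1/folder2/2021/01/01/file.txt", metadata)
	fmt.Println(trunk, ok)
}
```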
func (*ArtifactCollectionState) ShouldCollect ¶ added in v0.9.0
func (s *ArtifactCollectionState) ShouldCollect(id string, timestamp time.Time) bool
ShouldCollect returns whether the object should be collected, based on the time metadata in the object
func (*ArtifactCollectionState) String ¶ added in v0.9.0
func (s *ArtifactCollectionState) String() any
func (*ArtifactCollectionState) Validate ¶ added in v0.9.0
func (s *ArtifactCollectionState) Validate() error
type ArtifactCollectionStateLegacy ¶ added in v0.9.0
type ArtifactCollectionStateLegacy struct {
TrunkStates map[string]*TimeRangeCollectionStateLegacy `json:"trunk_states,omitempty"`
LastModifiedTime time.Time `json:"last_modified_time,omitempty"`
}
type CollectionOrder ¶
type CollectionOrder int
const (
CollectionOrderChronological CollectionOrder = iota
CollectionOrderReverse
)
type CollectionState ¶
type CollectionState interface {
IsEmpty() bool
Init(CollectionTimeRange, time.Duration)
ShouldCollect(id string, timestamp time.Time) bool
OnCollected(id string, timestamp time.Time) error
GetFromTime() time.Time
// GetToTime returns the time 1 granularity period AFTER the last time for which we are sure we have collected ALL data,
// e.g. if the end time is 2023-10-10T00:00:00Z and granularity is 1 hour,
// then we have collected all data up to and including 2023-10-09T23:00:00Z
GetToTime() time.Time
OnCollectionComplete() error
MigrateFromLegacyState(bytes []byte) error
Validate() error
Clear(CollectionTimeRange)
}
func NewArtifactCollectionState ¶ added in v0.9.0
func NewArtifactCollectionState() CollectionState
func NewReverseOrderTimeRangeSliceCollectionState ¶
func NewReverseOrderTimeRangeSliceCollectionState() CollectionState
NewReverseOrderTimeRangeSliceCollectionState creates a new TimeRangeCollectionState with reverse order
func NewTimeRangeCollectionState ¶
func NewTimeRangeCollectionState() CollectionState
type CollectionStateWithPaths ¶ added in v0.9.0
type CollectionStateWithPaths interface {
CollectionState
RegisterPath(path string, metadata map[string]string)
}
type CollectionTimeRange ¶
type CollectionTimeRange struct {
From time.Time `json:"from"`
To time.Time `json:"to"`
CollectionOrder CollectionOrder `json:"collection_order"`
}
func (*CollectionTimeRange) IsRangeSubsumed ¶
func (t *CollectionTimeRange) IsRangeSubsumed(other CollectionTimeRange) bool
IsRangeSubsumed checks if this time range is completely contained within another time range
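A containment check of this kind can be sketched as follows. The local timeRange type and the isSubsumedBy method are stand-ins written for illustration, and treating the boundaries as inclusive is an assumption; the documentation does not say how equal boundaries are handled.

```go
package main

import (
	"fmt"
	"time"
)

// timeRange is a local stand-in for CollectionTimeRange.
type timeRange struct {
	From, To time.Time
}

// isSubsumedBy reports whether t lies completely within other.
// Boundaries are treated as inclusive (an assumption of this sketch).
func (t timeRange) isSubsumedBy(other timeRange) bool {
	return !t.From.Before(other.From) && !t.To.After(other.To)
}

// day is a small helper that builds a UTC date in January 2024.
func day(n int) time.Time {
	return time.Date(2024, 1, n, 0, 0, 0, 0, time.UTC)
}

func main() {
	outer := timeRange{From: day(1), To: day(10)}
	inner := timeRange{From: day(3), To: day(5)}
	fmt.Println(inner.isSubsumedBy(outer), outer.isSubsumedBy(inner))
}
```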
func (*CollectionTimeRange) OverlapsEnd ¶
func (t *CollectionTimeRange) OverlapsEnd(other CollectionTimeRange) bool
OverlapsEnd returns whether our START overlaps the END of the other time range. NOTE:
- returns true if our START is within the other range (but not if either range subsumes the other)
- returns false if either range completely contains the other
func (*CollectionTimeRange) OverlapsStart ¶
func (t *CollectionTimeRange) OverlapsStart(other CollectionTimeRange) bool
OverlapsStart returns whether our END overlaps the START of the other time range. NOTE: returns true if our END is after the other's start and before the other's end, but not if either range subsumes the other.
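The rules described for OverlapsEnd and OverlapsStart can be expressed as a pair of comparisons each. This is a sketch with a local timeRange type; the method names are lowercase stand-ins, and the use of strict comparisons at the boundaries is an assumption, since the documentation does not specify what happens when endpoints are equal.

```go
package main

import (
	"fmt"
	"time"
)

// timeRange is a local stand-in for CollectionTimeRange.
type timeRange struct {
	From, To time.Time
}

// overlapsEnd reports whether t starts inside other and extends past its end.
// Requiring t.To to be after other.To rules out the subsumed cases.
func (t timeRange) overlapsEnd(other timeRange) bool {
	startInside := t.From.After(other.From) && t.From.Before(other.To)
	return startInside && t.To.After(other.To)
}

// overlapsStart reports whether t ends inside other and begins before its start.
func (t timeRange) overlapsStart(other timeRange) bool {
	endInside := t.To.After(other.From) && t.To.Before(other.To)
	return endInside && t.From.Before(other.From)
}

// day is a small helper that builds a UTC date in January 2024.
func day(n int) time.Time {
	return time.Date(2024, 1, n, 0, 0, 0, 0, time.UTC)
}

func main() {
	other := timeRange{From: day(5), To: day(10)}
	late := timeRange{From: day(8), To: day(15)}  // starts inside other, ends after it
	early := timeRange{From: day(1), To: day(7)}  // starts before other, ends inside it
	inside := timeRange{From: day(6), To: day(7)} // completely contained in other

	fmt.Println(late.overlapsEnd(other), late.overlapsStart(other))
	fmt.Println(early.overlapsEnd(other), early.overlapsStart(other))
	fmt.Println(inside.overlapsEnd(other), inside.overlapsStart(other))
}
```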
func (*CollectionTimeRange) Validate ¶
func (t *CollectionTimeRange) Validate() error
type ReverseOrderCollectionStateLegacy ¶ added in v0.9.0
type ReverseOrderCollectionStateLegacy struct {
// collection of time ranges ordered by time
TimeRanges []*TimeRangeCollectionStateLegacy `json:"time_ranges"`
}
type SaveableCollectionState ¶ added in v0.9.0
type SaveableCollectionState struct {
StructVersion int `json:"struct_version"`
State CollectionState `json:"state"`
// contains filtered or unexported fields
}
SaveableCollectionState is a decorator for CollectionState that allows it to be saved to disk. It wraps a CollectionState and provides methods to save the state to a JSON file. It also provides a mutex to ensure thread safety when accessing the collection state.
func NewSaveableCollectionState ¶ added in v0.9.0
func NewSaveableCollectionState(state CollectionState, path string) (*SaveableCollectionState, error)
func (*SaveableCollectionState) GetFromTime ¶ added in v0.9.0
func (s *SaveableCollectionState) GetFromTime() time.Time
func (*SaveableCollectionState) GetGranularity ¶ added in v0.9.0
func (s *SaveableCollectionState) GetGranularity() time.Duration
func (*SaveableCollectionState) GetToTime ¶ added in v0.9.0
func (s *SaveableCollectionState) GetToTime() time.Time
func (*SaveableCollectionState) Init ¶ added in v0.9.0
func (s *SaveableCollectionState) Init(collectionTimeRange CollectionTimeRange, recollect bool, granularity time.Duration) error
Init initializes the SaveableCollectionState with a CollectionTimeRange and a file path. If the file exists, it loads the state from the file.
func (*SaveableCollectionState) IsEmpty ¶ added in v0.9.0
func (s *SaveableCollectionState) IsEmpty() bool
func (*SaveableCollectionState) LoadFromFile ¶ added in v0.9.0
func (s *SaveableCollectionState) LoadFromFile(path string) error
func (*SaveableCollectionState) OnCollected ¶ added in v0.9.0
func (s *SaveableCollectionState) OnCollected(id string, timestamp time.Time) error
func (*SaveableCollectionState) OnCollectionComplete ¶ added in v0.9.0
func (s *SaveableCollectionState) OnCollectionComplete() error
func (*SaveableCollectionState) RegisterPath ¶ added in v0.9.0
func (s *SaveableCollectionState) RegisterPath(path string, metadata map[string]string)
func (*SaveableCollectionState) Save ¶ added in v0.9.0
func (s *SaveableCollectionState) Save() error
Save serializes the underlying collection state to JSON and writes it to the file specified by jsonPath. NOTE: This method locks the mutex to ensure thread safety.
func (*SaveableCollectionState) ShouldCollect ¶ added in v0.9.0
func (s *SaveableCollectionState) ShouldCollect(id string, timestamp time.Time) bool
type TimeRangeCollectionState ¶
type TimeRangeCollectionState struct {
TimeRanges []*TimeRangeObjectState `json:"time_ranges"`
Granularity time.Duration `json:"granularity"`
Order CollectionOrder `json:"order"`
// contains filtered or unexported fields
}
func NewTimeRangeCollectionStateFromLegacy ¶ added in v0.9.0
func NewTimeRangeCollectionStateFromLegacy(legacy *TimeRangeCollectionStateLegacy) *TimeRangeCollectionState
NewTimeRangeCollectionStateFromLegacy constructs a new TimeRangeCollectionState from a legacy state
func (*TimeRangeCollectionState) Clear ¶
func (t *TimeRangeCollectionState) Clear(clearRange CollectionTimeRange)
Clear updates the state to clear any entries for the given time range.
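One plausible way to clear a time range from a list of collected ranges is to drop ranges that fall entirely inside it, trim ranges that partly overlap it, and split ranges that contain it. The sketch below shows that interval arithmetic only. The timeRange type and clearRange function are hypothetical, and the real method presumably also has to deal with end objects and granularity, which this sketch ignores.

```go
package main

import (
	"fmt"
	"time"
)

// timeRange is a local stand-in for a collected time range.
type timeRange struct {
	From, To time.Time
}

// clearRange returns the parts of ranges that lie outside c.
func clearRange(ranges []timeRange, c timeRange) []timeRange {
	var out []timeRange
	for _, r := range ranges {
		// No overlap: keep the range unchanged.
		if !r.To.After(c.From) || !r.From.Before(c.To) {
			out = append(out, r)
			continue
		}
		// Keep whatever sticks out before the cleared range.
		if r.From.Before(c.From) {
			out = append(out, timeRange{From: r.From, To: c.From})
		}
		// Keep whatever sticks out after the cleared range.
		if r.To.After(c.To) {
			out = append(out, timeRange{From: c.To, To: r.To})
		}
	}
	return out
}

// day is a small helper that builds a UTC date in January 2024.
func day(n int) time.Time {
	return time.Date(2024, 1, n, 0, 0, 0, 0, time.UTC)
}

func main() {
	ranges := []timeRange{{From: day(1), To: day(10)}}
	for _, r := range clearRange(ranges, timeRange{From: day(3), To: day(5)}) {
		fmt.Println(r.From.Format("2006-01-02"), "to", r.To.Format("2006-01-02"))
	}
}
```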
func (*TimeRangeCollectionState) Compare ¶ added in v0.9.0
func (t *TimeRangeCollectionState) Compare(want *TimeRangeCollectionState) (bool, string)
Compare compares the current state with another TimeRangeCollectionState and returns whether they are equal and a message describing any differences
func (*TimeRangeCollectionState) GetFromTime ¶ added in v0.9.0
func (t *TimeRangeCollectionState) GetFromTime() time.Time
GetFromTime returns the earliest time we have data for
func (*TimeRangeCollectionState) GetGranularity ¶ added in v0.9.0
func (t *TimeRangeCollectionState) GetGranularity() time.Duration
func (*TimeRangeCollectionState) GetToTime ¶ added in v0.9.0
func (t *TimeRangeCollectionState) GetToTime() time.Time
GetToTime returns the time we have data UNTIL (exclusive), i.e. the end time is one granularity period AFTER the time we are confident we have all data for. When collection is complete, this will be set to the collection end time.
func (*TimeRangeCollectionState) Init ¶
func (t *TimeRangeCollectionState) Init(collectionTimeRange CollectionTimeRange, granularity time.Duration)
Init is called after loading a collection state. It compacts the time ranges in case the previous collection was not completed successfully. (Normally the state is compacted before the final save, but if the process was killed before that, we may have a collection state with multiple ranges that can be merged.)
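Compacting time ranges is, at its core, the classic interval-merge problem: sort by start time, then fold each range into the previous one when they overlap or touch. The sketch below shows that technique with a local timeRange type. The compact function is hypothetical, and the package's own compaction may apply additional rules (for example around granularity or end objects) that are not documented here.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// timeRange is a local stand-in for a collected time range.
type timeRange struct {
	From, To time.Time
}

// compact merges overlapping or touching ranges and returns them sorted by
// start time. The input slice is not modified.
func compact(ranges []timeRange) []timeRange {
	if len(ranges) == 0 {
		return nil
	}
	sorted := make([]timeRange, len(ranges))
	copy(sorted, ranges)
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].From.Before(sorted[j].From)
	})

	merged := []timeRange{sorted[0]}
	for _, r := range sorted[1:] {
		last := &merged[len(merged)-1]
		if r.From.After(last.To) {
			// Gap between the ranges: start a new merged range.
			merged = append(merged, r)
			continue
		}
		// Overlapping or touching: extend the current range if needed.
		if r.To.After(last.To) {
			last.To = r.To
		}
	}
	return merged
}

// day is a small helper that builds a UTC date in January 2024.
func day(n int) time.Time {
	return time.Date(2024, 1, n, 0, 0, 0, 0, time.UTC)
}

func main() {
	ranges := []timeRange{
		{From: day(7), To: day(8)},
		{From: day(1), To: day(3)},
		{From: day(2), To: day(5)},
	}
	for _, r := range compact(ranges) {
		fmt.Println(r.From.Format("2006-01-02"), "to", r.To.Format("2006-01-02"))
	}
}
```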
func (*TimeRangeCollectionState) IsEmpty ¶ added in v0.9.0
func (t *TimeRangeCollectionState) IsEmpty() bool
func (*TimeRangeCollectionState) MigrateFromLegacyState ¶ added in v0.9.0
func (t *TimeRangeCollectionState) MigrateFromLegacyState(bytes []byte) error
MigrateFromLegacyState attempts to migrate from a legacy collection state
func (*TimeRangeCollectionState) OnCollected ¶
func (t *TimeRangeCollectionState) OnCollected(id string, timestamp time.Time) error
func (*TimeRangeCollectionState) OnCollectionComplete ¶ added in v0.9.0
func (t *TimeRangeCollectionState) OnCollectionComplete() error
OnCollectionComplete sets the end time of the collection. This is called after a successful collection: we set the end time because we know that we have collected all data up to the collection 'to' time. It sets the upper boundary (end time) of the active range to the upper boundary time of the collection time range.
func (*TimeRangeCollectionState) ShouldCollect ¶
func (t *TimeRangeCollectionState) ShouldCollect(id string, timestamp time.Time) bool
func (*TimeRangeCollectionState) String ¶ added in v0.9.0
func (t *TimeRangeCollectionState) String() string
func (*TimeRangeCollectionState) Validate ¶ added in v0.9.0
func (t *TimeRangeCollectionState) Validate() error
type TimeRangeCollectionStateLegacy ¶ added in v0.9.0
type TimeRangeCollectionStateLegacy struct {
FirstEntryTime time.Time `json:"first_entry_time,omitempty"`
LastEntryTime time.Time `json:"last_entry_time,omitempty"`
EndTime time.Time `json:"end_time,omitempty"`
EndObjects map[string]struct{} `json:"end_objects"`
Granularity time.Duration `json:"granularity,omitempty"`
CollectionOrder CollectionOrder `json:"collection_order,omitempty"`
}
type TimeRangeObjectState ¶ added in v0.9.0
type TimeRangeObjectState struct {
// the time range for this collection state
TimeRange CollectionTimeRange `json:"time_range"`
// for upper boundary (i.e. the end granularity) we store the metadata
// whenever the upper boundary time changes, we must clear the map
// NOTE: for forwards collection, the end objects are at the To time
// for backwards collection, the end objects are at the From time
EndObjects map[string]struct{} `json:"end_objects"`
// the granularity of the file naming scheme - so we must keep track of object metadata
// this will depend on the template used to name the files
Granularity time.Duration `json:"granularity"`
}
TimeRangeObjectState is a struct that tracks time ranges and objects that have been collected. It is used by TimeRangeCollectionState. NOTE: we do not implement mutex locking here - it is assumed that the caller will lock the state before calling. NOTE: this struct DOES NOT implement the CollectionState interface directly.
func (*TimeRangeObjectState) Clone ¶ added in v0.9.0
func (s *TimeRangeObjectState) Clone() *TimeRangeObjectState
func (*TimeRangeObjectState) Compare ¶ added in v0.9.0
func (s *TimeRangeObjectState) Compare(other *TimeRangeObjectState) (bool, string)
Compare compares the current state with another TimeRangeObjectState and returns whether they are equal and a message describing any differences
func (*TimeRangeObjectState) GetFromTime ¶ added in v0.9.0
func (s *TimeRangeObjectState) GetFromTime() time.Time
func (*TimeRangeObjectState) GetGranularity ¶ added in v0.9.0
func (s *TimeRangeObjectState) GetGranularity() time.Duration
GetGranularity returns the granularity of the collection state
func (*TimeRangeObjectState) GetToTime ¶ added in v0.9.0
func (s *TimeRangeObjectState) GetToTime() time.Time
GetToTime returns the time up to which we know we have collected ALL data (we may have collected some data after this, within the granularity period).
func (*TimeRangeObjectState) IsEmpty ¶ added in v0.9.0
func (s *TimeRangeObjectState) IsEmpty() bool
func (*TimeRangeObjectState) OnCollected ¶ added in v0.9.0
func (s *TimeRangeObjectState) OnCollected(id string, timestamp time.Time) error
OnCollected is called when an object has been collected - update the end time and end objects if needed. Note: the object name is the full path to the object.
func (*TimeRangeObjectState) ShouldCollect ¶ added in v0.9.0
func (s *TimeRangeObjectState) ShouldCollect(id string, timestamp time.Time) bool
ShouldCollect returns whether the object should be collected
func (*TimeRangeObjectState) String ¶ added in v0.9.0
func (s *TimeRangeObjectState) String() string
func (*TimeRangeObjectState) Validate ¶ added in v0.9.0
func (s *TimeRangeObjectState) Validate() error