package collection_state

Version: v0.9.4

Warning: This package is not in the latest version of its module.

Published: Sep 26, 2025 License: Apache-2.0 Imports: 9 Imported by: 8

Documentation

Constants

const CollectionStateStructVersion = 20250618
const MinArtifactGranularity = time.Hour * 24

Variables

This section is empty.

Functions

This section is empty.

Types

type ArtifactCollectionState

type ArtifactCollectionState struct {
	// map of trunk paths to collection state for that trunk
	// a trunk is a path segment that does not contain any time metadata
	// for example if the path is s3://bucket/folder1/folder2/2021/01/01/file.txt then the trunk is s3://bucket/folder1/folder2
	TrunkStates map[string]*TimeRangeCollectionState `json:"trunk_states,omitempty"`
	// contains filtered or unexported fields
}

ArtifactCollectionState is a collection state implementation for artifact sources. It tracks the collection state for each trunk (a path segment that does not contain any time metadata). NOTE: in use, this will be wrapped in a SaveableCollectionState to allow saving to disk. The wrapper also implements locking for the OnCollected and ShouldCollect methods, so we do not need to do that here.
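To make the trunk idea concrete, here is a minimal sketch that derives the trunk from the example path in the struct comment by cutting the path at the first segment that looks like a four-digit year. The helpers `trunkOf` and `isYearSegment` and the year heuristic are hypothetical; per the RegisterPath documentation, the package itself decides trunks from the path metadata it is given, not from this heuristic.

```go
package main

import (
	"fmt"
	"strings"
)

// isYearSegment reports whether a path segment looks like a four-digit year.
// ASSUMPTION: a stand-in for the time metadata the real package receives.
func isYearSegment(seg string) bool {
	if len(seg) != 4 {
		return false
	}
	for _, r := range seg {
		if r < '0' || r > '9' {
			return false
		}
	}
	return true
}

// trunkOf (hypothetical) returns the part of the path before the first
// time-bearing segment, i.e. the trunk.
func trunkOf(path string) string {
	segs := strings.Split(path, "/")
	for i, seg := range segs {
		if isYearSegment(seg) {
			return strings.Join(segs[:i], "/")
		}
	}
	// No time segment found: treat the whole path as the trunk.
	return path
}

func main() {
	fmt.Println(trunkOf("s3://bucket/folder1/folder2/2021/01/01/file.txt"))
	// prints: s3://bucket/folder1/folder2
}
```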

func (*ArtifactCollectionState) Clear added in v0.9.0

func (s *ArtifactCollectionState) Clear(timeRange DirectionalTimeRange)

func (*ArtifactCollectionState) Compare added in v0.9.0

Compare compares the current state with another ArtifactCollectionState and returns whether they are equal and a message describing any differences

func (*ArtifactCollectionState) GetFromTime added in v0.9.0

func (s *ArtifactCollectionState) GetFromTime() time.Time

func (*ArtifactCollectionState) GetToTime added in v0.9.0

func (s *ArtifactCollectionState) GetToTime() time.Time

GetToTime returns the time up until which we know we have collected ALL data (we may have collected some data after this, within the granularity period). It returns the earliest end time of all the trunk states.

func (*ArtifactCollectionState) Init added in v0.9.0

func (s *ArtifactCollectionState) Init(collectionTimeRange DirectionalTimeRange, granularity time.Duration)

Init sets the filepath of the collection state and loads the state from the file if it exists

func (*ArtifactCollectionState) IsEmpty added in v0.9.0

func (s *ArtifactCollectionState) IsEmpty() bool

IsEmpty returns whether the collection state is empty

func (*ArtifactCollectionState) MigrateFromLegacyState added in v0.9.0

func (s *ArtifactCollectionState) MigrateFromLegacyState(bytes []byte) error

MigrateFromLegacyState attempts to migrate from a legacy collection state

func (*ArtifactCollectionState) OnCollected added in v0.9.0

func (s *ArtifactCollectionState) OnCollected(id string, timestamp time.Time) error

OnCollected is called when an object has been collected; it updates our end time and end objects if needed.

func (*ArtifactCollectionState) OnCollectionComplete added in v0.9.0

func (s *ArtifactCollectionState) OnCollectionComplete() error

OnCollectionComplete sets the end time for the collection state by updating all trunk states. It is called after a successful collection to set the collection state end time to the 'to' time of the collection.

func (*ArtifactCollectionState) RegisterPath

func (s *ArtifactCollectionState) RegisterPath(path string, metadata map[string]string)

RegisterPath registers a path with the collection state. It determines whether this is a potential trunk (i.e. a path segment with no time metadata, for which we need to track collection state separately) and, if so, adds it to the map of trunk states.

func (*ArtifactCollectionState) ShouldCollect added in v0.9.0

func (s *ArtifactCollectionState) ShouldCollect(id string, timestamp time.Time) bool

ShouldCollect returns whether the object should be collected, based on the time metadata in the object

func (*ArtifactCollectionState) String added in v0.9.0

func (s *ArtifactCollectionState) String() any

func (*ArtifactCollectionState) TrimNilTrunkStates added in v0.9.2

func (s *ArtifactCollectionState) TrimNilTrunkStates()

TrimNilTrunkStates removes all entries from TrunkStates where the value is nil.
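The usual Go idiom for this kind of cleanup is to delete entries while ranging over the map, which the Go specification explicitly permits. A minimal sketch; `trunkState` is a hypothetical stand-in for `*TimeRangeCollectionState`, and `trimNil` is illustrative rather than the package's implementation.

```go
package main

import "fmt"

// trunkState is a hypothetical stand-in for *TimeRangeCollectionState.
type trunkState struct{ name string }

// trimNil deletes every map entry whose value is nil. Deleting entries
// during a range loop is permitted by the Go specification.
func trimNil(states map[string]*trunkState) {
	for k, v := range states {
		if v == nil {
			delete(states, k)
		}
	}
}

func main() {
	states := map[string]*trunkState{
		"s3://bucket/a": {name: "a"},
		"s3://bucket/b": nil,
	}
	trimNil(states)
	fmt.Println(len(states)) // 1
}
```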

func (*ArtifactCollectionState) Validate added in v0.9.0

func (s *ArtifactCollectionState) Validate() error

type ArtifactCollectionStateLegacy added in v0.9.0

type ArtifactCollectionStateLegacy struct {
	TrunkStates      map[string]*TimeRangeCollectionStateLegacy `json:"trunk_states,omitempty"`
	LastModifiedTime time.Time                                  `json:"last_modified_time,omitempty"`
}

type CollectionOrder

type CollectionOrder int
const (
	CollectionOrderChronological CollectionOrder = iota
	CollectionOrderReverse
)

type CollectionState

type CollectionState interface {
	IsEmpty() bool
	Init(DirectionalTimeRange, time.Duration)
	ShouldCollect(id string, timestamp time.Time) bool
	OnCollected(id string, timestamp time.Time) error
	GetFromTime() time.Time
	// GetToTime returns the time 1 granularity period AFTER the last time we are sure we have collected ALL data for,
	// e.g. if the end time is 2023-10-10T00:00:00Z and the granularity is 1 hour,
	// then we have collected all data up to and including 2023-10-09T23:00:00Z
	GetToTime() time.Time
	OnCollectionComplete() error
	MigrateFromLegacyState(bytes []byte) error
	Validate() error
	Clear(DirectionalTimeRange)
}

func NewArtifactCollectionState added in v0.9.0

func NewArtifactCollectionState() CollectionState

func NewTimeRangeCollectionState

func NewTimeRangeCollectionState() CollectionState

type CollectionStateWithPaths added in v0.9.0

type CollectionStateWithPaths interface {
	CollectionState
	RegisterPath(path string, metadata map[string]string)
}

type DirectionalTimeRange added in v0.9.0

type DirectionalTimeRange struct {
	// LowerBoundary is the chronological start time of the range
	// For forward collection, this is the start time (inclusive)
	// For reverse collection, this is the end time (exclusive)
	LowerBoundary time.Time `json:"lower_boundary"`
	// UpperBoundary is the chronological end time of the range
	// For forward collection, this is the end time (exclusive)
	// For reverse collection, this is the start time (inclusive)
	UpperBoundary   time.Time       `json:"upper_boundary"`
	CollectionOrder CollectionOrder `json:"collection_order"`
}

func (*DirectionalTimeRange) AfterEnd added in v0.9.0

func (t *DirectionalTimeRange) AfterEnd(timestamp time.Time) bool

AfterEnd returns whether the timestamp is past the end of the range in the direction of collection. For chronological collection, this returns whether the time is AFTER the upperBoundary time; for reverse collection, it returns whether the time is BEFORE the lowerBoundary time.

func (*DirectionalTimeRange) AfterStart added in v0.9.0

func (t *DirectionalTimeRange) AfterStart(timestamp time.Time) bool

AfterStart returns whether the timestamp is after the start in the direction of collection. For chronological collection, this returns whether the time is AFTER the lowerBoundary time; for reverse collection, it returns whether the time is BEFORE the upperBoundary time.

func (*DirectionalTimeRange) BeforeEnd added in v0.9.0

func (t *DirectionalTimeRange) BeforeEnd(timestamp time.Time) bool

BeforeEnd returns whether the timestamp is before the end of the range (i.e. in the direction opposite to collection). For chronological collection, this returns whether the time is BEFORE the upperBoundary time; for reverse collection, it returns whether the time is AFTER the lowerBoundary time.

func (*DirectionalTimeRange) BeforeStart added in v0.9.0

func (t *DirectionalTimeRange) BeforeStart(timestamp time.Time) bool

BeforeStart returns whether the timestamp is before the start of the range (i.e. in the direction opposite to collection). For chronological collection, this returns whether the time is BEFORE the lowerBoundary time; for reverse collection, it returns whether the time is AFTER the upperBoundary time.

func (*DirectionalTimeRange) EndTime added in v0.9.0

func (t *DirectionalTimeRange) EndTime() time.Time

EndTime returns the furthest time in the direction of collection: if we are collecting forwards, the EndTime is the upperBoundary time; if we are collecting backwards, the EndTime is the lowerBoundary time.

func (*DirectionalTimeRange) IsSubsumedBy added in v0.9.0

func (t *DirectionalTimeRange) IsSubsumedBy(other DirectionalTimeRange) bool

IsSubsumedBy checks if this time range is completely contained within another time range

func (*DirectionalTimeRange) OnOrAfterEnd added in v0.9.0

func (t *DirectionalTimeRange) OnOrAfterEnd(timestamp time.Time) bool

OnOrAfterEnd returns whether the timestamp is on or after the end time (in the direction of collection). For chronological collection, this returns whether the time is ON or AFTER the upperBoundary time; for reverse collection, it returns whether the time is ON or BEFORE the lowerBoundary time.

func (*DirectionalTimeRange) OnOrAfterStart added in v0.9.0

func (t *DirectionalTimeRange) OnOrAfterStart(timestamp time.Time) bool

OnOrAfterStart returns whether the timestamp is on or after the start time (in the direction of collection). For chronological collection, this returns whether the time is ON or AFTER the lowerBoundary time; for reverse collection, it returns whether the time is ON or BEFORE the upperBoundary time.

func (*DirectionalTimeRange) OnOrBeforeEnd added in v0.9.0

func (t *DirectionalTimeRange) OnOrBeforeEnd(timestamp time.Time) bool

OnOrBeforeEnd returns whether the timestamp is on or before the end time (i.e. in the direction opposite to collection). For chronological collection, this returns whether the time is ON or BEFORE the upperBoundary time; for reverse collection, it returns whether the time is ON or AFTER the lowerBoundary time.

func (*DirectionalTimeRange) OverlapsEnd added in v0.9.0

func (t *DirectionalTimeRange) OverlapsEnd(other DirectionalTimeRange) bool

OverlapsEnd returns whether our START overlaps the END of the other time range. NOTE:
- it returns true if our START is within the other range
- it returns false if either range completely contains (subsumes) the other

func (*DirectionalTimeRange) OverlapsStart added in v0.9.0

func (t *DirectionalTimeRange) OverlapsStart(other DirectionalTimeRange) bool

OverlapsStart returns whether our END overlaps the START of the other time range. NOTE: it returns true if our END is after the other's start and before the other's end, but not if either range subsumes the other.

func (*DirectionalTimeRange) StartTime added in v0.9.0

func (t *DirectionalTimeRange) StartTime() time.Time

StartTime returns the furthest time in the direction opposite to collection: if we are collecting forwards, the StartTime is the lowerBoundary time; if we are collecting backwards, the StartTime is the upperBoundary time.

func (*DirectionalTimeRange) Validate added in v0.9.0

func (t *DirectionalTimeRange) Validate() error

type ReverseOrderCollectionStateLegacy added in v0.9.0

type ReverseOrderCollectionStateLegacy struct {
	// collection of time ranges ordered by time
	TimeRanges []*TimeRangeCollectionStateLegacy `json:"time_ranges"`
}

type SaveableCollectionState added in v0.9.0

type SaveableCollectionState struct {
	StructVersion int `json:"struct_version"`

	State CollectionState `json:"state"`
	// contains filtered or unexported fields
}

SaveableCollectionState is a decorator for CollectionState that allows it to be saved to disk. It wraps a CollectionState and provides methods to save the state to a JSON file. It also provides a mutex to ensure thread safety when accessing the collection state.

func NewSaveableCollectionState added in v0.9.0

func NewSaveableCollectionState(state CollectionState, path string) (*SaveableCollectionState, error)

func (*SaveableCollectionState) GetFromTime added in v0.9.0

func (s *SaveableCollectionState) GetFromTime() time.Time

func (*SaveableCollectionState) GetGranularity added in v0.9.0

func (s *SaveableCollectionState) GetGranularity() time.Duration

func (*SaveableCollectionState) GetToTime added in v0.9.0

func (s *SaveableCollectionState) GetToTime() time.Time

func (*SaveableCollectionState) Init added in v0.9.0

func (s *SaveableCollectionState) Init(collectionTimeRange DirectionalTimeRange, recollect bool, granularity time.Duration) error

Init initializes the SaveableCollectionState with a DirectionalTimeRange and a file path. If the file exists, it loads the state from the file.

func (*SaveableCollectionState) IsEmpty added in v0.9.0

func (s *SaveableCollectionState) IsEmpty() bool

func (*SaveableCollectionState) LoadFromFile added in v0.9.0

func (s *SaveableCollectionState) LoadFromFile(path string) error

func (*SaveableCollectionState) OnCollected added in v0.9.0

func (s *SaveableCollectionState) OnCollected(id string, timestamp time.Time) error

func (*SaveableCollectionState) OnCollectionComplete added in v0.9.0

func (s *SaveableCollectionState) OnCollectionComplete() error

func (*SaveableCollectionState) RegisterPath added in v0.9.0

func (s *SaveableCollectionState) RegisterPath(path string, metadata map[string]string)

func (*SaveableCollectionState) Save added in v0.9.0

func (s *SaveableCollectionState) Save() error

Save serializes the underlying collection state to JSON and writes it to the file specified by jsonPath. NOTE: This method locks the mutex to ensure thread safety.

func (*SaveableCollectionState) ShouldCollect added in v0.9.0

func (s *SaveableCollectionState) ShouldCollect(id string, timestamp time.Time) bool

type TimeRangeCollectionState

type TimeRangeCollectionState struct {
	TimeRanges  []*TimeRangeObjectState `json:"time_ranges"`
	Granularity time.Duration           `json:"granularity"`
	Order       CollectionOrder         `json:"order"`
	// contains filtered or unexported fields
}

func NewTimeRangeCollectionStateFromLegacy added in v0.9.0

func NewTimeRangeCollectionStateFromLegacy(legacy *TimeRangeCollectionStateLegacy) *TimeRangeCollectionState

NewTimeRangeCollectionStateFromLegacy constructs a new TimeRangeCollectionState from a legacy state

func (*TimeRangeCollectionState) Clear

func (t *TimeRangeCollectionState) Clear(clearRange DirectionalTimeRange)

Clear updates the state to clear any entries for the given time range.

func (*TimeRangeCollectionState) Compare added in v0.9.0

Compare compares the current state with another TimeRangeCollectionState and returns whether they are equal and a message describing any differences

func (*TimeRangeCollectionState) GetFromTime added in v0.9.0

func (t *TimeRangeCollectionState) GetFromTime() time.Time

GetFromTime returns the earliest time we have data for

func (*TimeRangeCollectionState) GetGranularity added in v0.9.0

func (t *TimeRangeCollectionState) GetGranularity() time.Duration

func (*TimeRangeCollectionState) GetToTime added in v0.9.0

func (t *TimeRangeCollectionState) GetToTime() time.Time

GetToTime returns the time we have data UNTIL (exclusive), i.e. the end time is one granularity period AFTER the time up to which we are confident we have all data. When collection is complete, this will be set to the collection end time.

func (*TimeRangeCollectionState) Init

func (t *TimeRangeCollectionState) Init(collectionTimeRange DirectionalTimeRange, granularity time.Duration)

Init is called after loading a collection state. It compacts the time ranges in case the previous collection was not completed successfully (normally the state is compacted before the final save, but if the process was killed before that, the collection state may contain multiple ranges that can be merged).

func (*TimeRangeCollectionState) IsEmpty added in v0.9.0

func (t *TimeRangeCollectionState) IsEmpty() bool

func (*TimeRangeCollectionState) MigrateFromLegacyState added in v0.9.0

func (t *TimeRangeCollectionState) MigrateFromLegacyState(bytes []byte) error

MigrateFromLegacyState attempts to migrate from a legacy collection state

func (*TimeRangeCollectionState) OnCollected

func (t *TimeRangeCollectionState) OnCollected(id string, timestamp time.Time) error

func (*TimeRangeCollectionState) OnCollectionComplete added in v0.9.0

func (t *TimeRangeCollectionState) OnCollectionComplete() error

OnCollectionComplete sets the end time of the collection. It is called after a successful collection: because we know that we have collected all data up to the collection 'to' time, it sets the upper boundary (end time) of the active range to the upper boundary time of the collection time range.

func (*TimeRangeCollectionState) ShouldCollect

func (t *TimeRangeCollectionState) ShouldCollect(id string, timestamp time.Time) bool

func (*TimeRangeCollectionState) String added in v0.9.0

func (t *TimeRangeCollectionState) String() string

func (*TimeRangeCollectionState) Validate added in v0.9.0

func (t *TimeRangeCollectionState) Validate() error

type TimeRangeCollectionStateLegacy added in v0.9.0

type TimeRangeCollectionStateLegacy struct {
	FirstEntryTime  time.Time           `json:"first_entry_time,omitempty"`
	LastEntryTime   time.Time           `json:"last_entry_time,omitempty"`
	EndTime         time.Time           `json:"end_time,omitempty"`
	EndObjects      map[string]struct{} `json:"end_objects"`
	Granularity     time.Duration       `json:"granularity,omitempty"`
	CollectionOrder CollectionOrder     `json:"collection_order,omitempty"`
}

type TimeRangeObjectState added in v0.9.0

type TimeRangeObjectState struct {
	// the time range for this collection state
	TimeRange DirectionalTimeRange `json:"time_range"`

	// for the end time (i.e. the end granularity period) we store the metadata
	// whenever the upper boundary time changes, we must clear the map
	// NOTE: for forwards collection, the end objects are at the UpperBoundary time
	// for backwards collection, the end objects are at the LowerBoundary time
	EndObjects map[string]struct{} `json:"end_objects"`

	// the granularity of the file naming scheme - so we must keep track of object metadata
	// this will depend on the template used to name the files
	Granularity time.Duration `json:"granularity"`
}

TimeRangeObjectState is a struct that tracks time ranges and the objects that have been collected; it is used by TimeRangeCollectionState. NOTE: we do not implement mutex locking here; it is assumed that the caller will lock the state before calling. NOTE: this struct DOES NOT implement the CollectionState interface directly.

func (*TimeRangeObjectState) Clone added in v0.9.0

func (*TimeRangeObjectState) Compare added in v0.9.0

func (s *TimeRangeObjectState) Compare(other *TimeRangeObjectState) (bool, string)

Compare compares the current state with another TimeRangeObjectState and returns whether they are equal and a message describing any differences

func (*TimeRangeObjectState) GetFromTime added in v0.9.0

func (s *TimeRangeObjectState) GetFromTime() time.Time

func (*TimeRangeObjectState) GetGranularity added in v0.9.0

func (s *TimeRangeObjectState) GetGranularity() time.Duration

GetGranularity returns the granularity of the collection state

func (*TimeRangeObjectState) GetToTime added in v0.9.0

func (s *TimeRangeObjectState) GetToTime() time.Time

GetToTime returns the time up until which we know we have collected ALL data (we may have collected some data after this, within the granularity period).

func (*TimeRangeObjectState) IsEmpty added in v0.9.0

func (s *TimeRangeObjectState) IsEmpty() bool

func (*TimeRangeObjectState) OnCollected added in v0.9.0

func (s *TimeRangeObjectState) OnCollected(id string, timestamp time.Time) error

OnCollected is called when an object has been collected; it updates the end time and end objects if needed. Note: the object name is the full path to the object.

func (*TimeRangeObjectState) ShouldCollect added in v0.9.0

func (s *TimeRangeObjectState) ShouldCollect(id string, timestamp time.Time) bool

ShouldCollect returns whether the object should be collected

func (*TimeRangeObjectState) String added in v0.9.0

func (s *TimeRangeObjectState) String() string

func (*TimeRangeObjectState) Validate added in v0.9.0

func (s *TimeRangeObjectState) Validate() error
