Overview
Package cloudwatch provides functionality to collect logs from AWS CloudWatch log groups.
This package enables the collection of log events from AWS CloudWatch log groups, supporting incremental collection, filtering, and batching for efficient processing.
Index
- Constants
- func NewCloudWatchLogGroupCollectionState() collection_state.CollectionState
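- type AwsCloudWatchLogGroupSource
- func (s *AwsCloudWatchLogGroupSource) Collect(ctx context.Context) error
- func (s *AwsCloudWatchLogGroupSource) Identifier() string
- func (s *AwsCloudWatchLogGroupSource) Init(ctx context.Context, params *row_source.RowSourceParams, opts ...row_source.RowSourceOption) error
- type AwsCloudWatchLogGroupSourceConfig
- func (c *AwsCloudWatchLogGroupSourceConfig) Identifier() string
- func (c *AwsCloudWatchLogGroupSourceConfig) Validate() error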
- type CloudWatchLogGroupCollectionState
- func (s *CloudWatchLogGroupCollectionState) Clear(timeRange collection_state.DirectionalTimeRange)
- func (s *CloudWatchLogGroupCollectionState) GetEndTime() time.Time
- func (s *CloudWatchLogGroupCollectionState) GetEndTimeForStream(logStreamName string) time.Time
- func (s *CloudWatchLogGroupCollectionState) GetFromTime() time.Time
- func (s *CloudWatchLogGroupCollectionState) GetStartTime() time.Time
- func (s *CloudWatchLogGroupCollectionState) GetStartTimeForStream(logStreamName string) time.Time
- func (s *CloudWatchLogGroupCollectionState) GetToTime() time.Time
- func (s *CloudWatchLogGroupCollectionState) Init(timeRange collection_state.DirectionalTimeRange, granularity time.Duration)
- func (s *CloudWatchLogGroupCollectionState) IsEmpty() bool
- func (s *CloudWatchLogGroupCollectionState) MigrateFromLegacyState(bytes []byte) error
- func (s *CloudWatchLogGroupCollectionState) OnCollected(logStreamName string, timestamp time.Time) error
- func (s *CloudWatchLogGroupCollectionState) OnCollectionComplete() error
- func (s *CloudWatchLogGroupCollectionState) SetConfig(config *AwsCloudWatchLogGroupSourceConfig)
- func (s *CloudWatchLogGroupCollectionState) SetEndTime(endTime time.Time)
- func (s *CloudWatchLogGroupCollectionState) ShouldCollect(logStreamName string, timestamp time.Time) bool
- func (s *CloudWatchLogGroupCollectionState) Validate() error
- type CloudWatchLogGroupCollectionStateLegacy
Constants
const (
// AwsCloudwatchLogGroupSourceIdentifier is the unique identifier for the CloudWatch log group source
AwsCloudwatchLogGroupSourceIdentifier = "aws_cloudwatch_log_group"
)
Variables
This section is empty.
Functions
func NewCloudWatchLogGroupCollectionState
func NewCloudWatchLogGroupCollectionState() collection_state.CollectionState
NewCloudWatchLogGroupCollectionState creates a new CloudWatchLogGroupCollectionState instance. It initializes an empty map for log streams and sets the initial modification time.
Types
type AwsCloudWatchLogGroupSource
type AwsCloudWatchLogGroupSource struct {
// Embeds the base RowSourceImpl with CloudWatch-specific config and AWS connection.
row_source.RowSourceImpl[*AwsCloudWatchLogGroupSourceConfig, *config.AwsConnection]
// contains filtered or unexported fields
}
AwsCloudWatchLogGroupSource is responsible for collecting events from log streams within a CloudWatch log group. It implements the RowSource interface and manages collection state to support incremental and efficient log collection.
func (*AwsCloudWatchLogGroupSource) Collect
func (s *AwsCloudWatchLogGroupSource) Collect(ctx context.Context) error
Collect retrieves log events from CloudWatch log streams within the specified time range.
This function is responsible for collecting log events from all relevant log streams in the configured CloudWatch log group. The process includes:
- Retrieving all log streams that match the configuration (optionally filtered by name/pattern).
- Batching log streams to efficiently query events in groups (up to 100 at a time).
- For each batch, querying CloudWatch Logs for events within the desired time window.
- Sorting and processing each event, skipping already-collected events based on collection state.
- Enriching and forwarding each new event for downstream processing.
- Updating the collection state to support incremental collection and avoid duplicates.
- Aggregating and returning any errors encountered during the process.
Collect returns an error if any step fails or if errors were encountered while collecting log events.
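The batching step can be sketched as follows. This is a minimal, hypothetical helper (batchStreamNames is not part of the package API); it assumes the batch size of 100 reflects the CloudWatch Logs FilterLogEvents limit on the number of log stream names that can be passed in one request.

```go
package main

import "fmt"

// maxStreamsPerRequest is assumed to mirror the FilterLogEvents limit of
// 100 log stream names per request.
const maxStreamsPerRequest = 100

// batchStreamNames splits names into consecutive batches of at most size
// elements. It returns nil for empty input or a non-positive size.
func batchStreamNames(names []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var batches [][]string
	for start := 0; start < len(names); start += size {
		end := start + size
		if end > len(names) {
			end = len(names)
		}
		batches = append(batches, names[start:end])
	}
	return batches
}

func main() {
	names := make([]string, 250)
	for i := range names {
		names[i] = fmt.Sprintf("stream-%03d", i)
	}
	for i, batch := range batchStreamNames(names, maxStreamsPerRequest) {
		fmt.Printf("batch %d: %d streams\n", i, len(batch))
	}
}
```

With 250 streams this prints three batches of 100, 100 and 50 streams; each batch would then be queried in a single request.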
func (*AwsCloudWatchLogGroupSource) Identifier
func (s *AwsCloudWatchLogGroupSource) Identifier() string
Identifier returns the unique identifier for this source type, used in the plugin system.
func (*AwsCloudWatchLogGroupSource) Init
func (s *AwsCloudWatchLogGroupSource) Init(ctx context.Context, params *row_source.RowSourceParams, opts ...row_source.RowSourceOption) error
Init sets up the CloudWatch log group source with the provided parameters and options. It initializes the collection state, AWS client, and validates the configuration. If a specific start time is provided, it clears the previous collection state to force recollection.
type AwsCloudWatchLogGroupSourceConfig
type AwsCloudWatchLogGroupSourceConfig struct {
// LogGroupName is the name of the CloudWatch log group to collect logs from (required)
LogGroupName string `hcl:"log_group_name"`
// LogStreamNames optionally filters log streams by their names. Supports wildcards (*).
// If not specified, logs from all available streams will be collected.
// Example: ["456789012345_CloudTrail_*", "123456789012_CloudTrail_us-east-1"]
LogStreamNames []string `hcl:"log_stream_names,optional"`
// Region specifies the AWS region where the log group exists.
// If not provided, an error is returned: "region is required and cannot be empty".
Region *string `hcl:"region"`
}
AwsCloudWatchLogGroupSourceConfig defines the configuration parameters for collecting logs from AWS CloudWatch log groups. It specifies which log group to collect from, optionally filters log streams by name (with * wildcards), and specifies the AWS region to connect to.
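A sketch of how LogStreamNames filtering could work, assuming * matches any run of characters (including none) and every other character matches literally; the package's exact matching rules are not documented here. filterStreams and wildcardToRegexp are hypothetical helpers, and the patterns reuse the example from the LogStreamNames comment.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// wildcardToRegexp turns a pattern where '*' is the only special character
// into an anchored regular expression; everything else is quoted literally.
func wildcardToRegexp(pattern string) *regexp.Regexp {
	quoted := regexp.QuoteMeta(pattern) // '*' becomes `\*`
	return regexp.MustCompile("^" + strings.ReplaceAll(quoted, `\*`, ".*") + "$")
}

// filterStreams keeps the streams that match at least one pattern.
// An empty pattern list keeps every stream (collect from all streams).
func filterStreams(streams, patterns []string) []string {
	if len(patterns) == 0 {
		return streams
	}
	res := make([]*regexp.Regexp, len(patterns))
	for i, p := range patterns {
		res[i] = wildcardToRegexp(p)
	}
	var matched []string
	for _, s := range streams {
		for _, re := range res {
			if re.MatchString(s) {
				matched = append(matched, s)
				break
			}
		}
	}
	return matched
}

func main() {
	streams := []string{
		"456789012345_CloudTrail_us-west-2",
		"123456789012_CloudTrail_us-east-1",
		"123456789012_CloudTrail_eu-west-1",
	}
	patterns := []string{"456789012345_CloudTrail_*", "123456789012_CloudTrail_us-east-1"}
	fmt.Println(filterStreams(streams, patterns))
}
```

Quoting the pattern before expanding * keeps characters such as [ and . literal, which matters because some stream names (for example Lambda's) contain brackets.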
func (*AwsCloudWatchLogGroupSourceConfig) Identifier
func (c *AwsCloudWatchLogGroupSourceConfig) Identifier() string
Identifier returns the unique identifier for this source type, which the plugin system uses to identify the source type.
func (*AwsCloudWatchLogGroupSourceConfig) Validate
func (c *AwsCloudWatchLogGroupSourceConfig) Validate() error
Validate checks whether the configuration is valid. It ensures that the required LogGroupName field is provided and not empty.
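A sketch of these checks using a hypothetical stand-in struct (sourceConfig). The log group check follows the description above and its error message is illustrative; the region check and its message come from the Region field comment, and performing it inside Validate (rather than elsewhere) is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// sourceConfig is a hypothetical stand-in for AwsCloudWatchLogGroupSourceConfig.
type sourceConfig struct {
	LogGroupName   string
	LogStreamNames []string
	Region         *string
}

// validate rejects a missing or blank log group name, then a missing or
// blank region.
func (c *sourceConfig) validate() error {
	if strings.TrimSpace(c.LogGroupName) == "" {
		return errors.New("log_group_name is required") // illustrative message
	}
	if c.Region == nil || strings.TrimSpace(*c.Region) == "" {
		return errors.New("region is required and cannot be empty")
	}
	return nil
}

func main() {
	region := "us-east-1"
	good := &sourceConfig{LogGroupName: "aws-cloudtrail-logs", Region: &region}
	fmt.Println(good.validate())
	fmt.Println((&sourceConfig{Region: &region}).validate())
}
```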
type CloudWatchLogGroupCollectionState
type CloudWatchLogGroupCollectionState struct {
// Map of log stream name to its time range collection state
LogStreams map[string]*collection_state.TimeRangeCollectionState `json:"log_streams"`
// Granularity defines the time resolution for collection state updates
Granularity time.Duration `json:"granularity,omitempty"`
// contains filtered or unexported fields
}
CloudWatchLogGroupCollectionState tracks collection state for multiple log streams within a CloudWatch log group. It maintains a map of log stream names to their individual time range collection states, allowing for incremental collection and resumption of collection from the last processed event.
func (*CloudWatchLogGroupCollectionState) Clear
func (s *CloudWatchLogGroupCollectionState) Clear(timeRange collection_state.DirectionalTimeRange)
func (*CloudWatchLogGroupCollectionState) GetEndTime
func (s *CloudWatchLogGroupCollectionState) GetEndTime() time.Time
GetEndTime returns the zero time.Time because a global end time is not tracked; individual log stream end times are tracked separately.
func (*CloudWatchLogGroupCollectionState) GetEndTimeForStream
func (s *CloudWatchLogGroupCollectionState) GetEndTimeForStream(logStreamName string) time.Time
GetEndTimeForStream returns the end time for a specific log stream. If the stream does not exist in the state, it returns the zero time.
func (*CloudWatchLogGroupCollectionState) GetFromTime
func (s *CloudWatchLogGroupCollectionState) GetFromTime() time.Time
GetFromTime returns the earliest start time across all log streams. This represents the earliest point in time from which we have collected events.
func (*CloudWatchLogGroupCollectionState) GetStartTime
func (s *CloudWatchLogGroupCollectionState) GetStartTime() time.Time
GetStartTime returns the zero time.Time because a global start time is not tracked; individual log stream start times are tracked separately.
func (*CloudWatchLogGroupCollectionState) GetStartTimeForStream
func (s *CloudWatchLogGroupCollectionState) GetStartTimeForStream(logStreamName string) time.Time
GetStartTimeForStream returns the start time for a specific log stream. If the stream does not exist in the state, it returns the zero time.
func (*CloudWatchLogGroupCollectionState) GetToTime
func (s *CloudWatchLogGroupCollectionState) GetToTime() time.Time
GetToTime returns the latest end time across all log streams. This represents the most recent point in time up to which we have collected events.
func (*CloudWatchLogGroupCollectionState) Init
func (s *CloudWatchLogGroupCollectionState) Init(timeRange collection_state.DirectionalTimeRange, granularity time.Duration)
Init initializes the collection state with the provided time range and granularity. If the state is empty, it initializes a new empty state.
func (*CloudWatchLogGroupCollectionState) IsEmpty
func (s *CloudWatchLogGroupCollectionState) IsEmpty() bool
IsEmpty returns true if no log streams have been collected yet.
func (*CloudWatchLogGroupCollectionState) MigrateFromLegacyState (added in v0.14.0)
func (s *CloudWatchLogGroupCollectionState) MigrateFromLegacyState(bytes []byte) error
func (*CloudWatchLogGroupCollectionState) OnCollected
func (s *CloudWatchLogGroupCollectionState) OnCollected(logStreamName string, timestamp time.Time) error
OnCollected updates the collection state for a specific log stream when an event is processed. It creates a new time range state for the stream if it doesn't exist, and updates the last modified time to trigger a state save.
func (*CloudWatchLogGroupCollectionState) OnCollectionComplete (added in v0.14.0)
func (s *CloudWatchLogGroupCollectionState) OnCollectionComplete() error
func (*CloudWatchLogGroupCollectionState) SetConfig
func (s *CloudWatchLogGroupCollectionState) SetConfig(config *AwsCloudWatchLogGroupSourceConfig)
SetConfig updates the CloudWatch source configuration.
func (*CloudWatchLogGroupCollectionState) SetEndTime
func (s *CloudWatchLogGroupCollectionState) SetEndTime(endTime time.Time)
SetEndTime is a no-op because a global end time is not tracked.
func (*CloudWatchLogGroupCollectionState) ShouldCollect
func (s *CloudWatchLogGroupCollectionState) ShouldCollect(logStreamName string, timestamp time.Time) bool
ShouldCollect determines whether an event with the given timestamp should be collected for the specified log stream based on its time range state.
func (*CloudWatchLogGroupCollectionState) Validate (added in v0.14.0)
func (s *CloudWatchLogGroupCollectionState) Validate() error
type CloudWatchLogGroupCollectionStateLegacy (added in v0.14.0)
type CloudWatchLogGroupCollectionStateLegacy struct {
LogStreams map[string]*collection_state.TimeRangeCollectionStateLegacy `json:"log_streams,omitempty"`
LastModifiedTime time.Time `json:"last_modified_time,omitempty"`
}