Documentation ¶
Overview ¶
Package cloudwatch provides functionality to collect logs from AWS CloudWatch Log Groups
Index ¶
- Constants
- func NewCloudWatchLogGroupCollectionState() collection_state.CollectionState[*AwsCloudWatchLogGroupSourceConfig]
- type AwsCloudWatchLogGroupSource
- type AwsCloudWatchLogGroupSourceConfig
- type CloudWatchLogGroupCollectionState
- func (s *CloudWatchLogGroupCollectionState) Clear()
- func (s *CloudWatchLogGroupCollectionState) GetConfig() *AwsCloudWatchLogGroupSourceConfig
- func (s *CloudWatchLogGroupCollectionState) GetEndTime() time.Time
- func (s *CloudWatchLogGroupCollectionState) GetEndTimeForStream(logStreamName string) time.Time
- func (s *CloudWatchLogGroupCollectionState) GetFromTime() time.Time
- func (s *CloudWatchLogGroupCollectionState) GetGranularity() time.Duration
- func (s *CloudWatchLogGroupCollectionState) GetStartTime() time.Time
- func (s *CloudWatchLogGroupCollectionState) GetStartTimeForStream(logStreamName string) time.Time
- func (s *CloudWatchLogGroupCollectionState) GetToTime() time.Time
- func (s *CloudWatchLogGroupCollectionState) Init(config *AwsCloudWatchLogGroupSourceConfig, path string) error
- func (s *CloudWatchLogGroupCollectionState) IsEmpty() bool
- func (s *CloudWatchLogGroupCollectionState) OnCollected(logStreamName string, timestamp time.Time) error
- func (s *CloudWatchLogGroupCollectionState) Save() error
- func (s *CloudWatchLogGroupCollectionState) SetConfig(config *AwsCloudWatchLogGroupSourceConfig)
- func (s *CloudWatchLogGroupCollectionState) SetEndTime(endTime time.Time)
- func (s *CloudWatchLogGroupCollectionState) SetGranularity(granularity time.Duration)
- func (s *CloudWatchLogGroupCollectionState) ShouldCollect(id string, timestamp time.Time) bool
Constants ¶
const (
// AwsCloudwatchLogGroupSourceIdentifier is the unique identifier for the CloudWatch log group source
AwsCloudwatchLogGroupSourceIdentifier = "aws_cloudwatch_log_group"
)
Variables ¶
This section is empty.
Functions ¶
func NewCloudWatchLogGroupCollectionState ¶
func NewCloudWatchLogGroupCollectionState() collection_state.CollectionState[*AwsCloudWatchLogGroupSourceConfig]
NewCloudWatchLogGroupCollectionState creates a new CloudWatchLogGroupCollectionState instance. It initializes an empty map for log streams and sets the initial modification time.
Types ¶
type AwsCloudWatchLogGroupSource ¶
type AwsCloudWatchLogGroupSource struct {
// Embed the base RowSourceImpl with CloudWatch specific config and AWS connection
row_source.RowSourceImpl[*AwsCloudWatchLogGroupSourceConfig, *config.AwsConnection]
// contains filtered or unexported fields
}
AwsCloudWatchLogGroupSource collects events from the log streams within an AWS CloudWatch log group. It implements the RowSource interface and manages the collection state to support incremental collection.
func (*AwsCloudWatchLogGroupSource) Collect ¶
func (s *AwsCloudWatchLogGroupSource) Collect(ctx context.Context) error
Collect retrieves log events from CloudWatch log streams within the specified time range. It handles pagination, maintains collection state, and processes events incrementally.
func (*AwsCloudWatchLogGroupSource) Identifier ¶
func (s *AwsCloudWatchLogGroupSource) Identifier() string
Identifier returns the unique identifier for this source.
func (*AwsCloudWatchLogGroupSource) Init ¶
func (s *AwsCloudWatchLogGroupSource) Init(ctx context.Context, params *row_source.RowSourceParams, opts ...row_source.RowSourceOption) error
Init initializes the CloudWatch source with the provided parameters and options. It sets up the collection state and the AWS client, and validates the configuration.
type AwsCloudWatchLogGroupSourceConfig ¶
type AwsCloudWatchLogGroupSourceConfig struct {
// LogGroupName is the name of the CloudWatch log group to collect logs from (required)
LogGroupName string `hcl:"log_group_name"`
// LogStreamNames optionally filters log streams by their names. Supports wildcards (*).
// If not specified, logs from all available streams will be collected.
// Example: ["456789012345_CloudTrail_*", "123456789012_CloudTrail_us-east-1"]
LogStreamNames []string `hcl:"log_stream_names,optional"`
// Region specifies the AWS region where the log group exists
// If not provided, an error will be raised "region is required and cannot be empty".
Region *string `hcl:"region"`
}
AwsCloudWatchLogGroupSourceConfig defines the configuration parameters for collecting logs from AWS CloudWatch log groups. It specifies which log group to collect from, optionally filters log streams by name (with * wildcards), and sets the AWS region to connect to, which is required.
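To make the LogStreamNames wildcard filter concrete, here is a minimal sketch of how such a filter might be applied to stream names. It uses `path.Match` from the standard library as a stand-in; the package's actual matching logic is not shown in this documentation, and `streamMatches` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"path"
)

// streamMatches reports whether name matches any of the patterns.
// An empty pattern list matches every stream, mirroring the documented
// "all available streams" default. Hypothetical helper, not part of the package.
func streamMatches(patterns []string, name string) bool {
	if len(patterns) == 0 {
		return true
	}
	for _, p := range patterns {
		// path.Match treats "*" as any run of non-"/" characters,
		// which may differ from the package's own wildcard rules.
		if ok, err := path.Match(p, name); err == nil && ok {
			return true
		}
	}
	return false
}

func main() {
	patterns := []string{"456789012345_CloudTrail_*", "123456789012_CloudTrail_us-east-1"}
	for _, name := range []string{
		"456789012345_CloudTrail_eu-west-1",
		"123456789012_CloudTrail_us-east-1",
		"123456789012_CloudTrail_us-west-2",
	} {
		fmt.Println(name, streamMatches(patterns, name))
	}
}
```

Note that `path.Match` does not let `*` cross a `/`, so stream names containing slashes would need a different matcher.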
func (*AwsCloudWatchLogGroupSourceConfig) Identifier ¶
func (c *AwsCloudWatchLogGroupSourceConfig) Identifier() string
Identifier returns the unique identifier for this source type. This is used to identify the source type in the plugin system.
func (*AwsCloudWatchLogGroupSourceConfig) Validate ¶
func (c *AwsCloudWatchLogGroupSourceConfig) Validate() error
Validate checks if the configuration is valid. It ensures that the required LogGroupName field is provided and not empty.
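A rough sketch of the kind of checks involved, using a stand-in struct rather than the real type. The region error text is quoted from the Region field comment; the log group error text is an assumption, and whether the region check lives in Validate or elsewhere is not stated in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// sourceConfig is a stand-in mirroring the documented fields (HCL tags omitted).
type sourceConfig struct {
	LogGroupName   string
	LogStreamNames []string
	Region         *string
}

// validate is a hypothetical reimplementation of the documented checks.
func (c *sourceConfig) validate() error {
	if c.LogGroupName == "" {
		// Error text is an assumption; the real message is not documented.
		return errors.New("log_group_name is required and cannot be empty")
	}
	if c.Region == nil || *c.Region == "" {
		// Message quoted from the Region field comment.
		return errors.New("region is required and cannot be empty")
	}
	return nil
}

func main() {
	region := "us-east-1"
	valid := &sourceConfig{LogGroupName: "aws-cloudtrail-logs", Region: &region}
	fmt.Println(valid.validate())

	missingRegion := &sourceConfig{LogGroupName: "aws-cloudtrail-logs"}
	fmt.Println(missingRegion.validate())
}
```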
type CloudWatchLogGroupCollectionState ¶
type CloudWatchLogGroupCollectionState struct {
// Map of log stream name to its time range collection state
LogStreams map[string]*collection_state.TimeRangeCollectionStateImpl `json:"log_streams"`
// Time when the collection state was last modified
LastModifiedTime time.Time `json:"last_modified_time,omitempty"`
// contains filtered or unexported fields
}
CloudWatchLogGroupCollectionState tracks collection state for multiple log streams within a CloudWatch log group. It maintains a map of log stream names to their individual time range collection states, allowing for incremental collection and resumption of collection from the last processed event.
func (*CloudWatchLogGroupCollectionState) Clear ¶
func (s *CloudWatchLogGroupCollectionState) Clear()
Clear resets the collection state by removing all log stream states.
func (*CloudWatchLogGroupCollectionState) GetConfig ¶
func (s *CloudWatchLogGroupCollectionState) GetConfig() *AwsCloudWatchLogGroupSourceConfig
GetConfig returns the current CloudWatch source configuration.
func (*CloudWatchLogGroupCollectionState) GetEndTime ¶
func (s *CloudWatchLogGroupCollectionState) GetEndTime() time.Time
GetEndTime returns the zero time.Time because a global end time is not tracked. Individual log stream end times are tracked separately.
func (*CloudWatchLogGroupCollectionState) GetEndTimeForStream ¶
func (s *CloudWatchLogGroupCollectionState) GetEndTimeForStream(logStreamName string) time.Time
GetEndTimeForStream returns the end time for a specific log stream. If the stream doesn't exist in the state, returns zero time.
func (*CloudWatchLogGroupCollectionState) GetFromTime ¶
func (s *CloudWatchLogGroupCollectionState) GetFromTime() time.Time
GetFromTime returns the earliest start time across all log streams. This represents the earliest point in time from which we have collected events.
func (*CloudWatchLogGroupCollectionState) GetGranularity ¶
func (s *CloudWatchLogGroupCollectionState) GetGranularity() time.Duration
GetGranularity returns the time granularity for collection state tracking. It uses a fixed granularity of one minute.
func (*CloudWatchLogGroupCollectionState) GetStartTime ¶
func (s *CloudWatchLogGroupCollectionState) GetStartTime() time.Time
GetStartTime returns the zero time.Time because a global start time is not tracked. Individual log stream start times are tracked separately.
func (*CloudWatchLogGroupCollectionState) GetStartTimeForStream ¶
func (s *CloudWatchLogGroupCollectionState) GetStartTimeForStream(logStreamName string) time.Time
GetStartTimeForStream returns the start time for a specific log stream. If the stream doesn't exist in the state, returns zero time.
func (*CloudWatchLogGroupCollectionState) GetToTime ¶
func (s *CloudWatchLogGroupCollectionState) GetToTime() time.Time
GetToTime returns the latest end time across all log streams. This represents the most recent point in time up to which we have collected events.
func (*CloudWatchLogGroupCollectionState) Init ¶
func (s *CloudWatchLogGroupCollectionState) Init(config *AwsCloudWatchLogGroupSourceConfig, path string) error
Init initializes the collection state with the provided configuration and state file path. If a state file exists at the given path, it loads and deserializes the state. If no file exists or the state is empty, it initializes a new empty state.
func (*CloudWatchLogGroupCollectionState) IsEmpty ¶
func (s *CloudWatchLogGroupCollectionState) IsEmpty() bool
IsEmpty returns true if no log streams have been collected yet.
func (*CloudWatchLogGroupCollectionState) OnCollected ¶
func (s *CloudWatchLogGroupCollectionState) OnCollected(logStreamName string, timestamp time.Time) error
OnCollected updates the collection state for a specific log stream when an event is processed. It creates a new time range state for the stream if it doesn't exist, and updates the last modified time to trigger a state save.
func (*CloudWatchLogGroupCollectionState) Save ¶
func (s *CloudWatchLogGroupCollectionState) Save() error
Save persists the current collection state to disk if it has been modified since the last save. The state is serialized as JSON and written to the configured file path. It creates any necessary directories and updates the last save time on success.
func (*CloudWatchLogGroupCollectionState) SetConfig ¶
func (s *CloudWatchLogGroupCollectionState) SetConfig(config *AwsCloudWatchLogGroupSourceConfig)
SetConfig updates the CloudWatch source configuration.
func (*CloudWatchLogGroupCollectionState) SetEndTime ¶
func (s *CloudWatchLogGroupCollectionState) SetEndTime(endTime time.Time)
SetEndTime is a no-op because a global end time is not tracked.
func (*CloudWatchLogGroupCollectionState) SetGranularity ¶
func (s *CloudWatchLogGroupCollectionState) SetGranularity(granularity time.Duration)
SetGranularity is a no-op because a fixed granularity is used.
func (*CloudWatchLogGroupCollectionState) ShouldCollect ¶
func (s *CloudWatchLogGroupCollectionState) ShouldCollect(id string, timestamp time.Time) bool
ShouldCollect determines whether an event with the given timestamp should be collected for the specified log stream based on its time range state.