Documentation
Overview ¶
Package cloudwatch provides functionality to collect logs from AWS CloudWatch Log Groups.
Index ¶
- Constants
- func NewCloudWatchCollectionState() collection_state.CollectionState[*AwsCloudWatchSourceConfig]
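- type AwsCloudWatchSource
- func (s *AwsCloudWatchSource) Collect(ctx context.Context) error
- func (s *AwsCloudWatchSource) Identifier() string
- func (s *AwsCloudWatchSource) Init(ctx context.Context, params *row_source.RowSourceParams, opts ...row_source.RowSourceOption) error
- type AwsCloudWatchSourceConfig
- func (c *AwsCloudWatchSourceConfig) Identifier() string
- func (c *AwsCloudWatchSourceConfig) Validate() error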
- type CloudWatchCollectionState
- func (s *CloudWatchCollectionState) Clear()
- func (s *CloudWatchCollectionState) GetConfig() *AwsCloudWatchSourceConfig
- func (s *CloudWatchCollectionState) GetEndTime() time.Time
- func (s *CloudWatchCollectionState) GetEndTimeForStream(logStreamName string) time.Time
- func (s *CloudWatchCollectionState) GetFromTime() time.Time
- func (s *CloudWatchCollectionState) GetGranularity() time.Duration
- func (s *CloudWatchCollectionState) GetStartTime() time.Time
- func (s *CloudWatchCollectionState) GetStartTimeForStream(logStreamName string) time.Time
- func (s *CloudWatchCollectionState) GetToTime() time.Time
- func (s *CloudWatchCollectionState) Init(config *AwsCloudWatchSourceConfig, path string) error
- func (s *CloudWatchCollectionState) IsEmpty() bool
- func (s *CloudWatchCollectionState) OnCollected(logStreamName string, timestamp time.Time) error
- func (s *CloudWatchCollectionState) Save() error
- func (s *CloudWatchCollectionState) SetConfig(config *AwsCloudWatchSourceConfig)
- func (s *CloudWatchCollectionState) SetEndTime(endTime time.Time)
- func (s *CloudWatchCollectionState) SetGranularity(granularity time.Duration)
- func (s *CloudWatchCollectionState) ShouldCollect(id string, timestamp time.Time) bool
Constants ¶
const (
	// AwsCloudwatchSourceIdentifier is the unique identifier for the CloudWatch log source
	AwsCloudwatchSourceIdentifier = "aws_cloudwatch_log_group"
)
Functions ¶
func NewCloudWatchCollectionState ¶
func NewCloudWatchCollectionState() collection_state.CollectionState[*AwsCloudWatchSourceConfig]
NewCloudWatchCollectionState creates a new CloudWatchCollectionState instance. It initializes an empty map for log streams and sets the initial modification time.
Types ¶
type AwsCloudWatchSource ¶
type AwsCloudWatchSource struct {
	// Embed the base RowSourceImpl with CloudWatch specific config and AWS connection
	row_source.RowSourceImpl[*AwsCloudWatchSourceConfig, *config.AwsConnection]
	// contains filtered or unexported fields
}
AwsCloudWatchSource collects events from the log streams within a log group in AWS CloudWatch. It implements the RowSource interface and manages the collection state to support incremental collection.
func (*AwsCloudWatchSource) Collect ¶
func (s *AwsCloudWatchSource) Collect(ctx context.Context) error
Collect retrieves log events from CloudWatch log streams within the specified time range. It handles pagination, maintains collection state, and processes events incrementally.
func (*AwsCloudWatchSource) Identifier ¶
func (s *AwsCloudWatchSource) Identifier() string
Identifier returns the unique identifier for this source.
func (*AwsCloudWatchSource) Init ¶
func (s *AwsCloudWatchSource) Init(ctx context.Context, params *row_source.RowSourceParams, opts ...row_source.RowSourceOption) error
Init initializes the CloudWatch source with the provided parameters and options. It sets up the collection state and the AWS client, and validates the configuration.
type AwsCloudWatchSourceConfig ¶
type AwsCloudWatchSourceConfig struct {
	// LogGroupName is the name of the CloudWatch log group to collect logs from (required)
	LogGroupName string `hcl:"log_group_name"`
	// LogStreamPrefix optionally filters log streams by their name prefix
	LogStreamPrefix *string `hcl:"log_stream_prefix"`
	// Region specifies the AWS region where the log group exists
	// If not provided, defaults to us-east-1
	Region *string `hcl:"region"`
}
AwsCloudWatchSourceConfig defines the configuration parameters for collecting logs from AWS CloudWatch. It specifies which log group to collect from, optionally filters log streams by prefix, and allows specifying the AWS region to connect to.
func (*AwsCloudWatchSourceConfig) Identifier ¶
func (c *AwsCloudWatchSourceConfig) Identifier() string
Identifier returns the unique identifier for this source type. This is used to identify the source type in the plugin system.
func (*AwsCloudWatchSourceConfig) Validate ¶
func (c *AwsCloudWatchSourceConfig) Validate() error
Validate checks if the configuration is valid. It ensures that the required LogGroupName field is provided and not empty.
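As a rough illustration of the check described above, the sketch below uses a local stand-in type rather than the package's own struct. The names sourceConfig, validate, and regionOrDefault, as well as the error message, are hypothetical. The us-east-1 fallback follows the comment on the Region field, but where the package actually applies that default is not stated here.

```go
package main

import (
	"errors"
	"fmt"
)

// sourceConfig is a local stand-in that mirrors the documented fields.
type sourceConfig struct {
	LogGroupName    string
	LogStreamPrefix *string
	Region          *string
}

// validate rejects a missing or empty log group name.
// The error text is hypothetical.
func (c *sourceConfig) validate() error {
	if c.LogGroupName == "" {
		return errors.New("log_group_name is required and cannot be empty")
	}
	return nil
}

// regionOrDefault returns the configured region, or us-east-1 when unset.
func (c *sourceConfig) regionOrDefault() string {
	if c.Region != nil && *c.Region != "" {
		return *c.Region
	}
	return "us-east-1"
}

func main() {
	missing := &sourceConfig{}
	fmt.Println("missing log group:", missing.validate())

	region := "eu-west-1"
	ok := &sourceConfig{LogGroupName: "/aws/lambda/demo", Region: &region}
	fmt.Println("valid config:", ok.validate(), "region:", ok.regionOrDefault())
}
```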
type CloudWatchCollectionState ¶
type CloudWatchCollectionState struct {
	// Map of log stream name to its time range collection state
	LogStreams map[string]*collection_state.TimeRangeCollectionStateImpl `json:"log_streams"`
	// Time when the collection state was last modified
	LastModifiedTime time.Time `json:"last_modified_time,omitempty"`
	// contains filtered or unexported fields
}
CloudWatchCollectionState tracks collection state for multiple log streams within a CloudWatch log group. It maintains a map of log stream names to their individual time range collection states, allowing for incremental collection and resumption of collection from the last processed event.
func (*CloudWatchCollectionState) Clear ¶
func (s *CloudWatchCollectionState) Clear()
Clear resets the collection state by removing all log stream states.
func (*CloudWatchCollectionState) GetConfig ¶
func (s *CloudWatchCollectionState) GetConfig() *AwsCloudWatchSourceConfig
GetConfig returns the current CloudWatch source configuration.
func (*CloudWatchCollectionState) GetEndTime ¶
func (s *CloudWatchCollectionState) GetEndTime() time.Time
GetEndTime returns the zero time.Time because no global end time is tracked. End times are tracked per log stream; see GetEndTimeForStream.
func (*CloudWatchCollectionState) GetEndTimeForStream ¶
func (s *CloudWatchCollectionState) GetEndTimeForStream(logStreamName string) time.Time
GetEndTimeForStream returns the end time for a specific log stream. If the stream doesn't exist in the state, returns zero time.
func (*CloudWatchCollectionState) GetFromTime ¶
func (s *CloudWatchCollectionState) GetFromTime() time.Time
GetFromTime returns the earliest start time across all log streams. This represents the earliest point in time from which we have collected events.
func (*CloudWatchCollectionState) GetGranularity ¶
func (s *CloudWatchCollectionState) GetGranularity() time.Duration
GetGranularity returns the time granularity used for collection state tracking. The granularity is fixed at one minute.
func (*CloudWatchCollectionState) GetStartTime ¶
func (s *CloudWatchCollectionState) GetStartTime() time.Time
GetStartTime returns the zero time.Time because no global start time is tracked. Start times are tracked per log stream; see GetStartTimeForStream.
func (*CloudWatchCollectionState) GetStartTimeForStream ¶
func (s *CloudWatchCollectionState) GetStartTimeForStream(logStreamName string) time.Time
GetStartTimeForStream returns the start time for a specific log stream. If the stream doesn't exist in the state, returns zero time.
func (*CloudWatchCollectionState) GetToTime ¶
func (s *CloudWatchCollectionState) GetToTime() time.Time
GetToTime returns the latest end time across all log streams. This represents the most recent point in time up to which we have collected events.
func (*CloudWatchCollectionState) Init ¶
func (s *CloudWatchCollectionState) Init(config *AwsCloudWatchSourceConfig, path string) error
Init initializes the collection state with the provided configuration and state file path. If a state file exists at the given path, it loads and deserializes the state. If no file exists or the state is empty, it initializes a new empty state.
func (*CloudWatchCollectionState) IsEmpty ¶
func (s *CloudWatchCollectionState) IsEmpty() bool
IsEmpty returns true if no log streams have been collected yet.
func (*CloudWatchCollectionState) OnCollected ¶
func (s *CloudWatchCollectionState) OnCollected(logStreamName string, timestamp time.Time) error
OnCollected updates the collection state for a specific log stream when an event is processed. It creates a new time range state for the stream if it doesn't exist, and updates the last modified time to trigger a state save.
func (*CloudWatchCollectionState) Save ¶
func (s *CloudWatchCollectionState) Save() error
Save persists the current collection state to disk if it has been modified since the last save. The state is serialized as JSON and written to the configured file path. It creates any necessary directories and updates the last save time on success.
func (*CloudWatchCollectionState) SetConfig ¶
func (s *CloudWatchCollectionState) SetConfig(config *AwsCloudWatchSourceConfig)
SetConfig updates the CloudWatch source configuration.
func (*CloudWatchCollectionState) SetEndTime ¶
func (s *CloudWatchCollectionState) SetEndTime(endTime time.Time)
SetEndTime is a no-op because no global end time is tracked.
func (*CloudWatchCollectionState) SetGranularity ¶
func (s *CloudWatchCollectionState) SetGranularity(granularity time.Duration)
SetGranularity is a no-op because the granularity is fixed.
func (*CloudWatchCollectionState) ShouldCollect ¶
func (s *CloudWatchCollectionState) ShouldCollect(id string, timestamp time.Time) bool
ShouldCollect determines whether an event with the given timestamp should be collected for the specified log stream based on its time range state.