Documentation
¶
Overview ¶
Package eventrepo contains service code for getting and managing cloudevent objects.
Index ¶
- func AdvancedSearchOptionsToQueryMod(opts *grpc.AdvancedSearchOptions) []qm.QueryMod
- type ObjectGetter
- type ObjectInfo
- type S3ReaderAt
- type Service
- func (s *Service) GetCloudEventFromIndex(ctx context.Context, index *cloudevent.CloudEvent[ObjectInfo], ...) (cloudevent.RawEvent, error)
- func (s *Service) GetLatestCloudEvent(ctx context.Context, bucketName string, opts *grpc.SearchOptions) (cloudevent.RawEvent, error)
- func (s *Service) GetLatestCloudEventAdvanced(ctx context.Context, bucketName string, ...) (cloudevent.RawEvent, error)
- func (s *Service) GetLatestIndex(ctx context.Context, opts *grpc.SearchOptions) (cloudevent.CloudEvent[ObjectInfo], error)
- func (s *Service) GetLatestIndexAdvanced(ctx context.Context, advancedOpts *grpc.AdvancedSearchOptions) (cloudevent.CloudEvent[ObjectInfo], error)
- func (s *Service) GetObjectFromKey(ctx context.Context, key, bucketName string) ([]byte, error)
- func (s *Service) ListCloudEvents(ctx context.Context, bucketName string, limit int, opts *grpc.SearchOptions) ([]cloudevent.RawEvent, error)
- func (s *Service) ListCloudEventsAdvanced(ctx context.Context, bucketName string, limit int, ...) ([]cloudevent.RawEvent, error)
- func (s *Service) ListCloudEventsFromIndexes(ctx context.Context, indexes []cloudevent.CloudEvent[ObjectInfo], ...) ([]cloudevent.RawEvent, error)
- func (s *Service) ListIndexes(ctx context.Context, limit int, opts *grpc.SearchOptions) ([]cloudevent.CloudEvent[ObjectInfo], error)
- func (s *Service) ListIndexesAdvanced(ctx context.Context, limit int, advancedOpts *grpc.AdvancedSearchOptions) ([]cloudevent.CloudEvent[ObjectInfo], error)
- func (s *Service) ListObjectsFromKeys(ctx context.Context, keys []string, bucketName string) ([][]byte, error)
- func (s *Service) StoreObject(ctx context.Context, bucketName string, ...) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AdvancedSearchOptionsToQueryMod ¶
func AdvancedSearchOptionsToQueryMod(opts *grpc.AdvancedSearchOptions) []qm.QueryMod
Types ¶
type ObjectGetter ¶
type ObjectGetter interface {
GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error)
}
ObjectGetter is an interface for getting objects from, and putting objects into, S3.
type ObjectInfo ¶
type ObjectInfo struct {
Key string
}
ObjectInfo is the information about the object in S3.
type S3ReaderAt ¶ added in v0.0.23
type S3ReaderAt struct {
// contains filtered or unexported fields
}
S3ReaderAt implements io.ReaderAt by downloading the entire S3 object into memory on creation. This avoids multiple round-trips (HeadObject + range GETs), which is optimal for the expected file sizes (< 100 KB).
func NewS3ReaderAt ¶ added in v0.0.23
func NewS3ReaderAt(ctx context.Context, client ObjectGetter, bucket, key string) (*S3ReaderAt, error)
NewS3ReaderAt downloads the S3 object into memory and returns a ReaderAt. It rejects objects larger than maxObjectSize.
func (*S3ReaderAt) ReadAt ¶ added in v0.0.23
func (r *S3ReaderAt) ReadAt(p []byte, off int64) (int, error)
ReadAt implements io.ReaderAt by reading from the in-memory buffer.
func (*S3ReaderAt) Size ¶ added in v0.0.23
func (r *S3ReaderAt) Size() int64
Size returns the total size of the S3 object in bytes.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service manages and retrieves data messages from indexed objects in S3.
func New ¶
func New(chConn clickhouse.Conn, objGetter ObjectGetter, parquetBucket string) *Service
New creates a new instance of Service.
func (*Service) GetCloudEventFromIndex ¶
func (s *Service) GetCloudEventFromIndex(ctx context.Context, index *cloudevent.CloudEvent[ObjectInfo], bucketName string) (cloudevent.RawEvent, error)
GetCloudEventFromIndex fetches and returns the cloud event for the given index.
func (*Service) GetLatestCloudEvent ¶
func (s *Service) GetLatestCloudEvent(ctx context.Context, bucketName string, opts *grpc.SearchOptions) (cloudevent.RawEvent, error)
GetLatestCloudEvent fetches and returns the latest cloud event that matches the given options.
func (*Service) GetLatestCloudEventAdvanced ¶
func (s *Service) GetLatestCloudEventAdvanced(ctx context.Context, bucketName string, advancedOpts *grpc.AdvancedSearchOptions) (cloudevent.RawEvent, error)
GetLatestCloudEventAdvanced fetches and returns the latest cloud event that matches the given advanced options.
func (*Service) GetLatestIndex ¶
func (s *Service) GetLatestIndex(ctx context.Context, opts *grpc.SearchOptions) (cloudevent.CloudEvent[ObjectInfo], error)
GetLatestIndex returns the latest cloud event index that matches the given options.
func (*Service) GetLatestIndexAdvanced ¶
func (s *Service) GetLatestIndexAdvanced(ctx context.Context, advancedOpts *grpc.AdvancedSearchOptions) (cloudevent.CloudEvent[ObjectInfo], error)
GetLatestIndexAdvanced returns the latest cloud event index that matches the given advanced options.
func (*Service) GetObjectFromKey ¶
func (s *Service) GetObjectFromKey(ctx context.Context, key, bucketName string) ([]byte, error)
GetObjectFromKey fetches and returns the raw object for the given key. It routes the request based on the index_key format:
- If key contains "#": Parquet reference -- reads the event via SeekToRow and returns data.
- Otherwise: legacy S3 path -- fetches the entire object as before.
func (*Service) ListCloudEvents ¶
func (s *Service) ListCloudEvents(ctx context.Context, bucketName string, limit int, opts *grpc.SearchOptions) ([]cloudevent.RawEvent, error)
ListCloudEvents fetches and returns the cloud events that match the given options.
func (*Service) ListCloudEventsAdvanced ¶
func (s *Service) ListCloudEventsAdvanced(ctx context.Context, bucketName string, limit int, advancedOpts *grpc.AdvancedSearchOptions) ([]cloudevent.RawEvent, error)
ListCloudEventsAdvanced fetches and returns the cloud events that match the given advanced options.
func (*Service) ListCloudEventsFromIndexes ¶
func (s *Service) ListCloudEventsFromIndexes(ctx context.Context, indexes []cloudevent.CloudEvent[ObjectInfo], bucketName string) ([]cloudevent.RawEvent, error)
ListCloudEventsFromIndexes fetches and returns the cloud events for the given indexes. Parquet refs sharing the same object key are grouped so they share one S3ReaderAt (avoiding duplicate downloads). Groups and JSON fetches run concurrently.
func (*Service) ListIndexes ¶
func (s *Service) ListIndexes(ctx context.Context, limit int, opts *grpc.SearchOptions) ([]cloudevent.CloudEvent[ObjectInfo], error)
ListIndexes fetches and returns a list of indexes for cloud events that match the given options.
func (*Service) ListIndexesAdvanced ¶
func (s *Service) ListIndexesAdvanced(ctx context.Context, limit int, advancedOpts *grpc.AdvancedSearchOptions) ([]cloudevent.CloudEvent[ObjectInfo], error)
ListIndexesAdvanced fetches and returns a list of indexes for cloud events that match the given advanced options.
func (*Service) ListObjectsFromKeys ¶
func (s *Service) ListObjectsFromKeys(ctx context.Context, keys []string, bucketName string) ([][]byte, error)
ListObjectsFromKeys fetches and returns the objects for the given keys concurrently.
func (*Service) StoreObject ¶
func (s *Service) StoreObject(ctx context.Context, bucketName string, cloudHeader *cloudevent.CloudEventHeader, data []byte) error
StoreObject stores the given data in S3 with the given cloudevent header.