Documentation
¶
Overview ¶
Package eventrepo contains service code for getting and managing cloudevent objects.
Index ¶
- Constants
- func AdvancedSearchOptionsToQueryMod(opts *grpc.AdvancedSearchOptions) []qm.QueryMod
- type CloudEventTypeSummary
- type ObjectGetter
- type ObjectInfo
- type Presigner
- type S3ReaderAt
- type Service
- func (s *Service) GetCloudEventFromIndex(ctx context.Context, index *cloudevent.CloudEvent[ObjectInfo], ...) (cloudevent.RawEvent, error)
- func (s *Service) GetCloudEventTypeSummaries(ctx context.Context, opts *grpc.SearchOptions) ([]CloudEventTypeSummary, error)
- func (s *Service) GetCloudEventTypeSummariesAdvanced(ctx context.Context, advancedOpts *grpc.AdvancedSearchOptions) ([]CloudEventTypeSummary, error)
- func (s *Service) GetLatestCloudEvent(ctx context.Context, bucketName string, opts *grpc.SearchOptions) (cloudevent.RawEvent, error)
- func (s *Service) GetLatestCloudEventAdvanced(ctx context.Context, bucketName string, ...) (cloudevent.RawEvent, error)
- func (s *Service) GetLatestIndex(ctx context.Context, opts *grpc.SearchOptions) (cloudevent.CloudEvent[ObjectInfo], error)
- func (s *Service) GetLatestIndexAdvanced(ctx context.Context, advancedOpts *grpc.AdvancedSearchOptions) (cloudevent.CloudEvent[ObjectInfo], error)
- func (s *Service) GetObjectFromKey(ctx context.Context, key, bucketName string) ([]byte, error)
- func (s *Service) ListCloudEvents(ctx context.Context, bucketName string, limit int, opts *grpc.SearchOptions) ([]cloudevent.RawEvent, error)
- func (s *Service) ListCloudEventsAdvanced(ctx context.Context, bucketName string, limit int, ...) ([]cloudevent.RawEvent, error)
- func (s *Service) ListCloudEventsFromIndexes(ctx context.Context, indexes []cloudevent.CloudEvent[ObjectInfo], ...) ([]cloudevent.RawEvent, error)
- func (s *Service) ListIndexes(ctx context.Context, limit int, opts *grpc.SearchOptions) ([]cloudevent.CloudEvent[ObjectInfo], error)
- func (s *Service) ListIndexesAdvanced(ctx context.Context, limit int, advancedOpts *grpc.AdvancedSearchOptions) ([]cloudevent.CloudEvent[ObjectInfo], error)
- func (s *Service) ListObjectsFromKeys(ctx context.Context, keys []string, bucketName string) ([][]byte, error)
- func (s *Service) PresignBlobURL(ctx context.Context, key string) (string, error)
- func (s *Service) StoreObject(ctx context.Context, bucketName string, ...) error
Constants ¶
const BlobKeyPrefix = "cloudevent/blobs/"
BlobKeyPrefix is the S3 key prefix used for large binary blob objects. Keys with this prefix are served via presigned URL instead of inline in the response.
Variables ¶
This section is empty.
Functions ¶
func AdvancedSearchOptionsToQueryMod ¶
func AdvancedSearchOptionsToQueryMod(opts *grpc.AdvancedSearchOptions) []qm.QueryMod
Types ¶
type CloudEventTypeSummary ¶ added in v0.1.3
type CloudEventTypeSummary struct {
Type string
Count uint64
FirstSeen time.Time
LastSeen time.Time
}
CloudEventTypeSummary holds per-type aggregate metadata for a subject.
type ObjectGetter ¶
type ObjectGetter interface {
GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error)
}
ObjectGetter is an interface for getting objects from, and putting objects into, S3.
type ObjectInfo ¶
type ObjectInfo struct {
Key string
}
ObjectInfo is the information about the object in S3.
type Presigner ¶ added in v0.1.4
type Presigner interface {
PresignGetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.PresignOptions)) (*v4.PresignedHTTPRequest, error)
}
Presigner generates presigned S3 GET URLs.
type S3ReaderAt ¶ added in v0.0.23
type S3ReaderAt struct {
// contains filtered or unexported fields
}
S3ReaderAt implements io.ReaderAt by downloading the entire S3 object into memory on creation. This avoids multiple round-trips (HeadObject + range GETs), which is optimal for the expected file sizes (< 100 KB).
func NewS3ReaderAt ¶ added in v0.0.23
func NewS3ReaderAt(ctx context.Context, client ObjectGetter, bucket, key string) (*S3ReaderAt, error)
NewS3ReaderAt downloads the S3 object into memory and returns a ReaderAt. Rejects objects larger than maxObjectSize.
func (*S3ReaderAt) ReadAt ¶ added in v0.0.23
func (r *S3ReaderAt) ReadAt(p []byte, off int64) (int, error)
ReadAt implements io.ReaderAt by reading from the in-memory buffer.
func (*S3ReaderAt) Size ¶ added in v0.0.23
func (r *S3ReaderAt) Size() int64
Size returns the total size of the S3 object in bytes.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service manages and retrieves data messages from indexed objects in S3.
func New ¶
func New(chConn clickhouse.Conn, objGetter ObjectGetter, presigner Presigner, parquetBucket string) *Service
New creates a new instance of Service.
func (*Service) GetCloudEventFromIndex ¶
func (s *Service) GetCloudEventFromIndex(ctx context.Context, index *cloudevent.CloudEvent[ObjectInfo], bucketName string) (cloudevent.RawEvent, error)
GetCloudEventFromIndex fetches and returns the cloud event for the given index.
func (*Service) GetCloudEventTypeSummaries ¶ added in v0.1.3
func (s *Service) GetCloudEventTypeSummaries(ctx context.Context, opts *grpc.SearchOptions) ([]CloudEventTypeSummary, error)
GetCloudEventTypeSummaries returns per-type counts and time ranges for the given search options.
func (*Service) GetCloudEventTypeSummariesAdvanced ¶ added in v0.1.9
func (s *Service) GetCloudEventTypeSummariesAdvanced(ctx context.Context, advancedOpts *grpc.AdvancedSearchOptions) ([]CloudEventTypeSummary, error)
GetCloudEventTypeSummariesAdvanced returns event type summaries filtered by advanced search options.
func (*Service) GetLatestCloudEvent ¶
func (s *Service) GetLatestCloudEvent(ctx context.Context, bucketName string, opts *grpc.SearchOptions) (cloudevent.RawEvent, error)
GetLatestCloudEvent fetches and returns the latest cloud event that matches the given options.
func (*Service) GetLatestCloudEventAdvanced ¶
func (s *Service) GetLatestCloudEventAdvanced(ctx context.Context, bucketName string, advancedOpts *grpc.AdvancedSearchOptions) (cloudevent.RawEvent, error)
GetLatestCloudEventAdvanced fetches and returns the latest cloud event that matches the given advanced options.
func (*Service) GetLatestIndex ¶
func (s *Service) GetLatestIndex(ctx context.Context, opts *grpc.SearchOptions) (cloudevent.CloudEvent[ObjectInfo], error)
GetLatestIndex returns the latest cloud event index that matches the given options.
func (*Service) GetLatestIndexAdvanced ¶
func (s *Service) GetLatestIndexAdvanced(ctx context.Context, advancedOpts *grpc.AdvancedSearchOptions) (cloudevent.CloudEvent[ObjectInfo], error)
GetLatestIndexAdvanced returns the latest cloud event index that matches the given advanced options.
func (*Service) GetObjectFromKey ¶
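func (s *Service) GetObjectFromKey(ctx context.Context, key, bucketName string) ([]byte, error)
GetObjectFromKey fetches and returns the raw object for the given key. It routes based on the index_key format: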
- If key contains "#": Parquet reference -- reads the event via SeekToRow and returns data.
- Otherwise: legacy S3 path -- fetches the entire object as before.
func (*Service) ListCloudEvents ¶
func (s *Service) ListCloudEvents(ctx context.Context, bucketName string, limit int, opts *grpc.SearchOptions) ([]cloudevent.RawEvent, error)
ListCloudEvents fetches and returns the cloud events that match the given options.
func (*Service) ListCloudEventsAdvanced ¶
func (s *Service) ListCloudEventsAdvanced(ctx context.Context, bucketName string, limit int, advancedOpts *grpc.AdvancedSearchOptions) ([]cloudevent.RawEvent, error)
ListCloudEventsAdvanced fetches and returns the cloud events that match the given advanced options.
func (*Service) ListCloudEventsFromIndexes ¶
func (s *Service) ListCloudEventsFromIndexes(ctx context.Context, indexes []cloudevent.CloudEvent[ObjectInfo], bucketName string) ([]cloudevent.RawEvent, error)
ListCloudEventsFromIndexes fetches and returns the cloud events for the given indexes. Parquet refs sharing the same object key are grouped so they share one S3ReaderAt (avoiding duplicate downloads). Groups and JSON fetches run concurrently.
func (*Service) ListIndexes ¶
func (s *Service) ListIndexes(ctx context.Context, limit int, opts *grpc.SearchOptions) ([]cloudevent.CloudEvent[ObjectInfo], error)
ListIndexes fetches and returns a list of indexes for cloud events that match the given options.
func (*Service) ListIndexesAdvanced ¶
func (s *Service) ListIndexesAdvanced(ctx context.Context, limit int, advancedOpts *grpc.AdvancedSearchOptions) ([]cloudevent.CloudEvent[ObjectInfo], error)
ListIndexesAdvanced fetches and returns a list of indexes for cloud events that match the given advanced options.
func (*Service) ListObjectsFromKeys ¶
func (s *Service) ListObjectsFromKeys(ctx context.Context, keys []string, bucketName string) ([][]byte, error)
ListObjectsFromKeys fetches and returns the objects for the given keys concurrently.
func (*Service) PresignBlobURL ¶ added in v0.1.4
func (s *Service) PresignBlobURL(ctx context.Context, key string) (string, error)
PresignBlobURL returns a short-lived presigned GET URL for the given S3 key in the parquet bucket.
func (*Service) StoreObject ¶
func (s *Service) StoreObject(ctx context.Context, bucketName string, cloudHeader *cloudevent.CloudEventHeader, data []byte) error
StoreObject stores the given data in S3 with the given cloudevent header.