eventrepo

package
v0.1.10
Published: Apr 13, 2026 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Overview

Package eventrepo contains service code for getting and managing cloudevent objects.

Index

Constants

const BlobKeyPrefix = "cloudevent/blobs/"

BlobKeyPrefix is the S3 key prefix used for large binary blob objects. Keys with this prefix are served via presigned URL instead of inline in the response.
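
As a minimal sketch of how a caller might decide between inline and presigned delivery, the prefix check can be written with the standard library alone. The helper name isBlobKey and the sample keys are hypothetical and not part of this package.

```go
package main

import (
	"fmt"
	"strings"
)

// BlobKeyPrefix mirrors the constant documented above.
const BlobKeyPrefix = "cloudevent/blobs/"

// isBlobKey is a hypothetical helper (not part of eventrepo). It reports
// whether a key falls under the blob prefix and would therefore be served
// through a presigned URL rather than inline.
func isBlobKey(key string) bool {
	return strings.HasPrefix(key, BlobKeyPrefix)
}

func main() {
	// Both keys are made-up examples.
	fmt.Println(isBlobKey("cloudevent/blobs/abc123"))
	fmt.Println(isBlobKey("some/other/key"))
}
```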

Variables

This section is empty.

Functions

func AdvancedSearchOptionsToQueryMod

func AdvancedSearchOptionsToQueryMod(opts *grpc.AdvancedSearchOptions) []qm.QueryMod

Types

type CloudEventTypeSummary added in v0.1.3

type CloudEventTypeSummary struct {
	Type      string
	Count     uint64
	FirstSeen time.Time
	LastSeen  time.Time
}

CloudEventTypeSummary holds per-type aggregate metadata for a subject.
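
To illustrate what "per-type aggregate metadata" means, the following in-memory sketch computes the same fields from a list of events. It is an assumption-laden illustration only: the event type, the summarize and sampleEvents functions, and the sample type names are hypothetical, and the real service most likely computes these aggregates in its index store rather than in Go.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// CloudEventTypeSummary mirrors the struct documented above.
type CloudEventTypeSummary struct {
	Type      string
	Count     uint64
	FirstSeen time.Time
	LastSeen  time.Time
}

// event is a hypothetical minimal record: an event type and its timestamp.
type event struct {
	Type string
	Time time.Time
}

// summarize groups events by type, counting them and tracking the earliest
// and latest timestamp per type. Results are sorted by type name.
func summarize(events []event) []CloudEventTypeSummary {
	byType := make(map[string]*CloudEventTypeSummary)
	for _, e := range events {
		s, ok := byType[e.Type]
		if !ok {
			byType[e.Type] = &CloudEventTypeSummary{
				Type: e.Type, Count: 1, FirstSeen: e.Time, LastSeen: e.Time,
			}
			continue
		}
		s.Count++
		if e.Time.Before(s.FirstSeen) {
			s.FirstSeen = e.Time
		}
		if e.Time.After(s.LastSeen) {
			s.LastSeen = e.Time
		}
	}
	out := make([]CloudEventTypeSummary, 0, len(byType))
	for _, s := range byType {
		out = append(out, *s)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Type < out[j].Type })
	return out
}

// sampleEvents returns made-up data for the demonstration.
func sampleEvents() []event {
	base := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	return []event{
		{Type: "example.status", Time: base.Add(2 * time.Hour)},
		{Type: "example.fingerprint", Time: base},
		{Type: "example.status", Time: base.Add(1 * time.Hour)},
	}
}

func main() {
	for _, s := range summarize(sampleEvents()) {
		fmt.Printf("%s count=%d first=%s last=%s\n", s.Type, s.Count,
			s.FirstSeen.Format(time.RFC3339), s.LastSeen.Format(time.RFC3339))
	}
}
```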

type ObjectGetter

type ObjectGetter interface {
	GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
	PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error)
}

ObjectGetter is an interface for getting objects from, and putting objects into, S3.

type ObjectInfo

type ObjectInfo struct {
	Key string
}

ObjectInfo is the information about the object in S3.

type Presigner added in v0.1.4

type Presigner interface {
	PresignGetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.PresignOptions)) (*v4.PresignedHTTPRequest, error)
}

Presigner generates presigned S3 GET URLs.

type S3ReaderAt added in v0.0.23

type S3ReaderAt struct {
	// contains filtered or unexported fields
}

S3ReaderAt implements io.ReaderAt by downloading the entire S3 object into memory on creation. This avoids multiple round trips (HeadObject plus range GETs), which suits the expected file sizes (< 100 KB).

func NewS3ReaderAt added in v0.0.23

func NewS3ReaderAt(ctx context.Context, client ObjectGetter, bucket, key string) (*S3ReaderAt, error)

NewS3ReaderAt downloads the S3 object into memory and returns a ReaderAt. Rejects objects larger than maxObjectSize.

func (*S3ReaderAt) ReadAt added in v0.0.23

func (r *S3ReaderAt) ReadAt(p []byte, off int64) (int, error)

ReadAt implements io.ReaderAt by reading from the in-memory buffer.

func (*S3ReaderAt) Size added in v0.0.23

func (r *S3ReaderAt) Size() int64

Size returns the total size of the S3 object in bytes.
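
The download-once approach can be sketched with the standard library alone. This is not the package's implementation: memReaderAt, newMemReaderAt, and newFromString are hypothetical names, the value of maxObjectSize here is an assumption (the real limit is unexported), and a plain io.Reader stands in for the S3 response body.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// maxObjectSize is an assumed limit for this sketch only.
const maxObjectSize = 1 << 20

// memReaderAt is a hypothetical stand-in for S3ReaderAt: it keeps the whole
// object in memory and serves reads from that buffer.
type memReaderAt struct {
	r *bytes.Reader
}

// Compile-time check that memReaderAt satisfies io.ReaderAt.
var _ io.ReaderAt = (*memReaderAt)(nil)

// newMemReaderAt reads body fully into memory and rejects bodies larger
// than limit bytes.
func newMemReaderAt(body io.Reader, limit int64) (*memReaderAt, error) {
	// Read one byte past the limit so an oversized body can be detected
	// without buffering all of it.
	data, err := io.ReadAll(io.LimitReader(body, limit+1))
	if err != nil {
		return nil, fmt.Errorf("reading object: %w", err)
	}
	if int64(len(data)) > limit {
		return nil, fmt.Errorf("object exceeds %d bytes", limit)
	}
	return &memReaderAt{r: bytes.NewReader(data)}, nil
}

// newFromString is a small convenience wrapper used by the demo.
func newFromString(s string, limit int64) (*memReaderAt, error) {
	return newMemReaderAt(strings.NewReader(s), limit)
}

// ReadAt reads from the in-memory buffer at the given offset.
func (m *memReaderAt) ReadAt(p []byte, off int64) (int, error) {
	return m.r.ReadAt(p, off)
}

// Size returns the total number of buffered bytes.
func (m *memReaderAt) Size() int64 { return m.r.Size() }

func main() {
	ra, err := newFromString("hello, parquet", maxObjectSize)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	buf := make([]byte, 7)
	n, err := ra.ReadAt(buf, 7)
	fmt.Println(ra.Size(), string(buf[:n]), err)
}
```

Reading limit+1 bytes is one way to enforce the cap even when the reported content length is missing or wrong; whether the package does this or checks the content length instead is not stated in the documentation.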

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service manages and retrieves data messages from indexed objects in S3.

func New

func New(chConn clickhouse.Conn, objGetter ObjectGetter, presigner Presigner, parquetBucket string) *Service

New creates a new instance of Service.

func (*Service) GetCloudEventFromIndex

func (s *Service) GetCloudEventFromIndex(ctx context.Context, index *cloudevent.CloudEvent[ObjectInfo], bucketName string) (cloudevent.RawEvent, error)

GetCloudEventFromIndex fetches and returns the cloud event for the given index.

func (*Service) GetCloudEventTypeSummaries added in v0.1.3

func (s *Service) GetCloudEventTypeSummaries(ctx context.Context, opts *grpc.SearchOptions) ([]CloudEventTypeSummary, error)

GetCloudEventTypeSummaries returns per-type counts and time ranges for the given search options.

func (*Service) GetCloudEventTypeSummariesAdvanced added in v0.1.9

func (s *Service) GetCloudEventTypeSummariesAdvanced(ctx context.Context, advancedOpts *grpc.AdvancedSearchOptions) ([]CloudEventTypeSummary, error)

GetCloudEventTypeSummariesAdvanced returns event type summaries filtered by advanced search options.

func (*Service) GetLatestCloudEvent

func (s *Service) GetLatestCloudEvent(ctx context.Context, bucketName string, opts *grpc.SearchOptions) (cloudevent.RawEvent, error)

GetLatestCloudEvent fetches and returns the latest cloud event that matches the given options.

func (*Service) GetLatestCloudEventAdvanced

func (s *Service) GetLatestCloudEventAdvanced(ctx context.Context, bucketName string, advancedOpts *grpc.AdvancedSearchOptions) (cloudevent.RawEvent, error)

GetLatestCloudEventAdvanced fetches and returns the latest cloud event that matches the given advanced options.

func (*Service) GetLatestIndex

func (s *Service) GetLatestIndex(ctx context.Context, opts *grpc.SearchOptions) (cloudevent.CloudEvent[ObjectInfo], error)

GetLatestIndex returns the latest cloud event index that matches the given options.

func (*Service) GetLatestIndexAdvanced

func (s *Service) GetLatestIndexAdvanced(ctx context.Context, advancedOpts *grpc.AdvancedSearchOptions) (cloudevent.CloudEvent[ObjectInfo], error)

GetLatestIndexAdvanced returns the latest cloud event index that matches the given advanced options.

func (*Service) GetObjectFromKey

func (s *Service) GetObjectFromKey(ctx context.Context, key, bucketName string) ([]byte, error)

GetObjectFromKey fetches and returns the raw object for the given key. It routes the request based on the index key format:

  • If the key contains "#": Parquet reference -- reads the event via SeekToRow and returns its data.
  • Otherwise: legacy S3 path -- fetches the entire object.
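
The routing rule can be sketched as a small parser. Note the assumption: the documentation only says that "#" marks a Parquet reference, so the "<object key>#<row number>" layout used here is a guess, and parseIndexKey and the sample keys are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseIndexKey splits an index key into its object key and row number.
// ASSUMPTION: Parquet references look like "<object key>#<row number>".
// Keys without "#" are treated as legacy S3 keys and returned unchanged.
func parseIndexKey(key string) (objectKey string, row int64, isParquet bool, err error) {
	obj, rowStr, found := strings.Cut(key, "#")
	if !found {
		return key, 0, false, nil
	}
	row, err = strconv.ParseInt(rowStr, 10, 64)
	if err != nil {
		return "", 0, false, fmt.Errorf("invalid row in key %q: %w", key, err)
	}
	return obj, row, true, nil
}

func main() {
	// Both keys are made-up examples.
	for _, k := range []string{"events/batch.parquet#42", "events/single.json"} {
		obj, row, isParquet, err := parseIndexKey(k)
		fmt.Println(obj, row, isParquet, err)
	}
}
```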

func (*Service) ListCloudEvents

func (s *Service) ListCloudEvents(ctx context.Context, bucketName string, limit int, opts *grpc.SearchOptions) ([]cloudevent.RawEvent, error)

ListCloudEvents fetches and returns the cloud events that match the given options.

func (*Service) ListCloudEventsAdvanced

func (s *Service) ListCloudEventsAdvanced(ctx context.Context, bucketName string, limit int, advancedOpts *grpc.AdvancedSearchOptions) ([]cloudevent.RawEvent, error)

ListCloudEventsAdvanced fetches and returns the cloud events that match the given advanced options.

func (*Service) ListCloudEventsFromIndexes

func (s *Service) ListCloudEventsFromIndexes(ctx context.Context, indexes []cloudevent.CloudEvent[ObjectInfo], bucketName string) ([]cloudevent.RawEvent, error)

ListCloudEventsFromIndexes fetches and returns the cloud events for the given indexes. Parquet refs sharing the same object key are grouped so they share one S3ReaderAt (avoiding duplicate downloads). Groups and JSON fetches run concurrently.
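
The grouping step described above might look roughly like the sketch below. It assumes that the part of an index key before "#" identifies the Parquet object; groupByObject and the sample keys are hypothetical, and the concurrent fetching is left out to keep the example focused on grouping.

```go
package main

import (
	"fmt"
	"strings"
)

// groupByObject partitions index keys by position. Keys containing "#" are
// grouped under the object key before the "#", so each object needs to be
// downloaded only once. Keys without "#" are returned as legacy positions.
func groupByObject(keys []string) (groups map[string][]int, legacy []int) {
	groups = make(map[string][]int)
	for i, k := range keys {
		obj, _, found := strings.Cut(k, "#")
		if !found {
			legacy = append(legacy, i)
			continue
		}
		groups[obj] = append(groups[obj], i)
	}
	return groups, legacy
}

func main() {
	// Made-up keys: two rows from one Parquet object, one legacy key,
	// and one row from a second Parquet object.
	keys := []string{"a.parquet#0", "legacy/key", "a.parquet#5", "b.parquet#1"}
	groups, legacy := groupByObject(keys)
	fmt.Println(groups, legacy)
}
```

Recording input positions rather than the keys themselves lets a caller write each fetched event back into its original slot, so the output order can match the input order regardless of how the groups are processed.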

func (*Service) ListIndexes

func (s *Service) ListIndexes(ctx context.Context, limit int, opts *grpc.SearchOptions) ([]cloudevent.CloudEvent[ObjectInfo], error)

ListIndexes fetches and returns a list of indexes for cloud events that match the given options.

func (*Service) ListIndexesAdvanced

func (s *Service) ListIndexesAdvanced(ctx context.Context, limit int, advancedOpts *grpc.AdvancedSearchOptions) ([]cloudevent.CloudEvent[ObjectInfo], error)

ListIndexesAdvanced fetches and returns a list of indexes for cloud events that match the given advanced options.

func (*Service) ListObjectsFromKeys

func (s *Service) ListObjectsFromKeys(ctx context.Context, keys []string, bucketName string) ([][]byte, error)

ListObjectsFromKeys fetches and returns the objects for the given keys concurrently.
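
One common way to fetch several keys concurrently is to give each goroutine its own result slot, as in this sketch. It is not the package's implementation: fetchFunc, fetchAll, and fetchAllFromMap are hypothetical, a map stands in for S3, and the choices to preserve input order and to fail on the first error by position are assumptions.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// fetchFunc is a hypothetical stand-in for a single-object fetch.
type fetchFunc func(ctx context.Context, key string) ([]byte, error)

// fetchAll runs fetch for every key in its own goroutine. Each goroutine
// writes only to its own index, so no mutex is needed and the results
// come back in input order.
func fetchAll(ctx context.Context, keys []string, fetch fetchFunc) ([][]byte, error) {
	results := make([][]byte, len(keys))
	errs := make([]error, len(keys))
	var wg sync.WaitGroup
	for i, key := range keys {
		wg.Add(1)
		go func(i int, key string) {
			defer wg.Done()
			results[i], errs[i] = fetch(ctx, key)
		}(i, key)
	}
	wg.Wait()
	// Report the first failure by input position.
	for i, err := range errs {
		if err != nil {
			return nil, fmt.Errorf("fetching %q: %w", keys[i], err)
		}
	}
	return results, nil
}

// fetchAllFromMap drives fetchAll with an in-memory map instead of S3.
func fetchAllFromMap(objects map[string]string, keys []string) ([]string, error) {
	fetch := func(_ context.Context, key string) ([]byte, error) {
		v, ok := objects[key]
		if !ok {
			return nil, fmt.Errorf("no such key")
		}
		return []byte(v), nil
	}
	raw, err := fetchAll(context.Background(), keys, fetch)
	if err != nil {
		return nil, err
	}
	out := make([]string, len(raw))
	for i, b := range raw {
		out[i] = string(b)
	}
	return out, nil
}

func main() {
	objects := map[string]string{"a": "1", "b": "2", "c": "3"}
	got, err := fetchAllFromMap(objects, []string{"c", "a", "b"})
	fmt.Println(got, err)
}
```

This sketch starts one goroutine per key with no upper bound. A production version would likely cap concurrency and cancel outstanding fetches once one fails.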

func (*Service) PresignBlobURL added in v0.1.4

func (s *Service) PresignBlobURL(ctx context.Context, key string) (string, error)

PresignBlobURL returns a short-lived presigned GET URL for the given S3 key in the parquet bucket.

func (*Service) StoreObject

func (s *Service) StoreObject(ctx context.Context, bucketName string, cloudHeader *cloudevent.CloudEventHeader, data []byte) error

StoreObject stores the given data in S3 with the given cloudevent header.
