Documentation
¶
Overview ¶
Package service provides the orchestration layer for pulse operations.
Index ¶
- type Cohort
- type ComposeOptions
- type Row
- type RowIter
- type Service
- func (s *Service) Compose(ctx context.Context, composed *types.ComposedRequest) ([]*types.Response, error)
- func (s *Service) ComposeParallel(ctx context.Context, composed *types.ComposedRequest, opts ComposeOptions) ([]*types.Response, error)
- func (s *Service) Facet(ctx context.Context, path string, field string) ([]string, error)
- func (s *Service) Open(_ context.Context, path string) (*Cohort, error)
- func (s *Service) Process(ctx context.Context, req *types.Request) (*types.Response, error)
- func (s *Service) ProcessStream(ctx context.Context, req *types.Request) (RowIter, error)
- func (s *Service) Sample(ctx context.Context, path string, n int) ([]map[string]any, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Cohort ¶
type Cohort struct {
// contains filtered or unexported fields
}
Cohort represents an opened .pulse file with its parsed schema.
func (*Cohort) RecordCount ¶
RecordCount returns the number of records in the cohort file. It reads the file and counts records based on the schema's per-record byte size.
func (*Cohort) Records ¶
func (c *Cohort) Records(_ context.Context) (processing.RecordIterator, error)
Records returns a streaming iterator over records in the cohort. Records are decoded lazily from disk — the full file is not materialized.
type ComposeOptions ¶ added in v0.2.0
type ComposeOptions struct {
// MaxWorkers caps concurrent in-flight Process calls. Zero means
// runtime.GOMAXPROCS(0). Negative values are clamped to 1.
MaxWorkers int
// PerRequestTimeout, if positive, derives a context.WithTimeout for
// each request. Zero means no per-request timeout (the parent ctx's
// deadline still applies).
PerRequestTimeout time.Duration
// FailFast cancels in-flight siblings on the first request error.
// Default is true: surface errors quickly. Set false to collect every
// request's outcome (errors aggregated into a single CodedError with
// per-index detail).
FailFast bool
}
ComposeOptions controls parallel execution of a ComposedRequest.
Order of responses is always preserved — slot-by-index — regardless of MaxWorkers or completion order.
type Row ¶ added in v0.2.0
Row is a single result row in a processing stream. Aliased so callers can write `service.Row` without leaking the underlying map type.
type RowIter ¶ added in v0.2.0
type RowIter interface {
Next(ctx context.Context) (Row, bool, error)
Close() error
// Metadata returns the run metadata (total/filtered row counts,
// cohort filename). Available after the iterator is exhausted; may
// return nil before then. Streaming consumers that need metadata
// before draining should call Process instead.
Metadata() *types.ResponseMetadata
}
RowIter is a pull-based iterator over a processing result. Next reports (row, true, nil) for each available row, then (nil, false, nil) on exhaustion. Close releases any underlying file handles.
Implementations are NOT goroutine-safe. A single consumer per iterator.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service is the orchestration layer connecting filesystem, encoding, and processing.
func (*Service) Compose ¶
func (s *Service) Compose(ctx context.Context, composed *types.ComposedRequest) ([]*types.Response, error)
Compose executes multiple requests, returning a response for each.
func (*Service) ComposeParallel ¶ added in v0.2.0
func (s *Service) ComposeParallel( ctx context.Context, composed *types.ComposedRequest, opts ComposeOptions, ) ([]*types.Response, error)
ComposeParallel runs every request in composed concurrently across a bounded worker pool. Responses are returned in the same order as composed.Requests; per-request errors are surfaced according to opts.
Registry factories return fresh stateful instances per request, so concurrent Process calls do not share aggregator/attribute state. Geo and decimal aggregators dispatch through buffered code paths that are also safe for concurrent invocation (no shared mutable state).
func (*Service) Facet ¶
Facet returns distinct values for the named field in the cohort. For categorical fields, it returns the dictionary values. For numeric fields, it returns string representations of all distinct values seen.
func (*Service) Process ¶
Process executes a single request against the specified cohort. Records are streamed from disk — the full file is never held in memory as raw bytes alongside the decoded records.
func (*Service) ProcessStream ¶ added in v0.2.0
ProcessStream executes a request and returns a pull-based row iterator over the result. The semantics match Process — same gates, same errors, same metadata — but the consumer can drain rows incrementally instead of receiving the whole []map[string]any up front.
Today the iterator buffers internally (calls Process and walks Data). The signature is forward-compatible with a future true-streaming path driven by the streaming Processor; consumers that adopt it now will pick up that change without a code change on their side.