store

package
v0.92.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2026 License: GPL-3.0 Imports: 34 Imported by: 0

Documentation

Overview

Package store provides database storage implementations.

Index

Constants

This section is empty.

Variables

View Source
var FileSystem media.Handler

FileSystem Media handler

Functions

func Init

func Init()

func Migrate

func Migrate() error

func ParameterIsExpired added in v0.92.0

func ParameterIsExpired(p gen.Parameter) bool

ParameterIsExpired checks whether the given access token parameter has expired.

func RegisterAdapter

func RegisterAdapter(a Adapter)

func RegisterMediaHandler

func RegisterMediaHandler(name string, mh media.Handler)

RegisterMediaHandler saves reference to a media handler (file upload-download handler).

func UseMediaHandler

func UseMediaHandler(name, mediaConfig string) error

UseMediaHandler sets specified media handler as default.

Types

type Adapter

type Adapter interface {

	// Open and configure the adapter
	Open(storeConfig config.StoreType) error
	// Close the adapter
	Close() error
	// IsOpen checks if the adapter is ready for use
	IsOpen() bool
	// GetName returns the name of the adapter
	GetName() string
	// Stats returns the DB connection stats object.
	Stats() any
	// Ping checks database connectivity and returns the round-trip latency.
	Ping(ctx context.Context) (time.Duration, error)
	// GetDB returns the underlying DB connection
	GetDB() any

	// UserCreate creates user record
	UserCreate(ctx context.Context, user *gen.User) error
	// UserGet returns record for a given user ID
	UserGet(ctx context.Context, uid types.Uid) (*gen.User, error)
	// UserGetAll returns user records for a given list of user IDs
	UserGetAll(ctx context.Context, ids ...types.Uid) ([]*gen.User, error)
	// FirstUser returns the first user
	FirstUser(ctx context.Context) (*gen.User, error)
	// UserDelete deletes user record
	UserDelete(ctx context.Context, uid types.Uid, hard bool) error
	// UserUpdate updates user record
	UserUpdate(ctx context.Context, uid types.Uid, update types.KV) error

	// FileStartUpload initializes a file upload.
	FileStartUpload(ctx context.Context, fd *types.FileDef) error
	// FileFinishUpload marks file upload as completed, successfully or otherwise.
	FileFinishUpload(ctx context.Context, fd *types.FileDef, success bool, size int64) (*types.FileDef, error)
	// FileGet fetches a record of a specific file
	FileGet(ctx context.Context, fid string) (*types.FileDef, error)
	// FileDeleteUnused deletes records where UseCount is zero. If olderThan is non-zero, deletes
	// unused records with UpdatedAt before olderThan.
	// Returns array of FileDef.Location of deleted filerecords so actual files can be deleted too.
	FileDeleteUnused(ctx context.Context, olderThan time.Time, limit int) ([]string, error)

	GetUsers(ctx context.Context) ([]*gen.User, error)
	GetUserById(ctx context.Context, id int64) (*gen.User, error)
	GetUserByFlag(ctx context.Context, flag string) (*gen.User, error)
	CreatePlatformUser(ctx context.Context, item *gen.PlatformUser) (int64, error)
	GetPlatformUsersByUserId(ctx context.Context, userId int64) ([]*gen.PlatformUser, error)
	GetPlatformUserByFlag(ctx context.Context, flag string) (*gen.PlatformUser, error)
	UpdatePlatformUser(ctx context.Context, item *gen.PlatformUser) error
	GetPlatformChannelByFlag(ctx context.Context, flag string) (*gen.PlatformChannel, error)
	GetPlatformChannelsByPlatformIds(ctx context.Context, platformIds []int64) ([]*gen.PlatformChannel, error)
	GetPlatformChannelsByChannelId(ctx context.Context, channelId int64) (*gen.PlatformChannel, error)
	CreatePlatformChannel(ctx context.Context, item *gen.PlatformChannel) (int64, error)
	CreatePlatformChannelUser(ctx context.Context, item *gen.PlatformChannelUser) (int64, error)
	GetPlatformChannelUsersByUserFlag(ctx context.Context, userFlag string) ([]*gen.PlatformChannelUser, error)
	GetPlatformChannelUsersByUserFlags(ctx context.Context, userFlags []string) ([]*gen.PlatformChannelUser, error)
	GetMessage(ctx context.Context, flag string) (*gen.Message, error)
	GetMessageByPlatform(ctx context.Context, platformId int64, platformMsgId string) (*gen.Message, error)
	GetMessagesBySession(ctx context.Context, session string) ([]*gen.Message, error)
	CreateMessage(ctx context.Context, message gen.Message) error

	GetBot(ctx context.Context, id int64) (*gen.Bot, error)
	GetBotByName(ctx context.Context, name string) (*gen.Bot, error)
	CreateBot(ctx context.Context, bot *gen.Bot) (int64, error)
	UpdateBot(ctx context.Context, bot *gen.Bot) error
	DeleteBot(ctx context.Context, name string) error
	GetBots(ctx context.Context) ([]*gen.Bot, error)
	GetPlatform(ctx context.Context, id int64) (*gen.Platform, error)
	GetPlatformByName(ctx context.Context, name string) (*gen.Platform, error)
	GetPlatforms(ctx context.Context) ([]*gen.Platform, error)
	CreatePlatform(ctx context.Context, platform *gen.Platform) (int64, error)
	GetChannel(ctx context.Context, id int64) (*gen.Channel, error)
	GetChannelByName(ctx context.Context, name string) (*gen.Channel, error)
	CreateChannel(ctx context.Context, channel *gen.Channel) (int64, error)
	UpdateChannel(ctx context.Context, channel *gen.Channel) error
	DeleteChannel(ctx context.Context, name string) error
	GetChannels(ctx context.Context) ([]*gen.Channel, error)

	DataSet(ctx context.Context, uid types.Uid, topic, key string, value types.KV) error
	DataGet(ctx context.Context, uid types.Uid, topic, key string) (types.KV, error)
	DataList(ctx context.Context, uid types.Uid, topic string, filter types.DataFilter) ([]*gen.Data, error)
	DataDelete(ctx context.Context, uid types.Uid, topic, key string) error
	ConfigSet(ctx context.Context, uid types.Uid, topic, key string, value types.KV) error
	ConfigGet(ctx context.Context, uid types.Uid, topic, key string) (types.KV, error)
	ListConfigByPrefix(ctx context.Context, uid types.Uid, topic, prefix string) ([]*gen.ConfigData, error)
	ConfigDelete(ctx context.Context, uid types.Uid, topic, key string) error
	// ListConfigs returns config items across all uids/topics with optional search and pagination.
	ListConfigs(ctx context.Context, opts ListConfigOptions) ([]model.ConfigItem, error)
	OAuthSet(ctx context.Context, oauth gen.OAuth) error
	// OAuthGet returns the raw oauth record. Most callers should use
	// providers.GetOrRefreshToken() which handles expired token refresh.
	OAuthGet(ctx context.Context, uid types.Uid, topic, t string) (gen.OAuth, error)
	OAuthGetAvailable(ctx context.Context, t string) ([]gen.OAuth, error)
	FormSet(ctx context.Context, formId string, form gen.Form) error
	FormGet(ctx context.Context, formId string) (gen.Form, error)
	PageSet(ctx context.Context, pageId string, page gen.Page) error
	PageGet(ctx context.Context, pageId string) (gen.Page, error)
	BehaviorSet(ctx context.Context, behavior gen.Behavior) error
	BehaviorGet(ctx context.Context, uid types.Uid, flag string) (gen.Behavior, error)
	BehaviorList(ctx context.Context, uid types.Uid) ([]*gen.Behavior, error)
	BehaviorIncrease(ctx context.Context, uid types.Uid, flag string, number int) error
	ParameterSet(ctx context.Context, flag string, params types.KV, expiredAt time.Time) error
	ParameterGet(ctx context.Context, flag string) (gen.Parameter, error)
	ParameterDelete(ctx context.Context, flag string) error
	// ListTokens returns all token parameters (flag LIKE 'fb_%'), sorted by created_at desc.
	ListTokens(ctx context.Context) ([]model.TokenItem, error)
	// CreateToken generates a new token and persists it as a parameter row.
	// Returns the plaintext token string.
	CreateToken(ctx context.Context, uid types.Uid, expiresAt time.Time, scopes []string) (string, error)
	// RevokeToken deletes the parameter row identified by the token flag.
	RevokeToken(ctx context.Context, flag string) error
	CreateInstruct(ctx context.Context, instruct *gen.Instruct) (int64, error)
	ListInstruct(ctx context.Context, uid types.Uid, isExpire bool, limit int) ([]*gen.Instruct, error)
	UpdateInstruct(ctx context.Context, instruct *gen.Instruct) error
	CreateCounter(ctx context.Context, counter *gen.Counter) (int64, error)
	IncreaseCounter(ctx context.Context, id, amount int64) error
	DecreaseCounter(ctx context.Context, id, amount int64) error
	ListCounter(ctx context.Context, uid types.Uid, topic string) ([]*gen.Counter, error)
	GetCounter(ctx context.Context, id int64) (gen.Counter, error)
	GetCounterByFlag(ctx context.Context, uid types.Uid, topic, flag string) (gen.Counter, error)

	GetAgents(ctx context.Context) ([]*gen.Agent, error)
	GetAgentByHostid(ctx context.Context, uid types.Uid, topic, hostid string) (*gen.Agent, error)
	CreateAgent(ctx context.Context, agent *gen.Agent) (int64, error)
	UpdateAgentLastOnlineAt(ctx context.Context, uid types.Uid, topic, hostid string, lastOnlineAt time.Time) error
	UpdateAgentOnlineDuration(ctx context.Context, uid types.Uid, topic, hostid string, offlineTime time.Time) error

	// NotifyChannel CRUD
	CreateNotifyChannel(ctx context.Context, name, protocol, uri string) (int64, error)
	GetNotifyChannel(ctx context.Context, id int64) (model.NotifyChannel, error)    // returns masked URI
	GetNotifyChannelRaw(ctx context.Context, id int64) (model.NotifyChannel, error) // returns raw URI (internal use only)
	ListNotifyChannels(ctx context.Context, opts ListNotifyChannelOptions) ([]model.NotifyChannel, error)
	UpdateNotifyChannel(ctx context.Context, id int64, name, protocol, uri string, enabled bool) error
	DeleteNotifyChannel(ctx context.Context, id int64) error

	// NotifyRule CRUD
	CreateNotifyRule(ctx context.Context, rule model.NotifyRule) (int64, error)
	GetNotifyRule(ctx context.Context, id int64) (model.NotifyRule, error)
	ListNotifyRules(ctx context.Context, opts ListNotifyRuleOptions) ([]model.NotifyRule, error)
	UpdateNotifyRule(ctx context.Context, id int64, rule model.NotifyRule) error
	DeleteNotifyRule(ctx context.Context, id int64) error

	// Notify URI masking
	MaskNotifyURI(protocol, uri string) string
}
var Database Adapter

type AppInfo added in v0.92.0

type AppInfo struct {
	Name      string
	UpdatedAt time.Time
}

AppInfo is a lightweight projection of store-level app metadata.

type AuditStore added in v0.92.0

type AuditStore struct {
	// contains filtered or unexported fields
}

func NewAuditStore added in v0.92.0

func NewAuditStore(client *gen.Client) *AuditStore

func (*AuditStore) Record added in v0.92.0

func (s *AuditStore) Record(ctx context.Context, entry audit.Entry) error

Record writes an audit entry to persistent storage. If the store or client is nil, the call is silently skipped. Audit write failures are logged and do not propagate to the caller. Sensitive fields in entry.Request are redacted before storage.

func (*AuditStore) RecordFailure added in v0.92.0

func (s *AuditStore) RecordFailure(ctx context.Context, entry audit.Entry, err error) error

RecordFailure writes a failure audit entry with the error message.

func (*AuditStore) RecordRejected added in v0.92.0

func (s *AuditStore) RecordRejected(ctx context.Context, entry audit.Entry, reason string) error

RecordRejected writes a rejected audit entry with the reason.

func (*AuditStore) RecordSuccess added in v0.92.0

func (s *AuditStore) RecordSuccess(ctx context.Context, entry audit.Entry) error

RecordSuccess writes a success audit entry.

type Client added in v0.92.0

type Client = gen.Client

Client is a type alias for the Ent client.

type EventStore added in v0.92.0

type EventStore struct {
	// contains filtered or unexported fields
}

func NewEventStore added in v0.92.0

func NewEventStore(client *gen.Client) *EventStore

func (*EventStore) AppendDataEvent added in v0.92.0

func (s *EventStore) AppendDataEvent(ctx context.Context, event types.DataEvent) error

func (*EventStore) AppendEventOutbox added in v0.92.0

func (s *EventStore) AppendEventOutbox(ctx context.Context, event types.DataEvent) error

func (*EventStore) CountDataEvents added in v0.92.0

func (s *EventStore) CountDataEvents(ctx context.Context, opts ListDataEventsOptions) (int64, error)

CountDataEvents returns the total number of data_events matching the given filters. Uses the same filter predicates as ListDataEvents without pagination.

func (*EventStore) GetDataEventByEventID added in v0.92.0

func (s *EventStore) GetDataEventByEventID(ctx context.Context, eventID string) (*gen.DataEvent, error)

GetDataEventByEventID looks up a single data event by its event_id.

func (*EventStore) GetPipelineRunsForEvents added in v0.92.0

func (s *EventStore) GetPipelineRunsForEvents(ctx context.Context, eventIDs []string) (map[string][]PipelineRunInfo, error)

GetPipelineRunsForEvents batch-looks up pipeline runs for the given event IDs. Returns a map of eventID -> []PipelineRunInfo.

func (*EventStore) ListDataEvents added in v0.92.0

func (s *EventStore) ListDataEvents(ctx context.Context, opts ListDataEventsOptions) ([]*gen.DataEvent, string, error)

ListDataEvents returns paginated data_events ordered by created_at DESC. Supports offset-based pagination (when Offset > 0) and cursor-based (backward compatible).

func (*EventStore) ListDistinctEventPipelineNames added in v0.92.0

func (s *EventStore) ListDistinctEventPipelineNames(ctx context.Context) ([]string, error)

ListDistinctEventPipelineNames returns distinct pipeline names from pipeline_runs that have matched events, ordered alphabetically.

func (*EventStore) ListDistinctEventSources added in v0.92.0

func (s *EventStore) ListDistinctEventSources(ctx context.Context, since time.Duration) ([]string, error)

ListDistinctEventSources returns unique source values from data_events created within the given duration (e.g. 30*24*time.Hour for last 30 days).

func (*EventStore) ListDistinctEventTypes added in v0.92.0

func (s *EventStore) ListDistinctEventTypes(ctx context.Context, since time.Duration) ([]string, error)

ListDistinctEventTypes returns unique event_type values from data_events created within the given duration.

func (*EventStore) MarkOutboxPublished added in v0.92.0

func (s *EventStore) MarkOutboxPublished(ctx context.Context, eventID string) error

type HubStore added in v0.92.0

type HubStore struct {
	// contains filtered or unexported fields
}

HubStore persists homelab discovery data to the database.

func NewHubStore added in v0.92.0

func NewHubStore(client *gen.Client) *HubStore

NewHubStore returns a HubStore backed by the given Ent client.

func (*HubStore) ListApps added in v0.92.0

func (s *HubStore) ListApps(ctx context.Context) ([]AppInfo, error)

ListApps returns all apps from the database with Name and UpdatedAt. When the client is nil, returns nil (safe for no-DB environments).

func (*HubStore) SaveHomelabApps added in v0.92.0

func (s *HubStore) SaveHomelabApps(ctx context.Context, apps []homelab.App) error

SaveHomelabApps upserts a batch of discovered homelab apps. Each app is looked up by name; existing rows are updated, new rows are created.

type ListConfigOptions added in v0.92.0

type ListConfigOptions struct {
	Offset int
	Limit  int
	Search string
}

ListConfigOptions controls pagination and search for ListConfigs.

type ListDataEventsOptions added in v0.92.0

type ListDataEventsOptions struct {
	Limit        int        // max 100, default 20
	Offset       int        // page offset for offset-based pagination
	Cursor       string     // opaque CreatedAt cursor (backward compatible)
	Source       string     // filter by source, empty = all
	EventType    string     // filter by event type, empty = all
	Webhook      bool       // if true, only events where data->>'_webhook_method' IS NOT NULL
	Search       string     // ILIKE match against source and data::text
	PipelineName string     // filter events that triggered a specific pipeline
	TimeStart    *time.Time // created_at >= TimeStart
	TimeEnd      *time.Time // created_at <= TimeEnd
}

ListDataEventsOptions holds filters and pagination for listing data events.

type ListNotifyChannelOptions added in v0.92.0

type ListNotifyChannelOptions struct {
	Protocol string
	Enabled  *bool // nil = all, true = enabled only, false = disabled only
}

ListNotifyChannelOptions holds filtering options for listing notification channels.

type ListNotifyRecordsOptions added in v0.92.0

type ListNotifyRecordsOptions struct {
	Limit  int    // max 100, default 20
	Cursor string // opaque cursor: ID value as string
}

ListNotifyRecordsOptions holds filters and pagination for listing notification records.

type ListNotifyRuleOptions added in v0.92.0

type ListNotifyRuleOptions struct {
	Enabled *bool // nil = all, true = enabled only, false = disabled only
}

ListNotifyRuleOptions holds filtering and sorting options for listing notification rules.

type NotifyStore added in v0.92.0

type NotifyStore struct {
	// contains filtered or unexported fields
}

NotifyStore provides CRUD for notification delivery records.

func NewNotifyStore added in v0.92.0

func NewNotifyStore(client *gen.Client) *NotifyStore

NewNotifyStore returns a NotifyStore backed by the given Ent client.

func (*NotifyStore) DeleteOldest added in v0.92.0

func (s *NotifyStore) DeleteOldest(ctx context.Context, uid string, keepN int) error

DeleteOldest removes the oldest records for a user exceeding keepN.

func (*NotifyStore) GetRecord added in v0.92.0

func (s *NotifyStore) GetRecord(ctx context.Context, id int64) (*gen.NotificationRecord, error)

GetRecord returns a single notification record by ID.

func (*NotifyStore) ListRecords added in v0.92.0

ListRecords returns per-user notification records, cursor-paginated (newest first).

func (*NotifyStore) Record added in v0.92.0

func (s *NotifyStore) Record(ctx context.Context, uid, channel, templateID, summary, status, errorMsg string, payload map[string]any) (int64, error)

Record inserts a notification delivery record and returns the new row ID.

type PageDataStore added in v0.92.0

type PageDataStore struct {
	// contains filtered or unexported fields
}

PageDataStore persists shareable view page data keyed by opaque tokens.

func NewPageDataStore added in v0.92.0

func NewPageDataStore(client *gen.Client) *PageDataStore

NewPageDataStore creates a PageDataStore with the given ent client.

func (*PageDataStore) CreatePageData added in v0.92.0

func (s *PageDataStore) CreatePageData(ctx context.Context, token string, pageType string, title string, data types.KV, createdBy string, expiresAt *time.Time) error

CreatePageData inserts a new page_data row.

func (*PageDataStore) DeleteExpiredPageData added in v0.92.0

func (s *PageDataStore) DeleteExpiredPageData(ctx context.Context) (int64, error)

DeleteExpiredPageData removes rows where expires_at < now(). Returns the number of deleted rows.

func (*PageDataStore) DeletePageData added in v0.92.0

func (s *PageDataStore) DeletePageData(ctx context.Context, token string) (int, error)

DeletePageData removes a page_data row by token. Returns the number of deleted rows.

func (*PageDataStore) GetPageDataByToken added in v0.92.0

func (s *PageDataStore) GetPageDataByToken(ctx context.Context, token string) (*gen.PageData, error)

GetPageDataByToken retrieves a page_data row by token. Returns nil if not found.

type PersistentStorageInterface

type PersistentStorageInterface interface {
	Open(jsonConfig config.StoreType) error
	Close() error
	IsOpen() bool
	GetAdapter() Adapter
	DbStats() func() any
}

PersistentStorageInterface defines methods used for interaction with persistent storage.

Store is the main object for interacting with persistent storage.

type PipelineRunInfo added in v0.92.0

type PipelineRunInfo struct {
	PipelineName  string
	EventID       string
	Status        string
	TriggerSource string
}

PipelineRunInfo is a lightweight view of a pipeline run for event matching display.

type PipelineStore added in v0.92.0

type PipelineStore struct {
	// contains filtered or unexported fields
}

PipelineStore persists pipeline definitions, runs, step runs, and event consumptions.

func NewPipelineStore added in v0.92.0

func NewPipelineStore(client *gen.Client) *PipelineStore

func (*PipelineStore) CreateDefinition added in v0.92.0

func (s *PipelineStore) CreateDefinition(ctx context.Context, name, description string) error

CreateDefinition creates a new pipeline definition with initial yaml_draft and version 1.

func (*PipelineStore) CreateRun added in v0.92.0

func (s *PipelineStore) CreateRun(ctx context.Context, pipelineName, eventID, eventType, triggerSource string) (*gen.PipelineRun, error)

func (*PipelineStore) CreateStepRun added in v0.92.0

func (s *PipelineStore) CreateStepRun(ctx context.Context, runID int64, stepName, capability, operation string, params map[string]any, attempt int) (*gen.PipelineStepRun, error)

func (*PipelineStore) DeleteDefinitionByName added in v0.92.0

func (s *PipelineStore) DeleteDefinitionByName(ctx context.Context, name string) (int64, error)

DeleteDefinitionByName removes a pipeline definition and its associated runs. Returns the number of pipeline runs that were deleted.

func (*PipelineStore) GetCheckpoint added in v0.92.0

func (s *PipelineStore) GetCheckpoint(ctx context.Context, runID int64, target any) error

GetCheckpoint loads the checkpoint data for a pipeline run.

func (*PipelineStore) GetDefinitionByName added in v0.92.0

func (s *PipelineStore) GetDefinitionByName(ctx context.Context, name string) (*gen.PipelineDefinition, error)

GetDefinitionByName returns a pipeline definition by name.

func (*PipelineStore) GetDefinitionVersion added in v0.92.0

func (s *PipelineStore) GetDefinitionVersion(ctx context.Context, name string, version int) (*gen.PipelineDefinitionVersion, error)

GetDefinitionVersion returns a single version snapshot by pipeline name and version number.

func (*PipelineStore) GetIncompleteRuns added in v0.92.0

func (s *PipelineStore) GetIncompleteRuns(ctx context.Context) ([]*gen.PipelineRun, error)

GetIncompleteRuns returns pipeline runs that are in Start state and may need recovery.

func (*PipelineStore) GetRun added in v0.92.0

func (s *PipelineStore) GetRun(ctx context.Context, runID int64) (*gen.PipelineRun, error)

GetRun returns a pipeline run by ID.

func (*PipelineStore) GetRunByID added in v0.92.0

func (s *PipelineStore) GetRunByID(ctx context.Context, id int64) (*gen.PipelineRun, error)

GetRunByID returns a pipeline run by its database ID.

func (*PipelineStore) GetRunsByParentName added in v0.92.0

func (s *PipelineStore) GetRunsByParentName(ctx context.Context, parentName string) ([]*gen.PipelineRun, error)

GetRunsByParentName returns pipeline runs matching a parent pipeline name. Matches both exact name and compound trigger names (name__trigger_*).

func (*PipelineStore) GetStepRunsByRunID added in v0.92.0

func (s *PipelineStore) GetStepRunsByRunID(ctx context.Context, runID int64) ([]*gen.PipelineStepRun, error)

GetStepRunsByRunID returns all step runs for a given pipeline run, ordered by ID.

func (*PipelineStore) HasConsumed added in v0.92.0

func (s *PipelineStore) HasConsumed(ctx context.Context, consumerName, eventID string) (bool, error)

func (*PipelineStore) ListDefinitionVersions added in v0.92.0

func (s *PipelineStore) ListDefinitionVersions(ctx context.Context, name string) ([]*gen.PipelineDefinitionVersion, error)

ListDefinitionVersions returns all published version snapshots for a pipeline, ordered by version descending (newest first).

func (*PipelineStore) ListDefinitions added in v0.92.0

func (s *PipelineStore) ListDefinitions(ctx context.Context) ([]*gen.PipelineDefinition, error)

ListDefinitions returns all pipeline definitions ordered by updated_at desc.

func (*PipelineStore) ListPublishedDefinitions added in v0.92.0

func (s *PipelineStore) ListPublishedDefinitions(ctx context.Context) ([]pipeline.DefinitionRecord, error)

ListPublishedDefinitions returns all pipeline definitions that are published and have a non-nil yaml_published field.

func (*PipelineStore) ListStepRunsByRunID added in v0.92.0

func (s *PipelineStore) ListStepRunsByRunID(ctx context.Context, runID int64) ([]*gen.PipelineStepRun, error)

ListStepRunsByRunID returns all step runs for a pipeline run, ordered by creation time.

func (*PipelineStore) PipelineStats added in v0.92.0

func (s *PipelineStore) PipelineStats(ctx context.Context, name string, since time.Time, groupBy string) (*types.PipelineStats, error)

PipelineStats returns aggregated pipeline run statistics for chart rendering. name empty = all pipelines. since zero = no time filter. groupBy = "day"|"week"|"month".

func (*PipelineStore) PublishDefinition added in v0.92.0

func (s *PipelineStore) PublishDefinition(ctx context.Context, name string, version int) (*gen.PipelineDefinition, error)

PublishDefinition copies yaml_draft to yaml_published with atomic optimistic locking. Also inserts a version snapshot into pipeline_definition_versions.

func (*PipelineStore) RecordConsumption added in v0.92.0

func (s *PipelineStore) RecordConsumption(ctx context.Context, consumerName, eventID string) error
func (s *PipelineStore) RecordResourceLink(ctx context.Context, link *gen.ResourceLink) error

RecordResourceLink inserts a resource link with UPSERT semantics.

func (*PipelineStore) SaveCheckpoint added in v0.92.0

func (s *PipelineStore) SaveCheckpoint(ctx context.Context, runID int64, data any) error

SaveCheckpoint persists the intermediate pipeline run state.

func (*PipelineStore) UpdateDefinitionDraft added in v0.92.0

func (s *PipelineStore) UpdateDefinitionDraft(ctx context.Context, name, yamlDraft string, version int) (*gen.PipelineDefinition, error)

UpdateDefinitionDraft updates the yaml_draft with atomic optimistic locking. Uses conditional UPDATE WHERE version=X. Returns ErrConflict if no row matched.

func (*PipelineStore) UpdateRunHeartbeat added in v0.92.0

func (s *PipelineStore) UpdateRunHeartbeat(ctx context.Context, runID int64) error

UpdateRunHeartbeat refreshes the last_heartbeat timestamp for a running pipeline.

func (*PipelineStore) UpdateRunStatus added in v0.92.0

func (s *PipelineStore) UpdateRunStatus(ctx context.Context, runID int64, status int, errMsg string) error

func (*PipelineStore) UpdateStepRun added in v0.92.0

func (s *PipelineStore) UpdateStepRun(ctx context.Context, stepRunID int64, status int, result map[string]any, errMsg string, attempt int) error

type PollingStateEntry added in v0.92.0

type PollingStateEntry struct {
	Cursor      string
	KnownHashes map[string]string
	UpdatedAt   any
}

PollingStateEntry represents a single persisted polling state row.

type PollingStateStore added in v0.92.0

type PollingStateStore struct {
	// contains filtered or unexported fields
}

PollingStateStore persists polling state entries for the provider event source framework.

func NewPollingStateStore added in v0.92.0

func NewPollingStateStore(client *gen.Client) *PollingStateStore

NewPollingStateStore returns a PollingStateStore backed by the given Ent client.

func (*PollingStateStore) LoadAll added in v0.92.0

LoadAll loads all polling state entries from the database.

func (*PollingStateStore) Save added in v0.92.0

func (s *PollingStateStore) Save(ctx context.Context, resourceName, cursor string, knownHashes map[string]string) error

Save upserts a polling state entry for the given resource. If an entry with the same resource name already exists, it is updated; otherwise a new one is created.

type ResourceChainStore added in v0.92.0

type ResourceChainStore struct {
	// contains filtered or unexported fields
}

ResourceChainStore provides query methods for resource tag and lineage lookups.

func NewResourceChainStore added in v0.92.0

func NewResourceChainStore(client *gen.Client) *ResourceChainStore

NewResourceChainStore creates a ResourceChainStore with the given ent client.

func (*ResourceChainStore) FindNodeRelations added in v0.92.0

func (s *ResourceChainStore) FindNodeRelations(ctx context.Context, appName, capability, entityID string, pipelineName string, since time.Duration) ([]schema.ResourceEdge, []schema.ResourceEdge, error)

FindNodeRelations returns upstream and downstream edges for a node identified by (appName, capability, entityID). Optional pipelineName filter and time window.

func (*ResourceChainStore) FindRelations added in v0.92.0

func (s *ResourceChainStore) FindRelations(ctx context.Context, appName, entityID string) (*schema.ResourceRelations, error)

FindRelations returns upstream and downstream resource references for a specific resource identified by appName + entity_id.

func (s *ResourceChainStore) FindResourceLinks(ctx context.Context, eventIDs []string) ([]*gen.ResourceLink, error)

FindResourceLinks returns all links involving any of the given event IDs, either as source or target.

func (*ResourceChainStore) FindResourcesByTag added in v0.92.0

func (s *ResourceChainStore) FindResourcesByTag(ctx context.Context, key, value string, limit int, cursor string) ([]*gen.DataEvent, string, error)

FindResourcesByTag returns DataEvents matching a tag key-value pair, ordered by created_at descending. Supports limit + opaque cursor pagination.

func (*ResourceChainStore) SearchNodes added in v0.92.0

func (s *ResourceChainStore) SearchNodes(ctx context.Context, query string, limit int, _ string) ([]schema.ResourceRef, string, error)

SearchNodes returns distinct (app, capability, entity_id) tuples from resource_links where source_entity_id, target_entity_id, source_app, target_app, source_capability, or target_capability contains the query. The cursor parameter is reserved for future use; not implemented in MVP.

type WorkflowRunStore added in v0.92.0

type WorkflowRunStore struct {
	// contains filtered or unexported fields
}

WorkflowRunStore persists workflow runs, step runs, and checkpoint data.

func NewWorkflowRunStore added in v0.92.0

func NewWorkflowRunStore(client *gen.Client) *WorkflowRunStore

NewWorkflowRunStore creates a WorkflowRunStore backed by the given ent client.

func (*WorkflowRunStore) CreateRun added in v0.92.0

func (s *WorkflowRunStore) CreateRun(ctx context.Context, workflowName, workflowFile, triggerType string, triggerInfo, inputParams map[string]any) (*gen.WorkflowRun, error)

CreateRun inserts a new workflow run record.

func (*WorkflowRunStore) CreateStepRun added in v0.92.0

func (s *WorkflowRunStore) CreateStepRun(ctx context.Context, runID int64, stepID, stepName, action, actionType string, params map[string]any, attempt int) (*gen.WorkflowStepRun, error)

CreateStepRun inserts a new workflow step run record.

func (*WorkflowRunStore) GetCheckpoint added in v0.92.0

func (s *WorkflowRunStore) GetCheckpoint(ctx context.Context, runID int64, target any) error

GetCheckpoint loads the checkpoint data for a workflow run.

func (*WorkflowRunStore) GetIncompleteRuns added in v0.92.0

func (s *WorkflowRunStore) GetIncompleteRuns(ctx context.Context) ([]*gen.WorkflowRun, error)

GetIncompleteRuns returns workflow runs that are still running and may need recovery.

func (*WorkflowRunStore) GetRun added in v0.92.0

func (s *WorkflowRunStore) GetRun(ctx context.Context, runID int64) (*gen.WorkflowRun, error)

GetRun returns a workflow run by ID.

func (*WorkflowRunStore) SaveCheckpoint added in v0.92.0

func (s *WorkflowRunStore) SaveCheckpoint(ctx context.Context, runID int64, data any) error

SaveCheckpoint persists the intermediate workflow run state.

func (*WorkflowRunStore) UpdateRunHeartbeat added in v0.92.0

func (s *WorkflowRunStore) UpdateRunHeartbeat(ctx context.Context, runID int64) error

UpdateRunHeartbeat refreshes the last_heartbeat timestamp for a running workflow.

func (*WorkflowRunStore) UpdateRunStatus added in v0.92.0

func (s *WorkflowRunStore) UpdateRunStatus(ctx context.Context, runID int64, status int, errMsg string) error

UpdateRunStatus updates the status, error, and completed_at of a workflow run.

func (*WorkflowRunStore) UpdateStepRun added in v0.92.0

func (s *WorkflowRunStore) UpdateStepRun(ctx context.Context, stepRunID int64, status int, result map[string]any, errMsg string, attempt int) error

UpdateStepRun updates the status, result, error, and attempt count of a workflow step run. completed_at is only set for terminal states (Done, Failed).

Directories

Path Synopsis
ent
Package ent provides the Ent ORM client initialization and database connectivity.
Package ent provides the Ent ORM client initialization and database connectivity.
gen
schema
Package schema provides Ent ORM schema definitions.
Package schema provides Ent ORM schema definitions.
Package postgres implements the PostgreSQL storage adapter.
Package postgres implements the PostgreSQL storage adapter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL