core

package
v0.8.0-beta.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2026 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ExecutionLogPendingTimeout = 30 * time.Second
)
View Source
const ExecutorTokenPrefix = "fctl_"
View Source
const (
	SystemUserUUID = "00000000-0000-0000-0000-000000000000"
)
View Source
const (
	TimeFormat = time.RFC3339
)

Variables

View Source
var (
	ErrNoPendingApproval = errors.New("no pending approval")
	ErrNil               = errors.New("not found")
)
View Source
var (
	ErrFlowNotFound = errors.New("flow not found")
)

Functions

func FlowDomain added in v0.8.0

func FlowDomain(namespaceID, prefix string) string

FlowDomain returns the Casbin domain for a flow based on namespace and prefix. Ungrouped flows (empty prefix) use the "_" sentinel.

func GenerateExecutorToken added in v0.7.0

func GenerateExecutorToken(executorName string, signingKey []byte) (string, error)

GenerateExecutorToken creates a stateless HMAC-SHA256 token for an executor. Format: fctl_<executor_name>.<base64url(HMAC-SHA256(executor_name, signing_key))>

func GenerateSigningKey added in v0.7.0

func GenerateSigningKey() ([]byte, error)

GenerateSigningKey generates a 32-byte random signing key for HMAC-SHA256.

func NamespaceDomain added in v0.8.0

func NamespaceDomain(namespaceID string) string

NamespaceDomain returns the Casbin domain for namespace-level checks.

func ValidateExecutorToken added in v0.7.0

func ValidateExecutorToken(token string, signingKey []byte) (string, error)

ValidateExecutorToken validates an executor token and returns the executor name.

Types

type Core

type Core struct {
	LogManager streamlogger.LogManager
	// contains filtered or unexported fields
}

func NewCore

func NewCore(flowsDirectory string, s repo.Store, sch scheduler.TaskScheduler, keeper *secrets.Keeper, enforcer *casbin.Enforcer) (*Core, error)

func (*Core) ApproveOrRejectAction

func (c *Core) ApproveOrRejectAction(ctx context.Context, approvalUUID, decidedBy string, status models.ApprovalType, namespaceID string) error

ApproveOrRejectAction handles approval or rejection of an action request by a user. It takes the approval UUID, the ID of the user making the decision, the approval status, and the namespace ID. The function updates the database with the decision. Once approved, the task is moved to a resume queue for further processing.

func (*Core) AssignNamespaceRole

func (c *Core) AssignNamespaceRole(ctx context.Context, subjectID string, subjectType string, namespaceID string, role models.NamespaceRole) error

AssignNamespaceRole assigns a role to a user or group in a namespace

func (*Core) AssignPrefixAccess added in v0.8.0

func (c *Core) AssignPrefixAccess(ctx context.Context, subjectID, subjectType, namespaceID, prefix string) error

AssignPrefixAccess grants a user or group access to a specific prefix (dual write: DB + Casbin)

func (*Core) CancelFlowExecution

func (c *Core) CancelFlowExecution(ctx context.Context, execID string, namespaceID string) error

CancelFlowExecution cancels the given execution using the scheduler

func (*Core) CheckPermission

func (c *Core) CheckPermission(ctx context.Context, userID string, domain string, resource models.Resource, action models.RBACAction) (bool, error)

CheckPermission checks if a user has permission to perform an action on a resource. The domain parameter encodes namespace and optional prefix scope.

func (*Core) CreateCredential

func (c *Core) CreateCredential(ctx context.Context, cred models.Credential, namespaceID string) (models.Credential, error)

func (*Core) CreateFlow

func (c *Core) CreateFlow(ctx context.Context, f models.Flow, namespaceID string) error

func (*Core) CreateFlowPrefix added in v0.8.0

func (c *Core) CreateFlowPrefix(ctx context.Context, namespaceID, name, description string) (models.FlowPrefix, error)

CreateFlowPrefix creates a new flow prefix in the given namespace.

func (*Core) CreateFlowSecret

func (c *Core) CreateFlowSecret(ctx context.Context, flowID string, secret models.FlowSecret, namespaceID string) (models.FlowSecret, error)

func (*Core) CreateGroup

func (c *Core) CreateGroup(ctx context.Context, name, description string) (models.GroupWithUsers, error)

func (*Core) CreateNamespace

func (c *Core) CreateNamespace(ctx context.Context, namespace *models.Namespace) (models.Namespace, error)

func (*Core) CreateNamespaceSecret

func (c *Core) CreateNamespaceSecret(ctx context.Context, secret models.NamespaceSecret, namespaceID string) (models.NamespaceSecret, error)

func (*Core) CreateNode

func (c *Core) CreateNode(ctx context.Context, node *models.Node, namespaceID string) (models.Node, error)

func (*Core) CreateSchedule added in v0.5.0

func (c *Core) CreateSchedule(ctx context.Context, flowID, cron, timezone string, inputs map[string]interface{}, userUUID, namespaceID string) (models.Schedule, error)

func (*Core) CreateUser

func (c *Core) CreateUser(ctx context.Context, name, username string, loginType models.UserLoginType, userRole models.UserRoleType, groups []string) (models.UserWithGroups, error)

func (*Core) DeleteCredential

func (c *Core) DeleteCredential(ctx context.Context, id string, namespaceID string) error

func (*Core) DeleteFlow

func (c *Core) DeleteFlow(ctx context.Context, flowID, namespaceID string) error

func (*Core) DeleteFlowPrefix added in v0.8.0

func (c *Core) DeleteFlowPrefix(ctx context.Context, prefixUUID, namespaceID string) error

DeleteFlowPrefix deletes a flow prefix by UUID and all flows in the group.

func (*Core) DeleteFlowSecret

func (c *Core) DeleteFlowSecret(ctx context.Context, id string, namespaceID string) error

func (*Core) DeleteGroupByUUID

func (c *Core) DeleteGroupByUUID(ctx context.Context, groupUUID string) error

func (*Core) DeleteNamespace

func (c *Core) DeleteNamespace(ctx context.Context, id string) error

func (*Core) DeleteNamespaceSecret

func (c *Core) DeleteNamespaceSecret(ctx context.Context, id string, namespaceID string) error

func (*Core) DeleteNode

func (c *Core) DeleteNode(ctx context.Context, id string, namespaceID string) error

func (*Core) DeleteSchedule added in v0.5.0

func (c *Core) DeleteSchedule(ctx context.Context, scheduleUUID, userUUID, namespaceID string) error

func (*Core) DeleteUserByUUID

func (c *Core) DeleteUserByUUID(ctx context.Context, userUUID string) error

func (*Core) GetAccessibleGroups added in v0.8.0

func (c *Core) GetAccessibleGroups(ctx context.Context, userID, namespaceID string) ([]models.FlowPrefix, error)

GetAccessibleGroups returns the flow groups (prefixes) in a namespace that the user has access to. Superusers and namespace admins/reviewers see all groups; regular users see only their granted prefixes.

func (*Core) GetAllExecutionSummaryPaginated

func (c *Core) GetAllExecutionSummaryPaginated(ctx context.Context, namespaceID string, filter string, limit, offset int) ([]models.ExecutionSummary, int64, int64, error)

func (*Core) GetAllGroups

func (c *Core) GetAllGroups(ctx context.Context) ([]models.Group, error)

func (*Core) GetAllGroupsWithUsers

func (c *Core) GetAllGroupsWithUsers(ctx context.Context) ([]models.GroupWithUsers, error)

func (*Core) GetAllUsersWithGroups

func (c *Core) GetAllUsersWithGroups(ctx context.Context) ([]models.UserWithGroups, error)

func (*Core) GetApprovalRequest

func (c *Core) GetApprovalRequest(ctx context.Context, approvalUUID string, namespaceID string) (models.ApprovalRequest, error)

GetApprovalRequest returns an approval request using the approval UUID and namespace UUID

func (*Core) GetApprovalWithInputs

func (c *Core) GetApprovalWithInputs(ctx context.Context, approvalUUID string, namespaceID string) (models.ApprovalDetails, error)

GetApprovalWithInputs returns an approval request with additional info like flow name and id and inputs for the execution

func (*Core) GetApprovalsPaginated

func (c *Core) GetApprovalsPaginated(ctx context.Context, namespaceID, status, filter string, page, countPerPage int) ([]models.ApprovalPaginationDetails, int64, int64, error)

func (*Core) GetApprovalsRequestsForExec

func (c *Core) GetApprovalsRequestsForExec(ctx context.Context, execID string, namespaceID string) (models.ApprovalRequest, error)

GetApprovalsRequestsForExec returns the approval request for a given execution.

func (*Core) GetCredentialByID

func (c *Core) GetCredentialByID(ctx context.Context, id string, namespaceID string) (models.Credential, error)

func (*Core) GetDecryptedFlowSecrets

func (c *Core) GetDecryptedFlowSecrets(ctx context.Context, flowID string, namespaceID string) (map[string]string, error)

func (*Core) GetDistinctPrefixes added in v0.8.0

func (c *Core) GetDistinctPrefixes(ctx context.Context, namespaceID string) ([]models.FlowPrefix, error)

GetDistinctPrefixes returns all distinct prefixes in a namespace

func (*Core) GetExecutionByExecID

func (c *Core) GetExecutionByExecID(ctx context.Context, execID string, namespaceID string) (models.Execution, error)

func (*Core) GetExecutionSummaryByExecID

func (c *Core) GetExecutionSummaryByExecID(ctx context.Context, execID string, namespaceID string) (models.ExecutionSummary, error)

func (*Core) GetExecutionSummaryPaginated

func (c *Core) GetExecutionSummaryPaginated(ctx context.Context, f models.Flow, namespaceID string, limit, offset int) ([]models.ExecutionSummary, int64, int64, error)

func (*Core) GetFlowByID

func (c *Core) GetFlowByID(id string, namespaceID string) (models.Flow, error)

GetFlowByID returns a flow from memory using the flow slug (id) and namespace

func (*Core) GetFlowCountByPrefix added in v0.8.0

func (c *Core) GetFlowCountByPrefix(ctx context.Context, namespaceID, prefix string) (int64, error)

GetFlowCountByPrefix returns the number of active flows with a given prefix in a namespace

func (*Core) GetFlowFromLogID

func (c *Core) GetFlowFromLogID(logID string, namespaceID string) (models.Flow, error)

func (*Core) GetFlowPrefix added in v0.8.0

func (c *Core) GetFlowPrefix(ctx context.Context, prefixUUID, namespaceID string) (models.FlowPrefix, error)

GetFlowPrefix returns a flow prefix by UUID.

func (*Core) GetFlowSecretByID

func (c *Core) GetFlowSecretByID(ctx context.Context, id string, namespaceID string) (models.FlowSecret, error)

func (*Core) GetFlowsByPrefix added in v0.8.0

func (c *Core) GetFlowsByPrefix(ctx context.Context, namespaceID, prefix string) ([]models.Flow, error)

GetFlowsByPrefix returns all flows in a namespace with the given prefix name

func (*Core) GetFlowsPaginated

func (c *Core) GetFlowsPaginated(ctx context.Context, namespaceID string, userID string, limit, offset int) ([]models.Flow, int64, int64, error)

func (*Core) GetGroupByName

func (c *Core) GetGroupByName(ctx context.Context, name string) (models.Group, error)

func (*Core) GetGroupByUUID

func (c *Core) GetGroupByUUID(ctx context.Context, groupUUID string) (models.Group, error)

func (*Core) GetGroupWithUsers

func (c *Core) GetGroupWithUsers(ctx context.Context, groupUUID string) (models.GroupWithUsers, error)

func (*Core) GetInputForExec

func (c *Core) GetInputForExec(ctx context.Context, execID string, namespaceID string) (map[string]interface{}, error)

func (*Core) GetMemberPrefixes added in v0.8.0

func (c *Core) GetMemberPrefixes(ctx context.Context, namespaceID, membershipID string) ([]models.FlowPrefix, error)

GetMemberPrefixes returns the flow prefixes accessible to a specific namespace member. Admin/reviewer roles see all groups; user role sees only explicitly granted prefixes.

func (*Core) GetMergedSecretsForFlow

func (c *Core) GetMergedSecretsForFlow(ctx context.Context, flowID string, namespaceID string) (map[string]string, error)

GetMergedSecretsForFlow returns merged namespace + flow secrets (flow overrides namespace). This is the SecretsProviderFn implementation that should be used by the scheduler.

func (*Core) GetNamespaceByID

func (c *Core) GetNamespaceByID(ctx context.Context, id string) (models.Namespace, error)

func (*Core) GetNamespaceByName

func (c *Core) GetNamespaceByName(ctx context.Context, name string) (models.Namespace, error)

func (*Core) GetNamespaceMembers

func (c *Core) GetNamespaceMembers(ctx context.Context, namespaceID string) ([]models.NamespaceMember, error)

GetNamespaceMembers returns all members of a namespace

func (*Core) GetNamespaceSecretByID

func (c *Core) GetNamespaceSecretByID(ctx context.Context, id string, namespaceID string) (models.NamespaceSecret, error)

func (*Core) GetNodeByID

func (c *Core) GetNodeByID(ctx context.Context, id string, namespaceID string) (models.Node, error)

func (*Core) GetNodeStats

func (c *Core) GetNodeStats(ctx context.Context, namespaceID string) (models.NodeStats, error)

func (*Core) GetNodesByNames

func (c *Core) GetNodesByNames(ctx context.Context, nodeNames []string, namespaceUUID uuid.UUID) ([]models.Node, error)

GetNodesByNames retrieves nodes by their names and returns a slice of models.Node. This is used as a lookup function for converting flows to task models.

func (*Core) GetNodesByTags added in v0.5.0

func (c *Core) GetNodesByTags(ctx context.Context, tags []string, namespaceUUID uuid.UUID) ([]models.Node, error)

GetNodesByTags retrieves nodes by the given tags. Nodes with any of the given tags will be returned

func (*Core) GetPermissionsForUser

func (c *Core) GetPermissionsForUser(userID string) (string, error)

GetPermissionsForUser returns the casbin policies for the user

func (*Core) GetSchedule added in v0.5.0

func (c *Core) GetSchedule(ctx context.Context, scheduleUUID, userUUID, namespaceID string) (models.Schedule, error)

func (*Core) GetScheduledExecutionsByFlow added in v0.4.0

func (c *Core) GetScheduledExecutionsByFlow(ctx context.Context, flowID int32, namespaceID string) ([]models.ScheduledExecution, error)

func (*Core) GetScheduledFlows

func (c *Core) GetScheduledFlows() []models.Flow

GetScheduledFlows returns all flows that have a cron schedule configured

func (*Core) GetSchedulerFlow

func (c *Core) GetSchedulerFlow(ctx context.Context, flowSlug string, namespaceUUID string) (scheduler.Flow, error)

GetSchedulerFlow loads a flow and converts it to scheduler.Flow format. This function can be used as a FlowLoaderFn for the scheduler.

func (*Core) GetUserByUUID

func (c *Core) GetUserByUUID(ctx context.Context, userUUID string) (models.User, error)

func (*Core) GetUserByUsername

func (c *Core) GetUserByUsername(ctx context.Context, username string) (models.User, error)

func (*Core) GetUserByUsernameWithGroups

func (c *Core) GetUserByUsernameWithGroups(ctx context.Context, username string) (models.UserWithGroups, error)

func (*Core) GetUserNamespaces

func (c *Core) GetUserNamespaces(ctx context.Context, userID string) ([]models.NamespaceWithRole, error)

GetUserNamespaces returns all namespaces a user has access to with their roles

func (*Core) GetUserWithUUIDWithGroups

func (c *Core) GetUserWithUUIDWithGroups(ctx context.Context, userUUID string) (models.UserWithGroups, error)

func (*Core) GrantPrefixAccessForMember added in v0.8.0

func (c *Core) GrantPrefixAccessForMember(ctx context.Context, namespaceID, membershipID, prefix string) error

GrantPrefixAccessForMember resolves a namespace member and grants prefix access

func (*Core) GrantSuperusersAdminAccessToAllNamespaces

func (c *Core) GrantSuperusersAdminAccessToAllNamespaces(ctx context.Context) error

GrantSuperusersAdminAccessToAllNamespaces queries for all users with the superuser role and adds a grouping policy for each of them, granting them admin access to all namespaces.

func (*Core) InitializeRBACPolicies

func (c *Core) InitializeRBACPolicies() error

InitializeRBACPolicies sets up the base policies for each role. Domain-based: "/*" for all namespaces, "/:ns/_" for ungrouped flows only.

func (*Core) ListFlowPrefixes added in v0.8.0

func (c *Core) ListFlowPrefixes(ctx context.Context, namespaceID string) ([]models.FlowPrefix, error)

ListFlowPrefixes returns all flow prefixes in a namespace.

func (*Core) ListFlowSecrets

func (c *Core) ListFlowSecrets(ctx context.Context, flowID string, namespaceID string) ([]models.FlowSecret, error)

func (*Core) ListNamespaceSecrets

func (c *Core) ListNamespaceSecrets(ctx context.Context, namespaceID string) ([]models.NamespaceSecret, error)

func (*Core) ListNamespaces

func (c *Core) ListNamespaces(ctx context.Context, userID string, name string, limit, offset int) ([]models.Namespace, int64, int64, error)

func (*Core) ListSchedules added in v0.5.0

func (c *Core) ListSchedules(ctx context.Context, flowSlug, userUUID, namespaceID string, limit, offset int) ([]models.Schedule, int64, int64, error)

ListSchedules returns a paginated list of all schedules (both user and system schedules)

func (*Core) LoadFlows

func (c *Core) LoadFlows(ctx context.Context) error

func (*Core) QueueFlowExecution

func (c *Core) QueueFlowExecution(ctx context.Context, f models.Flow, input map[string]interface{}, userUUID string, namespaceID string, scheduledAt *time.Time) (string, error)

QueueFlowExecution adds a flow to the execution queue. The ID returned is the execution queue ID. The exec ID should be universally unique; it is used to create the log stream and identify each execution. If scheduledAt is provided, the flow will be scheduled to run at that time instead of immediately.

func (*Core) QueueFlowExecutionWithExecID added in v0.4.0

func (c *Core) QueueFlowExecutionWithExecID(ctx context.Context, f models.Flow, input map[string]interface{}, userUUID string, namespaceID string, execID string, scheduledAt *time.Time) (string, error)

QueueFlowExecutionWithExecID adds a flow to the execution queue with a pre-generated execution ID. If execID is empty, a new UUID is generated. Use this when files need to be uploaded before queuing.

func (*Core) RemoveNamespaceMember

func (c *Core) RemoveNamespaceMember(ctx context.Context, membershipID, namespaceID string) error

RemoveNamespaceMember removes a user or group from a namespace

func (*Core) RequestApproval

func (c *Core) RequestApproval(ctx context.Context, execID string, action models.Action, namespaceID string) (string, error)

func (*Core) ResolveGroupEmails added in v0.8.0

func (c *Core) ResolveGroupEmails(ctx context.Context, groupName string) ([]string, error)

ResolveGroupEmails resolves a group name to member email addresses. This implements the messengers.GroupResolver interface.

func (*Core) ResolvePrefixID added in v0.8.0

func (c *Core) ResolvePrefixID(ctx context.Context, prefix, namespaceID string) (sql.NullInt32, error)

ResolvePrefixID looks up a flow prefix by name in the given namespace (by UUID). Returns sql.NullInt32{Valid: false} when prefix is empty. Returns an error if the prefix name is non-empty but not found.

func (*Core) ResumeFlowExecution

func (c *Core) ResumeFlowExecution(ctx context.Context, execID string, actionID string, userUUID string, namespaceID string, retry bool) error

ResumeFlowExecution moves the task to a resume queue for further processing.

func (*Core) RetryFlowExecution added in v0.3.0

func (c *Core) RetryFlowExecution(ctx context.Context, execID string, userUUID string, namespaceID string) error

RetryFlowExecution retries a failed or cancelled execution from the point of failure. It automatically detects the retry point from CurrentActionID and resumes execution from there.

func (*Core) RevokePrefixAccess added in v0.8.0

func (c *Core) RevokePrefixAccess(ctx context.Context, subjectID, subjectType, namespaceID, prefix string) error

RevokePrefixAccess removes a user or group's access to a specific prefix (dual write: DB + Casbin)

func (*Core) RevokePrefixAccessForMember added in v0.8.0

func (c *Core) RevokePrefixAccessForMember(ctx context.Context, namespaceID, membershipID, prefix string) error

RevokePrefixAccessForMember resolves a namespace member and revokes prefix access

func (*Core) SearchCredentials

func (c *Core) SearchCredentials(ctx context.Context, filter string, limit, offset int, namespaceID string) ([]models.Credential, int64, int64, error)

func (*Core) SearchFlows

func (c *Core) SearchFlows(ctx context.Context, namespaceID string, userID string, query string, limit, offset int) ([]models.Flow, int64, int64, error)

func (*Core) SearchGroup

func (c *Core) SearchGroup(ctx context.Context, query string, limit, offset int) ([]models.GroupWithUsers, int64, int64, error)

func (*Core) SearchNodes

func (c *Core) SearchNodes(ctx context.Context, filter string, tags []string, limit, offset int, namespaceID string) ([]models.Node, int64, int64, error)

func (*Core) SearchUser

func (c *Core) SearchUser(ctx context.Context, query string, limit, offset int) ([]models.UserWithGroups, int64, int64, error)

func (*Core) StreamLogs

func (c *Core) StreamLogs(ctx context.Context, logID string, namespaceID string) (chan models.StreamMessage, error)

StreamLogs reads values from a stream from the beginning and returns a channel to which all the messages are sent. logID is the ID sent to the NewFlowExecution task.

func (*Core) SyncScheduledFlowJobs added in v0.2.0

func (c *Core) SyncScheduledFlowJobs(ctx context.Context) ([]scheduler.ScheduledJob, error)

SyncScheduledFlowJobs loads scheduled flows from the database and converts them to scheduled jobs. This function can be used as a JobSyncerFn for the scheduler.

func (*Core) SynchronizePolicies

func (c *Core) SynchronizePolicies(ctx context.Context) error

SynchronizePolicies synchronizes Casbin grouping policies from the namespace_members table

func (*Core) SynchronizePrefixPolicies added in v0.8.0

func (c *Core) SynchronizePrefixPolicies(ctx context.Context) error

SynchronizePrefixPolicies reads the prefix_access table and rebuilds Casbin p policies

func (*Core) UpdateCredential

func (c *Core) UpdateCredential(ctx context.Context, id string, cred *models.Credential, namespaceID string) (models.Credential, error)

func (*Core) UpdateFlow

func (c *Core) UpdateFlow(ctx context.Context, f models.Flow, namespaceID string) error

func (*Core) UpdateFlowPrefix added in v0.8.0

func (c *Core) UpdateFlowPrefix(ctx context.Context, prefixUUID, namespaceID, name, description string) (models.FlowPrefix, error)

UpdateFlowPrefix updates an existing flow prefix.

func (*Core) UpdateFlowSecret

func (c *Core) UpdateFlowSecret(ctx context.Context, id string, secret models.FlowSecret, namespaceID string) (models.FlowSecret, error)

func (*Core) UpdateGroup

func (c *Core) UpdateGroup(ctx context.Context, groupUUID, name, description string) (models.GroupWithUsers, error)

func (*Core) UpdateNamespace

func (c *Core) UpdateNamespace(ctx context.Context, id string, namespace models.Namespace) (models.Namespace, error)

func (*Core) UpdateNamespaceMember

func (c *Core) UpdateNamespaceMember(ctx context.Context, membershipID, namespaceID string, role models.NamespaceRole) error

UpdateNamespaceMember updates the role of a user or group in a namespace

func (*Core) UpdateNamespaceSecret

func (c *Core) UpdateNamespaceSecret(ctx context.Context, id string, secret models.NamespaceSecret, namespaceID string) (models.NamespaceSecret, error)

func (*Core) UpdateNode

func (c *Core) UpdateNode(ctx context.Context, id string, node *models.Node, namespaceID string) (models.Node, error)

func (*Core) UpdateSchedule added in v0.5.0

func (c *Core) UpdateSchedule(ctx context.Context, scheduleUUID, cron, timezone string, inputs map[string]interface{}, isActive bool, userUUID, namespaceID string) (models.Schedule, error)

func (*Core) UpdateUser

func (c *Core) UpdateUser(ctx context.Context, userUUID string, name string, username string, groups []string) (models.UserWithGroups, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL