Documentation
¶
Index ¶
- Constants
- func CoalesceNullString(s sql.NullString) string
- func IsTerminalPhase(phase common.ActionPhase) bool
- func NewTaskService(repo interfaces.Repository, projectClient projectconnect.ProjectServiceClient) taskconnect.TaskServiceHandler
- func NewTriggerService(repo interfaces.Repository) triggerconnect.TriggerServiceHandler
- type AbortReconciler
- type AbortReconcilerConfig
- type AuthMetadataService
- type IdentityService
- type K8sLogStreamer
- type LogStreamer
- type ProjectService
- func (s *ProjectService) CreateProject(ctx context.Context, req *connect.Request[project.CreateProjectRequest]) (*connect.Response[project.CreateProjectResponse], error)
- func (s *ProjectService) GetProject(ctx context.Context, req *connect.Request[project.GetProjectRequest]) (*connect.Response[project.GetProjectResponse], error)
- func (s *ProjectService) ListProjects(ctx context.Context, req *connect.Request[project.ListProjectsRequest]) (*connect.Response[project.ListProjectsResponse], error)
- func (s *ProjectService) UpdateProject(ctx context.Context, req *connect.Request[project.UpdateProjectRequest]) (*connect.Response[project.UpdateProjectResponse], error)
- type RunLogsService
- type RunService
- func (s *RunService) AbortAction(ctx context.Context, req *connect.Request[workflow.AbortActionRequest]) (*connect.Response[workflow.AbortActionResponse], error)
- func (s *RunService) AbortRun(ctx context.Context, req *connect.Request[workflow.AbortRunRequest]) (*connect.Response[workflow.AbortRunResponse], error)
- func (s *RunService) CreateRun(ctx context.Context, req *connect.Request[workflow.CreateRunRequest]) (*connect.Response[workflow.CreateRunResponse], error)
- func (s *RunService) GetActionData(ctx context.Context, req *connect.Request[workflow.GetActionDataRequest]) (*connect.Response[workflow.GetActionDataResponse], error)
- func (s *RunService) GetActionDataURIs(ctx context.Context, req *connect.Request[workflow.GetActionDataURIsRequest]) (*connect.Response[workflow.GetActionDataURIsResponse], error)
- func (s *RunService) GetActionDetails(ctx context.Context, req *connect.Request[workflow.GetActionDetailsRequest]) (*connect.Response[workflow.GetActionDetailsResponse], error)
- func (s *RunService) GetActionLogContext(ctx context.Context, req *connect.Request[workflow.GetActionLogContextRequest]) (*connect.Response[workflow.GetActionLogContextResponse], error)
- func (s *RunService) GetRunDetails(ctx context.Context, req *connect.Request[workflow.GetRunDetailsRequest]) (*connect.Response[workflow.GetRunDetailsResponse], error)
- func (s *RunService) ListActions(ctx context.Context, req *connect.Request[workflow.ListActionsRequest]) (*connect.Response[workflow.ListActionsResponse], error)
- func (s *RunService) ListRuns(ctx context.Context, req *connect.Request[workflow.ListRunsRequest]) (*connect.Response[workflow.ListRunsResponse], error)
- func (s *RunService) RecordAction(ctx context.Context, req *connect.Request[workflow.RecordActionRequest]) (*connect.Response[workflow.RecordActionResponse], error)
- func (s *RunService) RecordActionEventStream(ctx context.Context, ...) error
- func (s *RunService) RecordActionEvents(ctx context.Context, req *connect.Request[workflow.RecordActionEventsRequest]) (*connect.Response[workflow.RecordActionEventsResponse], error)
- func (s *RunService) RecordActionStream(ctx context.Context, ...) error
- func (s *RunService) SignalEvent(ctx context.Context, _ *connect.Request[workflow.SignalEventRequest]) (*connect.Response[workflow.SignalEventResponse], error)
- func (s *RunService) UpdateActionStatus(ctx context.Context, req *connect.Request[workflow.UpdateActionStatusRequest]) (*connect.Response[workflow.UpdateActionStatusResponse], error)
- func (s *RunService) UpdateActionStatusStream(ctx context.Context, ...) error
- func (s *RunService) WatchActionDetails(ctx context.Context, req *connect.Request[workflow.WatchActionDetailsRequest], ...) error
- func (s *RunService) WatchActions(ctx context.Context, req *connect.Request[workflow.WatchActionsRequest], ...) error
- func (s *RunService) WatchClusterEvents(ctx context.Context, req *connect.Request[workflow.WatchClusterEventsRequest], ...) error
- func (s *RunService) WatchGroups(ctx context.Context, req *connect.Request[workflow.WatchGroupsRequest], ...) error
- func (s *RunService) WatchRunDetails(ctx context.Context, req *connect.Request[workflow.WatchRunDetailsRequest], ...) error
- func (s *RunService) WatchRuns(ctx context.Context, req *connect.Request[workflow.WatchRunsRequest], ...) error
Constants ¶
const (
RootActionName = "a0"
)
Variables ¶
This section is empty.
Functions ¶
func CoalesceNullString ¶ added in v2.0.9
func CoalesceNullString(s sql.NullString) string
func IsTerminalPhase ¶ added in v2.0.9
func IsTerminalPhase(phase common.ActionPhase) bool
func NewTaskService ¶ added in v2.0.1
func NewTaskService(repo interfaces.Repository, projectClient projectconnect.ProjectServiceClient) taskconnect.TaskServiceHandler
func NewTriggerService ¶ added in v2.0.9
func NewTriggerService(repo interfaces.Repository) triggerconnect.TriggerServiceHandler
Types ¶
type AbortReconciler ¶ added in v2.0.10
type AbortReconciler struct {
// contains filtered or unexported fields
}
AbortReconciler watches for abort requests and drives pod termination to completion with exponential backoff retries.
func NewAbortReconciler ¶ added in v2.0.10
func NewAbortReconciler(repo interfaces.Repository, actionsClient actionsconnect.ActionsServiceClient, cfg AbortReconcilerConfig) *AbortReconciler
NewAbortReconciler creates a new AbortReconciler. Zero-value cfg fields are filled with defaults.
func (*AbortReconciler) Push ¶ added in v2.0.10
func (r *AbortReconciler) Push(ctx context.Context, actionID *common.ActionIdentifier, reason string)
Push enqueues an abort request for the given action. Safe to call concurrently. No-op if the key is already queued (dedup).
type AbortReconcilerConfig ¶ added in v2.0.10
type AbortReconcilerConfig struct {
// Workers is the number of concurrent pod-termination goroutines.
Workers int
// MaxAttempts is the maximum number of actionsClient.Abort calls per action before giving up.
MaxAttempts int
// QueueSize is the buffer size of the internal channel.
QueueSize int
// InitialDelay is the backoff duration before the first retry.
InitialDelay time.Duration
// MaxDelay caps the exponential backoff.
MaxDelay time.Duration
}
AbortReconcilerConfig holds tunables for the reconciler.
type AuthMetadataService ¶ added in v2.0.10
type AuthMetadataService struct {
authconnect.UnimplementedAuthMetadataServiceHandler
// contains filtered or unexported fields
}
AuthMetadataService implements the AuthMetadataServiceHandler interface.
func NewAuthMetadataService ¶ added in v2.0.10
func NewAuthMetadataService(dataplaneDomain string) *AuthMetadataService
NewAuthMetadataService creates a new AuthMetadataService instance.
func (*AuthMetadataService) GetPublicClientConfig ¶ added in v2.0.10
func (s *AuthMetadataService) GetPublicClientConfig( ctx context.Context, req *connect.Request[auth.GetPublicClientConfigRequest], ) (*connect.Response[auth.GetPublicClientConfigResponse], error)
type IdentityService ¶ added in v2.0.8
type IdentityService struct{}
IdentityService implements the IdentityServiceHandler interface.
func NewIdentityService ¶ added in v2.0.8
func NewIdentityService() *IdentityService
NewIdentityService creates a new IdentityService instance.
func (*IdentityService) UserInfo ¶ added in v2.0.8
func (s *IdentityService) UserInfo( ctx context.Context, req *connect.Request[auth.UserInfoRequest], ) (*connect.Response[auth.UserInfoResponse], error)
UserInfo returns information about the currently logged-in user. TODO: Wire with real auth to populate user info from the authenticated context.
type K8sLogStreamer ¶ added in v2.0.9
type K8sLogStreamer struct {
// contains filtered or unexported fields
}
K8sLogStreamer streams logs directly from Kubernetes pods.
func NewK8sLogStreamer ¶ added in v2.0.9
func NewK8sLogStreamer(k8sConfig *rest.Config) (*K8sLogStreamer, error)
NewK8sLogStreamer creates a K8sLogStreamer from a Kubernetes REST config. It clears the timeout so that long-lived log streams are not interrupted.
func (*K8sLogStreamer) TailLogs ¶ added in v2.0.9
func (s *K8sLogStreamer) TailLogs(ctx context.Context, logContext *core.LogContext, stream *connect.ServerStream[workflow.TailLogsResponse]) error
TailLogs streams log lines for the given LogContext from a Kubernetes pod.
type LogStreamer ¶ added in v2.0.9
type LogStreamer interface {
TailLogs(ctx context.Context, logContext *core.LogContext, stream *connect.ServerStream[workflow.TailLogsResponse]) error
}
LogStreamer abstracts log fetching from different backends.
type ProjectService ¶ added in v2.0.8
type ProjectService struct {
// contains filtered or unexported fields
}
func NewProjectService ¶ added in v2.0.8
func NewProjectService(projectRepo interfaces.ProjectRepo, domains []*project.Domain) *ProjectService
func (*ProjectService) CreateProject ¶ added in v2.0.8
func (s *ProjectService) CreateProject( ctx context.Context, req *connect.Request[project.CreateProjectRequest], ) (*connect.Response[project.CreateProjectResponse], error)
func (*ProjectService) GetProject ¶ added in v2.0.8
func (s *ProjectService) GetProject( ctx context.Context, req *connect.Request[project.GetProjectRequest], ) (*connect.Response[project.GetProjectResponse], error)
func (*ProjectService) ListProjects ¶ added in v2.0.8
func (s *ProjectService) ListProjects( ctx context.Context, req *connect.Request[project.ListProjectsRequest], ) (*connect.Response[project.ListProjectsResponse], error)
func (*ProjectService) UpdateProject ¶ added in v2.0.8
func (s *ProjectService) UpdateProject( ctx context.Context, req *connect.Request[project.UpdateProjectRequest], ) (*connect.Response[project.UpdateProjectResponse], error)
type RunLogsService ¶ added in v2.0.9
type RunLogsService struct {
// contains filtered or unexported fields
}
RunLogsService implements the RunLogsServiceHandler interface.
func NewRunLogsService ¶ added in v2.0.9
func NewRunLogsService(repo interfaces.Repository, streamer LogStreamer) *RunLogsService
NewRunLogsService creates a new RunLogsService.
func (*RunLogsService) TailLogs ¶ added in v2.0.9
func (s *RunLogsService) TailLogs(ctx context.Context, req *connect.Request[workflow.TailLogsRequest], stream *connect.ServerStream[workflow.TailLogsResponse]) error
TailLogs streams pod logs for an action attempt.
type RunService ¶
type RunService struct {
// contains filtered or unexported fields
}
RunService implements the RunServiceHandler interface.
func NewRunService ¶
func NewRunService( repo interfaces.Repository, actionsClient actionsconnect.ActionsServiceClient, dataProxyClient dataproxyconnect.DataProxyServiceClient, projectClient projectconnect.ProjectServiceClient, storagePrefix string, dataStore *storage.DataStore, reconciler *AbortReconciler, ) *RunService
NewRunService creates a new RunService instance.
func (*RunService) AbortAction ¶
func (s *RunService) AbortAction( ctx context.Context, req *connect.Request[workflow.AbortActionRequest], ) (*connect.Response[workflow.AbortActionResponse], error)
AbortAction aborts a specific action.
func (*RunService) AbortRun ¶
func (s *RunService) AbortRun( ctx context.Context, req *connect.Request[workflow.AbortRunRequest], ) (*connect.Response[workflow.AbortRunResponse], error)
AbortRun aborts a run.
func (*RunService) CreateRun ¶
func (s *RunService) CreateRun( ctx context.Context, req *connect.Request[workflow.CreateRunRequest], ) (*connect.Response[workflow.CreateRunResponse], error)
CreateRun creates a new run.
func (*RunService) GetActionData ¶
func (s *RunService) GetActionData( ctx context.Context, req *connect.Request[workflow.GetActionDataRequest], ) (*connect.Response[workflow.GetActionDataResponse], error)
GetActionData keeps backward compatibility by delegating data reads to DataProxy.
func (*RunService) GetActionDataURIs ¶ added in v2.0.12
func (s *RunService) GetActionDataURIs( ctx context.Context, req *connect.Request[workflow.GetActionDataURIsRequest], ) (*connect.Response[workflow.GetActionDataURIsResponse], error)
func (*RunService) GetActionDetails ¶
func (s *RunService) GetActionDetails( ctx context.Context, req *connect.Request[workflow.GetActionDetailsRequest], ) (*connect.Response[workflow.GetActionDetailsResponse], error)
GetActionDetails gets detailed information about an action from the DB.
func (*RunService) GetActionLogContext ¶ added in v2.0.12
func (s *RunService) GetActionLogContext( ctx context.Context, req *connect.Request[workflow.GetActionLogContextRequest], ) (*connect.Response[workflow.GetActionLogContextResponse], error)
func (*RunService) GetRunDetails ¶
func (s *RunService) GetRunDetails( ctx context.Context, req *connect.Request[workflow.GetRunDetailsRequest], ) (*connect.Response[workflow.GetRunDetailsResponse], error)
GetRunDetails gets detailed information about a run from the DB.
func (*RunService) ListActions ¶
func (s *RunService) ListActions( ctx context.Context, req *connect.Request[workflow.ListActionsRequest], ) (*connect.Response[workflow.ListActionsResponse], error)
ListActions lists actions for a run.
func (*RunService) ListRuns ¶
func (s *RunService) ListRuns( ctx context.Context, req *connect.Request[workflow.ListRunsRequest], ) (*connect.Response[workflow.ListRunsResponse], error)
ListRuns lists runs based on filter criteria.
func (*RunService) RecordAction ¶ added in v2.0.8
func (s *RunService) RecordAction( ctx context.Context, req *connect.Request[workflow.RecordActionRequest], ) (*connect.Response[workflow.RecordActionResponse], error)
RecordAction records a new action in the database.
func (*RunService) RecordActionEventStream ¶ added in v2.0.8
func (s *RunService) RecordActionEventStream( ctx context.Context, stream *connect.BidiStream[workflow.RecordActionEventStreamRequest, workflow.RecordActionEventStreamResponse], ) error
RecordActionEventStream is the bidirectional streaming variant of RecordActionEvents.
func (*RunService) RecordActionEvents ¶ added in v2.0.8
func (s *RunService) RecordActionEvents( ctx context.Context, req *connect.Request[workflow.RecordActionEventsRequest], ) (*connect.Response[workflow.RecordActionEventsResponse], error)
RecordActionEvents records a batch of action events.
func (*RunService) RecordActionStream ¶ added in v2.0.8
func (s *RunService) RecordActionStream( ctx context.Context, stream *connect.BidiStream[workflow.RecordActionStreamRequest, workflow.RecordActionStreamResponse], ) error
RecordActionStream is the bidirectional streaming variant of RecordAction.
func (*RunService) SignalEvent ¶ added in v2.0.18
func (s *RunService) SignalEvent( ctx context.Context, _ *connect.Request[workflow.SignalEventRequest], ) (*connect.Response[workflow.SignalEventResponse], error)
SignalEvent resolves a paused condition action. Condition signalling is not supported by this single-binary backend; downstream backends that route to a dedicated actions service override this RPC.
func (*RunService) UpdateActionStatus ¶ added in v2.0.8
func (s *RunService) UpdateActionStatus( ctx context.Context, req *connect.Request[workflow.UpdateActionStatusRequest], ) (*connect.Response[workflow.UpdateActionStatusResponse], error)
UpdateActionStatus updates the phase of an action.
func (*RunService) UpdateActionStatusStream ¶ added in v2.0.8
func (s *RunService) UpdateActionStatusStream( ctx context.Context, stream *connect.BidiStream[workflow.UpdateActionStatusStreamRequest, workflow.UpdateActionStatusStreamResponse], ) error
UpdateActionStatusStream is the bidirectional streaming variant of UpdateActionStatus.
func (*RunService) WatchActionDetails ¶
func (s *RunService) WatchActionDetails( ctx context.Context, req *connect.Request[workflow.WatchActionDetailsRequest], stream *connect.ServerStream[workflow.WatchActionDetailsResponse], ) error
WatchActionDetails streams action details updates from the DB.
func (*RunService) WatchActions ¶
func (s *RunService) WatchActions( ctx context.Context, req *connect.Request[workflow.WatchActionsRequest], stream *connect.ServerStream[workflow.WatchActionsResponse], ) error
WatchActions streams action updates for a run from the DB.
func (*RunService) WatchClusterEvents ¶
func (s *RunService) WatchClusterEvents( ctx context.Context, req *connect.Request[workflow.WatchClusterEventsRequest], stream *connect.ServerStream[workflow.WatchClusterEventsResponse], ) error
WatchClusterEvents streams Kubernetes cluster events recorded in action_events.
func (*RunService) WatchGroups ¶
func (s *RunService) WatchGroups(ctx context.Context, req *connect.Request[workflow.WatchGroupsRequest], stream *connect.ServerStream[workflow.WatchGroupsResponse]) error
WatchGroups streams task groups (runs grouped by task) from the database.
func (*RunService) WatchRunDetails ¶
func (s *RunService) WatchRunDetails( ctx context.Context, req *connect.Request[workflow.WatchRunDetailsRequest], stream *connect.ServerStream[workflow.WatchRunDetailsResponse], ) error
WatchRunDetails streams run details updates from the DB.
func (*RunService) WatchRuns ¶
func (s *RunService) WatchRuns( ctx context.Context, req *connect.Request[workflow.WatchRunsRequest], stream *connect.ServerStream[workflow.WatchRunsResponse], ) error
WatchRuns streams run updates based on filter criteria.