Documentation
¶
Overview ¶
Package task provides task queue domain types for async work processing.
Index ¶
- type Filter
- type Operation
- type PrescribedOperations
- func (p PrescribedOperations) All() []Operation
- func (p PrescribedOperations) CreateNewRepository() []Operation
- func (p PrescribedOperations) IndexCommit() []Operation
- func (p PrescribedOperations) RescanCommit() []Operation
- func (p PrescribedOperations) ScanAndIndexCommit() []Operation
- func (p PrescribedOperations) SyncRepository() []Operation
- type Priority
- type ReportingState
- type Status
- func (s Status) Complete() Status
- func (s Status) CompletionPercent() float64
- func (s Status) CreatedAt() time.Time
- func (s Status) Current() int
- func (s Status) Error() string
- func (s Status) Fail(errorMsg string) Status
- func (s Status) ID() string
- func (s Status) Labels() map[string]string
- func (s Status) Message() string
- func (s Status) Operation() Operation
- func (s Status) Parent() *Status
- func (s Status) SetCurrent(current int, message string) Status
- func (s Status) SetTotal(total int) Status
- func (s Status) SetTrackingInfo(trackableID int64, trackableType TrackableType) Status
- func (s Status) Skip(message string) Status
- func (s Status) State() ReportingState
- func (s Status) Total() int
- func (s Status) TrackableID() int64
- func (s Status) TrackableType() TrackableType
- func (s Status) UpdatedAt() time.Time
- func (s Status) WithLabel(key, value string) Status
- type StatusStore
- type Task
- func (t Task) CreatedAt() time.Time
- func (t Task) DedupKey() string
- func (t Task) ID() int64
- func (t Task) Operation() Operation
- func (t Task) Payload() map[string]any
- func (t Task) PayloadJSON() ([]byte, error)
- func (t Task) Priority() int
- func (t Task) UpdatedAt() time.Time
- func (t Task) WithID(id int64) Task
- func (t Task) WithTimestamps(createdAt, updatedAt time.Time) Task
- type TaskStore
- type TrackableType
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Filter ¶
type Filter struct {
// contains filtered or unexported fields
}
Filter specifies criteria for querying tasks. Immutable — builder methods return a new Filter.
func (Filter) WithOperation ¶
WithOperation returns a new Filter constrained to the given operation.
type Operation ¶
type Operation string
Operation represents the type of task operation.
const ( OperationRoot Operation = "kodit.root" OperationCreateIndex Operation = "kodit.index.create" OperationRunIndex Operation = "kodit.index.run" OperationRefreshWorkingCopy Operation = "kodit.index.run.refresh_working_copy" OperationDeleteOldSnippets Operation = "kodit.index.run.delete_old_snippets" OperationExtractSnippets Operation = "kodit.index.run.extract_snippets" OperationCreateBM25Index Operation = "kodit.index.run.create_bm25_index" OperationCreateCodeEmbeddings Operation = "kodit.index.run.create_code_embeddings" OperationEnrichSnippets Operation = "kodit.index.run.enrich_snippets" OperationCreateTextEmbeddings Operation = "kodit.index.run.create_text_embeddings" OperationUpdateIndexTimestamp Operation = "kodit.index.run.update_index_timestamp" OperationClearFileProcessingStatuses Operation = "kodit.index.run.clear_file_processing_statuses" OperationRepository Operation = "kodit.repository" OperationCreateRepository Operation = "kodit.repository.create" OperationDeleteRepository Operation = "kodit.repository.delete" OperationCloneRepository Operation = "kodit.repository.clone" OperationSyncRepository Operation = "kodit.repository.sync" OperationCommit Operation = "kodit.commit" OperationExtractSnippetsForCommit Operation = "kodit.commit.extract_snippets" OperationCreateBM25IndexForCommit Operation = "kodit.commit.create_bm25_index" OperationCreateCodeEmbeddingsForCommit Operation = "kodit.commit.create_code_embeddings" OperationCreateSummaryEnrichmentForCommit Operation = "kodit.commit.create_summary_enrichment" OperationCreateSummaryEmbeddingsForCommit Operation = "kodit.commit.create_summary_embeddings" OperationCreateArchitectureEnrichmentForCommit Operation = "kodit.commit.create_architecture_enrichment" OperationCreatePublicAPIDocsForCommit Operation = "kodit.commit.create_public_api_docs" OperationCreateCommitDescriptionForCommit Operation = "kodit.commit.create_commit_description" OperationCreateDatabaseSchemaForCommit Operation = "kodit.commit.create_database_schema" OperationCreateCookbookForCommit Operation = "kodit.commit.create_cookbook" OperationExtractExamplesForCommit Operation = "kodit.commit.extract_examples" OperationCreateExampleSummaryForCommit Operation = "kodit.commit.create_example_summary" OperationCreateExampleCodeEmbeddingsForCommit Operation = "kodit.commit.create_example_code_embeddings" OperationCreateExampleSummaryEmbeddingsForCommit Operation = "kodit.commit.create_example_summary_embeddings" OperationGenerateWikiForCommit Operation = "kodit.commit.generate_wiki" OperationScanCommit Operation = "kodit.commit.scan" OperationRescanCommit Operation = "kodit.commit.rescan" )
Operation values for the task queue system.
func (Operation) IsCommitOperation ¶
IsCommitOperation returns true if this is a commit-level operation.
func (Operation) IsRepositoryOperation ¶
IsRepositoryOperation returns true if this is a repository-level operation.
type PrescribedOperations ¶
type PrescribedOperations struct {
// contains filtered or unexported fields
}
PrescribedOperations provides predefined operation sequences for common workflows.
func NewPrescribedOperations ¶
func NewPrescribedOperations(examples bool, enrichments bool) PrescribedOperations
NewPrescribedOperations creates a PrescribedOperations with the given settings. When enrichments is false, LLM-dependent operations (summaries, architecture docs, commit descriptions, cookbooks, wiki) are excluded from all workflows.
func (PrescribedOperations) All ¶
func (p PrescribedOperations) All() []Operation
All returns every operation that appears in any prescribed workflow. Used at startup to validate that all required handlers are registered.
func (PrescribedOperations) CreateNewRepository ¶
func (p PrescribedOperations) CreateNewRepository() []Operation
CreateNewRepository returns the operations needed to create a new repository.
func (PrescribedOperations) IndexCommit ¶
func (p PrescribedOperations) IndexCommit() []Operation
IndexCommit returns the operation sequence for indexing an already-scanned commit.
func (PrescribedOperations) RescanCommit ¶
func (p PrescribedOperations) RescanCommit() []Operation
RescanCommit returns the operation sequence for rescanning a commit (full reindex).
func (PrescribedOperations) ScanAndIndexCommit ¶
func (p PrescribedOperations) ScanAndIndexCommit() []Operation
ScanAndIndexCommit returns the full operation sequence for scanning and indexing a commit.
func (PrescribedOperations) SyncRepository ¶
func (p PrescribedOperations) SyncRepository() []Operation
SyncRepository returns the operations needed to sync a repository.
type Priority ¶
type Priority int
Priority represents task queue priority levels. Values are spaced far apart to ensure batch offsets (up to ~150 for 15 tasks) never cause a lower priority level to exceed a higher one.
type ReportingState ¶
type ReportingState string
ReportingState represents the state of task reporting.
const ( ReportingStateStarted ReportingState = "started" ReportingStateInProgress ReportingState = "in_progress" ReportingStateCompleted ReportingState = "completed" ReportingStateFailed ReportingState = "failed" ReportingStateSkipped ReportingState = "skipped" )
ReportingState values.
func (ReportingState) IsTerminal ¶
func (s ReportingState) IsTerminal() bool
IsTerminal returns true if the state represents a terminal (final) state.
type Status ¶
type Status struct {
// contains filtered or unexported fields
}
Status represents the status of a task with progress tracking.
func NewStatus ¶
func NewStatus( operation Operation, parent *Status, trackableType TrackableType, trackableID int64, ) Status
NewStatus creates a new Status for the given operation.
func NewStatusFull ¶
func NewStatusFull( id string, state ReportingState, operation Operation, message string, createdAt, updatedAt time.Time, total, current int, errorMessage string, parent *Status, trackableID int64, trackableType TrackableType, ) Status
NewStatusFull creates a Status with all fields (used by repository).
func NewStatusWithDefaults ¶
NewStatusWithDefaults creates a Status with default tracking.
func (Status) Complete ¶
Complete marks the task as completed. If already in a terminal state, no change is made.
func (Status) CompletionPercent ¶
CompletionPercent calculates the completion percentage.
func (Status) SetCurrent ¶
SetCurrent sets the current progress and optionally updates the message.
func (Status) SetTrackingInfo ¶
func (s Status) SetTrackingInfo(trackableID int64, trackableType TrackableType) Status
SetTrackingInfo sets the tracking information.
func (Status) TrackableID ¶
TrackableID returns the trackable entity ID.
func (Status) TrackableType ¶
func (s Status) TrackableType() TrackableType
TrackableType returns the trackable entity type.
type StatusStore ¶
type StatusStore interface {
// Get retrieves a task status by ID.
Get(ctx context.Context, id string) (Status, error)
// FindByTrackable retrieves task statuses for a trackable entity.
FindByTrackable(ctx context.Context, trackableType TrackableType, trackableID int64) ([]Status, error)
// Save creates a new task status or updates an existing one.
// If the status has a parent, the parent chain is saved first.
Save(ctx context.Context, status Status) (Status, error)
// SaveBulk creates or updates multiple task statuses.
SaveBulk(ctx context.Context, statuses []Status) ([]Status, error)
// Delete removes a task status.
Delete(ctx context.Context, status Status) error
// DeleteByTrackable removes task statuses for a trackable entity.
DeleteByTrackable(ctx context.Context, trackableType TrackableType, trackableID int64) error
// Count returns the total number of task statuses.
Count(ctx context.Context) (int64, error)
// LoadWithHierarchy loads all task statuses for a trackable entity
// with their parent-child relationships reconstructed.
LoadWithHierarchy(
ctx context.Context,
trackableType TrackableType,
trackableID int64,
) ([]Status, error)
}
StatusStore defines the interface for Status persistence operations.
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
Task represents an item in the queue waiting to be processed. If the item exists, it is in the queue and waiting to be processed. There is no status associated - existence implies pending.
func NewTask ¶
NewTask creates a new Task with the given operation, priority, and payload. The dedup key is generated automatically from the operation and payload.
func NewTaskWithID ¶
func NewTaskWithID( id int64, dedupKey string, operation Operation, priority int, payload map[string]any, createdAt, updatedAt time.Time, ) Task
NewTaskWithID creates a Task with all fields (used by repository).
func (Task) PayloadJSON ¶
PayloadJSON returns the payload as JSON bytes.
type TaskStore ¶
type TaskStore interface {
// Get retrieves a task by ID.
Get(ctx context.Context, id int64) (Task, error)
// FindAll retrieves all tasks.
FindAll(ctx context.Context) ([]Task, error)
// FindPending retrieves pending tasks ordered by priority.
FindPending(ctx context.Context, options ...repository.Option) ([]Task, error)
// Save creates a new task or updates an existing one.
// Uses dedup_key for conflict resolution - if a task with the same
// dedup_key exists, it will be returned instead of creating a duplicate.
Save(ctx context.Context, task Task) (Task, error)
// SaveBulk creates or updates multiple tasks.
SaveBulk(ctx context.Context, tasks []Task) ([]Task, error)
// Delete removes a task.
Delete(ctx context.Context, task Task) error
// DeleteAll removes all tasks.
DeleteAll(ctx context.Context) error
// CountPending returns the number of pending tasks.
CountPending(ctx context.Context, options ...repository.Option) (int64, error)
// Exists checks if a task with the given ID exists.
Exists(ctx context.Context, id int64) (bool, error)
// Dequeue retrieves and removes the highest priority task.
// Returns the task and true if one was found, or zero-value and false if queue is empty.
Dequeue(ctx context.Context) (Task, bool, error)
// DequeueByOperation retrieves and removes the highest priority task
// of a specific operation type.
DequeueByOperation(ctx context.Context, operation Operation) (Task, bool, error)
}
TaskStore defines the interface for Task persistence operations.
type TrackableType ¶
type TrackableType string
TrackableType represents types of trackable entities.
const ( TrackableTypeIndex TrackableType = "indexes" TrackableTypeRepository TrackableType = "kodit.repository" TrackableTypeCommit TrackableType = "kodit.commit" )
TrackableType values.