prpipeline

package
v1.0.0 Latest
Published: Feb 17, 2026 License: GPL-3.0 Imports: 11 Imported by: 0

Documentation

Overview

Package prpipeline provides the PR discovery, qualification, and auto-merge pipeline. It implements the core workflow: discover PRs → qualify → merge → update labels.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Action

type Action string

Action represents the type of work to be performed on a PR.

const (
	// ActionDiscover indicates the PR should be discovered and labeled
	ActionDiscover Action = "discover"

	// ActionQualify indicates the PR should be qualified against validation rules
	ActionQualify Action = "qualify"

	// ActionMerge indicates the PR should be merged
	ActionMerge Action = "merge"
)

func (Action) String

func (a Action) String() string

String returns the string representation of the action.
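As a minimal sketch of how a string-backed Action can drive dispatch, the example below redeclares the type and constants locally so that it compiles on its own. The describe helper is hypothetical and not part of the package.

```go
package main

import "fmt"

// Action is redeclared locally so the sketch compiles without the package.
type Action string

const (
	ActionDiscover Action = "discover"
	ActionQualify  Action = "qualify"
	ActionMerge    Action = "merge"
)

// String returns the string representation of the action.
func (a Action) String() string { return string(a) }

// describe is a hypothetical helper that dispatches on the action value.
func describe(a Action) string {
	switch a {
	case ActionDiscover:
		return "discover and label the PR"
	case ActionQualify:
		return "check the PR against validation rules"
	case ActionMerge:
		return "merge the PR"
	default:
		return "unknown action " + a.String()
	}
}

func main() {
	// fmt uses the String method because Action satisfies fmt.Stringer.
	for _, a := range []Action{ActionDiscover, ActionQualify, ActionMerge} {
		fmt.Printf("%s: %s\n", a, describe(a))
	}
}
```

Because Action is a plain string type, unrecognized values are possible at runtime, which is why the switch keeps a default branch.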

type Discovery

type Discovery struct {
	// contains filtered or unexported fields
}

Discovery handles PR discovery and initial state management.

func NewDiscovery

func NewDiscovery(githubClient GitHubClient, labelManager labels.LabelManager, logger Logger) *Discovery

NewDiscovery creates a new PR discovery component.

func (*Discovery) DiscoverPRs

func (d *Discovery) DiscoverPRs(ctx context.Context, repo github.Repository) (*DiscoveryResult, error)

DiscoverPRs discovers PRs in a repository and creates appropriate work items.

func (*Discovery) GetPRState

func (d *Discovery) GetPRState(pr github.PullRequest) labels.State

GetPRState is a helper method that returns the current state of a PR.

func (*Discovery) RefreshPR

func (d *Discovery) RefreshPR(ctx context.Context, repo github.Repository, prNumber int) (*github.PullRequest, error)

RefreshPR refreshes a PR's information from GitHub.

type DiscoveryResult

type DiscoveryResult struct {
	// Repository is the repository that was scanned
	Repository github.Repository

	// DiscoveredPRs contains newly discovered PRs
	DiscoveredPRs []github.PullRequest

	// ExistingPRs contains PRs that were already known
	ExistingPRs []github.PullRequest

	// WorkItems contains work items created during discovery
	WorkItems []WorkItem

	// Errors contains any non-fatal errors that occurred
	Errors []error

	// Duration is how long discovery took
	Duration time.Duration

	// Timestamp is when discovery completed
	Timestamp time.Time
}

DiscoveryResult represents the result of PR discovery.

type GitHubClient

type GitHubClient interface {
	// ListPRs retrieves all open pull requests for a repository
	ListPRs(ctx context.Context, repo github.Repository) ([]github.PullRequest, error)

	// GetPR retrieves a specific pull request
	GetPR(ctx context.Context, repo github.Repository, number int) (*github.PullRequest, error)

	// MergePR merges a pull request
	MergePR(ctx context.Context, repo github.Repository, number int, commitMsg string) error

	// GetPRFiles retrieves files changed in a pull request
	GetPRFiles(ctx context.Context, repo github.Repository, number int) ([]github.File, error)
}

GitHubClient defines the GitHub operations needed by the PR pipeline.
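Because GitHubClient is an interface, the pipeline can be exercised against an in-memory fake. The sketch below is one possible fake, not code from the package: Repository, PullRequest and File are local stand-ins for the github types, and their field names are assumptions.

```go
package main

import (
	"context"
	"fmt"
)

// Stand-ins for github.Repository, github.PullRequest and github.File.
// The field names are assumptions made for this sketch.
type Repository struct{ Owner, Name string }

type PullRequest struct {
	Number int
	Title  string
	Merged bool
}

type File struct{ Path string }

// GitHubClient repeats the documented method set against the stand-in types.
type GitHubClient interface {
	ListPRs(ctx context.Context, repo Repository) ([]PullRequest, error)
	GetPR(ctx context.Context, repo Repository, number int) (*PullRequest, error)
	MergePR(ctx context.Context, repo Repository, number int, commitMsg string) error
	GetPRFiles(ctx context.Context, repo Repository, number int) ([]File, error)
}

// fakeClient keeps PRs and their files in maps keyed by PR number.
type fakeClient struct {
	prs   map[int]*PullRequest
	files map[int][]File
}

// Compile-time check that the fake satisfies the interface.
var _ GitHubClient = (*fakeClient)(nil)

// ListPRs returns every PR that has not been merged.
func (f *fakeClient) ListPRs(ctx context.Context, repo Repository) ([]PullRequest, error) {
	var open []PullRequest
	for _, pr := range f.prs {
		if !pr.Merged {
			open = append(open, *pr)
		}
	}
	return open, nil
}

// GetPR returns a copy so callers cannot mutate the fake's state.
func (f *fakeClient) GetPR(ctx context.Context, repo Repository, number int) (*PullRequest, error) {
	pr, ok := f.prs[number]
	if !ok {
		return nil, fmt.Errorf("PR #%d not found in %s/%s", number, repo.Owner, repo.Name)
	}
	cp := *pr
	return &cp, nil
}

// MergePR marks the PR as merged; the commit message is ignored here.
func (f *fakeClient) MergePR(ctx context.Context, repo Repository, number int, commitMsg string) error {
	pr, ok := f.prs[number]
	if !ok {
		return fmt.Errorf("PR #%d not found in %s/%s", number, repo.Owner, repo.Name)
	}
	pr.Merged = true
	return nil
}

// GetPRFiles returns the recorded files, or nil when none are recorded.
func (f *fakeClient) GetPRFiles(ctx context.Context, repo Repository, number int) ([]File, error) {
	return f.files[number], nil
}

// demo counts the open PRs before and after merging PR #7.
func demo() (before, after int) {
	ctx := context.Background()
	repo := Repository{Owner: "example", Name: "repo"}
	var c GitHubClient = &fakeClient{prs: map[int]*PullRequest{7: {Number: 7, Title: "Bump deps"}}}
	open, _ := c.ListPRs(ctx, repo)
	before = len(open)
	if err := c.MergePR(ctx, repo, 7, "auto-merge"); err != nil {
		panic(err)
	}
	open, _ = c.ListPRs(ctx, repo)
	return before, len(open)
}

func main() {
	before, after := demo()
	fmt.Println("open before merge:", before, "open after merge:", after)
}
```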

type InMemoryScheduler

type InMemoryScheduler struct {
	// contains filtered or unexported fields
}

InMemoryScheduler provides a simple in-memory work scheduler suitable for single-instance deployments. Distributed deployments could replace it with a Redis- or database-backed scheduler.

func NewInMemoryScheduler

func NewInMemoryScheduler(maxConcurrent int) *InMemoryScheduler

NewInMemoryScheduler creates a new in-memory work scheduler.

func (*InMemoryScheduler) GetPendingWork

func (s *InMemoryScheduler) GetPendingWork(ctx context.Context, limit int) ([]WorkItem, error)

GetPendingWork retrieves work items ready for processing.

func (*InMemoryScheduler) GetStats

func (s *InMemoryScheduler) GetStats() SchedulerStats

GetStats returns scheduler statistics.

func (*InMemoryScheduler) MarkCompleted

func (s *InMemoryScheduler) MarkCompleted(item WorkItem)

MarkCompleted marks a work item as completed and removes it from processing.

func (*InMemoryScheduler) Schedule

func (s *InMemoryScheduler) Schedule(ctx context.Context, item WorkItem) error

Schedule adds a work item to the processing queue.

func (*InMemoryScheduler) ScheduleBatch

func (s *InMemoryScheduler) ScheduleBatch(ctx context.Context, items []WorkItem) error

ScheduleBatch adds multiple work items to the processing queue.
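The methods above suggest a queue plus a capped set of in-flight items. The following is a rough sketch of that idea and not the InMemoryScheduler implementation: it is FIFO, ignores Priority, and tracks in-flight work by a hypothetical ID field that the documented WorkItem does not have.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// WorkItem is a trimmed stand-in. ID is a hypothetical key added for this
// sketch; the documented WorkItem carries a PR, an Action and a Repository.
type WorkItem struct{ ID int }

// SchedulerStats mirrors the documented fields.
type SchedulerStats struct {
	QueuedItems     int
	ProcessingItems int
	MaxConcurrent   int
}

// scheduler is a FIFO queue with a cap on items handed out at once.
type scheduler struct {
	mu            sync.Mutex
	queue         []WorkItem
	processing    map[int]bool
	maxConcurrent int
}

func newScheduler(maxConcurrent int) *scheduler {
	return &scheduler{processing: make(map[int]bool), maxConcurrent: maxConcurrent}
}

// Schedule appends an item unless the context is already done.
func (s *scheduler) Schedule(ctx context.Context, item WorkItem) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.queue = append(s.queue, item)
	return nil
}

// GetPendingWork hands out at most limit items without exceeding the
// number of free processing slots.
func (s *scheduler) GetPendingWork(ctx context.Context, limit int) ([]WorkItem, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	n := limit
	if free := s.maxConcurrent - len(s.processing); free < n {
		n = free
	}
	if len(s.queue) < n {
		n = len(s.queue)
	}
	if n <= 0 {
		return nil, nil
	}
	out := make([]WorkItem, n)
	copy(out, s.queue[:n])
	s.queue = s.queue[n:]
	for _, it := range out {
		s.processing[it.ID] = true
	}
	return out, nil
}

// MarkCompleted frees the processing slot held by the item.
func (s *scheduler) MarkCompleted(item WorkItem) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.processing, item.ID)
}

// GetStats reports queue depth, in-flight count and the concurrency cap.
func (s *scheduler) GetStats() SchedulerStats {
	s.mu.Lock()
	defer s.mu.Unlock()
	return SchedulerStats{len(s.queue), len(s.processing), s.maxConcurrent}
}

// demo schedules three items with a cap of two and fetches work twice.
func demo() (first, second int, stats SchedulerStats) {
	ctx := context.Background()
	s := newScheduler(2)
	for id := 0; id < 3; id++ {
		_ = s.Schedule(ctx, WorkItem{ID: id})
	}
	batch, _ := s.GetPendingWork(ctx, 10) // capped at two by maxConcurrent
	first = len(batch)
	s.MarkCompleted(batch[0]) // frees one slot
	batch, _ = s.GetPendingWork(ctx, 10)
	second = len(batch)
	return first, second, s.GetStats()
}

func main() {
	first, second, stats := demo()
	fmt.Println(first, second, stats.QueuedItems, stats.ProcessingItems)
}
```

In this sketch a caller that never calls MarkCompleted eventually starves the queue, since slots are only released on completion.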

type Logger

type Logger interface {
	Debug(msg string, fields ...zap.Field)
	Info(msg string, fields ...zap.Field)
	Warn(msg string, fields ...zap.Field)
	Error(msg string, fields ...zap.Field)
}

Logger defines the logging interface used by the PR pipeline.

type MergeResult

type MergeResult struct {
	// PR is the pull request that was merged
	PR github.PullRequest

	// MergeCommitSHA is the SHA of the merge commit (if successful)
	MergeCommitSHA string

	// StateTransition indicates the state transition that occurred
	StateTransition *StateTransition

	// Duration is how long the merge took
	Duration time.Duration

	// Error contains any error that occurred
	Error error
}

MergeResult represents the result of PR merge processing.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline orchestrates the PR discovery, qualification, and auto-merge process.

func NewPipeline

func NewPipeline(config *PipelineConfig, logger Logger) (*Pipeline, error)

NewPipeline creates a new PR pipeline with the given configuration.

func (*Pipeline) GetStats

func (p *Pipeline) GetStats() ProcessingStats

GetStats returns current pipeline processing statistics.

func (*Pipeline) Run

func (p *Pipeline) Run(ctx context.Context) error

Run starts the pipeline and runs it until the context is cancelled.

func (*Pipeline) RunOnce

func (p *Pipeline) RunOnce(ctx context.Context) error

RunOnce performs a single discovery and processing cycle. Useful for manual triggering or testing.
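The relationship between Run and RunOnce can be pictured as a ticker loop around a single cycle. This is only a sketch under assumptions: runLoop is a hypothetical function, and whether the real Run executes a cycle immediately, keeps going after a failed cycle, or returns the context error on cancellation is not stated in the documentation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runLoop runs one cycle immediately and then one per tick until ctx is
// cancelled. A failed cycle is reported and the loop continues (assumption).
func runLoop(ctx context.Context, interval time.Duration, runOnce func(context.Context) error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		// Never start a new cycle once the context is done.
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := runOnce(ctx); err != nil {
			fmt.Println("cycle failed:", err)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// demo cancels the context from inside the third cycle and reports how
// many cycles ran and whether the loop ended with context.Canceled.
func demo() (cycles int, cancelled bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	err := runLoop(ctx, time.Millisecond, func(context.Context) error {
		cycles++
		if cycles == 3 {
			cancel()
		}
		return nil
	})
	return cycles, errors.Is(err, context.Canceled)
}

func main() {
	cycles, cancelled := demo()
	fmt.Println("cycles:", cycles, "cancelled:", cancelled)
}
```

The check at the top of the loop matters: when the ticker and the cancellation are both ready, select picks one at random, so without it an extra cycle could start after cancellation.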

type PipelineConfig

type PipelineConfig struct {
	// GitHubClient provides GitHub API operations
	GitHubClient GitHubClient

	// LabelManager handles PR label lifecycle
	LabelManager labels.LabelManager

	// QualificationEngine performs PR qualification
	QualificationEngine QualificationEngine

	// Repositories is the list of repositories to monitor
	Repositories []github.Repository

	// DiscoveryInterval is how often to discover new PRs
	DiscoveryInterval time.Duration

	// MergeCommitMessage is the template for auto-merge commit messages
	MergeCommitMessage string

	// DryRun, when true, logs actions without executing them
	DryRun bool

	// MaxConcurrentActions limits concurrent PR processing
	MaxConcurrentActions int

	// ProcessingTimeout is the maximum time to spend processing a single PR
	ProcessingTimeout time.Duration
}

PipelineConfig holds configuration for the PR pipeline.

func DefaultPipelineConfig

func DefaultPipelineConfig() *PipelineConfig

DefaultPipelineConfig returns configuration with sensible defaults.

type PipelineError

type PipelineError struct {
	// Type indicates the type of error
	Type PipelineErrorType

	// Repository is the repository where the error occurred
	Repository github.Repository

	// PR is the pull request associated with the error (if applicable)
	PR *github.PullRequest

	// Action is the action that was being performed
	Action Action

	// Cause is the underlying error
	Cause error

	// Timestamp is when the error occurred
	Timestamp time.Time

	// Context provides additional error context
	Context map[string]interface{}
}

PipelineError represents an error that occurred during pipeline processing.

func NewPipelineError

func NewPipelineError(errorType PipelineErrorType, repo github.Repository, action Action, cause error) *PipelineError

NewPipelineError creates a new pipeline error.

func (*PipelineError) Error

func (pe *PipelineError) Error() string

Error implements the error interface.

func (*PipelineError) Unwrap

func (pe *PipelineError) Unwrap() error

Unwrap allows error unwrapping.

func (*PipelineError) WithContext

func (pe *PipelineError) WithContext(key string, value interface{}) *PipelineError

WithContext adds context information to the error.
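Since PipelineError implements Unwrap, it works with errors.As and errors.Is from the standard library. The sketch below uses a trimmed local stand-in for the type; the Error message format, the errConflict sentinel, and the mergePR and classify helpers are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

type PipelineErrorType string

const ErrorTypeMerge PipelineErrorType = "merge"

// PipelineError is a trimmed stand-in with only the fields this sketch needs.
type PipelineError struct {
	Type    PipelineErrorType
	Cause   error
	Context map[string]interface{}
}

// Error implements the error interface. The message layout is an assumption.
func (pe *PipelineError) Error() string {
	return fmt.Sprintf("%s error: %v", pe.Type, pe.Cause)
}

// Unwrap exposes the cause to errors.Is and errors.As.
func (pe *PipelineError) Unwrap() error { return pe.Cause }

// WithContext records a key/value pair and returns the same error so that
// calls can be chained.
func (pe *PipelineError) WithContext(key string, value interface{}) *PipelineError {
	if pe.Context == nil {
		pe.Context = make(map[string]interface{})
	}
	pe.Context[key] = value
	return pe
}

// errConflict is a hypothetical sentinel for the underlying failure.
var errConflict = errors.New("merge conflict")

// mergePR simulates a failed merge wrapped twice: once in a PipelineError
// and once more by the caller using %w.
func mergePR() error {
	pe := &PipelineError{Type: ErrorTypeMerge, Cause: errConflict}
	return fmt.Errorf("processing PR: %w", pe.WithContext("attempt", 1))
}

// classify extracts the pipeline error type from anywhere in the chain.
func classify(err error) (PipelineErrorType, bool) {
	var pe *PipelineError
	if errors.As(err, &pe) {
		return pe.Type, true
	}
	return "", false
}

func main() {
	err := mergePR()
	typ, ok := classify(err)
	fmt.Println("pipeline error:", ok, "type:", typ)
	fmt.Println("is conflict:", errors.Is(err, errConflict))
}
```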

func (*PipelineError) WithPR

WithPR adds pull request information to the error.

type PipelineErrorType

type PipelineErrorType string

PipelineErrorType represents the type of pipeline error.

const (
	// ErrorTypeDiscovery indicates an error during PR discovery
	ErrorTypeDiscovery PipelineErrorType = "discovery"

	// ErrorTypeQualification indicates an error during PR qualification
	ErrorTypeQualification PipelineErrorType = "qualification"

	// ErrorTypeMerge indicates an error during PR merge
	ErrorTypeMerge PipelineErrorType = "merge"

	// ErrorTypeStateTransition indicates an error during state transition
	ErrorTypeStateTransition PipelineErrorType = "state_transition"

	// ErrorTypeGitHub indicates a GitHub API error
	ErrorTypeGitHub PipelineErrorType = "github"

	// ErrorTypeConfiguration indicates a configuration error
	ErrorTypeConfiguration PipelineErrorType = "configuration"
)

type ProcessResult

type ProcessResult struct {
	// Success indicates whether the processing succeeded
	Success bool

	// Action is the action that was performed
	Action Action

	// PR is the pull request that was processed
	PR github.PullRequest

	// NewState is the state the PR was transitioned to (if any)
	NewState labels.State

	// Error contains any error that occurred during processing
	Error error

	// Duration is how long the processing took
	Duration time.Duration

	// NextActions contains follow-up actions that should be scheduled
	NextActions []Action

	// Metadata contains additional result information
	Metadata map[string]interface{}
}

ProcessResult represents the result of processing a work item.
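One way a caller might act on NextActions is to turn each follow-up action into a new work item for the same PR. This is a sketch with trimmed stand-in types; followUps is a hypothetical helper, and skipping follow-ups for failed results is an assumption rather than documented behavior.

```go
package main

import (
	"fmt"
	"time"
)

// Trimmed stand-ins for the documented types.
type Action string

type PullRequest struct{ Number int }

type WorkItem struct {
	PR        PullRequest
	Action    Action
	Priority  int
	CreatedAt time.Time
}

type ProcessResult struct {
	Success     bool
	Action      Action
	PR          PullRequest
	NextActions []Action
}

// followUps builds one work item per follow-up action of a successful
// result. Failed results produce no follow-ups (assumption).
func followUps(res ProcessResult, priority int) []WorkItem {
	if !res.Success {
		return nil
	}
	items := make([]WorkItem, 0, len(res.NextActions))
	for _, a := range res.NextActions {
		items = append(items, WorkItem{
			PR:        res.PR,
			Action:    a,
			Priority:  priority,
			CreatedAt: time.Now(),
		})
	}
	return items
}

func main() {
	res := ProcessResult{
		Success:     true,
		Action:      "qualify",
		PR:          PullRequest{Number: 42},
		NextActions: []Action{"merge"},
	}
	for _, it := range followUps(res, 1) {
		fmt.Printf("schedule %s for PR #%d\n", it.Action, it.PR.Number)
	}
}
```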

type ProcessingStats

type ProcessingStats struct {
	// ProcessedItems is the number of work items processed
	ProcessedItems int

	// SuccessfulItems is the number of successfully processed items
	SuccessfulItems int

	// FailedItems is the number of failed items
	FailedItems int

	// DiscoveredPRs is the number of PRs discovered
	DiscoveredPRs int

	// QualifiedPRs is the number of PRs that passed qualification
	QualifiedPRs int

	// MergedPRs is the number of PRs that were merged
	MergedPRs int

	// TotalDuration is the total processing time
	TotalDuration time.Duration

	// AverageProcessingTime is the average time per work item
	AverageProcessingTime time.Duration

	// Timestamp is when these stats were collected
	Timestamp time.Time
}

ProcessingStats contains statistics about pipeline processing.
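The counters and durations are related in an obvious way, which the sketch below spells out. It assumes AverageProcessingTime is TotalDuration divided by ProcessedItems; the documentation does not state how the package computes it, and the record method is hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// ProcessingStats is a trimmed stand-in with the fields used here.
type ProcessingStats struct {
	ProcessedItems        int
	SuccessfulItems       int
	FailedItems           int
	TotalDuration         time.Duration
	AverageProcessingTime time.Duration
}

// record folds one work item outcome into the stats and refreshes the
// average. ProcessedItems is at least 1 here, so the division is safe.
func (s *ProcessingStats) record(ok bool, d time.Duration) {
	s.ProcessedItems++
	if ok {
		s.SuccessfulItems++
	} else {
		s.FailedItems++
	}
	s.TotalDuration += d
	s.AverageProcessingTime = s.TotalDuration / time.Duration(s.ProcessedItems)
}

func main() {
	var s ProcessingStats
	s.record(true, 2*time.Second)
	s.record(false, 4*time.Second)
	fmt.Println(s.ProcessedItems, s.SuccessfulItems, s.FailedItems, s.AverageProcessingTime)
	// prints "2 1 1 3s"
}
```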

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor handles the processing of individual work items.

func NewProcessor

func NewProcessor(config *ProcessorConfig, logger Logger) *Processor

NewProcessor creates a new processor with the given configuration.

func (*Processor) Process

func (p *Processor) Process(ctx context.Context, item WorkItem) (*ProcessResult, error)

Process processes a single work item and returns the result.

type ProcessorConfig

type ProcessorConfig struct {
	GitHubClient        GitHubClient
	LabelManager        labels.LabelManager
	QualificationEngine QualificationEngine
	MergeCommitMessage  string
	DryRun              bool
	ProcessingTimeout   time.Duration
}

ProcessorConfig holds configuration for the processor component.

type QualificationEngine

type QualificationEngine interface {
	// Qualify performs complete PR qualification
	Qualify(ctx context.Context, pr github.PullRequest) (*qualification.QualificationResult, error)
}

QualificationEngine defines the qualification operations needed by the pipeline.

type QualificationResult

type QualificationResult struct {
	// PR is the pull request that was qualified
	PR github.PullRequest

	// QualificationResult contains the detailed qualification results
	QualificationResult *qualification.QualificationResult

	// StateTransition indicates if a state transition occurred
	StateTransition *StateTransition

	// DynamicLabelsApplied contains labels that were extracted and applied
	DynamicLabelsApplied map[string]string

	// Duration is how long qualification took
	Duration time.Duration

	// Error contains any error that occurred
	Error error
}

QualificationResult represents the result of PR qualification processing.

type SchedulerStats

type SchedulerStats struct {
	QueuedItems     int
	ProcessingItems int
	MaxConcurrent   int
}

SchedulerStats contains statistics about the scheduler state.

type StateTransition

type StateTransition struct {
	// FromState is the previous state
	FromState labels.State

	// ToState is the new state
	ToState labels.State

	// Reason is why the transition occurred
	Reason string

	// Timestamp is when the transition occurred
	Timestamp time.Time
}

StateTransition represents a state transition that occurred during processing. This is a simple data structure used within the PR pipeline.

Note: A richer domain version exists at domain/pr.StateTransition with validation, immutability, and transition classification. This version remains for backward compatibility with existing pipeline code. New features should use the domain version.

type WorkItem

type WorkItem struct {
	// PR is the pull request to process
	PR github.PullRequest

	// Action is the type of work to perform
	Action Action

	// Repository contains the repository information
	Repository github.Repository

	// Priority determines processing order (higher = more urgent)
	Priority int

	// CreatedAt is when this work item was created
	CreatedAt time.Time

	// Metadata contains additional context for processing
	Metadata map[string]interface{}
}

WorkItem represents a unit of work in the PR pipeline.
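The Priority comment says higher values are more urgent. A possible ordering that follows from it is sketched below with a trimmed stand-in type; breaking ties by the older CreatedAt is an assumption, and byUrgency is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// Trimmed stand-ins for the documented types.
type PullRequest struct{ Number int }

type WorkItem struct {
	PR        PullRequest
	Priority  int
	CreatedAt time.Time
}

// byUrgency sorts items in place: higher Priority first, and for equal
// priorities the older item first (tie-break is an assumption).
func byUrgency(items []WorkItem) {
	sort.SliceStable(items, func(i, j int) bool {
		if items[i].Priority != items[j].Priority {
			return items[i].Priority > items[j].Priority
		}
		return items[i].CreatedAt.Before(items[j].CreatedAt)
	})
}

// demo returns the PR numbers in processing order.
func demo() []int {
	t0 := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	items := []WorkItem{
		{PR: PullRequest{Number: 1}, Priority: 1, CreatedAt: t0},
		{PR: PullRequest{Number: 2}, Priority: 5, CreatedAt: t0.Add(time.Minute)},
		{PR: PullRequest{Number: 3}, Priority: 5, CreatedAt: t0},
	}
	byUrgency(items)
	order := make([]int, 0, len(items))
	for _, it := range items {
		order = append(order, it.PR.Number)
	}
	return order
}

func main() {
	fmt.Println(demo()) // prints "[3 2 1]"
}
```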

type WorkScheduler

type WorkScheduler interface {
	// Schedule adds a work item to the processing queue
	Schedule(ctx context.Context, item WorkItem) error

	// ScheduleBatch adds multiple work items to the processing queue
	ScheduleBatch(ctx context.Context, items []WorkItem) error

	// GetPendingWork retrieves work items ready for processing
	GetPendingWork(ctx context.Context, limit int) ([]WorkItem, error)
}

WorkScheduler defines the interface for scheduling work items.
