Documentation
¶
Index ¶
- Variables
- type ConnectorOrchestrator
- func (c *ConnectorOrchestrator) Create(ctx context.Context, t connector.Type, plugin string, pipelineID string, ...) (*connector.Instance, error)
- func (c *ConnectorOrchestrator) Delete(ctx context.Context, id string) error
- func (c *ConnectorOrchestrator) Get(ctx context.Context, id string) (*connector.Instance, error)
- func (c *ConnectorOrchestrator) Inspect(ctx context.Context, id string) (*inspector.Session, error)
- func (c *ConnectorOrchestrator) List(ctx context.Context) map[string]*connector.Instance
- func (c *ConnectorOrchestrator) Update(ctx context.Context, id string, plugin string, config connector.Config) (*connector.Instance, error)
- func (c *ConnectorOrchestrator) Validate(ctx context.Context, t connector.Type, plugin string, config connector.Config) error
- type ConnectorPluginOrchestrator
- type ConnectorPluginService
- type ConnectorService
- type LifecycleService
- type Orchestrator
- type PipelineOrchestrator
- func (po *PipelineOrchestrator) Create(ctx context.Context, cfg pipeline.Config) (*pipeline.Instance, error)
- func (po *PipelineOrchestrator) Delete(ctx context.Context, id string) error
- func (po *PipelineOrchestrator) Get(ctx context.Context, id string) (*pipeline.Instance, error)
- func (po *PipelineOrchestrator) List(ctx context.Context) map[string]*pipeline.Instance
- func (po *PipelineOrchestrator) Start(ctx context.Context, id string) error
- func (po *PipelineOrchestrator) Stop(ctx context.Context, id string, force bool) error
- func (po *PipelineOrchestrator) Update(ctx context.Context, id string, cfg pipeline.Config) (*pipeline.Instance, error)
- func (po *PipelineOrchestrator) UpdateDLQ(ctx context.Context, id string, dlq pipeline.DLQ) (*pipeline.Instance, error)
- type PipelineService
- type ProcessorOrchestrator
- func (p *ProcessorOrchestrator) Create(ctx context.Context, plugin string, parent processor.Parent, ...) (*processor.Instance, error)
- func (p *ProcessorOrchestrator) Delete(ctx context.Context, id string) error
- func (p *ProcessorOrchestrator) Get(ctx context.Context, id string) (*processor.Instance, error)
- func (p *ProcessorOrchestrator) InspectIn(ctx context.Context, id string) (*inspector.Session, error)
- func (p *ProcessorOrchestrator) InspectOut(ctx context.Context, id string) (*inspector.Session, error)
- func (p *ProcessorOrchestrator) List(ctx context.Context) map[string]*processor.Instance
- func (p *ProcessorOrchestrator) Update(ctx context.Context, id string, plugin string, cfg processor.Config) (*processor.Instance, error)
- type ProcessorPluginOrchestrator
- type ProcessorPluginService
- type ProcessorService
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrInvalidProcessorParentType = cerrors.New("invalid processor parent type") ErrPipelineHasProcessorsAttached = cerrors.New("pipeline has processors attached") ErrPipelineHasConnectorsAttached = cerrors.New("pipeline has connectors attached") ErrConnectorHasProcessorsAttached = cerrors.New("connector has processors attached") ErrImmutableProvisionedByConfig = cerrors.New("entity was provisioned by a config file and cannot be mutated through the API, please change the corresponding config file instead") )
Functions ¶
This section is empty.
Types ¶
type ConnectorOrchestrator ¶
type ConnectorOrchestrator base
func (*ConnectorOrchestrator) Delete ¶
func (c *ConnectorOrchestrator) Delete(ctx context.Context, id string) error
type ConnectorPluginOrchestrator ¶ added in v0.9.0
type ConnectorPluginOrchestrator base
func (*ConnectorPluginOrchestrator) List ¶ added in v0.9.0
func (ps *ConnectorPluginOrchestrator) List(ctx context.Context) (map[string]pconnector.Specification, error)
type ConnectorPluginService ¶ added in v0.9.0
type ConnectorPluginService interface {
List(ctx context.Context) (map[string]pconnector.Specification, error)
NewDispenser(logger log.CtxLogger, name string, connectorID string) (connectorPlugin.Dispenser, error)
ValidateSourceConfig(ctx context.Context, name string, settings map[string]string) error
ValidateDestinationConfig(ctx context.Context, name string, settings map[string]string) error
}
type ConnectorService ¶
type ConnectorService interface {
List(ctx context.Context) map[string]*connector.Instance
Get(ctx context.Context, id string) (*connector.Instance, error)
Create(ctx context.Context, id string, t connector.Type, plugin string, pipelineID string, c connector.Config, p connector.ProvisionType) (*connector.Instance, error)
Delete(ctx context.Context, id string, dispenserFetcher connector.PluginDispenserFetcher) error
Update(ctx context.Context, id string, plugin string, c connector.Config) (*connector.Instance, error)
AddProcessor(ctx context.Context, connectorID string, processorID string) (*connector.Instance, error)
RemoveProcessor(ctx context.Context, connectorID string, processorID string) (*connector.Instance, error)
}
type LifecycleService ¶ added in v0.12.0
type LifecycleService interface {
// Start initiates a start of the given pipeline.
Start(ctx context.Context, pipelineID string) error
// Stop initiates a stop of the given pipeline. The method does not wait for
// the pipeline (and its nodes) to actually stop.
// When force is false the pipeline will try to stop gracefully and drain
// any in-flight messages that have not yet reached the destination. When
// force is true the pipeline will stop without draining in-flight messages.
// It is allowed to execute a force stop even after a graceful stop was
// requested.
Stop(ctx context.Context, pipelineID string, force bool) error
}
type Orchestrator ¶
type Orchestrator struct {
Processors *ProcessorOrchestrator
Pipelines *PipelineOrchestrator
Connectors *ConnectorOrchestrator
ConnectorPlugins *ConnectorPluginOrchestrator
ProcessorPlugins *ProcessorPluginOrchestrator
}
func NewOrchestrator ¶
func NewOrchestrator( db database.DB, logger log.CtxLogger, pipelines PipelineService, connectors ConnectorService, processors ProcessorService, connectorPlugins ConnectorPluginService, processorPlugins ProcessorPluginService, lifecycle LifecycleService, ) *Orchestrator
type PipelineOrchestrator ¶
type PipelineOrchestrator base
func (*PipelineOrchestrator) Delete ¶
func (po *PipelineOrchestrator) Delete(ctx context.Context, id string) error
func (*PipelineOrchestrator) Start ¶
func (po *PipelineOrchestrator) Start(ctx context.Context, id string) error
type PipelineService ¶
type PipelineService interface {
List(ctx context.Context) map[string]*pipeline.Instance
Get(ctx context.Context, id string) (*pipeline.Instance, error)
Create(ctx context.Context, id string, cfg pipeline.Config, p pipeline.ProvisionType) (*pipeline.Instance, error)
Update(ctx context.Context, pipelineID string, cfg pipeline.Config) (*pipeline.Instance, error)
Delete(ctx context.Context, pipelineID string) error
UpdateDLQ(ctx context.Context, id string, dlq pipeline.DLQ) (*pipeline.Instance, error)
AddConnector(ctx context.Context, pipelineID string, connectorID string) (*pipeline.Instance, error)
RemoveConnector(ctx context.Context, pipelineID string, connectorID string) (*pipeline.Instance, error)
AddProcessor(ctx context.Context, pipelineID string, processorID string) (*pipeline.Instance, error)
RemoveProcessor(ctx context.Context, pipelineID string, processorID string) (*pipeline.Instance, error)
}
type ProcessorOrchestrator ¶
type ProcessorOrchestrator base
func (*ProcessorOrchestrator) Delete ¶
func (p *ProcessorOrchestrator) Delete(ctx context.Context, id string) error
func (*ProcessorOrchestrator) InspectOut ¶ added in v0.5.0
type ProcessorPluginOrchestrator ¶ added in v0.9.0
type ProcessorPluginOrchestrator base
func (*ProcessorPluginOrchestrator) List ¶ added in v0.9.0
func (ps *ProcessorPluginOrchestrator) List(ctx context.Context) (map[string]processorSdk.Specification, error)
func (*ProcessorPluginOrchestrator) RegisterStandalonePlugin ¶ added in v0.9.1
type ProcessorPluginService ¶ added in v0.9.0
type ProcessorService ¶
type ProcessorService interface {
List(ctx context.Context) map[string]*processor.Instance
Get(ctx context.Context, id string) (*processor.Instance, error)
Create(ctx context.Context, id string, plugin string, parent processor.Parent, cfg processor.Config, p processor.ProvisionType, condition string) (*processor.Instance, error)
MakeRunnableProcessor(ctx context.Context, i *processor.Instance) (*processor.RunnableProcessor, error)
Update(ctx context.Context, id string, plugin string, cfg processor.Config) (*processor.Instance, error)
Delete(ctx context.Context, id string) error
}
Source Files
¶
Click to show internal directories.
Click to hide internal directories.