Documentation
¶
Index ¶
- Constants
- Variables
- func CloseMQTTConnection()
- func ConnectMQTTBroker(config config.MqttConfig, pipelineService PipelineApiService) error
- func GenerateFogOperatorStartCommand(operator Operator, pipelineID string, inputTopics []operatorLib.InputTopic) operatorLib.StartOperatorControlCommand
- func GetLogger() *slog.Logger
- func StringInSlice(a string, list []string) bool
- type Claims
- type DeviceManagerService
- type DownstreamConfig
- type Driver
- type FlowEngine
- func (f *FlowEngine) DeletePipeline(id string, userId string, token string) (err error)
- func (f *FlowEngine) GetPipelineStatus(id, userId, token string) (status PipelineStatus, err error)
- func (f *FlowEngine) GetPipelinesStatus(ids []string, userId, token string) (status []PipelineStatus, err error)
- func (f *FlowEngine) StartPipeline(pipelineRequest PipelineRequest, userId string, token string) (pipeline Pipeline, err error)
- func (f *FlowEngine) UpdatePipeline(pipelineRequest PipelineRequest, userId string, token string) (pipeline Pipeline, err error)
- type FogClient
- type InputSelection
- type InputTopic
- type Kafka2MqttApiService
- type Mapping
- type NodeConfig
- type NodeInput
- type NodeValue
- type Operator
- type OperatorRequestConfig
- type ParsingApiService
- type PermissionApiService
- type Pipeline
- type PipelineApiService
- type PipelineConfig
- type PipelineNode
- type PipelineRequest
- type PipelineStatus
- type PipelineStatusRequest
- type PipelinesResponse
- type Response
- type UpstreamConfig
Constants ¶
View Source
const RequestDeviceId = "deviceId"
View Source
const RequestOperatorId = "operatorId"
Variables ¶
View Source
var ErrSomethingWentWrong = errors.New("container API - something went wrong")
View Source
var ErrWorkloadNotFound = errors.New("container API - could not delete operator: workload not found")
Functions ¶
func CloseMQTTConnection ¶
func CloseMQTTConnection()
func ConnectMQTTBroker ¶
func ConnectMQTTBroker(config config.MqttConfig, pipelineService PipelineApiService) error
func GenerateFogOperatorStartCommand ¶ added in v0.0.4
func GenerateFogOperatorStartCommand(operator Operator, pipelineID string, inputTopics []operatorLib.InputTopic) operatorLib.StartOperatorControlCommand
func StringInSlice ¶
Types ¶
type Claims ¶
type DeviceManagerService ¶ added in v0.0.16
type DownstreamConfig ¶ added in v0.0.4
type Driver ¶
type Driver interface {
CreateOperators(pipelineId string, input []Operator, pipelineConfig PipelineConfig) error
/*
DeleteOperator deletes an operator in the given pipeline
Deprecated: Use DeleteOperators instead.
*/
DeleteOperator(pipelineId string, input Operator) error
DeleteOperators(pipelineId string, inputs []Operator) error
GetPipelineStatus(pipelineId string) (PipelineStatus, error)
GetPipelinesStatus() ([]PipelineStatus, error)
}
type FlowEngine ¶
type FlowEngine struct {
// contains filtered or unexported fields
}
func NewFlowEngine ¶
func NewFlowEngine( driver Driver, parsingService ParsingApiService, permissionService PermissionApiService, kafak2mqttService Kafka2MqttApiService, deviceManagerService DeviceManagerService, pipelineService PipelineApiService) *FlowEngine
func (*FlowEngine) DeletePipeline ¶
func (f *FlowEngine) DeletePipeline(id string, userId string, token string) (err error)
func (*FlowEngine) GetPipelineStatus ¶
func (f *FlowEngine) GetPipelineStatus(id, userId, token string) (status PipelineStatus, err error)
func (*FlowEngine) GetPipelinesStatus ¶ added in v0.0.24
func (f *FlowEngine) GetPipelinesStatus(ids []string, userId, token string) (status []PipelineStatus, err error)
func (*FlowEngine) StartPipeline ¶
func (f *FlowEngine) StartPipeline(pipelineRequest PipelineRequest, userId string, token string) (pipeline Pipeline, err error)
func (*FlowEngine) UpdatePipeline ¶
func (f *FlowEngine) UpdatePipeline(pipelineRequest PipelineRequest, userId string, token string) (pipeline Pipeline, err error)
type FogClient ¶ added in v0.0.46
type FogClient struct {
// contains filtered or unexported fields
}
func NewFogClient ¶ added in v0.0.46
func NewFogClient(pipelineService PipelineApiService) *FogClient
type InputSelection ¶
type InputTopic ¶
type Kafka2MqttApiService ¶ added in v0.0.4
type NodeConfig ¶
type Operator ¶
type Operator struct {
Id string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
ApplicationId uuid.UUID `json:"applicationId,omitempty"`
ImageId string `json:"imageId,omitempty"`
DeploymentType string `json:"deploymentType,omitempty"`
OperatorId string `json:"operatorId,omitempty"`
Config map[string]string `json:"config,omitempty"`
OutputTopic string `json:"outputTopic,omitempty"`
PersistData bool `json:"persistData,omitempty"`
InputTopics []InputTopic
InputSelections []InputSelection `json:"inputSelections,omitempty"`
Cost uint `json:"cost"`
UpstreamConfig UpstreamConfig `json:"upstream,omitempty"`
DownstreamConfig DownstreamConfig `json:"downstream,omitempty"`
}
type OperatorRequestConfig ¶
type OperatorRequestConfig struct {
Config map[string]string `json:"config,omitempty"`
InputTopics []InputTopic `json:"inputTopics,omitempty"`
}
type ParsingApiService ¶
type PermissionApiService ¶
type Pipeline ¶
type Pipeline struct {
Id uuid.UUID `json:"id,omitempty"`
FlowId string `json:"flowId,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
Image string `json:"image,omitempty"`
WindowTime int `json:"windowTime,omitempty"`
MergeStrategy string `json:"mergeStrategy,omitempty"`
ConsumeAllMessages bool `json:"consumeAllMessages,omitempty"`
Metrics bool `json:"metrics,omitempty"`
Operators []Operator `json:"operators,omitempty"`
}
type PipelineApiService ¶ added in v0.0.46
type PipelineApiService interface {
RegisterPipeline(pipeline *Pipeline, userId string, authorization string) (id uuid.UUID, err error)
UpdatePipeline(pipeline *Pipeline, userId string, authorization string) (err error)
GetPipeline(id string, userId string, authorization string) (pipe Pipeline, err error)
GetPipelines(userId string, authorization string) (pipelines []Pipeline, err error)
DeletePipeline(id string, userId string, authorization string) (err error)
}
type PipelineConfig ¶
type PipelineNode ¶
type PipelineNode struct {
NodeId string `json:"nodeId,omitempty"`
Inputs []NodeInput `json:"inputs,omitempty"`
Config []NodeConfig `json:"config,omitempty"`
InputSelections []InputSelection `json:"inputSelections,omitempty"`
PersistData bool `json:"persistData,omitempty"`
}
type PipelineRequest ¶
type PipelineRequest struct {
Id string `json:"id,omitempty"`
FlowId string `json:"flowId,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
WindowTime int `json:"windowTime,omitempty"`
MergeStrategy string `json:"mergeStrategy,omitempty"`
ConsumeAllMessages bool `json:"consumeAllMessages,omitempty"`
Metrics bool `json:"metrics,omitempty"`
Nodes []PipelineNode `json:"nodes,omitempty"`
}
type PipelineStatus ¶ added in v0.0.5
type PipelineStatusRequest ¶ added in v0.0.24
type PipelineStatusRequest struct {
Ids []string `json:"ids,omitempty"`
}
type PipelinesResponse ¶ added in v0.0.26
type UpstreamConfig ¶ added in v0.0.4
type UpstreamConfig struct {
Enabled bool
}
Click to show internal directories.
Click to hide internal directories.