lib

package
v0.0.44 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 10, 2025 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
// RequestDeviceId is the string key "deviceId".
// NOTE(review): where this key is matched (e.g. against a filter or
// selection type in a pipeline request) is not visible here — confirm
// against callers.
const RequestDeviceId = "deviceId"
View Source
// RequestOperatorId is the string key "operatorId".
// NOTE(review): where this key is matched (e.g. against a filter or
// selection type in a pipeline request) is not visible here — confirm
// against callers.
const RequestOperatorId = "operatorId"

Variables

View Source
// ErrNotFound is a sentinel error with the message "not found".
// Test for it with errors.Is rather than by comparing message text.
var ErrNotFound = errors.New("not found")
View Source
// ErrSomethingWentWrong is a sentinel error for an unspecified failure; its
// message is prefixed with "container API".
// Test for it with errors.Is rather than by comparing message text.
var ErrSomethingWentWrong = errors.New("container API - something went wrong")
View Source
// ErrWorkloadNotFound is a sentinel error whose message states that an
// operator could not be deleted because its workload was not found.
// Test for it with errors.Is rather than by comparing message text.
var ErrWorkloadNotFound = errors.New("container API - could not delete operator: workload not found")

Functions

func CloseMQTTConnection

func CloseMQTTConnection()

func ConnectMQTTBroker

func ConnectMQTTBroker() error

func DebugMode added in v0.0.41

func DebugMode() bool

func GenerateFogOperatorStartCommand added in v0.0.4

func GenerateFogOperatorStartCommand(operator Operator, pipelineID string, inputTopics []operatorLib.InputTopic) operatorLib.StartOperatorControlCommand

func GetEnv

func GetEnv(key, fallback string) string

func GetLogger added in v0.0.34

func GetLogger() *slog.Logger

func StringInSlice

func StringInSlice(a string, list []string) bool

Types

type Claims

// Claims holds the two token claims this package decodes: "sub" and
// "realm_access", the latter as a map from string keys to string lists.
// Both are omitted from JSON output when empty.
//
// NOTE(review): the field shape resembles a Keycloak access token
// (realm_access -> roles), and the Valid method suggests the type is meant
// to satisfy a JWT library's claims interface — confirm.
type Claims struct {
	Sub         string              `json:"sub,omitempty"`
	RealmAccess map[string][]string `json:"realm_access,omitempty"`
}

func (Claims) Valid

func (c Claims) Valid() error

type DeviceManagerService added in v0.0.16

// DeviceManagerService looks up devices and device types by ID.
//
// Both methods take the ID to look up, a user ID and a token, and return the
// matching models value or an error.
// NOTE(review): token is presumably an auth token forwarded to the backing
// service — confirm against the implementation.
type DeviceManagerService interface {
	GetDevice(deviceID, userID, token string) (models.Device, error)
	GetDeviceType(deviceTypeID, userID, token string) (models.DeviceType, error)
}

type DownstreamConfig added in v0.0.4

// DownstreamConfig is the downstream setting of an Operator (see
// Operator.DownstreamConfig): an on/off switch plus an instance ID and a
// service ID.
//
// The fields carry no struct tags, so encoding/json uses the Go field names
// ("Enabled", "InstanceID", "ServiceID") as keys.
// NOTE(review): InstanceID and ServiceID presumably identify the
// kafka2mqtt instance serving the operator — confirm.
type DownstreamConfig struct {
	Enabled    bool
	InstanceID string
	ServiceID  string
}

type Driver

// Driver is the backend that creates and deletes the operators of a pipeline
// and reports pipeline status. It is one of the dependencies passed to
// NewFlowEngine.
type Driver interface {
	// CreateOperators creates the given operators for the pipeline identified
	// by pipelineId, using pipelineConfig.
	CreateOperators(pipelineId string, input []Operator, pipelineConfig PipelineConfig) error
	// DeleteOperator deletes an operator in the given pipeline.
	//
	// Deprecated: Use DeleteOperators instead.
	DeleteOperator(pipelineId string, input Operator) error
	// DeleteOperators deletes the given operators in the given pipeline.
	DeleteOperators(pipelineId string, inputs []Operator) error
	// GetPipelineStatus returns the status of the pipeline identified by
	// pipelineId.
	GetPipelineStatus(pipelineId string) (PipelineStatus, error)
	// GetPipelinesStatus returns a list of pipeline statuses; it takes no
	// filter argument.
	GetPipelinesStatus() ([]PipelineStatus, error)
}

type FlowEngine

// FlowEngine offers the pipeline operations of this package: StartPipeline,
// UpdatePipeline, DeletePipeline, GetPipelineStatus and GetPipelinesStatus.
// Construct it with NewFlowEngine, which takes a Driver and the parsing,
// permission, kafka2mqtt and device-manager services.
type FlowEngine struct {
	// contains filtered or unexported fields
}

func NewFlowEngine

func NewFlowEngine(
	driver Driver,
	parsingService ParsingApiService,
	permissionService PermissionApiService,
	kafak2mqttService Kafka2MqttApiService,
	deviceManagerService DeviceManagerService) *FlowEngine

func (*FlowEngine) DeletePipeline

func (f *FlowEngine) DeletePipeline(id string, userId string, token string) (err error)

func (*FlowEngine) GetPipelineStatus

func (f *FlowEngine) GetPipelineStatus(id, userId, token string) (status PipelineStatus, err error)

func (*FlowEngine) GetPipelinesStatus added in v0.0.24

func (f *FlowEngine) GetPipelinesStatus(ids []string, userId, token string) (status []PipelineStatus, err error)

func (*FlowEngine) StartPipeline

func (f *FlowEngine) StartPipeline(pipelineRequest PipelineRequest, userId string, token string) (pipeline Pipeline, err error)

func (*FlowEngine) UpdatePipeline

func (f *FlowEngine) UpdatePipeline(pipelineRequest PipelineRequest, userId string, token string) (pipeline Pipeline, err error)

type InputSelection

// InputSelection describes a selection for one named input, expressed as an
// aspect ID, a function ID, a list of characteristic IDs and a selectable ID.
// It appears on both Operator and PipelineNode. Every field is omitted from
// JSON output when empty.
//
// NOTE(review): what kind of entity SelectableId refers to is not visible
// here — confirm.
type InputSelection struct {
	InputName         string   `json:"inputName,omitempty"`
	AspectId          string   `json:"aspectId,omitempty"`
	FunctionId        string   `json:"functionId,omitempty"`
	CharacteristicIds []string `json:"characteristicIds,omitempty"`
	SelectableId      string   `json:"selectableId,omitempty"`
}

type InputTopic

// InputTopic describes one input topic of an operator: its name, a filter
// (a type plus up to two values) and a list of mappings. Every field is
// omitted from JSON output when empty.
//
// NOTE(review): the accepted FilterType values and the role of FilterValue2
// are not visible here — confirm.
type InputTopic struct {
	Name         string    `json:"name,omitempty"`
	FilterType   string    `json:"filterType,omitempty"`
	FilterValue  string    `json:"filterValue,omitempty"`
	FilterValue2 string    `json:"filterValue2,omitempty"`
	Mappings     []Mapping `json:"mappings,omitempty"`
}

type Kafka2MqttApiService added in v0.0.4

// Kafka2MqttApiService starts and removes kafka2mqtt instances. It is one of
// the dependencies passed to NewFlowEngine.
type Kafka2MqttApiService interface {
	// StartOperatorInstance starts an instance for the operator identified by
	// operatorName and operatorID in the given pipeline and returns it.
	// NOTE(review): token is presumably the caller's auth token — confirm.
	StartOperatorInstance(operatorName, operatorID, pipelineID, userID, token string) (kafka2mqtt_api.Instance, error)
	// RemoveInstance removes the instance with the given id.
	RemoveInstance(id, pipelineID, userID, token string) error
}

type Mapping

// Mapping pairs a Source with a Dest; it is the element type of
// InputTopic.Mappings. Both fields are omitted from JSON output when empty.
//
// NOTE(review): presumably Source is a path in the incoming message and Dest
// the operator input it feeds — confirm.
type Mapping struct {
	Dest   string `json:"dest,omitempty"`
	Source string `json:"source,omitempty"`
}

type NodeConfig

// NodeConfig is a single name/value configuration entry; it is the element
// type of PipelineNode.Config. Both fields are omitted from JSON output when
// empty.
type NodeConfig struct {
	Name  string `json:"name,omitempty"`
	Value string `json:"value,omitempty"`
}

type NodeInput

// NodeInput describes one input of a PipelineNode: a filter type, the filter
// IDs, a topic name and a list of values. Every field is omitted from JSON
// output when empty.
//
// NOTE(review): FilterIds is a single string although the name is plural;
// how multiple IDs are encoded in it is not visible here — confirm.
type NodeInput struct {
	FilterType string      `json:"filterType,omitempty"`
	FilterIds  string      `json:"filterIds,omitempty"`
	TopicName  string      `json:"topicName,omitempty"`
	Values     []NodeValue `json:"values,omitempty"`
}

type NodeValue

// NodeValue pairs a Name with a Path; it is the element type of
// NodeInput.Values. Both fields are omitted from JSON output when empty.
//
// NOTE(review): presumably Path locates the value inside a message and Name
// is the input it is bound to — confirm.
type NodeValue struct {
	Name string `json:"name,omitempty"`
	Path string `json:"path,omitempty"`
}

type Operator

// Operator describes one operator of a Pipeline: its identifiers, image,
// deployment type, configuration, topics, input selections, cost and
// upstream/downstream settings. It is the unit handled by Driver and the
// element type of Pipeline.Operators.
//
// JSON notes (assuming encoding/json semantics for the struct tags):
//   - InputTopics has no tag, so it is encoded under the key "InputTopics"
//     and is never omitted, unlike the other fields, which use camelCase
//     keys.
//     NOTE(review): this looks unintentional, but adding a tag would change
//     the wire format — confirm before touching it.
//   - Cost has no omitempty and is always emitted, including when it is 0.
//   - omitempty never omits struct values, so "upstream" and "downstream"
//     are always emitted.
//     NOTE(review): the same applies to ApplicationId if uuid.UUID is a
//     fixed-size array type — confirm.
type Operator struct {
	Id               string            `json:"id,omitempty"`
	Name             string            `json:"name,omitempty"`
	ApplicationId    uuid.UUID         `json:"applicationId,omitempty"`
	ImageId          string            `json:"imageId,omitempty"`
	DeploymentType   string            `json:"deploymentType,omitempty"`
	OperatorId       string            `json:"operatorId,omitempty"`
	Config           map[string]string `json:"config,omitempty"`
	OutputTopic      string            `json:"outputTopic,omitempty"`
	PersistData      bool              `json:"persistData,omitempty"`
	InputTopics      []InputTopic
	InputSelections  []InputSelection `json:"inputSelections,omitempty"`
	Cost             uint             `json:"cost"`
	UpstreamConfig   UpstreamConfig   `json:"upstream,omitempty"`
	DownstreamConfig DownstreamConfig `json:"downstream,omitempty"`
}

type OperatorRequestConfig

// OperatorRequestConfig bundles an operator's string-to-string configuration
// with its input topics. Both fields are omitted from JSON output when empty.
//
// NOTE(review): no use of this type is visible here — confirm where it is
// sent.
type OperatorRequestConfig struct {
	Config      map[string]string `json:"config,omitempty"`
	InputTopics []InputTopic      `json:"inputTopics,omitempty"`
}

type ParsingApiService

// ParsingApiService fetches parsed pipeline definitions. It is one of the
// dependencies passed to NewFlowEngine.
type ParsingApiService interface {
	// GetPipeline returns the parsing_api.Pipeline with the given id, or an
	// error. userId and authorization are passed along with the lookup.
	// NOTE(review): authorization is presumably an Authorization header
	// value — confirm.
	GetPipeline(id string, userId string, authorization string) (p parsing_api.Pipeline, err error)
}

type PermissionApiService

// PermissionApiService answers permission questions about devices. It is one
// of the dependencies passed to NewFlowEngine.
type PermissionApiService interface {
	// UserHasDevicesReadAccess reports whether the caller identified by
	// authorization has read access to the devices in ids.
	// NOTE(review): presumably true only when access is granted for every
	// ID — confirm against the implementation.
	UserHasDevicesReadAccess(ids []string, authorization string) (bool, error)
}

type Pipeline

// Pipeline is the pipeline representation returned by
// FlowEngine.StartPipeline and FlowEngine.UpdatePipeline: descriptive
// metadata, processing settings and the list of operators.
//
// All fields use omitempty, so false booleans, a zero WindowTime and empty
// strings or lists are left out of JSON output.
// NOTE(review): if uuid.UUID is a fixed-size array type, omitempty has no
// effect on Id and it is always emitted — confirm.
type Pipeline struct {
	Id                 uuid.UUID  `json:"id,omitempty"`
	FlowId             string     `json:"flowId,omitempty"`
	Name               string     `json:"name,omitempty"`
	Description        string     `json:"description,omitempty"`
	Image              string     `json:"image,omitempty"`
	WindowTime         int        `json:"windowTime,omitempty"`
	MergeStrategy      string     `json:"mergeStrategy,omitempty"`
	ConsumeAllMessages bool       `json:"consumeAllMessages,omitempty"`
	Metrics            bool       `json:"metrics,omitempty"`
	Operators          []Operator `json:"operators,omitempty"`
}

type PipelineConfig

// PipelineConfig carries the pipeline-level settings passed to
// Driver.CreateOperators alongside the operators themselves.
//
// The fields carry no struct tags, so encoding/json uses the Go field names
// as keys.
// NOTE(review): the unit of WindowTime and the accepted values of
// MergeStrategy and ConsumerOffset are not visible here — confirm.
type PipelineConfig struct {
	WindowTime     int
	MergeStrategy  string
	Metrics        bool
	ConsumerOffset string
	FlowId         string
	PipelineId     string
	UserId         string
}

type PipelineNode

// PipelineNode is one node of a PipelineRequest: its ID, inputs,
// configuration entries, input selections and a persist-data flag. Every
// field is omitted from JSON output when empty, including PersistData when
// false.
type PipelineNode struct {
	NodeId          string           `json:"nodeId,omitempty"`
	Inputs          []NodeInput      `json:"inputs,omitempty"`
	Config          []NodeConfig     `json:"config,omitempty"`
	InputSelections []InputSelection `json:"inputSelections,omitempty"`
	PersistData     bool             `json:"persistData,omitempty"`
}

type PipelineRequest

// PipelineRequest is the input to FlowEngine.StartPipeline and
// FlowEngine.UpdatePipeline: descriptive metadata, processing settings and
// the list of nodes.
//
// Id is a plain string here, whereas Pipeline.Id is a uuid.UUID. All fields
// use omitempty, so false booleans, a zero WindowTime and empty strings or
// lists are left out of JSON output.
type PipelineRequest struct {
	Id                 string         `json:"id,omitempty"`
	FlowId             string         `json:"flowId,omitempty"`
	Name               string         `json:"name,omitempty"`
	Description        string         `json:"description,omitempty"`
	WindowTime         int            `json:"windowTime,omitempty"`
	MergeStrategy      string         `json:"mergeStrategy,omitempty"`
	ConsumeAllMessages bool           `json:"consumeAllMessages,omitempty"`
	Metrics            bool           `json:"metrics,omitempty"`
	Nodes              []PipelineNode `json:"nodes,omitempty"`
}

type PipelineResponse

// PipelineResponse is a response body that carries only a pipeline ID under
// the JSON key "id".
// NOTE(review): if uuid.UUID is a fixed-size array type, omitempty has no
// effect and "id" is always emitted — confirm.
type PipelineResponse struct {
	Id uuid.UUID `json:"id,omitempty"`
}

type PipelineStatus added in v0.0.5

// PipelineStatus is the status value returned by the status methods of
// Driver and FlowEngine: a name, two state flags and a message.
//
// Only Name uses omitempty; "running", "transitioning" and "message" are
// always present in JSON output, including when false or empty.
type PipelineStatus struct {
	Name          string `json:"name,omitempty"`
	Running       bool   `json:"running"`
	Transitioning bool   `json:"transitioning"`
	Message       string `json:"message"`
}

type PipelineStatusRequest added in v0.0.24

// PipelineStatusRequest carries a list of IDs under the JSON key "ids".
// NOTE(review): presumably the request body whose IDs are passed to
// FlowEngine.GetPipelinesStatus — confirm.
type PipelineStatusRequest struct {
	Ids []string `json:"ids,omitempty"`
}

type PipelinesResponse added in v0.0.26

// PipelinesResponse is a list of pipelines together with a total count.
//
// Both fields use omitempty: an empty Data list and a Total of 0 are left out
// of JSON output entirely, so consumers must treat a missing "total" as 0.
type PipelinesResponse struct {
	Data  []Pipeline `json:"data,omitempty"`
	Total int        `json:"total,omitempty"`
}

type Response

// Response is a response body carrying a single message under the JSON key
// "message"; the key is omitted when the message is empty.
type Response struct {
	Message string `json:"message,omitempty"`
}

type UpstreamConfig added in v0.0.4

// UpstreamConfig is the upstream setting of an Operator (see
// Operator.UpstreamConfig); it consists of a single on/off switch.
//
// Enabled carries no struct tag, so encoding/json uses "Enabled" as its key.
type UpstreamConfig struct {
	Enabled bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL