Documentation
¶
Index ¶
- Constants
- Variables
- type Agent
- type AgentCommsService
- type AgentGroup
- type AgentGroupRepository
- type AgentGroupService
- type AgentHeartbeatRepository
- type AgentMetricsRPC
- type AgentMetricsRPCPayload
- type AgentPoliciesReqRPCPayload
- type AgentPolicyRPC
- type AgentPolicyRPCPayload
- type AgentRepository
- type AgentResetRPC
- type AgentResetRPCPayload
- type AgentService
- type AgentStopRPC
- type AgentStopRPCPayload
- type BackendInfo
- type BackendStateInfo
- type Capabilities
- type DatasetRemovedRPC
- type DatasetRemovedRPCPayload
- type Group
- type GroupMembershipData
- type GroupMembershipRPC
- type GroupMembershipRPCPayload
- type GroupMembershipReqRPCPayload
- type GroupRemovedRPC
- type GroupRemovedRPCPayload
- type GroupStateInfo
- type Heartbeat
- type MatchingGroups
- type OrbAgentInfo
- type Page
- type PageAgentGroup
- type PageMetadata
- type PolicyStateInfo
- type RPC
- type SchemaVersionCheck
- type Service
- type State
Constants ¶
View Source
const ( HeartbeatFreq = 60 * time.Second DefaultTimeout = 300 * time.Second )
View Source
const AgentMetricsRPCFunc = "agent_metrics"
View Source
const AgentPoliciesReqRPCFunc = "agent_policies_req"
View Source
const AgentPolicyRPCFunc = "agent_policy"
View Source
const AgentResetRPCFunc = "agent_reset"
View Source
const AgentStopRPCFunc = "agent_stop"
View Source
const CapabilitiesTopic = "agent"
View Source
const CurrentCapabilitiesSchemaVersion = "1.0"
View Source
const CurrentHeartbeatSchemaVersion = "1.0"
View Source
const CurrentRPCSchemaVersion = "1.0"
View Source
const DatasetRemovedRPCFunc = "dataset_removed"
View Source
const GroupMembershipRPCFunc = "group_membership"
View Source
const GroupMembershipReqRPCFunc = "group_membership_req"
View Source
const GroupRemovedRPCFunc = "group_removed"
View Source
const HeartbeatsTopic = "hb"
View Source
const LogTopic = "log"
View Source
const MaxMsgPayloadSize = 1024 * 25
MaxMsgPayloadSize maximum payload size we will process from a client
View Source
const RPCFromCoreTopic = "fromcore"
View Source
const RPCToCoreTopic = "tocore"
Variables ¶
View Source
var ( ErrCreateAgentGroup = errors.New("failed to create agent group") ErrMaintainAgentGroupChannels = errors.New("failed to maintain agent group channels") )
View Source
var ( // ErrMalformedEntity indicates malformed entity specification (e.g. // invalid username or password). ErrMalformedEntity = errors.New("malformed entity specification") // ErrNotFound indicates a non-existent entity request. ErrNotFound = errors.New("non-existent entity") // ErrConflict indicates that entity already exists. ErrConflict = errors.New("entity already exists") ErrUnauthorizedAccess = errors.New("missing or invalid credentials provided") // ErrScanMetadata indicates problem with metadata in db ErrScanMetadata = errors.New("failed to scan metadata in db") // ErrSelectEntity indicates error while reading entity from database ErrSelectEntity = errors.New("select entity from db error") // ErrEntityConnected indicates error while checking connection in database ErrEntityConnected = errors.New("check connection in database error") // ErrUpdateEntity indicates error while updating a entity ErrUpdateEntity = errors.New("failed to update entity") // ErrRemoveEntity indicates a error while deleting a agent group ErrRemoveEntity = errors.New("failed to remove entity") )
View Source
var ( ErrCreateAgent = errors.New("failed to create agent") // ErrThings indicates failure to communicate with Mainflux Things service. // It can be due to networking error or invalid/unauthorized request. ErrThings = errors.New("failed to receive response from Things service") )
View Source
var ( // ErrSchemaVersion a message was received indicating a version we don't support ErrSchemaVersion = errors.New("unsupported schema version") // ErrSchemaMalformed a message contained a schema we couldn't parse ErrSchemaMalformed = errors.New("schema malformed") // ErrPayloadTooBig a message contained a payload that was abnormally large ErrPayloadTooBig = errors.New("payload too big") )
Functions ¶
This section is empty.
Types ¶
type AgentCommsService ¶
type AgentCommsService interface {
// Start set up communication with the message bus to communicate with agents
Start() error
// Stop end communication with the message bus
Stop() error
// NotifyAgentNewGroupMembership RPC Core -> Agent: Notify a specific Agent of new AgentGroup membership it now belongs to
NotifyAgentNewGroupMembership(a Agent, ag AgentGroup) error
// NotifyAgentGroupMemberships RPC Core -> Agent: Notify a specific Agent of all AgentGroup memberships it belongs to
NotifyAgentGroupMemberships(a Agent) error
// NotifyAgentAllDatasets RPC Core -> Agent: Notify Agent of all Policy it should currently run based on group membership and current Datasets
NotifyAgentAllDatasets(a Agent) error
// NotifyAgentStop RPC Core -> Agent: Notify Agent that it should Stop (Send the message to Agent Channel)
NotifyAgentStop(agent Agent, reason string) error
// NotifyGroupNewDataset RPC Core -> Agent: Notify AgentGroup of a newly created Dataset, exposing a new Policy to run
NotifyGroupNewDataset(ctx context.Context, ag AgentGroup, datasetID string, policyID string, ownerID string) error
// NotifyGroupRemoval RPC core -> Agent: Notify AgentGroup that the group has been removed
NotifyGroupRemoval(ctx context.Context, ag AgentGroup) error
// NotifyGroupPolicyRemoval RPC core -> Agent: Notify AgentGroup that a Policy has been removed
NotifyGroupPolicyRemoval(ag AgentGroup, policyID string, policyName string, backend string) error
// NotifyGroupDatasetRemoval RPC core -> Agent: Notify AgentGroup that a Dataset has been removed
NotifyGroupDatasetRemoval(ag AgentGroup, dsID string, policyID string) error
// NotifyGroupPolicyUpdate RPC core -> Agent: Notify AgentGroup that a Policy has been updated
NotifyGroupPolicyUpdate(ctx context.Context, ag AgentGroup, policyID string, ownerID string) error
//NotifyAgentReset RPC core -> Agent: Notify Agent to reset the backend
NotifyAgentReset(agent Agent, fullReset bool, reason string) error
// NotifyGroupDatasetEdit RPC core -> Agent: Notify Agent an already created Dataset goes invalid or valid
NotifyGroupDatasetEdit(ctx context.Context, ag AgentGroup, datasetID, policyID, ownerID string, valid bool) error
}
func CommsMetricsMiddleware ¶
func CommsMetricsMiddleware(svc AgentCommsService, counter metrics.Counter, latency metrics.Histogram) AgentCommsService
func NewFleetCommsService ¶
func NewFleetCommsService(logger *zap.Logger, policyClient pb.PolicyServiceClient, agentRepo AgentRepository, agentGroupRepo AgentGroupRepository, agentPubSub mfnats.PubSub) AgentCommsService
type AgentGroup ¶
type AgentGroupRepository ¶
type AgentGroupRepository interface {
// Save persists the AgentGroup. Successful operation is indicated by non-nil
// error response.
Save(ctx context.Context, group AgentGroup) (string, error)
// RetrieveAllByAgent get all AgentGroup which an Agent belongs to.
RetrieveAllByAgent(ctx context.Context, a Agent) ([]AgentGroup, error)
// RetrieveByID get an AgentGroup by id
RetrieveByID(ctx context.Context, groupID string, ownerID string) (AgentGroup, error)
// RetrieveAllAgentGroupsByOwner get all AgentGroup by owner.
RetrieveAllAgentGroupsByOwner(ctx context.Context, ownerID string, pm PageMetadata) (PageAgentGroup, error)
// Update a existing agent group by owner and id
Update(ctx context.Context, ownerID string, group AgentGroup) (AgentGroup, error)
// Delete a existing agent group by owner and id
Delete(ctx context.Context, groupID string, ownerID string) error
// RetrieveMatchingGroups Groups this Agent currently belongs to, according to matching agent and group tags
RetrieveMatchingGroups(ctx context.Context, ownerID string, thingID string) (MatchingGroups, error)
}
type AgentGroupService ¶
type AgentGroupService interface {
// CreateAgentGroup creates new AgentGroup, associated channel, applies to Agents as appropriate
CreateAgentGroup(ctx context.Context, token string, s AgentGroup) (AgentGroup, error)
// ViewAgentGroupByID Retrieve an AgentGroup by id
ViewAgentGroupByID(ctx context.Context, token string, id string) (AgentGroup, error)
// ViewAgentGroupByIDInternal Retrieve an AgentGroup by id, without a token
ViewAgentGroupByIDInternal(ctx context.Context, groupID string, ownerID string) (AgentGroup, error)
// ListAgentGroups Retrieve a list of AgentGroups by owner
ListAgentGroups(ctx context.Context, token string, pm PageMetadata) (PageAgentGroup, error)
// EditAgentGroup edit a existing agent group by id and owner
EditAgentGroup(ctx context.Context, token string, ag AgentGroup) (AgentGroup, error)
// RemoveAgentGroup Remove a existing agent group by owner an id
RemoveAgentGroup(ctx context.Context, token string, id string) error
// ValidateAgentGroup validate AgentGroup
ValidateAgentGroup(ctx context.Context, token string, s AgentGroup) (AgentGroup, error)
}
type AgentMetricsRPC ¶
type AgentMetricsRPC struct {
SchemaVersion string `json:"schema_version"`
Func string `json:"func"`
Payload []AgentMetricsRPCPayload `json:"payload"`
}
type AgentMetricsRPCPayload ¶
type AgentPoliciesReqRPCPayload ¶
type AgentPoliciesReqRPCPayload struct {
}
type AgentPolicyRPC ¶
type AgentPolicyRPC struct {
SchemaVersion string `json:"schema_version"`
Func string `json:"func"`
Payload []AgentPolicyRPCPayload `json:"payload"`
FullList bool `json:"full_list"`
}
type AgentPolicyRPCPayload ¶
type AgentRepository ¶
type AgentRepository interface {
AgentHeartbeatRepository // may move this out so it can be in e.g. redis
// Save persists the Agent. Successful operation is indicated by non-nil
// error response.
Save(ctx context.Context, agent Agent) error
// UpdateDataByIDWithChannel update the tags and metadata for the Agent having the provided ID and owner
UpdateDataByIDWithChannel(ctx context.Context, agent Agent) error
// RetrieveByIDWithChannel retrieves the Agent having the provided ID and channelID access (i.e. from a Message)
RetrieveByIDWithChannel(ctx context.Context, thingID string, channelID string) (Agent, error)
// RetrieveAll retrieves the subset of Agents owned by the specified user
RetrieveAll(ctx context.Context, owner string, pm PageMetadata) (Page, error)
// RetrieveAllByAgentGroupID retrieves Agents in the specified group
RetrieveAllByAgentGroupID(ctx context.Context, owner string, agentGroupID string, onlinishOnly bool) ([]Agent, error)
// RetrieveMatchingAgents retrieve the matching agents by tags
RetrieveMatchingAgents(ctx context.Context, owner string, tags types.Tags) (types.Metadata, error)
// UpdateAgentByID update the the tags and name for the Agent having provided ID and owner
UpdateAgentByID(ctx context.Context, ownerID string, agent Agent) error
// RetrieveByID retrieves the Agent having the provided ID and owner
RetrieveByID(ctx context.Context, ownerID string, thingID string) (Agent, error)
// Delete an existing agent by owner and id
Delete(ctx context.Context, ownerID string, thingID string) error
// RetrieveAgentMetadataByOwner retrieves the Metadata having the OwnerID
RetrieveAgentMetadataByOwner(ctx context.Context, ownerID string) ([]types.Metadata, error)
// SetStaleStatus change status to stale according provided duration without heartbeats
SetStaleStatus(ctx context.Context, minutes time.Duration) (int64, error)
// RetrieveAgentInfoByChannelID gRPC version to retrieve ownerID, name and agent tags by a provided channelID
RetrieveAgentInfoByChannelID(ctx context.Context, channelID string) (Agent, error)
}
type AgentResetRPC ¶
type AgentResetRPC struct {
SchemaVersion string `json:"schema_version"`
Func string `json:"func"`
Payload AgentResetRPCPayload `json:"payload"`
}
type AgentResetRPCPayload ¶
type AgentService ¶
type AgentService interface {
// CreateAgent creates new agent
CreateAgent(ctx context.Context, token string, a Agent) (Agent, error)
// ViewAgentByID retrieves a Agent by provided thingID
ViewAgentByID(ctx context.Context, token string, thingID string) (Agent, error)
// ViewAgentMatchingGroupsByID Groups this Agent currently belongs to, according to matching agent and group tags
ViewAgentMatchingGroupsByID(ctx context.Context, token string, thingID string) (MatchingGroups, error)
// ViewAgentByIDInternal retrieves a Agent by provided thingID
ViewAgentByIDInternal(ctx context.Context, ownerID string, thingID string) (Agent, error)
// ListAgents retrieves data about subset of agents that belongs to the
// user identified by the provided key.
ListAgents(ctx context.Context, token string, pm PageMetadata) (Page, error)
// EditAgent edit a Agent by provided thingID
EditAgent(ctx context.Context, token string, agent Agent) (Agent, error)
// ValidateAgent validates agent
ValidateAgent(ctx context.Context, token string, a Agent) (Agent, error)
// RemoveAgent removes an existing agent by owner and id
RemoveAgent(ctx context.Context, token string, thingID string) error
// ListAgentBackends List the available backends from fleet agents
ListAgentBackends(ctx context.Context, token string) ([]string, error)
// ViewAgentBackend retrieves a Backend by provided backend name
ViewAgentBackend(ctx context.Context, token string, name string) (interface{}, error)
//ViewAgentInfoByChannelIDInternal return a correspondent ownerID, name and agent tags by a provided channel id
ViewAgentInfoByChannelIDInternal(ctx context.Context, channelID string) (Agent, error)
// ResetAgent reset a agent on edge by a provided agent
ResetAgent(ct context.Context, token string, agentID string) error
// GetPolicyState get all policies state per agent in a formatted way from a given existent agent
GetPolicyState(ctx context.Context, agent Agent) (map[string]interface{}, error)
// ViewAgentMatchingGroupsByIDInternal Groups this Agent currently belongs to, according to matching agent and group tags
ViewAgentMatchingGroupsByIDInternal(ctx context.Context, agentID string, ownerID string) (MatchingGroups, error)
}
AgentService Agent CRUD interface
type AgentStopRPC ¶
type AgentStopRPC struct {
SchemaVersion string `json:"schema_version"`
Func string `json:"func"`
Payload AgentStopRPCPayload `json:"payload"`
}
type AgentStopRPCPayload ¶
type AgentStopRPCPayload struct {
Reason string `json:"reason"`
}
type BackendInfo ¶
type BackendStateInfo ¶
type BackendStateInfo struct {
State string `json:"state"`
Error string `json:"error,omitempty"`
RestartCount int64 `json:"restart_count,omitempty"`
LastError string `json:"last_error,omitempty"`
LastRestartTS time.Time `json:"last_restart_ts,omitempty"`
LastRestartReason string `json:"last_restart_reason,omitempty"`
}
type Capabilities ¶
type Capabilities struct {
SchemaVersion string `json:"schema_version"`
OrbAgent OrbAgentInfo `json:"orb_agent"`
AgentTags map[string]string `json:"agent_tags"`
Backends map[string]BackendInfo `json:"backends"`
}
type DatasetRemovedRPC ¶
type DatasetRemovedRPC struct {
SchemaVersion string `json:"schema_version"`
Func string `json:"func"`
Payload DatasetRemovedRPCPayload `json:"payload"`
}
type Group ¶
type Group struct {
GroupID string
GroupName types.Identifier
}
type GroupMembershipData ¶
type GroupMembershipRPC ¶
type GroupMembershipRPC struct {
SchemaVersion string `json:"schema_version"`
Func string `json:"func"`
Payload GroupMembershipRPCPayload `json:"payload"`
}
type GroupMembershipRPCPayload ¶
type GroupMembershipRPCPayload struct {
Groups []GroupMembershipData `json:"groups"`
FullList bool `json:"full_list"`
}
type GroupMembershipReqRPCPayload ¶
type GroupMembershipReqRPCPayload struct {
}
type GroupRemovedRPC ¶
type GroupRemovedRPC struct {
SchemaVersion string `json:"schema_version"`
Func string `json:"func"`
Payload GroupRemovedRPCPayload `json:"payload"`
}
type GroupRemovedRPCPayload ¶
type GroupStateInfo ¶
type Heartbeat ¶
type Heartbeat struct {
SchemaVersion string `json:"schema_version"`
TimeStamp time.Time `json:"ts"`
State State `json:"state"`
BackendState map[string]BackendStateInfo `json:"backend_state"`
PolicyState map[string]PolicyStateInfo `json:"policy_state"`
GroupState map[string]GroupStateInfo `json:"group_state"`
}
type MatchingGroups ¶
type OrbAgentInfo ¶
type OrbAgentInfo struct {
Version string `json:"version"`
}
type Page ¶
type Page struct {
PageMetadata
Agents []Agent
}
Page contains page related metadata as well as list of agents that belong to this page.
type PageAgentGroup ¶
type PageAgentGroup struct {
PageMetadata
AgentGroups []AgentGroup
}
type PageMetadata ¶
type PageMetadata struct {
Total uint64
Offset uint64 `json:"offset,omitempty"`
Limit uint64 `json:"limit,omitempty"`
Name string `json:"name,omitempty"`
Order string `json:"order,omitempty"`
Dir string `json:"dir,omitempty"`
Metadata types.Metadata `json:"metadata,omitempty"`
Tags types.Tags `json:"tags,omitempty"`
}
PageMetadata contains page metadata that helps navigation.
type PolicyStateInfo ¶
type PolicyStateInfo struct {
Name string `json:"name"`
Datasets []string `json:"datasets,omitempty"`
State string `json:"state"`
Error string `json:"error,omitempty"`
Version int32 `json:"version,omitempty"`
LastScrapeBytes int64 `json:"last_scrape_bytes,omitempty"`
LastScrapeTS time.Time `json:"last_scrape_ts,omitempty"`
Backend string `json:"backend,omitempty"`
}
type SchemaVersionCheck ¶
type SchemaVersionCheck struct {
SchemaVersion string `json:"schema_version"`
}
type Service ¶
type Service interface {
AgentService
AgentGroupService
}
func NewFleetService ¶
func NewFleetService(logger *zap.Logger, auth mainflux.AuthServiceClient, agentRepo AgentRepository, agentGroupRepository AgentGroupRepository, agentComms AgentCommsService, mfsdk mfsdk.SDK, aDone chan bool) Service
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
api
|
|
|
Package postgres contains repository implementations using PostgreSQL as the underlying database.
|
Package postgres contains repository implementations using PostgreSQL as the underlying database. |
|
redis
|
|
|
consumer
Package esconsumer contains events esconsumer for events
|
Package esconsumer contains events esconsumer for events |
|
producer
Package producer contains the domain events needed to support event sourcing of Sink service actions.
|
Package producer contains the domain events needed to support event sourcing of Sink service actions. |
Click to show internal directories.
Click to hide internal directories.