service

package
v1.5.1
Warning

This package is not in the latest version of its module.

Published: May 14, 2026 License: Apache-2.0 Imports: 50 Imported by: 0

Documentation

Overview

Package service implements the gRPC server for NICo Flow, the rack-level asset management system. It provides APIs for managing rack-level assets including creating, retrieving, and updating rack and component information.

TODO: This file is getting large. Consider splitting it into multiple module files.


Constants

const (
	// DefaultPort is the default port the Flow gRPC server listens on.
	DefaultPort = 50051

	// EnvVarName is the environment variable operators set to declare the
	// deployment environment. Valid values: "development", "staging", "production".
	// Must be set explicitly; there is no implicit default.
	EnvVarName = "FLOW_ENV"
)

Variables

This section is empty.

Functions

func BuildTemporalConfigFromEnv

func BuildTemporalConfigFromEnv() (*temporal.Config, error)

BuildTemporalConfigFromEnv builds a Temporal client configuration from environment variables: TEMPORAL_HOST, TEMPORAL_PORT, TEMPORAL_NAMESPACE, TEMPORAL_CERT_PATH, TEMPORAL_ENABLE_TLS, and TEMPORAL_SERVER_NAME.
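As a rough illustration of the environment-driven pattern described above, the following sketch reads the same six variables into a local struct. The temporalEnv type, the loadTemporalEnv helper, and the rules for missing values are assumptions made for this example; the real temporal.Config fields and defaults are not shown in this documentation.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// temporalEnv is a hypothetical stand-in for temporal.Config, whose real
// fields are not shown in this documentation.
type temporalEnv struct {
	Host       string
	Port       int
	Namespace  string
	CertPath   string
	EnableTLS  bool
	ServerName string
}

// loadTemporalEnv reads the six TEMPORAL_* variables through getenv.
// Assumptions of this sketch: TEMPORAL_PORT must parse as an integer, and an
// empty TEMPORAL_ENABLE_TLS means TLS is off.
func loadTemporalEnv(getenv func(string) string) (temporalEnv, error) {
	port, err := strconv.Atoi(getenv("TEMPORAL_PORT"))
	if err != nil {
		return temporalEnv{}, fmt.Errorf("TEMPORAL_PORT: %w", err)
	}
	cfg := temporalEnv{
		Host:       getenv("TEMPORAL_HOST"),
		Port:       port,
		Namespace:  getenv("TEMPORAL_NAMESPACE"),
		CertPath:   getenv("TEMPORAL_CERT_PATH"),
		ServerName: getenv("TEMPORAL_SERVER_NAME"),
	}
	if raw := getenv("TEMPORAL_ENABLE_TLS"); raw != "" {
		if cfg.EnableTLS, err = strconv.ParseBool(raw); err != nil {
			return temporalEnv{}, fmt.Errorf("TEMPORAL_ENABLE_TLS: %w", err)
		}
	}
	return cfg, nil
}

func main() {
	cfg, err := loadTemporalEnv(os.Getenv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```

Passing the lookup function as a parameter keeps the sketch testable without mutating the process environment.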

func GetDeploymentEnv

func GetDeploymentEnv() (string, error)

GetDeploymentEnv reads FLOW_ENV and returns the resolved environment name. An unset or empty value is an error; callers that want a development default (e.g. a local CLI entrypoint) must set the variable before calling this.
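The "no implicit default" rule can be sketched as follows. The resolveEnv helper is hypothetical, and rejecting unknown values at this point (rather than only in Config.Validate) is an assumption of the sketch. The main function shows how a local CLI entrypoint might opt into a development default by setting the variable before resolving it.

```go
package main

import (
	"fmt"
	"os"
)

// envVarName mirrors the documented EnvVarName constant.
const envVarName = "FLOW_ENV"

// resolveEnv is a hypothetical helper: it accepts only the three documented
// values and treats an empty value as an error instead of picking a default.
func resolveEnv(raw string) (string, error) {
	switch raw {
	case "development", "staging", "production":
		return raw, nil
	case "":
		return "", fmt.Errorf("%s is not set", envVarName)
	default:
		return "", fmt.Errorf("%s has unknown value %q", envVarName, raw)
	}
}

func main() {
	// A local entrypoint that wants a development default sets it explicitly.
	if os.Getenv(envVarName) == "" {
		os.Setenv(envVarName, "development")
	}
	env, err := resolveEnv(os.Getenv(envVarName))
	fmt.Println(env, err)
}
```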

Types

type Config

type Config struct {
	Port             int
	DBConf           cdb.Config
	ExecutorConf     executor.ExecutorConfig
	FlowConfig       config.Config
	CMConfig         cmconfig.Config
	ProviderRegistry *providerapi.ProviderRegistry

	// DevMode enables developer options such as gRPC reflection and debug
	// logging. Must not be set in staging/production environments.
	DevMode bool

	// CertConfig holds certificate file paths for the gRPC server listener.
	// When set, these take precedence over CERTDIR / the k8s default.
	// Either all three fields must be set or none.
	CertConfig pkgcerts.Config
}

Config holds the service configuration. It uses interfaces to abstract implementation details:

  • ExecutorConf (executor.ExecutorConfig): abstracts the task executor (e.g., Temporal)

func (Config) Validate

func (c Config) Validate() error

Validate checks the Config for unsafe combinations and returns an error for the first violation found, in priority order:

  1. Unknown or unset FLOW_ENV — always rejected regardless of other settings.
  2. DevMode in a non-development environment — staging and production block it.
  3. Partial CertConfig — all three cert paths must be set together or not at all.
  4. Missing TLS in staging or production — those environments require mTLS.
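The priority order above can be sketched with stand-in types. The settings and certPaths types and their field names are hypothetical, and the sketch only considers explicit certificate paths; it does not model CERTDIR or the k8s default that the real Config may fall back to.

```go
package main

import (
	"errors"
	"fmt"
)

// certPaths uses hypothetical field names; the documentation only states
// that there are three certificate paths.
type certPaths struct{ CACert, TLSCert, TLSKey string }

// set counts how many of the three paths are non-empty.
func (c certPaths) set() int {
	n := 0
	for _, p := range []string{c.CACert, c.TLSCert, c.TLSKey} {
		if p != "" {
			n++
		}
	}
	return n
}

// settings is a reduced, hypothetical stand-in for Config.
type settings struct {
	Env     string
	DevMode bool
	Certs   certPaths
}

// validate returns the first violation found, checking in the documented order.
func validate(s settings) error {
	switch s.Env {
	case "development", "staging", "production":
	default:
		return errors.New("unknown or unset FLOW_ENV")
	}
	strict := s.Env != "development"
	if s.DevMode && strict {
		return errors.New("DevMode is not allowed outside development")
	}
	n := s.Certs.set()
	if n != 0 && n != 3 {
		return errors.New("CertConfig is partially set")
	}
	if strict && n == 0 {
		return errors.New("TLS is required in staging and production")
	}
	return nil
}

func main() {
	// Two violations are present; only the higher-priority one is reported.
	bad := settings{Env: "production", DevMode: true, Certs: certPaths{CACert: "ca.pem"}}
	fmt.Println(validate(bad)) // prints: DevMode is not allowed outside development
}
```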

type FlowServerImpl

type FlowServerImpl struct {
	pb.UnimplementedFlowServer // Embedded protobuf server interface for forward compatibility
	// contains filtered or unexported fields
}

FlowServerImpl implements the gRPC Flow server interface. It acts as an adapter between gRPC protobuf messages and the internal managers, handling protobuf conversion and delegating business logic to the InventoryManager.

func (*FlowServerImpl) AddComponent

func (rs *FlowServerImpl) AddComponent(
	ctx context.Context,
	req *pb.AddComponentRequest,
) (*pb.AddComponentResponse, error)

AddComponent creates a single component under an existing rack.

func (*FlowServerImpl) AddTaskScheduleScope

AddTaskScheduleScope is the additive variant of scope management:

  • incoming racks are merged into the existing scope.
  • a rack already present has its component filter unioned with the incoming filter.
  • a rack not yet present is added.
  • existing racks are never removed.
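The merge rules above can be illustrated with plain maps. The scope and filter types are hypothetical; the sketch treats a component filter as a simple set of component type names and does not model any special meaning an empty filter may have in the real service.

```go
package main

import "fmt"

// filter is a set of component type names; scope maps rack ID to filter.
// Both types are hypothetical stand-ins for the service's scope model.
type filter map[string]bool
type scope map[string]filter

// addScope merges incoming into current without removing anything:
// filters of racks present in both are unioned, new racks are added.
// It returns a new scope and leaves both inputs untouched.
func addScope(current, incoming scope) scope {
	merged := scope{}
	for _, src := range []scope{current, incoming} {
		for rack, f := range src {
			if merged[rack] == nil {
				merged[rack] = filter{}
			}
			for componentType := range f {
				merged[rack][componentType] = true
			}
		}
	}
	return merged
}

func main() {
	current := scope{"rack-a": {"compute": true}, "rack-b": {"switch": true}}
	incoming := scope{"rack-a": {"powershelf": true}, "rack-c": {"compute": true}}
	fmt.Println(addScope(current, incoming))
}
```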

func (*FlowServerImpl) AssociateRuleWithRack

func (rs *FlowServerImpl) AssociateRuleWithRack(
	ctx context.Context,
	req *pb.AssociateRuleWithRackRequest,
) (*emptypb.Empty, error)

func (*FlowServerImpl) AttachRacksToNVLDomain

func (rs *FlowServerImpl) AttachRacksToNVLDomain(
	ctx context.Context,
	req *pb.AttachRacksToNVLDomainRequest,
) (*emptypb.Empty, error)

func (*FlowServerImpl) BringUpRack

func (rs *FlowServerImpl) BringUpRack(
	ctx context.Context,
	req *pb.BringUpRackRequest,
) (*pb.SubmitTaskResponse, error)

func (*FlowServerImpl) CancelTask

func (rs *FlowServerImpl) CancelTask(
	ctx context.Context,
	req *pb.CancelTaskRequest,
) (*pb.CancelTaskResponse, error)

func (*FlowServerImpl) CheckScheduleConflicts

CheckScheduleConflicts is an advisory RPC that identifies which existing enabled schedules may conflict with a proposed operation at execution time. It does not block creation — callers may proceed even when conflicts are returned. Execution-time conflict detection remains the authoritative backstop.

The check is deliberately coarse: it compares only the operation type and code of the proposed operation against those of each existing schedule on the same racks. It does not intersect component-type filters or explicit component UUID lists, so two schedules that target disjoint component sets on the same rack will still be flagged as conflicting here. Callers should treat a non-empty response as a prompt for human review rather than a definitive statement that tasks will collide at runtime.
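A minimal sketch of such a coarse check follows. All types here are hypothetical, and the opsMayConflict rule (identical type and code conflict) is only a placeholder, since the actual conflict rule is not described in this documentation. The point of the example is that the Components field is never consulted, so disjoint component sets on a shared rack are still flagged.

```go
package main

import "fmt"

type operation struct{ Type, Code string }

// schedule is a hypothetical, reduced view of a task schedule.
type schedule struct {
	Name       string
	Enabled    bool
	Op         operation
	Racks      []string
	Components []string // deliberately ignored by the coarse check
}

// opsMayConflict is a placeholder rule for this sketch only.
func opsMayConflict(a, b operation) bool { return a == b }

// sharesRack reports whether the two rack lists have any rack in common.
func sharesRack(a, b []string) bool {
	seen := map[string]bool{}
	for _, r := range a {
		seen[r] = true
	}
	for _, r := range b {
		if seen[r] {
			return true
		}
	}
	return false
}

// advisoryConflicts returns the names of enabled schedules that share a rack
// with the proposed one and whose operation may conflict with it.
func advisoryConflicts(proposed schedule, existing []schedule) []string {
	var names []string
	for _, s := range existing {
		if s.Enabled && sharesRack(proposed.Racks, s.Racks) && opsMayConflict(proposed.Op, s.Op) {
			names = append(names, s.Name)
		}
	}
	return names
}

func main() {
	fw := operation{Type: "firmware", Code: "upgrade"}
	existing := []schedule{
		{Name: "nightly", Enabled: true, Op: fw, Racks: []string{"rack-a"}, Components: []string{"c1"}},
		{Name: "paused", Enabled: false, Op: fw, Racks: []string{"rack-a"}},
	}
	proposed := schedule{Op: fw, Racks: []string{"rack-a"}, Components: []string{"c2"}}
	fmt.Println(advisoryConflicts(proposed, existing))
}
```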

func (*FlowServerImpl) CreateExpectedRack

CreateExpectedRack creates a new expected rack configuration in the system. It converts the protobuf rack definition to internal format and stores it for later matching against physical rack discoveries.

Parameters:

  • ctx: Request context for cancellation and deadline management
  • req: CreateExpectedRackRequest containing the rack configuration

Returns:

  • *pb.CreateExpectedRackResponse: Response containing the rack ID, whether newly generated, already existing, or supplied by the caller
  • error: Any error that occurred during rack creation

func (*FlowServerImpl) CreateNVLDomain

func (*FlowServerImpl) CreateOperationRule

func (*FlowServerImpl) CreateTaskSchedule

func (rs *FlowServerImpl) CreateTaskSchedule(
	ctx context.Context,
	req *pb.CreateTaskScheduleRequest,
) (*pb.TaskSchedule, error)

CreateTaskSchedule creates a new task schedule.

func (*FlowServerImpl) DeleteComponent

DeleteComponent soft-deletes a component by UUID.

func (*FlowServerImpl) DeleteOperationRule

func (rs *FlowServerImpl) DeleteOperationRule(
	ctx context.Context,
	req *pb.DeleteOperationRuleRequest,
) (*emptypb.Empty, error)

func (*FlowServerImpl) DeleteRack

func (rs *FlowServerImpl) DeleteRack(
	ctx context.Context,
	req *pb.DeleteRackRequest,
) (*pb.DeleteRackResponse, error)

DeleteRack soft-deletes a rack and all its components.

func (*FlowServerImpl) DeleteTaskSchedule

func (rs *FlowServerImpl) DeleteTaskSchedule(
	ctx context.Context,
	req *pb.DeleteTaskScheduleRequest,
) (*emptypb.Empty, error)

DeleteTaskSchedule hard-deletes a task schedule and its scopes (cascade).

func (*FlowServerImpl) DetachRacksFromNVLDomain

func (rs *FlowServerImpl) DetachRacksFromNVLDomain(
	ctx context.Context,
	req *pb.DetachRacksFromNVLDomainRequest,
) (*emptypb.Empty, error)

func (*FlowServerImpl) DisassociateRuleFromRack

func (rs *FlowServerImpl) DisassociateRuleFromRack(
	ctx context.Context,
	req *pb.DisassociateRuleFromRackRequest,
) (*emptypb.Empty, error)

func (*FlowServerImpl) GetComponentInfoByID

func (rs *FlowServerImpl) GetComponentInfoByID(
	ctx context.Context,
	req *pb.GetComponentInfoByIDRequest,
) (*pb.GetComponentInfoResponse, error)

GetComponentInfoByID retrieves component information by its unique identifier. Optionally includes the parent rack information if requested. This method performs a two-step lookup: first retrieving the component and its rack ID, then fetching rack details if requested.

Parameters:

  • ctx: Request context for cancellation and deadline management
  • req: GetComponentInfoByIDRequest containing the component ID and options

Returns:

  • *pb.GetComponentInfoResponse: Response containing component and optionally rack information
  • error: Any error that occurred during component or rack retrieval

func (*FlowServerImpl) GetComponentInfoBySerial

func (rs *FlowServerImpl) GetComponentInfoBySerial(
	ctx context.Context,
	req *pb.GetComponentInfoBySerialRequest,
) (*pb.GetComponentInfoResponse, error)

GetComponentInfoBySerial retrieves component information by its manufacturer and serial number. This allows lookup of components using their physical identification rather than system-generated ID. Optionally includes the parent rack information if requested. Like GetComponentInfoByID, this method performs a two-step lookup when rack information is requested.

Parameters:

  • ctx: Request context for cancellation and deadline management
  • req: GetComponentInfoBySerialRequest containing manufacturer, serial number, and options

Returns:

  • *pb.GetComponentInfoResponse: Response containing component and optionally rack information
  • error: Any error that occurred during component or rack retrieval

func (*FlowServerImpl) GetComponents

func (rs *FlowServerImpl) GetComponents(
	ctx context.Context,
	req *pb.GetComponentsRequest,
) (*pb.GetComponentsResponse, error)

GetComponents retrieves components from the local database with filtering, pagination, and ordering support. If target_spec is provided, it first resolves the components of the specified racks or components, then applies the additional filters (name, manufacturer, model, component_types), pagination, and ordering. If target_spec is not provided, it queries all components matching the filters.

func (*FlowServerImpl) GetListOfNVLDomains

func (*FlowServerImpl) GetListOfRacks

func (*FlowServerImpl) GetOperationRule

func (rs *FlowServerImpl) GetOperationRule(
	ctx context.Context,
	req *pb.GetOperationRuleRequest,
) (*pb.OperationRule, error)

func (*FlowServerImpl) GetRackInfoByID

func (rs *FlowServerImpl) GetRackInfoByID(
	ctx context.Context,
	req *pb.GetRackInfoByIDRequest,
) (*pb.GetRackInfoResponse, error)

GetRackInfoByID retrieves rack information by its unique identifier. Optionally includes component information if requested.

Parameters:

  • ctx: Request context for cancellation and deadline management
  • req: GetRackInfoByIDRequest containing the rack ID and options

Returns:

  • *pb.GetRackInfoResponse: Response containing the rack information
  • error: Any error that occurred during rack retrieval

func (*FlowServerImpl) GetRackInfoBySerial

func (rs *FlowServerImpl) GetRackInfoBySerial(
	ctx context.Context,
	req *pb.GetRackInfoBySerialRequest,
) (*pb.GetRackInfoResponse, error)

GetRackInfoBySerial retrieves rack information by its manufacturer and serial number. This allows lookup of racks using their physical identification rather than system-generated ID. Optionally includes component information if requested.

Parameters:

  • ctx: Request context for cancellation and deadline management
  • req: GetRackInfoBySerialRequest containing manufacturer, serial number, and options

Returns:

  • *pb.GetRackInfoResponse: Response containing the rack information
  • error: Any error that occurred during rack retrieval

func (*FlowServerImpl) GetRackRuleAssociation

func (*FlowServerImpl) GetRacksForNVLDomain

func (*FlowServerImpl) GetTaskSchedule

func (rs *FlowServerImpl) GetTaskSchedule(
	ctx context.Context,
	req *pb.GetTaskScheduleRequest,
) (*pb.TaskSchedule, error)

GetTaskSchedule retrieves a task schedule by ID.

func (*FlowServerImpl) GetTasksByIDs

func (rs *FlowServerImpl) GetTasksByIDs(
	ctx context.Context,
	req *pb.GetTasksByIDsRequest,
) (*pb.GetTasksByIDsResponse, error)

func (*FlowServerImpl) IngestRack

func (rs *FlowServerImpl) IngestRack(
	ctx context.Context,
	req *pb.IngestRackRequest,
) (*pb.SubmitTaskResponse, error)

IngestRack is a convenience API that triggers component ingestion by reusing the BringUp workflow with an ingestion-only rule. This registers expected components with their respective component manager services without performing power or firmware operations.

func (*FlowServerImpl) ListOperationRules

func (*FlowServerImpl) ListRackRuleAssociations

func (*FlowServerImpl) ListTaskScheduleScopes

ListTaskScheduleScopes lists all scope entries for a task schedule.

func (*FlowServerImpl) ListTaskSchedules

ListTaskSchedules lists task schedules, optionally filtered by rack.

func (*FlowServerImpl) ListTasks

func (rs *FlowServerImpl) ListTasks(
	ctx context.Context,
	req *pb.ListTasksRequest,
) (*pb.ListTasksResponse, error)

func (*FlowServerImpl) PatchComponent

PatchComponent updates a single component's patchable fields.

func (*FlowServerImpl) PatchRack

func (rs *FlowServerImpl) PatchRack(
	ctx context.Context,
	req *pb.PatchRackRequest,
) (*pb.PatchRackResponse, error)

PatchRack updates an existing rack configuration with new information. This method performs intelligent merging of rack and component data, creating new components as needed and updating existing ones. It returns a detailed report of all operations performed during the patching process.

Parameters:

  • ctx: Request context for cancellation and deadline management
  • req: PatchRackRequest containing the updated rack configuration

Returns:

  • *pb.PatchRackResponse: Response containing a JSON report of patch operations
  • error: Any error that occurred during rack patching

func (*FlowServerImpl) PauseTaskSchedule

func (rs *FlowServerImpl) PauseTaskSchedule(
	ctx context.Context,
	req *pb.PauseTaskScheduleRequest,
) (*pb.TaskSchedule, error)

PauseTaskSchedule disables a task schedule so it will not fire until resumed. Returns an error if the schedule is a one-time type that has already fired (enabled=false with spec_type=one-time), since there is nothing left to pause. Returns the existing record unchanged if already paused.

func (*FlowServerImpl) PowerOffRack

func (rs *FlowServerImpl) PowerOffRack(
	ctx context.Context,
	req *pb.PowerOffRackRequest,
) (*pb.SubmitTaskResponse, error)

func (*FlowServerImpl) PowerOnRack

func (rs *FlowServerImpl) PowerOnRack(
	ctx context.Context,
	req *pb.PowerOnRackRequest,
) (*pb.SubmitTaskResponse, error)

func (*FlowServerImpl) PowerResetRack

func (rs *FlowServerImpl) PowerResetRack(
	ctx context.Context,
	req *pb.PowerResetRackRequest,
) (*pb.SubmitTaskResponse, error)

func (*FlowServerImpl) PurgeComponent

PurgeComponent permanently removes a soft-deleted component.

func (*FlowServerImpl) PurgeRack

func (rs *FlowServerImpl) PurgeRack(
	ctx context.Context,
	req *pb.PurgeRackRequest,
) (*pb.PurgeRackResponse, error)

PurgeRack permanently removes a soft-deleted rack and its components.

func (*FlowServerImpl) RemoveTaskScheduleScope

func (rs *FlowServerImpl) RemoveTaskScheduleScope(
	ctx context.Context,
	req *pb.RemoveTaskScheduleScopeRequest,
) (*emptypb.Empty, error)

RemoveTaskScheduleScope removes a rack scope entry from a task schedule.

func (*FlowServerImpl) ResumeTaskSchedule

func (rs *FlowServerImpl) ResumeTaskSchedule(
	ctx context.Context,
	req *pb.ResumeTaskScheduleRequest,
) (*pb.TaskSchedule, error)

ResumeTaskSchedule re-enables a paused task schedule. Returns the existing record unchanged if already enabled. Returns an error if the schedule is a one-time type that has already fired (next_run_at=nil), since it cannot be re-armed. A one-time schedule that was paused before firing (next_run_at still set) can be resumed normally. For interval and cron schedules, next_run_at is recomputed from the current time so the schedule does not fire immediately on resume.
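The resume rules can be sketched as a small state transition. The schedule struct, its field names, and the demo values are hypothetical. Cron schedules are left out because recomputing their next run needs a cron parser, so the sketch returns an error for them; that is a limitation of the example, not documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// schedule is a hypothetical, reduced view of a task schedule record.
type schedule struct {
	Enabled   bool
	SpecType  string        // "one-time" or "interval" in this sketch
	Every     time.Duration // used by interval schedules only
	NextRunAt *time.Time    // nil once a one-time schedule has fired
}

// Demo values used by main; not part of the service.
var (
	demoNow   = time.Date(2026, time.May, 14, 12, 0, 0, 0, time.UTC)
	demoEvery = 6 * time.Hour
)

// resume returns the re-enabled schedule. s is passed by value, so the
// caller's record is not modified.
func resume(s schedule, now time.Time) (schedule, error) {
	if s.Enabled {
		return s, nil // already enabled: returned unchanged
	}
	switch s.SpecType {
	case "one-time":
		if s.NextRunAt == nil {
			return s, errors.New("one-time schedule has already fired")
		}
		// Paused before firing: keep the original run time.
	case "interval":
		// Recompute from now so the schedule does not fire immediately.
		next := now.Add(s.Every)
		s.NextRunAt = &next
	default:
		return s, errors.New("spec type not modeled in this sketch")
	}
	s.Enabled = true
	return s, nil
}

func main() {
	paused := schedule{SpecType: "interval", Every: demoEvery}
	resumed, err := resume(paused, demoNow)
	fmt.Println(resumed.Enabled, resumed.NextRunAt, err)
}
```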

func (*FlowServerImpl) SetRuleAsDefault

func (rs *FlowServerImpl) SetRuleAsDefault(
	ctx context.Context,
	req *pb.SetRuleAsDefaultRequest,
) (*emptypb.Empty, error)

func (*FlowServerImpl) TriggerTaskSchedule

func (rs *FlowServerImpl) TriggerTaskSchedule(
	ctx context.Context,
	req *pb.TriggerTaskScheduleRequest,
) (*pb.SubmitTaskResponse, error)

TriggerTaskSchedule fires a task schedule immediately, regardless of next_run_at or enabled state. The overlap policy is not consulted — all scopes are submitted unconditionally. Returns an error if called on a one-time schedule that has already fired.

func (*FlowServerImpl) UpdateOperationRule

func (rs *FlowServerImpl) UpdateOperationRule(
	ctx context.Context,
	req *pb.UpdateOperationRuleRequest,
) (*emptypb.Empty, error)

func (*FlowServerImpl) UpdateTaskSchedule

func (rs *FlowServerImpl) UpdateTaskSchedule(
	ctx context.Context,
	req *pb.UpdateTaskScheduleRequest,
) (*pb.TaskSchedule, error)

UpdateTaskSchedule updates the name, overlap policy, and/or schedule spec of a task schedule. update_mask is required and must list the paths to apply:

  • "schedule.name" – display name
  • "schedule.overlap_policy" – overlap behaviour
  • "schedule.spec" – full spec block; recomputes next_run_at
  • "schedule.spec.timezone" – timezone only, spec type/string unchanged

func (*FlowServerImpl) UpdateTaskScheduleScope

UpdateTaskScheduleScope is the reconciling variant of scope management: the schedule's scope is replaced to match the desired target spec exactly. Racks absent from the desired spec are removed; racks present in the desired spec but not in the current scope are added; racks in both have their component filter replaced if it changed.

See AddTaskScheduleScope for the additive variant that merges without removing.
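The reconciliation described above amounts to a three-way diff between the current and desired scopes. In this sketch the scope, filter, and diff types are hypothetical, and a component filter is again modeled as a plain set of component type names whose entries are all true.

```go
package main

import (
	"fmt"
	"sort"
)

// filter is a set of component type names; scope maps rack ID to filter.
type filter map[string]bool
type scope map[string]filter

// diff lists the rack IDs affected by a reconciliation, each sorted.
type diff struct{ Added, Removed, Replaced []string }

// sameFilter reports whether two sets contain the same members.
func sameFilter(a, b filter) bool {
	if len(a) != len(b) {
		return false
	}
	for k := range a {
		if !b[k] {
			return false
		}
	}
	return true
}

// reconcile computes what must change so that current matches desired exactly.
func reconcile(current, desired scope) diff {
	var d diff
	for rack, want := range desired {
		have, ok := current[rack]
		switch {
		case !ok:
			d.Added = append(d.Added, rack)
		case !sameFilter(have, want):
			d.Replaced = append(d.Replaced, rack)
		}
	}
	for rack := range current {
		if _, ok := desired[rack]; !ok {
			d.Removed = append(d.Removed, rack)
		}
	}
	sort.Strings(d.Added)
	sort.Strings(d.Removed)
	sort.Strings(d.Replaced)
	return d
}

func main() {
	current := scope{"rack-a": {"compute": true}, "rack-b": {"switch": true}, "rack-c": {"compute": true}}
	desired := scope{"rack-a": {"compute": true}, "rack-b": {"powershelf": true}, "rack-d": {"compute": true}}
	fmt.Printf("%+v\n", reconcile(current, desired))
}
```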

func (*FlowServerImpl) UpgradeFirmware

func (rs *FlowServerImpl) UpgradeFirmware(
	ctx context.Context,
	req *pb.UpgradeFirmwareRequest,
) (*pb.SubmitTaskResponse, error)

UpgradeFirmware upgrades firmware for components. It uses OperationTargetSpec to specify targets and creates a task via the Task framework.

func (*FlowServerImpl) ValidateComponents

ValidateComponents returns pre-computed drifts between expected (local DB) and actual (source system) components. Results are computed asynchronously by the inventory loop, so this API returns quickly without calling external systems.

If target_spec is provided, only drifts for the specified components are returned. If target_spec is not provided, all drifts are returned.
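One way to picture the pre-computed pattern is a cache that a background loop replaces wholesale and that the RPC only reads. This is a sketch under assumptions: the drift and driftCache types are hypothetical, and the real service may keep drifts in the database rather than in memory.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// drift is a hypothetical record of one expected-versus-actual mismatch.
type drift struct{ ComponentID, Field, Expected, Actual string }

// driftCache holds the latest results computed by a background loop.
type driftCache struct {
	mu          sync.RWMutex
	byComponent map[string][]drift
}

// replace swaps in a freshly computed result set (called by the loop).
func (c *driftCache) replace(all []drift) {
	next := map[string][]drift{}
	for _, d := range all {
		next[d.ComponentID] = append(next[d.ComponentID], d)
	}
	c.mu.Lock()
	c.byComponent = next
	c.mu.Unlock()
}

// lookup returns drifts for the given component IDs, or all drifts (ordered
// by component ID) when no IDs are given. It never calls external systems.
func (c *driftCache) lookup(ids ...string) []drift {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if len(ids) == 0 {
		for id := range c.byComponent {
			ids = append(ids, id)
		}
		sort.Strings(ids)
	}
	var out []drift
	for _, id := range ids {
		out = append(out, c.byComponent[id]...)
	}
	return out
}

func main() {
	cache := &driftCache{}
	cache.replace([]drift{
		{ComponentID: "c1", Field: "firmware", Expected: "1.2", Actual: "1.1"},
		{ComponentID: "c2", Field: "model", Expected: "X", Actual: "Y"},
	})
	fmt.Println(len(cache.lookup()), len(cache.lookup("c1")))
}
```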

func (*FlowServerImpl) Version

func (rs *FlowServerImpl) Version(
	ctx context.Context,
	req *pb.VersionRequest,
) (*pb.BuildInfo, error)

Version returns the build information for this Flow service. This includes the version, build time, and git commit hash.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is the top-level Flow service. It owns the gRPC server, database session, inventory manager, and task manager and coordinates their lifecycles.

func New

func New(ctx context.Context, c Config) (*Service, error)

New creates and initialises a Service from the provided Config. It opens the database connection, runs pending migrations, and wires up the inventory and task managers. The returned service is ready to Start.

func (*Service) Start

func (s *Service) Start(ctx context.Context) (retErr error)

Start starts the inventory manager, task manager, and inventory sync goroutine, then begins serving gRPC requests on the configured port. It blocks until the gRPC server stops.

func (*Service) Stop

func (s *Service) Stop(ctx context.Context)

Stop gracefully shuts down the service in dependency order:

  1. Background producers (dispatcher, system scheduler) — stop submitting new work.
  2. gRPC server — drain in-flight RPCs; no new requests accepted after this.
  3. Task and inventory managers — safe to stop once no new submissions can arrive.
  4. Database session.
