data_structures_module

package module
v0.0.0-...-a815409 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 25, 2025 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ClearQueueResult

type ClearQueueResult struct {
	Success      bool                     `json:"success"`
	ClearedCount int                      `json:"cleared_count"`
	ClearedJobs  []map[string]interface{} `json:"cleared_jobs"`
}

ClearQueueResult represents the result of clearing the job queue

type ClearResult

type ClearResult struct {
	Success        bool                     `json:"success"`
	ClearedCount   int                      `json:"cleared_count"`
	ClearedRecords []map[string]interface{} `json:"cleared_records,omitempty"`
	Path           string                   `json:"path"`
	Operation      string                   `json:"operation,omitempty"`
	Error          string                   `json:"error,omitempty"`
}

ClearResult represents the result of clearing stream data

type Filter

type Filter struct {
	Condition string
	Params    map[string]interface{}
}

Filter represents a query filter with condition and parameters

type JobCompletionResult

type JobCompletionResult struct {
	Success     bool       `json:"success"`
	JobID       int        `json:"job_id"`
	CompletedAt *time.Time `json:"completed_at"`
}

JobCompletionResult represents the result of marking a job as completed

type JobCounts

type JobCounts struct {
	EmptyJobs      int `json:"empty_jobs"`
	NewJobs        int `json:"new_jobs"`
	ProcessingJobs int `json:"processing_jobs"`
}

JobCounts represents counts of different job states

type JobRecord

type JobRecord struct {
	ID          int                    `json:"id"`
	Path        string                 `json:"path"`
	ScheduleAt  *time.Time             `json:"schedule_at"`
	StartedAt   *time.Time             `json:"started_at"`
	CompletedAt *time.Time             `json:"completed_at"`
	IsActive    bool                   `json:"is_active"`
	Valid       bool                   `json:"valid"`
	Data        map[string]interface{} `json:"data"`
}

JobRecord represents a single job record

type JobStatistics

type JobStatistics struct {
	TotalJobs                int        `json:"total_jobs"`
	PendingJobs              int        `json:"pending_jobs"`
	ActiveJobs               int        `json:"active_jobs"`
	CompletedJobs            int        `json:"completed_jobs"`
	EarliestScheduled        *time.Time `json:"earliest_scheduled"`
	LatestCompleted          *time.Time `json:"latest_completed"`
	AvgProcessingTimeSeconds *float64   `json:"avg_processing_time_seconds"`
}

JobStatistics represents job queue statistics

type KBDataStructures

type KBDataStructures struct {
	// contains filtered or unexported fields
}

KBDataStructures handles the data structures for the knowledge base

func NewKBDataStructures

func NewKBDataStructures(host, port, dbname, user, password, database string) (*KBDataStructures, error)

NewKBDataStructures creates a new instance of KBDataStructures

func (*KBDataStructures) ClearFilters

func (kds *KBDataStructures) ClearFilters()

Query Support Methods (delegated to querySupport)

func (*KBDataStructures) ClearJobQueue

func (kds *KBDataStructures) ClearJobQueue(jobPath string) (*ClearQueueResult, error)

func (*KBDataStructures) ClearStreamData

func (kds *KBDataStructures) ClearStreamData(path string, olderThan *time.Time) *ClearResult

func (*KBDataStructures) DecodeLinkNodes

func (kds *KBDataStructures) DecodeLinkNodes(path string) (string, [][]string, error)

func (*KBDataStructures) Disconnect

func (kds *KBDataStructures) Disconnect() error

Disconnect closes the database connection

func (*KBDataStructures) ExecuteKBSearch

func (kds *KBDataStructures) ExecuteKBSearch(property_value map[string]interface{}) ([]map[string]interface{}, error)

func (*KBDataStructures) FindDescription

func (kds *KBDataStructures) FindDescription(row map[string]interface{}) map[string]string

func (*KBDataStructures) FindDescriptionPath

func (kds *KBDataStructures) FindDescriptionPath(path string) (map[string]interface{}, error)

func (*KBDataStructures) FindDescriptionPaths

func (kds *KBDataStructures) FindDescriptionPaths(paths []string) ([]map[string]interface{}, error)

func (*KBDataStructures) FindDescriptions

func (kds *KBDataStructures) FindDescriptions(row []map[string]interface{}) []map[string]string

func (*KBDataStructures) FindJobID

func (kds *KBDataStructures) FindJobID(kb *string, nodeName *string, properties map[string]interface{}, nodePath *string) (map[string]interface{}, error)

Job Queue Methods (delegated to jobQueue)

func (*KBDataStructures) FindJobIDs

func (kds *KBDataStructures) FindJobIDs(kb *string, nodeName *string, properties map[string]interface{}, nodePath *string) ([]map[string]interface{}, error)

func (*KBDataStructures) FindPathValues

func (kds *KBDataStructures) FindPathValues(keyData []map[string]interface{}) []string

func (*KBDataStructures) FindRPCClientID

func (kds *KBDataStructures) FindRPCClientID(kb *string, nodeName *string, properties map[string]interface{}, nodePath *string) (map[string]interface{}, error)

RPC Client Methods (delegated to rpcClient)

func (*KBDataStructures) FindRPCClientIDs

func (kds *KBDataStructures) FindRPCClientIDs(kb *string, nodeName *string, properties map[string]interface{}, nodePath *string) ([]map[string]interface{}, error)

func (*KBDataStructures) FindRPCClientKeys

func (kds *KBDataStructures) FindRPCClientKeys(keyData []map[string]interface{}) []string

func (*KBDataStructures) FindRPCServerID

func (kds *KBDataStructures) FindRPCServerID(kb *string, nodeName *string, properties map[string]interface{}, nodePath *string) (map[string]interface{}, error)

RPC Server Methods (delegated to rpcServer)

func (*KBDataStructures) FindRPCServerIDs

func (kds *KBDataStructures) FindRPCServerIDs(kb *string, nodeName *string, properties map[string]interface{}, nodePath *string) ([]map[string]interface{}, error)

func (*KBDataStructures) FindRPCServerTableKeys

func (kds *KBDataStructures) FindRPCServerTableKeys(keyData []map[string]interface{}) []string

func (*KBDataStructures) FindStatusNodeIDs

func (kds *KBDataStructures) FindStatusNodeIDs(kb, nodeName *string, properties map[string]interface{}, nodePath *string) ([]map[string]interface{}, error)

Status Data Methods (delegated to statusData)

func (*KBDataStructures) FindStreamID

func (kds *KBDataStructures) FindStreamID(kb *string, nodeName *string, properties map[string]interface{}, nodePath *string) (map[string]interface{}, error)

func (*KBDataStructures) FindStreamIDs

func (kds *KBDataStructures) FindStreamIDs(kb *string, nodeName *string, properties map[string]interface{}, nodePath *string) ([]map[string]interface{}, error)

func (*KBDataStructures) FindStreamTableKeys

func (kds *KBDataStructures) FindStreamTableKeys(nodeIDs []map[string]interface{}) []string

func (*KBDataStructures) GetFreeNumber

func (kds *KBDataStructures) GetFreeNumber(jobPath string) (int, error)

func (*KBDataStructures) GetQueuedNumber

func (kds *KBDataStructures) GetQueuedNumber(jobPath string) (int, error)

func (*KBDataStructures) GetStatusData

func (kds *KBDataStructures) GetStatusData(path string) (map[string]interface{}, string, error)

func (*KBDataStructures) GetStreamDataByID

func (kds *KBDataStructures) GetStreamDataByID(recordID int) (*StreamRecord, error)

func (*KBDataStructures) GetStreamDataCount

func (kds *KBDataStructures) GetStreamDataCount(path string, includeInvalid bool) (int, error)

func (*KBDataStructures) GetStreamDataRange

func (kds *KBDataStructures) GetStreamDataRange(path string, startTime, endTime time.Time) ([]StreamRecord, error)

func (*KBDataStructures) GetStreamStatistics

func (kds *KBDataStructures) GetStreamStatistics(path string, includeInvalid bool) (*StreamStatistics, error)

func (*KBDataStructures) LinkMountTableFindAllLinkNames

func (kds *KBDataStructures) LinkMountTableFindAllLinkNames() ([]string, error)

func (*KBDataStructures) LinkMountTableFindAllMountPaths

func (kds *KBDataStructures) LinkMountTableFindAllMountPaths() ([]string, error)

func (*KBDataStructures) LinkMountTableFindRecordsByLinkName

func (kds *KBDataStructures) LinkMountTableFindRecordsByLinkName(linkName string, kb *string) ([]map[string]interface{}, error)

Link Mount Table Methods (delegated to linkMountTable)

func (*KBDataStructures) LinkMountTableFindRecordsByMountPath

func (kds *KBDataStructures) LinkMountTableFindRecordsByMountPath(mountPath string, kb *string) ([]map[string]interface{}, error)

func (*KBDataStructures) LinkTableFindAllLinkNames

func (kds *KBDataStructures) LinkTableFindAllLinkNames() ([]string, error)

func (*KBDataStructures) LinkTableFindAllNodeNames

func (kds *KBDataStructures) LinkTableFindAllNodeNames() ([]string, error)

func (*KBDataStructures) LinkTableFindRecordsByLinkName

func (kds *KBDataStructures) LinkTableFindRecordsByLinkName(linkName string, kb *string) ([]map[string]interface{}, error)

Link Table Methods (delegated to linkTable)

func (*KBDataStructures) LinkTableFindRecordsByNodePath

func (kds *KBDataStructures) LinkTableFindRecordsByNodePath(nodePath string, kb *string) ([]map[string]interface{}, error)

func (*KBDataStructures) ListActiveJobs

func (kds *KBDataStructures) ListActiveJobs(jobPath string, limit *int, offset int) ([]JobRecord, error)

func (*KBDataStructures) ListPendingJobs

func (kds *KBDataStructures) ListPendingJobs(jobPath string, limit *int, offset int) ([]JobRecord, error)

func (*KBDataStructures) ListStreamData

func (kds *KBDataStructures) ListStreamData(path string, limit *int, offset int, recordedAfter, recordedBefore *time.Time, order string) ([]StreamRecord, error)

func (*KBDataStructures) MarkJobCompleted

func (kds *KBDataStructures) MarkJobCompleted(jobID int, maxRetries int, retryDelay time.Duration) (*JobCompletionResult, error)

func (*KBDataStructures) PeakJobData

func (kds *KBDataStructures) PeakJobData(jobPath string, maxRetries int, retryDelay time.Duration) (*PeakJobResult, error)

func (*KBDataStructures) PushJobData

func (kds *KBDataStructures) PushJobData(jobPath string, data map[string]interface{}, maxRetries int, retryDelay time.Duration) (*PushJobResult, error)

func (*KBDataStructures) PushStreamData

func (kds *KBDataStructures) PushStreamData(streamKey string, data map[string]interface{}, maxRetries int, retryDelay time.Duration) (*StreamPushResult, error)

func (*KBDataStructures) RPCClientClearReplyQueue

func (kds *KBDataStructures) RPCClientClearReplyQueue(clientPath string, maxRetries int, retryDelay time.Duration) (int, error)

func (*KBDataStructures) RPCClientFindFreeSlots

func (kds *KBDataStructures) RPCClientFindFreeSlots(clientPath string) (int, error)

func (*KBDataStructures) RPCClientFindQueuedSlots

func (kds *KBDataStructures) RPCClientFindQueuedSlots(clientPath string) (int, error)

func (*KBDataStructures) RPCClientListWaitingJobs

func (kds *KBDataStructures) RPCClientListWaitingJobs(clientPath *string) ([]ReplyData, error)

func (*KBDataStructures) RPCClientPeakAndClaimReplyData

func (kds *KBDataStructures) RPCClientPeakAndClaimReplyData(clientPath string, maxRetries int, retryDelay time.Duration) (*ReplyData, error)

func (*KBDataStructures) RPCClientPushAndClaimReplyData

func (kds *KBDataStructures) RPCClientPushAndClaimReplyData(clientPath string, requestUUID, serverPath, rpcAction,
	transactionTag string, replyData map[string]interface{}, maxRetries int, retryDelay time.Duration) error

func (*KBDataStructures) RPCServerClearServerQueue

func (kds *KBDataStructures) RPCServerClearServerQueue(serverPath string, maxRetries int, retryDelay time.Duration) (int, error)

func (*KBDataStructures) RPCServerCountAllJobs

func (kds *KBDataStructures) RPCServerCountAllJobs(serverPath string) (*JobCounts, error)

func (*KBDataStructures) RPCServerCountEmptyJobs

func (kds *KBDataStructures) RPCServerCountEmptyJobs(serverPath string) (int, error)

func (*KBDataStructures) RPCServerCountJobsJobTypes

func (kds *KBDataStructures) RPCServerCountJobsJobTypes(serverPath, jobType string) (int, error)

func (*KBDataStructures) RPCServerCountNewJobs

func (kds *KBDataStructures) RPCServerCountNewJobs(serverPath string) (int, error)

func (*KBDataStructures) RPCServerCountProcessingJobs

func (kds *KBDataStructures) RPCServerCountProcessingJobs(serverPath string) (int, error)

func (*KBDataStructures) RPCServerListJobsJobTypes

func (kds *KBDataStructures) RPCServerListJobsJobTypes(serverPath, jobType string) ([]map[string]interface{}, error)

func (*KBDataStructures) RPCServerMarkJobCompletion

func (kds *KBDataStructures) RPCServerMarkJobCompletion(serverPath string, id int, maxRetries int, retryDelay time.Duration) (bool, error)

func (*KBDataStructures) RPCServerPeakServerQueue

func (kds *KBDataStructures) RPCServerPeakServerQueue(serverPath string, retries int, waitTime time.Duration) (map[string]interface{}, error)

func (*KBDataStructures) RPCServerPushRPCQueue

func (kds *KBDataStructures) RPCServerPushRPCQueue(serverPath, requestID, rpcAction string, requestPayload map[string]interface{},
	transactionTag string, priority int, rpcClientQueue *string, maxRetries int, waitTime time.Duration) (map[string]interface{}, error)
func (kds *KBDataStructures) SearchHasLink()

func (*KBDataStructures) SearchHasLinkMount

func (kds *KBDataStructures) SearchHasLinkMount()

func (*KBDataStructures) SearchLabel

func (kds *KBDataStructures) SearchLabel(label string)

func (*KBDataStructures) SearchName

func (kds *KBDataStructures) SearchName(name string)

func (*KBDataStructures) SearchPath

func (kds *KBDataStructures) SearchPath(path string)

func (*KBDataStructures) SearchPropertyKey

func (kds *KBDataStructures) SearchPropertyKey(key string)

func (*KBDataStructures) SearchPropertyValue

func (kds *KBDataStructures) SearchPropertyValue(value string, property_value map[string]interface{})

func (*KBDataStructures) SearchStartingPath

func (kds *KBDataStructures) SearchStartingPath(path string)

func (*KBDataStructures) SetStatusData

func (kds *KBDataStructures) SetStatusData(path string, data map[string]interface{}, retryCount int, retryDelay time.Duration) (bool, string, error)

type KBJobQueue

type KBJobQueue struct {
	KBSearch *KBSearch

	BaseTable string
	// contains filtered or unexported fields
}

KBJobQueue handles job queue operations for the knowledge base

func NewKBJobQueue

func NewKBJobQueue(kbSearch *KBSearch, database string) *KBJobQueue

NewKBJobQueue creates a new KBJobQueue instance

func (*KBJobQueue) ClearJobQueue

func (jq *KBJobQueue) ClearJobQueue(path string) (*ClearQueueResult, error)

ClearJobQueue clears all jobs for a given path

func (*KBJobQueue) FindJobID

func (jq *KBJobQueue) FindJobID(kb *string, nodeName *string, properties map[string]interface{}, nodePath *string) (map[string]interface{}, error)

FindJobID finds a single job id for given parameters

func (*KBJobQueue) FindJobIDs

func (jq *KBJobQueue) FindJobIDs(kb *string, nodeName *string, properties map[string]interface{}, nodePath *string) ([]map[string]interface{}, error)

FindJobIDs finds all job ids matching the given parameters

func (*KBJobQueue) FindJobPaths

func (jq *KBJobQueue) FindJobPaths(tableDictRows []map[string]interface{}) []string

FindJobPaths extracts path values from job query results

func (*KBJobQueue) GetFreeNumber

func (jq *KBJobQueue) GetFreeNumber(path string) (int, error)

GetFreeNumber counts the number of invalid job entries for a given path

func (*KBJobQueue) GetJobByID

func (jq *KBJobQueue) GetJobByID(jobID int) (*JobRecord, error)

GetJobByID retrieves a specific job by its ID

func (*KBJobQueue) GetJobStatistics

func (jq *KBJobQueue) GetJobStatistics(path string) (*JobStatistics, error)

GetJobStatistics gets comprehensive statistics for jobs at a given path

func (*KBJobQueue) GetQueuedNumber

func (jq *KBJobQueue) GetQueuedNumber(path string) (int, error)

GetQueuedNumber counts the number of valid job entries for a given path

func (*KBJobQueue) ListActiveJobs

func (jq *KBJobQueue) ListActiveJobs(path string, limit *int, offset int) ([]JobRecord, error)

ListActiveJobs lists all active jobs for a path

func (*KBJobQueue) ListPendingJobs

func (jq *KBJobQueue) ListPendingJobs(path string, limit *int, offset int) ([]JobRecord, error)

ListPendingJobs lists all pending jobs for a path

func (*KBJobQueue) MarkJobCompleted

func (jq *KBJobQueue) MarkJobCompleted(jobID int, maxRetries int, retryDelay time.Duration) (*JobCompletionResult, error)

MarkJobCompleted marks a job as completed

func (*KBJobQueue) PeakJobData

func (jq *KBJobQueue) PeakJobData(path string, maxRetries int, retryDelay time.Duration) (*PeakJobResult, error)

PeakJobData finds and claims the earliest scheduled job for a path

func (*KBJobQueue) PushJobData

func (jq *KBJobQueue) PushJobData(path string, data map[string]interface{}, maxRetries int, retryDelay time.Duration) (*PushJobResult, error)

PushJobData pushes new job data to an available slot

type KBLinkMountTable

type KBLinkMountTable struct {
	// contains filtered or unexported fields
}

KBLinkMountTable represents the link mount table operations

func NewKBLinkMountTable

func NewKBLinkMountTable(conn *sql.DB, database string) *KBLinkMountTable

NewKBLinkMountTable creates a new instance of KBLinkMountTable Equivalent to Python's __init__ method

func (*KBLinkMountTable) FindAllLinkNames

func (kmt *KBLinkMountTable) FindAllLinkNames() ([]string, error)

FindAllLinkNames gets all unique link names from the table

func (*KBLinkMountTable) FindAllMountPaths

func (kmt *KBLinkMountTable) FindAllMountPaths() ([]string, error)

FindAllMountPaths gets all unique mount paths from the table

func (*KBLinkMountTable) FindRecordsByLinkName

func (kmt *KBLinkMountTable) FindRecordsByLinkName(linkName string, kb *string) ([]map[string]interface{}, error)

FindRecordsByLinkName finds records by link_name, optionally filtered by parent_node_kb

func (*KBLinkMountTable) FindRecordsByMountPath

func (kmt *KBLinkMountTable) FindRecordsByMountPath(mountPath string, kb *string) ([]map[string]interface{}, error)

FindRecordsByMountPath finds records by mount_path, optionally filtered by knowledge_base

type KBLinkTable

type KBLinkTable struct {
	// contains filtered or unexported fields
}

KBLinkTable represents the link table operations

func NewKBLinkTable

func NewKBLinkTable(conn *sql.DB, baseTable string) *KBLinkTable

NewKBLinkTable creates a new instance of KBLinkTable Equivalent to Python's __init__ method

func (*KBLinkTable) FindAllLinkNames

func (kt *KBLinkTable) FindAllLinkNames() ([]string, error)

FindAllLinkNames gets all unique link names from the table

func (*KBLinkTable) FindAllNodeNames

func (kt *KBLinkTable) FindAllNodeNames() ([]string, error)

FindAllNodeNames gets all unique node paths from the table

func (*KBLinkTable) FindRecordsByLinkName

func (kt *KBLinkTable) FindRecordsByLinkName(linkName string, kb *string) ([]map[string]interface{}, error)

FindRecordsByLinkName finds records by link_name, optionally filtered by knowledge_base

func (*KBLinkTable) FindRecordsByNodePath

func (kt *KBLinkTable) FindRecordsByNodePath(nodePath string, kb *string) ([]map[string]interface{}, error)

FindRecordsByNodePath finds records by node_path, optionally filtered by knowledge_base

type KBRPCClient

type KBRPCClient struct {
	KBSearch *KBSearch

	BaseTable string
	// contains filtered or unexported fields
}

KBRPCClient handles RPC client operations for the knowledge base

func NewKBRPCClient

func NewKBRPCClient(kbSearch *KBSearch, database string) *KBRPCClient

NewKBRPCClient creates a new KBRPCClient instance

func (*KBRPCClient) ClearReplyQueue

func (client *KBRPCClient) ClearReplyQueue(clientPath string, maxRetries int, retryDelay time.Duration) (int, error)

ClearReplyQueue clears the reply queue by resetting records matching the specified client path

func (*KBRPCClient) FindFreeSlots

func (client *KBRPCClient) FindFreeSlots(clientPath string) (int, error)

FindFreeSlots finds the number of free slots for a given client path

func (*KBRPCClient) FindQueuedSlots

func (client *KBRPCClient) FindQueuedSlots(clientPath string) (int, error)

FindQueuedSlots finds the number of queued slots for a given client path

func (*KBRPCClient) FindRPCClientID

func (client *KBRPCClient) FindRPCClientID(kb *string, nodeName *string, properties map[string]interface{}, nodePath *string) (map[string]interface{}, error)

FindRPCClientID finds a single RPC client id for given parameters

func (*KBRPCClient) FindRPCClientIDs

func (client *KBRPCClient) FindRPCClientIDs(kb *string, nodeName *string, properties map[string]interface{}, nodePath *string) ([]map[string]interface{}, error)

FindRPCClientIDs finds all RPC client ids matching the given parameters

func (*KBRPCClient) FindRPCClientKeys

func (client *KBRPCClient) FindRPCClientKeys(keyData []map[string]interface{}) []string

FindRPCClientKeys extracts path values from RPC client query results

func (*KBRPCClient) ListWaitingJobs

func (client *KBRPCClient) ListWaitingJobs(clientPath *string) ([]ReplyData, error)

ListWaitingJobs lists all rows where is_new_result is TRUE

func (*KBRPCClient) PeakAndClaimReplyData

func (client *KBRPCClient) PeakAndClaimReplyData(clientPath string, maxRetries int, retryDelay time.Duration) (*ReplyData, error)

PeakAndClaimReplyData atomically fetches and marks the next available reply as processed

func (*KBRPCClient) PushAndClaimReplyData

func (client *KBRPCClient) PushAndClaimReplyData(clientPath, requestUUID, serverPath, rpcAction,
	transactionTag string, replyData map[string]interface{}, maxRetries int, retryDelay time.Duration) error

PushAndClaimReplyData atomically claims and updates the earliest matching record

type KBRPCServer

type KBRPCServer struct {
	KBSearch *KBSearch

	BaseTable string
	// contains filtered or unexported fields
}

KBRPCServer handles RPC server operations for the knowledge base

func NewKBRPCServer

func NewKBRPCServer(kbSearch *KBSearch, database string) *KBRPCServer

NewKBRPCServer creates a new KBRPCServer instance

func (*KBRPCServer) ClearServerQueue

func (rpc *KBRPCServer) ClearServerQueue(serverPath string, maxRetries int, retryDelay time.Duration) (int, error)

ClearServerQueue clears the reply queue by resetting records matching the specified server path

func (*KBRPCServer) CountAllJobs

func (rpc *KBRPCServer) CountAllJobs(serverPath string) (*JobCounts, error)

CountAllJobs counts all jobs by state for a server path

func (*KBRPCServer) CountEmptyJobs

func (rpc *KBRPCServer) CountEmptyJobs(serverPath string) (int, error)

CountEmptyJobs counts empty jobs for a server path

func (*KBRPCServer) CountJobsJobTypes

func (rpc *KBRPCServer) CountJobsJobTypes(serverPath string, state string) (int, error)

CountJobsJobTypes counts jobs by type for a server path

func (*KBRPCServer) CountNewJobs

func (rpc *KBRPCServer) CountNewJobs(serverPath string) (int, error)

CountNewJobs counts new jobs for a server path

func (*KBRPCServer) CountProcessingJobs

func (rpc *KBRPCServer) CountProcessingJobs(serverPath string) (int, error)

CountProcessingJobs counts processing jobs for a server path

func (*KBRPCServer) FindRPCServerID

func (rpc *KBRPCServer) FindRPCServerID(kb *string, nodeName *string, properties map[string]interface{}, nodePath *string) (map[string]interface{}, error)

FindRPCServerID finds a single RPC server id for given parameters

func (*KBRPCServer) FindRPCServerIDs

func (rpc *KBRPCServer) FindRPCServerIDs(kb *string, nodeName *string, properties map[string]interface{}, nodePath *string) ([]map[string]interface{}, error)

FindRPCServerIDs finds all RPC server ids matching the given parameters

func (*KBRPCServer) FindRPCServerTableKeys

func (rpc *KBRPCServer) FindRPCServerTableKeys(keyData []map[string]interface{}) []string

FindRPCServerTableKeys extracts path values from RPC server query results

func (*KBRPCServer) ListJobsJobTypes

func (rpc *KBRPCServer) ListJobsJobTypes(serverPath string, state string) ([]map[string]interface{}, error)

ListJobsJobTypes lists records matching server path and state

func (*KBRPCServer) MarkJobCompletion

func (rpc *KBRPCServer) MarkJobCompletion(serverPath string, id int, retries int, waitTime time.Duration) (bool, error)

MarkJobCompletion marks a job as completed in the server queue

func (*KBRPCServer) PeakServerQueue

func (rpc *KBRPCServer) PeakServerQueue(serverPath string, retries int, waitTime time.Duration) (map[string]interface{}, error)

PeakServerQueue finds and processes one pending record from the server queue

func (*KBRPCServer) PushRPCQueue

func (rpc *KBRPCServer) PushRPCQueue(serverPath, requestID, rpcAction string, requestPayload map[string]interface{},
	transactionTag string, priority int, rpcClientQueue *string, maxRetries int, waitTime time.Duration) (map[string]interface{}, error)

PushRPCQueue pushes a request to the RPC queue

type KBSearch

type KBSearch struct {
	Path           []string
	Host           string
	Port           string
	DBName         string
	User           string
	Password       string
	BaseTable      string
	LinkTable      string
	LinkMountTable string
	Filters        []Filter
	Results        []map[string]interface{}
	PathValues     map[string]interface{}
	// contains filtered or unexported fields
}

KBSearch handles SQL filtering for the knowledge_base table

func NewKBSearch

func NewKBSearch(host, port, dbname, user, password, database string) (*KBSearch, error)

NewKBSearch creates a new KBSearch instance and connects to the database

func (*KBSearch) ClearFilters

func (kb *KBSearch) ClearFilters()

ClearFilters clears all filters and resets the query state

func (*KBSearch) DecodeLinkNodes

func (kb *KBSearch) DecodeLinkNodes(path string) (string, [][]string, error)

DecodeLinkNodes decodes an ltree path into knowledge base name and node link/name pairs

func (*KBSearch) Disconnect

func (kb *KBSearch) Disconnect() error

Disconnect closes the database connection

func (*KBSearch) ExecuteQuery

func (kb *KBSearch) ExecuteQuery() ([]map[string]interface{}, error)

ExecuteQuery executes the progressive query with all added filters using CTEs

func (*KBSearch) FindDescription

func (kb *KBSearch) FindDescription(row map[string]interface{}) map[string]string

FindDescription extracts description from properties field of a single query result

func (*KBSearch) FindDescriptionPath

func (kb *KBSearch) FindDescriptionPath(path string) (map[string]interface{}, error)

FindDescriptionPath finds data for a single specified path in the knowledge base

func (*KBSearch) FindDescriptionPaths

func (kb *KBSearch) FindDescriptionPaths(paths []string) ([]map[string]interface{}, error)

FindDescriptionPaths finds data for multiple specified paths in the knowledge base

func (*KBSearch) FindDescriptions

func (kb *KBSearch) FindDescriptions(dataSlice []map[string]interface{}) []map[string]string

FindDescriptions extracts descriptions from properties field of multiple query results

func (*KBSearch) FindPathValues

func (kb *KBSearch) FindPathValues(keyData []map[string]interface{}) []string

FindPathValues extracts path values from query results

func (*KBSearch) GetConnAndCursor

func (kb *KBSearch) GetConnAndCursor() (*sql.DB, error)

GetConnAndCursor returns the database connection

func (*KBSearch) GetResults

func (kb *KBSearch) GetResults() []map[string]interface{}

GetResults returns the results of the last executed query

func (kb *KBSearch) SearchHasLink()

SearchHasLink adds a filter to search for rows where has_link is TRUE

func (*KBSearch) SearchHasLinkMount

func (kb *KBSearch) SearchHasLinkMount()

SearchHasLinkMount adds a filter to search for rows where has_link_mount is TRUE

func (*KBSearch) SearchKB

func (kb *KBSearch) SearchKB(knowledgeBase string)

SearchKB adds a filter to search for rows matching the specified knowledge_base

func (*KBSearch) SearchLabel

func (kb *KBSearch) SearchLabel(label string)

SearchLabel adds a filter to search for rows matching the specified label

func (*KBSearch) SearchName

func (kb *KBSearch) SearchName(name string)

SearchName adds a filter to search for rows matching the specified name

func (*KBSearch) SearchPath

func (kb *KBSearch) SearchPath(pathExpression string)

SearchPath adds a filter to search for rows matching the LTREE path expression

func (*KBSearch) SearchPropertyKey

func (kb *KBSearch) SearchPropertyKey(key string)

SearchPropertyKey adds a filter to search for rows where properties contains the key

func (*KBSearch) SearchPropertyValue

func (kb *KBSearch) SearchPropertyValue(key string, value interface{})

SearchPropertyValue adds a filter to search for rows where properties contains the key-value pair

func (*KBSearch) SearchStartingPath

func (kb *KBSearch) SearchStartingPath(startingPath string)

SearchStartingPath adds a filter to search for descendants of the specified path

type KBStatusData

type KBStatusData struct {
	KBSearch  *KBSearch
	BaseTable string
}

KBStatusData handles the status data for the knowledge base

func NewKBStatusData

func NewKBStatusData(kbSearch *KBSearch, database string) *KBStatusData

NewKBStatusData creates a new KBStatusData instance

func (*KBStatusData) FindNodeID

func (ksd *KBStatusData) FindNodeID(kb, nodeName *string, properties map[string]interface{}, nodePath *string) (map[string]interface{}, error)

FindNodeID finds a single node id for given parameters

func (*KBStatusData) FindNodeIDs

func (ksd *KBStatusData) FindNodeIDs(kb, nodeName *string, properties map[string]interface{}, nodePath *string) ([]map[string]interface{}, error)

FindNodeIDs finds all node ids matching the given parameters

func (*KBStatusData) GetMultipleStatusData

func (ksd *KBStatusData) GetMultipleStatusData(paths []string) (map[string]map[string]interface{}, error)

GetMultipleStatusData retrieves status data for multiple paths in a single query

func (*KBStatusData) GetStatusData

func (ksd *KBStatusData) GetStatusData(path string) (map[string]interface{}, string, error)

GetStatusData retrieves status data for a given path

func (*KBStatusData) SetMultipleStatusData

func (ksd *KBStatusData) SetMultipleStatusData(pathDataPairs map[string]map[string]interface{}, retryCount int, retryDelay time.Duration) (bool, string, map[string]string, error)

SetMultipleStatusData updates multiple path-data pairs in a single transaction

func (*KBStatusData) SetMultipleStatusDataList

func (ksd *KBStatusData) SetMultipleStatusDataList(pathDataPairs []struct {
	Path string
	Data map[string]interface{}
}, retryCount int, retryDelay time.Duration) (bool, string, map[string]string, error)

SetMultipleStatusDataList is an alternative method that accepts a list of path-data pairs

func (*KBStatusData) SetStatusData

func (ksd *KBStatusData) SetStatusData(path string, data map[string]interface{}, retryCount int, retryDelay time.Duration) (bool, string, error)

SetStatusData updates status data for a given path with retry logic

type KBStream

type KBStream struct {
	KBSearch *KBSearch

	BaseTable string
	// contains filtered or unexported fields
}

KBStream handles stream data for the knowledge base

func NewKBStream

func NewKBStream(kbSearch *KBSearch, database string) *KBStream

NewKBStream creates a new KBStream instance

func (*KBStream) ClearStreamData

func (ks *KBStream) ClearStreamData(path string, olderThan *time.Time) *ClearResult

ClearStreamData clears stream data for a given path by setting valid to FALSE

func (*KBStream) FindStreamID

func (ks *KBStream) FindStreamID(kb *string, nodeName *string, properties map[string]interface{}, nodePath *string) (map[string]interface{}, error)

FindStreamID finds a single stream node id for given parameters

func (*KBStream) FindStreamIDs

func (ks *KBStream) FindStreamIDs(kb *string, nodeName *string, properties map[string]interface{}, nodePath *string) ([]map[string]interface{}, error)

FindStreamIDs finds all stream node ids matching the given parameters

func (*KBStream) FindStreamTableKeys

func (ks *KBStream) FindStreamTableKeys(keyData []map[string]interface{}) []string

FindStreamTableKeys extracts path values from stream query results

func (*KBStream) GetLatestStreamData

func (ks *KBStream) GetLatestStreamData(path string) (*StreamRecord, error)

GetLatestStreamData gets the most recent valid stream data for a given path

func (*KBStream) GetStreamDataByID

func (ks *KBStream) GetStreamDataByID(recordID int) (*StreamRecord, error)

GetStreamDataByID retrieves a specific stream record by its ID

func (*KBStream) GetStreamDataCount

func (ks *KBStream) GetStreamDataCount(path string, includeInvalid bool) (int, error)

GetStreamDataCount counts the number of stream entries for a given path

func (*KBStream) GetStreamDataRange

func (ks *KBStream) GetStreamDataRange(path string, startTime, endTime time.Time) ([]StreamRecord, error)

GetStreamDataRange gets valid stream data within a specific time range

func (*KBStream) GetStreamStatistics

func (ks *KBStream) GetStreamStatistics(path string, includeInvalid bool) (*StreamStatistics, error)

GetStreamStatistics gets comprehensive statistics for stream data at a given path

func (*KBStream) ListStreamData

func (ks *KBStream) ListStreamData(path string, limit *int, offset int, recordedAfter, recordedBefore *time.Time, order string) ([]StreamRecord, error)

ListStreamData lists valid stream data for a given path with filtering and pagination

func (*KBStream) PushStreamData

func (ks *KBStream) PushStreamData(path string, data map[string]interface{}, maxRetries int, retryDelay time.Duration) (*StreamPushResult, error)

PushStreamData finds the oldest record for the given path and updates it with new data

type MultipleStatusResult

type MultipleStatusResult struct {
	Success bool
	Message string
	Results map[string]string
}

MultipleStatusResult represents results for multiple status operations

type NoMatchingRecordError

type NoMatchingRecordError struct {
	Message string
}

NoMatchingRecordError represents when no matching record is found

func (*NoMatchingRecordError) Error

func (e *NoMatchingRecordError) Error() string

type PeakJobResult

type PeakJobResult struct {
	ID         int                    `json:"id"`
	Data       map[string]interface{} `json:"data"`
	ScheduleAt *time.Time             `json:"schedule_at"`
	StartedAt  *time.Time             `json:"started_at"`
}

PeakJobResult represents the result of peeking at a job

type PushJobResult

type PushJobResult struct {
	JobID      int                    `json:"job_id"`
	ScheduleAt *time.Time             `json:"schedule_at"`
	Data       map[string]interface{} `json:"data"`
}

PushJobResult represents the result of pushing a job

type RPCRecord

type RPCRecord struct {
	ID                  int                    `json:"id"`
	ServerPath          string                 `json:"server_path"`
	RequestID           string                 `json:"request_id"`
	RPCAction           string                 `json:"rpc_action"`
	RequestPayload      map[string]interface{} `json:"request_payload"`
	TransactionTag      string                 `json:"transaction_tag"`
	Priority            int                    `json:"priority"`
	RPCClientQueue      *string                `json:"rpc_client_queue"`
	State               string                 `json:"state"`
	RequestTimestamp    *time.Time             `json:"request_timestamp"`
	ProcessingTimestamp *time.Time             `json:"processing_timestamp"`
	CompletedTimestamp  *time.Time             `json:"completed_timestamp"`
}

RPCRecord represents a single RPC record

type ReplyData

type ReplyData struct {
	ID                int                    `json:"id"`
	RequestID         string                 `json:"request_id"`
	ClientPath        string                 `json:"client_path"`
	ServerPath        string                 `json:"server_path"`
	RPCAction         string                 `json:"rpc_action,omitempty"`
	TransactionTag    string                 `json:"transaction_tag,omitempty"`
	ResponsePayload   map[string]interface{} `json:"response_payload"`
	ResponseTimestamp time.Time              `json:"response_timestamp"`
	IsNewResult       bool                   `json:"is_new_result"`
}

ReplyData represents a reply data record

type SlotCounts

type SlotCounts struct {
	TotalRecords int `json:"total_records"`
	FreeSlots    int `json:"free_slots"`
	QueuedSlots  int `json:"queued_slots"`
}

SlotCounts represents free and queued slot counts

type StatusDataResult

type StatusDataResult struct {
	Success bool
	Message string
	Data    interface{}
}

StatusDataResult represents the result of status data operations

type StreamPushResult

type StreamPushResult struct {
	ID                 int                    `json:"id"`
	Path               string                 `json:"path"`
	RecordedAt         time.Time              `json:"recorded_at"`
	Data               map[string]interface{} `json:"data"`
	Valid              bool                   `json:"valid"`
	PreviousRecordedAt time.Time              `json:"previous_recorded_at"`
	WasPreviouslyValid bool                   `json:"was_previously_valid"`
	Operation          string                 `json:"operation"`
}

StreamPushResult represents the result of pushing stream data

type StreamRecord

type StreamRecord struct {
	ID         int                    `json:"id"`
	Path       string                 `json:"path"`
	RecordedAt time.Time              `json:"recorded_at"`
	Data       map[string]interface{} `json:"data"`
	Valid      bool                   `json:"valid"`
}

StreamRecord represents a single stream record

type StreamStatistics

type StreamStatistics struct {
	TotalRecords            int        `json:"total_records,omitempty"`
	ValidRecords            int        `json:"valid_records"`
	InvalidRecords          int        `json:"invalid_records,omitempty"`
	EarliestValidRecorded   *time.Time `json:"earliest_valid_recorded,omitempty"`
	LatestValidRecorded     *time.Time `json:"latest_valid_recorded,omitempty"`
	EarliestRecordedOverall *time.Time `json:"earliest_recorded_overall,omitempty"`
	LatestRecordedOverall   *time.Time `json:"latest_recorded_overall,omitempty"`
	AvgIntervalSecondsAll   *float64   `json:"avg_interval_seconds_all,omitempty"`
	AvgIntervalSecondsValid *float64   `json:"avg_interval_seconds_valid,omitempty"`
	EarliestRecorded        *time.Time `json:"earliest_recorded,omitempty"`
	LatestRecorded          *time.Time `json:"latest_recorded,omitempty"`
	AvgIntervalSeconds      *float64   `json:"avg_interval_seconds,omitempty"`
}

StreamStatistics represents statistics for stream data

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL