pftools

package
v0.1.10 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 30, 2025 | License: Apache-2.0 | Imports: 19 | Imported by: 0

Documentation

Index

Constants

View Source
// Key names for MCP tool metadata.
// NOTE(review): the identifiers suggest these keys are looked up in a
// function's custom runtime options / environment to override the tool name
// and description — confirm against the code that reads them.
const (
	// CustomRuntimeOptionsEnvMcpToolNameKey is the key "MCP_TOOL_NAME".
	CustomRuntimeOptionsEnvMcpToolNameKey        = "MCP_TOOL_NAME"
	// CustomRuntimeOptionsEnvMcpToolDescriptionKey is the key "MCP_TOOL_DESCRIPTION".
	CustomRuntimeOptionsEnvMcpToolDescriptionKey = "MCP_TOOL_DESCRIPTION"
)

Variables

View Source
// Sentinel errors exported by this package. They are plain errors.New values,
// so callers should match them with errors.Is rather than by message text.
var (
	// ErrFunctionNotFound carries the message "function not found".
	ErrFunctionNotFound = errors.New("function not found")
	// ErrNotOurMessage carries the message "not our message".
	// NOTE(review): presumably reported when a received message does not
	// belong to the pending invocation — confirm against the code that returns it.
	ErrNotOurMessage    = errors.New("not our message")
)
View Source
// DefaultStringSchema is an MCP tool input schema of type "object" that
// declares a single property, "payload", of JSON type "string".
// NOTE(review): presumably used as the tool input schema when a function's
// input is a plain string — confirm against ConvertSchemaToToolInput.
var DefaultStringSchema = &mcp.ToolInputSchema{
	Type: "object",
	Properties: map[string]interface{}{
		// The only declared property: the message payload as plain text.
		"payload": map[string]interface{}{
			"type":        "string",
			"description": "The payload of the message, in plain text format",
		},
	},
}
View Source
// DefaultStringSchemaInfo is a SchemaInfo describing a string schema: its Type
// is "STRING", its Definition is the JSON-schema fragment {"type": "string"},
// and its PulsarSchemaInfo is a schema info value whose Type is also "STRING".
// NOTE(review): presumably the fallback when no schema can be fetched for a
// topic — confirm against GetSchemaFromTopic and its callers.
var DefaultStringSchemaInfo = &SchemaInfo{
	Type: "STRING",
	Definition: map[string]interface{}{
		"type": "string",
	},
	PulsarSchemaInfo: &cliutils.SchemaInfo{
		Type: "STRING",
	},
}

Functions

func ConvertSchemaToToolInput

func ConvertSchemaToToolInput(schemaInfo *SchemaInfo) (*mcp.ToolInputSchema, error)

ConvertSchemaToToolInput converts a schema to MCP tool input schema

func GetPulsarTypeSchema

func GetPulsarTypeSchema(schemaInfo *SchemaInfo) (pulsar.Schema, error)

Types

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

func NewCircuitBreaker

func NewCircuitBreaker(threshold int, resetTimeout time.Duration) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker

func (*CircuitBreaker) AllowRequest

func (cb *CircuitBreaker) AllowRequest() bool

AllowRequest determines if a request should be allowed

func (*CircuitBreaker) ForceClose

func (cb *CircuitBreaker) ForceClose()

ForceClose forces the circuit breaker to close

func (*CircuitBreaker) ForceOpen

func (cb *CircuitBreaker) ForceOpen()

ForceOpen forces the circuit breaker to open

func (*CircuitBreaker) GetState

func (cb *CircuitBreaker) GetState() CircuitState

GetState returns the current state of the circuit breaker

func (*CircuitBreaker) GetStateString

func (cb *CircuitBreaker) GetStateString() string

GetStateString returns a string representation of the circuit breaker state

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failed operation

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess records a successful operation

func (*CircuitBreaker) Reset

func (cb *CircuitBreaker) Reset()

Reset resets the circuit breaker to closed state

func (*CircuitBreaker) String

func (cb *CircuitBreaker) String() string

String returns a string representation of the circuit breaker

type CircuitState

// CircuitState is an int-backed enumeration of circuit breaker states.
type CircuitState int
// The states are numbered by iota in declaration order, so StateOpen is 0 and
// is therefore the zero value of CircuitState.
// NOTE(review): a zero-valued CircuitState reads as "open"; confirm this is
// intended and that NewCircuitBreaker sets the initial state explicitly.
const (
	// StateOpen has the value 0.
	StateOpen CircuitState = iota
	// StateHalfOpen has the value 1.
	StateHalfOpen
	// StateClosed has the value 2.
	StateClosed
)

type FunctionInvoker

type FunctionInvoker struct {
	// contains filtered or unexported fields
}

FunctionInvoker handles function invocation and result tracking

func NewFunctionInvoker

func NewFunctionInvoker(manager *PulsarFunctionManager) *FunctionInvoker

NewFunctionInvoker creates a new FunctionInvoker

func (*FunctionInvoker) InvokeFunctionAndWait

func (fi *FunctionInvoker) InvokeFunctionAndWait(ctx context.Context, fnTool *FunctionTool, params map[string]interface{}) (*mcp.CallToolResult, error)

InvokeFunctionAndWait sends a message to the function and waits for the result

type FunctionResult

// FunctionResult represents the result of a function invocation.
type FunctionResult struct {
	// Data is the result payload as a string.
	// NOTE(review): presumably only meaningful when Error is nil — confirm.
	Data  string
	// Error is the error associated with the invocation, if any.
	Error error
}

FunctionResult represents the result of a function invocation

type FunctionTool

// FunctionTool groups a function's configuration, its input/output schemas
// and topics, and an mcp.Tool value.
// NOTE(review): field meanings below are inferred from names and types only;
// confirm against the code that populates this struct.
type FunctionTool struct {
	// Name is a string identifier — presumably the function or tool name; confirm.
	Name               string
	// Function points to the function's configuration.
	Function           *utils.FunctionConfig
	// InputSchema is the schema information for the input side.
	InputSchema        *SchemaInfo
	// OutputSchema is the schema information for the output side.
	OutputSchema       *SchemaInfo
	// InputTopic is presumably the topic invocations are published to — confirm.
	InputTopic         string
	// OutputTopic is presumably the topic results are read from — confirm.
	OutputTopic        string
	// Tool is the MCP tool value associated with this function.
	Tool               mcp.Tool
	// SchemaFetchSuccess presumably records whether schema retrieval succeeded — confirm.
	SchemaFetchSuccess bool
}

type ManagerOptions

// ManagerOptions is the options struct accepted by NewPulsarFunctionManager;
// DefaultManagerOptions returns a pointer to one.
// NOTE(review): field meanings below are inferred from names and types only;
// defaults are not visible here — confirm against DefaultManagerOptions.
type ManagerOptions struct {
	// PollInterval is presumably the interval between function polling rounds — confirm.
	PollInterval     time.Duration
	// DefaultTimeout is presumably the timeout applied to an invocation when none is given — confirm.
	DefaultTimeout   time.Duration
	// FailureThreshold presumably feeds NewCircuitBreaker's threshold parameter — confirm.
	FailureThreshold int
	// ResetTimeout presumably feeds NewCircuitBreaker's resetTimeout parameter — confirm.
	ResetTimeout     time.Duration
	// TenantNamespaces is presumably the list of "tenant/namespace" entries to scan — confirm format.
	TenantNamespaces []string
	// StrictExport is a boolean switch; its effect is not visible here — TODO document.
	StrictExport     bool
}

func DefaultManagerOptions

func DefaultManagerOptions() *ManagerOptions

type PulsarFunctionManager

type PulsarFunctionManager struct {
	// contains filtered or unexported fields
}

PulsarFunctionManager manages the lifecycle of Pulsar Functions as MCP tools

func NewPulsarFunctionManager

func NewPulsarFunctionManager(mcpServer *server.MCPServer, readOnly bool, options *ManagerOptions) (*PulsarFunctionManager, error)

NewPulsarFunctionManager creates a new PulsarFunctionManager

func (*PulsarFunctionManager) GetProducer

func (m *PulsarFunctionManager) GetProducer(topic string) (pulsar.Producer, error)

GetProducer retrieves a producer from the cache or creates a new one if not found.

func (*PulsarFunctionManager) Start

func (m *PulsarFunctionManager) Start()

Start starts polling for functions

func (*PulsarFunctionManager) Stop

func (m *PulsarFunctionManager) Stop()

Stop stops polling for functions

type SchemaInfo

// SchemaInfo bundles a schema type name, a generic schema definition, and a
// pointer to the underlying Pulsar schema info.
type SchemaInfo struct {
	// Type is the schema type name as a string, e.g. "STRING" in DefaultStringSchemaInfo.
	Type             string
	// Definition is the schema definition as a generic map, e.g.
	// {"type": "string"} in DefaultStringSchemaInfo.
	Definition       map[string]interface{}
	// PulsarSchemaInfo points to the Pulsar-side schema info value.
	PulsarSchemaInfo *utils.SchemaInfo
}

func GetSchemaFromTopic

func GetSchemaFromTopic(admin cmdutils.Client, topic string) (*SchemaInfo, error)

GetSchemaFromTopic retrieves schema information from a topic

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL