pftools

package
v0.1.15-rc.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2026 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Overview

Package pftools provides Pulsar Functions tooling for MCP.

Index

Constants

View Source
const (
	// CustomRuntimeOptionsEnvMcpToolNameKey is the env var name for tool names.
	CustomRuntimeOptionsEnvMcpToolNameKey = "MCP_TOOL_NAME"
	// CustomRuntimeOptionsEnvMcpToolDescriptionKey is the env var name for tool descriptions.
	CustomRuntimeOptionsEnvMcpToolDescriptionKey = "MCP_TOOL_DESCRIPTION"
)

Variables

View Source
var (
	// ErrFunctionNotFound indicates the function was not found.
	ErrFunctionNotFound = errors.New("function not found")
	// ErrNotOurMessage indicates a message that should be ignored.
	ErrNotOurMessage = errors.New("not our message")
)
View Source
var DefaultStringSchema = &mcp.ToolInputSchema{
	Type: "object",
	Properties: map[string]interface{}{
		"payload": map[string]interface{}{
			"type":        "string",
			"description": "The payload of the message, in plain text format",
		},
	},
}

DefaultStringSchema defines the default MCP input schema for string payloads.

View Source
var DefaultStringSchemaInfo = &SchemaInfo{
	Type: "STRING",
	Definition: map[string]interface{}{
		"type": "string",
	},
	PulsarSchemaInfo: &utils.SchemaInfo{
		Type: "STRING",
	},
}

DefaultStringSchemaInfo defines the default schema info for string payloads.

Functions

func ConvertSchemaToToolInput

func ConvertSchemaToToolInput(schemaInfo *SchemaInfo) (*mcp.ToolInputSchema, error)

ConvertSchemaToToolInput converts a schema to MCP tool input schema

func GetPulsarTypeSchema

func GetPulsarTypeSchema(schemaInfo *SchemaInfo) (pulsar.Schema, error)

GetPulsarTypeSchema converts SchemaInfo into a Pulsar schema.

func IsAuthError added in v0.1.14

func IsAuthError(err error) bool

IsAuthError reports whether the error is an authorization error.

func IsClusterUnhealthy added in v0.1.13

func IsClusterUnhealthy(err error) bool

IsClusterUnhealthy checks if an error indicates cluster health issues

Types

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker guards function invocations to prevent repeated failures.

func NewCircuitBreaker

func NewCircuitBreaker(threshold int, resetTimeout time.Duration) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker

func (*CircuitBreaker) AllowRequest

func (cb *CircuitBreaker) AllowRequest() bool

AllowRequest determines if a request should be allowed

func (*CircuitBreaker) ForceClose

func (cb *CircuitBreaker) ForceClose()

ForceClose forces the circuit breaker to close

func (*CircuitBreaker) ForceOpen

func (cb *CircuitBreaker) ForceOpen()

ForceOpen forces the circuit breaker to open

func (*CircuitBreaker) GetState

func (cb *CircuitBreaker) GetState() CircuitState

GetState returns the current state of the circuit breaker

func (*CircuitBreaker) GetStateString

func (cb *CircuitBreaker) GetStateString() string

GetStateString returns a string representation of the circuit breaker state

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failed operation

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess records a successful operation

func (*CircuitBreaker) Reset

func (cb *CircuitBreaker) Reset()

Reset resets the circuit breaker to closed state

func (*CircuitBreaker) String

func (cb *CircuitBreaker) String() string

String returns a string representation of the circuit breaker

type CircuitState

type CircuitState int

CircuitState represents the circuit breaker state.

const (
	StateOpen CircuitState = iota
	StateHalfOpen
	StateClosed
)

Circuit breaker states.

type ClusterErrorHandler added in v0.1.13

type ClusterErrorHandler func(*PulsarFunctionManager, error)

ClusterErrorHandler handles cluster errors for Pulsar function managers.

type FunctionInvoker

type FunctionInvoker struct {
	// contains filtered or unexported fields
}

FunctionInvoker handles function invocation and result tracking

func NewFunctionInvoker

func NewFunctionInvoker(manager *PulsarFunctionManager) *FunctionInvoker

NewFunctionInvoker creates a new FunctionInvoker

func (*FunctionInvoker) InvokeFunctionAndWait

func (fi *FunctionInvoker) InvokeFunctionAndWait(ctx context.Context, fnTool *FunctionTool, params map[string]interface{}) (*mcp.CallToolResult, error)

InvokeFunctionAndWait sends a message to the function and waits for the result

type FunctionResult

type FunctionResult struct {
	Data  string
	Error error
}

FunctionResult represents the result of a function invocation

type FunctionTool

type FunctionTool struct {
	Name               string
	Function           *utils.FunctionConfig
	InputSchema        *SchemaInfo
	OutputSchema       *SchemaInfo
	InputTopic         string
	OutputTopic        string
	Tool               mcp.Tool
	SchemaFetchSuccess bool
}

FunctionTool represents a Pulsar function exposed as an MCP tool.

type ManagerOptions

type ManagerOptions struct {
	PollInterval        time.Duration
	DefaultTimeout      time.Duration
	FailureThreshold    int
	ResetTimeout        time.Duration
	TenantNamespaces    []string
	StrictExport        bool
	ClusterErrorHandler ClusterErrorHandler
}

ManagerOptions configures PulsarFunctionManager behavior.

func DefaultManagerOptions

func DefaultManagerOptions() *ManagerOptions

DefaultManagerOptions returns default manager options.

type PulsarFunctionManager

type PulsarFunctionManager struct {
	// contains filtered or unexported fields
}

PulsarFunctionManager manages the lifecycle of Pulsar Functions as MCP tools

func NewPulsarFunctionManager

func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerOptions, sessionID string) (*PulsarFunctionManager, error)

NewPulsarFunctionManager creates a new PulsarFunctionManager

func (*PulsarFunctionManager) GetProducer

func (m *PulsarFunctionManager) GetProducer(topic string) (pulsarclient.Producer, error)

GetProducer retrieves a producer from the cache or creates a new one if not found.

func (*PulsarFunctionManager) Start

func (m *PulsarFunctionManager) Start()

Start starts polling for functions

func (*PulsarFunctionManager) Stop

func (m *PulsarFunctionManager) Stop()

Stop stops polling for functions

type SchemaInfo

type SchemaInfo struct {
	Type             string
	Definition       map[string]interface{}
	PulsarSchemaInfo *utils.SchemaInfo
}

SchemaInfo represents schema metadata for Pulsar functions.

func GetSchemaFromTopic

func GetSchemaFromTopic(admin cmdutils.Client, topic string) (*SchemaInfo, error)

GetSchemaFromTopic retrieves schema information from a topic

type Server

type Server struct {
	MCPServer     *server.MCPServer
	KafkaSession  *kafka.Session
	PulsarSession *pulsar.Session
	Logger        interface{}
}

Server is imported directly to avoid circular dependency

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL