statefun

package
v0.1.7 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 30, 2025 License: Apache-2.0 Imports: 17 Imported by: 4

Documentation

Index

Constants

View Source
// Subject prefixes, subject format templates and object ID separators.
const (
	// SignalPrefix and RequestPrefix are the literal prefixes "signal" and "request".
	SignalPrefix                          = "signal"
	RequestPrefix                         = "request"
	// FromGlobalSignalTmpl expands to "signal.%s.%s".
	// NOTE(review): the meaning of the two %s placeholders is not shown here — confirm against the caller.
	FromGlobalSignalTmpl                  = SignalPrefix + ".%s.%s"
	// DomainSubjectsIngressPrefix ("$SI") and DomainSubjectsEgressPrefix ("$SE")
	// are the prefixes used by the two domain subject templates below.
	DomainSubjectsIngressPrefix           = "$SI"
	DomainSubjectsEgressPrefix            = "$SE"
	// DomainIngressSubjectsTmpl expands to "$SI.%s.%s" and
	// DomainEgressSubjectsTmpl expands to "$SE.%s.%s".
	DomainIngressSubjectsTmpl             = DomainSubjectsIngressPrefix + ".%s.%s"
	DomainEgressSubjectsTmpl              = DomainSubjectsEgressPrefix + ".%s.%s"
	// ObjectIDDomainSeparator is the "/" that follows the domain name in IDs
	// such as "domainName1/ObjectId".
	ObjectIDDomainSeparator               = "/"
	// ObjectIDWeakClusteringDomainSeparator is the "#" that appears in shadow
	// object IDs such as "domainName1/domainName2#ObjectId".
	ObjectIDWeakClusteringDomainSeparator = "#"
)
View Source
// Function-type level settings.
// NOTE(review): these look like the default values behind the
// FunctionTypeConfig setters (SetMsgAckWaitMs, SetIdChannelSize,
// SetBalanceNeeded, SetMutexLifeTimeSec, SetMultipleInstancesAllowance,
// SetMaxIdHandlers) — confirm against NewFunctionTypeConfig.
const (
	// MsgAckWaitTimeoutMs is expressed in milliseconds, per its Ms suffix (10000 ms = 10 s).
	MsgAckWaitTimeoutMs = 10000

	IdChannelSize            = 10
	BalanceNeeded            = true
	// MutexLifetimeSec is expressed in seconds, per its Sec suffix.
	MutexLifetimeSec         = 120
	MultipleInstancesAllowed = false
	MaxIdHandlers            = 20
)
View Source
// Runtime level settings.
// NOTE(review): these look like the default values behind the RuntimeConfig
// setters (SetNatsURL, SetKVMutexLifeTimeSec, SetRequestTimeoutSec,
// SetStreamMaxAge, ...) — confirm against NewRuntimeConfig.
const (
	RuntimeName                 = "runtime"
	// NOTE(review): this URL embeds a username and password ("nats:foliage").
	// Presumably a development default — confirm it is always overridden in production.
	NatsURL                     = "nats://nats:foliage@nats:4222"
	// KVMutexLifetimeSec is expressed in seconds, per its Sec suffix.
	KVMutexLifetimeSec          = 10
	// NOTE(review): no unit suffix here; the matching setter is named
	// SetKVMutexIsOldPollingIntervalSec, so this is presumably seconds — confirm.
	KVMutexIsOldPollingInterval = 10
	// FunctionTypeIDLifetimeMs is expressed in milliseconds, per its Ms suffix (5000 ms = 5 s).
	FunctionTypeIDLifetimeMs    = 5000
	RequestTimeoutSec           = 60
	GCIntervalSec               = 5
	DefaultHubDomainName        = "hub"
	HandlesDomainRouters        = true
	EnableTLS                   = false
	EnableNatsClusterMode       = false
	NatsReplicasCount           = 1
	// Stream limits for the "Ft" streams: 10000 messages, 256 MiB, 24 hours.
	// NOTE(review): "Ft" presumably pairs with StreamTypeFunction — confirm.
	FtStreamMaxMsgs             = 10000
	FtStreamMaxBytes            = 1024 * 1024 * 256
	FtStreamMaxAge              = 24 * time.Hour
	// Stream limits for the "Sys" streams: 80000 messages, 512 MiB, 12 hours.
	SysStreamMaxMsgs            = 80000
	SysStreamMaxBytes           = 1024 * 1024 * 512
	SysStreamMaxAge             = 12 * time.Hour
	// Stream limits for the "KV" streams; every limit is marked unlimited.
	KVStreamMaxMsgs             = -1 //unlimited
	KVStreamMaxBytes            = -1 //unlimited
	KVStreamMaxAge              = 0  //unlimited

)
View Source
const (
	// ShadowObjectCallParamOptionPath is the dotted path "shadow_object.can_receive".
	// NOTE(review): presumably a key looked up in a function's options JSON — confirm against its users.
	ShadowObjectCallParamOptionPath string = "shadow_object.can_receive"
)

Variables

View Source
var (
	// ErrMutexLocked is a sentinel error carrying the message "mutex is locked".
	// NOTE(review): presumably returned by the *MutexLock functions when
	// errorOnLocked is true and the mutex is already held — confirm.
	ErrMutexLocked = errors.New("mutex is locked")
)

Functions

func AddRequestSourceNatsCore added in v0.1.1

func AddRequestSourceNatsCore(ft *FunctionType) error

func AddSignalSourceJetstreamQueuePushConsumer added in v0.1.1

func AddSignalSourceJetstreamQueuePushConsumer(ft *FunctionType) error

func ContextMutexLock

func ContextMutexLock(ctx context.Context, ft *FunctionType, id string, errorOnLocked bool) (uint64, error)

func ContextMutexUnlock

func ContextMutexUnlock(ctx context.Context, ft *FunctionType, id string, lockRevisionID uint64) error

func FunctionTypeMutexLock

func FunctionTypeMutexLock(ctx context.Context, ft *FunctionType, errorOnLocked bool) (uint64, error)

func FunctionTypeMutexUnlock

func FunctionTypeMutexUnlock(ctx context.Context, ft *FunctionType, lockRevisionID uint64) error

func KeyMutexLock

func KeyMutexLock(ctx context.Context, runtime *Runtime, key string, errorOnLocked bool) (uint64, error)

KeyMutexLock: if errorOnLocked is true and the mutex is already locked, it returns an error immediately instead of waiting for the mutex to be unlocked.

func KeyMutexLockUpdate added in v0.1.3

func KeyMutexLockUpdate(ctx context.Context, runtime *Runtime, key string, lockRevisionID uint64) (uint64, error)

func KeyMutexUnlock

func KeyMutexUnlock(ctx context.Context, runtime *Runtime, key string, lockRevisionID uint64) error

Types

type AckCallbackAction added in v0.1.6

type AckCallbackAction = func(ack bool)

type Domain added in v0.1.6

type Domain struct {
	// contains filtered or unexported fields
}

func NewDomain added in v0.1.6

func NewDomain(nc *nats.Conn, js nats.JetStreamContext, desiredHubDomainName string, ftSC, sysSC, kvSC streamConfig) (dm *Domain, e error)

func (*Domain) Cache added in v0.1.6

func (dm *Domain) Cache() *cache.Store

func (*Domain) CreateCustomShadowId added in v0.1.6

func (dm *Domain) CreateCustomShadowId(storeDomain, targetDomain, uuid string) string

func (*Domain) CreateObjectIDWithDomain added in v0.1.6

func (dm *Domain) CreateObjectIDWithDomain(domain string, objectID string, domainReplace bool) (dmObjectID string)

func (*Domain) CreateObjectIDWithHubDomain added in v0.1.6

func (dm *Domain) CreateObjectIDWithHubDomain(objectID string, domainReplace bool) string

func (*Domain) CreateObjectIDWithThisDomain added in v0.1.6

func (dm *Domain) CreateObjectIDWithThisDomain(objectID string, domainReplace bool) string

func (*Domain) GetDomainFromObjectID added in v0.1.6

func (dm *Domain) GetDomainFromObjectID(objectID string) string

func (*Domain) GetObjectIDWithoutDomain added in v0.1.6

func (dm *Domain) GetObjectIDWithoutDomain(objectID string) string

func (*Domain) GetShadowObjectDomainAndID added in v0.1.6

func (dm *Domain) GetShadowObjectDomainAndID(shadowObjectId string) (domainName, objectIdWithoutDomain string, err error)

* domainName1/domainName2#ObjectId -> domainName2, ObjectId

func (*Domain) GetShadowObjectShadowId added in v0.1.6

func (dm *Domain) GetShadowObjectShadowId(objectIdWithAnyDomainName string) string

* otherDomainName/ObjectId -> thisDomainName/otherDomainName#ObjectId
* thisDomainName/ObjectId -> thisDomainName/ObjectId

func (*Domain) GetValidObjectId added in v0.1.6

func (dm *Domain) GetValidObjectId(objectId string) string

* thisDomainName/otherDomainName#ObjectId -> thisDomainName/otherDomainName#ObjectId

* thisDomainName/thisDomainName#ObjectId -> thisDomainName/ObjectId
* otherDomainName/thisDomainName#ObjectId -> thisDomainName/ObjectId
* otherDomainName/otherDomainName#ObjectId -> thisDomainName/otherDomainName#ObjectId

* thisDomainName/ObjectId -> thisDomainName/ObjectId
* otherDomainName/ObjectId -> otherDomainName/ObjectId

func (*Domain) GetWeakClusterDomains added in v0.1.6

func (dm *Domain) GetWeakClusterDomains() []string

Gets all domains in the weak cluster, including this one.

func (*Domain) HubDomainName added in v0.1.6

func (dm *Domain) HubDomainName() string

func (*Domain) IsShadowObject added in v0.1.6

func (dm *Domain) IsShadowObject(idWithDomain string) bool

* domainName1/domainName2#ObjectId -> true
* domainName1/ObjectId -> false

func (*Domain) Name added in v0.1.6

func (dm *Domain) Name() string

func (*Domain) SetWeakClusterDomains added in v0.1.6

func (dm *Domain) SetWeakClusterDomains(weakClusterDomains []string)

Sets all domains in the weak cluster (this domain's name is added automatically if it is not in the list).

type FunctionLogicHandler added in v0.1.1

type FunctionLogicHandler func(sfPlugins.StatefunExecutor, *sfPlugins.StatefunContextProcessor)

type FunctionType

type FunctionType struct {
	// contains filtered or unexported fields
}

func NewFunctionType

func NewFunctionType(runtime *Runtime, name string, logicHandler FunctionLogicHandler, config FunctionTypeConfig) *FunctionType

func (*FunctionType) SetExecutor

func (ft *FunctionType) SetExecutor(alias string, content string, constructor func(alias string, source string) sfPlugins.StatefunExecutor) error

func (*FunctionType) TokenCapacity added in v0.1.6

func (ft *FunctionType) TokenCapacity() int

func (*FunctionType) TokenRelease added in v0.1.6

func (ft *FunctionType) TokenRelease()

func (*FunctionType) TokenTryAcquire added in v0.1.6

func (ft *FunctionType) TokenTryAcquire() bool

type FunctionTypeConfig

type FunctionTypeConfig struct {
	// contains filtered or unexported fields
}

func NewFunctionTypeConfig

func NewFunctionTypeConfig() *FunctionTypeConfig

func (*FunctionTypeConfig) IsRequestProviderAllowed added in v0.1.6

func (ftc *FunctionTypeConfig) IsRequestProviderAllowed(requestProvider sfPlugins.RequestProvider) bool

func (*FunctionTypeConfig) IsSignalProviderAllowed added in v0.1.6

func (ftc *FunctionTypeConfig) IsSignalProviderAllowed(signalProvider sfPlugins.SignalProvider) bool

func (*FunctionTypeConfig) SetAllowedRequestProviders added in v0.1.6

func (ftc *FunctionTypeConfig) SetAllowedRequestProviders(allowedRequestProviders ...sfPlugins.RequestProvider) *FunctionTypeConfig

func (*FunctionTypeConfig) SetAllowedSignalProviders added in v0.1.6

func (ftc *FunctionTypeConfig) SetAllowedSignalProviders(allowedSignalProviders ...sfPlugins.SignalProvider) *FunctionTypeConfig

func (*FunctionTypeConfig) SetBalanceNeeded

func (ftc *FunctionTypeConfig) SetBalanceNeeded(balanceNeeded bool) *FunctionTypeConfig

func (*FunctionTypeConfig) SetIdChannelSize added in v0.1.6

func (ftc *FunctionTypeConfig) SetIdChannelSize(idChannelSize int) *FunctionTypeConfig

func (*FunctionTypeConfig) SetMaxIdHandlers added in v0.1.3

func (ftc *FunctionTypeConfig) SetMaxIdHandlers(maxIdHandlers int) *FunctionTypeConfig

Deprecated

func (*FunctionTypeConfig) SetMsgAckChannelSize

func (ftc *FunctionTypeConfig) SetMsgAckChannelSize(msgAckChannelSize int) *FunctionTypeConfig

Deprecated

func (*FunctionTypeConfig) SetMsgAckWaitMs

func (ftc *FunctionTypeConfig) SetMsgAckWaitMs(msgAckWaitMs int) *FunctionTypeConfig

func (*FunctionTypeConfig) SetMsgChannelSize added in v0.1.3

func (ftc *FunctionTypeConfig) SetMsgChannelSize(msgChannelSize int) *FunctionTypeConfig

Deprecated

func (*FunctionTypeConfig) SetMsgMaxDeliver added in v0.1.6

func (ftc *FunctionTypeConfig) SetMsgMaxDeliver(msgMaxDeliver int) *FunctionTypeConfig

func (*FunctionTypeConfig) SetMultipleInstancesAllowance added in v0.1.3

func (ftc *FunctionTypeConfig) SetMultipleInstancesAllowance(allowed bool) *FunctionTypeConfig

func (*FunctionTypeConfig) SetMutexLifeTimeSec

func (ftc *FunctionTypeConfig) SetMutexLifeTimeSec(mutexLifeTimeSec int) *FunctionTypeConfig

func (*FunctionTypeConfig) SetOptions

func (ftc *FunctionTypeConfig) SetOptions(options *easyjson.JSON) *FunctionTypeConfig

func (*FunctionTypeConfig) SetWorkerPoolConfig added in v0.1.6

func (ftc *FunctionTypeConfig) SetWorkerPoolConfig(functionWorkerPoolConfig SFWorkerPoolConfig) *FunctionTypeConfig

func (*FunctionTypeConfig) SetWorkerPoolLoadType added in v0.1.6

func (ftc *FunctionTypeConfig) SetWorkerPoolLoadType(wpLoadType WPLoadType) *FunctionTypeConfig

type FunctionTypeMsg added in v0.1.1

// FunctionTypeMsg bundles a caller address, payload and options with three
// optional-looking callbacks.
// NOTE(review): presumably the message handed to a function type's handler — confirm.
type FunctionTypeMsg struct {
	// Caller is the StatefunAddress associated with the message.
	Caller          *sfPlugins.StatefunAddress
	// Payload and Options are both easyjson JSON documents.
	Payload         *easyjson.JSON
	Options         *easyjson.JSON
	// RefusalCallback has the signature func(skipForever bool).
	RefusalCallback RefuseCallbackAction
	// RequestCallback has the signature func(data *easyjson.JSON).
	RequestCallback RequestCallbackAction
	// AckCallback has the signature func(ack bool).
	AckCallback     AckCallbackAction
}

type HandlerMsgRefusalType added in v0.1.1

type HandlerMsgRefusalType int

type MeasureMsgDeliverType added in v0.1.6

type MeasureMsgDeliverType string
// Defined MeasureMsgDeliverType values; each is a distinct string label.
// NOTE(review): presumably used to tag delivery measurements by transport — confirm.
const (
	NatsPub           MeasureMsgDeliverType = "nats_pub"
	NatsPubRedelivery MeasureMsgDeliverType = "nats_pub_redeliver"
	NatsReq           MeasureMsgDeliverType = "nats_req"
	GolangReq         MeasureMsgDeliverType = "golang_req"
)

type OnAfterStartFunction added in v0.1.6

type OnAfterStartFunction func(ctx context.Context, runtime *Runtime) error

type RefuseCallbackAction added in v0.1.6

type RefuseCallbackAction = func(skipForever bool)

type RequestCallbackAction added in v0.1.1

type RequestCallbackAction = func(data *easyjson.JSON)

type Runtime

// Runtime represents the runtime environment for stateful functions.
type Runtime struct {
	// Domain is the only exported field; it points to the runtime's Domain value.
	Domain *Domain
	// contains filtered or unexported fields
}

Runtime represents the runtime environment for stateful functions.

func NewRuntime

func NewRuntime(config RuntimeConfig) (*Runtime, error)

NewRuntime initializes a new Runtime instance with the given configuration.

func (*Runtime) RegisterOnAfterStartFunction added in v0.1.6

func (r *Runtime) RegisterOnAfterStartFunction(f OnAfterStartFunction, async bool)

RegisterOnAfterStartFunction registers a function to be called after the runtime starts. The function can be set to run asynchronously.

func (*Runtime) Request added in v0.1.1

func (r *Runtime) Request(requestProvider sfPlugins.RequestProvider, typename string, id string, payload *easyjson.JSON, options *easyjson.JSON, timeout ...time.Duration) (*easyjson.JSON, error)

func (*Runtime) Shutdown added in v0.1.6

func (r *Runtime) Shutdown()

Shutdown gracefully stops the runtime.

func (*Runtime) Signal added in v0.1.1

func (r *Runtime) Signal(signalProvider sfPlugins.SignalProvider, typename string, id string, payload *easyjson.JSON, options *easyjson.JSON) error

func (*Runtime) Start

func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error

Start initializes streams and starts function subscriptions. It also handles graceful shutdown via context.Context.

type RuntimeConfig

// RuntimeConfig is the configuration value accepted by NewRuntime.
type RuntimeConfig struct {
	// StreamParams is embedded, so its exported members (if any) are promoted onto RuntimeConfig.
	StreamParams
	// contains filtered or unexported fields
}

func NewRuntimeConfig

func NewRuntimeConfig() *RuntimeConfig

func NewRuntimeConfigSimple

func NewRuntimeConfigSimple(natsURL string, runtimeName string) *RuntimeConfig

func (*RuntimeConfig) ConfigureNatsCluster added in v0.1.6

func (ro *RuntimeConfig) ConfigureNatsCluster(replicasCount int) *RuntimeConfig

func (*RuntimeConfig) EnableNatsCluster added in v0.1.6

func (ro *RuntimeConfig) EnableNatsCluster(enableCluster bool) *RuntimeConfig

func (*RuntimeConfig) SetActivePassiveMode added in v0.1.6

func (ro *RuntimeConfig) SetActivePassiveMode(activePassiveMode bool) *RuntimeConfig

func (*RuntimeConfig) SetDomainRoutersHandling added in v0.1.6

func (ro *RuntimeConfig) SetDomainRoutersHandling(handlesDomainRouters bool) *RuntimeConfig

func (*RuntimeConfig) SetFunctionTypeIDLifetimeMs

func (ro *RuntimeConfig) SetFunctionTypeIDLifetimeMs(functionTypeIDLifetimeMs int) *RuntimeConfig

func (*RuntimeConfig) SetGCIntervalSec added in v0.1.6

func (ro *RuntimeConfig) SetGCIntervalSec(gcIntervalSec int) *RuntimeConfig

func (*RuntimeConfig) SetHubDomainName added in v0.1.6

func (ro *RuntimeConfig) SetHubDomainName(hubDomainName string) *RuntimeConfig

func (*RuntimeConfig) SetKVMutexIsOldPollingIntervalSec

func (ro *RuntimeConfig) SetKVMutexIsOldPollingIntervalSec(kvMutexIsOldPollingIntervalSec int) *RuntimeConfig

func (*RuntimeConfig) SetKVMutexLifeTimeSec

func (ro *RuntimeConfig) SetKVMutexLifeTimeSec(kvMutexLifeTimeSec int) *RuntimeConfig

func (*RuntimeConfig) SetNatsReplicas added in v0.1.6

func (ro *RuntimeConfig) SetNatsReplicas(replicasCount int) *RuntimeConfig

func (*RuntimeConfig) SetNatsURL

func (ro *RuntimeConfig) SetNatsURL(natsURL string) *RuntimeConfig

func (*RuntimeConfig) SetRequestTimeoutSec added in v0.1.3

func (ro *RuntimeConfig) SetRequestTimeoutSec(requestTimeoutSec int) *RuntimeConfig

func (*RuntimeConfig) SetStreamMaxAge added in v0.1.7

func (ro *RuntimeConfig) SetStreamMaxAge(streamType StreamType, maxAge time.Duration) *RuntimeConfig

func (*RuntimeConfig) SetStreamMaxBytes added in v0.1.7

func (ro *RuntimeConfig) SetStreamMaxBytes(streamType StreamType, maxBytes int64) *RuntimeConfig

func (*RuntimeConfig) SetStreamMaxMessages added in v0.1.7

func (ro *RuntimeConfig) SetStreamMaxMessages(streamType StreamType, maxMessages int64) *RuntimeConfig

func (*RuntimeConfig) SetTLS added in v0.1.6

func (ro *RuntimeConfig) SetTLS(enableTLS bool) *RuntimeConfig

func (*RuntimeConfig) UseJSDomainAsHubDomainName added in v0.1.6

func (ro *RuntimeConfig) UseJSDomainAsHubDomainName() *RuntimeConfig

type SFWorkerMessage added in v0.1.6

// SFWorkerMessage pairs a string ID with a FunctionTypeMsg.
type SFWorkerMessage struct {
	// NOTE(review): presumably the ID of the target function instance — confirm.
	ID   string
	// Data is stored by value, not by pointer.
	Data FunctionTypeMsg
}

type SFWorkerPool added in v0.1.6

type SFWorkerPool struct {
	// contains filtered or unexported fields
}

SFWorkerPool controls the statefun worker pool.

func NewSFWorkerPool added in v0.1.6

func NewSFWorkerPool(ft *FunctionType, conf SFWorkerPoolConfig) *SFWorkerPool

func (*SFWorkerPool) GetWorkerPercentage added in v0.1.6

func (wp *SFWorkerPool) GetWorkerPercentage() (loadedWorkers float64, idleWorkers float64)

func (*SFWorkerPool) GetWorkerPoolLoadPercentage added in v0.1.6

func (wp *SFWorkerPool) GetWorkerPoolLoadPercentage() float64

func (*SFWorkerPool) Notify added in v0.1.6

func (wp *SFWorkerPool) Notify()

func (*SFWorkerPool) Stop added in v0.1.6

func (wp *SFWorkerPool) Stop()

type SFWorkerPoolConfig added in v0.1.6

// SFWorkerPoolConfig is the configuration value accepted by NewSFWorkerPool
// and returned by NewSFWorkerPoolConfig.
type SFWorkerPoolConfig struct {
	// NOTE(review): presumably the lower and upper bounds on the number of workers — confirm.
	MinWorkers   int
	MaxWorkers   int
	// NOTE(review): presumably how long an idle worker is kept before it is stopped — confirm.
	IdleTimeout  time.Duration
	// NOTE(review): presumably the capacity of the pool's task queue — confirm.
	TaskQueueLen int
}

func NewSFWorkerPoolConfig added in v0.1.6

func NewSFWorkerPoolConfig(loadType WPLoadType) (config SFWorkerPoolConfig)

type SFWorkerTask added in v0.1.6

// SFWorkerTask wraps a single SFWorkerMessage.
type SFWorkerTask struct {
	// Msg is stored by value.
	Msg SFWorkerMessage
}

type StreamParams added in v0.1.7

type StreamParams struct {
	// contains filtered or unexported fields
}

type StreamType added in v0.1.7

type StreamType int
// Defined StreamType values, numbered from 0 via iota.
// A StreamType is the selector argument of the RuntimeConfig SetStreamMax* setters.
const (
	StreamTypeFunction StreamType = iota // 0
	StreamTypeSystem                     // 1
	StreamTypeKV                         // 2
)

type WPLoadType added in v0.1.6

type WPLoadType int
// Defined WPLoadType values, numbered from 0 (WPLoadDefault) to 5 (WPLoadVeryHigh) via iota.
// A WPLoadType is the argument of NewSFWorkerPoolConfig and SetWorkerPoolLoadType.
// NOTE(review): presumably each value selects a preset worker pool sizing — confirm.
const (
	WPLoadDefault WPLoadType = iota
	WPLoadVeryLight
	WPLoadLight
	WPLoadNormal
	WPLoadHigh
	WPLoadVeryHigh
)

Directories

Path Synopsis
Foliage statefun cache package.
Foliage statefun cache package.
Foliage statefun plugins package.
Foliage statefun plugins package.
js
Foliage primary statefun system package.
Foliage primary statefun system package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL