Documentation
¶
Index ¶
- Constants
- Variables
- func AddRequestSourceNatsCore(ft *FunctionType) error
- func AddSignalSourceJetstreamQueuePushConsumer(ft *FunctionType) error
- func ContextMutexLock(ctx context.Context, ft *FunctionType, id string, errorOnLocked bool) (uint64, error)
- func ContextMutexUnlock(ctx context.Context, ft *FunctionType, id string, lockRevisionID uint64) error
- func FunctionTypeMutexLock(ctx context.Context, ft *FunctionType, errorOnLocked bool) (uint64, error)
- func FunctionTypeMutexUnlock(ctx context.Context, ft *FunctionType, lockRevisionID uint64) error
- func KeyMutexLock(ctx context.Context, runtime *Runtime, key string, errorOnLocked bool) (uint64, error)
- func KeyMutexLockUpdate(ctx context.Context, runtime *Runtime, key string, lockRevisionID uint64) (uint64, error)
- func KeyMutexUnlock(ctx context.Context, runtime *Runtime, key string, lockRevisionID uint64) error
- type AckCallbackAction
- type Domain
- func (dm *Domain) Cache() *cache.Store
- func (dm *Domain) CreateCustomShadowId(storeDomain, targetDomain, uuid string) string
- func (dm *Domain) CreateObjectIDWithDomain(domain string, objectID string, domainReplace bool) (dmObjectID string)
- func (dm *Domain) CreateObjectIDWithHubDomain(objectID string, domainReplace bool) string
- func (dm *Domain) CreateObjectIDWithThisDomain(objectID string, domainReplace bool) string
- func (dm *Domain) GetDomainFromObjectID(objectID string) string
- func (dm *Domain) GetObjectIDWithoutDomain(objectID string) string
- func (dm *Domain) GetShadowObjectDomainAndID(shadowObjectId string) (domainName, objectIdWithoutDomain string, err error)
- func (dm *Domain) GetShadowObjectShadowId(objectIdWithAnyDomainName string) string
- func (dm *Domain) GetValidObjectId(objectId string) string
- func (dm *Domain) GetWeakClusterDomains() []string
- func (dm *Domain) HubDomainName() string
- func (dm *Domain) IsShadowObject(idWithDomain string) bool
- func (dm *Domain) Name() string
- func (dm *Domain) SetWeakClusterDomains(weakClusterDomains []string)
- type FunctionLogicHandler
- type FunctionType
- type FunctionTypeConfig
- func (ftc *FunctionTypeConfig) IsRequestProviderAllowed(requestProvider sfPlugins.RequestProvider) bool
- func (ftc *FunctionTypeConfig) IsSignalProviderAllowed(signalProvider sfPlugins.SignalProvider) bool
- func (ftc *FunctionTypeConfig) SetAllowedRequestProviders(allowedRequestProviders ...sfPlugins.RequestProvider) *FunctionTypeConfig
- func (ftc *FunctionTypeConfig) SetAllowedSignalProviders(allowedSignalProviders ...sfPlugins.SignalProvider) *FunctionTypeConfig
- func (ftc *FunctionTypeConfig) SetBalanceNeeded(balanceNeeded bool) *FunctionTypeConfig
- func (ftc *FunctionTypeConfig) SetIdChannelSize(idChannelSize int) *FunctionTypeConfig
- func (ftc *FunctionTypeConfig) SetMaxIdHandlers(maxIdHandlers int) *FunctionTypeConfig
- func (ftc *FunctionTypeConfig) SetMsgAckChannelSize(msgAckChannelSize int) *FunctionTypeConfig
- func (ftc *FunctionTypeConfig) SetMsgAckWaitMs(msgAckWaitMs int) *FunctionTypeConfig
- func (ftc *FunctionTypeConfig) SetMsgChannelSize(msgChannelSize int) *FunctionTypeConfig
- func (ftc *FunctionTypeConfig) SetMsgMaxDeliver(msgMaxDeliver int) *FunctionTypeConfig
- func (ftc *FunctionTypeConfig) SetMultipleInstancesAllowance(allowed bool) *FunctionTypeConfig
- func (ftc *FunctionTypeConfig) SetMutexLifeTimeSec(mutexLifeTimeSec int) *FunctionTypeConfig
- func (ftc *FunctionTypeConfig) SetOptions(options *easyjson.JSON) *FunctionTypeConfig
- func (ftc *FunctionTypeConfig) SetWorkerPoolConfig(functionWorkerPoolConfig SFWorkerPoolConfig) *FunctionTypeConfig
- func (ftc *FunctionTypeConfig) SetWorkerPoolLoadType(wpLoadType WPLoadType) *FunctionTypeConfig
- type FunctionTypeMsg
- type HandlerMsgRefusalType
- type MeasureMsgDeliverType
- type OnAfterStartFunction
- type RefuseCallbackAction
- type RequestCallbackAction
- type Runtime
- func (r *Runtime) RegisterOnAfterStartFunction(f OnAfterStartFunction, async bool)
- func (r *Runtime) Request(requestProvider sfPlugins.RequestProvider, typename string, id string, ...) (*easyjson.JSON, error)
- func (r *Runtime) Shutdown()
- func (r *Runtime) Signal(signalProvider sfPlugins.SignalProvider, typename string, id string, ...) error
- func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error
- type RuntimeConfig
- func (ro *RuntimeConfig) ConfigureNatsCluster(replicasCount int) *RuntimeConfig
- func (ro *RuntimeConfig) EnableNatsCluster(enableCluster bool) *RuntimeConfig
- func (ro *RuntimeConfig) SetActivePassiveMode(activePassiveMode bool) *RuntimeConfig
- func (ro *RuntimeConfig) SetDomainRoutersHandling(handlesDomainRouters bool) *RuntimeConfig
- func (ro *RuntimeConfig) SetFunctionTypeIDLifetimeMs(functionTypeIDLifetimeMs int) *RuntimeConfig
- func (ro *RuntimeConfig) SetGCIntervalSec(gcIntervalSec int) *RuntimeConfig
- func (ro *RuntimeConfig) SetHubDomainName(hubDomainName string) *RuntimeConfig
- func (ro *RuntimeConfig) SetKVMutexIsOldPollingIntervalSec(kvMutexIsOldPollingIntervalSec int) *RuntimeConfig
- func (ro *RuntimeConfig) SetKVMutexLifeTimeSec(kvMutexLifeTimeSec int) *RuntimeConfig
- func (ro *RuntimeConfig) SetNatsReplicas(replicasCount int) *RuntimeConfig
- func (ro *RuntimeConfig) SetNatsURL(natsURL string) *RuntimeConfig
- func (ro *RuntimeConfig) SetRequestTimeoutSec(requestTimeoutSec int) *RuntimeConfig
- func (ro *RuntimeConfig) SetStreamMaxAge(streamType StreamType, maxAge time.Duration) *RuntimeConfig
- func (ro *RuntimeConfig) SetStreamMaxBytes(streamType StreamType, maxBytes int64) *RuntimeConfig
- func (ro *RuntimeConfig) SetStreamMaxMessages(streamType StreamType, maxMessages int64) *RuntimeConfig
- func (ro *RuntimeConfig) SetTLS(enableTLS bool) *RuntimeConfig
- func (ro *RuntimeConfig) UseJSDomainAsHubDomainName() *RuntimeConfig
- type SFWorkerMessage
- type SFWorkerPool
- type SFWorkerPoolConfig
- type SFWorkerTask
- type StreamParams
- type StreamType
- type WPLoadType
Constants ¶
const (
	SignalPrefix = "signal"
	RequestPrefix = "request"
	FromGlobalSignalTmpl = SignalPrefix + ".%s.%s"
	DomainSubjectsIngressPrefix = "$SI"
	DomainSubjectsEgressPrefix = "$SE"
	DomainIngressSubjectsTmpl = DomainSubjectsIngressPrefix + ".%s.%s"
	DomainEgressSubjectsTmpl = DomainSubjectsEgressPrefix + ".%s.%s"
	ObjectIDDomainSeparator = "/"
	ObjectIDWeakClusteringDomainSeparator = "#"
)
const (
	MsgAckWaitTimeoutMs = 10000
	IdChannelSize = 10
	BalanceNeeded = true
	MutexLifetimeSec = 120
	MultipleInstancesAllowed = false
	MaxIdHandlers = 20
)
const (
	RuntimeName = "runtime"
	NatsURL = "nats://nats:foliage@nats:4222"
	KVMutexLifetimeSec = 10
	KVMutexIsOldPollingInterval = 10
	FunctionTypeIDLifetimeMs = 5000
	RequestTimeoutSec = 60
	GCIntervalSec = 5
	DefaultHubDomainName = "hub"
	HandlesDomainRouters = true
	EnableTLS = false
	EnableNatsClusterMode = false
	NatsReplicasCount = 1
	FtStreamMaxMsgs = 10000
	FtStreamMaxBytes = 1024 * 1024 * 256
	FtStreamMaxAge = 24 * time.Hour
	SysStreamMaxMsgs = 80000
	SysStreamMaxBytes = 1024 * 1024 * 512
	SysStreamMaxAge = 12 * time.Hour
	KVStreamMaxMsgs = -1 //unlimited
	KVStreamMaxBytes = -1 //unlimited
	KVStreamMaxAge = 0 //unlimited
)
const (
ShadowObjectCallParamOptionPath string = "shadow_object.can_receive"
)
Variables ¶
var (
ErrMutexLocked = errors.New("mutex is locked")
)
Functions ¶
func AddRequestSourceNatsCore ¶ added in v0.1.1
func AddRequestSourceNatsCore(ft *FunctionType) error
func AddSignalSourceJetstreamQueuePushConsumer ¶ added in v0.1.1
func AddSignalSourceJetstreamQueuePushConsumer(ft *FunctionType) error
func ContextMutexLock ¶
func ContextMutexUnlock ¶
func FunctionTypeMutexLock ¶
func FunctionTypeMutexUnlock ¶
func FunctionTypeMutexUnlock(ctx context.Context, ft *FunctionType, lockRevisionID uint64) error
func KeyMutexLock ¶
func KeyMutexLock(ctx context.Context, runtime *Runtime, key string, errorOnLocked bool) (uint64, error)
KeyMutexLock: errorOnLocked - if the mutex is already locked, return an error immediately (do not wait for it to be unlocked).
func KeyMutexLockUpdate ¶ added in v0.1.3
Types ¶
type AckCallbackAction ¶ added in v0.1.6
type AckCallbackAction = func(ack bool)
type Domain ¶ added in v0.1.6
type Domain struct {
// contains filtered or unexported fields
}
func (*Domain) CreateCustomShadowId ¶ added in v0.1.6
func (*Domain) CreateObjectIDWithDomain ¶ added in v0.1.6
func (*Domain) CreateObjectIDWithHubDomain ¶ added in v0.1.6
func (*Domain) CreateObjectIDWithThisDomain ¶ added in v0.1.6
func (*Domain) GetDomainFromObjectID ¶ added in v0.1.6
func (*Domain) GetObjectIDWithoutDomain ¶ added in v0.1.6
func (*Domain) GetShadowObjectDomainAndID ¶ added in v0.1.6
func (dm *Domain) GetShadowObjectDomainAndID(shadowObjectId string) (domainName, objectIdWithoutDomain string, err error)
* domainName1/domainName2#ObjectId -> domainName2, ObjectId
func (*Domain) GetShadowObjectShadowId ¶ added in v0.1.6
* otherDomainName/ObjectId -> thisDomainName/otherDomainName#ObjectId
* thisDomainName/ObjectId -> thisDomainName/ObjectId
func (*Domain) GetValidObjectId ¶ added in v0.1.6
* thisDomainName/otherDomainName#ObjectId -> thisDomainName/otherDomainName#ObjectId
* thisDomainName/thisDomainName#ObjectId -> thisDomainName/ObjectId
* otherDomainName/thisDomainName#ObjectId -> thisDomainName/ObjectId
* otherDomainName/otherDomainName#ObjectId -> thisDomainName/otherDomainName#ObjectId
* thisDomainName/ObjectId -> thisDomainName/ObjectId
* otherDomainName/ObjectId -> otherDomainName/ObjectId
func (*Domain) GetWeakClusterDomains ¶ added in v0.1.6
Gets all domains in the weak cluster, including this one.
func (*Domain) HubDomainName ¶ added in v0.1.6
func (*Domain) IsShadowObject ¶ added in v0.1.6
* domainName1/domainName2#ObjectId -> true
* domainName1/ObjectId -> false
func (*Domain) SetWeakClusterDomains ¶ added in v0.1.6
Sets all domains in the weak cluster (this domain's name is also included automatically if it is not listed).
type FunctionLogicHandler ¶ added in v0.1.1
type FunctionLogicHandler func(sfPlugins.StatefunExecutor, *sfPlugins.StatefunContextProcessor)
type FunctionType ¶
type FunctionType struct {
// contains filtered or unexported fields
}
func NewFunctionType ¶
func NewFunctionType(runtime *Runtime, name string, logicHandler FunctionLogicHandler, config FunctionTypeConfig) *FunctionType
func (*FunctionType) SetExecutor ¶
func (ft *FunctionType) SetExecutor(alias string, content string, constructor func(alias string, source string) sfPlugins.StatefunExecutor) error
func (*FunctionType) TokenCapacity ¶ added in v0.1.6
func (ft *FunctionType) TokenCapacity() int
func (*FunctionType) TokenRelease ¶ added in v0.1.6
func (ft *FunctionType) TokenRelease()
func (*FunctionType) TokenTryAcquire ¶ added in v0.1.6
func (ft *FunctionType) TokenTryAcquire() bool
type FunctionTypeConfig ¶
type FunctionTypeConfig struct {
// contains filtered or unexported fields
}
func NewFunctionTypeConfig ¶
func NewFunctionTypeConfig() *FunctionTypeConfig
func (*FunctionTypeConfig) IsRequestProviderAllowed ¶ added in v0.1.6
func (ftc *FunctionTypeConfig) IsRequestProviderAllowed(requestProvider sfPlugins.RequestProvider) bool
func (*FunctionTypeConfig) IsSignalProviderAllowed ¶ added in v0.1.6
func (ftc *FunctionTypeConfig) IsSignalProviderAllowed(signalProvider sfPlugins.SignalProvider) bool
func (*FunctionTypeConfig) SetAllowedRequestProviders ¶ added in v0.1.6
func (ftc *FunctionTypeConfig) SetAllowedRequestProviders(allowedRequestProviders ...sfPlugins.RequestProvider) *FunctionTypeConfig
func (*FunctionTypeConfig) SetAllowedSignalProviders ¶ added in v0.1.6
func (ftc *FunctionTypeConfig) SetAllowedSignalProviders(allowedSignalProviders ...sfPlugins.SignalProvider) *FunctionTypeConfig
func (*FunctionTypeConfig) SetBalanceNeeded ¶
func (ftc *FunctionTypeConfig) SetBalanceNeeded(balanceNeeded bool) *FunctionTypeConfig
func (*FunctionTypeConfig) SetIdChannelSize ¶ added in v0.1.6
func (ftc *FunctionTypeConfig) SetIdChannelSize(idChannelSize int) *FunctionTypeConfig
func (*FunctionTypeConfig) SetMaxIdHandlers ¶ added in v0.1.3
func (ftc *FunctionTypeConfig) SetMaxIdHandlers(maxIdHandlers int) *FunctionTypeConfig
Deprecated
func (*FunctionTypeConfig) SetMsgAckChannelSize ¶
func (ftc *FunctionTypeConfig) SetMsgAckChannelSize(msgAckChannelSize int) *FunctionTypeConfig
Deprecated
func (*FunctionTypeConfig) SetMsgAckWaitMs ¶
func (ftc *FunctionTypeConfig) SetMsgAckWaitMs(msgAckWaitMs int) *FunctionTypeConfig
func (*FunctionTypeConfig) SetMsgChannelSize ¶ added in v0.1.3
func (ftc *FunctionTypeConfig) SetMsgChannelSize(msgChannelSize int) *FunctionTypeConfig
Deprecated
func (*FunctionTypeConfig) SetMsgMaxDeliver ¶ added in v0.1.6
func (ftc *FunctionTypeConfig) SetMsgMaxDeliver(msgMaxDeliver int) *FunctionTypeConfig
func (*FunctionTypeConfig) SetMultipleInstancesAllowance ¶ added in v0.1.3
func (ftc *FunctionTypeConfig) SetMultipleInstancesAllowance(allowed bool) *FunctionTypeConfig
func (*FunctionTypeConfig) SetMutexLifeTimeSec ¶
func (ftc *FunctionTypeConfig) SetMutexLifeTimeSec(mutexLifeTimeSec int) *FunctionTypeConfig
func (*FunctionTypeConfig) SetOptions ¶
func (ftc *FunctionTypeConfig) SetOptions(options *easyjson.JSON) *FunctionTypeConfig
func (*FunctionTypeConfig) SetWorkerPoolConfig ¶ added in v0.1.6
func (ftc *FunctionTypeConfig) SetWorkerPoolConfig(functionWorkerPoolConfig SFWorkerPoolConfig) *FunctionTypeConfig
func (*FunctionTypeConfig) SetWorkerPoolLoadType ¶ added in v0.1.6
func (ftc *FunctionTypeConfig) SetWorkerPoolLoadType(wpLoadType WPLoadType) *FunctionTypeConfig
type FunctionTypeMsg ¶ added in v0.1.1
type FunctionTypeMsg struct {
Caller *sfPlugins.StatefunAddress
Payload *easyjson.JSON
Options *easyjson.JSON
RefusalCallback RefuseCallbackAction
RequestCallback RequestCallbackAction
AckCallback AckCallbackAction
}
type HandlerMsgRefusalType ¶ added in v0.1.1
type HandlerMsgRefusalType int
type MeasureMsgDeliverType ¶ added in v0.1.6
type MeasureMsgDeliverType string
const (
	NatsPub MeasureMsgDeliverType = "nats_pub"
	NatsPubRedelivery MeasureMsgDeliverType = "nats_pub_redeliver"
	NatsReq MeasureMsgDeliverType = "nats_req"
	GolangReq MeasureMsgDeliverType = "golang_req"
)
type OnAfterStartFunction ¶ added in v0.1.6
type RefuseCallbackAction ¶ added in v0.1.6
type RefuseCallbackAction = func(skipForever bool)
type RequestCallbackAction ¶ added in v0.1.1
type Runtime ¶
type Runtime struct {
Domain *Domain
// contains filtered or unexported fields
}
Runtime represents the runtime environment for stateful functions.
func NewRuntime ¶
func NewRuntime(config RuntimeConfig) (*Runtime, error)
NewRuntime initializes a new Runtime instance with the given configuration.
func (*Runtime) RegisterOnAfterStartFunction ¶ added in v0.1.6
func (r *Runtime) RegisterOnAfterStartFunction(f OnAfterStartFunction, async bool)
RegisterOnAfterStartFunction registers a function to be called after the runtime starts. The function can be set to run asynchronously.
func (*Runtime) Shutdown ¶ added in v0.1.6
func (r *Runtime) Shutdown()
Shutdown gracefully stops the runtime.
type RuntimeConfig ¶
type RuntimeConfig struct {
StreamParams
// contains filtered or unexported fields
}
func NewRuntimeConfig ¶
func NewRuntimeConfig() *RuntimeConfig
func NewRuntimeConfigSimple ¶
func NewRuntimeConfigSimple(natsURL string, runtimeName string) *RuntimeConfig
func (*RuntimeConfig) ConfigureNatsCluster ¶ added in v0.1.6
func (ro *RuntimeConfig) ConfigureNatsCluster(replicasCount int) *RuntimeConfig
func (*RuntimeConfig) EnableNatsCluster ¶ added in v0.1.6
func (ro *RuntimeConfig) EnableNatsCluster(enableCluster bool) *RuntimeConfig
func (*RuntimeConfig) SetActivePassiveMode ¶ added in v0.1.6
func (ro *RuntimeConfig) SetActivePassiveMode(activePassiveMode bool) *RuntimeConfig
func (*RuntimeConfig) SetDomainRoutersHandling ¶ added in v0.1.6
func (ro *RuntimeConfig) SetDomainRoutersHandling(handlesDomainRouters bool) *RuntimeConfig
func (*RuntimeConfig) SetFunctionTypeIDLifetimeMs ¶
func (ro *RuntimeConfig) SetFunctionTypeIDLifetimeMs(functionTypeIDLifetimeMs int) *RuntimeConfig
func (*RuntimeConfig) SetGCIntervalSec ¶ added in v0.1.6
func (ro *RuntimeConfig) SetGCIntervalSec(gcIntervalSec int) *RuntimeConfig
func (*RuntimeConfig) SetHubDomainName ¶ added in v0.1.6
func (ro *RuntimeConfig) SetHubDomainName(hubDomainName string) *RuntimeConfig
func (*RuntimeConfig) SetKVMutexIsOldPollingIntervalSec ¶
func (ro *RuntimeConfig) SetKVMutexIsOldPollingIntervalSec(kvMutexIsOldPollingIntervalSec int) *RuntimeConfig
func (*RuntimeConfig) SetKVMutexLifeTimeSec ¶
func (ro *RuntimeConfig) SetKVMutexLifeTimeSec(kvMutexLifeTimeSec int) *RuntimeConfig
func (*RuntimeConfig) SetNatsReplicas ¶ added in v0.1.6
func (ro *RuntimeConfig) SetNatsReplicas(replicasCount int) *RuntimeConfig
func (*RuntimeConfig) SetNatsURL ¶
func (ro *RuntimeConfig) SetNatsURL(natsURL string) *RuntimeConfig
func (*RuntimeConfig) SetRequestTimeoutSec ¶ added in v0.1.3
func (ro *RuntimeConfig) SetRequestTimeoutSec(requestTimeoutSec int) *RuntimeConfig
func (*RuntimeConfig) SetStreamMaxAge ¶ added in v0.1.7
func (ro *RuntimeConfig) SetStreamMaxAge(streamType StreamType, maxAge time.Duration) *RuntimeConfig
func (*RuntimeConfig) SetStreamMaxBytes ¶ added in v0.1.7
func (ro *RuntimeConfig) SetStreamMaxBytes(streamType StreamType, maxBytes int64) *RuntimeConfig
func (*RuntimeConfig) SetStreamMaxMessages ¶ added in v0.1.7
func (ro *RuntimeConfig) SetStreamMaxMessages(streamType StreamType, maxMessages int64) *RuntimeConfig
func (*RuntimeConfig) SetTLS ¶ added in v0.1.6
func (ro *RuntimeConfig) SetTLS(enableTLS bool) *RuntimeConfig
func (*RuntimeConfig) UseJSDomainAsHubDomainName ¶ added in v0.1.6
func (ro *RuntimeConfig) UseJSDomainAsHubDomainName() *RuntimeConfig
type SFWorkerMessage ¶ added in v0.1.6
type SFWorkerMessage struct {
ID string
Data FunctionTypeMsg
}
type SFWorkerPool ¶ added in v0.1.6
type SFWorkerPool struct {
// contains filtered or unexported fields
}
SFWorkerPool controls the statefun pool.
func NewSFWorkerPool ¶ added in v0.1.6
func NewSFWorkerPool(ft *FunctionType, conf SFWorkerPoolConfig) *SFWorkerPool
func (*SFWorkerPool) GetWorkerPercentage ¶ added in v0.1.6
func (wp *SFWorkerPool) GetWorkerPercentage() (loadedWorkers float64, idleWorkers float64)
func (*SFWorkerPool) GetWorkerPoolLoadPercentage ¶ added in v0.1.6
func (wp *SFWorkerPool) GetWorkerPoolLoadPercentage() float64
func (*SFWorkerPool) Notify ¶ added in v0.1.6
func (wp *SFWorkerPool) Notify()
func (*SFWorkerPool) Stop ¶ added in v0.1.6
func (wp *SFWorkerPool) Stop()
type SFWorkerPoolConfig ¶ added in v0.1.6
type SFWorkerPoolConfig struct {
MinWorkers int
MaxWorkers int
IdleTimeout time.Duration
TaskQueueLen int
}
func NewSFWorkerPoolConfig ¶ added in v0.1.6
func NewSFWorkerPoolConfig(loadType WPLoadType) (config SFWorkerPoolConfig)
type SFWorkerTask ¶ added in v0.1.6
type SFWorkerTask struct {
Msg SFWorkerMessage
}
type StreamParams ¶ added in v0.1.7
type StreamParams struct {
// contains filtered or unexported fields
}
type StreamType ¶ added in v0.1.7
type StreamType int
const (
	StreamTypeFunction StreamType = iota
	StreamTypeSystem
	StreamTypeKV
)
type WPLoadType ¶ added in v0.1.6
type WPLoadType int
const (
	WPLoadDefault WPLoadType = iota
	WPLoadVeryLight
	WPLoadLight
	WPLoadNormal
	WPLoadHigh
	WPLoadVeryHigh
)