Documentation
¶
Index ¶
- Constants
- Variables
- func IsAdminError(err error) bool
- type APIVersion
- type AllocatorStats
- type AuthAction
- type AuthPolicies
- type AutoFailoverPolicyData
- type AutoFailoverPolicyType
- type BacklogQuota
- type BacklogQuotaType
- type BookieAffinityGroupData
- type BrokerAssignment
- type BrokerData
- type BrokerNamespaceIsolationData
- type BrokerStats
- type BrokerStatsData
- type Brokers
- type BundlesData
- type Client
- type ClusterData
- type Clusters
- type Config
- type ConnectorDefinition
- type ConsumerConfig
- type ConsumerStats
- type CursorInfo
- type CursorStats
- type DispatchRate
- type Error
- type Example
- type ExceptionInformation
- type FailureDomainData
- type FailureDomainMap
- type FunctionConfig
- type FunctionData
- type FunctionInstanceStats
- type FunctionInstanceStatsData
- type FunctionInstanceStatsDataBase
- type FunctionInstanceStatus
- type FunctionInstanceStatusData
- type FunctionState
- type FunctionStats
- type FunctionStatus
- type Functions
- type FunctionsWorker
- type GetSchemaResponse
- type InternalConfigurationData
- type KeyValue
- type LedgerInfo
- type LocalBrokerData
- type LongDescription
- type LongRunningProcessStatus
- type LookupData
- type ManagedLedgerInfo
- type Message
- type MessageID
- type MessageRangeInfo
- type Metrics
- type NameSpaceName
- type NamespaceBundleStats
- type NamespaceIsolationData
- type NamespaceOwnershipStatus
- type Namespaces
- type NamespacesData
- type NsIsolationPoliciesData
- type NsIsolationPolicy
- type OffloadProcessStatus
- type Output
- type PartitionedTopicMetadata
- type PartitionedTopicStats
- type PersistencePolicies
- type PersistentTopicInternalStats
- type Policies
- type PoolArenaStats
- type PoolChunkListStats
- type PoolChunkStats
- type PoolSubpageStats
- type PositionInfo
- type PostSchemaPayload
- type PublisherStats
- type ReplicatorStats
- type ResourceQuota
- type ResourceQuotaData
- type ResourceQuotas
- type ResourceUsage
- type Resources
- type RetentionPolicies
- type RetentionPolicy
- type Schema
- type SchemaCompatibilityStrategy
- type SchemaData
- type SchemaInfo
- type SchemaInfoWithVersion
- type SingleMessageMetadata
- type SinkConfig
- type SinkData
- type SinkInstanceStatus
- type SinkInstanceStatusData
- type SinkStatus
- type Sinks
- type SourceConfig
- type SourceData
- type SourceInstanceStatus
- type SourceInstanceStatusData
- type SourceStatus
- type Sources
- type Status
- type SubscribeRate
- type SubscriptionAuthMode
- type SubscriptionStats
- type Subscriptions
- type TLSOptions
- type TenantData
- type Tenants
- type TopicDomain
- type TopicName
- func (t *TopicName) GetDomain() TopicDomain
- func (t *TopicName) GetEncodedTopic() string
- func (t *TopicName) GetLocalName() string
- func (t *TopicName) GetPartition(index int) (*TopicName, error)
- func (t *TopicName) GetRestPath() string
- func (t *TopicName) IsPersistent() bool
- func (t *TopicName) String() string
- type TopicStats
- type TopicStatsStream
- type Topics
- type UpdateOptions
- type WindowConfig
- type WorkerFunctionInstanceStats
- type WorkerInfo
Constants ¶
const ( DefaultWebServiceURL = "http://localhost:8080" DefaultHTTPTimeOutDuration = 5 * time.Minute )
const ( JavaRuntime = "JAVA" PythonRuntime = "PYTHON" GoRuntime = "GO" )
const ( FirstBoundary string = "0x00000000" LastBoundary string = "0xffffffff" )
const ( PublishTimeHeader = "X-Pulsar-Publish-Time" BatchHeader = "X-Pulsar-Num-Batch-Message" PropertyPrefix = "X-Pulsar-PROPERTY-" )
const ( PUBLICTENANT = "public" DEFAULTNAMESPACE = "default" PARTITIONEDTOPICSUFFIX = "-partition-" )
const DefaultAPIVersion = "v2"
const PATTEN = "^[-=:.\\w]*$"
allowed characters for property, namespace, cluster and topic names are alphanumeric (a-zA-Z0-9) and these special chars -=:. and % is allowed as part of valid URL encoding
const WindowConfigKey = "__WINDOWCONFIGS__"
Variables ¶
var EXAMPLES = "EXAMPLES:"
var Earliest = MessageID{-1, -1, -1, -1}
var Latest = MessageID{0x7fffffffffffffff, 0x7fffffffffffffff, -1, -1}
var OUTPUT = "OUTPUT:"
var PERMISSION = "REQUIRED PERMISSION:"
var ReleaseVersion = "None"
var SCOPE = "SCOPE:"
var SPACES = " "
var USEDFOR = "USED FOR:"
Functions ¶
func IsAdminError ¶
Types ¶
type APIVersion ¶
type APIVersion int
const ( V1 APIVersion = iota V2 V3 )
func (APIVersion) String ¶
func (v APIVersion) String() string
type AllocatorStats ¶
type AllocatorStats struct {
NumDirectArenas int `json:"numDirectArenas"`
NumHeapArenas int `json:"numHeapArenas"`
NumThreadLocalCaches int `json:"numThreadLocalCaches"`
NormalCacheSize int `json:"normalCacheSize"`
SmallCacheSize int `json:"smallCacheSize"`
TinyCacheSize int `json:"tinyCacheSize"`
DirectArenas []PoolArenaStats `json:"directArenas"`
HeapArenas []PoolArenaStats `json:"heapArenas"`
}
type AuthAction ¶
type AuthAction string
func ParseAuthAction ¶
func ParseAuthAction(action string) (AuthAction, error)
func (AuthAction) String ¶
func (a AuthAction) String() string
type AuthPolicies ¶
type AuthPolicies struct {
NamespaceAuth map[string]AuthAction `json:"namespace_auth"`
DestinationAuth map[string]map[string]AuthAction `json:"destination_auth"`
SubscriptionAuthRoles map[string][]string `json:"subscription_auth_roles"`
}
func NewAuthPolicies ¶
func NewAuthPolicies() *AuthPolicies
type AutoFailoverPolicyData ¶
type AutoFailoverPolicyData struct {
PolicyType AutoFailoverPolicyType `json:"policy_type"`
Parameters map[string]string `json:"parameters"`
}
type AutoFailoverPolicyType ¶
type AutoFailoverPolicyType string
const (
MinAvailable AutoFailoverPolicyType = "min_available"
)
type BacklogQuota ¶
type BacklogQuota struct {
Limit int64 `json:"limit"`
Policy RetentionPolicy `json:"policy"`
}
func NewBacklogQuota ¶
func NewBacklogQuota(limit int64, policy RetentionPolicy) BacklogQuota
type BacklogQuotaType ¶
type BacklogQuotaType string
const DestinationStorage BacklogQuotaType = "destination_storage"
type BookieAffinityGroupData ¶
type BrokerAssignment ¶
type BrokerAssignment string
const ( Primary BrokerAssignment = "primary" Secondary BrokerAssignment = "secondary" )
type BrokerData ¶
type BrokerStats ¶
type BrokerStats interface {
// GetMetrics returns Monitoring metrics
GetMetrics() ([]Metrics, error)
// GetMBeans requests JSON string server mbean dump
GetMBeans() ([]Metrics, error)
// GetTopics returns JSON string topics stats
GetTopics() (string, error)
// GetLoadReport returns load report of broker
GetLoadReport() (*LocalBrokerData, error)
// GetAllocatorStats returns stats from broker
GetAllocatorStats(allocatorName string) (*AllocatorStats, error)
}
BrokerStats is admin interface for broker stats management
type BrokerStatsData ¶
type BrokerStatsData struct {
Indent bool `json:"indent"`
}
type Brokers ¶
type Brokers interface {
// GetActiveBrokers returns the list of active brokers in the cluster.
GetActiveBrokers(cluster string) ([]string, error)
// GetDynamicConfigurationNames returns list of updatable configuration name
GetDynamicConfigurationNames() ([]string, error)
// GetOwnedNamespaces returns the map of owned namespaces and their status from a single broker in the cluster
GetOwnedNamespaces(cluster, brokerURL string) (map[string]NamespaceOwnershipStatus, error)
// UpdateDynamicConfiguration updates dynamic configuration value in to Zk that triggers watch on
// brokers and all brokers can update {@link ServiceConfiguration} value locally
UpdateDynamicConfiguration(configName, configValue string) error
// DeleteDynamicConfiguration deletes dynamic configuration value in to Zk. It will not impact current value
// in broker but next time when broker restarts, it applies value from configuration file only.
DeleteDynamicConfiguration(configName string) error
// GetRuntimeConfigurations returns values of runtime configuration
GetRuntimeConfigurations() (map[string]string, error)
// GetInternalConfigurationData returns the internal configuration data
GetInternalConfigurationData() (*InternalConfigurationData, error)
// GetAllDynamicConfigurations returns values of all overridden dynamic-configs
GetAllDynamicConfigurations() (map[string]string, error)
// HealthCheck run a health check on the broker
HealthCheck() error
}
Brokers is admin interface for brokers management
type BundlesData ¶
type BundlesData struct {
Boundaries []string `json:"boundaries"`
NumBundles int `json:"numBundles"`
}
func NewBundlesData ¶
func NewBundlesData(boundaries []string) BundlesData
func NewBundlesDataWithNumBundles ¶
func NewBundlesDataWithNumBundles(numBundles int) *BundlesData
func NewDefaultBoundle ¶
func NewDefaultBoundle() *BundlesData
type Client ¶
type Client interface {
Clusters() Clusters
Functions() Functions
Tenants() Tenants
Topics() Topics
Subscriptions() Subscriptions
Sources() Sources
Sinks() Sinks
Namespaces() Namespaces
Schemas() Schema
NsIsolationPolicy() NsIsolationPolicy
Brokers() Brokers
BrokerStats() BrokerStats
ResourceQuotas() ResourceQuotas
FunctionsWorker() FunctionsWorker
}
Client provides a client to the Pulsar Restful API
type ClusterData ¶
type ClusterData struct {
Name string `json:"-"`
ServiceURL string `json:"serviceUrl"`
ServiceURLTls string `json:"serviceUrlTls"`
BrokerServiceURL string `json:"brokerServiceUrl"`
BrokerServiceURLTls string `json:"brokerServiceUrlTls"`
PeerClusterNames []string `json:"peerClusterNames"`
}
ClusterData information on a cluster
type Clusters ¶
type Clusters interface {
// List returns the list of clusters
List() ([]string, error)
// Get the configuration data for the specified cluster
Get(string) (ClusterData, error)
// Create a new cluster
Create(ClusterData) error
// Delete an existing cluster
Delete(string) error
// Update the configuration for a cluster
Update(ClusterData) error
// UpdatePeerClusters updates peer cluster names.
UpdatePeerClusters(string, []string) error
// GetPeerClusters returns peer-cluster names
GetPeerClusters(string) ([]string, error)
// CreateFailureDomain creates a domain into cluster
CreateFailureDomain(FailureDomainData) error
// GetFailureDomain returns the domain registered into a cluster
GetFailureDomain(clusterName, domainName string) (FailureDomainData, error)
// ListFailureDomains returns all registered domains in cluster
ListFailureDomains(string) (FailureDomainMap, error)
// DeleteFailureDomain deletes a domain in cluster
DeleteFailureDomain(FailureDomainData) error
// UpdateFailureDomain updates a domain into cluster
UpdateFailureDomain(FailureDomainData) error
}
Clusters is admin interface for clusters management
type Config ¶
type Config struct {
WebServiceURL string
HTTPTimeout time.Duration
HTTPClient *http.Client
APIVersion APIVersion
Auth *auth.TLSAuthProvider
AuthParams string
TLSOptions *TLSOptions
TokenAuth *auth.TokenAuthProvider
}
Config is used to configure the admin client
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a default configuration for the pulsar admin client
type ConnectorDefinition ¶
type ConnectorDefinition struct {
// The name of the connector type
Name string `json:"name"`
// Description to be used for user help
Description string `json:"description"`
// The class name for the connector source implementation
// <p>If not defined, it will be assumed this connector cannot act as a data source
SourceClass string `json:"sourceClass"`
// The class name for the connector sink implementation
// <p>If not defined, it will be assumed this connector cannot act as a data sink
SinkClass string `json:"sinkClass"`
}
Basic information about a Pulsar connector
type ConsumerConfig ¶
type ConsumerStats ¶
type ConsumerStats struct {
BlockedConsumerOnUnAckedMsgs bool `json:"blockedConsumerOnUnackedMsgs"`
AvailablePermits int `json:"availablePermits"`
UnAckedMessages int `json:"unackedMessages"`
MsgRateOut float64 `json:"msgRateOut"`
MsgThroughputOut float64 `json:"msgThroughputOut"`
MsgRateRedeliver float64 `json:"msgRateRedeliver"`
ConsumerName string `json:"consumerName"`
Metadata map[string]string `json:"metadata"`
}
type CursorInfo ¶
type CursorInfo struct {
Version int `json:"version"`
CreationDate string `json:"creationDate"`
ModificationDate string `json:"modificationDate"`
CursorsLedgerID int64 `json:"cursorsLedgerId"`
MarkDelete PositionInfo `json:"markDelete"`
IndividualDeletedMessages []MessageRangeInfo `json:"individualDeletedMessages"`
Properties map[string]int64
}
type CursorStats ¶
type CursorStats struct {
MarkDeletePosition string `json:"markDeletePosition"`
ReadPosition string `json:"readPosition"`
WaitingReadOp bool `json:"waitingReadOp"`
PendingReadOps int `json:"pendingReadOps"`
MessagesConsumedCounter int64 `json:"messagesConsumedCounter"`
CursorLedger int64 `json:"cursorLedger"`
CursorLedgerLastEntry int64 `json:"cursorLedgerLastEntry"`
IndividuallyDeletedMessages string `json:"individuallyDeletedMessages"`
LastLedgerWitchTimestamp string `json:"lastLedgerWitchTimestamp"`
State string `json:"state"`
NumberOfEntriesSinceFirstNotAckedMessage int64 `json:"numberOfEntriesSinceFirstNotAckedMessage"`
TotalNonContiguousDeletedMessagesRange int `json:"totalNonContiguousDeletedMessagesRange"`
Properties map[string]int64 `json:"properties"`
}
type DispatchRate ¶
type DispatchRate struct {
DispatchThrottlingRateInMsg int `json:"dispatchThrottlingRateInMsg"`
DispatchThrottlingRateInByte int64 `json:"dispatchThrottlingRateInByte"`
RatePeriodInSecond int `json:"ratePeriodInSecond"`
}
func NewDispatchRate ¶
func NewDispatchRate() *DispatchRate
type ExceptionInformation ¶
type FailureDomainData ¶
type FailureDomainData struct {
ClusterName string `json:"-"`
DomainName string `json:"-"`
BrokerList []string `json:"brokers"`
}
Failure Domain information
type FailureDomainMap ¶
type FailureDomainMap map[string]FailureDomainData
type FunctionConfig ¶
type FunctionConfig struct {
TimeoutMs *int64 `json:"timeoutMs" yaml:"timeoutMs"`
TopicsPattern *string `json:"topicsPattern" yaml:"topicsPattern"`
// Whether the subscriptions the functions created/used should be deleted when the functions is deleted
CleanupSubscription bool `json:"cleanupSubscription" yaml:"cleanupSubscription"`
RetainOrdering bool `json:"retainOrdering" yaml:"retainOrdering"`
AutoAck bool `json:"autoAck" yaml:"autoAck"`
Parallelism int `json:"parallelism" yaml:"parallelism"`
MaxMessageRetries int `json:"maxMessageRetries" yaml:"maxMessageRetries"`
Output string `json:"output" yaml:"output"`
OutputSerdeClassName string `json:"outputSerdeClassName" yaml:"outputSerdeClassName"`
LogTopic string `json:"logTopic" yaml:"logTopic"`
ProcessingGuarantees string `json:"processingGuarantees" yaml:"processingGuarantees"`
// Represents either a builtin schema type (eg: 'avro', 'json', etc) or the class name for a Schema implementation
OutputSchemaType string `json:"outputSchemaType" yaml:"outputSchemaType"`
Runtime string `json:"runtime" yaml:"runtime"`
DeadLetterTopic string `json:"deadLetterTopic" yaml:"deadLetterTopic"`
SubName string `json:"subName" yaml:"subName"`
FQFN string `json:"fqfn" yaml:"fqfn"`
Jar string `json:"jar" yaml:"jar"`
Py string `json:"py" yaml:"py"`
Go string `json:"go" yaml:"go"`
// Any flags that you want to pass to the runtime.
// note that in thread mode, these flags will have no impact
RuntimeFlags string `json:"runtimeFlags" yaml:"runtimeFlags"`
Tenant string `json:"tenant" yaml:"tenant"`
Namespace string `json:"namespace" yaml:"namespace"`
Name string `json:"name" yaml:"name"`
ClassName string `json:"className" yaml:"className"`
Resources *Resources `json:"resources" yaml:"resources"`
WindowConfig *WindowConfig `json:"windowConfig" yaml:"windowConfig"`
Inputs []string `json:"inputs" yaml:"inputs"`
UserConfig map[string]interface{} `json:"userConfig" yaml:"userConfig"`
CustomSerdeInputs map[string]string `json:"customSerdeInputs" yaml:"customSerdeInputs"`
CustomSchemaInputs map[string]string `json:"customSchemaInputs" yaml:"customSchemaInputs"`
// A generalized way of specifying inputs
InputSpecs map[string]ConsumerConfig `json:"inputSpecs" yaml:"inputSpecs"`
// This is a map of secretName(aka how the secret is going to be
// accessed in the function via context) to an object that
// encapsulates how the secret is fetched by the underlying
// secrets provider. The type of an value here can be found by the
// SecretProviderConfigurator.getSecretObjectType() method.
Secrets map[string]interface{} `json:"secrets" yaml:"secrets"`
}
type FunctionData ¶
type FunctionData struct {
UpdateAuthData bool `json:"updateAuthData"`
RetainOrdering bool `json:"retainOrdering"`
Watch bool `json:"watch"`
AutoAck bool `json:"autoAck"`
Parallelism int `json:"parallelism"`
WindowLengthCount int `json:"windowLengthCount"`
SlidingIntervalCount int `json:"slidingIntervalCount"`
MaxMessageRetries int `json:"maxMessageRetries"`
TimeoutMs int64 `json:"timeoutMs"`
SlidingIntervalDurationMs int64 `json:"slidingIntervalDurationMs"`
WindowLengthDurationMs int64 `json:"windowLengthDurationMs"`
RAM int64 `json:"ram"`
Disk int64 `json:"disk"`
CPU float64 `json:"cpu"`
SubsName string `json:"subsName"`
DeadLetterTopic string `json:"deadLetterTopic"`
Key string `json:"key"`
State string `json:"state"`
TriggerValue string `json:"triggerValue"`
TriggerFile string `json:"triggerFile"`
Topic string `json:"topic"`
UserCodeFile string `json:"-"`
FQFN string `json:"fqfn"`
Tenant string `json:"tenant"`
Namespace string `json:"namespace"`
FuncName string `json:"functionName"`
InstanceID string `json:"instance_id"`
ClassName string `json:"className"`
Jar string `json:"jarFile"`
Py string `json:"pyFile"`
Go string `json:"goFile"`
Inputs string `json:"inputs"`
TopicsPattern string `json:"topicsPattern"`
Output string `json:"output"`
LogTopic string `json:"logTopic"`
SchemaType string `json:"schemaType"`
CustomSerDeInputs string `json:"customSerdeInputString"`
CustomSchemaInput string `json:"customSchemaInputString"`
OutputSerDeClassName string `json:"outputSerdeClassName"`
FunctionConfigFile string `json:"fnConfigFile"`
ProcessingGuarantees string `json:"processingGuarantees"`
UserConfig string `json:"userConfigString"`
FuncConf *FunctionConfig `json:"-"`
}
FunctionData information for a Pulsar Function
type FunctionInstanceStats ¶
type FunctionInstanceStats struct {
FunctionInstanceStatsDataBase
InstanceID int64 `json:"instanceId"`
Metrics FunctionInstanceStatsData `json:"metrics"`
}
type FunctionInstanceStatsData ¶
type FunctionInstanceStatsData struct {
OneMin FunctionInstanceStatsDataBase `json:"oneMin"`
// Timestamp of when the function was last invoked for instance
LastInvocation int64 `json:"lastInvocation"`
// Map of user defined metrics
UserMetrics map[string]float64 `json:"userMetrics"`
FunctionInstanceStatsDataBase
}
type FunctionInstanceStatsDataBase ¶
type FunctionInstanceStatsDataBase struct {
// Total number of records function received from source for instance
ReceivedTotal int64 `json:"receivedTotal"`
// Total number of records successfully processed by user function for instance
ProcessedSuccessfullyTotal int64 `json:"processedSuccessfullyTotal"`
// Total number of system exceptions thrown for instance
SystemExceptionsTotal int64 `json:"systemExceptionsTotal"`
// Total number of user exceptions thrown for instance
UserExceptionsTotal int64 `json:"userExceptionsTotal"`
// Average process latency for function for instance
AvgProcessLatency float64 `json:"avgProcessLatency"`
}
type FunctionInstanceStatus ¶
type FunctionInstanceStatus struct {
InstanceID int `json:"instanceId"`
Status FunctionInstanceStatusData `json:"status"`
}
type FunctionInstanceStatusData ¶
type FunctionInstanceStatusData struct {
Running bool `json:"running"`
Err string `json:"error"`
NumRestarts int64 `json:"numRestarts"`
NumReceived int64 `json:"numReceived"`
NumSuccessfullyProcessed int64 `json:"numSuccessfullyProcessed"`
NumUserExceptions int64 `json:"numUserExceptions"`
LatestUserExceptions []ExceptionInformation `json:"latestUserExceptions"`
NumSystemExceptions int64 `json:"numSystemExceptions"`
LatestSystemExceptions []ExceptionInformation `json:"latestSystemExceptions"`
AverageLatency float64 `json:"averageLatency"`
LastInvocationTime int64 `json:"lastInvocationTime"`
WorkerID string `json:"workerId"`
}
type FunctionState ¶
type FunctionStats ¶
type FunctionStats struct {
// Overall total number of records function received from source
ReceivedTotal int64 `json:"receivedTotal"`
// Overall total number of records successfully processed by user function
ProcessedSuccessfullyTotal int64 `json:"processedSuccessfullyTotal"`
// Overall total number of system exceptions thrown
SystemExceptionsTotal int64 `json:"systemExceptionsTotal"`
// Overall total number of user exceptions thrown
UserExceptionsTotal int64 `json:"userExceptionsTotal"`
// Average process latency for function
AvgProcessLatency float64 `json:"avgProcessLatency"`
// Timestamp of when the function was last invoked by any instance
LastInvocation int64 `json:"lastInvocation"`
OneMin FunctionInstanceStatsDataBase `json:"oneMin"`
Instances []FunctionInstanceStats `json:"instances"`
FunctionInstanceStats
}
func (*FunctionStats) AddInstance ¶
func (fs *FunctionStats) AddInstance(functionInstanceStats FunctionInstanceStats)
func (*FunctionStats) CalculateOverall ¶
func (fs *FunctionStats) CalculateOverall() *FunctionStats
type FunctionStatus ¶
type FunctionStatus struct {
NumInstances int `json:"numInstances"`
NumRunning int `json:"numRunning"`
Instances []FunctionInstanceStatus `json:"instances"`
}
type Functions ¶
type Functions interface {
// CreateFunc create a new function.
CreateFunc(data *FunctionConfig, fileName string) error
// CreateFuncWithURL create a new function by providing url from which fun-pkg can be downloaded.
// supported url: http/file
// eg:
// File: file:/dir/fileName.jar
// Http: http://www.repo.com/fileName.jar
//
// @param functionConfig
// the function configuration object
// @param pkgURL
// url from which pkg can be downloaded
CreateFuncWithURL(data *FunctionConfig, pkgURL string) error
// StopFunction stop all function instances
StopFunction(tenant, namespace, name string) error
// StopFunctionWithID stop function instance
StopFunctionWithID(tenant, namespace, name string, instanceID int) error
// DeleteFunction delete an existing function
DeleteFunction(tenant, namespace, name string) error
// StartFunction start all function instances
StartFunction(tenant, namespace, name string) error
// StartFunctionWithID start function instance
StartFunctionWithID(tenant, namespace, name string, instanceID int) error
// RestartFunction restart all function instances
RestartFunction(tenant, namespace, name string) error
// RestartFunctionWithID restart function instance
RestartFunctionWithID(tenant, namespace, name string, instanceID int) error
// GetFunctions returns the list of functions
GetFunctions(tenant, namespace string) ([]string, error)
// GetFunction returns the configuration for the specified function
GetFunction(tenant, namespace, name string) (FunctionConfig, error)
// GetFunctionStatus returns the current status of a function
GetFunctionStatus(tenant, namespace, name string) (FunctionStatus, error)
// GetFunctionStatusWithInstanceID returns the current status of a function instance
GetFunctionStatusWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatusData, error)
// GetFunctionStats returns the current stats of a function
GetFunctionStats(tenant, namespace, name string) (FunctionStats, error)
// GetFunctionStatsWithInstanceID gets the current stats of a function instance
GetFunctionStatsWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatsData, error)
// GetFunctionState fetch the current state associated with a Pulsar Function
//
// Response Example:
// { "value : 12, version : 2"}
GetFunctionState(tenant, namespace, name, key string) (FunctionState, error)
// PutFunctionState puts the given state associated with a Pulsar Function
PutFunctionState(tenant, namespace, name string, state FunctionState) error
// TriggerFunction triggers the function by writing to the input topic
TriggerFunction(tenant, namespace, name, topic, triggerValue, triggerFile string) (string, error)
// UpdateFunction updates the configuration for a function.
UpdateFunction(functionConfig *FunctionConfig, fileName string, updateOptions *UpdateOptions) error
// UpdateFunctionWithURL updates the configuration for a function.
//
// Update a function by providing url from which fun-pkg can be downloaded. supported url: http/file
// eg:
// File: file:/dir/fileName.jar
// Http: http://www.repo.com/fileName.jar
UpdateFunctionWithURL(functionConfig *FunctionConfig, pkgURL string, updateOptions *UpdateOptions) error
}
Functions is admin interface for functions management
type FunctionsWorker ¶
type FunctionsWorker interface {
// Get all functions stats on a worker
GetFunctionsStats() ([]*WorkerFunctionInstanceStats, error)
// Get worker metrics
GetMetrics() ([]*Metrics, error)
// Get List of all workers belonging to this cluster
GetCluster() ([]*WorkerInfo, error)
// Get the worker who is the leader of the clusterv
GetClusterLeader() (*WorkerInfo, error)
// Get the function assignment among the cluster
GetAssignments() (map[string][]string, error)
}
type GetSchemaResponse ¶
type KeyValue ¶
type KeyValue struct {
Key *string `protobuf:"bytes,1,req,name=key" json:"key,omitempty"`
Value *string `protobuf:"bytes,2,req,name=value" json:"value,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
nolint
func (*KeyValue) ProtoMessage ¶
func (*KeyValue) ProtoMessage()
type LedgerInfo ¶
type LocalBrokerData ¶
type LocalBrokerData struct {
// URLs to satisfy contract of ServiceLookupData (used by NamespaceService).
WebServiceURL string `json:"webServiceUrl"`
WebServiceURLTLS string `json:"webServiceUrlTls"`
PulsarServiceURL string `json:"pulsarServiceUrl"`
PulsarServiceURLTLS string `json:"pulsarServiceUrlTls"`
PersistentTopicsEnabled bool `json:"persistentTopicsEnabled"`
NonPersistentTopicsEnabled bool `json:"nonPersistentTopicsEnabled"`
// Most recently available system resource usage.
CPU ResourceUsage `json:"cpu"`
Memory ResourceUsage `json:"memory"`
DirectMemory ResourceUsage `json:"directMemory"`
BandwidthIn ResourceUsage `json:"bandwidthIn"`
BandwidthOut ResourceUsage `json:"bandwidthOut"`
// Message data from the most recent namespace bundle stats.
MsgThroughputIn float64 `json:"msgThroughputIn"`
MsgThroughputOut float64 `json:"msgThroughputOut"`
MsgRateIn float64 `json:"msgRateIn"`
MsgRateOut float64 `json:"msgRateOut"`
// Timestamp of last update.
LastUpdate int64 `json:"lastUpdate"`
// The stats given in the most recent invocation of update.
LastStats map[string]*NamespaceBundleStats `json:"lastStats"`
NumTopics int `json:"numTopics"`
NumBundles int `json:"numBundles"`
NumConsumers int `json:"numConsumers"`
NumProducers int `json:"numProducers"`
// All bundles belonging to this broker.
Bundles []string `json:"bundles"`
// The bundles gained since the last invocation of update.
LastBundleGains []string `json:"lastBundleGains"`
// The bundles lost since the last invocation of update.
LastBundleLosses []string `json:"lastBundleLosses"`
// The version string that this broker is running, obtained from the Maven build artifact in the POM
BrokerVersionString string `json:"brokerVersionString"`
// This place-holder requires to identify correct LoadManagerReport type while deserializing
LoadReportType string `json:"loadReportType"`
// the external protocol data advertised by protocol handlers.
Protocols map[string]string `json:"protocols"`
}
func NewLocalBrokerData ¶
func NewLocalBrokerData() LocalBrokerData
type LongDescription ¶
type LongDescription struct {
CommandUsedFor string
CommandPermission string
CommandExamples []Example
CommandOutput []Output
CommandScope string
}
func (*LongDescription) ExampleToString ¶
func (desc *LongDescription) ExampleToString() string
func (*LongDescription) ToString ¶
func (desc *LongDescription) ToString() string
type LookupData ¶
type ManagedLedgerInfo ¶
type ManagedLedgerInfo struct {
Version int `json:"version"`
CreationDate string `json:"creationDate"`
ModificationData string `json:"modificationData"`
Ledgers []LedgerInfo `json:"ledgers"`
TerminatedPosition PositionInfo `json:"terminatedPosition"`
Cursors map[string]CursorInfo `json:"cursors"`
}
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
func NewMessage ¶
func (*Message) GetMessageID ¶
func (*Message) GetPayload ¶
func (*Message) GetProperties ¶
type MessageID ¶
type MessageID struct {
LedgerID int64 `json:"ledgerId"`
EntryID int64 `json:"entryId"`
PartitionedIndex int `json:"partitionedIndex"`
BatchIndex int `json:"-"`
}
func ParseMessageID ¶
type MessageRangeInfo ¶
type MessageRangeInfo struct {
From PositionInfo `json:"from"`
To PositionInfo `json:"to"`
Offloaded bool `json:"offloaded"`
}
type Metrics ¶
type Metrics struct {
Metrics map[string]interface{} `json:"metrics"`
Dimensions map[string]string `json:"dimensions"`
}
func NewMetrics ¶
type NameSpaceName ¶
type NameSpaceName struct {
// contains filtered or unexported fields
}
func GetNameSpaceName ¶
func GetNameSpaceName(tenant, namespace string) (*NameSpaceName, error)
func GetNamespaceName ¶
func GetNamespaceName(completeName string) (*NameSpaceName, error)
func (*NameSpaceName) String ¶
func (n *NameSpaceName) String() string
type NamespaceBundleStats ¶
type NamespaceBundleStats struct {
MsgRateIn float64 `json:"msgRateIn"`
MsgThroughputIn float64 `json:"msgThroughputIn"`
MsgRateOut float64 `json:"msgRateOut"`
MsgThroughputOut float64 `json:"msgThroughputOut"`
ConsumerCount int `json:"consumerCount"`
ProducerCount int `json:"producerCount"`
TopicsNum int64 `json:"topics"`
CacheSize int64 `json:"cacheSize"`
// Consider the throughput equal if difference is less than 100 KB/s
ThroughputDifferenceThreshold float64 `json:"throughputDifferenceThreshold"`
// Consider the msgRate equal if the difference is less than 100
MsgRateDifferenceThreshold float64 `json:"msgRateDifferenceThreshold"`
// Consider the total topics/producers/consumers equal if the difference is less than 500
TopicConnectionDifferenceThreshold int64 `json:"topicConnectionDifferenceThreshold"`
// Consider the cache size equal if the difference is less than 100 kb
CacheSizeDifferenceThreshold int64 `json:"cacheSizeDifferenceThreshold"`
}
func NewNamespaceBundleStats ¶
func NewNamespaceBundleStats() *NamespaceBundleStats
type NamespaceIsolationData ¶
type NamespaceIsolationData struct {
Namespaces []string `json:"namespaces"`
Primary []string `json:"primary"`
Secondary []string `json:"secondary"`
AutoFailoverPolicy AutoFailoverPolicyData `json:"auto_failover_policy"`
}
type NamespaceOwnershipStatus ¶
type NamespaceOwnershipStatus struct {
BrokerAssignment BrokerAssignment `json:"broker_assignment"`
IsControlled bool `json:"is_controlled"`
IsActive bool `json:"is_active"`
}
type Namespaces ¶
type Namespaces interface {
// GetNamespaces returns the list of all the namespaces for a certain tenant
GetNamespaces(tenant string) ([]string, error)
// GetTopics returns the list of all the topics under a certain namespace
GetTopics(namespace string) ([]string, error)
// GetPolicies returns the dump all the policies specified for a namespace
GetPolicies(namespace string) (*Policies, error)
// CreateNamespace creates a new empty namespace with no policies attached
CreateNamespace(namespace string) error
// CreateNsWithNumBundles creates a new empty namespace with no policies attached
CreateNsWithNumBundles(namespace string, numBundles int) error
// CreateNsWithPolices creates a new namespace with the specified policies
CreateNsWithPolices(namespace string, polices Policies) error
// CreateNsWithBundlesData creates a new empty namespace with no policies attached
CreateNsWithBundlesData(namespace string, bundleData *BundlesData) error
// DeleteNamespace deletes an existing namespace
DeleteNamespace(namespace string) error
// DeleteNamespaceBundle deletes an existing bundle in a namespace
DeleteNamespaceBundle(namespace string, bundleRange string) error
// SetNamespaceMessageTTL sets the messages Time to Live for all the topics within a namespace
SetNamespaceMessageTTL(namespace string, ttlInSeconds int) error
// GetNamespaceMessageTTL returns the message TTL for a namespace
GetNamespaceMessageTTL(namespace string) (int, error)
// GetRetention returns the retention configuration for a namespace
GetRetention(namespace string) (*RetentionPolicies, error)
// SetRetention sets the retention configuration for all the topics on a namespace
SetRetention(namespace string, policy RetentionPolicies) error
// GetBacklogQuotaMap returns backlog quota map on a namespace
GetBacklogQuotaMap(namespace string) (map[BacklogQuotaType]BacklogQuota, error)
// SetBacklogQuota sets a backlog quota for all the topics on a namespace
SetBacklogQuota(namespace string, backlogQuota BacklogQuota) error
// RemoveBacklogQuota removes a backlog quota policy from a namespace
RemoveBacklogQuota(namespace string) error
// SetSchemaValidationEnforced sets schema validation enforced for namespace
SetSchemaValidationEnforced(namespace NameSpaceName, schemaValidationEnforced bool) error
// GetSchemaValidationEnforced returns schema validation enforced for namespace
GetSchemaValidationEnforced(namespace NameSpaceName) (bool, error)
// SetSchemaAutoUpdateCompatibilityStrategy sets the strategy used to check the a new schema provided
// by a producer is compatible with the current schema before it is installed
SetSchemaAutoUpdateCompatibilityStrategy(namespace NameSpaceName, strategy SchemaCompatibilityStrategy) error
// GetSchemaAutoUpdateCompatibilityStrategy returns the strategy used to check the a new schema provided
// by a producer is compatible with the current schema before it is installed
GetSchemaAutoUpdateCompatibilityStrategy(namespace NameSpaceName) (SchemaCompatibilityStrategy, error)
// ClearOffloadDeleteLag clears the offload deletion lag for a namespace.
ClearOffloadDeleteLag(namespace NameSpaceName) error
// SetOffloadDeleteLag sets the offload deletion lag for a namespace
SetOffloadDeleteLag(namespace NameSpaceName, timeMs int64) error
// GetOffloadDeleteLag returns the offload deletion lag for a namespace, in milliseconds
GetOffloadDeleteLag(namespace NameSpaceName) (int64, error)
// SetOffloadThreshold sets the offloadThreshold for a namespace
SetOffloadThreshold(namespace NameSpaceName, threshold int64) error
// GetOffloadThreshold returns the offloadThreshold for a namespace
GetOffloadThreshold(namespace NameSpaceName) (int64, error)
// SetCompactionThreshold sets the compactionThreshold for a namespace
SetCompactionThreshold(namespace NameSpaceName, threshold int64) error
// GetCompactionThreshold returns the compactionThreshold for a namespace
GetCompactionThreshold(namespace NameSpaceName) (int64, error)
// SetMaxConsumersPerSubscription sets maxConsumersPerSubscription for a namespace.
SetMaxConsumersPerSubscription(namespace NameSpaceName, max int) error
// GetMaxConsumersPerSubscription returns the maxConsumersPerSubscription for a namespace.
GetMaxConsumersPerSubscription(namespace NameSpaceName) (int, error)
// SetMaxConsumersPerTopic sets maxConsumersPerTopic for a namespace.
SetMaxConsumersPerTopic(namespace NameSpaceName, max int) error
// GetMaxConsumersPerTopic returns the maxProducersPerTopic for a namespace.
GetMaxConsumersPerTopic(namespace NameSpaceName) (int, error)
// SetMaxProducersPerTopic sets maxProducersPerTopic for a namespace.
SetMaxProducersPerTopic(namespace NameSpaceName, max int) error
// GetMaxProducersPerTopic returns the maxProducersPerTopic for a namespace.
GetMaxProducersPerTopic(namespace NameSpaceName) (int, error)
// GetNamespaceReplicationClusters returns the replication clusters for a namespace
GetNamespaceReplicationClusters(namespace string) ([]string, error)
// SetNamespaceReplicationClusters returns the replication clusters for a namespace
SetNamespaceReplicationClusters(namespace string, clusterIds []string) error
// SetNamespaceAntiAffinityGroup sets anti-affinity group name for a namespace
SetNamespaceAntiAffinityGroup(namespace string, namespaceAntiAffinityGroup string) error
// GetAntiAffinityNamespaces returns all namespaces that grouped with given anti-affinity group
GetAntiAffinityNamespaces(tenant, cluster, namespaceAntiAffinityGroup string) ([]string, error)
// GetNamespaceAntiAffinityGroup returns anti-affinity group name for a namespace
GetNamespaceAntiAffinityGroup(namespace string) (string, error)
// DeleteNamespaceAntiAffinityGroup deletes anti-affinity group name for a namespace
DeleteNamespaceAntiAffinityGroup(namespace string) error
// SetDeduplicationStatus sets the deduplication status for all topics within a namespace
// When deduplication is enabled, the broker will prevent to store the same Message multiple times
SetDeduplicationStatus(namespace string, enableDeduplication bool) error
// SetPersistence sets the persistence configuration for all the topics on a namespace
SetPersistence(namespace string, persistence PersistencePolicies) error
// GetPersistence returns the persistence configuration for a namespace
GetPersistence(namespace string) (*PersistencePolicies, error)
// SetBookieAffinityGroup sets bookie affinity group for a namespace to isolate namespace write to bookies that are
// part of given affinity group
SetBookieAffinityGroup(namespace string, bookieAffinityGroup BookieAffinityGroupData) error
// DeleteBookieAffinityGroup deletes bookie affinity group configured for a namespace
DeleteBookieAffinityGroup(namespace string) error
// GetBookieAffinityGroup returns bookie affinity group configured for a namespace
GetBookieAffinityGroup(namespace string) (*BookieAffinityGroupData, error)
// Unload a namespace from the current serving broker
Unload(namespace string) error
// UnloadNamespaceBundle unloads namespace bundle
UnloadNamespaceBundle(namespace, bundle string) error
// SplitNamespaceBundle splits namespace bundle
SplitNamespaceBundle(namespace, bundle string, unloadSplitBundles bool) error
// GetNamespacePermissions returns permissions on a namespace
GetNamespacePermissions(namespace NameSpaceName) (map[string][]AuthAction, error)
// GrantNamespacePermission grants permission on a namespace.
GrantNamespacePermission(namespace NameSpaceName, role string, action []AuthAction) error
// RevokeNamespacePermission revokes permissions on a namespace.
RevokeNamespacePermission(namespace NameSpaceName, role string) error
// GrantSubPermission grants permission to role to access subscription's admin-api
GrantSubPermission(namespace NameSpaceName, sName string, roles []string) error
// RevokeSubPermission revoke permissions on a subscription's admin-api access
RevokeSubPermission(namespace NameSpaceName, sName, role string) error
// SetSubscriptionAuthMode sets the given subscription auth mode on all topics on a namespace
SetSubscriptionAuthMode(namespace NameSpaceName, mode SubscriptionAuthMode) error
// SetEncryptionRequiredStatus sets the encryption required status for all topics within a namespace
SetEncryptionRequiredStatus(namespace NameSpaceName, encrypt bool) error
// UnsubscribeNamespace unsubscribe the given subscription on all topics on a namespace
UnsubscribeNamespace(namespace NameSpaceName, sName string) error
// UnsubscribeNamespaceBundle unsubscribe the given subscription on all topics on a namespace bundle
UnsubscribeNamespaceBundle(namespace NameSpaceName, bundle, sName string) error
// ClearNamespaceBundleBacklogForSubscription clears backlog for a given subscription on all
// topics on a namespace bundle
ClearNamespaceBundleBacklogForSubscription(namespace NameSpaceName, bundle, sName string) error
// ClearNamespaceBundleBacklog clears backlog for all topics on a namespace bundle
ClearNamespaceBundleBacklog(namespace NameSpaceName, bundle string) error
// ClearNamespaceBacklogForSubscription clears backlog for a given subscription on all topics on a namespace
ClearNamespaceBacklogForSubscription(namespace NameSpaceName, sName string) error
// ClearNamespaceBacklog clears backlog for all topics on a namespace
ClearNamespaceBacklog(namespace NameSpaceName) error
// SetReplicatorDispatchRate sets replicator-Message-dispatch-rate (Replicators under this namespace
// can dispatch this many messages per second)
SetReplicatorDispatchRate(namespace NameSpaceName, rate DispatchRate) error
// Get replicator-Message-dispatch-rate (Replicators under this namespace
// can dispatch this many messages per second)
GetReplicatorDispatchRate(namespace NameSpaceName) (DispatchRate, error)
// SetSubscriptionDispatchRate sets subscription-Message-dispatch-rate (subscriptions under this namespace
// can dispatch this many messages per second)
SetSubscriptionDispatchRate(namespace NameSpaceName, rate DispatchRate) error
// GetSubscriptionDispatchRate returns subscription-Message-dispatch-rate (subscriptions under this namespace
// can dispatch this many messages per second)
GetSubscriptionDispatchRate(namespace NameSpaceName) (DispatchRate, error)
// SetSubscribeRate sets namespace-subscribe-rate (topics under this namespace will limit by subscribeRate)
SetSubscribeRate(namespace NameSpaceName, rate SubscribeRate) error
// GetSubscribeRate returns namespace-subscribe-rate (topics under this namespace allow subscribe
// times per consumer in a period)
GetSubscribeRate(namespace NameSpaceName) (SubscribeRate, error)
// SetDispatchRate sets Message-dispatch-rate (topics under this namespace can dispatch
// this many messages per second)
SetDispatchRate(namespace NameSpaceName, rate DispatchRate) error
// GetDispatchRate returns Message-dispatch-rate (topics under this namespace can dispatch
// this many messages per second)
GetDispatchRate(namespace NameSpaceName) (DispatchRate, error)
}
Namespaces is admin interface for namespaces management
type NamespacesData ¶
type NamespacesData struct {
Enable bool `json:"enable"`
Unload bool `json:"unload"`
NumBundles int `json:"numBundles"`
BookkeeperEnsemble int `json:"bookkeeperEnsemble"`
BookkeeperWriteQuorum int `json:"bookkeeperWriteQuorum"`
MessageTTL int `json:"messageTTL"`
BookkeeperAckQuorum int `json:"bookkeeperAckQuorum"`
ManagedLedgerMaxMarkDeleteRate float64 `json:"managedLedgerMaxMarkDeleteRate"`
ClusterIds string `json:"clusterIds"`
RetentionTimeStr string `json:"retentionTimeStr"`
LimitStr string `json:"limitStr"`
PolicyStr string `json:"policyStr"`
AntiAffinityGroup string `json:"antiAffinityGroup"`
Tenant string `json:"tenant"`
Cluster string `json:"cluster"`
Bundle string `json:"bundle"`
Clusters []string `json:"clusters"`
}
type NsIsolationPoliciesData ¶
type NsIsolationPolicy ¶
type NsIsolationPolicy interface {
// Create a namespace isolation policy for a cluster
CreateNamespaceIsolationPolicy(cluster, policyName string, namespaceIsolationData NamespaceIsolationData) error
// Delete a namespace isolation policy for a cluster
DeleteNamespaceIsolationPolicy(cluster, policyName string) error
// Get a single namespace isolation policy for a cluster
GetNamespaceIsolationPolicy(cluster, policyName string) (*NamespaceIsolationData, error)
// Get the namespace isolation policies of a cluster
GetNamespaceIsolationPolicies(cluster string) (map[string]NamespaceIsolationData, error)
// Returns list of active brokers with namespace-isolation policies attached to it.
GetBrokersWithNamespaceIsolationPolicy(cluster string) ([]BrokerNamespaceIsolationData, error)
// Returns active broker with namespace-isolation policies attached to it.
GetBrokerWithNamespaceIsolationPolicy(cluster, broker string) (*BrokerNamespaceIsolationData, error)
}
type OffloadProcessStatus ¶
type PartitionedTopicMetadata ¶
type PartitionedTopicMetadata struct {
Partitions int `json:"partitions"`
}
Topic data
type PartitionedTopicStats ¶
type PartitionedTopicStats struct {
MsgRateIn float64 `json:"msgRateIn"`
MsgRateOut float64 `json:"msgRateOut"`
MsgThroughputIn float64 `json:"msgThroughputIn"`
MsgThroughputOut float64 `json:"msgThroughputOut"`
AverageMsgSize float64 `json:"averageMsgSize"`
StorageSize int64 `json:"storageSize"`
Publishers []PublisherStats `json:"publishers"`
Subscriptions map[string]SubscriptionStats `json:"subscriptions"`
Replication map[string]ReplicatorStats `json:"replication"`
DeDuplicationStatus string `json:"deduplicationStatus"`
Metadata PartitionedTopicMetadata `json:"metadata"`
Partitions map[string]TopicStats `json:"partitions"`
}
type PersistencePolicies ¶
type PersistencePolicies struct {
BookkeeperEnsemble int `json:"bookkeeperEnsemble"`
BookkeeperWriteQuorum int `json:"bookkeeperWriteQuorum"`
BookkeeperAckQuorum int `json:"bookkeeperAckQuorum"`
ManagedLedgerMaxMarkDeleteRate float64 `json:"managedLedgerMaxMarkDeleteRate"`
}
func NewPersistencePolicies ¶
func NewPersistencePolicies(bookkeeperEnsemble, bookkeeperWriteQuorum, bookkeeperAckQuorum int, managedLedgerMaxMarkDeleteRate float64) PersistencePolicies
type PersistentTopicInternalStats ¶
type PersistentTopicInternalStats struct {
WaitingCursorsCount int `json:"waitingCursorsCount"`
PendingAddEntriesCount int `json:"pendingAddEntriesCount"`
EntriesAddedCounter int64 `json:"entriesAddedCounter"`
NumberOfEntries int64 `json:"numberOfEntries"`
TotalSize int64 `json:"totalSize"`
CurrentLedgerEntries int64 `json:"currentLedgerEntries"`
CurrentLedgerSize int64 `json:"currentLedgerSize"`
LastLedgerCreatedTimestamp string `json:"lastLedgerCreatedTimestamp"`
LastLedgerCreationFailureTimestamp string `json:"lastLedgerCreationFailureTimestamp"`
LastConfirmedEntry string `json:"lastConfirmedEntry"`
State string `json:"state"`
Ledgers []LedgerInfo `json:"ledgers"`
Cursors map[string]CursorStats `json:"cursors"`
}
type Policies ¶
type Policies struct {
Bundles *BundlesData `json:"bundles"`
Persistence *PersistencePolicies `json:"persistence"`
RetentionPolicies *RetentionPolicies `json:"retention_policies"`
SchemaValidationEnforced bool `json:"schema_validation_enforced"`
DeduplicationEnabled bool `json:"deduplicationEnabled"`
Deleted bool `json:"deleted"`
EncryptionRequired bool `json:"encryption_required"`
MessageTTLInSeconds int `json:"message_ttl_in_seconds"`
MaxProducersPerTopic int `json:"max_producers_per_topic"`
MaxConsumersPerTopic int `json:"max_consumers_per_topic"`
MaxConsumersPerSubscription int `json:"max_consumers_per_subscription"`
CompactionThreshold int64 `json:"compaction_threshold"`
OffloadThreshold int64 `json:"offload_threshold"`
OffloadDeletionLagMs int64 `json:"offload_deletion_lag_ms"`
AntiAffinityGroup string `json:"antiAffinityGroup"`
ReplicationClusters []string `json:"replication_clusters"`
LatencyStatsSampleRate map[string]int `json:"latency_stats_sample_rate"`
BacklogQuotaMap map[BacklogQuotaType]BacklogQuota `json:"backlog_quota_map"`
TopicDispatchRate map[string]DispatchRate `json:"topicDispatchRate"`
SubscriptionDispatchRate map[string]DispatchRate `json:"subscriptionDispatchRate"`
ReplicatorDispatchRate map[string]DispatchRate `json:"replicatorDispatchRate"`
ClusterSubscribeRate map[string]SubscribeRate `json:"clusterSubscribeRate"`
SchemaCompatibilityStrategy SchemaCompatibilityStrategy `json:"schema_auto_update_compatibility_strategy"`
AuthPolicies AuthPolicies `json:"auth_policies"`
SubscriptionAuthMode SubscriptionAuthMode `json:"subscription_auth_mode"`
}
func NewDefaultPolicies ¶
func NewDefaultPolicies() *Policies
type PoolArenaStats ¶
type PoolArenaStats struct {
NumTinySubpages int `json:"numTinySubpages"`
NumSmallSubpages int `json:"numSmallSubpages"`
NumChunkLists int `json:"numChunkLists"`
TinySubpages []PoolSubpageStats `json:"tinySubpages"`
SmallSubpages []PoolSubpageStats `json:"smallSubpages"`
ChunkLists []PoolChunkListStats `json:"chunkLists"`
NumAllocations int64 `json:"numAllocations"`
NumTinyAllocations int64 `json:"numTinyAllocations"`
NumSmallAllocations int64 `json:"numSmallAllocations"`
NumNormalAllocations int64 `json:"numNormalAllocations"`
NumHugeAllocations int64 `json:"numHugeAllocations"`
NumDeallocations int64 `json:"numDeallocations"`
NumTinyDeallocations int64 `json:"numTinyDeallocations"`
NumSmallDeallocations int64 `json:"numSmallDeallocations"`
NumNormalDeallocations int64 `json:"numNormalDeallocations"`
NumHugeDeallocations int64 `json:"numHugeDeallocations"`
NumActiveAllocations int64 `json:"numActiveAllocations"`
NumActiveTinyAllocations int64 `json:"numActiveTinyAllocations"`
NumActiveSmallAllocations int64 `json:"numActiveSmallAllocations"`
NumActiveNormalAllocations int64 `json:"numActiveNormalAllocations"`
NumActiveHugeAllocations int64 `json:"numActiveHugeAllocations"`
}
type PoolChunkListStats ¶
type PoolChunkListStats struct {
MinUsage int `json:"minUsage"`
MaxUsage int `json:"maxUsage"`
Chunks []PoolChunkStats `json:"chunks"`
}
type PoolChunkStats ¶
type PoolSubpageStats ¶
type PositionInfo ¶
type PostSchemaPayload ¶
type PostSchemaPayload struct {
SchemaType string `json:"type"`
Schema string `json:"schema"`
Properties map[string]string `json:"properties"`
}
Payload with information about a schema
type PublisherStats ¶
type ReplicatorStats ¶
type ReplicatorStats struct {
Connected bool `json:"connected"`
MsgRateIn float64 `json:"msgRateIn"`
MsgRateOut float64 `json:"msgRateOut"`
MsgThroughputIn float64 `json:"msgThroughputIn"`
MsgThroughputOut float64 `json:"msgThroughputOut"`
MsgRateExpired float64 `json:"msgRateExpired"`
ReplicationBacklog int64 `json:"replicationBacklog"`
ReplicationDelayInSeconds int64 `json:"replicationDelayInSeconds"`
InboundConnection string `json:"inboundConnection"`
InboundConnectedSince string `json:"inboundConnectedSince"`
OutboundConnection string `json:"outboundConnection"`
OutboundConnectedSince string `json:"outboundConnectedSince"`
}
type ResourceQuota ¶
type ResourceQuota struct {
// messages published per second
MsgRateIn float64 `json:"msgRateIn"`
// messages consumed per second
MsgRateOut float64 `json:"msgRateOut"`
// incoming bytes per second
BandwidthIn float64 `json:"bandwidthIn"`
// outgoing bytes per second
BandwidthOut float64 `json:"bandwidthOut"`
// used memory in Mbytes
Memory float64 `json:"memory"`
// allow the quota be dynamically re-calculated according to real traffic
Dynamic bool `json:"dynamic"`
}
func NewResourceQuota ¶
func NewResourceQuota() *ResourceQuota
type ResourceQuotaData ¶
type ResourceQuotaData struct {
Names string `json:"names"`
Bundle string `json:"bundle"`
MsgRateIn int64 `json:"msgRateIn"`
MsgRateOut int64 `json:"msgRateOut"`
BandwidthIn int64 `json:"bandwidthIn"`
BandwidthOut int64 `json:"bandwidthOut"`
Memory int64 `json:"memory"`
Dynamic bool `json:"dynamic"`
}
type ResourceQuotas ¶
type ResourceQuotas interface {
// Get default resource quota for new resource bundles.
GetDefaultResourceQuota() (*ResourceQuota, error)
// Set default resource quota for new namespace bundles.
SetDefaultResourceQuota(quota ResourceQuota) error
// Get resource quota of a namespace bundle.
GetNamespaceBundleResourceQuota(namespace, bundle string) (*ResourceQuota, error)
// Set resource quota for a namespace bundle.
SetNamespaceBundleResourceQuota(namespace, bundle string, quota ResourceQuota) error
// Reset resource quota for a namespace bundle to default value.
ResetNamespaceBundleResourceQuota(namespace, bundle string) error
}
type ResourceUsage ¶
func (*ResourceUsage) CompareTo ¶
func (ru *ResourceUsage) CompareTo(o *ResourceUsage) int
func (*ResourceUsage) PercentUsage ¶
func (ru *ResourceUsage) PercentUsage() float32
func (*ResourceUsage) Reset ¶
func (ru *ResourceUsage) Reset()
type Resources ¶
func NewDefaultResources ¶
func NewDefaultResources() *Resources
type RetentionPolicies ¶
type RetentionPolicies struct {
RetentionTimeInMinutes int `json:"retentionTimeInMinutes"`
RetentionSizeInMB int64 `json:"retentionSizeInMB"`
}
func NewRetentionPolicies ¶
func NewRetentionPolicies(retentionTimeInMinutes int, retentionSizeInMB int) RetentionPolicies
type RetentionPolicy ¶
type RetentionPolicy string
const ( ProducerRequestHold RetentionPolicy = "producer_request_hold" ProducerException RetentionPolicy = "producer_exception" ConsumerBacklogEviction RetentionPolicy = "consumer_backlog_eviction" )
type Schema ¶
type Schema interface {
// GetSchemaInfo retrieves the latest schema of a topic
GetSchemaInfo(topic string) (*SchemaInfo, error)
// GetSchemaInfoWithVersion retrieves the latest schema with version of a topic
GetSchemaInfoWithVersion(topic string) (*SchemaInfoWithVersion, error)
// GetSchemaInfoByVersion retrieves the schema of a topic at a given <tt>version</tt>
GetSchemaInfoByVersion(topic string, version int64) (*SchemaInfo, error)
// DeleteSchema deletes the schema associated with a given <tt>topic</tt>
DeleteSchema(topic string) error
// CreateSchemaByPayload creates a schema for a given <tt>topic</tt>
CreateSchemaByPayload(topic string, schemaPayload PostSchemaPayload) error
}
Schema is admin interface for schema management
type SchemaCompatibilityStrategy ¶
type SchemaCompatibilityStrategy string
const ( AutoUpdateDisabled SchemaCompatibilityStrategy = "AutoUpdateDisabled" Backward SchemaCompatibilityStrategy = "Backward" Forward SchemaCompatibilityStrategy = "Forward" Full SchemaCompatibilityStrategy = "Full" AlwaysCompatible SchemaCompatibilityStrategy = "AlwaysCompatible" BackwardTransitive SchemaCompatibilityStrategy = "BackwardTransitive" ForwardTransitive SchemaCompatibilityStrategy = "ForwardTransitive" FullTransitive SchemaCompatibilityStrategy = "FullTransitive" )
func ParseSchemaAutoUpdateCompatibilityStrategy ¶
func ParseSchemaAutoUpdateCompatibilityStrategy(str string) (SchemaCompatibilityStrategy, error)
func (SchemaCompatibilityStrategy) String ¶
func (s SchemaCompatibilityStrategy) String() string
type SchemaData ¶
type SchemaInfo ¶
type SchemaInfoWithVersion ¶
type SchemaInfoWithVersion struct {
Version int64 `json:"version"`
SchemaInfo *SchemaInfo `json:"schemaInfo"`
}
type SingleMessageMetadata ¶
type SingleMessageMetadata struct {
Properties []*KeyValue `protobuf:"bytes,1,rep,name=properties" json:"properties,omitempty"`
PartitionKey *string `protobuf:"bytes,2,opt,name=partition_key,json=partitionKey" json:"partition_key,omitempty"`
PayloadSize *int32 `protobuf:"varint,3,req,name=payload_size,json=payloadSize" json:"payload_size,omitempty"`
CompactedOut *bool `protobuf:"varint,4,opt,name=compacted_out,json=compactedOut,def=0" json:"compacted_out,omitempty"`
// the timestamp that this event occurs. it is typically set by applications.
// if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
EventTime *uint64 `protobuf:"varint,5,opt,name=event_time,json=eventTime,def=0" json:"event_time,omitempty"`
PartitionKeyB64Encoded *bool `` /* 131-byte string literal not displayed */
// Specific a key to overwrite the message key which used for ordering dispatch in Key_Shared mode.
OrderingKey []byte `protobuf:"bytes,7,opt,name=ordering_key,json=orderingKey" json:"ordering_key,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
nolint
func (*SingleMessageMetadata) GetPayloadSize ¶
func (m *SingleMessageMetadata) GetPayloadSize() int32
func (*SingleMessageMetadata) ProtoMessage ¶
func (*SingleMessageMetadata) ProtoMessage()
func (*SingleMessageMetadata) Reset ¶
func (m *SingleMessageMetadata) Reset()
func (*SingleMessageMetadata) String ¶
func (m *SingleMessageMetadata) String() string
type SinkConfig ¶
type SinkConfig struct {
TopicsPattern *string `json:"topicsPattern" yaml:"topicsPattern"`
Resources *Resources `json:"resources" yaml:"resources"`
TimeoutMs *int64 `json:"timeoutMs" yaml:"timeoutMs"`
// Whether the subscriptions the functions created/used should be deleted when the functions is deleted
CleanupSubscription bool `json:"cleanupSubscription" yaml:"cleanupSubscription"`
RetainOrdering bool `json:"retainOrdering" yaml:"retainOrdering"`
AutoAck bool `json:"autoAck" yaml:"autoAck"`
Parallelism int `json:"parallelism" yaml:"parallelism"`
Tenant string `json:"tenant" yaml:"tenant"`
Namespace string `json:"namespace" yaml:"namespace"`
Name string `json:"name" yaml:"name"`
ClassName string `json:"className" yaml:"className"`
Archive string `json:"archive" yaml:"archive"`
ProcessingGuarantees string `json:"processingGuarantees" yaml:"processingGuarantees"`
SourceSubscriptionName string `json:"sourceSubscriptionName" yaml:"sourceSubscriptionName"`
RuntimeFlags string `json:"runtimeFlags" yaml:"runtimeFlags"`
Inputs []string `json:"inputs" yaml:"inputs"`
TopicToSerdeClassName map[string]string `json:"topicToSerdeClassName" yaml:"topicToSerdeClassName"`
TopicToSchemaType map[string]string `json:"topicToSchemaType" yaml:"topicToSchemaType"`
InputSpecs map[string]ConsumerConfig `json:"inputSpecs" yaml:"inputSpecs"`
Configs map[string]interface{} `json:"configs" yaml:"configs"`
// This is a map of secretName(aka how the secret is going to be
// accessed in the function via context) to an object that
// encapsulates how the secret is fetched by the underlying
// secrets provider. The type of an value here can be found by the
// SecretProviderConfigurator.getSecretObjectType() method.
Secrets map[string]interface{} `json:"secrets" yaml:"secrets"`
}
type SinkData ¶
type SinkData struct {
UpdateAuthData bool `json:"updateAuthData"`
RetainOrdering bool `json:"retainOrdering"`
AutoAck bool `json:"autoAck"`
Parallelism int `json:"parallelism"`
RAM int64 `json:"ram"`
Disk int64 `json:"disk"`
TimeoutMs int64 `json:"timeoutMs"`
CPU float64 `json:"cpu"`
Tenant string `json:"tenant"`
Namespace string `json:"namespace"`
Name string `json:"name"`
SinkType string `json:"sinkType"`
Inputs string `json:"inputs"`
TopicsPattern string `json:"topicsPattern"`
SubsName string `json:"subsName"`
CustomSerdeInputString string `json:"customSerdeInputString"`
CustomSchemaInputString string `json:"customSchemaInputString"`
ProcessingGuarantees string `json:"processingGuarantees"`
Archive string `json:"archive"`
ClassName string `json:"className"`
SinkConfigFile string `json:"sinkConfigFile"`
SinkConfigString string `json:"sinkConfigString"`
InstanceID string `json:"instanceId"`
SinkConf *SinkConfig `json:"-"`
}
type SinkInstanceStatus ¶
type SinkInstanceStatus struct {
InstanceID int `json:"instanceId"`
Status SourceInstanceStatusData `json:"status"`
}
type SinkInstanceStatusData ¶
type SinkInstanceStatusData struct {
// Is this instance running?
Running bool `json:"running"`
// Do we have any error while running this instance
Err string `json:"error"`
// Number of times this instance has restarted
NumRestarts int64 `json:"numRestarts"`
// Number of messages read from Pulsar
NumReadFromPulsar int64 `json:"numReadFromPulsar"`
// Number of times there was a system exception handling messages
NumSystemExceptions int64 `json:"numSystemExceptions"`
// A list of the most recent system exceptions
LatestSystemExceptions []ExceptionInformation `json:"latestSystemExceptions"`
// Number of times there was a sink exception
NumSinkExceptions int64 `json:"numSinkExceptions"`
// A list of the most recent sink exceptions
LatestSinkExceptions []ExceptionInformation `json:"latestSinkExceptions"`
// Number of messages written to sink
NumWrittenToSink int64 `json:"numWrittenToSink"`
// When was the last time we received a Message from Pulsar
LastReceivedTime int64 `json:"lastReceivedTime"`
WorkerID string `json:"workerId"`
}
type SinkStatus ¶
type SinkStatus struct {
// The total number of sink instances that ought to be running
NumInstances int `json:"numInstances"`
// The number of source instances that are actually running
NumRunning int `json:"numRunning"`
Instances []*SinkInstanceStatus `json:"instances"`
}
type Sinks ¶
type Sinks interface {
// ListSinks returns the list of all the Pulsar Sinks.
ListSinks(tenant, namespace string) ([]string, error)
// GetSink returns the configuration for the specified sink
GetSink(tenant, namespace, Sink string) (SinkConfig, error)
// CreateSink creates a new sink
CreateSink(config *SinkConfig, fileName string) error
// CreateSinkWithURL creates a new sink by providing url from which fun-pkg can be downloaded. supported url: http/file
CreateSinkWithURL(config *SinkConfig, pkgURL string) error
// UpdateSink updates the configuration for a sink.
UpdateSink(config *SinkConfig, fileName string, options *UpdateOptions) error
// UpdateSinkWithURL updates a sink by providing url from which fun-pkg can be downloaded. supported url: http/file
UpdateSinkWithURL(config *SinkConfig, pkgURL string, options *UpdateOptions) error
// DeleteSink deletes an existing sink
DeleteSink(tenant, namespace, Sink string) error
// GetSinkStatus returns the current status of a sink.
GetSinkStatus(tenant, namespace, Sink string) (SinkStatus, error)
// GetSinkStatusWithID returns the current status of a sink instance.
GetSinkStatusWithID(tenant, namespace, Sink string, id int) (SinkInstanceStatusData, error)
// RestartSink restarts all sink instances
RestartSink(tenant, namespace, Sink string) error
// RestartSinkWithID restarts sink instance
RestartSinkWithID(tenant, namespace, Sink string, id int) error
// StopSink stops all sink instances
StopSink(tenant, namespace, Sink string) error
// StopSinkWithID stops sink instance
StopSinkWithID(tenant, namespace, Sink string, id int) error
// StartSink starts all sink instances
StartSink(tenant, namespace, Sink string) error
// StartSinkWithID starts sink instance
StartSinkWithID(tenant, namespace, Sink string, id int) error
// GetBuiltInSinks fetches a list of supported Pulsar IO sinks currently running in cluster mode
GetBuiltInSinks() ([]*ConnectorDefinition, error)
// ReloadBuiltInSinks reload the available built-in connectors, include Source and Sink
ReloadBuiltInSinks() error
}
Sinks is admin interface for sinks management
type SourceConfig ¶
type SourceConfig struct {
Tenant string `json:"tenant" yaml:"tenant"`
Namespace string `json:"namespace" yaml:"namespace"`
Name string `json:"name" yaml:"name"`
ClassName string `json:"className" yaml:"className"`
TopicName string `json:"topicName" yaml:"topicName"`
SerdeClassName string `json:"serdeClassName" yaml:"serdeClassName"`
SchemaType string `json:"schemaType" yaml:"schemaType"`
Configs map[string]interface{} `json:"configs" yaml:"configs"`
// This is a map of secretName(aka how the secret is going to be
// accessed in the function via context) to an object that
// encapsulates how the secret is fetched by the underlying
// secrets provider. The type of an value here can be found by the
// SecretProviderConfigurator.getSecretObjectType() method.
Secrets map[string]interface{} `json:"secrets" yaml:"secrets"`
Parallelism int `json:"parallelism" yaml:"parallelism"`
ProcessingGuarantees string `json:"processingGuarantees" yaml:"processingGuarantees"`
Resources *Resources `json:"resources" yaml:"resources"`
Archive string `json:"archive" yaml:"archive"`
// Any flags that you want to pass to the runtime.
RuntimeFlags string `json:"runtimeFlags" yaml:"runtimeFlags"`
}
type SourceData ¶
type SourceData struct {
Tenant string `json:"tenant"`
Namespace string `json:"namespace"`
Name string `json:"name"`
SourceType string `json:"sourceType"`
ProcessingGuarantees string `json:"processingGuarantees"`
DestinationTopicName string `json:"destinationTopicName"`
DeserializationClassName string `json:"deserializationClassName"`
SchemaType string `json:"schemaType"`
Parallelism int `json:"parallelism"`
Archive string `json:"archive"`
ClassName string `json:"className"`
SourceConfigFile string `json:"sourceConfigFile"`
CPU float64 `json:"cpu"`
RAM int64 `json:"ram"`
Disk int64 `json:"disk"`
SourceConfigString string `json:"sourceConfigString"`
SourceConf *SourceConfig `json:"-"`
InstanceID string `json:"instanceId"`
UpdateAuthData bool `json:"updateAuthData"`
}
type SourceInstanceStatus ¶
type SourceInstanceStatus struct {
InstanceID int `json:"instanceId"`
Status SourceInstanceStatusData `json:"status"`
}
type SourceInstanceStatusData ¶
type SourceInstanceStatusData struct {
Running bool `json:"running"`
Err string `json:"error"`
NumRestarts int64 `json:"numRestarts"`
NumReceivedFromSource int64 `json:"numReceivedFromSource"`
NumSystemExceptions int64 `json:"numSystemExceptions"`
LatestSystemExceptions []ExceptionInformation `json:"latestSystemExceptions"`
NumSourceExceptions int64 `json:"numSourceExceptions"`
LatestSourceExceptions []ExceptionInformation `json:"latestSourceExceptions"`
NumWritten int64 `json:"numWritten"`
LastReceivedTime int64 `json:"lastReceivedTime"`
WorkerID string `json:"workerId"`
}
type SourceStatus ¶
type SourceStatus struct {
NumInstances int `json:"numInstances"`
NumRunning int `json:"numRunning"`
Instances []*SourceInstanceStatus `json:"instances"`
}
type Sources ¶
type Sources interface {
// ListSources returns the list of all the Pulsar Sources.
ListSources(tenant, namespace string) ([]string, error)
// GetSource return the configuration for the specified source
GetSource(tenant, namespace, source string) (SourceConfig, error)
// CreateSource creates a new source
CreateSource(config *SourceConfig, fileName string) error
// CreateSourceWithURL creates a new source by providing url from which fun-pkg can be downloaded.
// supported url: http/file
CreateSourceWithURL(config *SourceConfig, pkgURL string) error
// UpdateSource updates the configuration for a source.
UpdateSource(config *SourceConfig, fileName string, options *UpdateOptions) error
// UpdateSourceWithURL updates a source by providing url from which fun-pkg can be downloaded. supported url: http/file
UpdateSourceWithURL(config *SourceConfig, pkgURL string, options *UpdateOptions) error
// DeleteSource deletes an existing source
DeleteSource(tenant, namespace, source string) error
// GetSourceStatus returns the current status of a source.
GetSourceStatus(tenant, namespace, source string) (SourceStatus, error)
// GetSourceStatusWithID returns the current status of a source instance.
GetSourceStatusWithID(tenant, namespace, source string, id int) (SourceInstanceStatusData, error)
// RestartSource restarts all source instances
RestartSource(tenant, namespace, source string) error
// RestartSourceWithID restarts source instance
RestartSourceWithID(tenant, namespace, source string, id int) error
// StopSource stops all source instances
StopSource(tenant, namespace, source string) error
// StopSourceWithID stops source instance
StopSourceWithID(tenant, namespace, source string, id int) error
// StartSource starts all source instances
StartSource(tenant, namespace, source string) error
// StartSourceWithID starts source instance
StartSourceWithID(tenant, namespace, source string, id int) error
// GetBuiltInSources fetches a list of supported Pulsar IO sources currently running in cluster mode
GetBuiltInSources() ([]*ConnectorDefinition, error)
// ReloadBuiltInSources reloads the available built-in connectors, include Source and Sink
ReloadBuiltInSources() error
}
Sources is admin interface for sources management
type SubscribeRate ¶
type SubscribeRate struct {
SubscribeThrottlingRatePerConsumer int `json:"subscribeThrottlingRatePerConsumer"`
RatePeriodInSecond int `json:"ratePeriodInSecond"`
}
func NewSubscribeRate ¶
func NewSubscribeRate() *SubscribeRate
type SubscriptionAuthMode ¶
type SubscriptionAuthMode string
const ( None SubscriptionAuthMode = "None" Prefix SubscriptionAuthMode = "Prefix" )
func ParseSubscriptionAuthMode ¶
func ParseSubscriptionAuthMode(s string) (SubscriptionAuthMode, error)
func (SubscriptionAuthMode) String ¶
func (s SubscriptionAuthMode) String() string
type SubscriptionStats ¶
type SubscriptionStats struct {
BlockedSubscriptionOnUnackedMsgs bool `json:"blockedSubscriptionOnUnackedMsgs"`
IsReplicated bool `json:"isReplicated"`
MsgRateOut float64 `json:"msgRateOut"`
MsgThroughputOut float64 `json:"msgThroughputOut"`
MsgRateRedeliver float64 `json:"msgRateRedeliver"`
MsgRateExpired float64 `json:"msgRateExpired"`
MsgBacklog int64 `json:"msgBacklog"`
MsgDelayed int64 `json:"msgDelayed"`
UnAckedMessages int64 `json:"unackedMessages"`
SubType string `json:"type"`
ActiveConsumerName string `json:"activeConsumerName"`
Consumers []ConsumerStats `json:"consumers"`
}
type Subscriptions ¶
type Subscriptions interface {
// Create a new subscription on a topic
Create(TopicName, string, MessageID) error
// Delete a subscription.
// Delete a persistent subscription from a topic. There should not be any active consumers on the subscription
Delete(TopicName, string) error
// List returns the list of subscriptions
List(TopicName) ([]string, error)
// ResetCursorToMessageID resets cursor position on a topic subscription
// @param
// messageID reset subscription to messageId (or previous nearest messageId if given messageId is not valid)
ResetCursorToMessageID(TopicName, string, MessageID) error
// ResetCursorToTimestamp resets cursor position on a topic subscription
// @param
// time reset subscription to position closest to time in ms since epoch
ResetCursorToTimestamp(TopicName, string, int64) error
// ClearBacklog skips all messages on a topic subscription
ClearBacklog(TopicName, string) error
// SkipMessages skips messages on a topic subscription
SkipMessages(TopicName, string, int64) error
// ExpireMessages expires all messages older than given N (expireTimeInSeconds) seconds for a given subscription
ExpireMessages(TopicName, string, int64) error
// ExpireAllMessages expires all messages older than given N (expireTimeInSeconds) seconds for all
// subscriptions of the persistent-topic
ExpireAllMessages(TopicName, int64) error
// PeekMessages peeks messages from a topic subscription
PeekMessages(TopicName, string, int) ([]*Message, error)
}
Subscriptions is admin interface for subscriptions management
type TLSOptions ¶
type TenantData ¶
type TenantData struct {
Name string `json:"-"`
AdminRoles []string `json:"adminRoles"`
AllowedClusters []string `json:"allowedClusters"`
}
Tenant args
type Tenants ¶
type Tenants interface {
// Create a new tenant
Create(TenantData) error
// Delete an existing tenant
Delete(string) error
// Update the admins for a tenant
Update(TenantData) error
//List returns the list of tenants
List() ([]string, error)
// Get returns the config of the tenant.
Get(string) (TenantData, error)
}
Tenants is admin interface for tenants management
type TopicDomain ¶
type TopicDomain string
func ParseTopicDomain ¶
func ParseTopicDomain(domain string) (TopicDomain, error)
func (TopicDomain) String ¶
func (t TopicDomain) String() string
type TopicName ¶
type TopicName struct {
// contains filtered or unexported fields
}
func GetTopicName ¶
The topic name can be in two different forms, one is fully qualified topic name, the other one is short topic name
func (*TopicName) GetDomain ¶
func (t *TopicName) GetDomain() TopicDomain
func (*TopicName) GetEncodedTopic ¶
func (*TopicName) GetLocalName ¶
func (*TopicName) GetRestPath ¶
func (*TopicName) IsPersistent ¶
type TopicStats ¶
type TopicStats struct {
MsgRateIn float64 `json:"msgRateIn"`
MsgRateOut float64 `json:"msgRateOut"`
MsgThroughputIn float64 `json:"msgThroughputIn"`
MsgThroughputOut float64 `json:"msgThroughputOut"`
AverageMsgSize float64 `json:"averageMsgSize"`
StorageSize int64 `json:"storageSize"`
Publishers []PublisherStats `json:"publishers"`
Subscriptions map[string]SubscriptionStats `json:"subscriptions"`
Replication map[string]ReplicatorStats `json:"replication"`
DeDuplicationStatus string `json:"deduplicationStatus"`
}
type TopicStatsStream ¶
type TopicStatsStream struct {
TopicsMap map[string]map[string]map[string]TopicStats `json:"topicStatsBuf"`
}
type Topics ¶
type Topics interface {
// Create a topic
Create(TopicName, int) error
// Delete a topic
Delete(TopicName, bool, bool) error
// Update number of partitions of a non-global partitioned topic
// It requires partitioned-topic to be already exist and number of new partitions must be greater than existing
// number of partitions. Decrementing number of partitions requires deletion of topic which is not supported.
Update(TopicName, int) error
// GetMetadata returns metadata of a partitioned topic
GetMetadata(TopicName) (PartitionedTopicMetadata, error)
// List returns the list of topics under a namespace
List(NameSpaceName) ([]string, []string, error)
// GetInternalInfo returns the internal metadata info for the topic
GetInternalInfo(TopicName) (ManagedLedgerInfo, error)
// GetPermissions returns permissions on a topic
// Retrieve the effective permissions for a topic. These permissions are defined by the permissions set at the
// namespace level combined (union) with any eventual specific permission set on the topic.
GetPermissions(TopicName) (map[string][]AuthAction, error)
// GrantPermission grants a new permission to a client role on a single topic
GrantPermission(TopicName, string, []AuthAction) error
// RevokePermission revokes permissions to a client role on a single topic. If the permission
// was not set at the topic level, but rather at the namespace level, this operation will
// return an error (HTTP status code 412).
RevokePermission(TopicName, string) error
// Lookup a topic returns the broker URL that serves the topic
Lookup(TopicName) (LookupData, error)
// GetBundleRange returns a bundle range of a topic
GetBundleRange(TopicName) (string, error)
// GetLastMessageID returns the last commit message Id of a topic
GetLastMessageID(TopicName) (MessageID, error)
// GetStats returns the stats for the topic
// All the rates are computed over a 1 minute window and are relative the last completed 1 minute period
GetStats(TopicName) (TopicStats, error)
// GetInternalStats returns the internal stats for the topic.
GetInternalStats(TopicName) (PersistentTopicInternalStats, error)
// GetPartitionedStats returns the stats for the partitioned topic
// All the rates are computed over a 1 minute window and are relative the last completed 1 minute period
GetPartitionedStats(TopicName, bool) (PartitionedTopicStats, error)
// Terminate the topic and prevent any more messages being published on it
Terminate(TopicName) (MessageID, error)
// Offload triggers offloading messages in topic to longterm storage
Offload(TopicName, MessageID) error
// OffloadStatus checks the status of an ongoing offloading operation for a topic
OffloadStatus(TopicName) (OffloadProcessStatus, error)
// Unload a topic
Unload(TopicName) error
// Compact triggers compaction to run for a topic. A single topic can only have one instance of compaction
// running at any time. Any attempt to trigger another will be met with a ConflictException.
Compact(TopicName) error
// CompactStatus checks the status of an ongoing compaction for a topic
CompactStatus(TopicName) (LongRunningProcessStatus, error)
}
Topics is admin interface for topics management
type UpdateOptions ¶
type UpdateOptions struct {
UpdateAuthData bool
}
Options while updating the sink
func NewUpdateOptions ¶
func NewUpdateOptions() *UpdateOptions
type WindowConfig ¶
type WindowConfig struct {
WindowLengthCount int
WindowLengthDurationMs int64
SlidingIntervalCount int
SlidingIntervalDurationMs int64
LateDataTopic string
MaxLagMs int64
WatermarkEmitIntervalMs int64
TimestampExtractorClassName string
ActualWindowFunctionClassName string
}
func NewDefaultWindowConfing ¶
func NewDefaultWindowConfing() *WindowConfig
type WorkerFunctionInstanceStats ¶
type WorkerFunctionInstanceStats struct {
Name string `json:"name"`
Metrics FunctionInstanceStatsData `json:"metrics"`
}
type WorkerInfo ¶
Source Files
¶
- admin.go
- allocator_stats.go
- api_version.go
- auth_action.go
- auth_polices.go
- backlog_quota.go
- broker_ns_isolation_data.go
- broker_stats.go
- brokers.go
- bundles_data.go
- cluster.go
- connector_definition.go
- consumer_config.go
- data.go
- descriptions.go
- dispatch_rate.go
- errors.go
- function_confg.go
- function_state.go
- function_status.go
- functions.go
- functions_stats.go
- functions_worker.go
- internal_configuration_data.go
- load_manage_report.go
- long_running_process_status.go
- message.go
- message_id.go
- metrics.go
- namespace.go
- namespace_name.go
- ns_isolation_data.go
- ns_isolation_policy.go
- ns_ownership_status.go
- persistence_policies.go
- policies.go
- resource_quota.go
- resource_quotas.go
- resources.go
- retention_policies.go
- schema.go
- schema_strategy.go
- schema_util.go
- sinkConfig.go
- sink_status.go
- sinks.go
- source_config.go
- source_status.go
- sources.go
- subscription.go
- subscription_auth_mode.go
- tenant.go
- topic.go
- topic_domain.go
- topic_name.go
- topics_stats_stream.go
- update_options.go
- utils.go
- window_confing.go
- worker_info.go