product

package
v2.0.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2025 License: Apache-2.0 Imports: 9 Imported by: 5

Documentation

Index

Examples

Constants

View Source
const (
	SnapshotAPI          = "$GVT.%s.API.SNAPSHOT"
	SnapshotEventSubject = "$GVT.%s.SS.%s.>"
)
View Source
const (
	ProductAPI = "$GVT.%s.API.PRODUCT"
)
View Source
const (
	ProductEventStream = "GVT_%s_DP_%s"
)

Variables

View Source
var (
	ErrProductNotFound      = errors.New("product not found")
	ErrProductExistsAlready = errors.New("product exists already")
	ErrInvalidProductName   = errors.New("invalid product name")
)
View Source
var (
	ErrSnapshotNotReady = errors.New("snapshot view is not ready")
)

Functions

func NotFoundSnapshotViewErr

func NotFoundSnapshotViewErr() *core.Error

Types

type CreateProductReply

type CreateProductReply struct {
	core.ErrorReply
	Setting *ProductSetting `json:"setting"`
}

type CreateProductRequest

type CreateProductRequest struct {
	Setting *ProductSetting `json:"setting"`
}

type CreateSnapshotViewReply

type CreateSnapshotViewReply struct {
	core.ErrorReply

	ID         string    `json:"id"`
	Subscriber string    `json:"subscriber"`
	Product    string    `json:"product"`
	Stream     string    `json:"stream"`
	CreatedAt  time.Time `json:"createAt"`
}

type CreateSnapshotViewRequest

type CreateSnapshotViewRequest struct {
	Subscriber string  `json:"subscriber"`
	Product    string  `json:"product"`
	Partitions []int32 `json:"partitions"`
}

type DeleteProductReply

type DeleteProductReply struct {
	core.ErrorReply
}

type DeleteProductRequest

type DeleteProductRequest struct {
	Name string `json:"name"`
}

type DeleteSnapshotViewReply

type DeleteSnapshotViewReply struct {
	core.ErrorReply

	ID string `json:"id"`
}

type DeleteSnapshotViewRequest

type DeleteSnapshotViewRequest struct {
	ID string `json:"id"`
}

type DeleteSubscriptionReply

type DeleteSubscriptionReply struct {
	core.ErrorReply
}

type DeleteSubscriptionRequest

type DeleteSubscriptionRequest struct {
	Product      string `json:"product"`
	Subscription string `json:"subscription"`
}

type GetSubscriptionReply

type GetSubscriptionReply struct {
	core.ErrorReply
	Setting *subscription.SubscriptionSetting `json:"setting"`
}

type GetSubscriptionRequest

type GetSubscriptionRequest struct {
	Product      string `json:"product"`
	Subscription string `json:"subscription"`
}

type HandlerConfig

type HandlerConfig struct {
	Type   string `json:"type"`
	Script string `json:"script"`
}

type InfoProductReply

type InfoProductReply struct {
	core.ErrorReply
	Setting *ProductSetting `json:"setting"`
	State   *ProductState   `json:"state"`
}

type InfoProductRequest

type InfoProductRequest struct {
	Name string `json:"name"`
}

type ListProductsReply

type ListProductsReply struct {
	core.ErrorReply

	Products []*ProductInfo `json:"products"`
}

type ListProductsRequest

type ListProductsRequest struct {
}

type Options

type Options struct {
	Domain  string // Domain specifies the target domain or environment the options apply to.
	Verbose bool   // Verbose indicates whether verbose or detailed logging should be enabled.
}

func NewOptions

func NewOptions() *Options

type PrepareSubscriptionReply

type PrepareSubscriptionReply struct {
	core.ErrorReply
}

type PrepareSubscriptionRequest

type PrepareSubscriptionRequest struct {
	Product   string                          `json:"product"`
	Consumers []*subscription.ConsumerSetting `json:"consumers"`
}

type ProductClient

type ProductClient struct {
	// contains filtered or unexported fields
}

func NewProductClient

func NewProductClient(client *core.Client, options *Options) *ProductClient

func (*ProductClient) CreateProduct

func (pc *ProductClient) CreateProduct(productSetting *ProductSetting) (*ProductSetting, error)

CreateProduct creates a new product with the specified settings. It returns a pointer to the newly created ProductSetting or an error if the creation fails.

Example

ExampleProductClient_CreateProduct demonstrates the creation of a new data product using the Gravity SDK. This example first establishes a connection to the Gravity service using the client and options. It then initializes a product client with a specified domain. After setting up the client, the example creates a new data product with specified settings, such as name and description. Finally, it prints the name of the newly created product. Note: Replace '0.0.0.0:32803' with your actual Gravity service address.

client := core.NewClient()

// Connect to Gravity
options := core.NewOptions()
err := client.Connect("0.0.0.0:32803", options)
if err != nil {
	panic(err)
}

// Initializing data product client
pcOpts := NewOptions()
pcOpts.Domain = "default"
productClient := NewProductClient(client, pcOpts)

// Create a new data product
ps, err := productClient.CreateProduct(&ProductSetting{
	Name:        "sdk_example",
	Description: "SDK Example",
	Enabled:     false,
	Stream:      "",
})
if err != nil {
	panic(err)
}

fmt.Println(ps.Name)
Output:

sdk_example

func (*ProductClient) CreateSnapshot

func (pc *ProductClient) CreateSnapshot(productName string, opts ...SnapshotOpt) (*Snapshot, error)

CreateSnapshot creates a snapshot of a product identified by productName. Additional options can be passed using SnapshotOpt variadic parameters. It returns a pointer to the created Snapshot or an error if the creation fails.

func (*ProductClient) DeleteProduct

func (pc *ProductClient) DeleteProduct(name string) error

DeleteProduct removes the product identified by the given name. It returns an error if the deletion fails.

Example

ExampleProductClient_DeleteProduct demonstrates how to delete a data product using the Gravity SDK. This example first establishes a connection to the Gravity service using the client and options. It then initializes a product client for the specified domain. The example proceeds to delete a data product named "sdk_example". Finally, it prints a confirmation message upon successful deletion. Note: Replace '0.0.0.0:32803' with your actual Gravity service address. Ensure that the product "sdk_example" exists before running this example.

client := core.NewClient()

// Connect to Gravity
options := core.NewOptions()
err := client.Connect("0.0.0.0:32803", options)
if err != nil {
	panic(err)
}

// Initializing data product client
pcOpts := NewOptions()
pcOpts.Domain = "default"
productClient := NewProductClient(client, pcOpts)

err = productClient.DeleteProduct("sdk_example")
if err != nil {
	panic(err)
}

fmt.Println("Deleted")
Output:

Deleted

func (*ProductClient) GetProduct

func (pc *ProductClient) GetProduct(name string) (*ProductInfo, error)

GetProduct retrieves the information of a product identified by the given name. It returns a pointer to ProductInfo or an error if the retrieval fails.

Example

ExampleProductClient_GetProduct demonstrates how to retrieve information about a specific data product using the Gravity SDK. This example begins by establishing a connection to the Gravity service using a client and options. A product client is then initialized with a domain specification. The example proceeds to retrieve information for a data product named "sdk_example". Upon successful retrieval, it prints the name of the product. Note: Replace '0.0.0.0:32803' with your actual Gravity service address. Ensure that the product "sdk_example" exists before running this example.

client := core.NewClient()

// Connect to Gravity
options := core.NewOptions()
err := client.Connect("0.0.0.0:32803", options)
if err != nil {
	panic(err)
}

// Initializing data product client
pcOpts := NewOptions()
pcOpts.Domain = "default"
productClient := NewProductClient(client, pcOpts)

// Get product information
info, err := productClient.GetProduct("sdk_example")
if err != nil {
	panic(err)
}

fmt.Println(info.Setting.Name)
Output:

sdk_example

func (*ProductClient) ListProducts

func (pc *ProductClient) ListProducts() ([]*ProductInfo, error)

ListProducts returns a list of all available products. It returns a slice of ProductInfo pointers or an error if the listing fails.

Example

ExampleProductClient_ListProducts demonstrates how to list all products using the Gravity SDK. This example initializes a Gravity SDK client, creates a product client, and then lists all products. It finally prints the name and description of each product.

client := core.NewClient()

// Connect to Gravity
options := core.NewOptions()
err := client.Connect("0.0.0.0:32803", options)
if err != nil {
	panic(err)
}

// Initializing data product client
pcOpts := NewOptions()
pcOpts.Domain = "default"
productClient := NewProductClient(client, pcOpts)

// Get product list
products, err := productClient.ListProducts()
if err != nil {
	panic(err)
}

for _, p := range products {

	if p.Setting.Name != "sdk_example" {
		continue
	}

	// Data product information
	fmt.Println("Name:", p.Setting.Name)
	fmt.Println("Is enabled:", p.Setting.Enabled)
}
Output:

Name: sdk_example
Is enabled: false

func (*ProductClient) PurgeProduct

func (pc *ProductClient) PurgeProduct(name string) error

PurgeProduct purge a data product without deleting it. It returns an error if the purge operation fails.

func (*ProductClient) UpdateProduct

func (pc *ProductClient) UpdateProduct(name string, productSetting *ProductSetting) (*ProductSetting, error)

UpdateProduct updates the settings of an existing product identified by name. It returns the updated ProductSetting or an error if the update fails.

type ProductInfo

type ProductInfo struct {
	Setting *ProductSetting `json:"setting"` // The configuration settings of the product.
	State   *ProductState   `json:"state"`   // The current operational state of the product.
}

ProductInfo encapsulates both the settings and the current state of a data product.

type ProductSetting

type ProductSetting struct {
	Name            string                 `json:"name"`            // A unique identifier for the product.
	Description     string                 `json:"desc"`            // A brief description of the product.
	Enabled         bool                   `json:"enabled"`         // Flag indicating whether the product is active or not.
	Rules           map[string]*Rule       `json:"rules"`           // A map of event handling rules associated with the product.
	Schema          map[string]interface{} `json:"schema"`          // The data schema defining the structure of the product's data.
	EnabledSnapshot bool                   `json:"enabledSnapshot"` // Flag indicating whether snapshots are enabled for the product.
	Snapshot        *SnapshotSetting       `json:"snapshot"`        // Configuration settings for product snapshots.
	Stream          string                 `json:"stream"`          // The name of the data stream associated with the product.
	CreatedAt       time.Time              `json:"createdAt"`       // Timestamp indicating when the product was created.
	UpdatedAt       time.Time              `json:"updatedAt"`       // Timestamp indicating the last update to the product.
}

ProductSetting defines the structure of a Data Product setting. It includes information such as name, description, Schema, and event handling rules.

type ProductState

type ProductState struct {
	EventCount uint64    `json:"eventCount"` // The total number of events processed by the product.
	Bytes      uint64    `json:"bytes"`      // The total number of bytes processed by the product.
	FirstTime  time.Time `json:"firstTime"`  // Timestamp of the first event processed by the product.
	LastTime   time.Time `json:"lastTime"`   // Timestamp of the most recent event processed by the product.
}

ProductState represents the current state of a data product.

type PullSnapshotViewReply

type PullSnapshotViewReply struct {
	core.ErrorReply

	ID      string `json:"id"`
	LastKey string `json:"lastKey"`
	Count   int    `json:"count"`
}

type PullSnapshotViewRequest

type PullSnapshotViewRequest struct {
	ID           string `json:"id"`
	Partition    int32  `json:"partition"`
	LastKey      string `json:"lastKey"`
	AfterLastKey bool   `json:"afterLastKey"`
}

type PurgeProductReply

type PurgeProductReply struct {
	core.ErrorReply
}

type PurgeProductRequest

type PurgeProductRequest struct {
	Name string `json:"name"`
}

type Rule

type Rule struct {
	ID            string                 `json:"id"`                // A unique identifier for the rule.
	Name          string                 `json:"name"`              // A human-readable name for the rule.
	Description   string                 `json:"desc"`              // A brief description of what the rule does.
	Event         string                 `json:"event"`             // The type of event that triggers this rule.
	Product       string                 `json:"product"`           // The name of the product associated with this rule.
	Method        string                 `json:"method"`            // The method or action to be performed when the rule is triggered.
	PrimaryKey    []string               `json:"primaryKey"`        // A list of fields used as the primary key in the rule processing.
	SchemaConfig  map[string]interface{} `json:"schema,omitempty"`  // Optional configuration for the data schema related to the rule.
	HandlerConfig *HandlerConfig         `json:"handler,omitempty"` // Optional configuration for the handler responsible for executing the rule.
	Enabled       bool                   `json:"enabled"`           // Flag indicating whether the rule is currently active or not.
	CreatedAt     time.Time              `json:"createdAt"`         // Timestamp indicating when the rule was created.
	UpdatedAt     time.Time              `json:"updatedAt"`         // Timestamp indicating the last update to the rule.
}

Rule defines the structure for a rule associated with a data product. It specifies how events are processed and managed within the system.

func NewRule

func NewRule() *Rule

type Snapshot

type Snapshot struct {
	// contains filtered or unexported fields
}

func NewSnapshot

func NewSnapshot(pc *ProductClient, productName string, opts ...SnapshotOpt) (*Snapshot, error)

func (*Snapshot) Close

func (s *Snapshot) Close() error

func (*Snapshot) Fetch

func (s *Snapshot) Fetch() (chan *nats.Msg, error)

type SnapshotOpt

type SnapshotOpt func(*Snapshot)

func WithSnapshotPartitions

func WithSnapshotPartitions(partitions []int32) SnapshotOpt

func WithSnapshotPendingLimits

func WithSnapshotPendingLimits(limit int) SnapshotOpt

type SnapshotSetting

type SnapshotSetting struct {
}

type UpdateProductReply

type UpdateProductReply struct {
	core.ErrorReply
	Setting *ProductSetting `json:"setting"`
}

type UpdateProductRequest

type UpdateProductRequest struct {
	Name    string          `json:"name"`
	Setting *ProductSetting `json:"setting"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL