api

package
v1.4.23 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 26, 2025 License: GPL-3.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// FetchSpec is a package-level, replaceable function value that takes a
// string and returns a string and an error. The default implementation shown
// here ignores its argument and always returns an empty string together with
// an error telling the user to use the official sling-cli release.
var FetchSpec = func(_ string) (string, error) {
	return "", g.Error("please use the official sling-cli release for fetching API specs")
}

Functions

This section is empty.

Types

type APIConnection

// APIConnection bundles an API Spec with its runtime APIState and a
// g.Context. Its exported methods cover authentication, queue management,
// endpoint listing, synced-state handling and reading dataflows.
type APIConnection struct {
	Spec    Spec
	State   *APIState
	Context *g.Context
	// contains filtered or unexported fields
}

func NewAPIConnection

func NewAPIConnection(ctx context.Context, spec Spec, data map[string]any) (ac *APIConnection, err error)

NewAPIConnection creates an APIConnection for the given spec.

func (*APIConnection) Authenticate

func (ac *APIConnection) Authenticate() (err error)

Authenticate performs the auth workflow if needed, like a Connect step. Header-based auths (such as Basic or Bearer) don't need this step. The resulting payload is saved in APIState.Auth.

func (*APIConnection) Close

func (ac *APIConnection) Close() error

Close performs cleanup of all resources

func (*APIConnection) CloseAllQueues

func (ac *APIConnection) CloseAllQueues()

CloseAllQueues closes all queues associated with this connection

func (*APIConnection) EnsureAuthenticated added in v1.4.14

func (ac *APIConnection) EnsureAuthenticated() error

EnsureAuthenticated checks if authentication is valid and re-authenticates if needed. This method ensures thread-safe authentication checks and re-authentication.

func (*APIConnection) GetQueue

func (ac *APIConnection) GetQueue(name string) (*iop.Queue, bool)

GetQueue retrieves a queue by name

func (*APIConnection) GetReplicationStore added in v1.4.20

func (ac *APIConnection) GetReplicationStore() (store map[string]any)

func (*APIConnection) GetSyncedState

func (ac *APIConnection) GetSyncedState(endpointName string) (data map[string]any, err error)

GetSyncedState cycles through each endpoint, and collects the values for each of the Endpoint.Sync values. Output is a map[Sync.value] = Endpoint.syncMap[Sync.value]

func (*APIConnection) IsAuthExpired added in v1.4.14

func (ac *APIConnection) IsAuthExpired() bool

IsAuthExpired checks if the authentication has expired

func (*APIConnection) ListEndpoints

func (ac *APIConnection) ListEndpoints(patterns ...string) (endpoints Endpoints, err error)

func (*APIConnection) MakeDynamicEndpointIterator

func (ac *APIConnection) MakeDynamicEndpointIterator(iter *Iterate) (err error)

func (*APIConnection) PutSyncedState

func (ac *APIConnection) PutSyncedState(endpointName string, data map[string]any) (err error)

PutSyncedState restores the state from the previous run in each endpoint using the Endpoint.Sync values. Input is a map[Sync.value] = Endpoint.syncMap[Sync.value]

func (*APIConnection) ReadDataflow

func (ac *APIConnection) ReadDataflow(endpointName string, sCfg APIStreamConfig) (df *iop.Dataflow, err error)

func (*APIConnection) RegisterQueue

func (ac *APIConnection) RegisterQueue(name string) (*iop.Queue, error)

RegisterQueue creates a new queue with the given name. If a queue with the same name already exists, it is returned.

func (*APIConnection) RemoveQueue

func (ac *APIConnection) RemoveQueue(name string) error

RemoveQueue closes and removes a queue

func (*APIConnection) RenderDynamicEndpoints added in v1.4.14

func (ac *APIConnection) RenderDynamicEndpoints() (err error)

RenderDynamicEndpoints renders the dynamic objects, which mutates the spec endpoints. Authentication is required first.

func (*APIConnection) SetReplicationStore added in v1.4.20

func (ac *APIConnection) SetReplicationStore(store map[string]any)

type APIState

// APIState is the state object referenced by APIConnection.State. It groups
// environment values, state values, secrets, named queues and the
// authentication state, each with a JSON tag.
type APIState struct {
	Env     map[string]string     `json:"env,omitempty"`
	State   map[string]any        `json:"state,omitempty"`
	Secrets map[string]any        `json:"secrets,omitempty"`
	Queues  map[string]*iop.Queue `json:"queues,omitempty"` // appends to file
	Auth    APIStateAuth          `json:"auth,omitempty"`
}

type APIStateAuth

// APIStateAuth is the authentication portion of APIState (APIState.Auth).
// Headers, Sign and Mutex are tagged `json:"-"` and are therefore excluded
// from JSON output; Mutex is additionally excluded from YAML.
type APIStateAuth struct {
	Authenticated bool              `json:"authenticated,omitempty"`
	Token         string            `json:"token,omitempty"`      // refresh token?
	Headers       map[string]string `json:"-"`                    // to inject
	ExpiresAt     int64             `json:"expires_at,omitempty"` // Unix timestamp when auth expires

	Sign  func(context.Context, *http.Request, []byte) error `json:"-"`          // for AWS Sigv4
	Mutex *sync.Mutex                                        `json:"-" yaml:"-"` // Mutex for auth operations
}

type APIStreamConfig

// APIStreamConfig carries the per-stream options passed to
// APIConnection.ReadDataflow.
type APIStreamConfig struct {
	Flatten     int // levels of flattening. 0 is infinite
	JmesPath    string
	Select      []string // select specific columns
	Limit       int
	Metadata    iop.Metadata
	Mode        string
	DsConfigMap map[string]any // stream processor options
}

type AggregateState

// AggregateState stores aggregated values during response processing.
// All of its fields are unexported.
type AggregateState struct {
	// contains filtered or unexported fields
}

AggregateState stores aggregated values during response processing

type AggregationType

// AggregationType is the string-valued aggregation mode used by
// Processor.Aggregation.
type AggregationType string

// Supported AggregationType values.
const (
	AggregationTypeNone    AggregationType = ""        // No aggregation, apply transformation at record level
	AggregationTypeMaximum AggregationType = "maximum" // Keep the maximum value across records
	AggregationTypeMinimum AggregationType = "minimum" // Keep the minimum value across records
	AggregationTypeFlatten AggregationType = "flatten" // Collect all values into an array
	AggregationTypeFirst   AggregationType = "first"   // Keep only the first encountered value
	AggregationTypeLast    AggregationType = "last"    // Keep only the last encountered value
)

type AuthType

// AuthType is the string-valued authentication method used by
// Authentication.Type.
type AuthType string

// Supported AuthType values. The empty string is AuthTypeNone.
const (
	AuthTypeNone     AuthType = ""
	AuthTypeSequence AuthType = "sequence"
	AuthTypeBasic    AuthType = "basic"
	AuthTypeOAuth2   AuthType = "oauth2"
	AuthTypeAWSSigV4 AuthType = "aws-sigv4"
)

type Authentication

// Authentication defines how to authenticate with the API. Type selects the
// method; the remaining fields are grouped by the method they belong to
// (custom sequence, Basic auth, OAuth, AWS).
type Authentication struct {
	Type AuthType `yaml:"type" json:"type"`

	// when set, re-auth after number of seconds
	Expires int `yaml:"expires" json:"expires,omitempty"`

	// custom authentication workflow
	Sequence Sequence `yaml:"sequence" json:"sequence,omitempty"`

	// Basic Auth
	Username string `yaml:"username,omitempty" json:"username,omitempty"`
	Password string `yaml:"password,omitempty" json:"password,omitempty"`

	// OAuth
	Flow              OAuthFlow `yaml:"flow,omitempty" json:"flow,omitempty"`
	AuthenticationURL string    `yaml:"authentication_url,omitempty" json:"authentication_url,omitempty"`
	ClientID          string    `yaml:"client_id,omitempty" json:"client_id,omitempty"`
	ClientSecret      string    `yaml:"client_secret,omitempty" json:"client_secret,omitempty"`
	Token             string    `yaml:"token,omitempty" json:"token,omitempty"`
	Scopes            []string  `yaml:"scopes,omitempty" json:"scopes,omitempty"`
	RedirectURI       string    `yaml:"redirect_uri,omitempty" json:"redirect_uri,omitempty"`
	RefreshToken      string    `yaml:"refresh_token,omitempty" json:"refresh_token,omitempty"`
	RefreshOnExpire   bool      `yaml:"refresh_on_expire,omitempty" json:"refresh_on_expire,omitempty"`

	// AWS
	AwsService         string `yaml:"aws_service,omitempty" json:"aws_service,omitempty"`
	AwsAccessKeyID     string `yaml:"aws_access_key_id,omitempty" json:"aws_access_key_id,omitempty"`
	AwsSecretAccessKey string `yaml:"aws_secret_access_key,omitempty" json:"aws_secret_access_key,omitempty"`
	AwsSessionToken    string `yaml:"aws_session_token,omitempty" json:"aws_session_token,omitempty"`
	AwsRegion          string `yaml:"aws_region,omitempty" json:"aws_region,omitempty"`
	AwsProfile         string `yaml:"aws_profile,omitempty" json:"aws_profile,omitempty"`
}

Authentication defines how to authenticate with the API

type BackoffType

// BackoffType is the string-valued retry delay strategy used by Rule.Backoff.
type BackoffType string

// Supported BackoffType values.
const (
	BackoffTypeNone        BackoffType = ""            // No delay between retries
	BackoffTypeConstant    BackoffType = "constant"    // Fixed delay between retries
	BackoffTypeLinear      BackoffType = "linear"      // Delay increases linearly with each attempt
	BackoffTypeExponential BackoffType = "exponential" // Delay increases exponentially (common pattern)
	BackoffTypeJitter      BackoffType = "jitter"      // Exponential backoff with randomization to avoid thundering herd
)

type Call

// Call is the element type of Sequence: one request with its pagination and
// response handling.
// NOTE(review): If is presumably a condition expression deciding whether the
// call runs — confirm against the code that evaluates it.
type Call struct {
	If         string     `yaml:"if" json:"if"`
	Request    Request    `yaml:"request" json:"request"`
	Pagination Pagination `yaml:"pagination" json:"pagination"`
	Response   Response   `yaml:"response" json:"response"`
}

type DynamicEndpoint added in v1.4.14

// DynamicEndpoint is the element type of DynamicEndpoints (Spec.DynamicEndpoints).
// It pairs a Setup sequence and an Iterate/Into pair with an Endpoint definition.
// NOTE(review): presumably Endpoint acts as a template rendered once per
// iterated value, with the value stored under Into — confirm against
// RenderDynamicEndpoints.
type DynamicEndpoint struct {
	Setup    Sequence `yaml:"setup" json:"setup"`
	Iterate  string   `yaml:"iterate" json:"iterate"`
	Into     string   `yaml:"into" json:"into"`
	Endpoint Endpoint `yaml:"endpoint" json:"endpoint"`
}

type DynamicEndpoints added in v1.4.14

// DynamicEndpoints is a list of DynamicEndpoint definitions.
type DynamicEndpoints []DynamicEndpoint

type Endpoint

// Endpoint describes a single API endpoint: its name and description, state,
// sync keys, request, pagination, response handling, iteration settings and
// setup/teardown sequences. DependsOn is excluded from both YAML and JSON.
type Endpoint struct {
	Name        string     `yaml:"name" json:"name"`
	Description string     `yaml:"description" json:"description,omitempty"`
	Docs        string     `yaml:"docs" json:"docs,omitempty"`
	Disabled    bool       `yaml:"disabled" json:"disabled"`
	State       StateMap   `yaml:"state" json:"state"`
	Sync        []string   `yaml:"sync" json:"sync,omitempty"`
	Request     Request    `yaml:"request" json:"request"`
	Pagination  Pagination `yaml:"pagination" json:"pagination"`
	Response    Response   `yaml:"response" json:"response"`
	Iterate     Iterate    `yaml:"iterate" json:"iterate,omitempty"` // state expression to use to loop
	Setup       Sequence   `yaml:"setup" json:"setup,omitempty"`
	Teardown    Sequence   `yaml:"teardown" json:"teardown,omitempty"`
	DependsOn   []string   `yaml:"-" json:"-"` // upstream endpoints
	// contains filtered or unexported fields
}

Endpoint is the top-level configuration structure

func (*Endpoint) SetStateVal

func (ep *Endpoint) SetStateVal(key string, val any)

type EndpointMap

// EndpointMap is a collection of API endpoints stored in a map with string
// keys. It is the type of Spec.EndpointMap.
type EndpointMap map[string]Endpoint

EndpointMap is a collection of API endpoints stored in a map with string keys.

type Endpoints

// Endpoints is a slice of Endpoint values. Its methods provide sorting,
// name listing and dependency (DAG / upstream) queries.
type Endpoints []Endpoint

func (Endpoints) DAG added in v1.4.23

func (eps Endpoints) DAG() [][]string

DAG returns groups of interdependent endpoints

func (Endpoints) HasUpstreams added in v1.4.23

func (eps Endpoints) HasUpstreams(endpointName string) (upstreams []string)

HasUpstreams returns all upstream endpoint names that the specified endpoint depends on. Returns empty slice if the endpoint doesn't exist or has no dependencies.

func (Endpoints) Names added in v1.4.23

func (eps Endpoints) Names() (names []string)

Names returns names in alphabetical order

func (Endpoints) Sort

func (eps Endpoints) Sort()

type HTTPMethod

// HTTPMethod is the string-valued HTTP method used by Request.Method.
type HTTPMethod string

// HTTP method names, upper-cased.
const (
	MethodGet     HTTPMethod = "GET"
	MethodHead    HTTPMethod = "HEAD"
	MethodPost    HTTPMethod = "POST"
	MethodPut     HTTPMethod = "PUT"
	MethodPatch   HTTPMethod = "PATCH"
	MethodDelete  HTTPMethod = "DELETE"
	MethodConnect HTTPMethod = "CONNECT"
	MethodOptions HTTPMethod = "OPTIONS"
	MethodTrace   HTTPMethod = "TRACE"
)

type Iterate

// Iterate is for configuring looping values for requests.
//
// NOTE(review): the struct tags look inconsistent and should be confirmed:
//   - If is tagged `yaml:"id" json:"id"`, while Call.If uses the key "if";
//     "id" looks like a typo for "if".
//   - Over is tagged `yaml:"over"` but `json:"iterate"`, so the YAML and JSON
//     keys differ for the same field.
//
// The tags are left unchanged here because altering them changes the
// serialized schema.
type Iterate struct {
	Over        any    `yaml:"over" json:"iterate,omitempty"` // expression
	Into        string `yaml:"into" json:"into,omitempty"`    // state variable
	If          string `yaml:"id" json:"id,omitempty"`        // if we should iterate
	Concurrency int    `yaml:"concurrency" json:"concurrency,omitempty"`
	// contains filtered or unexported fields
}

Iterate is for configuring looping values for requests

type Iteration

// Iteration is an opaque type (all fields unexported). It is the argument to
// NewSingleRequest and exposes Debug and DetermineStateRenderOrder.
// NOTE(review): presumably one Iteration represents a single pass of an
// Endpoint's Iterate loop — confirm against the implementation.
type Iteration struct {
	// contains filtered or unexported fields
}

func (*Iteration) Debug added in v1.4.20

func (iter *Iteration) Debug(text string, args ...any)

func (*Iteration) DetermineStateRenderOrder added in v1.4.14

func (iter *Iteration) DetermineStateRenderOrder() (order []string, err error)

type OAuthFlow added in v1.4.14

// OAuthFlow is the string-valued OAuth flow used by Authentication.Flow.
type OAuthFlow string

// Supported OAuthFlow values.
const (
	OAuthFlowClientCredentials OAuthFlow = "client_credentials"
	OAuthFlowAuthorizationCode OAuthFlow = "authorization_code"
	OAuthFlowPassword          OAuthFlow = "password"
	OAuthFlowRefreshToken      OAuthFlow = "refresh_token"
)

type Pagination

// Pagination configures how to navigate through multiple pages of API results.
// NOTE(review): presumably NextState holds the state values to apply before
// the next page request and StopCondition is an expression that ends paging —
// confirm against the pagination code.
type Pagination struct {
	NextState     map[string]any `yaml:"next_state" json:"next_state,omitempty"`
	StopCondition string         `yaml:"stop_condition" json:"stop_condition,omitempty"`
}

Pagination configures how to navigate through multiple pages of API results

type Processor

// Processor represents a way to process data. Without aggregation
// (AggregationTypeNone) it represents a transformation applied at record
// level; with aggregation it reduces/aggregates record data and saves the
// result into the state.
type Processor struct {
	Aggregation AggregationType `yaml:"aggregation" json:"aggregation"`
	Expression  string          `yaml:"expression" json:"expression"`
	Output      string          `yaml:"output" json:"output"`
}

Processor represents a way to process data. Without aggregation, it represents a transformation applied at record level; with aggregation, it reduces/aggregates record data and saves the result into the state.

type Records

// Records configures how to extract and process data records from a response.
type Records struct {
	JmesPath   string   `yaml:"jmespath" json:"jmespath,omitempty"` // for json or xml
	PrimaryKey []string `yaml:"primary_key" json:"primary_key,omitempty"`
	UpdateKey  string   `yaml:"update_key" json:"update_key,omitempty"`
	Limit      int      `yaml:"limit" json:"limit,omitempty"` // to limit the records, useful for testing

	// NOTE(review): the accepted values and format of DuplicateTolerance are
	// not visible here — confirm against the code that parses it.
	DuplicateTolerance string `yaml:"duplicate_tolerance" json:"duplicate_tolerance,omitempty"`
}

Records configures how to extract and process data records from a response

type Request

// Request defines how to construct an HTTP request to the API.
// NOTE(review): the unit of Timeout is not stated here (presumably seconds) —
// confirm against the code that applies it.
type Request struct {
	URL         string         `yaml:"url" json:"url,omitempty"`
	Timeout     int            `yaml:"timeout" json:"timeout,omitempty"`
	Method      HTTPMethod     `yaml:"method" json:"method,omitempty"`
	Headers     map[string]any `yaml:"headers" json:"headers,omitempty"`
	Parameters  map[string]any `yaml:"parameters" json:"parameters,omitempty"`
	Payload     any            `yaml:"payload" json:"payload,omitempty"`
	Rate        float64        `yaml:"rate" json:"rate,omitempty"`               // maximum request per second
	Concurrency int            `yaml:"concurrency" json:"concurrency,omitempty"` // maximum concurrent requests
}

Request defines how to construct an HTTP request to the API

type RequestState

// RequestState captures the state of the HTTP request for reference and
// debugging. It is referenced by SingleRequest.Request.
type RequestState struct {
	Method   string         `yaml:"method" json:"method"`
	URL      string         `yaml:"url" json:"url"`
	Headers  map[string]any `yaml:"headers" json:"headers"`
	Payload  any            `yaml:"payload" json:"payload"`
	Attempts int            `yaml:"attempts" json:"attempts"`
}

RequestState captures the state of the HTTP request for reference and debugging

type Response

// Response defines how to process the API response and extract records:
// an optional forced format, record extraction settings, processors and rules.
type Response struct {
	Format     dbio.FileType `yaml:"format" json:"format,omitempty"` // force response format
	Records    Records       `yaml:"records" json:"records"`
	Processors []Processor   `yaml:"processors" json:"processors,omitempty"`
	Rules      []Rule        `yaml:"rules" json:"rules,omitempty"`
}

Response defines how to process the API response and extract records

type ResponseState

// ResponseState captures the state of the HTTP response for reference and
// debugging. It is referenced by SingleRequest.Response.
type ResponseState struct {
	Status  int            `yaml:"status" json:"status"`
	Headers map[string]any `yaml:"headers" json:"headers"`
	Text    string         `yaml:"text" json:"text"`
	JSON    any            `yaml:"json" json:"json"`
	Records []any          `yaml:"records" json:"records"`
}

ResponseState captures the state of the HTTP response for reference and debugging

type Rule

// Rule represents a response rule: an Action to take when Condition holds,
// with retry limits and backoff settings. Rules are listed in Response.Rules.
type Rule struct {
	Action      RuleType    `yaml:"action" json:"action"`
	Condition   string      `yaml:"condition" json:"condition"` // an expression
	MaxAttempts int         `yaml:"max_attempts" json:"max_attempts"`
	Backoff     BackoffType `yaml:"backoff" json:"backoff"`
	BackoffBase int         `yaml:"backoff_base" json:"backoff_base"` // base duration, number of seconds. default is 1
	Message     string      `yaml:"message" json:"message"`
}

Rule represents a response rule

type RuleType

// RuleType is the string-valued action used by Rule.Action.
type RuleType string

// Supported RuleType values.
const (
	RuleTypeRetry    RuleType = "retry"    // Retry the request up to MaxAttempts times
	RuleTypeContinue RuleType = "continue" // Continue processing responses and rules
	RuleTypeStop     RuleType = "stop"     // Stop processing requests for this endpoint
	RuleTypeFail     RuleType = "fail"     // Stop processing and return an error
)

type Sequence added in v1.4.14

// Sequence is many calls (perfect for async jobs, custom auth). It is used
// for Authentication.Sequence and for endpoint Setup and Teardown.
type Sequence []Call

Sequence is many calls (perfect for async jobs, custom auth)

type SingleRequest

// SingleRequest represents a single HTTP request/response cycle, pairing a
// RequestState with its ResponseState.
type SingleRequest struct {
	Request  *RequestState  `yaml:"request" json:"request"`
	Response *ResponseState `yaml:"response" json:"response"`
	// contains filtered or unexported fields
}

SingleRequest represents a single HTTP request/response cycle

func NewSingleRequest

func NewSingleRequest(iter *Iteration) *SingleRequest

func (*SingleRequest) Debug

func (lrs *SingleRequest) Debug(text string, args ...any)

func (*SingleRequest) Map

func (lrs *SingleRequest) Map() map[string]any

func (*SingleRequest) Records

func (lrs *SingleRequest) Records() []any

func (*SingleRequest) Trace added in v1.4.6

func (lrs *SingleRequest) Trace(text string, args ...any)

type Spec

// Spec defines the complete API specification with endpoints and
// authentication. EndpointMap is read from and written to the "endpoints" key;
// DynamicEndpoints uses the "dynamic_endpoints" key.
type Spec struct {
	Name             string           `yaml:"name" json:"name"`
	Description      string           `yaml:"description" json:"description"`
	Queues           []string         `yaml:"queues" json:"queues"`
	Defaults         Endpoint         `yaml:"defaults" json:"defaults"`
	Authentication   Authentication   `yaml:"authentication" json:"authentication"`
	EndpointMap      EndpointMap      `yaml:"endpoints" json:"endpoints"`
	DynamicEndpoints DynamicEndpoints `yaml:"dynamic_endpoints" json:"dynamic_endpoints"`
	// contains filtered or unexported fields
}

Spec defines the complete API specification with endpoints and authentication

func LoadSpec

func LoadSpec(specBody string) (spec Spec, err error)

func (*Spec) IsDynamic

func (s *Spec) IsDynamic() bool

type StateMap

// StateMap stores the current state of an endpoint's execution as a map from
// string keys to arbitrary values. It is the type of Endpoint.State.
type StateMap map[string]any

StateMap stores the current state of an endpoint's execution

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL