lib

package
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 8, 2025 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateCatalogJSON added in v0.3.0

func CreateCatalogJSON()

Create <STREAM>_catalog.json

func CreateStateJSON

func CreateStateJSON()

CreateStateJSON creates a state JSON file for the stream

func GenerateRecordMessage

func GenerateRecordMessage(record interface{}) error

func GenerateSchema

func GenerateSchema(record interface{}) (map[string]interface{}, error)

GenerateSchema generates a JSON schema from a record

func GenerateSchemaMessage

func GenerateSchemaMessage(schema map[string]interface{}) error

func GenerateStateMessage

func GenerateStateMessage(state *State) error

func ProcessRecord added in v0.3.0

func ProcessRecord(record *interface{}) (*interface{}, error)

Transform record including dropping fields, hashing sensitive fields, and validating against bookmark

func UpdateCatalogJSON added in v0.3.0

func UpdateCatalogJSON()

func UpdateSchema added in v0.3.0

func UpdateSchema(existingSchema, newSchema map[string]interface{}) (map[string]interface{}, error)

UpdateSchema merges the new schema into the existing schema

func UpdateStateBookmark added in v0.3.0

func UpdateStateBookmark(record interface{})

Update <STREAM>_state.json

func ValidateRecordSchema added in v0.3.0

func ValidateRecordSchema(record map[string]interface{}, schema map[string]interface{}) (bool, error)

Validate record against Catalog

Types

type Bookmark added in v0.3.0

type Bookmark struct {
	BookmarkUpdatedAt string              `json:"bookmark_updated_at"`
	Bookmark          map[string]struct{} `json:"bookmark"`
}

type Catalog added in v0.3.0

type Catalog struct {
	Streams []StreamCatalog `json:"streams"`
}
var ParsedCatalog *Catalog

func ParseCatalogJSON added in v0.3.0

func ParseCatalogJSON() (*Catalog, error)

Parse <STREAM>_catalog.json

type Config

type Config struct {
	StreamName     *string `json:"stream_name,omitempty"`
	SourceType     *string `json:"source_type,omitempty"`
	URL            *string `json:"url,omitempty"`
	MaxConcurrency *int    `json:"max_concurrency,omitempty"`
	Records        *struct {
		UniqueKeyPath       *[]string   `json:"unique_key_path,omitempty"`
		DropFieldPaths      *[][]string `json:"drop_field_paths,omitempty"`
		SensitiveFieldPaths *[][]string `json:"sensitive_field_paths,omitempty"`
	} `json:"records,omitempty"`
	Database *struct {
		Table *string `json:"table,omitempty"`
	} `json:"db,omitempty"`
	Rest *struct {
		Auth *struct {
			Required *bool   `json:"required,omitempty"`
			Strategy *string `json:"strategy,omitempty"`
			Basic    *struct {
				Username *string `json:"username,omitempty"`
				Password *string `json:"password,omitempty"`
			} `json:"basic,omitempty"`
			Token *struct {
				Header      *string `json:"header,omitempty"`
				HeaderValue *string `json:"header_value,omitempty"`
			} `json:"token,omitempty"`
			Oauth *struct {
				ClientID     *string `json:"client_id,omitempty"`
				ClientSecret *string `json:"client_secret,omitempty"`
				RefreshToken *string `json:"refresh_token,omitempty"`
				TokenURL     *string `json:"token_url,omitempty"`
			} `json:"oauth,omitempty"`
		} `json:"auth,omitempty"`
		Response *struct {
			RecordsPath        *[]string `json:"records_path,omitempty"`
			Pagination         *bool     `json:"pagination,omitempty"`
			PaginationStrategy *string   `json:"pagination_strategy,omitempty"`
			PaginationNextPath *[]string `json:"pagination_next_path,omitempty"`
			PaginationQuery    *struct {
				QueryParameter *string `json:"query_parameter,omitempty"`
				QueryValue     *int    `json:"query_value,omitempty"`
				QueryIncrement *int    `json:"query_increment,omitempty"`
			} `json:"pagination_query,omitempty"`
		} `json:"response,omitempty"`
	} `json:"rest,omitempty"`
}

Parse config.json file to Config struct

var ParsedConfig Config

type Message

type Message struct {
	Type               string                 `json:"type"`
	Record             map[string]interface{} `json:"record,omitempty"`
	Stream             string                 `json:"stream,omitempty"`
	Schema             interface{}            `json:"schema,omitempty"`
	Value              interface{}            `json:"value,omitempty"`
	KeyProperties      []string               `json:"key_properties,omitempty"`
	BookmarkProperties []string               `json:"bookmark_properties,omitempty"`
}

type State

type State struct {
	Type  string `json:"type"`
	Value struct {
		Bookmarks map[string]Bookmark `json:"bookmarks"`
	} `json:"value"`
}

TODO MOVE BOOKMARKS TO MAP OF SEEN KEYS

var ParsedState *State

func ParseStateJSON

func ParseStateJSON() (*State, error)

Parse <STREAM>_state.json

type StreamCatalog added in v0.3.0

type StreamCatalog struct {
	Stream        string                 `json:"stream"`
	TapStreamID   string                 `json:"tap_stream_id"`
	KeyProperties []string               `json:"key_properties"`
	Schema        map[string]interface{} `json:"schema"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL