lib

package
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 15, 2025 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ExtractedChan = make(chan map[string]interface{})
View Source
var ProcessingWG sync.WaitGroup
View Source
var ResultChan = make(chan map[string]interface{})

Functions

func CreateCatalogJSON added in v0.3.0

func CreateCatalogJSON() error

Create <STREAM>_catalog.json

func CreateStateJSON

func CreateStateJSON() error

CreateStateJSON creates a state JSON file for the stream

func ExtractRecords added in v0.5.0

func ExtractRecords(streamFunc func(Config) error)

Begin streaming records from source (and sending to ExtractedChan) and start goroutines to process records

func GenerateSchema

func GenerateSchema(record interface{}) (map[string]interface{}, error)

GenerateSchema generates a JSON schema from a record

func ProcessRecord added in v0.3.0

func ProcessRecord(record map[string]interface{}) (map[string]interface{}, error)

Transform record including dropping fields, hashing sensitive fields, and validating against bookmark

func ProduceRecordMessage added in v0.5.0

func ProduceRecordMessage(record interface{}) error

ProduceRecordMessage generates a message with the schema of the record

func ProduceSchemaMessage added in v0.5.0

func ProduceSchemaMessage(schema map[string]interface{}) error

ProduceSchemaMessage generates a schema message from the derived catalog

func ProduceStateMessage added in v0.5.0

func ProduceStateMessage(state *State) error

ProduceStateMessage generates a message with the current state

func UpdateCatalogJSON added in v0.3.0

func UpdateCatalogJSON()

func UpdateSchema added in v0.3.0

func UpdateSchema(existingSchema, newSchema map[string]interface{}) (map[string]interface{}, error)

UpdateSchema merges the new schema into the existing schema

func UpdateStateBookmark added in v0.3.0

func UpdateStateBookmark(record map[string]interface{})

Update <STREAM>_state.json

func UpdateStateQuarantine added in v0.5.0

func UpdateStateQuarantine(record map[string]interface{})

Update <STREAM>_state.json

func ValidateRecordSchema added in v0.3.0

func ValidateRecordSchema(record map[string]interface{}, schema map[string]interface{}) (bool, error)

Validate record against Catalog

Types

type Bookmark added in v0.3.0

type Bookmark struct {
	UpdatedAt  string            `json:"updated_at"`
	Latest     map[string]string `json:"latest"`
	Quarantine map[string]string `json:"quarantine"`
}

type Config

type Config struct {
	StreamName     *string `json:"stream_name,omitempty"`
	SourceType     *string `json:"source_type,omitempty"`
	URL            *string `json:"url,omitempty"`
	MaxConcurrency *int    `json:"max_concurrency,omitempty"`
	Records        *struct {
		UniqueKeyPath       *[]string   `json:"unique_key_path,omitempty"`
		DropFieldPaths      *[][]string `json:"drop_field_paths,omitempty"`
		SensitiveFieldPaths *[][]string `json:"sensitive_field_paths,omitempty"`
	} `json:"records,omitempty"`
	Rest *struct {
		Auth *struct {
			Required *bool   `json:"required,omitempty"`
			Strategy *string `json:"strategy,omitempty"`
			Basic    *struct {
				Username *string `json:"username,omitempty"`
				Password *string `json:"password,omitempty"`
			} `json:"basic,omitempty"`
			Token *struct {
				Header      *string `json:"header,omitempty"`
				HeaderValue *string `json:"header_value,omitempty"`
			} `json:"token,omitempty"`
			Oauth *struct {
				ClientID     *string `json:"client_id,omitempty"`
				ClientSecret *string `json:"client_secret,omitempty"`
				RefreshToken *string `json:"refresh_token,omitempty"`
				TokenURL     *string `json:"token_url,omitempty"`
			} `json:"oauth,omitempty"`
		} `json:"auth,omitempty"`
		Response *struct {
			RecordsPath        *[]string `json:"records_path,omitempty"`
			Pagination         *bool     `json:"pagination,omitempty"`
			PaginationStrategy *string   `json:"pagination_strategy,omitempty"`
			PaginationNextPath *[]string `json:"pagination_next_path,omitempty"`
			PaginationQuery    *struct {
				QueryParameter *string `json:"query_parameter,omitempty"`
				QueryValue     *int    `json:"query_value,omitempty"`
				QueryIncrement *int    `json:"query_increment,omitempty"`
			} `json:"pagination_query,omitempty"`
		} `json:"response,omitempty"`
	} `json:"rest,omitempty"`
}

Parse config.json file to Config struct

var ParsedConfig Config

type Message

type Message struct {
	Type               string                 `json:"type"`
	Record             map[string]interface{} `json:"record,omitempty"`
	Stream             string                 `json:"stream,omitempty"`
	Schema             interface{}            `json:"schema,omitempty"`
	Value              interface{}            `json:"value,omitempty"`
	KeyProperties      []string               `json:"key_properties,omitempty"`
	BookmarkProperties []string               `json:"bookmark_properties,omitempty"`
	Required           []string               `json:"required,omitempty"`
}

type State

type State struct {
	Type     string   `json:"type"`
	Stream   string   `json:"stream"`
	Bookmark Bookmark `json:"bookmark"`
}
var ParsedState *State

func ReadStateJSON added in v0.4.0

func ReadStateJSON() (*State, error)

Reads <STREAM_NAME>_state.json

type StreamCatalog added in v0.3.0

type StreamCatalog struct {
	KeyProperties []string               `json:"key_properties"`
	Schema        map[string]interface{} `json:"schema"`
	Stream        string                 `json:"stream"`
}
var DerivedCatalog *StreamCatalog

func ReadCatalogJSON added in v0.4.0

func ReadCatalogJSON() (*StreamCatalog, error)

Parse <STREAM>_catalog.json

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL