Documentation
¶
Index ¶
- Variables
- func CreateCatalogJSON() error
- func CreateStateJSON() error
- func ExtractRecords(streamFunc func(Config) error)
- func GenerateSchema(record interface{}) (map[string]interface{}, error)
- func ProcessRecord(record map[string]interface{}) (map[string]interface{}, error)
- func ProduceRecordMessage(record interface{}) error
- func ProduceSchemaMessage(schema map[string]interface{}) error
- func ProduceStateMessage(state *State) error
- func UpdateCatalogJSON()
- func UpdateSchema(existingSchema, newSchema map[string]interface{}) (map[string]interface{}, error)
- func UpdateStateBookmark(record map[string]interface{})
- func UpdateStateQuarantine(record map[string]interface{})
- func ValidateRecordSchema(record map[string]interface{}, schema map[string]interface{}) (bool, error)
- type Bookmark
- type Config
- type Message
- type State
- type StreamCatalog
Constants ¶
This section is empty.
Variables ¶
// Package-level channels and wait group.
var (
	// ExtractedChan is an unbuffered channel of records; ExtractRecords is
	// documented as sending streamed source records to it.
	ExtractedChan = make(chan map[string]interface{})

	// ProcessingWG is a shared wait group.
	// NOTE(review): presumably tracks the record-processing goroutines — confirm.
	ProcessingWG sync.WaitGroup

	// ResultChan is an unbuffered channel with the same element type as
	// ExtractedChan.
	// NOTE(review): presumably carries processed records — confirm.
	ResultChan = make(chan map[string]interface{})
)
Functions ¶
func CreateCatalogJSON ¶ added in v0.3.0
func CreateCatalogJSON() error
CreateCatalogJSON creates the <STREAM>_catalog.json file.
func CreateStateJSON ¶
func CreateStateJSON() error
CreateStateJSON creates a state JSON file for the stream
func ExtractRecords ¶ added in v0.5.0
ExtractRecords begins streaming records from the source (sending them to ExtractedChan) and starts goroutines to process those records.
func GenerateSchema ¶
GenerateSchema generates a JSON schema from a record
func ProcessRecord ¶ added in v0.3.0
ProcessRecord transforms a record: it drops fields, hashes sensitive fields, and validates the record against the bookmark.
func ProduceRecordMessage ¶ added in v0.5.0
func ProduceRecordMessage(record interface{}) error
ProduceRecordMessage generates a message with the schema of the record
func ProduceSchemaMessage ¶ added in v0.5.0
ProduceSchemaMessage generates a schema message from the derived catalog
func ProduceStateMessage ¶ added in v0.5.0
ProduceStateMessage generates a message with the current state
func UpdateCatalogJSON ¶ added in v0.3.0
func UpdateCatalogJSON()
func UpdateSchema ¶ added in v0.3.0
UpdateSchema merges the new schema into the existing schema
func UpdateStateBookmark ¶ added in v0.3.0
func UpdateStateBookmark(record map[string]interface{})
UpdateStateBookmark updates <STREAM>_state.json.
func UpdateStateQuarantine ¶ added in v0.5.0
func UpdateStateQuarantine(record map[string]interface{})
UpdateStateQuarantine updates <STREAM>_state.json.
Types ¶
type Config ¶
// Config describes the settings carried by config.json, laid out as given by
// the json tags below. Every field is a pointer tagged omitempty, so a nil
// field is left out when the struct is marshalled.
type Config struct {
// Top-level stream settings.
StreamName *string `json:"stream_name,omitempty"`
SourceType *string `json:"source_type,omitempty"`
URL *string `json:"url,omitempty"`
MaxConcurrency *int `json:"max_concurrency,omitempty"`
// Records groups record-level field paths: the unique key path, plus lists of
// paths to drop and paths marked as sensitive.
Records *struct {
UniqueKeyPath *[]string `json:"unique_key_path,omitempty"`
DropFieldPaths *[][]string `json:"drop_field_paths,omitempty"`
SensitiveFieldPaths *[][]string `json:"sensitive_field_paths,omitempty"`
} `json:"records,omitempty"`
// Rest groups the settings stored under the "rest" key.
// NOTE(review): presumably only consulted for REST sources — confirm.
Rest *struct {
// Auth groups authentication settings; Basic, Token and Oauth each hold the
// credentials for one approach.
// NOTE(review): Strategy presumably selects which of the three applies — confirm.
Auth *struct {
Required *bool `json:"required,omitempty"`
Strategy *string `json:"strategy,omitempty"`
Basic *struct {
Username *string `json:"username,omitempty"`
Password *string `json:"password,omitempty"`
} `json:"basic,omitempty"`
Token *struct {
Header *string `json:"header,omitempty"`
HeaderValue *string `json:"header_value,omitempty"`
} `json:"token,omitempty"`
Oauth *struct {
ClientID *string `json:"client_id,omitempty"`
ClientSecret *string `json:"client_secret,omitempty"`
RefreshToken *string `json:"refresh_token,omitempty"`
TokenURL *string `json:"token_url,omitempty"`
} `json:"oauth,omitempty"`
} `json:"auth,omitempty"`
// Response groups the path to the records within a response and the
// pagination settings.
Response *struct {
RecordsPath *[]string `json:"records_path,omitempty"`
Pagination *bool `json:"pagination,omitempty"`
PaginationStrategy *string `json:"pagination_strategy,omitempty"`
PaginationNextPath *[]string `json:"pagination_next_path,omitempty"`
// PaginationQuery holds a query parameter name with an integer value and an
// integer increment.
// NOTE(review): presumably the value advances by the increment per page — confirm.
PaginationQuery *struct {
QueryParameter *string `json:"query_parameter,omitempty"`
QueryValue *int `json:"query_value,omitempty"`
QueryIncrement *int `json:"query_increment,omitempty"`
} `json:"pagination_query,omitempty"`
} `json:"response,omitempty"`
} `json:"rest,omitempty"`
}
Config is the struct that the config.json file is parsed into.
var ParsedConfig Config
type Message ¶
// Message is a JSON message envelope. Type is always serialized; every other
// field is tagged omitempty and is left out of the JSON when it is empty.
// NOTE(review): presumably the shape emitted by the Produce*Message functions,
// with different fields populated per message type — confirm.
type Message struct {
Type string `json:"type"`
Record map[string]interface{} `json:"record,omitempty"`
Stream string `json:"stream,omitempty"`
Schema interface{} `json:"schema,omitempty"`
Value interface{} `json:"value,omitempty"`
KeyProperties []string `json:"key_properties,omitempty"`
BookmarkProperties []string `json:"bookmark_properties,omitempty"`
Required []string `json:"required,omitempty"`
}
type State ¶
// State is the JSON shape of a stream's state: a type, the stream name and a
// Bookmark. None of the fields is tagged omitempty, so all three are always
// serialized.
// NOTE(review): Bookmark is declared elsewhere; its fields are not visible here.
type State struct {
Type string `json:"type"`
Stream string `json:"stream"`
Bookmark Bookmark `json:"bookmark"`
}
var ParsedState *State
func ReadStateJSON ¶ added in v0.4.0
ReadStateJSON reads <STREAM>_state.json.
type StreamCatalog ¶ added in v0.3.0
// StreamCatalog is the JSON shape of a stream's catalog: its key properties,
// its schema (a free-form JSON object) and the stream name. None of the fields
// is tagged omitempty, so all three are always serialized.
type StreamCatalog struct {
KeyProperties []string `json:"key_properties"`
Schema map[string]interface{} `json:"schema"`
Stream string `json:"stream"`
}
var DerivedCatalog *StreamCatalog
func ReadCatalogJSON ¶ added in v0.4.0
func ReadCatalogJSON() (*StreamCatalog, error)
ReadCatalogJSON parses <STREAM>_catalog.json.