Documentation
¶
Index ¶
- func CreateCatalogJSON()
- func CreateStateJSON()
- func GenerateRecordMessage(record interface{}) error
- func GenerateSchema(record interface{}) (map[string]interface{}, error)
- func GenerateSchemaMessage(schema map[string]interface{}) error
- func GenerateStateMessage(state *State) error
- func ProcessRecord(record *interface{}) (*interface{}, error)
- func UpdateCatalogJSON()
- func UpdateSchema(existingSchema, newSchema map[string]interface{}) (map[string]interface{}, error)
- func UpdateStateBookmark(record interface{})
- func ValidateRecordSchema(record map[string]interface{}, schema map[string]interface{}) (bool, error)
- type Bookmark
- type Catalog
- type Config
- type Message
- type State
- type StreamCatalog
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateStateJSON ¶
func CreateStateJSON()
CreateStateJSON creates a state JSON file for the stream
func GenerateRecordMessage ¶
func GenerateRecordMessage(record interface{}) error
func GenerateSchema ¶
GenerateSchema generates a JSON schema from a record
func GenerateSchemaMessage ¶
func GenerateStateMessage ¶
func ProcessRecord ¶ added in v0.3.0
func ProcessRecord(record *interface{}) (*interface{}, error)
ProcessRecord transforms a record by dropping fields, hashing sensitive fields, and validating it against the bookmark.
func UpdateCatalogJSON ¶ added in v0.3.0
func UpdateCatalogJSON()
func UpdateSchema ¶ added in v0.3.0
UpdateSchema merges the new schema into the existing schema
func UpdateStateBookmark ¶ added in v0.3.0
func UpdateStateBookmark(record interface{})
UpdateStateBookmark updates <STREAM>_state.json.
Types ¶
type Catalog ¶ added in v0.3.0
// Catalog is the top-level catalog document: a list of per-stream catalog
// entries serialised under the JSON key "streams".
type Catalog struct {
// Streams holds one StreamCatalog entry per stream. The tag has no
// omitempty, so the "streams" key is always written when marshalling.
Streams []StreamCatalog `json:"streams"`
}
var ParsedCatalog *Catalog
func ParseCatalogJSON ¶ added in v0.3.0
ParseCatalogJSON parses <STREAM>_catalog.json.
type Config ¶
// Config mirrors the structure of the JSON configuration document.
//
// Every field is a pointer tagged with omitempty: a key that is absent from
// the JSON input leaves the corresponding pointer nil after unmarshalling, and
// nil pointers are omitted again when marshalling. Callers must therefore
// nil-check each level before dereferencing.
type Config struct {
// Top-level settings identifying the stream and its source.
StreamName *string `json:"stream_name,omitempty"`
SourceType *string `json:"source_type,omitempty"`
URL *string `json:"url,omitempty"`
// NOTE(review): presumably an upper bound on concurrent workers/requests — confirm against usage.
MaxConcurrency *int `json:"max_concurrency,omitempty"`
// Records groups record-level settings. Each path is a slice of strings;
// presumably a sequence of nested keys leading to a field — confirm against usage.
Records *struct {
UniqueKeyPath *[]string `json:"unique_key_path,omitempty"`
DropFieldPaths *[][]string `json:"drop_field_paths,omitempty"`
SensitiveFieldPaths *[][]string `json:"sensitive_field_paths,omitempty"`
} `json:"records,omitempty"`
// Database groups database-source settings. Note the JSON key is "db",
// not "database".
Database *struct {
Table *string `json:"table,omitempty"`
} `json:"db,omitempty"`
// Rest groups REST-source settings: authentication and response handling.
Rest *struct {
// Auth describes authentication. Basic, Token and Oauth are separate
// optional sub-sections; presumably Strategy selects which one applies — confirm.
Auth *struct {
Required *bool `json:"required,omitempty"`
Strategy *string `json:"strategy,omitempty"`
Basic *struct {
Username *string `json:"username,omitempty"`
Password *string `json:"password,omitempty"`
} `json:"basic,omitempty"`
Token *struct {
Header *string `json:"header,omitempty"`
HeaderValue *string `json:"header_value,omitempty"`
} `json:"token,omitempty"`
Oauth *struct {
ClientID *string `json:"client_id,omitempty"`
ClientSecret *string `json:"client_secret,omitempty"`
RefreshToken *string `json:"refresh_token,omitempty"`
TokenURL *string `json:"token_url,omitempty"`
} `json:"oauth,omitempty"`
} `json:"auth,omitempty"`
// Response describes how to read records and paginate a response.
Response *struct {
// NOTE(review): presumably the key path to the records within the response body — confirm.
RecordsPath *[]string `json:"records_path,omitempty"`
Pagination *bool `json:"pagination,omitempty"`
PaginationStrategy *string `json:"pagination_strategy,omitempty"`
PaginationNextPath *[]string `json:"pagination_next_path,omitempty"`
// PaginationQuery holds query-parameter pagination settings: a parameter
// name plus integer value and increment (exact semantics not shown here).
PaginationQuery *struct {
QueryParameter *string `json:"query_parameter,omitempty"`
QueryValue *int `json:"query_value,omitempty"`
QueryIncrement *int `json:"query_increment,omitempty"`
} `json:"pagination_query,omitempty"`
} `json:"response,omitempty"`
} `json:"rest,omitempty"`
}
Parses the config.json file into a Config struct.
var ParsedConfig Config
type Message ¶
// Message is the JSON envelope for emitted messages.
//
// Type is always serialised; every other field carries omitempty and is left
// out of the output when it holds its zero value (nil map/slice/interface or
// empty string).
// NOTE(review): the field set resembles Singer-style RECORD/SCHEMA/STATE
// messages — confirm which fields each message type populates.
type Message struct {
// Type identifies the kind of message; it has no omitempty and is always written.
Type string `json:"type"`
Record map[string]interface{} `json:"record,omitempty"`
Stream string `json:"stream,omitempty"`
Schema interface{} `json:"schema,omitempty"`
Value interface{} `json:"value,omitempty"`
KeyProperties []string `json:"key_properties,omitempty"`
BookmarkProperties []string `json:"bookmark_properties,omitempty"`
}
Click to show internal directories.
Click to hide internal directories.