Documentation
¶
Index ¶
- func CollectResults(resultChan <-chan *interface{}) []interface{}
- func CreateStateJSON(config Config)
- func GatherRecords(f func(resultChan chan<- *interface{}, config Config, state *State, ...), ...) ([]interface{}, error)
- func GenerateRecordMessage(record interface{}, state *State, config Config) error
- func GenerateSchema(records []interface{}) (map[string]interface{}, error)
- func GenerateSchemaMessage(schema map[string]interface{}, config Config) error
- func GenerateStateMessage(state *State, config Config) error
- func ParseRecord(record []byte, resultChan chan<- *interface{}, config Config, state *State, ...)
- func UpdateState(records []interface{}, state *State, config Config)
- type Config
- type Message
- type State
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CollectResults ¶ added in v0.0.9
func CollectResults(resultChan <-chan *interface{}) []interface{}
func CreateStateJSON ¶
func CreateStateJSON(config Config)
/////////////////////////////////////////////////////////// CREATE ///////////////////////////////////////////////////////////
func GatherRecords ¶ added in v0.0.9
func GatherRecords(f func(resultChan chan<- *interface{}, config Config, state *State, wg *sync.WaitGroup), config Config, state *State) ([]interface{}, error)
///////////////////////////////////////////////////////// GATHER /////////////////////////////////////////////////////////
func GenerateRecordMessage ¶
func GenerateSchema ¶
/////////////////////////////////////////////////////////// GENERATE SCHEMA ///////////////////////////////////////////////////////////
func GenerateSchemaMessage ¶
func GenerateStateMessage ¶
func ParseRecord ¶ added in v0.0.9
func ParseRecord(record []byte, resultChan chan<- *interface{}, config Config, state *State, wg *sync.WaitGroup)
///////////////////////////////////////////////////////// PARSE RECORD /////////////////////////////////////////////////////////
func UpdateState ¶
/////////////////////////////////////////////////////////// UPDATE ///////////////////////////////////////////////////////////
Types ¶
type Config ¶
// Config describes the settings for one stream, mapped to JSON through the
// struct tags below. Every field (including each nested struct) is a pointer
// tagged omitempty: a key missing from the JSON input leaves the field nil,
// and a nil field is omitted when the value is marshalled. Callers must
// nil-check each level before dereferencing.
type Config struct {
// Top-level stream identification.
StreamName *string `json:"stream_name,omitempty"`
// NOTE(review): the set of accepted source_type values is not visible here;
// the Database and Rest sections below are presumably selected by it — confirm.
SourceType *string `json:"source_type,omitempty"`
URL *string `json:"url,omitempty"`
// Records groups per-record settings. Each "path" is a slice of strings;
// presumably the sequence of keys leading to a nested field — TODO confirm.
Records *struct {
UniqueKeyPath *[]string `json:"unique_key_path,omitempty"`
BookmarkPath *[]string `json:"bookmark_path,omitempty"`
// A list of paths (slice of slices), one entry per field.
DropFieldPaths *[][]string `json:"drop_field_paths,omitempty"`
SensitiveFieldPaths *[][]string `json:"sensitive_field_paths,omitempty"`
// IntelligentFields is a list of derived-field definitions.
// NOTE(review): Prefix/Suffix/MaxTokens/Temperature look like parameters
// for a text-generation request built around FieldPath — confirm against
// the code that consumes them.
IntelligentFields *[]struct {
Prefix *string `json:"prefix,omitempty"`
FieldPath *[]string `json:"field_path,omitempty"`
Suffix *string `json:"suffix,omitempty"`
MaxTokens *int `json:"max_tokens,omitempty"`
Temperature *float32 `json:"temperature,omitempty"`
IntelligentFieldName *string `json:"intelligent_field_name,omitempty"`
} `json:"intelligent_fields,omitempty"`
} `json:"records,omitempty"`
// Database is serialized under the key "db" (not "database").
Database *struct {
Table *string `json:"table,omitempty"`
} `json:"db,omitempty"`
// Rest groups settings serialized under the key "rest".
Rest *struct {
// NOTE(review): the unit of Sleep (seconds vs. milliseconds) is not
// shown in this declaration — confirm.
Sleep *int `json:"sleep,omitempty"`
// Auth holds one sub-struct per credential style; presumably Strategy
// names which of Basic, Token or Oauth is used — TODO confirm.
Auth *struct {
Required *bool `json:"required,omitempty"`
Strategy *string `json:"strategy,omitempty"`
Basic *struct {
Username *string `json:"username,omitempty"`
Password *string `json:"password,omitempty"`
} `json:"basic,omitempty"`
Token *struct {
Header *string `json:"header,omitempty"`
HeaderValue *string `json:"header_value,omitempty"`
} `json:"token,omitempty"`
Oauth *struct {
ClientID *string `json:"client_id,omitempty"`
ClientSecret *string `json:"client_secret,omitempty"`
RefreshToken *string `json:"refresh_token,omitempty"`
TokenURL *string `json:"token_url,omitempty"`
} `json:"oauth,omitempty"`
} `json:"auth,omitempty"`
// Response groups settings about the reply payload and paging.
Response *struct {
RecordsPath *[]string `json:"records_path,omitempty"`
Pagination *bool `json:"pagination,omitempty"`
// NOTE(review): valid pagination_strategy values are not visible here;
// PaginationNextPath and PaginationQuery are presumably alternatives
// chosen by it — confirm.
PaginationStrategy *string `json:"pagination_strategy,omitempty"`
PaginationNextPath *[]string `json:"pagination_next_path,omitempty"`
PaginationQuery *struct {
QueryParameter *string `json:"query_parameter,omitempty"`
QueryValue *int `json:"query_value,omitempty"`
QueryIncrement *int `json:"query_increment,omitempty"`
} `json:"pagination_query,omitempty"`
} `json:"response,omitempty"`
} `json:"rest,omitempty"`
}
///////////////////////////////////////////////////////// CONFIG.JSON /////////////////////////////////////////////////////////
type Message ¶
// Message is a single JSON-serializable envelope. Only Type is always
// written; every other field is tagged omitempty and is left out of the
// output when it holds its zero value (nil map/slice/interface or "").
// NOTE(review): the field set resembles Singer-style record/schema/state
// messages — confirm which fields each message type populates.
type Message struct {
// Type has no omitempty, so it is emitted even when empty.
Type string `json:"type"`
Record map[string]interface{} `json:"record,omitempty"`
Stream string `json:"stream,omitempty"`
// Schema and Value are untyped; their concrete shape is decided by the caller.
Schema interface{} `json:"schema,omitempty"`
Value interface{} `json:"value,omitempty"`
KeyProperties []string `json:"key_properties,omitempty"`
BookmarkProperties []string `json:"bookmark_properties,omitempty"`
}
///////////////////////////////////////////////////////// MESSAGES /////////////////////////////////////////////////////////
type State ¶
// State is the JSON-serializable bookmark state. None of its fields use
// omitempty, so every key is always written when the value is marshalled.
type State struct {
// The outer keys are capitalized ("Type", "Value"), unlike the lowercase
// keys used for the nested fields; keep them as-is for compatibility with
// any state already persisted.
Type string `json:"Type"`
Value struct {
// Bookmarks maps a string key to one bookmark entry.
// NOTE(review): the key is presumably the stream name — confirm.
// Entries are struct values, not pointers, so an entry read from the map
// must be modified on a copy and stored back.
Bookmarks map[string]struct {
// Serialized as "last_extraction_at", which differs from the Go field name.
BookmarkUpdatedAt string `json:"last_extraction_at"`
DetectionBookmark []string `json:"detection_bookmark"`
Bookmark string `json:"bookmark"`
} `json:"bookmarks"`
} `json:"Value"`
}
///////////////////////////////////////////////////////// STATE_<STREAM>.JSON /////////////////////////////////////////////////////////
func ParseStateJSON ¶
/////////////////////////////////////////////////////////// PARSE state_<STREAM>.json ///////////////////////////////////////////////////////////