Documentation
Overview
Package rsql provides SQL parsing and analysis capabilities for StreamSQL.
This package implements a SQL parser designed for stream processing. It supports standard SQL syntax with extensions for window functions and streaming operations, and transforms parsed queries into executable stream processing configurations.
Core Features
• Complete SQL Parser - Support for SELECT, FROM, WHERE, GROUP BY, HAVING, ORDER BY, and LIMIT
• Window Function Support - Native parsing of tumbling, sliding, counting, and session windows
• Expression Analysis - Parsing of complex expressions, function calls, and field references
• Error Recovery - Error detection and recovery with detailed reports (position, expected tokens, suggestions)
• Function Validation - Syntax and semantic validation against the function registry
• AST Generation - Abstract syntax tree generation for query optimization
• Stream-Specific Extensions - Custom syntax for streaming operations and window management
Supported SQL Syntax
Standard SQL clauses with streaming extensions:
// Basic SELECT statement
SELECT field1, field2, AGG_FUNC(field3)
FROM stream
WHERE condition
GROUP BY field1, WindowFunction('params')
HAVING aggregate_condition
ORDER BY field1 ASC, field2 DESC
LIMIT 100
// Window functions
TumblingWindow('5s') - Non-overlapping time windows
SlidingWindow('30s', '10s') - Overlapping time windows
CountingWindow(100) - Count-based windows
SessionWindow('5m') - Session-based windows
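Lexical Analysis
The lexer converts the input into tokens of the following types (see TokenType):
// Token types
TokenSELECT, TokenFROM, TokenWHERE, TokenGROUP, TokenBY, TokenHAVING, TokenLIMIT - SQL keywords
TokenIdent, TokenQuotedIdent - Identifiers (TokenQuotedIdent is backtick-quoted)
TokenNumber, TokenString - Literals
TokenPlus, TokenMinus, TokenAsterisk, TokenSlash - Arithmetic operators
TokenEQ, TokenNE, TokenGT, TokenLT, TokenGE, TokenLE - Comparison operators
TokenAND, TokenOR, TokenNOT - Logical operators
TokenTumbling, TokenSliding, TokenCounting, TokenSession - Window functions
TokenLParen, TokenRParen, TokenLBracket, TokenRBracket - Parentheses and brackets
TokenComma - Delimiter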
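A minimal hand-written scanner shows the general idea of tokenization. The sketch reuses a handful of the TokenType names listed under TokenType below, but its constant values, the Token struct layout, and the tokenize function are assumptions for illustration only. Keywords are matched case-insensitively here, which is also an assumption about rsql.

```go
package main

import (
	"fmt"
	"strings"
)

// TokenType reuses a few rsql constant names; the values are local to this sketch.
type TokenType int

const (
	TokenEOF TokenType = iota
	TokenIdent
	TokenNumber
	TokenString
	TokenComma
	TokenLParen
	TokenRParen
	TokenAsterisk
	TokenEQ
	TokenSELECT
	TokenFROM
	TokenWHERE
)

// Token pairs a type with the matched text (hypothetical layout).
type Token struct {
	Type  TokenType
	Value string
}

var keywords = map[string]TokenType{"SELECT": TokenSELECT, "FROM": TokenFROM, "WHERE": TokenWHERE}

var singleChar = map[byte]TokenType{',': TokenComma, '(': TokenLParen, ')': TokenRParen, '*': TokenAsterisk, '=': TokenEQ}

func isLetter(c byte) bool { return c == '_' || c >= 'a' && c <= 'z' || c >= 'A' && c <= 'Z' }

func isDigit(c byte) bool { return c >= '0' && c <= '9' }

// tokenize scans ASCII input into tokens and appends a final TokenEOF.
func tokenize(input string) ([]Token, error) {
	var toks []Token
	for i := 0; i < len(input); {
		c := input[i]
		start := i
		if t, ok := singleChar[c]; ok {
			toks = append(toks, Token{t, string(c)})
			i++
			continue
		}
		switch {
		case c == ' ' || c == '\t' || c == '\n' || c == '\r':
			i++
		case c == '\'':
			end := strings.IndexByte(input[i+1:], '\'')
			if end < 0 {
				return nil, fmt.Errorf("unterminated string at position %d", i)
			}
			toks = append(toks, Token{TokenString, input[i+1 : i+1+end]})
			i += end + 2
		case isDigit(c):
			for i < len(input) && (isDigit(input[i]) || input[i] == '.') {
				i++
			}
			toks = append(toks, Token{TokenNumber, input[start:i]})
		case isLetter(c):
			for i < len(input) && (isLetter(input[i]) || isDigit(input[i])) {
				i++
			}
			word := input[start:i]
			typ, ok := keywords[strings.ToUpper(word)]
			if !ok {
				typ = TokenIdent
			}
			toks = append(toks, Token{typ, word})
		default:
			return nil, fmt.Errorf("unexpected character %q at position %d", c, i)
		}
	}
	return append(toks, Token{TokenEOF, ""}), nil
}

func main() {
	toks, err := tokenize("SELECT device_id, AVG(temp) FROM stream WHERE id = 'sensor1'")
	fmt.Println(err)
	for _, t := range toks {
		fmt.Printf("%d %q\n", t.Type, t.Value)
	}
}
```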
Parser Architecture
Recursive descent parser with error recovery:
type Parser struct {
	lexer         *Lexer
	errorRecovery *ErrorRecovery
	currentToken  Token
	input         string
}
// Main parsing entry point
func (p *Parser) Parse() (*SelectStatement, error)
// Clause-specific parsers
func (p *Parser) parseSelect(stmt *SelectStatement) error
func (p *Parser) parseFrom(stmt *SelectStatement) error
func (p *Parser) parseWhere(stmt *SelectStatement) error
func (p *Parser) parseGroupBy(stmt *SelectStatement) error
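The recursive descent technique maps each grammar rule to one function, so operator precedence follows from which function calls which. The sketch below applies it to plain arithmetic; exprParser and parseArith are hypothetical and much smaller than the real parser, which works on lexer tokens rather than raw characters.

```go
package main

import (
	"fmt"
	"strconv"
)

// exprParser is a hypothetical recursive descent parser for the grammar
//
//	expr   = term   { ("+" | "-") term }
//	term   = factor { ("*" | "/") factor }
//	factor = number | "(" expr ")"
//
// It evaluates as it parses and returns a float64.
type exprParser struct {
	src string
	pos int
}

// peek skips spaces and returns the next byte, or 0 at end of input.
func (p *exprParser) peek() byte {
	for p.pos < len(p.src) && p.src[p.pos] == ' ' {
		p.pos++
	}
	if p.pos == len(p.src) {
		return 0
	}
	return p.src[p.pos]
}

func (p *exprParser) expr() (float64, error) {
	left, err := p.term()
	if err != nil {
		return 0, err
	}
	for op := p.peek(); op == '+' || op == '-'; op = p.peek() {
		p.pos++
		right, err := p.term()
		if err != nil {
			return 0, err
		}
		if op == '+' {
			left += right
		} else {
			left -= right
		}
	}
	return left, nil
}

func (p *exprParser) term() (float64, error) {
	left, err := p.factor()
	if err != nil {
		return 0, err
	}
	for op := p.peek(); op == '*' || op == '/'; op = p.peek() {
		p.pos++
		right, err := p.factor()
		if err != nil {
			return 0, err
		}
		if op == '*' {
			left *= right
		} else {
			left /= right
		}
	}
	return left, nil
}

func (p *exprParser) factor() (float64, error) {
	if p.peek() == '(' {
		p.pos++
		v, err := p.expr()
		if err != nil {
			return 0, err
		}
		if p.peek() != ')' {
			return 0, fmt.Errorf("expected ')' at position %d", p.pos)
		}
		p.pos++
		return v, nil
	}
	start := p.pos
	for p.pos < len(p.src) && (p.src[p.pos] >= '0' && p.src[p.pos] <= '9' || p.src[p.pos] == '.') {
		p.pos++
	}
	if start == p.pos {
		return 0, fmt.Errorf("expected number at position %d", p.pos)
	}
	return strconv.ParseFloat(p.src[start:p.pos], 64)
}

// parseArith parses the whole input and rejects trailing characters.
func parseArith(s string) (float64, error) {
	p := &exprParser{src: s}
	v, err := p.expr()
	if err == nil && p.peek() != 0 {
		err = fmt.Errorf("unexpected %q at position %d", p.peek(), p.pos)
	}
	return v, err
}

func main() {
	for _, s := range []string{"2 + 3 * (4 - 1)", "(1 + 2"} {
		v, err := parseArith(s)
		fmt.Println(v, err)
	}
}
```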
Error Handling
Parse errors are reported with position and context information, and the parser attempts to recover from them where possible:
// Error types
type ParseError struct {
	Type        ErrorType
	Message     string
	Position    int
	Line        int
	Column      int
	Token       string
	Expected    []string
	Suggestions []string
	Context     string
	Recoverable bool
}
// Error recovery strategies
type ErrorRecovery struct {
	errors     []*ParseError
	parser     *Parser
	strategies []RecoveryStrategy
}
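The sketch below illustrates collect-and-continue error recovery: instead of stopping at the first bad item, the parser records the error and resumes at the next comma. The recovery and parseError types only loosely mirror ErrorRecovery and ParseError, and using the comma as the synchronization point is an assumption for this example, not documented rsql behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// parseError is a hypothetical, trimmed-down analogue of ParseError.
type parseError struct {
	Message  string
	Position int
}

// recovery collects errors instead of aborting on the first one; its methods
// loosely mirror ErrorRecovery.AddError, HasErrors and GetErrors.
type recovery struct {
	errors []*parseError
}

func (r *recovery) AddError(e *parseError) { r.errors = append(r.errors, e) }

func (r *recovery) HasErrors() bool { return len(r.errors) > 0 }

func (r *recovery) GetErrors() []*parseError { return r.errors }

// isIdent reports whether s is a plain ASCII identifier.
func isIdent(s string) bool {
	if s == "" {
		return false
	}
	for i, c := range s {
		letter := c == '_' || c >= 'a' && c <= 'z' || c >= 'A' && c <= 'Z'
		digit := i > 0 && c >= '0' && c <= '9'
		if !letter && !digit {
			return false
		}
	}
	return true
}

// parseFieldList parses a comma-separated field list. An invalid item is
// recorded and skipped, with the next comma as the synchronization point,
// so a single pass can report several errors.
func parseFieldList(list string, r *recovery) []string {
	var fields []string
	pos := 0 // byte offset of the current item within list
	for _, item := range strings.Split(list, ",") {
		if name := strings.TrimSpace(item); isIdent(name) {
			fields = append(fields, name)
		} else {
			r.AddError(&parseError{Message: fmt.Sprintf("invalid field %q", name), Position: pos})
		}
		pos += len(item) + 1 // skip the item and the comma after it
	}
	return fields
}

func main() {
	r := &recovery{}
	fields := parseFieldList("device_id, 1temp, humidity", r)
	fmt.Println(fields)
	for _, e := range r.GetErrors() {
		fmt.Printf("position %d: %s\n", e.Position, e.Message)
	}
}
```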
Function Validation
Function calls in expressions are validated against the function registry:
// Function validator
type FunctionValidator struct {
	// contains filtered or unexported fields
}
// Construction and validation
func NewFunctionValidator(errorRecovery *ErrorRecovery) *FunctionValidator
func (fv *FunctionValidator) ValidateExpression(expression string, position int)
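Registry-based validation can be pictured as a lookup from a function name to its allowed argument counts. In this sketch the registry contents, funcInfo, and validateCall are hypothetical; the real registry lives in the functions package and its entries are not described on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// funcInfo is a hypothetical registry entry describing accepted argument counts.
type funcInfo struct {
	MinArgs int
	MaxArgs int // negative means no upper bound
}

// registry is illustrative only; it does not reflect StreamSQL's real function set.
var registry = map[string]funcInfo{
	"avg":    {MinArgs: 1, MaxArgs: 1},
	"max":    {MinArgs: 1, MaxArgs: 1},
	"round":  {MinArgs: 1, MaxArgs: 2},
	"concat": {MinArgs: 1, MaxArgs: -1},
}

// validateCall checks that name is registered (case-insensitively) and that
// argc is within the allowed range.
func validateCall(name string, argc int) error {
	info, ok := registry[strings.ToLower(name)]
	if !ok {
		return fmt.Errorf("unknown function: %s", name)
	}
	if argc < info.MinArgs || (info.MaxArgs >= 0 && argc > info.MaxArgs) {
		return fmt.Errorf("function %s: unexpected argument count %d", name, argc)
	}
	return nil
}

func main() {
	fmt.Println(validateCall("AVG", 1))
	fmt.Println(validateCall("round", 3))
	fmt.Println(validateCall("median", 1))
}
```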
AST Structure
StreamSQL AST representation:
type SelectStatement struct {
	Fields    []Field
	Distinct  bool
	SelectAll bool
	Source    string
	Condition string
	Window    WindowDefinition
	GroupBy   []string
	Limit     int
	Having    string
}
type Field struct {
	Expression string
	Alias      string
	AggType    string
}
type WindowDefinition struct {
	Type     string
	Params   []interface{}
	TsProp   string
	TimeUnit time.Duration
}
Usage Examples
Basic SQL parsing:
parser := NewParser("SELECT AVG(temperature) FROM stream WHERE device_id = 'sensor1'")
stmt, err := parser.Parse()
if err != nil {
	log.Fatal(err)
}
// Convert to stream configuration
config, condition, err := stmt.ToStreamConfig()
Window function parsing:
sql := `SELECT device_id, AVG(temperature)
FROM stream
GROUP BY device_id, TumblingWindow('5s')`
config, condition, err := Parse(sql)
Complex query with multiple clauses:
sql := `SELECT device_id,
AVG(temperature) as avg_temp,
MAX(humidity) as max_humidity
FROM stream
WHERE device_id LIKE 'sensor%'
GROUP BY device_id, SlidingWindow('1m', '30s')
HAVING avg_temp > 25
ORDER BY avg_temp DESC
LIMIT 10`
config, condition, err := Parse(sql)
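The WHERE device_id LIKE 'sensor%' clause above relies on SQL LIKE semantics, where % matches any run of characters and _ matches exactly one. A sketch of a matcher for those two wildcards follows; likeMatch is hypothetical, ignores escape characters, and says nothing about how StreamSQL itself evaluates LIKE.

```go
package main

import "fmt"

// likeMatch reports whether s matches a SQL LIKE pattern in which '%' matches
// any run of characters (including none) and '_' matches exactly one.
// Escape sequences are not supported in this sketch.
func likeMatch(s, pattern string) bool {
	rs, rp := []rune(s), []rune(pattern)
	si, pi := 0, 0
	star, mark := -1, 0 // last '%' seen in the pattern, and where in s it began matching
	for si < len(rs) {
		switch {
		case pi < len(rp) && rp[pi] == '%':
			// Tentatively let '%' match nothing; remember where to backtrack to.
			star, mark = pi, si
			pi++
		case pi < len(rp) && (rp[pi] == '_' || rp[pi] == rs[si]):
			si++
			pi++
		case star >= 0:
			// Mismatch: let the last '%' absorb one more character and retry.
			mark++
			si, pi = mark, star+1
		default:
			return false
		}
	}
	// Any pattern left over must consist only of '%'.
	for pi < len(rp) && rp[pi] == '%' {
		pi++
	}
	return pi == len(rp)
}

func main() {
	for _, id := range []string{"sensor1", "sensor_02", "device7"} {
		fmt.Println(id, likeMatch(id, "sensor%"))
	}
}
```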
Configuration Generation
Transformation from AST to stream processing configuration:
type Config struct {
	WindowConfig     WindowConfig
	GroupFields      []string
	SelectFields     map[string]AggregateType
	FieldAlias       map[string]string
	SimpleFields     []string
	FieldExpressions map[string]FieldExpression
	FieldOrder       []string
	Where            string
	Having           string
	NeedWindow       bool
	Distinct         bool
	Limit            int
}
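To make the AST-to-configuration direction concrete, the sketch copies a few SelectStatement fields into their Config counterparts. The trimmed stmt and config types and the toConfig function are hypothetical. Deriving NeedWindow from a non-empty window type, and treating GroupBy as already free of the window call, are assumptions rather than documented ToStreamConfig behavior.

```go
package main

import "fmt"

// Trimmed, hypothetical copies of the AST and configuration types.
type windowDef struct {
	Type   string
	Params []interface{}
}

type stmt struct {
	Distinct  bool
	Condition string
	Window    windowDef
	GroupBy   []string
	Limit     int
	Having    string
}

type config struct {
	GroupFields []string
	Where       string
	Having      string
	NeedWindow  bool
	Distinct    bool
	Limit       int
}

// toConfig maps statement fields onto configuration fields and also returns
// the WHERE condition, echoing the (config, condition, err) shape of ToStreamConfig.
func toConfig(s stmt) (config, string) {
	cfg := config{
		GroupFields: append([]string(nil), s.GroupBy...), // copy so the AST is not aliased
		Where:       s.Condition,
		Having:      s.Having,
		NeedWindow:  s.Window.Type != "", // assumption: any window type requires windowing
		Distinct:    s.Distinct,
		Limit:       s.Limit,
	}
	return cfg, s.Condition
}

func main() {
	cfg, cond := toConfig(stmt{
		Condition: "device_id LIKE 'sensor%'",
		Window:    windowDef{Type: "SlidingWindow", Params: []interface{}{"1m", "30s"}},
		GroupBy:   []string{"device_id"},
		Having:    "avg_temp > 25",
		Limit:     10,
	})
	fmt.Printf("%+v\ncondition: %s\n", cfg, cond)
}
```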
Integration
The package works together with other StreamSQL components:
• Functions package - Function validation and registry integration
• Expr package - Expression parsing and evaluation
• Types package - Configuration and data type definitions
• Stream package - Configuration application and execution
• Window package - Window function parsing and configuration
Index
- Constants
- func FormatErrorContext(input string, position int, contextLength int) string
- func Parse(sql string) (*types.Config, string, error)
- func ParseAggregateTypeWithExpression(exprStr string) (aggType aggregator.AggregateType, name string, expression string, ...)
- type ErrorRecovery
- type ErrorType
- type Field
- type FunctionCall
- type FunctionValidator
- type Lexer
- type ParseError
- func CreateLexicalError(message string, position int, char byte) *ParseError
- func CreateLexicalErrorWithPosition(message string, position int, line int, column int, char byte) *ParseError
- func CreateMissingTokenError(expected string, position int) *ParseError
- func CreateSemanticError(message string, position int) *ParseError
- func CreateSyntaxError(message string, position int, token string, expected []string) *ParseError
- func CreateUnexpectedTokenError(found string, expected []string, position int) *ParseError
- func CreateUnknownFunctionError(functionName string, position int) *ParseError
- type Parser
- type SelectStatement
- type Token
- type TokenType
- type WindowDefinition
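Constants
const (
	// MaxRecursionDepth defines the maximum recursion depth of the expectTokenWithDepth method,
	// used to prevent infinite recursion.
	MaxRecursionDepth = 30
	// MaxSelectFields defines the maximum number of fields allowed in a SELECT clause.
	MaxSelectFields = 300
)
Parser configuration constants.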
Variables
This section is empty.
Functions
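func FormatErrorContext
func FormatErrorContext(input string, position int, contextLength int) string
FormatErrorContext formats the error context: the portion of input surrounding position, sized by contextLength, for use in error messages.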
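One possible shape for error-context formatting is sketched below: take up to contextLength bytes on either side of position and place a caret under the offending byte. formatContext is a hypothetical stand-in; the actual output format of FormatErrorContext is not documented here.

```go
package main

import (
	"fmt"
	"strings"
)

// formatContext returns up to contextLength bytes on each side of position,
// then a second line with a caret under the byte at position. It works on
// byte offsets, so the caret may be misaligned after multi-byte UTF-8 text.
func formatContext(input string, position, contextLength int) string {
	if contextLength < 0 {
		contextLength = 0
	}
	if position < 0 {
		position = 0
	}
	if position > len(input) {
		position = len(input)
	}
	start := position - contextLength
	if start < 0 {
		start = 0
	}
	end := position + contextLength
	if end > len(input) {
		end = len(input)
	}
	return input[start:end] + "\n" + strings.Repeat(" ", position-start) + "^"
}

func main() {
	fmt.Println(formatContext("SELECT * FORM stream", 9, 4))
}
```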
func ParseAggregateTypeWithExpression
func ParseAggregateTypeWithExpression(exprStr string) (aggType aggregator.AggregateType, name string, expression string, allFields []string, err error)
ParseAggregateTypeWithExpression parses an aggregate function expression and returns its aggregate type, name, and expression, along with all fields it references (allFields).
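The following sketch shows the kind of decomposition described above, splitting avg(temperature * 2 + humidity) into a function name, an inner expression, and referenced fields. splitAggregate is hypothetical: it returns a plain name instead of an aggregator.AggregateType, and it would misreport nested function names and string literal contents as fields.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// splitAggregate splits a call such as "avg(temperature * 2)" into the
// upper-cased function name, the inner expression, and the distinct
// identifiers referenced inside (in order of first appearance).
// Limitation: nested function names and quoted text are reported as fields too.
func splitAggregate(exprStr string) (name, inner string, fields []string, err error) {
	s := strings.TrimSpace(exprStr)
	open := strings.IndexByte(s, '(')
	if open <= 0 || !strings.HasSuffix(s, ")") {
		return "", "", nil, fmt.Errorf("not a function call: %q", exprStr)
	}
	name = strings.ToUpper(strings.TrimSpace(s[:open]))
	inner = strings.TrimSpace(s[open+1 : len(s)-1])
	notIdent := func(r rune) bool { return r != '_' && !unicode.IsLetter(r) && !unicode.IsDigit(r) }
	seen := map[string]bool{}
	for _, word := range strings.FieldsFunc(inner, notIdent) {
		if unicode.IsDigit(rune(word[0])) || seen[word] {
			continue // skip numeric literals and repeats
		}
		seen[word] = true
		fields = append(fields, word)
	}
	return name, inner, fields, nil
}

func main() {
	name, inner, fields, err := splitAggregate("avg(temperature * 2 + humidity)")
	fmt.Println(name, inner, fields, err)
}
```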
Types
type ErrorRecovery
type ErrorRecovery struct {
	// contains filtered or unexported fields
}
ErrorRecovery implements the parser's error recovery strategy and collects parse errors.
func NewErrorRecovery
func NewErrorRecovery(parser *Parser) *ErrorRecovery
NewErrorRecovery creates an error recovery instance for parser.
func (*ErrorRecovery) AddError
func (er *ErrorRecovery) AddError(err *ParseError)
AddError records an error.
func (*ErrorRecovery) GetErrors
func (er *ErrorRecovery) GetErrors() []*ParseError
GetErrors returns all recorded errors.
func (*ErrorRecovery) HasErrors
func (er *ErrorRecovery) HasErrors() bool
HasErrors reports whether any errors have been recorded.
func (*ErrorRecovery) RecoverFromError
func (er *ErrorRecovery) RecoverFromError(errorType ErrorType) bool
RecoverFromError attempts to recover from an error of the given type and reports whether recovery succeeded.
type FunctionCall
FunctionCall holds information about a function call.
type FunctionValidator
type FunctionValidator struct {
	// contains filtered or unexported fields
}
FunctionValidator validates SQL functions in expressions
func NewFunctionValidator
func NewFunctionValidator(errorRecovery *ErrorRecovery) *FunctionValidator
NewFunctionValidator creates a new function validator
func (*FunctionValidator) ValidateExpression
func (fv *FunctionValidator) ValidateExpression(expression string, position int)
ValidateExpression validates functions within expressions
type Lexer
type Lexer struct {
	// contains filtered or unexported fields
}
func (*Lexer) SetErrorRecovery
func (l *Lexer) SetErrorRecovery(er *ErrorRecovery)
SetErrorRecovery sets the error recovery instance.
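type ParseError
type ParseError struct {
	Type        ErrorType
	Message     string
	Position    int
	Line        int
	Column      int
	Token       string
	Expected    []string
	Suggestions []string
	Context     string
	Recoverable bool
}
ParseError is an enhanced parse error that carries the error type, position (offset, line, and column), offending token, expected tokens, suggestions, context, and whether the error is recoverable.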
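ParseError carries both a byte Position and a Line/Column pair. This sketch derives line and column from a byte offset, assuming 1-based numbering and '\n' line breaks; lineColumn, parseError, and newParseError are hypothetical and do not reproduce the package's own error text.

```go
package main

import "fmt"

// lineColumn converts a 0-based byte offset into 1-based line and column
// numbers, treating '\n' as the line separator.
func lineColumn(input string, position int) (line, column int) {
	if position > len(input) {
		position = len(input)
	}
	line, column = 1, 1
	for i := 0; i < position; i++ {
		if input[i] == '\n' {
			line++
			column = 1
		} else {
			column++
		}
	}
	return line, column
}

// parseError is a hypothetical subset of ParseError's fields.
type parseError struct {
	Message      string
	Position     int
	Line, Column int
}

func (e *parseError) Error() string {
	return fmt.Sprintf("line %d, column %d: %s", e.Line, e.Column, e.Message)
}

// newParseError fills Line and Column from the byte offset.
func newParseError(input, message string, position int) *parseError {
	line, column := lineColumn(input, position)
	return &parseError{Message: message, Position: position, Line: line, Column: column}
}

func main() {
	sql := "SELECT a\nFORM stream"
	fmt.Println(newParseError(sql, "unexpected token FORM", 9))
}
```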
func CreateLexicalError
func CreateLexicalError(message string, position int, char byte) *ParseError
CreateLexicalError creates a lexical error.
func CreateLexicalErrorWithPosition
func CreateLexicalErrorWithPosition(message string, position int, line int, column int, char byte) *ParseError
CreateLexicalErrorWithPosition creates a lexical error with an accurate line and column position.
func CreateMissingTokenError
func CreateMissingTokenError(expected string, position int) *ParseError
CreateMissingTokenError creates a missing-token error.
func CreateSemanticError (added in v0.10.2)
func CreateSemanticError(message string, position int) *ParseError
CreateSemanticError creates a semantic error.
func CreateSyntaxError
func CreateSyntaxError(message string, position int, token string, expected []string) *ParseError
CreateSyntaxError creates a syntax error.
func CreateUnexpectedTokenError
func CreateUnexpectedTokenError(found string, expected []string, position int) *ParseError
CreateUnexpectedTokenError creates an unexpected-token error.
func CreateUnknownFunctionError
func CreateUnknownFunctionError(functionName string, position int) *ParseError
CreateUnknownFunctionError creates an unknown-function error.
func (*ParseError) IsRecoverable
func (e *ParseError) IsRecoverable() bool
IsRecoverable reports whether the error is recoverable.
type Parser
type Parser struct {
	// contains filtered or unexported fields
}
func (*Parser) Parse
func (p *Parser) Parse() (*SelectStatement, error)
type SelectStatement
type SelectStatement struct {
	Fields    []Field
	Distinct  bool
	SelectAll bool // Flag to indicate if this is a SELECT * query
	Source    string
	Condition string
	Window    WindowDefinition
	GroupBy   []string
	Limit     int
	Having    string
}
func (*SelectStatement) ToStreamConfig
func (s *SelectStatement) ToStreamConfig() (*types.Config, string, error)
ToStreamConfig converts the AST into a stream configuration and also returns the WHERE condition string.
type TokenType
type TokenType int
const (
	TokenEOF TokenType = iota
	TokenIdent
	TokenNumber
	TokenString
	TokenQuotedIdent // backtick-quoted identifier
	TokenComma
	TokenLParen
	TokenRParen
	TokenPlus
	TokenMinus
	TokenAsterisk
	TokenSlash
	TokenEQ
	TokenNE
	TokenGT
	TokenLT
	TokenGE
	TokenLE
	TokenAND
	TokenOR
	TokenSELECT
	TokenFROM
	TokenWHERE
	TokenGROUP
	TokenBY
	TokenAS
	TokenTumbling
	TokenSliding
	TokenCounting
	TokenSession
	TokenWITH
	TokenTimestamp
	TokenTimeUnit
	TokenOrder
	TokenDISTINCT
	TokenLIMIT
	TokenHAVING
	TokenLIKE
	TokenIS
	TokenNULL
	TokenNOT
	// CASE expression tokens
	TokenCASE
	TokenWHEN
	TokenTHEN
	TokenELSE
	TokenEND
	// Array index tokens
	TokenLBracket
	TokenRBracket
)