graph

package
v1.2.243
Published: Apr 22, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package graph exposes functionality to express Chalk definitions and modify the protograph.

Constants

This section is empty.

Variables

var All = Years(100)

All can be passed in as a duration to Windowed, equivalent to "all" in chalkpy

var Boolean = primitive("bool")

Boolean is equivalent to chalkpy bool

var CENTURY = durationpb.Duration{Seconds: 3153600000}
var Datetime = primitive("datetime")

Datetime is equivalent to chalkpy datetime.datetime

var FeatureTime = &FeatureTimeBuilder{}

FeatureTime is equivalent to chalkpy FeatureTime

var Float = primitive("float")

Float is equivalent to chalkpy float

var Int = primitive("int")

Int is equivalent to chalkpy int

var String = primitive("str")

String is equivalent to chalkpy str

var TRUE = true

Functions

func CheckHasFeature added in v1.2.112

func CheckHasFeature(graph *graphv1.Graph, fsName, fName string) error

func Days

func Days(n int64) time.Duration

func EmptyList added in v1.2.139

func EmptyList(ofType *ScalarFeatureBuilder) expr.Expr

EmptyList creates an empty list expression for the given element type

func FeatureName added in v1.2.109

func FeatureName(f *graphv1.FeatureType) string

FeatureName gets the field name of a feature, useful for debugging

func GetDefault

func GetDefault[M ~map[K]V, K comparable, V any](m M, k K, def V) V
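GetDefault has no description here, so the following is only a sketch of what its signature suggests: a map lookup that falls back to a supplied default. The lowercase getDefault below is a hypothetical stand-in written for illustration, and the "return def when the key is absent" behaviour is an assumption, not something this page confirms.

```go
package main

import "fmt"

// getDefault is a hypothetical stand-in with the same signature as GetDefault.
// Assumption: it returns m[k] when k is present and def otherwise.
func getDefault[M ~map[K]V, K comparable, V any](m M, k K, def V) V {
	if v, ok := m[k]; ok {
		return v
	}
	return def
}

func main() {
	owners := map[string]string{"user": "identity-team"}
	fmt.Println(getDefault(owners, "user", "unowned"))        // identity-team
	fmt.Println(getDefault(owners, "transaction", "unowned")) // unowned
}
```

The `~map[K]V` constraint means named map types are accepted as well as plain map literals.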

func Hours

func Hours(n int64) time.Duration

func Minutes

func Minutes(n int64) time.Duration

func Seconds

func Seconds(n int64) time.Duration

Duration helper functions

func StringOptions added in v1.2.139

func StringOptions(in map[string]string) map[string]*structpb.Value

func TypeName added in v1.2.110

func TypeName(scalar *graphv1.ScalarFeatureType) string

func Weeks

func Weeks(n int64) time.Duration

func Years added in v1.2.109

func Years(n int64) time.Duration
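The duration helpers all share one shape: an int64 count in, a time.Duration out. As a rough illustration, the sketch below re-creates three of them with hypothetical lowercase stand-ins. It assumes a year is 365 days, which is at least consistent with CENTURY above (3153600000 seconds = 100 * 365 * 24 * 3600); the package's actual implementation is not shown on this page.

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical stand-ins mirroring the signatures of Days, Weeks, and Years.
// Assumption: a year is treated as 365 days, matching the CENTURY constant.
func days(n int64) time.Duration  { return time.Duration(n) * 24 * time.Hour }
func weeks(n int64) time.Duration { return days(7 * n) }
func years(n int64) time.Duration { return days(365 * n) }

func main() {
	fmt.Println(days(1))                     // 24h0m0s
	fmt.Println(weeks(1))                    // 168h0m0s
	fmt.Println(int64(years(100).Seconds())) // 3153600000
}
```

A time.Duration is an int64 count of nanoseconds, so it tops out at roughly 292 years; a 100-year value such as All fits comfortably.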

Types

type Definitions

type Definitions struct {
	FeatureSets     []*FeatureSet
	StreamResolvers []*StreamResolver
}

Definitions stores Feature Set and Stream Resolver definitions.

func (Definitions) ToGraph

func (d Definitions) ToGraph() (*graphv1.Graph, error)

ToGraph converts a Definitions struct to a protograph

func (Definitions) UpdateGraph added in v1.2.110

func (d Definitions) UpdateGraph(g *graphv1.Graph) error

UpdateGraph will modify protograph g in-place to include Feature Sets and Stream Resolvers from d, replacing any with the same name.
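The "replace any with the same name" behaviour can be pictured as an upsert keyed on name. The sketch below uses a hypothetical featureSet stand-in and a plain slice instead of *graphv1.Graph, so it illustrates the merge rule only and says nothing about how UpdateGraph is actually implemented.

```go
package main

import "fmt"

// featureSet is a hypothetical stand-in for a named definition in a graph.
type featureSet struct {
	Name string
	Doc  string
}

// upsertByName replaces any existing entry that shares a name with an
// incoming one and appends the rest, modifying the target slice in place.
func upsertByName(existing *[]featureSet, incoming []featureSet) {
	index := make(map[string]int, len(*existing))
	for i, fs := range *existing {
		index[fs.Name] = i
	}
	for _, fs := range incoming {
		if i, ok := index[fs.Name]; ok {
			(*existing)[i] = fs // same name: replace
			continue
		}
		index[fs.Name] = len(*existing)
		*existing = append(*existing, fs) // new name: append
	}
}

func main() {
	g := []featureSet{{Name: "User", Doc: "old"}, {Name: "Transaction"}}
	upsertByName(&g, []featureSet{{Name: "User", Doc: "new"}, {Name: "Merchant"}})
	for _, fs := range g {
		fmt.Printf("%s %q\n", fs.Name, fs.Doc)
	}
}
```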

func (Definitions) WithFeatureSets

func (d Definitions) WithFeatureSets(fs ...FeatureSet) Definitions

WithFeatureSets is a variadic function that adds Feature Set definitions. Note that it is a chaining method and does not modify d in place.

func (Definitions) WithStreamResolvers added in v1.2.112

func (d Definitions) WithStreamResolvers(srs ...StreamResolver) Definitions

WithStreamResolvers is a variadic function that adds Stream Resolver definitions. Note that it is a chaining method and does not modify d in place.
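Because both chaining methods take a value receiver and return a Definitions, the returned value has to be kept; calling one and discarding the result leaves the original untouched. The sketch below shows that pattern with a hypothetical definitions stand-in that stores names only, so the real types are not needed to run it.

```go
package main

import "fmt"

// definitions is a hypothetical stand-in for Definitions.
type definitions struct {
	FeatureSets []string
}

// withFeatureSets has a value receiver and returns an updated copy,
// mirroring the chaining style described above.
func (d definitions) withFeatureSets(names ...string) definitions {
	// Copy before appending so the receiver's backing array is never shared.
	d.FeatureSets = append(append([]string(nil), d.FeatureSets...), names...)
	return d
}

func main() {
	base := definitions{}
	base.withFeatureSets("User") // result discarded: base is unchanged

	defs := base.withFeatureSets("User").withFeatureSets("Transaction")
	fmt.Println(len(base.FeatureSets), len(defs.FeatureSets)) // 0 2
}
```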

type FeatureBuilder

type FeatureBuilder interface {
	// contains filtered or unexported methods
}

type FeatureSet

type FeatureSet struct {
	// metadata fields
	Name               string
	Features           []*graphv1.FeatureType
	IsSingleton        bool
	Tags               []string
	Owner              string
	Doc                string
	EtlOfflineToOnline bool
	MaxStaleness       time.Duration
	// contains filtered or unexported fields
}

func (*FeatureSet) ToProto

func (fs *FeatureSet) ToProto() (*graphv1.FeatureSet, hset[string], error)

func (FeatureSet) With

func (fs FeatureSet) With(name string, ofType FeatureBuilder) FeatureSet

With adds a new feature to a Feature Set; each call is roughly equivalent to one field line in the corresponding chalkpy class.

func (FeatureSet) WithAll added in v1.2.110

func (fs FeatureSet) WithAll(m Features) FeatureSet

WithAll lets you define features in bulk. For example, the following line:

graph.FeatureSet{Name: "user"}.WithAll(graph.Features{"name": graph.String, "age": graph.Int})

is equivalent to two chained .With calls.

func (FeatureSet) WithForeignKey

func (fs FeatureSet) WithForeignKey(name, relation string) FeatureSet

WithForeignKey specifies a field that refers to the primary field of another Feature Set. For example

FeatureSet{Name: "Transaction"}.WithForeignKey("user_id", "User")

is equivalent to the following chalkpy

@features
class Transaction:
  user_id: 'User'

func (FeatureSet) WithPrimary

func (fs FeatureSet) WithPrimary(name string, ofType FeatureBuilder) FeatureSet

WithPrimary defines a primary field of type Int or String. Note that this must be called separately from WithAll; if you do not define a primary field manually, an implicit `id: Primary[int]` field will be added.

type FeatureTimeBuilder added in v1.2.114

type FeatureTimeBuilder struct{}

type Features added in v1.2.110

type Features map[string]FeatureBuilder

type HasManyFeatureBuilder

type HasManyFeatureBuilder struct {
	ForeignNamespace string
	MaxStaleness     time.Duration
}

func DataFrame

func DataFrame(foreignName string) *HasManyFeatureBuilder

DataFrame defines a new dataframe feature. For example

FeatureSet{Name: "User"}.With("transactions", DataFrame("Transactions"))

is equivalent to the following chalkpy

@features
class User:
  transactions: DataFrame[Transactions]

func (*HasManyFeatureBuilder) WithMaxStaleness added in v1.2.109

func (hm *HasManyFeatureBuilder) WithMaxStaleness(d time.Duration) *HasManyFeatureBuilder

type HasOneFeatureBuilder added in v1.2.112

type HasOneFeatureBuilder struct {
	// contains filtered or unexported fields
}

func HasOne added in v1.2.112

func HasOne(relation string, join expr.Expr) *HasOneFeatureBuilder

HasOne defines a new has_one feature. For example

graph.FeatureSet{Name: "Transaction"}.With("user_id", graph.Int).With("user", graph.HasOne("User", expr.Col("User", "id").Eq(expr.Col("Transaction", "user_id"))))

is equivalent to the following chalkpy

@features
class Transaction:
  user_id: int
  user: has_one('User', lambda: User.id == Transaction.user_id)

type MaterializationOptions

type MaterializationOptions struct {
	// map window -> bucket duration
	// NOTE: OPPOSITE ORDER AS chalkpy if construction is being done via struct
	BucketDurations       map[time.Duration]time.Duration
	DefaultBucketDuration time.Duration
	// The period for which to use the continuous resolver, instead
	// of relying upon the last backfill. If not provided, and a continuous
	// resolver is provided, this will be set to backfill_lookback_duration.
	ContinuousBufferDuration time.Duration
	// A crontab or duration string to specify the schedule for back filling the
	// materialized aggregate.
	BackfillSchedule string
	// The lower bound of the first bucket. All buckets are aligned to this time.
	BucketStart time.Time

	// technically not part of materialization kwarg, but still useful to expose (for now)
	// The 'k' arg of approx_top_k.
	ApproxTopKArgK int64
	// The resolver to use for back-filling the materialized aggregate.
	// If not provided, the data will be back filled using the resolver
	// that would run for an offline query.
	BackfillResolver string
	// The amount of time before the start of the previous backfill
	// to consider when running the backfill resolver. Set this parameter
	// to be equal to the latest arriving data in the backfill window.
	BackfillLookbackDuration time.Duration
	// The time at which to start back filling the materialized aggregate.
	// If not provided, the backfill considers the earliest available data returned
	// by the `backfill_resolver`.
	BackfillStartTime time.Time
	// The resolver to use for continuous updates to the materialized aggregate.
	// If not provided, the data will be updated using the resolver that would run
	// for an online query.
	ContinuousResolver string
}

https://docs.chalk.ai/api-docs#windowed.materialization
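To make the window-to-bucket mapping and the BucketStart alignment more concrete, here is a minimal sketch using a hypothetical materialization stand-in that copies three of the fields above. Two things are assumptions rather than documented behaviour: that a window missing from BucketDurations falls back to DefaultBucketDuration, and that a bucket's lower bound is found by rounding the elapsed time since BucketStart down to a multiple of the bucket duration.

```go
package main

import (
	"fmt"
	"time"
)

// materialization is a hypothetical stand-in holding three fields of
// MaterializationOptions.
type materialization struct {
	BucketDurations       map[time.Duration]time.Duration // window -> bucket duration
	DefaultBucketDuration time.Duration
	BucketStart           time.Time
}

// bucketFor returns the bucket duration for a window.
// Assumption: windows without an explicit entry use the default.
func (m materialization) bucketFor(window time.Duration) time.Duration {
	if b, ok := m.BucketDurations[window]; ok {
		return b
	}
	return m.DefaultBucketDuration
}

// bucketLowerBound returns the start of the bucket containing t, with all
// buckets aligned to BucketStart. It assumes t is not before BucketStart
// and that bucket is positive.
func (m materialization) bucketLowerBound(t time.Time, bucket time.Duration) time.Time {
	elapsed := t.Sub(m.BucketStart)
	return m.BucketStart.Add(elapsed - elapsed%bucket)
}

func main() {
	m := materialization{
		BucketDurations:       map[time.Duration]time.Duration{24 * time.Hour: time.Hour},
		DefaultBucketDuration: 10 * time.Minute,
		BucketStart:           time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
	}
	fmt.Println(m.bucketFor(24 * time.Hour)) // 1h0m0s
	fmt.Println(m.bucketFor(time.Hour))      // 10m0s

	t := time.Date(2024, 1, 1, 5, 42, 0, 0, time.UTC)
	fmt.Println(m.bucketLowerBound(t, time.Hour)) // 2024-01-01 05:00:00 +0000 UTC
}
```

Note that the map key is the window and the value is the bucket duration, which is the ordering the struct comment flags as reversed relative to chalkpy.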

type ScalarFeatureBuilder

type ScalarFeatureBuilder struct {
	// contains filtered or unexported fields
}

func List added in v1.2.114

List lets you specify lists of a certain type. For example

graph.FeatureSet{Name: "User"}.With("recent_transaction_ids", graph.List(graph.Int))

is equivalent to the following chalkpy

@features
class User:
  recent_transaction_ids: List[int]

func Optional added in v1.2.112

func Optional(ofType *ScalarFeatureBuilder) *ScalarFeatureBuilder

Optional modifies a scalar type to also allow None as a possible value. For example

graph.FeatureSet{Name: "Transaction"}.With("memo", graph.Optional(graph.String))

is equivalent to the following chalkpy

@features
class Transaction:
  memo: Optional[str]

func (ScalarFeatureBuilder) Expr

func (ofType ScalarFeatureBuilder) Expr(expression expr.Expr) ScalarFeatureBuilder

Expr lets you define an underscore expression to compute a scalar feature. For example

graph.FeatureSet{Name: "Transaction"}.WithAll(graph.Features{"total": graph.Float, "sales_tax": graph.Float, "subtotal": graph.Float.Expr(expr.Col("total").Sub(expr.Col("sales_tax")))})

is equivalent to the following chalkpy

@features
class Transaction:
  total: float
  sales_tax: float
  subtotal: float = _.total - _.sales_tax

func (ScalarFeatureBuilder) ToProto

func (f ScalarFeatureBuilder) ToProto(fieldName string, namespace string) *graphv1.FeatureType

func (ScalarFeatureBuilder) WithMaxStaleness added in v1.2.109

WithMaxStaleness is equivalent to the max_staleness kwarg on feature in chalkpy

type StreamListMessage added in v1.2.140

type StreamListMessage struct {
	ValueType StreamStructMessage
}

func (StreamListMessage) ToArrowType added in v1.2.140

func (t StreamListMessage) ToArrowType() *arrowv1.ArrowType

type StreamMessageType added in v1.2.140

type StreamMessageType interface {
	ToArrowType() *arrowv1.ArrowType
	// contains filtered or unexported methods
}

type StreamResolver added in v1.2.112

type StreamResolver struct {
	Name  string
	Owner string
	Doc   string

	// must match up to a UI-defined Data Source
	StreamSourceName string
	// "kafka" or "kinesis" or "pubsub"
	StreamSourceType string

	OutputFeatureSet string
	OutputFeatures   map[string]expr.Expr
	// defines the structure of message
	// should be a mapping from JSON key name to value type ("int" or "float" or "bool" or "str")
	MessageType StreamMessageType
	// if provided, only run on this machine type (defaults to all)
	MachineType string
	// if provided, only run in these environments (defaults to all)
	RunInEnvironments []string
	// if provided, evaluate this expression against incoming messages to either parse or filter
	Parse expr.Expr
}

StreamResolver represents a Native Streaming Resolver; see https://docs.chalk.ai/docs/native-streaming

func (StreamResolver) ToProto added in v1.2.112

func (sr StreamResolver) ToProto() (*graphv1.StreamResolver, error)

type StreamStructMessage added in v1.2.140

type StreamStructMessage struct {
	Fields map[string]string
}
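The StreamResolver comment describes the message structure as a mapping from JSON key name to one of "int", "float", "bool", or "str". The sketch below is a hypothetical pre-check over such a map, written against a plain map[string]string rather than StreamStructMessage itself. Nothing on this page says the package performs this validation; it only illustrates the expected shape of Fields.

```go
package main

import (
	"fmt"
	"sort"
)

// allowedTypes lists the value type names given in the StreamResolver comment.
var allowedTypes = map[string]bool{"int": true, "float": true, "bool": true, "str": true}

// invalidFields returns, in sorted order, the JSON keys whose declared value
// type is not one of the allowed names. This helper is hypothetical.
func invalidFields(fields map[string]string) []string {
	var bad []string
	for key, typ := range fields {
		if !allowedTypes[typ] {
			bad = append(bad, key)
		}
	}
	sort.Strings(bad)
	return bad
}

func main() {
	fields := map[string]string{"user_id": "int", "amount": "float", "memo": "string"}
	fmt.Println(invalidFields(fields)) // [memo]
}
```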

func (StreamStructMessage) ToArrowType added in v1.2.140

func (t StreamStructMessage) ToArrowType() *arrowv1.ArrowType

type WindowedFeatureBuilder

type WindowedFeatureBuilder struct {
	Default      expr.Expr
	Expression   expr.Expr
	MaxStaleness time.Duration

	Materialization MaterializationOptions
	// contains filtered or unexported fields
}

func Windowed

func Windowed(ofType FeatureBuilder, windows ...time.Duration) *WindowedFeatureBuilder

Windowed defines a new windowed feature. For example

graph.FeatureSet{Name: "User"}.
 With("transactions", graph.DataFrame("Transactions")).
 With("txn_count", graph.Windowed(graph.Int, graph.Hours(1), graph.Days(1), graph.Weeks(1)).
  WithExpr(expr.DataFrame("transactions").
   Filter(expr.Col("ts").Gt(expr.ChalkWindow())).
   Filter(expr.Col("ts").Lt(expr.ChalkNow())).
   Agg("count")))

is equivalent to the following chalkpy

@features
class User:
  transactions: DataFrame[Transactions]
  txn_count: int = windowed("1h", "1d", "1w", expression=_.transactions[_.ts > _.chalk_window, _.ts < _.chalk_now].count())

func (*WindowedFeatureBuilder) WithBucketDuration

func (w *WindowedFeatureBuilder) WithBucketDuration(duration time.Duration) *WindowedFeatureBuilder

func (*WindowedFeatureBuilder) WithDefault

func (w *WindowedFeatureBuilder) WithDefault(expression expr.Expr) *WindowedFeatureBuilder

func (*WindowedFeatureBuilder) WithDurationForWindows

func (w *WindowedFeatureBuilder) WithDurationForWindows(duration time.Duration, windows ...time.Duration) *WindowedFeatureBuilder

func (*WindowedFeatureBuilder) WithExpr

func (w *WindowedFeatureBuilder) WithExpr(expression expr.Expr) *WindowedFeatureBuilder

func (*WindowedFeatureBuilder) WithMaterialization

NOTE: this will override all previous materialization settings (WithBucketDuration, WithDurationForWindows)

func (*WindowedFeatureBuilder) WithMaxStaleness added in v1.2.109
