Documentation
¶
Index ¶
- Variables
- func EnsureStateRepository(ctx context.Context, conn Connection) error
- func MakeMigrator(conn Connection, namespace string, steps migration.Steps[Connection]) migration.Migrator[Connection]
- type Connection
- type Locker
- type LockerFactory
- type MetaAccessor
- type Queue
- func (q Queue[Entity, JSONDTO]) Migrate(ctx context.Context) error
- func (q Queue[Entity, JSONDTO]) Publish(ctx context.Context, vs ...Entity) error
- func (q Queue[Entity, JSONDTO]) Purge(ctx context.Context) error
- func (q Queue[Entity, JSONDTO]) Subscribe(ctx context.Context) pubsub.Subscription[Entity]
- type QueueMapper
- type Repository
- func (r Repository[ENT, ID]) BeginTx(ctx context.Context) (context.Context, error)
- func (r Repository[ENT, ID]) CommitTx(ctx context.Context) error
- func (r Repository[ENT, ID]) Create(ctx context.Context, ptr *ENT) (rErr error)
- func (r Repository[ENT, ID]) DeleteAll(ctx context.Context) (rErr error)
- func (r Repository[ENT, ID]) DeleteByID(ctx context.Context, id ID) (rErr error)
- func (r Repository[ENT, ID]) FindAll(ctx context.Context) iterkit.ErrSeq[ENT]
- func (r Repository[ENT, ID]) FindByID(ctx context.Context, id ID) (ENT, bool, error)
- func (r Repository[ENT, ID]) FindByIDs(ctx context.Context, ids ...ID) iterkit.SeqE[ENT]
- func (r Repository[ENT, ID]) RollbackTx(ctx context.Context) error
- func (r Repository[ENT, ID]) Save(ctx context.Context, ptr *ENT) (rErr error)
- func (r Repository[ENT, ID]) Update(ctx context.Context, ptr *ENT) (rErr error)
- func (r Repository[ENT, ID]) Upsert(ctx context.Context, ptrs ...*ENT) (rErr error) (deprecated)
- type T
- type TaskerSchedulerLocks
- type TaskerSchedulerStateRepository
- func (r TaskerSchedulerStateRepository) Create(ctx context.Context, ptr *tasker.ScheduleState) error
- func (r TaskerSchedulerStateRepository) DeleteByID(ctx context.Context, id tasker.ScheduleID) error
- func (r TaskerSchedulerStateRepository) FindByID(ctx context.Context, id tasker.ScheduleID) (ent tasker.ScheduleState, found bool, err error)
- func (r TaskerSchedulerStateRepository) Migrate(ctx context.Context) error
- func (r TaskerSchedulerStateRepository) Update(ctx context.Context, ptr *tasker.ScheduleState) error
Examples ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ContextTxOptions contextkit.ValueHandler[ctxKeyTxOptions, pgx.TxOptions]
Functions ¶
func EnsureStateRepository ¶
func EnsureStateRepository(ctx context.Context, conn Connection) error
func MakeMigrator ¶
func MakeMigrator(conn Connection, namespace string, steps migration.Steps[Connection]) migration.Migrator[Connection]
Types ¶
type Connection ¶
// Connection is the PostgreSQL connection type that the components of this
// package (Locker, LockerFactory, Queue, Repository, ...) take as a field.
// It embeds flsql.ConnectionAdapter specialised for pgxpool.Pool and pgx.Tx.
type Connection struct {
flsql.ConnectionAdapter[pgxpool.Pool, pgx.Tx]
}
func Connect ¶
func Connect(dsn string) (Connection, error)
Example ¶
package main
import (
"context"
"go.llib.dev/frameless/adapter/postgresql"
)
// main connects to PostgreSQL using a DSN and runs a trivial query to show
// that the returned Connection can execute SQL directly.
func main() {
	conn, err := postgresql.Connect(`dsn`)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Only the error matters here; the query result is discarded.
	if _, err := conn.ExecContext(context.Background(), `SELECT VERSION()`); err != nil {
		panic(err)
	}
}
type Locker ¶
// Locker is a PostgreSQL-backed shared mutex, configured with a lock name and
// the Connection it operates on.
type Locker struct {
// Name is the name of the lock.
// NOTE(review): presumably Lockers sharing the same Name contend for the same lock — confirm against the implementation.
Name string
// Connection is the PostgreSQL connection used by the Locker.
Connection Connection
}
Locker is a PG-based shared mutex implementation. It depends on the existence of the frameless_locker_locks table. Locker is safe to use from different application instances, ensuring that only one of them can hold the lock at any given time.
Example ¶
package main
import (
"context"
"os"
"go.llib.dev/frameless/adapter/postgresql"
)
// main demonstrates acquiring and releasing a PostgreSQL-backed lock with
// postgresql.Locker.
func main() {
	cm, err := postgresql.Connect(os.Getenv("DATABASE_URL"))
	if err != nil {
		panic(err)
	}
	// Release the connection when the example finishes; deferred calls also
	// run when one of the panics below unwinds main.
	defer cm.Close()

	l := postgresql.Locker{
		Name:       "my-lock",
		Connection: cm,
	}

	// Lock returns a context that must be handed back to Unlock.
	ctx, err := l.Lock(context.Background())
	if err != nil {
		panic(err)
	}

	if err := l.Unlock(ctx); err != nil {
		panic(err)
	}
}
type LockerFactory ¶
// LockerFactory hands out lockers per Key value through its LockerFor and
// NonBlockingLockerFor methods, using Connection as the PostgreSQL connection.
type LockerFactory[Key comparable] struct{ Connection Connection }
Example ¶
package main
import (
"context"
"log"
"os"
"go.llib.dev/frameless/adapter/postgresql"
)
// main shows how to obtain a keyed locker from a LockerFactory and take and
// release the lock with it.
func main() {
	conn, err := postgresql.Connect(os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal(err)
	}

	factory := postgresql.LockerFactory[string]{Connection: conn}

	// Run the factory's migration before requesting lockers from it.
	if err := factory.Migrate(context.Background()); err != nil {
		log.Fatal(err)
	}

	helloLocker := factory.LockerFor("hello world")

	// The context returned by Lock is the one passed to Unlock.
	lockCtx, err := helloLocker.Lock(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	if err := helloLocker.Unlock(lockCtx); err != nil {
		log.Fatal(err)
	}
}
func (LockerFactory[Key]) LockerFor ¶
func (lf LockerFactory[Key]) LockerFor(key Key) guard.Locker
func (LockerFactory[Key]) NonBlockingLockerFor ¶
func (lf LockerFactory[Key]) NonBlockingLockerFor(key Key) guard.NonBlockingLocker
type Queue ¶
type Queue[Entity, JSONDTO any] struct { Name string Connection Connection Mapping dtokit.MapperTo[Entity, JSONDTO] // EmptyQueueBreakTime is the time.Duration that the queue waits when the queue is empty for the given queue Name. EmptyQueueBreakTime time.Duration // Blocking flag will cause the Queue.Publish method to wait until the message is processed. Blocking bool // LIFO flag will set the queue to use a Last in First out ordering LIFO bool }
Example ¶
cm, err := postgresql.Connect(os.Getenv("DATABASE_URL"))
if err != nil {
panic(err)
}
defer cm.Close()
q := postgresql.Queue[Entity, EntityDTO]{
Name: "queue_name",
Connection: cm,
Mapping: EntityJSONMapping{},
}
ctx := context.Background()
ent := Entity{Foo: "foo"}
err = q.Publish(ctx, ent)
if err != nil {
panic(err)
}
for msg, err := range q.Subscribe(ctx) {
if err != nil {
break
}
fmt.Println(msg.Data())
_ = msg.ACK()
}
type QueueMapper ¶
type Repository ¶
type Repository[ENT, ID any] struct { Connection Connection Mapping flsql.Mapping[ENT, ID] }
Repository is a frameless external resource supplier to store a certain entity type. The Repository supplier itself is a stateless entity.
SRP: DBA
Example ¶
package main
import (
"context"
"os"
"go.llib.dev/frameless/adapter/postgresql"
"go.llib.dev/frameless/pkg/flsql"
)
// main builds a flsql.Mapping for a small Entity type and uses it, together
// with a PostgreSQL connection, to construct a Repository.
func main() {
	type Entity struct {
		ID    int `ext:"ID"`
		Value string
	}

	mapping := flsql.Mapping[Entity, int]{
		TableName: "entities",

		// QueryID turns an ID into the query arguments that identify a row.
		QueryID: func(id int) (flsql.QueryArgs, error) {
			args := flsql.QueryArgs{"id": id}
			return args, nil
		},

		// ToArgs turns an Entity into its column/value arguments.
		ToArgs: func(ent Entity) (flsql.QueryArgs, error) {
			args := flsql.QueryArgs{
				`id`:    ent.ID,
				`value`: ent.Value,
			}
			return args, nil
		},

		// ToQuery lists the columns to select and how to scan them into an Entity.
		ToQuery: func(ctx context.Context) ([]flsql.ColumnName, flsql.MapScan[Entity]) {
			columns := []flsql.ColumnName{"id", "value"}
			scan := func(dst *Entity, sc flsql.Scanner) error {
				return sc.Scan(&dst.ID, &dst.Value)
			}
			return columns, scan
		},

		// ID exposes a pointer to the entity's ID field.
		ID: func(ent *Entity) *int {
			return &ent.ID
		},
	}

	conn, err := postgresql.Connect(os.Getenv("DATABASE_URL"))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	repo := postgresql.Repository[Entity, int]{
		Connection: conn,
		Mapping:    mapping,
	}

	// The repository is only constructed here; it is not used further.
	_ = repo
}
func NewMigrationStateRepository ¶
func NewMigrationStateRepository(conn Connection) Repository[migration.State, migration.StateID]
func (Repository[ENT, ID]) CommitTx ¶
func (r Repository[ENT, ID]) CommitTx(ctx context.Context) error
func (Repository[ENT, ID]) Create ¶
func (r Repository[ENT, ID]) Create(ctx context.Context, ptr *ENT) (rErr error)
func (Repository[ENT, ID]) DeleteAll ¶
func (r Repository[ENT, ID]) DeleteAll(ctx context.Context) (rErr error)
func (Repository[ENT, ID]) DeleteByID ¶
func (r Repository[ENT, ID]) DeleteByID(ctx context.Context, id ID) (rErr error)
func (Repository[ENT, ID]) FindAll ¶
func (r Repository[ENT, ID]) FindAll(ctx context.Context) iterkit.ErrSeq[ENT]
func (Repository[ENT, ID]) FindByID ¶
func (r Repository[ENT, ID]) FindByID(ctx context.Context, id ID) (ENT, bool, error)
func (Repository[ENT, ID]) FindByIDs ¶
func (r Repository[ENT, ID]) FindByIDs(ctx context.Context, ids ...ID) iterkit.SeqE[ENT]
func (Repository[ENT, ID]) RollbackTx ¶
func (r Repository[ENT, ID]) RollbackTx(ctx context.Context) error
func (Repository[ENT, ID]) Save ¶
func (r Repository[ENT, ID]) Save(ctx context.Context, ptr *ENT) (rErr error)
func (Repository[ENT, ID]) Update ¶
func (r Repository[ENT, ID]) Update(ctx context.Context, ptr *ENT) (rErr error)
func (Repository[ENT, ID]) Upsert
deprecated
func (r Repository[ENT, ID]) Upsert(ctx context.Context, ptrs ...*ENT) (rErr error)
Upsert
Deprecated: use Repository.Save instead
type TaskerSchedulerLocks ¶
// TaskerSchedulerLocks provides lockers keyed by tasker.ScheduleID (via
// LockerFor and NonBlockingLockerFor) on top of a PostgreSQL Connection.
// The package example assigns it to the Locks field of tasker.Scheduler.
type TaskerSchedulerLocks struct{ Connection Connection }
func (TaskerSchedulerLocks) LockerFor ¶
func (lf TaskerSchedulerLocks) LockerFor(id tasker.ScheduleID) guard.Locker
func (TaskerSchedulerLocks) NonBlockingLockerFor ¶
func (lf TaskerSchedulerLocks) NonBlockingLockerFor(id tasker.ScheduleID) guard.NonBlockingLocker
type TaskerSchedulerStateRepository ¶
// TaskerSchedulerStateRepository offers Create, Update, FindByID and
// DeleteByID for tasker.ScheduleState values identified by tasker.ScheduleID,
// plus a Migrate method, on top of a PostgreSQL Connection.
// The package example assigns it to the States field of tasker.Scheduler.
type TaskerSchedulerStateRepository struct{ Connection Connection }
Example ¶
package main
import (
"context"
"os"
"time"
"go.llib.dev/frameless/adapter/postgresql"
"go.llib.dev/frameless/pkg/tasker"
)
// main wires a tasker.Scheduler to PostgreSQL-backed locks and schedule state
// storage, registers a monthly task on it and runs it with tasker.Main.
func main() {
	c, err := postgresql.Connect(os.Getenv("DATABASE_URL"))
	if err != nil {
		panic(err.Error())
	}
	// Release the connection once tasker.Main has returned.
	defer c.Close()

	s := tasker.Scheduler{
		Locks:  postgresql.TaskerSchedulerLocks{Connection: c},
		States: postgresql.TaskerSchedulerStateRepository{Connection: c},
	}

	maintenance := s.WithSchedule("maintenance", tasker.Monthly{Day: 1, Hour: 12, Location: time.UTC},
		func(ctx context.Context) error {
			// The monthly maintenance task
			return nil
		})

	// from your main func
	_ = tasker.Main(context.Background(), maintenance)
}
func (TaskerSchedulerStateRepository) Create ¶
func (r TaskerSchedulerStateRepository) Create(ctx context.Context, ptr *tasker.ScheduleState) error
func (TaskerSchedulerStateRepository) DeleteByID ¶
func (r TaskerSchedulerStateRepository) DeleteByID(ctx context.Context, id tasker.ScheduleID) error
func (TaskerSchedulerStateRepository) FindByID ¶
func (r TaskerSchedulerStateRepository) FindByID(ctx context.Context, id tasker.ScheduleID) (ent tasker.ScheduleState, found bool, err error)
func (TaskerSchedulerStateRepository) Migrate ¶
func (r TaskerSchedulerStateRepository) Migrate(ctx context.Context) error
func (TaskerSchedulerStateRepository) Update ¶
func (r TaskerSchedulerStateRepository) Update(ctx context.Context, ptr *tasker.ScheduleState) error
Source Files
¶
Click to show internal directories.
Click to hide internal directories.