edge/pkg/storage/driver/sqlite/document_store.go

381 lines
8.5 KiB
Go

package sqlite
import (
"context"
"database/sql"
"fmt"
"math"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/filter"
filterSQL "forge.cadoles.com/arcad/edge/pkg/storage/filter/sql"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
_ "embed"
_ "modernc.org/sqlite"
)
//go:embed document_store.sql
var documentStoreSchema string
type DocumentStore struct {
getDB GetDBFunc
}
// Delete implements storage.DocumentStore
func (s *DocumentStore) Delete(ctx context.Context, collection string, id storage.DocumentID) error {
err := s.withTx(ctx, func(tx *sql.Tx) error {
query := `
DELETE FROM documents
WHERE collection = $1 AND id = $2
`
_, err := tx.ExecContext(ctx, query, collection, string(id))
if err != nil {
return errors.WithStack(err)
}
return nil
})
if err != nil {
return errors.WithStack(err)
}
return nil
}
// Get implements storage.DocumentStore
func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.DocumentID) (storage.Document, error) {
var document storage.Document
err := s.withTx(ctx, func(tx *sql.Tx) error {
query := `
SELECT id, revision, data, created_at, updated_at
FROM documents
WHERE collection = $1 AND id = $2
`
row := tx.QueryRowContext(ctx, query, collection, string(id))
var (
createdAt time.Time
updatedAt time.Time
data JSONMap
revision int
)
err := row.Scan(&id, &revision, &data, &createdAt, &updatedAt)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return errors.WithStack(storage.ErrDocumentNotFound)
}
return errors.WithStack(err)
}
if err := row.Err(); err != nil {
return errors.WithStack(err)
}
document = storage.Document(data)
document[storage.DocumentAttrID] = id
document[storage.DocumentAttrRevision] = revision
document[storage.DocumentAttrCreatedAt] = createdAt
document[storage.DocumentAttrUpdatedAt] = updatedAt
return nil
})
if err != nil {
return nil, errors.WithStack(err)
}
return document, nil
}
// Query implements storage.DocumentStore
func (s *DocumentStore) Query(ctx context.Context, collection string, filter *filter.Filter, funcs ...storage.QueryOptionFunc) ([]storage.Document, error) {
opts := &storage.QueryOptions{}
for _, fn := range funcs {
fn(opts)
}
var documents []storage.Document
err := s.withTx(ctx, func(tx *sql.Tx) error {
criteria := "1 = 1"
args := make([]any, 0)
var err error
if filter != nil {
criteria, args, err = filterSQL.ToSQL(
filter.Root(),
filterSQL.WithPreparedParameter("$", 2),
filterSQL.WithTransform(transformOperator),
filterSQL.WithKeyTransform(func(key string) string {
return fmt.Sprintf("json_extract(data, '$.%s')", key)
}),
)
if err != nil {
return errors.WithStack(err)
}
}
query := `
SELECT id, revision, data, created_at, updated_at
FROM documents
WHERE collection = $1 AND (` + criteria + `)
`
args = append([]interface{}{collection}, args...)
if opts.OrderBy != nil {
direction := storage.OrderDirectionAsc
if opts.OrderDirection != nil {
direction = *opts.OrderDirection
}
query, args = withOrderByClause(query, args, *opts.OrderBy, direction)
}
if opts.Offset != nil || opts.Limit != nil {
offset := 0
if opts.Offset != nil {
offset = *opts.Offset
}
limit := math.MaxInt
if opts.Limit != nil {
limit = *opts.Limit
}
query, args = withLimitOffsetClause(query, args, limit, offset)
}
logger.Debug(
ctx, "executing query",
logger.F("query", query),
logger.F("args", args),
)
rows, err := tx.QueryContext(ctx, query, args...)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := rows.Close(); err != nil {
logger.Error(ctx, "could not close rows", logger.CapturedE(errors.WithStack(err)))
}
}()
documents = make([]storage.Document, 0)
for rows.Next() {
var (
id storage.DocumentID
revision int
createdAt time.Time
updatedAt time.Time
data JSONMap
)
if err := rows.Scan(&id, &revision, &data, &createdAt, &updatedAt); err != nil {
return errors.WithStack(err)
}
document := storage.Document(data)
document[storage.DocumentAttrID] = id
document[storage.DocumentAttrRevision] = revision
document[storage.DocumentAttrCreatedAt] = createdAt
document[storage.DocumentAttrUpdatedAt] = updatedAt
documents = append(documents, document)
}
if err := rows.Err(); err != nil {
return errors.WithStack(err)
}
return nil
})
if err != nil {
return nil, errors.WithStack(err)
}
return documents, nil
}
// Upsert implements storage.DocumentStore
func (s *DocumentStore) Upsert(ctx context.Context, collection string, document storage.Document) (storage.Document, error) {
var upsertedDocument storage.Document
err := s.withTx(ctx, func(tx *sql.Tx) error {
id, exists := document.ID()
if !exists || id == "" {
id = storage.NewDocumentID()
}
query := `
SELECT revision FROM documents WHERE id = $1
`
args := []any{id}
logger.Debug(
ctx, "executing query",
logger.F("query", query),
logger.F("args", args),
)
row := tx.QueryRowContext(ctx, query, args...)
var storedRevision int
if err := row.Scan(&storedRevision); err != nil && !errors.Is(err, sql.ErrNoRows) {
return errors.WithStack(err)
}
revision, found := document.Revision()
if found && storedRevision != revision {
return errors.Wrapf(storage.ErrDocumentRevisionConflict, "document revision '%d' does not match stored '%d'", revision, storedRevision)
}
query = `
INSERT INTO documents (id, collection, data, created_at, updated_at)
VALUES($1, $2, $3, $4, $4)
ON CONFLICT (id, collection) DO UPDATE SET
data = $3, updated_at = $4, revision = revision + 1
RETURNING "id", "revision", "data", "created_at", "updated_at"
`
now := time.Now().UTC()
args = []any{id, collection, JSONMap(document), now, now}
logger.Debug(
ctx, "executing query",
logger.F("query", query),
logger.F("args", args),
)
row = tx.QueryRowContext(ctx, query, args...)
var (
createdAt time.Time
updatedAt time.Time
data JSONMap
)
if err := row.Scan(&id, &revision, &data, &createdAt, &updatedAt); err != nil {
return errors.WithStack(err)
}
if err := row.Err(); err != nil {
return errors.WithStack(err)
}
upsertedDocument = storage.Document(data)
upsertedDocument[storage.DocumentAttrID] = id
upsertedDocument[storage.DocumentAttrRevision] = revision
upsertedDocument[storage.DocumentAttrCreatedAt] = createdAt
upsertedDocument[storage.DocumentAttrUpdatedAt] = updatedAt
return nil
})
if err != nil {
return nil, errors.WithStack(err)
}
return upsertedDocument, nil
}
func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
var db *sql.DB
db, err := s.getDB(ctx)
if err != nil {
return errors.WithStack(err)
}
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
return errors.WithStack(err)
}
return nil
}
func migrateSchema(ctx context.Context, db *sql.DB) error {
err := WithTx(ctx, db, func(tx *sql.Tx) error {
for _, migr := range documentStoreMigrations {
if err := migr(ctx, tx); err != nil {
return errors.WithStack(err)
}
}
return nil
})
if err != nil {
return errors.WithStack(err)
}
return nil
}
func withOrderByClause(query string, args []any, orderBy string, orderDirection storage.OrderDirection) (string, []any) {
direction := "ASC"
if orderDirection == storage.OrderDirectionDesc {
direction = "DESC"
}
var column string
switch orderBy {
case storage.DocumentAttrID:
column = "id"
case storage.DocumentAttrCreatedAt:
column = "created_at"
case storage.DocumentAttrUpdatedAt:
column = "updated_at"
default:
column = fmt.Sprintf("json_extract(data, '$.' || $%d)", len(args)+1)
args = append(args, orderBy)
}
query += fmt.Sprintf(` ORDER BY %s %s`, column, direction)
return query, args
}
func withLimitOffsetClause(query string, args []any, limit int, offset int) (string, []any) {
query += fmt.Sprintf(` LIMIT $%d OFFSET $%d`, len(args)+1, len(args)+2)
args = append(args, limit, offset)
return query, args
}
func NewDocumentStore(path string) *DocumentStore {
getDB := NewGetDBFunc(path, migrateSchema)
return &DocumentStore{
getDB: getDB,
}
}
func NewDocumentStoreWithDB(db *sql.DB) *DocumentStore {
getDB := NewGetDBFuncFromDB(db, migrateSchema)
return &DocumentStore{
getDB: getDB,
}
}
var _ storage.DocumentStore = &DocumentStore{}