package sqlite import ( "context" "database/sql" "fmt" "math" "time" "forge.cadoles.com/arcad/edge/pkg/storage" "forge.cadoles.com/arcad/edge/pkg/storage/filter" filterSQL "forge.cadoles.com/arcad/edge/pkg/storage/filter/sql" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" _ "embed" _ "modernc.org/sqlite" ) //go:embed document_store.sql var documentStoreSchema string type DocumentStore struct { getDB GetDBFunc } // Delete implements storage.DocumentStore func (s *DocumentStore) Delete(ctx context.Context, collection string, id storage.DocumentID) error { err := s.withTx(ctx, func(tx *sql.Tx) error { query := ` DELETE FROM documents WHERE collection = $1 AND id = $2 ` _, err := tx.ExecContext(ctx, query, collection, string(id)) if err != nil { return errors.WithStack(err) } return nil }) if err != nil { return errors.WithStack(err) } return nil } // Get implements storage.DocumentStore func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.DocumentID) (storage.Document, error) { var document storage.Document err := s.withTx(ctx, func(tx *sql.Tx) error { query := ` SELECT id, revision, data, created_at, updated_at FROM documents WHERE collection = $1 AND id = $2 ` row := tx.QueryRowContext(ctx, query, collection, string(id)) var ( createdAt time.Time updatedAt time.Time data JSONMap revision int ) err := row.Scan(&id, &revision, &data, &createdAt, &updatedAt) if err != nil { if errors.Is(err, sql.ErrNoRows) { return errors.WithStack(storage.ErrDocumentNotFound) } return errors.WithStack(err) } if err := row.Err(); err != nil { return errors.WithStack(err) } document = storage.Document(data) document[storage.DocumentAttrID] = id document[storage.DocumentAttrRevision] = revision document[storage.DocumentAttrCreatedAt] = createdAt document[storage.DocumentAttrUpdatedAt] = updatedAt return nil }) if err != nil { return nil, errors.WithStack(err) } return document, nil } // Query implements storage.DocumentStore func (s *DocumentStore) Query(ctx context.Context, collection string, filter *filter.Filter, funcs ...storage.QueryOptionFunc) ([]storage.Document, error) { opts := &storage.QueryOptions{} for _, fn := range funcs { fn(opts) } var documents []storage.Document err := s.withTx(ctx, func(tx *sql.Tx) error { criteria := "1 = 1" args := make([]any, 0) var err error if filter != nil { criteria, args, err = filterSQL.ToSQL( filter.Root(), filterSQL.WithPreparedParameter("$", 2), filterSQL.WithTransform(transformOperator), filterSQL.WithKeyTransform(func(key string) string { return fmt.Sprintf("json_extract(data, '$.%s')", key) }), ) if err != nil { return errors.WithStack(err) } } query := ` SELECT id, revision, data, created_at, updated_at FROM documents WHERE collection = $1 AND (` + criteria + `) ` args = append([]interface{}{collection}, args...) if opts.OrderBy != nil { direction := storage.OrderDirectionAsc if opts.OrderDirection != nil { direction = *opts.OrderDirection } query, args = withOrderByClause(query, args, *opts.OrderBy, direction) } if opts.Offset != nil || opts.Limit != nil { offset := 0 if opts.Offset != nil { offset = *opts.Offset } limit := math.MaxInt if opts.Limit != nil { limit = *opts.Limit } query, args = withLimitOffsetClause(query, args, limit, offset) } logger.Debug( ctx, "executing query", logger.F("query", query), logger.F("args", args), ) rows, err := tx.QueryContext(ctx, query, args...) if err != nil { return errors.WithStack(err) } defer func() { if err := rows.Close(); err != nil { logger.Error(ctx, "could not close rows", logger.CapturedE(errors.WithStack(err))) } }() documents = make([]storage.Document, 0) for rows.Next() { var ( id storage.DocumentID revision int createdAt time.Time updatedAt time.Time data JSONMap ) if err := rows.Scan(&id, &revision, &data, &createdAt, &updatedAt); err != nil { return errors.WithStack(err) } document := storage.Document(data) document[storage.DocumentAttrID] = id document[storage.DocumentAttrRevision] = revision document[storage.DocumentAttrCreatedAt] = createdAt document[storage.DocumentAttrUpdatedAt] = updatedAt documents = append(documents, document) } if err := rows.Err(); err != nil { return errors.WithStack(err) } return nil }) if err != nil { return nil, errors.WithStack(err) } return documents, nil } // Upsert implements storage.DocumentStore func (s *DocumentStore) Upsert(ctx context.Context, collection string, document storage.Document) (storage.Document, error) { var upsertedDocument storage.Document err := s.withTx(ctx, func(tx *sql.Tx) error { id, exists := document.ID() if !exists || id == "" { id = storage.NewDocumentID() } query := ` SELECT revision FROM documents WHERE id = $1 ` args := []any{id} logger.Debug( ctx, "executing query", logger.F("query", query), logger.F("args", args), ) row := tx.QueryRowContext(ctx, query, args...) var storedRevision int if err := row.Scan(&storedRevision); err != nil && !errors.Is(err, sql.ErrNoRows) { return errors.WithStack(err) } revision, found := document.Revision() if found && storedRevision != revision { return errors.Wrapf(storage.ErrDocumentRevisionConflict, "document revision '%d' does not match stored '%d'", revision, storedRevision) } query = ` INSERT INTO documents (id, collection, data, created_at, updated_at) VALUES($1, $2, $3, $4, $4) ON CONFLICT (id, collection) DO UPDATE SET data = $3, updated_at = $4, revision = revision + 1 RETURNING "id", "revision", "data", "created_at", "updated_at" ` now := time.Now().UTC() args = []any{id, collection, JSONMap(document), now, now} logger.Debug( ctx, "executing query", logger.F("query", query), logger.F("args", args), ) row = tx.QueryRowContext(ctx, query, args...) var ( createdAt time.Time updatedAt time.Time data JSONMap ) if err := row.Scan(&id, &revision, &data, &createdAt, &updatedAt); err != nil { return errors.WithStack(err) } if err := row.Err(); err != nil { return errors.WithStack(err) } upsertedDocument = storage.Document(data) upsertedDocument[storage.DocumentAttrID] = id upsertedDocument[storage.DocumentAttrRevision] = revision upsertedDocument[storage.DocumentAttrCreatedAt] = createdAt upsertedDocument[storage.DocumentAttrUpdatedAt] = updatedAt return nil }) if err != nil { return nil, errors.WithStack(err) } return upsertedDocument, nil } func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error { var db *sql.DB db, err := s.getDB(ctx) if err != nil { return errors.WithStack(err) } if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil { return errors.WithStack(err) } return nil } func migrateSchema(ctx context.Context, db *sql.DB) error { err := WithTx(ctx, db, func(tx *sql.Tx) error { for _, migr := range documentStoreMigrations { if err := migr(ctx, tx); err != nil { return errors.WithStack(err) } } return nil }) if err != nil { return errors.WithStack(err) } return nil } func withOrderByClause(query string, args []any, orderBy string, orderDirection storage.OrderDirection) (string, []any) { direction := "ASC" if orderDirection == storage.OrderDirectionDesc { direction = "DESC" } var column string switch orderBy { case storage.DocumentAttrID: column = "id" case storage.DocumentAttrCreatedAt: column = "created_at" case storage.DocumentAttrUpdatedAt: column = "updated_at" default: column = fmt.Sprintf("json_extract(data, '$.' || $%d)", len(args)+1) args = append(args, orderBy) } query += fmt.Sprintf(` ORDER BY %s %s`, column, direction) return query, args } func withLimitOffsetClause(query string, args []any, limit int, offset int) (string, []any) { query += fmt.Sprintf(` LIMIT $%d OFFSET $%d`, len(args)+1, len(args)+2) args = append(args, limit, offset) return query, args } func NewDocumentStore(path string) *DocumentStore { getDB := NewGetDBFunc(path, migrateSchema) return &DocumentStore{ getDB: getDB, } } func NewDocumentStoreWithDB(db *sql.DB) *DocumentStore { getDB := NewGetDBFuncFromDB(db, migrateSchema) return &DocumentStore{ getDB: getDB, } } var _ storage.DocumentStore = &DocumentStore{}