package sqlite import ( "context" "database/sql" "fmt" "math" "sync" "time" "forge.cadoles.com/arcad/edge/pkg/storage" "forge.cadoles.com/arcad/edge/pkg/storage/filter" filterSQL "forge.cadoles.com/arcad/edge/pkg/storage/filter/sql" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" _ "modernc.org/sqlite" ) type DocumentStore struct { db *sql.DB path string openOnce sync.Once mutex sync.RWMutex } // Delete implements storage.DocumentStore func (s *DocumentStore) Delete(ctx context.Context, collection string, id storage.DocumentID) error { err := s.withTx(ctx, func(tx *sql.Tx) error { query := ` DELETE FROM documents WHERE collection = $1 AND id = $2 ` _, err := tx.ExecContext(ctx, query, collection, string(id)) if err != nil { return errors.WithStack(err) } return nil }) if err != nil { return errors.WithStack(err) } return nil } // Get implements storage.DocumentStore func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.DocumentID) (storage.Document, error) { var document storage.Document err := s.withTx(ctx, func(tx *sql.Tx) error { query := ` SELECT id, data, created_at, updated_at FROM documents WHERE collection = $1 AND id = $2 ` row := tx.QueryRowContext(ctx, query, collection, string(id)) var ( createdAt time.Time updatedAt time.Time data JSONMap ) err := row.Scan(&id, &data, &createdAt, &updatedAt) if err != nil { if errors.Is(err, sql.ErrNoRows) { return errors.WithStack(storage.ErrDocumentNotFound) } return errors.WithStack(err) } document = storage.Document(data) document[storage.DocumentAttrID] = id document[storage.DocumentAttrCreatedAt] = createdAt document[storage.DocumentAttrUpdatedAt] = updatedAt return nil }) if err != nil { return nil, errors.WithStack(err) } return document, nil } // Query implements storage.DocumentStore func (s *DocumentStore) Query(ctx context.Context, collection string, filter *filter.Filter, funcs ...storage.QueryOptionFunc) ([]storage.Document, error) { opts := &storage.QueryOptions{} for _, fn := range funcs { fn(opts) } var documents []storage.Document err := s.withTx(ctx, func(tx *sql.Tx) error { criteria := "1 = 1" args := make([]any, 0) var err error if filter != nil { criteria, args, err = filterSQL.ToSQL( filter.Root(), filterSQL.WithPreparedParameter("$", 2), filterSQL.WithTransform(transformOperator), filterSQL.WithKeyTransform(func(key string) string { return fmt.Sprintf("json_extract(data, '$.%s')", key) }), ) if err != nil { return errors.WithStack(err) } } query := ` SELECT id, data, created_at, updated_at FROM documents WHERE collection = $1 AND (` + criteria + `) ` args = append([]interface{}{collection}, args...) if opts.OrderBy != nil { direction := storage.OrderDirectionAsc if opts.OrderDirection != nil { direction = *opts.OrderDirection } query, args = withOrderByClause(query, args, *opts.OrderBy, direction) } if opts.Offset != nil || opts.Limit != nil { offset := 0 if opts.Offset != nil { offset = *opts.Offset } limit := math.MaxInt if opts.Limit != nil { limit = *opts.Limit } query, args = withLimitOffsetClause(query, args, limit, offset) } logger.Debug( ctx, "executing query", logger.F("query", query), logger.F("args", args), ) rows, err := tx.QueryContext(ctx, query, args...) if err != nil { return errors.WithStack(err) } defer rows.Close() documents = make([]storage.Document, 0) for rows.Next() { var ( id storage.DocumentID createdAt time.Time updatedAt time.Time data JSONMap ) if err := rows.Scan(&id, &data, &createdAt, &updatedAt); err != nil { return errors.WithStack(err) } document := storage.Document(data) document[storage.DocumentAttrID] = id document[storage.DocumentAttrCreatedAt] = createdAt document[storage.DocumentAttrUpdatedAt] = updatedAt documents = append(documents, document) } if err := rows.Err(); err != nil { return errors.WithStack(err) } return nil }) if err != nil { return nil, errors.WithStack(err) } return documents, nil } // Upsert implements storage.DocumentStore func (s *DocumentStore) Upsert(ctx context.Context, collection string, document storage.Document) (storage.Document, error) { var upsertedDocument storage.Document err := s.withTx(ctx, func(tx *sql.Tx) error { query := ` INSERT INTO documents (id, collection, data, created_at, updated_at) VALUES($1, $2, $3, $4, $4) ON CONFLICT (id, collection) DO UPDATE SET data = $3, updated_at = $4 RETURNING "id", "data", "created_at", "updated_at" ` now := time.Now().UTC() id, exists := document.ID() if !exists || id == "" { id = storage.NewDocumentID() } args := []any{id, collection, JSONMap(document), now, now} logger.Debug( ctx, "executing query", logger.F("query", query), logger.F("args", args), ) row := tx.QueryRowContext(ctx, query, args...) var ( createdAt time.Time updatedAt time.Time data JSONMap ) err := row.Scan(&id, &data, &createdAt, &updatedAt) if err != nil { return errors.WithStack(err) } upsertedDocument = storage.Document(data) upsertedDocument[storage.DocumentAttrID] = id upsertedDocument[storage.DocumentAttrCreatedAt] = createdAt upsertedDocument[storage.DocumentAttrUpdatedAt] = updatedAt return nil }) if err != nil { return nil, errors.WithStack(err) } return upsertedDocument, nil } func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error { var db *sql.DB db, err := s.getDatabase(ctx) if err != nil { return errors.WithStack(err) } if err := withTx(ctx, db, fn); err != nil { return errors.WithStack(err) } return nil } func (s *DocumentStore) getDatabase(ctx context.Context) (*sql.DB, error) { s.mutex.RLock() if s.db != nil { defer s.mutex.RUnlock() var err error s.openOnce.Do(func() { if err = s.ensureTables(ctx, s.db); err != nil { err = errors.WithStack(err) return } }) if err != nil { return nil, errors.WithStack(err) } return s.db, nil } s.mutex.RUnlock() var ( db *sql.DB err error ) s.openOnce.Do(func() { db, err = sql.Open("sqlite", s.path) if err != nil { err = errors.WithStack(err) return } if err = s.ensureTables(ctx, db); err != nil { err = errors.WithStack(err) return } }) if err != nil { return nil, errors.WithStack(err) } if db != nil { s.mutex.Lock() s.db = db s.mutex.Unlock() } s.mutex.RLock() defer s.mutex.RUnlock() return s.db, nil } func (s *DocumentStore) ensureTables(ctx context.Context, db *sql.DB) error { err := withTx(ctx, db, func(tx *sql.Tx) error { query := ` CREATE TABLE IF NOT EXISTS documents ( id TEXT PRIMARY KEY, collection TEXT NOT NULL, data TEXT, created_at TIMESTAMP NOT NULL, updated_at TIMESTAMP NOT NULL, UNIQUE(id, collection) ON CONFLICT REPLACE ); ` if _, err := tx.ExecContext(ctx, query); err != nil { return errors.WithStack(err) } query = ` CREATE INDEX IF NOT EXISTS collection_idx ON documents (collection); ` if _, err := tx.ExecContext(ctx, query); err != nil { return errors.WithStack(err) } return nil }) if err != nil { return errors.WithStack(err) } return nil } func withOrderByClause(query string, args []any, orderBy string, orderDirection storage.OrderDirection) (string, []any) { direction := "ASC" if orderDirection == storage.OrderDirectionDesc { direction = "DESC" } var column string switch orderBy { case storage.DocumentAttrID: column = "id" case storage.DocumentAttrCreatedAt: column = "created_at" case storage.DocumentAttrUpdatedAt: column = "updated_at" default: column = fmt.Sprintf("json_extract(data, '$.' || $%d)", len(args)+1) args = append(args, orderBy) } query += fmt.Sprintf(` ORDER BY %s %s`, column, direction) return query, args } func withLimitOffsetClause(query string, args []any, limit int, offset int) (string, []any) { query += fmt.Sprintf(` LIMIT $%d OFFSET $%d`, len(args)+1, len(args)+2) args = append(args, limit, offset) return query, args } func NewDocumentStore(path string) *DocumentStore { return &DocumentStore{ db: nil, path: path, openOnce: sync.Once{}, } } func NewDocumentStoreWithDB(db *sql.DB) *DocumentStore { return &DocumentStore{ db: db, path: "", openOnce: sync.Once{}, } } var _ storage.DocumentStore = &DocumentStore{}