edge/pkg/storage/sqlite/document_store.go

349 lines
7.1 KiB
Go

package sqlite
import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"sync"
	"time"

	"forge.cadoles.com/arcad/edge/pkg/storage"
	"forge.cadoles.com/arcad/edge/pkg/storage/filter"
	filterSQL "forge.cadoles.com/arcad/edge/pkg/storage/filter/sql"
	"github.com/pkg/errors"
	"gitlab.com/wpetit/goweb/logger"
	_ "modernc.org/sqlite"
)
// DocumentStore is a storage.DocumentStore implementation that keeps its
// documents in a SQLite "documents" table.
type DocumentStore struct {
// db is the handle every query runs against. It is nil until the first
// getDatabase call when the store was built with NewDocumentStore.
db *sql.DB
// path is the data source name given to sql.Open("sqlite", ...) when the
// store has to open the database itself.
path string
// openOnce gates the one-time initialisation performed by getDatabase
// (opening the database when needed and creating the tables).
openOnce sync.Once
// mutex guards reads and writes of db.
mutex sync.RWMutex
}
// Delete implements storage.DocumentStore.
//
// It removes the row matching both collection and id. The number of affected
// rows is not inspected, so deleting a document that does not exist is not
// reported as an error.
func (s *DocumentStore) Delete(ctx context.Context, collection string, id storage.DocumentID) error {
	const query = `
DELETE FROM documents
WHERE collection = $1 AND id = $2
`

	remove := func(tx *sql.Tx) error {
		if _, execErr := tx.ExecContext(ctx, query, collection, string(id)); execErr != nil {
			return errors.WithStack(execErr)
		}

		return nil
	}

	if err := s.withTx(ctx, remove); err != nil {
		return errors.WithStack(err)
	}

	return nil
}
// Get implements storage.DocumentStore.
//
// It loads the row matching collection and id and returns its data with the
// ID, creation time and update time attributes filled in from the row's
// columns. When no row matches, the returned error wraps
// storage.ErrDocumentNotFound.
func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.DocumentID) (storage.Document, error) {
	const query = `
SELECT id, data, created_at, updated_at
FROM documents
WHERE collection = $1 AND id = $2
`

	var found storage.Document

	fetch := func(tx *sql.Tx) error {
		var (
			docID   storage.DocumentID
			payload JSONMap
			created time.Time
			updated time.Time
		)

		scanErr := tx.QueryRowContext(ctx, query, collection, string(id)).
			Scan(&docID, &payload, &created, &updated)
		if scanErr != nil {
			// A missing row is surfaced as the storage-level sentinel rather
			// than the database/sql one.
			if errors.Is(scanErr, sql.ErrNoRows) {
				return errors.WithStack(storage.ErrDocumentNotFound)
			}

			return errors.WithStack(scanErr)
		}

		found = storage.Document(payload)
		found[storage.DocumentAttrID] = docID
		found[storage.DocumentAttrCreatedAt] = created
		found[storage.DocumentAttrUpdatedAt] = updated

		return nil
	}

	if err := s.withTx(ctx, fetch); err != nil {
		return nil, errors.WithStack(err)
	}

	return found, nil
}
// Query implements storage.DocumentStore.
//
// It returns every document of collection matching filter; a nil filter
// matches the whole collection. The filter is translated to a SQL condition
// by filterSQL.ToSQL, with each filter key mapped to a json_extract lookup on
// the data column. The result is an empty, non-nil slice when nothing matches.
//
// NOTE(review): funcs is accepted to satisfy the interface but is never read
// here, so query options passed by callers currently have no effect.
func (s *DocumentStore) Query(ctx context.Context, collection string, filter *filter.Filter, funcs ...storage.QueryOptionFunc) ([]storage.Document, error) {
	var documents []storage.Document

	err := s.withTx(ctx, func(tx *sql.Tx) error {
		// Default condition used when no filter is supplied.
		criteria := "1 = 1"
		args := make([]any, 0)

		var err error

		if filter != nil {
			criteria, args, err = filterSQL.ToSQL(
				filter.Root(),
				// $1 is bound to the collection below; presumably this makes
				// the generated placeholders start at $2.
				filterSQL.WithPreparedParameter("$", 2),
				filterSQL.WithTransform(transformOperator),
				filterSQL.WithKeyTransform(func(key string) string {
					// The key ends up inside a single-quoted SQL string
					// literal: double any embedded quote so a crafted key
					// cannot terminate the literal and inject SQL.
					escaped := strings.ReplaceAll(key, "'", "''")

					return fmt.Sprintf("json_extract(data, '$.%s')", escaped)
				}),
			)
			if err != nil {
				return errors.WithStack(err)
			}
		}

		query := `
SELECT id, data, created_at, updated_at
FROM documents
WHERE collection = $1 AND (` + criteria + `)
`

		// The collection must come first to line up with $1.
		args = append([]any{collection}, args...)

		logger.Debug(
			ctx, "executing query",
			logger.F("query", query),
			logger.F("args", args),
		)

		rows, err := tx.QueryContext(ctx, query, args...)
		if err != nil {
			return errors.WithStack(err)
		}
		defer rows.Close()

		documents = make([]storage.Document, 0)

		for rows.Next() {
			var (
				id        storage.DocumentID
				createdAt time.Time
				updatedAt time.Time
				data      JSONMap
			)

			if err := rows.Scan(&id, &data, &createdAt, &updatedAt); err != nil {
				return errors.WithStack(err)
			}

			document := storage.Document(data)
			document[storage.DocumentAttrID] = id
			document[storage.DocumentAttrCreatedAt] = createdAt
			document[storage.DocumentAttrUpdatedAt] = updatedAt

			documents = append(documents, document)
		}

		// rows.Next returning false may hide an iteration error.
		if err := rows.Err(); err != nil {
			return errors.WithStack(err)
		}

		return nil
	})
	if err != nil {
		return nil, errors.WithStack(err)
	}

	return documents, nil
}
// Upsert implements storage.DocumentStore.
//
// It inserts document into collection, or, when a row with the same
// (id, collection) pair already exists, replaces that row's data and
// updated_at. The returned document is built from the row as reported back by
// the database, with the ID and both timestamps filled in. When document has
// no ID, or an empty one, a fresh ID is generated with storage.NewDocumentID.
//
// The caller's document is not modified: the reserved attributes (ID,
// creation time, update time) are stripped from a copy before it is stored.
func (s *DocumentStore) Upsert(ctx context.Context, collection string, document storage.Document) (storage.Document, error) {
	const query = `
INSERT INTO documents (id, collection, data, created_at, updated_at)
VALUES($1, $2, $3, $4, $4)
ON CONFLICT (id, collection) DO UPDATE SET
data = $3, updated_at = $4
RETURNING "id", "data", "created_at", "updated_at"
`

	var upsertedDocument storage.Document

	err := s.withTx(ctx, func(tx *sql.Tx) error {
		id, exists := document.ID()
		if !exists || id == "" {
			id = storage.NewDocumentID()
		}

		// Work on a copy so the map handed in by the caller keeps its
		// reserved attributes; those live in dedicated columns, not in data.
		payload := make(JSONMap, len(document))
		for key, value := range document {
			payload[key] = value
		}

		delete(payload, storage.DocumentAttrID)
		delete(payload, storage.DocumentAttrCreatedAt)
		delete(payload, storage.DocumentAttrUpdatedAt)

		// One timestamp serves both created_at and updated_at ($4).
		now := time.Now().UTC()

		row := tx.QueryRowContext(ctx, query, id, collection, payload, now)

		var (
			createdAt time.Time
			updatedAt time.Time
			data      JSONMap
		)

		if err := row.Scan(&id, &data, &createdAt, &updatedAt); err != nil {
			return errors.WithStack(err)
		}

		upsertedDocument = storage.Document(data)
		upsertedDocument[storage.DocumentAttrID] = id
		upsertedDocument[storage.DocumentAttrCreatedAt] = createdAt
		upsertedDocument[storage.DocumentAttrUpdatedAt] = updatedAt

		return nil
	})
	if err != nil {
		return nil, errors.WithStack(err)
	}

	return upsertedDocument, nil
}
// withTx obtains the store's database handle through getDatabase and hands
// it, together with fn, to the package-level withTx helper. Errors from either
// step are returned with a stack trace attached.
func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
	db, err := s.getDatabase(ctx)
	if err != nil {
		return errors.WithStack(err)
	}

	if txErr := withTx(ctx, db, fn); txErr != nil {
		return errors.WithStack(txErr)
	}

	return nil
}
// getDatabase returns the database handle used by the store, performing the
// one-time initialisation on first use: the database is opened from s.path
// when no handle was injected, then ensureTables creates the schema.
//
// The whole call runs under s.mutex, and s.db is only published once
// ensureTables has succeeded, so a caller never observes a nil or
// half-initialised handle. When initialisation fails the error is returned,
// any handle opened by this call is closed, and the sync.Once is re-armed so
// that the next call retries instead of silently skipping initialisation.
func (s *DocumentStore) getDatabase(ctx context.Context) (*sql.DB, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	var err error

	s.openOnce.Do(func() {
		db := s.db
		opened := false

		if db == nil {
			db, err = sql.Open("sqlite", s.path)
			if err != nil {
				return
			}

			opened = true
		}

		if err = s.ensureTables(ctx, db); err != nil {
			if opened {
				// Best-effort cleanup of the handle opened above; the
				// ensureTables error is the one worth reporting.
				_ = db.Close()
			}

			return
		}

		s.db = db
	})

	if err != nil {
		// sync.Once counts a failed attempt as done. Resetting it here is
		// safe because openOnce is only used while s.mutex is held.
		s.openOnce = sync.Once{}

		return nil, errors.WithStack(err)
	}

	return s.db, nil
}
// ensureTables creates the documents table and its collection index when they
// do not exist yet. Both statements are passed to the package-level withTx
// helper as a single callback, and the first failure aborts the sequence.
//
// NOTE(review): id alone is declared PRIMARY KEY, so the same id cannot be
// stored in two collections even though a UNIQUE(id, collection) constraint
// is also declared. Confirm this is intended.
func (s *DocumentStore) ensureTables(ctx context.Context, db *sql.DB) error {
	statements := []string{
		`
CREATE TABLE IF NOT EXISTS documents (
id TEXT PRIMARY KEY,
collection TEXT NOT NULL,
data TEXT,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
UNIQUE(id, collection) ON CONFLICT REPLACE
);
`,
		`
CREATE INDEX IF NOT EXISTS collection_idx ON documents (collection);
`,
	}

	createSchema := func(tx *sql.Tx) error {
		for _, statement := range statements {
			if _, execErr := tx.ExecContext(ctx, statement); execErr != nil {
				return errors.WithStack(execErr)
			}
		}

		return nil
	}

	if err := withTx(ctx, db, createSchema); err != nil {
		return errors.WithStack(err)
	}

	return nil
}
// NewDocumentStore returns a DocumentStore for the SQLite database at path.
// No database handle is set here; path is only handed to sql.Open later, by
// getDatabase.
func NewDocumentStore(path string) *DocumentStore {
	// The remaining fields rely on their zero values (nil db, unused Once).
	store := &DocumentStore{path: path}

	return store
}
// NewDocumentStoreWithDB returns a DocumentStore that uses the given, already
// opened database handle. The path is left empty.
func NewDocumentStoreWithDB(db *sql.DB) *DocumentStore {
	// The remaining fields rely on their zero values (empty path, unused Once).
	store := &DocumentStore{db: db}

	return store
}
// Compile-time check that *DocumentStore satisfies storage.DocumentStore.
var _ storage.DocumentStore = (*DocumentStore)(nil)