William Petit
8e574c299b
All checks were successful
arcad/edge/pipeline/pr-master This commit looks good
429 lines
9.1 KiB
Go
429 lines
9.1 KiB
Go
package sqlite
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"time"
|
|
|
|
"forge.cadoles.com/arcad/edge/pkg/app"
|
|
"forge.cadoles.com/arcad/edge/pkg/storage/share"
|
|
"github.com/pkg/errors"
|
|
"gitlab.com/wpetit/goweb/logger"
|
|
)
|
|
|
|
type ShareStore struct {
|
|
getDB GetDBFunc
|
|
}
|
|
|
|
// DeleteAttributes implements share.Repository
|
|
func (s *ShareStore) DeleteAttributes(ctx context.Context, origin app.ID, resourceID share.ResourceID, names ...string) error {
|
|
err := s.withTx(ctx, func(tx *sql.Tx) error {
|
|
query := `
|
|
DELETE FROM resources
|
|
WHERE origin = $1 AND resource_id = $2
|
|
`
|
|
args := []any{origin, resourceID}
|
|
criteria := ""
|
|
|
|
for idx, name := range names {
|
|
if idx == 0 {
|
|
criteria += " AND ("
|
|
}
|
|
|
|
if idx != 0 {
|
|
criteria += " OR "
|
|
}
|
|
|
|
criteria += fmt.Sprintf(" name = $%d", len(args)+1)
|
|
args = append(args, name)
|
|
|
|
if idx == len(names)-1 {
|
|
criteria += " )"
|
|
}
|
|
}
|
|
|
|
query += criteria
|
|
|
|
logger.Debug(
|
|
ctx, "executing query",
|
|
logger.F("query", query),
|
|
logger.F("args", args),
|
|
)
|
|
|
|
res, err := tx.ExecContext(ctx, query, args...)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
affected, err := res.RowsAffected()
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
if affected == 0 {
|
|
return errors.WithStack(share.ErrNotFound)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// DeleteResource implements share.Repository
|
|
func (s *ShareStore) DeleteResource(ctx context.Context, origin app.ID, resourceID share.ResourceID) error {
|
|
err := s.withTx(ctx, func(tx *sql.Tx) error {
|
|
query := `
|
|
DELETE FROM resources
|
|
WHERE origin = $1 AND resource_id = $2
|
|
`
|
|
|
|
args := []any{origin, resourceID}
|
|
|
|
logger.Debug(
|
|
ctx, "executing query",
|
|
logger.F("query", query),
|
|
logger.F("args", args),
|
|
)
|
|
|
|
res, err := tx.ExecContext(ctx, query, args...)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
affected, err := res.RowsAffected()
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
if affected == 0 {
|
|
return errors.WithStack(share.ErrNotFound)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// FindResources implements share.Repository
|
|
func (s *ShareStore) FindResources(ctx context.Context, funcs ...share.FindResourcesOptionFunc) ([]share.Resource, error) {
|
|
opts := share.NewFindResourcesOptions(funcs...)
|
|
|
|
var resources []share.Resource
|
|
|
|
err := s.withTx(ctx, func(tx *sql.Tx) error {
|
|
query := `
|
|
SELECT
|
|
main.origin, main.resource_id,
|
|
main.name, main.type, main.value,
|
|
main.created_at, main.updated_at
|
|
FROM resources AS main
|
|
JOIN resources AS sub ON
|
|
main.resource_id = sub.resource_id
|
|
AND main.origin = sub.origin
|
|
`
|
|
|
|
criteria := " WHERE 1 = 1"
|
|
preparedArgIndex := 1
|
|
args := make([]any, 0)
|
|
|
|
if opts.Name != nil {
|
|
criteria += fmt.Sprintf(" AND sub.name = $%d", preparedArgIndex)
|
|
args = append(args, *opts.Name)
|
|
preparedArgIndex++
|
|
}
|
|
|
|
if opts.ValueType != nil {
|
|
criteria += fmt.Sprintf(" AND sub.type = $%d", preparedArgIndex)
|
|
args = append(args, *opts.ValueType)
|
|
preparedArgIndex++
|
|
}
|
|
|
|
query += criteria
|
|
|
|
logger.Debug(
|
|
ctx, "executing query",
|
|
logger.F("query", query),
|
|
logger.F("args", args),
|
|
)
|
|
|
|
rows, err := tx.QueryContext(ctx, query, args...)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
defer func() {
|
|
if err := rows.Close(); err != nil {
|
|
logger.Error(ctx, "could not close rows", logger.E(errors.WithStack(err)))
|
|
}
|
|
}()
|
|
|
|
indexedResources := make(map[string]*share.BaseResource)
|
|
|
|
for rows.Next() {
|
|
var (
|
|
origin string
|
|
resourceID string
|
|
name string
|
|
valueType string
|
|
value any
|
|
updatedAt time.Time
|
|
createdAt time.Time
|
|
)
|
|
|
|
if err := rows.Scan(&origin, &resourceID, &name, &valueType, &value, &createdAt, &updatedAt); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
resourceKey := origin + resourceID
|
|
resource, exists := indexedResources[resourceKey]
|
|
if !exists {
|
|
resource = share.NewBaseResource(app.ID(origin), share.ResourceID(resourceID))
|
|
indexedResources[resourceKey] = resource
|
|
}
|
|
|
|
attr := share.NewBaseAttribute(
|
|
name,
|
|
share.ValueType(valueType),
|
|
value,
|
|
)
|
|
|
|
attr.SetCreatedAt(createdAt)
|
|
attr.SetUpdatedAt(updatedAt)
|
|
|
|
resource.SetAttribute(attr)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
resources = make([]share.Resource, 0, len(indexedResources))
|
|
for _, res := range indexedResources {
|
|
resources = append(resources, res)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
return resources, nil
|
|
}
|
|
|
|
// GetResource implements share.Repository
|
|
func (s *ShareStore) GetResource(ctx context.Context, origin app.ID, resourceID share.ResourceID) (share.Resource, error) {
|
|
var (
|
|
resource *share.BaseResource
|
|
err error
|
|
)
|
|
|
|
err = s.withTx(ctx, func(tx *sql.Tx) error {
|
|
resource, err = s.getResourceWithinTx(ctx, tx, origin, resourceID)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
return resource, nil
|
|
}
|
|
|
|
// UpdateAttributes implements share.Repository
|
|
func (s *ShareStore) UpdateAttributes(ctx context.Context, origin app.ID, resourceID share.ResourceID, attributes ...share.Attribute) (share.Resource, error) {
|
|
if len(attributes) == 0 {
|
|
return nil, errors.WithStack(share.ErrAttributeRequired)
|
|
}
|
|
|
|
var resource *share.BaseResource
|
|
err := s.withTx(ctx, func(tx *sql.Tx) error {
|
|
query := `
|
|
INSERT INTO resources (origin, resource_id, name, type, value, created_at, updated_at)
|
|
VALUES($1, $2, $3, $4, $5, $6, $6)
|
|
ON CONFLICT (origin, resource_id, name) DO UPDATE SET
|
|
type = $4, value = $5, updated_at = $6
|
|
`
|
|
|
|
stmt, err := tx.PrepareContext(ctx, query)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
defer func() {
|
|
if err := stmt.Close(); err != nil {
|
|
logger.Error(ctx, "could not close statement", logger.E(errors.WithStack(err)))
|
|
}
|
|
}()
|
|
|
|
now := time.Now().UTC()
|
|
|
|
for _, attr := range attributes {
|
|
args := []any{
|
|
string(origin), string(resourceID),
|
|
attr.Name(), string(attr.Type()), attr.Value(),
|
|
now, now,
|
|
}
|
|
|
|
logger.Debug(
|
|
ctx, "executing query",
|
|
logger.F("query", query),
|
|
logger.F("args", args),
|
|
)
|
|
|
|
if _, err := stmt.ExecContext(ctx, args...); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
}
|
|
|
|
resource, err = s.getResourceWithinTx(ctx, tx, origin, resourceID)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
return resource, nil
|
|
}
|
|
|
|
func (s *ShareStore) getResourceWithinTx(ctx context.Context, tx *sql.Tx, origin app.ID, resourceID share.ResourceID) (*share.BaseResource, error) {
|
|
query := `
|
|
SELECT name, type, value, created_at, updated_at
|
|
FROM resources
|
|
WHERE origin = $1 AND resource_id = $2
|
|
`
|
|
|
|
rows, err := tx.QueryContext(ctx, query, origin, resourceID)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
defer func() {
|
|
if err := rows.Close(); err != nil {
|
|
logger.Error(ctx, "could not close rows", logger.E(errors.WithStack(err)))
|
|
}
|
|
}()
|
|
|
|
attributes := make([]share.Attribute, 0)
|
|
|
|
for rows.Next() {
|
|
var (
|
|
name string
|
|
valueType string
|
|
value any
|
|
updatedAt time.Time
|
|
createdAt time.Time
|
|
)
|
|
|
|
if err := rows.Scan(&name, &valueType, &value, &createdAt, &updatedAt); err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
attr := share.NewBaseAttribute(
|
|
name,
|
|
share.ValueType(valueType),
|
|
value,
|
|
)
|
|
|
|
attr.SetCreatedAt(createdAt)
|
|
attr.SetUpdatedAt(updatedAt)
|
|
|
|
attributes = append(attributes, attr)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
if len(attributes) == 0 {
|
|
return nil, errors.WithStack(share.ErrNotFound)
|
|
}
|
|
|
|
resource := share.NewBaseResource(origin, resourceID, attributes...)
|
|
|
|
return resource, nil
|
|
}
|
|
|
|
func (s *ShareStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
|
|
var db *sql.DB
|
|
|
|
db, err := s.getDB(ctx)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
if err := WithTx(ctx, db, fn); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func ensureShareTables(ctx context.Context, db *sql.DB) error {
|
|
err := WithTx(ctx, db, func(tx *sql.Tx) error {
|
|
query := `
|
|
CREATE TABLE IF NOT EXISTS resources (
|
|
resource_id TEXT NOT NULL,
|
|
origin TEXT NOT NULL,
|
|
name TEXT NOT NULL,
|
|
type TEXT NOT NULL,
|
|
value TEXT,
|
|
created_at TIMESTAMP NOT NULL,
|
|
updated_at TIMESTAMP NOT NULL,
|
|
UNIQUE(origin, resource_id, name) ON CONFLICT REPLACE
|
|
);
|
|
`
|
|
if _, err := tx.ExecContext(ctx, query); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
query = `
|
|
CREATE INDEX IF NOT EXISTS resource_idx ON resources (origin, resource_id, name);
|
|
`
|
|
if _, err := tx.ExecContext(ctx, query); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func NewShareStore(path string) *ShareStore {
|
|
getDB := NewGetDBFunc(path, ensureShareTables)
|
|
|
|
return &ShareStore{
|
|
getDB: getDB,
|
|
}
|
|
}
|
|
|
|
func NewShareStoreWithDB(db *sql.DB) *ShareStore {
|
|
getDB := NewGetDBFuncFromDB(db, ensureShareTables)
|
|
|
|
return &ShareStore{
|
|
getDB: getDB,
|
|
}
|
|
}
|
|
|
|
// Compile-time check that *ShareStore satisfies share.Store.
var _ share.Store = (*ShareStore)(nil)
|