feat(module,share): cross-app resource sharing module
All checks were successful
arcad/edge/pipeline/head This commit looks good
All checks were successful
arcad/edge/pipeline/head This commit looks good
This commit is contained in:
13
pkg/module/share/sqlite/module_test.go
Normal file
13
pkg/module/share/sqlite/module_test.go
Normal file
@ -0,0 +1,13 @@
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/share/testsuite"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
func TestModule(t *testing.T) {
|
||||
logger.SetLevel(logger.LevelDebug)
|
||||
testsuite.TestModule(t, newTestRepo)
|
||||
}
|
429
pkg/module/share/sqlite/repository.go
Normal file
429
pkg/module/share/sqlite/repository.go
Normal file
@ -0,0 +1,429 @@
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/share"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
type Repository struct {
|
||||
getDB sqlite.GetDBFunc
|
||||
}
|
||||
|
||||
// DeleteAttributes implements share.Repository
|
||||
func (r *Repository) DeleteAttributes(ctx context.Context, origin app.ID, resourceID share.ResourceID, names ...string) error {
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `
|
||||
DELETE FROM resources
|
||||
WHERE origin = $1 AND resource_id = $2
|
||||
`
|
||||
args := []any{origin, resourceID}
|
||||
criteria := ""
|
||||
|
||||
for idx, name := range names {
|
||||
if idx == 0 {
|
||||
criteria += " AND ("
|
||||
}
|
||||
|
||||
if idx != 0 {
|
||||
criteria += " OR "
|
||||
}
|
||||
|
||||
criteria += fmt.Sprintf(" name = $%d", len(args)+1)
|
||||
args = append(args, name)
|
||||
|
||||
if idx == len(names)-1 {
|
||||
criteria += " )"
|
||||
}
|
||||
}
|
||||
|
||||
query += criteria
|
||||
|
||||
logger.Debug(
|
||||
ctx, "executing query",
|
||||
logger.F("query", query),
|
||||
logger.F("args", args),
|
||||
)
|
||||
|
||||
res, err := tx.ExecContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
affected, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if affected == 0 {
|
||||
return errors.WithStack(share.ErrNotFound)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// DeleteResource implements share.Repository
|
||||
func (r *Repository) DeleteResource(ctx context.Context, origin app.ID, resourceID share.ResourceID) error {
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `
|
||||
DELETE FROM resources
|
||||
WHERE origin = $1 AND resource_id = $2
|
||||
`
|
||||
|
||||
args := []any{origin, resourceID}
|
||||
|
||||
logger.Debug(
|
||||
ctx, "executing query",
|
||||
logger.F("query", query),
|
||||
logger.F("args", args),
|
||||
)
|
||||
|
||||
res, err := tx.ExecContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
affected, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if affected == 0 {
|
||||
return errors.WithStack(share.ErrNotFound)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// FindResources implements share.Repository
|
||||
func (r *Repository) FindResources(ctx context.Context, funcs ...share.FindResourcesOptionFunc) ([]share.Resource, error) {
|
||||
opts := share.FillFindResourcesOptions(funcs...)
|
||||
|
||||
var resources []share.Resource
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `
|
||||
SELECT
|
||||
main.origin, main.resource_id,
|
||||
main.name, main.type, main.value,
|
||||
main.created_at, main.updated_at
|
||||
FROM resources AS main
|
||||
JOIN resources AS sub ON
|
||||
main.resource_id = sub.resource_id
|
||||
AND main.origin = sub.origin
|
||||
`
|
||||
|
||||
criteria := " WHERE 1 = 1"
|
||||
preparedArgIndex := 1
|
||||
args := make([]any, 0)
|
||||
|
||||
if opts.Name != nil {
|
||||
criteria += fmt.Sprintf(" AND sub.name = $%d", preparedArgIndex)
|
||||
args = append(args, *opts.Name)
|
||||
preparedArgIndex++
|
||||
}
|
||||
|
||||
if opts.ValueType != nil {
|
||||
criteria += fmt.Sprintf(" AND sub.type = $%d", preparedArgIndex)
|
||||
args = append(args, *opts.ValueType)
|
||||
preparedArgIndex++
|
||||
}
|
||||
|
||||
query += criteria
|
||||
|
||||
logger.Debug(
|
||||
ctx, "executing query",
|
||||
logger.F("query", query),
|
||||
logger.F("args", args),
|
||||
)
|
||||
|
||||
rows, err := tx.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := rows.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close rows", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
indexedResources := make(map[string]*share.BaseResource)
|
||||
|
||||
for rows.Next() {
|
||||
var (
|
||||
origin string
|
||||
resourceID string
|
||||
name string
|
||||
valueType string
|
||||
value any
|
||||
updatedAt time.Time
|
||||
createdAt time.Time
|
||||
)
|
||||
|
||||
if err := rows.Scan(&origin, &resourceID, &name, &valueType, &value, &createdAt, &updatedAt); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
resourceKey := origin + resourceID
|
||||
resource, exists := indexedResources[resourceKey]
|
||||
if !exists {
|
||||
resource = share.NewBaseResource(app.ID(origin), share.ResourceID(resourceID))
|
||||
indexedResources[resourceKey] = resource
|
||||
}
|
||||
|
||||
attr := share.NewBaseAttribute(
|
||||
name,
|
||||
share.ValueType(valueType),
|
||||
value,
|
||||
)
|
||||
|
||||
attr.SetCreatedAt(createdAt)
|
||||
attr.SetUpdatedAt(updatedAt)
|
||||
|
||||
resource.SetAttribute(attr)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
resources = make([]share.Resource, 0, len(indexedResources))
|
||||
for _, res := range indexedResources {
|
||||
resources = append(resources, res)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
// GetResource implements share.Repository
|
||||
func (r *Repository) GetResource(ctx context.Context, origin app.ID, resourceID share.ResourceID) (share.Resource, error) {
|
||||
var (
|
||||
resource *share.BaseResource
|
||||
err error
|
||||
)
|
||||
|
||||
err = r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
resource, err = r.getResourceWithinTx(ctx, tx, origin, resourceID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return resource, nil
|
||||
}
|
||||
|
||||
// UpdateAttributes implements share.Repository
|
||||
func (r *Repository) UpdateAttributes(ctx context.Context, origin app.ID, resourceID share.ResourceID, attributes ...share.Attribute) (share.Resource, error) {
|
||||
if len(attributes) == 0 {
|
||||
return nil, errors.WithStack(share.ErrAttributeRequired)
|
||||
}
|
||||
|
||||
var resource *share.BaseResource
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `
|
||||
INSERT INTO resources (origin, resource_id, name, type, value, created_at, updated_at)
|
||||
VALUES($1, $2, $3, $4, $5, $6, $6)
|
||||
ON CONFLICT (origin, resource_id, name) DO UPDATE SET
|
||||
type = $4, value = $5, updated_at = $6
|
||||
`
|
||||
|
||||
stmt, err := tx.PrepareContext(ctx, query)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := stmt.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close statement", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
now := time.Now().UTC()
|
||||
|
||||
for _, attr := range attributes {
|
||||
args := []any{
|
||||
string(origin), string(resourceID),
|
||||
attr.Name(), string(attr.Type()), attr.Value(),
|
||||
now, now,
|
||||
}
|
||||
|
||||
logger.Debug(
|
||||
ctx, "executing query",
|
||||
logger.F("query", query),
|
||||
logger.F("args", args),
|
||||
)
|
||||
|
||||
if _, err := stmt.ExecContext(ctx, args...); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
|
||||
resource, err = r.getResourceWithinTx(ctx, tx, origin, resourceID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return resource, nil
|
||||
}
|
||||
|
||||
func (r *Repository) getResourceWithinTx(ctx context.Context, tx *sql.Tx, origin app.ID, resourceID share.ResourceID) (*share.BaseResource, error) {
|
||||
query := `
|
||||
SELECT name, type, value, created_at, updated_at
|
||||
FROM resources
|
||||
WHERE origin = $1 AND resource_id = $2
|
||||
`
|
||||
|
||||
rows, err := tx.QueryContext(ctx, query, origin, resourceID)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := rows.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close rows", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
attributes := make([]share.Attribute, 0)
|
||||
|
||||
for rows.Next() {
|
||||
var (
|
||||
name string
|
||||
valueType string
|
||||
value any
|
||||
updatedAt time.Time
|
||||
createdAt time.Time
|
||||
)
|
||||
|
||||
if err := rows.Scan(&name, &valueType, &value, &createdAt, &updatedAt); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
attr := share.NewBaseAttribute(
|
||||
name,
|
||||
share.ValueType(valueType),
|
||||
value,
|
||||
)
|
||||
|
||||
attr.SetCreatedAt(createdAt)
|
||||
attr.SetUpdatedAt(updatedAt)
|
||||
|
||||
attributes = append(attributes, attr)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
if len(attributes) == 0 {
|
||||
return nil, errors.WithStack(share.ErrNotFound)
|
||||
}
|
||||
|
||||
resource := share.NewBaseResource(origin, resourceID, attributes...)
|
||||
|
||||
return resource, nil
|
||||
}
|
||||
|
||||
func (r *Repository) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
|
||||
var db *sql.DB
|
||||
|
||||
db, err := r.getDB(ctx)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := sqlite.WithTx(ctx, db, fn); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func ensureTables(ctx context.Context, db *sql.DB) error {
|
||||
err := sqlite.WithTx(ctx, db, func(tx *sql.Tx) error {
|
||||
query := `
|
||||
CREATE TABLE IF NOT EXISTS resources (
|
||||
resource_id TEXT NOT NULL,
|
||||
origin TEXT NOT NULL,
|
||||
name TEXT NOT NULL,
|
||||
type TEXT NOT NULL,
|
||||
value TEXT,
|
||||
created_at TIMESTAMP NOT NULL,
|
||||
updated_at TIMESTAMP NOT NULL,
|
||||
UNIQUE(origin, resource_id, name) ON CONFLICT REPLACE
|
||||
);
|
||||
`
|
||||
if _, err := tx.ExecContext(ctx, query); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
query = `
|
||||
CREATE INDEX IF NOT EXISTS resource_idx ON resources (origin, resource_id, name);
|
||||
`
|
||||
if _, err := tx.ExecContext(ctx, query); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewRepository(path string) *Repository {
|
||||
getDB := sqlite.NewGetDBFunc(path, ensureTables)
|
||||
|
||||
return &Repository{
|
||||
getDB: getDB,
|
||||
}
|
||||
}
|
||||
|
||||
func NewRepositoryWithDB(db *sql.DB) *Repository {
|
||||
getDB := sqlite.NewGetDBFuncFromDB(db, ensureTables)
|
||||
|
||||
return &Repository{
|
||||
getDB: getDB,
|
||||
}
|
||||
}
|
||||
|
||||
var _ share.Repository = &Repository{}
|
33
pkg/module/share/sqlite/repository_test.go
Normal file
33
pkg/module/share/sqlite/repository_test.go
Normal file
@ -0,0 +1,33 @@
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/share"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/share/testsuite"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
func TestRepository(t *testing.T) {
|
||||
logger.SetLevel(logger.LevelDebug)
|
||||
testsuite.TestRepository(t, newTestRepo)
|
||||
}
|
||||
|
||||
func newTestRepo(testName string) (share.Repository, error) {
|
||||
filename := strings.ToLower(strings.ReplaceAll(testName, " ", "_"))
|
||||
file := fmt.Sprintf("./testdata/%s.sqlite", filename)
|
||||
|
||||
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
||||
repo := NewRepository(dsn)
|
||||
|
||||
return repo, nil
|
||||
}
|
1
pkg/module/share/sqlite/testdata/.gitignore
vendored
Normal file
1
pkg/module/share/sqlite/testdata/.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
*.sqlite*
|
Reference in New Issue
Block a user