feat: add spec definition api with versioning
This commit is contained in:
@ -18,9 +18,9 @@ type AgentRepository interface {
|
||||
Query(ctx context.Context, opts ...AgentQueryOptionFunc) ([]*Agent, int, error)
|
||||
Delete(ctx context.Context, id AgentID) error
|
||||
|
||||
UpdateSpec(ctx context.Context, id AgentID, name string, revision int, data map[string]any) (*Spec, error)
|
||||
UpdateSpec(ctx context.Context, id AgentID, name string, version string, revision int, data map[string]any) (*Spec, error)
|
||||
GetSpecs(ctx context.Context, id AgentID) ([]*Spec, error)
|
||||
DeleteSpec(ctx context.Context, id AgentID, name string) error
|
||||
DeleteSpec(ctx context.Context, id AgentID, name string, version string) error
|
||||
}
|
||||
|
||||
type AgentQueryOptionFunc func(*AgentQueryOptions)
|
||||
|
166
internal/datastore/memory/spec_definition_repository.go
Normal file
166
internal/datastore/memory/spec_definition_repository.go
Normal file
@ -0,0 +1,166 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/datastore"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type specDefRecord struct {
|
||||
Schema []byte
|
||||
CreatedAt time.Time
|
||||
UpdatedAt time.Time
|
||||
}
|
||||
|
||||
type SpecDefinitionRepository struct {
|
||||
definitions map[string]map[string]specDefRecord
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
|
||||
// Delete implements datastore.SpecDefinitionRepository.
|
||||
func (r *SpecDefinitionRepository) Delete(ctx context.Context, name string, version string) error {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
|
||||
versions, exists := r.definitions[name]
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
|
||||
delete(versions, version)
|
||||
|
||||
r.definitions[name] = versions
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get implements datastore.SpecDefinitionRepository.
|
||||
func (r *SpecDefinitionRepository) Get(ctx context.Context, name string, version string) (*datastore.SpecDefinition, error) {
|
||||
r.mutex.RLock()
|
||||
defer r.mutex.RUnlock()
|
||||
|
||||
versions, exists := r.definitions[name]
|
||||
if !exists {
|
||||
return nil, errors.WithStack(datastore.ErrNotFound)
|
||||
}
|
||||
|
||||
rec, exists := versions[version]
|
||||
if !exists {
|
||||
return nil, errors.WithStack(datastore.ErrNotFound)
|
||||
}
|
||||
|
||||
specDef := datastore.SpecDefinition{
|
||||
SpecDefinitionHeader: datastore.SpecDefinitionHeader{
|
||||
Name: name,
|
||||
Version: version,
|
||||
CreatedAt: rec.CreatedAt,
|
||||
UpdatedAt: rec.UpdatedAt,
|
||||
},
|
||||
Schema: rec.Schema[:],
|
||||
}
|
||||
|
||||
return &specDef, nil
|
||||
}
|
||||
|
||||
// Query implements datastore.SpecDefinitionRepository.
|
||||
func (r *SpecDefinitionRepository) Query(ctx context.Context, opts ...datastore.SpecDefinitionQueryOptionFunc) ([]datastore.SpecDefinitionHeader, int, error) {
|
||||
options := &datastore.SpecDefinitionQueryOptions{}
|
||||
for _, fn := range opts {
|
||||
fn(options)
|
||||
}
|
||||
|
||||
r.mutex.RLock()
|
||||
defer r.mutex.RUnlock()
|
||||
|
||||
specDefs := make([]datastore.SpecDefinitionHeader, 0)
|
||||
count := 0
|
||||
|
||||
for name, versions := range r.definitions {
|
||||
for version, rec := range versions {
|
||||
count++
|
||||
|
||||
matches := true
|
||||
|
||||
if options.Names != nil && !slices.Contains(options.Names, name) {
|
||||
matches = false
|
||||
}
|
||||
|
||||
if options.Versions != nil && !slices.Contains(options.Versions, version) {
|
||||
matches = false
|
||||
}
|
||||
|
||||
if options.Offset != nil && count < *options.Offset {
|
||||
matches = false
|
||||
}
|
||||
|
||||
if options.Limit != nil && len(specDefs) >= *options.Limit {
|
||||
matches = false
|
||||
}
|
||||
|
||||
if !matches {
|
||||
continue
|
||||
}
|
||||
|
||||
specDefs = append(specDefs, datastore.SpecDefinitionHeader{
|
||||
Name: name,
|
||||
Version: version,
|
||||
CreatedAt: rec.CreatedAt,
|
||||
UpdatedAt: rec.UpdatedAt,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return specDefs, count, nil
|
||||
}
|
||||
|
||||
// Upsert implements datastore.SpecDefinitionRepository.
|
||||
func (r *SpecDefinitionRepository) Upsert(ctx context.Context, name string, version string, schema []byte) (*datastore.SpecDefinition, error) {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
|
||||
versions, exists := r.definitions[name]
|
||||
if !exists {
|
||||
versions = make(map[string]specDefRecord)
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
|
||||
rec, exists := versions[version]
|
||||
if !exists {
|
||||
rec = specDefRecord{
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
Schema: schema[:],
|
||||
}
|
||||
} else {
|
||||
rec.UpdatedAt = now
|
||||
rec.Schema = schema
|
||||
}
|
||||
|
||||
versions[version] = rec
|
||||
r.definitions[name] = versions
|
||||
|
||||
specDef := datastore.SpecDefinition{
|
||||
SpecDefinitionHeader: datastore.SpecDefinitionHeader{
|
||||
Name: name,
|
||||
Version: version,
|
||||
CreatedAt: rec.CreatedAt,
|
||||
UpdatedAt: rec.UpdatedAt,
|
||||
},
|
||||
Schema: rec.Schema[:],
|
||||
}
|
||||
|
||||
return &specDef, nil
|
||||
}
|
||||
|
||||
func NewSpecDefinitionRepository() *SpecDefinitionRepository {
|
||||
return &SpecDefinitionRepository{
|
||||
definitions: make(map[string]map[string]specDefRecord),
|
||||
}
|
||||
}
|
||||
|
||||
var _ datastore.SpecDefinitionRepository = &SpecDefinitionRepository{}
|
14
internal/datastore/memory/spec_definition_repository_test.go
Normal file
14
internal/datastore/memory/spec_definition_repository_test.go
Normal file
@ -0,0 +1,14 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/datastore/testsuite"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
func TestMemorySpecDefinitionRepository(t *testing.T) {
|
||||
logger.SetLevel(logger.LevelDebug)
|
||||
repo := NewSpecDefinitionRepository()
|
||||
testsuite.TestSpecDefinitionRepository(t, repo)
|
||||
}
|
@ -2,25 +2,28 @@ package datastore
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/spec"
|
||||
)
|
||||
|
||||
type SpecID int64
|
||||
|
||||
type Spec struct {
|
||||
ID SpecID `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Data map[string]any `json:"data"`
|
||||
Revision int `json:"revision"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
UpdatedAt time.Time `json:"updatedAt"`
|
||||
TenantID TenantID `json:"tenantId"`
|
||||
AgentID AgentID `json:"agentId"`
|
||||
ID SpecID `json:"id"`
|
||||
DefinitionName string `json:"name"`
|
||||
DefinitionVersion string `json:"version"`
|
||||
Data map[string]any `json:"data"`
|
||||
Revision int `json:"revision"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
UpdatedAt time.Time `json:"updatedAt"`
|
||||
TenantID TenantID `json:"tenantId"`
|
||||
AgentID AgentID `json:"agentId"`
|
||||
}
|
||||
|
||||
func (s *Spec) SpecName() spec.Name {
|
||||
return spec.Name(s.Name)
|
||||
func (s *Spec) SpecDefinitionName() string {
|
||||
return s.DefinitionName
|
||||
}
|
||||
|
||||
func (s *Spec) SpecDefinitionVersion() string {
|
||||
return s.DefinitionVersion
|
||||
}
|
||||
|
||||
func (s *Spec) SpecRevision() int {
|
||||
|
18
internal/datastore/spec_definition.go
Normal file
18
internal/datastore/spec_definition.go
Normal file
@ -0,0 +1,18 @@
|
||||
package datastore
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type SpecDefinitionHeader struct {
|
||||
Name string `json:"name"`
|
||||
Version string `json:"version"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
UpdatedAt time.Time `json:"updatedAt"`
|
||||
}
|
||||
|
||||
type SpecDefinition struct {
|
||||
SpecDefinitionHeader
|
||||
|
||||
Schema []byte `json:"schema"`
|
||||
}
|
46
internal/datastore/spec_definition_repository.go
Normal file
46
internal/datastore/spec_definition_repository.go
Normal file
@ -0,0 +1,46 @@
|
||||
package datastore
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type SpecDefinitionRepository interface {
|
||||
Upsert(ctx context.Context, name string, version string, schema []byte) (*SpecDefinition, error)
|
||||
Get(ctx context.Context, name string, version string) (*SpecDefinition, error)
|
||||
Delete(ctx context.Context, name string, version string) error
|
||||
|
||||
Query(ctx context.Context, opts ...SpecDefinitionQueryOptionFunc) ([]SpecDefinitionHeader, int, error)
|
||||
}
|
||||
|
||||
type SpecDefinitionQueryOptionFunc func(*SpecDefinitionQueryOptions)
|
||||
|
||||
type SpecDefinitionQueryOptions struct {
|
||||
Limit *int
|
||||
Offset *int
|
||||
Names []string
|
||||
Versions []string
|
||||
}
|
||||
|
||||
func WithSpecDefinitionQueryLimit(limit int) SpecDefinitionQueryOptionFunc {
|
||||
return func(opts *SpecDefinitionQueryOptions) {
|
||||
opts.Limit = &limit
|
||||
}
|
||||
}
|
||||
|
||||
func WithSpecDefinitionQueryOffset(offset int) SpecDefinitionQueryOptionFunc {
|
||||
return func(opts *SpecDefinitionQueryOptions) {
|
||||
opts.Offset = &offset
|
||||
}
|
||||
}
|
||||
|
||||
func WithSpecDefinitionQueryNames(names ...string) SpecDefinitionQueryOptionFunc {
|
||||
return func(opts *SpecDefinitionQueryOptions) {
|
||||
opts.Names = names
|
||||
}
|
||||
}
|
||||
|
||||
func WithSpecDefinitionQueryVersions(versions ...string) SpecDefinitionQueryOptionFunc {
|
||||
return func(opts *SpecDefinitionQueryOptions) {
|
||||
opts.Versions = versions
|
||||
}
|
||||
}
|
@ -128,7 +128,7 @@ func (r *AgentRepository) Detach(ctx context.Context, agentID datastore.AgentID)
|
||||
}
|
||||
|
||||
// DeleteSpec implements datastore.AgentRepository.
|
||||
func (r *AgentRepository) DeleteSpec(ctx context.Context, agentID datastore.AgentID, name string) error {
|
||||
func (r *AgentRepository) DeleteSpec(ctx context.Context, agentID datastore.AgentID, name string, version string) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
exists, err := r.agentExists(ctx, tx, agentID)
|
||||
if err != nil {
|
||||
@ -139,9 +139,9 @@ func (r *AgentRepository) DeleteSpec(ctx context.Context, agentID datastore.Agen
|
||||
return errors.WithStack(datastore.ErrNotFound)
|
||||
}
|
||||
|
||||
query := `DELETE FROM specs WHERE agent_id = $1 AND name = $2`
|
||||
query := `DELETE FROM specs WHERE agent_id = $1 AND name = $2 AND version = $3`
|
||||
|
||||
if _, err = tx.ExecContext(ctx, query, agentID, name); err != nil {
|
||||
if _, err = tx.ExecContext(ctx, query, agentID, name, version); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
@ -169,7 +169,7 @@ func (r *AgentRepository) GetSpecs(ctx context.Context, agentID datastore.AgentI
|
||||
}
|
||||
|
||||
query := `
|
||||
SELECT id, name, revision, data, created_at, updated_at
|
||||
SELECT id, name, version, revision, data, created_at, updated_at, agent_id, tenant_id
|
||||
FROM specs
|
||||
WHERE agent_id = $1
|
||||
`
|
||||
@ -191,10 +191,14 @@ func (r *AgentRepository) GetSpecs(ctx context.Context, agentID datastore.AgentI
|
||||
|
||||
data := JSONMap{}
|
||||
|
||||
if err := rows.Scan(&spec.ID, &spec.Name, &spec.Revision, &data, &spec.CreatedAt, &spec.UpdatedAt); err != nil {
|
||||
var tenantID sql.NullString
|
||||
if err := rows.Scan(&spec.ID, &spec.DefinitionName, &spec.DefinitionVersion, &spec.Revision, &data, &spec.CreatedAt, &spec.UpdatedAt, &spec.AgentID, &tenantID); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if tenantID.Valid {
|
||||
spec.TenantID = datastore.TenantID(tenantID.String)
|
||||
}
|
||||
spec.Data = data
|
||||
|
||||
specs = append(specs, spec)
|
||||
@ -214,7 +218,7 @@ func (r *AgentRepository) GetSpecs(ctx context.Context, agentID datastore.AgentI
|
||||
}
|
||||
|
||||
// UpdateSpec implements datastore.AgentRepository.
|
||||
func (r *AgentRepository) UpdateSpec(ctx context.Context, agentID datastore.AgentID, name string, revision int, data map[string]any) (*datastore.Spec, error) {
|
||||
func (r *AgentRepository) UpdateSpec(ctx context.Context, agentID datastore.AgentID, name string, version string, revision int, data map[string]any) (*datastore.Spec, error) {
|
||||
spec := &datastore.Spec{}
|
||||
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
@ -230,23 +234,24 @@ func (r *AgentRepository) UpdateSpec(ctx context.Context, agentID datastore.Agen
|
||||
now := time.Now().UTC()
|
||||
|
||||
query := `
|
||||
INSERT INTO specs (agent_id, name, revision, data, created_at, updated_at, tenant_id)
|
||||
VALUES($1, $2, $3, $4, $5, $5, ( SELECT tenant_id FROM agents WHERE id = $1 ))
|
||||
ON CONFLICT (agent_id, name) DO UPDATE SET
|
||||
data = $4, updated_at = $5, revision = specs.revision + 1
|
||||
WHERE revision = $3
|
||||
RETURNING "id", "name", "revision", "data", "created_at", "updated_at"
|
||||
INSERT INTO specs (agent_id, name, version, revision, data, created_at, updated_at, tenant_id)
|
||||
VALUES($1, $2, $3, $4, $5, $6, $6, ( SELECT tenant_id FROM agents WHERE id = $1 ))
|
||||
ON CONFLICT (agent_id, name, version) DO UPDATE SET
|
||||
data = $5, updated_at = $6, revision = specs.revision + 1, tenant_id = ( SELECT tenant_id FROM agents WHERE id = $1 )
|
||||
WHERE revision = $4
|
||||
RETURNING "id", "name", "version", "revision", "data", "created_at", "updated_at", "tenant_id", "agent_id"
|
||||
`
|
||||
|
||||
args := []any{agentID, name, revision, JSONMap(data), now}
|
||||
args := []any{agentID, name, version, revision, JSONMap(data), now}
|
||||
|
||||
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
|
||||
|
||||
row := tx.QueryRowContext(ctx, query, args...)
|
||||
|
||||
data := JSONMap{}
|
||||
var tenantID sql.NullString
|
||||
|
||||
err = row.Scan(&spec.ID, &spec.Name, &spec.Revision, &data, &spec.CreatedAt, &spec.UpdatedAt)
|
||||
err = row.Scan(&spec.ID, &spec.DefinitionName, &spec.DefinitionVersion, &spec.Revision, &data, &spec.CreatedAt, &spec.UpdatedAt, &tenantID, &spec.AgentID)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return errors.WithStack(datastore.ErrUnexpectedRevision)
|
||||
@ -255,6 +260,10 @@ func (r *AgentRepository) UpdateSpec(ctx context.Context, agentID datastore.Agen
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if tenantID.Valid {
|
||||
spec.TenantID = datastore.TenantID(tenantID.String)
|
||||
}
|
||||
|
||||
spec.Data = data
|
||||
|
||||
return nil
|
||||
@ -301,6 +310,10 @@ func (r *AgentRepository) Query(ctx context.Context, opts ...datastore.AgentQuer
|
||||
}
|
||||
|
||||
if options.TenantIDs != nil && len(options.TenantIDs) > 0 {
|
||||
if filters != "" {
|
||||
filters += " AND "
|
||||
}
|
||||
|
||||
filter, newArgs, newParamIndex := inFilter("tenant_id", paramIndex, options.TenantIDs)
|
||||
filters += filter
|
||||
paramIndex = newParamIndex
|
||||
|
@ -7,6 +7,42 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type JSON struct {
|
||||
value any
|
||||
}
|
||||
|
||||
func (j JSON) Scan(value interface{}) error {
|
||||
if value == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var data []byte
|
||||
|
||||
switch typ := value.(type) {
|
||||
case []byte:
|
||||
data = typ
|
||||
case string:
|
||||
data = []byte(typ)
|
||||
default:
|
||||
return errors.Errorf("unexpected type '%T'", value)
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(data, &j.value); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j JSON) Value() (driver.Value, error) {
|
||||
data, err := json.Marshal(j.value)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
type JSONMap map[string]any
|
||||
|
||||
func (j *JSONMap) Scan(value interface{}) error {
|
||||
|
219
internal/datastore/sqlite/spec_definition_repository.go
Normal file
219
internal/datastore/sqlite/spec_definition_repository.go
Normal file
@ -0,0 +1,219 @@
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/datastore"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
type SpecDefinitionRepository struct {
|
||||
repository
|
||||
}
|
||||
|
||||
// Delete implements datastore.SpecDefinitionRepository.
|
||||
func (r *SpecDefinitionRepository) Delete(ctx context.Context, name string, version string) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
if exists, err := r.specDefinitionExists(ctx, tx, name, version); !exists {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
query := `DELETE FROM spec_definitions WHERE name = $1 AND version = $2`
|
||||
_, err := tx.ExecContext(ctx, query, name, version)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get implements datastore.SpecDefinitionRepository.
|
||||
func (r *SpecDefinitionRepository) Get(ctx context.Context, name string, version string) (*datastore.SpecDefinition, error) {
|
||||
var specDef datastore.SpecDefinition
|
||||
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
query := `
|
||||
SELECT "name", "version", "schema", "created_at", "updated_at"
|
||||
FROM spec_definitions
|
||||
WHERE name = $1 AND version = $2
|
||||
`
|
||||
|
||||
row := tx.QueryRowContext(ctx, query, name, version)
|
||||
|
||||
if err := row.Scan(&specDef.Name, &specDef.Version, &specDef.Schema, &specDef.CreatedAt, &specDef.UpdatedAt); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return errors.WithStack(datastore.ErrNotFound)
|
||||
}
|
||||
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return &specDef, nil
|
||||
}
|
||||
|
||||
// Query implements datastore.SpecDefinitionRepository.
|
||||
func (r *SpecDefinitionRepository) Query(ctx context.Context, opts ...datastore.SpecDefinitionQueryOptionFunc) ([]datastore.SpecDefinitionHeader, int, error) {
|
||||
options := &datastore.SpecDefinitionQueryOptions{}
|
||||
for _, fn := range opts {
|
||||
fn(options)
|
||||
}
|
||||
|
||||
specDefs := make([]datastore.SpecDefinitionHeader, 0)
|
||||
count := 0
|
||||
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
query := `SELECT name, version, created_at, updated_at FROM spec_definitions`
|
||||
|
||||
limit := 10
|
||||
if options.Limit != nil {
|
||||
limit = *options.Limit
|
||||
}
|
||||
|
||||
offset := 0
|
||||
if options.Offset != nil {
|
||||
offset = *options.Offset
|
||||
}
|
||||
|
||||
filters := ""
|
||||
paramIndex := 3
|
||||
args := []any{offset, limit}
|
||||
|
||||
if options.Names != nil && len(options.Names) > 0 {
|
||||
filter, newArgs, newParamIndex := inFilter("name", paramIndex, options.Names)
|
||||
filters += filter
|
||||
paramIndex = newParamIndex
|
||||
args = append(args, newArgs...)
|
||||
}
|
||||
|
||||
if options.Versions != nil && len(options.Versions) > 0 {
|
||||
if filters != "" {
|
||||
filters += " AND "
|
||||
}
|
||||
|
||||
filter, newArgs, _ := inFilter("version", paramIndex, options.Versions)
|
||||
filters += filter
|
||||
args = append(args, newArgs...)
|
||||
}
|
||||
|
||||
if filters != "" {
|
||||
filters = ` WHERE ` + filters
|
||||
}
|
||||
|
||||
query += filters + ` LIMIT $2 OFFSET $1`
|
||||
|
||||
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
|
||||
|
||||
rows, err := tx.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := rows.Close(); err != nil {
|
||||
err = errors.WithStack(err)
|
||||
logger.Error(ctx, "could not close rows", logger.CapturedE(err))
|
||||
}
|
||||
}()
|
||||
|
||||
for rows.Next() {
|
||||
sdh := datastore.SpecDefinitionHeader{}
|
||||
|
||||
if err := rows.Scan(&sdh.Name, &sdh.Version, &sdh.CreatedAt, &sdh.UpdatedAt); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
specDefs = append(specDefs, sdh)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
row := tx.QueryRowContext(ctx, `SELECT count(*) FROM spec_definitions `+filters, args...)
|
||||
if err := row.Scan(&count); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, 0, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return specDefs, count, nil
|
||||
}
|
||||
|
||||
// Upsert implements datastore.SpecDefinitionRepository.
|
||||
func (r *SpecDefinitionRepository) Upsert(ctx context.Context, name string, version string, schema []byte) (*datastore.SpecDefinition, error) {
|
||||
var specDef datastore.SpecDefinition
|
||||
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
now := time.Now().UTC()
|
||||
|
||||
query := `
|
||||
INSERT INTO spec_definitions (name, version, schema, created_at, updated_at)
|
||||
VALUES($1, $2, $3, $4, $4)
|
||||
ON CONFLICT(name, version) DO UPDATE SET schema = $3, updated_at = $4
|
||||
RETURNING "name", "version", "schema", "created_at", "updated_at"
|
||||
`
|
||||
|
||||
row := tx.QueryRowContext(
|
||||
ctx, query,
|
||||
name, version, schema, now, now,
|
||||
)
|
||||
|
||||
if err := row.Scan(&specDef.Name, &specDef.Version, &specDef.Schema, &specDef.CreatedAt, &specDef.UpdatedAt); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return &specDef, nil
|
||||
}
|
||||
|
||||
func (r *SpecDefinitionRepository) specDefinitionExists(ctx context.Context, tx *sql.Tx, name string, version string) (bool, error) {
|
||||
row := tx.QueryRowContext(ctx, `SELECT count(id) FROM spec_definitions WHERE name = $1 AND version = $2`, name, version)
|
||||
|
||||
var count int
|
||||
|
||||
if err := row.Scan(&count); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return false, errors.WithStack(datastore.ErrNotFound)
|
||||
}
|
||||
|
||||
return false, errors.WithStack(err)
|
||||
}
|
||||
|
||||
if count == 0 {
|
||||
return false, errors.WithStack(datastore.ErrNotFound)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func NewSpecDefinitionRepository(db *sql.DB, sqliteBusyRetryMaxAttempts int) *SpecDefinitionRepository {
|
||||
return &SpecDefinitionRepository{
|
||||
repository: repository{db, sqliteBusyRetryMaxAttempts},
|
||||
}
|
||||
}
|
||||
|
||||
var _ datastore.SpecDefinitionRepository = &SpecDefinitionRepository{}
|
46
internal/datastore/sqlite/spec_definition_repository_test.go
Normal file
46
internal/datastore/sqlite/spec_definition_repository_test.go
Normal file
@ -0,0 +1,46 @@
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/datastore/testsuite"
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/migrate"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
func TestSQLiteSpecDefinitionRepository(t *testing.T) {
|
||||
logger.SetLevel(logger.LevelDebug)
|
||||
|
||||
file := "testdata/spec_definition_repository_test.sqlite"
|
||||
|
||||
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
||||
|
||||
migr, err := migrate.New("../../../migrations", "sqlite", "sqlite://"+dsn)
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
if err := migr.Up(); err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
db, err := sql.Open("sqlite", dsn)
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
repo := NewSpecDefinitionRepository(db, 5)
|
||||
|
||||
testsuite.TestSpecDefinitionRepository(t, repo)
|
||||
}
|
@ -15,7 +15,7 @@ import (
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
func TestSQLiteTeantRepository(t *testing.T) {
|
||||
func TestSQLiteTenantRepository(t *testing.T) {
|
||||
logger.SetLevel(logger.LevelDebug)
|
||||
|
||||
file := "testdata/tenant_repository_test.sqlite"
|
||||
|
@ -50,7 +50,7 @@ var agentRepositoryTestCases = []agentRepositoryTestCase{
|
||||
var unexistantAgentID datastore.AgentID = 9999
|
||||
var specData map[string]any
|
||||
|
||||
agent, err := repo.UpdateSpec(ctx, unexistantAgentID, string(spec.Name), 0, specData)
|
||||
agent, err := repo.UpdateSpec(ctx, unexistantAgentID, spec.Name, spec.Version, 0, specData)
|
||||
if err == nil {
|
||||
return errors.New("error should not be nil")
|
||||
}
|
||||
@ -71,7 +71,7 @@ var agentRepositoryTestCases = []agentRepositoryTestCase{
|
||||
Run: func(ctx context.Context, repo datastore.AgentRepository) error {
|
||||
var unexistantAgentID datastore.AgentID = 9999
|
||||
|
||||
err := repo.DeleteSpec(ctx, unexistantAgentID, string(spec.Name))
|
||||
err := repo.DeleteSpec(ctx, unexistantAgentID, spec.Name, spec.Version)
|
||||
if err == nil {
|
||||
return errors.New("error should not be nil")
|
||||
}
|
||||
|
14
internal/datastore/testsuite/spec_definition_repository.go
Normal file
14
internal/datastore/testsuite/spec_definition_repository.go
Normal file
@ -0,0 +1,14 @@
|
||||
package testsuite
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/datastore"
|
||||
)
|
||||
|
||||
func TestSpecDefinitionRepository(t *testing.T, repo datastore.SpecDefinitionRepository) {
|
||||
t.Run("Cases", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
runSpecDefinitionRepositoryTests(t, repo)
|
||||
})
|
||||
}
|
@ -0,0 +1,76 @@
|
||||
package testsuite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/datastore"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type specDefinitionRepositoryTestCase struct {
|
||||
Name string
|
||||
Skip bool
|
||||
Run func(ctx context.Context, repo datastore.SpecDefinitionRepository) error
|
||||
}
|
||||
|
||||
var specDefinitionRepositoryTestCases = []specDefinitionRepositoryTestCase{
|
||||
{
|
||||
Name: "Create a spec definition",
|
||||
Run: func(ctx context.Context, repo datastore.SpecDefinitionRepository) error {
|
||||
schema := []byte("{}")
|
||||
name := "net.example.foo"
|
||||
version := "0.0.0"
|
||||
|
||||
specDef, err := repo.Upsert(ctx, name, version, schema)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if specDef.CreatedAt.IsZero() {
|
||||
return errors.Errorf("specDef.CreatedAt should not be zero time")
|
||||
}
|
||||
|
||||
if specDef.UpdatedAt.IsZero() {
|
||||
return errors.Errorf("specDef.UpdatedAt should not be zero time")
|
||||
}
|
||||
|
||||
if e, g := name, specDef.Name; e != g {
|
||||
return errors.Errorf("specDef.Name: expected '%v', got '%v'", e, g)
|
||||
}
|
||||
|
||||
if e, g := version, specDef.Version; e != g {
|
||||
return errors.Errorf("specDef.Name: expected '%v', got '%v'", e, g)
|
||||
}
|
||||
|
||||
if e, g := schema, specDef.Schema; !reflect.DeepEqual(e, g) {
|
||||
return errors.Errorf("specDef.Schema: expected '%v', got '%v'", e, g)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func runSpecDefinitionRepositoryTests(t *testing.T, repo datastore.SpecDefinitionRepository) {
|
||||
for _, tc := range specDefinitionRepositoryTestCases {
|
||||
func(tc specDefinitionRepositoryTestCase) {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
if tc.Skip {
|
||||
t.SkipNow()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
if err := tc.Run(ctx, repo); err != nil {
|
||||
t.Errorf("%+v", errors.WithStack(err))
|
||||
}
|
||||
})
|
||||
}(tc)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user