feat: initial commit
This commit is contained in:
24
internal/datastore/agent.go
Normal file
24
internal/datastore/agent.go
Normal file
@ -0,0 +1,24 @@
|
||||
package datastore
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type AgentID int64
|
||||
|
||||
type AgentStatus int
|
||||
|
||||
const (
|
||||
AgentStatusPending AgentStatus = 0
|
||||
AgentStatusAccepted AgentStatus = 1
|
||||
AgentStatusRejected AgentStatus = 2
|
||||
AgentStatusForgotten AgentStatus = 3
|
||||
)
|
||||
|
||||
type Agent struct {
|
||||
ID AgentID
|
||||
RemoteID string
|
||||
Status AgentStatus
|
||||
CreatedAt time.Time
|
||||
UpdatedAt time.Time
|
||||
}
|
67
internal/datastore/agent_repository.go
Normal file
67
internal/datastore/agent_repository.go
Normal file
@ -0,0 +1,67 @@
|
||||
package datastore
|
||||
|
||||
import "context"
|
||||
|
||||
type AgentRepository interface {
|
||||
Create(ctx context.Context, remoteID string, state AgentStatus) (*Agent, error)
|
||||
Get(ctx context.Context, id AgentID) (*Agent, error)
|
||||
Update(ctx context.Context, id AgentID, updates ...AgentUpdateOptionFunc) (*Agent, error)
|
||||
Query(ctx context.Context, opts ...AgentQueryOptionFunc) ([]*Agent, int, error)
|
||||
Delete(ctx context.Context, id AgentID) error
|
||||
|
||||
UpdateSpec(ctx context.Context, id AgentID, name string, revision int, data map[string]any) (*Spec, error)
|
||||
GetSpecs(ctx context.Context, id AgentID) ([]*Spec, error)
|
||||
DeleteSpec(ctx context.Context, id AgentID, name string) error
|
||||
}
|
||||
|
||||
type AgentQueryOptionFunc func(*AgentQueryOptions)
|
||||
|
||||
type AgentQueryOptions struct {
|
||||
Limit *int
|
||||
Offset *int
|
||||
RemoteIDs []string
|
||||
IDs []AgentID
|
||||
Statuses []AgentStatus
|
||||
}
|
||||
|
||||
func WithAgentQueryLimit(limit int) AgentQueryOptionFunc {
|
||||
return func(opts *AgentQueryOptions) {
|
||||
opts.Limit = &limit
|
||||
}
|
||||
}
|
||||
|
||||
func WithAgentQueryOffset(offset int) AgentQueryOptionFunc {
|
||||
return func(opts *AgentQueryOptions) {
|
||||
opts.Offset = &offset
|
||||
}
|
||||
}
|
||||
|
||||
func WithAgentQueryRemoteID(remoteIDs ...string) AgentQueryOptionFunc {
|
||||
return func(opts *AgentQueryOptions) {
|
||||
opts.RemoteIDs = remoteIDs
|
||||
}
|
||||
}
|
||||
|
||||
func WithAgentQueryID(ids ...AgentID) AgentQueryOptionFunc {
|
||||
return func(opts *AgentQueryOptions) {
|
||||
opts.IDs = ids
|
||||
}
|
||||
}
|
||||
|
||||
func WithAgentQueryStatus(statuses ...AgentStatus) AgentQueryOptionFunc {
|
||||
return func(opts *AgentQueryOptions) {
|
||||
opts.Statuses = statuses
|
||||
}
|
||||
}
|
||||
|
||||
type AgentUpdateOptionFunc func(*AgentUpdateOptions)
|
||||
|
||||
type AgentUpdateOptions struct {
|
||||
Status *AgentStatus
|
||||
}
|
||||
|
||||
func WithAgentUpdateStatus(status AgentStatus) AgentUpdateOptionFunc {
|
||||
return func(opts *AgentUpdateOptions) {
|
||||
opts.Status = &status
|
||||
}
|
||||
}
|
9
internal/datastore/error.go
Normal file
9
internal/datastore/error.go
Normal file
@ -0,0 +1,9 @@
|
||||
package datastore
|
||||
|
||||
import "errors"
|
||||
|
||||
var (
|
||||
ErrNotFound = errors.New("not found")
|
||||
ErrAlreadyExist = errors.New("already exist")
|
||||
ErrUnexpectedRevision = errors.New("unexpected revision")
|
||||
)
|
35
internal/datastore/spec.go
Normal file
35
internal/datastore/spec.go
Normal file
@ -0,0 +1,35 @@
|
||||
package datastore
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/spec"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type SpecID int64
|
||||
|
||||
type Spec struct {
|
||||
ID SpecID
|
||||
Name string
|
||||
Data map[string]any
|
||||
Revision int
|
||||
CreatedAt time.Time
|
||||
UpdatedAt time.Time
|
||||
}
|
||||
|
||||
func (s *Spec) SpecName() spec.Name {
|
||||
return spec.Name(s.Name)
|
||||
}
|
||||
|
||||
func (s *Spec) SpecRevision() int {
|
||||
return s.Revision
|
||||
}
|
||||
|
||||
func (s *Spec) SpecData() any {
|
||||
return s.Data
|
||||
}
|
||||
|
||||
func (s *Spec) SpecValid() (bool, error) {
|
||||
return false, errors.WithStack(spec.ErrSchemaUnknown)
|
||||
}
|
393
internal/datastore/sqlite/agent_repository.go
Normal file
393
internal/datastore/sqlite/agent_repository.go
Normal file
@ -0,0 +1,393 @@
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/datastore"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
type AgentRepository struct {
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
// DeleteSpec implements datastore.AgentRepository.
|
||||
func (r *AgentRepository) DeleteSpec(ctx context.Context, agentID datastore.AgentID, name string) error {
|
||||
query := `DELETE FROM specs WHERE agent_id = $1 AND name = $2`
|
||||
|
||||
_, err := r.db.ExecContext(ctx, query, agentID, name)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetSpecs implements datastore.AgentRepository.
|
||||
func (r *AgentRepository) GetSpecs(ctx context.Context, agentID datastore.AgentID) ([]*datastore.Spec, error) {
|
||||
specs := make([]*datastore.Spec, 0)
|
||||
|
||||
query := `
|
||||
SELECT id, name, revision, data, created_at, updated_at
|
||||
FROM specs
|
||||
WHERE agent_id = $1
|
||||
`
|
||||
|
||||
rows, err := r.db.QueryContext(ctx, query, agentID)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
spec := &datastore.Spec{}
|
||||
|
||||
data := JSONMap{}
|
||||
|
||||
if err := rows.Scan(&spec.ID, &spec.Name, &spec.Revision, &data, &spec.CreatedAt, &spec.UpdatedAt); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
spec.Data = data
|
||||
|
||||
specs = append(specs, spec)
|
||||
}
|
||||
|
||||
return specs, nil
|
||||
}
|
||||
|
||||
// UpdateSpec implements datastore.AgentRepository.
|
||||
func (r *AgentRepository) UpdateSpec(ctx context.Context, agentID datastore.AgentID, name string, revision int, data map[string]any) (*datastore.Spec, error) {
|
||||
spec := &datastore.Spec{}
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
now := time.Now().UTC()
|
||||
|
||||
query := `
|
||||
INSERT INTO specs (agent_id, name, revision, data, created_at, updated_at)
|
||||
VALUES($1, $2, $3, $4, $5, $5)
|
||||
ON CONFLICT (agent_id, name) DO UPDATE SET
|
||||
data = $4, updated_at = $5, revision = specs.revision + 1
|
||||
WHERE revision = $3
|
||||
RETURNING "id", "name", "revision", "data", "created_at", "updated_at"
|
||||
`
|
||||
|
||||
args := []any{agentID, name, revision, JSONMap(data), now}
|
||||
|
||||
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
|
||||
|
||||
row := tx.QueryRowContext(ctx, query, args...)
|
||||
|
||||
data := JSONMap{}
|
||||
|
||||
err := row.Scan(&spec.ID, &spec.Name, &spec.Revision, &data, &spec.CreatedAt, &spec.UpdatedAt)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return errors.WithStack(datastore.ErrUnexpectedRevision)
|
||||
}
|
||||
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
spec.Data = data
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return spec, nil
|
||||
}
|
||||
|
||||
// Query implements datastore.AgentRepository.
|
||||
func (r *AgentRepository) Query(ctx context.Context, opts ...datastore.AgentQueryOptionFunc) ([]*datastore.Agent, int, error) {
|
||||
options := &datastore.AgentQueryOptions{}
|
||||
for _, fn := range opts {
|
||||
fn(options)
|
||||
}
|
||||
|
||||
agents := make([]*datastore.Agent, 0)
|
||||
count := 0
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `SELECT id, remote_id, status, created_at, updated_at FROM agents`
|
||||
|
||||
limit := 10
|
||||
if options.Limit != nil {
|
||||
limit = *options.Limit
|
||||
}
|
||||
|
||||
offset := 0
|
||||
if options.Offset != nil {
|
||||
offset = *options.Offset
|
||||
}
|
||||
|
||||
filters := ""
|
||||
paramIndex := 3
|
||||
args := []any{offset, limit}
|
||||
|
||||
if options.IDs != nil && len(options.IDs) > 0 {
|
||||
filters += "id in ("
|
||||
|
||||
filter, newArgs, newParamIndex := inFilter("id", paramIndex, options.RemoteIDs)
|
||||
filters += filter
|
||||
paramIndex = newParamIndex
|
||||
args = append(args, newArgs...)
|
||||
}
|
||||
|
||||
if options.RemoteIDs != nil && len(options.RemoteIDs) > 0 {
|
||||
if filters != "" {
|
||||
filters += " AND "
|
||||
}
|
||||
|
||||
filter, newArgs, newParamIndex := inFilter("remote_id", paramIndex, options.RemoteIDs)
|
||||
filters += filter
|
||||
paramIndex = newParamIndex
|
||||
args = append(args, newArgs...)
|
||||
}
|
||||
|
||||
if options.Statuses != nil && len(options.Statuses) > 0 {
|
||||
if filters != "" {
|
||||
filters += " AND "
|
||||
}
|
||||
|
||||
filter, newArgs, _ := inFilter("status", paramIndex, options.Statuses)
|
||||
filters += filter
|
||||
args = append(args, newArgs...)
|
||||
}
|
||||
|
||||
if filters != "" {
|
||||
filters = ` WHERE ` + filters
|
||||
}
|
||||
|
||||
query += filters + ` LIMIT $2 OFFSET $1`
|
||||
|
||||
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
|
||||
|
||||
rows, err := r.db.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
agent := &datastore.Agent{}
|
||||
|
||||
if err := rows.Scan(&agent.ID, &agent.RemoteID, &agent.Status, &agent.CreatedAt, &agent.UpdatedAt); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
agents = append(agents, agent)
|
||||
}
|
||||
|
||||
row := tx.QueryRowContext(ctx, `SELECT count(id) FROM agents `+filters, args...)
|
||||
if err := row.Scan(&count); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, 0, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return agents, count, nil
|
||||
}
|
||||
|
||||
// Create implements datastore.AgentRepository
|
||||
func (r *AgentRepository) Create(ctx context.Context, remoteID string, status datastore.AgentStatus) (*datastore.Agent, error) {
|
||||
agent := &datastore.Agent{}
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `SELECT count(id) FROM agents WHERE remote_id = $1`
|
||||
row := tx.QueryRowContext(ctx, query, remoteID)
|
||||
|
||||
var count int
|
||||
|
||||
if err := row.Scan(&count); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if count > 0 {
|
||||
return errors.WithStack(datastore.ErrAlreadyExist)
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
|
||||
query = `
|
||||
INSERT INTO agents (remote_id, status, created_at, updated_at)
|
||||
VALUES($1, $2, $3, $3)
|
||||
RETURNING "id", "remote_id", "status", "created_at", "updated_at"
|
||||
`
|
||||
|
||||
row = tx.QueryRowContext(
|
||||
ctx, query,
|
||||
remoteID, status, now,
|
||||
)
|
||||
|
||||
err := row.Scan(&agent.ID, &agent.RemoteID, &agent.Status, &agent.CreatedAt, &agent.UpdatedAt)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return agent, nil
|
||||
}
|
||||
|
||||
// Delete implements datastore.AgentRepository
|
||||
func (r *AgentRepository) Delete(ctx context.Context, id datastore.AgentID) error {
|
||||
query := `DELETE FROM agents WHERE id = $1`
|
||||
|
||||
_, err := r.db.ExecContext(ctx, query, id)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get implements datastore.AgentRepository
|
||||
func (r *AgentRepository) Get(ctx context.Context, id datastore.AgentID) (*datastore.Agent, error) {
|
||||
agent := &datastore.Agent{
|
||||
ID: id,
|
||||
}
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `
|
||||
SELECT "remote_id", "status", "created_at", "updated_at"
|
||||
FROM agents
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
row := r.db.QueryRowContext(ctx, query, id)
|
||||
|
||||
if err := row.Scan(&agent.RemoteID, &agent.Status, &agent.CreatedAt, &agent.UpdatedAt); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return datastore.ErrNotFound
|
||||
}
|
||||
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return agent, nil
|
||||
}
|
||||
|
||||
// Update implements datastore.AgentRepository
|
||||
func (r *AgentRepository) Update(ctx context.Context, id datastore.AgentID, opts ...datastore.AgentUpdateOptionFunc) (*datastore.Agent, error) {
|
||||
options := &datastore.AgentUpdateOptions{}
|
||||
for _, fn := range opts {
|
||||
fn(options)
|
||||
}
|
||||
|
||||
agent := &datastore.Agent{}
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `
|
||||
UPDATE agents SET updated_at = $2
|
||||
`
|
||||
|
||||
now := time.Now().UTC()
|
||||
|
||||
args := []any{
|
||||
id, now,
|
||||
}
|
||||
index := 3
|
||||
|
||||
if options.Status != nil {
|
||||
query += fmt.Sprintf(`, status = $%d`, index)
|
||||
|
||||
args = append(args, *options.Status)
|
||||
|
||||
index++
|
||||
}
|
||||
|
||||
query += `
|
||||
WHERE id = $1
|
||||
RETURNING "id","remote_id","status","updated_at","created_at"
|
||||
`
|
||||
|
||||
row := tx.QueryRowContext(ctx, query, args...)
|
||||
|
||||
if err := row.Scan(&agent.ID, &agent.RemoteID, &agent.Status, &agent.CreatedAt, &agent.UpdatedAt); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return agent, nil
|
||||
}
|
||||
|
||||
func (r *AgentRepository) withTx(ctx context.Context, fn func(*sql.Tx) error) error {
|
||||
tx, err := r.db.Begin()
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := tx.Rollback(); err != nil {
|
||||
if errors.Is(err, sql.ErrTxDone) {
|
||||
return
|
||||
}
|
||||
|
||||
logger.Error(ctx, "could not rollback transaction", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
if err := fn(tx); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewAgentRepository(db *sql.DB) *AgentRepository {
|
||||
return &AgentRepository{db}
|
||||
}
|
||||
|
||||
var _ datastore.AgentRepository = &AgentRepository{}
|
||||
|
||||
func inFilter[T any](column string, paramIndex int, items []T) (string, []any, int) {
|
||||
args := make([]any, 0, len(items))
|
||||
filter := fmt.Sprintf("%s in (", column)
|
||||
|
||||
for idx, item := range items {
|
||||
if idx != 0 {
|
||||
filter += ","
|
||||
}
|
||||
|
||||
filter += fmt.Sprintf("$%d", paramIndex)
|
||||
paramIndex++
|
||||
|
||||
args = append(args, item)
|
||||
}
|
||||
|
||||
filter += ")"
|
||||
|
||||
return filter, args, paramIndex
|
||||
}
|
42
internal/datastore/sqlite/json.go
Normal file
42
internal/datastore/sqlite/json.go
Normal file
@ -0,0 +1,42 @@
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type JSONMap map[string]any
|
||||
|
||||
func (j *JSONMap) Scan(value interface{}) error {
|
||||
if value == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var data []byte
|
||||
|
||||
switch typ := value.(type) {
|
||||
case []byte:
|
||||
data = typ
|
||||
case string:
|
||||
data = []byte(typ)
|
||||
default:
|
||||
return errors.Errorf("unexpected type '%T'", value)
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(data, &j); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j JSONMap) Value() (driver.Value, error) {
|
||||
data, err := json.Marshal(j)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
Reference in New Issue
Block a user