feat: agent metadata with custom collectors
This commit is contained in:
@ -1,7 +1,11 @@
|
||||
package datastore
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/lestrrat-go/jwx/v2/jwk"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type AgentID int64
|
||||
@ -16,9 +20,36 @@ const (
|
||||
)
|
||||
|
||||
type Agent struct {
|
||||
ID AgentID `json:"id"`
|
||||
RemoteID string `json:"remoteId"`
|
||||
Status AgentStatus `json:"status"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
UpdatedAt time.Time `json:"updatedAt"`
|
||||
ID AgentID `json:"id"`
|
||||
Thumbprint string `json:"thumbprint"`
|
||||
KeySet *SerializableKeySet `json:"keyset,omitempty"`
|
||||
Metadata map[string]any `json:"metadata,omitempty"`
|
||||
Status AgentStatus `json:"status"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
UpdatedAt time.Time `json:"updatedAt"`
|
||||
}
|
||||
|
||||
type SerializableKeySet struct {
|
||||
jwk.Set
|
||||
}
|
||||
|
||||
func (s *SerializableKeySet) UnmarshalJSON(data []byte) error {
|
||||
keySet := jwk.NewSet()
|
||||
|
||||
if err := json.Unmarshal(data, &keySet); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
s.Set = keySet
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SerializableKeySet) MarshalJSON() ([]byte, error) {
|
||||
data, err := json.Marshal(s.Set)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
@ -1,9 +1,13 @@
|
||||
package datastore
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/lestrrat-go/jwx/v2/jwk"
|
||||
)
|
||||
|
||||
type AgentRepository interface {
|
||||
Create(ctx context.Context, remoteID string, state AgentStatus) (*Agent, error)
|
||||
Create(ctx context.Context, thumbprint string, keySet jwk.Set, metadata map[string]any) (*Agent, error)
|
||||
Get(ctx context.Context, id AgentID) (*Agent, error)
|
||||
Update(ctx context.Context, id AgentID, updates ...AgentUpdateOptionFunc) (*Agent, error)
|
||||
Query(ctx context.Context, opts ...AgentQueryOptionFunc) ([]*Agent, int, error)
|
||||
@ -17,11 +21,12 @@ type AgentRepository interface {
|
||||
type AgentQueryOptionFunc func(*AgentQueryOptions)
|
||||
|
||||
type AgentQueryOptions struct {
|
||||
Limit *int
|
||||
Offset *int
|
||||
RemoteIDs []string
|
||||
IDs []AgentID
|
||||
Statuses []AgentStatus
|
||||
Limit *int
|
||||
Offset *int
|
||||
IDs []AgentID
|
||||
Thumbprints []string
|
||||
Metadata *map[string]any
|
||||
Statuses []AgentStatus
|
||||
}
|
||||
|
||||
func WithAgentQueryLimit(limit int) AgentQueryOptionFunc {
|
||||
@ -36,9 +41,9 @@ func WithAgentQueryOffset(offset int) AgentQueryOptionFunc {
|
||||
}
|
||||
}
|
||||
|
||||
func WithAgentQueryRemoteID(remoteIDs ...string) AgentQueryOptionFunc {
|
||||
func WithAgentQueryMetadata(metadata map[string]any) AgentQueryOptionFunc {
|
||||
return func(opts *AgentQueryOptions) {
|
||||
opts.RemoteIDs = remoteIDs
|
||||
opts.Metadata = &metadata
|
||||
}
|
||||
}
|
||||
|
||||
@ -54,10 +59,19 @@ func WithAgentQueryStatus(statuses ...AgentStatus) AgentQueryOptionFunc {
|
||||
}
|
||||
}
|
||||
|
||||
func WithAgentQueryThumbprints(thumbprints ...string) AgentQueryOptionFunc {
|
||||
return func(opts *AgentQueryOptions) {
|
||||
opts.Thumbprints = thumbprints
|
||||
}
|
||||
}
|
||||
|
||||
type AgentUpdateOptionFunc func(*AgentUpdateOptions)
|
||||
|
||||
type AgentUpdateOptions struct {
|
||||
Status *AgentStatus
|
||||
Status *AgentStatus
|
||||
Metadata *map[string]any
|
||||
KeySet *jwk.Set
|
||||
Thumbprint *string
|
||||
}
|
||||
|
||||
func WithAgentUpdateStatus(status AgentStatus) AgentUpdateOptionFunc {
|
||||
@ -65,3 +79,21 @@ func WithAgentUpdateStatus(status AgentStatus) AgentUpdateOptionFunc {
|
||||
opts.Status = &status
|
||||
}
|
||||
}
|
||||
|
||||
func WithAgentUpdateMetadata(metadata map[string]any) AgentUpdateOptionFunc {
|
||||
return func(opts *AgentUpdateOptions) {
|
||||
opts.Metadata = &metadata
|
||||
}
|
||||
}
|
||||
|
||||
func WithAgentUpdateKeySet(keySet jwk.Set) AgentUpdateOptionFunc {
|
||||
return func(opts *AgentUpdateOptions) {
|
||||
opts.KeySet = &keySet
|
||||
}
|
||||
}
|
||||
|
||||
func WithAgentUpdateThumbprint(thumbprint string) AgentUpdateOptionFunc {
|
||||
return func(opts *AgentUpdateOptions) {
|
||||
opts.Thumbprint = &thumbprint
|
||||
}
|
||||
}
|
||||
|
@ -3,10 +3,13 @@ package sqlite
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/datastore"
|
||||
|
||||
"github.com/lestrrat-go/jwx/v2/jwk"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
@ -116,7 +119,7 @@ func (r *AgentRepository) Query(ctx context.Context, opts ...datastore.AgentQuer
|
||||
count := 0
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `SELECT id, remote_id, status, created_at, updated_at FROM agents`
|
||||
query := `SELECT id, thumbprint, status, created_at, updated_at FROM agents`
|
||||
|
||||
limit := 10
|
||||
if options.Limit != nil {
|
||||
@ -133,20 +136,18 @@ func (r *AgentRepository) Query(ctx context.Context, opts ...datastore.AgentQuer
|
||||
args := []any{offset, limit}
|
||||
|
||||
if options.IDs != nil && len(options.IDs) > 0 {
|
||||
filters += "id in ("
|
||||
|
||||
filter, newArgs, newParamIndex := inFilter("id", paramIndex, options.RemoteIDs)
|
||||
filter, newArgs, newParamIndex := inFilter("id", paramIndex, options.IDs)
|
||||
filters += filter
|
||||
paramIndex = newParamIndex
|
||||
args = append(args, newArgs...)
|
||||
}
|
||||
|
||||
if options.RemoteIDs != nil && len(options.RemoteIDs) > 0 {
|
||||
if options.Thumbprints != nil && len(options.Thumbprints) > 0 {
|
||||
if filters != "" {
|
||||
filters += " AND "
|
||||
}
|
||||
|
||||
filter, newArgs, newParamIndex := inFilter("remote_id", paramIndex, options.RemoteIDs)
|
||||
filter, newArgs, newParamIndex := inFilter("thumbprint", paramIndex, options.Thumbprints)
|
||||
filters += filter
|
||||
paramIndex = newParamIndex
|
||||
args = append(args, newArgs...)
|
||||
@ -180,10 +181,14 @@ func (r *AgentRepository) Query(ctx context.Context, opts ...datastore.AgentQuer
|
||||
for rows.Next() {
|
||||
agent := &datastore.Agent{}
|
||||
|
||||
if err := rows.Scan(&agent.ID, &agent.RemoteID, &agent.Status, &agent.CreatedAt, &agent.UpdatedAt); err != nil {
|
||||
metadata := JSONMap{}
|
||||
|
||||
if err := rows.Scan(&agent.ID, &agent.Thumbprint, &agent.Status, &agent.CreatedAt, &agent.UpdatedAt); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
agent.Metadata = metadata
|
||||
|
||||
agents = append(agents, agent)
|
||||
}
|
||||
|
||||
@ -202,12 +207,12 @@ func (r *AgentRepository) Query(ctx context.Context, opts ...datastore.AgentQuer
|
||||
}
|
||||
|
||||
// Create implements datastore.AgentRepository
|
||||
func (r *AgentRepository) Create(ctx context.Context, remoteID string, status datastore.AgentStatus) (*datastore.Agent, error) {
|
||||
func (r *AgentRepository) Create(ctx context.Context, thumbprint string, keySet jwk.Set, metadata map[string]any) (*datastore.Agent, error) {
|
||||
agent := &datastore.Agent{}
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `SELECT count(id) FROM agents WHERE remote_id = $1`
|
||||
row := tx.QueryRowContext(ctx, query, remoteID)
|
||||
query := `SELECT count(id) FROM agents WHERE thumbprint = $1`
|
||||
row := tx.QueryRowContext(ctx, query, thumbprint)
|
||||
|
||||
var count int
|
||||
|
||||
@ -222,21 +227,37 @@ func (r *AgentRepository) Create(ctx context.Context, remoteID string, status da
|
||||
now := time.Now().UTC()
|
||||
|
||||
query = `
|
||||
INSERT INTO agents (remote_id, status, created_at, updated_at)
|
||||
VALUES($1, $2, $3, $3)
|
||||
RETURNING "id", "remote_id", "status", "created_at", "updated_at"
|
||||
INSERT INTO agents (thumbprint, keyset, metadata, status, created_at, updated_at)
|
||||
VALUES($1, $2, $3, $4, $5, $5)
|
||||
RETURNING "id", "thumbprint", "keyset", "metadata", "status", "created_at", "updated_at"
|
||||
`
|
||||
|
||||
row = tx.QueryRowContext(
|
||||
ctx, query,
|
||||
remoteID, status, now,
|
||||
)
|
||||
|
||||
err := row.Scan(&agent.ID, &agent.RemoteID, &agent.Status, &agent.CreatedAt, &agent.UpdatedAt)
|
||||
rawKeySet, err := json.Marshal(keySet)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
row = tx.QueryRowContext(
|
||||
ctx, query,
|
||||
thumbprint, rawKeySet, JSONMap(metadata), datastore.AgentStatusPending, now,
|
||||
)
|
||||
|
||||
metadata := JSONMap{}
|
||||
|
||||
err = row.Scan(&agent.ID, &agent.Thumbprint, &rawKeySet, &metadata, &agent.Status, &agent.CreatedAt, &agent.UpdatedAt)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
agent.Metadata = metadata
|
||||
|
||||
keySet, err = jwk.Parse(rawKeySet)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
agent.KeySet = &datastore.SerializableKeySet{keySet}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
@ -266,14 +287,17 @@ func (r *AgentRepository) Get(ctx context.Context, id datastore.AgentID) (*datas
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `
|
||||
SELECT "remote_id", "status", "created_at", "updated_at"
|
||||
SELECT "id", "thumbprint", "keyset", "metadata", "status", "created_at", "updated_at"
|
||||
FROM agents
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
row := r.db.QueryRowContext(ctx, query, id)
|
||||
|
||||
if err := row.Scan(&agent.RemoteID, &agent.Status, &agent.CreatedAt, &agent.UpdatedAt); err != nil {
|
||||
metadata := JSONMap{}
|
||||
var rawKeySet []byte
|
||||
|
||||
if err := row.Scan(&agent.ID, &agent.Thumbprint, &rawKeySet, &metadata, &agent.Status, &agent.CreatedAt, &agent.UpdatedAt); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return datastore.ErrNotFound
|
||||
}
|
||||
@ -281,6 +305,15 @@ func (r *AgentRepository) Get(ctx context.Context, id datastore.AgentID) (*datas
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
agent.Metadata = metadata
|
||||
|
||||
keySet := jwk.NewSet()
|
||||
if err := json.Unmarshal(rawKeySet, &keySet); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
agent.KeySet = &datastore.SerializableKeySet{keySet}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
@ -313,23 +346,60 @@ func (r *AgentRepository) Update(ctx context.Context, id datastore.AgentID, opts
|
||||
|
||||
if options.Status != nil {
|
||||
query += fmt.Sprintf(`, status = $%d`, index)
|
||||
|
||||
args = append(args, *options.Status)
|
||||
index++
|
||||
}
|
||||
|
||||
if options.KeySet != nil {
|
||||
rawKeySet, err := json.Marshal(*options.KeySet)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
query += fmt.Sprintf(`, keyset = $%d`, index)
|
||||
args = append(args, rawKeySet)
|
||||
index++
|
||||
}
|
||||
|
||||
if options.Thumbprint != nil {
|
||||
query += fmt.Sprintf(`, thumbprint = $%d`, index)
|
||||
args = append(args, *options.Thumbprint)
|
||||
index++
|
||||
}
|
||||
|
||||
if options.Metadata != nil {
|
||||
query += fmt.Sprintf(`, metadata = $%d`, index)
|
||||
args = append(args, JSONMap(*options.Metadata))
|
||||
index++
|
||||
}
|
||||
|
||||
query += `
|
||||
WHERE id = $1
|
||||
RETURNING "id","remote_id","status","updated_at","created_at"
|
||||
RETURNING "id", "thumbprint", "keyset", "metadata", "status", "created_at", "updated_at"
|
||||
`
|
||||
|
||||
row := tx.QueryRowContext(ctx, query, args...)
|
||||
|
||||
if err := row.Scan(&agent.ID, &agent.RemoteID, &agent.Status, &agent.CreatedAt, &agent.UpdatedAt); err != nil {
|
||||
metadata := JSONMap{}
|
||||
var rawKeySet []byte
|
||||
|
||||
if err := row.Scan(&agent.ID, &agent.Thumbprint, &rawKeySet, &metadata, &agent.Status, &agent.CreatedAt, &agent.UpdatedAt); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return datastore.ErrNotFound
|
||||
}
|
||||
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
agent.Metadata = metadata
|
||||
|
||||
keySet := jwk.NewSet()
|
||||
if err := json.Unmarshal(rawKeySet, &keySet); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
agent.KeySet = &datastore.SerializableKeySet{keySet}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
|
Reference in New Issue
Block a user