143 lines
3.0 KiB
Go
143 lines
3.0 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"forge.cadoles.com/Cadoles/emissary/internal/agent/metadata"
|
|
"forge.cadoles.com/Cadoles/emissary/internal/client"
|
|
"forge.cadoles.com/Cadoles/emissary/internal/jwk"
|
|
"github.com/pkg/errors"
|
|
"gitlab.com/wpetit/goweb/api"
|
|
"gitlab.com/wpetit/goweb/logger"
|
|
)
|
|
|
|
type Agent struct {
|
|
thumbprint string
|
|
privateKey jwk.Key
|
|
client *client.Client
|
|
controllers []Controller
|
|
interval time.Duration
|
|
collectors []metadata.Collector
|
|
}
|
|
|
|
func (a *Agent) Run(ctx context.Context) error {
|
|
state := NewState()
|
|
|
|
logger.Info(ctx, "starting reconciliation ticker", logger.F("interval", a.interval))
|
|
|
|
ticker := time.NewTicker(a.interval)
|
|
defer ticker.Stop()
|
|
|
|
ctx = withClient(ctx, a.client)
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
|
|
logger.Debug(ctx, "registering agent")
|
|
|
|
if err := a.registerAgent(ctx, state); err != nil {
|
|
logger.Error(ctx, "could not register agent", logger.E(errors.WithStack(err)))
|
|
|
|
continue
|
|
}
|
|
|
|
logger.Debug(ctx, "state before reconciliation", logger.F("state", state))
|
|
|
|
if err := a.Reconcile(ctx, state); err != nil {
|
|
logger.Error(ctx, "could not reconcile node with state", logger.E(errors.WithStack(err)))
|
|
|
|
continue
|
|
}
|
|
|
|
logger.Debug(ctx, "state after reconciliation", logger.F("state", state))
|
|
|
|
case <-ctx.Done():
|
|
return errors.WithStack(ctx.Err())
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *Agent) Reconcile(ctx context.Context, state *State) error {
|
|
for _, ctrl := range a.controllers {
|
|
ctrlCtx := logger.With(ctx, logger.F("controller", ctrl.Name()))
|
|
|
|
logger.Debug(
|
|
ctrlCtx, "executing controller",
|
|
logger.F("state", state),
|
|
)
|
|
|
|
if err := ctrl.Reconcile(ctrlCtx, state); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (a *Agent) registerAgent(ctx context.Context, state *State) error {
|
|
meta, err := a.collectMetadata(ctx)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
sorted := metadata.Sort(meta)
|
|
|
|
agent, err := a.client.RegisterAgent(ctx, a.privateKey, a.thumbprint, sorted)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
state.agentID = agent.ID
|
|
|
|
return nil
|
|
}
|
|
|
|
func (a *Agent) collectMetadata(ctx context.Context) (map[string]any, error) {
|
|
metadata := make(map[string]any)
|
|
|
|
for _, collector := range a.collectors {
|
|
name, value, err := collector.Collect(ctx)
|
|
if err != nil {
|
|
logger.Error(
|
|
ctx, "could not collect metadata",
|
|
logger.E(errors.WithStack(err)), logger.F("name", name),
|
|
)
|
|
|
|
continue
|
|
}
|
|
|
|
metadata[name] = value
|
|
}
|
|
|
|
return metadata, nil
|
|
}
|
|
|
|
func isAPIError(err error, code api.ErrorCode) (bool, any) {
|
|
apiError := &api.Error{}
|
|
if errors.As(err, &apiError) && apiError.Code == code {
|
|
return true, apiError.Data
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
func New(serverURL string, privateKey jwk.Key, thumbprint string, funcs ...OptionFunc) *Agent {
|
|
opt := defaultOption()
|
|
for _, fn := range funcs {
|
|
fn(opt)
|
|
}
|
|
|
|
client := client.New(serverURL)
|
|
|
|
return &Agent{
|
|
privateKey: privateKey,
|
|
thumbprint: thumbprint,
|
|
client: client,
|
|
controllers: opt.Controllers,
|
|
interval: opt.Interval,
|
|
collectors: opt.Collectors,
|
|
}
|
|
}
|