emissary/internal/agent/agent.go

160 lines
3.5 KiB
Go

package agent
import (
"context"
"time"
"forge.cadoles.com/Cadoles/emissary/internal/agent/metadata"
"forge.cadoles.com/Cadoles/emissary/internal/auth/agent"
"forge.cadoles.com/Cadoles/emissary/internal/jwk"
"forge.cadoles.com/Cadoles/emissary/pkg/client"
"github.com/getsentry/sentry-go"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/api"
"gitlab.com/wpetit/goweb/logger"
)
// Agent holds the settings Run needs to register with the server and to
// reconcile the local state on a fixed interval.
type Agent struct {
// thumbprint is passed together with privateKey when generating the token
// and when registering the agent.
thumbprint string
// privateKey is the key used for token generation and agent registration.
privateKey jwk.Key
// serverURL is the address handed to client.New.
serverURL string
// controllers are executed one after another by Reconcile.
controllers []Controller
// interval is the period of the reconciliation ticker started by Run.
interval time.Duration
// collectors are queried by collectMetadata before each registration.
collectors []metadata.Collector
}
// Run drives the agent until ctx is done.
//
// It generates a token from the agent's key material, builds a server client
// with it, then performs one register+reconcile cycle immediately and another
// one on every tick of a.interval.
//
// It returns the (stack-wrapped) token generation error if the token cannot
// be created, and otherwise the stack-wrapped ctx.Err() once ctx is done.
func (a *Agent) Run(ctx context.Context) error {
	state := NewState()

	logger.Info(ctx, "starting reconciliation ticker", logger.F("interval", a.interval))

	ticker := time.NewTicker(a.interval)
	defer ticker.Stop()

	token, tokenErr := agent.GenerateToken(a.privateKey, a.thumbprint)
	if tokenErr != nil {
		return errors.WithStack(tokenErr)
	}

	apiClient := client.New(a.serverURL, client.WithToken(token))

	// Every cycle below runs with the context derived by withClient.
	ctx = withClient(ctx, apiClient)

	// cycle performs a single registration followed by a single reconciliation.
	cycle := func() {
		logger.Debug(ctx, "registering agent")

		// A failed registration is reported but does not abort the cycle:
		// reconciliation still runs with the current state.
		registerErr := a.registerAgent(ctx, apiClient, state)
		if registerErr != nil {
			wrapped := errors.WithStack(registerErr)
			logger.Error(ctx, "could not register agent", logger.E(wrapped))
			sentry.CaptureException(wrapped)
		}

		logger.Debug(ctx, "state before reconciliation", logger.F("state", state))

		reconcileErr := a.Reconcile(ctx, state)
		if reconcileErr == nil {
			logger.Debug(ctx, "state after reconciliation", logger.F("state", state))

			return
		}

		wrapped := errors.WithStack(reconcileErr)
		logger.Error(ctx, "could not reconcile node with state", logger.E(wrapped))
		sentry.CaptureException(wrapped)
	}

	// Run once right away instead of waiting for the first tick.
	cycle()

	for {
		select {
		case <-ctx.Done():
			return errors.WithStack(ctx.Err())
		case <-ticker.C:
			cycle()
		}
	}
}
// Reconcile executes every configured controller, in order, against state.
//
// Each controller runs with a context carrying a "controller" log field set
// to the controller's name. A controller that fails is logged and reported to
// Sentry, and the remaining controllers are still executed.
//
// Reconcile always returns nil: controller failures are handled here and are
// not propagated to the caller.
func (a *Agent) Reconcile(ctx context.Context, state *State) error {
	for _, ctrl := range a.controllers {
		ctrlCtx := logger.With(ctx, logger.F("controller", ctrl.Name()))

		logger.Debug(
			ctrlCtx, "executing controller",
			logger.F("state", state),
		)

		if err := ctrl.Reconcile(ctrlCtx, state); err != nil {
			err = errors.WithStack(err)

			// Log with the controller-scoped context so the failing
			// controller's name is attached to the error entry.
			logger.Error(ctrlCtx, "could not reconcile", logger.E(err))
			sentry.CaptureException(err)
		}
	}

	return nil
}
// registerAgent collects the agent's metadata, registers the agent with the
// server through client, and stores the ID of the returned agent in state.
//
// Any error from metadata collection or from the registration call is
// returned wrapped with a stack trace; state is left untouched in that case.
func (a *Agent) registerAgent(ctx context.Context, client *client.Client, state *State) error {
	collected, err := a.collectMetadata(ctx)
	if err != nil {
		return errors.WithStack(err)
	}

	// The metadata is passed through metadata.Sort before being sent.
	registered, err := client.RegisterAgent(ctx, a.privateKey, a.thumbprint, metadata.Sort(collected))
	if err != nil {
		return errors.WithStack(err)
	}

	state.agentID = registered.ID

	return nil
}
// collectMetadata queries every configured collector and gathers the
// resulting name/value pairs into a map.
//
// Collection is best-effort: a collector that fails is logged, reported to
// Sentry and skipped. The returned error is always nil.
func (a *Agent) collectMetadata(ctx context.Context) (map[string]any, error) {
	collected := make(map[string]any)

	for _, c := range a.collectors {
		key, val, collectErr := c.Collect(ctx)
		if collectErr == nil {
			collected[key] = val

			continue
		}

		wrapped := errors.WithStack(collectErr)
		logger.Error(
			ctx, "could not collect metadata",
			logger.E(wrapped), logger.F("name", key),
		)
		sentry.CaptureException(wrapped)
	}

	return collected, nil
}
// isAPIError reports whether err's chain contains an *api.Error whose Code
// equals code. On a match it also returns that error's Data; otherwise it
// returns false and nil.
func isAPIError(err error, code api.ErrorCode) (bool, any) {
	var target *api.Error

	// target is only dereferenced once errors.As has populated it.
	if !errors.As(err, &target) || target.Code != code {
		return false, nil
	}

	return true, target.Data
}
// New creates an Agent targeting serverURL and identified by privateKey and
// thumbprint.
//
// Settings start from defaultOption() and each given OptionFunc is applied to
// them in order; the resulting controllers, interval and collectors are copied
// into the returned Agent.
func New(serverURL string, privateKey jwk.Key, thumbprint string, funcs ...OptionFunc) *Agent {
	settings := defaultOption()
	for i := range funcs {
		funcs[i](settings)
	}

	a := &Agent{
		thumbprint:  thumbprint,
		privateKey:  privateKey,
		serverURL:   serverURL,
		controllers: settings.Controllers,
		interval:    settings.Interval,
		collectors:  settings.Collectors,
	}

	return a
}