feat: add mdns controller
Some checks failed
arcad/emissary/pipeline/head There was a failure building this commit
Some checks failed
arcad/emissary/pipeline/head There was a failure building this commit
This commit is contained in:
181
internal/agent/controller/mdns/controller.go
Normal file
181
internal/agent/controller/mdns/controller.go
Normal file
@ -0,0 +1,181 @@
|
||||
package mdns
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/agent"
|
||||
mdns "forge.cadoles.com/Cadoles/emissary/internal/agent/controller/mdns/spec"
|
||||
"github.com/brutella/dnssd"
|
||||
"github.com/mitchellh/hashstructure/v2"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultDomain = "local"
|
||||
)
|
||||
|
||||
type Controller struct {
|
||||
serviceDefHash uint64
|
||||
cancel context.CancelFunc
|
||||
responder dnssd.Responder
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
|
||||
// Name implements node.Controller.
|
||||
func (c *Controller) Name() string {
|
||||
return "mdns-controller"
|
||||
}
|
||||
|
||||
// Reconcile implements node.Controller.
|
||||
func (c *Controller) Reconcile(ctx context.Context, state *agent.State) error {
|
||||
mdnsSpec := mdns.NewSpec()
|
||||
|
||||
if err := state.GetSpec(mdns.Name, mdnsSpec); err != nil {
|
||||
if errors.Is(err, agent.ErrSpecNotFound) {
|
||||
logger.Info(ctx, "could not find mdns spec")
|
||||
|
||||
c.stopResponder(ctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
logger.Info(ctx, "retrieved spec", logger.F("spec", mdnsSpec.SpecName()), logger.F("revision", mdnsSpec.SpecRevision()))
|
||||
|
||||
if err := c.updateResponder(ctx, mdnsSpec); err != nil {
|
||||
return errors.Wrap(err, "could not update responder")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) stopResponder(ctx context.Context) {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
if c.responder == nil {
|
||||
return
|
||||
}
|
||||
|
||||
c.cancel()
|
||||
c.responder = nil
|
||||
c.cancel = nil
|
||||
}
|
||||
|
||||
func (c *Controller) updateResponder(ctx context.Context, spec *mdns.Spec) error {
|
||||
serviceDef := struct {
|
||||
Services map[string]mdns.Service
|
||||
}{
|
||||
Services: spec.Services,
|
||||
}
|
||||
|
||||
newServerDefHash, err := hashstructure.Hash(serviceDef, hashstructure.FormatV2, nil)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
c.mutex.RLock()
|
||||
if newServerDefHash == c.serviceDefHash && c.responder != nil {
|
||||
c.mutex.RUnlock()
|
||||
return nil
|
||||
}
|
||||
c.mutex.RUnlock()
|
||||
|
||||
c.stopResponder(ctx)
|
||||
|
||||
defaultIfaces, err := c.getDefaultIfaces()
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
services := make([]dnssd.Service, 0, len(spec.Services))
|
||||
|
||||
for name, service := range spec.Services {
|
||||
domain := service.Domain
|
||||
if domain == "" {
|
||||
domain = DefaultDomain
|
||||
}
|
||||
|
||||
ifaces := service.Ifaces
|
||||
if len(ifaces) == 0 {
|
||||
ifaces = defaultIfaces
|
||||
}
|
||||
|
||||
config := dnssd.Config{
|
||||
Name: name,
|
||||
Type: service.Type,
|
||||
Domain: domain,
|
||||
Host: service.Host,
|
||||
Ifaces: ifaces,
|
||||
Port: service.Port,
|
||||
}
|
||||
|
||||
service, err := dnssd.NewService(config)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "could not create mdns service", logger.E(errors.WithStack(err)))
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
services = append(services, service)
|
||||
}
|
||||
|
||||
responder, err := dnssd.NewResponder()
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
for _, service := range services {
|
||||
if _, err := responder.Add(service); err != nil {
|
||||
logger.Error(ctx, "could not add mdns service", logger.E(errors.WithStack(err)))
|
||||
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
c.responder = responder
|
||||
c.cancel = cancel
|
||||
c.serviceDefHash = newServerDefHash
|
||||
|
||||
go func() {
|
||||
defer c.stopResponder(ctx)
|
||||
|
||||
if err := responder.Respond(ctx); err != nil && !errors.Is(err, context.Canceled) {
|
||||
logger.Error(ctx, "could not respond to mdns queries", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) getDefaultIfaces() ([]string, error) {
|
||||
ifaces, err := net.Interfaces()
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
ifaceNames := make([]string, len(ifaces))
|
||||
|
||||
for idx, ifa := range ifaces {
|
||||
ifaceNames[idx] = ifa.Name
|
||||
}
|
||||
|
||||
return ifaceNames, nil
|
||||
}
|
||||
|
||||
func NewController() *Controller {
|
||||
return &Controller{
|
||||
cancel: nil,
|
||||
responder: nil,
|
||||
serviceDefHash: 0,
|
||||
}
|
||||
}
|
||||
|
||||
var _ agent.Controller = &Controller{}
|
17
internal/agent/controller/mdns/spec/init.go
Normal file
17
internal/agent/controller/mdns/spec/init.go
Normal file
@ -0,0 +1,17 @@
|
||||
package spec
|
||||
|
||||
import (
|
||||
_ "embed"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/spec"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
//go:embed schema.json
|
||||
var schema []byte
|
||||
|
||||
func init() {
|
||||
if err := spec.Register(Name, schema); err != nil {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}
|
47
internal/agent/controller/mdns/spec/schema.json
Normal file
47
internal/agent/controller/mdns/spec/schema.json
Normal file
@ -0,0 +1,47 @@
|
||||
{
|
||||
"$schema": "https://json-schema.org/draft/2020-12/schema",
|
||||
"$id": "https://mdns.edge.emissary.cadoles.com/spec.json",
|
||||
"title": "MDNSSpec",
|
||||
"description": "Emissary 'MDNS' specification",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"services": {
|
||||
"type": "object",
|
||||
"patternProperties": {
|
||||
".*": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string"
|
||||
},
|
||||
"domain": {
|
||||
"type": "string"
|
||||
},
|
||||
"host": {
|
||||
"type": "string"
|
||||
},
|
||||
"ifaces": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"port": {
|
||||
"type": "number"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"type",
|
||||
"host",
|
||||
"port"
|
||||
],
|
||||
"additionalProperties": false
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"services"
|
||||
],
|
||||
"additionalProperties": false
|
||||
}
|
42
internal/agent/controller/mdns/spec/spec.go
Normal file
42
internal/agent/controller/mdns/spec/spec.go
Normal file
@ -0,0 +1,42 @@
|
||||
package spec
|
||||
|
||||
import (
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/spec"
|
||||
)
|
||||
|
||||
const Name spec.Name = "mdns.emissary.cadoles.com"
|
||||
|
||||
type Spec struct {
|
||||
Revision int `json:"revision"`
|
||||
Services map[string]Service `json:"services"`
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
Type string `json:"type"`
|
||||
Domain string `json:"domain"`
|
||||
Host string `json:"host"`
|
||||
Ifaces []string `json:"ifaces"`
|
||||
Port int `json:"port"`
|
||||
}
|
||||
|
||||
func (s *Spec) SpecName() spec.Name {
|
||||
return Name
|
||||
}
|
||||
|
||||
func (s *Spec) SpecRevision() int {
|
||||
return s.Revision
|
||||
}
|
||||
|
||||
func (s *Spec) SpecData() map[string]any {
|
||||
return map[string]any{
|
||||
"services": s.Services,
|
||||
}
|
||||
}
|
||||
|
||||
func NewSpec() *Spec {
|
||||
return &Spec{
|
||||
Revision: -1,
|
||||
}
|
||||
}
|
||||
|
||||
var _ spec.Spec = &Spec{}
|
15
internal/agent/controller/mdns/spec/testdata/spec-ok.json
vendored
Normal file
15
internal/agent/controller/mdns/spec/testdata/spec-ok.json
vendored
Normal file
@ -0,0 +1,15 @@
|
||||
{
|
||||
"name": "mdns.emissary.cadoles.com",
|
||||
"data": {
|
||||
"services": {
|
||||
"My Website": {
|
||||
"type": "_http._tcp",
|
||||
"domain": "local",
|
||||
"host": "mywebsite",
|
||||
"ifaces": ["lo", "eth0"],
|
||||
"port": 80
|
||||
}
|
||||
}
|
||||
},
|
||||
"revision": 0
|
||||
}
|
65
internal/agent/controller/mdns/spec/validator_test.go
Normal file
65
internal/agent/controller/mdns/spec/validator_test.go
Normal file
@ -0,0 +1,65 @@
|
||||
package spec
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/spec"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type validatorTestCase struct {
|
||||
Name string
|
||||
Source string
|
||||
ShouldFail bool
|
||||
}
|
||||
|
||||
var validatorTestCases = []validatorTestCase{
|
||||
{
|
||||
Name: "SpecOK",
|
||||
Source: "testdata/spec-ok.json",
|
||||
ShouldFail: false,
|
||||
},
|
||||
}
|
||||
|
||||
func TestValidator(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
validator := spec.NewValidator()
|
||||
if err := validator.Register(Name, schema); err != nil {
|
||||
t.Fatalf("+%v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
for _, tc := range validatorTestCases {
|
||||
func(tc validatorTestCase) {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
rawSpec, err := ioutil.ReadFile(tc.Source)
|
||||
if err != nil {
|
||||
t.Fatalf("+%v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
var spec spec.RawSpec
|
||||
|
||||
if err := json.Unmarshal(rawSpec, &spec); err != nil {
|
||||
t.Fatalf("+%v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
err = validator.Validate(ctx, &spec)
|
||||
|
||||
if !tc.ShouldFail && err != nil {
|
||||
t.Errorf("+%v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
if tc.ShouldFail && err == nil {
|
||||
t.Error("validation should have failed")
|
||||
}
|
||||
})
|
||||
}(tc)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user