package v2 import ( "context" "net/http" "sync" "forge.cadoles.com/cadoles/go-emlid/reach/client/logger" "forge.cadoles.com/cadoles/go-emlid/reach/client/protocol" "forge.cadoles.com/cadoles/go-emlid/reach/client/protocol/v2/model" "forge.cadoles.com/cadoles/go-emlid/reach/client/socketio" "github.com/pkg/errors" ) type Operations struct { addr string client *socketio.Client mutex sync.RWMutex logger logger.Logger dial protocol.DialFunc getClientOnce sync.Once httpClient *http.Client } // Reboot implements protocol.Operations. func (o *Operations) Reboot(ctx context.Context) error { var err error var wg sync.WaitGroup var once sync.Once done := func() { o.client.Off(socketio.OnDisconnection) wg.Done() } wg.Add(1) go func() { <-ctx.Done() err = ctx.Err() once.Do(done) }() err = o.client.On(socketio.OnDisconnection, func(h *socketio.Channel, data any) { once.Do(done) }) if err != nil { return errors.Wrapf(err, "error while binding to '%s' event", socketio.OnDisconnection) } if err = o.client.Emit("action", &model.Action{Name: "reboot"}); err != nil { return err } wg.Wait() return err } // SetBase implements protocol.Operations. func (o *Operations) SetBase(ctx context.Context, funcs ...protocol.SetBaseOptionFunc) error { config, err := o.GetConfiguration(ctx) if err != nil { return errors.WithStack(err) } opts := protocol.NewSetBaseOptions(funcs...) base := &model.Base{ Accumulation: config.BaseMode.BaseCoordinates.Accumulation, AntennaOffset: config.BaseMode.BaseCoordinates.AntennaOffset, Coordinates: model.BaseCoordinates{ Height: config.BaseMode.BaseCoordinates.Coordinates.Height, Latitude: config.BaseMode.BaseCoordinates.Coordinates.Latitude, Longitude: config.BaseMode.BaseCoordinates.Coordinates.Longitude, }, Mode: config.BaseMode.BaseCoordinates.Mode, } if opts.Mode != nil { base.Mode = *opts.Mode } if opts.Height != nil { base.Coordinates.Height = *opts.Height } if opts.Latitude != nil { base.Coordinates.Latitude = *opts.Latitude } if opts.Longitude != nil { base.Coordinates.Longitude = *opts.Longitude } if opts.AntennaOffset != nil { base.AntennaOffset = *opts.AntennaOffset } if _, err := o.PostBaseCoordinates(ctx, base); err != nil { return errors.WithStack(err) } return nil } // Configuration implements protocol.Operations. func (o *Operations) Configuration(ctx context.Context) (any, error) { config, err := o.GetConfiguration(ctx) if err != nil { return nil, errors.WithStack(err) } return config, nil } // Alive implements protocol.Operations. func (o *Operations) Alive(ctx context.Context) (bool, error) { o.mutex.RLock() defer o.mutex.RUnlock() return o.client.Alive(), nil } // Version implements protocol.Operations. func (o *Operations) Version(ctx context.Context) (string, bool, error) { info, err := o.GetInfo(ctx) if err != nil { return "", false, errors.WithStack(err) } updater, err := o.GetUpdater(ctx) if err != nil { return "", false, errors.WithStack(err) } version := info.Reachview.Version stable := updater.Release.Channel == "stable" return version, stable, nil } // Close implements protocol.Operations. func (o *Operations) Close(ctx context.Context) error { o.mutex.Lock() defer o.mutex.Unlock() if o.client == nil { return nil } o.client.Close() return nil } // Connect implements protocol.Operations. func (o *Operations) Connect(ctx context.Context) error { o.mutex.Lock() defer o.mutex.Unlock() if o.client != nil { o.client.Close() } endpoint, err := socketio.EndpointFromAddr(o.addr) if err != nil { return errors.WithStack(err) } client := socketio.NewClient(endpoint, socketio.WithDialFunc(socketio.DialFunc(o.dial))) if err := client.Connect(); err != nil { return errors.WithStack(err) } o.client = client return nil } // Emit implements protocol.Operations. func (o *Operations) Emit(ctx context.Context, mType string, message any) error { o.mutex.RLock() defer o.mutex.RUnlock() if o.client == nil || !o.client.Alive() { return errors.WithStack(protocol.ErrClosed) } if err := o.client.Emit(mType, message); err != nil { return errors.WithStack(err) } return nil } // On implements protocol.Operations. func (o *Operations) On(ctx context.Context, event string) (chan any, error) { o.mutex.RLock() defer o.mutex.RUnlock() if o.client == nil || !o.client.Alive() { return nil, errors.WithStack(protocol.ErrClosed) } out := make(chan any) closer := new(sync.Once) handler := func(ch *socketio.Channel, data any) { select { case <-ctx.Done(): closer.Do(func() { o.mutex.RLock() defer o.mutex.RUnlock() ch.Close() close(out) if o.client == nil { return } }) return default: out <- data } } if err := o.client.On(event, handler); err != nil { return nil, errors.WithStack(err) } return out, nil } var _ protocol.Operations = &Operations{}