package v2 import ( "context" "net" "strconv" "strings" "sync" "forge.cadoles.com/cadoles/go-emlid/reach/client/protocol" "forge.cadoles.com/cadoles/go-emlid/reach/client/protocol/v2/api" "forge.cadoles.com/cadoles/go-emlid/reach/client/socketio" "github.com/pkg/errors" ) type Operations struct { addr string client *socketio.Client mutex sync.RWMutex } // Alive implements protocol.Operations. func (o *Operations) Alive(ctx context.Context) (bool, error) { o.mutex.RLock() defer o.mutex.RUnlock() return o.client.Alive(), nil } // Version implements protocol.Operations. func (o *Operations) Version(ctx context.Context) (string, bool, error) { info, err := api.GetInfo(ctx, o.addr) if err != nil { return "", false, errors.WithStack(err) } updater, err := api.GetUpdater(ctx, o.addr) if err != nil { return "", false, errors.WithStack(err) } version := info.Reachview.Version stable := updater.Release.Channel == "stable" return version, stable, nil } // Close implements protocol.Operations. func (o *Operations) Close(ctx context.Context) error { o.mutex.Lock() defer o.mutex.Unlock() if o.client == nil { return nil } o.client.Close() return nil } // Connect implements protocol.Operations. func (o *Operations) Connect(ctx context.Context) error { o.mutex.Lock() defer o.mutex.Unlock() if o.client != nil { o.client.Close() } endpoint, err := o.getSocketIOEndpoint() if err != nil { return errors.WithStack(err) } client := socketio.NewClient(endpoint) if err := client.Connect(); err != nil { return errors.WithStack(err) } o.client = client return nil } func (o *Operations) getSocketIOEndpoint() (string, error) { host, rawPort, err := net.SplitHostPort(o.addr) if err != nil { var addrErr *net.AddrError if !errors.As(err, &addrErr) || !strings.Contains(addrErr.Error(), "missing port in address") { return "", errors.WithStack(err) } host = o.addr } port := int64(80) if rawPort != "" { port, err = strconv.ParseInt(rawPort, 10, 32) if err != nil { return "", errors.WithStack(err) } } endpoint := socketio.Endpoint(host, int(port), false) return endpoint, nil } // Emit implements protocol.Operations. func (o *Operations) Emit(ctx context.Context, message any) error { o.mutex.RLock() defer o.mutex.RUnlock() if o.client == nil || !o.client.Alive() { return errors.WithStack(protocol.ErrClosed) } return nil } // On implements protocol.Operations. func (o *Operations) On(ctx context.Context, event string) (chan any, error) { o.mutex.RLock() defer o.mutex.RUnlock() if o.client == nil || !o.client.Alive() { return nil, errors.WithStack(protocol.ErrClosed) } out := make(chan any) closer := new(sync.Once) handler := func(ch *socketio.Channel, data any) { select { case <-ctx.Done(): closer.Do(func() { o.mutex.RLock() defer o.mutex.RUnlock() ch.Close() close(out) if o.client == nil { return } }) return default: out <- data } } if err := o.client.On(event, handler); err != nil { return nil, errors.WithStack(err) } return out, nil } var _ protocol.Operations = &Operations{}