package tunnel

import (
	"context"

	"github.com/pkg/errors"
	cmap "github.com/streamrail/concurrent-map"
	"github.com/xtaci/kcp-go/v5"
	"gitlab.com/wpetit/goweb/logger"
)

// Server accepts KCP connections and keeps a registry of the remote
// clients it has accepted, keyed by their remote address.
type Server struct {
	conf *ServerConfig
	// clients maps a remote address (string) to its *RemoteClient.
	clients cmap.ConcurrentMap
}

// Listen opens a KCP listener on the configured address, applies the
// optional ConfigureListener hook and then accepts connections forever,
// handling each one in its own goroutine.
//
// It only returns on failure: when the listener cannot be created, when
// the ConfigureListener hook fails (the listener is closed in that case),
// or when accepting a connection fails. Returned errors carry a stack
// trace.
//
// NOTE(review): ctx is only used for logging and is forwarded to the
// connection handlers; the accept loop itself does not observe
// cancellation.
func (s *Server) Listen(ctx context.Context) error {
	listener, err := kcp.ListenWithOptions(
		s.conf.Address,
		s.conf.BlockCrypt,
		s.conf.DataShards,
		s.conf.ParityShards,
	)
	if err != nil {
		return errors.WithStack(err)
	}

	if s.conf.ConfigureListener != nil {
		if err := s.conf.ConfigureListener(listener); err != nil {
			// The listener is unusable for the caller at this point:
			// release it instead of leaking the bound socket.
			if closeErr := listener.Close(); closeErr != nil {
				logger.Error(ctx, "could not close listener", logger.E(errors.WithStack(closeErr)))
			}

			return errors.WithStack(err)
		}
	}

	logger.Debug(ctx, "accepting connections", logger.F("address", s.conf.Address))

	for {
		conn, err := listener.AcceptKCP()
		if err != nil {
			// The listener is deliberately left open here: sessions that
			// were already accepted may still be running on top of it.
			return errors.WithStack(err)
		}

		go s.handleNewConn(ctx, conn)
	}
}

// handleNewConn processes a freshly accepted KCP session.
//
// If a client is already registered for the session's remote address, the
// existing client is switched over to the new connection and nothing else
// is done; if that switch fails the client is removed from the registry.
// Otherwise a new RemoteClient is created, configured with the server
// hooks, asked to accept the connection and, on success, registered under
// the remote address.
func (s *Server) handleNewConn(ctx context.Context, conn *kcp.UDPSession) {
	remoteAddr := conn.RemoteAddr().String()

	ctx = logger.With(ctx, logger.F("remoteAddr", remoteAddr))

	if raw, exists := s.clients.Get(remoteAddr); exists {
		if existing, ok := raw.(*RemoteClient); ok {
			logger.Debug(ctx, "remote client already exists")

			if err := existing.SwitchConn(ctx, conn); err != nil {
				logger.Error(ctx, "remote client error", logger.E(errors.WithStack(err)))
				s.clients.Remove(remoteAddr)
			}

			// Whether the switch succeeded or not, the connection has been
			// handed to the existing client: do not also create a second
			// client on top of the same connection.
			return
		}

		// Only *RemoteClient values are ever stored, so this should not
		// happen; drop the bogus entry and fall back to a fresh client
		// rather than calling methods on a nil pointer.
		logger.Error(
			ctx, "unexpected registered client type",
			logger.E(errors.Errorf("unexpected type '%T'", raw)),
		)
		s.clients.Remove(remoteAddr)
	}

	remoteClient := NewRemoteClient(
		s.conf.SmuxConfig,
		s.conf.AuthenticationTimeout,
		s.conf.ProxyRequestTimeout,
	)

	remoteClient.ConfigureHooks(s.conf.Hooks)

	if err := remoteClient.Accept(ctx, conn); err != nil {
		logger.Error(ctx, "remote client error", logger.E(errors.WithStack(err)))

		return
	}

	s.clients.Set(remoteAddr, remoteClient)
}

// NewServer builds a Server starting from DefaultServerConfig and applying
// the given configuration functions in order. The client registry starts
// empty.
func NewServer(funcs ...ServerConfigFunc) *Server {
	conf := DefaultServerConfig()

	for _, fn := range funcs {
		fn(conf)
	}

	return &Server{
		conf:    conf,
		clients: cmap.New(),
	}
}