Compare commits

..

1 Commits

Author SHA1 Message Date
wpetit 193fc6360a fix: enhance proxy stability 2020-10-24 16:41:41 +02:00
4 changed files with 67 additions and 41 deletions

View File

@ -5,6 +5,8 @@ import (
"io"
"net"
"net/http"
"sync"
"time"
"gitlab.com/wpetit/goweb/logger"
@ -15,11 +17,12 @@ import (
)
type Client struct {
conf *ClientConfig
conn *kcp.UDPSession
sess *smux.Session
control *control.Control
http *http.Client
conf *ClientConfig
conn *kcp.UDPSession
sess *smux.Session
control *control.Control
http *http.Client
openStreamMutext sync.Mutex
}
func (c *Client) Connect(ctx context.Context) error {
@ -39,6 +42,8 @@ func (c *Client) Connect(ctx context.Context) error {
config := smux.DefaultConfig()
config.Version = 2
config.KeepAliveInterval = 10 * time.Second
config.KeepAliveTimeout = 2 * config.KeepAliveInterval
sess, err := smux.Client(conn, config)
if err != nil {
@ -72,6 +77,9 @@ func (c *Client) Connect(ctx context.Context) error {
func (c *Client) Listen(ctx context.Context) error {
logger.Debug(ctx, "listening for messages")
ctx, cancel := context.WithCancel(ctx)
defer cancel()
err := c.control.Listen(ctx, control.Handlers{
control.TypeProxyRequest: c.handleProxyRequest,
})
@ -121,19 +129,22 @@ func (c *Client) handleProxyRequest(ctx context.Context, m *control.Message) (*c
}
func (c *Client) handleProxyStream(ctx context.Context, out net.Conn) {
in, err := c.sess.AcceptStream()
c.openStreamMutext.Lock()
in, err := c.sess.OpenStream()
if err != nil {
c.openStreamMutext.Unlock()
logger.Error(ctx, "error while accepting proxy stream", logger.E(err))
return
}
defer in.Close()
c.openStreamMutext.Unlock()
streamCopy := func(dst io.Writer, src io.ReadCloser) {
if _, err := Copy(dst, src); err != nil {
if !errors.Is(err, io.ErrClosedPipe) && !errors.Is(err, io.EOF) {
logger.Error(ctx, "error while proxying", logger.E(err))
logger.Error(ctx, "error while proxying", logger.E(errors.WithStack(err)))
}
}

View File

@ -77,40 +77,45 @@ func (c *Control) ProxyReq(ctx context.Context, network, address string) error {
func (c *Control) Listen(ctx context.Context, handlers Handlers) error {
for {
logger.Debug(ctx, "reading next message")
select {
case <-ctx.Done():
return nil
default:
logger.Debug(ctx, "reading next message")
req, err := c.Read()
if err != nil {
return errors.WithStack(err)
}
go func() {
subCtx := logger.With(ctx, logger.F("messageType", req.Type))
handler, exists := handlers[req.Type]
if !exists {
logger.Error(subCtx, "no message handler registered")
return
}
res, err := handler(subCtx, req)
req, err := c.Read()
if err != nil {
logger.Error(subCtx, "error while handling message", logger.E(err))
return
return errors.WithStack(err)
}
if res == nil {
return
}
go func() {
subCtx := logger.With(ctx, logger.F("messageType", req.Type))
if err := c.Write(res); err != nil {
logger.Error(subCtx, "error while write message response", logger.E(err))
handler, exists := handlers[req.Type]
if !exists {
logger.Error(subCtx, "no message handler registered")
return
}
}()
return
}
res, err := handler(subCtx, req)
if err != nil {
logger.Error(subCtx, "error while handling message", logger.E(err))
return
}
if res == nil {
return
}
if err := c.Write(res); err != nil {
logger.Error(subCtx, "error while write message response", logger.E(err))
return
}
}()
}
}
}

View File

@ -4,6 +4,8 @@ import (
"context"
"io"
"net"
"sync"
"time"
"forge.cadoles.com/wpetit/go-tunnel/control"
@ -23,11 +25,14 @@ type RemoteClient struct {
control *control.Control
remoteAddr net.Addr
proxies cmap.ConcurrentMap
acceptStreamMutex sync.Mutex
}
func (c *RemoteClient) Accept(ctx context.Context, conn *kcp.UDPSession) error {
config := smux.DefaultConfig()
config.Version = 2
config.KeepAliveInterval = 10 * time.Second
config.KeepAliveTimeout = 2 * config.KeepAliveInterval
sess, err := smux.Server(conn, config)
if err != nil {
@ -64,14 +69,14 @@ func (c *RemoteClient) Listen(ctx context.Context) error {
if errors.Is(err, io.ErrClosedPipe) {
if c.onClientDisconnectHook != nil {
if err := c.onClientDisconnectHook.OnClientDisconnect(ctx, c); err != nil {
logger.Error(ctx, "client disconnect hook error", logger.E(err))
logger.Error(ctx, "client disconnect hook error", logger.E(errors.WithStack(err)))
}
}
return errors.WithStack(ErrConnectionClosed)
}
return err
return errors.WithStack(err)
}
func (c *RemoteClient) ConfigureHooks(hooks interface{}) {
@ -111,11 +116,16 @@ func (c *RemoteClient) Proxy(ctx context.Context, network, address string) (net.
logger.Debug(ctx, "opening proxy stream")
stream, err := c.sess.OpenStream()
c.acceptStreamMutex.Lock()
stream, err := c.sess.AcceptStream()
if err != nil {
c.acceptStreamMutex.Unlock()
return nil, errors.WithStack(err)
}
c.acceptStreamMutex.Unlock()
go func() {
<-ctx.Done()
logger.Debug(ctx, "closing proxy stream")

View File

@ -42,13 +42,13 @@ func (s *Server) handleNewConn(ctx context.Context, conn *kcp.UDPSession) {
remoteClient.ConfigureHooks(s.conf.Hooks)
if err := remoteClient.Accept(ctx, conn); err != nil {
logger.Error(ctx, "remote client error", logger.E(err))
logger.Error(ctx, "remote client error", logger.E(errors.WithStack(err)))
return
}
if err := remoteClient.Listen(ctx); err != nil {
logger.Error(ctx, "remote client error", logger.E(err))
logger.Error(ctx, "remote client error", logger.E(errors.WithStack(err)))
}
}