Compare commits
1 Commits
e140439b1e
...
193fc6360a
Author | SHA1 | Date |
---|---|---|
wpetit | 193fc6360a |
27
client.go
27
client.go
|
@ -5,6 +5,8 @@ import (
|
|||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
|
||||
|
@ -15,11 +17,12 @@ import (
|
|||
)
|
||||
|
||||
type Client struct {
|
||||
conf *ClientConfig
|
||||
conn *kcp.UDPSession
|
||||
sess *smux.Session
|
||||
control *control.Control
|
||||
http *http.Client
|
||||
conf *ClientConfig
|
||||
conn *kcp.UDPSession
|
||||
sess *smux.Session
|
||||
control *control.Control
|
||||
http *http.Client
|
||||
openStreamMutext sync.Mutex
|
||||
}
|
||||
|
||||
func (c *Client) Connect(ctx context.Context) error {
|
||||
|
@ -39,6 +42,8 @@ func (c *Client) Connect(ctx context.Context) error {
|
|||
|
||||
config := smux.DefaultConfig()
|
||||
config.Version = 2
|
||||
config.KeepAliveInterval = 10 * time.Second
|
||||
config.KeepAliveTimeout = 2 * config.KeepAliveInterval
|
||||
|
||||
sess, err := smux.Client(conn, config)
|
||||
if err != nil {
|
||||
|
@ -72,6 +77,9 @@ func (c *Client) Connect(ctx context.Context) error {
|
|||
func (c *Client) Listen(ctx context.Context) error {
|
||||
logger.Debug(ctx, "listening for messages")
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
err := c.control.Listen(ctx, control.Handlers{
|
||||
control.TypeProxyRequest: c.handleProxyRequest,
|
||||
})
|
||||
|
@ -121,19 +129,22 @@ func (c *Client) handleProxyRequest(ctx context.Context, m *control.Message) (*c
|
|||
}
|
||||
|
||||
func (c *Client) handleProxyStream(ctx context.Context, out net.Conn) {
|
||||
in, err := c.sess.AcceptStream()
|
||||
c.openStreamMutext.Lock()
|
||||
|
||||
in, err := c.sess.OpenStream()
|
||||
if err != nil {
|
||||
c.openStreamMutext.Unlock()
|
||||
logger.Error(ctx, "error while accepting proxy stream", logger.E(err))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
defer in.Close()
|
||||
c.openStreamMutext.Unlock()
|
||||
|
||||
streamCopy := func(dst io.Writer, src io.ReadCloser) {
|
||||
if _, err := Copy(dst, src); err != nil {
|
||||
if !errors.Is(err, io.ErrClosedPipe) && !errors.Is(err, io.EOF) {
|
||||
logger.Error(ctx, "error while proxying", logger.E(err))
|
||||
logger.Error(ctx, "error while proxying", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -77,40 +77,45 @@ func (c *Control) ProxyReq(ctx context.Context, network, address string) error {
|
|||
|
||||
func (c *Control) Listen(ctx context.Context, handlers Handlers) error {
|
||||
for {
|
||||
logger.Debug(ctx, "reading next message")
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
default:
|
||||
logger.Debug(ctx, "reading next message")
|
||||
|
||||
req, err := c.Read()
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
subCtx := logger.With(ctx, logger.F("messageType", req.Type))
|
||||
|
||||
handler, exists := handlers[req.Type]
|
||||
if !exists {
|
||||
logger.Error(subCtx, "no message handler registered")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
res, err := handler(subCtx, req)
|
||||
req, err := c.Read()
|
||||
if err != nil {
|
||||
logger.Error(subCtx, "error while handling message", logger.E(err))
|
||||
|
||||
return
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if res == nil {
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
subCtx := logger.With(ctx, logger.F("messageType", req.Type))
|
||||
|
||||
if err := c.Write(res); err != nil {
|
||||
logger.Error(subCtx, "error while write message response", logger.E(err))
|
||||
handler, exists := handlers[req.Type]
|
||||
if !exists {
|
||||
logger.Error(subCtx, "no message handler registered")
|
||||
|
||||
return
|
||||
}
|
||||
}()
|
||||
return
|
||||
}
|
||||
|
||||
res, err := handler(subCtx, req)
|
||||
if err != nil {
|
||||
logger.Error(subCtx, "error while handling message", logger.E(err))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if res == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err := c.Write(res); err != nil {
|
||||
logger.Error(subCtx, "error while write message response", logger.E(err))
|
||||
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -4,6 +4,8 @@ import (
|
|||
"context"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/wpetit/go-tunnel/control"
|
||||
|
||||
|
@ -23,11 +25,14 @@ type RemoteClient struct {
|
|||
control *control.Control
|
||||
remoteAddr net.Addr
|
||||
proxies cmap.ConcurrentMap
|
||||
acceptStreamMutex sync.Mutex
|
||||
}
|
||||
|
||||
func (c *RemoteClient) Accept(ctx context.Context, conn *kcp.UDPSession) error {
|
||||
config := smux.DefaultConfig()
|
||||
config.Version = 2
|
||||
config.KeepAliveInterval = 10 * time.Second
|
||||
config.KeepAliveTimeout = 2 * config.KeepAliveInterval
|
||||
|
||||
sess, err := smux.Server(conn, config)
|
||||
if err != nil {
|
||||
|
@ -64,14 +69,14 @@ func (c *RemoteClient) Listen(ctx context.Context) error {
|
|||
if errors.Is(err, io.ErrClosedPipe) {
|
||||
if c.onClientDisconnectHook != nil {
|
||||
if err := c.onClientDisconnectHook.OnClientDisconnect(ctx, c); err != nil {
|
||||
logger.Error(ctx, "client disconnect hook error", logger.E(err))
|
||||
logger.Error(ctx, "client disconnect hook error", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}
|
||||
|
||||
return errors.WithStack(ErrConnectionClosed)
|
||||
}
|
||||
|
||||
return err
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
func (c *RemoteClient) ConfigureHooks(hooks interface{}) {
|
||||
|
@ -111,11 +116,16 @@ func (c *RemoteClient) Proxy(ctx context.Context, network, address string) (net.
|
|||
|
||||
logger.Debug(ctx, "opening proxy stream")
|
||||
|
||||
stream, err := c.sess.OpenStream()
|
||||
c.acceptStreamMutex.Lock()
|
||||
|
||||
stream, err := c.sess.AcceptStream()
|
||||
if err != nil {
|
||||
c.acceptStreamMutex.Unlock()
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
c.acceptStreamMutex.Unlock()
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
logger.Debug(ctx, "closing proxy stream")
|
||||
|
|
|
@ -42,13 +42,13 @@ func (s *Server) handleNewConn(ctx context.Context, conn *kcp.UDPSession) {
|
|||
remoteClient.ConfigureHooks(s.conf.Hooks)
|
||||
|
||||
if err := remoteClient.Accept(ctx, conn); err != nil {
|
||||
logger.Error(ctx, "remote client error", logger.E(err))
|
||||
logger.Error(ctx, "remote client error", logger.E(errors.WithStack(err)))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if err := remoteClient.Listen(ctx); err != nil {
|
||||
logger.Error(ctx, "remote client error", logger.E(err))
|
||||
logger.Error(ctx, "remote client error", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue