Compare commits

..

1 Commits

Author SHA1 Message Date
wpetit 7c054fee58 fix: enhance proxy stability 2020-10-24 17:54:50 +02:00
3 changed files with 54 additions and 20 deletions

View File

@ -17,12 +17,12 @@ import (
)
type Client struct {
conf *ClientConfig
conn *kcp.UDPSession
sess *smux.Session
control *control.Control
http *http.Client
openStreamMutext sync.Mutex
conf *ClientConfig
conn *kcp.UDPSession
sess *smux.Session
control *control.Control
http *http.Client
openStreamMutex sync.Mutex
}
func (c *Client) Connect(ctx context.Context) error {
@ -129,21 +129,21 @@ func (c *Client) handleProxyRequest(ctx context.Context, m *control.Message) (*c
}
func (c *Client) handleProxyStream(ctx context.Context, out net.Conn) {
c.openStreamMutext.Lock()
c.openStreamMutex.Lock()
in, err := c.sess.OpenStream()
if err != nil {
c.openStreamMutext.Unlock()
c.openStreamMutex.Unlock()
logger.Error(ctx, "error while accepting proxy stream", logger.E(err))
return
}
c.openStreamMutext.Unlock()
c.openStreamMutex.Unlock()
streamCopy := func(dst io.Writer, src io.ReadCloser) {
if _, err := Copy(dst, src); err != nil {
if !errors.Is(err, io.ErrClosedPipe) && !errors.Is(err, io.EOF) {
if errors.Is(err, smux.ErrInvalidProtocol) {
logger.Error(ctx, "error while proxying", logger.E(errors.WithStack(err)))
}
}
@ -154,8 +154,8 @@ func (c *Client) handleProxyStream(ctx context.Context, out net.Conn) {
out.Close()
}
go streamCopy(out, in)
streamCopy(in, out)
go streamCopy(in, out)
streamCopy(out, in)
}
func NewClient(funcs ...ClientConfigFunc) *Client {

View File

@ -76,29 +76,55 @@ func (c *Control) ProxyReq(ctx context.Context, network, address string) error {
}
func (c *Control) Listen(ctx context.Context, handlers Handlers) error {
errChan := make(chan error)
msgChan := make(chan *Message)
go func(msgChan chan *Message, errChan chan error) {
for {
logger.Debug(ctx, "reading next message")
msg, err := c.Read()
if err != nil {
errChan <- errors.WithStack(err)
close(errChan)
close(msgChan)
return
}
msgChan <- msg
}
}(msgChan, errChan)
for {
select {
case <-ctx.Done():
return nil
default:
logger.Debug(ctx, "reading next message")
req, err := c.Read()
if err != nil {
return errors.WithStack(err)
case err, ok := <-errChan:
if !ok {
return nil
}
return err
case msg, ok := <-msgChan:
if !ok {
return nil
}
go func() {
subCtx := logger.With(ctx, logger.F("messageType", req.Type))
subCtx := logger.With(ctx, logger.F("messageType", msg.Type))
handler, exists := handlers[req.Type]
handler, exists := handlers[msg.Type]
if !exists {
logger.Error(subCtx, "no message handler registered")
return
}
res, err := handler(subCtx, req)
res, err := handler(subCtx, msg)
if err != nil {
logger.Error(subCtx, "error while handling message", logger.E(err))

View File

@ -102,9 +102,17 @@ func (c *RemoteClient) RemoteAddr() net.Addr {
}
func (c *RemoteClient) Close() {
if c.sess != nil {
c.sess.Close()
}
if c.conn != nil {
c.conn.Close()
}
c.sess = nil
c.conn = nil
c.control = nil
}
func (c *RemoteClient) Proxy(ctx context.Context, network, address string) (net.Conn, error) {