Compare commits
1 Commits
193fc6360a
...
7c054fee58
Author | SHA1 | Date |
---|---|---|
wpetit | 7c054fee58 |
24
client.go
24
client.go
|
@ -17,12 +17,12 @@ import (
|
|||
)
|
||||
|
||||
type Client struct {
|
||||
conf *ClientConfig
|
||||
conn *kcp.UDPSession
|
||||
sess *smux.Session
|
||||
control *control.Control
|
||||
http *http.Client
|
||||
openStreamMutext sync.Mutex
|
||||
conf *ClientConfig
|
||||
conn *kcp.UDPSession
|
||||
sess *smux.Session
|
||||
control *control.Control
|
||||
http *http.Client
|
||||
openStreamMutex sync.Mutex
|
||||
}
|
||||
|
||||
func (c *Client) Connect(ctx context.Context) error {
|
||||
|
@ -129,21 +129,21 @@ func (c *Client) handleProxyRequest(ctx context.Context, m *control.Message) (*c
|
|||
}
|
||||
|
||||
func (c *Client) handleProxyStream(ctx context.Context, out net.Conn) {
|
||||
c.openStreamMutext.Lock()
|
||||
c.openStreamMutex.Lock()
|
||||
|
||||
in, err := c.sess.OpenStream()
|
||||
if err != nil {
|
||||
c.openStreamMutext.Unlock()
|
||||
c.openStreamMutex.Unlock()
|
||||
logger.Error(ctx, "error while accepting proxy stream", logger.E(err))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
c.openStreamMutext.Unlock()
|
||||
c.openStreamMutex.Unlock()
|
||||
|
||||
streamCopy := func(dst io.Writer, src io.ReadCloser) {
|
||||
if _, err := Copy(dst, src); err != nil {
|
||||
if !errors.Is(err, io.ErrClosedPipe) && !errors.Is(err, io.EOF) {
|
||||
if errors.Is(err, smux.ErrInvalidProtocol) {
|
||||
logger.Error(ctx, "error while proxying", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}
|
||||
|
@ -154,8 +154,8 @@ func (c *Client) handleProxyStream(ctx context.Context, out net.Conn) {
|
|||
out.Close()
|
||||
}
|
||||
|
||||
go streamCopy(out, in)
|
||||
streamCopy(in, out)
|
||||
go streamCopy(in, out)
|
||||
streamCopy(out, in)
|
||||
}
|
||||
|
||||
func NewClient(funcs ...ClientConfigFunc) *Client {
|
||||
|
|
|
@ -76,29 +76,55 @@ func (c *Control) ProxyReq(ctx context.Context, network, address string) error {
|
|||
}
|
||||
|
||||
func (c *Control) Listen(ctx context.Context, handlers Handlers) error {
|
||||
errChan := make(chan error)
|
||||
msgChan := make(chan *Message)
|
||||
|
||||
go func(msgChan chan *Message, errChan chan error) {
|
||||
for {
|
||||
logger.Debug(ctx, "reading next message")
|
||||
|
||||
msg, err := c.Read()
|
||||
if err != nil {
|
||||
errChan <- errors.WithStack(err)
|
||||
|
||||
close(errChan)
|
||||
close(msgChan)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
msgChan <- msg
|
||||
}
|
||||
}(msgChan, errChan)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
default:
|
||||
logger.Debug(ctx, "reading next message")
|
||||
|
||||
req, err := c.Read()
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
case err, ok := <-errChan:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
|
||||
case msg, ok := <-msgChan:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
go func() {
|
||||
subCtx := logger.With(ctx, logger.F("messageType", req.Type))
|
||||
subCtx := logger.With(ctx, logger.F("messageType", msg.Type))
|
||||
|
||||
handler, exists := handlers[req.Type]
|
||||
handler, exists := handlers[msg.Type]
|
||||
if !exists {
|
||||
logger.Error(subCtx, "no message handler registered")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
res, err := handler(subCtx, req)
|
||||
res, err := handler(subCtx, msg)
|
||||
if err != nil {
|
||||
logger.Error(subCtx, "error while handling message", logger.E(err))
|
||||
|
||||
|
|
|
@ -102,9 +102,17 @@ func (c *RemoteClient) RemoteAddr() net.Addr {
|
|||
}
|
||||
|
||||
func (c *RemoteClient) Close() {
|
||||
if c.sess != nil {
|
||||
c.sess.Close()
|
||||
}
|
||||
|
||||
if c.conn != nil {
|
||||
c.conn.Close()
|
||||
}
|
||||
|
||||
c.sess = nil
|
||||
c.conn = nil
|
||||
c.control = nil
|
||||
}
|
||||
|
||||
func (c *RemoteClient) Proxy(ctx context.Context, network, address string) (net.Conn, error) {
|
||||
|
|
Loading…
Reference in New Issue