package client

import (
	"context"
	"net/url"
	"strconv"
	"sync"
	"time"

	"github.com/jackc/puddle/v2"
	"github.com/keegancsmith/rpc"
	"github.com/pkg/errors"
	"gitlab.com/wpetit/goweb/logger"
)

// NewClientPool creates a pool holding at most poolSize RPC clients.
//
// Each pooled client is dialed over TCP to serverURL.Host, using
// serverURL.Path followed by "?" and serverURL.RawQuery as the HTTP path.
// When the pool destroys a client it closes it; a failure to close is only
// logged.
func NewClientPool(serverURL *url.URL, poolSize int) (*puddle.Pool[*rpc.Client], error) {
	constructor := func(context.Context) (*rpc.Client, error) {
		client, err := rpc.DialHTTPPath("tcp", serverURL.Host, serverURL.Path+"?"+serverURL.RawQuery)
		if err != nil {
			return nil, errors.WithStack(err)
		}

		return client, nil
	}

	destructor := func(client *rpc.Client) {
		if err := client.Close(); err != nil {
			logger.Error(context.Background(), "could not close client", logger.CapturedE(errors.WithStack(err)))
		}
	}

	maxPoolSize := int32(poolSize)

	pool, err := puddle.NewPool(&puddle.Config[*rpc.Client]{
		Constructor: constructor,
		Destructor:  destructor,
		MaxSize:     maxPoolSize,
	})
	if err != nil {
		return nil, errors.WithStack(err)
	}

	return pool, nil
}

// WithClientFunc runs fn with an RPC client and returns the resulting error.
type WithClientFunc func(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error

// WithPooledClient returns a WithClientFunc backed by a lazily created client
// pool for serverURL.
//
// The pool is created on the first invocation of the returned function. Its
// size is read from the "clientPoolSize" query parameter of serverURL and
// defaults to 5 when the parameter is absent or empty. If the pool cannot be
// created, that error is returned by the first and every subsequent
// invocation.
//
// When fn fails with rpc.ErrShutdown, the client is destroyed and the call is
// retried with a fresh client after an exponentially growing delay (16ms,
// 32ms, ...), up to 5 attempts in total. Any other error from fn is returned
// immediately. The wait between attempts is aborted when ctx is done.
func WithPooledClient(serverURL *url.URL) WithClientFunc {
	const (
		defaultPoolSize = "5"
		maxAttempts     = 5
	)

	var (
		pool       *puddle.Pool[*rpc.Client]
		poolErr    error
		createPool sync.Once
	)

	// initPool runs at most once. Its outcome (pool or poolErr) is kept in the
	// enclosing scope so that calls made after a failed initialisation report
	// the failure instead of dereferencing a nil pool.
	initPool := func() {
		rawPoolSize := serverURL.Query().Get("clientPoolSize")
		if rawPoolSize == "" {
			rawPoolSize = defaultPoolSize
		}

		poolSize, err := strconv.ParseInt(rawPoolSize, 10, 32)
		if err != nil {
			poolErr = errors.Wrap(err, "could not parse clientPoolSize url query parameter")
			return
		}

		pool, err = NewClientPool(serverURL, int(poolSize))
		if err != nil {
			poolErr = errors.WithStack(err)
		}
	}

	return func(ctx context.Context, fn func(context.Context, *rpc.Client) error) error {
		createPool.Do(initPool)

		if poolErr != nil {
			return errors.WithStack(poolErr)
		}

		// lastErr keeps the most recent shutdown error so that exhausting the
		// retries yields a non-nil error (wrapping a nil error returns nil).
		var lastErr error

		for attempts := 0; attempts < maxAttempts; attempts++ {
			clientResource, err := pool.Acquire(ctx)
			if err != nil {
				return errors.WithStack(err)
			}

			err = fn(ctx, clientResource.Value())
			if err == nil {
				clientResource.Release()
				return nil
			}

			if !errors.Is(err, rpc.ErrShutdown) {
				clientResource.Release()
				return errors.WithStack(err)
			}

			// The connection is dead: drop it from the pool rather than
			// handing it to the next caller.
			clientResource.Destroy()

			lastErr = err

			wait := time.Duration(8<<(attempts+1)) * time.Millisecond

			logger.Warn(
				ctx, "rpc client connection is shutdown, retrying",
				logger.F("attempts", attempts),
				logger.F("max", maxAttempts),
				logger.F("delay", wait),
			)

			timer := time.NewTimer(wait)

			select {
			case <-timer.C:
			case <-ctx.Done():
				timer.Stop()
				return errors.WithStack(ctx.Err())
			}
		}

		logger.Debug(ctx, "rpc client call retrying failed", logger.F("attempts", maxAttempts))

		return errors.Wrapf(lastErr, "rpc client call failed after %d attempts", maxAttempts)
	}
}