2023-11-29 11:10:29 +01:00
|
|
|
package client
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"net/url"
|
|
|
|
"strconv"
|
|
|
|
"sync"
|
2023-11-28 16:35:49 +01:00
|
|
|
"time"
|
2023-11-29 11:10:29 +01:00
|
|
|
|
|
|
|
"github.com/jackc/puddle/v2"
|
|
|
|
"github.com/keegancsmith/rpc"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"gitlab.com/wpetit/goweb/logger"
|
|
|
|
)
|
|
|
|
|
|
|
|
func NewClientPool(serverURL *url.URL, poolSize int) (*puddle.Pool[*rpc.Client], error) {
|
|
|
|
constructor := func(context.Context) (*rpc.Client, error) {
|
|
|
|
client, err := rpc.DialHTTPPath("tcp", serverURL.Host, serverURL.Path+"?"+serverURL.RawQuery)
|
|
|
|
if err != nil {
|
|
|
|
return nil, errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return client, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
destructor := func(client *rpc.Client) {
|
|
|
|
if err := client.Close(); err != nil {
|
|
|
|
logger.Error(context.Background(), "could not close client", logger.CapturedE(errors.WithStack(err)))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
maxPoolSize := int32(poolSize)
|
|
|
|
|
|
|
|
pool, err := puddle.NewPool(&puddle.Config[*rpc.Client]{Constructor: constructor, Destructor: destructor, MaxSize: maxPoolSize})
|
|
|
|
if err != nil {
|
|
|
|
return nil, errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return pool, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
type WithClientFunc func(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error
|
|
|
|
|
|
|
|
func WithPooledClient(serverURL *url.URL) WithClientFunc {
|
|
|
|
var (
|
|
|
|
pool *puddle.Pool[*rpc.Client]
|
|
|
|
createPool sync.Once
|
|
|
|
)
|
|
|
|
|
|
|
|
return func(ctx context.Context, fn func(context.Context, *rpc.Client) error) error {
|
|
|
|
var err error
|
|
|
|
createPool.Do(func() {
|
|
|
|
rawPoolSize := serverURL.Query().Get("clientPoolSize")
|
|
|
|
if rawPoolSize == "" {
|
|
|
|
rawPoolSize = "5"
|
|
|
|
}
|
|
|
|
|
|
|
|
var poolSize int64
|
|
|
|
|
|
|
|
poolSize, err = strconv.ParseInt(rawPoolSize, 10, 32)
|
|
|
|
if err != nil {
|
|
|
|
err = errors.Wrap(err, "could not parse clientPoolSize url query parameter")
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
pool, err = NewClientPool(serverURL, int(poolSize))
|
|
|
|
if err != nil {
|
|
|
|
err = errors.WithStack(err)
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
2023-11-28 16:35:49 +01:00
|
|
|
attempts := 0
|
|
|
|
max := 5
|
2023-11-29 11:10:29 +01:00
|
|
|
|
2023-11-28 16:35:49 +01:00
|
|
|
for {
|
|
|
|
if attempts >= max {
|
|
|
|
logger.Debug(ctx, "rpc client call retrying failed", logger.F("attempts", attempts))
|
|
|
|
|
|
|
|
return errors.Wrapf(err, "rpc client call failed after %d attempts", max)
|
2023-11-29 11:10:29 +01:00
|
|
|
}
|
|
|
|
|
2023-11-28 16:35:49 +01:00
|
|
|
clientResource, err := pool.Acquire(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
client := clientResource.Value()
|
|
|
|
|
|
|
|
if err := fn(ctx, client); err != nil {
|
|
|
|
if errors.Is(err, rpc.ErrShutdown) {
|
|
|
|
clientResource.Destroy()
|
2023-11-29 11:10:29 +01:00
|
|
|
|
2023-11-28 16:35:49 +01:00
|
|
|
wait := time.Duration(8<<(attempts+1)) * time.Millisecond
|
2023-11-29 11:10:29 +01:00
|
|
|
|
2023-11-28 16:35:49 +01:00
|
|
|
logger.Warn(
|
|
|
|
ctx, "rpc client connection is shutdown, retrying",
|
|
|
|
logger.F("attempts", attempts),
|
|
|
|
logger.F("max", max),
|
|
|
|
logger.F("delay", wait),
|
|
|
|
)
|
|
|
|
|
|
|
|
timer := time.NewTimer(wait)
|
|
|
|
select {
|
|
|
|
case <-timer.C:
|
|
|
|
attempts++
|
|
|
|
continue
|
|
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
if err := ctx.Err(); err != nil {
|
|
|
|
return errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
clientResource.Release()
|
|
|
|
|
|
|
|
return errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
clientResource.Release()
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
2023-11-29 11:10:29 +01:00
|
|
|
}
|
|
|
|
}
|