2023-10-24 22:52:33 +02:00
|
|
|
package cache
|
|
|
|
|
|
|
|
import (
|
2024-01-06 16:48:14 +01:00
|
|
|
"bytes"
|
2023-10-24 22:52:33 +02:00
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
|
2024-01-06 16:48:14 +01:00
|
|
|
"cdr.dev/slog"
|
|
|
|
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
|
2023-10-24 22:52:33 +02:00
|
|
|
"github.com/pkg/errors"
|
|
|
|
"gitlab.com/wpetit/goweb/logger"
|
|
|
|
)
|
|
|
|
|
|
|
|
// readCacher wraps an io.ReadSeekCloser and mirrors every byte read into
// an in-memory buffer; on Close the buffered data is stored in the cache
// under key so later reads can be served from memory.
type readCacher struct {
	// reader is the underlying data source being mirrored.
	reader io.ReadSeekCloser

	// cache receives the fully buffered content when Close is called.
	cache *lfu.Cache[string, []byte]

	// buf accumulates everything read from reader so far.
	buf bytes.Buffer

	// key is the cache key under which the buffered data is stored.
	key string
}
|
|
|
|
|
|
|
|
// Close implements io.ReadSeekCloser.
|
|
|
|
func (r *readCacher) Close() error {
|
|
|
|
if err := r.reader.Close(); err != nil {
|
|
|
|
return errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
2024-01-06 16:48:14 +01:00
|
|
|
if err := r.cache.Set(r.key, r.buf.Bytes()); err != nil {
|
|
|
|
var logErr slog.Field
|
|
|
|
if errors.Is(err, lfu.ErrSizeExceedCapacity) {
|
|
|
|
logErr = logger.E(errors.WithStack(err))
|
|
|
|
} else {
|
|
|
|
logErr = logger.CapturedE(errors.WithStack(err))
|
|
|
|
}
|
|
|
|
logger.Error(context.Background(), "could not cache buffered data",
|
|
|
|
logger.F("cacheKey", r.key),
|
|
|
|
logErr,
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
r.buf.Reset()
|
|
|
|
|
2023-10-24 22:52:33 +02:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Read implements io.ReadSeekCloser.
|
|
|
|
func (r *readCacher) Read(p []byte) (n int, err error) {
|
|
|
|
length, err := r.reader.Read(p)
|
|
|
|
if err != nil {
|
|
|
|
if err == io.EOF {
|
|
|
|
return length, io.EOF
|
|
|
|
}
|
|
|
|
|
|
|
|
return length, errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if length > 0 {
|
2024-01-06 16:48:14 +01:00
|
|
|
if _, err := r.buf.Write(p[:length]); err != nil {
|
2023-11-29 11:10:29 +01:00
|
|
|
ctx := logger.With(context.Background(), logger.F("cacheKey", r.key))
|
|
|
|
logger.Error(ctx, "could not write to buffer", logger.CapturedE(errors.WithStack(err)))
|
2023-10-24 22:52:33 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return length, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Seek implements io.ReadSeekCloser.
|
|
|
|
func (r *readCacher) Seek(offset int64, whence int) (int64, error) {
|
|
|
|
length, err := r.reader.Seek(offset, whence)
|
|
|
|
if err != nil {
|
|
|
|
return length, errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return length, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Compile-time check that *readCacher satisfies io.ReadSeekCloser.
var _ io.ReadSeekCloser = (*readCacher)(nil)
|
|
|
|
|
|
|
|
// cachedReader is an io.ReadSeekCloser over an in-memory byte slice
// (typically a previously cached entry).
type cachedReader struct {
	// buffer holds the full content served by this reader.
	buffer []byte

	// offset is the current read position within buffer.
	offset int64
}
|
|
|
|
|
|
|
|
// Read implements io.ReadSeekCloser.
|
|
|
|
func (r *cachedReader) Read(p []byte) (n int, err error) {
|
|
|
|
available := len(r.buffer) - int(r.offset)
|
|
|
|
if available == 0 {
|
|
|
|
return 0, io.EOF
|
|
|
|
}
|
|
|
|
|
|
|
|
size := len(p)
|
|
|
|
if size > available {
|
|
|
|
size = available
|
|
|
|
}
|
|
|
|
|
|
|
|
copy(p, r.buffer[r.offset:r.offset+int64(size)])
|
|
|
|
r.offset += int64(size)
|
|
|
|
|
|
|
|
return size, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close implements io.ReadSeekCloser. It is a no-op: a cachedReader only
// wraps an in-memory byte slice and holds no resources to release.
func (r *cachedReader) Close() error {
	return nil
}
|
|
|
|
|
|
|
|
// Seek implements io.ReadSeekCloser.
|
|
|
|
func (r *cachedReader) Seek(offset int64, whence int) (int64, error) {
|
|
|
|
var newOffset int64
|
|
|
|
|
|
|
|
switch whence {
|
|
|
|
case io.SeekStart:
|
|
|
|
newOffset = offset
|
|
|
|
case io.SeekCurrent:
|
|
|
|
newOffset = r.offset + offset
|
|
|
|
case io.SeekEnd:
|
|
|
|
newOffset = int64(len(r.buffer)) + offset
|
|
|
|
default:
|
|
|
|
return 0, errors.Errorf("unknown seek whence '%d'", whence)
|
|
|
|
}
|
|
|
|
|
|
|
|
if newOffset > int64(len(r.buffer)) || newOffset < 0 {
|
|
|
|
return 0, fmt.Errorf("invalid offset %d", offset)
|
|
|
|
}
|
|
|
|
|
|
|
|
r.offset = newOffset
|
|
|
|
|
|
|
|
return newOffset, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Compile-time check that *cachedReader satisfies io.ReadSeekCloser.
var _ io.ReadSeekCloser = (*cachedReader)(nil)
|