Compare commits

...

4 Commits

12 changed files with 316 additions and 26 deletions

View File

@ -1,15 +1,20 @@
package app package app
import ( import (
"context"
"math/rand" "math/rand"
"sync" "sync"
"github.com/dop251/goja" "github.com/dop251/goja"
"github.com/dop251/goja_nodejs/eventloop" "github.com/dop251/goja_nodejs/eventloop"
"github.com/pkg/errors" "github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
) )
var ErrFuncDoesNotExist = errors.New("function does not exist") var (
ErrFuncDoesNotExist = errors.New("function does not exist")
ErUnknownError = errors.New("unknown error")
)
type Server struct { type Server struct {
runtime *goja.Runtime runtime *goja.Runtime
@ -26,16 +31,18 @@ func (s *Server) Load(name string, src string) error {
return nil return nil
} }
func (s *Server) ExecFuncByName(funcName string, args ...interface{}) (goja.Value, error) { func (s *Server) ExecFuncByName(ctx context.Context, funcName string, args ...interface{}) (goja.Value, error) {
ctx = logger.With(ctx, logger.F("function", funcName), logger.F("args", args))
callable, ok := goja.AssertFunction(s.runtime.Get(funcName)) callable, ok := goja.AssertFunction(s.runtime.Get(funcName))
if !ok { if !ok {
return nil, errors.WithStack(ErrFuncDoesNotExist) return nil, errors.WithStack(ErrFuncDoesNotExist)
} }
return s.Exec(callable, args...) return s.Exec(ctx, callable, args...)
} }
func (s *Server) Exec(callable goja.Callable, args ...interface{}) (goja.Value, error) { func (s *Server) Exec(ctx context.Context, callable goja.Callable, args ...interface{}) (goja.Value, error) {
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
value goja.Value value goja.Value
@ -45,6 +52,25 @@ func (s *Server) Exec(callable goja.Callable, args ...interface{}) (goja.Value,
wg.Add(1) wg.Add(1)
s.loop.RunOnLoop(func(vm *goja.Runtime) { s.loop.RunOnLoop(func(vm *goja.Runtime) {
logger.Debug(ctx, "executing callable")
defer wg.Done()
defer func() {
if recovered := recover(); recovered != nil {
revoveredErr, ok := recovered.(error)
if ok {
logger.Error(ctx, "recovered runtime error", logger.E(errors.WithStack(revoveredErr)))
err = errors.WithStack(ErUnknownError)
return
}
panic(recovered)
}
}()
jsArgs := make([]goja.Value, 0, len(args)) jsArgs := make([]goja.Value, 0, len(args))
for _, a := range args { for _, a := range args {
jsArgs = append(jsArgs, vm.ToValue(a)) jsArgs = append(jsArgs, vm.ToValue(a))
@ -54,8 +80,6 @@ func (s *Server) Exec(callable goja.Callable, args ...interface{}) (goja.Value,
if err != nil { if err != nil {
err = errors.WithStack(err) err = errors.WithStack(err)
} }
wg.Done()
}) })
wg.Wait() wg.Wait()

View File

@ -65,7 +65,7 @@ func TestAuthModule(t *testing.T) {
ctx := context.WithValue(context.Background(), edgeHTTP.ContextKeyOriginRequest, req) ctx := context.WithValue(context.Background(), edgeHTTP.ContextKeyOriginRequest, req)
if _, err := server.ExecFuncByName("testAuth", ctx); err != nil { if _, err := server.ExecFuncByName(ctx, "testAuth", ctx); err != nil {
t.Fatalf("%+v", errors.WithStack(err)) t.Fatalf("%+v", errors.WithStack(err))
} }
} }
@ -104,7 +104,7 @@ func TestAuthAnonymousModule(t *testing.T) {
ctx := context.WithValue(context.Background(), edgeHTTP.ContextKeyOriginRequest, req) ctx := context.WithValue(context.Background(), edgeHTTP.ContextKeyOriginRequest, req)
if _, err := server.ExecFuncByName("testAuth", ctx); err != nil { if _, err := server.ExecFuncByName(ctx, "testAuth", ctx); err != nil {
t.Fatalf("%+v", errors.WithStack(err)) t.Fatalf("%+v", errors.WithStack(err))
} }
} }

View File

@ -88,7 +88,7 @@ func (m *BlobModule) handleUploadRequest(req *MessageUploadRequest) (*MessageUpl
"contentType": req.FileHeader.Header.Get("Content-Type"), "contentType": req.FileHeader.Header.Get("Content-Type"),
} }
rawResult, err := m.server.ExecFuncByName("onBlobUpload", ctx, blobID, blobInfo, req.Metadata) rawResult, err := m.server.ExecFuncByName(ctx, "onBlobUpload", ctx, blobID, blobInfo, req.Metadata)
if err != nil { if err != nil {
if errors.Is(err, app.ErrFuncDoesNotExist) { if errors.Is(err, app.ErrFuncDoesNotExist) {
res.Allow = false res.Allow = false
@ -193,7 +193,7 @@ func (m *BlobModule) saveBlob(ctx context.Context, bucketName string, blobID sto
func (m *BlobModule) handleDownloadRequest(req *MessageDownloadRequest) (*MessageDownloadResponse, error) { func (m *BlobModule) handleDownloadRequest(req *MessageDownloadRequest) (*MessageDownloadResponse, error) {
res := NewMessageDownloadResponse(req.RequestID) res := NewMessageDownloadResponse(req.RequestID)
rawResult, err := m.server.ExecFuncByName("onBlobDownload", req.Context, req.Bucket, req.BlobID) rawResult, err := m.server.ExecFuncByName(req.Context, "onBlobDownload", req.Context, req.Bucket, req.BlobID)
if err != nil { if err != nil {
if errors.Is(err, app.ErrFuncDoesNotExist) { if errors.Is(err, app.ErrFuncDoesNotExist) {
res.Allow = false res.Allow = false

View File

@ -1,6 +1,7 @@
package cast package cast
import ( import (
"context"
"io/ioutil" "io/ioutil"
"os" "os"
"testing" "testing"
@ -79,7 +80,7 @@ func TestCastModuleRefreshDevices(t *testing.T) {
defer server.Stop() defer server.Stop()
result, err := server.ExecFuncByName("refreshDevices") result, err := server.ExecFuncByName(context.Background(), "refreshDevices")
if err != nil { if err != nil {
t.Error(errors.WithStack(err)) t.Error(errors.WithStack(err))
} }

View File

@ -21,7 +21,7 @@ func (m *LifecycleModule) Export(export *goja.Object) {
} }
func (m *LifecycleModule) OnInit() error { func (m *LifecycleModule) OnInit() error {
if _, err := m.server.ExecFuncByName("onInit"); err != nil { if _, err := m.server.ExecFuncByName(context.Background(), "onInit"); err != nil {
if errors.Is(err, app.ErrFuncDoesNotExist) { if errors.Is(err, app.ErrFuncDoesNotExist) {
logger.Warn(context.Background(), "could not find onInit() function", logger.E(errors.WithStack(err))) logger.Warn(context.Background(), "could not find onInit() function", logger.E(errors.WithStack(err)))

View File

@ -129,7 +129,7 @@ func (m *Module) handleClientMessages() {
logger.F("message", clientMessage), logger.F("message", clientMessage),
) )
if _, err := m.server.ExecFuncByName("onClientMessage", clientMessage.Context, clientMessage.Data); err != nil { if _, err := m.server.ExecFuncByName(clientMessage.Context, "onClientMessage", clientMessage.Context, clientMessage.Data); err != nil {
if errors.Is(err, app.ErrFuncDoesNotExist) { if errors.Is(err, app.ErrFuncDoesNotExist) {
continue continue
} }

View File

@ -161,7 +161,7 @@ func (m *RPCModule) handleMessages() {
continue continue
} }
result, err := m.server.Exec(callable, clientMessage.Context, req.Params) result, err := m.server.Exec(clientMessage.Context, callable, clientMessage.Context, req.Params)
if err != nil { if err != nil {
logger.Error( logger.Error(
ctx, "rpc call error", ctx, "rpc call error",

View File

@ -102,7 +102,7 @@ func (m *Module) query(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
} }
if queryOptions.Offset != nil { if queryOptions.Offset != nil {
queryOptionsFuncs = append(queryOptionsFuncs, storage.WithOffset(*queryOptions.Limit)) queryOptionsFuncs = append(queryOptionsFuncs, storage.WithOffset(*queryOptions.Offset))
} }
if queryOptions.OrderDirection != nil { if queryOptions.OrderDirection != nil {

View File

@ -1,6 +1,7 @@
package store package store
import ( import (
"context"
"io/ioutil" "io/ioutil"
"testing" "testing"
@ -34,7 +35,7 @@ func TestStoreModule(t *testing.T) {
t.Fatalf("%+v", errors.WithStack(err)) t.Fatalf("%+v", errors.WithStack(err))
} }
if _, err := server.ExecFuncByName("testStore"); err != nil { if _, err := server.ExecFuncByName(context.Background(), "testStore"); err != nil {
t.Fatalf("%+v", errors.WithStack(err)) t.Fatalf("%+v", errors.WithStack(err))
} }

View File

@ -7,35 +7,35 @@ const (
OrderDirectionDesc OrderDirection = "DESC" OrderDirectionDesc OrderDirection = "DESC"
) )
type QueryOption struct { type QueryOptions struct {
Limit *int Limit *int
Offset *int Offset *int
OrderBy *string OrderBy *string
OrderDirection *OrderDirection OrderDirection *OrderDirection
} }
type QueryOptionFunc func(o *QueryOption) type QueryOptionFunc func(o *QueryOptions)
func WithLimit(limit int) QueryOptionFunc { func WithLimit(limit int) QueryOptionFunc {
return func(o *QueryOption) { return func(o *QueryOptions) {
o.Limit = &limit o.Limit = &limit
} }
} }
func WithOffset(offset int) QueryOptionFunc { func WithOffset(offset int) QueryOptionFunc {
return func(o *QueryOption) { return func(o *QueryOptions) {
o.Offset = &offset o.Offset = &offset
} }
} }
func WithOrderBy(orderBy string) QueryOptionFunc { func WithOrderBy(orderBy string) QueryOptionFunc {
return func(o *QueryOption) { return func(o *QueryOptions) {
o.OrderBy = &orderBy o.OrderBy = &orderBy
} }
} }
func WithOrderDirection(direction OrderDirection) QueryOptionFunc { func WithOrderDirection(direction OrderDirection) QueryOptionFunc {
return func(o *QueryOption) { return func(o *QueryOptions) {
o.OrderDirection = &direction o.OrderDirection = &direction
} }
} }

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"database/sql" "database/sql"
"fmt" "fmt"
"math"
"sync" "sync"
"time" "time"
@ -90,6 +91,11 @@ func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.D
// Query implements storage.DocumentStore // Query implements storage.DocumentStore
func (s *DocumentStore) Query(ctx context.Context, collection string, filter *filter.Filter, funcs ...storage.QueryOptionFunc) ([]storage.Document, error) { func (s *DocumentStore) Query(ctx context.Context, collection string, filter *filter.Filter, funcs ...storage.QueryOptionFunc) ([]storage.Document, error) {
opts := &storage.QueryOptions{}
for _, fn := range funcs {
fn(opts)
}
var documents []storage.Document var documents []storage.Document
err := s.withTx(ctx, func(tx *sql.Tx) error { err := s.withTx(ctx, func(tx *sql.Tx) error {
@ -120,6 +126,29 @@ func (s *DocumentStore) Query(ctx context.Context, collection string, filter *fi
args = append([]interface{}{collection}, args...) args = append([]interface{}{collection}, args...)
if opts.OrderBy != nil {
direction := storage.OrderDirectionAsc
if opts.OrderDirection != nil {
direction = *opts.OrderDirection
}
query, args = withOrderByClause(query, args, *opts.OrderBy, direction)
}
if opts.Offset != nil || opts.Limit != nil {
offset := 0
if opts.Offset != nil {
offset = *opts.Offset
}
limit := math.MaxInt
if opts.Limit != nil {
limit = *opts.Limit
}
query, args = withLimitOffsetClause(query, args, limit, offset)
}
logger.Debug( logger.Debug(
ctx, "executing query", ctx, "executing query",
logger.F("query", query), logger.F("query", query),
@ -331,6 +360,41 @@ func (s *DocumentStore) ensureTables(ctx context.Context, db *sql.DB) error {
return nil return nil
} }
func withOrderByClause(query string, args []any, orderBy string, orderDirection storage.OrderDirection) (string, []any) {
direction := "ASC"
if orderDirection == storage.OrderDirectionDesc {
direction = "DESC"
}
var column string
switch orderBy {
case storage.DocumentAttrID:
column = "id"
case storage.DocumentAttrCreatedAt:
column = "created_at"
case storage.DocumentAttrUpdatedAt:
column = "updated_at"
default:
column = fmt.Sprintf("json_extract(data, '$.' || $%d)", len(args)+1)
args = append(args, orderBy)
}
query += fmt.Sprintf(` ORDER BY %s %s`, column, direction)
return query, args
}
func withLimitOffsetClause(query string, args []any, limit int, offset int) (string, []any) {
query += fmt.Sprintf(` LIMIT $%d OFFSET $%d`, len(args)+1, len(args)+2)
args = append(args, limit, offset)
return query, args
}
func NewDocumentStore(path string) *DocumentStore { func NewDocumentStore(path string) *DocumentStore {
return &DocumentStore{ return &DocumentStore{
db: nil, db: nil,

View File

@ -41,7 +41,7 @@ var documentStoreOpsTestCases = []documentStoreOpsTestCase{
}), }),
) )
results, err := store.Query(ctx, collection, filter, nil) results, err := store.Query(ctx, collection, filter)
if err != nil { if err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }
@ -82,7 +82,7 @@ var documentStoreOpsTestCases = []documentStoreOpsTestCase{
}), }),
) )
results, err := store.Query(ctx, collection, filter, nil) results, err := store.Query(ctx, collection, filter)
if err != nil { if err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }
@ -127,7 +127,7 @@ var documentStoreOpsTestCases = []documentStoreOpsTestCase{
), ),
) )
results, err := store.Query(ctx, collection, filter, nil) results, err := store.Query(ctx, collection, filter)
if err != nil { if err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }
@ -219,7 +219,7 @@ var documentStoreOpsTestCases = []documentStoreOpsTestCase{
// Verify that there is no additional created document in the collection // Verify that there is no additional created document in the collection
results, err := store.Query(ctx, collection, nil, nil) results, err := store.Query(ctx, collection, nil)
if err != nil { if err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }
@ -228,6 +228,206 @@ var documentStoreOpsTestCases = []documentStoreOpsTestCase{
return errors.Errorf("len(results): expected '%v', got '%v'", e, g) return errors.Errorf("len(results): expected '%v', got '%v'", e, g)
} }
return nil
},
},
{
Name: "Query order by document field",
Run: func(ctx context.Context, store storage.DocumentStore) error {
docs := []storage.Document{
{
"sortedField": 0,
"name": "Item 1",
},
{
"sortedField": 1,
"name": "Item 2",
},
{
"sortedField": 2,
"name": "Item 3",
},
}
collection := "ordered_query_by_document_field"
for _, doc := range docs {
if _, err := store.Upsert(ctx, collection, doc); err != nil {
return errors.WithStack(err)
}
}
results, err := store.Query(
ctx, collection, nil,
storage.WithOrderBy("sortedField"),
storage.WithOrderDirection(storage.OrderDirectionAsc),
)
if err != nil {
return errors.WithStack(err)
}
if e, g := 3, len(results); e != g {
return errors.Errorf("len(results): expected '%v', got '%v'", e, g)
}
if e, g := docs[0]["name"], results[0]["name"]; e != g {
return errors.Errorf("results[0][\"name\"]: expected '%v', got '%v'", e, g)
}
if e, g := docs[2]["name"], results[2]["name"]; e != g {
return errors.Errorf("results[2][\"name\"]: expected '%v', got '%v'", e, g)
}
results, err = store.Query(
ctx, collection, nil,
storage.WithOrderBy("sortedField"),
storage.WithOrderDirection(storage.OrderDirectionDesc),
)
if err != nil {
return errors.WithStack(err)
}
if e, g := 3, len(results); e != g {
return errors.Errorf("len(results): expected '%v', got '%v'", e, g)
}
if e, g := docs[2]["name"], results[0]["name"]; e != g {
return errors.Errorf("results[0][\"name\"]: expected '%v', got '%v'", e, g)
}
if e, g := docs[0]["name"], results[2]["name"]; e != g {
return errors.Errorf("results[2][\"name\"]: expected '%v', got '%v'", e, g)
}
return nil
},
},
{
Name: "Query order by special attr",
Run: func(ctx context.Context, store storage.DocumentStore) error {
docs := []storage.Document{
{
"name": "Item 1",
},
{
"name": "Item 2",
},
{
"name": "Item 3",
},
}
collection := "ordered_query_by_special_attr"
for _, doc := range docs {
if _, err := store.Upsert(ctx, collection, doc); err != nil {
return errors.WithStack(err)
}
}
results, err := store.Query(
ctx, collection, nil,
storage.WithOrderBy(storage.DocumentAttrCreatedAt),
storage.WithOrderDirection(storage.OrderDirectionAsc),
)
if err != nil {
return errors.WithStack(err)
}
if e, g := 3, len(results); e != g {
return errors.Errorf("len(results): expected '%v', got '%v'", e, g)
}
if e, g := docs[0]["name"], results[0]["name"]; e != g {
return errors.Errorf("results[0][\"name\"]: expected '%v', got '%v'", e, g)
}
if e, g := docs[2]["name"], results[2]["name"]; e != g {
return errors.Errorf("results[2][\"name\"]: expected '%v', got '%v'", e, g)
}
results, err = store.Query(
ctx, collection, nil,
storage.WithOrderBy(storage.DocumentAttrCreatedAt),
storage.WithOrderDirection(storage.OrderDirectionDesc),
)
if err != nil {
return errors.WithStack(err)
}
if e, g := 3, len(results); e != g {
return errors.Errorf("len(results): expected '%v', got '%v'", e, g)
}
if e, g := docs[2]["name"], results[0]["name"]; e != g {
return errors.Errorf("results[0][\"name\"]: expected '%v', got '%v'", e, g)
}
if e, g := docs[0]["name"], results[2]["name"]; e != g {
return errors.Errorf("results[2][\"name\"]: expected '%v', got '%v'", e, g)
}
return nil
},
},
{
Name: "Query limit and offset",
Run: func(ctx context.Context, store storage.DocumentStore) error {
docs := []storage.Document{
{"name": "Item 1"},
{"name": "Item 2"},
{"name": "Item 3"},
{"name": "Item 4"},
}
collection := "query_limit_and_offset"
for _, doc := range docs {
if _, err := store.Upsert(ctx, collection, doc); err != nil {
return errors.WithStack(err)
}
}
results, err := store.Query(
ctx, collection, nil,
storage.WithLimit(2),
)
if err != nil {
return errors.WithStack(err)
}
if e, g := 2, len(results); e != g {
return errors.Errorf("len(results): expected '%v', got '%v'", e, g)
}
if e, g := docs[0]["name"], results[0]["name"]; e != g {
return errors.Errorf("results[0][\"name\"]: expected '%v', got '%v'", e, g)
}
if e, g := docs[1]["name"], results[1]["name"]; e != g {
return errors.Errorf("results[1][\"name\"]: expected '%v', got '%v'", e, g)
}
results, err = store.Query(
ctx, collection, nil,
storage.WithOffset(2),
)
if err != nil {
return errors.WithStack(err)
}
if e, g := 2, len(results); e != g {
return errors.Errorf("len(results): expected '%v', got '%v'", e, g)
}
if e, g := docs[2]["name"], results[0]["name"]; e != g {
return errors.Errorf("results[0][\"name\"]: expected '%v', got '%v'", e, g)
}
if e, g := docs[3]["name"], results[1]["name"]; e != g {
return errors.Errorf("results[1][\"name\"]: expected '%v', got '%v'", e, g)
}
return nil return nil
}, },
}, },