From 8b06473e58a5bf65f80dcdf19239c6729cedadfc Mon Sep 17 00:00:00 2001 From: Vikram Rangnekar Date: Sun, 2 Jun 2019 01:38:51 -0400 Subject: [PATCH] Make remote joins use parallel http requests --- docs/guide.md | 10 +++-- psql/bench.0 | 5 ++- psql/bench.1 | 6 --- psql/bench.2 | 6 --- psql/bench.3 | 6 --- psql/psql.go | 22 +++++----- psql/psql_test.go | 69 +++++++++++++++++++++----------- qcode/parse.go | 12 +++--- qcode/qcode.go | 13 +++--- serv/core.go | 100 +++++++++++++++++++++++++++++----------------- 10 files changed, 143 insertions(+), 106 deletions(-) delete mode 100644 psql/bench.1 delete mode 100644 psql/bench.2 delete mode 100644 psql/bench.3 diff --git a/docs/guide.md b/docs/guide.md index eb5da31..2b73a61 100644 --- a/docs/guide.md +++ b/docs/guide.md @@ -345,13 +345,17 @@ class AddSearchColumn < ActiveRecord::Migration[5.1] end ``` -## Join with remote REST APIs +## Remote Joins -It often happens that after fetching some data from the DB we need to call another API to fetch some more data and all this combined into a single JSON response. For example along with a list of users you need their last 5 payments from Stripe. This requires you to query your DB for the users and Stripe for the payments. Super Graph handles all this for you also only the fields you requested from the Stripe API are returned. All this is a very performance focused way. +It often happens that after fetching some data from the DB we need to call another API to fetch some more data and all this combined into a single JSON response. For example along with a list of users you need their last 5 payments from Stripe. This requires you to query your DB for the users and Stripe for the payments. Super Graph handles all this for you also only the fields you requested from the Stripe API are returned. + +::: tip Is this fast? +Super Graph is able fetch remote data and merge it with the DB response in an efficient manner. Several optimizations such as parallel HTTP requests and a zero-allocation JSON merge algorithm makes this very fast. All of this without you having to write a line of code. +::: For example you need to list the last 3 payments made by a user. You will first need to look up the user in the database and then call the Stripe API to fetch his last 3 payments. For this to work your user table in the db has a `customer_id` column that contains his Stripe customer ID. -Similiarly you might also have the need to fetch the users last tweet and include that too. Super Graph can handle this for you using it's `API Stitching` feature. +Similiarly you could also fetch the users last tweet, lead info from Salesforce or whatever else you need. It's fine to mix up several different `remote joins` into a single GraphQL query. ### Stripe API example diff --git a/psql/bench.0 b/psql/bench.0 index 268a10d..de1698d 100644 --- a/psql/bench.0 +++ b/psql/bench.0 @@ -1,6 +1,7 @@ goos: darwin goarch: amd64 pkg: github.com/dosco/super-graph/psql -BenchmarkCompileGQLToSQL-8 50000 38882 ns/op 15177 B/op 266 allocs/op +BenchmarkCompile-8 50000 38531 ns/op 5175 B/op 148 allocs/op +BenchmarkCompileParallel-8 200000 11472 ns/op 5237 B/op 148 allocs/op PASS -ok github.com/dosco/super-graph/psql 2.473s +ok github.com/dosco/super-graph/psql 4.686s diff --git a/psql/bench.1 b/psql/bench.1 deleted file mode 100644 index 2f86f56..0000000 --- a/psql/bench.1 +++ /dev/null @@ -1,6 +0,0 @@ -goos: darwin -goarch: amd64 -pkg: github.com/dosco/super-graph/psql -BenchmarkCompileGQLToSQL-8 50000 39601 ns/op 20165 B/op 263 allocs/op -PASS -ok github.com/dosco/super-graph/psql 2.549s diff --git a/psql/bench.2 b/psql/bench.2 deleted file mode 100644 index 095f5fe..0000000 --- a/psql/bench.2 +++ /dev/null @@ -1,6 +0,0 @@ -goos: darwin -goarch: amd64 -pkg: github.com/dosco/super-graph/psql -BenchmarkCompileGQLToSQL-8 50000 35559 ns/op 8453 B/op 228 allocs/op -PASS -ok github.com/dosco/super-graph/psql 2.162s diff --git a/psql/bench.3 b/psql/bench.3 deleted file mode 100644 index 1d622d8..0000000 --- a/psql/bench.3 +++ /dev/null @@ -1,6 +0,0 @@ -goos: darwin -goarch: amd64 -pkg: github.com/dosco/super-graph/psql -BenchmarkCompileGQLToSQL-8 50000 28320 ns/op 7698 B/op 154 allocs/op -PASS -ok github.com/dosco/super-graph/psql 1.724s diff --git a/psql/psql.go b/psql/psql.go index eb74e8d..27e8a23 100644 --- a/psql/psql.go +++ b/psql/psql.go @@ -91,7 +91,7 @@ func (c *Compiler) Compile(qc *qcode.QCode, w *bytes.Buffer) (uint32, error) { ignored |= skipped for _, id := range v.sel.Children { - if hasBit(skipped, id) { + if hasBit(skipped, uint16(id)) { continue } child := &qc.Query.Selects[id] @@ -377,7 +377,7 @@ func (v *selectBlock) renderJoinedColumns(w *bytes.Buffer, skipped uint32) error colsRendered := len(v.sel.Cols) != 0 for _, id := range v.sel.Children { - skipThis := hasBit(skipped, id) + skipThis := hasBit(skipped, uint16(id)) if colsRendered && !skipThis { io.WriteString(w, ", ") @@ -884,7 +884,7 @@ func alias(w *bytes.Buffer, alias string) { w.WriteString(`"`) } -func aliasWithID(w *bytes.Buffer, alias string, id uint16) { +func aliasWithID(w *bytes.Buffer, alias string, id int16) { w.WriteString(` AS "`) w.WriteString(alias) w.WriteString(`_`) @@ -892,7 +892,7 @@ func aliasWithID(w *bytes.Buffer, alias string, id uint16) { w.WriteString(`"`) } -func aliasWithIDSuffix(w *bytes.Buffer, alias string, id uint16, suffix string) { +func aliasWithIDSuffix(w *bytes.Buffer, alias string, id int16, suffix string) { w.WriteString(` AS "`) w.WriteString(alias) w.WriteString(`_`) @@ -917,7 +917,7 @@ func colWithTable(w *bytes.Buffer, table, col string) { w.WriteString(`"`) } -func colWithTableID(w *bytes.Buffer, table string, id uint16, col string) { +func colWithTableID(w *bytes.Buffer, table string, id int16, col string) { w.WriteString(`"`) w.WriteString(table) w.WriteString(`_`) @@ -927,7 +927,7 @@ func colWithTableID(w *bytes.Buffer, table string, id uint16, col string) { w.WriteString(`"`) } -func colWithTableIDAlias(w *bytes.Buffer, table string, id uint16, col, alias string) { +func colWithTableIDAlias(w *bytes.Buffer, table string, id int16, col, alias string) { w.WriteString(`"`) w.WriteString(table) w.WriteString(`_`) @@ -939,7 +939,7 @@ func colWithTableIDAlias(w *bytes.Buffer, table string, id uint16, col, alias st w.WriteString(`"`) } -func colWithTableIDSuffixAlias(w *bytes.Buffer, table string, id uint16, +func colWithTableIDSuffixAlias(w *bytes.Buffer, table string, id int16, suffix, col, alias string) { w.WriteString(`"`) w.WriteString(table) @@ -953,7 +953,7 @@ func colWithTableIDSuffixAlias(w *bytes.Buffer, table string, id uint16, w.WriteString(`"`) } -func tableIDColSuffix(w *bytes.Buffer, table string, id uint16, col, suffix string) { +func tableIDColSuffix(w *bytes.Buffer, table string, id int16, col, suffix string) { w.WriteString(`"`) w.WriteString(table) w.WriteString(`_`) @@ -966,18 +966,18 @@ func tableIDColSuffix(w *bytes.Buffer, table string, id uint16, col, suffix stri const charset = "0123456789" -func int2string(w *bytes.Buffer, val uint16) { +func int2string(w *bytes.Buffer, val int16) { if val < 10 { w.WriteByte(charset[val]) return } - temp := uint16(0) + temp := int16(0) val2 := val for val2 > 0 { temp *= 10 temp += val2 % 10 - val2 = uint16(math.Floor(float64(val2 / 10))) + val2 = int16(math.Floor(float64(val2 / 10))) } val3 := temp diff --git a/psql/psql_test.go b/psql/psql_test.go index 50d3a32..0c5fb3d 100644 --- a/psql/psql_test.go +++ b/psql/psql_test.go @@ -480,45 +480,68 @@ func TestCompileGQL(t *testing.T) { t.Run("syntheticTables", syntheticTables) } -func BenchmarkCompileGQLToSQL(b *testing.B) { - gql := `query { - products( - # returns only 30 items - limit: 30, +var benchGQL = `query { + products( + # returns only 30 items + limit: 30, - # starts from item 10, commented out for now - # offset: 10, + # starts from item 10, commented out for now + # offset: 10, - # orders the response items by highest price - order_by: { price: desc }, + # orders the response items by highest price + order_by: { price: desc }, - # only items with an id >= 30 and < 30 are returned - where: { id: { and: { greater_or_equals: 20, lt: 28 } } }) { - id - name - price - user { - full_name - picture : avatar - } + # only items with an id >= 30 and < 30 are returned + where: { id: { and: { greater_or_equals: 20, lt: 28 } } }) { + id + name + price + user { + full_name + picture : avatar } - }` + } +}` - w := &bytes.Buffer() +func BenchmarkCompile(b *testing.B) { + w := &bytes.Buffer{} b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - qc, err := qcompile.CompileQuery(gql) + w.Reset() + + qc, err := qcompile.CompileQuery(benchGQL) if err != nil { b.Fatal(err) } - _, sqlStmt, err := pcompile.Compile(qc, w) + _, err = pcompile.Compile(qc, w) if err != nil { b.Fatal(err) } - w.Reset() } } + +func BenchmarkCompileParallel(b *testing.B) { + b.ReportAllocs() + + b.RunParallel(func(pb *testing.PB) { + w := &bytes.Buffer{} + + for pb.Next() { + w.Reset() + + qc, err := qcompile.CompileQuery(benchGQL) + if err != nil { + b.Fatal(err) + } + + _, err = pcompile.Compile(qc, w) + if err != nil { + b.Fatal(err) + } + } + }) +} diff --git a/qcode/parse.go b/qcode/parse.go index 0f96345..e046325 100644 --- a/qcode/parse.go +++ b/qcode/parse.go @@ -48,14 +48,14 @@ func (o *Operation) Reset() { } type Field struct { - ID uint16 + ID int16 Name string Alias string Args []Arg argsA [10]Arg - ParentID uint16 - Children []uint16 - childrenA [10]uint16 + ParentID int16 + Children []int16 + childrenA [10]int16 } type Arg struct { @@ -277,7 +277,7 @@ func (p *Parser) parseFields(fields []Field) ([]Field, error) { return nil, errors.New("expecting an alias or field name") } - fields = append(fields, Field{ID: uint16(len(fields))}) + fields = append(fields, Field{ID: int16(len(fields))}) f := &fields[(len(fields) - 1)] f.Args = f.argsA[:0] f.Children = f.childrenA[:0] @@ -288,7 +288,7 @@ func (p *Parser) parseFields(fields []Field) ([]Field, error) { if f.ID != 0 { intf := st.Peek() - pid, ok := intf.(uint16) + pid, ok := intf.(int16) if !ok { return nil, fmt.Errorf("14: unexpected value %v (%t)", intf, intf) diff --git a/qcode/qcode.go b/qcode/qcode.go index 71b6c5f..ceadf69 100644 --- a/qcode/qcode.go +++ b/qcode/qcode.go @@ -29,8 +29,8 @@ type Column struct { } type Select struct { - ID uint16 - ParentID uint16 + ID int16 + ParentID int16 RelID uint64 Args map[string]*Node AsList bool @@ -42,7 +42,7 @@ type Select struct { OrderBy []*OrderBy DistinctOn []string Paging Paging - Children []uint16 + Children []int16 } type Exp struct { @@ -197,7 +197,8 @@ func (com *Compiler) CompileQuery(query string) (*QCode, error) { } func (com *Compiler) compileQuery(op *Operation) (*Query, error) { - var id, parentID uint16 + id := int16(0) + parentID := int16(-1) selects := make([]Select, 0, 5) st := util.NewStack() @@ -218,7 +219,7 @@ func (com *Compiler) compileQuery(op *Operation) (*Query, error) { } intf := st.Pop() - fid, ok := intf.(uint16) + fid, ok := intf.(int16) if !ok { return nil, fmt.Errorf("15: unexpected value %v (%t)", intf, intf) @@ -235,7 +236,7 @@ func (com *Compiler) compileQuery(op *Operation) (*Query, error) { ID: id, ParentID: parentID, Table: tn, - Children: make([]uint16, 0, 5), + Children: make([]int16, 0, 5), } if s.ID != 0 { diff --git a/serv/core.go b/serv/core.go index 802b5b3..381bddb 100644 --- a/serv/core.go +++ b/serv/core.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "os" + "sync" "time" "github.com/cespare/xxhash/v2" @@ -74,10 +75,6 @@ func (c *coreContext) handleReq(w io.Writer, req *http.Request) error { return errors.New("something wrong no remote ids found in db response") } - if err != nil { - return err - } - var ob bytes.Buffer err = jsn.Replace(&ob, data, from, to) @@ -131,7 +128,7 @@ func (c *coreContext) resolveRemote( } if conf.EnableTracing { - c.addTrace(s, st) + c.addTrace(sel, s.ID, st) } if len(r.Path) != 0 { @@ -163,9 +160,14 @@ func (c *coreContext) resolveRemotes( // replacement data for the marked insertion points // key and value will be replaced by whats below - to := make([]jsn.Field, 0, len(from)) + to := make([]jsn.Field, len(from)) - for _, id := range from { + var wg sync.WaitGroup + wg.Add(len(from)) + + var cerr error + + for i, id := range from { // use the json key to find the related Select object k1 := xxhash.Sum64(id.Key) @@ -190,39 +192,48 @@ func (c *coreContext) resolveRemotes( return nil, nil } - st := time.Now() + go func(n int) { + defer wg.Done() - b, err := r.Fn(req, id) - if err != nil { - return nil, err - } + st := time.Now() - if conf.EnableTracing { - c.addTrace(s, st) - } - - if len(r.Path) != 0 { - b = jsn.Strip(b, r.Path) - } - - var ob bytes.Buffer - - if len(s.Cols) != 0 { - err = jsn.Filter(&ob, b, colsToList(s.Cols)) + b, err := r.Fn(req, id) if err != nil { - return nil, err + cerr = err + return } - } else { - ob.WriteString("null") - } + if conf.EnableTracing { + c.addTrace(sel, s.ID, st) + } - to = append(to, jsn.Field{[]byte(s.FieldName), ob.Bytes()}) + if len(r.Path) != 0 { + b = jsn.Strip(b, r.Path) + } + + var ob bytes.Buffer + + if len(s.Cols) != 0 { + err = jsn.Filter(&ob, b, colsToList(s.Cols)) + if err != nil { + cerr = err + return + } + + } else { + ob.WriteString("null") + } + + to[n] = jsn.Field{[]byte(s.FieldName), ob.Bytes()} + }(i) } - return to, nil + wg.Wait() + + return to, cerr } -func (c *coreContext) resolveSQL(qc *qcode.QCode, vars variables) ([]byte, uint32, error) { +func (c *coreContext) resolveSQL(qc *qcode.QCode, vars variables) ( + []byte, uint32, error) { // var entry []byte // var key string @@ -291,7 +302,10 @@ func (c *coreContext) resolveSQL(qc *qcode.QCode, vars variables) ([]byte, uint3 } if conf.EnableTracing && len(qc.Query.Selects) != 0 { - c.addTrace(&qc.Query.Selects[0], st) + c.addTrace( + qc.Query.Selects, + qc.Query.Selects[0].ID, + st) } return []byte(root), skipped, nil @@ -302,7 +316,7 @@ func (c *coreContext) render(w io.Writer, data []byte) error { return json.NewEncoder(w).Encode(c.res) } -func (c *coreContext) addTrace(sel *qcode.Select, st time.Time) { +func (c *coreContext) addTrace(sel []qcode.Select, id int16, st time.Time) { et := time.Now() du := et.Sub(st) @@ -317,10 +331,22 @@ func (c *coreContext) addTrace(sel *qcode.Select, st time.Time) { c.res.Extensions.Tracing.EndTime = et c.res.Extensions.Tracing.Duration = du + n := 0 + for i := id; i != -1; i = sel[i].ParentID { + n++ + } + + path := make([]string, n) + n-- + for i := id; i != -1; i = sel[i].ParentID { + path[n] = sel[i].Table + n-- + } + tr := resolver{ - Path: []string{sel.Table}, + Path: path, ParentType: "Query", - FieldName: sel.Table, + FieldName: sel[id].Table, ReturnType: "object", StartOffset: 1, Duration: du, @@ -337,7 +363,7 @@ func parentFieldIds(h *xxhash.Digest, sel []qcode.Select, skipped uint32) ( c := 0 for i := range sel { s := &sel[i] - if isSkipped(skipped, s.ID) { + if isSkipped(skipped, uint16(s.ID)) { c++ } } @@ -354,7 +380,7 @@ func parentFieldIds(h *xxhash.Digest, sel []qcode.Select, skipped uint32) ( for i := range sel { s := &sel[i] - if isSkipped(skipped, s.ID) == false { + if isSkipped(skipped, uint16(s.ID)) == false { continue }