Make remote joins use parallel http requests

This commit is contained in:
Vikram Rangnekar
2019-06-02 01:38:51 -04:00
parent f9fc5dd7de
commit 8b06473e58
10 changed files with 143 additions and 106 deletions

View File

@ -8,6 +8,7 @@ import (
"io"
"net/http"
"os"
"sync"
"time"
"github.com/cespare/xxhash/v2"
@ -74,10 +75,6 @@ func (c *coreContext) handleReq(w io.Writer, req *http.Request) error {
return errors.New("something wrong no remote ids found in db response")
}
if err != nil {
return err
}
var ob bytes.Buffer
err = jsn.Replace(&ob, data, from, to)
@ -131,7 +128,7 @@ func (c *coreContext) resolveRemote(
}
if conf.EnableTracing {
c.addTrace(s, st)
c.addTrace(sel, s.ID, st)
}
if len(r.Path) != 0 {
@ -163,9 +160,14 @@ func (c *coreContext) resolveRemotes(
// replacement data for the marked insertion points
// key and value will be replaced by whats below
to := make([]jsn.Field, 0, len(from))
to := make([]jsn.Field, len(from))
for _, id := range from {
var wg sync.WaitGroup
wg.Add(len(from))
var cerr error
for i, id := range from {
// use the json key to find the related Select object
k1 := xxhash.Sum64(id.Key)
@ -190,39 +192,48 @@ func (c *coreContext) resolveRemotes(
return nil, nil
}
st := time.Now()
go func(n int) {
defer wg.Done()
b, err := r.Fn(req, id)
if err != nil {
return nil, err
}
st := time.Now()
if conf.EnableTracing {
c.addTrace(s, st)
}
if len(r.Path) != 0 {
b = jsn.Strip(b, r.Path)
}
var ob bytes.Buffer
if len(s.Cols) != 0 {
err = jsn.Filter(&ob, b, colsToList(s.Cols))
b, err := r.Fn(req, id)
if err != nil {
return nil, err
cerr = err
return
}
} else {
ob.WriteString("null")
}
if conf.EnableTracing {
c.addTrace(sel, s.ID, st)
}
to = append(to, jsn.Field{[]byte(s.FieldName), ob.Bytes()})
if len(r.Path) != 0 {
b = jsn.Strip(b, r.Path)
}
var ob bytes.Buffer
if len(s.Cols) != 0 {
err = jsn.Filter(&ob, b, colsToList(s.Cols))
if err != nil {
cerr = err
return
}
} else {
ob.WriteString("null")
}
to[n] = jsn.Field{[]byte(s.FieldName), ob.Bytes()}
}(i)
}
return to, nil
wg.Wait()
return to, cerr
}
func (c *coreContext) resolveSQL(qc *qcode.QCode, vars variables) ([]byte, uint32, error) {
func (c *coreContext) resolveSQL(qc *qcode.QCode, vars variables) (
[]byte, uint32, error) {
// var entry []byte
// var key string
@ -291,7 +302,10 @@ func (c *coreContext) resolveSQL(qc *qcode.QCode, vars variables) ([]byte, uint3
}
if conf.EnableTracing && len(qc.Query.Selects) != 0 {
c.addTrace(&qc.Query.Selects[0], st)
c.addTrace(
qc.Query.Selects,
qc.Query.Selects[0].ID,
st)
}
return []byte(root), skipped, nil
@ -302,7 +316,7 @@ func (c *coreContext) render(w io.Writer, data []byte) error {
return json.NewEncoder(w).Encode(c.res)
}
func (c *coreContext) addTrace(sel *qcode.Select, st time.Time) {
func (c *coreContext) addTrace(sel []qcode.Select, id int16, st time.Time) {
et := time.Now()
du := et.Sub(st)
@ -317,10 +331,22 @@ func (c *coreContext) addTrace(sel *qcode.Select, st time.Time) {
c.res.Extensions.Tracing.EndTime = et
c.res.Extensions.Tracing.Duration = du
n := 0
for i := id; i != -1; i = sel[i].ParentID {
n++
}
path := make([]string, n)
n--
for i := id; i != -1; i = sel[i].ParentID {
path[n] = sel[i].Table
n--
}
tr := resolver{
Path: []string{sel.Table},
Path: path,
ParentType: "Query",
FieldName: sel.Table,
FieldName: sel[id].Table,
ReturnType: "object",
StartOffset: 1,
Duration: du,
@ -337,7 +363,7 @@ func parentFieldIds(h *xxhash.Digest, sel []qcode.Select, skipped uint32) (
c := 0
for i := range sel {
s := &sel[i]
if isSkipped(skipped, s.ID) {
if isSkipped(skipped, uint16(s.ID)) {
c++
}
}
@ -354,7 +380,7 @@ func parentFieldIds(h *xxhash.Digest, sel []qcode.Select, skipped uint32) (
for i := range sel {
s := &sel[i]
if isSkipped(skipped, s.ID) == false {
if isSkipped(skipped, uint16(s.ID)) == false {
continue
}