Added support for multi-root queries

This commit is contained in:
Vikram Rangnekar
2019-11-19 00:47:55 -05:00
parent 396f4bcfd8
commit 176514b5f1
12 changed files with 314 additions and 160 deletions

View File

@ -52,8 +52,6 @@ func (c *coreContext) execQuery() ([]byte, error) {
var qc *qcode.QCode
var data []byte
logger.Debug().Str("role", c.req.role).Msg(c.req.Query)
if conf.Production {
var ps *preparedItem
@ -156,12 +154,17 @@ func (c *coreContext) resolvePreparedSQL() ([]byte, *preparedItem, error) {
if mutation {
err = tx.QueryRow(c, ps.stmt.SQL, vars...).Scan(&root)
} else {
err = tx.QueryRow(c, ps.stmt.SQL, vars...).Scan(&c.req.role, &root)
err = tx.QueryRow(c, ps.stmt.SQL, vars...).Scan(&role, &root)
}
logger.Debug().Str("default_role", c.req.role).Str("role", role).Msg(c.req.Query)
if err != nil {
return nil, nil, err
}
c.req.role = role
if err := tx.Commit(c); err != nil {
return nil, nil, err
}
@ -229,12 +232,23 @@ func (c *coreContext) resolveSQL() ([]byte, uint32, error) {
}
var root []byte
var role string
log := logger.Debug()
if mutation {
err = tx.QueryRow(c, finalSQL).Scan(&root)
log = log.Str("role", role)
} else {
err = tx.QueryRow(c, finalSQL).Scan(&c.req.role, &root)
err = tx.QueryRow(c, finalSQL).Scan(&role, &root)
log = log.Str("default_role", c.req.role).Str("role", role)
c.req.role = role
}
log.Msg(c.req.Query)
if err != nil {
return nil, 0, err
}
@ -250,10 +264,9 @@ func (c *coreContext) resolveSQL() ([]byte, uint32, error) {
}
if conf.EnableTracing && len(st.qc.Selects) != 0 {
c.addTrace(
st.qc.Selects,
st.qc.Selects[0].ID,
stime)
for _, id := range st.qc.Roots {
c.addTrace(st.qc.Selects, id, stime)
}
}
if conf.Production == false {
@ -451,7 +464,7 @@ func (c *coreContext) addTrace(sel []qcode.Select, id int32, st time.Time) {
c.res.Extensions.Tracing.Duration = du
n := 1
for i := id; i != 0; i = sel[i].ParentID {
for i := id; i != -1; i = sel[i].ParentID {
n++
}
path := make([]string, n)
@ -459,7 +472,7 @@ func (c *coreContext) addTrace(sel []qcode.Select, id int32, st time.Time) {
n--
for i := id; ; i = sel[i].ParentID {
path[n] = sel[i].Table
if sel[i].ID == 0 {
if sel[i].ParentID == -1 {
break
}
n--

View File

@ -55,8 +55,11 @@ func (c *coreContext) buildStmt() ([]stmt, error) {
}
if conf.Production && role.Name == "anon" {
if _, ok := role.tablesMap[qc.Selects[0].Table]; !ok {
continue
for _, id := range qc.Roots {
root := qc.Selects[id]
if _, ok := role.tablesMap[root.Table]; !ok {
continue
}
}
}