Basic storage backend with diff/patch synchronization

This commit is contained in:
2020-05-03 18:34:44 +02:00
parent 1ac485abf3
commit a9c24051b0
20 changed files with 734 additions and 398 deletions

View File

@ -6,7 +6,12 @@ import (
)
func Mount(r *chi.Mux, config *config.Config) error {
r.Get("/ws/{projectId}", handleProjectWebsocket)
r.Route("/api/v1", func(r chi.Router) {
r.Get("/projects/{projectID}", handleGetProject)
r.Post("/projects/{projectID}", handleCreateProject)
r.Patch("/projects/{projectID}", handlePatchProject)
r.Delete("/projects/{projectID}", handleDeleteProject)
})
return nil
}

View File

@ -1,109 +1,248 @@
package route
import (
"encoding/json"
"log"
"net/http"
"strconv"
"github.com/davecgh/go-spew/spew"
"github.com/gorilla/websocket"
jsonpatch "gopkg.in/evanphx/json-patch.v4"
"forge.cadoles.com/wpetit/guesstimate/internal/model"
"forge.cadoles.com/wpetit/guesstimate/internal/storm"
"github.com/go-chi/chi"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
"gitlab.com/wpetit/goweb/middleware/container"
)
var upgrader = websocket.Upgrader{} // nolint: gochecknoglobals
type projectResponse struct {
Version uint64 `json:"version"`
Project *model.Project `json:"project"`
}
func handleProjectWebsocket(w http.ResponseWriter, r *http.Request) {
log.Println("websocket request")
func handleGetProject(w http.ResponseWriter, r *http.Request) {
ctn := container.Must(r.Context())
db := storm.Must(ctn)
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
panic(errors.Wrap(err, "could not upgrade connection"))
}
defer c.Close()
projectID := getProjectID(r)
// ctn := container.Must(r.Context())
var (
version uint64
err error
)
// ctx := r.Context()
// Loop over incoming websocket messages
for {
_, data, err := c.ReadMessage()
rawVersion := r.URL.Query().Get("version")
if rawVersion != "" {
version, err = strconv.ParseUint(rawVersion, 10, 64)
if err != nil {
cause := errors.Cause(err)
if websocket.IsCloseError(cause, 1001, 1005) { // Ignore "going away" and "no status" close errors
return
}
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
logger.Error(
r.Context(),
"could not read message",
logger.E(err),
)
return
}
}
break
tx, err := db.Begin(false)
if err != nil {
panic(errors.Wrap(err, "could not start transaction"))
}
defer func() {
if err := tx.Rollback(); err != nil && err != storm.ErrNotInTransaction {
panic(errors.Wrap(err, "could not rollback transaction"))
}
}()
entry := &model.ProjectEntry{}
if err := tx.One("ID", projectID, entry); err != nil {
if err == storm.ErrNotFound {
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
return
}
spew.Dump(data)
panic(errors.Wrapf(err, "could not find project '%s'", projectID))
}
// message := &game.Message{}
// if err := json.Unmarshal(data, message); err != nil {
// logger.Error(
// r.Context(),
// "could not decode message",
// logger.E(err),
// )
if rawVersion != "" && entry.Version == version {
http.Error(w, http.StatusText(http.StatusNotModified), http.StatusNotModified)
// break
// }
return
}
// switch {
// case message.Type == game.MessageTypeInit:
// payload := &game.InitPayload{}
// if err := json.Unmarshal(message.Payload, payload); err != nil {
// logger.Error(
// r.Context(),
// "could not decode payload",
// logger.E(err),
// )
// break
// }
// setGameID(payload.GameID)
// evt := game.NewEventPlayerConnected(payload.GameID, user.ID)
// bus.Publish(game.EventNamespace, evt)
// case message.Type == game.MessageTypeGameEvent:
// gameID, ok := getGameID()
// if !ok {
// logger.Error(
// r.Context(),
// "game id not received yet",
// )
// break
// }
// payload := &game.EventPayload{}
// if err := json.Unmarshal(message.Payload, payload); err != nil {
// logger.Error(
// r.Context(),
// "could not decode payload",
// logger.E(err),
// )
// break
// }
// evt := game.NewEventPlayerMessage(gameID, user.ID, payload.Data)
// bus.Publish(game.EventNamespace, evt)
// default:
// logger.Error(
// r.Context(),
// "unsupported message type",
// logger.F("messageType", message.Type),
// )
// }
if err := writeJSON(w, http.StatusOK, &projectResponse{entry.Version, entry.Project}); err != nil {
panic(errors.Wrap(err, "could not write json"))
}
}
type createRequest struct {
Project *model.Project `json:"project"`
}
func handleCreateProject(w http.ResponseWriter, r *http.Request) {
ctn := container.Must(r.Context())
db := storm.Must(ctn)
projectID := getProjectID(r)
log.Printf("handling create request for project %s", projectID)
createReq := &createRequest{}
if err := parseJSONBody(r, createReq); err != nil {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
panic(errors.Wrap(err, "could not parse create request"))
}
tx, err := db.Begin(true)
if err != nil {
panic(errors.Wrap(err, "could not start transaction"))
}
defer func() {
if err := tx.Rollback(); err != nil && err != storm.ErrNotInTransaction {
panic(errors.Wrap(err, "could not rollback transaction"))
}
}()
entry := &model.ProjectEntry{}
err = tx.One("ID", projectID, entry)
if err == nil {
http.Error(w, http.StatusText(http.StatusConflict), http.StatusConflict)
return
}
if err != storm.ErrNotFound {
panic(errors.Wrapf(err, "could not check project '%s'", projectID))
}
entry.ID = projectID
entry.Project = createReq.Project
entry.Version = 0
if err := tx.Save(entry); err != nil {
panic(errors.Wrap(err, "could not save project"))
}
if err := tx.Commit(); err != nil {
panic(errors.Wrap(err, "could not commit transaction"))
}
if err := writeJSON(w, http.StatusCreated, &projectResponse{entry.Version, entry.Project}); err != nil {
panic(errors.Wrap(err, "could not write json response"))
}
}
type patchRequest struct {
Version uint64 `json:"version"`
Patch json.RawMessage `json:"patch"`
}
func handlePatchProject(w http.ResponseWriter, r *http.Request) {
ctn := container.Must(r.Context())
db := storm.Must(ctn)
projectID := getProjectID(r)
log.Printf("handling patch request for project %s", projectID)
patchReq := &patchRequest{}
if err := parseJSONBody(r, patchReq); err != nil {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
panic(errors.Wrap(err, "could not parse patch request"))
}
tx, err := db.Begin(true)
if err != nil {
panic(errors.Wrap(err, "could not start transaction"))
}
defer func() {
if err := tx.Rollback(); err != nil && err != storm.ErrNotInTransaction {
panic(errors.Wrap(err, "could not rollback transaction"))
}
}()
entry := &model.ProjectEntry{}
if err := tx.One("ID", projectID, entry); err != nil {
if err == storm.ErrNotFound {
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
return
}
panic(errors.Wrapf(err, "could not find project '%s'", projectID))
}
if entry.Version != patchReq.Version {
if err := writeJSON(w, http.StatusConflict, &projectResponse{entry.Version, entry.Project}); err != nil {
panic(errors.Wrap(err, "could not write json response"))
}
return
}
projectData, err := json.Marshal(entry.Project)
if err != nil {
panic(errors.Wrap(err, "could not marshal project"))
}
projectData, err = jsonpatch.MergePatch(projectData, patchReq.Patch)
if err != nil {
panic(errors.Wrap(err, "could not merge project patch"))
}
newProject := &model.Project{}
if err := json.Unmarshal(projectData, newProject); err != nil {
panic(errors.Wrap(err, "could not merge project patch"))
}
entry.Version++
entry.Project = newProject
if err := tx.Save(entry); err != nil {
panic(errors.Wrap(err, "could not save project"))
}
if err := tx.Commit(); err != nil {
panic(errors.Wrap(err, "could not commit transaction"))
}
if err := writeJSON(w, http.StatusOK, &projectResponse{entry.Version, entry.Project}); err != nil {
panic(errors.Wrap(err, "could not write json response"))
}
}
func handleDeleteProject(w http.ResponseWriter, r *http.Request) {
}
func getProjectID(r *http.Request) model.ProjectID {
return model.ProjectID(chi.URLParam(r, "projectID"))
}
func parseJSONBody(r *http.Request, payload interface{}) (err error) {
decoder := json.NewDecoder(r.Body)
defer func() {
if err = r.Body.Close(); err != nil {
err = errors.Wrap(err, "could not close request body")
}
}()
if err := decoder.Decode(payload); err != nil {
return errors.Wrap(err, "could not decode request body")
}
return nil
}
func writeJSON(w http.ResponseWriter, statusCode int, data interface{}) error {
encoder := json.NewEncoder(w)
encoder.SetIndent("", " ")
w.WriteHeader(statusCode)
w.Header().Set("Content-Type", "application/json")
return encoder.Encode(data)
}