Frontend/backend project structure

+ Base implementation of a differential synchronization based on Neil
  Fraser article/talk

See https://www.youtube.com/watch?v=S2Hp_1jqpY8
This commit is contained in:
2020-04-27 22:43:42 +02:00
parent 40759f59d6
commit d9fb51394c
89 changed files with 2178 additions and 14 deletions

View File

@ -0,0 +1,77 @@
package config
import (
"io"
"io/ioutil"
"github.com/caarlos0/env/v6"
"github.com/pkg/errors"
"gopkg.in/yaml.v2"
)
type Config struct {
HTTP HTTPConfig `yaml:"http"`
Data DataConfig `ymal:"data"`
}
type HTTPConfig struct {
Address string `yaml:"address" env:"GUESSTIMATE_HTTP_ADDRESS"`
}
type DataConfig struct {
Path string `yaml:"path" env:"GUESSTIMATE_DATA_PATH"`
}
// NewFromFile retrieves the configuration from the given file
func NewFromFile(filepath string) (*Config, error) {
config := NewDefault()
data, err := ioutil.ReadFile(filepath)
if err != nil {
return nil, errors.Wrapf(err, "could not read file '%s'", filepath)
}
if err := yaml.Unmarshal(data, config); err != nil {
return nil, errors.Wrapf(err, "could not unmarshal configuration")
}
return config, nil
}
func WithEnvironment(conf *Config) error {
if err := env.Parse(conf); err != nil {
return err
}
return nil
}
func NewDumpDefault() *Config {
config := NewDefault()
return config
}
func NewDefault() *Config {
return &Config{
HTTP: HTTPConfig{
Address: ":8081",
},
Data: DataConfig{
Path: "guesstimate.db",
},
}
}
func Dump(config *Config, w io.Writer) error {
data, err := yaml.Marshal(config)
if err != nil {
return errors.Wrap(err, "could not dump config")
}
if _, err := w.Write(data); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,9 @@
package config
import "gitlab.com/wpetit/goweb/service"
func ServiceProvider(config *Config) service.Provider {
return func(ctn *service.Container) (interface{}, error) {
return config, nil
}
}

View File

@ -0,0 +1,33 @@
package config
import (
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/service"
)
const ServiceName service.Name = "config"
// From retrieves the config service in the given container
func From(container *service.Container) (*Config, error) {
service, err := container.Service(ServiceName)
if err != nil {
return nil, errors.Wrapf(err, "error while retrieving '%s' service", ServiceName)
}
srv, ok := service.(*Config)
if !ok {
return nil, errors.Errorf("retrieved service is not a valid '%s' service", ServiceName)
}
return srv, nil
}
// Must retrieves the config service in the given container or panic otherwise
func Must(container *service.Container) *Config {
srv, err := From(container)
if err != nil {
panic(err)
}
return srv
}

View File

@ -0,0 +1,21 @@
package diffsync
type Backup struct {
localVersion Version
content []byte
}
func (b *Backup) LocalVersion() Version {
return b.localVersion
}
func (b *Backup) Content() []byte {
return b.content
}
func NewBackup(content []byte) *Backup {
return &Backup{
localVersion: 0,
content: content,
}
}

View File

@ -0,0 +1,31 @@
package diffsync
type EditsStack []*Edits
type Ops interface{}
type Edits struct {
ops Ops
localVersion Version
remoteVersion Version
}
func (e *Edits) Ops() Ops {
return e.ops
}
func (e *Edits) LocalVersion() Version {
return e.localVersion
}
func (e *Edits) RemoteVersion() Version {
return e.remoteVersion
}
type Differ interface {
Diff(new, old []byte) (Ops, error)
}
type Patcher interface {
Patch(content []byte, ops Ops) ([]byte, error)
}

View File

@ -0,0 +1,56 @@
package diffsync
import "sync"
type Version uint64
type void struct{}
type Document struct {
content []byte
patcher Patcher
differ Differ
peers map[*Peer]void
peersMutex sync.RWMutex
}
func (d *Document) NewPeer() *Peer {
d.peersMutex.Lock()
defer d.peersMutex.Unlock()
p := &Peer{
document: d,
shadow: NewShadow(Copy(d.content)),
editsStack: make(EditsStack, 0),
}
d.peers[p] = void{}
return p
}
func (d *Document) RemovePeer(p *Peer) {
d.peersMutex.Lock()
defer d.peersMutex.Unlock()
delete(d.peers, p)
p.document = nil
}
func (d *Document) Content() []byte {
return d.content
}
func NewDocument(content []byte, funcs ...OptionFunc) *Document {
opt := MergeOption(
DefaultOption(),
funcs...,
)
return &Document{
content: content,
patcher: opt.Patcher,
differ: opt.Differ,
peers: make(map[*Peer]void),
}
}

View File

@ -0,0 +1,9 @@
package diffsync
import "errors"
var (
ErrUnexpectedOpsFormat = errors.New("unexpected ops format")
ErrUnexpectedRemoteVersion = errors.New("unexpected remote version")
ErrInvalidState = errors.New("invalid state")
)

View File

@ -0,0 +1,42 @@
package json
import (
"forge.cadoles.com/wpetit/guesstimate/internal/diffsync"
"github.com/pkg/errors"
jsonpatch "gopkg.in/evanphx/json-patch.v4"
)
type Patcher struct{}
func (p *Patcher) Patch(content []byte, ops diffsync.Ops) ([]byte, error) {
patch, ok := ops.([]byte)
if !ok {
return nil, diffsync.ErrUnexpectedOpsFormat
}
modified, err := jsonpatch.MergePatch(content, patch)
if err != nil {
return nil, errors.Wrap(err, "could not apply patch")
}
return modified, nil
}
func NewPatcher() *Patcher {
return &Patcher{}
}
type Differ struct{}
func (d *Differ) Diff(new, old []byte) (diffsync.Ops, error) {
ops, err := jsonpatch.CreateMergePatch(old, new)
if err != nil {
return nil, errors.Wrap(err, "could not compute diff")
}
return ops, nil
}
func NewDiffer() *Differ {
return &Differ{}
}

View File

@ -0,0 +1,169 @@
package json
import (
"encoding/json"
"fmt"
"log"
"reflect"
"testing"
"forge.cadoles.com/wpetit/guesstimate/internal/diffsync"
"github.com/davecgh/go-spew/spew"
)
func TestJSONSync(t *testing.T) {
doc1 := diffsync.NewDocument(
[]byte("{}"),
WithJSONSync(),
)
doc2 := diffsync.NewDocument(
[]byte("{}"),
WithJSONSync(),
)
p1 := doc1.NewPeer()
p2 := doc2.NewPeer()
log.Printf("p1 shadow: %s", p1.Shadow().Content())
log.Printf("p2 shadow: %s", p2.Shadow().Content())
newContent1 := []byte(`{"hello":"world"}`)
log.Printf("updating doc1 with '%s'", newContent1)
stack1, err := p1.Update(newContent1)
if err != nil {
t.Error(err)
}
log.Printf("applying stack1 to doc2 '%s'", doc2.Content())
if err := p2.Apply(stack1); err != nil {
t.Error(err)
}
log.Printf("new doc2 content: '%s'", doc2.Content())
if g, e := doc2.Content(), doc1.Content(); !jsonEqual(e, g) {
t.Errorf("doc2.Content(): expected '%s', got '%s'", e, g)
}
}
type testStep struct {
Local *diffsync.Peer
Remote *diffsync.Peer
Update string
MatchLocal bool
MatchRemote bool
}
func TestBidirectionnalUpdate(t *testing.T) {
doc1 := diffsync.NewDocument([]byte(`{}`), WithJSONSync())
doc2 := diffsync.NewDocument([]byte(`{}`), WithJSONSync())
p1 := doc1.NewPeer()
p2 := doc2.NewPeer()
var cases = []testStep{
{
Local: p1,
Remote: p2,
Update: `{"hello":"world"}`,
MatchLocal: true,
MatchRemote: true,
},
{
Local: p2,
Remote: p1,
Update: `{"hello":"world","foo":"bar"}`,
MatchLocal: true,
MatchRemote: true,
},
{
Local: p1,
Remote: p2,
Update: `{"hello":1,"foo":"bar"}`,
MatchLocal: true,
MatchRemote: true,
},
{
Local: p1,
Remote: nil,
Update: `{"hello":1,"foo":"bar", "test":{"bar": "baz"}}`,
MatchLocal: true,
MatchRemote: false,
},
{
Local: p2,
Remote: p1,
Update: `{"hello":2,"foo":"bar", "test":{"bar": "baz","world":"hello"}}`,
MatchLocal: true,
MatchRemote: true,
},
}
for i, step := range cases {
func(step testStep, i int) {
t.Run(fmt.Sprintf("Step %d", i), func(t *testing.T) {
log.Printf("local document before update: '%s'", step.Local.Document().Content())
stack, err := step.Local.Update([]byte(step.Update))
if err != nil {
t.Error(err)
}
log.Printf("local document after update: '%s'", step.Local.Document().Content())
log.Printf("resulting stack: '%s'", spew.Sdump(stack))
if step.MatchLocal {
if e, g := step.Local.Document().Content(), []byte(step.Update); !jsonEqual(e, g) {
t.Errorf("local.Document().Content(): expected '%s', got '%s'", e, g)
}
}
if step.Remote != nil {
log.Printf("remote document before apply: '%s'", step.Remote.Document().Content())
if err := step.Remote.Apply(stack); err != nil {
t.Error(err)
}
log.Printf("remote document after apply: '%s'", step.Remote.Document().Content())
}
if step.MatchRemote {
if e, g := step.Remote.Document().Content(), []byte(step.Update); !jsonEqual(e, g) {
t.Errorf("remote.Document().Content(): expected '%s', got '%s'", e, g)
}
}
if step.MatchLocal && step.MatchRemote {
if e, g := step.Local.Document().Content(), step.Remote.Document().Content(); !jsonEqual(e, g) {
t.Errorf("local.Document().Content() should match remote.Document().Content() ! Got '%s' and '%s'", e, g)
}
}
})
}(step, i)
}
}
func jsonEqual(s1, s2 []byte) bool {
var (
o1 interface{}
o2 interface{}
)
var err error
err = json.Unmarshal(s1, &o1)
if err != nil {
panic(fmt.Errorf("Error mashalling []byte 1 :: %s", err.Error()))
}
err = json.Unmarshal(s2, &o2)
if err != nil {
panic(fmt.Errorf("Error mashalling []byte 2 :: %s", err.Error()))
}
return reflect.DeepEqual(o1, o2)
}

View File

@ -0,0 +1,10 @@
package json
import "forge.cadoles.com/wpetit/guesstimate/internal/diffsync"
func WithJSONSync() diffsync.OptionFunc {
return func(opt *diffsync.Option) {
opt.Differ = NewDiffer()
opt.Patcher = NewPatcher()
}
}

View File

@ -0,0 +1,34 @@
package diffsync
type Option struct {
Patcher Patcher
Differ Differ
}
func WithPatcher(p Patcher) OptionFunc {
return func(opt *Option) {
opt.Patcher = p
}
}
func WithDiffer(d Differ) OptionFunc {
return func(opt *Option) {
opt.Differ = d
}
}
type OptionFunc func(*Option)
func DefaultOption() *Option {
return MergeOption(
&Option{},
)
}
func MergeOption(opt *Option, funcs ...OptionFunc) *Option {
for _, fn := range funcs {
fn(opt)
}
return opt
}

View File

@ -0,0 +1,151 @@
package diffsync
import (
"log"
"sync"
"github.com/pkg/errors"
)
type Peer struct {
document *Document
shadow *Shadow
mutex sync.RWMutex
editsStack EditsStack
}
func (p *Peer) Document() *Document {
return p.document
}
func (p *Peer) Shadow() *Shadow {
return p.shadow
}
func (p *Peer) Update(content []byte) (EditsStack, error) {
p.mutex.Lock()
defer p.mutex.Unlock()
ops, err := p.document.differ.Diff(content, p.shadow.content)
if err != nil {
return nil, err
}
p.editsStack = append(p.editsStack, &Edits{
ops: ops,
localVersion: p.shadow.localVersion,
remoteVersion: p.shadow.remoteVersion,
})
p.shadow.localVersion++
p.shadow.content = content
p.document.content = content
newStack := make(EditsStack, len(p.editsStack))
copy(newStack, p.editsStack)
return newStack, nil
}
func (p *Peer) Apply(stack EditsStack) error {
p.mutex.Lock()
defer p.mutex.Unlock()
newStack := make(EditsStack, len(p.editsStack))
copy(newStack, p.editsStack)
for _, edits := range stack {
rollbacked, err := p.applyEdits(edits)
if err != nil {
return errors.Wrap(err, "could not apply edits")
}
if rollbacked {
newStack = EditsStack{}
}
// Iterate over local edits
for i, localEdits := range newStack {
if localEdits.LocalVersion() != edits.RemoteVersion() {
continue
}
log.Printf("removing edits %d", localEdits.LocalVersion())
// Remove local edit
copy(newStack[i:], newStack[i+1:])
newStack[len(newStack)-1] = nil
newStack = newStack[:len(newStack)-1]
}
}
p.editsStack = newStack
return nil
}
func (p *Peer) applyEdits(edits *Edits) (bool, error) {
rollbacked := false
shadow := p.Shadow()
var (
err error
newDocumentContent []byte
newShadowRemoteVersion Version
)
newShadowContent := shadow.content
newShadowLocalVersion := shadow.localVersion
log.Printf(
"shadow(l: %d, r: %d) edits(l: %d, r: %d)",
shadow.LocalVersion(), shadow.RemoteVersion(),
edits.LocalVersion(), edits.RemoteVersion(),
)
// Desync occurred, rollback to backup if possible
if shadow.LocalVersion() > edits.RemoteVersion() {
if shadow.Backup().LocalVersion() != edits.RemoteVersion() {
return false, ErrUnexpectedRemoteVersion
}
newShadowContent = shadow.backup.Content()
newShadowLocalVersion = shadow.backup.LocalVersion()
rollbacked = true
}
if edits.LocalVersion() < shadow.RemoteVersion() {
return rollbacked, nil
}
if edits.RemoteVersion() < newShadowLocalVersion {
return rollbacked, ErrInvalidState
}
newShadowContent, err = p.document.patcher.Patch(newShadowContent, edits.Ops())
if err != nil {
return false, errors.Wrap(err, "could not patch shadow content")
}
newDocumentContent, err = p.document.patcher.Patch(p.document.content, edits.Ops())
if err != nil {
return rollbacked, errors.Wrap(err, "could not patch document content")
}
newShadowRemoteVersion++
// Update shadow
shadow.content = newShadowContent
shadow.remoteVersion = newShadowRemoteVersion
shadow.localVersion = newShadowLocalVersion
// Update document
p.document.content = newDocumentContent
// Update backup
shadow.backup.content = newShadowContent
shadow.backup.localVersion = shadow.localVersion
return rollbacked, nil
}

View File

@ -0,0 +1,40 @@
package diffsync
type Shadow struct {
localVersion Version
remoteVersion Version
content []byte
backup *Backup
}
func NewShadow(content []byte) *Shadow {
return &Shadow{
localVersion: 0,
remoteVersion: 0,
content: content,
backup: NewBackup(Copy(content)),
}
}
func (s *Shadow) LocalVersion() Version {
return s.localVersion
}
func (s *Shadow) RemoteVersion() Version {
return s.remoteVersion
}
func (s *Shadow) Content() []byte {
return s.content
}
func (s *Shadow) Backup() *Backup {
return s.backup
}
func Copy(content []byte) []byte {
contentCopy := make([]byte, len(content))
copy(contentCopy, content)
return contentCopy
}

View File

@ -0,0 +1,14 @@
package diffsync
type Versioned struct {
localVersion Version
remoteVersion Version
}
func (v *Versioned) LocalVersion() Version {
return v.localVersion
}
func (v *Versioned) RemoteVersion() Version {
return v.remoteVersion
}

View File

@ -0,0 +1,12 @@
package route
import (
"forge.cadoles.com/wpetit/guesstimate/internal/config"
"github.com/go-chi/chi"
)
func Mount(r *chi.Mux, config *config.Config) error {
r.Get("/ws/{projectId}", handleProjectWebsocket)
return nil
}

View File

@ -0,0 +1,109 @@
package route
import (
"log"
"net/http"
"github.com/davecgh/go-spew/spew"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
var upgrader = websocket.Upgrader{} // nolint: gochecknoglobals
func handleProjectWebsocket(w http.ResponseWriter, r *http.Request) {
log.Println("websocket request")
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
panic(errors.Wrap(err, "could not upgrade connection"))
}
defer c.Close()
// ctn := container.Must(r.Context())
// ctx := r.Context()
// Loop over incoming websocket messages
for {
_, data, err := c.ReadMessage()
if err != nil {
cause := errors.Cause(err)
if websocket.IsCloseError(cause, 1001, 1005) { // Ignore "going away" and "no status" close errors
return
}
logger.Error(
r.Context(),
"could not read message",
logger.E(err),
)
break
}
spew.Dump(data)
// message := &game.Message{}
// if err := json.Unmarshal(data, message); err != nil {
// logger.Error(
// r.Context(),
// "could not decode message",
// logger.E(err),
// )
// break
// }
// switch {
// case message.Type == game.MessageTypeInit:
// payload := &game.InitPayload{}
// if err := json.Unmarshal(message.Payload, payload); err != nil {
// logger.Error(
// r.Context(),
// "could not decode payload",
// logger.E(err),
// )
// break
// }
// setGameID(payload.GameID)
// evt := game.NewEventPlayerConnected(payload.GameID, user.ID)
// bus.Publish(game.EventNamespace, evt)
// case message.Type == game.MessageTypeGameEvent:
// gameID, ok := getGameID()
// if !ok {
// logger.Error(
// r.Context(),
// "game id not received yet",
// )
// break
// }
// payload := &game.EventPayload{}
// if err := json.Unmarshal(message.Payload, payload); err != nil {
// logger.Error(
// r.Context(),
// "could not decode payload",
// logger.E(err),
// )
// break
// }
// evt := game.NewEventPlayerMessage(gameID, user.ID, payload.Data)
// bus.Publish(game.EventNamespace, evt)
// default:
// logger.Error(
// r.Context(),
// "unsupported message type",
// logger.F("messageType", message.Type),
// )
// }
}
}

View File

@ -0,0 +1,9 @@
package storm
import (
"github.com/asdine/storm/v3"
)
var (
ErrNotFound = storm.ErrNotFound
)

View File

@ -0,0 +1,51 @@
package storm
type Option struct {
Path string
Objects []interface{}
ReIndex bool
Init bool
}
type OptionFunc func(*Option)
func DefaultOption() *Option {
return MergeOption(
&Option{},
WithPath("data.db"),
WithInit(true),
WithReIndex(true),
)
}
func MergeOption(opt *Option, funcs ...OptionFunc) *Option {
for _, fn := range funcs {
fn(opt)
}
return opt
}
func WithPath(path string) OptionFunc {
return func(opt *Option) {
opt.Path = path
}
}
func WithReIndex(reindex bool) OptionFunc {
return func(opt *Option) {
opt.ReIndex = reindex
}
}
func WithInit(init bool) OptionFunc {
return func(opt *Option) {
opt.Init = init
}
}
func WithObjects(objects ...interface{}) OptionFunc {
return func(opt *Option) {
opt.Objects = objects
}
}

View File

@ -0,0 +1,48 @@
package storm
import (
"reflect"
"github.com/asdine/storm/v3"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/service"
)
func ServiceProvider(funcs ...OptionFunc) service.Provider {
opt := MergeOption(
DefaultOption(),
funcs...,
)
db, err := storm.Open(opt.Path)
if err == nil && opt.Objects != nil {
err = migrate(db, opt.Objects, opt.Init, opt.ReIndex)
}
return func(ctn *service.Container) (interface{}, error) {
if err != nil {
return nil, err
}
return db, nil
}
}
func migrate(db *storm.DB, objects []interface{}, init, reindex bool) error {
for _, o := range objects {
if init {
if err := db.Init(o); err != nil {
return errors.Wrapf(err, "could not init object '%s'", reflect.TypeOf(o).String())
}
}
if reindex {
if err := db.ReIndex(o); err != nil {
return errors.Wrapf(err, "could not reindex object '%s'", reflect.TypeOf(o).String())
}
}
}
return nil
}

View File

@ -0,0 +1,34 @@
package storm
import (
"github.com/asdine/storm/v3"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/service"
)
const ServiceName service.Name = "storm"
// From retrieves the storm service in the given container
func From(container *service.Container) (*storm.DB, error) {
service, err := container.Service(ServiceName)
if err != nil {
return nil, errors.Wrapf(err, "error while retrieving '%s' service", ServiceName)
}
srv, ok := service.(*storm.DB)
if !ok {
return nil, errors.Errorf("retrieved service is not a valid '%s' service", ServiceName)
}
return srv, nil
}
// Must retrieves the storm service in the given container or panic otherwise
func Must(container *service.Container) *storm.DB {
srv, err := From(container)
if err != nil {
panic(err)
}
return srv
}