Remove diffsync experiment
This commit is contained in:
parent
d9fb51394c
commit
1ac485abf3
|
@ -1,21 +0,0 @@
|
||||||
package diffsync
|
|
||||||
|
|
||||||
type Backup struct {
|
|
||||||
localVersion Version
|
|
||||||
content []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Backup) LocalVersion() Version {
|
|
||||||
return b.localVersion
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Backup) Content() []byte {
|
|
||||||
return b.content
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewBackup(content []byte) *Backup {
|
|
||||||
return &Backup{
|
|
||||||
localVersion: 0,
|
|
||||||
content: content,
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,31 +0,0 @@
|
||||||
package diffsync
|
|
||||||
|
|
||||||
type EditsStack []*Edits
|
|
||||||
|
|
||||||
type Ops interface{}
|
|
||||||
|
|
||||||
type Edits struct {
|
|
||||||
ops Ops
|
|
||||||
localVersion Version
|
|
||||||
remoteVersion Version
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *Edits) Ops() Ops {
|
|
||||||
return e.ops
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *Edits) LocalVersion() Version {
|
|
||||||
return e.localVersion
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *Edits) RemoteVersion() Version {
|
|
||||||
return e.remoteVersion
|
|
||||||
}
|
|
||||||
|
|
||||||
type Differ interface {
|
|
||||||
Diff(new, old []byte) (Ops, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type Patcher interface {
|
|
||||||
Patch(content []byte, ops Ops) ([]byte, error)
|
|
||||||
}
|
|
|
@ -1,56 +0,0 @@
|
||||||
package diffsync
|
|
||||||
|
|
||||||
import "sync"
|
|
||||||
|
|
||||||
type Version uint64
|
|
||||||
|
|
||||||
type void struct{}
|
|
||||||
|
|
||||||
type Document struct {
|
|
||||||
content []byte
|
|
||||||
patcher Patcher
|
|
||||||
differ Differ
|
|
||||||
peers map[*Peer]void
|
|
||||||
peersMutex sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Document) NewPeer() *Peer {
|
|
||||||
d.peersMutex.Lock()
|
|
||||||
defer d.peersMutex.Unlock()
|
|
||||||
|
|
||||||
p := &Peer{
|
|
||||||
document: d,
|
|
||||||
shadow: NewShadow(Copy(d.content)),
|
|
||||||
editsStack: make(EditsStack, 0),
|
|
||||||
}
|
|
||||||
|
|
||||||
d.peers[p] = void{}
|
|
||||||
|
|
||||||
return p
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Document) RemovePeer(p *Peer) {
|
|
||||||
d.peersMutex.Lock()
|
|
||||||
defer d.peersMutex.Unlock()
|
|
||||||
|
|
||||||
delete(d.peers, p)
|
|
||||||
p.document = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Document) Content() []byte {
|
|
||||||
return d.content
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewDocument(content []byte, funcs ...OptionFunc) *Document {
|
|
||||||
opt := MergeOption(
|
|
||||||
DefaultOption(),
|
|
||||||
funcs...,
|
|
||||||
)
|
|
||||||
|
|
||||||
return &Document{
|
|
||||||
content: content,
|
|
||||||
patcher: opt.Patcher,
|
|
||||||
differ: opt.Differ,
|
|
||||||
peers: make(map[*Peer]void),
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,9 +0,0 @@
|
||||||
package diffsync
|
|
||||||
|
|
||||||
import "errors"
|
|
||||||
|
|
||||||
var (
|
|
||||||
ErrUnexpectedOpsFormat = errors.New("unexpected ops format")
|
|
||||||
ErrUnexpectedRemoteVersion = errors.New("unexpected remote version")
|
|
||||||
ErrInvalidState = errors.New("invalid state")
|
|
||||||
)
|
|
|
@ -1,42 +0,0 @@
|
||||||
package json
|
|
||||||
|
|
||||||
import (
|
|
||||||
"forge.cadoles.com/wpetit/guesstimate/internal/diffsync"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
jsonpatch "gopkg.in/evanphx/json-patch.v4"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Patcher struct{}
|
|
||||||
|
|
||||||
func (p *Patcher) Patch(content []byte, ops diffsync.Ops) ([]byte, error) {
|
|
||||||
patch, ok := ops.([]byte)
|
|
||||||
if !ok {
|
|
||||||
return nil, diffsync.ErrUnexpectedOpsFormat
|
|
||||||
}
|
|
||||||
|
|
||||||
modified, err := jsonpatch.MergePatch(content, patch)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "could not apply patch")
|
|
||||||
}
|
|
||||||
|
|
||||||
return modified, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewPatcher() *Patcher {
|
|
||||||
return &Patcher{}
|
|
||||||
}
|
|
||||||
|
|
||||||
type Differ struct{}
|
|
||||||
|
|
||||||
func (d *Differ) Diff(new, old []byte) (diffsync.Ops, error) {
|
|
||||||
ops, err := jsonpatch.CreateMergePatch(old, new)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "could not compute diff")
|
|
||||||
}
|
|
||||||
|
|
||||||
return ops, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewDiffer() *Differ {
|
|
||||||
return &Differ{}
|
|
||||||
}
|
|
|
@ -1,169 +0,0 @@
|
||||||
package json
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"reflect"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"forge.cadoles.com/wpetit/guesstimate/internal/diffsync"
|
|
||||||
"github.com/davecgh/go-spew/spew"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestJSONSync(t *testing.T) {
|
|
||||||
doc1 := diffsync.NewDocument(
|
|
||||||
[]byte("{}"),
|
|
||||||
WithJSONSync(),
|
|
||||||
)
|
|
||||||
|
|
||||||
doc2 := diffsync.NewDocument(
|
|
||||||
[]byte("{}"),
|
|
||||||
WithJSONSync(),
|
|
||||||
)
|
|
||||||
|
|
||||||
p1 := doc1.NewPeer()
|
|
||||||
p2 := doc2.NewPeer()
|
|
||||||
|
|
||||||
log.Printf("p1 shadow: %s", p1.Shadow().Content())
|
|
||||||
log.Printf("p2 shadow: %s", p2.Shadow().Content())
|
|
||||||
|
|
||||||
newContent1 := []byte(`{"hello":"world"}`)
|
|
||||||
|
|
||||||
log.Printf("updating doc1 with '%s'", newContent1)
|
|
||||||
|
|
||||||
stack1, err := p1.Update(newContent1)
|
|
||||||
if err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("applying stack1 to doc2 '%s'", doc2.Content())
|
|
||||||
|
|
||||||
if err := p2.Apply(stack1); err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("new doc2 content: '%s'", doc2.Content())
|
|
||||||
|
|
||||||
if g, e := doc2.Content(), doc1.Content(); !jsonEqual(e, g) {
|
|
||||||
t.Errorf("doc2.Content(): expected '%s', got '%s'", e, g)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type testStep struct {
|
|
||||||
Local *diffsync.Peer
|
|
||||||
Remote *diffsync.Peer
|
|
||||||
Update string
|
|
||||||
MatchLocal bool
|
|
||||||
MatchRemote bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBidirectionnalUpdate(t *testing.T) {
|
|
||||||
doc1 := diffsync.NewDocument([]byte(`{}`), WithJSONSync())
|
|
||||||
doc2 := diffsync.NewDocument([]byte(`{}`), WithJSONSync())
|
|
||||||
|
|
||||||
p1 := doc1.NewPeer()
|
|
||||||
p2 := doc2.NewPeer()
|
|
||||||
|
|
||||||
var cases = []testStep{
|
|
||||||
{
|
|
||||||
Local: p1,
|
|
||||||
Remote: p2,
|
|
||||||
Update: `{"hello":"world"}`,
|
|
||||||
MatchLocal: true,
|
|
||||||
MatchRemote: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Local: p2,
|
|
||||||
Remote: p1,
|
|
||||||
Update: `{"hello":"world","foo":"bar"}`,
|
|
||||||
MatchLocal: true,
|
|
||||||
MatchRemote: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Local: p1,
|
|
||||||
Remote: p2,
|
|
||||||
Update: `{"hello":1,"foo":"bar"}`,
|
|
||||||
MatchLocal: true,
|
|
||||||
MatchRemote: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Local: p1,
|
|
||||||
Remote: nil,
|
|
||||||
Update: `{"hello":1,"foo":"bar", "test":{"bar": "baz"}}`,
|
|
||||||
MatchLocal: true,
|
|
||||||
MatchRemote: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Local: p2,
|
|
||||||
Remote: p1,
|
|
||||||
Update: `{"hello":2,"foo":"bar", "test":{"bar": "baz","world":"hello"}}`,
|
|
||||||
MatchLocal: true,
|
|
||||||
MatchRemote: true,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, step := range cases {
|
|
||||||
func(step testStep, i int) {
|
|
||||||
t.Run(fmt.Sprintf("Step %d", i), func(t *testing.T) {
|
|
||||||
log.Printf("local document before update: '%s'", step.Local.Document().Content())
|
|
||||||
|
|
||||||
stack, err := step.Local.Update([]byte(step.Update))
|
|
||||||
if err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("local document after update: '%s'", step.Local.Document().Content())
|
|
||||||
|
|
||||||
log.Printf("resulting stack: '%s'", spew.Sdump(stack))
|
|
||||||
|
|
||||||
if step.MatchLocal {
|
|
||||||
if e, g := step.Local.Document().Content(), []byte(step.Update); !jsonEqual(e, g) {
|
|
||||||
t.Errorf("local.Document().Content(): expected '%s', got '%s'", e, g)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if step.Remote != nil {
|
|
||||||
log.Printf("remote document before apply: '%s'", step.Remote.Document().Content())
|
|
||||||
if err := step.Remote.Apply(stack); err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
log.Printf("remote document after apply: '%s'", step.Remote.Document().Content())
|
|
||||||
}
|
|
||||||
|
|
||||||
if step.MatchRemote {
|
|
||||||
if e, g := step.Remote.Document().Content(), []byte(step.Update); !jsonEqual(e, g) {
|
|
||||||
t.Errorf("remote.Document().Content(): expected '%s', got '%s'", e, g)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if step.MatchLocal && step.MatchRemote {
|
|
||||||
if e, g := step.Local.Document().Content(), step.Remote.Document().Content(); !jsonEqual(e, g) {
|
|
||||||
t.Errorf("local.Document().Content() should match remote.Document().Content() ! Got '%s' and '%s'", e, g)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}(step, i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func jsonEqual(s1, s2 []byte) bool {
|
|
||||||
var (
|
|
||||||
o1 interface{}
|
|
||||||
o2 interface{}
|
|
||||||
)
|
|
||||||
|
|
||||||
var err error
|
|
||||||
|
|
||||||
err = json.Unmarshal(s1, &o1)
|
|
||||||
if err != nil {
|
|
||||||
panic(fmt.Errorf("Error mashalling []byte 1 :: %s", err.Error()))
|
|
||||||
}
|
|
||||||
|
|
||||||
err = json.Unmarshal(s2, &o2)
|
|
||||||
if err != nil {
|
|
||||||
panic(fmt.Errorf("Error mashalling []byte 2 :: %s", err.Error()))
|
|
||||||
}
|
|
||||||
|
|
||||||
return reflect.DeepEqual(o1, o2)
|
|
||||||
}
|
|
|
@ -1,10 +0,0 @@
|
||||||
package json
|
|
||||||
|
|
||||||
import "forge.cadoles.com/wpetit/guesstimate/internal/diffsync"
|
|
||||||
|
|
||||||
func WithJSONSync() diffsync.OptionFunc {
|
|
||||||
return func(opt *diffsync.Option) {
|
|
||||||
opt.Differ = NewDiffer()
|
|
||||||
opt.Patcher = NewPatcher()
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,34 +0,0 @@
|
||||||
package diffsync
|
|
||||||
|
|
||||||
type Option struct {
|
|
||||||
Patcher Patcher
|
|
||||||
Differ Differ
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithPatcher(p Patcher) OptionFunc {
|
|
||||||
return func(opt *Option) {
|
|
||||||
opt.Patcher = p
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithDiffer(d Differ) OptionFunc {
|
|
||||||
return func(opt *Option) {
|
|
||||||
opt.Differ = d
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type OptionFunc func(*Option)
|
|
||||||
|
|
||||||
func DefaultOption() *Option {
|
|
||||||
return MergeOption(
|
|
||||||
&Option{},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func MergeOption(opt *Option, funcs ...OptionFunc) *Option {
|
|
||||||
for _, fn := range funcs {
|
|
||||||
fn(opt)
|
|
||||||
}
|
|
||||||
|
|
||||||
return opt
|
|
||||||
}
|
|
|
@ -1,151 +0,0 @@
|
||||||
package diffsync
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Peer struct {
|
|
||||||
document *Document
|
|
||||||
shadow *Shadow
|
|
||||||
mutex sync.RWMutex
|
|
||||||
editsStack EditsStack
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Peer) Document() *Document {
|
|
||||||
return p.document
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Peer) Shadow() *Shadow {
|
|
||||||
return p.shadow
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Peer) Update(content []byte) (EditsStack, error) {
|
|
||||||
p.mutex.Lock()
|
|
||||||
defer p.mutex.Unlock()
|
|
||||||
|
|
||||||
ops, err := p.document.differ.Diff(content, p.shadow.content)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
p.editsStack = append(p.editsStack, &Edits{
|
|
||||||
ops: ops,
|
|
||||||
localVersion: p.shadow.localVersion,
|
|
||||||
remoteVersion: p.shadow.remoteVersion,
|
|
||||||
})
|
|
||||||
|
|
||||||
p.shadow.localVersion++
|
|
||||||
p.shadow.content = content
|
|
||||||
p.document.content = content
|
|
||||||
|
|
||||||
newStack := make(EditsStack, len(p.editsStack))
|
|
||||||
copy(newStack, p.editsStack)
|
|
||||||
|
|
||||||
return newStack, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Peer) Apply(stack EditsStack) error {
|
|
||||||
p.mutex.Lock()
|
|
||||||
defer p.mutex.Unlock()
|
|
||||||
|
|
||||||
newStack := make(EditsStack, len(p.editsStack))
|
|
||||||
copy(newStack, p.editsStack)
|
|
||||||
|
|
||||||
for _, edits := range stack {
|
|
||||||
rollbacked, err := p.applyEdits(edits)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrap(err, "could not apply edits")
|
|
||||||
}
|
|
||||||
|
|
||||||
if rollbacked {
|
|
||||||
newStack = EditsStack{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Iterate over local edits
|
|
||||||
for i, localEdits := range newStack {
|
|
||||||
if localEdits.LocalVersion() != edits.RemoteVersion() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("removing edits %d", localEdits.LocalVersion())
|
|
||||||
|
|
||||||
// Remove local edit
|
|
||||||
copy(newStack[i:], newStack[i+1:])
|
|
||||||
newStack[len(newStack)-1] = nil
|
|
||||||
newStack = newStack[:len(newStack)-1]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
p.editsStack = newStack
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Peer) applyEdits(edits *Edits) (bool, error) {
|
|
||||||
rollbacked := false
|
|
||||||
|
|
||||||
shadow := p.Shadow()
|
|
||||||
|
|
||||||
var (
|
|
||||||
err error
|
|
||||||
newDocumentContent []byte
|
|
||||||
newShadowRemoteVersion Version
|
|
||||||
)
|
|
||||||
|
|
||||||
newShadowContent := shadow.content
|
|
||||||
newShadowLocalVersion := shadow.localVersion
|
|
||||||
|
|
||||||
log.Printf(
|
|
||||||
"shadow(l: %d, r: %d) edits(l: %d, r: %d)",
|
|
||||||
shadow.LocalVersion(), shadow.RemoteVersion(),
|
|
||||||
edits.LocalVersion(), edits.RemoteVersion(),
|
|
||||||
)
|
|
||||||
|
|
||||||
// Desync occurred, rollback to backup if possible
|
|
||||||
if shadow.LocalVersion() > edits.RemoteVersion() {
|
|
||||||
if shadow.Backup().LocalVersion() != edits.RemoteVersion() {
|
|
||||||
return false, ErrUnexpectedRemoteVersion
|
|
||||||
}
|
|
||||||
|
|
||||||
newShadowContent = shadow.backup.Content()
|
|
||||||
newShadowLocalVersion = shadow.backup.LocalVersion()
|
|
||||||
rollbacked = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if edits.LocalVersion() < shadow.RemoteVersion() {
|
|
||||||
return rollbacked, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if edits.RemoteVersion() < newShadowLocalVersion {
|
|
||||||
return rollbacked, ErrInvalidState
|
|
||||||
}
|
|
||||||
|
|
||||||
newShadowContent, err = p.document.patcher.Patch(newShadowContent, edits.Ops())
|
|
||||||
if err != nil {
|
|
||||||
return false, errors.Wrap(err, "could not patch shadow content")
|
|
||||||
}
|
|
||||||
|
|
||||||
newDocumentContent, err = p.document.patcher.Patch(p.document.content, edits.Ops())
|
|
||||||
if err != nil {
|
|
||||||
return rollbacked, errors.Wrap(err, "could not patch document content")
|
|
||||||
}
|
|
||||||
|
|
||||||
newShadowRemoteVersion++
|
|
||||||
|
|
||||||
// Update shadow
|
|
||||||
shadow.content = newShadowContent
|
|
||||||
shadow.remoteVersion = newShadowRemoteVersion
|
|
||||||
shadow.localVersion = newShadowLocalVersion
|
|
||||||
|
|
||||||
// Update document
|
|
||||||
p.document.content = newDocumentContent
|
|
||||||
|
|
||||||
// Update backup
|
|
||||||
shadow.backup.content = newShadowContent
|
|
||||||
shadow.backup.localVersion = shadow.localVersion
|
|
||||||
|
|
||||||
return rollbacked, nil
|
|
||||||
}
|
|
|
@ -1,40 +0,0 @@
|
||||||
package diffsync
|
|
||||||
|
|
||||||
type Shadow struct {
|
|
||||||
localVersion Version
|
|
||||||
remoteVersion Version
|
|
||||||
content []byte
|
|
||||||
backup *Backup
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewShadow(content []byte) *Shadow {
|
|
||||||
return &Shadow{
|
|
||||||
localVersion: 0,
|
|
||||||
remoteVersion: 0,
|
|
||||||
content: content,
|
|
||||||
backup: NewBackup(Copy(content)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Shadow) LocalVersion() Version {
|
|
||||||
return s.localVersion
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Shadow) RemoteVersion() Version {
|
|
||||||
return s.remoteVersion
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Shadow) Content() []byte {
|
|
||||||
return s.content
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Shadow) Backup() *Backup {
|
|
||||||
return s.backup
|
|
||||||
}
|
|
||||||
|
|
||||||
func Copy(content []byte) []byte {
|
|
||||||
contentCopy := make([]byte, len(content))
|
|
||||||
copy(contentCopy, content)
|
|
||||||
|
|
||||||
return contentCopy
|
|
||||||
}
|
|
|
@ -1,14 +0,0 @@
|
||||||
package diffsync
|
|
||||||
|
|
||||||
type Versioned struct {
|
|
||||||
localVersion Version
|
|
||||||
remoteVersion Version
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *Versioned) LocalVersion() Version {
|
|
||||||
return v.localVersion
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *Versioned) RemoteVersion() Version {
|
|
||||||
return v.remoteVersion
|
|
||||||
}
|
|
Loading…
Reference in New Issue