package testing import ( "context" "sync" "sync/atomic" "testing" "time" "forge.cadoles.com/arcad/edge/pkg/bus" "github.com/pkg/errors" ) const ( testNamespace bus.MessageNamespace = "testNamespace" ) type testMessage struct{} func (e *testMessage) MessageNamespace() bus.MessageNamespace { return testNamespace } func TestPublishSubscribe(t *testing.T, b bus.Bus) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() t.Log("subscribe") messages, err := b.Subscribe(ctx, testNamespace) if err != nil { t.Fatal(errors.WithStack(err)) } var wg sync.WaitGroup wg.Add(5) go func() { // 5 events should be received t.Log("publish 0") if err := b.Publish(ctx, &testMessage{}); err != nil { t.Error(errors.WithStack(err)) } t.Log("publish 1") if err := b.Publish(ctx, &testMessage{}); err != nil { t.Error(errors.WithStack(err)) } t.Log("publish 2") if err := b.Publish(ctx, &testMessage{}); err != nil { t.Error(errors.WithStack(err)) } t.Log("publish 3") if err := b.Publish(ctx, &testMessage{}); err != nil { t.Error(errors.WithStack(err)) } t.Log("publish 4") if err := b.Publish(ctx, &testMessage{}); err != nil { t.Error(errors.WithStack(err)) } }() var count int32 = 0 go func() { t.Log("range for events") for msg := range messages { t.Logf("received msg %d", atomic.LoadInt32(&count)) atomic.AddInt32(&count, 1) if e, g := testNamespace, msg.MessageNamespace(); e != g { t.Errorf("evt.MessageNamespace(): expected '%v', got '%v'", e, g) } wg.Done() } }() wg.Wait() b.Unsubscribe(ctx, testNamespace, messages) if e, g := int32(5), count; e != g { t.Errorf("message received count: expected '%v', got '%v'", e, g) } }