package testing

import (
	"context"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"forge.cadoles.com/arcad/edge/pkg/bus"
	"github.com/pkg/errors"
)

const (
	// NOTE(review): testTypeReqRes is not referenced anywhere in this file;
	// the request/reply test below uses testNamespace (declared elsewhere in
	// the package) instead. The value also looks misspelt ("Namspace").
	// Confirm whether it is still needed before changing or removing it.
	testTypeReqRes bus.MessageNamespace = "testNamspaceReqRes"
)

// testReqResMessage is the payload exchanged during the request/reply test.
// The same type is used for requests and for replies; i identifies the round
// trip so that a reply can be matched against the request that triggered it.
type testReqResMessage struct {
	i int
}

// MessageNamespace implements bus.Message by returning the namespace the
// test responder is registered on.
func (m *testReqResMessage) MessageNamespace() bus.MessageNamespace {
	return testNamespace
}

// TestRequestReply exercises the request/reply flow of the given bus.
//
// It registers a single responder with b.Reply, then issues a fixed number of
// concurrent requests with b.Request and checks that every request receives a
// non-nil *testReqResMessage carrying the same index as the request. Once all
// requests have returned, the responder is cancelled and awaited so that no
// goroutine started here reports on t after this function has returned.
//
// Failures are reported through t.Error/t.Errorf; the function never calls
// t.Fatal, so it is safe to report from the helper goroutines.
func TestRequestReply(t *testing.T, b bus.Bus) {
	expectedRoundTrips := 256
	// Shared deadline for the responder and for every request: one second
	// per expected round trip.
	timeout := time.Now().Add(time.Duration(expectedRoundTrips) * time.Second)

	// The responder context is owned by the test function (not by the
	// responder goroutine) so that the responder can be stopped explicitly
	// once every request has completed.
	respondCtx, cancelRespond := context.WithDeadline(context.Background(), timeout)
	defer cancelRespond()

	// handled counts completed handler invocations. It replaces a WaitGroup
	// that would block forever whenever a request failed before reaching the
	// handler.
	var handled int32

	var initWaitGroup sync.WaitGroup

	responderDone := make(chan struct{})

	initWaitGroup.Add(1)

	go func() {
		defer close(responderDone)

		// NOTE(review): this only signals that the responder goroutine has
		// started, not that b.Reply has finished registering the handler.
		// Whether requests issued before registration are still delivered
		// depends on the bus implementation — confirm.
		initWaitGroup.Done()

		err := b.Reply(respondCtx, testNamespace, func(msg bus.Message) (bus.Message, error) {
			defer atomic.AddInt32(&handled, 1)

			req, ok := msg.(*testReqResMessage)
			if !ok {
				return nil, errors.WithStack(bus.ErrUnexpectedMessage)
			}

			result := &testReqResMessage{req.i}

			// Simulate random work
			time.Sleep(time.Millisecond * 100)

			t.Logf("[RES] sending res #%d", req.i)

			return result, nil
		})
		// The test cancels respondCtx itself once all requests are done, so
		// a cancellation error is the expected way for Reply to stop.
		if err != nil && !errors.Is(err, context.Canceled) {
			t.Error(err)
		}
	}()

	initWaitGroup.Wait()

	var reqWaitGroup sync.WaitGroup

	for i := 0; i < expectedRoundTrips; i++ {
		reqWaitGroup.Add(1)

		go func(i int) {
			defer reqWaitGroup.Done()

			requestCtx, cancelRequest := context.WithDeadline(context.Background(), timeout)
			defer cancelRequest()

			req := &testReqResMessage{i}

			t.Logf("[REQ] sending req #%d", i)

			result, err := b.Request(requestCtx, req)
			if err != nil {
				t.Error(err)

				// Without a reply there is nothing left to verify; stopping
				// here avoids a misleading log line and a second error.
				return
			}

			t.Logf("[REQ] received req #%d reply", i)

			if result == nil {
				t.Error("result should not be nil")

				return
			}

			res, ok := result.(*testReqResMessage)
			if !ok {
				t.Error(errors.WithStack(bus.ErrUnexpectedMessage))

				return
			}

			if e, g := req.i, res.i; e != g {
				t.Errorf("res.i: expected '%v', got '%v'", e, g)
			}
		}(i)
	}

	reqWaitGroup.Wait()

	// Stop the responder and wait for it to exit: reporting on t from a
	// goroutine that outlives the test panics in the testing package.
	cancelRespond()
	<-responderDone

	if e, g := expectedRoundTrips, int(atomic.LoadInt32(&handled)); e != g {
		t.Errorf("handled requests: expected '%v', got '%v'", e, g)
	}
}