jsonmessagepublisher_test.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package utils
  2. import (
  3. "testing"
  4. "time"
  5. )
  6. func assertSubscribersCount(t *testing.T, q *JSONMessagePublisher, expected int) {
  7. if q.SubscribersCount() != expected {
  8. t.Fatalf("Expected %d registered subscribers, got %d", expected, q.SubscribersCount())
  9. }
  10. }
  11. func TestJSONMessagePublisherSubscription(t *testing.T) {
  12. q := NewJSONMessagePublisher()
  13. l1 := make(chan JSONMessage)
  14. l2 := make(chan JSONMessage)
  15. assertSubscribersCount(t, q, 0)
  16. q.Subscribe(l1)
  17. assertSubscribersCount(t, q, 1)
  18. q.Subscribe(l2)
  19. assertSubscribersCount(t, q, 2)
  20. q.Unsubscribe(l1)
  21. q.Unsubscribe(l2)
  22. assertSubscribersCount(t, q, 0)
  23. }
  24. func TestJSONMessagePublisherPublish(t *testing.T) {
  25. q := NewJSONMessagePublisher()
  26. l1 := make(chan JSONMessage)
  27. l2 := make(chan JSONMessage)
  28. go func() {
  29. for {
  30. select {
  31. case <-l1:
  32. close(l1)
  33. l1 = nil
  34. case <-l2:
  35. close(l2)
  36. l2 = nil
  37. case <-time.After(1 * time.Second):
  38. q.Unsubscribe(l1)
  39. q.Unsubscribe(l2)
  40. t.Fatal("Timeout waiting for broadcasted message")
  41. }
  42. }
  43. }()
  44. q.Subscribe(l1)
  45. q.Subscribe(l2)
  46. q.Publish(JSONMessage{})
  47. }
  48. func TestJSONMessagePublishTimeout(t *testing.T) {
  49. q := NewJSONMessagePublisher()
  50. l := make(chan JSONMessage)
  51. q.Subscribe(l)
  52. c := make(chan struct{})
  53. go func() {
  54. q.Publish(JSONMessage{})
  55. close(c)
  56. }()
  57. select {
  58. case <-c:
  59. case <-time.After(time.Second):
  60. t.Fatal("Timeout publishing message")
  61. }
  62. }