manager_run_test.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package leakybucket
  2. import (
  3. "fmt"
  4. "testing"
  5. "time"
  6. "github.com/crowdsecurity/crowdsec/pkg/types"
  7. log "github.com/sirupsen/logrus"
  8. "gopkg.in/tomb.v2"
  9. )
  10. func expectBucketCount(buckets *Buckets, expected int) error {
  11. count := 0
  12. buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool {
  13. count++
  14. return true
  15. })
  16. if count != expected {
  17. return fmt.Errorf("expected %d live buckets, got %d", expected, count)
  18. }
  19. return nil
  20. }
  21. func TestGCandDump(t *testing.T) {
  22. var (
  23. buckets *Buckets = NewBuckets()
  24. tomb *tomb.Tomb = &tomb.Tomb{}
  25. )
  26. var Holders = []BucketFactory{
  27. //one overflowing soon + bh
  28. BucketFactory{
  29. Name: "test_counter_fast",
  30. Description: "test_counter_fast",
  31. Debug: true,
  32. Type: "counter",
  33. Capacity: -1,
  34. Duration: "0.5s",
  35. Blackhole: "1m",
  36. Filter: "true",
  37. wgDumpState: buckets.wgDumpState,
  38. wgPour: buckets.wgPour,
  39. },
  40. //one long counter
  41. BucketFactory{
  42. Name: "test_counter_slow",
  43. Description: "test_counter_slow",
  44. Debug: true,
  45. Type: "counter",
  46. Capacity: -1,
  47. Duration: "10m",
  48. Filter: "true",
  49. wgDumpState: buckets.wgDumpState,
  50. wgPour: buckets.wgPour,
  51. },
  52. //slow leaky
  53. BucketFactory{
  54. Name: "test_leaky_slow",
  55. Description: "test_leaky_slow",
  56. Debug: true,
  57. Type: "leaky",
  58. Capacity: 5,
  59. LeakSpeed: "10m",
  60. Filter: "true",
  61. wgDumpState: buckets.wgDumpState,
  62. wgPour: buckets.wgPour,
  63. },
  64. }
  65. for idx := range Holders {
  66. if err := LoadBucket(&Holders[idx], tomb); err != nil {
  67. t.Fatalf("while loading (%d/%d): %s", idx, len(Holders), err)
  68. }
  69. if err := ValidateFactory(&Holders[idx]); err != nil {
  70. t.Fatalf("while validating (%d/%d): %s", idx, len(Holders), err)
  71. }
  72. }
  73. log.Printf("Pouring to bucket")
  74. var in = types.Event{Parsed: map[string]string{"something": "something"}}
  75. //pour an item that will go to leaky + counter
  76. ok, err := PourItemToHolders(in, Holders, buckets)
  77. if err != nil {
  78. t.Fatalf("while pouring item : %s", err)
  79. }
  80. if !ok {
  81. t.Fatalf("didn't pour item")
  82. }
  83. time.Sleep(2 * time.Second)
  84. if err := expectBucketCount(buckets, 3); err != nil {
  85. t.Fatal(err)
  86. }
  87. log.Printf("Bucket GC")
  88. //call garbage collector
  89. if err := GarbageCollectBuckets(time.Now().UTC(), buckets); err != nil {
  90. t.Fatalf("failed to garbage collect buckets : %s", err)
  91. }
  92. if err := expectBucketCount(buckets, 1); err != nil {
  93. t.Fatal(err)
  94. }
  95. log.Printf("Dumping buckets state")
  96. //dump remaining buckets
  97. if _, err := DumpBucketsStateAt(time.Now().UTC(), ".", buckets); err != nil {
  98. t.Fatalf("failed to dump buckets : %s", err)
  99. }
  100. }
  101. func TestShutdownBuckets(t *testing.T) {
  102. var (
  103. buckets *Buckets = NewBuckets()
  104. Holders = []BucketFactory{
  105. //one long counter
  106. BucketFactory{
  107. Name: "test_counter_slow",
  108. Description: "test_counter_slow",
  109. Debug: true,
  110. Type: "counter",
  111. Capacity: -1,
  112. Duration: "10m",
  113. Filter: "true",
  114. wgDumpState: buckets.wgDumpState,
  115. wgPour: buckets.wgPour,
  116. },
  117. //slow leaky
  118. BucketFactory{
  119. Name: "test_leaky_slow",
  120. Description: "test_leaky_slow",
  121. Debug: true,
  122. Type: "leaky",
  123. Capacity: 5,
  124. LeakSpeed: "10m",
  125. Filter: "true",
  126. wgDumpState: buckets.wgDumpState,
  127. wgPour: buckets.wgPour,
  128. },
  129. }
  130. tomb *tomb.Tomb = &tomb.Tomb{}
  131. )
  132. for idx := range Holders {
  133. if err := LoadBucket(&Holders[idx], tomb); err != nil {
  134. t.Fatalf("while loading (%d/%d): %s", idx, len(Holders), err)
  135. }
  136. if err := ValidateFactory(&Holders[idx]); err != nil {
  137. t.Fatalf("while validating (%d/%d): %s", idx, len(Holders), err)
  138. }
  139. }
  140. log.Printf("Pouring to bucket")
  141. var in = types.Event{Parsed: map[string]string{"something": "something"}}
  142. //pour an item that will go to leaky + counter
  143. ok, err := PourItemToHolders(in, Holders, buckets)
  144. if err != nil {
  145. t.Fatalf("while pouring item : %s", err)
  146. }
  147. if !ok {
  148. t.Fatalf("didn't pour item")
  149. }
  150. time.Sleep(1 * time.Second)
  151. if err := expectBucketCount(buckets, 2); err != nil {
  152. t.Fatal(err)
  153. }
  154. if err := ShutdownAllBuckets(buckets); err != nil {
  155. t.Fatalf("while shutting down buckets : %s", err)
  156. }
  157. time.Sleep(2 * time.Second)
  158. if err := expectBucketCount(buckets, 2); err != nil {
  159. t.Fatal(err)
  160. }
  161. }