kv_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. package kv
  2. import (
  3. "errors"
  4. "io/ioutil"
  5. "os"
  6. "path"
  7. "testing"
  8. "time"
  9. "github.com/docker/docker/pkg/discovery"
  10. "github.com/docker/libkv"
  11. "github.com/docker/libkv/store"
  12. "github.com/go-check/check"
  13. )
  14. // Hook up gocheck into the "go test" runner.
  15. func Test(t *testing.T) { check.TestingT(t) }
  16. type DiscoverySuite struct{}
  17. var _ = check.Suite(&DiscoverySuite{})
  18. func (ds *DiscoverySuite) TestInitialize(c *check.C) {
  19. storeMock := &FakeStore{
  20. Endpoints: []string{"127.0.0.1"},
  21. }
  22. d := &Discovery{backend: store.CONSUL}
  23. d.Initialize("127.0.0.1", 0, 0, nil)
  24. d.store = storeMock
  25. s := d.store.(*FakeStore)
  26. c.Assert(s.Endpoints, check.HasLen, 1)
  27. c.Assert(s.Endpoints[0], check.Equals, "127.0.0.1")
  28. c.Assert(d.path, check.Equals, defaultDiscoveryPath)
  29. storeMock = &FakeStore{
  30. Endpoints: []string{"127.0.0.1:1234"},
  31. }
  32. d = &Discovery{backend: store.CONSUL}
  33. d.Initialize("127.0.0.1:1234/path", 0, 0, nil)
  34. d.store = storeMock
  35. s = d.store.(*FakeStore)
  36. c.Assert(s.Endpoints, check.HasLen, 1)
  37. c.Assert(s.Endpoints[0], check.Equals, "127.0.0.1:1234")
  38. c.Assert(d.path, check.Equals, "path/"+defaultDiscoveryPath)
  39. storeMock = &FakeStore{
  40. Endpoints: []string{"127.0.0.1:1234", "127.0.0.2:1234", "127.0.0.3:1234"},
  41. }
  42. d = &Discovery{backend: store.CONSUL}
  43. d.Initialize("127.0.0.1:1234,127.0.0.2:1234,127.0.0.3:1234/path", 0, 0, nil)
  44. d.store = storeMock
  45. s = d.store.(*FakeStore)
  46. c.Assert(s.Endpoints, check.HasLen, 3)
  47. c.Assert(s.Endpoints[0], check.Equals, "127.0.0.1:1234")
  48. c.Assert(s.Endpoints[1], check.Equals, "127.0.0.2:1234")
  49. c.Assert(s.Endpoints[2], check.Equals, "127.0.0.3:1234")
  50. c.Assert(d.path, check.Equals, "path/"+defaultDiscoveryPath)
  51. }
  52. // Extremely limited mock store so we can test initialization
  53. type Mock struct {
  54. // Endpoints passed to InitializeMock
  55. Endpoints []string
  56. // Options passed to InitializeMock
  57. Options *store.Config
  58. }
  59. func NewMock(endpoints []string, options *store.Config) (store.Store, error) {
  60. s := &Mock{}
  61. s.Endpoints = endpoints
  62. s.Options = options
  63. return s, nil
  64. }
  65. func (s *Mock) Put(key string, value []byte, opts *store.WriteOptions) error {
  66. return errors.New("Put not supported")
  67. }
  68. func (s *Mock) Get(key string) (*store.KVPair, error) {
  69. return nil, errors.New("Get not supported")
  70. }
  71. func (s *Mock) Delete(key string) error {
  72. return errors.New("Delete not supported")
  73. }
  74. // Exists mock
  75. func (s *Mock) Exists(key string) (bool, error) {
  76. return false, errors.New("Exists not supported")
  77. }
  78. // Watch mock
  79. func (s *Mock) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
  80. return nil, errors.New("Watch not supported")
  81. }
  82. // WatchTree mock
  83. func (s *Mock) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
  84. return nil, errors.New("WatchTree not supported")
  85. }
  86. // NewLock mock
  87. func (s *Mock) NewLock(key string, options *store.LockOptions) (store.Locker, error) {
  88. return nil, errors.New("NewLock not supported")
  89. }
  90. // List mock
  91. func (s *Mock) List(prefix string) ([]*store.KVPair, error) {
  92. return nil, errors.New("List not supported")
  93. }
  94. // DeleteTree mock
  95. func (s *Mock) DeleteTree(prefix string) error {
  96. return errors.New("DeleteTree not supported")
  97. }
  98. // AtomicPut mock
  99. func (s *Mock) AtomicPut(key string, value []byte, previous *store.KVPair, opts *store.WriteOptions) (bool, *store.KVPair, error) {
  100. return false, nil, errors.New("AtomicPut not supported")
  101. }
  102. // AtomicDelete mock
  103. func (s *Mock) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
  104. return false, errors.New("AtomicDelete not supported")
  105. }
  106. // Close mock
  107. func (s *Mock) Close() {
  108. return
  109. }
  110. func (ds *DiscoverySuite) TestInitializeWithCerts(c *check.C) {
  111. cert := `-----BEGIN CERTIFICATE-----
  112. MIIDCDCCAfKgAwIBAgIICifG7YeiQOEwCwYJKoZIhvcNAQELMBIxEDAOBgNVBAMT
  113. B1Rlc3QgQ0EwHhcNMTUxMDAxMjMwMDAwWhcNMjAwOTI5MjMwMDAwWjASMRAwDgYD
  114. VQQDEwdUZXN0IENBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1wRC
  115. O+flnLTK5ImjTurNRHwSejuqGbc4CAvpB0hS+z0QlSs4+zE9h80aC4hz+6caRpds
  116. +J908Q+RvAittMHbpc7VjbZP72G6fiXk7yPPl6C10HhRSoSi3nY+B7F2E8cuz14q
  117. V2e+ejhWhSrBb/keyXpcyjoW1BOAAJ2TIclRRkICSCZrpXUyXxAvzXfpFXo1RhSb
  118. UywN11pfiCQzDUN7sPww9UzFHuAHZHoyfTr27XnJYVUerVYrCPq8vqfn//01qz55
  119. Xs0hvzGdlTFXhuabFtQnKFH5SNwo/fcznhB7rePOwHojxOpXTBepUCIJLbtNnWFT
  120. V44t9gh5IqIWtoBReQIDAQABo2YwZDAOBgNVHQ8BAf8EBAMCAAYwEgYDVR0TAQH/
  121. BAgwBgEB/wIBAjAdBgNVHQ4EFgQUZKUI8IIjIww7X/6hvwggQK4bD24wHwYDVR0j
  122. BBgwFoAUZKUI8IIjIww7X/6hvwggQK4bD24wCwYJKoZIhvcNAQELA4IBAQDES2cz
  123. 7sCQfDCxCIWH7X8kpi/JWExzUyQEJ0rBzN1m3/x8ySRxtXyGekimBqQwQdFqlwMI
  124. xzAQKkh3ue8tNSzRbwqMSyH14N1KrSxYS9e9szJHfUasoTpQGPmDmGIoRJuq1h6M
  125. ej5x1SCJ7GWCR6xEXKUIE9OftXm9TdFzWa7Ja3OHz/mXteii8VXDuZ5ACq6EE5bY
  126. 8sP4gcICfJ5fTrpTlk9FIqEWWQrCGa5wk95PGEj+GJpNogjXQ97wVoo/Y3p1brEn
  127. t5zjN9PAq4H1fuCMdNNA+p1DHNwd+ELTxcMAnb2ajwHvV6lKPXutrTFc4umJToBX
  128. FpTxDmJHEV4bzUzh
  129. -----END CERTIFICATE-----
  130. `
  131. key := `-----BEGIN RSA PRIVATE KEY-----
  132. MIIEpQIBAAKCAQEA1wRCO+flnLTK5ImjTurNRHwSejuqGbc4CAvpB0hS+z0QlSs4
  133. +zE9h80aC4hz+6caRpds+J908Q+RvAittMHbpc7VjbZP72G6fiXk7yPPl6C10HhR
  134. SoSi3nY+B7F2E8cuz14qV2e+ejhWhSrBb/keyXpcyjoW1BOAAJ2TIclRRkICSCZr
  135. pXUyXxAvzXfpFXo1RhSbUywN11pfiCQzDUN7sPww9UzFHuAHZHoyfTr27XnJYVUe
  136. rVYrCPq8vqfn//01qz55Xs0hvzGdlTFXhuabFtQnKFH5SNwo/fcznhB7rePOwHoj
  137. xOpXTBepUCIJLbtNnWFTV44t9gh5IqIWtoBReQIDAQABAoIBAHSWipORGp/uKFXj
  138. i/mut776x8ofsAxhnLBARQr93ID+i49W8H7EJGkOfaDjTICYC1dbpGrri61qk8sx
  139. qX7p3v/5NzKwOIfEpirgwVIqSNYe/ncbxnhxkx6tXtUtFKmEx40JskvSpSYAhmmO
  140. 1XSx0E/PWaEN/nLgX/f1eWJIlxlQkk3QeqL+FGbCXI48DEtlJ9+MzMu4pAwZTpj5
  141. 5qtXo5JJ0jRGfJVPAOznRsYqv864AhMdMIWguzk6EGnbaCWwPcfcn+h9a5LMdony
  142. MDHfBS7bb5tkF3+AfnVY3IBMVx7YlsD9eAyajlgiKu4zLbwTRHjXgShy+4Oussz0
  143. ugNGnkECgYEA/hi+McrZC8C4gg6XqK8+9joD8tnyDZDz88BQB7CZqABUSwvjDqlP
  144. L8hcwo/lzvjBNYGkqaFPUICGWKjeCtd8pPS2DCVXxDQX4aHF1vUur0uYNncJiV3N
  145. XQz4Iemsa6wnKf6M67b5vMXICw7dw0HZCdIHD1hnhdtDz0uVpeevLZ8CgYEA2KCT
  146. Y43lorjrbCgMqtlefkr3GJA9dey+hTzCiWEOOqn9RqGoEGUday0sKhiLofOgmN2B
  147. LEukpKIey8s+Q/cb6lReajDVPDsMweX8i7hz3Wa4Ugp4Xa5BpHqu8qIAE2JUZ7bU
  148. t88aQAYE58pUF+/Lq1QzAQdrjjzQBx6SrBxieecCgYEAvukoPZEC8mmiN1VvbTX+
  149. QFHmlZha3QaDxChB+QUe7bMRojEUL/fVnzkTOLuVFqSfxevaI/km9n0ac5KtAchV
  150. xjp2bTnBb5EUQFqjopYktWA+xO07JRJtMfSEmjZPbbay1kKC7rdTfBm961EIHaRj
  151. xZUf6M+rOE8964oGrdgdLlECgYEA046GQmx6fh7/82FtdZDRQp9tj3SWQUtSiQZc
  152. qhO59Lq8mjUXz+MgBuJXxkiwXRpzlbaFB0Bca1fUoYw8o915SrDYf/Zu2OKGQ/qa
  153. V81sgiVmDuEgycR7YOlbX6OsVUHrUlpwhY3hgfMe6UtkMvhBvHF/WhroBEIJm1pV
  154. PXZ/CbMCgYEApNWVktFBjOaYfY6SNn4iSts1jgsQbbpglg3kT7PLKjCAhI6lNsbk
  155. dyT7ut01PL6RaW4SeQWtrJIVQaM6vF3pprMKqlc5XihOGAmVqH7rQx9rtQB5TicL
  156. BFrwkQE4HQtQBV60hYQUzzlSk44VFDz+jxIEtacRHaomDRh2FtOTz+I=
  157. -----END RSA PRIVATE KEY-----
  158. `
  159. certFile, err := ioutil.TempFile("", "cert")
  160. c.Assert(err, check.IsNil)
  161. defer os.Remove(certFile.Name())
  162. certFile.Write([]byte(cert))
  163. certFile.Close()
  164. keyFile, err := ioutil.TempFile("", "key")
  165. c.Assert(err, check.IsNil)
  166. defer os.Remove(keyFile.Name())
  167. keyFile.Write([]byte(key))
  168. keyFile.Close()
  169. libkv.AddStore("mock", NewMock)
  170. d := &Discovery{backend: "mock"}
  171. err = d.Initialize("127.0.0.3:1234", 0, 0, map[string]string{
  172. "kv.cacertfile": certFile.Name(),
  173. "kv.certfile": certFile.Name(),
  174. "kv.keyfile": keyFile.Name(),
  175. })
  176. c.Assert(err, check.IsNil)
  177. s := d.store.(*Mock)
  178. c.Assert(s.Options.TLS, check.NotNil)
  179. c.Assert(s.Options.TLS.RootCAs, check.NotNil)
  180. c.Assert(s.Options.TLS.Certificates, check.HasLen, 1)
  181. }
  182. func (ds *DiscoverySuite) TestWatch(c *check.C) {
  183. mockCh := make(chan []*store.KVPair)
  184. storeMock := &FakeStore{
  185. Endpoints: []string{"127.0.0.1:1234"},
  186. mockKVChan: mockCh,
  187. }
  188. d := &Discovery{backend: store.CONSUL}
  189. d.Initialize("127.0.0.1:1234/path", 0, 0, nil)
  190. d.store = storeMock
  191. expected := discovery.Entries{
  192. &discovery.Entry{Host: "1.1.1.1", Port: "1111"},
  193. &discovery.Entry{Host: "2.2.2.2", Port: "2222"},
  194. }
  195. kvs := []*store.KVPair{
  196. {Key: path.Join("path", defaultDiscoveryPath, "1.1.1.1"), Value: []byte("1.1.1.1:1111")},
  197. {Key: path.Join("path", defaultDiscoveryPath, "2.2.2.2"), Value: []byte("2.2.2.2:2222")},
  198. }
  199. stopCh := make(chan struct{})
  200. ch, errCh := d.Watch(stopCh)
  201. // It should fire an error since the first WatchTree call failed.
  202. c.Assert(<-errCh, check.ErrorMatches, "test error")
  203. // We have to drain the error channel otherwise Watch will get stuck.
  204. go func() {
  205. for range errCh {
  206. }
  207. }()
  208. // Push the entries into the store channel and make sure discovery emits.
  209. mockCh <- kvs
  210. c.Assert(<-ch, check.DeepEquals, expected)
  211. // Add a new entry.
  212. expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"})
  213. kvs = append(kvs, &store.KVPair{Key: path.Join("path", defaultDiscoveryPath, "3.3.3.3"), Value: []byte("3.3.3.3:3333")})
  214. mockCh <- kvs
  215. c.Assert(<-ch, check.DeepEquals, expected)
  216. close(mockCh)
  217. // Give it enough time to call WatchTree.
  218. time.Sleep(3 * time.Second)
  219. // Stop and make sure it closes all channels.
  220. close(stopCh)
  221. c.Assert(<-ch, check.IsNil)
  222. c.Assert(<-errCh, check.IsNil)
  223. }
  224. // FakeStore implements store.Store methods. It mocks all store
  225. // function in a simple, naive way.
  226. type FakeStore struct {
  227. Endpoints []string
  228. Options *store.Config
  229. mockKVChan <-chan []*store.KVPair
  230. watchTreeCallCount int
  231. }
  232. func (s *FakeStore) Put(key string, value []byte, options *store.WriteOptions) error {
  233. return nil
  234. }
  235. func (s *FakeStore) Get(key string) (*store.KVPair, error) {
  236. return nil, nil
  237. }
  238. func (s *FakeStore) Delete(key string) error {
  239. return nil
  240. }
  241. func (s *FakeStore) Exists(key string) (bool, error) {
  242. return true, nil
  243. }
  244. func (s *FakeStore) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
  245. return nil, nil
  246. }
  247. // WatchTree will fail the first time, and return the mockKVchan afterwards.
  248. // This is the behavior we need for testing.. If we need 'moar', should update this.
  249. func (s *FakeStore) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
  250. if s.watchTreeCallCount == 0 {
  251. s.watchTreeCallCount = 1
  252. return nil, errors.New("test error")
  253. }
  254. // First calls error
  255. return s.mockKVChan, nil
  256. }
  257. func (s *FakeStore) NewLock(key string, options *store.LockOptions) (store.Locker, error) {
  258. return nil, nil
  259. }
  260. func (s *FakeStore) List(directory string) ([]*store.KVPair, error) {
  261. return []*store.KVPair{}, nil
  262. }
  263. func (s *FakeStore) DeleteTree(directory string) error {
  264. return nil
  265. }
  266. func (s *FakeStore) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
  267. return true, nil, nil
  268. }
  269. func (s *FakeStore) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
  270. return true, nil
  271. }
  272. func (s *FakeStore) Close() {
  273. }