networkdbdiagnostic.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508
  1. package networkdb
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "fmt"
  6. "net/http"
  7. "strings"
  8. "github.com/containerd/log"
  9. "github.com/docker/docker/libnetwork/diagnostic"
  10. "github.com/docker/docker/libnetwork/internal/caller"
  11. )
  12. const (
  13. missingParameter = "missing parameter"
  14. dbNotAvailable = "database not available"
  15. )
  16. // NetDbPaths2Func TODO
  17. var NetDbPaths2Func = map[string]diagnostic.HTTPHandlerFunc{
  18. "/join": dbJoin,
  19. "/networkpeers": dbPeers,
  20. "/clusterpeers": dbClusterPeers,
  21. "/joinnetwork": dbJoinNetwork,
  22. "/leavenetwork": dbLeaveNetwork,
  23. "/createentry": dbCreateEntry,
  24. "/updateentry": dbUpdateEntry,
  25. "/deleteentry": dbDeleteEntry,
  26. "/getentry": dbGetEntry,
  27. "/gettable": dbGetTable,
  28. "/networkstats": dbNetworkStats,
  29. }
  30. func dbJoin(ctx interface{}, w http.ResponseWriter, r *http.Request) {
  31. _ = r.ParseForm()
  32. diagnostic.DebugHTTPForm(r)
  33. _, json := diagnostic.ParseHTTPFormOptions(r)
  34. // audit logs
  35. logger := log.G(context.TODO()).WithFields(log.Fields{
  36. "component": "diagnostic",
  37. "remoteIP": r.RemoteAddr,
  38. "method": caller.Name(0),
  39. "url": r.URL.String(),
  40. })
  41. logger.Info("join cluster")
  42. if len(r.Form["members"]) < 1 {
  43. rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?members=ip1,ip2,...", r.URL.Path))
  44. logger.Error("join cluster failed, wrong input")
  45. diagnostic.HTTPReply(w, rsp, json)
  46. return
  47. }
  48. nDB, ok := ctx.(*NetworkDB)
  49. if ok {
  50. err := nDB.Join(strings.Split(r.Form["members"][0], ","))
  51. if err != nil {
  52. rsp := diagnostic.FailCommand(fmt.Errorf("%s error in the DB join %s", r.URL.Path, err))
  53. logger.WithError(err).Error("join cluster failed")
  54. diagnostic.HTTPReply(w, rsp, json)
  55. return
  56. }
  57. logger.Info("join cluster done")
  58. diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
  59. return
  60. }
  61. diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
  62. }
  63. func dbPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) {
  64. _ = r.ParseForm()
  65. diagnostic.DebugHTTPForm(r)
  66. _, json := diagnostic.ParseHTTPFormOptions(r)
  67. // audit logs
  68. logger := log.G(context.TODO()).WithFields(log.Fields{
  69. "component": "diagnostic",
  70. "remoteIP": r.RemoteAddr,
  71. "method": caller.Name(0),
  72. "url": r.URL.String(),
  73. })
  74. logger.Info("network peers")
  75. if len(r.Form["nid"]) < 1 {
  76. rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?nid=test", r.URL.Path))
  77. logger.Error("network peers failed, wrong input")
  78. diagnostic.HTTPReply(w, rsp, json)
  79. return
  80. }
  81. nDB, ok := ctx.(*NetworkDB)
  82. if ok {
  83. peers := nDB.Peers(r.Form["nid"][0])
  84. rsp := &diagnostic.TableObj{Length: len(peers)}
  85. for i, peerInfo := range peers {
  86. if peerInfo.IP == "unknown" {
  87. rsp.Elements = append(rsp.Elements, &diagnostic.PeerEntryObj{Index: i, Name: "orphan-" + peerInfo.Name, IP: peerInfo.IP})
  88. } else {
  89. rsp.Elements = append(rsp.Elements, &diagnostic.PeerEntryObj{Index: i, Name: peerInfo.Name, IP: peerInfo.IP})
  90. }
  91. }
  92. logger.WithField("response", fmt.Sprintf("%+v", rsp)).Info("network peers done")
  93. diagnostic.HTTPReply(w, diagnostic.CommandSucceed(rsp), json)
  94. return
  95. }
  96. diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
  97. }
  98. func dbClusterPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) {
  99. _ = r.ParseForm()
  100. diagnostic.DebugHTTPForm(r)
  101. _, json := diagnostic.ParseHTTPFormOptions(r)
  102. // audit logs
  103. logger := log.G(context.TODO()).WithFields(log.Fields{
  104. "component": "diagnostic",
  105. "remoteIP": r.RemoteAddr,
  106. "method": caller.Name(0),
  107. "url": r.URL.String(),
  108. })
  109. logger.Info("cluster peers")
  110. nDB, ok := ctx.(*NetworkDB)
  111. if ok {
  112. peers := nDB.ClusterPeers()
  113. rsp := &diagnostic.TableObj{Length: len(peers)}
  114. for i, peerInfo := range peers {
  115. rsp.Elements = append(rsp.Elements, &diagnostic.PeerEntryObj{Index: i, Name: peerInfo.Name, IP: peerInfo.IP})
  116. }
  117. logger.WithField("response", fmt.Sprintf("%+v", rsp)).Info("cluster peers done")
  118. diagnostic.HTTPReply(w, diagnostic.CommandSucceed(rsp), json)
  119. return
  120. }
  121. diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
  122. }
  123. func dbCreateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
  124. _ = r.ParseForm()
  125. diagnostic.DebugHTTPForm(r)
  126. unsafe, json := diagnostic.ParseHTTPFormOptions(r)
  127. // audit logs
  128. logger := log.G(context.TODO()).WithFields(log.Fields{
  129. "component": "diagnostic",
  130. "remoteIP": r.RemoteAddr,
  131. "method": caller.Name(0),
  132. "url": r.URL.String(),
  133. })
  134. logger.Info("create entry")
  135. if len(r.Form["tname"]) < 1 ||
  136. len(r.Form["nid"]) < 1 ||
  137. len(r.Form["key"]) < 1 ||
  138. len(r.Form["value"]) < 1 {
  139. rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path))
  140. logger.Error("create entry failed, wrong input")
  141. diagnostic.HTTPReply(w, rsp, json)
  142. return
  143. }
  144. tname := r.Form["tname"][0]
  145. nid := r.Form["nid"][0]
  146. key := r.Form["key"][0]
  147. value := r.Form["value"][0]
  148. decodedValue := []byte(value)
  149. if !unsafe {
  150. var err error
  151. decodedValue, err = base64.StdEncoding.DecodeString(value)
  152. if err != nil {
  153. logger.WithError(err).Error("create entry failed")
  154. diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
  155. return
  156. }
  157. }
  158. nDB, ok := ctx.(*NetworkDB)
  159. if ok {
  160. if err := nDB.CreateEntry(tname, nid, key, decodedValue); err != nil {
  161. rsp := diagnostic.FailCommand(err)
  162. diagnostic.HTTPReply(w, rsp, json)
  163. logger.WithError(err).Error("create entry failed")
  164. return
  165. }
  166. logger.Info("create entry done")
  167. diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
  168. return
  169. }
  170. diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
  171. }
  172. func dbUpdateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
  173. _ = r.ParseForm()
  174. diagnostic.DebugHTTPForm(r)
  175. unsafe, json := diagnostic.ParseHTTPFormOptions(r)
  176. // audit logs
  177. logger := log.G(context.TODO()).WithFields(log.Fields{
  178. "component": "diagnostic",
  179. "remoteIP": r.RemoteAddr,
  180. "method": caller.Name(0),
  181. "url": r.URL.String(),
  182. })
  183. logger.Info("update entry")
  184. if len(r.Form["tname"]) < 1 ||
  185. len(r.Form["nid"]) < 1 ||
  186. len(r.Form["key"]) < 1 ||
  187. len(r.Form["value"]) < 1 {
  188. rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path))
  189. logger.Error("update entry failed, wrong input")
  190. diagnostic.HTTPReply(w, rsp, json)
  191. return
  192. }
  193. tname := r.Form["tname"][0]
  194. nid := r.Form["nid"][0]
  195. key := r.Form["key"][0]
  196. value := r.Form["value"][0]
  197. decodedValue := []byte(value)
  198. if !unsafe {
  199. var err error
  200. decodedValue, err = base64.StdEncoding.DecodeString(value)
  201. if err != nil {
  202. logger.WithError(err).Error("update entry failed")
  203. diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
  204. return
  205. }
  206. }
  207. nDB, ok := ctx.(*NetworkDB)
  208. if ok {
  209. if err := nDB.UpdateEntry(tname, nid, key, decodedValue); err != nil {
  210. logger.WithError(err).Error("update entry failed")
  211. diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
  212. return
  213. }
  214. logger.Info("update entry done")
  215. diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
  216. return
  217. }
  218. diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
  219. }
  220. func dbDeleteEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
  221. _ = r.ParseForm()
  222. diagnostic.DebugHTTPForm(r)
  223. _, json := diagnostic.ParseHTTPFormOptions(r)
  224. // audit logs
  225. logger := log.G(context.TODO()).WithFields(log.Fields{
  226. "component": "diagnostic",
  227. "remoteIP": r.RemoteAddr,
  228. "method": caller.Name(0),
  229. "url": r.URL.String(),
  230. })
  231. logger.Info("delete entry")
  232. if len(r.Form["tname"]) < 1 ||
  233. len(r.Form["nid"]) < 1 ||
  234. len(r.Form["key"]) < 1 {
  235. rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path))
  236. logger.Error("delete entry failed, wrong input")
  237. diagnostic.HTTPReply(w, rsp, json)
  238. return
  239. }
  240. tname := r.Form["tname"][0]
  241. nid := r.Form["nid"][0]
  242. key := r.Form["key"][0]
  243. nDB, ok := ctx.(*NetworkDB)
  244. if ok {
  245. err := nDB.DeleteEntry(tname, nid, key)
  246. if err != nil {
  247. logger.WithError(err).Error("delete entry failed")
  248. diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
  249. return
  250. }
  251. logger.Info("delete entry done")
  252. diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
  253. return
  254. }
  255. diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
  256. }
  257. func dbGetEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
  258. _ = r.ParseForm()
  259. diagnostic.DebugHTTPForm(r)
  260. unsafe, json := diagnostic.ParseHTTPFormOptions(r)
  261. // audit logs
  262. logger := log.G(context.TODO()).WithFields(log.Fields{
  263. "component": "diagnostic",
  264. "remoteIP": r.RemoteAddr,
  265. "method": caller.Name(0),
  266. "url": r.URL.String(),
  267. })
  268. logger.Info("get entry")
  269. if len(r.Form["tname"]) < 1 ||
  270. len(r.Form["nid"]) < 1 ||
  271. len(r.Form["key"]) < 1 {
  272. rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path))
  273. logger.Error("get entry failed, wrong input")
  274. diagnostic.HTTPReply(w, rsp, json)
  275. return
  276. }
  277. tname := r.Form["tname"][0]
  278. nid := r.Form["nid"][0]
  279. key := r.Form["key"][0]
  280. nDB, ok := ctx.(*NetworkDB)
  281. if ok {
  282. value, err := nDB.GetEntry(tname, nid, key)
  283. if err != nil {
  284. logger.WithError(err).Error("get entry failed")
  285. diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
  286. return
  287. }
  288. var encodedValue string
  289. if unsafe {
  290. encodedValue = string(value)
  291. } else {
  292. encodedValue = base64.StdEncoding.EncodeToString(value)
  293. }
  294. rsp := &diagnostic.TableEntryObj{Key: key, Value: encodedValue}
  295. logger.WithField("response", fmt.Sprintf("%+v", rsp)).Info("get entry done")
  296. diagnostic.HTTPReply(w, diagnostic.CommandSucceed(rsp), json)
  297. return
  298. }
  299. diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
  300. }
  301. func dbJoinNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
  302. _ = r.ParseForm()
  303. diagnostic.DebugHTTPForm(r)
  304. _, json := diagnostic.ParseHTTPFormOptions(r)
  305. // audit logs
  306. logger := log.G(context.TODO()).WithFields(log.Fields{
  307. "component": "diagnostic",
  308. "remoteIP": r.RemoteAddr,
  309. "method": caller.Name(0),
  310. "url": r.URL.String(),
  311. })
  312. logger.Info("join network")
  313. if len(r.Form["nid"]) < 1 {
  314. rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path))
  315. logger.Error("join network failed, wrong input")
  316. diagnostic.HTTPReply(w, rsp, json)
  317. return
  318. }
  319. nid := r.Form["nid"][0]
  320. nDB, ok := ctx.(*NetworkDB)
  321. if ok {
  322. if err := nDB.JoinNetwork(nid); err != nil {
  323. logger.WithError(err).Error("join network failed")
  324. diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
  325. return
  326. }
  327. logger.Info("join network done")
  328. diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
  329. return
  330. }
  331. diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
  332. }
  333. func dbLeaveNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
  334. _ = r.ParseForm()
  335. diagnostic.DebugHTTPForm(r)
  336. _, json := diagnostic.ParseHTTPFormOptions(r)
  337. // audit logs
  338. logger := log.G(context.TODO()).WithFields(log.Fields{
  339. "component": "diagnostic",
  340. "remoteIP": r.RemoteAddr,
  341. "method": caller.Name(0),
  342. "url": r.URL.String(),
  343. })
  344. logger.Info("leave network")
  345. if len(r.Form["nid"]) < 1 {
  346. rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path))
  347. logger.Error("leave network failed, wrong input")
  348. diagnostic.HTTPReply(w, rsp, json)
  349. return
  350. }
  351. nid := r.Form["nid"][0]
  352. nDB, ok := ctx.(*NetworkDB)
  353. if ok {
  354. if err := nDB.LeaveNetwork(nid); err != nil {
  355. logger.WithError(err).Error("leave network failed")
  356. diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
  357. return
  358. }
  359. logger.Info("leave network done")
  360. diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
  361. return
  362. }
  363. diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
  364. }
  365. func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
  366. _ = r.ParseForm()
  367. diagnostic.DebugHTTPForm(r)
  368. unsafe, json := diagnostic.ParseHTTPFormOptions(r)
  369. // audit logs
  370. logger := log.G(context.TODO()).WithFields(log.Fields{
  371. "component": "diagnostic",
  372. "remoteIP": r.RemoteAddr,
  373. "method": caller.Name(0),
  374. "url": r.URL.String(),
  375. })
  376. logger.Info("get table")
  377. if len(r.Form["tname"]) < 1 ||
  378. len(r.Form["nid"]) < 1 {
  379. rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id", r.URL.Path))
  380. logger.Error("get table failed, wrong input")
  381. diagnostic.HTTPReply(w, rsp, json)
  382. return
  383. }
  384. tname := r.Form["tname"][0]
  385. nid := r.Form["nid"][0]
  386. nDB, ok := ctx.(*NetworkDB)
  387. if ok {
  388. table := nDB.GetTableByNetwork(tname, nid)
  389. rsp := &diagnostic.TableObj{Length: len(table)}
  390. i := 0
  391. for k, v := range table {
  392. var encodedValue string
  393. if unsafe {
  394. encodedValue = string(v.Value)
  395. } else {
  396. encodedValue = base64.StdEncoding.EncodeToString(v.Value)
  397. }
  398. rsp.Elements = append(rsp.Elements,
  399. &diagnostic.TableEntryObj{
  400. Index: i,
  401. Key: k,
  402. Value: encodedValue,
  403. Owner: v.owner,
  404. })
  405. i++
  406. }
  407. logger.WithField("response", fmt.Sprintf("%+v", rsp)).Info("get table done")
  408. diagnostic.HTTPReply(w, diagnostic.CommandSucceed(rsp), json)
  409. return
  410. }
  411. diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
  412. }
  413. func dbNetworkStats(ctx interface{}, w http.ResponseWriter, r *http.Request) {
  414. _ = r.ParseForm()
  415. diagnostic.DebugHTTPForm(r)
  416. _, json := diagnostic.ParseHTTPFormOptions(r)
  417. // audit logs
  418. logger := log.G(context.TODO()).WithFields(log.Fields{
  419. "component": "diagnostic",
  420. "remoteIP": r.RemoteAddr,
  421. "method": caller.Name(0),
  422. "url": r.URL.String(),
  423. })
  424. logger.Info("network stats")
  425. if len(r.Form["nid"]) < 1 {
  426. rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?nid=test", r.URL.Path))
  427. logger.Error("network stats failed, wrong input")
  428. diagnostic.HTTPReply(w, rsp, json)
  429. return
  430. }
  431. nDB, ok := ctx.(*NetworkDB)
  432. if ok {
  433. nDB.RLock()
  434. networks := nDB.networks[nDB.config.NodeID]
  435. network, ok := networks[r.Form["nid"][0]]
  436. entries := -1
  437. qLen := -1
  438. if ok {
  439. entries = int(network.entriesNumber.Load())
  440. qLen = network.tableBroadcasts.NumQueued()
  441. }
  442. nDB.RUnlock()
  443. rsp := diagnostic.CommandSucceed(&diagnostic.NetworkStatsResult{Entries: entries, QueueLen: qLen})
  444. logger.WithField("response", fmt.Sprintf("%+v", rsp)).Info("network stats done")
  445. diagnostic.HTTPReply(w, rsp, json)
  446. return
  447. }
  448. diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
  449. }