dnet.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493
  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "net"
  10. "net/http"
  11. "net/http/httptest"
  12. "os"
  13. "os/signal"
  14. "strings"
  15. "syscall"
  16. "time"
  17. "github.com/BurntSushi/toml"
  18. "github.com/codegangsta/cli"
  19. "github.com/docker/docker/opts"
  20. "github.com/docker/docker/pkg/discovery"
  21. "github.com/docker/docker/pkg/reexec"
  22. "github.com/Sirupsen/logrus"
  23. "github.com/docker/docker/pkg/term"
  24. "github.com/docker/libnetwork"
  25. "github.com/docker/libnetwork/api"
  26. "github.com/docker/libnetwork/config"
  27. "github.com/docker/libnetwork/datastore"
  28. "github.com/docker/libnetwork/driverapi"
  29. "github.com/docker/libnetwork/netlabel"
  30. "github.com/docker/libnetwork/netutils"
  31. "github.com/docker/libnetwork/options"
  32. "github.com/docker/libnetwork/types"
  33. "github.com/gorilla/mux"
  34. )
  // Listen defaults and configuration fallbacks used throughout dnet.
  const (
  	// DefaultHTTPHost is used if only a port is provided to the -H flag,
  	// e.g. docker -d -H tcp://:8080.
  	DefaultHTTPHost = "0.0.0.0"

  	// DefaultHTTPPort is the default http port used by dnet.
  	DefaultHTTPPort = 2385

  	// DefaultUnixSocket is a unix socket path for dnet; it is not referenced
  	// in the visible part of this file.
  	DefaultUnixSocket = "/var/run/dnet.sock"

  	// cfgFileEnv names the environment variable consulted by parseConfig when
  	// no configuration file path is passed in.
  	cfgFileEnv = "LIBNETWORK_CFG"

  	// defaultCfgFile is the configuration path used when neither the caller
  	// nor the environment provides one.
  	defaultCfgFile = "/etc/default/libnetwork.toml"

  	// defaultHeartbeat is the discovery heartbeat applied when the cluster
  	// configuration leaves it at zero.
  	defaultHeartbeat = 10 * time.Second

  	// ttlFactor multiplies the heartbeat to obtain the second duration handed
  	// to discovery.New (presumably the entry TTL).
  	ttlFactor = 2
  )
  // epConn is a package-level *dnetConnection. Nothing in the visible part of
  // this file assigns or reads it — presumably it is populated during flag
  // processing elsewhere in the package (TODO confirm).
  var epConn *dnetConnection
  48. func main() {
  49. if reexec.Init() {
  50. return
  51. }
  52. _, stdout, stderr := term.StdStreams()
  53. logrus.SetOutput(stderr)
  54. err := dnetApp(stdout, stderr)
  55. if err != nil {
  56. os.Exit(1)
  57. }
  58. }
  59. // ParseConfig parses the libnetwork configuration file
  60. func (d *dnetConnection) parseOrchestrationConfig(tomlCfgFile string) error {
  61. dummy := &dnetConnection{}
  62. if _, err := toml.DecodeFile(tomlCfgFile, dummy); err != nil {
  63. return err
  64. }
  65. if dummy.Orchestration != nil {
  66. d.Orchestration = dummy.Orchestration
  67. }
  68. return nil
  69. }
  70. func (d *dnetConnection) parseConfig(cfgFile string) (*config.Config, error) {
  71. if strings.Trim(cfgFile, " ") == "" {
  72. cfgFile = os.Getenv(cfgFileEnv)
  73. if strings.Trim(cfgFile, " ") == "" {
  74. cfgFile = defaultCfgFile
  75. }
  76. }
  77. if err := d.parseOrchestrationConfig(cfgFile); err != nil {
  78. return nil, err
  79. }
  80. return config.ParseConfig(cfgFile)
  81. }
  82. func processConfig(cfg *config.Config) []config.Option {
  83. options := []config.Option{}
  84. if cfg == nil {
  85. return options
  86. }
  87. dn := "bridge"
  88. if strings.TrimSpace(cfg.Daemon.DefaultNetwork) != "" {
  89. dn = cfg.Daemon.DefaultNetwork
  90. }
  91. options = append(options, config.OptionDefaultNetwork(dn))
  92. dd := "bridge"
  93. if strings.TrimSpace(cfg.Daemon.DefaultDriver) != "" {
  94. dd = cfg.Daemon.DefaultDriver
  95. }
  96. options = append(options, config.OptionDefaultDriver(dd))
  97. if cfg.Daemon.Labels != nil {
  98. options = append(options, config.OptionLabels(cfg.Daemon.Labels))
  99. }
  100. if dcfg, ok := cfg.Scopes[datastore.GlobalScope]; ok && dcfg.IsValid() {
  101. options = append(options, config.OptionKVProvider(dcfg.Client.Provider))
  102. options = append(options, config.OptionKVProviderURL(dcfg.Client.Address))
  103. }
  104. dOptions, err := startDiscovery(&cfg.Cluster)
  105. if err != nil {
  106. logrus.Infof("Skipping discovery : %s", err.Error())
  107. } else {
  108. options = append(options, dOptions...)
  109. }
  110. return options
  111. }
  112. func startDiscovery(cfg *config.ClusterCfg) ([]config.Option, error) {
  113. if cfg == nil {
  114. return nil, fmt.Errorf("discovery requires a valid configuration")
  115. }
  116. hb := time.Duration(cfg.Heartbeat) * time.Second
  117. if hb == 0 {
  118. hb = defaultHeartbeat
  119. }
  120. logrus.Infof("discovery : %s $s", cfg.Discovery, hb.String())
  121. d, err := discovery.New(cfg.Discovery, hb, ttlFactor*hb, map[string]string{})
  122. if err != nil {
  123. return nil, err
  124. }
  125. if cfg.Address == "" {
  126. iface, err := net.InterfaceByName("eth0")
  127. if err != nil {
  128. return nil, err
  129. }
  130. addrs, err := iface.Addrs()
  131. if err != nil || len(addrs) == 0 {
  132. return nil, err
  133. }
  134. ip, _, _ := net.ParseCIDR(addrs[0].String())
  135. cfg.Address = ip.String()
  136. }
  137. if ip := net.ParseIP(cfg.Address); ip == nil {
  138. return nil, errors.New("address config should be either ipv4 or ipv6 address")
  139. }
  140. if err := d.Register(cfg.Address + ":0"); err != nil {
  141. return nil, err
  142. }
  143. options := []config.Option{config.OptionDiscoveryWatcher(d), config.OptionDiscoveryAddress(cfg.Address)}
  144. go func() {
  145. for {
  146. select {
  147. case <-time.After(hb):
  148. if err := d.Register(cfg.Address + ":0"); err != nil {
  149. logrus.Warn(err)
  150. }
  151. }
  152. }
  153. }()
  154. return options, nil
  155. }
  156. func dnetApp(stdout, stderr io.Writer) error {
  157. app := cli.NewApp()
  158. app.Name = "dnet"
  159. app.Usage = "A self-sufficient runtime for container networking."
  160. app.Flags = dnetFlags
  161. app.Before = processFlags
  162. app.Commands = dnetCommands
  163. app.Run(os.Args)
  164. return nil
  165. }
  166. func createDefaultNetwork(c libnetwork.NetworkController) {
  167. nw := c.Config().Daemon.DefaultNetwork
  168. d := c.Config().Daemon.DefaultDriver
  169. createOptions := []libnetwork.NetworkOption{}
  170. genericOption := options.Generic{}
  171. if nw != "" && d != "" {
  172. // Bridge driver is special due to legacy reasons
  173. if d == "bridge" {
  174. genericOption[netlabel.GenericData] = map[string]string{
  175. "BridgeName": "docker0",
  176. "DefaultBridge": "true",
  177. }
  178. createOptions = append(createOptions,
  179. libnetwork.NetworkOptionGeneric(genericOption),
  180. ipamOption(nw))
  181. }
  182. if n, err := c.NetworkByName(nw); err == nil {
  183. logrus.Debugf("Default network %s already present. Deleting it", nw)
  184. if err = n.Delete(); err != nil {
  185. logrus.Debugf("Network could not be deleted: %v", err)
  186. return
  187. }
  188. }
  189. _, err := c.NewNetwork(d, nw, "", createOptions...)
  190. if err != nil {
  191. logrus.Errorf("Error creating default network : %s : %v", nw, err)
  192. }
  193. }
  194. }
  // dnetConnection holds the address dnet listens on / connects to together
  // with the orchestration settings read from the configuration file. It is
  // handed to the controller via SetClusterProvider in dnetDaemon.
  //
  // NOTE: newDnetConnection builds this struct with a positional literal, so
  // the field order must not change.
  type dnetConnection struct {
  // proto holds the client protocol; newDnetConnection only accepts tcp.
  proto string
  // addr holds the client address.
  addr string
  // Orchestration holds the orchestration settings; it is replaced by
  // parseOrchestrationConfig when the TOML file provides a value.
  Orchestration *NetworkOrchestration
  // configEvent is the channel returned by ListenClusterEvents; dnetDaemon
  // sends on it when the node is configured as an agent or a manager.
  configEvent chan struct{}
  }
  // NetworkOrchestration carries the orchestration settings of a dnet node.
  // It is filled in when parseOrchestrationConfig decodes the TOML
  // configuration file into a dnetConnection.
  type NetworkOrchestration struct {
  // Agent is the value reported by dnetConnection.IsAgent.
  Agent bool
  // Manager is the value reported by dnetConnection.IsManager.
  Manager bool
  // Bind is the value reported by dnetConnection.GetListenAddress.
  Bind string
  // Peer is the value reported by dnetConnection.GetRemoteAddress.
  Peer string
  }
  210. func (d *dnetConnection) dnetDaemon(cfgFile string) error {
  211. if err := startTestDriver(); err != nil {
  212. return fmt.Errorf("failed to start test driver: %v\n", err)
  213. }
  214. cfg, err := d.parseConfig(cfgFile)
  215. var cOptions []config.Option
  216. if err == nil {
  217. cOptions = processConfig(cfg)
  218. } else {
  219. logrus.Errorf("Error parsing config %v", err)
  220. }
  221. bridgeConfig := options.Generic{
  222. "EnableIPForwarding": true,
  223. "EnableIPTables": true,
  224. }
  225. bridgeOption := options.Generic{netlabel.GenericData: bridgeConfig}
  226. cOptions = append(cOptions, config.OptionDriverConfig("bridge", bridgeOption))
  227. controller, err := libnetwork.New(cOptions...)
  228. if err != nil {
  229. fmt.Println("Error starting dnetDaemon :", err)
  230. return err
  231. }
  232. controller.SetClusterProvider(d)
  233. if d.Orchestration.Agent || d.Orchestration.Manager {
  234. d.configEvent <- struct{}{}
  235. }
  236. createDefaultNetwork(controller)
  237. httpHandler := api.NewHTTPHandler(controller)
  238. r := mux.NewRouter().StrictSlash(false)
  239. post := r.PathPrefix("/{.*}/networks").Subrouter()
  240. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  241. post = r.PathPrefix("/networks").Subrouter()
  242. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  243. post = r.PathPrefix("/{.*}/services").Subrouter()
  244. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  245. post = r.PathPrefix("/services").Subrouter()
  246. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  247. post = r.PathPrefix("/{.*}/sandboxes").Subrouter()
  248. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  249. post = r.PathPrefix("/sandboxes").Subrouter()
  250. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  251. handleSignals(controller)
  252. setupDumpStackTrap()
  253. return http.ListenAndServe(d.addr, r)
  254. }
  255. func (d *dnetConnection) IsManager() bool {
  256. return d.Orchestration.Manager
  257. }
  258. func (d *dnetConnection) IsAgent() bool {
  259. return d.Orchestration.Agent
  260. }
  261. func (d *dnetConnection) GetListenAddress() string {
  262. return d.Orchestration.Bind
  263. }
  264. func (d *dnetConnection) GetRemoteAddress() string {
  265. return d.Orchestration.Peer
  266. }
  267. func (d *dnetConnection) GetNetworkKeys() []*types.EncryptionKey {
  268. return nil
  269. }
  270. func (d *dnetConnection) SetNetworkKeys([]*types.EncryptionKey) {
  271. }
  272. func (d *dnetConnection) ListenClusterEvents() <-chan struct{} {
  273. return d.configEvent
  274. }
  275. func handleSignals(controller libnetwork.NetworkController) {
  276. c := make(chan os.Signal, 1)
  277. signals := []os.Signal{os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT}
  278. signal.Notify(c, signals...)
  279. go func() {
  280. for range c {
  281. controller.Stop()
  282. os.Exit(0)
  283. }
  284. }()
  285. }
  286. func startTestDriver() error {
  287. mux := http.NewServeMux()
  288. server := httptest.NewServer(mux)
  289. if server == nil {
  290. return fmt.Errorf("Failed to start a HTTP Server")
  291. }
  292. mux.HandleFunc("/Plugin.Activate", func(w http.ResponseWriter, r *http.Request) {
  293. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  294. fmt.Fprintf(w, `{"Implements": ["%s"]}`, driverapi.NetworkPluginEndpointType)
  295. })
  296. mux.HandleFunc(fmt.Sprintf("/%s.GetCapabilities", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  297. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  298. fmt.Fprintf(w, `{"Scope":"global"}`)
  299. })
  300. mux.HandleFunc(fmt.Sprintf("/%s.CreateNetwork", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  301. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  302. fmt.Fprintf(w, "null")
  303. })
  304. mux.HandleFunc(fmt.Sprintf("/%s.DeleteNetwork", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  305. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  306. fmt.Fprintf(w, "null")
  307. })
  308. mux.HandleFunc(fmt.Sprintf("/%s.CreateEndpoint", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  309. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  310. fmt.Fprintf(w, "null")
  311. })
  312. mux.HandleFunc(fmt.Sprintf("/%s.DeleteEndpoint", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  313. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  314. fmt.Fprintf(w, "null")
  315. })
  316. mux.HandleFunc(fmt.Sprintf("/%s.Join", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  317. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  318. fmt.Fprintf(w, "null")
  319. })
  320. mux.HandleFunc(fmt.Sprintf("/%s.Leave", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  321. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  322. fmt.Fprintf(w, "null")
  323. })
  324. if err := os.MkdirAll("/etc/docker/plugins", 0755); err != nil {
  325. return err
  326. }
  327. if err := ioutil.WriteFile("/etc/docker/plugins/test.spec", []byte(server.URL), 0644); err != nil {
  328. return err
  329. }
  330. return nil
  331. }
  332. func newDnetConnection(val string) (*dnetConnection, error) {
  333. url, err := opts.ParseHost(false, val)
  334. if err != nil {
  335. return nil, err
  336. }
  337. protoAddrParts := strings.SplitN(url, "://", 2)
  338. if len(protoAddrParts) != 2 {
  339. return nil, fmt.Errorf("bad format, expected tcp://ADDR")
  340. }
  341. if strings.ToLower(protoAddrParts[0]) != "tcp" {
  342. return nil, fmt.Errorf("dnet currently only supports tcp transport")
  343. }
  344. return &dnetConnection{protoAddrParts[0], protoAddrParts[1], &NetworkOrchestration{}, make(chan struct{}, 10)}, nil
  345. }
  346. func (d *dnetConnection) httpCall(method, path string, data interface{}, headers map[string][]string) (io.ReadCloser, http.Header, int, error) {
  347. var in io.Reader
  348. in, err := encodeData(data)
  349. if err != nil {
  350. return nil, nil, -1, err
  351. }
  352. req, err := http.NewRequest(method, fmt.Sprintf("%s", path), in)
  353. if err != nil {
  354. return nil, nil, -1, err
  355. }
  356. setupRequestHeaders(method, data, req, headers)
  357. req.URL.Host = d.addr
  358. req.URL.Scheme = "http"
  359. httpClient := &http.Client{}
  360. resp, err := httpClient.Do(req)
  361. statusCode := -1
  362. if resp != nil {
  363. statusCode = resp.StatusCode
  364. }
  365. if err != nil {
  366. return nil, nil, statusCode, fmt.Errorf("error when trying to connect: %v", err)
  367. }
  368. if statusCode < 200 || statusCode >= 400 {
  369. body, err := ioutil.ReadAll(resp.Body)
  370. if err != nil {
  371. return nil, nil, statusCode, err
  372. }
  373. return nil, nil, statusCode, fmt.Errorf("error : %s", bytes.TrimSpace(body))
  374. }
  375. return resp.Body, resp.Header, statusCode, nil
  376. }
  // setupRequestHeaders fills in the headers of req.
  //
  // For POST and PUT requests without a Content-Type, "text/plain" is set as
  // the default. The caller-supplied headers are then copied onto the request
  // verbatim (keys are not canonicalised). Finally, when data is non-nil the
  // Content-Type is forced to "application/json", overriding any caller value.
  //
  // The headers map is only read; it is never modified.
  func setupRequestHeaders(method string, data interface{}, req *http.Request, headers map[string][]string) {
  	expectedPayload := method == "POST" || method == "PUT"
  	if expectedPayload && req.Header.Get("Content-Type") == "" {
  		req.Header.Set("Content-Type", "text/plain")
  	}

  	// Ranging over a nil map is a no-op, so no nil check is needed.
  	for k, v := range headers {
  		req.Header[k] = v
  	}

  	// Applied last, and directly on the request, so the JSON content type
  	// wins without writing into the caller's map.
  	if data != nil {
  		req.Header["Content-Type"] = []string{"application/json"}
  	}
  }
  // encodeData JSON-encodes data into a fresh buffer. A nil data yields an
  // empty, non-nil buffer. When encoding fails the buffer is discarded and the
  // encoder's error is returned.
  func encodeData(data interface{}) (*bytes.Buffer, error) {
  	buf := new(bytes.Buffer)
  	if data == nil {
  		return buf, nil
  	}

  	err := json.NewEncoder(buf).Encode(data)
  	if err != nil {
  		return nil, err
  	}
  	return buf, nil
  }
  403. func ipamOption(bridgeName string) libnetwork.NetworkOption {
  404. if nw, _, err := netutils.ElectInterfaceAddresses(bridgeName); err == nil {
  405. ipamV4Conf := &libnetwork.IpamConf{PreferredPool: nw.String()}
  406. hip, _ := types.GetHostPartIP(nw.IP, nw.Mask)
  407. if hip.IsGlobalUnicast() {
  408. ipamV4Conf.Gateway = nw.IP.String()
  409. }
  410. return libnetwork.NetworkOptionIpam("default", "", []*libnetwork.IpamConf{ipamV4Conf}, nil, nil)
  411. }
  412. return nil
  413. }