dnet.go 14 KB


  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "net"
  10. "net/http"
  11. "net/http/httptest"
  12. "os"
  13. "os/signal"
  14. "strings"
  15. "syscall"
  16. "time"
  17. "github.com/BurntSushi/toml"
  18. "github.com/codegangsta/cli"
  19. "github.com/docker/docker/opts"
  20. "github.com/docker/docker/pkg/discovery"
  21. "github.com/docker/docker/pkg/reexec"
  22. "github.com/Sirupsen/logrus"
  23. "github.com/docker/docker/api/types/network"
  24. "github.com/docker/docker/pkg/term"
  25. "github.com/docker/libnetwork"
  26. "github.com/docker/libnetwork/api"
  27. "github.com/docker/libnetwork/config"
  28. "github.com/docker/libnetwork/datastore"
  29. "github.com/docker/libnetwork/driverapi"
  30. "github.com/docker/libnetwork/netlabel"
  31. "github.com/docker/libnetwork/netutils"
  32. "github.com/docker/libnetwork/options"
  33. "github.com/docker/libnetwork/types"
  34. "github.com/gorilla/mux"
  35. "golang.org/x/net/context"
  36. )
  37. const (
  38. // DefaultHTTPHost is used if only port is provided to -H flag e.g. docker -d -H tcp://:8080
  39. DefaultHTTPHost = "0.0.0.0"
  40. // DefaultHTTPPort is the default http port used by dnet
  41. DefaultHTTPPort = 2385
  42. // DefaultUnixSocket exported
  43. DefaultUnixSocket = "/var/run/dnet.sock"
  44. cfgFileEnv = "LIBNETWORK_CFG"
  45. defaultCfgFile = "/etc/default/libnetwork.toml"
  46. defaultHeartbeat = time.Duration(10) * time.Second
  47. ttlFactor = 2
  48. )
  49. var epConn *dnetConnection
  50. func main() {
  51. if reexec.Init() {
  52. return
  53. }
  54. _, stdout, stderr := term.StdStreams()
  55. logrus.SetOutput(stderr)
  56. err := dnetApp(stdout, stderr)
  57. if err != nil {
  58. os.Exit(1)
  59. }
  60. }
  61. // ParseConfig parses the libnetwork configuration file
  62. func (d *dnetConnection) parseOrchestrationConfig(tomlCfgFile string) error {
  63. dummy := &dnetConnection{}
  64. if _, err := toml.DecodeFile(tomlCfgFile, dummy); err != nil {
  65. return err
  66. }
  67. if dummy.Orchestration != nil {
  68. d.Orchestration = dummy.Orchestration
  69. }
  70. return nil
  71. }
  72. func (d *dnetConnection) parseConfig(cfgFile string) (*config.Config, error) {
  73. if strings.Trim(cfgFile, " ") == "" {
  74. cfgFile = os.Getenv(cfgFileEnv)
  75. if strings.Trim(cfgFile, " ") == "" {
  76. cfgFile = defaultCfgFile
  77. }
  78. }
  79. if err := d.parseOrchestrationConfig(cfgFile); err != nil {
  80. return nil, err
  81. }
  82. return config.ParseConfig(cfgFile)
  83. }
  84. func processConfig(cfg *config.Config) []config.Option {
  85. options := []config.Option{}
  86. if cfg == nil {
  87. return options
  88. }
  89. dn := "bridge"
  90. if strings.TrimSpace(cfg.Daemon.DefaultNetwork) != "" {
  91. dn = cfg.Daemon.DefaultNetwork
  92. }
  93. options = append(options, config.OptionDefaultNetwork(dn))
  94. dd := "bridge"
  95. if strings.TrimSpace(cfg.Daemon.DefaultDriver) != "" {
  96. dd = cfg.Daemon.DefaultDriver
  97. }
  98. options = append(options, config.OptionDefaultDriver(dd))
  99. if cfg.Daemon.Labels != nil {
  100. options = append(options, config.OptionLabels(cfg.Daemon.Labels))
  101. }
  102. if dcfg, ok := cfg.Scopes[datastore.GlobalScope]; ok && dcfg.IsValid() {
  103. options = append(options, config.OptionKVProvider(dcfg.Client.Provider))
  104. options = append(options, config.OptionKVProviderURL(dcfg.Client.Address))
  105. }
  106. dOptions, err := startDiscovery(&cfg.Cluster)
  107. if err != nil {
  108. logrus.Infof("Skipping discovery : %s", err.Error())
  109. } else {
  110. options = append(options, dOptions...)
  111. }
  112. return options
  113. }
  114. func startDiscovery(cfg *config.ClusterCfg) ([]config.Option, error) {
  115. if cfg == nil {
  116. return nil, fmt.Errorf("discovery requires a valid configuration")
  117. }
  118. hb := time.Duration(cfg.Heartbeat) * time.Second
  119. if hb == 0 {
  120. hb = defaultHeartbeat
  121. }
  122. logrus.Infof("discovery : %s $s", cfg.Discovery, hb.String())
  123. d, err := discovery.New(cfg.Discovery, hb, ttlFactor*hb, map[string]string{})
  124. if err != nil {
  125. return nil, err
  126. }
  127. if cfg.Address == "" {
  128. iface, err := net.InterfaceByName("eth0")
  129. if err != nil {
  130. return nil, err
  131. }
  132. addrs, err := iface.Addrs()
  133. if err != nil || len(addrs) == 0 {
  134. return nil, err
  135. }
  136. ip, _, _ := net.ParseCIDR(addrs[0].String())
  137. cfg.Address = ip.String()
  138. }
  139. if ip := net.ParseIP(cfg.Address); ip == nil {
  140. return nil, errors.New("address config should be either ipv4 or ipv6 address")
  141. }
  142. if err := d.Register(cfg.Address + ":0"); err != nil {
  143. return nil, err
  144. }
  145. options := []config.Option{config.OptionDiscoveryWatcher(d), config.OptionDiscoveryAddress(cfg.Address)}
  146. go func() {
  147. for {
  148. select {
  149. case <-time.After(hb):
  150. if err := d.Register(cfg.Address + ":0"); err != nil {
  151. logrus.Warn(err)
  152. }
  153. }
  154. }
  155. }()
  156. return options, nil
  157. }
  158. func dnetApp(stdout, stderr io.Writer) error {
  159. app := cli.NewApp()
  160. app.Name = "dnet"
  161. app.Usage = "A self-sufficient runtime for container networking."
  162. app.Flags = dnetFlags
  163. app.Before = processFlags
  164. app.Commands = dnetCommands
  165. app.Run(os.Args)
  166. return nil
  167. }
  168. func createDefaultNetwork(c libnetwork.NetworkController) {
  169. nw := c.Config().Daemon.DefaultNetwork
  170. d := c.Config().Daemon.DefaultDriver
  171. createOptions := []libnetwork.NetworkOption{}
  172. genericOption := options.Generic{}
  173. if nw != "" && d != "" {
  174. // Bridge driver is special due to legacy reasons
  175. if d == "bridge" {
  176. genericOption[netlabel.GenericData] = map[string]string{
  177. "BridgeName": "docker0",
  178. "DefaultBridge": "true",
  179. }
  180. createOptions = append(createOptions,
  181. libnetwork.NetworkOptionGeneric(genericOption),
  182. ipamOption(nw))
  183. }
  184. if n, err := c.NetworkByName(nw); err == nil {
  185. logrus.Debugf("Default network %s already present. Deleting it", nw)
  186. if err = n.Delete(); err != nil {
  187. logrus.Debugf("Network could not be deleted: %v", err)
  188. return
  189. }
  190. }
  191. _, err := c.NewNetwork(d, nw, "", createOptions...)
  192. if err != nil {
  193. logrus.Errorf("Error creating default network : %s : %v", nw, err)
  194. }
  195. }
  196. }
  197. type dnetConnection struct {
  198. // proto holds the client protocol i.e. unix.
  199. proto string
  200. // addr holds the client address.
  201. addr string
  202. Orchestration *NetworkOrchestration
  203. configEvent chan struct{}
  204. }
  205. // NetworkOrchestration exported
  206. type NetworkOrchestration struct {
  207. Agent bool
  208. Manager bool
  209. Bind string
  210. Peer string
  211. }
  212. func (d *dnetConnection) dnetDaemon(cfgFile string) error {
  213. if err := startTestDriver(); err != nil {
  214. return fmt.Errorf("failed to start test driver: %v\n", err)
  215. }
  216. cfg, err := d.parseConfig(cfgFile)
  217. var cOptions []config.Option
  218. if err == nil {
  219. cOptions = processConfig(cfg)
  220. } else {
  221. logrus.Errorf("Error parsing config %v", err)
  222. }
  223. bridgeConfig := options.Generic{
  224. "EnableIPForwarding": true,
  225. "EnableIPTables": true,
  226. }
  227. bridgeOption := options.Generic{netlabel.GenericData: bridgeConfig}
  228. cOptions = append(cOptions, config.OptionDriverConfig("bridge", bridgeOption))
  229. controller, err := libnetwork.New(cOptions...)
  230. if err != nil {
  231. fmt.Println("Error starting dnetDaemon :", err)
  232. return err
  233. }
  234. controller.SetClusterProvider(d)
  235. if d.Orchestration.Agent || d.Orchestration.Manager {
  236. d.configEvent <- struct{}{}
  237. }
  238. createDefaultNetwork(controller)
  239. httpHandler := api.NewHTTPHandler(controller)
  240. r := mux.NewRouter().StrictSlash(false)
  241. post := r.PathPrefix("/{.*}/networks").Subrouter()
  242. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  243. post = r.PathPrefix("/networks").Subrouter()
  244. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  245. post = r.PathPrefix("/{.*}/services").Subrouter()
  246. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  247. post = r.PathPrefix("/services").Subrouter()
  248. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  249. post = r.PathPrefix("/{.*}/sandboxes").Subrouter()
  250. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  251. post = r.PathPrefix("/sandboxes").Subrouter()
  252. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  253. handleSignals(controller)
  254. setupDumpStackTrap()
  255. return http.ListenAndServe(d.addr, r)
  256. }
  257. func (d *dnetConnection) IsManager() bool {
  258. return d.Orchestration.Manager
  259. }
  260. func (d *dnetConnection) IsAgent() bool {
  261. return d.Orchestration.Agent
  262. }
  263. func (d *dnetConnection) GetAdvertiseAddress() string {
  264. return d.Orchestration.Bind
  265. }
  266. func (d *dnetConnection) GetLocalAddress() string {
  267. return d.Orchestration.Bind
  268. }
  269. func (d *dnetConnection) GetRemoteAddress() string {
  270. return d.Orchestration.Peer
  271. }
  272. func (d *dnetConnection) GetNetworkKeys() []*types.EncryptionKey {
  273. return nil
  274. }
  275. func (d *dnetConnection) SetNetworkKeys([]*types.EncryptionKey) {
  276. }
  277. func (d *dnetConnection) ListenClusterEvents() <-chan struct{} {
  278. return d.configEvent
  279. }
  280. func (d *dnetConnection) AttachNetwork(string, string, []string) (*network.NetworkingConfig, error) {
  281. return nil, nil
  282. }
  283. func (d *dnetConnection) DetachNetwork(string, string) error {
  284. return nil
  285. }
  286. func (d *dnetConnection) UpdateAttachment(string, string, *network.NetworkingConfig) error {
  287. return nil
  288. }
  289. func (d *dnetConnection) WaitForDetachment(context.Context, string, string, string, string) error {
  290. return nil
  291. }
  292. func handleSignals(controller libnetwork.NetworkController) {
  293. c := make(chan os.Signal, 1)
  294. signals := []os.Signal{os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT}
  295. signal.Notify(c, signals...)
  296. go func() {
  297. for range c {
  298. controller.Stop()
  299. os.Exit(0)
  300. }
  301. }()
  302. }
  303. func startTestDriver() error {
  304. mux := http.NewServeMux()
  305. server := httptest.NewServer(mux)
  306. if server == nil {
  307. return fmt.Errorf("Failed to start an HTTP Server")
  308. }
  309. mux.HandleFunc("/Plugin.Activate", func(w http.ResponseWriter, r *http.Request) {
  310. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  311. fmt.Fprintf(w, `{"Implements": ["%s"]}`, driverapi.NetworkPluginEndpointType)
  312. })
  313. mux.HandleFunc(fmt.Sprintf("/%s.GetCapabilities", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  314. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  315. fmt.Fprintf(w, `{"Scope":"global"}`)
  316. })
  317. mux.HandleFunc(fmt.Sprintf("/%s.CreateNetwork", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  318. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  319. fmt.Fprintf(w, "null")
  320. })
  321. mux.HandleFunc(fmt.Sprintf("/%s.DeleteNetwork", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  322. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  323. fmt.Fprintf(w, "null")
  324. })
  325. mux.HandleFunc(fmt.Sprintf("/%s.CreateEndpoint", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  326. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  327. fmt.Fprintf(w, "null")
  328. })
  329. mux.HandleFunc(fmt.Sprintf("/%s.DeleteEndpoint", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  330. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  331. fmt.Fprintf(w, "null")
  332. })
  333. mux.HandleFunc(fmt.Sprintf("/%s.Join", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  334. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  335. fmt.Fprintf(w, "null")
  336. })
  337. mux.HandleFunc(fmt.Sprintf("/%s.Leave", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  338. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  339. fmt.Fprintf(w, "null")
  340. })
  341. if err := os.MkdirAll("/etc/docker/plugins", 0755); err != nil {
  342. return err
  343. }
  344. if err := ioutil.WriteFile("/etc/docker/plugins/test.spec", []byte(server.URL), 0644); err != nil {
  345. return err
  346. }
  347. return nil
  348. }
  349. func newDnetConnection(val string) (*dnetConnection, error) {
  350. url, err := opts.ParseHost(false, val)
  351. if err != nil {
  352. return nil, err
  353. }
  354. protoAddrParts := strings.SplitN(url, "://", 2)
  355. if len(protoAddrParts) != 2 {
  356. return nil, fmt.Errorf("bad format, expected tcp://ADDR")
  357. }
  358. if strings.ToLower(protoAddrParts[0]) != "tcp" {
  359. return nil, fmt.Errorf("dnet currently only supports tcp transport")
  360. }
  361. return &dnetConnection{protoAddrParts[0], protoAddrParts[1], &NetworkOrchestration{}, make(chan struct{}, 10)}, nil
  362. }
  363. func (d *dnetConnection) httpCall(method, path string, data interface{}, headers map[string][]string) (io.ReadCloser, http.Header, int, error) {
  364. var in io.Reader
  365. in, err := encodeData(data)
  366. if err != nil {
  367. return nil, nil, -1, err
  368. }
  369. req, err := http.NewRequest(method, fmt.Sprintf("%s", path), in)
  370. if err != nil {
  371. return nil, nil, -1, err
  372. }
  373. setupRequestHeaders(method, data, req, headers)
  374. req.URL.Host = d.addr
  375. req.URL.Scheme = "http"
  376. httpClient := &http.Client{}
  377. resp, err := httpClient.Do(req)
  378. statusCode := -1
  379. if resp != nil {
  380. statusCode = resp.StatusCode
  381. }
  382. if err != nil {
  383. return nil, nil, statusCode, fmt.Errorf("error when trying to connect: %v", err)
  384. }
  385. if statusCode < 200 || statusCode >= 400 {
  386. body, err := ioutil.ReadAll(resp.Body)
  387. if err != nil {
  388. return nil, nil, statusCode, err
  389. }
  390. return nil, nil, statusCode, fmt.Errorf("error : %s", bytes.TrimSpace(body))
  391. }
  392. return resp.Body, resp.Header, statusCode, nil
  393. }
  394. func setupRequestHeaders(method string, data interface{}, req *http.Request, headers map[string][]string) {
  395. if data != nil {
  396. if headers == nil {
  397. headers = make(map[string][]string)
  398. }
  399. headers["Content-Type"] = []string{"application/json"}
  400. }
  401. expectedPayload := (method == "POST" || method == "PUT")
  402. if expectedPayload && req.Header.Get("Content-Type") == "" {
  403. req.Header.Set("Content-Type", "text/plain")
  404. }
  405. if headers != nil {
  406. for k, v := range headers {
  407. req.Header[k] = v
  408. }
  409. }
  410. }
  411. func encodeData(data interface{}) (*bytes.Buffer, error) {
  412. params := bytes.NewBuffer(nil)
  413. if data != nil {
  414. if err := json.NewEncoder(params).Encode(data); err != nil {
  415. return nil, err
  416. }
  417. }
  418. return params, nil
  419. }
  420. func ipamOption(bridgeName string) libnetwork.NetworkOption {
  421. if nw, _, err := netutils.ElectInterfaceAddresses(bridgeName); err == nil {
  422. ipamV4Conf := &libnetwork.IpamConf{PreferredPool: nw.String()}
  423. hip, _ := types.GetHostPartIP(nw.IP, nw.Mask)
  424. if hip.IsGlobalUnicast() {
  425. ipamV4Conf.Gateway = nw.IP.String()
  426. }
  427. return libnetwork.NetworkOptionIpam("default", "", []*libnetwork.IpamConf{ipamV4Conf}, nil, nil)
  428. }
  429. return nil
  430. }