dnet.go 12 KB


  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "net"
  10. "net/http"
  11. "net/http/httptest"
  12. "os"
  13. "os/signal"
  14. "strings"
  15. "syscall"
  16. "time"
  17. "github.com/codegangsta/cli"
  18. "github.com/docker/docker/pkg/discovery"
  19. "github.com/docker/docker/pkg/parsers"
  20. "github.com/docker/docker/pkg/reexec"
  21. "github.com/Sirupsen/logrus"
  22. psignal "github.com/docker/docker/pkg/signal"
  23. "github.com/docker/docker/pkg/term"
  24. "github.com/docker/libnetwork"
  25. "github.com/docker/libnetwork/api"
  26. "github.com/docker/libnetwork/config"
  27. "github.com/docker/libnetwork/datastore"
  28. "github.com/docker/libnetwork/driverapi"
  29. "github.com/docker/libnetwork/ipamutils"
  30. "github.com/docker/libnetwork/netlabel"
  31. "github.com/docker/libnetwork/options"
  32. "github.com/docker/libnetwork/types"
  33. "github.com/gorilla/mux"
  34. )
  35. const (
  36. // DefaultHTTPHost is used if only port is provided to -H flag e.g. docker -d -H tcp://:8080
  37. DefaultHTTPHost = "0.0.0.0"
  38. // DefaultHTTPPort is the default http port used by dnet
  39. DefaultHTTPPort = 2385
  40. // DefaultUnixSocket exported
  41. DefaultUnixSocket = "/var/run/dnet.sock"
  42. cfgFileEnv = "LIBNETWORK_CFG"
  43. defaultCfgFile = "/etc/default/libnetwork.toml"
  44. defaultHeartbeat = time.Duration(10) * time.Second
  45. ttlFactor = 2
  46. )
  47. var epConn *dnetConnection
  48. func main() {
  49. if reexec.Init() {
  50. return
  51. }
  52. _, stdout, stderr := term.StdStreams()
  53. logrus.SetOutput(stderr)
  54. err := dnetApp(stdout, stderr)
  55. if err != nil {
  56. os.Exit(1)
  57. }
  58. }
  59. func parseConfig(cfgFile string) (*config.Config, error) {
  60. if strings.Trim(cfgFile, " ") == "" {
  61. cfgFile = os.Getenv(cfgFileEnv)
  62. if strings.Trim(cfgFile, " ") == "" {
  63. cfgFile = defaultCfgFile
  64. }
  65. }
  66. return config.ParseConfig(cfgFile)
  67. }
  68. func processConfig(cfg *config.Config) []config.Option {
  69. options := []config.Option{}
  70. if cfg == nil {
  71. return options
  72. }
  73. dn := "bridge"
  74. if strings.TrimSpace(cfg.Daemon.DefaultNetwork) != "" {
  75. dn = cfg.Daemon.DefaultNetwork
  76. }
  77. options = append(options, config.OptionDefaultNetwork(dn))
  78. dd := "bridge"
  79. if strings.TrimSpace(cfg.Daemon.DefaultDriver) != "" {
  80. dd = cfg.Daemon.DefaultDriver
  81. }
  82. options = append(options, config.OptionDefaultDriver(dd))
  83. if cfg.Daemon.Labels != nil {
  84. options = append(options, config.OptionLabels(cfg.Daemon.Labels))
  85. }
  86. if dcfg, ok := cfg.Scopes[datastore.GlobalScope]; ok && dcfg.IsValid() {
  87. options = append(options, config.OptionKVProvider(dcfg.Client.Provider))
  88. options = append(options, config.OptionKVProviderURL(dcfg.Client.Address))
  89. }
  90. dOptions, err := startDiscovery(&cfg.Cluster)
  91. if err != nil {
  92. logrus.Infof("Skipping discovery : %s", err.Error())
  93. } else {
  94. options = append(options, dOptions...)
  95. }
  96. return options
  97. }
  98. func startDiscovery(cfg *config.ClusterCfg) ([]config.Option, error) {
  99. if cfg == nil {
  100. return nil, fmt.Errorf("discovery requires a valid configuration")
  101. }
  102. hb := time.Duration(cfg.Heartbeat) * time.Second
  103. if hb == 0 {
  104. hb = defaultHeartbeat
  105. }
  106. logrus.Infof("discovery : %s $s", cfg.Discovery, hb.String())
  107. d, err := discovery.New(cfg.Discovery, hb, ttlFactor*hb)
  108. if err != nil {
  109. return nil, err
  110. }
  111. if cfg.Address == "" {
  112. iface, err := net.InterfaceByName("eth0")
  113. if err != nil {
  114. return nil, err
  115. }
  116. addrs, err := iface.Addrs()
  117. if err != nil || len(addrs) == 0 {
  118. return nil, err
  119. }
  120. ip, _, _ := net.ParseCIDR(addrs[0].String())
  121. cfg.Address = ip.String()
  122. }
  123. if ip := net.ParseIP(cfg.Address); ip == nil {
  124. return nil, errors.New("address config should be either ipv4 or ipv6 address")
  125. }
  126. if err := d.Register(cfg.Address + ":0"); err != nil {
  127. return nil, err
  128. }
  129. options := []config.Option{config.OptionDiscoveryWatcher(d), config.OptionDiscoveryAddress(cfg.Address)}
  130. go func() {
  131. for {
  132. select {
  133. case <-time.After(hb):
  134. if err := d.Register(cfg.Address + ":0"); err != nil {
  135. logrus.Warn(err)
  136. }
  137. }
  138. }
  139. }()
  140. return options, nil
  141. }
  142. func dnetApp(stdout, stderr io.Writer) error {
  143. app := cli.NewApp()
  144. app.Name = "dnet"
  145. app.Usage = "A self-sufficient runtime for container networking."
  146. app.Flags = dnetFlags
  147. app.Before = processFlags
  148. app.Commands = dnetCommands
  149. app.Run(os.Args)
  150. return nil
  151. }
  152. func createDefaultNetwork(c libnetwork.NetworkController) {
  153. nw := c.Config().Daemon.DefaultNetwork
  154. d := c.Config().Daemon.DefaultDriver
  155. createOptions := []libnetwork.NetworkOption{}
  156. genericOption := options.Generic{}
  157. if nw != "" && d != "" {
  158. // Bridge driver is special due to legacy reasons
  159. if d == "bridge" {
  160. genericOption[netlabel.GenericData] = map[string]string{
  161. "BridgeName": "docker0",
  162. "DefaultBridge": "true",
  163. }
  164. createOptions = append(createOptions,
  165. libnetwork.NetworkOptionGeneric(genericOption),
  166. ipamOption(nw))
  167. }
  168. if n, err := c.NetworkByName(nw); err == nil {
  169. logrus.Debugf("Default network %s already present. Deleting it", nw)
  170. if err = n.Delete(); err != nil {
  171. logrus.Debugf("Network could not be deleted: %v", err)
  172. return
  173. }
  174. }
  175. _, err := c.NewNetwork(d, nw, createOptions...)
  176. if err != nil {
  177. logrus.Errorf("Error creating default network : %s : %v", nw, err)
  178. }
  179. }
  180. }
  181. type dnetConnection struct {
  182. // proto holds the client protocol i.e. unix.
  183. proto string
  184. // addr holds the client address.
  185. addr string
  186. }
  187. func (d *dnetConnection) dnetDaemon(cfgFile string) error {
  188. if err := startTestDriver(); err != nil {
  189. return fmt.Errorf("failed to start test driver: %v\n", err)
  190. }
  191. cfg, err := parseConfig(cfgFile)
  192. var cOptions []config.Option
  193. if err == nil {
  194. cOptions = processConfig(cfg)
  195. }
  196. controller, err := libnetwork.New(cOptions...)
  197. if err != nil {
  198. fmt.Println("Error starting dnetDaemon :", err)
  199. return err
  200. }
  201. createDefaultNetwork(controller)
  202. httpHandler := api.NewHTTPHandler(controller)
  203. r := mux.NewRouter().StrictSlash(false)
  204. post := r.PathPrefix("/{.*}/networks").Subrouter()
  205. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  206. post = r.PathPrefix("/networks").Subrouter()
  207. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  208. post = r.PathPrefix("/{.*}/services").Subrouter()
  209. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  210. post = r.PathPrefix("/services").Subrouter()
  211. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  212. post = r.PathPrefix("/{.*}/sandboxes").Subrouter()
  213. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  214. post = r.PathPrefix("/sandboxes").Subrouter()
  215. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  216. handleSignals(controller)
  217. setupDumpStackTrap()
  218. return http.ListenAndServe(d.addr, r)
  219. }
  220. func setupDumpStackTrap() {
  221. c := make(chan os.Signal, 1)
  222. signal.Notify(c, syscall.SIGUSR1)
  223. go func() {
  224. for range c {
  225. psignal.DumpStacks()
  226. }
  227. }()
  228. }
  229. func handleSignals(controller libnetwork.NetworkController) {
  230. c := make(chan os.Signal, 1)
  231. signals := []os.Signal{os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT}
  232. signal.Notify(c, signals...)
  233. go func() {
  234. for _ = range c {
  235. controller.Stop()
  236. os.Exit(0)
  237. }
  238. }()
  239. }
  240. func startTestDriver() error {
  241. mux := http.NewServeMux()
  242. server := httptest.NewServer(mux)
  243. if server == nil {
  244. return fmt.Errorf("Failed to start a HTTP Server")
  245. }
  246. mux.HandleFunc("/Plugin.Activate", func(w http.ResponseWriter, r *http.Request) {
  247. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  248. fmt.Fprintf(w, `{"Implements": ["%s"]}`, driverapi.NetworkPluginEndpointType)
  249. })
  250. mux.HandleFunc(fmt.Sprintf("/%s.GetCapabilities", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  251. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  252. fmt.Fprintf(w, `{"Scope":"global"}`)
  253. })
  254. mux.HandleFunc(fmt.Sprintf("/%s.CreateNetwork", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  255. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  256. fmt.Fprintf(w, "null")
  257. })
  258. mux.HandleFunc(fmt.Sprintf("/%s.DeleteNetwork", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  259. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  260. fmt.Fprintf(w, "null")
  261. })
  262. mux.HandleFunc(fmt.Sprintf("/%s.CreateEndpoint", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  263. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  264. fmt.Fprintf(w, "null")
  265. })
  266. mux.HandleFunc(fmt.Sprintf("/%s.DeleteEndpoint", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  267. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  268. fmt.Fprintf(w, "null")
  269. })
  270. mux.HandleFunc(fmt.Sprintf("/%s.Join", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  271. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  272. fmt.Fprintf(w, "null")
  273. })
  274. mux.HandleFunc(fmt.Sprintf("/%s.Leave", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  275. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  276. fmt.Fprintf(w, "null")
  277. })
  278. if err := os.MkdirAll("/etc/docker/plugins", 0755); err != nil {
  279. return err
  280. }
  281. if err := ioutil.WriteFile("/etc/docker/plugins/test.spec", []byte(server.URL), 0644); err != nil {
  282. return err
  283. }
  284. return nil
  285. }
  286. func newDnetConnection(val string) (*dnetConnection, error) {
  287. url, err := parsers.ParseHost(DefaultHTTPHost, DefaultUnixSocket, val)
  288. if err != nil {
  289. return nil, err
  290. }
  291. protoAddrParts := strings.SplitN(url, "://", 2)
  292. if len(protoAddrParts) != 2 {
  293. return nil, fmt.Errorf("bad format, expected tcp://ADDR")
  294. }
  295. if strings.ToLower(protoAddrParts[0]) != "tcp" {
  296. return nil, fmt.Errorf("dnet currently only supports tcp transport")
  297. }
  298. return &dnetConnection{protoAddrParts[0], protoAddrParts[1]}, nil
  299. }
  300. func (d *dnetConnection) httpCall(method, path string, data interface{}, headers map[string][]string) (io.ReadCloser, http.Header, int, error) {
  301. var in io.Reader
  302. in, err := encodeData(data)
  303. if err != nil {
  304. return nil, nil, -1, err
  305. }
  306. req, err := http.NewRequest(method, fmt.Sprintf("%s", path), in)
  307. if err != nil {
  308. return nil, nil, -1, err
  309. }
  310. setupRequestHeaders(method, data, req, headers)
  311. req.URL.Host = d.addr
  312. req.URL.Scheme = "http"
  313. httpClient := &http.Client{}
  314. resp, err := httpClient.Do(req)
  315. statusCode := -1
  316. if resp != nil {
  317. statusCode = resp.StatusCode
  318. }
  319. if err != nil {
  320. return nil, nil, statusCode, fmt.Errorf("error when trying to connect: %v", err)
  321. }
  322. if statusCode < 200 || statusCode >= 400 {
  323. body, err := ioutil.ReadAll(resp.Body)
  324. if err != nil {
  325. return nil, nil, statusCode, err
  326. }
  327. return nil, nil, statusCode, fmt.Errorf("error : %s", bytes.TrimSpace(body))
  328. }
  329. return resp.Body, resp.Header, statusCode, nil
  330. }
  331. func setupRequestHeaders(method string, data interface{}, req *http.Request, headers map[string][]string) {
  332. if data != nil {
  333. if headers == nil {
  334. headers = make(map[string][]string)
  335. }
  336. headers["Content-Type"] = []string{"application/json"}
  337. }
  338. expectedPayload := (method == "POST" || method == "PUT")
  339. if expectedPayload && req.Header.Get("Content-Type") == "" {
  340. req.Header.Set("Content-Type", "text/plain")
  341. }
  342. if headers != nil {
  343. for k, v := range headers {
  344. req.Header[k] = v
  345. }
  346. }
  347. }
  348. func encodeData(data interface{}) (*bytes.Buffer, error) {
  349. params := bytes.NewBuffer(nil)
  350. if data != nil {
  351. if err := json.NewEncoder(params).Encode(data); err != nil {
  352. return nil, err
  353. }
  354. }
  355. return params, nil
  356. }
  357. func ipamOption(bridgeName string) libnetwork.NetworkOption {
  358. if nw, _, err := ipamutils.ElectInterfaceAddresses(bridgeName); err == nil {
  359. ipamV4Conf := &libnetwork.IpamConf{PreferredPool: nw.String()}
  360. hip, _ := types.GetHostPartIP(nw.IP, nw.Mask)
  361. if hip.IsGlobalUnicast() {
  362. ipamV4Conf.Gateway = nw.IP.String()
  363. }
  364. return libnetwork.NetworkOptionIpam("default", "", []*libnetwork.IpamConf{ipamV4Conf}, nil)
  365. }
  366. return nil
  367. }