dnet.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524
  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "net"
  10. "net/http"
  11. "net/http/httptest"
  12. "os"
  13. "os/signal"
  14. "strings"
  15. "syscall"
  16. "time"
  17. "github.com/BurntSushi/toml"
  18. "github.com/codegangsta/cli"
  19. "github.com/docker/docker/opts"
  20. "github.com/docker/docker/pkg/discovery"
  21. "github.com/docker/docker/pkg/reexec"
  22. "github.com/Sirupsen/logrus"
  23. "github.com/docker/docker/api/types/network"
  24. "github.com/docker/docker/pkg/term"
  25. "github.com/docker/libnetwork"
  26. "github.com/docker/libnetwork/api"
  27. "github.com/docker/libnetwork/cluster"
  28. "github.com/docker/libnetwork/config"
  29. "github.com/docker/libnetwork/datastore"
  30. "github.com/docker/libnetwork/driverapi"
  31. "github.com/docker/libnetwork/netlabel"
  32. "github.com/docker/libnetwork/netutils"
  33. "github.com/docker/libnetwork/options"
  34. "github.com/docker/libnetwork/types"
  35. "github.com/gorilla/mux"
  36. "golang.org/x/net/context"
  37. )
  // Exported listener defaults for dnet.
  const (
  	// DefaultHTTPHost is used if only port is provided to -H flag e.g. docker -d -H tcp://:8080
  	DefaultHTTPHost = "0.0.0.0"
  	// DefaultHTTPPort is the default http port used by dnet
  	DefaultHTTPPort = 2385
  	// DefaultUnixSocket is the default unix socket path for dnet.
  	DefaultUnixSocket = "/var/run/dnet.sock"
  )

  // Internal defaults for configuration lookup and discovery timing.
  const (
  	// cfgFileEnv names the environment variable parseConfig consults when
  	// no configuration file path is supplied.
  	cfgFileEnv = "LIBNETWORK_CFG"
  	// defaultCfgFile is the path parseConfig falls back to when neither an
  	// explicit path nor cfgFileEnv is set.
  	defaultCfgFile = "/etc/default/libnetwork.toml"
  	// defaultHeartbeat is the discovery heartbeat used by startDiscovery
  	// when the configuration does not specify one.
  	defaultHeartbeat = 10 * time.Second
  	// ttlFactor is multiplied by the heartbeat to produce the TTL argument
  	// handed to discovery.New.
  	ttlFactor = 2
  )
  50. var epConn *dnetConnection
  51. func main() {
  52. if reexec.Init() {
  53. return
  54. }
  55. _, stdout, stderr := term.StdStreams()
  56. logrus.SetOutput(stderr)
  57. err := dnetApp(stdout, stderr)
  58. if err != nil {
  59. os.Exit(1)
  60. }
  61. }
  62. // ParseConfig parses the libnetwork configuration file
  63. func (d *dnetConnection) parseOrchestrationConfig(tomlCfgFile string) error {
  64. dummy := &dnetConnection{}
  65. if _, err := toml.DecodeFile(tomlCfgFile, dummy); err != nil {
  66. return err
  67. }
  68. if dummy.Orchestration != nil {
  69. d.Orchestration = dummy.Orchestration
  70. }
  71. return nil
  72. }
  73. func (d *dnetConnection) parseConfig(cfgFile string) (*config.Config, error) {
  74. if strings.Trim(cfgFile, " ") == "" {
  75. cfgFile = os.Getenv(cfgFileEnv)
  76. if strings.Trim(cfgFile, " ") == "" {
  77. cfgFile = defaultCfgFile
  78. }
  79. }
  80. if err := d.parseOrchestrationConfig(cfgFile); err != nil {
  81. return nil, err
  82. }
  83. return config.ParseConfig(cfgFile)
  84. }
  85. func processConfig(cfg *config.Config) []config.Option {
  86. options := []config.Option{}
  87. if cfg == nil {
  88. return options
  89. }
  90. dn := "bridge"
  91. if strings.TrimSpace(cfg.Daemon.DefaultNetwork) != "" {
  92. dn = cfg.Daemon.DefaultNetwork
  93. }
  94. options = append(options, config.OptionDefaultNetwork(dn))
  95. dd := "bridge"
  96. if strings.TrimSpace(cfg.Daemon.DefaultDriver) != "" {
  97. dd = cfg.Daemon.DefaultDriver
  98. }
  99. options = append(options, config.OptionDefaultDriver(dd))
  100. if cfg.Daemon.Labels != nil {
  101. options = append(options, config.OptionLabels(cfg.Daemon.Labels))
  102. }
  103. if dcfg, ok := cfg.Scopes[datastore.GlobalScope]; ok && dcfg.IsValid() {
  104. options = append(options, config.OptionKVProvider(dcfg.Client.Provider))
  105. options = append(options, config.OptionKVProviderURL(dcfg.Client.Address))
  106. }
  107. dOptions, err := startDiscovery(&cfg.Cluster)
  108. if err != nil {
  109. logrus.Infof("Skipping discovery : %s", err.Error())
  110. } else {
  111. options = append(options, dOptions...)
  112. }
  113. return options
  114. }
  115. func startDiscovery(cfg *config.ClusterCfg) ([]config.Option, error) {
  116. if cfg == nil {
  117. return nil, errors.New("discovery requires a valid configuration")
  118. }
  119. hb := time.Duration(cfg.Heartbeat) * time.Second
  120. if hb == 0 {
  121. hb = defaultHeartbeat
  122. }
  123. logrus.Infof("discovery : %s %s", cfg.Discovery, hb.String())
  124. d, err := discovery.New(cfg.Discovery, hb, ttlFactor*hb, map[string]string{})
  125. if err != nil {
  126. return nil, err
  127. }
  128. if cfg.Address == "" {
  129. iface, err := net.InterfaceByName("eth0")
  130. if err != nil {
  131. return nil, err
  132. }
  133. addrs, err := iface.Addrs()
  134. if err != nil || len(addrs) == 0 {
  135. return nil, err
  136. }
  137. ip, _, _ := net.ParseCIDR(addrs[0].String())
  138. cfg.Address = ip.String()
  139. }
  140. if ip := net.ParseIP(cfg.Address); ip == nil {
  141. return nil, errors.New("address config should be either ipv4 or ipv6 address")
  142. }
  143. if err := d.Register(cfg.Address + ":0"); err != nil {
  144. return nil, err
  145. }
  146. options := []config.Option{config.OptionDiscoveryWatcher(d), config.OptionDiscoveryAddress(cfg.Address)}
  147. go func() {
  148. for {
  149. select {
  150. case <-time.After(hb):
  151. if err := d.Register(cfg.Address + ":0"); err != nil {
  152. logrus.Warn(err)
  153. }
  154. }
  155. }
  156. }()
  157. return options, nil
  158. }
  159. func dnetApp(stdout, stderr io.Writer) error {
  160. app := cli.NewApp()
  161. app.Name = "dnet"
  162. app.Usage = "A self-sufficient runtime for container networking."
  163. app.Flags = dnetFlags
  164. app.Before = processFlags
  165. app.Commands = dnetCommands
  166. app.Run(os.Args)
  167. return nil
  168. }
  169. func createDefaultNetwork(c libnetwork.NetworkController) {
  170. nw := c.Config().Daemon.DefaultNetwork
  171. d := c.Config().Daemon.DefaultDriver
  172. createOptions := []libnetwork.NetworkOption{}
  173. genericOption := options.Generic{}
  174. if nw != "" && d != "" {
  175. // Bridge driver is special due to legacy reasons
  176. if d == "bridge" {
  177. genericOption[netlabel.GenericData] = map[string]string{
  178. "BridgeName": "docker0",
  179. "DefaultBridge": "true",
  180. }
  181. createOptions = append(createOptions,
  182. libnetwork.NetworkOptionGeneric(genericOption),
  183. ipamOption(nw))
  184. }
  185. if n, err := c.NetworkByName(nw); err == nil {
  186. logrus.Debugf("Default network %s already present. Deleting it", nw)
  187. if err = n.Delete(); err != nil {
  188. logrus.Debugf("Network could not be deleted: %v", err)
  189. return
  190. }
  191. }
  192. _, err := c.NewNetwork(d, nw, "", createOptions...)
  193. if err != nil {
  194. logrus.Errorf("Error creating default network : %s : %v", nw, err)
  195. }
  196. }
  197. }
  198. type dnetConnection struct {
  199. // proto holds the client protocol i.e. unix.
  200. proto string
  201. // addr holds the client address.
  202. addr string
  203. Orchestration *NetworkOrchestration
  204. configEvent chan cluster.ConfigEventType
  205. }
  // NetworkOrchestration holds the orchestration settings of a dnet node as
  // decoded from the configuration file.
  type NetworkOrchestration struct {
  	// Agent and Manager are the role flags reported by IsAgent and
  	// IsManager respectively.
  	Agent, Manager bool
  	// Bind is the address returned by the advertise, data-path, local and
  	// listen address getters; Peer is the single entry returned by
  	// GetRemoteAddressList.
  	Bind, Peer string
  }
  213. func (d *dnetConnection) dnetDaemon(cfgFile string) error {
  214. if err := startTestDriver(); err != nil {
  215. return fmt.Errorf("failed to start test driver: %v", err)
  216. }
  217. cfg, err := d.parseConfig(cfgFile)
  218. var cOptions []config.Option
  219. if err == nil {
  220. cOptions = processConfig(cfg)
  221. } else {
  222. logrus.Errorf("Error parsing config %v", err)
  223. }
  224. bridgeConfig := options.Generic{
  225. "EnableIPForwarding": true,
  226. "EnableIPTables": true,
  227. }
  228. bridgeOption := options.Generic{netlabel.GenericData: bridgeConfig}
  229. cOptions = append(cOptions, config.OptionDriverConfig("bridge", bridgeOption))
  230. controller, err := libnetwork.New(cOptions...)
  231. if err != nil {
  232. fmt.Println("Error starting dnetDaemon :", err)
  233. return err
  234. }
  235. controller.SetClusterProvider(d)
  236. if d.Orchestration.Agent || d.Orchestration.Manager {
  237. d.configEvent <- cluster.EventNodeReady
  238. }
  239. createDefaultNetwork(controller)
  240. httpHandler := api.NewHTTPHandler(controller)
  241. r := mux.NewRouter().StrictSlash(false)
  242. post := r.PathPrefix("/{.*}/networks").Subrouter()
  243. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  244. post = r.PathPrefix("/networks").Subrouter()
  245. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  246. post = r.PathPrefix("/{.*}/services").Subrouter()
  247. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  248. post = r.PathPrefix("/services").Subrouter()
  249. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  250. post = r.PathPrefix("/{.*}/sandboxes").Subrouter()
  251. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  252. post = r.PathPrefix("/sandboxes").Subrouter()
  253. post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
  254. handleSignals(controller)
  255. setupDumpStackTrap()
  256. return http.ListenAndServe(d.addr, r)
  257. }
  258. func (d *dnetConnection) IsManager() bool {
  259. return d.Orchestration.Manager
  260. }
  261. func (d *dnetConnection) IsAgent() bool {
  262. return d.Orchestration.Agent
  263. }
  264. func (d *dnetConnection) GetAdvertiseAddress() string {
  265. return d.Orchestration.Bind
  266. }
  267. func (d *dnetConnection) GetDataPathAddress() string {
  268. return d.Orchestration.Bind
  269. }
  270. func (d *dnetConnection) GetLocalAddress() string {
  271. return d.Orchestration.Bind
  272. }
  273. func (d *dnetConnection) GetListenAddress() string {
  274. return d.Orchestration.Bind
  275. }
  276. func (d *dnetConnection) GetRemoteAddressList() []string {
  277. return []string{d.Orchestration.Peer}
  278. }
  279. func (d *dnetConnection) GetNetworkKeys() []*types.EncryptionKey {
  280. return nil
  281. }
  282. func (d *dnetConnection) SetNetworkKeys([]*types.EncryptionKey) {
  283. }
  284. func (d *dnetConnection) ListenClusterEvents() <-chan cluster.ConfigEventType {
  285. return d.configEvent
  286. }
  287. func (d *dnetConnection) AttachNetwork(string, string, []string) (*network.NetworkingConfig, error) {
  288. return nil, nil
  289. }
  290. func (d *dnetConnection) DetachNetwork(string, string) error {
  291. return nil
  292. }
  293. func (d *dnetConnection) UpdateAttachment(string, string, *network.NetworkingConfig) error {
  294. return nil
  295. }
  296. func (d *dnetConnection) WaitForDetachment(context.Context, string, string, string, string) error {
  297. return nil
  298. }
  299. func handleSignals(controller libnetwork.NetworkController) {
  300. c := make(chan os.Signal, 1)
  301. signals := []os.Signal{os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT}
  302. signal.Notify(c, signals...)
  303. go func() {
  304. for range c {
  305. controller.Stop()
  306. os.Exit(0)
  307. }
  308. }()
  309. }
  310. func startTestDriver() error {
  311. mux := http.NewServeMux()
  312. server := httptest.NewServer(mux)
  313. if server == nil {
  314. return errors.New("Failed to start an HTTP Server")
  315. }
  316. mux.HandleFunc("/Plugin.Activate", func(w http.ResponseWriter, r *http.Request) {
  317. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  318. fmt.Fprintf(w, `{"Implements": ["%s"]}`, driverapi.NetworkPluginEndpointType)
  319. })
  320. mux.HandleFunc(fmt.Sprintf("/%s.GetCapabilities", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  321. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  322. fmt.Fprint(w, `{"Scope":"global"}`)
  323. })
  324. mux.HandleFunc(fmt.Sprintf("/%s.CreateNetwork", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  325. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  326. fmt.Fprint(w, "null")
  327. })
  328. mux.HandleFunc(fmt.Sprintf("/%s.DeleteNetwork", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  329. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  330. fmt.Fprint(w, "null")
  331. })
  332. mux.HandleFunc(fmt.Sprintf("/%s.CreateEndpoint", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  333. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  334. fmt.Fprint(w, "null")
  335. })
  336. mux.HandleFunc(fmt.Sprintf("/%s.DeleteEndpoint", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  337. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  338. fmt.Fprint(w, "null")
  339. })
  340. mux.HandleFunc(fmt.Sprintf("/%s.Join", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  341. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  342. fmt.Fprint(w, "null")
  343. })
  344. mux.HandleFunc(fmt.Sprintf("/%s.Leave", driverapi.NetworkPluginEndpointType), func(w http.ResponseWriter, r *http.Request) {
  345. w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1+json")
  346. fmt.Fprint(w, "null")
  347. })
  348. if err := os.MkdirAll("/etc/docker/plugins", 0755); err != nil {
  349. return err
  350. }
  351. if err := ioutil.WriteFile("/etc/docker/plugins/test.spec", []byte(server.URL), 0644); err != nil {
  352. return err
  353. }
  354. return nil
  355. }
  356. func newDnetConnection(val string) (*dnetConnection, error) {
  357. url, err := opts.ParseHost(false, val)
  358. if err != nil {
  359. return nil, err
  360. }
  361. protoAddrParts := strings.SplitN(url, "://", 2)
  362. if len(protoAddrParts) != 2 {
  363. return nil, errors.New("bad format, expected tcp://ADDR")
  364. }
  365. if strings.ToLower(protoAddrParts[0]) != "tcp" {
  366. return nil, errors.New("dnet currently only supports tcp transport")
  367. }
  368. return &dnetConnection{protoAddrParts[0], protoAddrParts[1], &NetworkOrchestration{}, make(chan cluster.ConfigEventType, 10)}, nil
  369. }
  370. func (d *dnetConnection) httpCall(method, path string, data interface{}, headers map[string][]string) (io.ReadCloser, http.Header, int, error) {
  371. var in io.Reader
  372. in, err := encodeData(data)
  373. if err != nil {
  374. return nil, nil, -1, err
  375. }
  376. req, err := http.NewRequest(method, fmt.Sprintf("%s", path), in)
  377. if err != nil {
  378. return nil, nil, -1, err
  379. }
  380. setupRequestHeaders(method, data, req, headers)
  381. req.URL.Host = d.addr
  382. req.URL.Scheme = "http"
  383. httpClient := &http.Client{}
  384. resp, err := httpClient.Do(req)
  385. statusCode := -1
  386. if resp != nil {
  387. statusCode = resp.StatusCode
  388. }
  389. if err != nil {
  390. return nil, nil, statusCode, fmt.Errorf("error when trying to connect: %v", err)
  391. }
  392. if statusCode < 200 || statusCode >= 400 {
  393. body, err := ioutil.ReadAll(resp.Body)
  394. if err != nil {
  395. return nil, nil, statusCode, err
  396. }
  397. return nil, nil, statusCode, fmt.Errorf("error : %s", bytes.TrimSpace(body))
  398. }
  399. return resp.Body, resp.Header, statusCode, nil
  400. }
  // setupRequestHeaders populates req.Header for a dnet API call.
  //
  // Precedence for Content-Type, highest first:
  //  1. "application/json" whenever data is non-nil;
  //  2. a Content-Type entry supplied in headers;
  //  3. "text/plain" for POST and PUT requests that have none yet.
  //
  // All other entries of headers are copied onto the request verbatim (keys
  // are not canonicalised). The headers map itself is only read, never
  // modified, so callers can safely reuse it across requests.
  func setupRequestHeaders(method string, data interface{}, req *http.Request, headers map[string][]string) {
  	expectedPayload := method == "POST" || method == "PUT"
  	if expectedPayload && req.Header.Get("Content-Type") == "" {
  		req.Header.Set("Content-Type", "text/plain")
  	}

  	for k, v := range headers {
  		req.Header[k] = v
  	}

  	// A JSON body always wins; set it on the request directly rather than
  	// writing it into the caller's map.
  	if data != nil {
  		req.Header["Content-Type"] = []string{"application/json"}
  	}
  }
  // encodeData JSON-encodes data into a new buffer.
  //
  // A nil data yields an empty, non-nil buffer. Otherwise the buffer holds
  // the JSON encoding of data followed by a newline, as written by
  // json.Encoder. If encoding fails, a nil buffer and the error are returned.
  func encodeData(data interface{}) (*bytes.Buffer, error) {
  	buf := new(bytes.Buffer)
  	if data == nil {
  		return buf, nil
  	}

  	enc := json.NewEncoder(buf)
  	if err := enc.Encode(data); err != nil {
  		return nil, err
  	}
  	return buf, nil
  }
  427. func ipamOption(bridgeName string) libnetwork.NetworkOption {
  428. if nws, _, err := netutils.ElectInterfaceAddresses(bridgeName); err == nil {
  429. ipamV4Conf := &libnetwork.IpamConf{PreferredPool: nws[0].String()}
  430. hip, _ := types.GetHostPartIP(nws[0].IP, nws[0].Mask)
  431. if hip.IsGlobalUnicast() {
  432. ipamV4Conf.Gateway = nws[0].IP.String()
  433. }
  434. return libnetwork.NetworkOptionIpam("default", "", []*libnetwork.IpamConf{ipamV4Conf}, nil, nil)
  435. }
  436. return nil
  437. }