api.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. package api
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "log"
  9. "net"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "strconv"
  14. "strings"
  15. "time"
  16. )
// QueryOptions are used to parameterize a query. Each non-zero field is
// translated into a URL query parameter by setQueryOptions; zero-valued
// fields leave the request untouched.
type QueryOptions struct {
	// Providing a datacenter overwrites the DC provided
	// by the Config. Sent as the "dc" parameter.
	Datacenter string

	// AllowStale allows any Consul server (non-leader) to service
	// a read. This allows for lower latency and higher throughput.
	// Sent as the "stale" parameter with an empty value.
	AllowStale bool

	// RequireConsistent forces the read to be fully consistent.
	// This is more expensive but prevents ever performing a stale
	// read. Sent as the "consistent" parameter with an empty value.
	RequireConsistent bool

	// WaitIndex is used to enable a blocking query. Waits
	// until the timeout or the next index is reached.
	// Sent as the decimal "index" parameter.
	WaitIndex uint64

	// WaitTime is used to bound the duration of a wait.
	// Defaults to that of the Config, but can be overridden.
	// Sent as the "wait" parameter in whole milliseconds.
	WaitTime time.Duration

	// Token is used to provide a per-request ACL token
	// which overrides the agent's default token.
	// Sent as the "token" parameter.
	Token string
}
// WriteOptions are used to parameterize a write. Each non-empty field is
// translated into a URL query parameter by setWriteOptions.
type WriteOptions struct {
	// Providing a datacenter overwrites the DC provided
	// by the Config. Sent as the "dc" parameter.
	Datacenter string

	// Token is used to provide a per-request ACL token
	// which overrides the agent's default token.
	// Sent as the "token" parameter.
	Token string
}
// QueryMeta is used to return meta data about a query. The first three
// fields are filled from response headers by parseQueryMeta.
type QueryMeta struct {
	// LastIndex. This can be used as a WaitIndex to perform
	// a blocking query. Parsed from the X-Consul-Index header.
	LastIndex uint64

	// Time of last contact from the leader for the
	// server servicing the request. Parsed from the
	// X-Consul-LastContact header, which is read as milliseconds.
	LastContact time.Duration

	// Is there a known leader. True only when the
	// X-Consul-KnownLeader header is exactly "true".
	KnownLeader bool

	// How long did the request take
	RequestTime time.Duration
}
// WriteMeta is used to return meta data about a write.
type WriteMeta struct {
	// How long did the request take
	RequestTime time.Duration
}
// HttpBasicAuth is used to authenticate http client with HTTP Basic Authentication.
// When set on a Config, the credentials are attached to every outgoing
// request by toHTTP.
type HttpBasicAuth struct {
	// Username to use for HTTP Basic Authentication
	Username string

	// Password to use for HTTP Basic Authentication
	Password string
}
// Config is used to configure the creation of a client.
type Config struct {
	// Address is the address of the Consul server. It is used as the
	// host part of every request URL. An address of the form
	// "unix://<path>" makes NewClient dial the Unix socket at <path>.
	Address string

	// Scheme is the URI scheme for the Consul server
	Scheme string

	// Datacenter to use. If not provided, the default agent datacenter is used.
	// When set, it is sent as the "dc" parameter on every request.
	Datacenter string

	// HttpClient is the client to use. Default will be
	// used if not provided.
	HttpClient *http.Client

	// HttpAuth is the auth info to use for http access.
	// A nil value sends requests without basic auth.
	HttpAuth *HttpBasicAuth

	// WaitTime limits how long a Watch will block. If not provided,
	// the agent default values will be used. When non-zero, it is sent
	// as the "wait" parameter on every request and can be replaced per
	// query through QueryOptions.WaitTime.
	WaitTime time.Duration

	// Token is the ACL token sent as the "token" parameter on every
	// request made by the client. A Token set in QueryOptions or
	// WriteOptions replaces it for that single request.
	Token string
}
  93. // DefaultConfig returns a default configuration for the client
  94. func DefaultConfig() *Config {
  95. config := &Config{
  96. Address: "127.0.0.1:8500",
  97. Scheme: "http",
  98. HttpClient: http.DefaultClient,
  99. }
  100. if addr := os.Getenv("CONSUL_HTTP_ADDR"); addr != "" {
  101. config.Address = addr
  102. }
  103. if token := os.Getenv("CONSUL_HTTP_TOKEN"); token != "" {
  104. config.Token = token
  105. }
  106. if auth := os.Getenv("CONSUL_HTTP_AUTH"); auth != "" {
  107. var username, password string
  108. if strings.Contains(auth, ":") {
  109. split := strings.SplitN(auth, ":", 2)
  110. username = split[0]
  111. password = split[1]
  112. } else {
  113. username = auth
  114. }
  115. config.HttpAuth = &HttpBasicAuth{
  116. Username: username,
  117. Password: password,
  118. }
  119. }
  120. if ssl := os.Getenv("CONSUL_HTTP_SSL"); ssl != "" {
  121. enabled, err := strconv.ParseBool(ssl)
  122. if err != nil {
  123. log.Printf("[WARN] client: could not parse CONSUL_HTTP_SSL: %s", err)
  124. }
  125. if enabled {
  126. config.Scheme = "https"
  127. }
  128. }
  129. if verify := os.Getenv("CONSUL_HTTP_SSL_VERIFY"); verify != "" {
  130. doVerify, err := strconv.ParseBool(verify)
  131. if err != nil {
  132. log.Printf("[WARN] client: could not parse CONSUL_HTTP_SSL_VERIFY: %s", err)
  133. }
  134. if !doVerify {
  135. config.HttpClient.Transport = &http.Transport{
  136. TLSClientConfig: &tls.Config{
  137. InsecureSkipVerify: true,
  138. },
  139. }
  140. }
  141. }
  142. return config
  143. }
// Client provides a client to the Consul API.
type Client struct {
	// config is the client's own copy of the Config given to NewClient,
	// stored by value.
	config Config
}
  148. // NewClient returns a new client
  149. func NewClient(config *Config) (*Client, error) {
  150. // bootstrap the config
  151. defConfig := DefaultConfig()
  152. if len(config.Address) == 0 {
  153. config.Address = defConfig.Address
  154. }
  155. if len(config.Scheme) == 0 {
  156. config.Scheme = defConfig.Scheme
  157. }
  158. if config.HttpClient == nil {
  159. config.HttpClient = defConfig.HttpClient
  160. }
  161. if parts := strings.SplitN(config.Address, "unix://", 2); len(parts) == 2 {
  162. config.HttpClient = &http.Client{
  163. Transport: &http.Transport{
  164. Dial: func(_, _ string) (net.Conn, error) {
  165. return net.Dial("unix", parts[1])
  166. },
  167. },
  168. }
  169. config.Address = parts[1]
  170. }
  171. client := &Client{
  172. config: *config,
  173. }
  174. return client, nil
  175. }
// request is used to help build up a request. It collects everything
// needed before toHTTP turns it into an *http.Request.
type request struct {
	// config points at the owning client's configuration; toHTTP reads
	// HttpAuth from it.
	config *Config
	// method is the HTTP method, e.g. "GET" or "PUT".
	method string
	// url holds scheme, host and path; toHTTP fills in RawQuery.
	url *url.URL
	// params are the query parameters, encoded into url.RawQuery by toHTTP.
	params url.Values
	// body is the request body; when nil and obj is set, toHTTP replaces
	// it with the JSON encoding of obj.
	body io.Reader
	// obj is an optional value to JSON-encode as the body.
	obj interface{}
}
  185. // setQueryOptions is used to annotate the request with
  186. // additional query options
  187. func (r *request) setQueryOptions(q *QueryOptions) {
  188. if q == nil {
  189. return
  190. }
  191. if q.Datacenter != "" {
  192. r.params.Set("dc", q.Datacenter)
  193. }
  194. if q.AllowStale {
  195. r.params.Set("stale", "")
  196. }
  197. if q.RequireConsistent {
  198. r.params.Set("consistent", "")
  199. }
  200. if q.WaitIndex != 0 {
  201. r.params.Set("index", strconv.FormatUint(q.WaitIndex, 10))
  202. }
  203. if q.WaitTime != 0 {
  204. r.params.Set("wait", durToMsec(q.WaitTime))
  205. }
  206. if q.Token != "" {
  207. r.params.Set("token", q.Token)
  208. }
  209. }
  210. // durToMsec converts a duration to a millisecond specified string
  211. func durToMsec(dur time.Duration) string {
  212. return fmt.Sprintf("%dms", dur/time.Millisecond)
  213. }
  214. // setWriteOptions is used to annotate the request with
  215. // additional write options
  216. func (r *request) setWriteOptions(q *WriteOptions) {
  217. if q == nil {
  218. return
  219. }
  220. if q.Datacenter != "" {
  221. r.params.Set("dc", q.Datacenter)
  222. }
  223. if q.Token != "" {
  224. r.params.Set("token", q.Token)
  225. }
  226. }
  227. // toHTTP converts the request to an HTTP request
  228. func (r *request) toHTTP() (*http.Request, error) {
  229. // Encode the query parameters
  230. r.url.RawQuery = r.params.Encode()
  231. // Check if we should encode the body
  232. if r.body == nil && r.obj != nil {
  233. if b, err := encodeBody(r.obj); err != nil {
  234. return nil, err
  235. } else {
  236. r.body = b
  237. }
  238. }
  239. // Create the HTTP request
  240. req, err := http.NewRequest(r.method, r.url.RequestURI(), r.body)
  241. if err != nil {
  242. return nil, err
  243. }
  244. req.URL.Host = r.url.Host
  245. req.URL.Scheme = r.url.Scheme
  246. req.Host = r.url.Host
  247. // Setup auth
  248. if r.config.HttpAuth != nil {
  249. req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password)
  250. }
  251. return req, nil
  252. }
  253. // newRequest is used to create a new request
  254. func (c *Client) newRequest(method, path string) *request {
  255. r := &request{
  256. config: &c.config,
  257. method: method,
  258. url: &url.URL{
  259. Scheme: c.config.Scheme,
  260. Host: c.config.Address,
  261. Path: path,
  262. },
  263. params: make(map[string][]string),
  264. }
  265. if c.config.Datacenter != "" {
  266. r.params.Set("dc", c.config.Datacenter)
  267. }
  268. if c.config.WaitTime != 0 {
  269. r.params.Set("wait", durToMsec(r.config.WaitTime))
  270. }
  271. if c.config.Token != "" {
  272. r.params.Set("token", r.config.Token)
  273. }
  274. return r
  275. }
  276. // doRequest runs a request with our client
  277. func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) {
  278. req, err := r.toHTTP()
  279. if err != nil {
  280. return 0, nil, err
  281. }
  282. start := time.Now()
  283. resp, err := c.config.HttpClient.Do(req)
  284. diff := time.Now().Sub(start)
  285. return diff, resp, err
  286. }
  287. // Query is used to do a GET request against an endpoint
  288. // and deserialize the response into an interface using
  289. // standard Consul conventions.
  290. func (c *Client) query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) {
  291. r := c.newRequest("GET", endpoint)
  292. r.setQueryOptions(q)
  293. rtt, resp, err := requireOK(c.doRequest(r))
  294. if err != nil {
  295. return nil, err
  296. }
  297. defer resp.Body.Close()
  298. qm := &QueryMeta{}
  299. parseQueryMeta(resp, qm)
  300. qm.RequestTime = rtt
  301. if err := decodeBody(resp, out); err != nil {
  302. return nil, err
  303. }
  304. return qm, nil
  305. }
  306. // write is used to do a PUT request against an endpoint
  307. // and serialize/deserialized using the standard Consul conventions.
  308. func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) {
  309. r := c.newRequest("PUT", endpoint)
  310. r.setWriteOptions(q)
  311. r.obj = in
  312. rtt, resp, err := requireOK(c.doRequest(r))
  313. if err != nil {
  314. return nil, err
  315. }
  316. defer resp.Body.Close()
  317. wm := &WriteMeta{RequestTime: rtt}
  318. if out != nil {
  319. if err := decodeBody(resp, &out); err != nil {
  320. return nil, err
  321. }
  322. }
  323. return wm, nil
  324. }
  325. // parseQueryMeta is used to help parse query meta-data
  326. func parseQueryMeta(resp *http.Response, q *QueryMeta) error {
  327. header := resp.Header
  328. // Parse the X-Consul-Index
  329. index, err := strconv.ParseUint(header.Get("X-Consul-Index"), 10, 64)
  330. if err != nil {
  331. return fmt.Errorf("Failed to parse X-Consul-Index: %v", err)
  332. }
  333. q.LastIndex = index
  334. // Parse the X-Consul-LastContact
  335. last, err := strconv.ParseUint(header.Get("X-Consul-LastContact"), 10, 64)
  336. if err != nil {
  337. return fmt.Errorf("Failed to parse X-Consul-LastContact: %v", err)
  338. }
  339. q.LastContact = time.Duration(last) * time.Millisecond
  340. // Parse the X-Consul-KnownLeader
  341. switch header.Get("X-Consul-KnownLeader") {
  342. case "true":
  343. q.KnownLeader = true
  344. default:
  345. q.KnownLeader = false
  346. }
  347. return nil
  348. }
  349. // decodeBody is used to JSON decode a body
  350. func decodeBody(resp *http.Response, out interface{}) error {
  351. dec := json.NewDecoder(resp.Body)
  352. return dec.Decode(out)
  353. }
  354. // encodeBody is used to encode a request body
  355. func encodeBody(obj interface{}) (io.Reader, error) {
  356. buf := bytes.NewBuffer(nil)
  357. enc := json.NewEncoder(buf)
  358. if err := enc.Encode(obj); err != nil {
  359. return nil, err
  360. }
  361. return buf, nil
  362. }
  363. // requireOK is used to wrap doRequest and check for a 200
  364. func requireOK(d time.Duration, resp *http.Response, e error) (time.Duration, *http.Response, error) {
  365. if e != nil {
  366. if resp != nil {
  367. resp.Body.Close()
  368. }
  369. return d, nil, e
  370. }
  371. if resp.StatusCode != 200 {
  372. var buf bytes.Buffer
  373. io.Copy(&buf, resp.Body)
  374. resp.Body.Close()
  375. return d, nil, fmt.Errorf("Unexpected response code: %d (%s)", resp.StatusCode, buf.Bytes())
  376. }
  377. return d, resp, nil
  378. }