splunk.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. // Package splunk provides the log driver for forwarding server logs to
  2. // Splunk HTTP Event Collector endpoint.
  3. package splunk
  4. import (
  5. "bytes"
  6. "crypto/tls"
  7. "crypto/x509"
  8. "encoding/json"
  9. "fmt"
  10. "io"
  11. "io/ioutil"
  12. "net/http"
  13. "net/url"
  14. "strconv"
  15. "time"
  16. "github.com/Sirupsen/logrus"
  17. "github.com/docker/docker/daemon/logger"
  18. "github.com/docker/docker/daemon/logger/loggerutils"
  19. "github.com/docker/docker/pkg/urlutil"
  20. )
  21. const (
  22. driverName = "splunk"
  23. splunkURLKey = "splunk-url"
  24. splunkTokenKey = "splunk-token"
  25. splunkSourceKey = "splunk-source"
  26. splunkSourceTypeKey = "splunk-sourcetype"
  27. splunkIndexKey = "splunk-index"
  28. splunkCAPathKey = "splunk-capath"
  29. splunkCANameKey = "splunk-caname"
  30. splunkInsecureSkipVerifyKey = "splunk-insecureskipverify"
  31. splunkFormatKey = "splunk-format"
  32. splunkVerifyConnectionKey = "splunk-verify-connection"
  33. envKey = "env"
  34. labelsKey = "labels"
  35. tagKey = "tag"
  36. )
  37. type splunkLogger struct {
  38. client *http.Client
  39. transport *http.Transport
  40. url string
  41. auth string
  42. nullMessage *splunkMessage
  43. }
  44. type splunkLoggerInline struct {
  45. *splunkLogger
  46. nullEvent *splunkMessageEvent
  47. }
  48. type splunkLoggerJSON struct {
  49. *splunkLoggerInline
  50. }
  51. type splunkLoggerRaw struct {
  52. *splunkLogger
  53. prefix []byte
  54. }
  55. type splunkMessage struct {
  56. Event interface{} `json:"event"`
  57. Time string `json:"time"`
  58. Host string `json:"host"`
  59. Source string `json:"source,omitempty"`
  60. SourceType string `json:"sourcetype,omitempty"`
  61. Index string `json:"index,omitempty"`
  62. }
  63. type splunkMessageEvent struct {
  64. Line interface{} `json:"line"`
  65. Source string `json:"source"`
  66. Tag string `json:"tag,omitempty"`
  67. Attrs map[string]string `json:"attrs,omitempty"`
  68. }
  69. const (
  70. splunkFormatRaw = "raw"
  71. splunkFormatJSON = "json"
  72. splunkFormatInline = "inline"
  73. )
  74. func init() {
  75. if err := logger.RegisterLogDriver(driverName, New); err != nil {
  76. logrus.Fatal(err)
  77. }
  78. if err := logger.RegisterLogOptValidator(driverName, ValidateLogOpt); err != nil {
  79. logrus.Fatal(err)
  80. }
  81. }
  82. // New creates splunk logger driver using configuration passed in context
  83. func New(ctx logger.Context) (logger.Logger, error) {
  84. hostname, err := ctx.Hostname()
  85. if err != nil {
  86. return nil, fmt.Errorf("%s: cannot access hostname to set source field", driverName)
  87. }
  88. // Parse and validate Splunk URL
  89. splunkURL, err := parseURL(ctx)
  90. if err != nil {
  91. return nil, err
  92. }
  93. // Splunk Token is required parameter
  94. splunkToken, ok := ctx.Config[splunkTokenKey]
  95. if !ok {
  96. return nil, fmt.Errorf("%s: %s is expected", driverName, splunkTokenKey)
  97. }
  98. tlsConfig := &tls.Config{}
  99. // Splunk is using autogenerated certificates by default,
  100. // allow users to trust them with skipping verification
  101. if insecureSkipVerifyStr, ok := ctx.Config[splunkInsecureSkipVerifyKey]; ok {
  102. insecureSkipVerify, err := strconv.ParseBool(insecureSkipVerifyStr)
  103. if err != nil {
  104. return nil, err
  105. }
  106. tlsConfig.InsecureSkipVerify = insecureSkipVerify
  107. }
  108. // If path to the root certificate is provided - load it
  109. if caPath, ok := ctx.Config[splunkCAPathKey]; ok {
  110. caCert, err := ioutil.ReadFile(caPath)
  111. if err != nil {
  112. return nil, err
  113. }
  114. caPool := x509.NewCertPool()
  115. caPool.AppendCertsFromPEM(caCert)
  116. tlsConfig.RootCAs = caPool
  117. }
  118. if caName, ok := ctx.Config[splunkCANameKey]; ok {
  119. tlsConfig.ServerName = caName
  120. }
  121. transport := &http.Transport{
  122. TLSClientConfig: tlsConfig,
  123. }
  124. client := &http.Client{
  125. Transport: transport,
  126. }
  127. source := ctx.Config[splunkSourceKey]
  128. sourceType := ctx.Config[splunkSourceTypeKey]
  129. index := ctx.Config[splunkIndexKey]
  130. var nullMessage = &splunkMessage{
  131. Host: hostname,
  132. Source: source,
  133. SourceType: sourceType,
  134. Index: index,
  135. }
  136. tag, err := loggerutils.ParseLogTag(ctx, loggerutils.DefaultTemplate)
  137. if err != nil {
  138. return nil, err
  139. }
  140. attrs := ctx.ExtraAttributes(nil)
  141. logger := &splunkLogger{
  142. client: client,
  143. transport: transport,
  144. url: splunkURL.String(),
  145. auth: "Splunk " + splunkToken,
  146. nullMessage: nullMessage,
  147. }
  148. // By default we verify connection, but we allow use to skip that
  149. verifyConnection := true
  150. if verifyConnectionStr, ok := ctx.Config[splunkVerifyConnectionKey]; ok {
  151. var err error
  152. verifyConnection, err = strconv.ParseBool(verifyConnectionStr)
  153. if err != nil {
  154. return nil, err
  155. }
  156. }
  157. if verifyConnection {
  158. err = verifySplunkConnection(logger)
  159. if err != nil {
  160. return nil, err
  161. }
  162. }
  163. var splunkFormat string
  164. if splunkFormatParsed, ok := ctx.Config[splunkFormatKey]; ok {
  165. switch splunkFormatParsed {
  166. case splunkFormatInline:
  167. case splunkFormatJSON:
  168. case splunkFormatRaw:
  169. default:
  170. return nil, fmt.Errorf("Unknown format specified %s, supported formats are inline, json and raw", splunkFormat)
  171. }
  172. splunkFormat = splunkFormatParsed
  173. } else {
  174. splunkFormat = splunkFormatInline
  175. }
  176. switch splunkFormat {
  177. case splunkFormatInline:
  178. nullEvent := &splunkMessageEvent{
  179. Tag: tag,
  180. Attrs: attrs,
  181. }
  182. return &splunkLoggerInline{logger, nullEvent}, nil
  183. case splunkFormatJSON:
  184. nullEvent := &splunkMessageEvent{
  185. Tag: tag,
  186. Attrs: attrs,
  187. }
  188. return &splunkLoggerJSON{&splunkLoggerInline{logger, nullEvent}}, nil
  189. case splunkFormatRaw:
  190. var prefix bytes.Buffer
  191. prefix.WriteString(tag)
  192. prefix.WriteString(" ")
  193. for key, value := range attrs {
  194. prefix.WriteString(key)
  195. prefix.WriteString("=")
  196. prefix.WriteString(value)
  197. prefix.WriteString(" ")
  198. }
  199. return &splunkLoggerRaw{logger, prefix.Bytes()}, nil
  200. default:
  201. return nil, fmt.Errorf("Unexpected format %s", splunkFormat)
  202. }
  203. }
  204. func (l *splunkLoggerInline) Log(msg *logger.Message) error {
  205. message := l.createSplunkMessage(msg)
  206. event := *l.nullEvent
  207. event.Line = string(msg.Line)
  208. event.Source = msg.Source
  209. message.Event = &event
  210. return l.postMessage(message)
  211. }
  212. func (l *splunkLoggerJSON) Log(msg *logger.Message) error {
  213. message := l.createSplunkMessage(msg)
  214. event := *l.nullEvent
  215. var rawJSONMessage json.RawMessage
  216. if err := json.Unmarshal(msg.Line, &rawJSONMessage); err == nil {
  217. event.Line = &rawJSONMessage
  218. } else {
  219. event.Line = string(msg.Line)
  220. }
  221. event.Source = msg.Source
  222. message.Event = &event
  223. return l.postMessage(message)
  224. }
  225. func (l *splunkLoggerRaw) Log(msg *logger.Message) error {
  226. message := l.createSplunkMessage(msg)
  227. message.Event = string(append(l.prefix, msg.Line...))
  228. return l.postMessage(message)
  229. }
  230. func (l *splunkLogger) postMessage(message *splunkMessage) error {
  231. jsonEvent, err := json.Marshal(message)
  232. if err != nil {
  233. return err
  234. }
  235. req, err := http.NewRequest("POST", l.url, bytes.NewBuffer(jsonEvent))
  236. if err != nil {
  237. return err
  238. }
  239. req.Header.Set("Authorization", l.auth)
  240. res, err := l.client.Do(req)
  241. if err != nil {
  242. return err
  243. }
  244. defer res.Body.Close()
  245. if res.StatusCode != http.StatusOK {
  246. var body []byte
  247. body, err = ioutil.ReadAll(res.Body)
  248. if err != nil {
  249. return err
  250. }
  251. return fmt.Errorf("%s: failed to send event - %s - %s", driverName, res.Status, body)
  252. }
  253. io.Copy(ioutil.Discard, res.Body)
  254. return nil
  255. }
  256. func (l *splunkLogger) Close() error {
  257. l.transport.CloseIdleConnections()
  258. return nil
  259. }
  260. func (l *splunkLogger) Name() string {
  261. return driverName
  262. }
  263. func (l *splunkLogger) createSplunkMessage(msg *logger.Message) *splunkMessage {
  264. message := *l.nullMessage
  265. message.Time = fmt.Sprintf("%f", float64(msg.Timestamp.UnixNano())/float64(time.Second))
  266. return &message
  267. }
  268. // ValidateLogOpt looks for all supported by splunk driver options
  269. func ValidateLogOpt(cfg map[string]string) error {
  270. for key := range cfg {
  271. switch key {
  272. case splunkURLKey:
  273. case splunkTokenKey:
  274. case splunkSourceKey:
  275. case splunkSourceTypeKey:
  276. case splunkIndexKey:
  277. case splunkCAPathKey:
  278. case splunkCANameKey:
  279. case splunkInsecureSkipVerifyKey:
  280. case splunkFormatKey:
  281. case splunkVerifyConnectionKey:
  282. case envKey:
  283. case labelsKey:
  284. case tagKey:
  285. default:
  286. return fmt.Errorf("unknown log opt '%s' for %s log driver", key, driverName)
  287. }
  288. }
  289. return nil
  290. }
  291. func parseURL(ctx logger.Context) (*url.URL, error) {
  292. splunkURLStr, ok := ctx.Config[splunkURLKey]
  293. if !ok {
  294. return nil, fmt.Errorf("%s: %s is expected", driverName, splunkURLKey)
  295. }
  296. splunkURL, err := url.Parse(splunkURLStr)
  297. if err != nil {
  298. return nil, fmt.Errorf("%s: failed to parse %s as url value in %s", driverName, splunkURLStr, splunkURLKey)
  299. }
  300. if !urlutil.IsURL(splunkURLStr) ||
  301. !splunkURL.IsAbs() ||
  302. (splunkURL.Path != "" && splunkURL.Path != "/") ||
  303. splunkURL.RawQuery != "" ||
  304. splunkURL.Fragment != "" {
  305. return nil, fmt.Errorf("%s: expected format scheme://dns_name_or_ip:port for %s", driverName, splunkURLKey)
  306. }
  307. splunkURL.Path = "/services/collector/event/1.0"
  308. return splunkURL, nil
  309. }
  310. func verifySplunkConnection(l *splunkLogger) error {
  311. req, err := http.NewRequest("OPTIONS", l.url, nil)
  312. if err != nil {
  313. return err
  314. }
  315. res, err := l.client.Do(req)
  316. if err != nil {
  317. return err
  318. }
  319. if res.Body != nil {
  320. defer res.Body.Close()
  321. }
  322. if res.StatusCode != http.StatusOK {
  323. var body []byte
  324. body, err = ioutil.ReadAll(res.Body)
  325. if err != nil {
  326. return err
  327. }
  328. return fmt.Errorf("%s: failed to verify connection - %s - %s", driverName, res.Status, body)
  329. }
  330. return nil
  331. }