splunk.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626
  1. // Package splunk provides the log driver for forwarding server logs to
  2. // Splunk HTTP Event Collector endpoint.
  3. package splunk
  4. import (
  5. "bytes"
  6. "compress/gzip"
  7. "crypto/tls"
  8. "crypto/x509"
  9. "encoding/json"
  10. "fmt"
  11. "io"
  12. "io/ioutil"
  13. "net/http"
  14. "net/url"
  15. "os"
  16. "strconv"
  17. "sync"
  18. "time"
  19. "github.com/Sirupsen/logrus"
  20. "github.com/docker/docker/daemon/logger"
  21. "github.com/docker/docker/daemon/logger/loggerutils"
  22. "github.com/docker/docker/pkg/urlutil"
  23. )
  // Driver name and the names of every log option the splunk driver accepts.
  const (
  	// driverName is the name under which the driver is registered.
  	driverName = "splunk"

  	// Options specific to the splunk driver.
  	splunkURLKey                  = "splunk-url"
  	splunkTokenKey                = "splunk-token"
  	splunkSourceKey               = "splunk-source"
  	splunkSourceTypeKey           = "splunk-sourcetype"
  	splunkIndexKey                = "splunk-index"
  	splunkCAPathKey               = "splunk-capath"
  	splunkCANameKey               = "splunk-caname"
  	splunkInsecureSkipVerifyKey   = "splunk-insecureskipverify"
  	splunkFormatKey               = "splunk-format"
  	splunkVerifyConnectionKey     = "splunk-verify-connection"
  	splunkGzipCompressionKey      = "splunk-gzip"
  	splunkGzipCompressionLevelKey = "splunk-gzip-level"

  	// Generic options that the driver also accepts.
  	envKey      = "env"
  	envRegexKey = "env-regex"
  	labelsKey   = "labels"
  	tagKey      = "tag"
  )
  // Default values for the advanced options; each can be overridden through
  // the matching SPLUNK_LOGGING_DRIVER_* environment variable.
  const (
  	// defaultPostMessagesFrequency is the interval between flushes when the
  	// batch size has not been reached.
  	defaultPostMessagesFrequency = 5 * time.Second
  	// defaultPostMessagesBatchSize is the largest number of messages sent
  	// in one request.
  	defaultPostMessagesBatchSize = 1000
  	// defaultBufferMaximum is the largest number of unsent messages kept
  	// in the buffer.
  	defaultBufferMaximum = 10 * defaultPostMessagesBatchSize
  	// defaultStreamChannelSize is the number of messages that may be queued
  	// in the channel feeding the worker.
  	defaultStreamChannelSize = 4 * defaultPostMessagesBatchSize
  )
  // Names of the environment variables that override the advanced defaults.
  const (
  	envVarPostMessagesFrequency = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_FREQUENCY"
  	envVarPostMessagesBatchSize = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_BATCH_SIZE"
  	envVarBufferMaximum         = "SPLUNK_LOGGING_DRIVER_BUFFER_MAX"
  	envVarStreamChannelSize     = "SPLUNK_LOGGING_DRIVER_CHANNEL_SIZE"
  )
  59. type splunkLoggerInterface interface {
  60. logger.Logger
  61. worker()
  62. }
  63. type splunkLogger struct {
  64. client *http.Client
  65. transport *http.Transport
  66. url string
  67. auth string
  68. nullMessage *splunkMessage
  69. // http compression
  70. gzipCompression bool
  71. gzipCompressionLevel int
  72. // Advanced options
  73. postMessagesFrequency time.Duration
  74. postMessagesBatchSize int
  75. bufferMaximum int
  76. // For synchronization between background worker and logger.
  77. // We use channel to send messages to worker go routine.
  78. // All other variables for blocking Close call before we flush all messages to HEC
  79. stream chan *splunkMessage
  80. lock sync.RWMutex
  81. closed bool
  82. closedCond *sync.Cond
  83. }
  84. type splunkLoggerInline struct {
  85. *splunkLogger
  86. nullEvent *splunkMessageEvent
  87. }
  88. type splunkLoggerJSON struct {
  89. *splunkLoggerInline
  90. }
  91. type splunkLoggerRaw struct {
  92. *splunkLogger
  93. prefix []byte
  94. }
  // splunkMessage is the JSON envelope posted to the collector for one event.
  // Source, SourceType and Index are left out of the JSON when empty.
  type splunkMessage struct {
  	Event      interface{} `json:"event"`
  	Time       string      `json:"time"`
  	Host       string      `json:"host"`
  	Source     string      `json:"source,omitempty"`
  	SourceType string      `json:"sourcetype,omitempty"`
  	Index      string      `json:"index,omitempty"`
  }
  // splunkMessageEvent is the structured event body used by the inline and
  // json formats. Tag and Attrs are left out of the JSON when empty.
  type splunkMessageEvent struct {
  	Line   interface{}       `json:"line"`
  	Source string            `json:"source"`
  	Tag    string            `json:"tag,omitempty"`
  	Attrs  map[string]string `json:"attrs,omitempty"`
  }
  // Accepted values of the splunk-format option.
  const (
  	splunkFormatRaw    = "raw"
  	splunkFormatJSON   = "json"
  	splunkFormatInline = "inline"
  )
  114. func init() {
  115. if err := logger.RegisterLogDriver(driverName, New); err != nil {
  116. logrus.Fatal(err)
  117. }
  118. if err := logger.RegisterLogOptValidator(driverName, ValidateLogOpt); err != nil {
  119. logrus.Fatal(err)
  120. }
  121. }
  122. // New creates splunk logger driver using configuration passed in context
  123. func New(info logger.Info) (logger.Logger, error) {
  124. hostname, err := info.Hostname()
  125. if err != nil {
  126. return nil, fmt.Errorf("%s: cannot access hostname to set source field", driverName)
  127. }
  128. // Parse and validate Splunk URL
  129. splunkURL, err := parseURL(info)
  130. if err != nil {
  131. return nil, err
  132. }
  133. // Splunk Token is required parameter
  134. splunkToken, ok := info.Config[splunkTokenKey]
  135. if !ok {
  136. return nil, fmt.Errorf("%s: %s is expected", driverName, splunkTokenKey)
  137. }
  138. tlsConfig := &tls.Config{}
  139. // Splunk is using autogenerated certificates by default,
  140. // allow users to trust them with skipping verification
  141. if insecureSkipVerifyStr, ok := info.Config[splunkInsecureSkipVerifyKey]; ok {
  142. insecureSkipVerify, err := strconv.ParseBool(insecureSkipVerifyStr)
  143. if err != nil {
  144. return nil, err
  145. }
  146. tlsConfig.InsecureSkipVerify = insecureSkipVerify
  147. }
  148. // If path to the root certificate is provided - load it
  149. if caPath, ok := info.Config[splunkCAPathKey]; ok {
  150. caCert, err := ioutil.ReadFile(caPath)
  151. if err != nil {
  152. return nil, err
  153. }
  154. caPool := x509.NewCertPool()
  155. caPool.AppendCertsFromPEM(caCert)
  156. tlsConfig.RootCAs = caPool
  157. }
  158. if caName, ok := info.Config[splunkCANameKey]; ok {
  159. tlsConfig.ServerName = caName
  160. }
  161. gzipCompression := false
  162. if gzipCompressionStr, ok := info.Config[splunkGzipCompressionKey]; ok {
  163. gzipCompression, err = strconv.ParseBool(gzipCompressionStr)
  164. if err != nil {
  165. return nil, err
  166. }
  167. }
  168. gzipCompressionLevel := gzip.DefaultCompression
  169. if gzipCompressionLevelStr, ok := info.Config[splunkGzipCompressionLevelKey]; ok {
  170. var err error
  171. gzipCompressionLevel64, err := strconv.ParseInt(gzipCompressionLevelStr, 10, 32)
  172. if err != nil {
  173. return nil, err
  174. }
  175. gzipCompressionLevel = int(gzipCompressionLevel64)
  176. if gzipCompressionLevel < gzip.DefaultCompression || gzipCompressionLevel > gzip.BestCompression {
  177. err := fmt.Errorf("Not supported level '%s' for %s (supported values between %d and %d).",
  178. gzipCompressionLevelStr, splunkGzipCompressionLevelKey, gzip.DefaultCompression, gzip.BestCompression)
  179. return nil, err
  180. }
  181. }
  182. transport := &http.Transport{
  183. TLSClientConfig: tlsConfig,
  184. }
  185. client := &http.Client{
  186. Transport: transport,
  187. }
  188. source := info.Config[splunkSourceKey]
  189. sourceType := info.Config[splunkSourceTypeKey]
  190. index := info.Config[splunkIndexKey]
  191. var nullMessage = &splunkMessage{
  192. Host: hostname,
  193. Source: source,
  194. SourceType: sourceType,
  195. Index: index,
  196. }
  197. // Allow user to remove tag from the messages by setting tag to empty string
  198. tag := ""
  199. if tagTemplate, ok := info.Config[tagKey]; !ok || tagTemplate != "" {
  200. tag, err = loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
  201. if err != nil {
  202. return nil, err
  203. }
  204. }
  205. attrs, err := info.ExtraAttributes(nil)
  206. if err != nil {
  207. return nil, err
  208. }
  209. var (
  210. postMessagesFrequency = getAdvancedOptionDuration(envVarPostMessagesFrequency, defaultPostMessagesFrequency)
  211. postMessagesBatchSize = getAdvancedOptionInt(envVarPostMessagesBatchSize, defaultPostMessagesBatchSize)
  212. bufferMaximum = getAdvancedOptionInt(envVarBufferMaximum, defaultBufferMaximum)
  213. streamChannelSize = getAdvancedOptionInt(envVarStreamChannelSize, defaultStreamChannelSize)
  214. )
  215. logger := &splunkLogger{
  216. client: client,
  217. transport: transport,
  218. url: splunkURL.String(),
  219. auth: "Splunk " + splunkToken,
  220. nullMessage: nullMessage,
  221. gzipCompression: gzipCompression,
  222. gzipCompressionLevel: gzipCompressionLevel,
  223. stream: make(chan *splunkMessage, streamChannelSize),
  224. postMessagesFrequency: postMessagesFrequency,
  225. postMessagesBatchSize: postMessagesBatchSize,
  226. bufferMaximum: bufferMaximum,
  227. }
  228. // By default we verify connection, but we allow use to skip that
  229. verifyConnection := true
  230. if verifyConnectionStr, ok := info.Config[splunkVerifyConnectionKey]; ok {
  231. var err error
  232. verifyConnection, err = strconv.ParseBool(verifyConnectionStr)
  233. if err != nil {
  234. return nil, err
  235. }
  236. }
  237. if verifyConnection {
  238. err = verifySplunkConnection(logger)
  239. if err != nil {
  240. return nil, err
  241. }
  242. }
  243. var splunkFormat string
  244. if splunkFormatParsed, ok := info.Config[splunkFormatKey]; ok {
  245. switch splunkFormatParsed {
  246. case splunkFormatInline:
  247. case splunkFormatJSON:
  248. case splunkFormatRaw:
  249. default:
  250. return nil, fmt.Errorf("Unknown format specified %s, supported formats are inline, json and raw", splunkFormat)
  251. }
  252. splunkFormat = splunkFormatParsed
  253. } else {
  254. splunkFormat = splunkFormatInline
  255. }
  256. var loggerWrapper splunkLoggerInterface
  257. switch splunkFormat {
  258. case splunkFormatInline:
  259. nullEvent := &splunkMessageEvent{
  260. Tag: tag,
  261. Attrs: attrs,
  262. }
  263. loggerWrapper = &splunkLoggerInline{logger, nullEvent}
  264. case splunkFormatJSON:
  265. nullEvent := &splunkMessageEvent{
  266. Tag: tag,
  267. Attrs: attrs,
  268. }
  269. loggerWrapper = &splunkLoggerJSON{&splunkLoggerInline{logger, nullEvent}}
  270. case splunkFormatRaw:
  271. var prefix bytes.Buffer
  272. if tag != "" {
  273. prefix.WriteString(tag)
  274. prefix.WriteString(" ")
  275. }
  276. for key, value := range attrs {
  277. prefix.WriteString(key)
  278. prefix.WriteString("=")
  279. prefix.WriteString(value)
  280. prefix.WriteString(" ")
  281. }
  282. loggerWrapper = &splunkLoggerRaw{logger, prefix.Bytes()}
  283. default:
  284. return nil, fmt.Errorf("Unexpected format %s", splunkFormat)
  285. }
  286. go loggerWrapper.worker()
  287. return loggerWrapper, nil
  288. }
  289. func (l *splunkLoggerInline) Log(msg *logger.Message) error {
  290. message := l.createSplunkMessage(msg)
  291. event := *l.nullEvent
  292. event.Line = string(msg.Line)
  293. event.Source = msg.Source
  294. message.Event = &event
  295. logger.PutMessage(msg)
  296. return l.queueMessageAsync(message)
  297. }
  298. func (l *splunkLoggerJSON) Log(msg *logger.Message) error {
  299. message := l.createSplunkMessage(msg)
  300. event := *l.nullEvent
  301. var rawJSONMessage json.RawMessage
  302. if err := json.Unmarshal(msg.Line, &rawJSONMessage); err == nil {
  303. event.Line = &rawJSONMessage
  304. } else {
  305. event.Line = string(msg.Line)
  306. }
  307. event.Source = msg.Source
  308. message.Event = &event
  309. logger.PutMessage(msg)
  310. return l.queueMessageAsync(message)
  311. }
  312. func (l *splunkLoggerRaw) Log(msg *logger.Message) error {
  313. message := l.createSplunkMessage(msg)
  314. message.Event = string(append(l.prefix, msg.Line...))
  315. logger.PutMessage(msg)
  316. return l.queueMessageAsync(message)
  317. }
  318. func (l *splunkLogger) queueMessageAsync(message *splunkMessage) error {
  319. l.lock.RLock()
  320. defer l.lock.RUnlock()
  321. if l.closedCond != nil {
  322. return fmt.Errorf("%s: driver is closed", driverName)
  323. }
  324. l.stream <- message
  325. return nil
  326. }
  327. func (l *splunkLogger) worker() {
  328. timer := time.NewTicker(l.postMessagesFrequency)
  329. var messages []*splunkMessage
  330. for {
  331. select {
  332. case message, open := <-l.stream:
  333. if !open {
  334. l.postMessages(messages, true)
  335. l.lock.Lock()
  336. defer l.lock.Unlock()
  337. l.transport.CloseIdleConnections()
  338. l.closed = true
  339. l.closedCond.Signal()
  340. return
  341. }
  342. messages = append(messages, message)
  343. // Only sending when we get exactly to the batch size,
  344. // This also helps not to fire postMessages on every new message,
  345. // when previous try failed.
  346. if len(messages)%l.postMessagesBatchSize == 0 {
  347. messages = l.postMessages(messages, false)
  348. }
  349. case <-timer.C:
  350. messages = l.postMessages(messages, false)
  351. }
  352. }
  353. }
  354. func (l *splunkLogger) postMessages(messages []*splunkMessage, lastChance bool) []*splunkMessage {
  355. messagesLen := len(messages)
  356. for i := 0; i < messagesLen; i += l.postMessagesBatchSize {
  357. upperBound := i + l.postMessagesBatchSize
  358. if upperBound > messagesLen {
  359. upperBound = messagesLen
  360. }
  361. if err := l.tryPostMessages(messages[i:upperBound]); err != nil {
  362. logrus.Error(err)
  363. if messagesLen-i >= l.bufferMaximum || lastChance {
  364. // If this is last chance - print them all to the daemon log
  365. if lastChance {
  366. upperBound = messagesLen
  367. }
  368. // Not all sent, but buffer has got to its maximum, let's log all messages
  369. // we could not send and return buffer minus one batch size
  370. for j := i; j < upperBound; j++ {
  371. if jsonEvent, err := json.Marshal(messages[j]); err != nil {
  372. logrus.Error(err)
  373. } else {
  374. logrus.Error(fmt.Errorf("Failed to send a message '%s'", string(jsonEvent)))
  375. }
  376. }
  377. return messages[upperBound:messagesLen]
  378. }
  379. // Not all sent, returning buffer from where we have not sent messages
  380. return messages[i:messagesLen]
  381. }
  382. }
  383. // All sent, return empty buffer
  384. return messages[:0]
  385. }
  386. func (l *splunkLogger) tryPostMessages(messages []*splunkMessage) error {
  387. if len(messages) == 0 {
  388. return nil
  389. }
  390. var buffer bytes.Buffer
  391. var writer io.Writer
  392. var gzipWriter *gzip.Writer
  393. var err error
  394. // If gzip compression is enabled - create gzip writer with specified compression
  395. // level. If gzip compression is disabled, use standard buffer as a writer
  396. if l.gzipCompression {
  397. gzipWriter, err = gzip.NewWriterLevel(&buffer, l.gzipCompressionLevel)
  398. if err != nil {
  399. return err
  400. }
  401. writer = gzipWriter
  402. } else {
  403. writer = &buffer
  404. }
  405. for _, message := range messages {
  406. jsonEvent, err := json.Marshal(message)
  407. if err != nil {
  408. return err
  409. }
  410. if _, err := writer.Write(jsonEvent); err != nil {
  411. return err
  412. }
  413. }
  414. // If gzip compression is enabled, tell it, that we are done
  415. if l.gzipCompression {
  416. err = gzipWriter.Close()
  417. if err != nil {
  418. return err
  419. }
  420. }
  421. req, err := http.NewRequest("POST", l.url, bytes.NewBuffer(buffer.Bytes()))
  422. if err != nil {
  423. return err
  424. }
  425. req.Header.Set("Authorization", l.auth)
  426. // Tell if we are sending gzip compressed body
  427. if l.gzipCompression {
  428. req.Header.Set("Content-Encoding", "gzip")
  429. }
  430. res, err := l.client.Do(req)
  431. if err != nil {
  432. return err
  433. }
  434. defer res.Body.Close()
  435. if res.StatusCode != http.StatusOK {
  436. var body []byte
  437. body, err = ioutil.ReadAll(res.Body)
  438. if err != nil {
  439. return err
  440. }
  441. return fmt.Errorf("%s: failed to send event - %s - %s", driverName, res.Status, body)
  442. }
  443. io.Copy(ioutil.Discard, res.Body)
  444. return nil
  445. }
  446. func (l *splunkLogger) Close() error {
  447. l.lock.Lock()
  448. defer l.lock.Unlock()
  449. if l.closedCond == nil {
  450. l.closedCond = sync.NewCond(&l.lock)
  451. close(l.stream)
  452. for !l.closed {
  453. l.closedCond.Wait()
  454. }
  455. }
  456. return nil
  457. }
  458. func (l *splunkLogger) Name() string {
  459. return driverName
  460. }
  461. func (l *splunkLogger) createSplunkMessage(msg *logger.Message) *splunkMessage {
  462. message := *l.nullMessage
  463. message.Time = fmt.Sprintf("%f", float64(msg.Timestamp.UnixNano())/float64(time.Second))
  464. return &message
  465. }
  466. // ValidateLogOpt looks for all supported by splunk driver options
  467. func ValidateLogOpt(cfg map[string]string) error {
  468. for key := range cfg {
  469. switch key {
  470. case splunkURLKey:
  471. case splunkTokenKey:
  472. case splunkSourceKey:
  473. case splunkSourceTypeKey:
  474. case splunkIndexKey:
  475. case splunkCAPathKey:
  476. case splunkCANameKey:
  477. case splunkInsecureSkipVerifyKey:
  478. case splunkFormatKey:
  479. case splunkVerifyConnectionKey:
  480. case splunkGzipCompressionKey:
  481. case splunkGzipCompressionLevelKey:
  482. case envKey:
  483. case envRegexKey:
  484. case labelsKey:
  485. case tagKey:
  486. default:
  487. return fmt.Errorf("unknown log opt '%s' for %s log driver", key, driverName)
  488. }
  489. }
  490. return nil
  491. }
  492. func parseURL(info logger.Info) (*url.URL, error) {
  493. splunkURLStr, ok := info.Config[splunkURLKey]
  494. if !ok {
  495. return nil, fmt.Errorf("%s: %s is expected", driverName, splunkURLKey)
  496. }
  497. splunkURL, err := url.Parse(splunkURLStr)
  498. if err != nil {
  499. return nil, fmt.Errorf("%s: failed to parse %s as url value in %s", driverName, splunkURLStr, splunkURLKey)
  500. }
  501. if !urlutil.IsURL(splunkURLStr) ||
  502. !splunkURL.IsAbs() ||
  503. (splunkURL.Path != "" && splunkURL.Path != "/") ||
  504. splunkURL.RawQuery != "" ||
  505. splunkURL.Fragment != "" {
  506. return nil, fmt.Errorf("%s: expected format scheme://dns_name_or_ip:port for %s", driverName, splunkURLKey)
  507. }
  508. splunkURL.Path = "/services/collector/event/1.0"
  509. return splunkURL, nil
  510. }
  511. func verifySplunkConnection(l *splunkLogger) error {
  512. req, err := http.NewRequest(http.MethodOptions, l.url, nil)
  513. if err != nil {
  514. return err
  515. }
  516. res, err := l.client.Do(req)
  517. if err != nil {
  518. return err
  519. }
  520. if res.Body != nil {
  521. defer res.Body.Close()
  522. }
  523. if res.StatusCode != http.StatusOK {
  524. var body []byte
  525. body, err = ioutil.ReadAll(res.Body)
  526. if err != nil {
  527. return err
  528. }
  529. return fmt.Errorf("%s: failed to verify connection - %s - %s", driverName, res.Status, body)
  530. }
  531. return nil
  532. }
  533. func getAdvancedOptionDuration(envName string, defaultValue time.Duration) time.Duration {
  534. valueStr := os.Getenv(envName)
  535. if valueStr == "" {
  536. return defaultValue
  537. }
  538. parsedValue, err := time.ParseDuration(valueStr)
  539. if err != nil {
  540. logrus.Error(fmt.Sprintf("Failed to parse value of %s as duration. Using default %v. %v", envName, defaultValue, err))
  541. return defaultValue
  542. }
  543. return parsedValue
  544. }
  545. func getAdvancedOptionInt(envName string, defaultValue int) int {
  546. valueStr := os.Getenv(envName)
  547. if valueStr == "" {
  548. return defaultValue
  549. }
  550. parsedValue, err := strconv.ParseInt(valueStr, 10, 32)
  551. if err != nil {
  552. logrus.Error(fmt.Sprintf("Failed to parse value of %s as integer. Using default %d. %v", envName, defaultValue, err))
  553. return defaultValue
  554. }
  555. return int(parsedValue)
  556. }