splunk.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632
  1. // Package splunk provides the log driver for forwarding server logs to
  2. // Splunk HTTP Event Collector endpoint.
  3. package splunk
  4. import (
  5. "bytes"
  6. "compress/gzip"
  7. "crypto/tls"
  8. "crypto/x509"
  9. "encoding/json"
  10. "fmt"
  11. "io"
  12. "io/ioutil"
  13. "net/http"
  14. "net/url"
  15. "os"
  16. "strconv"
  17. "strings"
  18. "sync"
  19. "time"
  20. "github.com/docker/docker/daemon/logger"
  21. "github.com/docker/docker/daemon/logger/loggerutils"
  22. "github.com/docker/docker/pkg/urlutil"
  23. "github.com/sirupsen/logrus"
  24. )
  25. const (
  26. driverName = "splunk"
  27. splunkURLKey = "splunk-url"
  28. splunkTokenKey = "splunk-token"
  29. splunkSourceKey = "splunk-source"
  30. splunkSourceTypeKey = "splunk-sourcetype"
  31. splunkIndexKey = "splunk-index"
  32. splunkCAPathKey = "splunk-capath"
  33. splunkCANameKey = "splunk-caname"
  34. splunkInsecureSkipVerifyKey = "splunk-insecureskipverify"
  35. splunkFormatKey = "splunk-format"
  36. splunkVerifyConnectionKey = "splunk-verify-connection"
  37. splunkGzipCompressionKey = "splunk-gzip"
  38. splunkGzipCompressionLevelKey = "splunk-gzip-level"
  39. envKey = "env"
  40. envRegexKey = "env-regex"
  41. labelsKey = "labels"
  42. tagKey = "tag"
  43. )
  44. const (
  45. // How often do we send messages (if we are not reaching batch size)
  46. defaultPostMessagesFrequency = 5 * time.Second
  47. // How big can be batch of messages
  48. defaultPostMessagesBatchSize = 1000
  49. // Maximum number of messages we can store in buffer
  50. defaultBufferMaximum = 10 * defaultPostMessagesBatchSize
  51. // Number of messages allowed to be queued in the channel
  52. defaultStreamChannelSize = 4 * defaultPostMessagesBatchSize
  53. )
  54. const (
  55. envVarPostMessagesFrequency = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_FREQUENCY"
  56. envVarPostMessagesBatchSize = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_BATCH_SIZE"
  57. envVarBufferMaximum = "SPLUNK_LOGGING_DRIVER_BUFFER_MAX"
  58. envVarStreamChannelSize = "SPLUNK_LOGGING_DRIVER_CHANNEL_SIZE"
  59. )
  60. type splunkLoggerInterface interface {
  61. logger.Logger
  62. worker()
  63. }
  64. type splunkLogger struct {
  65. client *http.Client
  66. transport *http.Transport
  67. url string
  68. auth string
  69. nullMessage *splunkMessage
  70. // http compression
  71. gzipCompression bool
  72. gzipCompressionLevel int
  73. // Advanced options
  74. postMessagesFrequency time.Duration
  75. postMessagesBatchSize int
  76. bufferMaximum int
  77. // For synchronization between background worker and logger.
  78. // We use channel to send messages to worker go routine.
  79. // All other variables for blocking Close call before we flush all messages to HEC
  80. stream chan *splunkMessage
  81. lock sync.RWMutex
  82. closed bool
  83. closedCond *sync.Cond
  84. }
  85. type splunkLoggerInline struct {
  86. *splunkLogger
  87. nullEvent *splunkMessageEvent
  88. }
  89. type splunkLoggerJSON struct {
  90. *splunkLoggerInline
  91. }
  92. type splunkLoggerRaw struct {
  93. *splunkLogger
  94. prefix []byte
  95. }
  96. type splunkMessage struct {
  97. Event interface{} `json:"event"`
  98. Time string `json:"time"`
  99. Host string `json:"host"`
  100. Source string `json:"source,omitempty"`
  101. SourceType string `json:"sourcetype,omitempty"`
  102. Index string `json:"index,omitempty"`
  103. }
  104. type splunkMessageEvent struct {
  105. Line interface{} `json:"line"`
  106. Source string `json:"source"`
  107. Tag string `json:"tag,omitempty"`
  108. Attrs map[string]string `json:"attrs,omitempty"`
  109. }
  110. const (
  111. splunkFormatRaw = "raw"
  112. splunkFormatJSON = "json"
  113. splunkFormatInline = "inline"
  114. )
  115. func init() {
  116. if err := logger.RegisterLogDriver(driverName, New); err != nil {
  117. logrus.Fatal(err)
  118. }
  119. if err := logger.RegisterLogOptValidator(driverName, ValidateLogOpt); err != nil {
  120. logrus.Fatal(err)
  121. }
  122. }
  123. // New creates splunk logger driver using configuration passed in context
  124. func New(info logger.Info) (logger.Logger, error) {
  125. hostname, err := info.Hostname()
  126. if err != nil {
  127. return nil, fmt.Errorf("%s: cannot access hostname to set source field", driverName)
  128. }
  129. // Parse and validate Splunk URL
  130. splunkURL, err := parseURL(info)
  131. if err != nil {
  132. return nil, err
  133. }
  134. // Splunk Token is required parameter
  135. splunkToken, ok := info.Config[splunkTokenKey]
  136. if !ok {
  137. return nil, fmt.Errorf("%s: %s is expected", driverName, splunkTokenKey)
  138. }
  139. tlsConfig := &tls.Config{}
  140. // Splunk is using autogenerated certificates by default,
  141. // allow users to trust them with skipping verification
  142. if insecureSkipVerifyStr, ok := info.Config[splunkInsecureSkipVerifyKey]; ok {
  143. insecureSkipVerify, err := strconv.ParseBool(insecureSkipVerifyStr)
  144. if err != nil {
  145. return nil, err
  146. }
  147. tlsConfig.InsecureSkipVerify = insecureSkipVerify
  148. }
  149. // If path to the root certificate is provided - load it
  150. if caPath, ok := info.Config[splunkCAPathKey]; ok {
  151. caCert, err := ioutil.ReadFile(caPath)
  152. if err != nil {
  153. return nil, err
  154. }
  155. caPool := x509.NewCertPool()
  156. caPool.AppendCertsFromPEM(caCert)
  157. tlsConfig.RootCAs = caPool
  158. }
  159. if caName, ok := info.Config[splunkCANameKey]; ok {
  160. tlsConfig.ServerName = caName
  161. }
  162. gzipCompression := false
  163. if gzipCompressionStr, ok := info.Config[splunkGzipCompressionKey]; ok {
  164. gzipCompression, err = strconv.ParseBool(gzipCompressionStr)
  165. if err != nil {
  166. return nil, err
  167. }
  168. }
  169. gzipCompressionLevel := gzip.DefaultCompression
  170. if gzipCompressionLevelStr, ok := info.Config[splunkGzipCompressionLevelKey]; ok {
  171. var err error
  172. gzipCompressionLevel64, err := strconv.ParseInt(gzipCompressionLevelStr, 10, 32)
  173. if err != nil {
  174. return nil, err
  175. }
  176. gzipCompressionLevel = int(gzipCompressionLevel64)
  177. if gzipCompressionLevel < gzip.DefaultCompression || gzipCompressionLevel > gzip.BestCompression {
  178. err := fmt.Errorf("not supported level '%s' for %s (supported values between %d and %d)",
  179. gzipCompressionLevelStr, splunkGzipCompressionLevelKey, gzip.DefaultCompression, gzip.BestCompression)
  180. return nil, err
  181. }
  182. }
  183. transport := &http.Transport{
  184. TLSClientConfig: tlsConfig,
  185. }
  186. client := &http.Client{
  187. Transport: transport,
  188. }
  189. source := info.Config[splunkSourceKey]
  190. sourceType := info.Config[splunkSourceTypeKey]
  191. index := info.Config[splunkIndexKey]
  192. var nullMessage = &splunkMessage{
  193. Host: hostname,
  194. Source: source,
  195. SourceType: sourceType,
  196. Index: index,
  197. }
  198. // Allow user to remove tag from the messages by setting tag to empty string
  199. tag := ""
  200. if tagTemplate, ok := info.Config[tagKey]; !ok || tagTemplate != "" {
  201. tag, err = loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
  202. if err != nil {
  203. return nil, err
  204. }
  205. }
  206. attrs, err := info.ExtraAttributes(nil)
  207. if err != nil {
  208. return nil, err
  209. }
  210. var (
  211. postMessagesFrequency = getAdvancedOptionDuration(envVarPostMessagesFrequency, defaultPostMessagesFrequency)
  212. postMessagesBatchSize = getAdvancedOptionInt(envVarPostMessagesBatchSize, defaultPostMessagesBatchSize)
  213. bufferMaximum = getAdvancedOptionInt(envVarBufferMaximum, defaultBufferMaximum)
  214. streamChannelSize = getAdvancedOptionInt(envVarStreamChannelSize, defaultStreamChannelSize)
  215. )
  216. logger := &splunkLogger{
  217. client: client,
  218. transport: transport,
  219. url: splunkURL.String(),
  220. auth: "Splunk " + splunkToken,
  221. nullMessage: nullMessage,
  222. gzipCompression: gzipCompression,
  223. gzipCompressionLevel: gzipCompressionLevel,
  224. stream: make(chan *splunkMessage, streamChannelSize),
  225. postMessagesFrequency: postMessagesFrequency,
  226. postMessagesBatchSize: postMessagesBatchSize,
  227. bufferMaximum: bufferMaximum,
  228. }
  229. // By default we verify connection, but we allow use to skip that
  230. verifyConnection := true
  231. if verifyConnectionStr, ok := info.Config[splunkVerifyConnectionKey]; ok {
  232. var err error
  233. verifyConnection, err = strconv.ParseBool(verifyConnectionStr)
  234. if err != nil {
  235. return nil, err
  236. }
  237. }
  238. if verifyConnection {
  239. err = verifySplunkConnection(logger)
  240. if err != nil {
  241. return nil, err
  242. }
  243. }
  244. var splunkFormat string
  245. if splunkFormatParsed, ok := info.Config[splunkFormatKey]; ok {
  246. switch splunkFormatParsed {
  247. case splunkFormatInline:
  248. case splunkFormatJSON:
  249. case splunkFormatRaw:
  250. default:
  251. return nil, fmt.Errorf("Unknown format specified %s, supported formats are inline, json and raw", splunkFormat)
  252. }
  253. splunkFormat = splunkFormatParsed
  254. } else {
  255. splunkFormat = splunkFormatInline
  256. }
  257. var loggerWrapper splunkLoggerInterface
  258. switch splunkFormat {
  259. case splunkFormatInline:
  260. nullEvent := &splunkMessageEvent{
  261. Tag: tag,
  262. Attrs: attrs,
  263. }
  264. loggerWrapper = &splunkLoggerInline{logger, nullEvent}
  265. case splunkFormatJSON:
  266. nullEvent := &splunkMessageEvent{
  267. Tag: tag,
  268. Attrs: attrs,
  269. }
  270. loggerWrapper = &splunkLoggerJSON{&splunkLoggerInline{logger, nullEvent}}
  271. case splunkFormatRaw:
  272. var prefix bytes.Buffer
  273. if tag != "" {
  274. prefix.WriteString(tag)
  275. prefix.WriteString(" ")
  276. }
  277. for key, value := range attrs {
  278. prefix.WriteString(key)
  279. prefix.WriteString("=")
  280. prefix.WriteString(value)
  281. prefix.WriteString(" ")
  282. }
  283. loggerWrapper = &splunkLoggerRaw{logger, prefix.Bytes()}
  284. default:
  285. return nil, fmt.Errorf("Unexpected format %s", splunkFormat)
  286. }
  287. go loggerWrapper.worker()
  288. return loggerWrapper, nil
  289. }
  290. func (l *splunkLoggerInline) Log(msg *logger.Message) error {
  291. message := l.createSplunkMessage(msg)
  292. event := *l.nullEvent
  293. event.Line = string(msg.Line)
  294. event.Source = msg.Source
  295. message.Event = &event
  296. logger.PutMessage(msg)
  297. return l.queueMessageAsync(message)
  298. }
  299. func (l *splunkLoggerJSON) Log(msg *logger.Message) error {
  300. message := l.createSplunkMessage(msg)
  301. event := *l.nullEvent
  302. var rawJSONMessage json.RawMessage
  303. if err := json.Unmarshal(msg.Line, &rawJSONMessage); err == nil {
  304. event.Line = &rawJSONMessage
  305. } else {
  306. event.Line = string(msg.Line)
  307. }
  308. event.Source = msg.Source
  309. message.Event = &event
  310. logger.PutMessage(msg)
  311. return l.queueMessageAsync(message)
  312. }
  313. func (l *splunkLoggerRaw) Log(msg *logger.Message) error {
  314. // empty or whitespace-only messages are not accepted by HEC
  315. if strings.TrimSpace(string(msg.Line)) == "" {
  316. return nil
  317. }
  318. message := l.createSplunkMessage(msg)
  319. message.Event = string(append(l.prefix, msg.Line...))
  320. logger.PutMessage(msg)
  321. return l.queueMessageAsync(message)
  322. }
  323. func (l *splunkLogger) queueMessageAsync(message *splunkMessage) error {
  324. l.lock.RLock()
  325. defer l.lock.RUnlock()
  326. if l.closedCond != nil {
  327. return fmt.Errorf("%s: driver is closed", driverName)
  328. }
  329. l.stream <- message
  330. return nil
  331. }
  332. func (l *splunkLogger) worker() {
  333. timer := time.NewTicker(l.postMessagesFrequency)
  334. var messages []*splunkMessage
  335. for {
  336. select {
  337. case message, open := <-l.stream:
  338. if !open {
  339. l.postMessages(messages, true)
  340. l.lock.Lock()
  341. defer l.lock.Unlock()
  342. l.transport.CloseIdleConnections()
  343. l.closed = true
  344. l.closedCond.Signal()
  345. return
  346. }
  347. messages = append(messages, message)
  348. // Only sending when we get exactly to the batch size,
  349. // This also helps not to fire postMessages on every new message,
  350. // when previous try failed.
  351. if len(messages)%l.postMessagesBatchSize == 0 {
  352. messages = l.postMessages(messages, false)
  353. }
  354. case <-timer.C:
  355. messages = l.postMessages(messages, false)
  356. }
  357. }
  358. }
  359. func (l *splunkLogger) postMessages(messages []*splunkMessage, lastChance bool) []*splunkMessage {
  360. messagesLen := len(messages)
  361. for i := 0; i < messagesLen; i += l.postMessagesBatchSize {
  362. upperBound := i + l.postMessagesBatchSize
  363. if upperBound > messagesLen {
  364. upperBound = messagesLen
  365. }
  366. if err := l.tryPostMessages(messages[i:upperBound]); err != nil {
  367. logrus.Error(err)
  368. if messagesLen-i >= l.bufferMaximum || lastChance {
  369. // If this is last chance - print them all to the daemon log
  370. if lastChance {
  371. upperBound = messagesLen
  372. }
  373. // Not all sent, but buffer has got to its maximum, let's log all messages
  374. // we could not send and return buffer minus one batch size
  375. for j := i; j < upperBound; j++ {
  376. if jsonEvent, err := json.Marshal(messages[j]); err != nil {
  377. logrus.Error(err)
  378. } else {
  379. logrus.Error(fmt.Errorf("Failed to send a message '%s'", string(jsonEvent)))
  380. }
  381. }
  382. return messages[upperBound:messagesLen]
  383. }
  384. // Not all sent, returning buffer from where we have not sent messages
  385. return messages[i:messagesLen]
  386. }
  387. }
  388. // All sent, return empty buffer
  389. return messages[:0]
  390. }
  391. func (l *splunkLogger) tryPostMessages(messages []*splunkMessage) error {
  392. if len(messages) == 0 {
  393. return nil
  394. }
  395. var buffer bytes.Buffer
  396. var writer io.Writer
  397. var gzipWriter *gzip.Writer
  398. var err error
  399. // If gzip compression is enabled - create gzip writer with specified compression
  400. // level. If gzip compression is disabled, use standard buffer as a writer
  401. if l.gzipCompression {
  402. gzipWriter, err = gzip.NewWriterLevel(&buffer, l.gzipCompressionLevel)
  403. if err != nil {
  404. return err
  405. }
  406. writer = gzipWriter
  407. } else {
  408. writer = &buffer
  409. }
  410. for _, message := range messages {
  411. jsonEvent, err := json.Marshal(message)
  412. if err != nil {
  413. return err
  414. }
  415. if _, err := writer.Write(jsonEvent); err != nil {
  416. return err
  417. }
  418. }
  419. // If gzip compression is enabled, tell it, that we are done
  420. if l.gzipCompression {
  421. err = gzipWriter.Close()
  422. if err != nil {
  423. return err
  424. }
  425. }
  426. req, err := http.NewRequest("POST", l.url, bytes.NewBuffer(buffer.Bytes()))
  427. if err != nil {
  428. return err
  429. }
  430. req.Header.Set("Authorization", l.auth)
  431. // Tell if we are sending gzip compressed body
  432. if l.gzipCompression {
  433. req.Header.Set("Content-Encoding", "gzip")
  434. }
  435. res, err := l.client.Do(req)
  436. if err != nil {
  437. return err
  438. }
  439. defer res.Body.Close()
  440. if res.StatusCode != http.StatusOK {
  441. var body []byte
  442. body, err = ioutil.ReadAll(res.Body)
  443. if err != nil {
  444. return err
  445. }
  446. return fmt.Errorf("%s: failed to send event - %s - %s", driverName, res.Status, body)
  447. }
  448. io.Copy(ioutil.Discard, res.Body)
  449. return nil
  450. }
  451. func (l *splunkLogger) Close() error {
  452. l.lock.Lock()
  453. defer l.lock.Unlock()
  454. if l.closedCond == nil {
  455. l.closedCond = sync.NewCond(&l.lock)
  456. close(l.stream)
  457. for !l.closed {
  458. l.closedCond.Wait()
  459. }
  460. }
  461. return nil
  462. }
  463. func (l *splunkLogger) Name() string {
  464. return driverName
  465. }
  466. func (l *splunkLogger) createSplunkMessage(msg *logger.Message) *splunkMessage {
  467. message := *l.nullMessage
  468. message.Time = fmt.Sprintf("%f", float64(msg.Timestamp.UnixNano())/float64(time.Second))
  469. return &message
  470. }
  471. // ValidateLogOpt looks for all supported by splunk driver options
  472. func ValidateLogOpt(cfg map[string]string) error {
  473. for key := range cfg {
  474. switch key {
  475. case splunkURLKey:
  476. case splunkTokenKey:
  477. case splunkSourceKey:
  478. case splunkSourceTypeKey:
  479. case splunkIndexKey:
  480. case splunkCAPathKey:
  481. case splunkCANameKey:
  482. case splunkInsecureSkipVerifyKey:
  483. case splunkFormatKey:
  484. case splunkVerifyConnectionKey:
  485. case splunkGzipCompressionKey:
  486. case splunkGzipCompressionLevelKey:
  487. case envKey:
  488. case envRegexKey:
  489. case labelsKey:
  490. case tagKey:
  491. default:
  492. return fmt.Errorf("unknown log opt '%s' for %s log driver", key, driverName)
  493. }
  494. }
  495. return nil
  496. }
  497. func parseURL(info logger.Info) (*url.URL, error) {
  498. splunkURLStr, ok := info.Config[splunkURLKey]
  499. if !ok {
  500. return nil, fmt.Errorf("%s: %s is expected", driverName, splunkURLKey)
  501. }
  502. splunkURL, err := url.Parse(splunkURLStr)
  503. if err != nil {
  504. return nil, fmt.Errorf("%s: failed to parse %s as url value in %s", driverName, splunkURLStr, splunkURLKey)
  505. }
  506. if !urlutil.IsURL(splunkURLStr) ||
  507. !splunkURL.IsAbs() ||
  508. (splunkURL.Path != "" && splunkURL.Path != "/") ||
  509. splunkURL.RawQuery != "" ||
  510. splunkURL.Fragment != "" {
  511. return nil, fmt.Errorf("%s: expected format scheme://dns_name_or_ip:port for %s", driverName, splunkURLKey)
  512. }
  513. splunkURL.Path = "/services/collector/event/1.0"
  514. return splunkURL, nil
  515. }
  516. func verifySplunkConnection(l *splunkLogger) error {
  517. req, err := http.NewRequest(http.MethodOptions, l.url, nil)
  518. if err != nil {
  519. return err
  520. }
  521. res, err := l.client.Do(req)
  522. if err != nil {
  523. return err
  524. }
  525. if res.Body != nil {
  526. defer res.Body.Close()
  527. }
  528. if res.StatusCode != http.StatusOK {
  529. var body []byte
  530. body, err = ioutil.ReadAll(res.Body)
  531. if err != nil {
  532. return err
  533. }
  534. return fmt.Errorf("%s: failed to verify connection - %s - %s", driverName, res.Status, body)
  535. }
  536. return nil
  537. }
  538. func getAdvancedOptionDuration(envName string, defaultValue time.Duration) time.Duration {
  539. valueStr := os.Getenv(envName)
  540. if valueStr == "" {
  541. return defaultValue
  542. }
  543. parsedValue, err := time.ParseDuration(valueStr)
  544. if err != nil {
  545. logrus.Error(fmt.Sprintf("Failed to parse value of %s as duration. Using default %v. %v", envName, defaultValue, err))
  546. return defaultValue
  547. }
  548. return parsedValue
  549. }
  550. func getAdvancedOptionInt(envName string, defaultValue int) int {
  551. valueStr := os.Getenv(envName)
  552. if valueStr == "" {
  553. return defaultValue
  554. }
  555. parsedValue, err := strconv.ParseInt(valueStr, 10, 32)
  556. if err != nil {
  557. logrus.Error(fmt.Sprintf("Failed to parse value of %s as integer. Using default %d. %v", envName, defaultValue, err))
  558. return defaultValue
  559. }
  560. return int(parsedValue)
  561. }