splunk.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649
// Package splunk provides the log driver for forwarding server logs to
// a Splunk HTTP Event Collector endpoint.
  3. package splunk // import "github.com/docker/docker/daemon/logger/splunk"
  4. import (
  5. "bytes"
  6. "compress/gzip"
  7. "context"
  8. "crypto/tls"
  9. "crypto/x509"
  10. "encoding/json"
  11. "fmt"
  12. "io"
  13. "io/ioutil"
  14. "net/http"
  15. "net/url"
  16. "os"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "time"
  21. "github.com/docker/docker/daemon/logger"
  22. "github.com/docker/docker/daemon/logger/loggerutils"
  23. "github.com/docker/docker/pkg/pools"
  24. "github.com/docker/docker/pkg/urlutil"
  25. "github.com/sirupsen/logrus"
  26. )
// driverName is the name this driver is registered under (see init). The
// remaining constants are the keys of the configuration map (info.Config)
// that the driver reads in New and accepts in ValidateLogOpt.
const (
	driverName                    = "splunk"
	splunkURLKey                  = "splunk-url"
	splunkTokenKey                = "splunk-token"
	splunkSourceKey               = "splunk-source"
	splunkSourceTypeKey           = "splunk-sourcetype"
	splunkIndexKey                = "splunk-index"
	splunkCAPathKey               = "splunk-capath"
	splunkCANameKey               = "splunk-caname"
	splunkInsecureSkipVerifyKey   = "splunk-insecureskipverify"
	splunkFormatKey               = "splunk-format"
	splunkVerifyConnectionKey     = "splunk-verify-connection"
	splunkGzipCompressionKey      = "splunk-gzip"
	splunkGzipCompressionLevelKey = "splunk-gzip-level"
	envKey                        = "env"
	envRegexKey                   = "env-regex"
	labelsKey                     = "labels"
	tagKey                        = "tag"
)
// Default values for the advanced options, plus the cap on how much of an
// HTTP response body is read when reporting an error.
const (
	// defaultPostMessagesFrequency is how often buffered messages are sent
	// when the batch size has not been reached.
	defaultPostMessagesFrequency = 5 * time.Second
	// defaultPostMessagesBatchSize is the maximum number of messages sent in
	// a single HTTP request.
	defaultPostMessagesBatchSize = 1000
	// defaultBufferMaximum is the number of unsent messages at which, after a
	// failed send, messages start being dropped (and written to the daemon
	// log) instead of being kept for a retry.
	defaultBufferMaximum = 10 * defaultPostMessagesBatchSize
	// defaultStreamChannelSize is the capacity of the channel that carries
	// messages from Log calls to the background worker.
	defaultStreamChannelSize = 4 * defaultPostMessagesBatchSize
	// maxResponseSize is the max amount that will be read from an http response
	maxResponseSize = 1024
)
// Names of the environment variables that New reads (through
// getAdvancedOptionDuration and getAdvancedOptionInt) to override the
// corresponding default* values.
const (
	envVarPostMessagesFrequency = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_FREQUENCY"
	envVarPostMessagesBatchSize = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_BATCH_SIZE"
	envVarBufferMaximum         = "SPLUNK_LOGGING_DRIVER_BUFFER_MAX"
	envVarStreamChannelSize     = "SPLUNK_LOGGING_DRIVER_CHANNEL_SIZE"
)
// batchSendTimeout bounds one postMessages call: the context derived from it
// is shared by every batch request issued during that call.
// NOTE(review): declared as a variable rather than a constant, presumably so
// tests can shorten it - confirm.
var batchSendTimeout = 30 * time.Second
// splunkLoggerInterface is the common type of the format-specific loggers
// built by New: a logger.Logger that additionally exposes the background
// worker loop, which New starts in its own goroutine.
type splunkLoggerInterface interface {
	logger.Logger
	worker()
}
// splunkLogger holds the state shared by all message formats: the HTTP
// client, the endpoint and credentials, batching options, and the
// synchronization between Log/Close callers and the background worker.
type splunkLogger struct {
	// client performs the HTTP requests; transport is kept separately so the
	// worker can close idle connections on shutdown.
	client    *http.Client
	transport *http.Transport

	// url is the endpoint requests are sent to; auth is the value of the
	// Authorization header ("Splunk " followed by the token).
	url  string
	auth string
	// nullMessage is the template copied by createSplunkMessage for every
	// outgoing message.
	nullMessage *splunkMessage

	// http compression
	gzipCompression      bool
	gzipCompressionLevel int

	// Advanced options
	postMessagesFrequency time.Duration
	postMessagesBatchSize int
	bufferMaximum         int

	// For synchronization between background worker and logger.
	// We use channel to send messages to worker go routine.
	// All other variables for blocking Close call before we flush all messages to HEC
	stream chan *splunkMessage
	lock   sync.RWMutex
	// closed is set by the worker once it has flushed and shut down.
	closed bool
	// closedCond is created by Close; a non-nil value marks the logger as
	// closing, and the worker signals it when closed becomes true.
	closedCond *sync.Cond
}
// splunkLoggerInline sends every log line as the string "line" field of a
// splunkMessageEvent. nullEvent is the template event (tag and attrs) that
// Log copies for each message.
type splunkLoggerInline struct {
	*splunkLogger

	nullEvent *splunkMessageEvent
}
// splunkLoggerJSON behaves like splunkLoggerInline, except that its Log
// embeds a line that parses as JSON as raw JSON instead of as a string.
type splunkLoggerJSON struct {
	*splunkLoggerInline
}
// splunkLoggerRaw sends every log line as a plain string event. prefix is
// built once in New from the tag and the extra attributes and is prepended
// to each line.
type splunkLoggerRaw struct {
	*splunkLogger

	prefix []byte
}
// splunkMessage is the JSON object sent to the endpoint for one log entry.
// Event holds either a *splunkMessageEvent (inline and json formats) or a
// string (raw format); Time is the entry's timestamp formatted by
// createSplunkMessage as seconds with a fractional part.
type splunkMessage struct {
	Event      interface{} `json:"event"`
	Time       string      `json:"time"`
	Host       string      `json:"host"`
	Source     string      `json:"source,omitempty"`
	SourceType string      `json:"sourcetype,omitempty"`
	Index      string      `json:"index,omitempty"`
}
// splunkMessageEvent is the event payload used by the inline and json
// formats. Line is the log line (a string, or a *json.RawMessage in the json
// format), Source is copied from the log message's Source field, and Tag and
// Attrs come from the template built in New.
type splunkMessageEvent struct {
	Line   interface{}       `json:"line"`
	Source string            `json:"source"`
	Tag    string            `json:"tag,omitempty"`
	Attrs  map[string]string `json:"attrs,omitempty"`
}
// Values accepted for the splunk-format option; New selects the logger
// implementation from them and defaults to splunkFormatInline.
const (
	splunkFormatRaw    = "raw"
	splunkFormatJSON   = "json"
	splunkFormatInline = "inline"
)
  120. func init() {
  121. if err := logger.RegisterLogDriver(driverName, New); err != nil {
  122. logrus.Fatal(err)
  123. }
  124. if err := logger.RegisterLogOptValidator(driverName, ValidateLogOpt); err != nil {
  125. logrus.Fatal(err)
  126. }
  127. }
  128. // New creates splunk logger driver using configuration passed in context
  129. func New(info logger.Info) (logger.Logger, error) {
  130. hostname, err := info.Hostname()
  131. if err != nil {
  132. return nil, fmt.Errorf("%s: cannot access hostname to set source field", driverName)
  133. }
  134. // Parse and validate Splunk URL
  135. splunkURL, err := parseURL(info)
  136. if err != nil {
  137. return nil, err
  138. }
  139. // Splunk Token is required parameter
  140. splunkToken, ok := info.Config[splunkTokenKey]
  141. if !ok {
  142. return nil, fmt.Errorf("%s: %s is expected", driverName, splunkTokenKey)
  143. }
  144. tlsConfig := &tls.Config{}
  145. // Splunk is using autogenerated certificates by default,
  146. // allow users to trust them with skipping verification
  147. if insecureSkipVerifyStr, ok := info.Config[splunkInsecureSkipVerifyKey]; ok {
  148. insecureSkipVerify, err := strconv.ParseBool(insecureSkipVerifyStr)
  149. if err != nil {
  150. return nil, err
  151. }
  152. tlsConfig.InsecureSkipVerify = insecureSkipVerify
  153. }
  154. // If path to the root certificate is provided - load it
  155. if caPath, ok := info.Config[splunkCAPathKey]; ok {
  156. caCert, err := ioutil.ReadFile(caPath)
  157. if err != nil {
  158. return nil, err
  159. }
  160. caPool := x509.NewCertPool()
  161. caPool.AppendCertsFromPEM(caCert)
  162. tlsConfig.RootCAs = caPool
  163. }
  164. if caName, ok := info.Config[splunkCANameKey]; ok {
  165. tlsConfig.ServerName = caName
  166. }
  167. gzipCompression := false
  168. if gzipCompressionStr, ok := info.Config[splunkGzipCompressionKey]; ok {
  169. gzipCompression, err = strconv.ParseBool(gzipCompressionStr)
  170. if err != nil {
  171. return nil, err
  172. }
  173. }
  174. gzipCompressionLevel := gzip.DefaultCompression
  175. if gzipCompressionLevelStr, ok := info.Config[splunkGzipCompressionLevelKey]; ok {
  176. var err error
  177. gzipCompressionLevel64, err := strconv.ParseInt(gzipCompressionLevelStr, 10, 32)
  178. if err != nil {
  179. return nil, err
  180. }
  181. gzipCompressionLevel = int(gzipCompressionLevel64)
  182. if gzipCompressionLevel < gzip.DefaultCompression || gzipCompressionLevel > gzip.BestCompression {
  183. err := fmt.Errorf("not supported level '%s' for %s (supported values between %d and %d)",
  184. gzipCompressionLevelStr, splunkGzipCompressionLevelKey, gzip.DefaultCompression, gzip.BestCompression)
  185. return nil, err
  186. }
  187. }
  188. transport := &http.Transport{
  189. TLSClientConfig: tlsConfig,
  190. Proxy: http.ProxyFromEnvironment,
  191. }
  192. client := &http.Client{
  193. Transport: transport,
  194. }
  195. source := info.Config[splunkSourceKey]
  196. sourceType := info.Config[splunkSourceTypeKey]
  197. index := info.Config[splunkIndexKey]
  198. var nullMessage = &splunkMessage{
  199. Host: hostname,
  200. Source: source,
  201. SourceType: sourceType,
  202. Index: index,
  203. }
  204. // Allow user to remove tag from the messages by setting tag to empty string
  205. tag := ""
  206. if tagTemplate, ok := info.Config[tagKey]; !ok || tagTemplate != "" {
  207. tag, err = loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
  208. if err != nil {
  209. return nil, err
  210. }
  211. }
  212. attrs, err := info.ExtraAttributes(nil)
  213. if err != nil {
  214. return nil, err
  215. }
  216. var (
  217. postMessagesFrequency = getAdvancedOptionDuration(envVarPostMessagesFrequency, defaultPostMessagesFrequency)
  218. postMessagesBatchSize = getAdvancedOptionInt(envVarPostMessagesBatchSize, defaultPostMessagesBatchSize)
  219. bufferMaximum = getAdvancedOptionInt(envVarBufferMaximum, defaultBufferMaximum)
  220. streamChannelSize = getAdvancedOptionInt(envVarStreamChannelSize, defaultStreamChannelSize)
  221. )
  222. logger := &splunkLogger{
  223. client: client,
  224. transport: transport,
  225. url: splunkURL.String(),
  226. auth: "Splunk " + splunkToken,
  227. nullMessage: nullMessage,
  228. gzipCompression: gzipCompression,
  229. gzipCompressionLevel: gzipCompressionLevel,
  230. stream: make(chan *splunkMessage, streamChannelSize),
  231. postMessagesFrequency: postMessagesFrequency,
  232. postMessagesBatchSize: postMessagesBatchSize,
  233. bufferMaximum: bufferMaximum,
  234. }
  235. // By default we verify connection, but we allow use to skip that
  236. verifyConnection := true
  237. if verifyConnectionStr, ok := info.Config[splunkVerifyConnectionKey]; ok {
  238. var err error
  239. verifyConnection, err = strconv.ParseBool(verifyConnectionStr)
  240. if err != nil {
  241. return nil, err
  242. }
  243. }
  244. if verifyConnection {
  245. err = verifySplunkConnection(logger)
  246. if err != nil {
  247. return nil, err
  248. }
  249. }
  250. var splunkFormat string
  251. if splunkFormatParsed, ok := info.Config[splunkFormatKey]; ok {
  252. switch splunkFormatParsed {
  253. case splunkFormatInline:
  254. case splunkFormatJSON:
  255. case splunkFormatRaw:
  256. default:
  257. return nil, fmt.Errorf("Unknown format specified %s, supported formats are inline, json and raw", splunkFormat)
  258. }
  259. splunkFormat = splunkFormatParsed
  260. } else {
  261. splunkFormat = splunkFormatInline
  262. }
  263. var loggerWrapper splunkLoggerInterface
  264. switch splunkFormat {
  265. case splunkFormatInline:
  266. nullEvent := &splunkMessageEvent{
  267. Tag: tag,
  268. Attrs: attrs,
  269. }
  270. loggerWrapper = &splunkLoggerInline{logger, nullEvent}
  271. case splunkFormatJSON:
  272. nullEvent := &splunkMessageEvent{
  273. Tag: tag,
  274. Attrs: attrs,
  275. }
  276. loggerWrapper = &splunkLoggerJSON{&splunkLoggerInline{logger, nullEvent}}
  277. case splunkFormatRaw:
  278. var prefix bytes.Buffer
  279. if tag != "" {
  280. prefix.WriteString(tag)
  281. prefix.WriteString(" ")
  282. }
  283. for key, value := range attrs {
  284. prefix.WriteString(key)
  285. prefix.WriteString("=")
  286. prefix.WriteString(value)
  287. prefix.WriteString(" ")
  288. }
  289. loggerWrapper = &splunkLoggerRaw{logger, prefix.Bytes()}
  290. default:
  291. return nil, fmt.Errorf("Unexpected format %s", splunkFormat)
  292. }
  293. go loggerWrapper.worker()
  294. return loggerWrapper, nil
  295. }
  296. func (l *splunkLoggerInline) Log(msg *logger.Message) error {
  297. message := l.createSplunkMessage(msg)
  298. event := *l.nullEvent
  299. event.Line = string(msg.Line)
  300. event.Source = msg.Source
  301. message.Event = &event
  302. logger.PutMessage(msg)
  303. return l.queueMessageAsync(message)
  304. }
  305. func (l *splunkLoggerJSON) Log(msg *logger.Message) error {
  306. message := l.createSplunkMessage(msg)
  307. event := *l.nullEvent
  308. var rawJSONMessage json.RawMessage
  309. if err := json.Unmarshal(msg.Line, &rawJSONMessage); err == nil {
  310. event.Line = &rawJSONMessage
  311. } else {
  312. event.Line = string(msg.Line)
  313. }
  314. event.Source = msg.Source
  315. message.Event = &event
  316. logger.PutMessage(msg)
  317. return l.queueMessageAsync(message)
  318. }
  319. func (l *splunkLoggerRaw) Log(msg *logger.Message) error {
  320. // empty or whitespace-only messages are not accepted by HEC
  321. if strings.TrimSpace(string(msg.Line)) == "" {
  322. return nil
  323. }
  324. message := l.createSplunkMessage(msg)
  325. message.Event = string(append(l.prefix, msg.Line...))
  326. logger.PutMessage(msg)
  327. return l.queueMessageAsync(message)
  328. }
  329. func (l *splunkLogger) queueMessageAsync(message *splunkMessage) error {
  330. l.lock.RLock()
  331. defer l.lock.RUnlock()
  332. if l.closedCond != nil {
  333. return fmt.Errorf("%s: driver is closed", driverName)
  334. }
  335. l.stream <- message
  336. return nil
  337. }
  338. func (l *splunkLogger) worker() {
  339. timer := time.NewTicker(l.postMessagesFrequency)
  340. var messages []*splunkMessage
  341. for {
  342. select {
  343. case message, open := <-l.stream:
  344. if !open {
  345. l.postMessages(messages, true)
  346. l.lock.Lock()
  347. defer l.lock.Unlock()
  348. l.transport.CloseIdleConnections()
  349. l.closed = true
  350. l.closedCond.Signal()
  351. return
  352. }
  353. messages = append(messages, message)
  354. // Only sending when we get exactly to the batch size,
  355. // This also helps not to fire postMessages on every new message,
  356. // when previous try failed.
  357. if len(messages)%l.postMessagesBatchSize == 0 {
  358. messages = l.postMessages(messages, false)
  359. }
  360. case <-timer.C:
  361. messages = l.postMessages(messages, false)
  362. }
  363. }
  364. }
  365. func (l *splunkLogger) postMessages(messages []*splunkMessage, lastChance bool) []*splunkMessage {
  366. messagesLen := len(messages)
  367. ctx, cancel := context.WithTimeout(context.Background(), batchSendTimeout)
  368. defer cancel()
  369. for i := 0; i < messagesLen; i += l.postMessagesBatchSize {
  370. upperBound := i + l.postMessagesBatchSize
  371. if upperBound > messagesLen {
  372. upperBound = messagesLen
  373. }
  374. if err := l.tryPostMessages(ctx, messages[i:upperBound]); err != nil {
  375. logrus.WithError(err).WithField("module", "logger/splunk").Warn("Error while sending logs")
  376. if messagesLen-i >= l.bufferMaximum || lastChance {
  377. // If this is last chance - print them all to the daemon log
  378. if lastChance {
  379. upperBound = messagesLen
  380. }
  381. // Not all sent, but buffer has got to its maximum, let's log all messages
  382. // we could not send and return buffer minus one batch size
  383. for j := i; j < upperBound; j++ {
  384. if jsonEvent, err := json.Marshal(messages[j]); err != nil {
  385. logrus.Error(err)
  386. } else {
  387. logrus.Error(fmt.Errorf("Failed to send a message '%s'", string(jsonEvent)))
  388. }
  389. }
  390. return messages[upperBound:messagesLen]
  391. }
  392. // Not all sent, returning buffer from where we have not sent messages
  393. return messages[i:messagesLen]
  394. }
  395. }
  396. // All sent, return empty buffer
  397. return messages[:0]
  398. }
  399. func (l *splunkLogger) tryPostMessages(ctx context.Context, messages []*splunkMessage) error {
  400. if len(messages) == 0 {
  401. return nil
  402. }
  403. var buffer bytes.Buffer
  404. var writer io.Writer
  405. var gzipWriter *gzip.Writer
  406. var err error
  407. // If gzip compression is enabled - create gzip writer with specified compression
  408. // level. If gzip compression is disabled, use standard buffer as a writer
  409. if l.gzipCompression {
  410. gzipWriter, err = gzip.NewWriterLevel(&buffer, l.gzipCompressionLevel)
  411. if err != nil {
  412. return err
  413. }
  414. writer = gzipWriter
  415. } else {
  416. writer = &buffer
  417. }
  418. for _, message := range messages {
  419. jsonEvent, err := json.Marshal(message)
  420. if err != nil {
  421. return err
  422. }
  423. if _, err := writer.Write(jsonEvent); err != nil {
  424. return err
  425. }
  426. }
  427. // If gzip compression is enabled, tell it, that we are done
  428. if l.gzipCompression {
  429. err = gzipWriter.Close()
  430. if err != nil {
  431. return err
  432. }
  433. }
  434. req, err := http.NewRequest("POST", l.url, bytes.NewBuffer(buffer.Bytes()))
  435. if err != nil {
  436. return err
  437. }
  438. req = req.WithContext(ctx)
  439. req.Header.Set("Authorization", l.auth)
  440. // Tell if we are sending gzip compressed body
  441. if l.gzipCompression {
  442. req.Header.Set("Content-Encoding", "gzip")
  443. }
  444. resp, err := l.client.Do(req)
  445. if err != nil {
  446. return err
  447. }
  448. defer func() {
  449. pools.Copy(ioutil.Discard, resp.Body)
  450. resp.Body.Close()
  451. }()
  452. if resp.StatusCode != http.StatusOK {
  453. rdr := io.LimitReader(resp.Body, maxResponseSize)
  454. body, err := ioutil.ReadAll(rdr)
  455. if err != nil {
  456. return err
  457. }
  458. return fmt.Errorf("%s: failed to send event - %s - %s", driverName, resp.Status, string(body))
  459. }
  460. return nil
  461. }
  462. func (l *splunkLogger) Close() error {
  463. l.lock.Lock()
  464. defer l.lock.Unlock()
  465. if l.closedCond == nil {
  466. l.closedCond = sync.NewCond(&l.lock)
  467. close(l.stream)
  468. for !l.closed {
  469. l.closedCond.Wait()
  470. }
  471. }
  472. return nil
  473. }
  474. func (l *splunkLogger) Name() string {
  475. return driverName
  476. }
  477. func (l *splunkLogger) createSplunkMessage(msg *logger.Message) *splunkMessage {
  478. message := *l.nullMessage
  479. message.Time = fmt.Sprintf("%f", float64(msg.Timestamp.UnixNano())/float64(time.Second))
  480. return &message
  481. }
  482. // ValidateLogOpt looks for all supported by splunk driver options
  483. func ValidateLogOpt(cfg map[string]string) error {
  484. for key := range cfg {
  485. switch key {
  486. case splunkURLKey:
  487. case splunkTokenKey:
  488. case splunkSourceKey:
  489. case splunkSourceTypeKey:
  490. case splunkIndexKey:
  491. case splunkCAPathKey:
  492. case splunkCANameKey:
  493. case splunkInsecureSkipVerifyKey:
  494. case splunkFormatKey:
  495. case splunkVerifyConnectionKey:
  496. case splunkGzipCompressionKey:
  497. case splunkGzipCompressionLevelKey:
  498. case envKey:
  499. case envRegexKey:
  500. case labelsKey:
  501. case tagKey:
  502. default:
  503. return fmt.Errorf("unknown log opt '%s' for %s log driver", key, driverName)
  504. }
  505. }
  506. return nil
  507. }
  508. func parseURL(info logger.Info) (*url.URL, error) {
  509. splunkURLStr, ok := info.Config[splunkURLKey]
  510. if !ok {
  511. return nil, fmt.Errorf("%s: %s is expected", driverName, splunkURLKey)
  512. }
  513. splunkURL, err := url.Parse(splunkURLStr)
  514. if err != nil {
  515. return nil, fmt.Errorf("%s: failed to parse %s as url value in %s", driverName, splunkURLStr, splunkURLKey)
  516. }
  517. if !urlutil.IsURL(splunkURLStr) ||
  518. !splunkURL.IsAbs() ||
  519. (splunkURL.Path != "" && splunkURL.Path != "/") ||
  520. splunkURL.RawQuery != "" ||
  521. splunkURL.Fragment != "" {
  522. return nil, fmt.Errorf("%s: expected format scheme://dns_name_or_ip:port for %s", driverName, splunkURLKey)
  523. }
  524. splunkURL.Path = "/services/collector/event/1.0"
  525. return splunkURL, nil
  526. }
  527. func verifySplunkConnection(l *splunkLogger) error {
  528. req, err := http.NewRequest(http.MethodOptions, l.url, nil)
  529. if err != nil {
  530. return err
  531. }
  532. resp, err := l.client.Do(req)
  533. if err != nil {
  534. return err
  535. }
  536. defer func() {
  537. pools.Copy(ioutil.Discard, resp.Body)
  538. resp.Body.Close()
  539. }()
  540. if resp.StatusCode != http.StatusOK {
  541. rdr := io.LimitReader(resp.Body, maxResponseSize)
  542. body, err := ioutil.ReadAll(rdr)
  543. if err != nil {
  544. return err
  545. }
  546. return fmt.Errorf("%s: failed to verify connection - %s - %s", driverName, resp.Status, string(body))
  547. }
  548. return nil
  549. }
  550. func getAdvancedOptionDuration(envName string, defaultValue time.Duration) time.Duration {
  551. valueStr := os.Getenv(envName)
  552. if valueStr == "" {
  553. return defaultValue
  554. }
  555. parsedValue, err := time.ParseDuration(valueStr)
  556. if err != nil {
  557. logrus.Error(fmt.Sprintf("Failed to parse value of %s as duration. Using default %v. %v", envName, defaultValue, err))
  558. return defaultValue
  559. }
  560. return parsedValue
  561. }
  562. func getAdvancedOptionInt(envName string, defaultValue int) int {
  563. valueStr := os.Getenv(envName)
  564. if valueStr == "" {
  565. return defaultValue
  566. }
  567. parsedValue, err := strconv.ParseInt(valueStr, 10, 32)
  568. if err != nil {
  569. logrus.Error(fmt.Sprintf("Failed to parse value of %s as integer. Using default %d. %v", envName, defaultValue, err))
  570. return defaultValue
  571. }
  572. return int(parsedValue)
  573. }