splunk.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621
  1. // Package splunk provides the log driver for forwarding server logs to
  2. // Splunk HTTP Event Collector endpoint.
  3. package splunk
  4. import (
  5. "bytes"
  6. "compress/gzip"
  7. "crypto/tls"
  8. "crypto/x509"
  9. "encoding/json"
  10. "fmt"
  11. "io"
  12. "io/ioutil"
  13. "net/http"
  14. "net/url"
  15. "os"
  16. "strconv"
  17. "sync"
  18. "time"
  19. "github.com/Sirupsen/logrus"
  20. "github.com/docker/docker/daemon/logger"
  21. "github.com/docker/docker/daemon/logger/loggerutils"
  22. "github.com/docker/docker/pkg/urlutil"
  23. )
// Driver name and the keys of the log options this driver understands.
const (
	// driverName is the name the driver is registered under; it is also
	// used as a prefix in the driver's error messages.
	driverName = "splunk"

	// Log options specific to the Splunk driver.
	splunkURLKey                  = "splunk-url"
	splunkTokenKey                = "splunk-token"
	splunkSourceKey               = "splunk-source"
	splunkSourceTypeKey           = "splunk-sourcetype"
	splunkIndexKey                = "splunk-index"
	splunkCAPathKey               = "splunk-capath"
	splunkCANameKey               = "splunk-caname"
	splunkInsecureSkipVerifyKey   = "splunk-insecureskipverify"
	splunkFormatKey               = "splunk-format"
	splunkVerifyConnectionKey     = "splunk-verify-connection"
	splunkGzipCompressionKey      = "splunk-gzip"
	splunkGzipCompressionLevelKey = "splunk-gzip-level"

	// Further log options that ValidateLogOpt accepts for this driver.
	envKey    = "env"
	labelsKey = "labels"
	tagKey    = "tag"
)
// Default values of the advanced options; each can be overridden through the
// matching SPLUNK_LOGGING_DRIVER_* environment variable.
const (
	// How often the worker posts buffered messages when the batch size
	// has not been reached.
	defaultPostMessagesFrequency = 5 * time.Second
	// Maximum number of messages sent in a single request.
	defaultPostMessagesBatchSize = 1000
	// Maximum number of unsent messages kept in the buffer before the
	// oldest batch is dropped to the daemon log.
	defaultBufferMaximum = 10 * defaultPostMessagesBatchSize
	// Number of messages allowed to be queued in the channel.
	defaultStreamChannelSize = 4 * defaultPostMessagesBatchSize
)
// Names of the environment variables that override the advanced option
// defaults; they are read once per logger in New.
const (
	envVarPostMessagesFrequency = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_FREQUENCY"
	envVarPostMessagesBatchSize = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_BATCH_SIZE"
	envVarBufferMaximum         = "SPLUNK_LOGGING_DRIVER_BUFFER_MAX"
	envVarStreamChannelSize     = "SPLUNK_LOGGING_DRIVER_CHANNEL_SIZE"
)
// splunkLoggerInterface is what New needs from each format-specific logger:
// the public logger.Logger contract plus the background worker loop that New
// starts in its own goroutine.
type splunkLoggerInterface interface {
	logger.Logger
	worker()
}
// splunkLogger holds the state shared by all message formats: the HTTP client
// used to reach the collector, the message template and the buffering and
// shutdown machinery of the background worker.
type splunkLogger struct {
	client    *http.Client
	transport *http.Transport

	// url is the full collector endpoint and auth the value of the
	// Authorization header ("Splunk <token>"); both are set in New.
	url  string
	auth string
	// nullMessage is the template copied for every outgoing message.
	nullMessage *splunkMessage

	// http compression
	gzipCompression      bool
	gzipCompressionLevel int

	// Advanced options
	postMessagesFrequency time.Duration
	postMessagesBatchSize int
	bufferMaximum         int

	// For synchronization between background worker and logger.
	// We use a channel to send messages to the worker goroutine.
	// The remaining fields let Close block until the worker has flushed
	// all messages to HEC: closedCond is created by Close (non-nil means
	// closing has started) and closed is set by the worker when it is done.
	stream     chan *splunkMessage
	lock       sync.RWMutex
	closed     bool
	closedCond *sync.Cond
}
// splunkLoggerInline sends every log line as a string inside a structured
// event; nullEvent is the event template (tag and attributes) copied for each
// message.
type splunkLoggerInline struct {
	*splunkLogger

	nullEvent *splunkMessageEvent
}
// splunkLoggerJSON behaves like splunkLoggerInline, except that its Log method
// embeds lines that are valid JSON as raw JSON instead of as strings.
type splunkLoggerJSON struct {
	*splunkLoggerInline
}
// splunkLoggerRaw sends every log line as a plain string event; prefix (tag
// and key=value attributes, built in New) is prepended to each line.
type splunkLoggerRaw struct {
	*splunkLogger

	prefix []byte
}
// splunkMessage is the JSON document posted to the collector for one log
// line. Event carries the format-specific payload; Time is the message
// timestamp rendered as seconds with a fractional part.
type splunkMessage struct {
	Event      interface{} `json:"event"`
	Time       string      `json:"time"`
	Host       string      `json:"host"`
	Source     string      `json:"source,omitempty"`
	SourceType string      `json:"sourcetype,omitempty"`
	Index      string      `json:"index,omitempty"`
}
// splunkMessageEvent is the event payload used by the inline and json
// formats. Line is either the log line as a string or, for the json format,
// the line as raw JSON; Source is copied from the log message's Source field.
type splunkMessageEvent struct {
	Line   interface{}       `json:"line"`
	Source string            `json:"source"`
	Tag    string            `json:"tag,omitempty"`
	Attrs  map[string]string `json:"attrs,omitempty"`
}
// Accepted values of the splunk-format option; inline is used when the
// option is not set.
const (
	splunkFormatRaw    = "raw"
	splunkFormatJSON   = "json"
	splunkFormatInline = "inline"
)
  113. func init() {
  114. if err := logger.RegisterLogDriver(driverName, New); err != nil {
  115. logrus.Fatal(err)
  116. }
  117. if err := logger.RegisterLogOptValidator(driverName, ValidateLogOpt); err != nil {
  118. logrus.Fatal(err)
  119. }
  120. }
  121. // New creates splunk logger driver using configuration passed in context
  122. func New(ctx logger.Context) (logger.Logger, error) {
  123. hostname, err := ctx.Hostname()
  124. if err != nil {
  125. return nil, fmt.Errorf("%s: cannot access hostname to set source field", driverName)
  126. }
  127. // Parse and validate Splunk URL
  128. splunkURL, err := parseURL(ctx)
  129. if err != nil {
  130. return nil, err
  131. }
  132. // Splunk Token is required parameter
  133. splunkToken, ok := ctx.Config[splunkTokenKey]
  134. if !ok {
  135. return nil, fmt.Errorf("%s: %s is expected", driverName, splunkTokenKey)
  136. }
  137. tlsConfig := &tls.Config{}
  138. // Splunk is using autogenerated certificates by default,
  139. // allow users to trust them with skipping verification
  140. if insecureSkipVerifyStr, ok := ctx.Config[splunkInsecureSkipVerifyKey]; ok {
  141. insecureSkipVerify, err := strconv.ParseBool(insecureSkipVerifyStr)
  142. if err != nil {
  143. return nil, err
  144. }
  145. tlsConfig.InsecureSkipVerify = insecureSkipVerify
  146. }
  147. // If path to the root certificate is provided - load it
  148. if caPath, ok := ctx.Config[splunkCAPathKey]; ok {
  149. caCert, err := ioutil.ReadFile(caPath)
  150. if err != nil {
  151. return nil, err
  152. }
  153. caPool := x509.NewCertPool()
  154. caPool.AppendCertsFromPEM(caCert)
  155. tlsConfig.RootCAs = caPool
  156. }
  157. if caName, ok := ctx.Config[splunkCANameKey]; ok {
  158. tlsConfig.ServerName = caName
  159. }
  160. gzipCompression := false
  161. if gzipCompressionStr, ok := ctx.Config[splunkGzipCompressionKey]; ok {
  162. gzipCompression, err = strconv.ParseBool(gzipCompressionStr)
  163. if err != nil {
  164. return nil, err
  165. }
  166. }
  167. gzipCompressionLevel := gzip.DefaultCompression
  168. if gzipCompressionLevelStr, ok := ctx.Config[splunkGzipCompressionLevelKey]; ok {
  169. var err error
  170. gzipCompressionLevel64, err := strconv.ParseInt(gzipCompressionLevelStr, 10, 32)
  171. if err != nil {
  172. return nil, err
  173. }
  174. gzipCompressionLevel = int(gzipCompressionLevel64)
  175. if gzipCompressionLevel < gzip.DefaultCompression || gzipCompressionLevel > gzip.BestCompression {
  176. err := fmt.Errorf("Not supported level '%s' for %s (supported values between %d and %d).",
  177. gzipCompressionLevelStr, splunkGzipCompressionLevelKey, gzip.DefaultCompression, gzip.BestCompression)
  178. return nil, err
  179. }
  180. }
  181. transport := &http.Transport{
  182. TLSClientConfig: tlsConfig,
  183. }
  184. client := &http.Client{
  185. Transport: transport,
  186. }
  187. source := ctx.Config[splunkSourceKey]
  188. sourceType := ctx.Config[splunkSourceTypeKey]
  189. index := ctx.Config[splunkIndexKey]
  190. var nullMessage = &splunkMessage{
  191. Host: hostname,
  192. Source: source,
  193. SourceType: sourceType,
  194. Index: index,
  195. }
  196. // Allow user to remove tag from the messages by setting tag to empty string
  197. tag := ""
  198. if tagTemplate, ok := ctx.Config[tagKey]; !ok || tagTemplate != "" {
  199. tag, err = loggerutils.ParseLogTag(ctx, loggerutils.DefaultTemplate)
  200. if err != nil {
  201. return nil, err
  202. }
  203. }
  204. attrs := ctx.ExtraAttributes(nil)
  205. var (
  206. postMessagesFrequency = getAdvancedOptionDuration(envVarPostMessagesFrequency, defaultPostMessagesFrequency)
  207. postMessagesBatchSize = getAdvancedOptionInt(envVarPostMessagesBatchSize, defaultPostMessagesBatchSize)
  208. bufferMaximum = getAdvancedOptionInt(envVarBufferMaximum, defaultBufferMaximum)
  209. streamChannelSize = getAdvancedOptionInt(envVarStreamChannelSize, defaultStreamChannelSize)
  210. )
  211. logger := &splunkLogger{
  212. client: client,
  213. transport: transport,
  214. url: splunkURL.String(),
  215. auth: "Splunk " + splunkToken,
  216. nullMessage: nullMessage,
  217. gzipCompression: gzipCompression,
  218. gzipCompressionLevel: gzipCompressionLevel,
  219. stream: make(chan *splunkMessage, streamChannelSize),
  220. postMessagesFrequency: postMessagesFrequency,
  221. postMessagesBatchSize: postMessagesBatchSize,
  222. bufferMaximum: bufferMaximum,
  223. }
  224. // By default we verify connection, but we allow use to skip that
  225. verifyConnection := true
  226. if verifyConnectionStr, ok := ctx.Config[splunkVerifyConnectionKey]; ok {
  227. var err error
  228. verifyConnection, err = strconv.ParseBool(verifyConnectionStr)
  229. if err != nil {
  230. return nil, err
  231. }
  232. }
  233. if verifyConnection {
  234. err = verifySplunkConnection(logger)
  235. if err != nil {
  236. return nil, err
  237. }
  238. }
  239. var splunkFormat string
  240. if splunkFormatParsed, ok := ctx.Config[splunkFormatKey]; ok {
  241. switch splunkFormatParsed {
  242. case splunkFormatInline:
  243. case splunkFormatJSON:
  244. case splunkFormatRaw:
  245. default:
  246. return nil, fmt.Errorf("Unknown format specified %s, supported formats are inline, json and raw", splunkFormat)
  247. }
  248. splunkFormat = splunkFormatParsed
  249. } else {
  250. splunkFormat = splunkFormatInline
  251. }
  252. var loggerWrapper splunkLoggerInterface
  253. switch splunkFormat {
  254. case splunkFormatInline:
  255. nullEvent := &splunkMessageEvent{
  256. Tag: tag,
  257. Attrs: attrs,
  258. }
  259. loggerWrapper = &splunkLoggerInline{logger, nullEvent}
  260. case splunkFormatJSON:
  261. nullEvent := &splunkMessageEvent{
  262. Tag: tag,
  263. Attrs: attrs,
  264. }
  265. loggerWrapper = &splunkLoggerJSON{&splunkLoggerInline{logger, nullEvent}}
  266. case splunkFormatRaw:
  267. var prefix bytes.Buffer
  268. if tag != "" {
  269. prefix.WriteString(tag)
  270. prefix.WriteString(" ")
  271. }
  272. for key, value := range attrs {
  273. prefix.WriteString(key)
  274. prefix.WriteString("=")
  275. prefix.WriteString(value)
  276. prefix.WriteString(" ")
  277. }
  278. loggerWrapper = &splunkLoggerRaw{logger, prefix.Bytes()}
  279. default:
  280. return nil, fmt.Errorf("Unexpected format %s", splunkFormat)
  281. }
  282. go loggerWrapper.worker()
  283. return loggerWrapper, nil
  284. }
  285. func (l *splunkLoggerInline) Log(msg *logger.Message) error {
  286. message := l.createSplunkMessage(msg)
  287. event := *l.nullEvent
  288. event.Line = string(msg.Line)
  289. event.Source = msg.Source
  290. message.Event = &event
  291. return l.queueMessageAsync(message)
  292. }
  293. func (l *splunkLoggerJSON) Log(msg *logger.Message) error {
  294. message := l.createSplunkMessage(msg)
  295. event := *l.nullEvent
  296. var rawJSONMessage json.RawMessage
  297. if err := json.Unmarshal(msg.Line, &rawJSONMessage); err == nil {
  298. event.Line = &rawJSONMessage
  299. } else {
  300. event.Line = string(msg.Line)
  301. }
  302. event.Source = msg.Source
  303. message.Event = &event
  304. return l.queueMessageAsync(message)
  305. }
  306. func (l *splunkLoggerRaw) Log(msg *logger.Message) error {
  307. message := l.createSplunkMessage(msg)
  308. message.Event = string(append(l.prefix, msg.Line...))
  309. return l.queueMessageAsync(message)
  310. }
  311. func (l *splunkLogger) queueMessageAsync(message *splunkMessage) error {
  312. l.lock.RLock()
  313. defer l.lock.RUnlock()
  314. if l.closedCond != nil {
  315. return fmt.Errorf("%s: driver is closed", driverName)
  316. }
  317. l.stream <- message
  318. return nil
  319. }
  320. func (l *splunkLogger) worker() {
  321. timer := time.NewTicker(l.postMessagesFrequency)
  322. var messages []*splunkMessage
  323. for {
  324. select {
  325. case message, open := <-l.stream:
  326. if !open {
  327. l.postMessages(messages, true)
  328. l.lock.Lock()
  329. defer l.lock.Unlock()
  330. l.transport.CloseIdleConnections()
  331. l.closed = true
  332. l.closedCond.Signal()
  333. return
  334. }
  335. messages = append(messages, message)
  336. // Only sending when we get exactly to the batch size,
  337. // This also helps not to fire postMessages on every new message,
  338. // when previous try failed.
  339. if len(messages)%l.postMessagesBatchSize == 0 {
  340. messages = l.postMessages(messages, false)
  341. }
  342. case <-timer.C:
  343. messages = l.postMessages(messages, false)
  344. }
  345. }
  346. }
  347. func (l *splunkLogger) postMessages(messages []*splunkMessage, lastChance bool) []*splunkMessage {
  348. messagesLen := len(messages)
  349. for i := 0; i < messagesLen; i += l.postMessagesBatchSize {
  350. upperBound := i + l.postMessagesBatchSize
  351. if upperBound > messagesLen {
  352. upperBound = messagesLen
  353. }
  354. if err := l.tryPostMessages(messages[i:upperBound]); err != nil {
  355. logrus.Error(err)
  356. if messagesLen-i >= l.bufferMaximum || lastChance {
  357. // If this is last chance - print them all to the daemon log
  358. if lastChance {
  359. upperBound = messagesLen
  360. }
  361. // Not all sent, but buffer has got to its maximum, let's log all messages
  362. // we could not send and return buffer minus one batch size
  363. for j := i; j < upperBound; j++ {
  364. if jsonEvent, err := json.Marshal(messages[j]); err != nil {
  365. logrus.Error(err)
  366. } else {
  367. logrus.Error(fmt.Errorf("Failed to send a message '%s'", string(jsonEvent)))
  368. }
  369. }
  370. return messages[upperBound:messagesLen]
  371. }
  372. // Not all sent, returning buffer from where we have not sent messages
  373. return messages[i:messagesLen]
  374. }
  375. }
  376. // All sent, return empty buffer
  377. return messages[:0]
  378. }
  379. func (l *splunkLogger) tryPostMessages(messages []*splunkMessage) error {
  380. if len(messages) == 0 {
  381. return nil
  382. }
  383. var buffer bytes.Buffer
  384. var writer io.Writer
  385. var gzipWriter *gzip.Writer
  386. var err error
  387. // If gzip compression is enabled - create gzip writer with specified compression
  388. // level. If gzip compression is disabled, use standard buffer as a writer
  389. if l.gzipCompression {
  390. gzipWriter, err = gzip.NewWriterLevel(&buffer, l.gzipCompressionLevel)
  391. if err != nil {
  392. return err
  393. }
  394. writer = gzipWriter
  395. } else {
  396. writer = &buffer
  397. }
  398. for _, message := range messages {
  399. jsonEvent, err := json.Marshal(message)
  400. if err != nil {
  401. return err
  402. }
  403. if _, err := writer.Write(jsonEvent); err != nil {
  404. return err
  405. }
  406. }
  407. // If gzip compression is enabled, tell it, that we are done
  408. if l.gzipCompression {
  409. err = gzipWriter.Close()
  410. if err != nil {
  411. return err
  412. }
  413. }
  414. req, err := http.NewRequest("POST", l.url, bytes.NewBuffer(buffer.Bytes()))
  415. if err != nil {
  416. return err
  417. }
  418. req.Header.Set("Authorization", l.auth)
  419. // Tell if we are sending gzip compressed body
  420. if l.gzipCompression {
  421. req.Header.Set("Content-Encoding", "gzip")
  422. }
  423. res, err := l.client.Do(req)
  424. if err != nil {
  425. return err
  426. }
  427. defer res.Body.Close()
  428. if res.StatusCode != http.StatusOK {
  429. var body []byte
  430. body, err = ioutil.ReadAll(res.Body)
  431. if err != nil {
  432. return err
  433. }
  434. return fmt.Errorf("%s: failed to send event - %s - %s", driverName, res.Status, body)
  435. }
  436. io.Copy(ioutil.Discard, res.Body)
  437. return nil
  438. }
  439. func (l *splunkLogger) Close() error {
  440. l.lock.Lock()
  441. defer l.lock.Unlock()
  442. if l.closedCond == nil {
  443. l.closedCond = sync.NewCond(&l.lock)
  444. close(l.stream)
  445. for !l.closed {
  446. l.closedCond.Wait()
  447. }
  448. }
  449. return nil
  450. }
  451. func (l *splunkLogger) Name() string {
  452. return driverName
  453. }
  454. func (l *splunkLogger) createSplunkMessage(msg *logger.Message) *splunkMessage {
  455. message := *l.nullMessage
  456. message.Time = fmt.Sprintf("%f", float64(msg.Timestamp.UnixNano())/float64(time.Second))
  457. return &message
  458. }
  459. // ValidateLogOpt looks for all supported by splunk driver options
  460. func ValidateLogOpt(cfg map[string]string) error {
  461. for key := range cfg {
  462. switch key {
  463. case splunkURLKey:
  464. case splunkTokenKey:
  465. case splunkSourceKey:
  466. case splunkSourceTypeKey:
  467. case splunkIndexKey:
  468. case splunkCAPathKey:
  469. case splunkCANameKey:
  470. case splunkInsecureSkipVerifyKey:
  471. case splunkFormatKey:
  472. case splunkVerifyConnectionKey:
  473. case splunkGzipCompressionKey:
  474. case splunkGzipCompressionLevelKey:
  475. case envKey:
  476. case labelsKey:
  477. case tagKey:
  478. default:
  479. return fmt.Errorf("unknown log opt '%s' for %s log driver", key, driverName)
  480. }
  481. }
  482. return nil
  483. }
  484. func parseURL(ctx logger.Context) (*url.URL, error) {
  485. splunkURLStr, ok := ctx.Config[splunkURLKey]
  486. if !ok {
  487. return nil, fmt.Errorf("%s: %s is expected", driverName, splunkURLKey)
  488. }
  489. splunkURL, err := url.Parse(splunkURLStr)
  490. if err != nil {
  491. return nil, fmt.Errorf("%s: failed to parse %s as url value in %s", driverName, splunkURLStr, splunkURLKey)
  492. }
  493. if !urlutil.IsURL(splunkURLStr) ||
  494. !splunkURL.IsAbs() ||
  495. (splunkURL.Path != "" && splunkURL.Path != "/") ||
  496. splunkURL.RawQuery != "" ||
  497. splunkURL.Fragment != "" {
  498. return nil, fmt.Errorf("%s: expected format scheme://dns_name_or_ip:port for %s", driverName, splunkURLKey)
  499. }
  500. splunkURL.Path = "/services/collector/event/1.0"
  501. return splunkURL, nil
  502. }
  503. func verifySplunkConnection(l *splunkLogger) error {
  504. req, err := http.NewRequest(http.MethodOptions, l.url, nil)
  505. if err != nil {
  506. return err
  507. }
  508. res, err := l.client.Do(req)
  509. if err != nil {
  510. return err
  511. }
  512. if res.Body != nil {
  513. defer res.Body.Close()
  514. }
  515. if res.StatusCode != http.StatusOK {
  516. var body []byte
  517. body, err = ioutil.ReadAll(res.Body)
  518. if err != nil {
  519. return err
  520. }
  521. return fmt.Errorf("%s: failed to verify connection - %s - %s", driverName, res.Status, body)
  522. }
  523. return nil
  524. }
  525. func getAdvancedOptionDuration(envName string, defaultValue time.Duration) time.Duration {
  526. valueStr := os.Getenv(envName)
  527. if valueStr == "" {
  528. return defaultValue
  529. }
  530. parsedValue, err := time.ParseDuration(valueStr)
  531. if err != nil {
  532. logrus.Error(fmt.Sprintf("Failed to parse value of %s as duration. Using default %v. %v", envName, defaultValue, err))
  533. return defaultValue
  534. }
  535. return parsedValue
  536. }
  537. func getAdvancedOptionInt(envName string, defaultValue int) int {
  538. valueStr := os.Getenv(envName)
  539. if valueStr == "" {
  540. return defaultValue
  541. }
  542. parsedValue, err := strconv.ParseInt(valueStr, 10, 32)
  543. if err != nil {
  544. logrus.Error(fmt.Sprintf("Failed to parse value of %s as integer. Using default %d. %v", envName, defaultValue, err))
  545. return defaultValue
  546. }
  547. return int(parsedValue)
  548. }