cloudwatchlogs.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. // Package awslogs provides the logdriver for forwarding container logs to Amazon CloudWatch Logs
  2. package awslogs
  3. import (
  4. "bytes"
  5. "errors"
  6. "fmt"
  7. "os"
  8. "runtime"
  9. "sort"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/Sirupsen/logrus"
  15. "github.com/aws/aws-sdk-go/aws"
  16. "github.com/aws/aws-sdk-go/aws/awserr"
  17. "github.com/aws/aws-sdk-go/aws/ec2metadata"
  18. "github.com/aws/aws-sdk-go/aws/request"
  19. "github.com/aws/aws-sdk-go/aws/session"
  20. "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
  21. "github.com/docker/docker/daemon/logger"
  22. "github.com/docker/docker/daemon/logger/loggerutils"
  23. "github.com/docker/docker/dockerversion"
  24. "github.com/docker/docker/pkg/templates"
  25. )
  26. const (
  27. name = "awslogs"
  28. regionKey = "awslogs-region"
  29. regionEnvKey = "AWS_REGION"
  30. logGroupKey = "awslogs-group"
  31. logStreamKey = "awslogs-stream"
  32. logCreateGroupKey = "awslogs-create-group"
  33. tagKey = "tag"
  34. batchPublishFrequency = 5 * time.Second
  35. // See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
  36. perEventBytes = 26
  37. maximumBytesPerPut = 1048576
  38. maximumLogEventsPerPut = 10000
  39. // See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
  40. maximumBytesPerEvent = 262144 - perEventBytes
  41. resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
  42. dataAlreadyAcceptedCode = "DataAlreadyAcceptedException"
  43. invalidSequenceTokenCode = "InvalidSequenceTokenException"
  44. resourceNotFoundCode = "ResourceNotFoundException"
  45. userAgentHeader = "User-Agent"
  46. )
  47. type logStream struct {
  48. logStreamName string
  49. logGroupName string
  50. logCreateGroup bool
  51. client api
  52. messages chan *logger.Message
  53. lock sync.RWMutex
  54. closed bool
  55. sequenceToken *string
  56. }
  57. type api interface {
  58. CreateLogGroup(*cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error)
  59. CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
  60. PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
  61. }
  62. type regionFinder interface {
  63. Region() (string, error)
  64. }
  65. type wrappedEvent struct {
  66. inputLogEvent *cloudwatchlogs.InputLogEvent
  67. insertOrder int
  68. }
  69. type byTimestamp []wrappedEvent
  70. // init registers the awslogs driver
  71. func init() {
  72. if err := logger.RegisterLogDriver(name, New); err != nil {
  73. logrus.Fatal(err)
  74. }
  75. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  76. logrus.Fatal(err)
  77. }
  78. }
  79. // New creates an awslogs logger using the configuration passed in on the
  80. // context. Supported context configuration variables are awslogs-region,
  81. // awslogs-group, awslogs-stream, and awslogs-create-group. When available, configuration is
  82. // also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID,
  83. // AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and
  84. // the EC2 Instance Metadata Service.
  85. func New(info logger.Info) (logger.Logger, error) {
  86. logGroupName := info.Config[logGroupKey]
  87. logStreamName, err := loggerutils.ParseLogTag(info, "{{.FullID}}")
  88. if err != nil {
  89. return nil, err
  90. }
  91. logCreateGroup := false
  92. if info.Config[logCreateGroupKey] != "" {
  93. logCreateGroup, err = strconv.ParseBool(info.Config[logCreateGroupKey])
  94. if err != nil {
  95. return nil, err
  96. }
  97. }
  98. if info.Config[logStreamKey] != "" {
  99. logStreamName = info.Config[logStreamKey]
  100. }
  101. client, err := newAWSLogsClient(info)
  102. if err != nil {
  103. return nil, err
  104. }
  105. containerStream := &logStream{
  106. logStreamName: logStreamName,
  107. logGroupName: logGroupName,
  108. logCreateGroup: logCreateGroup,
  109. client: client,
  110. messages: make(chan *logger.Message, 4096),
  111. }
  112. err = containerStream.create()
  113. if err != nil {
  114. return nil, err
  115. }
  116. go containerStream.collectBatch()
  117. return containerStream, nil
  118. }
  119. func parseLogGroup(info logger.Info, groupTemplate string) (string, error) {
  120. tmpl, err := templates.NewParse("log-group", groupTemplate)
  121. if err != nil {
  122. return "", err
  123. }
  124. buf := new(bytes.Buffer)
  125. if err := tmpl.Execute(buf, &info); err != nil {
  126. return "", err
  127. }
  128. return buf.String(), nil
  129. }
  130. // newRegionFinder is a variable such that the implementation
  131. // can be swapped out for unit tests.
  132. var newRegionFinder = func() regionFinder {
  133. return ec2metadata.New(session.New())
  134. }
  135. // newAWSLogsClient creates the service client for Amazon CloudWatch Logs.
  136. // Customizations to the default client from the SDK include a Docker-specific
  137. // User-Agent string and automatic region detection using the EC2 Instance
  138. // Metadata Service when region is otherwise unspecified.
  139. func newAWSLogsClient(info logger.Info) (api, error) {
  140. var region *string
  141. if os.Getenv(regionEnvKey) != "" {
  142. region = aws.String(os.Getenv(regionEnvKey))
  143. }
  144. if info.Config[regionKey] != "" {
  145. region = aws.String(info.Config[regionKey])
  146. }
  147. if region == nil || *region == "" {
  148. logrus.Info("Trying to get region from EC2 Metadata")
  149. ec2MetadataClient := newRegionFinder()
  150. r, err := ec2MetadataClient.Region()
  151. if err != nil {
  152. logrus.WithFields(logrus.Fields{
  153. "error": err,
  154. }).Error("Could not get region from EC2 metadata, environment, or log option")
  155. return nil, errors.New("Cannot determine region for awslogs driver")
  156. }
  157. region = &r
  158. }
  159. logrus.WithFields(logrus.Fields{
  160. "region": *region,
  161. }).Debug("Created awslogs client")
  162. client := cloudwatchlogs.New(session.New(), aws.NewConfig().WithRegion(*region))
  163. client.Handlers.Build.PushBackNamed(request.NamedHandler{
  164. Name: "DockerUserAgentHandler",
  165. Fn: func(r *request.Request) {
  166. currentAgent := r.HTTPRequest.Header.Get(userAgentHeader)
  167. r.HTTPRequest.Header.Set(userAgentHeader,
  168. fmt.Sprintf("Docker %s (%s) %s",
  169. dockerversion.Version, runtime.GOOS, currentAgent))
  170. },
  171. })
  172. return client, nil
  173. }
  174. // Name returns the name of the awslogs logging driver
  175. func (l *logStream) Name() string {
  176. return name
  177. }
  178. // Log submits messages for logging by an instance of the awslogs logging driver
  179. func (l *logStream) Log(msg *logger.Message) error {
  180. l.lock.RLock()
  181. defer l.lock.RUnlock()
  182. if !l.closed {
  183. l.messages <- msg
  184. }
  185. return nil
  186. }
  187. // Close closes the instance of the awslogs logging driver
  188. func (l *logStream) Close() error {
  189. l.lock.Lock()
  190. defer l.lock.Unlock()
  191. if !l.closed {
  192. close(l.messages)
  193. }
  194. l.closed = true
  195. return nil
  196. }
  197. // create creates log group and log stream for the instance of the awslogs logging driver
  198. func (l *logStream) create() error {
  199. if err := l.createLogStream(); err != nil {
  200. if l.logCreateGroup {
  201. if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == resourceNotFoundCode {
  202. if err := l.createLogGroup(); err != nil {
  203. return err
  204. }
  205. return l.createLogStream()
  206. }
  207. }
  208. return err
  209. }
  210. return nil
  211. }
  212. // createLogGroup creates a log group for the instance of the awslogs logging driver
  213. func (l *logStream) createLogGroup() error {
  214. if _, err := l.client.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
  215. LogGroupName: aws.String(l.logGroupName),
  216. }); err != nil {
  217. if awsErr, ok := err.(awserr.Error); ok {
  218. fields := logrus.Fields{
  219. "errorCode": awsErr.Code(),
  220. "message": awsErr.Message(),
  221. "origError": awsErr.OrigErr(),
  222. "logGroupName": l.logGroupName,
  223. "logCreateGroup": l.logCreateGroup,
  224. }
  225. if awsErr.Code() == resourceAlreadyExistsCode {
  226. // Allow creation to succeed
  227. logrus.WithFields(fields).Info("Log group already exists")
  228. return nil
  229. }
  230. logrus.WithFields(fields).Error("Failed to create log group")
  231. }
  232. return err
  233. }
  234. return nil
  235. }
  236. // createLogStream creates a log stream for the instance of the awslogs logging driver
  237. func (l *logStream) createLogStream() error {
  238. input := &cloudwatchlogs.CreateLogStreamInput{
  239. LogGroupName: aws.String(l.logGroupName),
  240. LogStreamName: aws.String(l.logStreamName),
  241. }
  242. _, err := l.client.CreateLogStream(input)
  243. if err != nil {
  244. if awsErr, ok := err.(awserr.Error); ok {
  245. fields := logrus.Fields{
  246. "errorCode": awsErr.Code(),
  247. "message": awsErr.Message(),
  248. "origError": awsErr.OrigErr(),
  249. "logGroupName": l.logGroupName,
  250. "logStreamName": l.logStreamName,
  251. }
  252. if awsErr.Code() == resourceAlreadyExistsCode {
  253. // Allow creation to succeed
  254. logrus.WithFields(fields).Info("Log stream already exists")
  255. return nil
  256. }
  257. logrus.WithFields(fields).Error("Failed to create log stream")
  258. }
  259. }
  260. return err
  261. }
  262. // newTicker is used for time-based batching. newTicker is a variable such
  263. // that the implementation can be swapped out for unit tests.
  264. var newTicker = func(freq time.Duration) *time.Ticker {
  265. return time.NewTicker(freq)
  266. }
  267. // collectBatch executes as a goroutine to perform batching of log events for
  268. // submission to the log stream. Batching is performed on time- and size-
  269. // bases. Time-based batching occurs at a 5 second interval (defined in the
  270. // batchPublishFrequency const). Size-based batching is performed on the
  271. // maximum number of events per batch (defined in maximumLogEventsPerPut) and
  272. // the maximum number of total bytes in a batch (defined in
  273. // maximumBytesPerPut). Log messages are split by the maximum bytes per event
  274. // (defined in maximumBytesPerEvent). There is a fixed per-event byte overhead
  275. // (defined in perEventBytes) which is accounted for in split- and batch-
  276. // calculations.
  277. func (l *logStream) collectBatch() {
  278. timer := newTicker(batchPublishFrequency)
  279. var events []wrappedEvent
  280. bytes := 0
  281. for {
  282. select {
  283. case <-timer.C:
  284. l.publishBatch(events)
  285. events = events[:0]
  286. bytes = 0
  287. case msg, more := <-l.messages:
  288. if !more {
  289. l.publishBatch(events)
  290. return
  291. }
  292. unprocessedLine := msg.Line
  293. for len(unprocessedLine) > 0 {
  294. // Split line length so it does not exceed the maximum
  295. lineBytes := len(unprocessedLine)
  296. if lineBytes > maximumBytesPerEvent {
  297. lineBytes = maximumBytesPerEvent
  298. }
  299. line := unprocessedLine[:lineBytes]
  300. unprocessedLine = unprocessedLine[lineBytes:]
  301. if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
  302. // Publish an existing batch if it's already over the maximum number of events or if adding this
  303. // event would push it over the maximum number of total bytes.
  304. l.publishBatch(events)
  305. events = events[:0]
  306. bytes = 0
  307. }
  308. events = append(events, wrappedEvent{
  309. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  310. Message: aws.String(string(line)),
  311. Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)),
  312. },
  313. insertOrder: len(events),
  314. })
  315. bytes += (lineBytes + perEventBytes)
  316. }
  317. logger.PutMessage(msg)
  318. }
  319. }
  320. }
  321. // publishBatch calls PutLogEvents for a given set of InputLogEvents,
  322. // accounting for sequencing requirements (each request must reference the
  323. // sequence token returned by the previous request).
  324. func (l *logStream) publishBatch(events []wrappedEvent) {
  325. if len(events) == 0 {
  326. return
  327. }
  328. // events in a batch must be sorted by timestamp
  329. // see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
  330. sort.Sort(byTimestamp(events))
  331. cwEvents := unwrapEvents(events)
  332. nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
  333. if err != nil {
  334. if awsErr, ok := err.(awserr.Error); ok {
  335. if awsErr.Code() == dataAlreadyAcceptedCode {
  336. // already submitted, just grab the correct sequence token
  337. parts := strings.Split(awsErr.Message(), " ")
  338. nextSequenceToken = &parts[len(parts)-1]
  339. logrus.WithFields(logrus.Fields{
  340. "errorCode": awsErr.Code(),
  341. "message": awsErr.Message(),
  342. "logGroupName": l.logGroupName,
  343. "logStreamName": l.logStreamName,
  344. }).Info("Data already accepted, ignoring error")
  345. err = nil
  346. } else if awsErr.Code() == invalidSequenceTokenCode {
  347. // sequence code is bad, grab the correct one and retry
  348. parts := strings.Split(awsErr.Message(), " ")
  349. token := parts[len(parts)-1]
  350. nextSequenceToken, err = l.putLogEvents(cwEvents, &token)
  351. }
  352. }
  353. }
  354. if err != nil {
  355. logrus.Error(err)
  356. } else {
  357. l.sequenceToken = nextSequenceToken
  358. }
  359. }
  360. // putLogEvents wraps the PutLogEvents API
  361. func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) {
  362. input := &cloudwatchlogs.PutLogEventsInput{
  363. LogEvents: events,
  364. SequenceToken: sequenceToken,
  365. LogGroupName: aws.String(l.logGroupName),
  366. LogStreamName: aws.String(l.logStreamName),
  367. }
  368. resp, err := l.client.PutLogEvents(input)
  369. if err != nil {
  370. if awsErr, ok := err.(awserr.Error); ok {
  371. logrus.WithFields(logrus.Fields{
  372. "errorCode": awsErr.Code(),
  373. "message": awsErr.Message(),
  374. "origError": awsErr.OrigErr(),
  375. "logGroupName": l.logGroupName,
  376. "logStreamName": l.logStreamName,
  377. }).Error("Failed to put log events")
  378. }
  379. return nil, err
  380. }
  381. return resp.NextSequenceToken, nil
  382. }
  383. // ValidateLogOpt looks for awslogs-specific log options awslogs-region,
  384. // awslogs-group, awslogs-stream, awslogs-create-group
  385. func ValidateLogOpt(cfg map[string]string) error {
  386. for key := range cfg {
  387. switch key {
  388. case logGroupKey:
  389. case logStreamKey:
  390. case logCreateGroupKey:
  391. case regionKey:
  392. case tagKey:
  393. default:
  394. return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
  395. }
  396. }
  397. if cfg[logGroupKey] == "" {
  398. return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
  399. }
  400. if cfg[logCreateGroupKey] != "" {
  401. if _, err := strconv.ParseBool(cfg[logCreateGroupKey]); err != nil {
  402. return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err)
  403. }
  404. }
  405. return nil
  406. }
  407. // Len returns the length of a byTimestamp slice. Len is required by the
  408. // sort.Interface interface.
  409. func (slice byTimestamp) Len() int {
  410. return len(slice)
  411. }
  412. // Less compares two values in a byTimestamp slice by Timestamp. Less is
  413. // required by the sort.Interface interface.
  414. func (slice byTimestamp) Less(i, j int) bool {
  415. iTimestamp, jTimestamp := int64(0), int64(0)
  416. if slice != nil && slice[i].inputLogEvent.Timestamp != nil {
  417. iTimestamp = *slice[i].inputLogEvent.Timestamp
  418. }
  419. if slice != nil && slice[j].inputLogEvent.Timestamp != nil {
  420. jTimestamp = *slice[j].inputLogEvent.Timestamp
  421. }
  422. if iTimestamp == jTimestamp {
  423. return slice[i].insertOrder < slice[j].insertOrder
  424. }
  425. return iTimestamp < jTimestamp
  426. }
  427. // Swap swaps two values in a byTimestamp slice with each other. Swap is
  428. // required by the sort.Interface interface.
  429. func (slice byTimestamp) Swap(i, j int) {
  430. slice[i], slice[j] = slice[j], slice[i]
  431. }
  432. func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
  433. cwEvents := []*cloudwatchlogs.InputLogEvent{}
  434. for _, input := range events {
  435. cwEvents = append(cwEvents, input.inputLogEvent)
  436. }
  437. return cwEvents
  438. }