cloudwatchlogs.go

// Package awslogs provides the log driver for forwarding container logs to
// Amazon CloudWatch Logs.
package awslogs

import (
	"errors"
	"fmt"
	"os"
	"runtime"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/Sirupsen/logrus"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/ec2metadata"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/dockerversion"
)

const (
	name                  = "awslogs"
	regionKey             = "awslogs-region"
	regionEnvKey          = "AWS_REGION"
	logGroupKey           = "awslogs-group"
	logStreamKey          = "awslogs-stream"
	batchPublishFrequency = 5 * time.Second

	// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
	perEventBytes          = 26
	maximumBytesPerPut     = 1048576
	maximumLogEventsPerPut = 10000

	// See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
	maximumBytesPerEvent = 262144 - perEventBytes

	resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
	dataAlreadyAcceptedCode   = "DataAlreadyAcceptedException"
	invalidSequenceTokenCode  = "InvalidSequenceTokenException"

	userAgentHeader = "User-Agent"
)

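// Note: maximumBytesPerEvent reserves the fixed 26-byte per-event overhead
// (perEventBytes) within the 262,144-byte event size limit, so the largest
// message payload a single event can carry is 262,118 bytes.
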
type logStream struct {
	logStreamName string
	logGroupName  string
	client        api
	messages      chan *logger.Message
	lock          sync.RWMutex
	closed        bool
	sequenceToken *string
}

type api interface {
	CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
	PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
}

type regionFinder interface {
	Region() (string, error)
}

type byTimestamp []*cloudwatchlogs.InputLogEvent

// init registers the awslogs driver
func init() {
	if err := logger.RegisterLogDriver(name, New); err != nil {
		logrus.Fatal(err)
	}
	if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
		logrus.Fatal(err)
	}
}

// New creates an awslogs logger using the configuration passed in on the
// context. Supported context configuration variables are awslogs-region,
// awslogs-group, and awslogs-stream. When available, configuration is
// also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID,
// AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and
// the EC2 Instance Metadata Service.
func New(ctx logger.Context) (logger.Logger, error) {
	logGroupName := ctx.Config[logGroupKey]
	logStreamName := ctx.ContainerID
	if ctx.Config[logStreamKey] != "" {
		logStreamName = ctx.Config[logStreamKey]
	}
	client, err := newAWSLogsClient(ctx)
	if err != nil {
		return nil, err
	}
	containerStream := &logStream{
		logStreamName: logStreamName,
		logGroupName:  logGroupName,
		client:        client,
		messages:      make(chan *logger.Message, 4096),
	}
	err = containerStream.create()
	if err != nil {
		return nil, err
	}
	go containerStream.collectBatch()
	return containerStream, nil
}

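// As an illustration (group and stream names here are hypothetical), a
// container can be pointed at this driver with:
//
//	docker run --log-driver=awslogs \
//		--log-opt awslogs-region=us-east-1 \
//		--log-opt awslogs-group=my-log-group \
//		--log-opt awslogs-stream=my-log-stream alpine echo hello
//
// awslogs-group is required (see ValidateLogOpt); the stream name defaults to
// the container ID, and the region falls back to the AWS_REGION environment
// variable and then to EC2 instance metadata.
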
// newRegionFinder is a variable such that the implementation
// can be swapped out for unit tests.
var newRegionFinder = func() regionFinder {
	return ec2metadata.New(session.New())
}

// newAWSLogsClient creates the service client for Amazon CloudWatch Logs.
// Customizations to the default client from the SDK include a Docker-specific
// User-Agent string and automatic region detection using the EC2 Instance
// Metadata Service when the region is otherwise unspecified.
func newAWSLogsClient(ctx logger.Context) (api, error) {
	var region *string
	if os.Getenv(regionEnvKey) != "" {
		region = aws.String(os.Getenv(regionEnvKey))
	}
	if ctx.Config[regionKey] != "" {
		region = aws.String(ctx.Config[regionKey])
	}
	if region == nil || *region == "" {
		logrus.Info("Trying to get region from EC2 Metadata")
		ec2MetadataClient := newRegionFinder()
		r, err := ec2MetadataClient.Region()
		if err != nil {
			logrus.WithFields(logrus.Fields{
				"error": err,
			}).Error("Could not get region from EC2 metadata, environment, or log option")
			return nil, errors.New("Cannot determine region for awslogs driver")
		}
		region = &r
	}
	logrus.WithFields(logrus.Fields{
		"region": *region,
	}).Debug("Created awslogs client")
	client := cloudwatchlogs.New(session.New(), aws.NewConfig().WithRegion(*region))
	client.Handlers.Build.PushBackNamed(request.NamedHandler{
		Name: "DockerUserAgentHandler",
		Fn: func(r *request.Request) {
			currentAgent := r.HTTPRequest.Header.Get(userAgentHeader)
			r.HTTPRequest.Header.Set(userAgentHeader,
				fmt.Sprintf("Docker %s (%s) %s",
					dockerversion.Version, runtime.GOOS, currentAgent))
		},
	})
	return client, nil
}

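// The build handler above prefixes the SDK's default User-Agent, producing a
// header along the lines of "Docker <version> (<GOOS>) aws-sdk-go/<version>";
// the exact suffix depends on the aws-sdk-go release in use and is an
// assumption about the SDK's default agent string, not guaranteed by this code.
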
// Name returns the name of the awslogs logging driver
func (l *logStream) Name() string {
	return name
}

// Log submits messages for logging by an instance of the awslogs logging driver
func (l *logStream) Log(msg *logger.Message) error {
	l.lock.RLock()
	defer l.lock.RUnlock()
	if !l.closed {
		// buffer up the data, making sure to copy the Line data
		l.messages <- logger.CopyMessage(msg)
	}
	return nil
}

// Close closes the instance of the awslogs logging driver
func (l *logStream) Close() error {
	l.lock.Lock()
	defer l.lock.Unlock()
	if !l.closed {
		close(l.messages)
	}
	l.closed = true
	return nil
}

// create creates a log stream for the instance of the awslogs logging driver
func (l *logStream) create() error {
	input := &cloudwatchlogs.CreateLogStreamInput{
		LogGroupName:  aws.String(l.logGroupName),
		LogStreamName: aws.String(l.logStreamName),
	}
	_, err := l.client.CreateLogStream(input)
	if err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			fields := logrus.Fields{
				"errorCode":     awsErr.Code(),
				"message":       awsErr.Message(),
				"origError":     awsErr.OrigErr(),
				"logGroupName":  l.logGroupName,
				"logStreamName": l.logStreamName,
			}
			if awsErr.Code() == resourceAlreadyExistsCode {
				// Allow creation to succeed
				logrus.WithFields(fields).Info("Log stream already exists")
				return nil
			}
			logrus.WithFields(fields).Error("Failed to create log stream")
		}
	}
	return err
}

// newTicker is used for time-based batching. newTicker is a variable such
// that the implementation can be swapped out for unit tests.
var newTicker = func(freq time.Duration) *time.Ticker {
	return time.NewTicker(freq)
}

// collectBatch executes as a goroutine to perform batching of log events for
// submission to the log stream. Batching occurs on both a time basis and a
// size basis. Time-based batching occurs at a 5 second interval (defined in
// the batchPublishFrequency const). Size-based batching is performed on the
// maximum number of events per batch (defined in maximumLogEventsPerPut) and
// the maximum number of total bytes in a batch (defined in
// maximumBytesPerPut). Log messages are split by the maximum bytes per event
// (defined in maximumBytesPerEvent). There is a fixed per-event byte overhead
// (defined in perEventBytes) which is accounted for in split and batch
// calculations.
func (l *logStream) collectBatch() {
	timer := newTicker(batchPublishFrequency)
	defer timer.Stop()
	var events []*cloudwatchlogs.InputLogEvent
	bytes := 0
	for {
		select {
		case <-timer.C:
			l.publishBatch(events)
			events = events[:0]
			bytes = 0
		case msg, more := <-l.messages:
			if !more {
				l.publishBatch(events)
				return
			}
			unprocessedLine := msg.Line
			for len(unprocessedLine) > 0 {
				// Split the line so that no event exceeds the maximum event size
				lineBytes := len(unprocessedLine)
				if lineBytes > maximumBytesPerEvent {
					lineBytes = maximumBytesPerEvent
				}
				line := unprocessedLine[:lineBytes]
				unprocessedLine = unprocessedLine[lineBytes:]
				if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
					// Publish the existing batch if it is already at the maximum
					// number of events or if adding this event would push it over
					// the maximum number of total bytes.
					l.publishBatch(events)
					events = events[:0]
					bytes = 0
				}
				events = append(events, &cloudwatchlogs.InputLogEvent{
					Message:   aws.String(string(line)),
					Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)),
				})
				bytes += (lineBytes + perEventBytes)
			}
		}
	}
}

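// As a worked example of the accounting above: a single 300,000-byte message
// is split into one event of 262,118 bytes (maximumBytesPerEvent) and one of
// 37,882 bytes, and each event contributes its payload plus the fixed 26-byte
// overhead (perEventBytes) toward the 1,048,576-byte batch limit.
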
// publishBatch calls PutLogEvents for a given set of InputLogEvents,
// accounting for sequencing requirements (each request must reference the
// sequence token returned by the previous request).
func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) {
	if len(events) == 0 {
		return
	}
	sort.Sort(byTimestamp(events))
	nextSequenceToken, err := l.putLogEvents(events, l.sequenceToken)
	if err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			if awsErr.Code() == dataAlreadyAcceptedCode {
				// already submitted, just grab the correct sequence token
				parts := strings.Split(awsErr.Message(), " ")
				nextSequenceToken = &parts[len(parts)-1]
				logrus.WithFields(logrus.Fields{
					"errorCode":     awsErr.Code(),
					"message":       awsErr.Message(),
					"logGroupName":  l.logGroupName,
					"logStreamName": l.logStreamName,
				}).Info("Data already accepted, ignoring error")
				err = nil
			} else if awsErr.Code() == invalidSequenceTokenCode {
				// sequence token is bad, grab the correct one and retry
				parts := strings.Split(awsErr.Message(), " ")
				token := parts[len(parts)-1]
				nextSequenceToken, err = l.putLogEvents(events, &token)
			}
		}
	}
	if err != nil {
		logrus.Error(err)
	} else {
		l.sequenceToken = nextSequenceToken
	}
}

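// Recovering the token by splitting the error message on spaces assumes that
// the CloudWatch Logs error text ends with the expected sequence token (for
// example, "The given sequenceToken is invalid. The next expected
// sequenceToken is: <token>"); the exact wording is an observed behavior of
// the service rather than a documented contract.
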
// putLogEvents wraps the PutLogEvents API
func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) {
	input := &cloudwatchlogs.PutLogEventsInput{
		LogEvents:     events,
		SequenceToken: sequenceToken,
		LogGroupName:  aws.String(l.logGroupName),
		LogStreamName: aws.String(l.logStreamName),
	}
	resp, err := l.client.PutLogEvents(input)
	if err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			logrus.WithFields(logrus.Fields{
				"errorCode":     awsErr.Code(),
				"message":       awsErr.Message(),
				"origError":     awsErr.OrigErr(),
				"logGroupName":  l.logGroupName,
				"logStreamName": l.logStreamName,
			}).Error("Failed to put log events")
		}
		return nil, err
	}
	return resp.NextSequenceToken, nil
}

// ValidateLogOpt looks for awslogs-specific log options awslogs-region,
// awslogs-group, and awslogs-stream
func ValidateLogOpt(cfg map[string]string) error {
	for key := range cfg {
		switch key {
		case logGroupKey, logStreamKey, regionKey:
		default:
			return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
		}
	}
	if cfg[logGroupKey] == "" {
		return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
	}
	return nil
}

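// For illustration (values are hypothetical): a config of
// map[string]string{"awslogs-group": "my-group"} validates successfully, an
// unknown key such as "awslogs-foo" returns the unknown-option error, and an
// empty map fails because awslogs-group is required.
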
// Len returns the length of a byTimestamp slice. Len is required by the
// sort.Interface interface.
func (slice byTimestamp) Len() int {
	return len(slice)
}

// Less compares two values in a byTimestamp slice by Timestamp. Less is
// required by the sort.Interface interface.
func (slice byTimestamp) Less(i, j int) bool {
	iTimestamp, jTimestamp := int64(0), int64(0)
	if slice != nil && slice[i].Timestamp != nil {
		iTimestamp = *slice[i].Timestamp
	}
	if slice != nil && slice[j].Timestamp != nil {
		jTimestamp = *slice[j].Timestamp
	}
	return iTimestamp < jTimestamp
}

// Swap swaps two values in a byTimestamp slice with each other. Swap is
// required by the sort.Interface interface.
func (slice byTimestamp) Swap(i, j int) {
	slice[i], slice[j] = slice[j], slice[i]
}

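// Sorting each batch with byTimestamp before calling PutLogEvents (see
// publishBatch) satisfies the CloudWatch Logs requirement that events within
// a single request be in chronological order by timestamp.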