cloudwatchlogs.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. // Package awslogs provides the logdriver for forwarding container logs to Amazon CloudWatch Logs
  2. package awslogs
  3. import (
  4. "errors"
  5. "fmt"
  6. "os"
  7. "runtime"
  8. "sort"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/Sirupsen/logrus"
  13. "github.com/aws/aws-sdk-go/aws"
  14. "github.com/aws/aws-sdk-go/aws/awserr"
  15. "github.com/aws/aws-sdk-go/aws/defaults"
  16. "github.com/aws/aws-sdk-go/aws/ec2metadata"
  17. "github.com/aws/aws-sdk-go/aws/request"
  18. "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
  19. "github.com/docker/docker/daemon/logger"
  20. "github.com/docker/docker/version"
  21. )
  22. const (
  23. name = "awslogs"
  24. regionKey = "awslogs-region"
  25. regionEnvKey = "AWS_REGION"
  26. logGroupKey = "awslogs-group"
  27. logStreamKey = "awslogs-stream"
  28. batchPublishFrequency = 5 * time.Second
  29. // See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
  30. perEventBytes = 26
  31. maximumBytesPerPut = 1048576
  32. maximumLogEventsPerPut = 10000
  33. // See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
  34. maximumBytesPerEvent = 262144 - perEventBytes
  35. resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
  36. dataAlreadyAcceptedCode = "DataAlreadyAcceptedException"
  37. invalidSequenceTokenCode = "InvalidSequenceTokenException"
  38. userAgentHeader = "User-Agent"
  39. )
  40. type logStream struct {
  41. logStreamName string
  42. logGroupName string
  43. client api
  44. messages chan *logger.Message
  45. lock sync.RWMutex
  46. closed bool
  47. sequenceToken *string
  48. }
  49. type api interface {
  50. CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
  51. PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
  52. }
  53. type regionFinder interface {
  54. Region() (string, error)
  55. }
  56. type byTimestamp []*cloudwatchlogs.InputLogEvent
  57. // init registers the awslogs driver and sets the default region, if provided
  58. func init() {
  59. if os.Getenv(regionEnvKey) != "" {
  60. defaults.DefaultConfig.Region = aws.String(os.Getenv(regionEnvKey))
  61. }
  62. if err := logger.RegisterLogDriver(name, New); err != nil {
  63. logrus.Fatal(err)
  64. }
  65. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  66. logrus.Fatal(err)
  67. }
  68. }
  69. // New creates an awslogs logger using the configuration passed in on the
  70. // context. Supported context configuration variables are awslogs-region,
  71. // awslogs-group, and awslogs-stream. When available, configuration is
  72. // also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID,
  73. // AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and
  74. // the EC2 Instance Metadata Service.
  75. func New(ctx logger.Context) (logger.Logger, error) {
  76. logGroupName := ctx.Config[logGroupKey]
  77. logStreamName := ctx.ContainerID
  78. if ctx.Config[logStreamKey] != "" {
  79. logStreamName = ctx.Config[logStreamKey]
  80. }
  81. client, err := newAWSLogsClient(ctx)
  82. if err != nil {
  83. return nil, err
  84. }
  85. containerStream := &logStream{
  86. logStreamName: logStreamName,
  87. logGroupName: logGroupName,
  88. client: client,
  89. messages: make(chan *logger.Message, 4096),
  90. }
  91. err = containerStream.create()
  92. if err != nil {
  93. return nil, err
  94. }
  95. go containerStream.collectBatch()
  96. return containerStream, nil
  97. }
  98. // newRegionFinder is a variable such that the implementation
  99. // can be swapped out for unit tests.
  100. var newRegionFinder = func() regionFinder {
  101. return ec2metadata.New(nil)
  102. }
  103. // newAWSLogsClient creates the service client for Amazon CloudWatch Logs.
  104. // Customizations to the default client from the SDK include a Docker-specific
  105. // User-Agent string and automatic region detection using the EC2 Instance
  106. // Metadata Service when region is otherwise unspecified.
  107. func newAWSLogsClient(ctx logger.Context) (api, error) {
  108. config := defaults.DefaultConfig
  109. if ctx.Config[regionKey] != "" {
  110. config = defaults.DefaultConfig.Merge(&aws.Config{
  111. Region: aws.String(ctx.Config[regionKey]),
  112. })
  113. }
  114. if config.Region == nil || *config.Region == "" {
  115. logrus.Info("Trying to get region from EC2 Metadata")
  116. ec2MetadataClient := newRegionFinder()
  117. region, err := ec2MetadataClient.Region()
  118. if err != nil {
  119. logrus.WithFields(logrus.Fields{
  120. "error": err,
  121. }).Error("Could not get region from EC2 metadata, environment, or log option")
  122. return nil, errors.New("Cannot determine region for awslogs driver")
  123. }
  124. config.Region = &region
  125. }
  126. logrus.WithFields(logrus.Fields{
  127. "region": *config.Region,
  128. }).Debug("Created awslogs client")
  129. client := cloudwatchlogs.New(config)
  130. client.Handlers.Build.PushBackNamed(request.NamedHandler{
  131. Name: "DockerUserAgentHandler",
  132. Fn: func(r *request.Request) {
  133. currentAgent := r.HTTPRequest.Header.Get(userAgentHeader)
  134. r.HTTPRequest.Header.Set(userAgentHeader,
  135. fmt.Sprintf("Docker %s (%s) %s",
  136. version.VERSION, runtime.GOOS, currentAgent))
  137. },
  138. })
  139. return client, nil
  140. }
  141. // Name returns the name of the awslogs logging driver
  142. func (l *logStream) Name() string {
  143. return name
  144. }
  145. // Log submits messages for logging by an instance of the awslogs logging driver
  146. func (l *logStream) Log(msg *logger.Message) error {
  147. l.lock.RLock()
  148. defer l.lock.RUnlock()
  149. if !l.closed {
  150. l.messages <- msg
  151. }
  152. return nil
  153. }
  154. // Close closes the instance of the awslogs logging driver
  155. func (l *logStream) Close() error {
  156. l.lock.Lock()
  157. defer l.lock.Unlock()
  158. if !l.closed {
  159. close(l.messages)
  160. }
  161. l.closed = true
  162. return nil
  163. }
  164. // create creates a log stream for the instance of the awslogs logging driver
  165. func (l *logStream) create() error {
  166. input := &cloudwatchlogs.CreateLogStreamInput{
  167. LogGroupName: aws.String(l.logGroupName),
  168. LogStreamName: aws.String(l.logStreamName),
  169. }
  170. _, err := l.client.CreateLogStream(input)
  171. if err != nil {
  172. if awsErr, ok := err.(awserr.Error); ok {
  173. fields := logrus.Fields{
  174. "errorCode": awsErr.Code(),
  175. "message": awsErr.Message(),
  176. "origError": awsErr.OrigErr(),
  177. "logGroupName": l.logGroupName,
  178. "logStreamName": l.logStreamName,
  179. }
  180. if awsErr.Code() == resourceAlreadyExistsCode {
  181. // Allow creation to succeed
  182. logrus.WithFields(fields).Info("Log stream already exists")
  183. return nil
  184. }
  185. logrus.WithFields(fields).Error("Failed to create log stream")
  186. }
  187. }
  188. return err
  189. }
  190. // newTicker is used for time-based batching. newTicker is a variable such
  191. // that the implementation can be swapped out for unit tests.
  192. var newTicker = func(freq time.Duration) *time.Ticker {
  193. return time.NewTicker(freq)
  194. }
  195. // collectBatch executes as a goroutine to perform batching of log events for
  196. // submission to the log stream. Batching is performed on time- and size-
  197. // bases. Time-based batching occurs at a 5 second interval (defined in the
  198. // batchPublishFrequency const). Size-based batching is performed on the
  199. // maximum number of events per batch (defined in maximumLogEventsPerPut) and
  200. // the maximum number of total bytes in a batch (defined in
  201. // maximumBytesPerPut). Log messages are split by the maximum bytes per event
  202. // (defined in maximumBytesPerEvent). There is a fixed per-event byte overhead
  203. // (defined in perEventBytes) which is accounted for in split- and batch-
  204. // calculations.
  205. func (l *logStream) collectBatch() {
  206. timer := newTicker(batchPublishFrequency)
  207. var events []*cloudwatchlogs.InputLogEvent
  208. bytes := 0
  209. for {
  210. select {
  211. case <-timer.C:
  212. l.publishBatch(events)
  213. events = events[:0]
  214. bytes = 0
  215. case msg, more := <-l.messages:
  216. if !more {
  217. l.publishBatch(events)
  218. return
  219. }
  220. unprocessedLine := msg.Line
  221. for len(unprocessedLine) > 0 {
  222. // Split line length so it does not exceed the maximum
  223. lineBytes := len(unprocessedLine)
  224. if lineBytes > maximumBytesPerEvent {
  225. lineBytes = maximumBytesPerEvent
  226. }
  227. line := unprocessedLine[:lineBytes]
  228. unprocessedLine = unprocessedLine[lineBytes:]
  229. if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
  230. // Publish an existing batch if it's already over the maximum number of events or if adding this
  231. // event would push it over the maximum number of total bytes.
  232. l.publishBatch(events)
  233. events = events[:0]
  234. bytes = 0
  235. }
  236. events = append(events, &cloudwatchlogs.InputLogEvent{
  237. Message: aws.String(string(line)),
  238. Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)),
  239. })
  240. bytes += (lineBytes + perEventBytes)
  241. }
  242. }
  243. }
  244. }
  245. // publishBatch calls PutLogEvents for a given set of InputLogEvents,
  246. // accounting for sequencing requirements (each request must reference the
  247. // sequence token returned by the previous request).
  248. func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) {
  249. if len(events) == 0 {
  250. return
  251. }
  252. sort.Sort(byTimestamp(events))
  253. nextSequenceToken, err := l.putLogEvents(events, l.sequenceToken)
  254. if err != nil {
  255. if awsErr, ok := err.(awserr.Error); ok {
  256. if awsErr.Code() == dataAlreadyAcceptedCode {
  257. // already submitted, just grab the correct sequence token
  258. parts := strings.Split(awsErr.Message(), " ")
  259. nextSequenceToken = &parts[len(parts)-1]
  260. logrus.WithFields(logrus.Fields{
  261. "errorCode": awsErr.Code(),
  262. "message": awsErr.Message(),
  263. "logGroupName": l.logGroupName,
  264. "logStreamName": l.logStreamName,
  265. }).Info("Data already accepted, ignoring error")
  266. err = nil
  267. } else if awsErr.Code() == invalidSequenceTokenCode {
  268. // sequence code is bad, grab the correct one and retry
  269. parts := strings.Split(awsErr.Message(), " ")
  270. token := parts[len(parts)-1]
  271. nextSequenceToken, err = l.putLogEvents(events, &token)
  272. }
  273. }
  274. }
  275. if err != nil {
  276. logrus.Error(err)
  277. } else {
  278. l.sequenceToken = nextSequenceToken
  279. }
  280. }
  281. // putLogEvents wraps the PutLogEvents API
  282. func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) {
  283. input := &cloudwatchlogs.PutLogEventsInput{
  284. LogEvents: events,
  285. SequenceToken: sequenceToken,
  286. LogGroupName: aws.String(l.logGroupName),
  287. LogStreamName: aws.String(l.logStreamName),
  288. }
  289. resp, err := l.client.PutLogEvents(input)
  290. if err != nil {
  291. if awsErr, ok := err.(awserr.Error); ok {
  292. logrus.WithFields(logrus.Fields{
  293. "errorCode": awsErr.Code(),
  294. "message": awsErr.Message(),
  295. "origError": awsErr.OrigErr(),
  296. "logGroupName": l.logGroupName,
  297. "logStreamName": l.logStreamName,
  298. }).Error("Failed to put log events")
  299. }
  300. return nil, err
  301. }
  302. return resp.NextSequenceToken, nil
  303. }
  304. // ValidateLogOpt looks for awslogs-specific log options awslogs-region,
  305. // awslogs-group, and awslogs-stream
  306. func ValidateLogOpt(cfg map[string]string) error {
  307. for key := range cfg {
  308. switch key {
  309. case logGroupKey:
  310. case logStreamKey:
  311. case regionKey:
  312. default:
  313. return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
  314. }
  315. }
  316. if cfg[logGroupKey] == "" {
  317. return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
  318. }
  319. return nil
  320. }
  321. // Len returns the length of a byTimestamp slice. Len is required by the
  322. // sort.Interface interface.
  323. func (slice byTimestamp) Len() int {
  324. return len(slice)
  325. }
  326. // Less compares two values in a byTimestamp slice by Timestamp. Less is
  327. // required by the sort.Interface interface.
  328. func (slice byTimestamp) Less(i, j int) bool {
  329. iTimestamp, jTimestamp := int64(0), int64(0)
  330. if slice != nil && slice[i].Timestamp != nil {
  331. iTimestamp = *slice[i].Timestamp
  332. }
  333. if slice != nil && slice[j].Timestamp != nil {
  334. jTimestamp = *slice[j].Timestamp
  335. }
  336. return iTimestamp < jTimestamp
  337. }
  338. // Swap swaps two values in a byTimestamp slice with each other. Swap is
  339. // required by the sort.Interface interface.
  340. func (slice byTimestamp) Swap(i, j int) {
  341. slice[i], slice[j] = slice[j], slice[i]
  342. }