cloudwatchlogs.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. // Package awslogs provides the logdriver for forwarding container logs to Amazon CloudWatch Logs
  2. package awslogs
import (
	"errors"
	"fmt"
	"os"
	"runtime"
	"sort"
	"strings"
	"sync"
	"time"
	"unicode/utf8"

	"github.com/Sirupsen/logrus"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/ec2metadata"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/daemon/logger/loggerutils"
	"github.com/docker/docker/dockerversion"
)
  23. const (
  24. name = "awslogs"
  25. regionKey = "awslogs-region"
  26. regionEnvKey = "AWS_REGION"
  27. logGroupKey = "awslogs-group"
  28. logStreamKey = "awslogs-stream"
  29. tagKey = "tag"
  30. batchPublishFrequency = 5 * time.Second
  31. // See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
  32. perEventBytes = 26
  33. maximumBytesPerPut = 1048576
  34. maximumLogEventsPerPut = 10000
  35. // See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
  36. maximumBytesPerEvent = 262144 - perEventBytes
  37. resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
  38. dataAlreadyAcceptedCode = "DataAlreadyAcceptedException"
  39. invalidSequenceTokenCode = "InvalidSequenceTokenException"
  40. userAgentHeader = "User-Agent"
  41. )
  42. type logStream struct {
  43. logStreamName string
  44. logGroupName string
  45. client api
  46. messages chan *logger.Message
  47. lock sync.RWMutex
  48. closed bool
  49. sequenceToken *string
  50. }
  51. type api interface {
  52. CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
  53. PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
  54. }
  55. type regionFinder interface {
  56. Region() (string, error)
  57. }
  58. type wrappedEvent struct {
  59. inputLogEvent *cloudwatchlogs.InputLogEvent
  60. insertOrder int
  61. }
  62. type byTimestamp []wrappedEvent
  63. // init registers the awslogs driver
  64. func init() {
  65. if err := logger.RegisterLogDriver(name, New); err != nil {
  66. logrus.Fatal(err)
  67. }
  68. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  69. logrus.Fatal(err)
  70. }
  71. }
  72. // New creates an awslogs logger using the configuration passed in on the
  73. // context. Supported context configuration variables are awslogs-region,
  74. // awslogs-group, and awslogs-stream. When available, configuration is
  75. // also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID,
  76. // AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and
  77. // the EC2 Instance Metadata Service.
  78. func New(ctx logger.Context) (logger.Logger, error) {
  79. logGroupName := ctx.Config[logGroupKey]
  80. logStreamName, err := loggerutils.ParseLogTag(ctx, "{{.FullID}}")
  81. if err != nil {
  82. return nil, err
  83. }
  84. if ctx.Config[logStreamKey] != "" {
  85. logStreamName = ctx.Config[logStreamKey]
  86. }
  87. client, err := newAWSLogsClient(ctx)
  88. if err != nil {
  89. return nil, err
  90. }
  91. containerStream := &logStream{
  92. logStreamName: logStreamName,
  93. logGroupName: logGroupName,
  94. client: client,
  95. messages: make(chan *logger.Message, 4096),
  96. }
  97. err = containerStream.create()
  98. if err != nil {
  99. return nil, err
  100. }
  101. go containerStream.collectBatch()
  102. return containerStream, nil
  103. }
  104. // newRegionFinder is a variable such that the implementation
  105. // can be swapped out for unit tests.
  106. var newRegionFinder = func() regionFinder {
  107. return ec2metadata.New(session.New())
  108. }
  109. // newAWSLogsClient creates the service client for Amazon CloudWatch Logs.
  110. // Customizations to the default client from the SDK include a Docker-specific
  111. // User-Agent string and automatic region detection using the EC2 Instance
  112. // Metadata Service when region is otherwise unspecified.
  113. func newAWSLogsClient(ctx logger.Context) (api, error) {
  114. var region *string
  115. if os.Getenv(regionEnvKey) != "" {
  116. region = aws.String(os.Getenv(regionEnvKey))
  117. }
  118. if ctx.Config[regionKey] != "" {
  119. region = aws.String(ctx.Config[regionKey])
  120. }
  121. if region == nil || *region == "" {
  122. logrus.Info("Trying to get region from EC2 Metadata")
  123. ec2MetadataClient := newRegionFinder()
  124. r, err := ec2MetadataClient.Region()
  125. if err != nil {
  126. logrus.WithFields(logrus.Fields{
  127. "error": err,
  128. }).Error("Could not get region from EC2 metadata, environment, or log option")
  129. return nil, errors.New("Cannot determine region for awslogs driver")
  130. }
  131. region = &r
  132. }
  133. logrus.WithFields(logrus.Fields{
  134. "region": *region,
  135. }).Debug("Created awslogs client")
  136. client := cloudwatchlogs.New(session.New(), aws.NewConfig().WithRegion(*region))
  137. client.Handlers.Build.PushBackNamed(request.NamedHandler{
  138. Name: "DockerUserAgentHandler",
  139. Fn: func(r *request.Request) {
  140. currentAgent := r.HTTPRequest.Header.Get(userAgentHeader)
  141. r.HTTPRequest.Header.Set(userAgentHeader,
  142. fmt.Sprintf("Docker %s (%s) %s",
  143. dockerversion.Version, runtime.GOOS, currentAgent))
  144. },
  145. })
  146. return client, nil
  147. }
  148. // Name returns the name of the awslogs logging driver
  149. func (l *logStream) Name() string {
  150. return name
  151. }
  152. // Log submits messages for logging by an instance of the awslogs logging driver
  153. func (l *logStream) Log(msg *logger.Message) error {
  154. l.lock.RLock()
  155. defer l.lock.RUnlock()
  156. if !l.closed {
  157. // buffer up the data, making sure to copy the Line data
  158. l.messages <- logger.CopyMessage(msg)
  159. }
  160. return nil
  161. }
  162. // Close closes the instance of the awslogs logging driver
  163. func (l *logStream) Close() error {
  164. l.lock.Lock()
  165. defer l.lock.Unlock()
  166. if !l.closed {
  167. close(l.messages)
  168. }
  169. l.closed = true
  170. return nil
  171. }
  172. // create creates a log stream for the instance of the awslogs logging driver
  173. func (l *logStream) create() error {
  174. input := &cloudwatchlogs.CreateLogStreamInput{
  175. LogGroupName: aws.String(l.logGroupName),
  176. LogStreamName: aws.String(l.logStreamName),
  177. }
  178. _, err := l.client.CreateLogStream(input)
  179. if err != nil {
  180. if awsErr, ok := err.(awserr.Error); ok {
  181. fields := logrus.Fields{
  182. "errorCode": awsErr.Code(),
  183. "message": awsErr.Message(),
  184. "origError": awsErr.OrigErr(),
  185. "logGroupName": l.logGroupName,
  186. "logStreamName": l.logStreamName,
  187. }
  188. if awsErr.Code() == resourceAlreadyExistsCode {
  189. // Allow creation to succeed
  190. logrus.WithFields(fields).Info("Log stream already exists")
  191. return nil
  192. }
  193. logrus.WithFields(fields).Error("Failed to create log stream")
  194. }
  195. }
  196. return err
  197. }
  198. // newTicker is used for time-based batching. newTicker is a variable such
  199. // that the implementation can be swapped out for unit tests.
  200. var newTicker = func(freq time.Duration) *time.Ticker {
  201. return time.NewTicker(freq)
  202. }
  203. // collectBatch executes as a goroutine to perform batching of log events for
  204. // submission to the log stream. Batching is performed on time- and size-
  205. // bases. Time-based batching occurs at a 5 second interval (defined in the
  206. // batchPublishFrequency const). Size-based batching is performed on the
  207. // maximum number of events per batch (defined in maximumLogEventsPerPut) and
  208. // the maximum number of total bytes in a batch (defined in
  209. // maximumBytesPerPut). Log messages are split by the maximum bytes per event
  210. // (defined in maximumBytesPerEvent). There is a fixed per-event byte overhead
  211. // (defined in perEventBytes) which is accounted for in split- and batch-
  212. // calculations.
  213. func (l *logStream) collectBatch() {
  214. timer := newTicker(batchPublishFrequency)
  215. var events []wrappedEvent
  216. bytes := 0
  217. for {
  218. select {
  219. case <-timer.C:
  220. l.publishBatch(events)
  221. events = events[:0]
  222. bytes = 0
  223. case msg, more := <-l.messages:
  224. if !more {
  225. l.publishBatch(events)
  226. return
  227. }
  228. unprocessedLine := msg.Line
  229. for len(unprocessedLine) > 0 {
  230. // Split line length so it does not exceed the maximum
  231. lineBytes := len(unprocessedLine)
  232. if lineBytes > maximumBytesPerEvent {
  233. lineBytes = maximumBytesPerEvent
  234. }
  235. line := unprocessedLine[:lineBytes]
  236. unprocessedLine = unprocessedLine[lineBytes:]
  237. if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
  238. // Publish an existing batch if it's already over the maximum number of events or if adding this
  239. // event would push it over the maximum number of total bytes.
  240. l.publishBatch(events)
  241. events = events[:0]
  242. bytes = 0
  243. }
  244. events = append(events, wrappedEvent{
  245. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  246. Message: aws.String(string(line)),
  247. Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)),
  248. },
  249. insertOrder: len(events),
  250. })
  251. bytes += (lineBytes + perEventBytes)
  252. }
  253. }
  254. }
  255. }
  256. // publishBatch calls PutLogEvents for a given set of InputLogEvents,
  257. // accounting for sequencing requirements (each request must reference the
  258. // sequence token returned by the previous request).
  259. func (l *logStream) publishBatch(events []wrappedEvent) {
  260. if len(events) == 0 {
  261. return
  262. }
  263. // events in a batch must be sorted by timestamp
  264. // see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
  265. sort.Sort(byTimestamp(events))
  266. cwEvents := unwrapEvents(events)
  267. nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
  268. if err != nil {
  269. if awsErr, ok := err.(awserr.Error); ok {
  270. if awsErr.Code() == dataAlreadyAcceptedCode {
  271. // already submitted, just grab the correct sequence token
  272. parts := strings.Split(awsErr.Message(), " ")
  273. nextSequenceToken = &parts[len(parts)-1]
  274. logrus.WithFields(logrus.Fields{
  275. "errorCode": awsErr.Code(),
  276. "message": awsErr.Message(),
  277. "logGroupName": l.logGroupName,
  278. "logStreamName": l.logStreamName,
  279. }).Info("Data already accepted, ignoring error")
  280. err = nil
  281. } else if awsErr.Code() == invalidSequenceTokenCode {
  282. // sequence code is bad, grab the correct one and retry
  283. parts := strings.Split(awsErr.Message(), " ")
  284. token := parts[len(parts)-1]
  285. nextSequenceToken, err = l.putLogEvents(cwEvents, &token)
  286. }
  287. }
  288. }
  289. if err != nil {
  290. logrus.Error(err)
  291. } else {
  292. l.sequenceToken = nextSequenceToken
  293. }
  294. }
  295. // putLogEvents wraps the PutLogEvents API
  296. func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) {
  297. input := &cloudwatchlogs.PutLogEventsInput{
  298. LogEvents: events,
  299. SequenceToken: sequenceToken,
  300. LogGroupName: aws.String(l.logGroupName),
  301. LogStreamName: aws.String(l.logStreamName),
  302. }
  303. resp, err := l.client.PutLogEvents(input)
  304. if err != nil {
  305. if awsErr, ok := err.(awserr.Error); ok {
  306. logrus.WithFields(logrus.Fields{
  307. "errorCode": awsErr.Code(),
  308. "message": awsErr.Message(),
  309. "origError": awsErr.OrigErr(),
  310. "logGroupName": l.logGroupName,
  311. "logStreamName": l.logStreamName,
  312. }).Error("Failed to put log events")
  313. }
  314. return nil, err
  315. }
  316. return resp.NextSequenceToken, nil
  317. }
  318. // ValidateLogOpt looks for awslogs-specific log options awslogs-region,
  319. // awslogs-group, and awslogs-stream
  320. func ValidateLogOpt(cfg map[string]string) error {
  321. for key := range cfg {
  322. switch key {
  323. case logGroupKey:
  324. case logStreamKey:
  325. case regionKey:
  326. case tagKey:
  327. default:
  328. return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
  329. }
  330. }
  331. if cfg[logGroupKey] == "" {
  332. return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
  333. }
  334. return nil
  335. }
  336. // Len returns the length of a byTimestamp slice. Len is required by the
  337. // sort.Interface interface.
  338. func (slice byTimestamp) Len() int {
  339. return len(slice)
  340. }
  341. // Less compares two values in a byTimestamp slice by Timestamp. Less is
  342. // required by the sort.Interface interface.
  343. func (slice byTimestamp) Less(i, j int) bool {
  344. iTimestamp, jTimestamp := int64(0), int64(0)
  345. if slice != nil && slice[i].inputLogEvent.Timestamp != nil {
  346. iTimestamp = *slice[i].inputLogEvent.Timestamp
  347. }
  348. if slice != nil && slice[j].inputLogEvent.Timestamp != nil {
  349. jTimestamp = *slice[j].inputLogEvent.Timestamp
  350. }
  351. if iTimestamp == jTimestamp {
  352. return slice[i].insertOrder < slice[j].insertOrder
  353. }
  354. return iTimestamp < jTimestamp
  355. }
  356. // Swap swaps two values in a byTimestamp slice with each other. Swap is
  357. // required by the sort.Interface interface.
  358. func (slice byTimestamp) Swap(i, j int) {
  359. slice[i], slice[j] = slice[j], slice[i]
  360. }
  361. func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
  362. cwEvents := []*cloudwatchlogs.InputLogEvent{}
  363. for _, input := range events {
  364. cwEvents = append(cwEvents, input.inputLogEvent)
  365. }
  366. return cwEvents
  367. }