jsonfilelog.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. // Package jsonfilelog provides the default Logger implementation for
  2. // Docker logging. This logger logs to files on the host server in the
  3. // JSON format.
  4. package jsonfilelog
  5. import (
  6. "bytes"
  7. "encoding/json"
  8. "fmt"
  9. "io"
  10. "os"
  11. "strconv"
  12. "sync"
  13. "time"
  14. "gopkg.in/fsnotify.v1"
  15. "github.com/Sirupsen/logrus"
  16. "github.com/docker/docker/daemon/logger"
  17. "github.com/docker/docker/pkg/ioutils"
  18. "github.com/docker/docker/pkg/jsonlog"
  19. "github.com/docker/docker/pkg/pubsub"
  20. "github.com/docker/docker/pkg/tailfile"
  21. "github.com/docker/docker/pkg/timeutils"
  22. "github.com/docker/docker/pkg/units"
  23. )
const (
	// Name is the name of this log driver as registered with the
	// daemon's logger registry (see init below).
	Name = "json-file"
	// maxJSONDecodeRetry bounds how many times followLogs rebuilds its
	// JSON decoder after a syntax error before reporting the failure.
	maxJSONDecodeRetry = 10
)
// JSONFileLogger is Logger implementation for default Docker logging.
type JSONFileLogger struct {
	buf          *bytes.Buffer // pending serialized entries, flushed to f by writeLog
	f            *os.File      // store for closing
	mu           sync.Mutex    // protects buffer
	capacity     int64         // maximum size of each file; -1 disables rotation
	n            int           // maximum number of files
	ctx          logger.Context
	readers      map[*logger.LogWatcher]struct{} // stores the active log followers
	notifyRotate *pubsub.Publisher               // broadcasts to followers when the live file is rotated
	extra        []byte                          // json-encoded extra attributes
}
// init registers this driver and its option validator under Name with
// the daemon's logger registry; a registration failure is fatal.
func init() {
	if err := logger.RegisterLogDriver(Name, New); err != nil {
		logrus.Fatal(err)
	}
	if err := logger.RegisterLogOptValidator(Name, ValidateLogOpt); err != nil {
		logrus.Fatal(err)
	}
}
  49. // New creates new JSONFileLogger which writes to filename passed in
  50. // on given context.
  51. func New(ctx logger.Context) (logger.Logger, error) {
  52. log, err := os.OpenFile(ctx.LogPath, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
  53. if err != nil {
  54. return nil, err
  55. }
  56. var capval int64 = -1
  57. if capacity, ok := ctx.Config["max-size"]; ok {
  58. var err error
  59. capval, err = units.FromHumanSize(capacity)
  60. if err != nil {
  61. return nil, err
  62. }
  63. }
  64. var maxFiles = 1
  65. if maxFileString, ok := ctx.Config["max-file"]; ok {
  66. maxFiles, err = strconv.Atoi(maxFileString)
  67. if err != nil {
  68. return nil, err
  69. }
  70. if maxFiles < 1 {
  71. return nil, fmt.Errorf("max-file cannot be less than 1")
  72. }
  73. }
  74. var extra []byte
  75. if attrs := ctx.ExtraAttributes(nil); len(attrs) > 0 {
  76. var err error
  77. extra, err = json.Marshal(attrs)
  78. if err != nil {
  79. return nil, err
  80. }
  81. }
  82. return &JSONFileLogger{
  83. f: log,
  84. buf: bytes.NewBuffer(nil),
  85. ctx: ctx,
  86. capacity: capval,
  87. n: maxFiles,
  88. readers: make(map[*logger.LogWatcher]struct{}),
  89. notifyRotate: pubsub.NewPublisher(0, 1),
  90. extra: extra,
  91. }, nil
  92. }
  93. // Log converts logger.Message to jsonlog.JSONLog and serializes it to file.
  94. func (l *JSONFileLogger) Log(msg *logger.Message) error {
  95. l.mu.Lock()
  96. defer l.mu.Unlock()
  97. timestamp, err := timeutils.FastMarshalJSON(msg.Timestamp)
  98. if err != nil {
  99. return err
  100. }
  101. err = (&jsonlog.JSONLogs{
  102. Log: append(msg.Line, '\n'),
  103. Stream: msg.Source,
  104. Created: timestamp,
  105. RawAttrs: l.extra,
  106. }).MarshalJSONBuf(l.buf)
  107. if err != nil {
  108. return err
  109. }
  110. l.buf.WriteByte('\n')
  111. _, err = writeLog(l)
  112. return err
  113. }
  114. func writeLog(l *JSONFileLogger) (int64, error) {
  115. if l.capacity == -1 {
  116. return writeToBuf(l)
  117. }
  118. meta, err := l.f.Stat()
  119. if err != nil {
  120. return -1, err
  121. }
  122. if meta.Size() >= l.capacity {
  123. name := l.f.Name()
  124. if err := l.f.Close(); err != nil {
  125. return -1, err
  126. }
  127. if err := rotate(name, l.n); err != nil {
  128. return -1, err
  129. }
  130. file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0666)
  131. if err != nil {
  132. return -1, err
  133. }
  134. l.f = file
  135. l.notifyRotate.Publish(struct{}{})
  136. }
  137. return writeToBuf(l)
  138. }
  139. func writeToBuf(l *JSONFileLogger) (int64, error) {
  140. i, err := l.buf.WriteTo(l.f)
  141. if err != nil {
  142. l.buf = bytes.NewBuffer(nil)
  143. }
  144. return i, err
  145. }
  146. func rotate(name string, n int) error {
  147. if n < 2 {
  148. return nil
  149. }
  150. for i := n - 1; i > 1; i-- {
  151. oldFile := name + "." + strconv.Itoa(i)
  152. replacingFile := name + "." + strconv.Itoa(i-1)
  153. if err := backup(oldFile, replacingFile); err != nil {
  154. return err
  155. }
  156. }
  157. if err := backup(name+".1", name); err != nil {
  158. return err
  159. }
  160. return nil
  161. }
  162. // backup renames a file from curr to old, creating an empty file curr if it does not exist.
  163. func backup(old, curr string) error {
  164. if _, err := os.Stat(old); !os.IsNotExist(err) {
  165. err := os.Remove(old)
  166. if err != nil {
  167. return err
  168. }
  169. }
  170. if _, err := os.Stat(curr); os.IsNotExist(err) {
  171. f, err := os.Create(curr)
  172. if err != nil {
  173. return err
  174. }
  175. f.Close()
  176. }
  177. return os.Rename(curr, old)
  178. }
  179. // ValidateLogOpt looks for json specific log options max-file & max-size.
  180. func ValidateLogOpt(cfg map[string]string) error {
  181. for key := range cfg {
  182. switch key {
  183. case "max-file":
  184. case "max-size":
  185. case "labels":
  186. case "env":
  187. default:
  188. return fmt.Errorf("unknown log opt '%s' for json-file log driver", key)
  189. }
  190. }
  191. return nil
  192. }
// LogPath returns the location the given json logger logs to.
func (l *JSONFileLogger) LogPath() string {
	return l.ctx.LogPath
}
  197. // Close closes underlying file and signals all readers to stop.
  198. func (l *JSONFileLogger) Close() error {
  199. l.mu.Lock()
  200. err := l.f.Close()
  201. for r := range l.readers {
  202. r.Close()
  203. delete(l.readers, r)
  204. }
  205. l.mu.Unlock()
  206. return err
  207. }
// Name returns name of this logger.
func (l *JSONFileLogger) Name() string {
	return Name
}
  212. func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
  213. l.Reset()
  214. if err := dec.Decode(l); err != nil {
  215. return nil, err
  216. }
  217. msg := &logger.Message{
  218. Source: l.Stream,
  219. Timestamp: l.Created,
  220. Line: []byte(l.Log),
  221. }
  222. return msg, nil
  223. }
  224. // ReadLogs implements the logger's LogReader interface for the logs
  225. // created by this driver.
  226. func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
  227. logWatcher := logger.NewLogWatcher()
  228. go l.readLogs(logWatcher, config)
  229. return logWatcher
  230. }
// readLogs streams log history to logWatcher: rotated files oldest
// first, then the live file, honoring config.Tail/Since, and optionally
// keeps following the live file. Closes logWatcher.Msg on return.
func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
	defer close(logWatcher.Msg)
	pth := l.ctx.LogPath
	var files []io.ReadSeeker
	// Open rotated files <pth>.<n-1> .. <pth>.1, oldest first.
	for i := l.n; i > 1; i-- {
		f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
		if err != nil {
			if !os.IsNotExist(err) {
				logWatcher.Err <- err
				break
			}
			// A missing rotated file just means that slot hasn't been
			// filled yet; skip it.
			continue
		}
		// NOTE: defer inside a loop is intentional here — every rotated
		// file must stay open until this function returns, because the
		// MultiReadSeeker below reads from them lazily.
		defer f.Close()
		files = append(files, f)
	}
	latestFile, err := os.Open(pth)
	if err != nil {
		logWatcher.Err <- err
		return
	}
	defer latestFile.Close()
	files = append(files, latestFile)
	tailer := ioutils.MultiReadSeeker(files...)
	// Tail == 0 means "no history"; negative means "all history".
	if config.Tail != 0 {
		tailFile(tailer, logWatcher, config.Tail, config.Since)
	}
	if !config.Follow {
		return
	}
	// For Tail >= 0 the history was bounded, so skip to the end of the
	// live file before following; Tail < 0 already consumed everything.
	if config.Tail >= 0 {
		latestFile.Seek(0, os.SEEK_END)
	}
	// Register as an active follower so Close() can shut us down.
	l.mu.Lock()
	l.readers[logWatcher] = struct{}{}
	l.mu.Unlock()
	notifyRotate := l.notifyRotate.Subscribe()
	followLogs(latestFile, logWatcher, notifyRotate, config.Since)
	l.mu.Lock()
	delete(l.readers, logWatcher)
	l.mu.Unlock()
	l.notifyRotate.Evict(notifyRotate)
}
  274. func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
  275. var rdr io.Reader = f
  276. if tail > 0 {
  277. ls, err := tailfile.TailFile(f, tail)
  278. if err != nil {
  279. logWatcher.Err <- err
  280. return
  281. }
  282. rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
  283. }
  284. dec := json.NewDecoder(rdr)
  285. l := &jsonlog.JSONLog{}
  286. for {
  287. msg, err := decodeLogLine(dec, l)
  288. if err != nil {
  289. if err != io.EOF {
  290. logWatcher.Err <- err
  291. }
  292. return
  293. }
  294. if !since.IsZero() && msg.Timestamp.Before(since) {
  295. continue
  296. }
  297. logWatcher.Msg <- msg
  298. }
  299. }
  300. func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
  301. dec := json.NewDecoder(f)
  302. l := &jsonlog.JSONLog{}
  303. fileWatcher, err := fsnotify.NewWatcher()
  304. if err != nil {
  305. logWatcher.Err <- err
  306. return
  307. }
  308. defer fileWatcher.Close()
  309. if err := fileWatcher.Add(f.Name()); err != nil {
  310. logWatcher.Err <- err
  311. return
  312. }
  313. var retries int
  314. for {
  315. msg, err := decodeLogLine(dec, l)
  316. if err != nil {
  317. if err != io.EOF {
  318. // try again because this shouldn't happen
  319. if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
  320. dec = json.NewDecoder(f)
  321. retries++
  322. continue
  323. }
  324. logWatcher.Err <- err
  325. return
  326. }
  327. select {
  328. case <-fileWatcher.Events:
  329. dec = json.NewDecoder(f)
  330. continue
  331. case <-fileWatcher.Errors:
  332. logWatcher.Err <- err
  333. return
  334. case <-logWatcher.WatchClose():
  335. return
  336. case <-notifyRotate:
  337. fileWatcher.Remove(f.Name())
  338. f, err = os.Open(f.Name())
  339. if err != nil {
  340. logWatcher.Err <- err
  341. return
  342. }
  343. if err := fileWatcher.Add(f.Name()); err != nil {
  344. logWatcher.Err <- err
  345. }
  346. dec = json.NewDecoder(f)
  347. continue
  348. }
  349. }
  350. retries = 0 // reset retries since we've succeeded
  351. if !since.IsZero() && msg.Timestamp.Before(since) {
  352. continue
  353. }
  354. select {
  355. case logWatcher.Msg <- msg:
  356. case <-logWatcher.WatchClose():
  357. logWatcher.Msg <- msg
  358. for {
  359. msg, err := decodeLogLine(dec, l)
  360. if err != nil {
  361. return
  362. }
  363. if !since.IsZero() && msg.Timestamp.Before(since) {
  364. continue
  365. }
  366. logWatcher.Msg <- msg
  367. }
  368. }
  369. }
  370. }