jsonfilelog.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. package jsonfilelog
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "os"
  8. "strconv"
  9. "sync"
  10. "time"
  11. "gopkg.in/fsnotify.v1"
  12. "github.com/Sirupsen/logrus"
  13. "github.com/docker/docker/daemon/logger"
  14. "github.com/docker/docker/pkg/ioutils"
  15. "github.com/docker/docker/pkg/jsonlog"
  16. "github.com/docker/docker/pkg/pubsub"
  17. "github.com/docker/docker/pkg/tailfile"
  18. "github.com/docker/docker/pkg/timeutils"
  19. "github.com/docker/docker/pkg/units"
  20. )
const (
	// Name is the name under which this log driver and its option validator
	// are registered with the logger package.
	Name = "json-file"
	// maxJSONDecodeRetry bounds how many consecutive JSON syntax errors
	// followLogs tolerates (by rebuilding its decoder) before giving up.
	maxJSONDecodeRetry = 10
)
// JSONFileLogger is the Logger implementation for default docker logging:
// each message is serialized as a JSON object and appended to a file.
type JSONFileLogger struct {
	buf          *bytes.Buffer                   // scratch buffer holding the serialized entry until it is written to f
	f            *os.File                        // current log file; replaced on rotation, stored for closing
	mu           sync.Mutex                      // held while writing (buf, f) and while mutating readers
	capacity     int64                           // maximum size of each file; -1 disables rotation
	n            int                             // maximum number of files
	ctx          logger.Context                  // context this logger was created from (provides LogPath)
	readers      map[*logger.LogWatcher]struct{} // stores the active log followers
	notifyRotate *pubsub.Publisher               // publishes an event each time the log file is rotated
}
  37. func init() {
  38. if err := logger.RegisterLogDriver(Name, New); err != nil {
  39. logrus.Fatal(err)
  40. }
  41. if err := logger.RegisterLogOptValidator(Name, ValidateLogOpt); err != nil {
  42. logrus.Fatal(err)
  43. }
  44. }
  45. // New creates new JSONFileLogger which writes to filename
  46. func New(ctx logger.Context) (logger.Logger, error) {
  47. log, err := os.OpenFile(ctx.LogPath, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
  48. if err != nil {
  49. return nil, err
  50. }
  51. var capval int64 = -1
  52. if capacity, ok := ctx.Config["max-size"]; ok {
  53. var err error
  54. capval, err = units.FromHumanSize(capacity)
  55. if err != nil {
  56. return nil, err
  57. }
  58. }
  59. var maxFiles int = 1
  60. if maxFileString, ok := ctx.Config["max-file"]; ok {
  61. maxFiles, err = strconv.Atoi(maxFileString)
  62. if err != nil {
  63. return nil, err
  64. }
  65. if maxFiles < 1 {
  66. return nil, fmt.Errorf("max-files cannot be less than 1.")
  67. }
  68. }
  69. return &JSONFileLogger{
  70. f: log,
  71. buf: bytes.NewBuffer(nil),
  72. ctx: ctx,
  73. capacity: capval,
  74. n: maxFiles,
  75. readers: make(map[*logger.LogWatcher]struct{}),
  76. notifyRotate: pubsub.NewPublisher(0, 1),
  77. }, nil
  78. }
  79. // Log converts logger.Message to jsonlog.JSONLog and serializes it to file
  80. func (l *JSONFileLogger) Log(msg *logger.Message) error {
  81. l.mu.Lock()
  82. defer l.mu.Unlock()
  83. timestamp, err := timeutils.FastMarshalJSON(msg.Timestamp)
  84. if err != nil {
  85. return err
  86. }
  87. err = (&jsonlog.JSONLogBytes{Log: append(msg.Line, '\n'), Stream: msg.Source, Created: timestamp}).MarshalJSONBuf(l.buf)
  88. if err != nil {
  89. return err
  90. }
  91. l.buf.WriteByte('\n')
  92. _, err = writeLog(l)
  93. return err
  94. }
  95. func writeLog(l *JSONFileLogger) (int64, error) {
  96. if l.capacity == -1 {
  97. return writeToBuf(l)
  98. }
  99. meta, err := l.f.Stat()
  100. if err != nil {
  101. return -1, err
  102. }
  103. if meta.Size() >= l.capacity {
  104. name := l.f.Name()
  105. if err := l.f.Close(); err != nil {
  106. return -1, err
  107. }
  108. if err := rotate(name, l.n); err != nil {
  109. return -1, err
  110. }
  111. file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0666)
  112. if err != nil {
  113. return -1, err
  114. }
  115. l.f = file
  116. l.notifyRotate.Publish(struct{}{})
  117. }
  118. return writeToBuf(l)
  119. }
  120. func writeToBuf(l *JSONFileLogger) (int64, error) {
  121. i, err := l.buf.WriteTo(l.f)
  122. if err != nil {
  123. l.buf = bytes.NewBuffer(nil)
  124. }
  125. return i, err
  126. }
  127. func rotate(name string, n int) error {
  128. if n < 2 {
  129. return nil
  130. }
  131. for i := n - 1; i > 1; i-- {
  132. oldFile := name + "." + strconv.Itoa(i)
  133. replacingFile := name + "." + strconv.Itoa(i-1)
  134. if err := backup(oldFile, replacingFile); err != nil {
  135. return err
  136. }
  137. }
  138. if err := backup(name+".1", name); err != nil {
  139. return err
  140. }
  141. return nil
  142. }
  143. func backup(old, curr string) error {
  144. if _, err := os.Stat(old); !os.IsNotExist(err) {
  145. err := os.Remove(old)
  146. if err != nil {
  147. return err
  148. }
  149. }
  150. if _, err := os.Stat(curr); os.IsNotExist(err) {
  151. f, err := os.Create(curr)
  152. if err != nil {
  153. return err
  154. }
  155. f.Close()
  156. }
  157. return os.Rename(curr, old)
  158. }
  159. func ValidateLogOpt(cfg map[string]string) error {
  160. for key := range cfg {
  161. switch key {
  162. case "max-file":
  163. case "max-size":
  164. default:
  165. return fmt.Errorf("unknown log opt '%s' for json-file log driver", key)
  166. }
  167. }
  168. return nil
  169. }
// LogPath returns the log file path taken from the context this logger was
// created with.
func (l *JSONFileLogger) LogPath() string {
	return l.ctx.LogPath
}
  173. // Close closes underlying file and signals all readers to stop
  174. func (l *JSONFileLogger) Close() error {
  175. l.mu.Lock()
  176. err := l.f.Close()
  177. for r := range l.readers {
  178. r.Close()
  179. delete(l.readers, r)
  180. }
  181. l.mu.Unlock()
  182. return err
  183. }
// Name returns the name of this log driver, i.e. the package-level Name
// constant.
func (l *JSONFileLogger) Name() string {
	return Name
}
  188. func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
  189. l.Reset()
  190. if err := dec.Decode(l); err != nil {
  191. return nil, err
  192. }
  193. msg := &logger.Message{
  194. Source: l.Stream,
  195. Timestamp: l.Created,
  196. Line: []byte(l.Log),
  197. }
  198. return msg, nil
  199. }
  200. // Reads from the log file
  201. func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
  202. logWatcher := logger.NewLogWatcher()
  203. go l.readLogs(logWatcher, config)
  204. return logWatcher
  205. }
  206. func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
  207. defer close(logWatcher.Msg)
  208. pth := l.ctx.LogPath
  209. var files []io.ReadSeeker
  210. for i := l.n; i > 1; i-- {
  211. f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
  212. if err != nil {
  213. if !os.IsNotExist(err) {
  214. logWatcher.Err <- err
  215. break
  216. }
  217. continue
  218. }
  219. defer f.Close()
  220. files = append(files, f)
  221. }
  222. latestFile, err := os.Open(pth)
  223. if err != nil {
  224. logWatcher.Err <- err
  225. return
  226. }
  227. defer latestFile.Close()
  228. files = append(files, latestFile)
  229. tailer := ioutils.MultiReadSeeker(files...)
  230. if config.Tail != 0 {
  231. tailFile(tailer, logWatcher, config.Tail, config.Since)
  232. }
  233. if !config.Follow {
  234. return
  235. }
  236. if config.Tail == 0 {
  237. latestFile.Seek(0, os.SEEK_END)
  238. }
  239. l.mu.Lock()
  240. l.readers[logWatcher] = struct{}{}
  241. l.mu.Unlock()
  242. notifyRotate := l.notifyRotate.Subscribe()
  243. followLogs(latestFile, logWatcher, notifyRotate, config.Since)
  244. l.mu.Lock()
  245. delete(l.readers, logWatcher)
  246. l.mu.Unlock()
  247. l.notifyRotate.Evict(notifyRotate)
  248. }
  249. func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
  250. var rdr io.Reader = f
  251. if tail > 0 {
  252. ls, err := tailfile.TailFile(f, tail)
  253. if err != nil {
  254. logWatcher.Err <- err
  255. return
  256. }
  257. rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
  258. }
  259. dec := json.NewDecoder(rdr)
  260. l := &jsonlog.JSONLog{}
  261. for {
  262. msg, err := decodeLogLine(dec, l)
  263. if err != nil {
  264. if err != io.EOF {
  265. logWatcher.Err <- err
  266. }
  267. return
  268. }
  269. if !since.IsZero() && msg.Timestamp.Before(since) {
  270. continue
  271. }
  272. logWatcher.Msg <- msg
  273. }
  274. }
  275. func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
  276. dec := json.NewDecoder(f)
  277. l := &jsonlog.JSONLog{}
  278. fileWatcher, err := fsnotify.NewWatcher()
  279. if err != nil {
  280. logWatcher.Err <- err
  281. return
  282. }
  283. defer fileWatcher.Close()
  284. if err := fileWatcher.Add(f.Name()); err != nil {
  285. logWatcher.Err <- err
  286. return
  287. }
  288. var retries int
  289. for {
  290. msg, err := decodeLogLine(dec, l)
  291. if err != nil {
  292. if err != io.EOF {
  293. // try again because this shouldn't happen
  294. if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
  295. dec = json.NewDecoder(f)
  296. retries += 1
  297. continue
  298. }
  299. logWatcher.Err <- err
  300. return
  301. }
  302. select {
  303. case <-fileWatcher.Events:
  304. dec = json.NewDecoder(f)
  305. continue
  306. case <-fileWatcher.Errors:
  307. logWatcher.Err <- err
  308. return
  309. case <-logWatcher.WatchClose():
  310. return
  311. case <-notifyRotate:
  312. fileWatcher.Remove(f.Name())
  313. f, err = os.Open(f.Name())
  314. if err != nil {
  315. logWatcher.Err <- err
  316. return
  317. }
  318. if err := fileWatcher.Add(f.Name()); err != nil {
  319. logWatcher.Err <- err
  320. }
  321. dec = json.NewDecoder(f)
  322. continue
  323. }
  324. }
  325. retries = 0 // reset retries since we've succeeded
  326. if !since.IsZero() && msg.Timestamp.Before(since) {
  327. continue
  328. }
  329. select {
  330. case logWatcher.Msg <- msg:
  331. case <-logWatcher.WatchClose():
  332. logWatcher.Msg <- msg
  333. for {
  334. msg, err := decodeLogLine(dec, l)
  335. if err != nil {
  336. return
  337. }
  338. if !since.IsZero() && msg.Timestamp.Before(since) {
  339. continue
  340. }
  341. logWatcher.Msg <- msg
  342. }
  343. }
  344. }
  345. }