read.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. // +build linux,cgo,!static_build,journald
  2. package journald
  3. // #include <sys/types.h>
  4. // #include <sys/poll.h>
  5. // #include <systemd/sd-journal.h>
  6. // #include <errno.h>
  7. // #include <stdio.h>
  8. // #include <stdlib.h>
  9. // #include <string.h>
  10. // #include <time.h>
  11. // #include <unistd.h>
  12. //
// /*
//  * get_message looks up the MESSAGE field of the journal entry that j is
//  * currently positioned on.
//  *
//  * On success it returns 0, with *msg pointing just past the 8-byte
//  * "MESSAGE=" prefix and *length holding the number of bytes that follow it.
//  * *partial is set to 1 when the entry also carries a field that is exactly
//  * "CONTAINER_PARTIAL_MESSAGE=true", and to 0 otherwise.
//  *
//  * It returns -ENOENT when the MESSAGE field holds nothing after the prefix
//  * (length <= 8); any other failure is the value returned by
//  * sd_journal_get_data() for the MESSAGE lookup.
//  *
//  * NOTE(review): msg and length are also used as scratch space for the
//  * CONTAINER_PARTIAL_MESSAGE lookup, so if the MESSAGE lookup then fails they
//  * may still describe that other field -- callers should only use them when
//  * the return value is 0.
//  */
//static int get_message(sd_journal *j, const char **msg, size_t *length, int *partial)
//{
//	int rc;
//	size_t plength;
//	*msg = NULL;
//	*length = 0;
//	plength = strlen("CONTAINER_PARTIAL_MESSAGE=true");
//	/* First lookup: is this entry flagged as a partial message? */
//	rc = sd_journal_get_data(j, "CONTAINER_PARTIAL_MESSAGE", (const void **) msg, length);
//	*partial = ((rc == 0) && (*length == plength) && (memcmp(*msg, "CONTAINER_PARTIAL_MESSAGE=true", plength) == 0));
//	/* Second lookup reuses msg/length for the message text itself. */
//	rc = sd_journal_get_data(j, "MESSAGE", (const void **) msg, length);
//	if (rc == 0) {
//		if (*length > 8) {
//			/* Skip the "MESSAGE=" prefix (8 bytes). */
//			(*msg) += 8;
//			*length -= 8;
//		} else {
//			/* Nothing after the prefix: report it as if the field were absent. */
//			*msg = NULL;
//			*length = 0;
//			rc = -ENOENT;
//		}
//	}
//	return rc;
//}
// /*
//  * get_priority reads the PRIORITY field of the journal entry that j is
//  * currently positioned on and stores its decimal value in *priority.
//  *
//  * *priority is left at -1 when the lookup fails or when the data does not
//  * start with "PRIORITY=" followed by at least one byte.  The return value is
//  * the result of sd_journal_get_data(), so it can be 0 even though *priority
//  * is still -1.
//  *
//  * NOTE(review): the bytes after the prefix are not checked to be digits, and
//  * the inner "length > 9" test repeats the enclosing condition while rc is
//  * already 0 there, so it has no effect.
//  */
//static int get_priority(sd_journal *j, int *priority)
//{
//	const void *data;
//	size_t i, length;
//	int rc;
//	*priority = -1;
//	rc = sd_journal_get_data(j, "PRIORITY", &data, &length);
//	if (rc == 0) {
//		if ((length > 9) && (strncmp(data, "PRIORITY=", 9) == 0)) {
//			*priority = 0;
//			/* Accumulate the decimal value that follows the 9-byte prefix. */
//			for (i = 9; i < length; i++) {
//				*priority = *priority * 10 + ((const char *)data)[i] - '0';
//			}
//			if (length > 9) {
//				rc = 0;
//			}
//		}
//	}
//	return rc;
//}
// /*
//  * is_attribute_field decides whether the "NAME=value" data in msg (length
//  * bytes, not necessarily NUL-terminated) should be treated as an attribute
//  * field.
//  *
//  * It returns 0 when it should, and -1 when the data is empty, starts with
//  * '_', contains no '=', or has a NAME that exactly matches one of the
//  * entries in the fields[] table below.
//  */
//static int is_attribute_field(const char *msg, size_t length)
//{
//	static const struct known_field {
//		const char *name;
//		size_t length;
//	} fields[] = {
//		{"MESSAGE", sizeof("MESSAGE") - 1},
//		{"MESSAGE_ID", sizeof("MESSAGE_ID") - 1},
//		{"PRIORITY", sizeof("PRIORITY") - 1},
//		{"CODE_FILE", sizeof("CODE_FILE") - 1},
//		{"CODE_LINE", sizeof("CODE_LINE") - 1},
//		{"CODE_FUNC", sizeof("CODE_FUNC") - 1},
//		{"ERRNO", sizeof("ERRNO") - 1},
//		{"SYSLOG_FACILITY", sizeof("SYSLOG_FACILITY") - 1},
//		{"SYSLOG_IDENTIFIER", sizeof("SYSLOG_IDENTIFIER") - 1},
//		{"SYSLOG_PID", sizeof("SYSLOG_PID") - 1},
//		{"CONTAINER_NAME", sizeof("CONTAINER_NAME") - 1},
//		{"CONTAINER_ID", sizeof("CONTAINER_ID") - 1},
//		{"CONTAINER_ID_FULL", sizeof("CONTAINER_ID_FULL") - 1},
//		{"CONTAINER_TAG", sizeof("CONTAINER_TAG") - 1},
//	};
//	unsigned int i;
//	void *p;
//	if ((length < 1) || (msg[0] == '_') || ((p = memchr(msg, '=', length)) == NULL)) {
//		return -1;
//	}
//	/* From here on, length is the length of NAME only (up to the first '='). */
//	length = ((const char *) p) - msg;
//	for (i = 0; i < sizeof(fields) / sizeof(fields[0]); i++) {
//		if ((fields[i].length == length) && (memcmp(fields[i].name, msg, length) == 0)) {
//			return -1;
//		}
//	}
//	return 0;
//}
// /*
//  * get_attribute_field advances the data enumeration of the current journal
//  * entry until it reaches a field that is_attribute_field() accepts.
//  *
//  * When one is found it returns the positive value from
//  * sd_journal_enumerate_data(), with *msg and *length describing that field.
//  * Otherwise it returns the non-positive value from
//  * sd_journal_enumerate_data() that ended the loop.
//  */
//static int get_attribute_field(sd_journal *j, const char **msg, size_t *length)
//{
//	int rc;
//	*msg = NULL;
//	*length = 0;
//	while ((rc = sd_journal_enumerate_data(j, (const void **) msg, length)) > 0) {
//		if (is_attribute_field(*msg, *length) == 0) {
//			break;
//		}
//		/* NOTE(review): always overwritten by the loop condition on the next pass. */
//		rc = -ENOENT;
//	}
//	return rc;
//}
// /*
//  * wait_for_data_cancelable blocks in poll() on two descriptors: pipefd,
//  * watched for POLLHUP, and the journal's own descriptor, watched for whatever
//  * events sd_journal_get_events() asks for.
//  *
//  * Return values:
//  *    1  sd_journal_process() reported SD_JOURNAL_APPEND
//  *    0  POLLHUP was seen on pipefd
//  *   <0  sd_journal_get_fd(), sd_journal_get_events() or poll() failed
//  *       (poll() failures with EINTR are not treated as errors)
//  */
//static int wait_for_data_cancelable(sd_journal *j, int pipefd)
//{
//	struct pollfd fds[2];
//	uint64_t when = 0;
//	int timeout, jevents, i;
//	struct timespec ts;
//	uint64_t now;
//
//	memset(&fds, 0, sizeof(fds));
//	fds[0].fd = pipefd;
//	fds[0].events = POLLHUP;
//	fds[1].fd = sd_journal_get_fd(j);
//	if (fds[1].fd < 0) {
//		return fds[1].fd;
//	}
//
//	do {
//		/* The requested events are re-queried on every pass. */
//		jevents = sd_journal_get_events(j);
//		if (jevents < 0) {
//			return jevents;
//		}
//		fds[1].events = jevents;
//		/* NOTE(review): the return value is ignored; on failure "when" keeps its previous value. */
//		sd_journal_get_timeout(j, &when);
//		if (when == -1) {
//			/* No deadline: let poll() wait without a timeout. */
//			timeout = -1;
//		} else {
//			/* "now" is CLOCK_MONOTONIC in microseconds; the remaining time is rounded up to milliseconds. */
//			clock_gettime(CLOCK_MONOTONIC, &ts);
//			now = (uint64_t) ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
//			timeout = when > now ? (int) ((when - now + 999) / 1000) : 0;
//		}
//		i = poll(fds, 2, timeout);
//		if ((i == -1) && (errno != EINTR)) {
//			/* An unexpected error. */
//			return (errno != 0) ? -errno : -EINTR;
//		}
//		if (fds[0].revents & POLLHUP) {
//			/* The close notification pipe was closed. */
//			return 0;
//		}
//		if (sd_journal_process(j) == SD_JOURNAL_APPEND) {
//			/* Data, which we might care about, was appended. */
//			return 1;
//		}
//	} while ((fds[0].revents & POLLHUP) == 0);
//	return 0;
//}
  148. import "C"
  149. import (
  150. "fmt"
  151. "strings"
  152. "time"
  153. "unsafe"
  154. "github.com/Sirupsen/logrus"
  155. "github.com/coreos/go-systemd/journal"
  156. "github.com/docker/docker/daemon/logger"
  157. )
  158. func (s *journald) Close() error {
  159. s.mu.Lock()
  160. s.closed = true
  161. for reader := range s.readers.readers {
  162. reader.Close()
  163. }
  164. s.mu.Unlock()
  165. return nil
  166. }
  167. func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, oldCursor *C.char) *C.char {
  168. var msg, data, cursor *C.char
  169. var length C.size_t
  170. var stamp C.uint64_t
  171. var priority, partial C.int
  172. // Walk the journal from here forward until we run out of new entries.
  173. drain:
  174. for {
  175. // Try not to send a given entry twice.
  176. if oldCursor != nil {
  177. for C.sd_journal_test_cursor(j, oldCursor) > 0 {
  178. if C.sd_journal_next(j) <= 0 {
  179. break drain
  180. }
  181. }
  182. }
  183. // Read and send the logged message, if there is one to read.
  184. i := C.get_message(j, &msg, &length, &partial)
  185. if i != -C.ENOENT && i != -C.EADDRNOTAVAIL {
  186. // Read the entry's timestamp.
  187. if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
  188. break
  189. }
  190. // Set up the time and text of the entry.
  191. timestamp := time.Unix(int64(stamp)/1000000, (int64(stamp)%1000000)*1000)
  192. line := C.GoBytes(unsafe.Pointer(msg), C.int(length))
  193. if partial == 0 {
  194. line = append(line, "\n"...)
  195. }
  196. // Recover the stream name by mapping
  197. // from the journal priority back to
  198. // the stream that we would have
  199. // assigned that value.
  200. source := ""
  201. if C.get_priority(j, &priority) != 0 {
  202. source = ""
  203. } else if priority == C.int(journal.PriErr) {
  204. source = "stderr"
  205. } else if priority == C.int(journal.PriInfo) {
  206. source = "stdout"
  207. }
  208. // Retrieve the values of any variables we're adding to the journal.
  209. attrs := make(map[string]string)
  210. C.sd_journal_restart_data(j)
  211. for C.get_attribute_field(j, &data, &length) > C.int(0) {
  212. kv := strings.SplitN(C.GoStringN(data, C.int(length)), "=", 2)
  213. attrs[kv[0]] = kv[1]
  214. }
  215. if len(attrs) == 0 {
  216. attrs = nil
  217. }
  218. // Send the log message.
  219. logWatcher.Msg <- &logger.Message{
  220. Line: line,
  221. Source: source,
  222. Timestamp: timestamp.In(time.UTC),
  223. Attrs: attrs,
  224. }
  225. }
  226. // If we're at the end of the journal, we're done (for now).
  227. if C.sd_journal_next(j) <= 0 {
  228. break
  229. }
  230. }
  231. // free(NULL) is safe
  232. C.free(unsafe.Pointer(oldCursor))
  233. if C.sd_journal_get_cursor(j, &cursor) != 0 {
  234. // ensure that we won't be freeing an address that's invalid
  235. cursor = nil
  236. }
  237. return cursor
  238. }
  239. func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor *C.char) *C.char {
  240. s.mu.Lock()
  241. s.readers.readers[logWatcher] = logWatcher
  242. if s.closed {
  243. // the journald Logger is closed, presumably because the container has been
  244. // reset. So we shouldn't follow, because we'll never be woken up. But we
  245. // should make one more drainJournal call to be sure we've got all the logs.
  246. // Close pfd[1] so that one drainJournal happens, then cleanup, then return.
  247. C.close(pfd[1])
  248. }
  249. s.mu.Unlock()
  250. newCursor := make(chan *C.char)
  251. go func() {
  252. for {
  253. // Keep copying journal data out until we're notified to stop
  254. // or we hit an error.
  255. status := C.wait_for_data_cancelable(j, pfd[0])
  256. if status < 0 {
  257. cerrstr := C.strerror(C.int(-status))
  258. errstr := C.GoString(cerrstr)
  259. fmtstr := "error %q while attempting to follow journal for container %q"
  260. logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"])
  261. break
  262. }
  263. cursor = s.drainJournal(logWatcher, config, j, cursor)
  264. if status != 1 {
  265. // We were notified to stop
  266. break
  267. }
  268. }
  269. // Clean up.
  270. C.close(pfd[0])
  271. s.mu.Lock()
  272. delete(s.readers.readers, logWatcher)
  273. s.mu.Unlock()
  274. close(logWatcher.Msg)
  275. newCursor <- cursor
  276. }()
  277. // Wait until we're told to stop.
  278. select {
  279. case cursor = <-newCursor:
  280. case <-logWatcher.WatchClose():
  281. // Notify the other goroutine that its work is done.
  282. C.close(pfd[1])
  283. cursor = <-newCursor
  284. }
  285. return cursor
  286. }
  287. func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
  288. var j *C.sd_journal
  289. var cmatch, cursor *C.char
  290. var stamp C.uint64_t
  291. var sinceUnixMicro uint64
  292. var pipes [2]C.int
  293. // Get a handle to the journal.
  294. rc := C.sd_journal_open(&j, C.int(0))
  295. if rc != 0 {
  296. logWatcher.Err <- fmt.Errorf("error opening journal")
  297. close(logWatcher.Msg)
  298. return
  299. }
  300. // If we end up following the log, we can set the journal context
  301. // pointer and the channel pointer to nil so that we won't close them
  302. // here, potentially while the goroutine that uses them is still
  303. // running. Otherwise, close them when we return from this function.
  304. following := false
  305. defer func(pfollowing *bool) {
  306. if !*pfollowing {
  307. close(logWatcher.Msg)
  308. }
  309. C.sd_journal_close(j)
  310. }(&following)
  311. // Remove limits on the size of data items that we'll retrieve.
  312. rc = C.sd_journal_set_data_threshold(j, C.size_t(0))
  313. if rc != 0 {
  314. logWatcher.Err <- fmt.Errorf("error setting journal data threshold")
  315. return
  316. }
  317. // Add a match to have the library do the searching for us.
  318. cmatch = C.CString("CONTAINER_ID_FULL=" + s.vars["CONTAINER_ID_FULL"])
  319. defer C.free(unsafe.Pointer(cmatch))
  320. rc = C.sd_journal_add_match(j, unsafe.Pointer(cmatch), C.strlen(cmatch))
  321. if rc != 0 {
  322. logWatcher.Err <- fmt.Errorf("error setting journal match")
  323. return
  324. }
  325. // If we have a cutoff time, convert it to Unix time once.
  326. if !config.Since.IsZero() {
  327. nano := config.Since.UnixNano()
  328. sinceUnixMicro = uint64(nano / 1000)
  329. }
  330. if config.Tail > 0 {
  331. lines := config.Tail
  332. // Start at the end of the journal.
  333. if C.sd_journal_seek_tail(j) < 0 {
  334. logWatcher.Err <- fmt.Errorf("error seeking to end of journal")
  335. return
  336. }
  337. if C.sd_journal_previous(j) < 0 {
  338. logWatcher.Err <- fmt.Errorf("error backtracking to previous journal entry")
  339. return
  340. }
  341. // Walk backward.
  342. for lines > 0 {
  343. // Stop if the entry time is before our cutoff.
  344. // We'll need the entry time if it isn't, so go
  345. // ahead and parse it now.
  346. if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
  347. break
  348. } else {
  349. // Compare the timestamp on the entry
  350. // to our threshold value.
  351. if sinceUnixMicro != 0 && sinceUnixMicro > uint64(stamp) {
  352. break
  353. }
  354. }
  355. lines--
  356. // If we're at the start of the journal, or
  357. // don't need to back up past any more entries,
  358. // stop.
  359. if lines == 0 || C.sd_journal_previous(j) <= 0 {
  360. break
  361. }
  362. }
  363. } else {
  364. // Start at the beginning of the journal.
  365. if C.sd_journal_seek_head(j) < 0 {
  366. logWatcher.Err <- fmt.Errorf("error seeking to start of journal")
  367. return
  368. }
  369. // If we have a cutoff date, fast-forward to it.
  370. if sinceUnixMicro != 0 && C.sd_journal_seek_realtime_usec(j, C.uint64_t(sinceUnixMicro)) != 0 {
  371. logWatcher.Err <- fmt.Errorf("error seeking to start time in journal")
  372. return
  373. }
  374. if C.sd_journal_next(j) < 0 {
  375. logWatcher.Err <- fmt.Errorf("error skipping to next journal entry")
  376. return
  377. }
  378. }
  379. cursor = s.drainJournal(logWatcher, config, j, nil)
  380. if config.Follow {
  381. // Allocate a descriptor for following the journal, if we'll
  382. // need one. Do it here so that we can report if it fails.
  383. if fd := C.sd_journal_get_fd(j); fd < C.int(0) {
  384. logWatcher.Err <- fmt.Errorf("error opening journald follow descriptor: %q", C.GoString(C.strerror(-fd)))
  385. } else {
  386. // Create a pipe that we can poll at the same time as
  387. // the journald descriptor.
  388. if C.pipe(&pipes[0]) == C.int(-1) {
  389. logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe")
  390. } else {
  391. cursor = s.followJournal(logWatcher, config, j, pipes, cursor)
  392. // Let followJournal handle freeing the journal context
  393. // object and closing the channel.
  394. following = true
  395. }
  396. }
  397. }
  398. C.free(unsafe.Pointer(cursor))
  399. return
  400. }
  401. func (s *journald) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
  402. logWatcher := logger.NewLogWatcher()
  403. go s.readLogs(logWatcher, config)
  404. return logWatcher
  405. }