Merge pull request #23136 from nalind/logscmd_read_journald_attrs
Add support for reading journal extras details, and setting timestamps to UTC
This commit is contained in:
commit
94b2d3554c
1 changed files with 65 additions and 2 deletions
|
@ -50,6 +50,53 @@ package journald
|
|||
// }
|
||||
// return rc;
|
||||
//}
|
||||
//static int is_attribute_field(const char *msg, size_t length)
|
||||
//{
|
||||
// const struct known_field {
|
||||
// const char *name;
|
||||
// size_t length;
|
||||
// } fields[] = {
|
||||
// {"MESSAGE", sizeof("MESSAGE") - 1},
|
||||
// {"MESSAGE_ID", sizeof("MESSAGE_ID") - 1},
|
||||
// {"PRIORITY", sizeof("PRIORITY") - 1},
|
||||
// {"CODE_FILE", sizeof("CODE_FILE") - 1},
|
||||
// {"CODE_LINE", sizeof("CODE_LINE") - 1},
|
||||
// {"CODE_FUNC", sizeof("CODE_FUNC") - 1},
|
||||
// {"ERRNO", sizeof("ERRNO") - 1},
|
||||
// {"SYSLOG_FACILITY", sizeof("SYSLOG_FACILITY") - 1},
|
||||
// {"SYSLOG_IDENTIFIER", sizeof("SYSLOG_IDENTIFIER") - 1},
|
||||
// {"SYSLOG_PID", sizeof("SYSLOG_PID") - 1},
|
||||
// {"CONTAINER_NAME", sizeof("CONTAINER_NAME") - 1},
|
||||
// {"CONTAINER_ID", sizeof("CONTAINER_ID") - 1},
|
||||
// {"CONTAINER_ID_FULL", sizeof("CONTAINER_ID_FULL") - 1},
|
||||
// {"CONTAINER_TAG", sizeof("CONTAINER_TAG") - 1},
|
||||
// };
|
||||
// unsigned int i;
|
||||
// void *p;
|
||||
// if ((length < 1) || (msg[0] == '_') || ((p = memchr(msg, '=', length)) == NULL)) {
|
||||
// return -1;
|
||||
// }
|
||||
// length = ((const char *) p) - msg;
|
||||
// for (i = 0; i < sizeof(fields) / sizeof(fields[0]); i++) {
|
||||
// if ((fields[i].length == length) && (memcmp(fields[i].name, msg, length) == 0)) {
|
||||
// return -1;
|
||||
// }
|
||||
// }
|
||||
// return 0;
|
||||
//}
|
||||
//static int get_attribute_field(sd_journal *j, const char **msg, size_t *length)
|
||||
//{
|
||||
// int rc;
|
||||
// *msg = NULL;
|
||||
// *length = 0;
|
||||
// while ((rc = sd_journal_enumerate_data(j, (const void **) msg, length)) > 0) {
|
||||
// if (is_attribute_field(*msg, *length) == 0) {
|
||||
// break;
|
||||
// }
|
||||
// rc = -ENOENT;
|
||||
// }
|
||||
// return rc;
|
||||
//}
|
||||
//static int wait_for_data_or_close(sd_journal *j, int pipefd)
|
||||
//{
|
||||
// struct pollfd fds[2];
|
||||
|
@ -98,6 +145,7 @@ import "C"
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
|
@ -116,7 +164,7 @@ func (s *journald) Close() error {
|
|||
}
|
||||
|
||||
func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, oldCursor string) string {
|
||||
var msg, cursor *C.char
|
||||
var msg, data, cursor *C.char
|
||||
var length C.size_t
|
||||
var stamp C.uint64_t
|
||||
var priority C.int
|
||||
|
@ -156,8 +204,23 @@ drain:
|
|||
} else if priority == C.int(journal.PriInfo) {
|
||||
source = "stdout"
|
||||
}
|
||||
// Retrieve the values of any variables we're adding to the journal.
|
||||
attrs := make(map[string]string)
|
||||
C.sd_journal_restart_data(j)
|
||||
for C.get_attribute_field(j, &data, &length) > C.int(0) {
|
||||
kv := strings.SplitN(C.GoStringN(data, C.int(length)), "=", 2)
|
||||
attrs[kv[0]] = kv[1]
|
||||
}
|
||||
if len(attrs) == 0 {
|
||||
attrs = nil
|
||||
}
|
||||
// Send the log message.
|
||||
logWatcher.Msg <- &logger.Message{Line: line, Source: source, Timestamp: timestamp}
|
||||
logWatcher.Msg <- &logger.Message{
|
||||
Line: line,
|
||||
Source: source,
|
||||
Timestamp: timestamp.In(time.UTC),
|
||||
Attrs: attrs,
|
||||
}
|
||||
}
|
||||
// If we're at the end of the journal, we're done (for now).
|
||||
if C.sd_journal_next(j) <= 0 {
|
||||
|
|
Loading…
Add table
Reference in a new issue