logdehash.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. # Copyright 2024 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # encoding=utf8
  15. # PBL-31508: This is pretty bad and we want to re-do a lot of this.
  16. import os
  17. import sys
  18. import threading
  19. import time
  20. from datetime import datetime
  21. import unicodedata as ud
  22. from pebble.loghashing import newlogging
  23. sys.path.insert(0, os.path.dirname(__file__))
  24. from newlogging import get_log_dict_from_file
  25. LOG_DICT_KEY_CORE_ID = 'core_'
  26. COLOR_DICT = {
  27. "BLACK": "\x1b[30m", "0": "\x1b[30m", # Not the most useful
  28. "RED": "\x1b[31m", "1": "\x1b[31m",
  29. "GREEN": "\x1b[32m", "2": "\x1b[32m",
  30. "YELLOW": "\x1b[33m", "3": "\x1b[33m",
  31. "BLUE": "\x1b[34m", "4": "\x1b[34m",
  32. "MAGENTA": "\x1b[35m", "5": "\x1b[35m",
  33. "CYAN": "\x1b[36m", "6": "\x1b[36m",
  34. "GREY": "\x1b[37m", "7": "\x1b[37m",
  35. "LIGHT_GREY": "\x1b[1m;30m", "8": "\x1b[1m;30m",
  36. "LIGHT_RED": "\x1b[1m;31m", "9": "\x1b[1m;31m",
  37. "LIGHT_GREEN": "\x1b[1m;32m", "10": "\x1b[1m;32m",
  38. "LIGHT_YELLOW": "\x1b[1m;33m", "11": "\x1b[1m;33m",
  39. "LIGHT_BLUE": "\x1b[1m;34m", "12": "\x1b[1m;34m",
  40. "LIGHT_MAGENTA": "\x1b[1m;35m", "13": "\x1b[1m;35m",
  41. "LIGHT_CYAN": "\x1b[1m;36m", "14": "\x1b[1m;36m",
  42. "WHITE": "\x1b[1m;37m", "15": "\x1b[1m;37m"}
  43. COLOR_BOLD_RESET = "\x1b[0m"
  44. BOLD = "\x1b[1m"
  45. # Control code to clear the current line
  46. CLEAR_LINE = "\x1b[2K"
  47. class LogDehash(object):
  48. """ Dehashing helper with a file update watch thread
  49. """
  50. def __init__(self, dict_path, justify="small", color=False, bold=-1, print_core=False,
  51. monitor_dict_file=True):
  52. self.path = dict_path
  53. self.dict_mtime = None
  54. self.arg_justify = justify
  55. self.arg_color = color
  56. self.arg_bold = bold
  57. self.justify_size = 0
  58. self.load_log_strings()
  59. self.print_core = print_core
  60. if self.print_core:
  61. self.print_core_header()
  62. self.running = False
  63. if monitor_dict_file:
  64. self.running = True
  65. self.thread = threading.Thread(target=self.run)
  66. self.thread.setDaemon(True)
  67. self.thread.start()
  68. def run(self):
  69. while self.running:
  70. if os.path.lexists(self.path) and (not self.dict_mtime or
  71. os.path.getmtime(self.path) > self.dict_mtime):
  72. # We don't need to worry about thread safety here because the dict is getting
  73. # replaced entirely. If anyone has a reference to the old one, it will stay
  74. # alive until they're done.
  75. self.load_log_strings()
  76. time.sleep(5) # It takes at least this long to flash an update to the board
  77. def load_log_strings(self):
  78. if os.path.lexists(self.path):
  79. self.dict_mtime = os.path.getmtime(self.path)
  80. self.loghash_dict = get_log_dict_from_file(self.path)
  81. else:
  82. self.dict_mtime = None
  83. self.loghash_dict = None
  84. self.update_log_string_metrics()
  85. def load_log_strings_from_dict(self, dict):
  86. self.running = False
  87. self.dict_mtime = None
  88. self.loghash_dict = dict
  89. self.update_log_string_metrics()
  90. def print_core_header(self):
  91. if not self.loghash_dict:
  92. return
  93. print('Supported Cores:')
  94. for key in sorted(self.loghash_dict, key=self.loghash_dict.get):
  95. if key.startswith(LOG_DICT_KEY_CORE_ID):
  96. print((' {}: {}'.format(key, self.loghash_dict[key])))
  97. def update_log_string_metrics(self):
  98. if not self.loghash_dict:
  99. self.arg_justify = 0
  100. return
  101. # Handle justification
  102. max_basename = 0
  103. max_linenum = 0
  104. for line_dict in self.loghash_dict.values():
  105. if 'file' in line_dict and 'line' in line_dict:
  106. max_basename = max(max_basename, len(os.path.basename(line_dict['file'])))
  107. max_linenum = max(max_basename, len(os.path.basename(line_dict['line'])))
  108. justify_width = max_basename + 1 + max_linenum # Include the ':'
  109. if self.arg_justify == 'small':
  110. self.justify_size = 0
  111. elif self.arg_justify == 'right':
  112. self.justify_size = justify_width * -1
  113. elif self.arg_justify == 'left':
  114. self.justify_size = justify_width
  115. else:
  116. self.justify_size = int(self.arg_justify)
  117. def dehash(self, msg):
  118. """ Dehashes a logging message.
  119. """
  120. string = str(msg)
  121. if "NL:" in string: # Newlogging
  122. safe_line = ud.normalize('NFKD', string)
  123. line_dict = newlogging.dehash_line_unformatted(safe_line, self.loghash_dict)
  124. return line_dict
  125. else:
  126. return {"formatted_msg": string, "unhashed": True}
  127. def basic_format_line(self, line_dict):
  128. output = []
  129. if 'support' not in line_dict and 're_level' in line_dict:
  130. output.append(line_dict['re_level'])
  131. if self.print_core and 'core_number' in line_dict:
  132. output.append(line_dict['core_number'])
  133. if 'task' in line_dict:
  134. output.append(line_dict['task'])
  135. if 'date' in line_dict:
  136. output.append(line_dict['date'])
  137. if 'time' in line_dict:
  138. output.append(line_dict['time'])
  139. elif 'support' not in line_dict:
  140. # Use the current time if one isn't provided by the system
  141. now = datetime.now()
  142. output.append('%02d:%02d:%02d.%03d' % (now.hour, now.minute, now.second,
  143. now.microsecond/1000))
  144. pre_padding = ''
  145. post_padding = ''
  146. if 'file' in line_dict and 'line' in line_dict:
  147. filename = os.path.basename(line_dict['file'])
  148. file_line = '{}:{}>'.format(filename, line_dict['line'])
  149. if self.justify_size < 0:
  150. output.append(file_line.rjust(abs(self.justify_size)))
  151. else:
  152. output.append(file_line.ljust(abs(self.justify_size)))
  153. output.append(line_dict['formatted_msg'])
  154. try:
  155. return ' '.join(output)
  156. except UnicodeDecodeError:
  157. return ''
  158. def minicom_format_line(self, line_dict):
  159. """This routine reformats a line already printed to the console if it was
  160. hashed. It does this by clearing the line which was already printed
  161. using a control escape command and relies on lines ending in "\r\n".
  162. If the line was not hashed in the first place, it simply returns a
  163. newline character to print
  164. """
  165. if 'unhashed' in line_dict:
  166. return '\n'
  167. output = []
  168. if self.arg_color and 'color' in line_dict:
  169. color = line_dict['color']
  170. if color in COLOR_DICT:
  171. output.append(COLOR_DICT[color])
  172. if 'level' in line_dict:
  173. if int(line_dict['level']) <= self.arg_bold:
  174. output.append(BOLD)
  175. output.append(CLEAR_LINE)
  176. output.append(self.basic_format_line(line_dict))
  177. output.append('\n')
  178. output.append(COLOR_BOLD_RESET)
  179. return ''.join(output)
  180. def commander_format_line(self, line_dict):
  181. output = []
  182. if self.arg_color and 'color' in line_dict:
  183. color = line_dict['color']
  184. if color in COLOR_DICT:
  185. output.append(COLOR_DICT[color])
  186. if 'level' in line_dict:
  187. if int(line_dict['level']) <= self.arg_bold:
  188. output.append(BOLD)
  189. output.append(self.basic_format_line(line_dict))
  190. output.append(COLOR_BOLD_RESET)
  191. return ''.join(output)