logdehash.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. # Copyright 2024 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # encoding=utf8
  15. # PBL-31508: This is pretty bad and we want to re-do a lot of this.
  16. import os
  17. import sys
  18. import threading
  19. import time
  20. from datetime import datetime
  21. import unicodedata as ud
  22. from pebble.loghashing import newlogging
  23. from .newlogging import get_log_dict_from_file
  24. LOG_DICT_KEY_CORE_ID = 'core_'
  25. COLOR_DICT = {
  26. "BLACK": "\x1b[30m", "0": "\x1b[30m", # Not the most useful
  27. "RED": "\x1b[31m", "1": "\x1b[31m",
  28. "GREEN": "\x1b[32m", "2": "\x1b[32m",
  29. "YELLOW": "\x1b[33m", "3": "\x1b[33m",
  30. "BLUE": "\x1b[34m", "4": "\x1b[34m",
  31. "MAGENTA": "\x1b[35m", "5": "\x1b[35m",
  32. "CYAN": "\x1b[36m", "6": "\x1b[36m",
  33. "GREY": "\x1b[37m", "7": "\x1b[37m",
  34. "LIGHT_GREY": "\x1b[1m;30m", "8": "\x1b[1m;30m",
  35. "LIGHT_RED": "\x1b[1m;31m", "9": "\x1b[1m;31m",
  36. "LIGHT_GREEN": "\x1b[1m;32m", "10": "\x1b[1m;32m",
  37. "LIGHT_YELLOW": "\x1b[1m;33m", "11": "\x1b[1m;33m",
  38. "LIGHT_BLUE": "\x1b[1m;34m", "12": "\x1b[1m;34m",
  39. "LIGHT_MAGENTA": "\x1b[1m;35m", "13": "\x1b[1m;35m",
  40. "LIGHT_CYAN": "\x1b[1m;36m", "14": "\x1b[1m;36m",
  41. "WHITE": "\x1b[1m;37m", "15": "\x1b[1m;37m"}
  42. COLOR_BOLD_RESET = "\x1b[0m"
  43. BOLD = "\x1b[1m"
  44. # Control code to clear the current line
  45. CLEAR_LINE = "\x1b[2K"
  46. class LogDehash(object):
  47. """ Dehashing helper with a file update watch thread
  48. """
  49. def __init__(self, dict_path, justify="small", color=False, bold=-1, print_core=False,
  50. monitor_dict_file=True):
  51. self.path = dict_path
  52. self.dict_mtime = None
  53. self.arg_justify = justify
  54. self.arg_color = color
  55. self.arg_bold = bold
  56. self.justify_size = 0
  57. self.load_log_strings()
  58. self.print_core = print_core
  59. if self.print_core:
  60. self.print_core_header()
  61. self.running = False
  62. if monitor_dict_file:
  63. self.running = True
  64. self.thread = threading.Thread(target=self.run)
  65. self.thread.setDaemon(True)
  66. self.thread.start()
  67. def run(self):
  68. while self.running:
  69. if os.path.lexists(self.path) and (not self.dict_mtime or
  70. os.path.getmtime(self.path) > self.dict_mtime):
  71. # We don't need to worry about thread safety here because the dict is getting
  72. # replaced entirely. If anyone has a reference to the old one, it will stay
  73. # alive until they're done.
  74. self.load_log_strings()
  75. time.sleep(5) # It takes at least this long to flash an update to the board
  76. def load_log_strings(self):
  77. if os.path.lexists(self.path):
  78. self.dict_mtime = os.path.getmtime(self.path)
  79. self.loghash_dict = get_log_dict_from_file(self.path)
  80. else:
  81. self.dict_mtime = None
  82. self.loghash_dict = None
  83. self.update_log_string_metrics()
  84. def load_log_strings_from_dict(self, dict):
  85. self.running = False
  86. self.dict_mtime = None
  87. self.loghash_dict = dict
  88. self.update_log_string_metrics()
  89. def print_core_header(self):
  90. if not self.loghash_dict:
  91. return
  92. print('Supported Cores:')
  93. for key in sorted(self.loghash_dict, key=self.loghash_dict.get):
  94. if key.startswith(LOG_DICT_KEY_CORE_ID):
  95. print(' {}: {}'.format(key, self.loghash_dict[key]))
  96. def update_log_string_metrics(self):
  97. if not self.loghash_dict:
  98. self.arg_justify = 0
  99. return
  100. # Handle justification
  101. max_basename = 0
  102. max_linenum = 0
  103. for line_dict in self.loghash_dict.itervalues():
  104. if 'file' in line_dict and 'line' in line_dict:
  105. max_basename = max(max_basename, len(os.path.basename(line_dict['file'])))
  106. max_linenum = max(max_basename, len(os.path.basename(line_dict['line'])))
  107. justify_width = max_basename + 1 + max_linenum # Include the ':'
  108. if self.arg_justify == 'small':
  109. self.justify_size = 0
  110. elif self.arg_justify == 'right':
  111. self.justify_size = justify_width * -1
  112. elif self.arg_justify == 'left':
  113. self.justify_size = justify_width
  114. else:
  115. self.justify_size = int(self.arg_justify)
  116. def dehash(self, msg):
  117. """ Dehashes a logging message.
  118. """
  119. string = str(msg)
  120. if "NL:" in string: # Newlogging
  121. decoded_line = string.decode('utf-8', 'replace')
  122. safe_line = ud.normalize('NFKD', decoded_line).encode('utf-8', 'ignore')
  123. line_dict = newlogging.dehash_line_unformatted(safe_line, self.loghash_dict)
  124. return line_dict
  125. else:
  126. return {"formatted_msg": string, "unhashed": True}
  127. def basic_format_line(self, line_dict):
  128. output = []
  129. if 'support' not in line_dict and 're_level' in line_dict:
  130. output.append(line_dict['re_level'])
  131. if self.print_core and 'core_number' in line_dict:
  132. output.append(line_dict['core_number'])
  133. if 'task' in line_dict:
  134. output.append(line_dict['task'])
  135. if 'date' in line_dict:
  136. output.append(line_dict['date'])
  137. if 'time' in line_dict:
  138. output.append(line_dict['time'])
  139. elif 'support' not in line_dict:
  140. # Use the current time if one isn't provided by the system
  141. now = datetime.now()
  142. output.append('%02d:%02d:%02d.%03d' % (now.hour, now.minute, now.second,
  143. now.microsecond/1000))
  144. pre_padding = ''
  145. post_padding = ''
  146. if 'file' in line_dict and 'line' in line_dict:
  147. filename = os.path.basename(line_dict['file'])
  148. file_line = '{}:{}>'.format(filename, line_dict['line'])
  149. if self.justify_size < 0:
  150. output.append(file_line.rjust(abs(self.justify_size)))
  151. else:
  152. output.append(file_line.ljust(abs(self.justify_size)))
  153. output.append(line_dict['formatted_msg'])
  154. try:
  155. return ' '.join(output)
  156. except UnicodeDecodeError:
  157. return ''
  158. def minicom_format_line(self, line_dict):
  159. """This routine reformats a line already printed to the console if it was
  160. hashed. It does this by clearing the line which was already printed
  161. using a control escape command and relies on lines ending in "\r\n".
  162. If the line was not hashed in the first place, it simply returns a
  163. newline character to print
  164. """
  165. if 'unhashed' in line_dict:
  166. return '\n'
  167. output = []
  168. if self.arg_color and 'color' in line_dict:
  169. color = line_dict['color']
  170. if color in COLOR_DICT:
  171. output.append(COLOR_DICT[color])
  172. if 'level' in line_dict:
  173. if int(line_dict['level']) <= self.arg_bold:
  174. output.append(BOLD)
  175. output.append(CLEAR_LINE)
  176. output.append(self.basic_format_line(line_dict))
  177. output.append('\n')
  178. output.append(COLOR_BOLD_RESET)
  179. return ''.join(output)
  180. def commander_format_line(self, line_dict):
  181. output = []
  182. if self.arg_color and 'color' in line_dict:
  183. color = line_dict['color']
  184. if color in COLOR_DICT:
  185. output.append(COLOR_DICT[color])
  186. if 'level' in line_dict:
  187. if int(line_dict['level']) <= self.arg_bold:
  188. output.append(BOLD)
  189. output.append(self.basic_format_line(line_dict))
  190. output.append(COLOR_BOLD_RESET)
  191. return ''.join(output)