streaming_logs.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. # Copyright 2024 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from __future__ import absolute_import
  15. import collections
  16. import struct
  17. from datetime import datetime
  18. class LogMessage(collections.namedtuple('LogMessage',
  19. 'log_level task timestamp file_name line_number message')):
  20. __slots__ = ()
  21. response_struct = struct.Struct('<c16sccQH')
  22. def __str__(self):
  23. msec_timestamp = self.timestamp.strftime("%H:%M:%S.%f")[:-3]
  24. template = ('{self.log_level} {self.task} {msec_timestamp} '
  25. '{self.file_name}:{self.line_number}> {self.message}')
  26. return template.format(self=self, msec_timestamp=msec_timestamp)
  27. @classmethod
  28. def parse(cls, packet):
  29. result = cls.response_struct.unpack(packet[:cls.response_struct.size])
  30. msg = packet[cls.response_struct.size:].decode("utf8")
  31. log_level = result[2].decode("utf8")
  32. task = result[3].decode("utf8")
  33. timestamp = datetime.fromtimestamp(result[4] / 1000.0)
  34. file_name = result[1].split(b'\x00', 1)[0].decode("utf8") # NUL terminated
  35. line_number = result[5]
  36. return cls(log_level, task, timestamp, file_name, line_number, msg)
  37. class StreamingLogs(object):
  38. '''App for receiving log messages streamed by the firmware.
  39. '''
  40. PORT_NUMBER = 0x0003
  41. def __init__(self, interface):
  42. try:
  43. self.socket = interface.simplex_transport.open_socket(
  44. self.PORT_NUMBER)
  45. except AttributeError:
  46. raise TypeError('LoggingApp must be bound directly '
  47. 'to an Interface, not a Link')
  48. def receive(self, block=True, timeout=None):
  49. return LogMessage.parse(self.socket.receive(block, timeout))
  50. def close(self):
  51. self.socket.close()