prompt.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. # Copyright 2024 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from __future__ import absolute_import
  15. import collections
  16. import struct
  17. from datetime import datetime
  18. import pebble.pulse2.exceptions
  19. from .. import exceptions
  20. class Prompt(object):
  21. PORT_NUMBER = 0x3e20
  22. def __init__(self, link):
  23. self.socket = link.open_socket('reliable', self.PORT_NUMBER)
  24. def command_and_response(self, command_string, timeout=20):
  25. log = []
  26. self.socket.send(command_string.encode())
  27. is_done = False
  28. while not is_done:
  29. try:
  30. response = PromptResponse.parse(
  31. self.socket.receive(timeout=timeout))
  32. if response.is_done_response:
  33. is_done = True
  34. elif response.is_message_response:
  35. log.append(response.message.decode())
  36. except pebble.pulse2.exceptions.ReceiveQueueEmpty:
  37. raise exceptions.CommandTimedOut
  38. return log
  39. def close(self):
  40. self.socket.close()
  41. class PromptResponse(collections.namedtuple('PromptResponse',
  42. 'response_type timestamp message')):
  43. DONE_RESPONSE = 101
  44. MESSAGE_RESPONSE = 102
  45. response_struct = struct.Struct('<BQ')
  46. @property
  47. def is_done_response(self):
  48. return self.response_type == self.DONE_RESPONSE
  49. @property
  50. def is_message_response(self):
  51. return self.response_type == self.MESSAGE_RESPONSE
  52. @classmethod
  53. def parse(cls, response):
  54. result = cls.response_struct.unpack(response[:cls.response_struct.size])
  55. response_type = result[0]
  56. timestamp = datetime.fromtimestamp(result[1] / 1000.0)
  57. message = response[cls.response_struct.size:]
  58. return cls(response_type, timestamp, message)