test_framing.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. # Copyright 2024 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from __future__ import absolute_import
  15. import unittest
  16. from pebble.pulse2 import framing
  17. class TestEncodeFrame(unittest.TestCase):
  18. def test_empty_frame(self):
  19. # CRC-32 of nothing is 0
  20. # COBS encoding of b'\0\0\0\0' is b'\x01\x01\x01\x01\x01' (5 bytes)
  21. self.assertEqual(framing.encode_frame(b''),
  22. b'\x55\x01\x01\x01\x01\x01\x55')
  23. def test_simple_data(self):
  24. self.assertEqual(framing.encode_frame(b'abcdefg'),
  25. b'\x55\x0cabcdefg\xa6\x6a\x2a\x31\x55')
  26. def test_flag_in_datagram(self):
  27. # ASCII 'U' is 0x55 hex
  28. self.assertEqual(framing.encode_frame(b'QUACK'),
  29. b'\x55\x0aQ\0ACK\xdf\x8d\x80\x74\x55')
  30. def test_flag_in_fcs(self):
  31. # crc32(b'R') -> 0x5767df55
  32. # Since there is an \x55 byte in the FCS, it must be substituted,
  33. # just like when that byte value is present in the datagram itself.
  34. self.assertEqual(framing.encode_frame(b'R'),
  35. b'\x55\x06R\0\xdf\x67\x57\x55')
  36. class TestFrameSplitter(unittest.TestCase):
  37. def setUp(self):
  38. self.splitter = framing.FrameSplitter()
  39. def test_basic_functionality(self):
  40. self.splitter.write(b'\x55abcdefg\x55foobar\x55asdf\x55')
  41. self.assertEqual(list(self.splitter),
  42. [b'abcdefg', b'foobar', b'asdf'])
  43. def test_wait_for_sync(self):
  44. self.splitter.write(b'garbage data\x55frame 1\x55')
  45. self.assertEqual(list(self.splitter), [b'frame 1'])
  46. def test_doubled_flags(self):
  47. self.splitter.write(b'\x55abcd\x55\x55efgh\x55')
  48. self.assertEqual(list(self.splitter), [b'abcd', b'efgh'])
  49. def test_multiple_writes(self):
  50. self.splitter.write(b'\x55ab')
  51. self.assertEqual(list(self.splitter), [])
  52. self.splitter.write(b'cd\x55')
  53. self.assertEqual(list(self.splitter), [b'abcd'])
  54. def test_lots_of_writes(self):
  55. for char in b'\x55abcd\x55ef':
  56. self.splitter.write(bytearray([char]))
  57. self.assertEqual(list(self.splitter), [b'abcd'])
  58. def test_iteration_pops_frames(self):
  59. self.splitter.write(b'\x55frame 1\x55frame 2\x55frame 3\x55')
  60. self.assertEqual(next(iter(self.splitter)), b'frame 1')
  61. self.assertEqual(list(self.splitter), [b'frame 2', b'frame 3'])
  62. def test_stopiteration_latches(self):
  63. # The iterator protocol requires that once an iterator raises
  64. # StopIteration, it must continue to do so for all subsequent calls
  65. # to its next() method.
  66. self.splitter.write(b'\x55frame 1\x55')
  67. iterator = iter(self.splitter)
  68. self.assertEqual(next(iterator), b'frame 1')
  69. with self.assertRaises(StopIteration):
  70. next(iterator)
  71. next(iterator)
  72. self.splitter.write(b'\x55frame 2\x55')
  73. with self.assertRaises(StopIteration):
  74. next(iterator)
  75. self.assertEqual(list(self.splitter), [b'frame 2'])
  76. def test_max_frame_length(self):
  77. splitter = framing.FrameSplitter(max_frame_length=6)
  78. splitter.write(
  79. b'\x5512345\x55123456\x551234567\x551234\x5512345678\x55')
  80. self.assertEqual(list(splitter), [b'12345', b'123456', b'1234'])
  81. def test_dynamic_max_length_1(self):
  82. self.splitter.write(b'\x5512345')
  83. self.splitter.max_frame_length = 6
  84. self.splitter.write(b'6\x551234567\x551234\x55')
  85. self.assertEqual(list(self.splitter), [b'123456', b'1234'])
  86. def test_dynamic_max_length_2(self):
  87. self.splitter.write(b'\x551234567')
  88. self.splitter.max_frame_length = 6
  89. self.splitter.write(b'89\x55123456\x55')
  90. self.assertEqual(list(self.splitter), [b'123456'])
  91. class TestDecodeTransparency(unittest.TestCase):
  92. def test_easy_decode(self):
  93. self.assertEqual(framing.decode_transparency(b'\x06abcde'), b'abcde')
  94. def test_escaped_flag(self):
  95. self.assertEqual(framing.decode_transparency(b'\x06Q\0ACK'), b'QUACK')
  96. def test_flag_byte_in_frame(self):
  97. with self.assertRaises(framing.DecodeError):
  98. framing.decode_transparency(b'\x06ab\x55de')
  99. def test_truncated_cobs_block(self):
  100. with self.assertRaises(framing.DecodeError):
  101. framing.decode_transparency(b'\x0aabc')
  102. class TestStripFCS(unittest.TestCase):
  103. def test_frame_too_short(self):
  104. with self.assertRaises(framing.CorruptFrame):
  105. framing.strip_fcs(b'abcd')
  106. def test_good_fcs(self):
  107. self.assertEqual(framing.strip_fcs(b'abcd\x11\xcd\x82\xed'), b'abcd')
  108. def test_frame_corrupted(self):
  109. with self.assertRaises(framing.CorruptFrame):
  110. framing.strip_fcs(b'abce\x11\xcd\x82\xed')
  111. def test_fcs_corrupted(self):
  112. with self.assertRaises(framing.CorruptFrame):
  113. framing.strip_fcs(b'abcd\x13\xcd\x82\xed')
  114. class TestDecodeFrame(unittest.TestCase):
  115. def test_it_works(self):
  116. # Not much to test; decode_frame is just chained decode_transparency
  117. # with strip_fcs, and both of those have already been tested separately.
  118. self.assertEqual(framing.decode_frame(b'\x0aQ\0ACK\xdf\x8d\x80t'),
  119. b'QUACK')