test_ppp.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589
  1. # Copyright 2024 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from __future__ import absolute_import
  15. import unittest
  16. try:
  17. from unittest import mock
  18. except ImportError:
  19. import mock
  20. import construct
  21. from pebble.pulse2 import ppp, exceptions
  22. from .fake_timer import FakeTimer
  23. from . import timer_helper
  24. class TestPPPEncapsulation(unittest.TestCase):
  25. def test_ppp_encapsulate(self):
  26. self.assertEqual(ppp.encapsulate(0xc021, b'Information'),
  27. b'\xc0\x21Information')
  28. class TestPPPUnencapsulate(unittest.TestCase):
  29. def test_ppp_unencapsulate(self):
  30. protocol, information = ppp.unencapsulate(b'\xc0\x21Information')
  31. self.assertEqual((protocol, information), (0xc021, b'Information'))
  32. def test_unencapsulate_empty_frame(self):
  33. with self.assertRaises(ppp.UnencapsulationError):
  34. ppp.unencapsulate(b'')
  35. def test_unencapsulate_too_short_frame(self):
  36. with self.assertRaises(ppp.UnencapsulationError):
  37. ppp.unencapsulate(b'\x21')
  38. def test_unencapsulate_empty_information(self):
  39. protocol, information = ppp.unencapsulate(b'\xc0\x21')
  40. self.assertEqual((protocol, information), (0xc021, b''))
  41. class TestConfigurationOptionsParser(unittest.TestCase):
  42. def test_no_options(self):
  43. options = ppp.OptionList.parse(b'')
  44. self.assertEqual(len(options), 0)
  45. def test_one_empty_option(self):
  46. options = ppp.OptionList.parse(b'\xaa\x02')
  47. self.assertEqual(len(options), 1)
  48. self.assertEqual(options[0].type, 0xaa)
  49. self.assertEqual(options[0].data, b'')
  50. def test_one_option_with_length(self):
  51. options = ppp.OptionList.parse(b'\xab\x07Data!')
  52. self.assertEqual((0xab, b'Data!'), options[0])
  53. def test_multiple_options_empty_first(self):
  54. options = ppp.OptionList.parse(b'\x22\x02\x23\x03a\x21\x04ab')
  55. self.assertEqual([(0x22, b''), (0x23, b'a'), (0x21, b'ab')], options)
  56. def test_multiple_options_dataful_first(self):
  57. options = ppp.OptionList.parse(b'\x31\x08option\x32\x02')
  58. self.assertEqual([(0x31, b'option'), (0x32, b'')], options)
  59. def test_option_with_length_too_short(self):
  60. with self.assertRaises(ppp.ParseError):
  61. ppp.OptionList.parse(b'\x41\x01')
  62. def test_option_list_with_malformed_option(self):
  63. with self.assertRaises(ppp.ParseError):
  64. ppp.OptionList.parse(b'\x0a\x02\x0b\x01\x0c\x03a')
  65. def test_truncated_terminal_option(self):
  66. with self.assertRaises(ppp.ParseError):
  67. ppp.OptionList.parse(b'\x61\x02\x62\x03a\x63\x0ccandleja')
  68. class TestConfigurationOptionsBuilder(unittest.TestCase):
  69. def test_no_options(self):
  70. serialized = ppp.OptionList.build([])
  71. self.assertEqual(b'', serialized)
  72. def test_one_empty_option(self):
  73. serialized = ppp.OptionList.build([ppp.Option(0xaa, b'')])
  74. self.assertEqual(b'\xaa\x02', serialized)
  75. def test_one_option_with_length(self):
  76. serialized = ppp.OptionList.build([ppp.Option(0xbb, b'Data!')])
  77. self.assertEqual(b'\xbb\x07Data!', serialized)
  78. def test_two_options(self):
  79. serialized = ppp.OptionList.build([
  80. ppp.Option(0xcc, b'foo'), ppp.Option(0xdd, b'xyzzy')])
  81. self.assertEqual(b'\xcc\x05foo\xdd\x07xyzzy', serialized)
  82. class TestLCPEnvelopeParsing(unittest.TestCase):
  83. def test_packet_no_padding(self):
  84. parsed = ppp.LCPEncapsulation.parse(b'\x01\xab\x00\x0aabcdef')
  85. self.assertEqual(parsed.code, 1)
  86. self.assertEqual(parsed.identifier, 0xab)
  87. self.assertEqual(parsed.data, b'abcdef')
  88. self.assertEqual(parsed.padding, b'')
  89. def test_padding(self):
  90. parsed = ppp.LCPEncapsulation.parse(b'\x01\xab\x00\x0aabcdefpadding')
  91. self.assertEqual(parsed.data, b'abcdef')
  92. self.assertEqual(parsed.padding, b'padding')
  93. def test_truncated_packet(self):
  94. with self.assertRaises(ppp.ParseError):
  95. ppp.LCPEncapsulation.parse(b'\x01\xab\x00\x0aabcde')
  96. def test_bogus_length(self):
  97. with self.assertRaises(ppp.ParseError):
  98. ppp.LCPEncapsulation.parse(b'\x01\xbc\x00\x03')
  99. def test_empty_data(self):
  100. parsed = ppp.LCPEncapsulation.parse(b'\x03\x01\x00\x04')
  101. self.assertEqual((3, 1, b'', b''), parsed)
  102. class TestLCPEnvelopeBuilder(unittest.TestCase):
  103. def test_build_empty_data(self):
  104. serialized = ppp.LCPEncapsulation.build(1, 0xfe, b'')
  105. self.assertEqual(b'\x01\xfe\x00\x04', serialized)
  106. def test_build_with_data(self):
  107. serialized = ppp.LCPEncapsulation.build(3, 0x2a, b'Hello, world!')
  108. self.assertEqual(b'\x03\x2a\x00\x11Hello, world!', serialized)
  109. class TestProtocolRejectParsing(unittest.TestCase):
  110. def test_protocol_and_info(self):
  111. self.assertEqual((0xabcd, b'asdfasdf'),
  112. ppp.ProtocolReject.parse(b'\xab\xcdasdfasdf'))
  113. def test_empty_info(self):
  114. self.assertEqual((0xf00d, b''),
  115. ppp.ProtocolReject.parse(b'\xf0\x0d'))
  116. def test_truncated_packet(self):
  117. with self.assertRaises(ppp.ParseError):
  118. ppp.ProtocolReject.parse(b'\xab')
  119. class TestMagicNumberAndDataParsing(unittest.TestCase):
  120. def test_magic_and_data(self):
  121. self.assertEqual(
  122. (0xabcdef01, b'datadata'),
  123. ppp.MagicNumberAndData.parse(b'\xab\xcd\xef\x01datadata'))
  124. def test_magic_no_data(self):
  125. self.assertEqual(
  126. (0xfeedface, b''),
  127. ppp.MagicNumberAndData.parse(b'\xfe\xed\xfa\xce'))
  128. def test_truncated_packet(self):
  129. with self.assertRaises(ppp.ParseError):
  130. ppp.MagicNumberAndData.parse(b'abc')
  131. class TestMagicNumberAndDataBuilder(unittest.TestCase):
  132. def test_build_empty_data(self):
  133. serialized = ppp.MagicNumberAndData.build(0x12345678, b'')
  134. self.assertEqual(b'\x12\x34\x56\x78', serialized)
  135. def test_build_with_data(self):
  136. serialized = ppp.MagicNumberAndData.build(0xabcdef01, b'foobar')
  137. self.assertEqual(b'\xab\xcd\xef\x01foobar', serialized)
  138. def test_build_with_named_attributes(self):
  139. serialized = ppp.MagicNumberAndData.build(magic_number=0, data=b'abc')
  140. self.assertEqual(b'\0\0\0\0abc', serialized)
  141. class TestControlProtocolRestartTimer(unittest.TestCase):
  142. def setUp(self):
  143. FakeTimer.clear_timer_list()
  144. timer_patcher = mock.patch('threading.Timer', new=FakeTimer)
  145. timer_patcher.start()
  146. self.addCleanup(timer_patcher.stop)
  147. self.uut = ppp.ControlProtocol()
  148. self.uut.timeout_retry = mock.Mock()
  149. self.uut.timeout_giveup = mock.Mock()
  150. self.uut.restart_count = 5
  151. def test_timeout_event_called_if_generation_ids_match(self):
  152. self.uut.restart_timer_expired(self.uut.restart_timer_generation_id)
  153. self.uut.timeout_retry.assert_called_once_with()
  154. def test_timeout_event_not_called_if_generation_ids_mismatch(self):
  155. self.uut.restart_timer_expired(42)
  156. self.uut.timeout_retry.assert_not_called()
  157. self.uut.timeout_giveup.assert_not_called()
  158. def test_timeout_event_not_called_after_stopped(self):
  159. self.uut.start_restart_timer(1)
  160. self.uut.stop_restart_timer()
  161. FakeTimer.TIMERS[-1].expire()
  162. self.uut.timeout_retry.assert_not_called()
  163. self.uut.timeout_giveup.assert_not_called()
  164. def test_timeout_event_not_called_from_old_timer_after_restart(self):
  165. self.uut.start_restart_timer(1)
  166. zombie_timer = FakeTimer.get_active_timers()[-1]
  167. self.uut.start_restart_timer(1)
  168. zombie_timer.expire()
  169. self.uut.timeout_retry.assert_not_called()
  170. self.uut.timeout_giveup.assert_not_called()
  171. def test_timeout_event_called_only_once_after_restart(self):
  172. self.uut.start_restart_timer(1)
  173. self.uut.start_restart_timer(1)
  174. for timer in FakeTimer.TIMERS:
  175. timer.expire()
  176. self.uut.timeout_retry.assert_called_once_with()
  177. self.uut.timeout_giveup.assert_not_called()
  178. class InstrumentedControlProtocol(ppp.ControlProtocol):
  179. methods_to_mock = (
  180. 'this_layer_up this_layer_down this_layer_started '
  181. 'this_layer_finished send_packet start_restart_timer '
  182. 'stop_restart_timer').split()
  183. attributes_to_mock = ('restart_timer',)
  184. def __init__(self):
  185. ppp.ControlProtocol.__init__(self)
  186. for method in self.methods_to_mock:
  187. setattr(self, method, mock.Mock())
  188. for attr in self.attributes_to_mock:
  189. setattr(self, attr, mock.NonCallableMock())
  190. class ControlProtocolTestMixin(object):
  191. CONTROL_CODE_ENUM = ppp.ControlCode
  192. def _map_control_code(self, code):
  193. try:
  194. return int(code)
  195. except ValueError:
  196. return self.CONTROL_CODE_ENUM[code].value
  197. def assert_packet_sent(self, code, identifier, body=b''):
  198. self.fsm.send_packet.assert_called_once_with(
  199. ppp.LCPEncapsulation.build(
  200. self._map_control_code(code), identifier, body))
  201. self.fsm.send_packet.reset_mock()
  202. def incoming_packet(self, code, identifier, body=b''):
  203. self.fsm.packet_received(
  204. ppp.LCPEncapsulation.build(self._map_control_code(code),
  205. identifier, body))
  206. class TestControlProtocolFSM(ControlProtocolTestMixin, unittest.TestCase):
  207. def setUp(self):
  208. self.addCleanup(timer_helper.cancel_all_timers)
  209. self.fsm = InstrumentedControlProtocol()
  210. def test_open_down(self):
  211. self.fsm.open()
  212. self.fsm.this_layer_started.assert_called_once_with()
  213. self.fsm.this_layer_up.assert_not_called()
  214. self.fsm.this_layer_down.assert_not_called()
  215. self.fsm.this_layer_finished.assert_not_called()
  216. def test_closed_up(self):
  217. self.fsm.up(mock.Mock())
  218. self.fsm.this_layer_up.assert_not_called()
  219. self.fsm.this_layer_down.assert_not_called()
  220. self.fsm.this_layer_started.assert_not_called()
  221. self.fsm.this_layer_finished.assert_not_called()
  222. def test_trivial_handshake(self):
  223. self.fsm.open()
  224. self.fsm.up(mock.Mock())
  225. self.assert_packet_sent('Configure_Request', 0)
  226. self.incoming_packet('Configure_Ack', 0)
  227. self.incoming_packet('Configure_Request', 17)
  228. self.assert_packet_sent('Configure_Ack', 17)
  229. self.assertEqual('Opened', self.fsm.state)
  230. self.assertTrue(self.fsm.this_layer_up.called)
  231. self.assertEqual(self.fsm.restart_count, self.fsm.max_configure)
  232. def test_terminate_cleanly(self):
  233. self.test_trivial_handshake()
  234. self.fsm.close()
  235. self.fsm.this_layer_down.assert_called_once_with()
  236. self.assert_packet_sent('Terminate_Request', 42)
  237. def test_remote_terminate(self):
  238. self.test_trivial_handshake()
  239. self.incoming_packet('Terminate_Request', 42)
  240. self.assert_packet_sent('Terminate_Ack', 42)
  241. self.assertTrue(self.fsm.this_layer_down.called)
  242. self.assertTrue(self.fsm.start_restart_timer.called)
  243. self.fsm.this_layer_finished.assert_not_called()
  244. self.fsm.restart_timer_expired(self.fsm.restart_timer_generation_id)
  245. self.assertTrue(self.fsm.this_layer_finished.called)
  246. self.assertEqual('Stopped', self.fsm.state)
  247. def test_remote_rejects_configure_request_code(self):
  248. self.fsm.open()
  249. self.fsm.up(mock.Mock())
  250. received_packet = self.fsm.send_packet.call_args[0][0]
  251. self.assert_packet_sent('Configure_Request', 0)
  252. self.incoming_packet('Code_Reject', 3, received_packet)
  253. self.assertEqual('Stopped', self.fsm.state)
  254. self.assertTrue(self.fsm.this_layer_finished.called)
  255. def test_receive_extended_code(self):
  256. self.fsm.handle_unknown_code = mock.Mock()
  257. self.test_trivial_handshake()
  258. self.incoming_packet(42, 11, b'Life, the universe and everything')
  259. self.fsm.handle_unknown_code.assert_called_once_with(
  260. 42, 11, b'Life, the universe and everything')
  261. def test_receive_unimplemented_code(self):
  262. self.test_trivial_handshake()
  263. self.incoming_packet(0x55, 0)
  264. self.assert_packet_sent('Code_Reject', 0, b'\x55\0\0\x04')
  265. def test_code_reject_truncates_rejected_packet(self):
  266. self.test_trivial_handshake()
  267. self.incoming_packet(0xaa, 0x20, b'a'*1496) # 1500-byte Info
  268. self.assert_packet_sent('Code_Reject', 0,
  269. b'\xaa\x20\x05\xdc' + b'a'*1492)
  270. def test_code_reject_identifier_changes(self):
  271. self.test_trivial_handshake()
  272. self.incoming_packet(0xaa, 0)
  273. self.assert_packet_sent('Code_Reject', 0, b'\xaa\0\0\x04')
  274. self.incoming_packet(0xaa, 0)
  275. self.assert_packet_sent('Code_Reject', 1, b'\xaa\0\0\x04')
  276. # Local events: up, down, open, close
  277. # Option negotiation: reject, nak
  278. # Exceptional situations: catastrophic code-reject
  279. # Restart negotiation after opening
  280. # Remote Terminate-Req, -Ack at various points in the lifecycle
  281. # Negotiation infinite loop
  282. # Local side gives up on negotiation
  283. # Corrupt packets received
  284. class TestLCPReceiveEchoRequest(ControlProtocolTestMixin, unittest.TestCase):
  285. CONTROL_CODE_ENUM = ppp.LCPCode
  286. def setUp(self):
  287. self.addCleanup(timer_helper.cancel_all_timers)
  288. self.fsm = ppp.LinkControlProtocol(mock.Mock())
  289. self.fsm.send_packet = mock.Mock()
  290. self.fsm.state = 'Opened'
  291. def send_echo_request(self, identifier=0, data=b'\0\0\0\0'):
  292. result = self.fsm.handle_unknown_code(
  293. ppp.LCPCode.Echo_Request.value, identifier, data)
  294. self.assertIsNot(result, NotImplemented)
  295. def test_echo_request_is_dropped_when_not_in_opened_state(self):
  296. self.fsm.state = 'Ack-Sent'
  297. self.send_echo_request()
  298. self.fsm.send_packet.assert_not_called()
  299. def test_echo_request_elicits_reply(self):
  300. self.send_echo_request()
  301. self.assert_packet_sent('Echo_Reply', 0, b'\0\0\0\0')
  302. def test_echo_request_with_data_is_echoed_in_reply(self):
  303. self.send_echo_request(5, b'\0\0\0\0datadata')
  304. self.assert_packet_sent('Echo_Reply', 5, b'\0\0\0\0datadata')
  305. def test_echo_request_missing_magic_number_field_is_dropped(self):
  306. self.send_echo_request(data=b'')
  307. self.fsm.send_packet.assert_not_called()
  308. def test_echo_request_with_nonzero_magic_number_is_dropped(self):
  309. self.send_echo_request(data=b'\0\0\0\x01')
  310. self.fsm.send_packet.assert_not_called()
  311. class TestLCPPing(ControlProtocolTestMixin, unittest.TestCase):
  312. CONTROL_CODE_ENUM = ppp.LCPCode
  313. def setUp(self):
  314. FakeTimer.clear_timer_list()
  315. timer_patcher = mock.patch('threading.Timer', new=FakeTimer)
  316. timer_patcher.start()
  317. self.addCleanup(timer_patcher.stop)
  318. self.fsm = ppp.LinkControlProtocol(mock.Mock())
  319. self.fsm.send_packet = mock.Mock()
  320. self.fsm.state = 'Opened'
  321. def respond_to_ping(self):
  322. [echo_request_packet], _ = self.fsm.send_packet.call_args
  323. self.assertEqual(b'\x09'[0], echo_request_packet[0])
  324. echo_response_packet = b'\x0a' + echo_request_packet[1:]
  325. self.fsm.packet_received(echo_response_packet)
  326. def test_ping_when_lcp_is_not_opened_is_an_error(self):
  327. cb = mock.Mock()
  328. self.fsm.state = 'Ack-Rcvd'
  329. with self.assertRaises(ppp.LinkStateError):
  330. self.fsm.ping(cb)
  331. cb.assert_not_called()
  332. def test_zero_attempts_is_an_error(self):
  333. with self.assertRaises(ValueError):
  334. self.fsm.ping(mock.Mock(), attempts=0)
  335. def test_negative_attempts_is_an_error(self):
  336. with self.assertRaises(ValueError):
  337. self.fsm.ping(mock.Mock(), attempts=-1)
  338. def test_zero_timeout_is_an_error(self):
  339. with self.assertRaises(ValueError):
  340. self.fsm.ping(mock.Mock(), timeout=0)
  341. def test_negative_timeout_is_an_error(self):
  342. with self.assertRaises(ValueError):
  343. self.fsm.ping(mock.Mock(), timeout=-0.1)
  344. def test_straightforward_ping(self):
  345. cb = mock.Mock()
  346. self.fsm.ping(cb)
  347. cb.assert_not_called()
  348. self.assertEqual(1, self.fsm.send_packet.call_count)
  349. self.respond_to_ping()
  350. cb.assert_called_once_with(True)
  351. def test_one_timeout_before_responding(self):
  352. cb = mock.Mock()
  353. self.fsm.ping(cb, attempts=2)
  354. FakeTimer.TIMERS[-1].expire()
  355. cb.assert_not_called()
  356. self.assertEqual(2, self.fsm.send_packet.call_count)
  357. self.respond_to_ping()
  358. cb.assert_called_once_with(True)
  359. def test_one_attempt_with_no_reply(self):
  360. cb = mock.Mock()
  361. self.fsm.ping(cb, attempts=1)
  362. FakeTimer.TIMERS[-1].expire()
  363. self.assertEqual(1, self.fsm.send_packet.call_count)
  364. cb.assert_called_once_with(False)
  365. def test_multiple_attempts_with_no_reply(self):
  366. cb = mock.Mock()
  367. self.fsm.ping(cb, attempts=2)
  368. timer_one = FakeTimer.TIMERS[-1]
  369. timer_one.expire()
  370. timer_two = FakeTimer.TIMERS[-1]
  371. self.assertIsNot(timer_one, timer_two)
  372. timer_two.expire()
  373. self.assertEqual(2, self.fsm.send_packet.call_count)
  374. cb.assert_called_once_with(False)
  375. def test_late_reply(self):
  376. cb = mock.Mock()
  377. self.fsm.ping(cb, attempts=1)
  378. FakeTimer.TIMERS[-1].expire()
  379. self.respond_to_ping()
  380. cb.assert_called_once_with(False)
  381. def test_this_layer_down_during_ping(self):
  382. cb = mock.Mock()
  383. self.fsm.ping(cb)
  384. self.fsm.this_layer_down()
  385. FakeTimer.TIMERS[-1].expire()
  386. cb.assert_not_called()
  387. def test_echo_reply_with_wrong_identifier(self):
  388. cb = mock.Mock()
  389. self.fsm.ping(cb, attempts=1)
  390. [echo_request_packet], _ = self.fsm.send_packet.call_args
  391. echo_response_packet = bytearray(echo_request_packet)
  392. echo_response_packet[0] = 0x0a
  393. echo_response_packet[1] += 1
  394. self.fsm.packet_received(bytes(echo_response_packet))
  395. cb.assert_not_called()
  396. FakeTimer.TIMERS[-1].expire()
  397. cb.assert_called_once_with(False)
  398. def test_echo_reply_with_wrong_data(self):
  399. cb = mock.Mock()
  400. self.fsm.ping(cb, attempts=1)
  401. [echo_request_packet], _ = self.fsm.send_packet.call_args
  402. # Generate a syntactically valid Echo-Reply with the right
  403. # identifier but completely different data.
  404. identifier = bytearray(echo_request_packet)[1]
  405. echo_response_packet = bytes(
  406. b'\x0a' + bytearray([identifier]) +
  407. b'\0\x26\0\0\0\0bad reply bad reply bad reply.')
  408. self.fsm.packet_received(bytes(echo_response_packet))
  409. cb.assert_not_called()
  410. FakeTimer.TIMERS[-1].expire()
  411. cb.assert_called_once_with(False)
  412. def test_successive_pings_use_different_identifiers(self):
  413. self.fsm.ping(mock.Mock(), attempts=1)
  414. [echo_request_packet_1], _ = self.fsm.send_packet.call_args
  415. identifier_1 = bytearray(echo_request_packet_1)[1]
  416. self.respond_to_ping()
  417. self.fsm.ping(mock.Mock(), attempts=1)
  418. [echo_request_packet_2], _ = self.fsm.send_packet.call_args
  419. identifier_2 = bytearray(echo_request_packet_2)[1]
  420. self.assertNotEqual(identifier_1, identifier_2)
  421. def test_unsolicited_echo_reply_doesnt_break_anything(self):
  422. self.fsm.packet_received(b'\x0a\0\0\x08\0\0\0\0')
  423. def test_malformed_echo_reply(self):
  424. cb = mock.Mock()
  425. self.fsm.ping(cb, attempts=1)
  426. # Only three bytes of Magic-Number
  427. self.fsm.packet_received(b'\x0a\0\0\x07\0\0\0')
  428. cb.assert_not_called()
  429. # Trying to start a second ping while the first ping is still happening
  430. def test_starting_a_ping_while_another_is_active_is_an_error(self):
  431. cb = mock.Mock()
  432. self.fsm.ping(cb, attempts=1)
  433. cb2 = mock.Mock()
  434. with self.assertRaises(exceptions.AlreadyInProgressError):
  435. self.fsm.ping(cb2, attempts=1)
  436. FakeTimer.TIMERS[-1].expire()
  437. cb.assert_called_once_with(False)
  438. cb2.assert_not_called()
  439. # General tests:
  440. # - Length too short for a valid packet
  441. # - Packet truncated (length field > packet len)
  442. # - Packet with padding
  443. # OptionList codes:
  444. # 1 Configure-Request
  445. # 2 Configure-Ack
  446. # 3 Configure-Nak
  447. # 4 Configure-Reject
  448. # Raw data codes:
  449. # 5 Terminate-Request
  450. # 6 Terminate-Ack
  451. # 7 Code-Reject
  452. # 8 Protocol-Reject
  453. # - Empty Rejected-Information field
  454. # - Rejected-Protocol field too short
  455. # Magic number + data codes:
  456. # 10 Echo-Reply
  457. # 11 Discard-Request
  458. # 12 Identification (RFC 1570)