test_transports.py 22 KB


  1. # Copyright 2024 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from __future__ import absolute_import
  15. import threading
  16. import unittest
  17. try:
  18. from unittest import mock
  19. except ImportError:
  20. import mock
  21. import construct
  22. from pebble.pulse2 import exceptions, pcmp, transports
  23. from .fake_timer import FakeTimer
  24. from . import timer_helper
  25. # Save a reference to the real threading.Timer for tests which need to
  26. # use timers even while threading.Timer is patched with FakeTimer.
  27. RealThreadingTimer = threading.Timer
  28. class CommonTransportBeforeOpenedTestCases(object):
  29. def test_send_raises_exception(self):
  30. with self.assertRaises(exceptions.TransportNotReady):
  31. self.uut.send(0xdead, b'not gonna get through')
  32. def test_open_socket_returns_None_when_ncp_fails_to_open(self):
  33. self.assertIsNone(self.uut.open_socket(0xbeef, timeout=0))
  34. class CommonTransportTestCases(object):
  35. def test_send_raises_exception_after_transport_is_closed(self):
  36. self.uut.down()
  37. with self.assertRaises(exceptions.TransportNotReady):
  38. self.uut.send(0xaaaa, b'asdf')
  39. def test_socket_is_closed_when_transport_is_closed(self):
  40. socket = self.uut.open_socket(0xabcd, timeout=0)
  41. self.uut.down()
  42. self.assertTrue(socket.closed)
  43. with self.assertRaises(exceptions.SocketClosed):
  44. socket.send(b'foo')
  45. def test_opening_two_sockets_on_same_port_is_an_error(self):
  46. socket1 = self.uut.open_socket(0xabcd, timeout=0)
  47. with self.assertRaises(KeyError):
  48. socket2 = self.uut.open_socket(0xabcd, timeout=0)
  49. def test_closing_a_socket_allows_another_to_be_opened(self):
  50. socket1 = self.uut.open_socket(0xabcd, timeout=0)
  51. socket1.close()
  52. socket2 = self.uut.open_socket(0xabcd, timeout=0)
  53. def test_opening_socket_fails_after_transport_down(self):
  54. self.uut.this_layer_down()
  55. self.assertIsNone(self.uut.open_socket(0xabcd, timeout=0))
  56. def test_opening_socket_succeeds_after_transport_bounces(self):
  57. self.uut.this_layer_down()
  58. self.uut.this_layer_up()
  59. self.uut.open_socket(0xabcd, timeout=0)
  60. class TestBestEffortTransportBeforeOpened(CommonTransportBeforeOpenedTestCases,
  61. unittest.TestCase):
  62. def setUp(self):
  63. control_protocol_patcher = mock.patch(
  64. 'pebble.pulse2.transports.TransportControlProtocol')
  65. control_protocol_patcher.start()
  66. self.addCleanup(control_protocol_patcher.stop)
  67. self.uut = transports.BestEffortApplicationTransport(
  68. interface=mock.MagicMock(), link_mtu=1500)
  69. self.uut.ncp.is_Opened.return_value = False
  70. def test_open_socket_waits_for_ncp_to_open(self):
  71. self.uut.ncp.is_Opened.return_value = True
  72. def on_ping(cb, *args):
  73. self.uut.packet_received(transports.BestEffortPacket.build(
  74. construct.Container(port=0x0001, length=5,
  75. information=b'\x02', padding=b'')))
  76. cb(True)
  77. with mock.patch.object(pcmp.PulseControlMessageProtocol, 'ping') \
  78. as mock_ping:
  79. mock_ping.side_effect = on_ping
  80. open_thread = RealThreadingTimer(0.01, self.uut.this_layer_up)
  81. open_thread.daemon = True
  82. open_thread.start()
  83. self.assertIsNotNone(self.uut.open_socket(0xbeef, timeout=0.5))
  84. open_thread.join()
  85. class TestBestEffortTransport(CommonTransportTestCases, unittest.TestCase):
  86. def setUp(self):
  87. self.addCleanup(timer_helper.cancel_all_timers)
  88. self.uut = transports.BestEffortApplicationTransport(
  89. interface=mock.MagicMock(), link_mtu=1500)
  90. self.uut.ncp.receive_configure_request_acceptable(0, [])
  91. self.uut.ncp.receive_configure_ack()
  92. self.uut.packet_received(transports.BestEffortPacket.build(
  93. construct.Container(port=0x0001, length=5,
  94. information=b'\x02', padding=b'')))
  95. def test_send(self):
  96. self.uut.send(0xabcd, b'information')
  97. self.uut.link_socket.send.assert_called_with(
  98. transports.BestEffortPacket.build(construct.Container(
  99. port=0xabcd, length=15, information=b'information',
  100. padding=b'')))
  101. def test_send_from_socket(self):
  102. socket = self.uut.open_socket(0xabcd, timeout=0)
  103. socket.send(b'info')
  104. self.uut.link_socket.send.assert_called_with(
  105. transports.BestEffortPacket.build(construct.Container(
  106. port=0xabcd, length=8, information=b'info', padding=b'')))
  107. def test_receive_from_socket_with_empty_queue(self):
  108. socket = self.uut.open_socket(0xabcd, timeout=0)
  109. with self.assertRaises(exceptions.ReceiveQueueEmpty):
  110. socket.receive(block=False)
  111. def test_receive_from_socket(self):
  112. socket = self.uut.open_socket(0xabcd, timeout=0)
  113. self.uut.packet_received(
  114. transports.BestEffortPacket.build(construct.Container(
  115. port=0xabcd, length=8, information=b'info', padding=b'')))
  116. self.assertEqual(b'info', socket.receive(block=False))
  117. def test_receive_on_unopened_port_doesnt_reach_socket(self):
  118. socket = self.uut.open_socket(0xabcd, timeout=0)
  119. self.uut.packet_received(
  120. transports.BestEffortPacket.build(construct.Container(
  121. port=0xface, length=8, information=b'info', padding=b'')))
  122. with self.assertRaises(exceptions.ReceiveQueueEmpty):
  123. socket.receive(block=False)
  124. def test_receive_malformed_packet(self):
  125. self.uut.packet_received(b'garbage')
  126. def test_send_equal_to_mtu(self):
  127. self.uut.send(0xaaaa, b'a'*1496)
  128. def test_send_greater_than_mtu(self):
  129. with self.assertRaisesRegex(ValueError, 'Packet length'):
  130. self.uut.send(0xaaaa, b'a'*1497)
  131. def test_transport_down_closes_link_socket_and_ncp(self):
  132. self.uut.down()
  133. self.uut.link_socket.close.assert_called_with()
  134. self.assertIsNone(self.uut.ncp.socket)
  135. def test_pcmp_port_closed_message_closes_socket(self):
  136. socket = self.uut.open_socket(0xabcd, timeout=0)
  137. self.assertFalse(socket.closed)
  138. self.uut.packet_received(
  139. transports.BestEffortPacket.build(construct.Container(
  140. port=0x0001, length=7, information=b'\x81\xab\xcd',
  141. padding=b'')))
  142. self.assertTrue(socket.closed)
  143. def test_pcmp_port_closed_message_without_socket(self):
  144. self.uut.packet_received(
  145. transports.BestEffortPacket.build(construct.Container(
  146. port=0x0001, length=7, information=b'\x81\xaa\xaa',
  147. padding=b'')))
  148. class TestReliableTransportPacketBuilders(unittest.TestCase):
  149. def test_build_info_packet(self):
  150. self.assertEqual(
  151. b'\x1e\x3f\xbe\xef\x00\x14Data goes here',
  152. transports.build_reliable_info_packet(
  153. sequence_number=15, ack_number=31, poll=True,
  154. port=0xbeef, information=b'Data goes here'))
  155. def test_build_receive_ready_packet(self):
  156. self.assertEqual(
  157. b'\x01\x18',
  158. transports.build_reliable_supervisory_packet(
  159. kind='RR', ack_number=12))
  160. def test_build_receive_ready_poll_packet(self):
  161. self.assertEqual(
  162. b'\x01\x19',
  163. transports.build_reliable_supervisory_packet(
  164. kind='RR', ack_number=12, poll=True))
  165. def test_build_receive_ready_final_packet(self):
  166. self.assertEqual(
  167. b'\x01\x19',
  168. transports.build_reliable_supervisory_packet(
  169. kind='RR', ack_number=12, final=True))
  170. def test_build_receive_not_ready_packet(self):
  171. self.assertEqual(
  172. b'\x05\x18',
  173. transports.build_reliable_supervisory_packet(
  174. kind='RNR', ack_number=12))
  175. def test_build_reject_packet(self):
  176. self.assertEqual(
  177. b'\x09\x18',
  178. transports.build_reliable_supervisory_packet(
  179. kind='REJ', ack_number=12))
  180. class TestReliableTransportBeforeOpened(CommonTransportBeforeOpenedTestCases,
  181. unittest.TestCase):
  182. def setUp(self):
  183. self.addCleanup(timer_helper.cancel_all_timers)
  184. self.uut = transports.ReliableTransport(
  185. interface=mock.MagicMock(), link_mtu=1500)
  186. def test_open_socket_waits_for_ncp_to_open(self):
  187. self.uut.ncp.is_Opened = mock.Mock()
  188. self.uut.ncp.is_Opened.return_value = True
  189. self.uut.command_socket.send = lambda packet: (
  190. self.uut.response_packet_received(
  191. transports.build_reliable_supervisory_packet(
  192. kind='RR', ack_number=0, final=True)))
  193. open_thread = RealThreadingTimer(0.01, self.uut.this_layer_up)
  194. open_thread.daemon = True
  195. open_thread.start()
  196. self.assertIsNotNone(self.uut.open_socket(0xbeef, timeout=0.5))
  197. open_thread.join()
  198. class TestReliableTransportConnectionEstablishment(unittest.TestCase):
  199. expected_rr_packet = transports.build_reliable_supervisory_packet(
  200. kind='RR', ack_number=0, poll=True)
  201. def setUp(self):
  202. FakeTimer.clear_timer_list()
  203. timer_patcher = mock.patch('threading.Timer', new=FakeTimer)
  204. timer_patcher.start()
  205. self.addCleanup(timer_patcher.stop)
  206. control_protocol_patcher = mock.patch(
  207. 'pebble.pulse2.transports.TransportControlProtocol')
  208. control_protocol_patcher.start()
  209. self.addCleanup(control_protocol_patcher.stop)
  210. self.uut = transports.ReliableTransport(
  211. interface=mock.MagicMock(), link_mtu=1500)
  212. assert isinstance(self.uut.ncp, mock.MagicMock)
  213. self.uut.ncp.is_Opened.return_value = True
  214. self.uut.this_layer_up()
  215. def send_rr_response(self):
  216. self.uut.response_packet_received(
  217. transports.build_reliable_supervisory_packet(
  218. kind='RR', ack_number=0, final=True))
  219. def test_rr_packet_is_sent_after_this_layer_up_event(self):
  220. self.uut.command_socket.send.assert_called_once_with(
  221. self.expected_rr_packet)
  222. def test_rr_command_is_retransmitted_until_response_is_received(self):
  223. for _ in range(3):
  224. FakeTimer.TIMERS[-1].expire()
  225. self.send_rr_response()
  226. self.assertFalse(FakeTimer.get_active_timers())
  227. self.assertEqual(self.uut.command_socket.send.call_args_list,
  228. [mock.call(self.expected_rr_packet)]*4)
  229. self.assertIsNotNone(self.uut.open_socket(0xabcd, timeout=0))
  230. def test_transport_negotiation_restarts_if_no_responses(self):
  231. for _ in range(self.uut.max_retransmits):
  232. FakeTimer.TIMERS[-1].expire()
  233. self.assertFalse(FakeTimer.get_active_timers())
  234. self.assertIsNone(self.uut.open_socket(0xabcd, timeout=0))
  235. self.uut.ncp.restart.assert_called_once_with()
  236. class TestReliableTransport(CommonTransportTestCases,
  237. unittest.TestCase):
  238. def setUp(self):
  239. FakeTimer.clear_timer_list()
  240. timer_patcher = mock.patch('threading.Timer', new=FakeTimer)
  241. timer_patcher.start()
  242. self.addCleanup(timer_patcher.stop)
  243. control_protocol_patcher = mock.patch(
  244. 'pebble.pulse2.transports.TransportControlProtocol')
  245. control_protocol_patcher.start()
  246. self.addCleanup(control_protocol_patcher.stop)
  247. self.uut = transports.ReliableTransport(
  248. interface=mock.MagicMock(), link_mtu=1500)
  249. assert isinstance(self.uut.ncp, mock.MagicMock)
  250. self.uut.ncp.is_Opened.return_value = True
  251. self.uut.this_layer_up()
  252. self.uut.command_socket.send.reset_mock()
  253. self.uut.response_packet_received(
  254. transports.build_reliable_supervisory_packet(
  255. kind='RR', ack_number=0, final=True))
  256. def test_send_with_immediate_ack(self):
  257. self.uut.send(0xbeef, b'Just some packet data')
  258. self.uut.command_socket.send.assert_called_once_with(
  259. transports.build_reliable_info_packet(
  260. sequence_number=0, ack_number=0, poll=True,
  261. port=0xbeef, information=b'Just some packet data'))
  262. self.assertEqual(1, len(FakeTimer.get_active_timers()))
  263. self.uut.response_packet_received(
  264. transports.build_reliable_supervisory_packet(
  265. kind='RR', ack_number=1, final=True))
  266. self.assertTrue(all(t.cancelled for t in FakeTimer.TIMERS))
  267. def test_send_with_one_timeout_before_ack(self):
  268. self.uut.send(0xabcd, b'this will be sent twice')
  269. active_timers = FakeTimer.get_active_timers()
  270. self.assertEqual(1, len(active_timers))
  271. active_timers[0].expire()
  272. self.assertEqual(1, len(FakeTimer.get_active_timers()))
  273. self.uut.command_socket.send.assert_has_calls(
  274. [mock.call(transports.build_reliable_info_packet(
  275. sequence_number=0, ack_number=0,
  276. poll=True, port=0xabcd,
  277. information=b'this will be sent twice'))]*2)
  278. self.uut.response_packet_received(
  279. transports.build_reliable_supervisory_packet(
  280. kind='RR', ack_number=1, final=True))
  281. self.assertTrue(all(t.cancelled for t in FakeTimer.TIMERS))
  282. def test_send_with_no_response(self):
  283. self.uut.send(0xd00d, b'blarg')
  284. for _ in range(self.uut.max_retransmits):
  285. FakeTimer.get_active_timers()[-1].expire()
  286. self.uut.ncp.restart.assert_called_once_with()
  287. def test_receive_info_packet(self):
  288. socket = self.uut.open_socket(0xcafe, timeout=0)
  289. self.uut.command_packet_received(transports.build_reliable_info_packet(
  290. sequence_number=0, ack_number=0, poll=True, port=0xcafe,
  291. information=b'info'))
  292. self.assertEqual(b'info', socket.receive(block=False))
  293. self.uut.response_socket.send.assert_called_once_with(
  294. transports.build_reliable_supervisory_packet(
  295. kind='RR', ack_number=1, final=True))
  296. def test_receive_duplicate_packet(self):
  297. socket = self.uut.open_socket(0xba5e, timeout=0)
  298. packet = transports.build_reliable_info_packet(
  299. sequence_number=0, ack_number=0, poll=True, port=0xba5e,
  300. information=b'all your base are belong to us')
  301. self.uut.command_packet_received(packet)
  302. self.assertEqual(b'all your base are belong to us',
  303. socket.receive(block=False))
  304. self.uut.response_socket.reset_mock()
  305. self.uut.command_packet_received(packet)
  306. self.uut.response_socket.send.assert_called_once_with(
  307. transports.build_reliable_supervisory_packet(
  308. kind='RR', ack_number=1, final=True))
  309. with self.assertRaises(exceptions.ReceiveQueueEmpty):
  310. socket.receive(block=False)
  311. def test_queueing_multiple_packets_to_send(self):
  312. packets = [(0xfeed, b'Some data'),
  313. (0x6789, b'More data'),
  314. (0xfeed, b'Third packet')]
  315. for protocol, information in packets:
  316. self.uut.send(protocol, information)
  317. for seq, (port, information) in enumerate(packets):
  318. self.uut.command_socket.send.assert_called_once_with(
  319. transports.build_reliable_info_packet(
  320. sequence_number=seq, ack_number=0, poll=True,
  321. port=port, information=information))
  322. self.uut.command_socket.send.reset_mock()
  323. self.uut.response_packet_received(
  324. transports.build_reliable_supervisory_packet(
  325. kind='RR', ack_number=seq+1, final=True))
  326. def test_send_equal_to_mtu(self):
  327. self.uut.send(0xaaaa, b'a'*1494)
  328. def test_send_greater_than_mtu(self):
  329. with self.assertRaisesRegex(ValueError, 'Packet length'):
  330. self.uut.send(0xaaaa, b'a'*1496)
  331. def test_send_from_socket(self):
  332. socket = self.uut.open_socket(0xabcd, timeout=0)
  333. socket.send(b'info')
  334. self.uut.command_socket.send.assert_called_with(
  335. transports.build_reliable_info_packet(
  336. sequence_number=0, ack_number=0,
  337. poll=True, port=0xabcd, information=b'info'))
  338. def test_receive_from_socket_with_empty_queue(self):
  339. socket = self.uut.open_socket(0xabcd, timeout=0)
  340. with self.assertRaises(exceptions.ReceiveQueueEmpty):
  341. socket.receive(block=False)
  342. def test_receive_from_socket(self):
  343. socket = self.uut.open_socket(0xabcd, timeout=0)
  344. self.uut.command_packet_received(transports.build_reliable_info_packet(
  345. sequence_number=0, ack_number=0, poll=True, port=0xabcd,
  346. information=b'info info info'))
  347. self.assertEqual(b'info info info', socket.receive(block=False))
  348. def test_receive_on_unopened_port_doesnt_reach_socket(self):
  349. socket = self.uut.open_socket(0xabcd, timeout=0)
  350. self.uut.command_packet_received(transports.build_reliable_info_packet(
  351. sequence_number=0, ack_number=0, poll=True, port=0x3333,
  352. information=b'info'))
  353. with self.assertRaises(exceptions.ReceiveQueueEmpty):
  354. socket.receive(block=False)
  355. def test_receive_malformed_command_packet(self):
  356. self.uut.command_packet_received(b'garbage')
  357. self.uut.ncp.restart.assert_called_once_with()
  358. def test_receive_malformed_response_packet(self):
  359. self.uut.response_packet_received(b'garbage')
  360. self.uut.ncp.restart.assert_called_once_with()
  361. def test_transport_down_closes_link_sockets_and_ncp(self):
  362. self.uut.down()
  363. self.uut.command_socket.close.assert_called_with()
  364. self.uut.response_socket.close.assert_called_with()
  365. self.uut.ncp.down.assert_called_with()
  366. def test_pcmp_port_closed_message_closes_socket(self):
  367. socket = self.uut.open_socket(0xabcd, timeout=0)
  368. self.assertFalse(socket.closed)
  369. self.uut.command_packet_received(transports.build_reliable_info_packet(
  370. sequence_number=0, ack_number=0, poll=True, port=0x0001,
  371. information=b'\x81\xab\xcd'))
  372. self.assertTrue(socket.closed)
  373. def test_pcmp_port_closed_message_without_socket(self):
  374. self.uut.command_packet_received(transports.build_reliable_info_packet(
  375. sequence_number=0, ack_number=0, poll=True, port=0x0001,
  376. information=b'\x81\xaa\xaa'))
  377. class TestSocket(unittest.TestCase):
  378. def setUp(self):
  379. self.uut = transports.Socket(mock.Mock(), 1234)
  380. def test_empty_receive_queue(self):
  381. with self.assertRaises(exceptions.ReceiveQueueEmpty):
  382. self.uut.receive(block=False)
  383. def test_empty_receive_queue_blocking(self):
  384. with self.assertRaises(exceptions.ReceiveQueueEmpty):
  385. self.uut.receive(timeout=0.001)
  386. def test_receive(self):
  387. self.uut.on_receive(b'data')
  388. self.assertEqual(b'data', self.uut.receive(block=False))
  389. with self.assertRaises(exceptions.ReceiveQueueEmpty):
  390. self.uut.receive(block=False)
  391. def test_receive_twice(self):
  392. self.uut.on_receive(b'one')
  393. self.uut.on_receive(b'two')
  394. self.assertEqual(b'one', self.uut.receive(block=False))
  395. self.assertEqual(b'two', self.uut.receive(block=False))
  396. def test_receive_interleaved(self):
  397. self.uut.on_receive(b'one')
  398. self.assertEqual(b'one', self.uut.receive(block=False))
  399. self.uut.on_receive(b'two')
  400. self.assertEqual(b'two', self.uut.receive(block=False))
  401. def test_send(self):
  402. self.uut.send(b'data')
  403. self.uut.transport.send.assert_called_once_with(1234, b'data')
  404. def test_close(self):
  405. self.uut.close()
  406. self.uut.transport.unregister_socket.assert_called_once_with(1234)
  407. def test_send_after_close_is_an_error(self):
  408. self.uut.close()
  409. with self.assertRaises(exceptions.SocketClosed):
  410. self.uut.send(b'data')
  411. def test_receive_after_close_is_an_error(self):
  412. self.uut.close()
  413. with self.assertRaises(exceptions.SocketClosed):
  414. self.uut.receive(block=False)
  415. def test_blocking_receive_after_close_is_an_error(self):
  416. self.uut.close()
  417. with self.assertRaises(exceptions.SocketClosed):
  418. self.uut.receive(timeout=0.001)
  419. def test_close_during_blocking_receive_aborts_the_receive(self):
  420. thread_started = threading.Event()
  421. result = [None]
  422. def test_thread():
  423. thread_started.set()
  424. try:
  425. self.uut.receive(timeout=0.3)
  426. except Exception as e:
  427. result[0] = e
  428. thread = threading.Thread(target=test_thread)
  429. thread.daemon = True
  430. thread.start()
  431. assert thread_started.wait(timeout=0.5)
  432. self.uut.close()
  433. thread.join()
  434. self.assertIsInstance(result[0], exceptions.SocketClosed)
  435. def test_close_is_idempotent(self):
  436. self.uut.close()
  437. self.uut.close()
  438. self.assertEqual(1, self.uut.transport.unregister_socket.call_count)