123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- # Copyright 2024 Google LLC
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from array import array
- from pyftdi.ftdi import Ftdi
- import usb.util
- class FTDISerialWireDebug(object):
- def __init__(self, vid, pid, interface, direction, output_mask, reset_mask, frequency):
- self._direction = direction
- self._output_mask = output_mask
- self._reset_mask = reset_mask
- self._ftdi = Ftdi()
- try:
- self._ftdi.open_mpsse(vid, pid, interface, direction=direction, frequency=frequency,
- latency=1)
- except:
- self._ftdi = None
- raise
- # get the FTDI FIFO size and increase the chuncksize to match
- self._ftdi_fifo_size = min(self._ftdi.fifo_sizes)
- self._ftdi.write_data_set_chunksize(self._ftdi_fifo_size)
- self._cmd_buffer = array('B')
- self._output_enabled = False
- self._pending_acks = 0
- self._sequence_cmd_buffer = None
- def close(self):
- if not self._ftdi:
- return
- self.send_cmds()
- self._ftdi.close()
- # PyFTDI doesn't do a good job of cleaning up - make sure we release the usb device
- usb.util.dispose_resources(self._ftdi.usb_dev)
- self._ftdi = None
- def _fatal(self, message):
- raise Exception('FATAL ERROR: {}'.format(message))
- def _queue_cmd(self, write_data):
- if len(write_data) > self._ftdi_fifo_size:
- raise Exception('Data too big!')
- if self._sequence_cmd_buffer is not None:
- self._sequence_cmd_buffer.extend(write_data)
- else:
- if len(self._cmd_buffer) + len(write_data) > self._ftdi_fifo_size:
- self.send_cmds()
- self._cmd_buffer.extend(write_data)
- def _set_output_enabled(self, enabled):
- if enabled == self._output_enabled:
- return
- self._output_enabled = enabled
- direction = self._direction & ~(0x00 if enabled else self._output_mask)
- self._queue_cmd([Ftdi.SET_BITS_LOW, 0, direction])
- def reset(self):
- # toggle the reset line
- self.reset_lo()
- self.reset_hi()
- def reset_lo(self):
- direction = self._direction & ~(0x00 if self._output_enabled else self._output_mask)
- self._queue_cmd([Ftdi.SET_BITS_LOW, 0, direction | self._reset_mask])
- self.send_cmds()
- def reset_hi(self):
- direction = self._direction & ~(0x00 if self._output_enabled else self._output_mask)
- self._queue_cmd([Ftdi.SET_BITS_LOW, 0, direction & ~self._reset_mask])
- self.send_cmds()
- def send_cmds(self):
- if self._sequence_cmd_buffer is not None:
- self._ftdi.write_data(self._sequence_cmd_buffer)
- elif len(self._cmd_buffer) > 0:
- self._ftdi.write_data(self._cmd_buffer)
- self._cmd_buffer = array('B')
- def write_bits_cmd(self, data, num_bits):
- if num_bits < 0 or num_bits > 8:
- self._fatal('Invalid num_bits')
- elif (data & ((1 << num_bits) - 1)) != data:
- self._fatal('Invalid data!')
- self._set_output_enabled(True)
- self._queue_cmd([Ftdi.WRITE_BITS_NVE_LSB, num_bits - 1, data])
- def write_bytes_cmd(self, data):
- length = len(data) - 1
- if length < 0 or length > 0xffff:
- self._fatal('Invalid length')
- self._set_output_enabled(True)
- self._queue_cmd([Ftdi.WRITE_BYTES_NVE_LSB, length & 0xff, length >> 8] + data)
- def read_bits_cmd(self, num_bits):
- if num_bits < 0 or num_bits > 8:
- self._fatal('Invalid num_bits')
- self._set_output_enabled(False)
- self._queue_cmd([Ftdi.READ_BITS_PVE_LSB, num_bits - 1])
- def read_bytes_cmd(self, length):
- length -= 1
- if length < 0 or length > 0xffff:
- self._fatal('Invalid length')
- self._set_output_enabled(False)
- self._queue_cmd([Ftdi.READ_BYTES_PVE_LSB, length & 0xff, length >> 8])
- def get_read_bytes(self, length):
- return self._ftdi.read_data_bytes(length)
- def get_read_fifo_size(self):
- return self._ftdi_fifo_size
- def start_sequence(self):
- if self._sequence_cmd_buffer is not None:
- self._fatal('Attempted to start a sequence while one is in progress')
- self.send_cmds()
- self._sequence_cmd_buffer = array('B')
- def end_sequence(self):
- if self._sequence_cmd_buffer is None:
- self._fatal('No sequence started')
- self._sequence_cmd_buffer = None
|