# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# NOTE(review): this copy of the file arrived with its line breaks collapsed
# and with two stretches of text missing (each one is marked below). The line
# breaks and indentation used here are reconstructed from the syntax and the
# data flow; the code tokens themselves are unchanged and in their original
# order. The file cannot be imported until the missing text is restored from
# version control.

from __future__ import absolute_import

import collections
import logging
import struct

from ..exceptions import PebbleCommanderError


class ResponseParseError(PebbleCommanderError):
    # NOTE(review): no raise site for this exception survives in this copy;
    # presumably it is raised by the response parse() helpers -- TODO confirm.
    pass


class EraseError(PebbleCommanderError):
    # Raised with the reported status value when an erase response carries a
    # negative status (see the surviving `raise EraseError(status)` below).
    pass


class OpenCommand(object):
    # NOTE(review): only the first two statements of this class survive, and
    # the second one is cut off inside its format-string literal.

    command_type = 1

    # NOTE(review): text is missing between the opening quote below and
    # " 0:". The remainder of this statement's line is kept verbatim and
    # unsplit because the structure around it cannot be recovered. What
    # follows the gap looks like the tail of an erase routine: it receives a
    # packet, parses it with EraseResponse.parse, logs the domain and status,
    # and raises EraseError when the status is negative -- TODO restore.
    command_struct = struct.Struct(' 0: ret = socket.receive(block=True) resp = EraseResponse.parse(ret) logging.debug("ERASE: domain %d status %d", resp.domain, resp.status) status = resp.status if status < 0: raise EraseError(status)

    # NOTE(review): the `class` header that owns the four methods below is not
    # visible (it falls inside the missing text). The subclasses at the end of
    # the file all derive from PULSEIO_Base, so these are presumably its
    # methods -- TODO confirm.

    def write(self, data):
        """Send `data` through the open handle, one write command per segment.

        The payload is cut into segments of at most
        ``self.socket.mtu - WriteCommand.header_size`` bytes. Each segment is
        sent together with ``self.fd`` and the current ``self.pos``; after the
        response has been checked, ``self.pos`` is advanced by the segment
        length.

        Raises:
            ValueError: if ``self.fd`` is None.
            AssertionError: if a response's ``fd`` or ``address`` differs from
                the values that were sent.
        """
        if self.fd is None:
            raise ValueError('Handle is not open')

        # Largest payload that fits in one write command.
        mss = self.socket.mtu - WriteCommand.header_size

        # NOTE(review): xrange is a Python 2 builtin; on Python 3 this line
        # raises NameError.
        for offset in xrange(0, len(data), mss):
            segment = data[offset:offset+mss]
            resp = self._send_and_receive(WriteCommand, WriteResponse,
                                          self.fd, self.pos, segment)
            # NOTE(review): assert statements are removed under `python -O`,
            # so these response checks disappear in optimized runs.
            assert resp.fd == self.fd
            assert resp.address == self.pos
            self.pos += len(segment)

    def read(self, length):
        """Send one ReadCommand and collect the reply chunks into a bytearray.

        The command is built from ``self.fd``, ``self.pos`` and `length`.
        Packets are then received (blocking) and their chunks appended in
        arrival order until at least `length` bytes have been collected.
        ``self.pos`` is not modified by this method.

        Returns:
            bytearray: the concatenated chunks.

        Raises:
            ValueError: if ``self.fd`` is None.
            AssertionError: if a response's fd differs from ``self.fd``.
        """
        if self.fd is None:
            raise ValueError('Handle is not open')

        cmd = ReadCommand(self.fd, self.pos, length)
        self.socket.send(cmd.packet)

        data = bytearray()
        bytes_left = length
        while bytes_left > 0:
            packet = self.socket.receive(block=True)
            # NOTE(review): chunk_offset is unpacked but never used, so the
            # chunks are trusted to arrive in order and without gaps.
            fd, chunk_offset, chunk = ReadResponse.parse(packet)
            assert fd == self.fd
            data.extend(chunk)
            bytes_left -= len(chunk)
        return data

    def crc(self, length):
        """Send a CRCCommand and return the ``crc`` field of the response.

        The command is sent with ``self.fd``, ``self.pos`` and `length`, and
        the reply is parsed as a CRCResponse.

        Raises:
            ValueError: if ``self.fd`` is None.
        """
        if self.fd is None:
            raise ValueError('Handle is not open')
        resp = self._send_and_receive(CRCCommand, CRCResponse,
                                      self.fd, self.pos,
                                      length)
        return resp.crc

    def stat(self):
        """Send a StatCommand for ``self.fd`` and return the parsed response.

        The response is parsed with the class's ``STAT_FORMAT``.

        Raises:
            ValueError: if ``self.fd`` is None.
            NotImplementedError: if ``self.STAT_FORMAT`` is falsy.
        """
        if self.fd is None:
            raise ValueError('Handle is not open')
        if not self.STAT_FORMAT:
            raise NotImplementedError("Stat is not supported for domain %d" %
                                      self.DOMAIN)
        return self._send_and_receive(StatCommand, self.STAT_FORMAT, self.fd)


class PULSEIO_Memory(PULSEIO_Base):
    # Binds the base class to ReadDomains.MEMORY.

    DOMAIN = ReadDomains.MEMORY

    # uint32 for address, uint32 for length
    ERASE_FORMAT = "II"


class PULSEIO_ExternalFlash(PULSEIO_Base):
    # Binds the base class to ReadDomains.EXTERNAL_FLASH.

    DOMAIN = ReadDomains.EXTERNAL_FLASH

    # uint32 for address, uint32 for length
    ERASE_FORMAT = "II"


class PULSEIO_Framebuffer(PULSEIO_Base):
    # Binds the base class to ReadDomains.FRAMEBUFFER and supplies the
    # response description that stat() passes to _send_and_receive.

    DOMAIN = ReadDomains.FRAMEBUFFER
    STAT_FORMAT = StatResponse('FramebufferAttributes', 'BBBI',
                               'width height bpp length')


class PULSEIO_Coredump(PULSEIO_Base):
    # Binds the base class to ReadDomains.COREDUMP and supplies both a stat
    # response description and an erase format.

    DOMAIN = ReadDomains.COREDUMP
    STAT_FORMAT = StatResponse('CoredumpAttributes', 'BI', 'unread length')
    ERASE_FORMAT = "I"

    @staticmethod
    def _process_args(slot):
        # NOTE(review): this copy of the file ends inside the string literal
        # below; the format string, the remaining arguments and anything that
        # followed this method are missing -- TODO restore.
        return struct.pack("