[docs]classReader:""" Implementation of the reader interface which helps reading data from raw bytes. Parameters ---------- payload: Payload to read. Attributes ---------- buffer: Provided payload. position: Current reader position. """__slots__=("buffer","position")def__init__(self,payload:bytes)->None:self.buffer:bytes=payload.replace(b"\r",b"")self.position:int=0def__repr__(self)->str:returnf"<Reader(position={self.position})>"
[docs]defread_until(self,element:bytes)->bytes:""" Method to read bytes from the buffer until the specified element is encountered. Parameters ---------- element: The byte sequence indicating the end of reading. Raises ------ RuntimeError There is no specific item in the buffer starting from the current position. Notes ----- This method reads bytes from the buffer until the specified `element` is encountered. It starts reading from the current position in the buffer. """ifelementnotinself.buffer[self.position::]:raiseRuntimeError("There is no specific element in the buffer starting from the current position.")total:bytes=self.read(1)whilenottotal.endswith(element):total+=self.read(1)returntotal[:-len(element)]
[docs]defread(self,size:int|None=None)->bytes:""" Method to read bytes starting at current position. Parameters ---------- size: Number of bytes to read. """ifsizeisNone:size=len(self.buffer)-self.positiondata:bytes=self.buffer[self.position:self.position+size]self.position+=len(data)returndata