Source code for pyrser.parsing.stream
import collections
"""An immutable position in a Stream.
index is the index in the Stream.
lineno is the line number of the position.
col_offset is the column offset of the position on the line.
"""
Position = collections.namedtuple('Position', 'index lineno col_offset')
[docs]class Cursor:
"""A mutable position in a Stream.
It can be initialized or set from an immutable Position.
"""
[docs] def __init__(self, position: Position=Position(0, 1, 1)):
self._maxindex = self._index = position.index
self._maxcol = self._col_offset = position.col_offset
self._maxline = self._lineno = position.lineno
self._eol = []
@property
def index(self) -> int:
"""The current index of the cursor."""
return self._index
@property
def lineno(self) -> int:
"""The current line number of the cursor."""
return self._lineno
@property
def col_offset(self) -> int:
"""The current column offset of the cursor."""
return self._col_offset
@property
def position(self) -> Position:
"""The current position of the cursor."""
return Position(self._index, self._lineno, self._col_offset)
@position.setter
def position(self, position: Position):
self._index = position.index
self._lineno = position.lineno
self._col_offset = position.col_offset
@property
def max_readed_position(self) -> Position:
"""The index of the deepest character readed."""
return Position(self._maxindex, self._maxline, self._maxcol)
[docs] def step_next_char(self):
"""Puts the cursor on the next character."""
self._index += 1
self._col_offset += 1
if self._index > self._maxindex:
self._maxindex = self._index
self._maxcol = self._col_offset
self._maxline = self._lineno
[docs] def step_prev_char(self):
"""Puts the cursor on the previous character."""
self._col_offset -= 1
self._index -= 1
[docs] def step_next_line(self):
"""Sets cursor as beginning of next line."""
self._eol.append(self.position)
self._lineno += 1
self._col_offset = 0
[docs] def step_prev_line(self):
"""Sets cursor as end of previous line."""
#TODO(bps): raise explicit error for unregistered eol
#assert self._eol[-1].index == self._index
if len(self._eol) > 0:
self.position = self._eol.pop()
[docs]class Tag:
"""Provide capture facilities"""
[docs] def __init__(self, stream: str, begin: int, end=0):
self._stream = stream
self._begin = begin
if end == 0:
self._end = begin
else:
self._end = end
[docs] def set_begin(self, begin: int):
self._begin = begin
[docs] def set_end(self, end: int):
self._end = end
[docs] def __str__(self) -> str:
if self._begin == self._end:
return ""
return self._stream[self._begin:self._end]
[docs] def __repr__(self) -> str:
return "strid:%d %s:%s" % (id(self._stream), self._begin, self._end)
[docs]class Stream:
"""Helps keep track of stream processing progress."""
[docs] def __init__(self, content: str=None, name: str=None):
self._content = content
self._len = len(content)
self._name = name
self._contexts = []
self._cursor = Cursor()
# use to store begin:end => value
self.value_cache = dict()
[docs] def __len__(self) -> int:
return self._len
[docs] def __getitem__(self, key: int or slice) -> str:
return self._content.__getitem__(key)
@property
def name(self) -> int:
"""Name of the stream."""
return self._name
@property
def eos_index(self) -> int:
"""End Of Stream index."""
return self._len
@property
def index(self) -> int:
"""The current position index."""
return self._cursor.position.index
@property
def lineno(self) -> int:
"""The current position line number."""
return self._cursor.lineno
@property
def col_offset(self) -> int:
"""The current position column offset."""
return self._cursor.col_offset
@property
def peek_char(self) -> str:
"""The current position character value."""
return self._content[self._cursor.index]
@property
def last_readed_line(self) -> str:
"""Usefull string to compute error message."""
mpos = self._cursor.max_readed_position
mindex = mpos.index
# search last \n
prevline = mindex - 1 if mindex == self.eos_index else mindex
while prevline >= 0 and self._content[prevline] != '\n':
prevline -= 1
# search next \n
nextline = mindex
while nextline < self.eos_index and self._content[nextline] != '\n':
nextline += 1
last_line = self._content[prevline + 1:nextline]
return last_line
[docs] def incpos(self, length: int=1) -> int:
"""Increment the cursor to the next character."""
if length < 0:
raise ValueError("length must be positive")
i = 0
while (i < length):
if self._cursor.index < self._len:
if self.peek_char == '\n':
self._cursor.step_next_line()
self._cursor.step_next_char()
i += 1
return self._cursor.index
[docs] def decpos(self, length: int=1) -> int:
if length < 0:
raise ValueError("length must be positive")
if (self._cursor.index - length) < 0:
raise ValueError("can't go before first byte")
i = 0
while (i < length):
if self.peek_char == '\n':
self._cursor.step_prev_line()
i += 1
else:
self._cursor.step_prev_char()
i += 1
return self._cursor.index
[docs] def save_context(self) -> bool:
"""Save current position."""
self._contexts.append(self._cursor.position)
return True
[docs] def restore_context(self) -> bool:
"""Rollback to previous saved position."""
self._cursor.position = self._contexts.pop()
return False
[docs] def validate_context(self) -> bool:
"""Discard previous saved position."""
del self._contexts[-1]
return True