# coding: utf-8
"""
brownie.terminal.progress
~~~~~~~~~~~~~~~~~~~~~~~~~
A widget-based progress bar implementation.
.. versionadded:: 0.6
:copyright: 2010-2011 by Daniel Neuhäuser
:license: BSD, see LICENSE.rst for details
"""
from __future__ import division
import re
import math
from functools import wraps
from datetime import datetime
from brownie.caching import LFUCache
from brownie.datastructures import ImmutableDict
#: Binary prefixes, largest first.
BINARY_PREFIXES = [
(u'Yi', 2 ** 80), # yobi
(u'Zi', 2 ** 70), # zebi
(u'Ei', 2 ** 60), # exbi
(u'Pi', 2 ** 50), # pebi
(u'Ti', 2 ** 40), # tebi
(u'Gi', 2 ** 30), # gibi
(u'Mi', 2 ** 20), # mebi
(u'Ki', 2 ** 10) # kibi
]
#: Positive SI prefixes, largest first.
SI_PREFIXES = [
(u'Y', 10 ** 24), # yotta
(u'Z', 10 ** 21), # zetta
(u'E', 10 ** 18), # exa
(u'P', 10 ** 15), # peta
(u'T', 10 ** 12), # tera
(u'G', 10 ** 9), # giga
(u'M', 10 ** 6), # mega
(u'k', 10 ** 3) # kilo
]
_progressbar_re = re.compile(ur"""
(?<!\$)\$([a-zA-Z]+) # identifier
(: # initial widget value
(?: # grouping to avoid : to be treated as part of
# the left or operand
"( # quoted string
(?:
[^"]| # any character except " or ...
(?<=\\)" # ... " preceded by a backslash
)*
)"|
([a-zA-Z]+) # identifiers can be used instead of strings
)
)?|
(\$\$) # escaped $
""", re.VERBOSE)
def count_digits(n):
if n == 0:
return 1
return int(math.log10(abs(n)) + (2 if n < 0 else 1))
def bytes_to_readable_format(bytes, binary=True):
prefixes = BINARY_PREFIXES if binary else SI_PREFIXES
for prefix, size in prefixes:
if bytes >= size:
result = bytes / size
return result, prefix + 'B'
return bytes, 'B'
def bytes_to_string(bytes, binary=True):
"""
Provides a nice readable string representation for `bytes`.
:param binary:
If ``True`` uses binary prefixes otherwise SI prefixes are used.
"""
result, prefix = bytes_to_readable_format(bytes, binary=binary)
if isinstance(result, int) or getattr(result, 'is_integer', lambda: False)():
return '%i%s' % (result, prefix)
return '%.02f%s' % (result, prefix)
@LFUCache.decorate(maxsize=64)
def parse_progressbar(string):
"""
Parses a string representing a progress bar.
"""
def add_text(text):
if not rv or rv[-1][0] != 'text':
rv.append(['text', text])
else:
rv[-1][1] += text
rv = []
remaining = string
while remaining:
match = _progressbar_re.match(remaining)
if match is None:
add_text(remaining[0])
remaining = remaining[1:]
elif match.group(5):
add_text(u'$')
remaining = remaining[match.end():]
else:
if match.group(3) is None:
value = match.group(4)
else:
value = match.group(3).decode('string-escape')
rv.append([match.group(1), value])
remaining = remaining[match.end():]
return rv
[docs]class Widget(object):
"""
Represents a part of a progress bar.
"""
#: The priority of the widget defines in which order they are updated. The
#: default priority is 0.
#:
#: This is important as the first widget being updated has the entire
#: line available.
priority = 0
#: Should be ``True`` if this widget depends on
#: :attr:`ProgressBar.maxsteps` being set to something other than ``None``.
requires_fixed_size = False
@property
def provides_size_hint(self):
return self.size_hint.im_func is not Widget.size_hint.im_func
[docs] def size_hint(self, progressbar):
"""
Should return the required size or ``None`` if it cannot be given.
"""
return None
[docs] def init(self, progressbar, remaining_width, **kwargs):
"""
Called when the progress bar is initialized.
Should return the output of the widget as string.
"""
raise NotImplementedError('%s.init' % self.__class__.__name__)
[docs] def update(self, progressbar, remaining_width, **kwargs):
"""
Called when the progress bar is updated, not necessarily with each
step.
Should return the output of the widget as string.
"""
raise NotImplementedError('%s.update' % self.__class__.__name__)
[docs] def finish(self, progressbar, remaining_width, **kwargs):
"""
Called when the progress bar is finished, not necessarily after
maxsteps has been reached, per default this calls :meth:`update`.
Should return the output of the widget as string.
"""
return self.update(progressbar, remaining_width, **kwargs)
def __repr__(self):
return '%s()' % self.__class__.__name__
[docs]class TextWidget(Widget):
"""
Represents static text in a progress bar.
"""
def __init__(self, text):
self.text = text
def size_hint(self, progressbar):
return len(self.text)
def update(self, progressbar, remaining_width, **kwargs):
return self.text
init = update
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.text)
[docs]class HintWidget(Widget):
"""
Represents a 'hint', changing text passed with each update, in a progress
bar.
Requires that :meth:`ProgressBar.next` is called with a `hint` keyword
argument.
This widget has a priority of 1.
"""
priority = 1
def __init__(self, initial_hint=u''):
self.initial_hint = initial_hint
def init(self, progressbar, remaining_width, **kwargs):
return self.initial_hint
def update(self, progressbar, remaining_width, **kwargs):
try:
return kwargs.get('hint', u'')
except KeyError:
raise TypeError("expected 'hint' as a keyword argument")
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.initial_hint)
[docs]class PercentageWidget(Widget):
"""
Represents a string showing the progress as percentage.
"""
requires_fixed_size = True
def calculate_percentage(self, progressbar):
return 100 / progressbar.maxsteps * progressbar.step
def size_hint(self, progressbar):
return count_digits(self.calculate_percentage(progressbar)) + 1
def init(self, progressbar, remaining_width, **kwargs):
return '0%'
def update(self, progressbar, remaining_width, **kwargs):
return '%i%%' % self.calculate_percentage(progressbar)
def finish(self, progressbar, remaining_width, **kwargs):
return '100%'
[docs]class BarWidget(Widget):
"""
A simple bar which moves with each update not corresponding with the
progress being made.
The bar is enclosed in brackets, progress is visualized by tree hashes
`###` moving forwards or backwards with each update; the rest of the bar is
filled with dots `.`.
"""
def __init__(self):
self.position = 0
self.going_forward = True
def make_bar(self, width):
parts = ['.'] * (width - 2)
parts[self.position:self.position+3] = '###'
return '[%s]' % ''.join(parts)
def init(self, progressbar, remaining_width, **kwargs):
return self.make_bar(remaining_width)
def update(self, progressbar, remaining_width, **kwargs):
width = remaining_width - 2
if (self.position + 3) > width:
self.position = width - 4
self.going_forward = False
elif self.going_forward:
self.position += 1
if self.position + 3 == width:
self.going_forward = False
else:
self.position -= 1
if self.position == 0:
self.going_forward = True
return self.make_bar(remaining_width)
[docs]class PercentageBarWidget(Widget):
"""
A simple bar which shows the progress in terms of a bar being filled
corresponding to the percentage of progress.
The bar is enclosed in brackets, progress is visualized with hashes `#`
the remaining part uses dots `.`.
"""
requires_fixed_size = True
def init(self, progressbar, remaining_width, **kwargs):
return '[%s]' % ('.' * (remaining_width - 2))
def update(self, progressbar, remaining_width, **kwargs):
percentage = 100 / progressbar.maxsteps * progressbar.step
marked_width = int(percentage * (remaining_width - 2) / 100)
return '[%s]' % ('#' * marked_width).ljust(remaining_width - 2, '.')
def finish(self, progressbar, remaining_width, **kwargs):
return '[%s]' % ('#' * (remaining_width - 2))
[docs]class StepWidget(Widget):
"""
Shows at which step we are currently at and how many are remaining as
`step of steps`.
:param unit:
If each step represents something other than a simple task e.g. a byte
when doing file transactions, you can specify a unit which is used.
Supported units:
- `'bytes'` - binary prefix only, SI might be added in the future
"""
requires_fixed_size = True
units = ImmutableDict({
'bytes': bytes_to_string,
None: unicode
})
def __init__(self, unit=None):
if unit not in self.units:
raise ValueError('unknown unit: %s' % unit)
self.unit = unit
def get_values(self, progressbar):
convert = self.units[self.unit]
return convert(progressbar.step), convert(progressbar.maxsteps)
def size_hint(self, progressbar):
step, maxsteps = self.get_values(progressbar)
return len(step) + len(maxsteps) + 4 # ' of '
def init(self, progressbar, remaining_width, **kwargs):
return u'%s of %s' % self.get_values(progressbar)
update = init
[docs]class TimeWidget(Widget):
"""
Shows the elapsed time in hours, minutes and seconds as
``$hours:$minutes:$seconds``.
This widget has a priority of 2.
"""
priority = 2
def init(self, progressbar, remaining_width, **kwargs):
self.start_time = datetime.now()
return '00:00:00'
def update(self, progressbar, remaining_width, **kwargs):
seconds = (datetime.now() - self.start_time).seconds
minutes = 0
hours = 0
minute = 60
hour = minute * 60
if seconds > hour:
hours, seconds = divmod(seconds, hour)
if seconds > minute:
minutes, seconds = divmod(seconds, minute)
return '%02i:%02i:%02i' % (hours, minutes, seconds)
[docs]class DataTransferSpeedWidget(Widget):
"""
Shows the data transfer speed in bytes per second using SI prefixes.
This widget has a priority of 2.
"""
priority = 2
def init(self, progressbar, remaining_width, **kwargs):
self.begin_timing = datetime.now()
self.last_step = 0
return '0kb/s'
def update(self, progressbar, remaining_width, **kwargs):
end_timing = datetime.now()
# .seconds is an integer so our calculations result in 0 if each update
# takes less than a second, therefore we have to calculate the exact
# time in seconds
elapsed = (end_timing - self.begin_timing).microseconds * 10 ** -6
step = progressbar.step - self.last_step
if elapsed == 0:
result = '%.02f%s/s' % bytes_to_readable_format(0, binary=False)
else:
result = '%.02f%s/s' % bytes_to_readable_format(
step / elapsed,
binary=False
)
self.begin_timing = end_timing
self.last_step = progressbar.step
return result
[docs]class ProgressBar(object):
"""
A progress bar which acts as a container for various widgets which may be
part of a progress bar.
Initializing and finishing can be done by using the progress bar as a
context manager instead of calling :meth:`init` and :meth:`finish`.
:param widgets:
An iterable of widgets which should be used.
:param writer:
A :class:`~brownie.terminal.TerminalWriter` which is used by the
progress bar.
:param maxsteps:
The number of steps, not necessarily updates, which are to be made.
"""
@classmethod
[docs] def from_string(cls, string, writer, maxsteps=None, widgets=None):
"""
Returns a :class:`ProgressBar` from a string.
The string is used as a progressbar, ``$[a-zA-Z]+`` is substituted with
a widget as defined by `widgets`.
``$`` can be escaped with another ``$`` e.g. ``$$foo`` will not be
substituted.
Initial values as required for the :class:`HintWidget` are given like
this ``$hint:initial``, if the initial value is supposed to contain a
space you have to use a quoted string ``$hint:"foo bar"``; quoted can
be escaped using a backslash.
If you want to provide your own widgets or overwrite existing ones
pass a dictionary mapping the desired names to the widget classes to
this method using the `widgets` keyword argument. The default widgets
are:
+--------------+----------------------------------+-------------------+
| Name | Class | Requires maxsteps |
+==============+==================================+===================+
| `text` | :class:`TextWidget` | No |
+--------------+----------------------------------+-------------------+
| `hint` | :class:`HintWidget` | No |
+--------------+----------------------------------+-------------------+
| `percentage` | :class:`Percentage` | Yes |
+--------------+----------------------------------+-------------------+
| `bar` | :class:`BarWidget` | No |
+--------------+----------------------------------+-------------------+
| `sizedbar` | :class:`PercentageBarWidget` | Yes |
+--------------+----------------------------------+-------------------+
| `step` | :class:`StepWidget` | Yes |
+--------------+----------------------------------+-------------------+
| `time` | :class:`TimeWidget` | No |
+--------------+----------------------------------+-------------------+
| `speed` | :class:`DataTransferSpeedWidget` | No |
+--------------+----------------------------------+-------------------+
"""
default_widgets = {
'text': TextWidget,
'hint': HintWidget,
'percentage': PercentageWidget,
'bar': BarWidget,
'sizedbar': PercentageBarWidget,
'step': StepWidget,
'time': TimeWidget,
'speed': DataTransferSpeedWidget
}
widgets = dict(default_widgets.copy(), **(widgets or {}))
rv = []
for name, initial in parse_progressbar(string):
if name not in widgets:
raise ValueError('widget not found: %s' % name)
if initial:
widget = widgets[name](initial)
else:
widget = widgets[name]()
rv.append(widget)
return cls(rv, writer, maxsteps=maxsteps)
def __init__(self, widgets, writer, maxsteps=None):
widgets = list(widgets)
if maxsteps is None:
for widget in widgets:
if widget.requires_fixed_size:
raise ValueError(
'%r requires maxsteps to be given' % widget
)
self.widgets = widgets
self.writer = writer
self.maxsteps = maxsteps
self.step = 0
def get_step(self):
return self._step
def set_step(self, new_step):
if self.maxsteps is None or new_step <= self.maxsteps:
self._step = new_step
else:
raise ValueError('step cannot be larger than maxsteps')
step = property(get_step, set_step)
del get_step, set_step
def __iter__(self):
return self
[docs] def get_widgets_by_priority(self):
"""
Returns an iterable of tuples consisting of the position of the widget
and the widget itself ordered by each widgets priority.
"""
return sorted(
enumerate(self.widgets),
key=lambda x: x[1].priority,
reverse=True
)
[docs] def get_usable_width(self):
"""
Returns the width usable by all widgets which don't provide a size
hint.
"""
return self.writer.get_usable_width() - sum(
widget.size_hint(self) for widget in self.widgets
if widget.provides_size_hint
)
def write(self, string, update=True):
if update:
self.writer.write('\r', escape=False, flush=False)
self.writer.begin_line()
self.writer.write(string)
def make_writer(updating=True, finishing=False):
def decorate(func):
@wraps(func)
def wrapper(self, **kwargs):
if finishing and self.step == self.maxsteps:
return
if updating and not finishing:
self.step += kwargs.get('step', 1)
parts = []
remaining_width = self.get_usable_width()
for i, widget in self.get_widgets_by_priority():
part = func(self, widget, remaining_width, **kwargs)
if not widget.provides_size_hint:
remaining_width -= len(part)
parts.append((i, part))
parts.sort()
self.write(''.join(part for _, part in parts), update=updating)
if finishing:
self.writer.newline()
return wrapper
return decorate
@make_writer(updating=False)
[docs] def init(self, widget, remaining_width, **kwargs):
"""
Writes the initial progress bar to the terminal.
"""
return widget.init(self, remaining_width, **kwargs)
@make_writer()
[docs] def next(self, widget, remaining_width, step=1, **kwargs):
"""
Writes an updated version of the progress bar to the terminal.
If the update corresponds to multiple steps, pass the number of steps
which have been made as an argument. If `step` is larger than
`maxsteps` a :exc:`ValueError` is raised.
"""
return widget.update(self, remaining_width, **kwargs)
@make_writer(finishing=True)
[docs] def finish(self, widget, remaining_width, **kwargs):
"""
Writes the finished version of the progress bar to the terminal.
This method may be called even if `maxsteps` has not been reached or
has not been defined.
"""
return widget.finish(self, remaining_width, **kwargs)
del make_writer
def __enter__(self):
self.init()
return self
def __exit__(self, etype, evalue, traceback):
if etype is None:
self.finish()
def __repr__(self):
return '%s(%r, %r, maxsteps=%r)' % (
self.__class__.__name__, self.widgets, self.writer, self.maxsteps
)
__all__ = [
'ProgressBar', 'TextWidget', 'HintWidget', 'PercentageWidget', 'BarWidget',
'PercentageBarWidget', 'StepWidget', 'TimeWidget', 'DataTransferSpeedWidget'
]