文本进度条
pipehappy
|
1#
pipehappy 发表于 2006-07-12 23:21
文本进度条
#########################
"""
Here is a silly example of its usage:

import progress
import time
import random

total = 1000
p = progress.ProgressMeter(total=total)

while total > 0:
    cnt = random.randint(1, 25)
    p.update(cnt)
    total -= cnt
    time.sleep(random.random())


Here is an example of its output:

[------------------------->                                   ] 41%  821.2/sec

2006-02-20 Denis Barmenkov: ANSI codes replaced by Backspace (0x08) characters
"""
import time
import sys
import math


class ProgressMeter(object):
    """Single-line text progress meter written to ``sys.stdout``.

    Keyword arguments accepted by the constructor:

    timestamp     -- start time of the tracked work (default: ``time.time()``)
    unit          -- name of the unit being counted (default: ``''``)
    total         -- number of units to process (default: 100)
    count         -- number of units already processed (default: 0)
    rate_refresh  -- minimum number of seconds between redraws (default: 0.5)
    ticks         -- width of the meter in characters (default: 60)
    """

    def __init__(self, **kw):
        # What time do we start tracking our progress from?
        self.timestamp = kw.get('timestamp', time.time())
        # What kind of unit are we tracking?
        self.unit = str(kw.get('unit', ''))
        # Number of units to process
        self.total = int(kw.get('total', 100))
        # Number of units already processed
        self.count = int(kw.get('count', 0))
        # Refresh rate in seconds
        self.rate_refresh = float(kw.get('rate_refresh', .5))
        # Number of ticks in meter
        self.meter_ticks = int(kw.get('ticks', 60))
        # Units represented by one meter character (0.0 when total == 0)
        self.meter_division = float(self.total) / self.meter_ticks
        self.meter_value = self._ticks_for(self.count)
        self.last_update = None
        # Ring buffer of the most recent per-update rates
        self.rate_history_idx = 0
        self.rate_history_len = 10
        self.rate_history = [None] * self.rate_history_len
        self.rate_current = 0.0
        self.last_refresh = 0
        self.prev_meter_len = 0

    def _ticks_for(self, count):
        """Return how many meter characters ``count`` units fill."""
        if not self.meter_division:
            # total == 0: nothing to do, so the meter is shown as full
            # instead of dividing by zero.
            return self.meter_ticks
        return int(count / self.meter_division)

    def update(self, count, **kw):
        """Record ``count`` newly processed units and redraw if it is due."""
        now = time.time()
        # Calculate rate of progress
        rate = 0.0
        # Add count to total, never exceeding it
        self.count = min(self.count + count, self.total)
        if self.last_update is not None:
            delta = now - float(self.last_update)
            if delta:
                rate = count / delta
            else:
                rate = count
            self.rate_history[self.rate_history_idx] = rate
            self.rate_history_idx += 1
            self.rate_history_idx %= self.rate_history_len
            # Average the rate history, ignoring slots not filled yet
            known = [r for r in self.rate_history if r is not None]
            rate = sum(known, 0.0) / len(known)
        self.rate_current = rate
        self.last_update = now
        # Divide total by meter division; the meter never moves backwards
        value = self._ticks_for(self.count)
        if value > self.meter_value:
            self.meter_value = value
        if not self.last_refresh:
            # First update always draws
            self.refresh()
        elif ((now - self.last_refresh) > self.rate_refresh
                or self.count >= self.total):
            self.refresh()

    def get_meter(self, **kw):
        """Return the meter text: bar, percentage and current rate."""
        bar = '-' * self.meter_value
        pad = ' ' * (self.meter_ticks - self.meter_value)
        if self.total:
            perc = (float(self.count) / self.total) * 100
        else:
            perc = 100.0
        return '[%s>%s] %d%%  %.1f/sec' % (bar, pad, perc, self.rate_current)

    def refresh(self, **kw):
        """Redraw the meter in place on ``sys.stdout``."""
        # Clear line and return cursor to start-of-line
        sys.stdout.write(' ' * self.prev_meter_len
                         + '\x08' * self.prev_meter_len)
        # Get meter text
        meter_text = self.get_meter(**kw)
        # Write meter and return cursor to start-of-line
        sys.stdout.write(meter_text + '\x08' * len(meter_text))
        self.prev_meter_len = len(meter_text)

        # Are we finished?
        if self.count >= self.total:
            sys.stdout.write('\n')
        sys.stdout.flush()
        # Timestamp
        self.last_refresh = time.time()


###############################
# CLASS NAME: DLLInterface
#
# Author: Larry Bates (lbates@syscononline.com)
#
# Written: 12/09/2002
#
# Released under: GNU GENERAL PUBLIC LICENSE
#
#
class progressbarClass:
    """Fifty-column percentage gauge drawn under a printed ruler."""

    def __init__(self, finalcount, progresschar=None):
        """Print the ruler (unless ``finalcount`` is zero).

        finalcount   -- count that corresponds to 100%
        progresschar -- character used for the bar; defaults to chr(178)
        """
        import sys
        self.finalcount = finalcount
        self.blockcount = 0
        #
        # See if caller passed me a character to use on the
        # progress bar (like "*").  If not use the block
        # character that makes it look like a real progress
        # bar.
        #
        if not progresschar:
            self.block = chr(178)
        else:
            self.block = progresschar
        #
        # Get pointer to sys.stdout so I can use the write/flush
        # methods to display the progress bar.
        #
        self.f = sys.stdout
        #
        # If the final count is zero, don't start the progress gauge
        #
        if not self.finalcount:
            return
        # NOTE(review): the spacing of the digit row was collapsed in the
        # source text; restored so each digit sits above a '0' of the ruler.
        self.f.write('\n------------------ % Progress -------------------1\n')
        self.f.write('    1    2    3    4    5    6    7    8    9    0\n')
        self.f.write('----0----0----0----0----0----0----0----0----0----0\n')
        return

    def progress(self, count):
        """Advance the gauge so that it reflects ``count`` units done."""
        #
        # Make sure I don't try to go off the end (e.g. >100%)
        #
        count = min(count, self.finalcount)
        #
        # If finalcount is zero, I'm done
        #
        # NOTE(review): the text between "percentcomplete <" and
        # "> self.blockcount" was lost from the source; the lines marked
        # (reconstructed) follow from the 50-column ruler (one block per
        # 2%) -- confirm against the original recipe.
        if self.finalcount:
            percentcomplete = int(round(100 * count / self.finalcount))
            if percentcomplete < 1:          # (reconstructed)
                percentcomplete = 1          # (reconstructed)
        else:                                # (reconstructed)
            percentcomplete = 100            # (reconstructed)
        blockcount = percentcomplete // 2    # (reconstructed)
        if blockcount > self.blockcount:
            for i in range(self.blockcount, blockcount):
                self.f.write(self.block)
                self.f.flush()

        if percentcomplete == 100:
            self.f.write("\n")
        self.blockcount = blockcount
        return


if __name__ == "__main__":
    from time import sleep
    pb = progressbarClass(8, "*")
    count = 0
    # NOTE(review): the loop condition and body were lost from the source;
    # this is a minimal reconstruction that drives the gauge to 100%.
    while count < 8:
        count += 1
        pb.progress(count)
        sleep(0.2)

##############################
#!/usr/local/bin/python
"""
A progress display module.

See the doc string for the Progress class for details.
"""
import sys
import time

# time.clock() was removed in Python 3.8; prefer perf_counter when it exists
# and fall back to clock() on interpreters that predate it.
_clock = getattr(time, "perf_counter", None) or time.clock


class Progress:
    """A class to display progress of an application.

    The Progress class is typically used to track the progress of
    long-running applications.  For instance, if you want to track a loop
    that executes thousands of iterations it might look like:

        import progress

        ticker = progress.Progress(title='Big Loop')

        for elt in big_list:
            calculate_something()
            ticker.tick()

        del ticker

    The output will look something like:

        Big Loop: .........1.........2.........3

    If you didn't want title, wanted to track not only the major and minor
    intervals but the average tick interval in milliseconds and number of
    seconds between major ticks, and set a major tick interval of 100 and
    minor tick interval of 25, you could instantiate the progress meter like

        ticker = progress.Progress(major=100, minor=25,
                                   majormark='',
                                   multiplier=1000)

    (The majormark format of this example, and the sample output that
    followed it, did not survive in the text this file was recovered from.)

    Several formats for major and minor marks are understood - dictionary
    style string interpolation is used, so you can include as many or as few
    as you like and include arbitrary characters in mark format strings.
    The currently understood formats are:

        %(M) - the current major interval (integer)
        %(m) - the current minor interval (integer)
        %(T) - the clock time since the last major tick in seconds (float)
        %(t) - the clock time since the last minor tick in seconds (float)
        %(A) - the average clock time per tick since the last major tick
               in seconds (float)
        %(a) - the average clock time per tick since the last minor tick
               in seconds (float)

    The default minor mark is '.'.  The default major mark is '%(M)d'.
    The default minor interval is 100 ticks.  The default major interval is
    1000 ticks.

    Mark display can be suppressed by instantiating the progress meter with
    verbose=0.

    The units of the average tick time (default is seconds) can be changed
    by changing the multiplier.  For example, to change from seconds to
    milliseconds, set the multiplier argument to 1000 at instance creation.

    The default output stream is sys.stderr.  Any object that supports write
    and flush methods can be used.
    """

    def __init__(self, major=1000, minor=100, stream=sys.stderr, verbose=1,
                 title="", minormark=".", majormark="%(M)d", multiplier=1,
                 start=0):
        """create a progress meter

        argument    default     meaning
        major       1000        number of ticks between major marks
        minor       100         number of ticks between minor marks
        stream      sys.stderr  where to write output (write & flush required)
        verbose     1           non-zero means to display output
        title       ''          string to display before ticking
        minormark   '.'         string to display to mark minor intervals
        majormark   '%(M)d'     string to display to mark major intervals
        multiplier  1           multiplier to apply for A or a formats
        start       0           initial value of the tick counter
        """
        self.i = start
        self._T = self._t = 0
        self.multiplier = multiplier
        self.major = major
        self.minor = minor
        self.stream = stream
        self.minormark = minormark
        self.majormark = majormark
        self.verbose = verbose
        if self.verbose and title:
            self.stream.write(title + ": ")
            self.stream.flush()

    def tick(self, majormark=None, minormark=None):
        """mark the passage of time"""
        if not self._t:
            # delay initializing the timers until the first tick
            # this avoids annoying first interval inaccuracies when using
            # the class interactively
            self._t = self._T = _clock()
        self.i += 1
        if not self.verbose:
            return
        now = _clock()
        since_major = now - self._T
        since_minor = now - self._t
        d = {
            # current Major interval (floor division keeps it an integer)
            "M": self.i // self.major,
            # raw tick count; NOTE(review): the class docstring calls this
            # the "current minor interval" -- confirm which is intended
            "m": self.i,
            # time since last Major tick
            "T": since_major,
            # time since last minor tick
            "t": since_minor,
            # avg tick time since last Major tick
            "A": since_major * self.multiplier / self.major,
            # avg tick time since last minor tick
            "a": since_minor * self.multiplier / self.minor,
        }
        if majormark is None:
            majormark = self.majormark
        if minormark is None:
            minormark = self.minormark
        # A major tick takes precedence over a coinciding minor tick
        if self.i % self.major == 0:
            self.stream.write(majormark % d)
            self.stream.flush()
            self._T = _clock()
        elif self.i % self.minor == 0:
            self.stream.write(minormark % d)
            self.stream.flush()
            self._t = _clock()

    def __del__(self):
        # Finish the line with the final tick count
        if self.verbose:
            self.stream.write(" (%d)\n" % self.i)
            self.stream.flush()

    def value(self):
        """Return the current tick count."""
        return self.i


class Counter:
    """A counter which displays an ever increasing number in place"""

    def __init__(self, interval=10, stream=sys.stderr, start=0):
        self.counter = start
        self.interval = interval
        self.stream = stream

    def tick(self):
        """Count one tick; redraw the number every ``interval`` ticks."""
        self.counter += 1
        if self.counter % self.interval == 0:
            self.stream.write("\r%6d" % self.counter)
            self.stream.flush()

    def __del__(self):
        # Always show the final count and end the line
        self.stream.write("\r%6d\n" % self.counter)
        self.stream.flush()


if __name__ == "__main__":
    import unittest
    try:
        from StringIO import StringIO      # Python 2
    except ImportError:
        from io import StringIO            # Python 3

    # NOTE: the expected strings below use the full "%6d" field width; the
    # runs of spaces had been collapsed in the recovered text.
    # The "del c" assertions rely on __del__ running immediately.

    class CounterTestCase(unittest.TestCase):
        def test_by_tens(self):
            s = StringIO()
            c = Counter(stream=s)
            c.tick()
            self.assertEqual(s.getvalue(), '')
            for i in range(10):
                c.tick()
            self.assertEqual(s.getvalue(), '\r    10')
            del c
            self.assertEqual(s.getvalue(), '\r    10\r    11\n')

            s = StringIO()
            c = Counter(stream=s)
            for i in range(9):
                c.tick()
            self.assertEqual(s.getvalue(), '')
            c.tick()
            self.assertEqual(s.getvalue(), '\r    10')
            del c
            self.assertEqual(s.getvalue(), '\r    10\r    10\n')

        def test_by_ones(self):
            s = StringIO()
            c = Counter(stream=s, interval=1)
            c.tick()
            self.assertEqual(s.getvalue(), '\r     1')
            c.tick()
            self.assertEqual(s.getvalue(), '\r     1\r     2')
            del c
            self.assertEqual(s.getvalue(), '\r     1\r     2\r     2\n')

        def test_nonzero_start(self):
            s = StringIO()
            c = Counter(stream=s, interval=1, start=3)
            c.tick()
            self.assertEqual(s.getvalue(), '\r     4')
            c.tick()
            self.assertEqual(s.getvalue(), '\r     4\r     5')
            del c
            self.assertEqual(s.getvalue(), '\r     4\r     5\r     5\n')

        def test_negative_start(self):
            s = StringIO()
            c = Counter(stream=s, interval=1, start=-10)
            c.tick()
            self.assertEqual(s.getvalue(), '\r    -9')
            c.tick()
            self.assertEqual(s.getvalue(), '\r    -9\r    -8')
            del c
            self.assertEqual(s.getvalue(), '\r    -9\r    -8\r    -8\n')

    class ProgressTestCase(unittest.TestCase):
        def test_by_tens(self):
            s = StringIO()
            c = Progress(stream=s)
            c.tick()
            self.assertEqual(s.getvalue(), '')
            for i in range(100):
                c.tick()
            self.assertEqual(s.getvalue(), '.')
            del c
            self.assertEqual(s.getvalue(), '. (101)\n')

            s = StringIO()
            c = Progress(stream=s, minor=10)
            for i in range(9):
                c.tick()
            self.assertEqual(s.getvalue(), '')
            c.tick()
            self.assertEqual(s.getvalue(), '.')
            del c
            self.assertEqual(s.getvalue(), '. (10)\n')

        def test_by_ones(self):
            s = StringIO()
            c = Progress(stream=s, minor=1, major=10)
            c.tick()
            self.assertEqual(s.getvalue(), '.')
            c.tick()
            self.assertEqual(s.getvalue(), '..')
            for i in range(10):
                c.tick()
            self.assertEqual(s.getvalue(), '.........1..')
            del c
            self.assertEqual(s.getvalue(), '.........1.. (12)\n')

        def test_nonzero_start(self):
            s = StringIO()
            c = Progress(stream=s, minor=1, start=3, major=10)
            c.tick()
            self.assertEqual(s.getvalue(), '.')
            c.tick()
            self.assertEqual(s.getvalue(), '..')
            for i in range(10):
                c.tick()
            del c
            self.assertEqual(s.getvalue(), '......1..... (15)\n')

        def test_negative_start(self):
            s = StringIO()
            c = Progress(stream=s, minor=1, start=-10)
            c.tick()
            self.assertEqual(s.getvalue(), '.')
            c.tick()
            self.assertEqual(s.getvalue(), '..')
            del c
            self.assertEqual(s.getvalue(), '.. (-8)\n')

        def test_negative_start_small_major(self):
            s = StringIO()
            c = Progress(stream=s, minor=1, major=5, start=-10)
            c.tick()
            self.assertEqual(s.getvalue(), '.')
            c.tick()
            self.assertEqual(s.getvalue(), '..')
            for i in range(10):
                c.tick()
            self.assertEqual(s.getvalue(), '....-1....0..')

    unittest.main()

# Forum attachment note: progress.rar (2 KB)