Commit c87b79c5 authored by Federico Asara's avatar Federico Asara

PAGE API introduction.

Agilent33250A support added.
Sis33 support added.
SineWaveform descriptor added.

demo-sweep sweep frequency of a sinusoidal wave while reading data from an ADC.
parent 23311255
from Generator import *
from SineWaveform import SineWaveform
from serial import Serial
from struct import pack
from time import sleep
from numpy import ndarray
#~ import progressbar as pbar
"""This class should manage the Agilent 33250A waveform generator"""
class Agilent33250A(Generator):
_parameters = {'device':('Serial device', 'Serial device used to communicate with the generator', "/dev/ttyUSB1", 'file'),
'bauds':('Bauds', 'Speed of the communication', 9600, int),
'to':('Timeout', 'Timeout during read operations', 2, int),
'ict':('Inter character space', 'Pause time between each character sent', 1, int)}
functionList = ('SIN', 'SQU', 'RAMP', 'PULS', 'NOIS', 'DC', 'USER')
def __init__(self, *args, **kwargs):
self.parameters = dict(self._parameters)
for i in kwargs:
if i in self.parameters.keys():
self.parameters[i][2] = kwargs[i]
for i in self.parameters.keys():
self.__setattr__(i, self.parameters[i][2])
self.adaptDict = {SineWaveform: self.adaptSine,
list: self.adaptData,
tuple: self.adaptData,
ndarray: self.adaptData,
str: self.adaptSavedFunction}
def connect(self):
self.comm = Serial(port = self.device, baudrate = self.bauds, timeout = self.to, interCharTimeout=self.ict)
self.comm.read(2)
# utilities
def adaptSavedFunction(self, wave, *args, **kwargs):
self.function = ('USER', wave)
return ""
def adaptSine(self, wave, *args, **kwargs):
return "APPL:SIN %d HZ, %d VPP, %d V" % (wave.frequency, wave.amplitude, wave.dc)
def adaptData(self, data, *args, **kwargs):
self.dataUpload(data, *args, **kwargs)
self.function = ('USER')
self.function = ('USER', 'VOLATILE')
return ''
def play(self, wave, *args, **kwargs):
self.command(self.adapt(wave, *args, **kwargs))
def command(self, what):
if len(what) == 0:
return
if type(what) is str:
what = (what, )
return sum(map(lambda x: self.comm.write("%s\n" % x), what))
# output
@Property
def output():
doc = "Output status of the generator"
def fget(self):
self.command("OUTP?")
output = self.comm.read(2)[0]
return output == "1"
def fset(self, status):
if type(status) is not bool:
return
self.command("OUTP %d" % (1 if status else 0))
return locals()
@Property
def function():
doc = "Function used by the generator"
def fget(self):
self.command("FUNC?")
output = self.comm.readline()[:-1] # avoid \n
if output == 'USER':
self.command('FUNC:USER?')
u = self.comm.readline()[:-1] # avoid \n
return (output, u)
return (output, )
def fset(self, f):
if type(f) in (tuple, list):
if len(f) == 2:
f, n = f
else:
return
if type(f) != str:
return
f = f.upper()
if ' ' in f:
f, n = f.split(' ')
else:
n = ''
if f not in self.functionList:
return
self.command("FUNC %s %s" % (f, n))
return locals()
@Property
def frequency():
doc = "Frequency used by the generator"
def fget(self):
self.command("FREQ?")
output = eval(self.comm.readline()[:-1]) # avoid \n
return output
def fset(self, value):
f = ' '.join(parse(value, 'HZ'))
self.command("FREQ %s" % f)
return locals()
@Property
def voltage():
doc = "Output amplitude"
def fget(self):
self.command("VOLT?")
V = eval(self.comm.readline()[:-1]) # avoid \n
self.command("VOLT? MIN")
m = eval(self.comm.readline()[:-1])
self.command("VOLT? MAX")
M = eval(self.comm.readline()[:-1])
return V, m, M
def fset(self, v):
if type(v) is str:
v = v.upper()
if v[:3] in ['MIN', 'MAX']:
self.command('VOLT %s' % v[:3])
return
f = ' '.join(parse(v, 'V'))
self.command("VOLT %s" % f)
return locals()
@Property
def voltageOffset():
doc = "Offset of the output signal"
def fget(self):
self.command("VOLT:OFFS?")
V = eval(self.comm.readline()[:-1]) # avoid \n
self.command("VOLT:OFFS? MIN")
m = eval(self.comm.readline()[:-1])
self.command("VOLT:OFFS? MAX")
M = eval(self.comm.readline()[:-1])
return V, m, M
def fset(self, v):
if type(v) is str:
v = v.upper()
if v[:3] in ['MIN', 'MAX']:
self.command('VOLT:OFFS %s' % v[:3])
return
f = ' '.join(parse(v, 'V'))
self.command("VOLT:OFFS %s" % f)
return locals()
# skipping volt:high volt:low
@Property
def voltageRangeAuto():
doc = "Voltage autoranging for all function. Setter supports also ONCE"
def fget(self):
self.command("VOLT:RANG:AUTO?")
output = self.comm.read(2)[0]
return output == "1"
def fset(self, status):
if type(status) is not bool:
if status != 'ONCE':
return
else:
if status: status = 'ON'
else: status = 'OFF'
self.command("VOLT:RANG:AUTO %s" % status)
return locals()
# skipping volt:high or volt:low
@Property
def squareDutyCycle():
doc = "Duty cycle of a square wave"
def fget(self):
self.command("FUNC:SQU:DCYC?")
V = eval(self.comm.readline()[:-1])
self.command("FUNC:SQU:DCYC? MIN")
m = eval(self.comm.readline()[:-1])
self.command("FUNC:SQU:DCYC? MAX")
M = eval(self.comm.readline()[:-1])
return V, m, M
def fset(self, v):
if type(v) is str:
v = v.upper()
if v[:3] in ['MIN', 'MAX']:
self.command('FUNC:SQU:DCYC %s' % v[:3])
return
self.command("FUNC:SQU:DCYC %f" % v)
return locals()
# data
def dataUpload(self, data, ttw = 0.002):
"""Upload a sequence of integers to the volatile memory of the generator.
TTW is the time to wait between each character of the sequence, which
is transferred in ASCII"""
#~ i = str(len(data))
#~ widgets = ['Something: ', pbar.Percentage(), ' ', pbar.Bar(),
#~ ' ', pbar.ETA(), ' ', pbar.FileTransferSpeed()]
#~ progress = pbar.ProgressBar(widgets=widgets)
#~ d = pack('>%sh', data)
#~ command = 'DATA:DAC VOLATILE, #%d%s %s\n' % (len(i), i, d)
#~ progress = pbar.ProgressBar(widgets=widgets)
#~ for i in progress(command):
#~ self.comm.write(i)
#~ sleep(ttw)
#~ return
command = 'DATA:DAC VOLATILE, %s\n' % ', '.join(str(i) for i in data)
print "writing"
self.comm.write(command)
print "done"
#for i in progress(command):
# self.comm.write(i)
# sleep(ttw)
def dataStore(self, destination):
"""Save VOLATILE waveform into 'destination'"""
if type(destination) is not str: return
self.command("DATA:COPY %s" % destination)
@Property
def dataCatalog():
doc = "List of all available arbitrary waveforms"
def fget(self):
self.command('DATA:CAT?')
return tuple(self.comm.readline()[:-1].replace('"', '').split(','))
return locals()
@Property
def dataNVCatalog():
doc = "List of the user defined waveforms store in non-volatile memory"
def fget(self):
self.command('DATA:NVOL:CAT?')
return tuple(self.comm.readline()[:-1].replace('"', '').split(','))
return locals()
@Property
def dataFree():
doc = "Free arbitrary waveform slots in non-volatile memory"
def fget(self):
self.command('DATA:NVOL:FREE?')
return int(self.comm.readline()[:-1])
return locals()
def dataDel(self, what):
"""Delete the waveform 'what'. If 'what' is all, then delete everything."""
if type(what) is not str: return
if what.upper() == 'ALL':
self.command('DATA:DEL:ALL')
else:
self.command('DATA:DEL %s' % what)
# commands
def sweep(self, interval, waves, callback = None):
for w in waves:
self.play(w)
sleep(interval)
if callback is not None:
callback(w)
if __name__ == '__main__':
import sys
g = Agilent33250A(sys.argv[1])
g.output = True
waves = [SineWaveform(f = (i, 'KHZ')) for i in xrange(1, 30)]
def callback(x):
print x.frequency
g.sweep(0.2, waves, callback)
g.output = False
class Configurable(object):
parameters = {}
def applyDefaults(self, o = None):
if o is None:
o = self
for i in self.parameters:
self.save(i,self.parameters[i][2])
def register(self, parameter, position, description, default, kind):
self.parameters[parameter] = (position, description, default, kind)
def save(self, parameter, value, o = None):
if o is None:
o = self
o.__setattr__(self.parameters[parameter][0], value)
def read(self, parameter, o = None):
if o is None:
o = self
return o.__getattr__(self.parameters[parameter][0])
import serial
import time
from Configurable import *
from Utilities import *
"""This class should manage a generic waveform generator"""
class Generator(object):
adaptDict = {}
config = Configurable()
def adapt(self, wave, *args, **kwargs):
return self.adaptDict[type(wave)](wave, *args, **kwargs)
import Waveform
from Utilities import *
from numpy import *
class SineWaveform(Waveform.Waveform):
_parameters = {'frequency':('Frequency', 'Frequency of the sinewave, in HZ', 1000, float),
'amplitude':('Amplitude', 'Amplitude of the sinewave, in Vpp', 1, float),
'dc':('DC Compoment', 'DC component of the sinewave, in Vpp', 1, float)}
def __init__(self, *args, **kwargs):
self.parameters = dict(self._parameters)
for i in kwargs:
if i in self.parameters.keys():
self.parameters[i][2] = kwargs[i]
for i in self.parameters.keys():
self.__setattr__(i, self.parameters[i][2])
def generate(self, sampleRate, samples, nbits, fsr):
f = self.frequency
A = self.amplitude
C = self.dc
t = arange(samples, dtype=float)/sampleRate
s = A*sin(2*pi*f*t) +C
lsb = fsr/(2**nbits)
return (s/lsb).astype(int)
#
from ctypes import *
from Configurable import *
lib = CDLL("./libsis33.L865.so")
SIS33_ROUND_NEAREST, SIS33_ROUND_DOWN, SIS33_ROUND_UP = range(3)
SIS33_TRIGGER_START, SIS33_TRIGGER_STOP = range(2)
SIS33_CLKSRC_INTERNAL, SIS33_CLKSRC_EXTERNAL = range(2)
class Sis33Acq(Structure):
_fields_ = [("data",POINTER(c_uint16)),
("nr_samples" , c_uint32),
("prevticks", c_uint64),
("size", c_uint32)]
@classmethod
def zalloc(cls, events, ev_length):
pointer = POINTER(cls)
lib.sis33_acqs_zalloc.restype = pointer
acqs = lib.sis33_acqs_zalloc(events, ev_length)
# insert error control
return acqs
@staticmethod
def free(item, n_acqs):
lib.sis33_acqs_free(item, n_acqs)
class Timeval(Structure):
_fields_ = [("tv_sec", c_uint32),
("tv_usec", c_uint32)]
@classmethod
def create(cls, s, u):
t = cls()
t.tv_sec = s;
t.tv_usec = u;
return t
class Sis33Exception(Exception):
@classmethod
def spawn(cls, desc):
return cls(strerror(errno()), desc)
def Property(func):
return property(**func())
def logLevel(l):
return lib.sis33_loglevel(l)
def errno():
return lib.sis33_errno()
def strerror(i):
return lib.sis33_strerror(i)
def perror(s):
lib.sis33_perror(s)
"""This class should manage a generic waveform generator"""
class Sis33Device(object):
_ptr = 0
class SineWaveform(Waveform.Waveform):
_parameters = {'index':('Device Index', 'Serial device used to communicate with the generator', 2, int)}
def __init__(self, *args, **kwargs):
self.parameters = dict(self._parameters)
for i in kwargs:
if i in self.parameters.keys():
self.parameters[i][2] = kwargs[i]
for i in self.parameters.keys():
self.__setattr__(i, self.parameters[i][2])
self.pointer = lib.sis33_open(self.index)
def __del__(self, index):
if self._ptr != 0:
self.close()
def close(self):
lib.sis33_close(self.pointer)
self._ptr = 0 # bypass pointer
@Property
def pointer():
doc = "Device descriptor"
def fget(self):
if (self._ptr == 0): raise Sis33Exception('Null pointer')
return self._ptr
def fset(self, value):
if (value == 0): raise Sis33Exception('Null pointer')
self._ptr = value
return locals()
@Property
def clockSource():
doc = "Clock source of the device"
def fget(self):
i = c_int()
if lib.sis33_get_clock_source(self.pointer, byref(i)):
raise Sis33Exception.spawn('Get Clock Source')
return i.value
def fset(self, value):
value = c_long(value)
if lib.sis33_set_clock_source(self.pointer, value):
raise Sis33Exception.spawn('Set Clock Source (%d)' % value)
return locals()
@Property
def clockFrequency():
doc = "Clock frequency used"
def fget(self):
i = c_int()
if lib.sis33_get_clock_frequency(self.pointer, byref(i)):
raise Sis33Exception.spawn('Get Clock Frequency')
return i.value
def fset(self, value):
value = c_long(value)
if lib.sis33_set_clock_frequency(self.pointer, value):
raise Sis33Exception.spawn('Set Clock Frequency (%d)' % value)
return locals()
@Property
def clockFrequencies():
doc = "Clock frequencies"
def fget(self):
i = lib.sis33_get_nr_clock_frequencies(self.pointer)
if i == 0:
raise Sis33Exception('Clock Frequencies number is 0')
output = (c_uint*i)()
if lib.sis33_get_available_clock_frequencies(self.pointer, output, c_long(i)):
raise Sis33Exception.spawn('Get Clock Frequencies')
return tuple(output)
return locals()
def roundClockFrequency(self, clkfreq, roundType):
"""Round a clock frequency to a valid value. """
return lib.sis33_round_clock_frequency(self.pointer, clkfreq, roundType)
@Property
def eventLengths():
doc = "Get the available event lengths. "
def fget(self):
i = lib.sis33_get_nr_event_lengths(self.pointer)
if i == 0:
raise Sis33Exception('Event Length number is 0')
output = (c_uint*i)()
if lib.sis33_get_available_event_lengths(self.pointer, output, c_long(i)):
raise Sis33Exception.spawn('Get Event Lengths')
return tuple(output)
return locals()
def roundEventLength(self, evLen, roundType):
"""Round an event length to a valid value. """
return lib.sis33_round_event_length(self.pointer, evLen, roundType)
def trigger(self, trigger):
if lib.sis33_trigger(self.pointer, trigger):
raise Sis33Exception.spawn('Trigger (%d)' % trigger)
@Property
def enableExternalTrigger():
doc = "Enable/Disable status of the external trigger"
def fget(self):
i = c_int()
if lib.sis33_get_enable_external_trigger(self.pointer, byref(i)):
raise Sis33Exception.spawn('Get Enble External Trigger')
return i.value
def fset(self, value):
value = c_long(value)
if lib.sis33_set_enable_external_trigger(self.pointer, value):
raise Sis33Exception.spawn('Set Enable External Trigger (%d)' % value)
return locals()
@Property
def startAuto():
doc = "Autostart mode"
def fget(self):
i = c_int()
if lib.sis33_get_start_auto(self.pointer, byref(i)):
raise Sis33Exception.spawn('Get Start Auto')
return i.value
def fset(self, value):
value = c_long(value)
if lib.sis33_set_start_auto(self.pointer, value):
raise Sis33Exception.spawn('Set Start Auto (%d)' % value)
return locals()
@Property
def startDelay():
doc = "Start delay"
def fget(self):
i = c_int()
if lib.sis33_get_start_delay(self.pointer, byref(i)):
raise Sis33Exception.spawn('Get Start Delay')
return i.value
def fset(self, value):
value = c_long(value)
if lib.sis33_set_start_delay(self.pointer, value):
raise Sis33Exception.spawn('Set Start Delay (%d)' % value)
return locals()
@Property
def stopAuto():
doc = "Autostop mode"
def fget(self):
i = c_int()
if lib.sis33_get_stop_auto(self.pointer, byref(i)):
raise Sis33Exception.spawn('Get Stop Auto')
return i.value
def fset(self, value):
value = c_long(value)
if lib.sis33_set_stop_auto(self.pointer, value):
raise Sis33Exception.spawn('Set Stop Auto (%d)' % value)
return locals()
@Property
def stopDelay():
doc = "Stop delay"
def fget(self):
i = c_int()
if lib.sis33_get_stop_delay(self.pointer, byref(i)):
raise Sis33Exception.spawn('Get Stop Delay')
return i.value
def fset(self, value):
value = c_long(value)
if lib.sis33_set_stop_delay(self.pointer, value):
raise Sis33Exception.spawn('Set Stop Delay (%d)' % value)
return locals()
@Property
def nrChannels():
doc = "Number of channels on the device."
def fget(self):
return lib.sis33_get_nr_channels(self.pointer)
return locals()
def channelSetOffset(self, channel, offset):
i = lib.sis33_channel_set_offset(self.pointer, channel, offset)
if i != 0:
raise Sis33Exception.spawn('Channel Set Offset (%d)' % channel)
def channelGetOffset(self, channel):
r = c_int()
i = lib.sis33_channel_get_offset(self.pointer, channel, byref(r))
if i != 0:
raise Sis33Exception.spawn('Channel Get Offset (%d)' % channel)
return r
def channelSetOffsetAll(self, offset):
i = lib.sis33_channel_set_offset_all(self.pointer, offset)
if i != 0:
raise Sis33Exception.spawn('Channel Set Offset All')
@Property
def nrSegments():
doc = "Number of segments on the device."
def fget(self):
i = c_int()
if lib.sis33_get_nr_segments(self.pointer, byref(i)):
raise Sis33Exception.spawn('Get Nr Segments')
return i.value
def fset(self, value):
value = c_long(value)
if lib.sis33_set_nr_segments(self.pointer, value):
raise Sis33Exception.spawn('Set Nr Segments (%d)' % value)
return locals()
@Property
def maxNrSegments():
doc = "Maximum number of segments"
def fget(self):
i = c_int()
if lib.sis33_get_max_nr_segments(self.pointer, byref(i)):
raise Sis33Exception.spawn('Get Maximum Nr Segments')
return i.value
return locals()
@Property
def minNrSegments():
doc = "Minimum number of segments"
def fget(self):
i = c_int()
if lib.sis33_get_min_nr_segments(self.pointer, byref(i)):
raise Sis33Exception.spawn('Get Minimum Nr Segments')
return i.value
return locals()
@Property
def maxNrEvents():
doc = "Maximum number of events"
def fget(self):
i = c_int()
if lib.sis33_get_max_nr_events(self.pointer, byref(i)):
raise Sis33Exception.spawn('Get Maximum Nr Events')
return i.value
return locals()
@Property
def nrBits():
doc = "Number of bits of the device."
def fget(self):
return lib.sis33_get_nr_bits(self.pointer)
return locals()
@Property
def eventTimestampingIsSupported():
doc = "Device implements event hardware timestamping."
def fget(self):
return lib.sis33_event_timestamping_is_supported(self.pointer)
return locals()
@Property
def maxEventTimestampingClockTicks():
doc = "Maximum number of clock ticks of the event timestamping unit. "
def fget(self):
i = c_int()
if lib.sis33_get_max_event_timestamping_clock_ticks(self.pointer, byref(i)):
raise Sis33Exception.spawn('Get Maximum Event Timestamping Clock Ticks')
return i.value
return locals()
@Property
def maxEventTimestampingDivider():
doc = "Maximum event timestamping divider available on a device. "
def fget(self):
i = c_int()
if lib.sis33_get_max_event_timestamping_divider(self.pointer, byref(i)):
raise Sis33Exception.spawn('Get Maximum Event Timestamping Divider')
return i.value
return locals()
@Property
def eventTimestampingDivider():
doc = "Current event timestamping divider on a device. "
def fget(self):
i = c_int()
if lib.sis33_get_event_timestamping_divider(self.pointer, byref(i)):
raise Sis33Exception.spawn('Get Event Timestamping Divider')
return i.value
def fset(self, value):
value = c_long(value)
if lib.sis33_set_event_timestamping_divider(self.pointer, value):
raise Sis33Exception.spawn('Set Event Timestamping Divider (%d)' % value)
return locals()
def acq(self, segment, nr_events, ev_length):
ev_length = self.roundEventLength(ev_length, SIS33_ROUND_NEAREST)
if lib.sis33_acq(self.pointer, segment, nr_events, ev_length):
raise Sis33Exception.spawn('Acq')
def acqWait(self, segment, nr_events, ev_length):
ev_length = self.roundEventLength(ev_length, SIS33_ROUND_NEAREST)
if lib.sis33_acq_wait(self.pointer, segment, nr_events, ev_length):
raise Sis33Exception.spawn('Acq Wait')
def acqTimeout(self, segment, nr_events, ev_length, timeout):
ev_length = self.roundEventLength(ev_length, SIS33_ROUND_NEAREST)
if lib.sis33_acq_timeout(self.pointer, segment, nr_events, ev_length, timeout):
raise Sis33Exception.spawn('Acq Timeout')
def acqCancel(self):
if lib.sis33_acq_cancel(self.pointer):
raise Sis33Exception.spawn('Acq Cancel')
def fetch(self, segment, channel, acqs, n_acqs, endtime):
if lib.sis33_fetch(self.pointer, segment, channel, acqs, n_acqs, endtime) < 0:
raise Sis33Exception.spawn('Fetch')
def fetchWait(self, segment, channel, acqs, n_acqs, endtime):
if lib.sis33_fetch_wait(self.pointer, segment, channel, acqs, n_acqs, endtime) < 0:
raise Sis33Exception.spawn('Fetch Wait')
def fetchTimeout(self, segment, channel, acqs, n_acqs, endtime, timeout):
if lib.sis33_fetch_timeout(self.pointer, segment, channel, acqs, n_acqs, endtime, timeout) < 0:
raise Sis33Exception.spawn('Fetch Timeout')
def readEvent(self, channel, samples, segment = 0):
ev_length = self.roundEventLength(samples, SIS33_ROUND_UP)
self.acqWait(segment, 1, ev_length)
a = Sis33Acq.zalloc(1, ev_length)
t = Timeval()
self.fetchWait(segment, channel, byref(a[0]), 1, byref(t))
# memory leak!
return [a[0].data[i] for i in xrange(samples)]
def Property(func):
return property(**func())
decodeDict = {'KHZ': 3, 'MHZ': 6, 'HZ':0, 'UHZ': -6, 'NHZ': 9,
'MV': -3, 'NV': -6, 'KV': 3, 'V':0,
'MVPP': -3, 'NVPP': -6, 'KVPP': 3, 'VPP':0}
def decode(values):
return float(values[0])* (10.**float(decodeDict[values[1].upper()]))
def parse(value, s):
if type(value) is str:
value = value.split(" ")
value[0] = float(value[0])
value = tuple(value)
if type(value) is tuple and len(value) == 1:
value = value[0]
if type(value) is not tuple:
value = (value, s)
return tuple(str(i) for i in value)
from numpy import array
from Configurable import *
class Waveform(object):
def generate(self, nbits, frequency, samples, fsr):
return array([])
__author__="Federico"
__date__ ="$Aug 17, 2011 4:43:08 PM$"
# PAIn: Python ADC Interface
import SineWaveform
import Agilent33250A
import Sis33
waveforms = (SineWaveform, )
generators = (Agilent33250A, )
adcs = (Sis33, )
import sys
from Agilent33250A import *
from SineWaveform import *
from Sis33 import *
import cPickle
g = Agilent33250A(sys.argv[1])
s = Sis33Device(int(sys.argv[2]))
w1 = 5e6
w2 = 6e6
step = 1e6
def generateTwoTone(i):
return SineWaveform(w1+step*i).generate(30e6, 1e4, 12, 1.) + SineWaveform(w2+step*i).generate(30e6, 1e4, 12, 1.)
waves = (SineWaveform(w1+step*i, 2, 0) for i in xrange(11))#(generateTwoTone(i) for i in xrange(11))
output = []
def callback(x):
output.append(s.readEvent(7, 10000))
g.output = True
g.sweep(1, waves, callback)
g.output = False
cPickle.dump(output, file('/tmp/output.pickle', 'w'))
s.close()
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# progressbar - Text progress bar library for Python.
# Copyright (c) 2005 Nilton Volpato
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
'''Text progress bar library for Python.
A text progress bar is typically used to display the progress of a long
running operation, providing a visual cue that processing is underway.
The ProgressBar class manages the current progress, and the format of the line
is given by a number of widgets. A widget is an object that may display
differently depending on the state of the progress bar. There are three types
of widgets:
- a string, which always shows itself
- a ProgressBarWidget, which may return a different value every time its
update method is called
- a ProgressBarWidgetHFill, which is like ProgressBarWidget, except it
expands to fill the remaining width of the line.
The progressbar module is very easy to use, yet very powerful. It will also
automatically enable features like auto-resizing when the system supports it.
'''
from __future__ import division
import math
import os
import signal
import sys
import time
try:
from fcntl import ioctl
from array import array
import termios
except ImportError:
pass
from progressbar.compat import *
from progressbar.widgets import *
__author__ = 'Nilton Volpato'
__author_email__ = 'first-name dot last-name @ gmail.com'
__date__ = '2011-05-14'
__version__ = '2.3'
class UnknownLength: pass
class ProgressBar(object):
'''The ProgressBar class which updates and prints the bar.
A common way of using it is like:
>>> pbar = ProgressBar().start()
>>> for i in range(100):
... # do something
... pbar.update(i+1)
...
>>> pbar.finish()
You can also use a ProgressBar as an iterator:
>>> progress = ProgressBar()
>>> for i in progress(some_iterable):
... # do something
...
Since the progress bar is incredibly customizable you can specify
different widgets of any type in any order. You can even write your own
widgets! However, since there are already a good number of widgets you
should probably play around with them before moving on to create your own
widgets.
The term_width parameter represents the current terminal width. If the
parameter is set to an integer then the progress bar will use that,
otherwise it will attempt to determine the terminal width falling back to
80 columns if the width cannot be determined.
When implementing a widget's update method you are passed a reference to
the current progress bar. As a result, you have access to the
ProgressBar's methods and attributes. Although there is nothing preventing
you from changing the ProgressBar you should treat it as read only.
Useful methods and attributes include (Public API):
- currval: current progress (0 <= currval <= maxval)
- maxval: maximum (and final) value
- finished: True if the bar has finished (reached 100%)
- start_time: the time when start() method of ProgressBar was called
- seconds_elapsed: seconds elapsed since start_time and last call to
update
- percentage(): progress in percent [0..100]
'''
__slots__ = ('currval', 'fd', 'finished', 'last_update_time',
'left_justify', 'maxval', 'next_update', 'num_intervals',
'poll', 'seconds_elapsed', 'signal_set', 'start_time',
'term_width', 'update_interval', 'widgets', '_time_sensitive',
'__iterable')
_DEFAULT_MAXVAL = 100
_DEFAULT_TERMSIZE = 80
_DEFAULT_WIDGETS = [Percentage(), ' ', Bar()]
def __init__(self, maxval=None, widgets=None, term_width=None, poll=1,
left_justify=True, fd=sys.stderr):
'''Initializes a progress bar with sane defaults'''
# Don't share a reference with any other progress bars
if widgets is None:
widgets = list(self._DEFAULT_WIDGETS)
self.maxval = maxval
self.widgets = widgets
self.fd = fd
self.left_justify = left_justify
self.signal_set = False
if term_width is not None:
self.term_width = term_width
else:
try:
self._handle_resize()
signal.signal(signal.SIGWINCH, self._handle_resize)
self.signal_set = True
except (SystemExit, KeyboardInterrupt): raise
except:
self.term_width = self._env_size()
self.__iterable = None
self._update_widgets()
self.currval = 0
self.finished = False
self.last_update_time = None
self.poll = poll
self.seconds_elapsed = 0
self.start_time = None
self.update_interval = 1
def __call__(self, iterable):
'Use a ProgressBar to iterate through an iterable'
try:
self.maxval = len(iterable)
except:
if self.maxval is None:
self.maxval = UnknownLength
self.__iterable = iter(iterable)
return self
def __iter__(self):
return self
def __next__(self):
try:
value = next(self.__iterable)
if self.start_time is None: self.start()
else: self.update(self.currval + 1)
return value
except StopIteration:
self.finish()
raise
# Create an alias so that Python 2.x won't complain about not being
# an iterator.
next = __next__
def _env_size(self):
'Tries to find the term_width from the environment.'
return int(os.environ.get('COLUMNS', self._DEFAULT_TERMSIZE)) - 1
def _handle_resize(self, signum=None, frame=None):
'Tries to catch resize signals sent from the terminal.'
h, w = array('h', ioctl(self.fd, termios.TIOCGWINSZ, '\0' * 8))[:2]
self.term_width = w
def percentage(self):
'Returns the progress as a percentage.'
return self.currval * 100.0 / self.maxval
percent = property(percentage)
def _format_widgets(self):
result = []
expanding = []
width = self.term_width
for index, widget in enumerate(self.widgets):
if isinstance(widget, WidgetHFill):
result.append(widget)
expanding.insert(0, index)
else:
widget = format_updatable(widget, self)
result.append(widget)
width -= len(widget)
count = len(expanding)
while count:
portion = max(int(math.ceil(width * 1. / count)), 0)
index = expanding.pop()
count -= 1
widget = result[index].update(self, portion)
width -= len(widget)
result[index] = widget
return result
def _format_line(self):
'Joins the widgets and justifies the line'
widgets = ''.join(self._format_widgets())
if self.left_justify: return widgets.ljust(self.term_width)
else: return widgets.rjust(self.term_width)
def _need_update(self):
'Returns whether the ProgressBar should redraw the line.'
if self.currval >= self.next_update or self.finished: return True
delta = time.time() - self.last_update_time
return self._time_sensitive and delta > self.poll
def _update_widgets(self):
'Checks all widgets for the time sensitive bit'
self._time_sensitive = any(getattr(w, 'TIME_SENSITIVE', False)
for w in self.widgets)
def update(self, value=None):
'Updates the ProgressBar to a new value.'
if value is not None and value is not UnknownLength:
if (self.maxval is not UnknownLength
and not 0 <= value <= self.maxval):
raise ValueError('Value out of range')
self.currval = value
if not self._need_update(): return
if self.start_time is None:
raise RuntimeError('You must call "start" before calling "update"')
now = time.time()
self.seconds_elapsed = now - self.start_time
self.next_update = self.currval + self.update_interval
self.fd.write(self._format_line() + '\r')
self.last_update_time = now
def start(self):
'''Starts measuring time, and prints the bar at 0%.
It returns self so you can use it like this:
>>> pbar = ProgressBar().start()
>>> for i in range(100):
... # do something
... pbar.update(i+1)
...
>>> pbar.finish()
'''
if self.maxval is None:
self.maxval = self._DEFAULT_MAXVAL
self.num_intervals = max(100, self.term_width)
self.next_update = 0
if self.maxval is not UnknownLength:
if self.maxval < 0: raise ValueError('Value out of range')
self.update_interval = self.maxval / self.num_intervals
self.start_time = self.last_update_time = time.time()
self.update(0)
return self
def finish(self):
'Puts the ProgressBar bar in the finished state.'
self.finished = True
self.update(self.maxval)
self.fd.write('\n')
if self.signal_set:
signal.signal(signal.SIGWINCH, signal.SIG_DFL)
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# progressbar - Text progress bar library for Python.
# Copyright (c) 2005 Nilton Volpato
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
'''Compatability methods and classes for the progressbar module'''
# Python 3.x (and backports) use a modified iterator syntax
# This will allow 2.x to behave with 3.x iterators
if not hasattr(__builtins__, 'next'):
def next(iter):
try:
# Try new style iterators
return iter.__next__()
except AttributeError:
# Fallback in case of a "native" iterator
return iter.next()
# Python < 2.5 does not have "any"
if not hasattr(__builtins__, 'any'):
def any(iterator):
for item in iterator:
if item: return True
return False
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# progressbar - Text progress bar library for Python.
# Copyright (c) 2005 Nilton Volpato
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
'''Default ProgressBar widgets'''
from __future__ import division
import datetime
import math
try:
from abc import ABCMeta, abstractmethod
except ImportError:
AbstractWidget = object
abstractmethod = lambda fn: fn
else:
AbstractWidget = ABCMeta('AbstractWidget', (object,), {})
def format_updatable(updatable, pbar):
if hasattr(updatable, 'update'): return updatable.update(pbar)
else: return updatable
class Widget(AbstractWidget):
'''The base class for all widgets
The ProgressBar will call the widget's update value when the widget should
be updated. The widget's size may change between calls, but the widget may
display incorrectly if the size changes drastically and repeatedly.
The boolean TIME_SENSITIVE informs the ProgressBar that it should be
updated more often because it is time sensitive.
'''
TIME_SENSITIVE = False
__slots__ = ()
@abstractmethod
def update(self, pbar):
'''Updates the widget.
pbar - a reference to the calling ProgressBar
'''
class WidgetHFill(Widget):
'''The base class for all variable width widgets.
This widget is much like the \\hfill command in TeX, it will expand to
fill the line. You can use more than one in the same line, and they will
all have the same width, and together will fill the line.
'''
@abstractmethod
def update(self, pbar, width):
'''Updates the widget providing the total width the widget must fill.
pbar - a reference to the calling ProgressBar
width - The total width the widget must fill
'''
class Timer(Widget):
'Widget which displays the elapsed seconds.'
__slots__ = ('format',)
TIME_SENSITIVE = True
def __init__(self, format='Elapsed Time: %s'):
self.format = format
@staticmethod
def format_time(seconds):
'Formats time as the string "HH:MM:SS".'
return str(datetime.timedelta(seconds=int(seconds)))
def update(self, pbar):
'Updates the widget to show the elapsed time.'
return self.format % self.format_time(pbar.seconds_elapsed)
class ETA(Timer):
'Widget which attempts to estimate the time of arrival.'
TIME_SENSITIVE = True
def update(self, pbar):
'Updates the widget to show the ETA or total time when finished.'
if pbar.currval == 0:
return 'ETA: --:--:--'
elif pbar.finished:
return 'Time: %s' % self.format_time(pbar.seconds_elapsed)
else:
elapsed = pbar.seconds_elapsed
eta = elapsed * pbar.maxval / pbar.currval - elapsed
return 'ETA: %s' % self.format_time(eta)
class FileTransferSpeed(Widget):
'Widget for showing the transfer speed (useful for file transfers).'
format = '%6.2f %s%s/s'
prefixes = ' kMGTPEZY'
__slots__ = ('unit', 'format')
def __init__(self, unit='B'):
self.unit = unit
def update(self, pbar):
'Updates the widget with the current SI prefixed speed.'
if pbar.seconds_elapsed < 2e-6 or pbar.currval < 2e-6: # =~ 0
scaled = power = 0
else:
speed = pbar.currval / pbar.seconds_elapsed
power = int(math.log(speed, 1000))
scaled = speed / 1000.**power
return self.format % (scaled, self.prefixes[power], self.unit)
class AnimatedMarker(Widget):
'''An animated marker for the progress bar which defaults to appear as if
it were rotating.
'''
__slots__ = ('markers', 'curmark')
def __init__(self, markers='|/-\\'):
self.markers = markers
self.curmark = -1
def update(self, pbar):
'''Updates the widget to show the next marker or the first marker when
finished'''
if pbar.finished: return self.markers[0]
self.curmark = (self.curmark + 1) % len(self.markers)
return self.markers[self.curmark]
# Alias for backwards compatibility
RotatingMarker = AnimatedMarker
class Counter(Widget):
'Displays the current count'
__slots__ = ('format',)
def __init__(self, format='%d'):
self.format = format
def update(self, pbar):
return self.format % pbar.currval
class Percentage(Widget):
'Displays the current percentage as a number with a percent sign.'
def update(self, pbar):
return '%3d%%' % pbar.percentage()
class FormatLabel(Timer):
'Displays a formatted label'
mapping = {
'elapsed': ('seconds_elapsed', Timer.format_time),
'finished': ('finished', None),
'last_update': ('last_update_time', None),
'max': ('maxval', None),
'seconds': ('seconds_elapsed', None),
'start': ('start_time', None),
'value': ('currval', None)
}
__slots__ = ('format',)
def __init__(self, format):
self.format = format
def update(self, pbar):
context = {}
for name, (key, transform) in self.mapping.items():
try:
value = getattr(pbar, key)
if transform is None:
context[name] = value
else:
context[name] = transform(value)
except: pass
return self.format % context
class SimpleProgress(Widget):
'Returns progress as a count of the total (e.g.: "5 of 47")'
__slots__ = ('sep',)
def __init__(self, sep=' of '):
self.sep = sep
def update(self, pbar):
return '%d%s%d' % (pbar.currval, self.sep, pbar.maxval)
class Bar(WidgetHFill):
'A progress bar which stretches to fill the line.'
__slots__ = ('marker', 'left', 'right', 'fill', 'fill_left')
def __init__(self, marker='#', left='|', right='|', fill=' ',
fill_left=True):
'''Creates a customizable progress bar.
marker - string or updatable object to use as a marker
left - string or updatable object to use as a left border
right - string or updatable object to use as a right border
fill - character to use for the empty part of the progress bar
fill_left - whether to fill from the left or the right
'''
self.marker = marker
self.left = left
self.right = right
self.fill = fill
self.fill_left = fill_left
def update(self, pbar, width):
'Updates the progress bar and its subcomponents'
left, marker, right = (format_updatable(i, pbar) for i in
(self.left, self.marker, self.right))
width -= len(left) + len(right)
# Marker must *always* have length of 1
marker *= int(pbar.currval / pbar.maxval * width)
if self.fill_left:
return '%s%s%s' % (left, marker.ljust(width, self.fill), right)
else:
return '%s%s%s' % (left, marker.rjust(width, self.fill), right)
class ReverseBar(Bar):
'A bar which has a marker which bounces from side to side.'
def __init__(self, marker='#', left='|', right='|', fill=' ',
fill_left=False):
'''Creates a customizable progress bar.
marker - string or updatable object to use as a marker
left - string or updatable object to use as a left border
right - string or updatable object to use as a right border
fill - character to use for the empty part of the progress bar
fill_left - whether to fill from the left or the right
'''
self.marker = marker
self.left = left
self.right = right
self.fill = fill
self.fill_left = fill_left
class BouncingBar(Bar):
def update(self, pbar, width):
'Updates the progress bar and its subcomponents'
left, marker, right = (format_updatable(i, pbar) for i in
(self.left, self.marker, self.right))
width -= len(left) + len(right)
if pbar.finished: return '%s%s%s' % (left, width * marker, right)
position = int(pbar.currval % (width * 2 - 1))
if position > width: position = width * 2 - position
lpad = self.fill * (position - 1)
rpad = self.fill * (width - len(marker) - len(lpad))
# Swap if we want to bounce the other way
if not self.fill_left: rpad, lpad = lpad, rpad
return '%s%s%s%s%s' % (left, lpad, marker, rpad, right)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment