mySerial
Adesso abbiamo la base di una piccola libreria di widgets GTK pronta per essere usata nelle nostre applicazioni.
Andremo a realizzare la nostra prima applicazione reale!
Ho scelto di realizzare un terminale seriale poichè, nelle applicazioni industriali, la comunicazione per lo sviluppo è quasi sempre basata su questo tipo di comunicazione.
myAppSerial
Creiamo la struttura della nostra applicazione.
Per fare questo dobbiamo creare una directory di lavoro a cui assegneremo il nome di myAppSerial.
Iniziamo col definire un nuovo package (libreria) a cui assegneremo il nome myLib.
Per fare questo creiamo la directory myLib. Qui aggiungiamo un file vuoto di nome __init__.py.
Ora decomprimiamo la nostro base di partenza 20150904.zip dentro la directory mySerial.
La libreria la trovate nel post del 4/set/2015 come si deduce anche dal nome della stessa .
Alla fine dovremo la seguente struttura:
myAppSerial/
l00_start.py
l01_startGtk.py
my00init.py
myLib/
__init__.py
myWidg/
__init__.py
my00init.py
my00initGtk.py
my01Box.py
my02Label.py
my02Entry.py
my02TxtView.py
my03Button.py
my03ChkButton.py
myWind.py
myApp.py
myLib
Ora iniziamo a popolare la nostra libreria. Partiamo dalle librerie di basso livello senza interfaccia grafica GUI.
Creiamo un’ altro package che conterrà al suo interno tutti i moduli che hanno a che fare con le comunicazioni seriali. Lo chiameremo myProtocol. Creiamo la directory di nome myProtocol e al suo interno creiamo un file vuoto di nome __init__.py.
struttura aggiornata
myAppSerial/
l00_start.py
l01_startGtk.py
my00init.py
myLib/
__init__.py
myProtocol/
__init__.py
Ora copiamo dentro la directory myProtocol lo stesso file che troviamo alla base myAppSerial/my00init. Questo ci serve come init iniziale del nostro micro ambiente myProtocol. Apriamolo ed eliminiamo le linee da 24 a 33 che sono quelle che interessano le GTK. In questo micro ambiente non ne abbiamo bisogno.
Inoltre aggiungiamo la variabile printable che ci sarà utile per verificare se un carattere è stampabile nel prossimo modulo che andremo a definire.
# caratteri stampabili
printable=list("'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ ")
mySerial
Fa uso del package pySerial che dovrete installarlo nel vostro ambiente. Vi rimando al link del sito in cui troverete ampia documentazione.
Questo modulo mySerial sarà il responsabile della gestione delle nostre seriali.
#------------------------------------------------------------------------------
# myClass
#------------------------------------------------------------------------------
class MySerial(object):
""" gestione di una comunicazione Seriale
Attributi:
- ser istanza seriale
- dlTx ritardo tra un invio ed un'altro
- cou caratteri ricevuti
- buf buffer di ricezione
Metodi:
- close chiusura del socket
- chaSetting configurazione parametri
- sndChar invio di un char espresso come stringa es: '\n'
- sndString invio di una stringa es: 'str' (stringa ascii)
- rcvChar ricezione di un carattere (funzione non bloccante)
- rcvString ricezione di una N caratteri (funzione non bloccante)
- rcvStrTimeOut ricezione di N caratteri entro un certo tempo
"""
def __init__(self, por="/dev/ttyS", par=['1','115200','8','N','1'], ope=True, deb=False):
""" Inizializzo il dispositivo
-> por tipo device (dipende dal S.O.)
-> par parametri di configurazione
-> ope stabilisco se deve essere aperta la connessione
"""
# referenzio il flag di Debug
self.deb = deb
self.ope = ope
self.par = par
self.por = por+par[0]
# Gestione apertura collegamento
if self.ope:
try:
# provo ad aprire la connessione
self.ser = serial.Serial(self.por)
# configuro i parametri e provo apertura
try:
self.ser.close()
self.ser.open()
# set parameters
self.chaSetting()
# debug
if self.deb:
# ok
print "%s now is open." %self.por
except:
self.ser = None
# seriale gia' aperta
print "%s port not opened!" %self.por
except:
# nessuna seriale presente
self.ser = None
# debug
if self.deb:
print "%s not present!" %self.por
#sys.exit()
# nessuna verifica di presenza device!
else:
self.ser = serial.Serial()
# ritardo tra un invio e il successivo
self.dlTx = 0.002
def close(self):
"""Chiusura del socket"""
self.ser.close()
# debug
if self.deb:
print "%s now is close." %self.por
def __del__(self):
"""Distruzione dell'oggetto"""
try:
self.close()
del self.ser
except:
pass
def chaSetting(self):
"""Configurazione parametri """
# forzo la chiusura per inizializzare i nuovi parametri
self.ser.close()
self.ser.port = self.por
self.ser.baudrate = atoi(self.par[1])
self.ser.bytesize = atoi(self.par[2])
self.ser.parity = self.par[3]
self.ser.stopbits = atoi(self.par[4])
try:
self.ser.open()
except:
print "Problem to open the connection!!!"""
print self.ser
def sndByte(self, byt):
""" Invio 1 byte"""
cha = "%c" %byt
self.ser.write(cha)
# ritardo tra un invio e il successivo
sleep(self.dlTx)
if self.deb:
# rendo visibile il codice
cod = "\\x%02X" %ord(cha)
print cod,
def sndData(self, dat):
""" Invio di sequenza di bytes"""
# comando da inviare
strCmd=""
for byt in dat:
strCmd+=("%c" %byt)
# Non uso sndByte per evitare il ritardo self.dlTx
self.ser.write(strCmd)
if self.deb:
for ele in dat:
print "\\x%02X" %ele,
print
sleep(0.001)
def sndChar(self, cmd):
""" Invio di 1 carattere espresso come stringa es: '\n' """
for ele in cmd:
cha = "%c" %ele
self.ser.write(cha)
# ritardo tra un invio e il successivo
sleep(self.dlTx)
if self.deb:
if cha == '\r':
print "\n",
else:
print cha,
def sndString(self, str):
""" Invio di una stringa 'str' (stringa ascii) """
for ch in str:
self.ser.write(ch)
# ritardo tra un invio e il successivo
sleep(self.dlTx)
# debug
if self.deb:
print "send:",
printHex(str)
def rcvByte(self):
""" Ricezione di N bytes (funzione non bloccante) """
# verifico se c'e' qualcosa in ricezione
cou = self.ser.inWaiting()
if cou > 0:
# prelievo un byte
return self.ser.read(cou)
else:
return None
# alias di rcvByte
def rcvChar(self):
""" Ricezione di N bytes (funzione non bloccante) """
return self.rcvByte()
def rcvString(self, num=1):
""" Attesa di N bytes (funzione non bloccante) """
# verifico quanti caratteri ci sono gia' nel buffer
cou = self.ser.inWaiting()
if cou > 0:
if cou >= num:
# provo la ricezione
dat = self.ser.read(num)
else:
dat = self.ser.read(cou)
# dati letti
num = cou
else:
dat = None
num = 0
return (num, dat)
def rcvDataTimeOut(self, num=1, tou=0.1):
""" Ricezione di una sequenza di bytes entro un certo tempo
<- self.buf = caratteri acquisiti
"""
# pilisco il buffer prima della ricezione
self.buf = ""
# contatore caratteri da ricevere
self.cou = 0
# referenzio lo start per il tOut
tim = clock()
# flag
flg = True
while flg:
# calcolo il tempo trascorso
now = (clock() - tim)
if now > tou:
# forzo l'uscita (tempo scaduto)
flg = False
# verifico dati in ricezione
cou, dat = self.rcvString(num)
if cou:
# salvo i dati ricevuti
self.buf += dat
self.cou += cou
if self.cou >= num:
flg = False
# ritorno False se è scaduto il tempo
if now > tou:
ret = False
# ritorno True se ho ricevuto tutti i dati
else:
ret = True
return (ret, self.buf)
# alias di rcvDataTimeOut
def rcvStrTimeOut(self, num=1, tou=0.1):
""" Ricezione di N caratteri entro un certo tempo
<- self.buf = caratteri acquisiti
"""
return self.rcvDataTimeOut(num, tou)
#------------------------------------------------------------------------------
# myTry
#------------------------------------------------------------------------------
def myTry01():
# inizializzazione in base al S.O.
if sys.platform == 'win32':
# Windows (numerazione parte da 1)
por = "COM"
Il codice è stato scritto per non essere bloccante. Questo ci permetterà di lavorare in modo asincrono se ne avessimo bisogno.
Inoltre nel caso volessimo lavorare in modo sincrono ho sfruttato il timer del nostro PC per creare un time out oltre il quale comunque usciremo dal codice bloccante.
tim = clock()
# flag
flg = True
while flg:
# calcolo il tempo trascorso
now = (clock() - tim)
if now > tou:
# forzo l'uscita (tempo scaduto)
flg = False
# verifico dati in ricezione
cou, dat = self.rcvString(num)
if cou:
# salvo i dati ricevuti
self.buf += dat
self.cou += cou
if self.cou >= num:
flg = False
# ritorno False se è scaduto il tempo
if now > tou:
ret = False
# ritorno True se ho ricevuto tutti i dati
else:
ret = True
return (ret, self.buf)
# alias di rcvDataTimeOut
def rcvStrTimeOut(self, num=1, tou=0.1):
""" Ricezione di N caratteri entro un certo tempo
<- self.buf = caratteri acquisiti
"""
return self.rcvDataTimeOut(num, tou)
#------------------------------------------------------------------------------
# myTry
Vi lascio allo studio del rimanente codice.
myTry01
else:
# Unix (numerazione parte da 0)
#por = "/dev/ttyS"
#por = "/dev/ttymxc"
por = "/dev/ttyUSB"
par = ['0','115200','8','N','1']
# istanza di un seriale
self = MySerial(por=por, par=par, ope=1, deb=1)
if self.ser:
# init DTR/RTS
if 0:
# set to low
self.ser.setDTR(1)
sleep(0.01)
# set to high
self.ser.setDTR(0)
self.ser.setRTS(0)
print "serial open: OK!"
return self
else:
print "Serial not found!"
return None
#------------------------------------------------------------------------------
def myTry02():
self = myTry01()
if self:
# invio una stringa
self.sndString("Ciao mondo!\n")
# attendo (num) N caratteri entro (tou) un tempo
res,buf = self.rcvStrTimeOut(num=50, tou=0.01)
print "%s" %('timeOut:','ok:')[res], "chars receiver:",len(buf)
Qui vediamo il modo di selezionare l’ inizializzazione in base al S.O. sistema operativo in cui gira il codice. Può tornare utile anche in altre applicazioni multi piattaforma.
par = ['0','115200','8','N','1']
# istanza di un seriale
self = MySerial(por=por, par=par, ope=1, deb=1)
if self.ser:
# init DTR/RTS
if 0:
# set to low
self.ser.setDTR(1)
sleep(0.01)
# set to high
Inoltre ho aggiunto del codice di esempio per fare vedere come sollecitare l’ handshake.
else:
print "Serial not found!"
return None
#------------------------------------------------------------------------------
def myTry02():
myTry02
printHex(buf)
# print "%s" %buf
#-----------------------------------------------------------------------------
# Main
#------------------------------------------------------------------------------
if __name__ == "__main__":
# test arguments
if len(sys.argv) == 1:
# no arguments (scelgo io)
choi = 2
Qui sfruttiamo il codice di prima per l’ inizializzazione. Poi facendo un loop hardware sui segnali Tx e Rx del nostro dispositivo seriale proviamo ad inviare e ricevere una stringa col nostro codice.
Se proviamo ad avviare il test da terminale $ python mySerial.py vedremo:

mySerial in esecuzione.
Package
La struttura aggiornata del nostro package è la seguente:
myAppSerial/
l00_start.py
l01_startGtk.py
my00init.py
myLib/
__init__.py
myProtocol/
__init__.py
my00init.py
mySerial.py
Per scaricare la nuova versione 20150905.zip
Saluti
Per oggi mi fermo qui.
Nel prossimo post vedremo il lato del protocollo.
Ciao alla prossima. (stay tune!)