mySerial

Adesso abbiamo la base di una piccola libreria di widgets GTK pronta per essere usata nelle nostre applicazioni.

Andremo a realizzare la nostra prima applicazione reale!

Ho scelto di realizzare un terminale seriale poichè, nelle applicazioni industriali, la comunicazione per lo sviluppo è quasi sempre basata su questo tipo di comunicazione.

myAppSerial

Creiamo la struttura della nostra applicazione.

Per fare questo dobbiamo creare una directory di lavoro a cui assegneremo il nome di myAppSerial.

Iniziamo col definire un nuovo package (libreria) a cui assegneremo il nome myLib.

Per fare questo creiamo la directory myLib. Qui aggiungiamo un file vuoto di nome __init__.py.

Ora decomprimiamo la nostro base di partenza 20150904.zip dentro la directory mySerial.

La libreria la trovate nel post del 4/set/2015 come si deduce anche dal nome della stessa .

Alla fine dovremo la seguente struttura:

myAppSerial/
  l00_start.py
  l01_startGtk.py
  my00init.py
  myLib/
    __init__.py
  myWidg/
    __init__.py
    my00init.py
    my00initGtk.py
    my01Box.py
    my02Label.py
    my02Entry.py
    my02TxtView.py
    my03Button.py
    my03ChkButton.py
    myWind.py
    myApp.py

myLib

Ora iniziamo a popolare la nostra libreria. Partiamo dalle librerie di basso livello senza interfaccia grafica GUI.

Creiamo un’ altro package che conterrà al suo interno tutti i moduli che hanno a che fare con le comunicazioni seriali. Lo chiameremo myProtocol. Creiamo la directory di nome myProtocol e al suo interno creiamo un file vuoto di nome __init__.py.

struttura aggiornata

myAppSerial/
  l00_start.py
  l01_startGtk.py
  my00init.py
  myLib/
    __init__.py
    myProtocol/
      __init__.py

Ora copiamo dentro la directory myProtocol lo stesso file che troviamo alla base myAppSerial/my00init. Questo ci serve come init iniziale del nostro micro ambiente myProtocol. Apriamolo ed eliminiamo le linee da 24 a 33 che sono quelle che interessano le GTK. In questo micro ambiente non ne abbiamo bisogno.

Inoltre aggiungiamo la variabile printable che ci sarà utile per verificare se un carattere è stampabile nel prossimo modulo che andremo a definire.

# caratteri stampabili
printable=list("'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ ")

mySerial

Fa uso del package pySerial che dovrete installarlo nel vostro ambiente. Vi rimando al link del sito in cui troverete ampia documentazione.

Questo modulo mySerial sarà il responsabile della gestione delle nostre seriali.

#------------------------------------------------------------------------------
# myClass
#------------------------------------------------------------------------------
class MySerial(object):
	""" gestione di una comunicazione Seriale
		Attributi:
			- ser 			istanza seriale
			- dlTx 			ritardo tra un invio ed un'altro
			- cou 			caratteri ricevuti
			- buf 			buffer di ricezione
		Metodi:
			- close			chiusura del socket
			- chaSetting	configurazione parametri
			- sndChar		invio di un char espresso come stringa es: '\n'
			- sndString		invio di una stringa es: 'str' (stringa ascii)
			- rcvChar		ricezione di un carattere (funzione non bloccante)
			- rcvString		ricezione di una N caratteri (funzione non bloccante)
			- rcvStrTimeOut	ricezione di N caratteri entro un certo tempo
	"""

	def __init__(self, por="/dev/ttyS", par=['1','115200','8','N','1'], ope=True, deb=False):
		""" Inizializzo il dispositivo
			-> por    tipo device (dipende dal S.O.)
			-> par    parametri di configurazione
			-> ope    stabilisco se deve essere aperta la connessione
		"""
		# referenzio il flag di Debug
		self.deb = deb
		self.ope = ope
		self.par = par
		self.por = por+par[0]
		# Gestione apertura collegamento
		if self.ope:
			try:
				# provo ad aprire la connessione
				self.ser = serial.Serial(self.por)
				# configuro i parametri e provo apertura
				try:
					self.ser.close()
					self.ser.open()
					# set parameters
					self.chaSetting()
					# debug
					if self.deb:
						# ok
						print "%s now is open." %self.por
				except:
					self.ser = None
					# seriale gia' aperta
					print "%s port not opened!" %self.por
			except:
				# nessuna seriale presente
				self.ser = None
				# debug
				if self.deb:
					print "%s not present!" %self.por
					#sys.exit()
		# nessuna verifica di presenza device!
		else:
			self.ser = serial.Serial()
		# ritardo tra un invio e il successivo
		self.dlTx = 0.002    

	def close(self):
		"""Chiusura del socket"""
		self.ser.close()
		# debug
		if self.deb:
			print "%s now is close." %self.por

	def __del__(self):
		"""Distruzione dell'oggetto"""
		try:
			self.close()
			del self.ser
		except:
			pass

	def chaSetting(self):
		"""Configurazione parametri """
		# forzo la chiusura per inizializzare i nuovi parametri
		self.ser.close()
		self.ser.port = self.por
		self.ser.baudrate = atoi(self.par[1])
		self.ser.bytesize = atoi(self.par[2])
		self.ser.parity = self.par[3]
		self.ser.stopbits = atoi(self.par[4])
		try:
			self.ser.open()
		except:
			print "Problem to open the connection!!!"""
			print self.ser

	def sndByte(self, byt):
		""" Invio 1 byte"""
		cha = "%c" %byt
		self.ser.write(cha)
		# ritardo tra un invio e il successivo
		sleep(self.dlTx)
		if self.deb:
			# rendo visibile il codice
			cod = "\\x%02X" %ord(cha)
			print cod,

	def sndData(self, dat):
		""" Invio di sequenza di bytes"""
		# comando da inviare
		strCmd=""
		for byt in dat:
			strCmd+=("%c" %byt)
		# Non uso sndByte per evitare il ritardo self.dlTx
		self.ser.write(strCmd)
		if self.deb:
			for ele in dat:
				print "\\x%02X" %ele,
			print
		sleep(0.001)

	def sndChar(self, cmd):
		""" Invio di 1 carattere espresso come stringa es: '\n' """
		for ele in cmd:
			cha = "%c" %ele
			self.ser.write(cha)
			# ritardo tra un invio e il successivo
			sleep(self.dlTx)
		if self.deb:
			if cha == '\r':
				print "\n",
			else:
				print cha,

	def sndString(self, str):
		""" Invio di una stringa 'str' (stringa ascii) """
		for ch in str:
			self.ser.write(ch)
			# ritardo tra un invio e il successivo
			sleep(self.dlTx)
		# debug
		if self.deb:
			print "send:",
			printHex(str)

	def rcvByte(self):
		""" Ricezione di N bytes (funzione non bloccante) """
		# verifico se c'e' qualcosa in ricezione
		cou = self.ser.inWaiting()
		if cou > 0:
			# prelievo un byte
			return self.ser.read(cou)
		else:
			return None

	# alias di rcvByte
	def rcvChar(self):
		""" Ricezione di N bytes (funzione non bloccante) """
		return self.rcvByte()

	def rcvString(self, num=1):
		""" Attesa di N bytes (funzione non bloccante) """
		# verifico quanti caratteri ci sono gia' nel buffer
		cou = self.ser.inWaiting()
		if cou > 0:
			if cou >= num:
				# provo la ricezione
				dat = self.ser.read(num)
			else:
				dat = self.ser.read(cou)
				# dati letti
				num = cou
		else:
			dat = None
			num = 0
		return (num, dat)

	def rcvDataTimeOut(self, num=1, tou=0.1):
		""" Ricezione di una sequenza di bytes entro un certo tempo
			<- self.buf = caratteri acquisiti
		"""
		# pilisco il buffer prima della ricezione
		self.buf = ""
		# contatore caratteri da ricevere
		self.cou = 0

		# referenzio lo start per il tOut
		tim = clock()

		# flag
		flg = True
		while flg:
			# calcolo il tempo trascorso
			now = (clock() - tim)
			if now > tou:
				# forzo l'uscita (tempo scaduto)
				flg = False
			# verifico dati in ricezione
			cou, dat = self.rcvString(num)
			if cou:
				# salvo i dati ricevuti
				self.buf += dat
				self.cou += cou
				if self.cou >= num:
					flg = False
		# ritorno False se è scaduto il tempo
		if now > tou:
			ret = False
		# ritorno True se ho ricevuto tutti i dati
		else:
			ret = True
		return (ret, self.buf)

	# alias di rcvDataTimeOut
	def rcvStrTimeOut(self, num=1, tou=0.1):
		""" Ricezione di N caratteri entro un certo tempo
			<- self.buf = caratteri acquisiti
		"""
		return self.rcvDataTimeOut(num, tou)

#------------------------------------------------------------------------------
# myTry
#------------------------------------------------------------------------------
def myTry01():

	# inizializzazione in base al S.O.
	if sys.platform == 'win32':
		# Windows (numerazione parte da 1)
		por = "COM"

Il codice è stato scritto per non essere bloccante. Questo ci permetterà di lavorare in modo asincrono se ne avessimo bisogno.

Inoltre nel caso volessimo lavorare in modo sincrono ho sfruttato il timer del nostro PC per creare un time out oltre il quale comunque usciremo dal codice bloccante.

		tim = clock()

		# flag
		flg = True
		while flg:
			# calcolo il tempo trascorso
			now = (clock() - tim)
			if now > tou:
				# forzo l'uscita (tempo scaduto)
				flg = False
			# verifico dati in ricezione
			cou, dat = self.rcvString(num)
			if cou:
				# salvo i dati ricevuti
				self.buf += dat
				self.cou += cou
				if self.cou >= num:
					flg = False
		# ritorno False se è scaduto il tempo
		if now > tou:
			ret = False
		# ritorno True se ho ricevuto tutti i dati
		else:
			ret = True
		return (ret, self.buf)

	# alias di rcvDataTimeOut
	def rcvStrTimeOut(self, num=1, tou=0.1):
		""" Ricezione di N caratteri entro un certo tempo
			<- self.buf = caratteri acquisiti
		"""
		return self.rcvDataTimeOut(num, tou)

#------------------------------------------------------------------------------
# myTry

Vi lascio allo studio del rimanente codice.

myTry01

	else:
		# Unix  (numerazione parte da 0)	
		#por = "/dev/ttyS"
		#por = "/dev/ttymxc"
		por = "/dev/ttyUSB"
		par = ['0','115200','8','N','1']

	#  istanza di un seriale
	self = MySerial(por=por, par=par, ope=1, deb=1)
	if self.ser:
		# init DTR/RTS
		if 0:
			# set to low
			self.ser.setDTR(1)
			sleep(0.01)
			# set to high
			self.ser.setDTR(0)
			self.ser.setRTS(0)

		print "serial open: OK!"
		return self 

	else:
		print "Serial not found!"
		return None

#------------------------------------------------------------------------------
def myTry02():
	self = myTry01()
	if self:
		# invio una stringa
		self.sndString("Ciao mondo!\n")
		# attendo (num) N caratteri entro (tou) un tempo
		res,buf = self.rcvStrTimeOut(num=50, tou=0.01)
		print "%s" %('timeOut:','ok:')[res], "chars receiver:",len(buf)

Qui vediamo il modo di selezionare l’ inizializzazione in base al S.O. sistema operativo in cui gira il codice. Può tornare utile anche in altre applicazioni multi piattaforma.

		par = ['0','115200','8','N','1']

	#  istanza di un seriale
	self = MySerial(por=por, par=par, ope=1, deb=1)
	if self.ser:
		# init DTR/RTS
		if 0:
			# set to low
			self.ser.setDTR(1)
			sleep(0.01)
			# set to high

Inoltre ho aggiunto del codice di esempio per fare vedere come sollecitare l’ handshake.

	else:
		print "Serial not found!"
		return None

#------------------------------------------------------------------------------
def myTry02():

myTry02

		printHex(buf)
		# print "%s" %buf

#-----------------------------------------------------------------------------
# Main
#------------------------------------------------------------------------------
if __name__ == "__main__":

	# test arguments
	if len(sys.argv) == 1:
		# no arguments (scelgo io)
		choi = 2

Qui sfruttiamo il codice di prima per l’ inizializzazione. Poi facendo un loop hardware sui segnali Tx e Rx del nostro dispositivo seriale proviamo ad inviare e ricevere una stringa col nostro codice.

Se proviamo ad avviare il test da terminale $ python mySerial.py vedremo:

alternate text

mySerial in esecuzione.

Package

La struttura aggiornata del nostro package è la seguente:

myAppSerial/
  l00_start.py
  l01_startGtk.py
  my00init.py
  myLib/
    __init__.py
    myProtocol/
      __init__.py
      my00init.py
      mySerial.py

Per scaricare la nuova versione 20150905.zip

Saluti

Per oggi mi fermo qui.

Nel prossimo post vedremo il lato del protocollo.

Ciao alla prossima. (stay tune!)