Source code for sivicncdriver.serial.serial_manager

"""
The serial_manager module
=========================

Provides a class to handle the CNC machine through a serial object.
"""

import serial
import time

from PyQt5.QtCore import QObject, pyqtSlot, pyqtSignal

from sivicncdriver.settings import logger

__all__ = ["SerialManager"]


[docs]class SerialManager(QObject): """ A class to manage the serial port. It will try to send what it receive and send via the send_print signal. When it receive a 'ok' from the serial it will send the send_confirm signal. """ send_print = pyqtSignal(str, str) send_confirm = pyqtSignal(bool) serial_fatal_error = pyqtSignal() def __init__(self, serial, fake_mode=False): """ The __init__ method. :param serial: The serial port. :param fake_mode: if True, then the manager will not use the serial object, and will always respond 'ok'. """ super(SerialManager, self).__init__() self.serial = serial self.fake_mode = fake_mode self.is_open = self.serial.isOpen() self.something_sent = False
[docs] def open(self, baudrate, serial_port, timeout): """ Opens the serial port with the given parameters. :param baudrate: The baudrate. :param serial_port: The port to be used. :param timeout: Timeout for reading and writing. """ logger.info("Opening {} with baudrate {}, timeout {}".format( repr(serial_port), baudrate, timeout)) self.send_print.emit("Opening {} with baudrate {}, timeout {}".format( repr(serial_port), baudrate, timeout), "info") self.serial.port = serial_port self.serial.baudrate = baudrate self.serial.timeout = timeout self.serial.write_timeout = timeout try: self.serial.open() self.is_open = True except serial.serialutil.SerialException: self.is_open = False logger.error("Could not open serial port.") self.send_print.emit("Could not open serial port.", "error") return False logger.debug(self.serial.timeout) return True
[docs] def close(self): """ Closes the serial port. """ logger.info("Closing serial port.") self.send_print.emit("Closing serial port.", "info") self.is_open = False self.serial.close()
[docs] def sendMsg(self, msg): """ Sends a message using the serial port if fake_mode is False. :param msg: The message to be sent. :returns: True if no error occurred, else False. """ if not self.fake_mode and not (self.serial and self.serial.isOpen()): self.send_print.emit("Error, no serial port to write.", "error") logger.error("Serial manager has no serial opened to send data.") return False elif self.fake_mode: self.send_print.emit(msg, "operator") logger.info( "Sending {} through fake serial port".format(repr(msg))) self.something_sent = True return True else: logger.info("Sending {} through {}".format(repr(msg), self.serial)) self.send_print.emit(msg, "operator") if msg[-1] != '\n': msg += '\n' try: self.serial.write(bytes(msg, encoding='utf8')) except serial.serialutil.SerialException as e: logger.error("Serial error : {}".format(e)) self.send_print.emit("Serial error while writing.", "error") except OSError as e: logger.error("Serial error : {}".format(e)) self.send_print.emit("Serial error while writing.", "error") self.serial_fatal_error.emit() return True
@pyqtSlot()
[docs] def readMsg(self): """ Reads a line from the serial port. And emit the send_print or send_confirm signals if necessary. """ if self.fake_mode: if self.something_sent: logger.info("Received {}".format(repr("ok"))) self.send_print.emit("ok", "machine") self.something_sent = False self.send_confirm.emit(True) return elif not (self.serial and self.serial.isOpen()): self.send_print.emit("Error, no serial port to read.", "error") logger.error("Serial manager has no serial opened to read data.") self.send_confirm.emit(False) return try: waiting = self.serial.in_waiting except OSError as e: logger.error("Serial error : {}".format(e)) self.send_print.emit("Serial error while reading.", "error") self.serial_fatal_error.emit() return if not waiting: return txt = "" try: txt = self.serial.readline().decode('ascii') except serial.serialutil.SerialException as e: logger.error("Serial error : {}".format(e)) self.send_print.emit("Serial error while reading.", "error") except UnicodeDecodeError as e: logger.error("Serial error : {}".format(e)) self.send_print.emit("Serial error while reading.", "error") except OSError as e: logger.error("Serial error : {}".format(e)) self.send_print.emit("Serial error while reading.", "error") self.serial_fatal_error.emit() if txt: if "error" in txt.lower(): self.send_print.emit(txt, "error") else: self.send_print.emit("m> {}".format(txt), "machine") self.send_confirm.emit("ok" in txt.lower())