Source code for sivicncdriver.serial.thread_send

from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5 import QtGui
from PyQt5.QtWidgets import *

from sivicncdriver import settings
from sivicncdriver.settings import logger


[docs]class SendThread(QThread): """ A thread to send a list of instructions without blocking the main thread. """ update_progress = pyqtSignal(int) read_allowed = pyqtSignal(bool) def __init__(self, serial_manager, gcode): """ The __init__ method. :param serial_manager: The main window's serial manager :param gcode: An iterable of gcode instructions :type serial_manager: SerialManager :type gcode: iterable """ QThread.__init__(self) self.gcode = gcode self.serial_manager = serial_manager self.user_stop = False self.error = False self.confirmed = True def __del__(self): self.wait() @pyqtSlot()
[docs] def stop(self): """ A simple slot to tell the thread to stop. """ self.user_stop = True
@pyqtSlot(bool)
[docs] def confirm(self, st): """ Receive confirmation from the readThread. :param st: Everything ok ? """ self.error = not st self.confirmed = True logger.debug("Confirmation : {}.".format(st))
[docs] def run(self): """ Runs the thread. The commands are sent using the serial manager. If an error occurs or if the thread is stopped by the user, then it quits. """ n = 0 gen = (i for i in self.gcode) try: while not self.error and not self.user_stop: if self.confirmed: l = next(gen) self.read_allowed.emit(False) if self.serial_manager.sendMsg(l): self.update_progress.emit(n) self.confirmed = False self.read_allowed.emit(True) else: self.error = True self.read_allowed.emit(True) n += 1 except StopIteration: pass