Source code for sivicncdriver.ui.interface

"""
The interface module
====================

Provides the MainWindow class.
"""

import serial
from math import atan2, sqrt, pi
import os
import json

from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5 import QtGui
from PyQt5 import QtCore
from PyQt5.QtWidgets import *

from sivicncdriver import settings
from sivicncdriver.settings import logger
from sivicncdriver.gcode.gcode import parse
from sivicncdriver.serial.serial_list import serial_ports
from sivicncdriver.serial.serial_manager import SerialManager
from sivicncdriver.ui.preprocessor import PreprocessorDialog
from sivicncdriver.gcode.arc_calculator import arc_to_segments
from sivicncdriver.serial.thread_send import SendThread
from sivicncdriver.serial.thread_read import ReadThread
import sivicncdriver.gcode.gcode_maker as gcode_maker

from sivicncdriver.ui.main_window import Ui_MainWindow

__all__ = ['MainWindow']

_translate = QtCore.QCoreApplication.translate


[docs]class MainWindow(QMainWindow, Ui_MainWindow): """ The main window of the application. """ def __init__(self): """ The __init__ method. It will set the UI up, list available serial ports, list available configurations, connect the UI and so on. """ super(MainWindow, self).__init__() self.setupUi(self) self.zoom = 1 self.list_serials() self.list_configs() self.set_serial_mode("manual") self.serial_manager = SerialManager(serial.Serial(timeout=0)) s = """Welcome to SiviCNCDriver, by Klafyvel from sivigik.com""" self.print(s, msg_type="info") self.connectUi() self.update_config(self.config_list.currentIndex()) self.baudrate.addItems( map(str, serial.serialutil.SerialBase.BAUDRATES)) self.baudrate.setCurrentIndex(12) self.file_loaded = False self.send_thread = None self.waiting_cmd = [] self.last_selected_path = None self.read_thread = ReadThread()
[docs] def connectUi(self): """ Connects The UI signals and slots. """ logger.debug("Connecting Ui.") self.btn_serial_ports_list.clicked.connect(self.list_serials) self.btn_y_plus.pressed.connect(self.start_continuous_y_forward) self.btn_y_minus.pressed.connect(self.start_continuous_y_backward) self.btn_x_plus.pressed.connect(self.start_continuous_x_forward) self.btn_x_minus.pressed.connect(self.start_continuous_x_backward) self.btn_z_plus.pressed.connect(self.start_continuous_z_forward) self.btn_z_minus.pressed.connect(self.start_continuous_z_backward) self.btn_y_plus.released.connect(self.stop_y) self.btn_y_minus.released.connect(self.stop_y) self.btn_x_plus.released.connect(self.stop_x) self.btn_x_minus.released.connect(self.stop_x) self.btn_z_plus.released.connect(self.stop_z) self.btn_z_minus.released.connect(self.stop_z) self.btn_set_origin.clicked.connect(self.set_origin) self.btn_go_to_zero.clicked.connect(self.goto_origin) self.btn_emergency_stop.clicked.connect(self.emergency_stop) self.auto_cmd_type.currentIndexChanged.connect( self.manage_auto_cmd_number) self.btn_run_auto_cmd.clicked.connect(self.auto_cmd) self.manage_auto_cmd_number(self.auto_cmd_type.currentIndex()) self.btn_run_custom_cmd.clicked.connect(self.run_custom_cmd) self.chk_fake_serial.stateChanged.connect( self.manage_emulate_serial_port) self.btn_send_current_file.clicked.connect(self.send_file) self.btn_command.clicked.connect(self.send_cmd) self.config_list.currentIndexChanged.connect(self.update_config) self.btn_save_config.clicked.connect(self.save_config) self.btn_save_config_as.clicked.connect(self.save_config_as) self.btn_send_config.clicked.connect(self.send_config) self.btn_connect.clicked.connect(self.manage_connection) self.btn_file.clicked.connect(self.choose_file) self.btn_reload.clicked.connect(self.load_file) self.btn_save_file.clicked.connect(self.save_file) self.btn_save_as.clicked.connect(self.save_file_as) self.redraw.clicked.connect(self.draw_file) self.btn_close.clicked.connect(self.close_file) self.serial_manager.send_print.connect(self.print) self.btn_preprocessor.clicked.connect(self.run_preprocessor) self.reverse_display_x.clicked.connect(self.update_drawing) self.reverse_display_y.clicked.connect(self.update_drawing) self.reverse_display_z.clicked.connect(self.update_drawing) self.btn_license.clicked.connect(self.about_license) self.btn_about_qt.clicked.connect(self.about_qt) self.code_edit.cursorPositionChanged.connect( self.highlight_selected_path) self.view_3D.parse_error.connect(self.parse_error) self.serial_manager.serial_fatal_error.connect(self.manage_connection)
[docs] def list_serials(self): """ Lists available serials ports. """ logger.debug("Listing available serial ports.") l = serial_ports() for i in range(self.serial_ports_list.count()): self.serial_ports_list.removeItem(0) for i in l: logger.info("Found {}".format(i)) self.serial_ports_list.addItem(i)
[docs] def list_configs(self): """ Lists available configurations. """ for _ in range(self.config_list.count()): self.config_list.removeItem(0) logger.debug("Listing available configurations.") for f in os.listdir(settings.CONFIG_DIR): if f.endswith(".json"): logger.debug("Found {}".format(f)) self.config_list.addItem(f[:-5]) self.config_list.addItem(_translate("MainWindow", "New configuration"))
[docs] def set_serial_mode(self, mode): """ Change serial mode. :param mode: can be "manual" or "file" :type mode: str """ if mode == "manual": logger.debug("Setting manual mode.") self.btn_set_origin.setEnabled(True) self.grp_cmd.setEnabled(True) self.grp_auto.setEnabled(True) # self.btn_y_plus.pressed.connect() elif mode == "file": logger.debug("Setting file mode.") self.btn_set_origin.setEnabled(False) self.grp_cmd.setEnabled(False) self.grp_auto.setEnabled(False)
[docs] def reset_config(self): """ Resets the configuration. """ self.drive_x.setCurrentIndex(0) self.drive_y.setCurrentIndex(0) self.drive_z.setCurrentIndex(0) self.ratio_x.setValue(1) self.ratio_y.setValue(1) self.ratio_z.setValue(1) self.play_x.setValue(0) self.play_y.setValue(0) self.play_z.setValue(0) self.reverse_x.setChecked(False) self.reverse_y.setChecked(False) self.reverse_z.setChecked(False) self.minTime_x.setValue(5) self.minTime_y.setValue(5) self.minTime_z.setValue(5) logger.info("Reset config.")
[docs] def config_as_dict(self): """ Get the configuration as a dict. :return: The configuration as a dict. :rtype: dict """ return { "x_drive": self.drive_x.currentIndex(), "x_ratio": self.ratio_x.value(), "x_play": self.play_x.value(), "x_reverse": bool(self.reverse_x.checkState()), "x_min_time": self.minTime_x.value(), "y_drive": self.drive_y.currentIndex(), "y_ratio": self.ratio_y.value(), "y_play": self.play_y.value(), "y_reverse": bool(self.reverse_y.checkState()), "y_min_time": self.minTime_y.value(), "z_drive": self.drive_z.currentIndex(), "z_ratio": self.ratio_z.value(), "z_play": self.play_z.value(), "z_reverse": bool(self.reverse_z.checkState()), "z_min_time": self.minTime_z.value(), }
[docs] def run_thread(self, gcode, n=None, disable=True, allow_waiting=True): """ Run a thread to send the given gcode. :param gcode: The gcode as a list of commands. :param n: A length for the sending_process progress bar. :param disable: Disable ui elements which trigger sending. :param allow_waiting: Adds the command to the waiting queue. :type gcode: list :type n: int :type disable: bool :type allow_waiting: bool """ if self.send_thread and allow_waiting: logger.info("Thread already in use, waiting for the end.") self.waiting_cmd.append( {"gcode": gcode, "n": n, "disable": disable}) return elif self.send_thread: self.send_thread.stop() try: self.serial_manager.serial.flush() except : pass self.send_thread = SendThread(self.serial_manager, gcode) if n: self.sending_progress.setMaximum(n) else: self.sending_progress.setMaximum(len(gcode)) self.sending_progress.setValue(0) self.btn_send_current_file.setText( _translate("MainWindow", "Cancel sending")) self.btn_send_current_file.clicked.disconnect() self.btn_send_current_file.clicked.connect(self.send_thread.stop) if disable: self.tabWidget.setEnabled(False) self.send_thread.read_allowed.connect( self.read_thread.set_read_allowed) self.serial_manager.send_confirm.connect(self.send_thread.confirm) self.send_thread.finished.connect(self.sending_end) self.send_thread.update_progress.connect(self.update_progress) self.send_thread.start()
# Slots @pyqtSlot(str, str)
[docs] def print(self, txt, msg_type="operator"): """ Prints a message on the application console. :param txt: The message :param msg_type: The type of the message. Can be "operator", "machine", "error" or "info" :type txt: str :type msg_type: str """ msg = "{}" if msg_type == "operator": msg = ( "\n<br /><span style='color:orange;'>" "<strong>&gt;&gt;&gt;</strong> {}</span>" ) elif msg_type == "machine": msg = "\n<br /><span style='color:yellow;'>{}</span>" elif msg_type == "error": msg = ( "\n<br /><span style='color:red;'>" "<strong>{}</strong></span>" ) elif msg_type == "info": msg = ( "\n<br /><span style='color:DarkOrange;'>" "<strong>{}</strong></span>" ) self.serial_monitor.moveCursor(QTextCursor.End) self.serial_monitor.insertHtml(msg.format(txt)) self.serial_monitor.moveCursor(QTextCursor.End)
@pyqtSlot(int)
[docs] def manage_auto_cmd_number(self, n): """ Enable the widgets for auto commands """ self.auto_cmd_number.setValue(1) self.auto_cmd_number.setEnabled(n != 1) self.auto_cmd_2.setEnabled(n != 1)
@pyqtSlot()
[docs] def auto_cmd(self): """ Sends auto commands using a thread if they are too long. """ logger.info("Sending auto command.") self.print("Sending auto command.", "info") axis = self.auto_cmd_axis.currentText() n = self.auto_cmd_number.value() step = self.auto_cmd_step.value() if self.auto_cmd_type.currentIndex() == 1: it = [gcode_maker.step(axis, step)] else: axis2 = self.auto_cmd_axis_2.currentText() step2 = self.auto_cmd_step_2.value() # AKA "Do not do this at home kids" : it = ( ( ( gcode_maker.step(axis, step), gcode_maker.step(axis, -step) )[(i % 4) >= 2], gcode_maker.step(axis2, step2) )[i % 2] for i in range(4*n-1) ) self.run_thread(it, n)
@pyqtSlot()
[docs] def run_custom_cmd(self): """ Sends a custom command using a thread. """ logger.info("Sending custom command.") self.print("Sending custom command.", "info") gcode = self.custom_cmd.toPlainText().split('\n') n = self.custom_cmd_number.value() l = len(gcode) it = (gcode[i % l] for i in range(n*l)) self.run_thread(it, n*l)
@pyqtSlot()
[docs] def send_file(self): """ Send a file using a different thread. """ logger.info("Sending file.") self.print("Sending file.", "info") gcode = self.code_edit.toPlainText().split('\n') self.run_thread(gcode)
@pyqtSlot()
[docs] def send_cmd(self): """ Sends an user command using a thread. """ gcode = [self.command_edit.text()] logger.info("Sending command.") self.print("Sending command.", "info") self.run_thread(gcode)
@pyqtSlot()
[docs] def send_config(self): """ Send a configuration to the machine. """ self.save_config() gcode = gcode_maker.config_as_gcode( **self.config_as_dict()).split('\n') logger.info("Sending configuration.") self.print("Sending configuration.", "info") self.run_thread(gcode)
@pyqtSlot()
[docs] def sending_end(self): """ Manages the end of upload. If some commands are waiting, run them at the end. """ self.send_thread.read_allowed.disconnect() self.serial_manager.send_confirm.disconnect() if self.send_thread and self.send_thread.user_stop: self.print("Stopped by user.", "error") logger.error("Upload stopped by user.") elif self.send_thread and self.send_thread.error: self.print("Error while sending.", "error") logger.error("Error while sending.") else: self.print("Done.", "info") logger.info("Upload done.") self.sending_progress.setValue(0) self.btn_send_current_file.setText( _translate("MainWindow", "Send current file")) self.btn_send_current_file.clicked.disconnect() self.btn_send_current_file.clicked.connect(self.send_file) self.tabWidget.setEnabled(True) self.send_thread = None if len(self.waiting_cmd) > 0: self.run_thread(**self.waiting_cmd.pop(0))
@pyqtSlot()
[docs] def start_continuous_y_forward(self): self.run_thread( [gcode_maker.start_continuous_y_forward()], disable=False)
@pyqtSlot()
[docs] def start_continuous_y_backward(self): self.run_thread( [gcode_maker.start_continuous_y_backward()], disable=False)
@pyqtSlot()
[docs] def start_continuous_x_forward(self): self.run_thread( [gcode_maker.start_continuous_x_forward()], disable=False)
@pyqtSlot()
[docs] def start_continuous_x_backward(self): self.run_thread( [gcode_maker.start_continuous_x_backward()], disable=False)
@pyqtSlot()
[docs] def start_continuous_z_forward(self): self.run_thread( [gcode_maker.start_continuous_z_forward()], disable=False)
@pyqtSlot()
[docs] def start_continuous_z_backward(self): self.run_thread( [gcode_maker.start_continuous_z_backward()], disable=False)
@pyqtSlot()
[docs] def stop_y(self): self.run_thread([gcode_maker.stop_y()])
@pyqtSlot()
[docs] def stop_x(self): self.run_thread([gcode_maker.stop_x()])
@pyqtSlot()
[docs] def stop_z(self): self.run_thread([gcode_maker.stop_z()])
@pyqtSlot()
[docs] def emergency_stop(self): self.run_thread([gcode_maker.emergency_stop()], allow_waiting=False)
@pyqtSlot()
[docs] def set_origin(self): self.run_thread([gcode_maker.set_origin()])
@pyqtSlot()
[docs] def goto_origin(self): self.run_thread([gcode_maker.goto_origin()])
@pyqtSlot(int)
[docs] def update_progress(self, s): """ Updates the progress bar. """ self.sending_progress.setValue(s)
@pyqtSlot(int)
[docs] def manage_emulate_serial_port(self, s): """ Enable widgets for serial port emulation. """ st = bool(s) self.serial_manager.fake_mode = st self.baudrate.setEnabled(not st) self.serial_ports_list.setEnabled(not st) self.btn_serial_ports_list.setEnabled(not st) self.btn_connect.setEnabled(not st) self.timeout.setEnabled(not st) if st: self.print("Emulating serial port.", "info") self.read_thread = ReadThread() self.read_thread.read.connect(self.serial_manager.readMsg) self.read_thread.start() else: self.print("Exiting serial port emulation.", "info") self.read_thread.read.disconnect() self.read_thread.stop()
@pyqtSlot(int)
[docs] def update_config(self, i): """ Updates the configuration widgets. """ nb_config = self.config_list.count() if i == nb_config-1: self.reset_config() else: file = self.config_list.currentText() + ".json" file = os.path.join(settings.CONFIG_DIR, file) logger.info("Loading config {}".format(file)) config = {} with open(file) as f: config = json.load(f) self.drive_x.setCurrentIndex(config.get("x_drive", 0)) self.drive_y.setCurrentIndex(config.get("y_drive", 0)) self.drive_z.setCurrentIndex(config.get("z_drive", 0)) self.ratio_x.setValue(config.get("x_ratio", 1)) self.ratio_y.setValue(config.get("y_ratio", 1)) self.ratio_z.setValue(config.get("z_ratio", 1)) self.play_x.setValue(config.get("x_play", 0)) self.play_y.setValue(config.get("y_play", 0)) self.play_z.setValue(config.get("z_play", 0)) self.reverse_x.setChecked(config.get("x_reverse", False)) self.reverse_y.setChecked(config.get("y_reverse", False)) self.reverse_z.setChecked(config.get("z_reverse", False)) self.minTime_x.setValue(config.get("x_min_time", 5)) self.minTime_y.setValue(config.get("y_min_time", 5)) self.minTime_z.setValue(config.get("z_min_time", 5))
@pyqtSlot()
[docs] def save_config(self, filename=None): """ Saves a configuration. :param filename: The name of the file. :type filename: str """ logger.info("Saving configuration.") logger.debug("Filename given : {}".format(filename)) current_config = self.config_list.currentIndex() nb_config = self.config_list.count() if current_config == nb_config-1 and not filename: self.save_config_as() else: if not filename: file = self.config_list.currentText() + ".json" file = os.path.join(settings.CONFIG_DIR, file) else: file = filename config = self.config_as_dict() with open(file, "w") as f: json.dump(config, f)
@pyqtSlot()
[docs] def save_config_as(self): """ Saves a configuration in a new file. """ f = QFileDialog.getSaveFileName( self, _translate("MainWindow", "Select file"), directory=settings.CONFIG_DIR, filter='JSON files (*.json)\nAll files (*)')[0] if f is not '': if not f.endswith(".json"): f = f + ".json" logger.info("Saving configuration as {}".format(f)) self.save_config(f) self.list_configs() self.update_config(self.config_list.currentIndex())
@pyqtSlot()
[docs] def manage_connection(self): """ Manages the connection widgets. """ if self.serial_manager.is_open: if self.send_thread: self.emergency_stop() self.read_thread.read.disconnect() self.read_thread.stop() self.baudrate.setEnabled(True) self.timeout.setEnabled(True) self.serial_ports_list.setEnabled(True) self.btn_serial_ports_list.setEnabled(True) self.btn_connect.setText(_translate("MainWindow", "Connect")) self.serial_manager.close() else: port = self.serial_ports_list.currentText() baudrate = int(self.baudrate.currentText()) timeout = self.timeout.value() if self.serial_manager.open(baudrate, port, timeout): self.read_thread = ReadThread() self.baudrate.setEnabled(False) self.timeout.setEnabled(False) self.serial_ports_list.setEnabled(False) self.btn_serial_ports_list.setEnabled(False) self.btn_connect.setText( _translate("MainWindow", "Disconnect")) self.read_thread.read.connect(self.serial_manager.readMsg) self.read_thread.start()
@pyqtSlot()
[docs] def choose_file(self): """ Sets the gcode file. """ if not self.file_loaded: directory = settings.FILE_DIR else: directory = os.path.dirname(self.filename.text()) file = QFileDialog.getOpenFileName( self, _translate("MainWindow", "Select file"), directory=directory, filter='GCode files (*.gcode, *.ngc)\nAll files (*)')[0] if file is not '': self.filename.setText(file) self.load_file()
@pyqtSlot()
[docs] def load_file(self): """ Loads a gcode file. """ file = self.filename.text() try: logger.info("Loading {}".format(repr(file))) with open(file) as f: gcode = f.read() self.draw_file(gcode) self.code_edit.setText(gcode) self.file_loaded = True except FileNotFoundError: self.choose_file()
@pyqtSlot()
[docs] def save_file_as(self): """ Saves a gcode file in a nex file. """ if not self.file_loaded: directory = settings.FILE_DIR else: directory = os.path.dirname(self.filename.text()) file = QFileDialog.getSaveFileName( self, _translate("MainWindow", "Select file"), directory=directory, filter='GCode files (*.gcode, *.ngc)\nAll files (*)')[0] if file is not '': logger.info("Saving {}".format(repr(file))) self.filename.setText(file) with open(file, 'w') as f: f.write(self.code_edit.toPlainText()) self.file_loaded = True
@pyqtSlot()
[docs] def save_file(self): """ Saves a gcode file. """ if not self.file_loaded: self.save_file_as() else: file = self.filename.text() logger.info("Saving {}".format(repr(file))) with open(file, 'w') as f: f.write(self.code_edit.toPlainText())
@pyqtSlot()
[docs] def close_file(self): """ Close the current file. """ self.filename.setText(_translate("MainWindow", "No file.")) self.code_edit.setText("") self.draw_file()
@pyqtSlot()
[docs] def update_drawing(self, highlight_line=None): """ Updates the drawing. :param highlight_line: A line which is to be highlighted. :type highlight_line: int """ self.view_3D.draw( reverse_x=self.reverse_display_x.isChecked(), reverse_y=self.reverse_display_y.isChecked(), reverse_z=self.reverse_display_z.isChecked(), highlight_line=highlight_line, )
@pyqtSlot(int)
[docs] def parse_error(self, line): """ Handles parsing errors. :param line: The line where the error occurred. """ self.chk_display_current_line.setChecked(False) self.code_edit.setExtraSelections([]) QMessageBox.critical( self, _translate("MainWindow", "Error."), _translate("MainWindow", "An error occurred during parsing.") ) logger.error("While parsing line {}".format(line)) highlight = QTextEdit.ExtraSelection() highlight.cursor = QTextCursor( self.code_edit.document().findBlockByLineNumber(line)) highlight.format.setProperty(QTextFormat.FullWidthSelection, True) highlight.format.setBackground(Qt.red) self.code_edit.setTextCursor(highlight.cursor) self.code_edit.setExtraSelections([highlight])
@pyqtSlot()
[docs] def draw_file(self, gcode=None): """ Draws a gcode file. :param gcode: gcode to use in place of the one form code_edit. """ if not gcode: gcode = self.code_edit.toPlainText() self.view_3D.compute_data(gcode) bounds = self.view_3D.get_bounds() self.space_x.display(bounds['max_x'] - bounds['min_x']) self.space_y.display(bounds['max_y'] - bounds['min_y']) self.space_z.display(bounds['max_z'] - bounds['min_z']) self.update_drawing()
@pyqtSlot()
[docs] def run_preprocessor(self): """ Runs the preprocessor dialog. """ self.preprocessor = PreprocessorDialog(self.code_edit.toPlainText()) self.preprocessor.accepted.connect(self.end_preprocessor) self.preprocessor.show()
@pyqtSlot()
[docs] def end_preprocessor(self): """ Manages the end of the preprocessing interface. """ self.code_edit.setText(self.preprocessor.gcode) self.preprocessor.accepted.disconnect() self.draw_file()
@pyqtSlot()
[docs] def about_license(self): """ Displays informations about the license. """ with open(os.path.join(settings.APP_DIR, 'license_dialog_text')) as f: QMessageBox.about(self, _translate( "MainWindow", "License"), f.read())
@pyqtSlot()
[docs] def about_qt(self): """ Displays informations about Qt. """ QMessageBox.aboutQt(self)
@pyqtSlot()
[docs] def highlight_selected_path(self): """ Looks for selected line in the code_edit, then updates the drawing to highlight the corresponding path. """ if not self.chk_display_current_line.isChecked(): return i = self.code_edit.textCursor().blockNumber() if i == self.last_selected_path: return else: self.last_selected_path = i self.code_edit.setExtraSelections([]) highlight = QTextEdit.ExtraSelection() highlight.cursor = self.code_edit.textCursor() highlight.format.setProperty(QTextFormat.FullWidthSelection, True) highlight.format.setBackground(Qt.green) self.code_edit.setExtraSelections([highlight]) self.update_drawing(highlight_line=i)