''' Contains widgets specific to the submission summary and submission details. ''' import base64, logging, json from datetime import datetime from io import BytesIO from pprint import pformat from PyQt6 import QtPrintSupport from PyQt6.QtWidgets import ( QVBoxLayout, QDialog, QTableView, QTextEdit, QPushButton, QScrollArea, QMessageBox, QMenu, QLabel, QDialogButtonBox, QToolBar ) from PyQt6.QtWebEngineWidgets import QWebEngineView from PyQt6.QtCore import Qt, QAbstractTableModel, QSortFilterProxyModel from PyQt6.QtGui import QAction, QCursor, QPixmap, QPainter from backend.db.models import BasicSubmission, Equipment, SubmissionEquipmentAssociation, Process from backend.excel import make_report_html, make_report_xlsx from tools import check_if_app, Report, Result, jinja_template_loading, get_first_blank_df_row, row_map from xhtml2pdf import pisa from .pop_ups import QuestionAsker from .equipment_usage import EquipmentUsage from ..visualizations import make_plate_barcode, make_plate_map, make_plate_map_html from .functions import select_save_file, select_open_file from .misc import ReportDatePicker import pandas as pd from openpyxl.worksheet.worksheet import Worksheet from getpass import getuser logger = logging.getLogger(f"submissions.{__name__}") env = jinja_template_loading() class pandasModel(QAbstractTableModel): """ pandas model for inserting summary sheet into gui NOTE: Copied from Stack Overflow. I have no idea how it actually works. """ def __init__(self, data) -> None: QAbstractTableModel.__init__(self) self._data = data def rowCount(self, parent=None) -> int: """ does what it says Args: parent (_type_, optional): _description_. Defaults to None. Returns: int: number of rows in data """ return self._data.shape[0] def columnCount(self, parent=None) -> int: """ does what it says Args: parent (_type_, optional): _description_. Defaults to None. Returns: int: number of columns in data """ return self._data.shape[1] def data(self, index, role=Qt.ItemDataRole.DisplayRole) -> str|None: if index.isValid(): if role == Qt.ItemDataRole.DisplayRole: return str(self._data.iloc[index.row(), index.column()]) return None def headerData(self, col, orientation, role): if orientation == Qt.Orientation.Horizontal and role == Qt.ItemDataRole.DisplayRole: return self._data.columns[col] return None class SubmissionsSheet(QTableView): """ presents submission summary to user in tab1 """ def __init__(self, parent) -> None: """ initialize Args: ctx (dict): settings passed from gui """ super().__init__(parent) self.app = self.parent() # self.ctx = ctx self.report = Report() self.setData() self.resizeColumnsToContents() self.resizeRowsToContents() self.setSortingEnabled(True) self.doubleClicked.connect(self.show_details) def setData(self) -> None: """ sets data in model """ # self.data = submissions_to_df() self.data = BasicSubmission.submissions_to_df() try: self.data['id'] = self.data['id'].apply(str) self.data['id'] = self.data['id'].str.zfill(3) except KeyError: pass proxyModel = QSortFilterProxyModel() proxyModel.setSourceModel(pandasModel(self.data)) self.setModel(proxyModel) def show_details(self) -> None: """ creates detailed data to show in seperate window """ logger.debug(f"Sheet.app: {self.app}") index = (self.selectionModel().currentIndex()) value = index.sibling(index.row(),0).data() dlg = SubmissionDetails(parent=self, id=value) if dlg.exec(): pass def create_barcode(self) -> None: """ Generates a window for displaying barcode """ index = (self.selectionModel().currentIndex()) value = index.sibling(index.row(),1).data() logger.debug(f"Selected value: {value}") dlg = BarcodeWindow(value) if dlg.exec(): dlg.print_barcode() def add_comment(self) -> None: """ Generates a text editor window. """ index = (self.selectionModel().currentIndex()) value = index.sibling(index.row(),1).data() logger.debug(f"Selected value: {value}") dlg = SubmissionComment(parent=self, rsl=value) if dlg.exec(): dlg.add_comment() def contextMenuEvent(self, event): """ Creates actions for right click menu events. Args: event (_type_): the item of interest """ self.menu = QMenu(self) renameAction = QAction('Delete', self) detailsAction = QAction('Details', self) # barcodeAction = QAction("Print Barcode", self) commentAction = QAction("Add Comment", self) backupAction = QAction("Backup", self) equipAction = QAction("Add Equipment", self) # hitpickAction = QAction("Hitpicks", self) renameAction.triggered.connect(lambda: self.delete_item(event)) detailsAction.triggered.connect(lambda: self.show_details()) # barcodeAction.triggered.connect(lambda: self.create_barcode()) commentAction.triggered.connect(lambda: self.add_comment()) backupAction.triggered.connect(lambda: self.regenerate_submission_form()) equipAction.triggered.connect(lambda: self.add_equipment()) # hitpickAction.triggered.connect(lambda: self.hit_pick()) self.menu.addAction(detailsAction) self.menu.addAction(renameAction) # self.menu.addAction(barcodeAction) self.menu.addAction(commentAction) self.menu.addAction(backupAction) self.menu.addAction(equipAction) # self.menu.addAction(hitpickAction) # add other required actions self.menu.popup(QCursor.pos()) def add_equipment(self): index = (self.selectionModel().currentIndex()) value = index.sibling(index.row(),0).data() self.add_equipment_function(rsl_plate_id=value) def add_equipment_function(self, rsl_plate_id): submission = BasicSubmission.query(id=rsl_plate_id) submission_type = submission.submission_type_name dlg = EquipmentUsage(parent=self, submission_type=submission_type, submission=submission) if dlg.exec(): equipment = dlg.parse_form() logger.debug(f"We've got equipment: {equipment}") for equip in equipment: e = Equipment.query(name=equip.name) assoc = SubmissionEquipmentAssociation(submission=submission, equipment=e) process = Process.query(name=equip.process) assoc.process = process assoc.role = equip.role # submission.submission_equipment_associations.append(assoc) logger.debug(f"Appending SubmissionEquipmentAssociation: {assoc}") # submission.save() assoc.save() def delete_item(self, event): """ Confirms user deletion and sends id to backend for deletion. Args: event (_type_): the item of interest """ index = (self.selectionModel().currentIndex()) value = index.sibling(index.row(),0).data() logger.debug(index) msg = QuestionAsker(title="Delete?", message=f"Are you sure you want to delete {index.sibling(index.row(),1).data()}?\n") if msg.exec(): # delete_submission(id=value) BasicSubmission.query(id=value).delete() else: return self.setData() def link_extractions(self): self.link_extractions_function() self.app.report.add_result(self.report) self.report = Report() self.app.result_reporter() def link_extractions_function(self): """ Link extractions from runlogs to imported submissions Args: obj (QMainWindow): original app window Returns: Tuple[QMainWindow, dict]: Collection of new main app window and result dict """ fname = select_open_file(self, file_extension="csv") with open(fname.__str__(), 'r') as f: # split csv on commas runs = [col.strip().split(",") for col in f.readlines()] count = 0 for run in runs: new_run = dict( start_time=run[0].strip(), rsl_plate_num=run[1].strip(), sample_count=run[2].strip(), status=run[3].strip(), experiment_name=run[4].strip(), end_time=run[5].strip() ) # elution columns are item 6 in the comma split list to the end for ii in range(6, len(run)): new_run[f"column{str(ii-5)}_vol"] = run[ii] # Lookup imported submissions # sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num']) # sub = lookup_submissions(ctx=obj.ctx, rsl_number=new_run['rsl_plate_num']) sub = BasicSubmission.query(rsl_number=new_run['rsl_plate_num']) # If no such submission exists, move onto the next run if sub == None: continue try: logger.debug(f"Found submission: {sub.rsl_plate_num}") count += 1 except AttributeError: continue if sub.extraction_info != None: existing = json.loads(sub.extraction_info) else: existing = None # Check if the new info already exists in the imported submission try: if json.dumps(new_run) in sub.extraction_info: logger.debug(f"Looks like we already have that info.") continue except TypeError: pass # Update or create the extraction info if existing != None: try: logger.debug(f"Updating {type(existing)}: {existing} with {type(new_run)}: {new_run}") existing.append(new_run) logger.debug(f"Setting: {existing}") sub.extraction_info = json.dumps(existing) except TypeError: logger.error(f"Error updating!") sub.extraction_info = json.dumps([new_run]) logger.debug(f"Final ext info for {sub.rsl_plate_num}: {sub.extraction_info}") else: sub.extraction_info = json.dumps([new_run]) sub.save() self.report.add_result(Result(msg=f"We added {count} logs to the database.", status='Information')) def link_pcr(self): self.link_pcr_function() self.app.report.add_result(self.report) self.report = Report() self.app.result_reporter() def link_pcr_function(self): """ Link PCR data from run logs to an imported submission Args: obj (QMainWindow): original app window Returns: Tuple[QMainWindow, dict]: Collection of new main app window and result dict """ fname = select_open_file(self, file_extension="csv") with open(fname.__str__(), 'r') as f: # split csv rows on comma runs = [col.strip().split(",") for col in f.readlines()] count = 0 for run in runs: new_run = dict( start_time=run[0].strip(), rsl_plate_num=run[1].strip(), biomek_status=run[2].strip(), quant_status=run[3].strip(), experiment_name=run[4].strip(), end_time=run[5].strip() ) # lookup imported submission # sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num']) # sub = lookup_submissions(ctx=obj.ctx, rsl_number=new_run['rsl_plate_num']) sub = BasicSubmission.query(rsl_number=new_run['rsl_plate_num']) # if imported submission doesn't exist move on to next run if sub == None: continue try: logger.debug(f"Found submission: {sub.rsl_plate_num}") except AttributeError: continue # check if pcr_info already exists if hasattr(sub, 'pcr_info') and sub.pcr_info != None: existing = json.loads(sub.pcr_info) else: existing = None # check if this entry already exists in imported submission try: if json.dumps(new_run) in sub.pcr_info: logger.debug(f"Looks like we already have that info.") continue else: count += 1 except TypeError: logger.error(f"No json to dump") if existing != None: try: logger.debug(f"Updating {type(existing)}: {existing} with {type(new_run)}: {new_run}") existing.append(new_run) logger.debug(f"Setting: {existing}") sub.pcr_info = json.dumps(existing) except TypeError: logger.error(f"Error updating!") sub.pcr_info = json.dumps([new_run]) logger.debug(f"Final ext info for {sub.rsl_plate_num}: {sub.pcr_info}") else: sub.pcr_info = json.dumps([new_run]) sub.save() self.report.add_result(Result(msg=f"We added {count} logs to the database.", status='Information')) def generate_report(self): self.generate_report_function() self.app.report.add_result(self.report) self.report = Report() self.app.result_reporter() def generate_report_function(self): """ Generate a summary of activities for a time period Args: obj (QMainWindow): original app window Returns: Tuple[QMainWindow, dict]: Collection of new main app window and result dict """ report = Report() # ask for date ranges dlg = ReportDatePicker() if dlg.exec(): info = dlg.parse_form() logger.debug(f"Report info: {info}") # find submissions based on date range subs = BasicSubmission.query(start_date=info['start_date'], end_date=info['end_date']) # convert each object to dict records = [item.report_dict() for item in subs] logger.debug(f"Records: {pformat(records)}") # make dataframe from record dictionaries detailed_df, summary_df = make_report_xlsx(records=records) html = make_report_html(df=summary_df, start_date=info['start_date'], end_date=info['end_date']) # get save location of report fname = select_save_file(obj=self, default_name=f"Submissions_Report_{info['start_date']}-{info['end_date']}.pdf", extension="pdf") with open(fname, "w+b") as f: pisa.CreatePDF(html, dest=f) writer = pd.ExcelWriter(fname.with_suffix(".xlsx"), engine='openpyxl') summary_df.to_excel(writer, sheet_name="Report") detailed_df.to_excel(writer, sheet_name="Details", index=False) worksheet: Worksheet = writer.sheets['Report'] for idx, col in enumerate(summary_df, start=1): # loop through all columns series = summary_df[col] max_len = max(( series.astype(str).map(len).max(), # len of largest item len(str(series.name)) # len of column name/header )) + 20 # adding a little extra space try: # worksheet.column_dimensions[get_column_letter(idx=idx)].width = max_len # Convert idx to letter col_letter = chr(ord('@') + idx) worksheet.column_dimensions[col_letter].width = max_len except ValueError: pass blank_row = get_first_blank_df_row(summary_df) + 1 logger.debug(f"Blank row index = {blank_row}") for col in range(3,6): col_letter = row_map[col] worksheet.cell(row=blank_row, column=col, value=f"=SUM({col_letter}2:{col_letter}{str(blank_row-1)})") for cell in worksheet['D']: if cell.row > 1: cell.style = 'Currency' writer.close() self.report.add_result(report) def regenerate_submission_form(self): index = (self.selectionModel().currentIndex()) value = index.sibling(index.row(),0).data() logger.debug(index) # msg = QuestionAsker(title="Delete?", message=f"Are you sure you want to delete {index.sibling(index.row(),1).data()}?\n") # if msg.exec(): # delete_submission(id=value) sub = BasicSubmission.query(id=value) fname = select_save_file(self, default_name=sub.to_pydantic().construct_filename(), extension="xlsx") sub.backup(fname=fname, full_backup=False) class SubmissionDetails(QDialog): """ a window showing text details of submission """ def __init__(self, parent, id:int) -> None: super().__init__(parent) # self.ctx = ctx try: self.app = parent.parent().parent().parent().parent().parent().parent() except AttributeError: self.app = None self.setWindowTitle("Submission Details") # create scrollable interior interior = QScrollArea() interior.setParent(self) # get submision from db # sub = lookup_submissions(ctx=ctx, id=id) sub = BasicSubmission.query(id=id) logger.debug(f"Submission details data:\n{pformat(sub.to_dict())}") self.base_dict = sub.to_dict(full_data=True) # don't want id del self.base_dict['id'] logger.debug(f"Creating barcode.") if not check_if_app(): self.base_dict['barcode'] = base64.b64encode(make_plate_barcode(self.base_dict['Plate Number'], width=120, height=30)).decode('utf-8') logger.debug(f"Hitpicking plate...") self.plate_dicto = sub.hitpick_plate() logger.debug(f"Making platemap...") self.base_dict['platemap'] = make_plate_map_html(self.plate_dicto) # logger.debug(f"Platemap: {self.base_dict['platemap']}") # logger.debug(f"platemap: {platemap}") # image_io = BytesIO() # try: # platemap.save(image_io, 'JPEG') # except AttributeError: # logger.error(f"No plate map found for {sub.rsl_plate_num}") # self.base_dict['platemap'] = base64.b64encode(image_io.getvalue()).decode('utf-8') self.template = env.get_template("submission_details.html") self.html = self.template.render(sub=self.base_dict) webview = QWebEngineView() webview.setMinimumSize(900, 500) webview.setMaximumSize(900, 500) webview.setHtml(self.html) self.layout = QVBoxLayout() interior.resize(900, 500) interior.setWidget(webview) self.setFixedSize(900, 500) # button to export a pdf version btn = QPushButton("Export PDF") btn.setParent(self) btn.setFixedWidth(900) btn.clicked.connect(self.export) def export(self): """ Renders submission to html, then creates and saves .pdf file to user selected file. """ # try: # home_dir = Path(self.ctx.directory_path).joinpath(f"Submission_Details_{self.base_dict['Plate Number']}.pdf").resolve().__str__() # except FileNotFoundError: # home_dir = Path.home().resolve().__str__() # fname = Path(QFileDialog.getSaveFileName(self, "Save File", home_dir, filter=".pdf")[0]) # if fname.__str__() == ".": # logger.debug("Saving pdf was cancelled.") # return fname = select_save_file(obj=self, default_name=self.base_dict['Plate Number'], extension="pdf") del self.base_dict['platemap'] export_map = make_plate_map(self.plate_dicto) image_io = BytesIO() try: export_map.save(image_io, 'JPEG') except AttributeError: logger.error(f"No plate map found") self.base_dict['export_map'] = base64.b64encode(image_io.getvalue()).decode('utf-8') self.html2 = self.template.render(sub=self.base_dict) try: with open(fname, "w+b") as f: pisa.CreatePDF(self.html2, dest=f) except PermissionError as e: logger.error(f"Error saving pdf: {e}") msg = QMessageBox() msg.setText("Permission Error") msg.setInformativeText(f"Looks like {fname.__str__()} is open.\nPlease close it and try again.") msg.setWindowTitle("Permission Error") msg.exec() class BarcodeWindow(QDialog): def __init__(self, rsl_num:str): super().__init__() # set the title self.setWindowTitle("Image") self.layout = QVBoxLayout() # setting the geometry of window self.setGeometry(0, 0, 400, 300) # creating label self.label = QLabel() self.img = make_plate_barcode(rsl_num) self.pixmap = QPixmap() self.pixmap.loadFromData(self.img) # adding image to label self.label.setPixmap(self.pixmap) # show all the widgets] QBtn = QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel self.buttonBox = QDialogButtonBox(QBtn) self.buttonBox.accepted.connect(self.accept) self.buttonBox.rejected.connect(self.reject) self.layout.addWidget(self.label) self.layout.addWidget(self.buttonBox) self.setLayout(self.layout) self._createActions() self._createToolBar() self._connectActions() def _createToolBar(self): """ adds items to menu bar """ toolbar = QToolBar("My main toolbar") toolbar.addAction(self.printAction) def _createActions(self): """ creates actions """ self.printAction = QAction("&Print", self) def _connectActions(self): """ connect menu and tool bar item to functions """ self.printAction.triggered.connect(self.print_barcode) def print_barcode(self): """ Sends barcode image to printer. """ printer = QtPrintSupport.QPrinter() dialog = QtPrintSupport.QPrintDialog(printer) if dialog.exec(): self.handle_paint_request(printer, self.pixmap.toImage()) def handle_paint_request(self, printer:QtPrintSupport.QPrinter, im): logger.debug(f"Hello from print handler.") painter = QPainter(printer) image = QPixmap.fromImage(im) painter.drawPixmap(120, -20, image) painter.end() class SubmissionComment(QDialog): """ a window for adding comment text to a submission """ def __init__(self, parent, rsl:str) -> None: super().__init__(parent) # self.ctx = ctx try: self.app = parent.parent().parent().parent().parent().parent().parent print(f"App: {self.app}") except AttributeError: pass self.rsl = rsl self.setWindowTitle(f"{self.rsl} Submission Comment") # create text field self.txt_editor = QTextEdit(self) self.txt_editor.setReadOnly(False) self.txt_editor.setText("Add Comment") QBtn = QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel self.buttonBox = QDialogButtonBox(QBtn) self.buttonBox.accepted.connect(self.accept) self.buttonBox.rejected.connect(self.reject) self.layout = QVBoxLayout() self.setFixedSize(400, 300) self.layout.addWidget(self.txt_editor) self.layout.addWidget(self.buttonBox, alignment=Qt.AlignmentFlag.AlignBottom) self.setLayout(self.layout) def add_comment(self): """ Adds comment to submission object. """ commenter = getuser() comment = self.txt_editor.toPlainText() dt = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S") full_comment = [{"name":commenter, "time": dt, "text": comment}] logger.debug(f"Full comment: {full_comment}") sub = BasicSubmission.query(rsl_number=self.rsl) try: # For some reason .append results in new comment being ignores, so have to concatenate lists. sub.comment = sub.comment + full_comment except (AttributeError, TypeError) as e: logger.error(f"Hit error {e} creating comment") sub.comment = full_comment # logger.debug(sub.comment) sub.save(original=False) # logger.debug(f"Save result: {result}")