diff --git a/src/submissions/backend/db/models/kits.py b/src/submissions/backend/db/models/kits.py index 94038dc..fe98852 100644 --- a/src/submissions/backend/db/models/kits.py +++ b/src/submissions/backend/db/models/kits.py @@ -156,11 +156,11 @@ class KitType(BaseClass): # logger.debug(f"Constructing xl map with str {submission_type}") assocs = [item for item in self.kit_reagenttype_associations if item.submission_type.name == submission_type] - st_assoc = [item for item in self.used_for if submission_type == item.name][0] + # st_assoc = [item for item in self.used_for if submission_type == item.name][0] case SubmissionType(): # logger.debug(f"Constructing xl map with SubmissionType {submission_type}") assocs = [item for item in self.kit_reagenttype_associations if item.submission_type == submission_type] - st_assoc = submission_type + # st_assoc = submission_type case _: raise ValueError(f"Wrong variable type: {type(submission_type)} used!") # logger.debug("Get all KitTypeReagentTypeAssociation for SubmissionType") @@ -279,9 +279,9 @@ class ReagentType(BaseClass): ReagentType|List[ReagentType]: ReagentType or list of ReagentTypes matching filter. """ query: Query = cls.__database_session__.query(cls) - if (kit_type != None and reagent == None) or (reagent != None and kit_type == None): + if (kit_type is not None and reagent is None) or (reagent is not None and kit_type is None): raise ValueError("Cannot filter without both reagent and kit type.") - elif kit_type == None and reagent == None: + elif kit_type is None and reagent is None: pass else: match kit_type: @@ -296,7 +296,7 @@ class ReagentType(BaseClass): reagent = Reagent.query(lot_number=reagent) case _: pass - assert reagent.type != [] + assert reagent.type # logger.debug(f"Looking up reagent type for {type(kit_type)} {kit_type} and {type(reagent)} {reagent}") # logger.debug(f"Kit reagent types: {kit_type.reagent_types}") result = list(set(kit_type.reagent_types).intersection(reagent.type)) @@ -353,7 +353,7 @@ class Reagent(BaseClass): "submission") #: Association proxy to SubmissionSampleAssociation.samples def __repr__(self): - if self.name != None: + if self.name is not None: return f"" else: return f"" @@ -368,7 +368,7 @@ class Reagent(BaseClass): Returns: dict: representation of the reagent's attributes """ - if extraction_kit != None: + if extraction_kit is not None: # Get the intersection of this reagent's ReagentType and all ReagentTypes in KitType try: reagent_role = list(set(self.type).intersection(extraction_kit.reagent_types))[0] @@ -412,10 +412,10 @@ class Reagent(BaseClass): report = Report() logger.debug(f"Attempting update of reagent type at intersection of ({self}), ({kit})") rt = ReagentType.query(kit_type=kit, reagent=self, limit=1) - if rt != None: + if rt is not None: logger.debug(f"got reagenttype {rt}") assoc = KitTypeReagentTypeAssociation.query(kit_type=kit, reagent_type=rt) - if assoc != None: + if assoc is not None: if assoc.last_used != self.lot: logger.debug(f"Updating {assoc} last used to {self.lot}") assoc.last_used = self.lot @@ -658,14 +658,10 @@ class SubmissionType(BaseClass): output = {} # logger.debug("Iterating through equipment roles") for item in self.submissiontype_equipmentrole_associations: - map = item.uses - if map is None: - map = {} - # try: - output[item.equipment_role.name] = map - # except TypeError: - # pass - # output.append(map) + emap = item.uses + if emap is None: + emap = {} + output[item.equipment_role.name] = emap return output def get_equipment(self, extraction_kit: str | KitType | None = None) -> List['PydEquipmentRole']: @@ -737,7 +733,7 @@ class SubmissionType(BaseClass): match key: case str(): # logger.debug(f"Looking up submission type by info-map key str: {key}") - query = query.filter(cls.info_map.op('->')(key) != None) + query = query.filter(cls.info_map.op('->')(key) is not None) case _: pass return cls.execute_query(query=query, limit=limit) diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py index df1a805..1ecaee8 100644 --- a/src/submissions/backend/db/models/submissions.py +++ b/src/submissions/backend/db/models/submissions.py @@ -735,20 +735,46 @@ class BasicSubmission(BaseClass): return re.sub(rf"{abb}(\d)", rf"{abb}-\1", outstr) # return outstr + # @classmethod + # def parse_pcr(cls, xl: pd.DataFrame, rsl_number: str) -> list: + # """ + # Perform custom parsing of pcr info. + # + # Args: + # xl (pd.DataFrame): pcr info form + # rsl_number (str): rsl plate num of interest + # + # Returns: + # list: _description_ + # """ + # logger.debug(f"Hello from {cls.__mapper_args__['polymorphic_identity']} PCR parser!") + # return [] @classmethod - def parse_pcr(cls, xl: pd.DataFrame, rsl_number: str) -> list: + def parse_pcr(cls, xl: Workbook, rsl_plate_num: str) -> list: """ Perform custom parsing of pcr info. Args: xl (pd.DataFrame): pcr info form - rsl_number (str): rsl plate num of interest + rsl_plate_number (str): rsl plate num of interest Returns: list: _description_ """ logger.debug(f"Hello from {cls.__mapper_args__['polymorphic_identity']} PCR parser!") - return [] + pcr_sample_map = cls.get_submission_type().sample_map['pcr_samples'] + logger.debug(f'sample map: {pcr_sample_map}') + main_sheet = xl[pcr_sample_map['main_sheet']] + samples = [] + fields = {k: v for k, v in pcr_sample_map.items() if k not in ['main_sheet', 'start_row']} + for row in main_sheet.iter_rows(min_row=pcr_sample_map['start_row']): + idx = row[0].row + sample = {} + for k, v in fields.items(): + sheet = xl[v['sheet']] + sample[k] = sheet.cell(row=idx, column=v['column']).value + samples.append(sample) + return samples @classmethod def filename_template(cls) -> str: @@ -1314,46 +1340,74 @@ class Wastewater(BasicSubmission): input_dict['csv'] = xl["Copy to import file"] return input_dict + # @classmethod + # def parse_pcr(cls, xl: pd.ExcelFile, rsl_number: str) -> list: + # """ + # Parse specific to wastewater samples. + # """ + # samples = super().parse_pcr(xl=xl, rsl_number=rsl_number) + # df = xl.parse(sheet_name="Results", dtype=object).fillna("") + # column_names = ["Well", "Well Position", "Omit", "Sample", "Target", "Task", " Reporter", "Quencher", + # "Amp Status", "Amp Score", "Curve Quality", "Result Quality Issues", "Cq", "Cq Confidence", + # "Cq Mean", "Cq SD", "Auto Threshold", "Threshold", "Auto Baseline", "Baseline Start", + # "Baseline End"] + # samples_df = df.iloc[23:][0:] + # logger.debug(f"Dataframe of PCR results:\n\t{samples_df}") + # samples_df.columns = column_names + # logger.debug(f"Samples columns: {samples_df.columns}") + # well_call_df = xl.parse(sheet_name="Well Call").iloc[24:][0:].iloc[:, -1:] + # try: + # samples_df['Assessment'] = well_call_df.values + # except ValueError: + # logger.error("Well call number doesn't match sample number") + # logger.debug(f"Well call df: {well_call_df}") + # for _, row in samples_df.iterrows(): + # try: + # sample_obj = [sample for sample in samples if sample['sample'] == row[3]][0] + # except IndexError: + # sample_obj = dict( + # sample=row['Sample'], + # plate_rsl=rsl_number, + # ) + # logger.debug(f"Got sample obj: {sample_obj}") + # if isinstance(row['Cq'], float): + # sample_obj[f"ct_{row['Target'].lower()}"] = row['Cq'] + # else: + # sample_obj[f"ct_{row['Target'].lower()}"] = 0.0 + # try: + # sample_obj[f"{row['Target'].lower()}_status"] = row['Assessment'] + # except KeyError: + # logger.error(f"No assessment for {sample_obj['sample']}") + # samples.append(sample_obj) + # return samples @classmethod - def parse_pcr(cls, xl: pd.ExcelFile, rsl_number: str) -> list: + def parse_pcr(cls, xl: Workbook, rsl_plate_num: str) -> list: """ Parse specific to wastewater samples. """ - samples = super().parse_pcr(xl=xl, rsl_number=rsl_number) - df = xl.parse(sheet_name="Results", dtype=object).fillna("") - column_names = ["Well", "Well Position", "Omit", "Sample", "Target", "Task", " Reporter", "Quencher", - "Amp Status", "Amp Score", "Curve Quality", "Result Quality Issues", "Cq", "Cq Confidence", - "Cq Mean", "Cq SD", "Auto Threshold", "Threshold", "Auto Baseline", "Baseline Start", - "Baseline End"] - samples_df = df.iloc[23:][0:] - logger.debug(f"Dataframe of PCR results:\n\t{samples_df}") - samples_df.columns = column_names - logger.debug(f"Samples columns: {samples_df.columns}") - well_call_df = xl.parse(sheet_name="Well Call").iloc[24:][0:].iloc[:, -1:] - try: - samples_df['Assessment'] = well_call_df.values - except ValueError: - logger.error("Well call number doesn't match sample number") - logger.debug(f"Well call df: {well_call_df}") - for _, row in samples_df.iterrows(): + samples = super().parse_pcr(xl=xl, rsl_plate_num=rsl_plate_num) + logger.debug(f'Samples from parent pcr parser: {pformat(samples)}') + output = [] + for sample in samples: + sample['sample'] = re.sub('-N\\d$', '', sample['sample']) + if sample['sample'] in [item['sample'] for item in output]: + continue + sample[f"ct_{sample['target'].lower()}"] = sample['ct'] if isinstance(sample['ct'], float) else 0.0 + sample[f"{sample['target'].lower()}_status"] = sample['assessment'] + other_targets = [s for s in samples if re.sub('-N\\d$', '', s['sample']) == sample['sample']] + for s in other_targets: + sample[f"ct_{s['target'].lower()}"] = s['ct'] if isinstance(s['ct'], float) else 0.0 + sample[f"{s['target'].lower()}_status"] = s['assessment'] try: - sample_obj = [sample for sample in samples if sample['sample'] == row[3]][0] - except IndexError: - sample_obj = dict( - sample=row['Sample'], - plate_rsl=rsl_number, - ) - logger.debug(f"Got sample obj: {sample_obj}") - if isinstance(row['Cq'], float): - sample_obj[f"ct_{row['Target'].lower()}"] = row['Cq'] - else: - sample_obj[f"ct_{row['Target'].lower()}"] = 0.0 - try: - sample_obj[f"{row['Target'].lower()}_status"] = row['Assessment'] + del sample['ct'] except KeyError: - logger.error(f"No assessment for {sample_obj['sample']}") - samples.append(sample_obj) - return samples + pass + try: + del sample['assessment'] + except KeyError: + pass + output.append(sample) + return output @classmethod def enforce_name(cls, instr: str, data: dict | None = {}) -> str: diff --git a/src/submissions/backend/excel/parser.py b/src/submissions/backend/excel/parser.py index afbf3c3..dbabd4a 100644 --- a/src/submissions/backend/excel/parser.py +++ b/src/submissions/backend/excel/parser.py @@ -732,67 +732,113 @@ class EquipmentParser(object): return output +# class PCRParser(object): +# """ +# Object to pull data from Design and Analysis PCR export file. +# """ +# +# def __init__(self, filepath: Path | None = None) -> None: +# """ +# Initializes object. +# +# Args: +# filepath (Path | None, optional): file to parse. Defaults to None. +# """ +# logger.debug(f"Parsing {filepath.__str__()}") +# if filepath == None: +# logger.error(f"No filepath given.") +# self.xl = None +# else: +# try: +# self.xl = pd.ExcelFile(filepath.__str__()) +# except ValueError as e: +# logger.error(f"Incorrect value: {e}") +# self.xl = None +# except PermissionError: +# logger.error(f"Couldn't get permissions for {filepath.__str__()}. Operation might have been cancelled.") +# return +# self.parse_general(sheet_name="Results") +# namer = RSLNamer(filename=filepath.__str__()) +# self.plate_num = namer.parsed_name +# self.submission_type = namer.submission_type +# logger.debug(f"Set plate number to {self.plate_num} and type to {self.submission_type}") +# parser = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type) +# self.samples = parser.parse_pcr(xl=self.xl, rsl_number=self.plate_num) +# +# def parse_general(self, sheet_name: str): +# """ +# Parse general info rows for all types of PCR results +# +# Args: +# sheet_name (str): Name of sheet in excel workbook that holds info. +# """ +# self.pcr = {} +# df = self.xl.parse(sheet_name=sheet_name, dtype=object).fillna("") +# self.pcr['comment'] = df.iloc[0][1] +# self.pcr['operator'] = df.iloc[1][1] +# self.pcr['barcode'] = df.iloc[2][1] +# self.pcr['instrument'] = df.iloc[3][1] +# self.pcr['block_type'] = df.iloc[4][1] +# self.pcr['instrument_name'] = df.iloc[5][1] +# self.pcr['instrument_serial'] = df.iloc[6][1] +# self.pcr['heated_cover_serial'] = df.iloc[7][1] +# self.pcr['block_serial'] = df.iloc[8][1] +# self.pcr['run-start'] = df.iloc[9][1] +# self.pcr['run_end'] = df.iloc[10][1] +# self.pcr['run_duration'] = df.iloc[11][1] +# self.pcr['sample_volume'] = df.iloc[12][1] +# self.pcr['cover_temp'] = df.iloc[13][1] +# self.pcr['passive_ref'] = df.iloc[14][1] +# self.pcr['pcr_step'] = df.iloc[15][1] +# self.pcr['quant_cycle_method'] = df.iloc[16][1] +# self.pcr['analysis_time'] = df.iloc[17][1] +# self.pcr['software'] = df.iloc[18][1] +# self.pcr['plugin'] = df.iloc[19][1] +# self.pcr['exported_on'] = df.iloc[20][1] +# self.pcr['imported_by'] = getuser() + class PCRParser(object): - """ - Object to pull data from Design and Analysis PCR export file. - """ + """Object to pull data from Design and Analysis PCR export file.""" - def __init__(self, filepath: Path | None = None) -> None: + def __init__(self, filepath: Path | None=None, submission: BasicSubmission | None=None) -> None: """ - Initializes object. + Initializes object. - Args: - filepath (Path | None, optional): file to parse. Defaults to None. - """ - logger.debug(f"Parsing {filepath.__str__()}") - if filepath == None: - logger.error(f"No filepath given.") + Args: + filepath (Path | None, optional): file to parse. Defaults to None. + """ + logger.debug(f'Parsing {filepath.__str__()}') + if filepath is None: + logger.error('No filepath given.') self.xl = None else: try: - self.xl = pd.ExcelFile(filepath.__str__()) + self.xl = load_workbook(filepath) except ValueError as e: - logger.error(f"Incorrect value: {e}") + logger.error(f'Incorrect value: {e}') self.xl = None except PermissionError: - logger.error(f"Couldn't get permissions for {filepath.__str__()}. Operation might have been cancelled.") - return - self.parse_general(sheet_name="Results") - namer = RSLNamer(filename=filepath.__str__()) - self.plate_num = namer.parsed_name - self.submission_type = namer.submission_type - logger.debug(f"Set plate number to {self.plate_num} and type to {self.submission_type}") - parser = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type) - self.samples = parser.parse_pcr(xl=self.xl, rsl_number=self.plate_num) + logger.error(f'Couldn\'t get permissions for {filepath.__str__()}. Operation might have been cancelled.') + return None + if submission is None: + self.submission_obj = Wastewater + rsl_plate_num = None + else: + self.submission_obj = submission + rsl_plate_num = self.submission_obj.rsl_plate_num + self.pcr = self.parse_general() + self.samples = self.submission_obj.parse_pcr(xl=self.xl, rsl_plate_num=rsl_plate_num) - def parse_general(self, sheet_name: str): + def parse_general(self): """ Parse general info rows for all types of PCR results Args: sheet_name (str): Name of sheet in excel workbook that holds info. - """ - self.pcr = {} - df = self.xl.parse(sheet_name=sheet_name, dtype=object).fillna("") - self.pcr['comment'] = df.iloc[0][1] - self.pcr['operator'] = df.iloc[1][1] - self.pcr['barcode'] = df.iloc[2][1] - self.pcr['instrument'] = df.iloc[3][1] - self.pcr['block_type'] = df.iloc[4][1] - self.pcr['instrument_name'] = df.iloc[5][1] - self.pcr['instrument_serial'] = df.iloc[6][1] - self.pcr['heated_cover_serial'] = df.iloc[7][1] - self.pcr['block_serial'] = df.iloc[8][1] - self.pcr['run-start'] = df.iloc[9][1] - self.pcr['run_end'] = df.iloc[10][1] - self.pcr['run_duration'] = df.iloc[11][1] - self.pcr['sample_volume'] = df.iloc[12][1] - self.pcr['cover_temp'] = df.iloc[13][1] - self.pcr['passive_ref'] = df.iloc[14][1] - self.pcr['pcr_step'] = df.iloc[15][1] - self.pcr['quant_cycle_method'] = df.iloc[16][1] - self.pcr['analysis_time'] = df.iloc[17][1] - self.pcr['software'] = df.iloc[18][1] - self.pcr['plugin'] = df.iloc[19][1] - self.pcr['exported_on'] = df.iloc[20][1] - self.pcr['imported_by'] = getuser() + """ + info_map = self.submission_obj.get_submission_type().sample_map['pcr_general_info'] + sheet = self.xl[info_map['sheet']] + iter_rows = sheet.iter_rows(min_row=info_map['start_row'], max_row=info_map['end_row']) + pcr = {row[0].value.lower().replace(' ', '_'): row[1].value for row in iter_rows} + pcr['imported_by'] = getuser() + return pcr