Last minute save of new pcr parser.

This commit is contained in:
lwark
2024-05-16 15:32:58 -05:00
parent bbcbd35127
commit d1bf12e8d1
3 changed files with 199 additions and 103 deletions

View File

@@ -156,11 +156,11 @@ class KitType(BaseClass):
# logger.debug(f"Constructing xl map with str {submission_type}")
assocs = [item for item in self.kit_reagenttype_associations if
item.submission_type.name == submission_type]
st_assoc = [item for item in self.used_for if submission_type == item.name][0]
# st_assoc = [item for item in self.used_for if submission_type == item.name][0]
case SubmissionType():
# logger.debug(f"Constructing xl map with SubmissionType {submission_type}")
assocs = [item for item in self.kit_reagenttype_associations if item.submission_type == submission_type]
st_assoc = submission_type
# st_assoc = submission_type
case _:
raise ValueError(f"Wrong variable type: {type(submission_type)} used!")
# logger.debug("Get all KitTypeReagentTypeAssociation for SubmissionType")
@@ -279,9 +279,9 @@ class ReagentType(BaseClass):
ReagentType|List[ReagentType]: ReagentType or list of ReagentTypes matching filter.
"""
query: Query = cls.__database_session__.query(cls)
if (kit_type != None and reagent == None) or (reagent != None and kit_type == None):
if (kit_type is not None and reagent is None) or (reagent is not None and kit_type is None):
raise ValueError("Cannot filter without both reagent and kit type.")
elif kit_type == None and reagent == None:
elif kit_type is None and reagent is None:
pass
else:
match kit_type:
@@ -296,7 +296,7 @@ class ReagentType(BaseClass):
reagent = Reagent.query(lot_number=reagent)
case _:
pass
assert reagent.type != []
assert reagent.type
# logger.debug(f"Looking up reagent type for {type(kit_type)} {kit_type} and {type(reagent)} {reagent}")
# logger.debug(f"Kit reagent types: {kit_type.reagent_types}")
result = list(set(kit_type.reagent_types).intersection(reagent.type))
@@ -353,7 +353,7 @@ class Reagent(BaseClass):
"submission") #: Association proxy to SubmissionSampleAssociation.samples
def __repr__(self):
if self.name != None:
if self.name is not None:
return f"<Reagent({self.name}-{self.lot})>"
else:
return f"<Reagent({self.type.name}-{self.lot})>"
@@ -368,7 +368,7 @@ class Reagent(BaseClass):
Returns:
dict: representation of the reagent's attributes
"""
if extraction_kit != None:
if extraction_kit is not None:
# Get the intersection of this reagent's ReagentType and all ReagentTypes in KitType
try:
reagent_role = list(set(self.type).intersection(extraction_kit.reagent_types))[0]
@@ -412,10 +412,10 @@ class Reagent(BaseClass):
report = Report()
logger.debug(f"Attempting update of reagent type at intersection of ({self}), ({kit})")
rt = ReagentType.query(kit_type=kit, reagent=self, limit=1)
if rt != None:
if rt is not None:
logger.debug(f"got reagenttype {rt}")
assoc = KitTypeReagentTypeAssociation.query(kit_type=kit, reagent_type=rt)
if assoc != None:
if assoc is not None:
if assoc.last_used != self.lot:
logger.debug(f"Updating {assoc} last used to {self.lot}")
assoc.last_used = self.lot
@@ -658,14 +658,10 @@ class SubmissionType(BaseClass):
output = {}
# logger.debug("Iterating through equipment roles")
for item in self.submissiontype_equipmentrole_associations:
map = item.uses
if map is None:
map = {}
# try:
output[item.equipment_role.name] = map
# except TypeError:
# pass
# output.append(map)
emap = item.uses
if emap is None:
emap = {}
output[item.equipment_role.name] = emap
return output
def get_equipment(self, extraction_kit: str | KitType | None = None) -> List['PydEquipmentRole']:
@@ -737,7 +733,7 @@ class SubmissionType(BaseClass):
match key:
case str():
# logger.debug(f"Looking up submission type by info-map key str: {key}")
query = query.filter(cls.info_map.op('->')(key) != None)
query = query.filter(cls.info_map.op('->')(key) is not None)
case _:
pass
return cls.execute_query(query=query, limit=limit)

View File

@@ -735,20 +735,46 @@ class BasicSubmission(BaseClass):
return re.sub(rf"{abb}(\d)", rf"{abb}-\1", outstr)
# return outstr
# @classmethod
# def parse_pcr(cls, xl: pd.DataFrame, rsl_number: str) -> list:
# """
# Perform custom parsing of pcr info.
#
# Args:
# xl (pd.DataFrame): pcr info form
# rsl_number (str): rsl plate num of interest
#
# Returns:
# list: _description_
# """
# logger.debug(f"Hello from {cls.__mapper_args__['polymorphic_identity']} PCR parser!")
# return []
@classmethod
def parse_pcr(cls, xl: pd.DataFrame, rsl_number: str) -> list:
def parse_pcr(cls, xl: Workbook, rsl_plate_num: str) -> list:
"""
Perform custom parsing of pcr info.
Args:
xl (pd.DataFrame): pcr info form
rsl_number (str): rsl plate num of interest
rsl_plate_number (str): rsl plate num of interest
Returns:
list: _description_
"""
logger.debug(f"Hello from {cls.__mapper_args__['polymorphic_identity']} PCR parser!")
return []
pcr_sample_map = cls.get_submission_type().sample_map['pcr_samples']
logger.debug(f'sample map: {pcr_sample_map}')
main_sheet = xl[pcr_sample_map['main_sheet']]
samples = []
fields = {k: v for k, v in pcr_sample_map.items() if k not in ['main_sheet', 'start_row']}
for row in main_sheet.iter_rows(min_row=pcr_sample_map['start_row']):
idx = row[0].row
sample = {}
for k, v in fields.items():
sheet = xl[v['sheet']]
sample[k] = sheet.cell(row=idx, column=v['column']).value
samples.append(sample)
return samples
@classmethod
def filename_template(cls) -> str:
@@ -1314,46 +1340,74 @@ class Wastewater(BasicSubmission):
input_dict['csv'] = xl["Copy to import file"]
return input_dict
# @classmethod
# def parse_pcr(cls, xl: pd.ExcelFile, rsl_number: str) -> list:
# """
# Parse specific to wastewater samples.
# """
# samples = super().parse_pcr(xl=xl, rsl_number=rsl_number)
# df = xl.parse(sheet_name="Results", dtype=object).fillna("")
# column_names = ["Well", "Well Position", "Omit", "Sample", "Target", "Task", " Reporter", "Quencher",
# "Amp Status", "Amp Score", "Curve Quality", "Result Quality Issues", "Cq", "Cq Confidence",
# "Cq Mean", "Cq SD", "Auto Threshold", "Threshold", "Auto Baseline", "Baseline Start",
# "Baseline End"]
# samples_df = df.iloc[23:][0:]
# logger.debug(f"Dataframe of PCR results:\n\t{samples_df}")
# samples_df.columns = column_names
# logger.debug(f"Samples columns: {samples_df.columns}")
# well_call_df = xl.parse(sheet_name="Well Call").iloc[24:][0:].iloc[:, -1:]
# try:
# samples_df['Assessment'] = well_call_df.values
# except ValueError:
# logger.error("Well call number doesn't match sample number")
# logger.debug(f"Well call df: {well_call_df}")
# for _, row in samples_df.iterrows():
# try:
# sample_obj = [sample for sample in samples if sample['sample'] == row[3]][0]
# except IndexError:
# sample_obj = dict(
# sample=row['Sample'],
# plate_rsl=rsl_number,
# )
# logger.debug(f"Got sample obj: {sample_obj}")
# if isinstance(row['Cq'], float):
# sample_obj[f"ct_{row['Target'].lower()}"] = row['Cq']
# else:
# sample_obj[f"ct_{row['Target'].lower()}"] = 0.0
# try:
# sample_obj[f"{row['Target'].lower()}_status"] = row['Assessment']
# except KeyError:
# logger.error(f"No assessment for {sample_obj['sample']}")
# samples.append(sample_obj)
# return samples
@classmethod
def parse_pcr(cls, xl: pd.ExcelFile, rsl_number: str) -> list:
def parse_pcr(cls, xl: Workbook, rsl_plate_num: str) -> list:
"""
Parse specific to wastewater samples.
"""
samples = super().parse_pcr(xl=xl, rsl_number=rsl_number)
df = xl.parse(sheet_name="Results", dtype=object).fillna("")
column_names = ["Well", "Well Position", "Omit", "Sample", "Target", "Task", " Reporter", "Quencher",
"Amp Status", "Amp Score", "Curve Quality", "Result Quality Issues", "Cq", "Cq Confidence",
"Cq Mean", "Cq SD", "Auto Threshold", "Threshold", "Auto Baseline", "Baseline Start",
"Baseline End"]
samples_df = df.iloc[23:][0:]
logger.debug(f"Dataframe of PCR results:\n\t{samples_df}")
samples_df.columns = column_names
logger.debug(f"Samples columns: {samples_df.columns}")
well_call_df = xl.parse(sheet_name="Well Call").iloc[24:][0:].iloc[:, -1:]
try:
samples_df['Assessment'] = well_call_df.values
except ValueError:
logger.error("Well call number doesn't match sample number")
logger.debug(f"Well call df: {well_call_df}")
for _, row in samples_df.iterrows():
samples = super().parse_pcr(xl=xl, rsl_plate_num=rsl_plate_num)
logger.debug(f'Samples from parent pcr parser: {pformat(samples)}')
output = []
for sample in samples:
sample['sample'] = re.sub('-N\\d$', '', sample['sample'])
if sample['sample'] in [item['sample'] for item in output]:
continue
sample[f"ct_{sample['target'].lower()}"] = sample['ct'] if isinstance(sample['ct'], float) else 0.0
sample[f"{sample['target'].lower()}_status"] = sample['assessment']
other_targets = [s for s in samples if re.sub('-N\\d$', '', s['sample']) == sample['sample']]
for s in other_targets:
sample[f"ct_{s['target'].lower()}"] = s['ct'] if isinstance(s['ct'], float) else 0.0
sample[f"{s['target'].lower()}_status"] = s['assessment']
try:
sample_obj = [sample for sample in samples if sample['sample'] == row[3]][0]
except IndexError:
sample_obj = dict(
sample=row['Sample'],
plate_rsl=rsl_number,
)
logger.debug(f"Got sample obj: {sample_obj}")
if isinstance(row['Cq'], float):
sample_obj[f"ct_{row['Target'].lower()}"] = row['Cq']
else:
sample_obj[f"ct_{row['Target'].lower()}"] = 0.0
try:
sample_obj[f"{row['Target'].lower()}_status"] = row['Assessment']
del sample['ct']
except KeyError:
logger.error(f"No assessment for {sample_obj['sample']}")
samples.append(sample_obj)
return samples
pass
try:
del sample['assessment']
except KeyError:
pass
output.append(sample)
return output
@classmethod
def enforce_name(cls, instr: str, data: dict | None = {}) -> str: