Moments before disaster.
This commit is contained in:
@@ -84,7 +84,7 @@ class ControlType(BaseClass):
|
||||
Returns:
|
||||
List[ControlType]: Control types that have targets
|
||||
"""
|
||||
return [item for item in cls.query() if item.targets]
|
||||
return (item for item in cls.query() if item.targets)
|
||||
|
||||
@classmethod
|
||||
def build_positive_regex(cls) -> Pattern:
|
||||
|
||||
@@ -506,7 +506,7 @@ class BasicSubmission(BaseClass):
|
||||
Returns:
|
||||
List[str]: List of names
|
||||
"""
|
||||
return [item.role for item in self.submission_equipment_associations]
|
||||
return (item.role for item in self.submission_equipment_associations)
|
||||
|
||||
@classmethod
|
||||
def submissions_to_df(cls, submission_type: str | None = None, limit: int = 0,
|
||||
@@ -527,7 +527,8 @@ class BasicSubmission(BaseClass):
|
||||
# logger.debug(f"Using limit: {limit}")
|
||||
# NOTE: use lookup function to create list of dicts
|
||||
subs = [item.to_dict() for item in
|
||||
cls.query(submission_type=submission_type, limit=limit, chronologic=chronologic, page=page, page_size=page_size)]
|
||||
cls.query(submission_type=submission_type, limit=limit, chronologic=chronologic, page=page,
|
||||
page_size=page_size)]
|
||||
# logger.debug(f"Got {len(subs)} submissions.")
|
||||
df = pd.DataFrame.from_records(subs)
|
||||
# logger.debug(f"Column names: {df.columns}")
|
||||
@@ -971,6 +972,19 @@ class BasicSubmission(BaseClass):
|
||||
# samples.append(sample)
|
||||
# return samples
|
||||
|
||||
@classmethod
|
||||
def parse_pcr_controls(cls, xl: Workbook) -> list:
|
||||
location_map = cls.get_submission_type().sample_map['pcr_controls']
|
||||
name_column = 1
|
||||
for item in location_map:
|
||||
logger.debug(f"Looking for {item['name']}")
|
||||
worksheet = xl[item['sheet']]
|
||||
for iii, row in enumerate(worksheet.iter_rows(max_row=len(worksheet['A']), max_col=name_column), start=1):
|
||||
for cell in row:
|
||||
if cell.value == item['name']:
|
||||
logger.debug(f"Pulling from row {iii}, column {item['ct_column']}")
|
||||
yield dict(name=item['name'], ct=worksheet.cell(row=iii, column=item['ct_column']).value)
|
||||
|
||||
@classmethod
|
||||
def filename_template(cls) -> str:
|
||||
"""
|
||||
@@ -1560,6 +1574,11 @@ class Wastewater(BasicSubmission):
|
||||
for sample in output:
|
||||
yield sample
|
||||
|
||||
# @classmethod
|
||||
# def parse_pcr_controls(cls, xl: Workbook, location_map: list) -> list:
|
||||
|
||||
|
||||
|
||||
@classmethod
|
||||
def enforce_name(cls, instr: str, data: dict | None = {}) -> str:
|
||||
"""
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
'''
|
||||
contains parser objects for pulling values from client generated submission sheets.
|
||||
'''
|
||||
import json
|
||||
from copy import copy
|
||||
from getpass import getuser
|
||||
from pprint import pformat
|
||||
@@ -142,7 +143,6 @@ class SheetParser(object):
|
||||
return PydSubmission(filepath=self.filepath, run_custom=True, **self.sub)
|
||||
|
||||
|
||||
|
||||
class InfoParser(object):
|
||||
"""
|
||||
Object to parse generic info from excel sheet.
|
||||
@@ -675,6 +675,7 @@ class PCRParser(object):
|
||||
rsl_plate_num = self.submission_obj.rsl_plate_num
|
||||
self.pcr = self.parse_general()
|
||||
self.samples = self.submission_obj.parse_pcr(xl=self.xl, rsl_plate_num=rsl_plate_num)
|
||||
self.controls = self.submission_obj.parse_pcr_controls(xl=self.xl)
|
||||
|
||||
def parse_general(self):
|
||||
"""
|
||||
@@ -698,3 +699,74 @@ class PCRParser(object):
|
||||
pcr['imported_by'] = getuser()
|
||||
# logger.debug(f"PCR: {pformat(pcr)}")
|
||||
return pcr
|
||||
|
||||
|
||||
class EDSParser(object):
|
||||
expand_device = {"QS7PRO": "QuantStudio tm 7 Pro System"}
|
||||
expand_block = {"BLOCK_96W_01ML": "96-Well 0.1-mL Block"}
|
||||
|
||||
def __init__(self, filepath: str | Path | None = None):
|
||||
logger.info(f"\n\nParsing {filepath.__str__()}\n\n")
|
||||
match filepath:
|
||||
case Path():
|
||||
self.filepath = filepath
|
||||
case str():
|
||||
self.filepath = Path(filepath)
|
||||
case _:
|
||||
logger.error(f"No filepath given.")
|
||||
raise ValueError("No filepath given.")
|
||||
self.eds = ZipFile(self.filepath)
|
||||
self.analysis_settings = json.loads(self.eds.read("primary/analysis_setting.json").decode("utf-8"))
|
||||
self.analysis_results = json.loads(self.eds.read("primary/analysis_setting.json").decode("utf-8"))
|
||||
self.presence_absence_results = json.loads(
|
||||
self.eds.read("extensions/am.pa/presence_absence_result.json").decode("utf-8"))
|
||||
self.presence_absence_settings = json.loads(
|
||||
self.eds.read("extensions/am.pa/presence_absence_setting.json").decode("utf-8"))
|
||||
self.run_summary = json.loads(self.eds.read("run/run_summary.json").decode("utf-8"))
|
||||
self.run_method = json.loads(self.eds.read("setup/run_method.json").decode("utf-8"))
|
||||
self.plate_setup = json.loads(self.eds.read("setup/plate_setup.json").decode("utf-8"))
|
||||
self.eds_summary = json.loads(self.eds.read("summary.json").decode("utf-8"))
|
||||
|
||||
def parse_DA_date_format(self, value: int) -> datetime:
|
||||
value = value / 1000
|
||||
return datetime.utcfromtimestamp(value)
|
||||
|
||||
def get_run_time(self, start: datetime, end: datetime) -> Tuple[str, str, str]:
|
||||
delta = end - start
|
||||
minutes, seconds = divmod(delta.seconds, 60)
|
||||
duration = f"{minutes} minutes {seconds} seconds"
|
||||
start_time = start.strftime("%Y-%m-%d %I:%M:%S %p %Z")
|
||||
end_time = end.strftime("%Y-%m-%d %I:%M:%S %p %Z")
|
||||
return start_time, end_time, duration
|
||||
|
||||
def parse_summary(self):
|
||||
summary = dict()
|
||||
summary['file_name'] = self.filepath.absolute().__str__()
|
||||
summary['comment'] = self.eds_summary['description']
|
||||
summary['operator'] = self.run_summary['operator']
|
||||
summary['barcode'] = self.plate_setup['plateBarcode']
|
||||
try:
|
||||
summary['instrument_type'] = self.__class__.expand_device[self.eds_summary['instrumentType']]
|
||||
except KeyError:
|
||||
summary['instrument_type'] = self.eds_summary['instrumentType']
|
||||
try:
|
||||
summary['block_type'] = self.__class__.expand_block[self.plate_setup['blockType']]
|
||||
except KeyError:
|
||||
summary['block_type'] = self.plate_setup['blockType']
|
||||
summary['instrument_name'] = self.run_summary['instrumentName']
|
||||
summary['instrument_serial_number'] = self.run_summary['instrumentSerialNumber']
|
||||
summary['heated_cover_serial_number'] = self.run_summary['coverSerialNumber']
|
||||
summary['block_serial_number'] = self.run_summary['blockSerialNumber']
|
||||
run_start = self.parse_DA_date_format(self.run_summary['startTime'])
|
||||
run_end = self.parse_DA_date_format(self.run_summary['endTime'])
|
||||
summary['run_start_date/time'], summary['run_end_date/time'], summary['run_duration'] = \
|
||||
self.get_run_time(run_start, run_end)
|
||||
summary['sample_volume'] = self.run_method['sampleVolume']
|
||||
summary['cover_temperature'] = self.run_method['coverTemperature']
|
||||
summary['passive_reference'] = self.plate_setup['passiveReference']
|
||||
summary['pcr_stage/step_number'] = f"Stage {self.analysis_settings['cqAnalysisStageNumber']} Step {self.analysis_settings['cqAnalysisStepNumber']}"
|
||||
summary['quantification_cycle_method'] = self.analysis_results['cqAlgorithmType']
|
||||
summary['analysis_date/time'] = self.parse_DA_date_format(self.eds_summary['analysis']['primary']['analysisTime'])
|
||||
summary['software_name_and_version'] = "Design & Analysis Software v2.8.0"
|
||||
summary['plugin_name_and_version'] = "Primary Analysis v1.8.1, Presence Absence v2.4.0"
|
||||
return summary
|
||||
|
||||
Reference in New Issue
Block a user