Tips parser complete.
This commit is contained in:
@@ -10,7 +10,7 @@ import pandas as pd
|
||||
from openpyxl import load_workbook, Workbook
|
||||
from pathlib import Path
|
||||
from backend.db.models import *
|
||||
from backend.validators import PydSubmission, PydReagent, RSLNamer, PydSample, PydEquipment
|
||||
from backend.validators import PydSubmission, PydReagent, RSLNamer, PydSample, PydEquipment, PydTips
|
||||
import logging, re
|
||||
from collections import OrderedDict
|
||||
from datetime import date
|
||||
@@ -57,6 +57,7 @@ class SheetParser(object):
|
||||
self.parse_reagents()
|
||||
self.parse_samples()
|
||||
self.parse_equipment()
|
||||
self.parse_tips()
|
||||
self.finalize_parse()
|
||||
# logger.debug(f"Parser.sub after info scrape: {pformat(self.sub)}")
|
||||
|
||||
@@ -101,6 +102,10 @@ class SheetParser(object):
|
||||
parser = EquipmentParser(xl=self.xl, submission_type=self.submission_type)
|
||||
self.sub['equipment'] = parser.parse_equipment()
|
||||
|
||||
def parse_tips(self):
|
||||
parser = TipParser(xl=self.xl, submission_type=self.submission_type)
|
||||
self.sub['tips'] = parser.parse_tips()
|
||||
|
||||
def import_kit_validation_check(self):
|
||||
"""
|
||||
Enforce that the parser has an extraction kit
|
||||
@@ -139,13 +144,21 @@ class SheetParser(object):
|
||||
pyd_dict['reagents'] = [PydReagent(**reagent) for reagent in self.sub['reagents']]
|
||||
# logger.debug(f"Equipment: {self.sub['equipment']}")
|
||||
try:
|
||||
check = len(self.sub['equipment']) == 0
|
||||
check = bool(self.sub['equipment'])
|
||||
except TypeError:
|
||||
check = True
|
||||
check = False
|
||||
if check:
|
||||
pyd_dict['equipment'] = None
|
||||
pyd_dict['equipment'] = [PydEquipment(**equipment) for equipment in self.sub['equipment']]
|
||||
else:
|
||||
pyd_dict['equipment'] = self.sub['equipment']
|
||||
pyd_dict['equipment'] = None
|
||||
try:
|
||||
check = bool(self.sub['tips'])
|
||||
except TypeError:
|
||||
check = False
|
||||
if check:
|
||||
pyd_dict['tips'] = [PydTips(**tips) for tips in self.sub['tips']]
|
||||
else:
|
||||
pyd_dict['tips'] = None
|
||||
psm = PydSubmission(filepath=self.filepath, **pyd_dict)
|
||||
return psm
|
||||
|
||||
@@ -535,6 +548,7 @@ class SampleParser(object):
|
||||
samples = remove_key_from_list_of_dicts(samples, "id")
|
||||
return sorted(samples, key=lambda k: (k['row'], k['column']))
|
||||
|
||||
|
||||
class EquipmentParser(object):
|
||||
|
||||
def __init__(self, xl: Workbook, submission_type: str|SubmissionType) -> None:
|
||||
@@ -567,15 +581,74 @@ class EquipmentParser(object):
|
||||
# logger.debug(f"Using equipment regex: {regex} on {input}")
|
||||
try:
|
||||
return regex.search(input).group().strip("-")
|
||||
except AttributeError:
|
||||
except AttributeError as e:
|
||||
logger.error(f"Error getting asset number for {input}: {e}")
|
||||
return input
|
||||
|
||||
def parse_equipment(self) -> List[PydEquipment]:
|
||||
def parse_equipment(self) -> List[dict]:
|
||||
"""
|
||||
Scrapes equipment from xl sheet
|
||||
|
||||
Returns:
|
||||
List[PydEquipment]: list of equipment
|
||||
List[dict]: list of equipment
|
||||
"""
|
||||
logger.debug(f"Equipment parser going into parsing: {pformat(self.__dict__)}")
|
||||
output = []
|
||||
# logger.debug(f"Sheets: {sheets}")
|
||||
for sheet in self.xl.sheetnames:
|
||||
ws = self.xl[sheet]
|
||||
try:
|
||||
relevant = {k:v for k,v in self.map.items() if v['sheet'] == sheet}
|
||||
except (TypeError, KeyError) as e:
|
||||
logger.error(f"Error creating relevant equipment list: {e}")
|
||||
continue
|
||||
logger.debug(f"Relevant equipment: {pformat(relevant)}")
|
||||
previous_asset = ""
|
||||
for k, v in relevant.items():
|
||||
logger.debug(f"Checking: {v}")
|
||||
asset = ws.cell(v['name']['row'], v['name']['column']).value
|
||||
if not check_not_nan(asset):
|
||||
asset = previous_asset
|
||||
else:
|
||||
previous_asset = asset
|
||||
asset = self.get_asset_number(input=asset)
|
||||
logger.debug(f"asset: {asset}")
|
||||
eq = Equipment.query(name=asset)
|
||||
process = ws.cell(row=v['process']['row'], column=v['process']['column']).value
|
||||
try:
|
||||
output.append(
|
||||
dict(name=eq.name, processes=[process], role=k, asset_number=eq.asset_number,
|
||||
nickname=eq.nickname))
|
||||
except AttributeError:
|
||||
logger.error(f"Unable to add {eq} to list.")
|
||||
logger.debug(f"Here is the output so far: {pformat(output)}")
|
||||
return output
|
||||
|
||||
|
||||
class TipParser(object):
|
||||
|
||||
def __init__(self, xl: Workbook, submission_type: str|SubmissionType) -> None:
|
||||
if isinstance(submission_type, str):
|
||||
submission_type = SubmissionType.query(name=submission_type)
|
||||
self.submission_type = submission_type
|
||||
self.xl = xl
|
||||
self.map = self.fetch_tip_map()
|
||||
|
||||
def fetch_tip_map(self) -> List[dict]:
|
||||
"""
|
||||
Gets the map of equipment locations in the submission type's spreadsheet
|
||||
|
||||
Returns:
|
||||
List[dict]: List of locations
|
||||
"""
|
||||
return self.submission_type.construct_tips_map()
|
||||
|
||||
def parse_tips(self) -> List[dict]:
|
||||
"""
|
||||
Scrapes equipment from xl sheet
|
||||
|
||||
Returns:
|
||||
List[dict]: list of equipment
|
||||
"""
|
||||
# logger.debug(f"Equipment parser going into parsing: {pformat(self.__dict__)}")
|
||||
output = []
|
||||
@@ -583,29 +656,34 @@ class EquipmentParser(object):
|
||||
for sheet in self.xl.sheetnames:
|
||||
ws = self.xl[sheet]
|
||||
try:
|
||||
relevant = [item for item in self.map if item['sheet'] == sheet]
|
||||
except (TypeError, KeyError):
|
||||
relevant = {k: v for k, v in self.map.items() if v['sheet'] == sheet}
|
||||
except (TypeError, KeyError) as e:
|
||||
logger.error(f"Error creating relevant equipment list: {e}")
|
||||
continue
|
||||
# logger.debug(f"Relevant equipment: {pformat(relevant)}")
|
||||
logger.debug(f"Relevant equipment: {pformat(relevant)}")
|
||||
previous_asset = ""
|
||||
for equipment in relevant:
|
||||
asset = ws.cell(equipment['name']['row'], equipment['name']['column'])
|
||||
for k, v in relevant.items():
|
||||
asset = ws.cell(v['name']['row'], v['name']['column']).value
|
||||
if "lot" in v.keys():
|
||||
lot = ws.cell(v['lot']['row'], v['lot']['column']).value
|
||||
else:
|
||||
lot = None
|
||||
if not check_not_nan(asset):
|
||||
asset = previous_asset
|
||||
else:
|
||||
previous_asset = asset
|
||||
asset = self.get_asset_number(input=asset)
|
||||
eq = Equipment.query(asset_number=asset)
|
||||
process = ws.cell(row=equipment['process']['row'], column=equipment['process']['column'])
|
||||
logger.debug(f"asset: {asset}")
|
||||
eq = Tips.query(lot=lot, name=asset, limit=1)
|
||||
# process = ws.cell(row=v['process']['row'], column=v['process']['column']).value
|
||||
try:
|
||||
output.append(
|
||||
dict(name=eq.name, processes=[process], role=equipment['role'], asset_number=asset,
|
||||
nickname=eq.nickname))
|
||||
dict(name=eq.name, role=k, lot=lot))
|
||||
except AttributeError:
|
||||
logger.error(f"Unable to add {eq} to PydEquipment list.")
|
||||
# logger.debug(f"Here is the output so far: {pformat(output)}")
|
||||
logger.error(f"Unable to add {eq} to PydTips list.")
|
||||
logger.debug(f"Here is the output so far: {pformat(output)}")
|
||||
return output
|
||||
|
||||
|
||||
class PCRParser(object):
|
||||
"""Object to pull data from Design and Analysis PCR export file."""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user