Renaming ReagentType to ReagentRole

This commit is contained in:
lwark
2024-06-07 14:15:19 -05:00
parent a355c5bb76
commit c7b6c90eac
8 changed files with 197 additions and 129 deletions

View File

@@ -13,24 +13,27 @@ logger = logging.getLogger(f"submissions.{__name__}")
# table containing organization/contact relationship
orgs_contacts = Table(
"_orgs_contacts",
Base.metadata,
Column("org_id", INTEGER, ForeignKey("_organization.id")),
Column("contact_id", INTEGER, ForeignKey("_contact.id")),
# __table_args__ = {'extend_existing': True}
extend_existing = True
)
"_orgs_contacts",
Base.metadata,
Column("org_id", INTEGER, ForeignKey("_organization.id")),
Column("contact_id", INTEGER, ForeignKey("_contact.id")),
# __table_args__ = {'extend_existing': True}
extend_existing=True
)
class Organization(BaseClass):
"""
Base of organization
"""
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(64)) #: organization name
submissions = relationship("BasicSubmission", back_populates="submitting_lab") #: submissions this organization has submitted
cost_centre = Column(String()) #: cost centre used by org for payment
contacts = relationship("Contact", back_populates="organization", secondary=orgs_contacts) #: contacts involved with this org
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(64)) #: organization name
submissions = relationship("BasicSubmission",
back_populates="submitting_lab") #: submissions this organization has submitted
cost_centre = Column(String()) #: cost centre used by org for payment
contacts = relationship("Contact", back_populates="organization",
secondary=orgs_contacts) #: contacts involved with this org
def __repr__(self) -> str:
return f"<Organization({self.name})>"
@@ -38,10 +41,10 @@ class Organization(BaseClass):
@classmethod
@setup_lookup
def query(cls,
id:int|None=None,
name:str|None=None,
limit:int=0,
) -> Organization|List[Organization]:
id: int | None = None,
name: str | None = None,
limit: int = 0,
) -> Organization | List[Organization]:
"""
Lookup organizations in the database by a number of parameters.
@@ -51,11 +54,11 @@ class Organization(BaseClass):
Returns:
Organization|List[Organization]:
"""
"""
query: Query = cls.__database_session__.query(cls)
match id:
case int():
query = query.filter(cls.id==id)
query = query.filter(cls.id == id)
limit = 1
case _:
pass
@@ -73,33 +76,35 @@ class Organization(BaseClass):
def save(self):
super().save()
class Contact(BaseClass):
"""
Base of Contact
"""
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(64)) #: contact name
email = Column(String(64)) #: contact email
phone = Column(String(32)) #: contact phone number
organization = relationship("Organization", back_populates="contacts", uselist=True, secondary=orgs_contacts) #: relationship to joined organization
submissions = relationship("BasicSubmission", back_populates="contact") #: submissions this contact has submitted
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(64)) #: contact name
email = Column(String(64)) #: contact email
phone = Column(String(32)) #: contact phone number
organization = relationship("Organization", back_populates="contacts", uselist=True,
secondary=orgs_contacts) #: relationship to joined organization
submissions = relationship("BasicSubmission", back_populates="contact") #: submissions this contact has submitted
def __repr__(self) -> str:
"""
Returns:
str: Representation of this Contact
"""
"""
return f"<Contact({self.name})>"
@classmethod
@setup_lookup
def query(cls,
name:str|None=None,
email:str|None=None,
phone:str|None=None,
limit:int=0,
) -> Contact|List[Contact]:
def query(cls,
name: str | None = None,
email: str | None = None,
phone: str | None = None,
limit: int = 0,
) -> Contact | List[Contact]:
"""
Lookup contacts in the database by a number of parameters.
@@ -111,29 +116,28 @@ class Contact(BaseClass):
Returns:
Contact|List[Contact]: Contact(s) of interest.
"""
"""
# super().query(session)
query: Query = cls.__database_session__.query(cls)
match name:
case str():
# logger.debug(f"Looking up contact with name: {name}")
query = query.filter(cls.name==name)
query = query.filter(cls.name == name)
limit = 1
case _:
pass
match email:
case str():
# logger.debug(f"Looking up contact with email: {name}")
query = query.filter(cls.email==email)
query = query.filter(cls.email == email)
limit = 1
case _:
pass
match phone:
case str():
# logger.debug(f"Looking up contact with phone: {name}")
query = query.filter(cls.phone==phone)
query = query.filter(cls.phone == phone)
limit = 1
case _:
pass
return cls.execute_query(query=query, limit=limit)

View File

@@ -248,6 +248,7 @@ class BasicSubmission(BaseClass):
ext_info = self.extraction_info
except TypeError:
ext_info = None
output = {
"id": self.id,
"plate_number": self.rsl_plate_num,
@@ -257,8 +258,7 @@ class BasicSubmission(BaseClass):
"submitting_lab": sub_lab,
"sample_count": self.sample_count,
"extraction_kit": ext_kit,
"cost": self.run_cost,
"cost": self.run_cost
}
if report:
return output
@@ -299,6 +299,15 @@ class BasicSubmission(BaseClass):
except Exception as e:
logger.error(f"Error setting comment: {self.comment}, {e}")
comments = None
try:
contact = self.contact.name
except AttributeError as e:
logger.error(f"Problem setting contact: {e}")
contact = "NA"
try:
contact_phone = self.contact.phone
except AttributeError:
contact_phone = "NA"
output["submission_category"] = self.submission_category
output["technician"] = self.technician
output["reagents"] = reagents
@@ -308,9 +317,9 @@ class BasicSubmission(BaseClass):
output["equipment"] = equipment
output["cost_centre"] = cost_centre
output["signed_by"] = self.signed_by
output["contact"] = self.contact.name
output["contact_phone"] = self.contact.phone
# logger.debug(f"Setting contact to: {contact} of type: {type(contact)}")
output["contact"] = contact
output["contact_phone"] = contact_phone
return output
def calculate_column_count(self) -> int:
@@ -431,7 +440,7 @@ class BasicSubmission(BaseClass):
excluded = ['controls', 'extraction_info', 'pcr_info', 'comment', 'comments', 'samples', 'reagents',
'equipment', 'gel_info', 'gel_image', 'dna_core_submission_number', 'gel_controls',
'source_plates', 'pcr_technician', 'ext_technician', 'artic_technician', 'cost_centre',
'signed_by']
'signed_by', 'artic_date', 'gel_barcode', 'gel_date', 'ngs_date', 'contact_phone', 'contact']
for item in excluded:
try:
df = df.drop(item, axis=1)
@@ -544,7 +553,6 @@ class BasicSubmission(BaseClass):
# logger.debug("To dict complete")
new_dict = {}
for key, value in dicto.items():
# start = time()
# logger.debug(f"Checking {key}")
missing = value is None or value in ['', 'None']
match key:
@@ -1403,6 +1411,10 @@ class WastewaterArtic(BasicSubmission):
gel_info = Column(JSON) #: unstructured data from gel.
gel_controls = Column(JSON) #: locations of controls on the gel
source_plates = Column(JSON) #: wastewater plates that samples come from
artic_date = Column(TIMESTAMP) #: Date Artic Performed
ngs_date = Column(TIMESTAMP) #: Date submission received
gel_date = Column(TIMESTAMP) #: Date submission received
gel_barcode = Column(String(16))
__mapper_args__ = dict(polymorphic_identity="Wastewater Artic",
polymorphic_load="inline",
@@ -1418,12 +1430,18 @@ class WastewaterArtic(BasicSubmission):
output = super().to_dict(full_data=full_data, backup=backup, report=report)
if self.artic_technician in [None, "None"]:
output['artic_technician'] = self.technician
else:
output['artic_technician'] = self.artic_technician
if report:
return output
output['gel_info'] = self.gel_info
output['gel_image'] = self.gel_image
output['dna_core_submission_number'] = self.dna_core_submission_number
output['source_plates'] = self.source_plates
output['artic_date'] = self.artic_date or self.submitted_date
output['ngs_date'] = self.ngs_date or self.submitted_date
output['gel_date'] = self.gel_date or self.submitted_date
output['gel_barcode'] = self.gel_barcode
return output
@classmethod
@@ -1756,19 +1774,22 @@ class WastewaterArtic(BasicSubmission):
output.append(dicto)
continue
for item in self.source_plates:
old_plate = WastewaterAssociation.query(submission=item['plate'], sample=assoc.sample, limit=1)
if assoc.sample.id is None:
old_plate = None
else:
old_plate = WastewaterAssociation.query(submission=item['plate'], sample=assoc.sample, limit=1)
if old_plate is not None:
set_plate = old_plate.submission.rsl_plate_num
logger.debug(dicto['WW Processing Num'])
if dicto['WW Processing Num'].startswith("NTC"):
dicto['Well'] = dicto['WW Processing Num']
# logger.debug(f"Dictionary: {pformat(dicto)}")
if dicto['ww_processing_num'].startswith("NTC"):
dicto['well'] = dicto['ww_processing_num']
else:
dicto['Well'] = f"{row_map[old_plate.row]}{old_plate.column}"
dicto['well'] = f"{row_map[old_plate.row]}{old_plate.column}"
break
elif dicto['WW Processing Num'].startswith("NTC"):
dicto['Well'] = dicto['WW Processing Num']
elif dicto['ww_processing_num'].startswith("NTC"):
dicto['well'] = dicto['ww_processing_num']
dicto['plate_name'] = set_plate
logger.debug(f"Here is our raw sample: {pformat(dicto)}")
# logger.debug(f"Here is our raw sample: {pformat(dicto)}")
output.append(dicto)
return output
@@ -1801,7 +1822,7 @@ class WastewaterArtic(BasicSubmission):
fname = select_open_file(obj=obj, file_extension="jpg")
dlg = GelBox(parent=obj, img_path=fname, submission=self)
if dlg.exec():
self.dna_core_submission_number, img_path, output, comment = dlg.parse_form()
self.dna_core_submission_number, self.gel_barcode, img_path, output, comment = dlg.parse_form()
self.gel_image = img_path.name
self.gel_info = output
dt = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
@@ -2135,7 +2156,7 @@ class BasicSample(BaseClass):
if dlg.exec():
pass
#Below are the custom sample types
# Below are the custom sample types
class WastewaterSample(BasicSample):
"""
@@ -2235,6 +2256,7 @@ class WastewaterSample(BasicSample):
searchables.append(dict(label=label, field=item))
return searchables
class BacterialCultureSample(BasicSample):
"""
base of bacterial culture sample

View File

@@ -139,7 +139,7 @@ class SheetParser(object):
# logger.debug(f"Submission dictionary coming into 'to_pydantic':\n{pformat(self.sub)}")
pyd_dict = copy(self.sub)
pyd_dict['samples'] = [PydSample(**sample) for sample in self.sub['samples']]
logger.debug(f"Reagents: {pformat(self.sub['reagents'])}")
# logger.debug(f"Reagents: {pformat(self.sub['reagents'])}")
pyd_dict['reagents'] = [PydReagent(**reagent) for reagent in self.sub['reagents']]
# logger.debug(f"Equipment: {self.sub['equipment']}")
try:
@@ -215,13 +215,13 @@ class InfoParser(object):
new = location
new['name'] = k
relevant.append(new)
logger.debug(f"relevant map for {sheet}: {pformat(relevant)}")
# logger.debug(f"relevant map for {sheet}: {pformat(relevant)}")
if not relevant:
continue
for item in relevant:
# NOTE: Get cell contents at this location
value = ws.cell(row=item['row'], column=item['column']).value
logger.debug(f"Value for {item['name']} = {value}")
# logger.debug(f"Value for {item['name']} = {value}")
match item['name']:
case "submission_type":
value, missing = is_missing(value)

View File

@@ -53,7 +53,8 @@ class SheetWriter(object):
template = self.submission_type.template_file
workbook = load_workbook(BytesIO(template))
missing_only = False
self.workbook = workbook
# self.workbook = workbook
self.xl = workbook
self.write_info()
self.write_reagents()
self.write_samples()
@@ -62,23 +63,23 @@ class SheetWriter(object):
def write_info(self):
disallowed = ['filepath', 'reagents', 'samples', 'equipment', 'controls']
info_dict = {k: v for k, v in self.sub.items() if k not in disallowed}
writer = InfoWriter(xl=self.workbook, submission_type=self.submission_type, info_dict=info_dict)
writer = InfoWriter(xl=self.xl, submission_type=self.submission_type, info_dict=info_dict)
self.xl = writer.write_info()
def write_reagents(self):
reagent_list = self.sub['reagents']
writer = ReagentWriter(xl=self.workbook, submission_type=self.submission_type,
writer = ReagentWriter(xl=self.xl, submission_type=self.submission_type,
extraction_kit=self.sub['extraction_kit'], reagent_list=reagent_list)
self.xl = writer.write_reagents()
def write_samples(self):
sample_list = self.sub['samples']
writer = SampleWriter(xl=self.workbook, submission_type=self.submission_type, sample_list=sample_list)
writer = SampleWriter(xl=self.xl, submission_type=self.submission_type, sample_list=sample_list)
self.xl = writer.write_samples()
def write_equipment(self):
equipment_list = self.sub['equipment']
writer = EquipmentWriter(xl=self.workbook, submission_type=self.submission_type, equipment_list=equipment_list)
writer = EquipmentWriter(xl=self.xl, submission_type=self.submission_type, equipment_list=equipment_list)
self.xl = writer.write_equipment()
@@ -93,18 +94,18 @@ class InfoWriter(object):
self.submission_type = submission_type
self.sub_object = sub_object
self.xl = xl
map = submission_type.construct_info_map(mode='write')
self.info = self.reconcile_map(info_dict, map)
info_map = submission_type.construct_info_map(mode='write')
self.info = self.reconcile_map(info_dict, info_map)
# logger.debug(pformat(self.info))
def reconcile_map(self, info_dict: dict, map: dict) -> dict:
def reconcile_map(self, info_dict: dict, info_map: dict) -> dict:
output = {}
for k, v in info_dict.items():
if v is None:
continue
dicto = {}
try:
dicto['locations'] = map[k]
dicto['locations'] = info_map[k]
except KeyError:
# continue
pass
@@ -126,7 +127,7 @@ class InfoWriter(object):
logger.error(f"No locations for {k}, skipping")
continue
for loc in locations:
# logger.debug(f"Writing {k} to {loc['sheet']}, row: {loc['row']}, column: {loc['column']}")
logger.debug(f"Writing {k} to {loc['sheet']}, row: {loc['row']}, column: {loc['column']}")
sheet = self.xl[loc['sheet']]
sheet.cell(row=loc['row'], column=loc['column'], value=v['value'])
return self.sub_object.custom_info_writer(self.xl, info=self.info)
@@ -141,14 +142,14 @@ class ReagentWriter(object):
submission_type = SubmissionType.query(name=submission_type)
if isinstance(extraction_kit, str):
kit_type = KitType.query(name=extraction_kit)
map = kit_type.construct_xl_map_for_use(submission_type)
self.reagents = self.reconcile_map(reagent_list=reagent_list, map=map)
reagent_map = kit_type.construct_xl_map_for_use(submission_type)
self.reagents = self.reconcile_map(reagent_list=reagent_list, reagent_map=reagent_map)
def reconcile_map(self, reagent_list, map) -> List[dict]:
def reconcile_map(self, reagent_list, reagent_map) -> List[dict]:
output = []
for reagent in reagent_list:
try:
mp_info = map[reagent['role']]
mp_info = reagent_map[reagent['role']]
except KeyError:
continue
placeholder = copy(reagent)
@@ -182,7 +183,7 @@ class SampleWriter(object):
submission_type = SubmissionType.query(name=submission_type)
self.submission_type = submission_type
self.xl = xl
self.map = submission_type.construct_sample_map()['lookup_table']
self.sample_map = submission_type.construct_sample_map()['lookup_table']
self.samples = self.reconcile_map(sample_list)
def reconcile_map(self, sample_list: list):
@@ -200,10 +201,10 @@ class SampleWriter(object):
return sorted(output, key=lambda k: k['submission_rank'])
def write_samples(self):
sheet = self.xl[self.map['sheet']]
columns = self.map['sample_columns']
sheet = self.xl[self.sample_map['sheet']]
columns = self.sample_map['sample_columns']
for ii, sample in enumerate(self.samples):
row = self.map['start_row'] + (sample['submission_rank'] - 1)
row = self.sample_map['start_row'] + (sample['submission_rank'] - 1)
for k, v in sample.items():
try:
column = columns[k]
@@ -220,13 +221,15 @@ class EquipmentWriter(object):
submission_type = SubmissionType.query(name=submission_type)
self.submission_type = submission_type
self.xl = xl
map = self.submission_type.construct_equipment_map()
self.equipment = self.reconcile_map(equipment_list=equipment_list, map=map)
equipment_map = self.submission_type.construct_equipment_map()
self.equipment = self.reconcile_map(equipment_list=equipment_list, equipment_map=equipment_map)
def reconcile_map(self, equipment_list: list, map: list):
def reconcile_map(self, equipment_list: list, equipment_map: list):
output = []
if equipment_list is None:
return output
for ii, equipment in enumerate(equipment_list, start=1):
mp_info = map[equipment['role']]
mp_info = equipment_map[equipment['role']]
# logger.debug(f"{equipment['role']} map: {mp_info}")
placeholder = copy(equipment)
if mp_info == {}:

View File

@@ -573,10 +573,22 @@ class PydSubmission(BaseModel, extra='allow'):
@field_validator("contact")
@classmethod
def get_contact_from_org(cls, value, values):
logger.debug(f"Checking on value: {value}")
match value:
case dict():
if isinstance(value['value'], tuple):
value['value'] = value['value'][0]
case tuple():
value = dict(value=value[0], missing=False)
case _:
value = dict(value=value, missing=False)
check = Contact.query(name=value['value'])
if check is None:
org = Organization.query(name=values.data['submitting_lab']['value'])
contact = org.contacts[0].name
logger.debug(f"Pulled: {contact}")
if isinstance(contact, tuple):
contact = contact[0]
return dict(value=contact, missing=True)
else:
return value