From 231173232871400e8e4feef6ad959f8b4f424ee5 Mon Sep 17 00:00:00 2001 From: lwark Date: Mon, 5 May 2025 10:41:26 -0500 Subject: [PATCH] Attempt at spinning off clientsubmission. --- TODO.md | 3 + .../versions/0746f7e2c10e_rebuild_20240723.py | 428 ---- ...5b86f5ac2d9_add_custom_info_column_to__.py | 56 - src/submissions/backend/db/models/kits.py | 17 +- .../backend/db/models/organizations.py | 4 +- .../backend/db/models/submissions.py | 2039 +++++++++-------- 6 files changed, 1068 insertions(+), 1479 deletions(-) delete mode 100644 alembic/versions/0746f7e2c10e_rebuild_20240723.py delete mode 100644 alembic/versions/25b86f5ac2d9_add_custom_info_column_to__.py diff --git a/TODO.md b/TODO.md index aeee29b..817b4f3 100644 --- a/TODO.md +++ b/TODO.md @@ -1,3 +1,6 @@ +- [ ] Add in database objects for rsl_run (submission -> run), procedure (run -> procedure), many more things will likely be associated with procedure. +- [ ] Add in database object for client submission. +- [ ] Add arbitrary pipette addition to equipment UI. - [ ] transfer details template rendering fully into sql objects - [x] Add in connecting links for tips. - [x] Add in connecting links for equipment/processes. diff --git a/alembic/versions/0746f7e2c10e_rebuild_20240723.py b/alembic/versions/0746f7e2c10e_rebuild_20240723.py deleted file mode 100644 index b8de6e9..0000000 --- a/alembic/versions/0746f7e2c10e_rebuild_20240723.py +++ /dev/null @@ -1,428 +0,0 @@ -"""rebuild 20240723 - -Revision ID: 0746f7e2c10e -Revises: -Create Date: 2024-07-23 14:08:37.436400 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '0746f7e2c10e' -down_revision = None -branch_labels = None -depends_on = None - - -def upgrade() -> None: - # ### commands auto generated by Alembic - please adjust! 
### - op.create_table('_basicsample', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('submitter_id', sa.String(length=64), nullable=False), - sa.Column('sample_type', sa.String(length=32), nullable=True), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('submitter_id') - ) - op.create_table('_configitem', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('key', sa.String(length=32), nullable=True), - sa.Column('value', sa.JSON(), nullable=True), - sa.PrimaryKeyConstraint('id') - ) - op.create_table('_contact', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('sub_type', sa.String(length=64), nullable=True), - sa.Column('email', sa.String(length=64), nullable=True), - sa.Column('phone', sa.String(length=32), nullable=True), - sa.PrimaryKeyConstraint('id') - ) - op.create_table('_controltype', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('sub_type', sa.String(length=255), nullable=True), - sa.Column('targets', sa.JSON(), nullable=True), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('sub_type') - ) - op.create_table('_equipment', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('sub_type', sa.String(length=64), nullable=True), - sa.Column('nickname', sa.String(length=64), nullable=True), - sa.Column('asset_number', sa.String(length=16), nullable=True), - sa.PrimaryKeyConstraint('id') - ) - op.create_table('_equipmentrole', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('sub_type', sa.String(length=32), nullable=True), - sa.PrimaryKeyConstraint('id') - ) - op.create_table('_kittype', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('sub_type', sa.String(length=64), nullable=True), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('sub_type') - ) - op.create_table('_organization', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('sub_type', sa.String(length=64), nullable=True), - sa.Column('cost_centre', sa.String(), nullable=True), - sa.PrimaryKeyConstraint('id') - ) - op.create_table('_process', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('sub_type', sa.String(length=64), nullable=True), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('sub_type') - ) - op.create_table('_reagentrole', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('sub_type', sa.String(length=64), nullable=True), - sa.Column('eol_ext', sa.Interval(), nullable=True), - sa.PrimaryKeyConstraint('id') - ) - op.create_table('_submissiontype', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('sub_type', sa.String(length=128), nullable=True), - sa.Column('info_map', sa.JSON(), nullable=True), - sa.Column('defaults', sa.JSON(), nullable=True), - sa.Column('template_file', sa.LargeBinary(), nullable=True), - sa.Column('sample_map', sa.JSON(), nullable=True), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('sub_type') - ) - op.create_table('_tiprole', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('sub_type', sa.String(length=64), nullable=True), - sa.PrimaryKeyConstraint('id') - ) - op.create_table('_bacterialculturesample', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('organism', sa.String(length=64), nullable=True), - sa.Column('concentration', sa.String(length=16), nullable=True), - sa.ForeignKeyConstraint(['id'], ['_basicsample.id'], ), - sa.PrimaryKeyConstraint('id') - ) - op.create_table('_discount', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('kit_id', sa.INTEGER(), nullable=True), - sa.Column('client_id', 
sa.INTEGER(), nullable=True), - sa.Column('sub_type', sa.String(length=128), nullable=True), - sa.Column('amount', sa.FLOAT(precision=2), nullable=True), - sa.ForeignKeyConstraint(['client_id'], ['_organization.id'], name='fk_org_id', ondelete='SET NULL'), - sa.ForeignKeyConstraint(['kit_id'], ['_kittype.id'], name='fk_kit_type_id', ondelete='SET NULL'), - sa.PrimaryKeyConstraint('id') - ) - op.create_table('_equipment_processes', - sa.Column('process_id', sa.INTEGER(), nullable=True), - sa.Column('equipment_id', sa.INTEGER(), nullable=True), - sa.ForeignKeyConstraint(['equipment_id'], ['_equipment.id'], ), - sa.ForeignKeyConstraint(['process_id'], ['_process.id'], ) - ) - op.create_table('_equipmentroles_equipment', - sa.Column('equipment_id', sa.INTEGER(), nullable=True), - sa.Column('equipmentroles_id', sa.INTEGER(), nullable=True), - sa.ForeignKeyConstraint(['equipment_id'], ['_equipment.id'], ), - sa.ForeignKeyConstraint(['equipmentroles_id'], ['_equipmentrole.id'], ) - ) - op.create_table('_equipmentroles_processes', - sa.Column('process_id', sa.INTEGER(), nullable=True), - sa.Column('equipmentrole_id', sa.INTEGER(), nullable=True), - sa.ForeignKeyConstraint(['equipmentrole_id'], ['_equipmentrole.id'], ), - sa.ForeignKeyConstraint(['process_id'], ['_process.id'], ) - ) - op.create_table('_kittypereagentroleassociation', - sa.Column('reagent_roles_id', sa.INTEGER(), nullable=False), - sa.Column('kits_id', sa.INTEGER(), nullable=False), - sa.Column('submission_type_id', sa.INTEGER(), nullable=False), - sa.Column('uses', sa.JSON(), nullable=True), - sa.Column('required', sa.INTEGER(), nullable=True), - sa.Column('last_used', sa.String(length=32), nullable=True), - sa.ForeignKeyConstraint(['kits_id'], ['_kittype.id'], ), - sa.ForeignKeyConstraint(['reagent_roles_id'], ['_reagentrole.id'], ), - sa.ForeignKeyConstraint(['submission_type_id'], ['_submissiontype.id'], ), - sa.PrimaryKeyConstraint('reagent_roles_id', 'kits_id', 'submission_type_id') - ) - op.create_table('_kittypes_processes', - sa.Column('process_id', sa.INTEGER(), nullable=True), - sa.Column('kit_id', sa.INTEGER(), nullable=True), - sa.ForeignKeyConstraint(['kit_id'], ['_kittype.id'], ), - sa.ForeignKeyConstraint(['process_id'], ['_process.id'], ) - ) - op.create_table('_orgs_contacts', - sa.Column('org_id', sa.INTEGER(), nullable=True), - sa.Column('contact_id', sa.INTEGER(), nullable=True), - sa.ForeignKeyConstraint(['contact_id'], ['_contact.id'], ), - sa.ForeignKeyConstraint(['org_id'], ['_organization.id'], ) - ) - op.create_table('_process_tiprole', - sa.Column('process_id', sa.INTEGER(), nullable=True), - sa.Column('tiprole_id', sa.INTEGER(), nullable=True), - sa.ForeignKeyConstraint(['process_id'], ['_process.id'], ), - sa.ForeignKeyConstraint(['tiprole_id'], ['_tiprole.id'], ) - ) - op.create_table('_reagent', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('role_id', sa.INTEGER(), nullable=True), - sa.Column('sub_type', sa.String(length=64), nullable=True), - sa.Column('lot', sa.String(length=64), nullable=True), - sa.Column('expiry', sa.TIMESTAMP(), nullable=True), - sa.ForeignKeyConstraint(['role_id'], ['_reagentrole.id'], name='fk_reagent_role_id', ondelete='SET NULL'), - sa.PrimaryKeyConstraint('id') - ) - op.create_table('_submissiontypeequipmentroleassociation', - sa.Column('equipmentrole_id', sa.INTEGER(), nullable=False), - sa.Column('submissiontype_id', sa.INTEGER(), nullable=False), - sa.Column('uses', sa.JSON(), nullable=True), - sa.Column('static', sa.INTEGER(), nullable=True), - 
sa.ForeignKeyConstraint(['equipmentrole_id'], ['_equipmentrole.id'], ), - sa.ForeignKeyConstraint(['submissiontype_id'], ['_submissiontype.id'], ), - sa.PrimaryKeyConstraint('equipmentrole_id', 'submissiontype_id') - ) - op.create_table('_submissiontypekittypeassociation', - sa.Column('submission_types_id', sa.INTEGER(), nullable=False), - sa.Column('kits_id', sa.INTEGER(), nullable=False), - sa.Column('mutable_cost_column', sa.FLOAT(precision=2), nullable=True), - sa.Column('mutable_cost_sample', sa.FLOAT(precision=2), nullable=True), - sa.Column('constant_cost', sa.FLOAT(precision=2), nullable=True), - sa.ForeignKeyConstraint(['kits_id'], ['_kittype.id'], ), - sa.ForeignKeyConstraint(['submission_types_id'], ['_submissiontype.id'], ), - sa.PrimaryKeyConstraint('submission_types_id', 'kits_id') - ) - op.create_table('_submissiontypes_processes', - sa.Column('process_id', sa.INTEGER(), nullable=True), - sa.Column('equipmentroles_id', sa.INTEGER(), nullable=True), - sa.ForeignKeyConstraint(['equipmentroles_id'], ['_submissiontype.id'], ), - sa.ForeignKeyConstraint(['process_id'], ['_process.id'], ) - ) - op.create_table('_submissiontypetiproleassociation', - sa.Column('tiprole_id', sa.INTEGER(), nullable=False), - sa.Column('submissiontype_id', sa.INTEGER(), nullable=False), - sa.Column('uses', sa.JSON(), nullable=True), - sa.Column('static', sa.INTEGER(), nullable=True), - sa.ForeignKeyConstraint(['submissiontype_id'], ['_submissiontype.id'], ), - sa.ForeignKeyConstraint(['tiprole_id'], ['_tiprole.id'], ), - sa.PrimaryKeyConstraint('tiprole_id', 'submissiontype_id') - ) - op.create_table('_tips', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('role_id', sa.INTEGER(), nullable=True), - sa.Column('sub_type', sa.String(length=64), nullable=True), - sa.Column('lot', sa.String(length=64), nullable=True), - sa.ForeignKeyConstraint(['role_id'], ['_tiprole.id'], name='fk_tip_role_id', ondelete='SET NULL'), - sa.PrimaryKeyConstraint('id') - ) - op.create_table('_wastewatersample', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('ww_processing_num', sa.String(length=64), nullable=True), - sa.Column('ww_full_sample_id', sa.String(length=64), nullable=True), - sa.Column('rsl_number', sa.String(length=64), nullable=True), - sa.Column('collection_date', sa.TIMESTAMP(), nullable=True), - sa.Column('received_date', sa.TIMESTAMP(), nullable=True), - sa.Column('notes', sa.String(length=2000), nullable=True), - sa.Column('sample_location', sa.String(length=8), nullable=True), - sa.ForeignKeyConstraint(['id'], ['_basicsample.id'], ), - sa.PrimaryKeyConstraint('id') - ) - op.create_table('_basicsubmission', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('rsl_plate_num', sa.String(length=32), nullable=False), - sa.Column('submitter_plate_num', sa.String(length=127), nullable=True), - sa.Column('submitted_date', sa.TIMESTAMP(), nullable=True), - sa.Column('submitting_lab_id', sa.INTEGER(), nullable=True), - sa.Column('sample_count', sa.INTEGER(), nullable=True), - sa.Column('extraction_kit_id', sa.INTEGER(), nullable=True), - sa.Column('submission_type_name', sa.String(), nullable=True), - sa.Column('technician', sa.String(length=64), nullable=True), - sa.Column('reagents_id', sa.INTEGER(), nullable=True), - sa.Column('extraction_info', sa.JSON(), nullable=True), - sa.Column('run_cost', sa.FLOAT(precision=2), nullable=True), - sa.Column('signed_by', sa.String(length=32), nullable=True), - sa.Column('comment', sa.JSON(), nullable=True), - sa.Column('submission_category', 
sa.String(length=64), nullable=True), - sa.Column('cost_centre', sa.String(length=64), nullable=True), - sa.Column('contact_id', sa.INTEGER(), nullable=True), - sa.ForeignKeyConstraint(['contact_id'], ['_contact.id'], name='fk_BS_contact_id', ondelete='SET NULL'), - sa.ForeignKeyConstraint(['extraction_kit_id'], ['_kittype.id'], name='fk_BS_extkit_id', ondelete='SET NULL'), - sa.ForeignKeyConstraint(['reagents_id'], ['_reagent.id'], name='fk_BS_reagents_id', ondelete='SET NULL'), - sa.ForeignKeyConstraint(['submission_type_name'], ['_submissiontype.sub_type'], name='fk_BS_subtype_name', ondelete='SET NULL'), - sa.ForeignKeyConstraint(['submitting_lab_id'], ['_organization.id'], name='fk_BS_sublab_id', ondelete='SET NULL'), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('rsl_plate_num'), - sa.UniqueConstraint('submitter_plate_num') - ) - op.create_table('_equipment_tips', - sa.Column('equipment_id', sa.INTEGER(), nullable=True), - sa.Column('tips_id', sa.INTEGER(), nullable=True), - sa.ForeignKeyConstraint(['equipment_id'], ['_equipment.id'], ), - sa.ForeignKeyConstraint(['tips_id'], ['_tips.id'], ) - ) - op.create_table('_reagentroles_reagents', - sa.Column('reagent_id', sa.INTEGER(), nullable=True), - sa.Column('reagentrole_id', sa.INTEGER(), nullable=True), - sa.ForeignKeyConstraint(['reagent_id'], ['_reagent.id'], ), - sa.ForeignKeyConstraint(['reagentrole_id'], ['_reagentrole.id'], ) - ) - op.create_table('_tiproles_tips', - sa.Column('tiprole_id', sa.INTEGER(), nullable=True), - sa.Column('tips_id', sa.INTEGER(), nullable=True), - sa.ForeignKeyConstraint(['tiprole_id'], ['_tiprole.id'], ), - sa.ForeignKeyConstraint(['tips_id'], ['_tips.id'], ) - ) - op.create_table('_bacterialculture', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.ForeignKeyConstraint(['id'], ['_basicsubmission.id'], ), - sa.PrimaryKeyConstraint('id') - ) - op.create_table('_control', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('parent_id', sa.INTEGER(), nullable=True), - sa.Column('sub_type', sa.String(length=255), nullable=True), - sa.Column('submitted_date', sa.TIMESTAMP(), nullable=True), - sa.Column('contains', sa.JSON(), nullable=True), - sa.Column('matches', sa.JSON(), nullable=True), - sa.Column('kraken', sa.JSON(), nullable=True), - sa.Column('submission_id', sa.INTEGER(), nullable=True), - sa.Column('refseq_version', sa.String(length=16), nullable=True), - sa.Column('kraken2_version', sa.String(length=16), nullable=True), - sa.Column('kraken2_db_version', sa.String(length=32), nullable=True), - sa.Column('sample_id', sa.INTEGER(), nullable=True), - sa.ForeignKeyConstraint(['parent_id'], ['_controltype.id'], name='fk_control_parent_id'), - sa.ForeignKeyConstraint(['sample_id'], ['_basicsample.id'], name='cont_BCS_id', ondelete='SET NULL'), - sa.ForeignKeyConstraint(['submission_id'], ['_basicsubmission.id'], ), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('sub_type') - ) - op.create_table('_submissionequipmentassociation', - sa.Column('equipment_id', sa.INTEGER(), nullable=False), - sa.Column('submission_id', sa.INTEGER(), nullable=False), - sa.Column('role', sa.String(length=64), nullable=False), - sa.Column('process_id', sa.INTEGER(), nullable=True), - sa.Column('start_time', sa.TIMESTAMP(), nullable=True), - sa.Column('end_time', sa.TIMESTAMP(), nullable=True), - sa.Column('comments', sa.String(length=1024), nullable=True), - sa.ForeignKeyConstraint(['equipment_id'], ['_equipment.id'], ), - sa.ForeignKeyConstraint(['process_id'], ['_process.id'], 
name='SEA_Process_id', ondelete='SET NULL'), - sa.ForeignKeyConstraint(['submission_id'], ['_basicsubmission.id'], ), - sa.PrimaryKeyConstraint('equipment_id', 'submission_id', 'role') - ) - op.create_table('_submissionreagentassociation', - sa.Column('reagent_id', sa.INTEGER(), nullable=False), - sa.Column('submission_id', sa.INTEGER(), nullable=False), - sa.Column('comments', sa.String(length=1024), nullable=True), - sa.ForeignKeyConstraint(['reagent_id'], ['_reagent.id'], ), - sa.ForeignKeyConstraint(['submission_id'], ['_basicsubmission.id'], ), - sa.PrimaryKeyConstraint('reagent_id', 'submission_id') - ) - op.create_table('_submissionsampleassociation', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('sample_id', sa.INTEGER(), nullable=False), - sa.Column('submission_id', sa.INTEGER(), nullable=False), - sa.Column('row', sa.INTEGER(), nullable=False), - sa.Column('column', sa.INTEGER(), nullable=False), - sa.Column('submission_rank', sa.INTEGER(), nullable=False), - sa.Column('base_sub_type', sa.String(), nullable=True), - sa.ForeignKeyConstraint(['sample_id'], ['_basicsample.id'], ), - sa.ForeignKeyConstraint(['submission_id'], ['_basicsubmission.id'], ), - sa.PrimaryKeyConstraint('submission_id', 'row', 'column'), - sa.UniqueConstraint('id') - ) - op.create_table('_submissiontipsassociation', - sa.Column('tip_id', sa.INTEGER(), nullable=False), - sa.Column('submission_id', sa.INTEGER(), nullable=False), - sa.Column('role_name', sa.String(length=32), nullable=False), - sa.ForeignKeyConstraint(['submission_id'], ['_basicsubmission.id'], ), - sa.ForeignKeyConstraint(['tip_id'], ['_tips.id'], ), - sa.PrimaryKeyConstraint('tip_id', 'submission_id', 'role_name') - ) - op.create_table('_wastewater', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('ext_technician', sa.String(length=64), nullable=True), - sa.Column('pcr_technician', sa.String(length=64), nullable=True), - sa.Column('pcr_info', sa.JSON(), nullable=True), - sa.ForeignKeyConstraint(['id'], ['_basicsubmission.id'], ), - sa.PrimaryKeyConstraint('id') - ) - op.create_table('_wastewaterartic', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('artic_technician', sa.String(length=64), nullable=True), - sa.Column('dna_core_submission_number', sa.String(length=64), nullable=True), - sa.Column('pcr_info', sa.JSON(), nullable=True), - sa.Column('gel_image', sa.String(length=64), nullable=True), - sa.Column('gel_info', sa.JSON(), nullable=True), - sa.Column('gel_controls', sa.JSON(), nullable=True), - sa.Column('source_plates', sa.JSON(), nullable=True), - sa.Column('artic_date', sa.TIMESTAMP(), nullable=True), - sa.Column('ngs_date', sa.TIMESTAMP(), nullable=True), - sa.Column('gel_date', sa.TIMESTAMP(), nullable=True), - sa.Column('gel_barcode', sa.String(length=16), nullable=True), - sa.ForeignKeyConstraint(['id'], ['_basicsubmission.id'], ), - sa.PrimaryKeyConstraint('id') - ) - op.create_table('_wastewaterarticassociation', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('source_plate', sa.String(length=16), nullable=True), - sa.Column('source_plate_number', sa.INTEGER(), nullable=True), - sa.Column('source_well', sa.String(length=8), nullable=True), - sa.Column('ct', sa.String(length=8), nullable=True), - sa.ForeignKeyConstraint(['id'], ['_submissionsampleassociation.id'], ), - sa.PrimaryKeyConstraint('id') - ) - op.create_table('_wastewaterassociation', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('ct_n1', sa.FLOAT(precision=2), nullable=True), - sa.Column('ct_n2', 
sa.FLOAT(precision=2), nullable=True), - sa.Column('n1_status', sa.String(length=32), nullable=True), - sa.Column('n2_status', sa.String(length=32), nullable=True), - sa.Column('pcr_results', sa.JSON(), nullable=True), - sa.ForeignKeyConstraint(['id'], ['_submissionsampleassociation.id'], ), - sa.PrimaryKeyConstraint('id') - ) - # ### end Alembic commands ### - - -def downgrade() -> None: - # ### commands auto generated by Alembic - please adjust! ### - op.drop_table('_wastewaterassociation') - op.drop_table('_wastewaterarticassociation') - op.drop_table('_wastewaterartic') - op.drop_table('_wastewater') - op.drop_table('_submissiontipsassociation') - op.drop_table('_submissionsampleassociation') - op.drop_table('_submissionreagentassociation') - op.drop_table('_submissionequipmentassociation') - op.drop_table('_control') - op.drop_table('_bacterialculture') - op.drop_table('_tiproles_tips') - op.drop_table('_reagentroles_reagents') - op.drop_table('_equipment_tips') - op.drop_table('_basicsubmission') - op.drop_table('_wastewatersample') - op.drop_table('_tips') - op.drop_table('_submissiontypetiproleassociation') - op.drop_table('_submissiontypes_processes') - op.drop_table('_submissiontypekittypeassociation') - op.drop_table('_submissiontypeequipmentroleassociation') - op.drop_table('_reagent') - op.drop_table('_process_tiprole') - op.drop_table('_orgs_contacts') - op.drop_table('_kittypes_processes') - op.drop_table('_kittypereagentroleassociation') - op.drop_table('_equipmentroles_processes') - op.drop_table('_equipmentroles_equipment') - op.drop_table('_equipment_processes') - op.drop_table('_discount') - op.drop_table('_bacterialculturesample') - op.drop_table('_tiprole') - op.drop_table('_submissiontype') - op.drop_table('_reagentrole') - op.drop_table('_process') - op.drop_table('_organization') - op.drop_table('_kittype') - op.drop_table('_equipmentrole') - op.drop_table('_equipment') - op.drop_table('_controltype') - op.drop_table('_contact') - op.drop_table('_configitem') - op.drop_table('_basicsample') - # ### end Alembic commands ### diff --git a/alembic/versions/25b86f5ac2d9_add_custom_info_column_to__.py b/alembic/versions/25b86f5ac2d9_add_custom_info_column_to__.py deleted file mode 100644 index 2521d35..0000000 --- a/alembic/versions/25b86f5ac2d9_add_custom_info_column_to__.py +++ /dev/null @@ -1,56 +0,0 @@ -"""Add custom info column to _basicsubmission - -Revision ID: 25b86f5ac2d9 -Revises: 0746f7e2c10e -Create Date: 2024-09-24 09:09:15.223556 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '25b86f5ac2d9' -down_revision = '0746f7e2c10e' -branch_labels = None -depends_on = None - - -def upgrade() -> None: - # ### commands auto generated by Alembic - please adjust! 
### - with op.batch_alter_table('_basicsubmission', schema=None) as batch_op: - batch_op.add_column(sa.Column('custom', sa.JSON(), nullable=True)) - - # with op.batch_alter_table('_process', schema=None) as batch_op: - # batch_op.create_unique_constraint(None, ['name']) - # - # with op.batch_alter_table('_submissionsampleassociation', schema=None) as batch_op: - # batch_op.create_unique_constraint(None, ['id']) - - with op.batch_alter_table('_wastewaterarticassociation', schema=None) as batch_op: - batch_op.alter_column('source_plate', - existing_type=sa.VARCHAR(length=16), - type_=sa.String(length=32), - existing_nullable=True) - - # ### end Alembic commands ### - - -def downgrade() -> None: - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('_wastewaterarticassociation', schema=None) as batch_op: - batch_op.alter_column('source_plate', - existing_type=sa.String(length=32), - type_=sa.VARCHAR(length=16), - existing_nullable=True) - - # with op.batch_alter_table('_submissionsampleassociation', schema=None) as batch_op: - # batch_op.drop_constraint(None, type_='unique') - # - # with op.batch_alter_table('_process', schema=None) as batch_op: - # batch_op.drop_constraint(None, type_='unique') - - with op.batch_alter_table('_basicsubmission', schema=None) as batch_op: - batch_op.drop_column('custom') - - # ### end Alembic commands ### diff --git a/src/submissions/backend/db/models/kits.py b/src/submissions/backend/db/models/kits.py index c76701f..ff0966f 100644 --- a/src/submissions/backend/db/models/kits.py +++ b/src/submissions/backend/db/models/kits.py @@ -94,6 +94,14 @@ equipment_tips = Table( extend_existing=True ) +kittypes_submissions = Table( + "_kittypes_submissions", + Base.metadata, + Column("_basicsubmission_id", INTEGER, ForeignKey("_basicsubmission.id")), + Column("kittype_id", INTEGER, ForeignKey("_kittype.id")), + extend_existing=True +) + class KitType(BaseClass): """ @@ -104,7 +112,8 @@ class KitType(BaseClass): id = Column(INTEGER, primary_key=True) #: primary key name = Column(String(64), unique=True) #: name of kit - submissions = relationship("BasicSubmission", back_populates="extraction_kit") #: submissions this kit was used for + submissions = relationship("BasicSubmission", back_populates="kittypes", + secondary=kittypes_submissions) #: submissions this kit was used for processes = relationship("Process", back_populates="kit_types", secondary=kittypes_processes) #: equipment processes used by this kit @@ -1039,9 +1048,9 @@ class SubmissionType(BaseClass): } """ - submissiontype_kit_associations = relationship( - "SubmissionTypeKitTypeAssociation", - back_populates="submission_type", + runtype_kit_associations = relationship( + "RunTypeKitTypeAssociation", + back_populates="runtype", cascade="all, delete-orphan", ) #: Association of kittypes diff --git a/src/submissions/backend/db/models/organizations.py b/src/submissions/backend/db/models/organizations.py index e56eaa6..3beab4a 100644 --- a/src/submissions/backend/db/models/organizations.py +++ b/src/submissions/backend/db/models/organizations.py @@ -31,7 +31,7 @@ class Organization(BaseClass): id = Column(INTEGER, primary_key=True) #: primary key name = Column(String(64)) #: organization name - submissions = relationship("BasicSubmission", + submissions = relationship("ClientSubmission", back_populates="submitting_lab") #: submissions this organization has submitted cost_centre = Column(String()) #: cost centre used by org for payment contacts = relationship("Contact", 
back_populates="organization", @@ -103,7 +103,7 @@ class Contact(BaseClass): phone = Column(String(32)) #: contact phone number organization = relationship("Organization", back_populates="contacts", uselist=True, secondary=orgs_contacts) #: relationship to joined organization - submissions = relationship("BasicSubmission", back_populates="contact") #: submissions this contact has submitted + submissions = relationship("ClientSubmission", back_populates="contact") #: submissions this contact has submitted @classproperty def searchables(cls): diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py index ed39afa..1827288 100644 --- a/src/submissions/backend/db/models/submissions.py +++ b/src/submissions/backend/db/models/submissions.py @@ -15,8 +15,9 @@ from operator import itemgetter from pprint import pformat from pandas import DataFrame from sqlalchemy.ext.hybrid import hybrid_property -from . import BaseClass, Reagent, SubmissionType, KitType, Organization, Contact, LogMixin, SubmissionReagentAssociation -from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, JSON, FLOAT, case, func +from . import Base, BaseClass, Reagent, SubmissionType, KitType, Organization, Contact, LogMixin, \ + SubmissionReagentAssociation +from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, JSON, FLOAT, case, func, Table from sqlalchemy.orm import relationship, validates, Query from sqlalchemy.orm.attributes import flag_modified from sqlalchemy.ext.associationproxy import association_proxy @@ -34,46 +35,37 @@ from jinja2.exceptions import TemplateNotFound from jinja2 import Template from PIL import Image +from . import kittypes_submissions + logger = logging.getLogger(f"submissions.{__name__}") -class BasicSubmission(BaseClass, LogMixin): +class ClientSubmission(BaseClass, LogMixin): """ - Concrete of basic submission which polymorphs into BacterialCulture and Wastewater + Object for the client submission from which all run objects will be created. """ id = Column(INTEGER, primary_key=True) #: primary key - rsl_plate_num = Column(String(32), unique=True, nullable=False) #: RSL name (e.g. RSL-22-0012) submitter_plate_num = Column(String(127), unique=True) #: The number given to the submission by the submitting lab submitted_date = Column(TIMESTAMP) #: Date submission received submitting_lab = relationship("Organization", back_populates="submissions") #: client org submitting_lab_id = Column(INTEGER, ForeignKey("_organization.id", ondelete="SET NULL", name="fk_BS_sublab_id")) #: client lab id from _organizations - sample_count = Column(INTEGER) #: Number of samples in the submission - extraction_kit = relationship("KitType", back_populates="submissions") #: The extraction kit used - extraction_kit_id = Column(INTEGER, ForeignKey("_kittype.id", ondelete="SET NULL", - name="fk_BS_extkit_id")) #: id of joined extraction kit - submission_type_name = Column(String, ForeignKey("_submissiontype.name", ondelete="SET NULL", - name="fk_BS_subtype_name")) #: name of joined submission type - technician = Column(String(64)) #: initials of processing tech(s) - reagents_id = Column(String, ForeignKey("_reagent.id", ondelete="SET NULL", - name="fk_BS_reagents_id")) #: id of used reagents - extraction_info = Column(JSON) #: unstructured output from the extraction table logger. - run_cost = Column( - FLOAT(2)) #: total cost of running the plate. Set from constant and mutable kit costs at time of creation. 
-    signed_by = Column(String(32))  #: user name of person who submitted the submission to the database.
-    comment = Column(JSON)  #: user notes
-    submission_category = Column(
+    _submission_category = Column(
         String(64))  #: ["Research", "Diagnostic", "Surveillance", "Validation"], else defaults to submission_type_name
-    cost_centre = Column(
-        String(64))  #: Permanent storage of used cost centre in case organization field changed in the future.
+    sample_count = Column(INTEGER)  #: Number of samples in the submission
+
+    runs = relationship("BasicSubmission", back_populates="client_submission")  #: one-to-many: runs created from this submission ("BasicRun" does not exist yet)
+
     contact = relationship("Contact", back_populates="submissions")  #: client org
     contact_id = Column(INTEGER, ForeignKey("_contact.id", ondelete="SET NULL",
                                             name="fk_BS_contact_id"))  #: client lab id from _organizations
+    submission_type = relationship("SubmissionType", back_populates="instances")  #: joined submission type
+    submission_type_name = Column(String, ForeignKey("_submissiontype.name", ondelete="SET NULL",
+                                                     name="fk_BS_subtype_name"))  #: name of joined submission type
+
+    cost_centre = Column(
+        String(64))  #: Permanent storage of used cost centre in case organization field changed in the future.
 
     submission_sample_associations = relationship(
         "SubmissionSampleAssociation",
@@ -85,6 +77,73 @@ class BasicSubmission(BaseClass, LogMixin):
         "sample", creator=lambda sample: SubmissionSampleAssociation(
             sample=sample))  #: Association proxy to SubmissionSampleAssociation.samples
 
+    @hybrid_property
+    def submission_category(self):
+        return self._submission_category
+
+    @submission_category.setter
+    def submission_category(self, submission_category):
+        if submission_category in ["Research", "Diagnostic", "Surveillance", "Validation"]:
+            self._submission_category = submission_category
+        else:
+            try:
+                self._submission_category = self.submission_type_name
+            except AttributeError:
+                self._submission_category = "NA"
+
+
+class BasicSubmission(BaseClass, LogMixin):
+    """
+    Object for an entire submission run. Links to client submissions, reagents, equipment and processes.
+    """
+
+    id = Column(INTEGER, primary_key=True)  #: primary key
+    rsl_plate_number = Column(String(32), unique=True, nullable=False)  #: RSL name (e.g. RSL-22-0012)
+    client_submission = relationship("ClientSubmission", back_populates="runs")
+    # submitter_plate_num = Column(String(127), unique=True)  #: The number given to the submission by the submitting lab
+    started_date = Column(TIMESTAMP)  #: Date this run was started.
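+    # NOTE: Assumed foreign key backing the client_submission relationship above; the
+    # "_clientsubmission" table name presumes BaseClass's usual naming convention (cf. "_basicsubmission").
+    client_submission_id = Column(INTEGER, ForeignKey("_clientsubmission.id", ondelete="SET NULL",
+                                                      name="fk_BR_clientsub_id"))  #: id of originating client submission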
+    # submitting_lab = relationship("Organization", back_populates="submissions")  #: client org
+    # submitting_lab_id = Column(INTEGER, ForeignKey("_organization.id", ondelete="SET NULL",
+    #                                                name="fk_BS_sublab_id"))  #: client lab id from _organizations
+    # sample_count = Column(INTEGER)  #: Number of samples in the submission
+    # kittypes = relationship("KitType", back_populates="runs")  #: The extraction kit used
+    # kittype_id = Column(INTEGER, ForeignKey("_kittype.id", ondelete="SET NULL",
+    #                                         name="fk_BS_extkit_id"))  #: id of joined extraction kit
+    # submission_type_name = Column(String, ForeignKey("_submissiontype.name", ondelete="SET NULL",
+    #                                                  name="fk_BS_subtype_name"))  #: name of joined submission type
+    technician = Column(JSON)  #: name of processing tech(s)
+    # reagents_id = Column(String, ForeignKey("_reagent.id", ondelete="SET NULL",
+    #                                         name="fk_BS_reagents_id"))  #: id of used reagents
+    misc_info = Column(JSON)  #: unstructured output for overflow info.
+    run_cost = Column(
+        FLOAT(2))  #: total cost of running the plate. Set from constant and mutable kit costs at time of creation.
+    signed_by = Column(String(32))  #: user name of person who submitted the submission to the database.
+    comment = Column(JSON)  #: user notes
+    # submission_category = Column(
+    #     String(64))  #: ["Research", "Diagnostic", "Surveillance", "Validation"], else defaults to submission_type_name
+    # cost_centre = Column(
+    #     String(64))  #: Permanent storage of used cost centre in case organization field changed in the future.
+    # contact = relationship("Contact", back_populates="submissions")  #: client org
+    # contact_id = Column(INTEGER, ForeignKey("_contact.id", ondelete="SET NULL",
+    #                                         name="fk_BS_contact_id"))  #: client lab id from _organizations
+    custom = Column(JSON)
+    controls = relationship("Control", back_populates="submission",
+                            uselist=True)  #: A control sample added to submission
+    completed_date = Column(TIMESTAMP)
+
+    kittypes = relationship("KitType", back_populates="submissions",
+                            secondary=kittypes_submissions)  #: kits used for this run
+
+    # submission_sample_associations = relationship(
+    #     "SubmissionSampleAssociation",
+    #     back_populates="submission",
+    #     cascade="all, delete-orphan",
+    # )  #: Relation to SubmissionSampleAssociation
+    #
+    # samples = association_proxy("submission_sample_associations",
+    #                             "sample", creator=lambda sample: SubmissionSampleAssociation(
+    #         sample=sample))  #: Association proxy to SubmissionSampleAssociation.samples
+
     submission_reagent_associations = relationship(
         "SubmissionReagentAssociation",
         back_populates="submission",
@@ -112,18 +171,18 @@ class BasicSubmission(BaseClass, LogMixin):
         "tips")
 
     # NOTE: Allows for subclassing into ex. BacterialCulture, Wastewater, etc.
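+    # NOTE: With the mapper args below commented out, BasicSubmission no longer polymorphs into
+    # its Wastewater/BacterialCulture subclasses; presumably to be revisited once the run and
+    # procedure objects from TODO.md are in place.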
-    __mapper_args__ = {
-        "polymorphic_identity": "Basic Submission",
-        "polymorphic_on": case(
-
-            (submission_type_name == "Wastewater", "Wastewater"),
-            (submission_type_name == "Wastewater Artic", "Wastewater Artic"),
-            (submission_type_name == "Bacterial Culture", "Bacterial Culture"),
-
-            else_="Basic Submission"
-        ),
-        "with_polymorphic": "*",
-    }
+    # __mapper_args__ = {
+    #     "polymorphic_identity": "Basic Submission",
+    #     "polymorphic_on": case(
+    #
+    #         (submission_type_name == "Wastewater", "Wastewater"),
+    #         (submission_type_name == "Wastewater Artic", "Wastewater Artic"),
+    #         (submission_type_name == "Bacterial Culture", "Bacterial Culture"),
+    #
+    #         else_="Basic Submission"
+    #     ),
+    #     "with_polymorphic": "*",
+    # }
 
     def __repr__(self) -> str:
         return f"<Submission({self.rsl_plate_num})>"
 
@@ -1437,956 +1496,957 @@ class BasicSubmission(BaseClass, LogMixin):
 
 # NOTE: Below are the custom submission types
 
-class BacterialCulture(BasicSubmission):
-    """
-    derivative submission type from BasicSubmission
-    """
-    id = Column(INTEGER, ForeignKey('_basicsubmission.id'), primary_key=True)
-
-    __mapper_args__ = dict(polymorphic_identity="Bacterial Culture",
-                           polymorphic_load="inline",
-                           inherit_condition=(id == BasicSubmission.id))
-
-    def to_dict(self, full_data: bool = False, backup: bool = False, report: bool = False) -> dict:
-        """
-        Extends parent class method to add controls to dict
-
-        Returns:
-            dict: dictionary used in submissions summary
-        """
-        output = super().to_dict(full_data=full_data, backup=backup, report=report)
-        if report:
-            return output
-        if full_data:
-            output['controls'] = [item.to_sub_dict() for item in self.controls]
-        return output
-
-    @classmethod
-    def filename_template(cls):
-        """
-        extends parent
-        """
-        template = super().filename_template()
-        template += "_{{ submitting_lab.name }}_{{ submitter_plate_num }}"
-        return template
-
-    @classmethod
-    def custom_validation(cls, pyd) -> "PydSubmission":
-        """
-        Extends parent. Currently finds control sample and adds to reagents.
-
-        Args:
-            input_dict (dict): _description_
-            xl (pd.ExcelFile | None, optional): _description_. Defaults to None.
-            info_map (dict | None, optional): _description_. Defaults to None.
-
-        Returns:
-            PydSubmission: Updated pydantic.
-        """
-        from . import ControlType
-        pyd = super().custom_validation(pyd)
-        # NOTE: build regex for all control types that have targets
-        regex = ControlType.build_positive_regex(control_type="Irida Control")
-        # NOTE: search samples for match
-        for sample in pyd.samples:
-            matched = regex.match(sample.submitter_id)
-            if bool(matched):
-                new_lot = matched.group()
-                try:
-                    pos_control_reg = \
-                        next(reg for reg in pyd.reagents if reg.role == "Bacterial-Positive Control")
-                except StopIteration:
-                    logger.error(f"No positive control reagent listed")
-                    return pyd
-                pos_control_reg.lot = new_lot
-                pos_control_reg.missing = False
-        return pyd
-
-    @classmethod
-    def custom_info_parser(cls, input_dict: dict, xl: Workbook | None = None, custom_fields: dict = {}) -> dict:
-        """
-        Performs class specific info parsing before info parsing is finalized.
-
-        Args:
-            input_dict (dict): Generic input info
-            xl (Workbook | None, optional): Original xl workbook. Defaults to None.
-            custom_fields (dict, optional): Map of custom fields to be parsed. Defaults to {}.
-
-        Returns:
-            dict: Updated info dictionary.
- """ - input_dict = super().custom_info_parser(input_dict=input_dict, xl=xl, custom_fields=custom_fields) - return input_dict - - def custom_context_events(self) -> dict: - """ - Sets context events for main widget - - Returns: - dict: Context menu items for this instance. - """ - events = super().custom_context_events() - events['Import Concentration'] = self.import_concentration - return events - - @report_result - def import_concentration(self, obj) -> Report: - from frontend.widgets import select_open_file - from backend.excel.parser import ConcentrationParser - report = Report() - fname = select_open_file(obj=obj, file_extension="xlsx") - if not fname: - report.add_result(Result(msg="No file selected, cancelling.", status="Warning")) - return report - parser = ConcentrationParser(filepath=fname, submission=self) - conc_samples = [sample for sample in parser.samples] - # logger.debug(f"Concentration samples: {pformat(conc_samples)}") - for sample in self.samples: - # logger.debug(f"Sample {sample.submitter_id}") - # logger.debug(f"Match {item['submitter_id']}") - try: - # NOTE: Fix for ENs which have no rsl_number... - sample_dict = next(item for item in conc_samples if str(item['submitter_id']).upper() == sample.submitter_id) - except StopIteration: - logger.error(f"Couldn't find sample dict for {sample.submitter_id}") - continue - logger.debug(f"Sample {sample.submitter_id} conc. = {sample_dict['concentration']}") - if sample_dict['concentration']: - sample.concentration = sample_dict['concentration'] - else: - continue - sample.save() - # logger.debug(conc_samples) - return report - - @classmethod - def parse_concentration(cls, xl: Workbook, rsl_plate_num: str) -> Generator[dict, None, None]: - lookup_table = cls.get_submission_type().sample_map['lookup_table'] - logger.debug(lookup_table) - main_sheet = xl[lookup_table['sheet']] - for row in main_sheet.iter_rows(min_row=lookup_table['start_row'], max_row=lookup_table['end_row']): - idx = row[0].row - sample = dict( - submitter_id=main_sheet.cell(row=idx, column=lookup_table['sample_columns']['submitter_id']).value) - sample['concentration'] = main_sheet.cell(row=idx, - column=lookup_table['sample_columns']['concentration']).value - yield sample - - # def get_provisional_controls(self, controls_only: bool = True): - def get_provisional_controls(self, include: List[str] = []): - # NOTE To ensure Samples are done last. 
- include = sorted(include) - # logger.debug(include) - pos_str = "(ATCC)|(MCS)" - pos_regex = re.compile(rf"^{pos_str}") - neg_str = "(EN)" - neg_regex = re.compile(rf"^{neg_str}") - output = [] - for item in include: - match item: - case "Positive": - if self.controls: - provs = (control.sample for control in self.controls if control.is_positive_control) - else: - provs = (sample for sample in self.samples if bool(pos_regex.match(sample.submitter_id))) - case "Negative": - if self.controls: - provs = (control.sample for control in self.controls if not control.is_positive_control) - else: - provs = (sample for sample in self.samples if bool(neg_regex.match(sample.submitter_id))) - case _: - provs = (sample for sample in self.samples if not sample.control and sample not in output) - for prov in provs: - # logger.debug(f"Prov: {prov}") - prov.submission = self.rsl_plate_num - prov.submitted_date = self.submitted_date - output.append(prov) - return output - - -class Wastewater(BasicSubmission): - """ - derivative submission type from BasicSubmission - """ - id = Column(INTEGER, ForeignKey('_basicsubmission.id'), primary_key=True, autoincrement=False) - ext_technician = Column(String(64)) #: Name of technician doing extraction - pcr_technician = Column(String(64)) #: Name of technician doing pcr - pcr_info = Column(JSON) #: unstructured output from pcr table logger or user(Artic) - - __mapper_args__ = __mapper_args__ = dict(polymorphic_identity="Wastewater", - polymorphic_load="inline", - inherit_condition=(id == BasicSubmission.id)) - - def to_dict(self, full_data: bool = False, backup: bool = False, report: bool = False) -> dict: - """ - Extends parent class method to add controls to dict - - Returns: - dict: dictionary used in submissions summary - """ - output = super().to_dict(full_data=full_data, backup=backup, report=report) - if report: - return output - try: - output['pcr_info'] = self.pcr_info - except TypeError as e: - pass - if self.ext_technician is None or self.ext_technician == "None": - output['ext_technician'] = self.technician - else: - output["ext_technician"] = self.ext_technician - if self.pcr_technician is None or self.pcr_technician == "None": - output["pcr_technician"] = self.technician - else: - output['pcr_technician'] = self.pcr_technician - if full_data: - output['samples'] = [sample for sample in output['samples']] - dummy_samples = [] - for item in output['samples']: - thing = deepcopy(item) - try: - thing['row'] = thing['source_row'] - thing['column'] = thing['source_column'] - except KeyError: - logger.error(f"No row or column for sample: {item['submitter_id']}") - continue - thing['tooltip'] = f"Sample Name: {thing['name']}\nWell: {thing['sample_location']}" - dummy_samples.append(thing) - # logger.debug(f"Dummy samples for 24 well: {pformat(dummy_samples)}") - output['origin_plate'] = self.__class__.make_plate_map(sample_list=dummy_samples, plate_rows=4, - plate_columns=6) - # logger.debug(f"PCR info: {output['pcr_info']}") - return output - - @classmethod - def custom_info_parser(cls, input_dict: dict, xl: Workbook | None = None, custom_fields: dict = {}) -> dict: - """ - Update submission dictionary with class specific information. Extends parent - - Args: - input_dict (dict): Input sample dictionary - xl (Workbook): xl (Workbook): original xl workbook, used for child classes mostly. 
- custom_fields: Dictionary of locations, ranges, etc to be used by this function - - Returns: - dict: Updated info dictionary - """ - input_dict = super().custom_info_parser(input_dict) - if xl is not None: - try: - input_dict['csv'] = xl["Copy to import file"] - except KeyError as e: - logger.error(e) - try: - match input_dict['rsl_plate_num']: - case dict(): - input_dict['csv'] = xl[input_dict['rsl_plate_num']['value']] - case str(): - input_dict['csv'] = xl[input_dict['rsl_plate_num']] - case _: - pass - except Exception as e: - logger.error(f"Error handling couldn't get csv due to: {e}") - return input_dict - - @classmethod - def parse_samples(cls, input_dict: dict) -> dict: - """ - Update sample dictionary with type specific information. Extends parent - - Args: - input_dict (dict): Input sample dictionary - - Returns: - dict: Updated sample dictionary - """ - input_dict = super().parse_samples(input_dict=input_dict) - # NOTE: Had to put in this section due to ENs not having rsl_number and therefore not getting PCR results. - check = check_key_or_attr("rsl_number", input_dict) - if not check: - input_dict['rsl_number'] = input_dict['submitter_id'] - # logger.debug(pformat(input_dict, indent=4)) - return input_dict - - @classmethod - def parse_pcr(cls, xl: Workbook, rsl_plate_num: str) -> Generator[dict, None, None]: - """ - Perform parsing of pcr info. Since most of our PC outputs are the same format, this should work for most. - - Args: - xl (pd.DataFrame): pcr info form - rsl_plate_number (str): rsl plate num of interest - - Returns: - Generator[dict, None, None]: Updated samples - """ - samples = [item for item in super().parse_pcr(xl=xl, rsl_plate_num=rsl_plate_num)] - # NOTE: Due to having to run through samples in for loop we need to convert to list. - # NOTE: Also, you can't change the size of a list while iterating it, so don't even think about it. - output = [] - for sample in samples: - logger.debug(sample) - # NOTE: remove '-{target}' from controls - sample['sample'] = re.sub('-N\\d*$', '', sample['sample']) - # NOTE: if sample is already in output skip - if sample['sample'] in [item['sample'] for item in output]: - logger.warning(f"Already have {sample['sample']}") - continue - # NOTE: Set ct values - # logger.debug(f"Sample ct: {sample['ct']}") - sample[f"ct_{sample['target'].lower()}"] = sample['ct'] if isinstance(sample['ct'], float) else 0.0 - # NOTE: Set assessment - # logger.debug(f"Sample assessemnt: {sample['assessment']}") - # NOTE: Get sample having other target - other_targets = [s for s in samples if re.sub('-N\\d*$', '', s['sample']) == sample['sample']] - for s in other_targets: - sample[f"ct_{s['target'].lower()}"] = s['ct'] if isinstance(s['ct'], float) else 0.0 - try: - del sample['ct'] - except KeyError: - pass - try: - del sample['assessment'] - except KeyError: - pass - # logger.debug(sample) - row = int(row_keys[sample['well'][:1]]) - column = int(sample['well'][1:]) - sample['row'] = row - sample['column'] = column - output.append(sample) - # NOTE: And then convert back to list to keep fidelity with parent method. - for sample in output: - yield sample - - @classmethod - def enforce_name(cls, instr: str, data: dict | None = {}) -> str: - """ - Custom naming method for this class. Extends parent. - - Args: - instr (str): Initial name. - data (dict | None, optional): Additional parameters for name. Defaults to None. - - Returns: - str: Updated name. - """ - try: - # NOTE: Deal with PCR file. 
- instr = re.sub(r"PCR(-|_)", "", instr) - except (AttributeError, TypeError) as e: - logger.error(f"Problem using regex: {e}") - outstr = super().enforce_name(instr=instr, data=data) - return outstr - - @classmethod - def adjust_autofill_samples(cls, samples: List[Any]) -> List[Any]: - """ - Makes adjustments to samples before writing to excel. Extends parent. - - Args: - samples (List[Any]): List of Samples - - Returns: - List[Any]: Updated list of samples - """ - samples = super().adjust_autofill_samples(samples) - samples = [item for item in samples if not item.submitter_id.startswith("EN")] - return samples - - @classmethod - def get_details_template(cls, base_dict: dict) -> Tuple[dict, Template]: - """ - Get the details jinja template for the correct class. Extends parent - - Args: - base_dict (dict): incoming dictionary of Submission fields - - Returns: - Tuple[dict, Template]: (Updated dictionary, Template to be rendered) - """ - base_dict, template = super().get_details_template(base_dict=base_dict) - base_dict['excluded'] += ['origin_plate'] - return base_dict, template - - def custom_context_events(self) -> dict: - """ - Sets context events for main widget - - Returns: - dict: Context menu items for this instance. - """ - events = super().custom_context_events() - events['Link PCR'] = self.link_pcr - return events - - @report_result - def link_pcr(self, obj) -> Report: - """ - PYQT6 function to add PCR info to this submission - - Args: - obj (_type_): Parent widget - """ - from backend.excel import PCRParser - from backend.db import PCRControl, ControlType - from frontend.widgets import select_open_file - report = Report() - fname = select_open_file(obj=obj, file_extension="xlsx") - if not fname: - report.add_result(Result(msg="No file selected, cancelling.", status="Warning")) - return report - parser = PCRParser(filepath=fname, submission=self) - self.set_attribute("pcr_info", parser.pcr_info) - # NOTE: These are generators here, need to expand. - pcr_samples = sorted([sample for sample in parser.samples], key=itemgetter('column')) - logger.debug(f"Samples from parser: {pformat(pcr_samples)}") - # NOTE: Samples from parser check out. - pcr_controls = [control for control in parser.controls] - self.save(original=False) - for assoc in self.submission_sample_associations: - logger.debug(f"Checking pcr_samples for {assoc.sample.rsl_number}, {assoc.sample.ww_full_sample_id} at " - f"column {assoc.column} and row {assoc.row}") - # NOTE: Associations of interest do exist in the submission, are not being found below - # Okay, I've found the problem, at last, the problem is that only one RSL number is saved for each sample, - # Even though each instance of say "25-YUL13-PU3-0320" has multiple RSL numbers in the excel sheet. - # so, yeah, the submitters need to make sure that there are unique values for each one. 
- try: - sample_dict = next(item for item in pcr_samples if item['sample'] == assoc.sample.rsl_number - and item['row'] == assoc.row and item['column'] == assoc.column) - logger.debug( - f"Found sample {sample_dict} at index {pcr_samples.index(sample_dict)}: {pcr_samples[pcr_samples.index(sample_dict)]}") - except StopIteration: - logger.error(f"Couldn't find {assoc} in the Parser samples") - continue - logger.debug(f"Length of pcr_samples: {len(pcr_samples)}") - assoc = self.update_subsampassoc(assoc=assoc, input_dict=sample_dict) - result = assoc.save() - if result: - report.add_result(result) - controltype = ControlType.query(name="PCR Control") - submitted_date = datetime.strptime(" ".join(parser.pcr_info['run_start_date/time'].split(" ")[:-1]), - "%Y-%m-%d %I:%M:%S %p") - for control in pcr_controls: - # logger.debug(f"Control coming into save: {control}") - new_control = PCRControl(**control) - new_control.submitted_date = submitted_date - new_control.controltype = controltype - new_control.submission = self - # logger.debug(f"Control coming into save: {new_control.__dict__}") - new_control.save() - return report - - -class WastewaterArtic(BasicSubmission): - """ - derivative submission type for artic wastewater - """ - id = Column(INTEGER, ForeignKey('_basicsubmission.id'), primary_key=True) - artic_technician = Column(String(64)) #: Name of technician performing artic - dna_core_submission_number = Column(String(64)) #: Number used by core as id - pcr_info = Column(JSON) #: unstructured output from pcr table logger or user(Artic) - gel_image = Column(String(64)) #: file name of gel image in zip file - gel_info = Column(JSON) #: unstructured data from gel. - gel_controls = Column(JSON) #: locations of controls on the gel - source_plates = Column(JSON) #: wastewater plates that samples come from - artic_date = Column(TIMESTAMP) #: Date Artic Performed - ngs_date = Column(TIMESTAMP) #: Date submission received - gel_date = Column(TIMESTAMP) #: Date submission received - gel_barcode = Column(String(16)) #: Identifier for the used gel. 
- - __mapper_args__ = dict(polymorphic_identity="Wastewater Artic", - polymorphic_load="inline", - inherit_condition=(id == BasicSubmission.id)) - - def to_dict(self, full_data: bool = False, backup: bool = False, report: bool = False) -> dict: - """ - Extends parent class method to add controls to dict - - Returns: - dict: dictionary used in submissions summary - """ - output = super().to_dict(full_data=full_data, backup=backup, report=report) - if report: - return output - if self.artic_technician in [None, "None"]: - output['artic_technician'] = self.technician - else: - output['artic_technician'] = self.artic_technician - output['gel_info'] = self.gel_info - output['gel_image'] = self.gel_image - output['dna_core_submission_number'] = self.dna_core_submission_number - output['source_plates'] = self.source_plates - output['artic_date'] = self.artic_date or self.submitted_date - output['ngs_date'] = self.ngs_date or self.submitted_date - output['gel_date'] = self.gel_date or self.submitted_date - output['gel_barcode'] = self.gel_barcode - return output - - @classmethod - def custom_info_parser(cls, input_dict: dict, xl: Workbook | None = None, custom_fields: dict = {}) -> dict: - """ - Update submission dictionary with class specific information - - Args: - input_dict (dict): Input sample dictionary - xl (pd.ExcelFile): original xl workbook, used for child classes mostly - custom_fields: Dictionary of locations, ranges, etc to be used by this function - - Returns: - dict: Updated sample dictionary - """ - from backend.validators import RSLNamer - from openpyxl_image_loader.sheet_image_loader import SheetImageLoader - - def scrape_image(wb: Workbook, info_dict: dict) -> Image or None: - """ - Pulls image from excel workbook - - Args: - wb (Workbook): Workbook of interest. - info_dict (dict): Location map. - - Returns: - Image or None: Image of interest. - """ - ws = wb[info_dict['sheet']] - img_loader = SheetImageLoader(ws) - for ii in range(info_dict['start_row'], info_dict['end_row'] + 1): - for jj in range(info_dict['start_column'], info_dict['end_column'] + 1): - cell_str = f"{row_map[jj]}{ii}" - if img_loader.image_in(cell_str): - try: - return img_loader.get(cell_str) - except ValueError as e: - logger.error(f"Could not open image from cell: {cell_str} due to {e}") - return None - return None - - input_dict = super().custom_info_parser(input_dict) - input_dict['submission_type'] = dict(value="Wastewater Artic", missing=False) - egel_section = custom_fields['egel_controls'] - ws = xl[egel_section['sheet']] - # NOTE: Here we should be scraping the control results. 
- data = [ws.cell(row=ii, column=jj) for jj in range(egel_section['start_column'], egel_section['end_column'] + 1) - for - ii in range(egel_section['start_row'], egel_section['end_row'] + 1)] - data = [cell for cell in data if cell.value is not None and "NTC" in cell.value] - input_dict['gel_controls'] = [ - dict(sample_id=cell.value, location=f"{row_map[cell.row - 9]}{str(cell.column - 14).zfill(2)}") for cell in - data] - # NOTE: Get source plate information - source_plates_section = custom_fields['source_plates'] - ws = xl[source_plates_section['sheet']] - data = [dict(plate=ws.cell(row=ii, column=source_plates_section['plate_column']).value, - starting_sample=ws.cell(row=ii, column=source_plates_section['starting_sample_column']).value) for - ii in - range(source_plates_section['start_row'], source_plates_section['end_row'] + 1)] - for datum in data: - if datum['plate'] in ["None", None, ""]: - continue - else: - datum['plate'] = RSLNamer(filename=datum['plate'], submission_type="Wastewater").parsed_name - if xl is not None: - try: - input_dict['csv'] = xl["hitpicks_csv_to_export"] - except KeyError as e: - logger.error(e) - try: - match input_dict['rsl_plate_num']: - case dict(): - input_dict['csv'] = xl[input_dict['rsl_plate_num']['value']] - case str(): - input_dict['csv'] = xl[input_dict['rsl_plate_num']] - case _: - pass - except Exception as e: - logger.error(f"Error handling couldn't get csv due to: {e}") - input_dict['source_plates'] = data - egel_info_section = custom_fields['egel_info'] - ws = xl[egel_info_section['sheet']] - data = [] - for ii in range(egel_info_section['start_row'], egel_info_section['end_row'] + 1): - datum = dict( - name=ws.cell(row=ii, column=egel_info_section['start_column'] - 3).value, - values=[] - ) - for jj in range(egel_info_section['start_column'], egel_info_section['end_column'] + 1): - d = dict( - name=ws.cell(row=egel_info_section['start_row'] - 1, column=jj).value, - value=ws.cell(row=ii, column=jj).value - ) - if d['value'] is not None: - datum['values'].append(d) - data.append(datum) - input_dict['gel_info'] = data - egel_image_section = custom_fields['image_range'] - img: Image = scrape_image(wb=xl, info_dict=egel_image_section) - if img is not None: - tmp = Path(TemporaryFile().name).with_suffix(".jpg") - img.save(tmp.__str__()) - with ZipFile(cls.__directory_path__.joinpath("submission_imgs.zip"), 'a') as zipf: - # NOTE: Add a file located at the source_path to the destination within the zip - # file. It will overwrite existing files if the names collide, but it - # will give a warning - zipf.write(tmp.__str__(), f"{input_dict['rsl_plate_num']['value']}.jpg") - input_dict['gel_image'] = f"{input_dict['rsl_plate_num']['value']}.jpg" - return input_dict - - @classmethod - def enforce_name(cls, instr: str, data: dict = {}) -> str: - """ - Custom naming method for this class. Extends parent. - - Args: - instr (str): Initial name. - data (dict | None, optional): Additional parameters for name. Defaults to None. - - Returns: - str: Updated name. - """ - logger.debug(f"Incoming String: {instr}") - try: - # NOTE: Deal with PCR file. 
- instr = re.sub(r"Artic", "", instr, flags=re.IGNORECASE) - except (AttributeError, TypeError) as e: - logger.error(f"Problem using regex: {e}") - try: - instr = instr.replace("-", "") - except AttributeError: - instr = date.today().strftime("%Y%m%d") - instr = re.sub(r"^(\d{6})", f"RSL-AR-\\1", instr) - outstr = super().enforce_name(instr=instr, data=data) - outstr = outstr.replace("RSLAR", "RSL-AR") - return outstr - - @classmethod - def parse_samples(cls, input_dict: dict) -> dict: - """ - Update sample dictionary with type specific information. Extends parent. - - Args: - input_dict (dict): Input sample dictionary - - Returns: - dict: Updated sample dictionary - """ - input_dict = super().parse_samples(input_dict) - input_dict['sample_type'] = "Wastewater Sample" - # NOTE: Stop gap solution because WW is sloppy with their naming schemes - try: - input_dict['source_plate'] = input_dict['source_plate'].replace("WW20", "WW-20") - except KeyError: - pass - try: - input_dict['source_plate_number'] = int(input_dict['source_plate_number']) - except (ValueError, KeyError): - input_dict['source_plate_number'] = 0 - # NOTE: Because generate_sample_object needs the submitter_id and the artic has the "({origin well})" at the end, this has to be done here. No moving to sqlalchemy object :( - input_dict['submitter_id'] = re.sub(r"\s\(.+\)\s?$", "", str(input_dict['submitter_id'])).strip() - try: - input_dict['ww_processing_num'] = input_dict['sample_name_(lims)'] - del input_dict['sample_name_(lims)'] - except KeyError: - logger.error(f"Unable to set ww_processing_num for sample {input_dict['submitter_id']}") - try: - input_dict['ww_full_sample_id'] = input_dict['sample_name_(ww)'] - del input_dict['sample_name_(ww)'] - except KeyError: - logger.error(f"Unable to set ww_processing_num for sample {input_dict['submitter_id']}") - year = str(date.today().year)[-2:] - # NOTE: Check for extraction negative control (Enterics) - if re.search(rf"^{year}-(ENC)", input_dict['submitter_id']): - input_dict['rsl_number'] = cls.en_adapter(input_str=input_dict['submitter_id']) - # NOTE: Check for extraction negative control (Robotics) - if re.search(rf"^{year}-(RSL)", input_dict['submitter_id']): - logger.debug(f"Found {year}-(RSL), so we are going to run PBS adapter:") - input_dict['rsl_number'] = cls.pbs_adapter(input_str=input_dict['submitter_id']) - return input_dict - - @classmethod - def en_adapter(cls, input_str: str) -> str: - """ - Stopgap solution because WW names their ENs different - - Args: - input_str (str): input name - - Returns: - str: output name - """ - processed = input_str.replace("RSL", "") - # NOTE: Remove anything in brackets at the end of string? - processed = re.sub(r"\(.*\)$", "", processed).strip() - # NOTE: Remove letters that are not R. - processed = re.sub(r"[A-QS-Z]+\d*", "", processed) - # NOTE: Remove trailing '-' if any - processed = processed.strip("-") - try: - # NOTE: get digit at the end of the string. - en_num = re.search(r"\-\d{1}$", processed).group() - processed = rreplace(processed, en_num, "") - except AttributeError: - en_num = "1" - en_num = en_num.strip("-") - try: - # NOTE: Get last digit and maybe 'R' with another digit. 
- plate_num = re.search(r"\-\d{1}R?\d?$", processed).group() - processed = rreplace(processed, plate_num, "") - except AttributeError: - plate_num = "1" - # NOTE: plate_num not currently used, but will keep incase it is in the future - plate_num = plate_num.strip("-") - day = re.search(r"\d{2}$", processed).group() - processed = rreplace(processed, day, "") - month = re.search(r"\d{2}$", processed).group() - processed = rreplace(processed, month, "") - processed = processed.replace("--", "") - year = re.search(r'^(?:\d{2})?\d{2}', processed).group() - year = f"20{year}" - final_en_name = f"EN{en_num}-{year}{month}{day}" - return final_en_name - - @classmethod - def pbs_adapter(cls, input_str): - """ - Stopgap solution because WW names their controls different - - Args: - input_str (str): input name - - Returns: - str: output name - """ - logger.debug(f"PBS adapter on {input_str}") - # NOTE: Remove letters. - processed = input_str.replace("RSL", "") - # logger.debug(processed) - # NOTE: Remove brackets at end - processed = re.sub(r"\(.*\)$", "", processed).strip() - # logger.debug(processed) - processed = re.sub(r"-RPT", "", processed, flags=re.IGNORECASE) - # NOTE: Remove any non-R letters at end. - processed = re.sub(r"[A-QS-Z]+\d*", "", processed) - # logger.debug(processed) - # NOTE: Remove trailing '-' if any - processed = processed.strip("-") - # logger.debug(processed) - try: - plate_num = re.search(r"\-\d{1}R?\d?$", processed).group() - processed = rreplace(processed, plate_num, "") - except AttributeError: - plate_num = "1" - plate_num = plate_num.strip("-") - try: - repeat_num = re.search(r"R(?P\d)?$", processed).groups()[0] - except: - repeat_num = None - if repeat_num is None and "R" in plate_num: - repeat_num = "1" - try: - plate_num = re.sub(r"R", rf"R{repeat_num}", plate_num) - except AttributeError: - logger.error(f"Problem re-evaluating plate number for {processed}") - # logger.debug(processed) - # NOTE: Remove any redundant -digits - processed = re.sub(r"-\d$", "", processed) - # logger.debug(processed) - day = re.search(r"\d{2}$", processed).group() - processed = rreplace(processed, day, "") - # logger.debug(processed) - month = re.search(r"\d{2}$", processed).group() - processed = rreplace(processed, month, "") - processed = processed.replace("--", "") - # logger.debug(processed) - year = re.search(r'^(?:\d{2})?\d{2}', processed).group() - year = f"20{year}" - # logger.debug(processed) - final_en_name = f"PBS{year}{month}{day}-{plate_num}" - return final_en_name - - @classmethod - def custom_validation(cls, pyd) -> dict: - """ - Performs any final custom parsing of the excel file. Extends parent - - Args: - input_dict (dict): Parser product up to this point. - xl (pd.ExcelFile | None, optional): Excel submission form. Defaults to None. - info_map (dict | None, optional): Map of information locations from SubmissionType. Defaults to None. - plate_map (dict | None, optional): Constructed plate map of samples. Defaults to None. - - Returns: - dict: Updated parser product. 
- """ - input_dict = super().custom_validation(pyd) - exclude_plates = [None, "", "none", "na"] - pyd.source_plates = [plate for plate in pyd.source_plates if plate['plate'].lower() not in exclude_plates] - for sample in pyd.samples: - if re.search(r"^NTC", sample.submitter_id): - if isinstance(pyd.rsl_plate_num, dict): - placeholder = pyd.rsl_plate_num['value'] - else: - placeholder = pyd.rsl_plate_num - sample.submitter_id = f"{sample.submitter_id}-WWG-{placeholder}" - return input_dict - - @classmethod - def custom_info_writer(cls, input_excel: Workbook, info: dict | None = None, backup: bool = False, - custom_fields: dict = {}) -> Workbook: - """ - Adds custom autofill methods for submission. Extends Parent - - Args: - input_excel (Workbook): initial workbook. - info (dict | None, optional): dictionary of additional info. Defaults to None. - backup (bool, optional): Whether this is part of a backup operation. Defaults to False. - custom_fields: Dictionary of locations, ranges, etc to be used by this function - - Returns: - Workbook: Updated workbook - """ - input_excel = super().custom_info_writer(input_excel, info, backup) - if isinstance(info, types.GeneratorType): - info = {k: v for k, v in info} - # NOTE: check for source plate information - if check_key_or_attr(key='source_plates', interest=info, check_none=True): - source_plates_section = custom_fields['source_plates'] - worksheet = input_excel[source_plates_section['sheet']] - start_row = source_plates_section['start_row'] - # NOTE: write source plates to First strand list - for iii, plate in enumerate(info['source_plates']['value']): - row = start_row + iii - try: - worksheet.cell(row=row, column=source_plates_section['plate_column'], value=plate['plate']) - except TypeError: - pass - try: - worksheet.cell(row=row, column=source_plates_section['starting_sample_column'], - value=plate['starting_sample']) - except TypeError: - pass - else: - logger.warning(f"No source plate info found.") - # NOTE: check for gel information - if check_key_or_attr(key='gel_info', interest=info, check_none=True): - egel_section = custom_fields['egel_info'] - # NOTE: print json field gel results to Egel results - worksheet = input_excel[egel_section['sheet']] - start_row = egel_section['start_row'] - 1 - start_column = egel_section['start_column'] - 3 - for row, ki in enumerate(info['gel_info']['value'], start=1): - row = start_row + row - worksheet.cell(row=row, column=start_column, value=ki['name']) - for jjj, kj in enumerate(ki['values'], start=1): - column = start_column + 2 + jjj - worksheet.cell(row=start_row, column=column, value=kj['name']) - try: - worksheet.cell(row=row, column=column, value=kj['value']) - except AttributeError: - logger.error(f"Failed {kj['name']} with value {kj['value']} to row {row}, column {column}") - else: - logger.warning("No gel info found.") - if check_key_or_attr(key='gel_image', interest=info, check_none=True): - worksheet = input_excel[egel_section['sheet']] - with ZipFile(cls.__directory_path__.joinpath("submission_imgs.zip")) as zipped: - z = zipped.extract(info['gel_image']['value'], Path(TemporaryDirectory().name)) - img = OpenpyxlImage(z) - img.height = 400 # insert image height in pixels as float or int (e.g. 
305.5) - img.width = 600 - img.anchor = egel_section['img_anchor'] - worksheet.add_image(img) - else: - logger.warning("No gel image found.") - return input_excel - - @classmethod - def custom_sample_writer(self, sample: dict) -> dict: - if sample['source_plate_number'] in [0, "0"]: - sample['source_plate_number'] = "control" - return sample - - @classmethod - def get_details_template(cls, base_dict: dict) -> Tuple[dict, Template]: - """ - Get the details jinja template for the correct class. Extends parent - - Args: - base_dict (dict): incoming dictionary of Submission fields - - Returns: - Tuple[dict, Template]: (Updated dictionary, Template to be rendered) - """ - base_dict, template = super().get_details_template(base_dict=base_dict) - base_dict['excluded'] += ['gel_info', 'gel_image', 'headers', "dna_core_submission_number", "source_plates", - "gel_controls, gel_image_path"] - base_dict['DNA Core ID'] = base_dict['dna_core_submission_number'] - if check_key_or_attr(key='gel_info', interest=base_dict, check_none=True): - headers = [item['name'] for item in base_dict['gel_info'][0]['values']] - base_dict['headers'] = [''] * (4 - len(headers)) - base_dict['headers'] += headers - if check_key_or_attr(key='gel_image', interest=base_dict, check_none=True): - with ZipFile(cls.__directory_path__.joinpath("submission_imgs.zip")) as zipped: - base_dict['gel_image_actual'] = base64.b64encode(zipped.read(base_dict['gel_image'])).decode('utf-8') - return base_dict, template - - def custom_context_events(self) -> dict: - """ - Creates dictionary of str:function to be passed to context menu. Extends parent - - Returns: - dict: dictionary of functions - """ - events = super().custom_context_events() - events['Gel Box'] = self.gel_box - return events - - def set_attribute(self, key: str, value): - """ - Performs custom attribute setting based on values. Extends parent - - Args: - key (str): name of attribute - value (_type_): value of attribute - """ - super().set_attribute(key=key, value=value) - if key == 'gel_info': - if len(self.gel_info) > 3: - self.gel_info = self.gel_info[-3:] - - def gel_box(self, obj): - """ - Creates PYQT6 widget to perform gel viewing operations - - Args: - obj (_type_): parent widget - """ - from frontend.widgets.gel_checker import GelBox - from frontend.widgets import select_open_file - report = Report() - fname = select_open_file(obj=obj, file_extension="jpg") - if not fname: - report.add_result(Result(msg="No file selected, cancelling.", status="Warning")) - return report - dlg = GelBox(parent=obj, img_path=fname, submission=self) - if dlg.exec(): - self.dna_core_submission_number, self.gel_barcode, img_path, output, comment = dlg.parse_form() - self.gel_image = img_path.name - self.gel_info = output - dt = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S") - com = dict(text=comment, name=getuser(), time=dt) - if com['text'] is not None and com['text'] != "": - if self.comment is not None: - self.comment.append(com) - else: - self.comment = [com] - with ZipFile(self.__directory_path__.joinpath("submission_imgs.zip"), 'a') as zipf: - # NOTE: Add a file located at the source_path to the destination within the zip - # file. 
It will overwrite existing files if the names collide, but it - # will give a warning - zipf.write(img_path, self.gel_image) - self.save() - +# class BacterialCulture(BasicSubmission): +# """ +# derivative submission type from BasicSubmission +# """ +# id = Column(INTEGER, ForeignKey('_basicsubmission.id'), primary_key=True) +# +# __mapper_args__ = dict(polymorphic_identity="Bacterial Culture", +# polymorphic_load="inline", +# inherit_condition=(id == BasicSubmission.id)) +# +# def to_dict(self, full_data: bool = False, backup: bool = False, report: bool = False) -> dict: +# """ +# Extends parent class method to add controls to dict +# +# Returns: +# dict: dictionary used in submissions summary +# """ +# output = super().to_dict(full_data=full_data, backup=backup, report=report) +# if report: +# return output +# if full_data: +# output['controls'] = [item.to_sub_dict() for item in self.controls] +# return output +# +# @classmethod +# def filename_template(cls): +# """ +# extends parent +# """ +# template = super().filename_template() +# template += "_{{ submitting_lab.name }}_{{ submitter_plate_num }}" +# return template +# +# @classmethod +# def custom_validation(cls, pyd) -> "PydSubmission": +# """ +# Extends parent. Currently finds control sample and adds to reagents. +# +# Args: +# input_dict (dict): _description_ +# xl (pd.ExcelFile | None, optional): _description_. Defaults to None. +# info_map (dict | None, optional): _description_. Defaults to None. +# +# Returns: +# PydSubmission: Updated pydantic. +# """ +# from . import ControlType +# pyd = super().custom_validation(pyd) +# # NOTE: build regex for all control types that have targets +# regex = ControlType.build_positive_regex(control_type="Irida Control") +# # NOTE: search samples for match +# for sample in pyd.samples: +# matched = regex.match(sample.submitter_id) +# if bool(matched): +# new_lot = matched.group() +# try: +# pos_control_reg = \ +# next(reg for reg in pyd.reagents if reg.role == "Bacterial-Positive Control") +# except StopIteration: +# logger.error(f"No positive control reagent listed") +# return pyd +# pos_control_reg.lot = new_lot +# pos_control_reg.missing = False +# return pyd +# +# @classmethod +# def custom_info_parser(cls, input_dict: dict, xl: Workbook | None = None, custom_fields: dict = {}) -> dict: +# """ +# Performs class specific info parsing before info parsing is finalized. +# +# Args: +# input_dict (dict): Generic input info +# xl (Workbook | None, optional): Original xl workbook. Defaults to None. +# custom_fields (dict, optional): Map of custom fields to be parsed. Defaults to {}. +# +# Returns: +# dict: Updated info dictionary. +# """ +# input_dict = super().custom_info_parser(input_dict=input_dict, xl=xl, custom_fields=custom_fields) +# return input_dict +# +# def custom_context_events(self) -> dict: +# """ +# Sets context events for main widget +# +# Returns: +# dict: Context menu items for this instance. 
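+#
+# Example (illustrative; ``instance`` and ``parent_widget`` stand in for a
+# BacterialCulture record and the calling PYQT6 widget):
+#
+# events = instance.custom_context_events()
+# # e.g. {'Import Concentration': instance.import_concentration, ...}
+# events['Import Concentration'](parent_widget)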
+# """ +# events = super().custom_context_events() +# events['Import Concentration'] = self.import_concentration +# return events +# +# @report_result +# def import_concentration(self, obj) -> Report: +# from frontend.widgets import select_open_file +# from backend.excel.parser import ConcentrationParser +# report = Report() +# fname = select_open_file(obj=obj, file_extension="xlsx") +# if not fname: +# report.add_result(Result(msg="No file selected, cancelling.", status="Warning")) +# return report +# parser = ConcentrationParser(filepath=fname, submission=self) +# conc_samples = [sample for sample in parser.samples] +# # logger.debug(f"Concentration samples: {pformat(conc_samples)}") +# for sample in self.samples: +# # logger.debug(f"Sample {sample.submitter_id}") +# # logger.debug(f"Match {item['submitter_id']}") +# try: +# # NOTE: Fix for ENs which have no rsl_number... +# sample_dict = next( +# item for item in conc_samples if str(item['submitter_id']).upper() == sample.submitter_id) +# except StopIteration: +# logger.error(f"Couldn't find sample dict for {sample.submitter_id}") +# continue +# logger.debug(f"Sample {sample.submitter_id} conc. = {sample_dict['concentration']}") +# if sample_dict['concentration']: +# sample.concentration = sample_dict['concentration'] +# else: +# continue +# sample.save() +# # logger.debug(conc_samples) +# return report +# +# @classmethod +# def parse_concentration(cls, xl: Workbook, rsl_plate_num: str) -> Generator[dict, None, None]: +# lookup_table = cls.get_submission_type().sample_map['lookup_table'] +# logger.debug(lookup_table) +# main_sheet = xl[lookup_table['sheet']] +# for row in main_sheet.iter_rows(min_row=lookup_table['start_row'], max_row=lookup_table['end_row']): +# idx = row[0].row +# sample = dict( +# submitter_id=main_sheet.cell(row=idx, column=lookup_table['sample_columns']['submitter_id']).value) +# sample['concentration'] = main_sheet.cell(row=idx, +# column=lookup_table['sample_columns']['concentration']).value +# yield sample +# +# # def get_provisional_controls(self, controls_only: bool = True): +# def get_provisional_controls(self, include: List[str] = []): +# # NOTE To ensure Samples are done last. 
+# include = sorted(include)
+# # logger.debug(include)
+# pos_str = "(ATCC)|(MCS)"
+# pos_regex = re.compile(rf"^{pos_str}")
+# neg_str = "(EN)"
+# neg_regex = re.compile(rf"^{neg_str}")
+# output = []
+# for item in include:
+# match item:
+# case "Positive":
+# if self.controls:
+# provs = (control.sample for control in self.controls if control.is_positive_control)
+# else:
+# provs = (sample for sample in self.samples if bool(pos_regex.match(sample.submitter_id)))
+# case "Negative":
+# if self.controls:
+# provs = (control.sample for control in self.controls if not control.is_positive_control)
+# else:
+# provs = (sample for sample in self.samples if bool(neg_regex.match(sample.submitter_id)))
+# case _:
+# provs = (sample for sample in self.samples if not sample.control and sample not in output)
+# for prov in provs:
+# # logger.debug(f"Prov: {prov}")
+# prov.submission = self.rsl_plate_num
+# prov.submitted_date = self.submitted_date
+# output.append(prov)
+# return output
+#
+#
+# class Wastewater(BasicSubmission):
+# """
+# derivative submission type from BasicSubmission
+# """
+# id = Column(INTEGER, ForeignKey('_basicsubmission.id'), primary_key=True, autoincrement=False)
+# ext_technician = Column(String(64)) #: Name of technician doing extraction
+# pcr_technician = Column(String(64)) #: Name of technician doing pcr
+# pcr_info = Column(JSON) #: unstructured output from pcr table logger or user(Artic)
+#
+# __mapper_args__ = dict(polymorphic_identity="Wastewater",
+# polymorphic_load="inline",
+# inherit_condition=(id == BasicSubmission.id))
+#
+# def to_dict(self, full_data: bool = False, backup: bool = False, report: bool = False) -> dict:
+# """
+# Extends parent class method to add controls to dict
+#
+# Returns:
+# dict: dictionary used in submissions summary
+# """
+# output = super().to_dict(full_data=full_data, backup=backup, report=report)
+# if report:
+# return output
+# try:
+# output['pcr_info'] = self.pcr_info
+# except TypeError:
+# pass
+# if self.ext_technician is None or self.ext_technician == "None":
+# output['ext_technician'] = self.technician
+# else:
+# output["ext_technician"] = self.ext_technician
+# if self.pcr_technician is None or self.pcr_technician == "None":
+# output["pcr_technician"] = self.technician
+# else:
+# output['pcr_technician'] = self.pcr_technician
+# if full_data:
+# output['samples'] = [sample for sample in output['samples']]
+# dummy_samples = []
+# for item in output['samples']:
+# thing = deepcopy(item)
+# try:
+# thing['row'] = thing['source_row']
+# thing['column'] = thing['source_column']
+# except KeyError:
+# logger.error(f"No row or column for sample: {item['submitter_id']}")
+# continue
+# thing['tooltip'] = f"Sample Name: {thing['name']}\nWell: {thing['sample_location']}"
+# dummy_samples.append(thing)
+# # logger.debug(f"Dummy samples for 24 well: {pformat(dummy_samples)}")
+# output['origin_plate'] = self.__class__.make_plate_map(sample_list=dummy_samples, plate_rows=4,
+# plate_columns=6)
+# # logger.debug(f"PCR info: {output['pcr_info']}")
+# return output
+#
+# @classmethod
+# def custom_info_parser(cls, input_dict: dict, xl: Workbook | None = None, custom_fields: dict = {}) -> dict:
+# """
+# Update submission dictionary with class specific information. Extends parent
+#
+# Args:
+# input_dict (dict): Input sample dictionary
+# xl (Workbook): original xl workbook, used for child classes mostly.
+# custom_fields: Dictionary of locations, ranges, etc to be used by this function +# +# Returns: +# dict: Updated info dictionary +# """ +# input_dict = super().custom_info_parser(input_dict) +# if xl is not None: +# try: +# input_dict['csv'] = xl["Copy to import file"] +# except KeyError as e: +# logger.error(e) +# try: +# match input_dict['rsl_plate_num']: +# case dict(): +# input_dict['csv'] = xl[input_dict['rsl_plate_num']['value']] +# case str(): +# input_dict['csv'] = xl[input_dict['rsl_plate_num']] +# case _: +# pass +# except Exception as e: +# logger.error(f"Error handling couldn't get csv due to: {e}") +# return input_dict +# +# @classmethod +# def parse_samples(cls, input_dict: dict) -> dict: +# """ +# Update sample dictionary with type specific information. Extends parent +# +# Args: +# input_dict (dict): Input sample dictionary +# +# Returns: +# dict: Updated sample dictionary +# """ +# input_dict = super().parse_samples(input_dict=input_dict) +# # NOTE: Had to put in this section due to ENs not having rsl_number and therefore not getting PCR results. +# check = check_key_or_attr("rsl_number", input_dict) +# if not check: +# input_dict['rsl_number'] = input_dict['submitter_id'] +# # logger.debug(pformat(input_dict, indent=4)) +# return input_dict +# +# @classmethod +# def parse_pcr(cls, xl: Workbook, rsl_plate_num: str) -> Generator[dict, None, None]: +# """ +# Perform parsing of pcr info. Since most of our PC outputs are the same format, this should work for most. +# +# Args: +# xl (pd.DataFrame): pcr info form +# rsl_plate_number (str): rsl plate num of interest +# +# Returns: +# Generator[dict, None, None]: Updated samples +# """ +# samples = [item for item in super().parse_pcr(xl=xl, rsl_plate_num=rsl_plate_num)] +# # NOTE: Due to having to run through samples in for loop we need to convert to list. +# # NOTE: Also, you can't change the size of a list while iterating it, so don't even think about it. +# output = [] +# for sample in samples: +# logger.debug(sample) +# # NOTE: remove '-{target}' from controls +# sample['sample'] = re.sub('-N\\d*$', '', sample['sample']) +# # NOTE: if sample is already in output skip +# if sample['sample'] in [item['sample'] for item in output]: +# logger.warning(f"Already have {sample['sample']}") +# continue +# # NOTE: Set ct values +# # logger.debug(f"Sample ct: {sample['ct']}") +# sample[f"ct_{sample['target'].lower()}"] = sample['ct'] if isinstance(sample['ct'], float) else 0.0 +# # NOTE: Set assessment +# # logger.debug(f"Sample assessemnt: {sample['assessment']}") +# # NOTE: Get sample having other target +# other_targets = [s for s in samples if re.sub('-N\\d*$', '', s['sample']) == sample['sample']] +# for s in other_targets: +# sample[f"ct_{s['target'].lower()}"] = s['ct'] if isinstance(s['ct'], float) else 0.0 +# try: +# del sample['ct'] +# except KeyError: +# pass +# try: +# del sample['assessment'] +# except KeyError: +# pass +# # logger.debug(sample) +# row = int(row_keys[sample['well'][:1]]) +# column = int(sample['well'][1:]) +# sample['row'] = row +# sample['column'] = column +# output.append(sample) +# # NOTE: And then convert back to list to keep fidelity with parent method. +# for sample in output: +# yield sample +# +# @classmethod +# def enforce_name(cls, instr: str, data: dict | None = {}) -> str: +# """ +# Custom naming method for this class. Extends parent. +# +# Args: +# instr (str): Initial name. +# data (dict | None, optional): Additional parameters for name. Defaults to None. +# +# Returns: +# str: Updated name. 
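+#
+# Example (illustrative; shows only the PCR-prefix strip applied below before
+# delegating to the parent's generic normalization):
+#
+# >>> re.sub(r"PCR(-|_)", "", "PCR-RSL-WW-20240516")
+# 'RSL-WW-20240516'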
+# """ +# try: +# # NOTE: Deal with PCR file. +# instr = re.sub(r"PCR(-|_)", "", instr) +# except (AttributeError, TypeError) as e: +# logger.error(f"Problem using regex: {e}") +# outstr = super().enforce_name(instr=instr, data=data) +# return outstr +# +# @classmethod +# def adjust_autofill_samples(cls, samples: List[Any]) -> List[Any]: +# """ +# Makes adjustments to samples before writing to excel. Extends parent. +# +# Args: +# samples (List[Any]): List of Samples +# +# Returns: +# List[Any]: Updated list of samples +# """ +# samples = super().adjust_autofill_samples(samples) +# samples = [item for item in samples if not item.submitter_id.startswith("EN")] +# return samples +# +# @classmethod +# def get_details_template(cls, base_dict: dict) -> Tuple[dict, Template]: +# """ +# Get the details jinja template for the correct class. Extends parent +# +# Args: +# base_dict (dict): incoming dictionary of Submission fields +# +# Returns: +# Tuple[dict, Template]: (Updated dictionary, Template to be rendered) +# """ +# base_dict, template = super().get_details_template(base_dict=base_dict) +# base_dict['excluded'] += ['origin_plate'] +# return base_dict, template +# +# def custom_context_events(self) -> dict: +# """ +# Sets context events for main widget +# +# Returns: +# dict: Context menu items for this instance. +# """ +# events = super().custom_context_events() +# events['Link PCR'] = self.link_pcr +# return events +# +# @report_result +# def link_pcr(self, obj) -> Report: +# """ +# PYQT6 function to add PCR info to this submission +# +# Args: +# obj (_type_): Parent widget +# """ +# from backend.excel import PCRParser +# from backend.db import PCRControl, ControlType +# from frontend.widgets import select_open_file +# report = Report() +# fname = select_open_file(obj=obj, file_extension="xlsx") +# if not fname: +# report.add_result(Result(msg="No file selected, cancelling.", status="Warning")) +# return report +# parser = PCRParser(filepath=fname, submission=self) +# self.set_attribute("pcr_info", parser.pcr_info) +# # NOTE: These are generators here, need to expand. +# pcr_samples = sorted([sample for sample in parser.samples], key=itemgetter('column')) +# logger.debug(f"Samples from parser: {pformat(pcr_samples)}") +# # NOTE: Samples from parser check out. +# pcr_controls = [control for control in parser.controls] +# self.save(original=False) +# for assoc in self.submission_sample_associations: +# logger.debug(f"Checking pcr_samples for {assoc.sample.rsl_number}, {assoc.sample.ww_full_sample_id} at " +# f"column {assoc.column} and row {assoc.row}") +# # NOTE: Associations of interest do exist in the submission, are not being found below +# # Okay, I've found the problem, at last, the problem is that only one RSL number is saved for each sample, +# # Even though each instance of say "25-YUL13-PU3-0320" has multiple RSL numbers in the excel sheet. +# # so, yeah, the submitters need to make sure that there are unique values for each one. 
+# try: +# sample_dict = next(item for item in pcr_samples if item['sample'] == assoc.sample.rsl_number +# and item['row'] == assoc.row and item['column'] == assoc.column) +# logger.debug( +# f"Found sample {sample_dict} at index {pcr_samples.index(sample_dict)}: {pcr_samples[pcr_samples.index(sample_dict)]}") +# except StopIteration: +# logger.error(f"Couldn't find {assoc} in the Parser samples") +# continue +# logger.debug(f"Length of pcr_samples: {len(pcr_samples)}") +# assoc = self.update_subsampassoc(assoc=assoc, input_dict=sample_dict) +# result = assoc.save() +# if result: +# report.add_result(result) +# controltype = ControlType.query(name="PCR Control") +# submitted_date = datetime.strptime(" ".join(parser.pcr_info['run_start_date/time'].split(" ")[:-1]), +# "%Y-%m-%d %I:%M:%S %p") +# for control in pcr_controls: +# # logger.debug(f"Control coming into save: {control}") +# new_control = PCRControl(**control) +# new_control.submitted_date = submitted_date +# new_control.controltype = controltype +# new_control.submission = self +# # logger.debug(f"Control coming into save: {new_control.__dict__}") +# new_control.save() +# return report +# +# +# class WastewaterArtic(BasicSubmission): +# """ +# derivative submission type for artic wastewater +# """ +# id = Column(INTEGER, ForeignKey('_basicsubmission.id'), primary_key=True) +# artic_technician = Column(String(64)) #: Name of technician performing artic +# dna_core_submission_number = Column(String(64)) #: Number used by core as id +# pcr_info = Column(JSON) #: unstructured output from pcr table logger or user(Artic) +# gel_image = Column(String(64)) #: file name of gel image in zip file +# gel_info = Column(JSON) #: unstructured data from gel. +# gel_controls = Column(JSON) #: locations of controls on the gel +# source_plates = Column(JSON) #: wastewater plates that samples come from +# artic_date = Column(TIMESTAMP) #: Date Artic Performed +# ngs_date = Column(TIMESTAMP) #: Date submission received +# gel_date = Column(TIMESTAMP) #: Date submission received +# gel_barcode = Column(String(16)) #: Identifier for the used gel. 
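+# # NOTE (illustrative): the submission subclasses in this module all use
+# # SQLAlchemy joined-table polymorphism. A minimal, self-contained sketch of
+# # the same pattern (the names below are invented, not part of this codebase):
+# #
+# #     from sqlalchemy import Column, ForeignKey, Integer, String
+# #     from sqlalchemy.orm import declarative_base
+# #
+# #     Base = declarative_base()
+# #
+# #     class Submission(Base):
+# #         __tablename__ = "submission"
+# #         id = Column(Integer, primary_key=True)
+# #         kind = Column(String(32))
+# #         __mapper_args__ = dict(polymorphic_identity="Basic Submission",
+# #                                polymorphic_on=kind)
+# #
+# #     class ArticSubmission(Submission):
+# #         # child rows share the parent's primary key via a foreign key
+# #         __tablename__ = "artic_submission"
+# #         id = Column(Integer, ForeignKey("submission.id"), primary_key=True)
+# #         __mapper_args__ = dict(polymorphic_identity="Wastewater Artic",
+# #                                inherit_condition=(id == Submission.id))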
+# +# __mapper_args__ = dict(polymorphic_identity="Wastewater Artic", +# polymorphic_load="inline", +# inherit_condition=(id == BasicSubmission.id)) +# +# def to_dict(self, full_data: bool = False, backup: bool = False, report: bool = False) -> dict: +# """ +# Extends parent class method to add controls to dict +# +# Returns: +# dict: dictionary used in submissions summary +# """ +# output = super().to_dict(full_data=full_data, backup=backup, report=report) +# if report: +# return output +# if self.artic_technician in [None, "None"]: +# output['artic_technician'] = self.technician +# else: +# output['artic_technician'] = self.artic_technician +# output['gel_info'] = self.gel_info +# output['gel_image'] = self.gel_image +# output['dna_core_submission_number'] = self.dna_core_submission_number +# output['source_plates'] = self.source_plates +# output['artic_date'] = self.artic_date or self.submitted_date +# output['ngs_date'] = self.ngs_date or self.submitted_date +# output['gel_date'] = self.gel_date or self.submitted_date +# output['gel_barcode'] = self.gel_barcode +# return output +# +# @classmethod +# def custom_info_parser(cls, input_dict: dict, xl: Workbook | None = None, custom_fields: dict = {}) -> dict: +# """ +# Update submission dictionary with class specific information +# +# Args: +# input_dict (dict): Input sample dictionary +# xl (pd.ExcelFile): original xl workbook, used for child classes mostly +# custom_fields: Dictionary of locations, ranges, etc to be used by this function +# +# Returns: +# dict: Updated sample dictionary +# """ +# from backend.validators import RSLNamer +# from openpyxl_image_loader.sheet_image_loader import SheetImageLoader +# +# def scrape_image(wb: Workbook, info_dict: dict) -> Image or None: +# """ +# Pulls image from excel workbook +# +# Args: +# wb (Workbook): Workbook of interest. +# info_dict (dict): Location map. +# +# Returns: +# Image or None: Image of interest. +# """ +# ws = wb[info_dict['sheet']] +# img_loader = SheetImageLoader(ws) +# for ii in range(info_dict['start_row'], info_dict['end_row'] + 1): +# for jj in range(info_dict['start_column'], info_dict['end_column'] + 1): +# cell_str = f"{row_map[jj]}{ii}" +# if img_loader.image_in(cell_str): +# try: +# return img_loader.get(cell_str) +# except ValueError as e: +# logger.error(f"Could not open image from cell: {cell_str} due to {e}") +# return None +# return None +# +# input_dict = super().custom_info_parser(input_dict) +# input_dict['submission_type'] = dict(value="Wastewater Artic", missing=False) +# egel_section = custom_fields['egel_controls'] +# ws = xl[egel_section['sheet']] +# # NOTE: Here we should be scraping the control results. 
+# data = [ws.cell(row=ii, column=jj) for jj in range(egel_section['start_column'], egel_section['end_column'] + 1) +# for +# ii in range(egel_section['start_row'], egel_section['end_row'] + 1)] +# data = [cell for cell in data if cell.value is not None and "NTC" in cell.value] +# input_dict['gel_controls'] = [ +# dict(sample_id=cell.value, location=f"{row_map[cell.row - 9]}{str(cell.column - 14).zfill(2)}") for cell in +# data] +# # NOTE: Get source plate information +# source_plates_section = custom_fields['source_plates'] +# ws = xl[source_plates_section['sheet']] +# data = [dict(plate=ws.cell(row=ii, column=source_plates_section['plate_column']).value, +# starting_sample=ws.cell(row=ii, column=source_plates_section['starting_sample_column']).value) for +# ii in +# range(source_plates_section['start_row'], source_plates_section['end_row'] + 1)] +# for datum in data: +# if datum['plate'] in ["None", None, ""]: +# continue +# else: +# datum['plate'] = RSLNamer(filename=datum['plate'], submission_type="Wastewater").parsed_name +# if xl is not None: +# try: +# input_dict['csv'] = xl["hitpicks_csv_to_export"] +# except KeyError as e: +# logger.error(e) +# try: +# match input_dict['rsl_plate_num']: +# case dict(): +# input_dict['csv'] = xl[input_dict['rsl_plate_num']['value']] +# case str(): +# input_dict['csv'] = xl[input_dict['rsl_plate_num']] +# case _: +# pass +# except Exception as e: +# logger.error(f"Error handling couldn't get csv due to: {e}") +# input_dict['source_plates'] = data +# egel_info_section = custom_fields['egel_info'] +# ws = xl[egel_info_section['sheet']] +# data = [] +# for ii in range(egel_info_section['start_row'], egel_info_section['end_row'] + 1): +# datum = dict( +# name=ws.cell(row=ii, column=egel_info_section['start_column'] - 3).value, +# values=[] +# ) +# for jj in range(egel_info_section['start_column'], egel_info_section['end_column'] + 1): +# d = dict( +# name=ws.cell(row=egel_info_section['start_row'] - 1, column=jj).value, +# value=ws.cell(row=ii, column=jj).value +# ) +# if d['value'] is not None: +# datum['values'].append(d) +# data.append(datum) +# input_dict['gel_info'] = data +# egel_image_section = custom_fields['image_range'] +# img: Image = scrape_image(wb=xl, info_dict=egel_image_section) +# if img is not None: +# tmp = Path(TemporaryFile().name).with_suffix(".jpg") +# img.save(tmp.__str__()) +# with ZipFile(cls.__directory_path__.joinpath("submission_imgs.zip"), 'a') as zipf: +# # NOTE: Add a file located at the source_path to the destination within the zip +# # file. It will overwrite existing files if the names collide, but it +# # will give a warning +# zipf.write(tmp.__str__(), f"{input_dict['rsl_plate_num']['value']}.jpg") +# input_dict['gel_image'] = f"{input_dict['rsl_plate_num']['value']}.jpg" +# return input_dict +# +# @classmethod +# def enforce_name(cls, instr: str, data: dict = {}) -> str: +# """ +# Custom naming method for this class. Extends parent. +# +# Args: +# instr (str): Initial name. +# data (dict | None, optional): Additional parameters for name. Defaults to None. +# +# Returns: +# str: Updated name. +# """ +# logger.debug(f"Incoming String: {instr}") +# try: +# # NOTE: Deal with PCR file. 
+# instr = re.sub(r"Artic", "", instr, flags=re.IGNORECASE) +# except (AttributeError, TypeError) as e: +# logger.error(f"Problem using regex: {e}") +# try: +# instr = instr.replace("-", "") +# except AttributeError: +# instr = date.today().strftime("%Y%m%d") +# instr = re.sub(r"^(\d{6})", f"RSL-AR-\\1", instr) +# outstr = super().enforce_name(instr=instr, data=data) +# outstr = outstr.replace("RSLAR", "RSL-AR") +# return outstr +# +# @classmethod +# def parse_samples(cls, input_dict: dict) -> dict: +# """ +# Update sample dictionary with type specific information. Extends parent. +# +# Args: +# input_dict (dict): Input sample dictionary +# +# Returns: +# dict: Updated sample dictionary +# """ +# input_dict = super().parse_samples(input_dict) +# input_dict['sample_type'] = "Wastewater Sample" +# # NOTE: Stop gap solution because WW is sloppy with their naming schemes +# try: +# input_dict['source_plate'] = input_dict['source_plate'].replace("WW20", "WW-20") +# except KeyError: +# pass +# try: +# input_dict['source_plate_number'] = int(input_dict['source_plate_number']) +# except (ValueError, KeyError): +# input_dict['source_plate_number'] = 0 +# # NOTE: Because generate_sample_object needs the submitter_id and the artic has the "({origin well})" at the end, this has to be done here. No moving to sqlalchemy object :( +# input_dict['submitter_id'] = re.sub(r"\s\(.+\)\s?$", "", str(input_dict['submitter_id'])).strip() +# try: +# input_dict['ww_processing_num'] = input_dict['sample_name_(lims)'] +# del input_dict['sample_name_(lims)'] +# except KeyError: +# logger.error(f"Unable to set ww_processing_num for sample {input_dict['submitter_id']}") +# try: +# input_dict['ww_full_sample_id'] = input_dict['sample_name_(ww)'] +# del input_dict['sample_name_(ww)'] +# except KeyError: +# logger.error(f"Unable to set ww_processing_num for sample {input_dict['submitter_id']}") +# year = str(date.today().year)[-2:] +# # NOTE: Check for extraction negative control (Enterics) +# if re.search(rf"^{year}-(ENC)", input_dict['submitter_id']): +# input_dict['rsl_number'] = cls.en_adapter(input_str=input_dict['submitter_id']) +# # NOTE: Check for extraction negative control (Robotics) +# if re.search(rf"^{year}-(RSL)", input_dict['submitter_id']): +# logger.debug(f"Found {year}-(RSL), so we are going to run PBS adapter:") +# input_dict['rsl_number'] = cls.pbs_adapter(input_str=input_dict['submitter_id']) +# return input_dict +# +# @classmethod +# def en_adapter(cls, input_str: str) -> str: +# """ +# Stopgap solution because WW names their ENs different +# +# Args: +# input_str (str): input name +# +# Returns: +# str: output name +# """ +# processed = input_str.replace("RSL", "") +# # NOTE: Remove anything in brackets at the end of string? +# processed = re.sub(r"\(.*\)$", "", processed).strip() +# # NOTE: Remove letters that are not R. +# processed = re.sub(r"[A-QS-Z]+\d*", "", processed) +# # NOTE: Remove trailing '-' if any +# processed = processed.strip("-") +# try: +# # NOTE: get digit at the end of the string. +# en_num = re.search(r"\-\d{1}$", processed).group() +# processed = rreplace(processed, en_num, "") +# except AttributeError: +# en_num = "1" +# en_num = en_num.strip("-") +# try: +# # NOTE: Get last digit and maybe 'R' with another digit. 
+# plate_num = re.search(r"\-\d{1}R?\d?$", processed).group()
+# processed = rreplace(processed, plate_num, "")
+# except AttributeError:
+# plate_num = "1"
+# # NOTE: plate_num not currently used, but will keep in case it is in the future
+# plate_num = plate_num.strip("-")
+# day = re.search(r"\d{2}$", processed).group()
+# processed = rreplace(processed, day, "")
+# month = re.search(r"\d{2}$", processed).group()
+# processed = rreplace(processed, month, "")
+# processed = processed.replace("--", "")
+# year = re.search(r'^(?:\d{2})?\d{2}', processed).group()
+# year = f"20{year}"
+# final_en_name = f"EN{en_num}-{year}{month}{day}"
+# return final_en_name
+#
+# @classmethod
+# def pbs_adapter(cls, input_str):
+# """
+# Stopgap solution because WW names their controls differently
+#
+# Args:
+# input_str (str): input name
+#
+# Returns:
+# str: output name
+# """
+# logger.debug(f"PBS adapter on {input_str}")
+# # NOTE: Remove letters.
+# processed = input_str.replace("RSL", "")
+# # logger.debug(processed)
+# # NOTE: Remove brackets at end
+# processed = re.sub(r"\(.*\)$", "", processed).strip()
+# # logger.debug(processed)
+# processed = re.sub(r"-RPT", "", processed, flags=re.IGNORECASE)
+# # NOTE: Remove any non-R letters at end.
+# processed = re.sub(r"[A-QS-Z]+\d*", "", processed)
+# # logger.debug(processed)
+# # NOTE: Remove trailing '-' if any
+# processed = processed.strip("-")
+# # logger.debug(processed)
+# try:
+# plate_num = re.search(r"\-\d{1}R?\d?$", processed).group()
+# processed = rreplace(processed, plate_num, "")
+# except AttributeError:
+# plate_num = "1"
+# plate_num = plate_num.strip("-")
+# try:
+# repeat_num = re.search(r"R(?P<repeat>\d)?$", processed).groups()[0]
+# except AttributeError:
+# repeat_num = None
+# if repeat_num is None and "R" in plate_num:
+# repeat_num = "1"
+# try:
+# plate_num = re.sub(r"R", rf"R{repeat_num}", plate_num)
+# except AttributeError:
+# logger.error(f"Problem re-evaluating plate number for {processed}")
+# # logger.debug(processed)
+# # NOTE: Remove any redundant -digits
+# processed = re.sub(r"-\d$", "", processed)
+# # logger.debug(processed)
+# day = re.search(r"\d{2}$", processed).group()
+# processed = rreplace(processed, day, "")
+# # logger.debug(processed)
+# month = re.search(r"\d{2}$", processed).group()
+# processed = rreplace(processed, month, "")
+# processed = processed.replace("--", "")
+# # logger.debug(processed)
+# year = re.search(r'^(?:\d{2})?\d{2}', processed).group()
+# year = f"20{year}"
+# # logger.debug(processed)
+# final_en_name = f"PBS{year}{month}{day}-{plate_num}"
+# return final_en_name
+#
+# @classmethod
+# def custom_validation(cls, pyd) -> dict:
+# """
+# Performs any final custom parsing of the excel file. Extends parent
+#
+# Args:
+# pyd (PydSubmission): Parser product up to this point.
+#
+# Returns:
+# dict: Updated parser product.
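+#
+# Example (illustrative): an NTC sample with submitter_id "NTC1" on plate
+# "RSL-AR-20240516" is rewritten below to "NTC1-WWG-RSL-AR-20240516".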
+# """ +# input_dict = super().custom_validation(pyd) +# exclude_plates = [None, "", "none", "na"] +# pyd.source_plates = [plate for plate in pyd.source_plates if plate['plate'].lower() not in exclude_plates] +# for sample in pyd.samples: +# if re.search(r"^NTC", sample.submitter_id): +# if isinstance(pyd.rsl_plate_num, dict): +# placeholder = pyd.rsl_plate_num['value'] +# else: +# placeholder = pyd.rsl_plate_num +# sample.submitter_id = f"{sample.submitter_id}-WWG-{placeholder}" +# return input_dict +# +# @classmethod +# def custom_info_writer(cls, input_excel: Workbook, info: dict | None = None, backup: bool = False, +# custom_fields: dict = {}) -> Workbook: +# """ +# Adds custom autofill methods for submission. Extends Parent +# +# Args: +# input_excel (Workbook): initial workbook. +# info (dict | None, optional): dictionary of additional info. Defaults to None. +# backup (bool, optional): Whether this is part of a backup operation. Defaults to False. +# custom_fields: Dictionary of locations, ranges, etc to be used by this function +# +# Returns: +# Workbook: Updated workbook +# """ +# input_excel = super().custom_info_writer(input_excel, info, backup) +# if isinstance(info, types.GeneratorType): +# info = {k: v for k, v in info} +# # NOTE: check for source plate information +# if check_key_or_attr(key='source_plates', interest=info, check_none=True): +# source_plates_section = custom_fields['source_plates'] +# worksheet = input_excel[source_plates_section['sheet']] +# start_row = source_plates_section['start_row'] +# # NOTE: write source plates to First strand list +# for iii, plate in enumerate(info['source_plates']['value']): +# row = start_row + iii +# try: +# worksheet.cell(row=row, column=source_plates_section['plate_column'], value=plate['plate']) +# except TypeError: +# pass +# try: +# worksheet.cell(row=row, column=source_plates_section['starting_sample_column'], +# value=plate['starting_sample']) +# except TypeError: +# pass +# else: +# logger.warning(f"No source plate info found.") +# # NOTE: check for gel information +# if check_key_or_attr(key='gel_info', interest=info, check_none=True): +# egel_section = custom_fields['egel_info'] +# # NOTE: print json field gel results to Egel results +# worksheet = input_excel[egel_section['sheet']] +# start_row = egel_section['start_row'] - 1 +# start_column = egel_section['start_column'] - 3 +# for row, ki in enumerate(info['gel_info']['value'], start=1): +# row = start_row + row +# worksheet.cell(row=row, column=start_column, value=ki['name']) +# for jjj, kj in enumerate(ki['values'], start=1): +# column = start_column + 2 + jjj +# worksheet.cell(row=start_row, column=column, value=kj['name']) +# try: +# worksheet.cell(row=row, column=column, value=kj['value']) +# except AttributeError: +# logger.error(f"Failed {kj['name']} with value {kj['value']} to row {row}, column {column}") +# else: +# logger.warning("No gel info found.") +# if check_key_or_attr(key='gel_image', interest=info, check_none=True): +# worksheet = input_excel[egel_section['sheet']] +# with ZipFile(cls.__directory_path__.joinpath("submission_imgs.zip")) as zipped: +# z = zipped.extract(info['gel_image']['value'], Path(TemporaryDirectory().name)) +# img = OpenpyxlImage(z) +# img.height = 400 # insert image height in pixels as float or int (e.g. 
305.5)
+# img.width = 600
+# img.anchor = egel_section['img_anchor']
+# worksheet.add_image(img)
+# else:
+# logger.warning("No gel image found.")
+# return input_excel
+#
+# @classmethod
+# def custom_sample_writer(cls, sample: dict) -> dict:
+# if sample['source_plate_number'] in [0, "0"]:
+# sample['source_plate_number'] = "control"
+# return sample
+#
+# @classmethod
+# def get_details_template(cls, base_dict: dict) -> Tuple[dict, Template]:
+# """
+# Get the details jinja template for the correct class. Extends parent
+#
+# Args:
+# base_dict (dict): incoming dictionary of Submission fields
+#
+# Returns:
+# Tuple[dict, Template]: (Updated dictionary, Template to be rendered)
+# """
+# base_dict, template = super().get_details_template(base_dict=base_dict)
+# base_dict['excluded'] += ['gel_info', 'gel_image', 'headers', "dna_core_submission_number", "source_plates",
+# "gel_controls", "gel_image_path"]
+# base_dict['DNA Core ID'] = base_dict['dna_core_submission_number']
+# if check_key_or_attr(key='gel_info', interest=base_dict, check_none=True):
+# headers = [item['name'] for item in base_dict['gel_info'][0]['values']]
+# base_dict['headers'] = [''] * (4 - len(headers))
+# base_dict['headers'] += headers
+# if check_key_or_attr(key='gel_image', interest=base_dict, check_none=True):
+# with ZipFile(cls.__directory_path__.joinpath("submission_imgs.zip")) as zipped:
+# base_dict['gel_image_actual'] = base64.b64encode(zipped.read(base_dict['gel_image'])).decode('utf-8')
+# return base_dict, template
+#
+# def custom_context_events(self) -> dict:
+# """
+# Creates dictionary of str:function to be passed to context menu. Extends parent
+#
+# Returns:
+# dict: dictionary of functions
+# """
+# events = super().custom_context_events()
+# events['Gel Box'] = self.gel_box
+# return events
+#
+# def set_attribute(self, key: str, value):
+# """
+# Performs custom attribute setting based on values. Extends parent
+#
+# Args:
+# key (str): name of attribute
+# value (_type_): value of attribute
+# """
+# super().set_attribute(key=key, value=value)
+# if key == 'gel_info':
+# if len(self.gel_info) > 3:
+# self.gel_info = self.gel_info[-3:]
+#
+# def gel_box(self, obj):
+# """
+# Creates PYQT6 widget to perform gel viewing operations
+#
+# Args:
+# obj (_type_): parent widget
+# """
+# from frontend.widgets.gel_checker import GelBox
+# from frontend.widgets import select_open_file
+# report = Report()
+# fname = select_open_file(obj=obj, file_extension="jpg")
+# if not fname:
+# report.add_result(Result(msg="No file selected, cancelling.", status="Warning"))
+# return report
+# dlg = GelBox(parent=obj, img_path=fname, submission=self)
+# if dlg.exec():
+# self.dna_core_submission_number, self.gel_barcode, img_path, output, comment = dlg.parse_form()
+# self.gel_image = img_path.name
+# self.gel_info = output
+# dt = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
+# com = dict(text=comment, name=getuser(), time=dt)
+# if com['text'] is not None and com['text'] != "":
+# if self.comment is not None:
+# self.comment.append(com)
+# else:
+# self.comment = [com]
+# with ZipFile(self.__directory_path__.joinpath("submission_imgs.zip"), 'a') as zipf:
+# # NOTE: Add a file located at the source_path to the destination within the zip
+# # file.
It will overwrite existing files if the names collide, but it
+# # will give a warning
+# zipf.write(img_path, self.gel_image)
+# self.save()
+#

# NOTE: Sample Classes

@@ -2398,6 +2458,7 @@ class BasicSample(BaseClass, LogMixin):

id = Column(INTEGER, primary_key=True) #: primary key
submitter_id = Column(String(64), nullable=False, unique=True) #: identification from submitter
sample_type = Column(String(32)) #: mode_sub_type of sample
+ misc_info = Column(JSON) #: catch-all for miscellaneous sample info

sample_submission_associations = relationship(
"SubmissionSampleAssociation",
@@ -2849,7 +2910,7 @@ class BacterialCultureSample(BasicSample):
return sample


-# NOTE: Submission to Sample Associations
+# # NOTE: Submission to Sample Associations

class SubmissionSampleAssociation(BaseClass):
"""