Before control chart update.

lwark
2024-07-31 07:50:23 -05:00
parent 2a07265cbc
commit eb6cdc63e2
10 changed files with 305 additions and 396 deletions


@@ -7,7 +7,7 @@ from pathlib import Path
from datetime import date, timedelta
from typing import List, Tuple, Any
from backend.db.models import BasicSubmission
-from tools import jinja_template_loading, Settings, get_unique_values_in_df_column, html_to_pdf, get_first_blank_df_row, \
+from tools import jinja_template_loading, html_to_pdf, get_first_blank_df_row, \
row_map
from PyQt6.QtWidgets import QWidget
from openpyxl.worksheet.worksheet import Worksheet
@@ -71,20 +71,20 @@ class ReportMaker(object):
# logger.debug(f"Name: {row[0][1]}")
data = [item for item in row[1]]
kit = dict(name=row[0][1], cost=data[1], run_count=int(data[0]), sample_count=int(data[2]))
-# if this is the same lab as before add together
+# NOTE: if this is the same lab as before add together
if lab == old_lab:
output[-1]['kits'].append(kit)
output[-1]['total_cost'] += kit['cost']
output[-1]['total_samples'] += kit['sample_count']
output[-1]['total_runs'] += kit['run_count']
-# if not the same lab, make a new one
+# NOTE: if not the same lab, make a new one
else:
adder = dict(lab=lab, kits=[kit], total_cost=kit['cost'], total_samples=kit['sample_count'],
total_runs=kit['run_count'])
output.append(adder)
old_lab = lab
# logger.debug(output)
-dicto = {'start_date': start_date, 'end_date': end_date, 'labs': output} # , "table":table}
+dicto = {'start_date': start_date, 'end_date': end_date, 'labs': output}
temp = env.get_template('summary_report.html')
html = temp.render(input=dicto)
return html
@@ -120,11 +120,11 @@ class ReportMaker(object):
"""
# logger.debug(f"Updating worksheet")
worksheet: Worksheet = self.writer.sheets['Report']
-for idx, col in enumerate(self.summary_df, start=1): # loop through all columns
+for idx, col in enumerate(self.summary_df, start=1): # NOTE: loop through all columns
series = self.summary_df[col]
max_len = max((
-series.astype(str).map(len).max(), # len of largest item
-len(str(series.name)) # len of column name/header
+series.astype(str).map(len).max(), # NOTE: len of largest item
+len(str(series.name)) # NOTE: len of column name/header
)) + 20 # NOTE: adding a little extra space
try:
# NOTE: Convert idx to letter
@@ -142,224 +142,3 @@ class ReportMaker(object):
cell.style = 'Currency'
def make_report_xlsx(records: list[dict]) -> Tuple[DataFrame, DataFrame]:
"""
create the dataframe for a report
Args:
records (list[dict]): list of dictionaries created from submissions
Returns:
DataFrame: output dataframe
"""
df = DataFrame.from_records(records)
# NOTE: put submissions with the same lab together
df = df.sort_values("submitting_lab")
# NOTE: aggregate cost and sample count columns
df2 = df.groupby(["submitting_lab", "extraction_kit"]).agg(
{'extraction_kit': 'count', 'cost': 'sum', 'sample_count': 'sum'})
df2 = df2.rename(columns={"extraction_kit": 'run_count'})
# logger.debug(f"Output daftaframe for xlsx: {df2.columns}")
df = df.drop('id', axis=1)
df = df.sort_values(['submitting_lab', "submitted_date"])
return df, df2
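# NOTE: illustrative sketch of the aggregation above, using made-up submission records
# (lab names, kit names and costs are hypothetical, not taken from the database).
example_records = [
    {"id": 1, "submitting_lab": "Lab A", "extraction_kit": "Kit X", "cost": 100.0, "sample_count": 8, "submitted_date": "2024-07-01"},
    {"id": 2, "submitting_lab": "Lab A", "extraction_kit": "Kit X", "cost": 100.0, "sample_count": 16, "submitted_date": "2024-07-08"},
    {"id": 3, "submitting_lab": "Lab B", "extraction_kit": "Kit Y", "cost": 250.0, "sample_count": 24, "submitted_date": "2024-07-02"},
]
example_df, example_summary = make_report_xlsx(example_records)
# NOTE: example_summary is indexed by (submitting_lab, extraction_kit); the two Lab A / Kit X
# submissions collapse into run_count=2, cost=200.0, sample_count=24.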
def make_report_html(df: DataFrame, start_date: date, end_date: date) -> str:
"""
generates html from the report dataframe
Args:
df (DataFrame): summary dataframe (the second dataframe returned by 'make_report_xlsx' above)
start_date (date): starting date of the report period
end_date (date): ending date of the report period
Returns:
str: html string
"""
old_lab = ""
output = []
# logger.debug(f"Report DataFrame: {df}")
for ii, row in enumerate(df.iterrows()):
# logger.debug(f"Row {ii}: {row}")
lab = row[0][0]
# logger.debug(type(row))
# logger.debug(f"Old lab: {old_lab}, Current lab: {lab}")
# logger.debug(f"Name: {row[0][1]}")
data = [item for item in row[1]]
kit = dict(name=row[0][1], cost=data[1], run_count=int(data[0]), sample_count=int(data[2]))
# if this is the same lab as before add together
if lab == old_lab:
output[-1]['kits'].append(kit)
output[-1]['total_cost'] += kit['cost']
output[-1]['total_samples'] += kit['sample_count']
output[-1]['total_runs'] += kit['run_count']
# if not the same lab, make a new one
else:
adder = dict(lab=lab, kits=[kit], total_cost=kit['cost'], total_samples=kit['sample_count'],
total_runs=kit['run_count'])
output.append(adder)
old_lab = lab
# logger.debug(output)
dicto = {'start_date': start_date, 'end_date': end_date, 'labs': output} #, "table":table}
temp = env.get_template('summary_report.html')
html = temp.render(input=dicto)
return html
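# NOTE: for reference, a made-up example of the structure make_report_html hands to
# summary_report.html (lab and kit names are hypothetical).
example_input = {
    "start_date": date(2024, 7, 1),
    "end_date": date(2024, 7, 31),
    "labs": [
        {
            "lab": "Lab A",
            "kits": [{"name": "Kit X", "cost": 200.0, "run_count": 2, "sample_count": 24}],
            "total_cost": 200.0,
            "total_samples": 24,
            "total_runs": 2,
        },
    ],
}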
# TODO: move this into a classmethod of Controls?
def convert_data_list_to_df(input: list[dict], subtype: str | None = None) -> DataFrame:
"""
Convert list of control records to dataframe
Args:
input (list[dict]): list of dictionaries containing records
subtype (str | None, optional): name of submission type. Defaults to None.
Returns:
DataFrame: dataframe of controls
"""
df = DataFrame.from_records(input)
safe = ['name', 'submitted_date', 'genus', 'target']
for column in df.columns:
if "percent" in column:
count_col = [item for item in df.columns if "count" in item][0]
# NOTE: The actual percentage from kraken was off due to exclusion of NaN, recalculating.
df[column] = 100 * df[count_col] / df.groupby('name')[count_col].transform('sum')
if column not in safe:
if subtype is not None and column != subtype:
del df[column]
# NOTE: move date of sample submitted on same date as previous ahead one.
df = displace_date(df)
# NOTE: ad hoc method to make data labels more accurate.
df = df_column_renamer(df=df)
return df
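# NOTE: toy illustration of the percent recalculation above; control names and counts are made up,
# column names follow the kraken_count/kraken_percent fields handled by df_column_renamer below.
toy = DataFrame.from_records([
    {"name": "EN-001", "genus": "Escherichia", "kraken_count": 30},
    {"name": "EN-001", "genus": "Listeria", "kraken_count": 10},
    {"name": "EN-002", "genus": "Salmonella", "kraken_count": 5},
])
toy["kraken_percent"] = 100 * toy["kraken_count"] / toy.groupby("name")["kraken_count"].transform("sum")
# NOTE: EN-001 rows become 75.0 and 25.0, EN-002 becomes 100.0, so percentages sum to 100 per control.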
def df_column_renamer(df: DataFrame) -> DataFrame:
"""
Ad hoc function I created to clarify some fields
Args:
df (DataFrame): input dataframe
Returns:
DataFrame: dataframe with 'clarified' column names
"""
df = df[df.columns.drop(list(df.filter(regex='_hashes')))]
return df.rename(columns={
"contains_ratio": "contains_shared_hashes_ratio",
"matches_ratio": "matches_shared_hashes_ratio",
"kraken_count": "kraken2_read_count_(top_50)",
"kraken_percent": "kraken2_read_percent_(top_50)"
})
def displace_date(df: DataFrame) -> DataFrame:
"""
Splits up controls that were submitted on the same date by incrementing dates.
If a control's date matches one already seen, it is shifted forward by one day.
Args:
df (DataFrame): input dataframe composed of control records
Returns:
DataFrame: output dataframe with dates incremented.
"""
# logger.debug(f"Unique items: {df['name'].unique()}")
# NOTE: get submitted dates for each control
dict_list = [dict(name=item, date=df[df.name == item].iloc[0]['submitted_date']) for item in
sorted(df['name'].unique())]
previous_dates = []
for _, item in enumerate(dict_list):
df, previous_dates = check_date(df=df, item=item, previous_dates=previous_dates)
return df
def check_date(df: DataFrame, item: dict, previous_dates: list) -> Tuple[DataFrame, list]:
"""
Checks if an item's date is already present in df and adjusts df accordingly
Args:
df (DataFrame): input dataframe
item (dict): control for checking
previous_dates (list): list of dates found in previous controls
Returns:
Tuple[DataFrame, list]: Output dataframe and appended list of previous dates
"""
try:
check = item['date'] in previous_dates
except IndexError:
check = False
previous_dates.append(item['date'])
if check:
# logger.debug(f"We found one! Increment date!\n\t{item['date']} to {item['date'] + timedelta(days=1)}")
# NOTE: get df locations where name == item name
mask = df['name'] == item['name']
# NOTE: increment date in dataframe
df.loc[mask, 'submitted_date'] = df.loc[mask, 'submitted_date'].apply(lambda x: x + timedelta(days=1))
item['date'] += timedelta(days=1)
passed = False
else:
passed = True
# logger.debug(f"\n\tCurrent date: {item['date']}\n\tPrevious dates:{previous_dates}")
# logger.debug(f"DF: {type(df)}, previous_dates: {type(previous_dates)}")
# if run didn't lead to changed date, return values
if passed:
# logger.debug(f"Date check passed, returning.")
return df, previous_dates
# NOTE: if date was changed, rerun with new date
else:
logger.warning(f"Date check failed, running recursion")
df, previous_dates = check_date(df, item, previous_dates)
return df, previous_dates
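# NOTE: small sketch of the date displacement with made-up control names and dates.
example_controls = DataFrame.from_records([
    {"name": "MCS-001", "submitted_date": date(2024, 7, 1), "target": "Enterics"},
    {"name": "MCS-002", "submitted_date": date(2024, 7, 1), "target": "Enterics"},
])
example_controls = displace_date(example_controls)
# NOTE: MCS-001 keeps 2024-07-01, while MCS-002 is shifted to 2024-07-02 by check_date.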
# def get_unique_values_in_df_column(df: DataFrame, column_name: str) -> list:
# """
# get all unique values in a dataframe column by name
#
# Args:
# df (DataFrame): input dataframe
# column_name (str): name of column of interest
#
# Returns:
# list: sorted list of unique values
# """
# return sorted(df[column_name].unique())
def drop_reruns_from_df(ctx: Settings, df: DataFrame) -> DataFrame:
"""
Removes semi-duplicates from dataframe after finding sequencing repeats.
Args:
ctx (Settings): settings passed from gui
df (DataFrame): initial dataframe
Returns:
DataFrame: dataframe with originals removed in favour of repeats.
"""
if 'rerun_regex' in ctx:
sample_names = get_unique_values_in_df_column(df, column_name="name")
rerun_regex = re.compile(fr"{ctx.rerun_regex}")
for sample in sample_names:
if rerun_regex.search(sample):
first_run = re.sub(rerun_regex, "", sample)
df = df.drop(df[df.name == first_run].index)
return df
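# NOTE: hypothetical rerun naming scheme (an "-R2" suffix marks a repeat) to illustrate
# drop_reruns_from_df; the real rerun_regex comes from Settings and may differ.
example_names = ["EN-0042", "EN-0042-R2", "EN-0077"]
example_regex = re.compile(r"-R2$")
for sample in example_names:
    if example_regex.search(sample):
        # NOTE: "EN-0042-R2" maps back to "EN-0042", whose rows would be dropped from the dataframe.
        first_run = re.sub(example_regex, "", sample)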
# def make_hitpicks(input:List[dict]) -> DataFrame:
# """
# Converts list of dictionaries constructed by hitpicking to dataframe
#
# Args:
# input (List[dict]): list of hitpicked dictionaries
#
# Returns:
# DataFrame: constructed dataframe.
# """
# return DataFrame.from_records(input)