File size: 7,073 Bytes
25fcb73 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
import json
import re
from tqdm import tqdm
import os
import config
# --- Functions from dataOrganize.py ---
def clean_text(text: str) -> str:
    """
    Cleans the input text by removing common noise from FDA documents.

    Removes "REVISED: M/YYYY" stamps and separator runs made of three or
    more '-', '=' or '*' characters, then collapses every run of two or
    more whitespace characters into a single space and strips both ends.

    Args:
        text: Raw section text. A falsy value (None or "") yields "".

    Returns:
        The cleaned text.
    """
    if not text:
        return ""
    text = re.sub(r'REVISED:\s*\d{1,2}/\d{4}', '', text)
    text = re.sub(r'[\-=*]{3,}', '', text)
    # Whitespace is normalised last: deleting a stamp or a separator can
    # leave a double space or a dangling edge space behind, so collapsing
    # before those removals would let that noise through.
    text = re.sub(r'\s{2,}', ' ', text).strip()
    return text
def organize_drug_data(input_path):
    """
    Loads raw drug data, filters for high-quality entries, cleans the text,
    and returns the organized data as a list.

    An entry is kept only when it is a dict, carries a brand or generic
    name under "openfda", has an "indications_and_usage" key, and yields
    at least one non-empty section after cleaning.

    Args:
        input_path: Path to a UTF-8 JSON file holding either a list of
            entries or a dict whose "results" key holds that list.

    Returns:
        A list of dicts with the keys "brand_name", "generic_name" and
        "sections" (section title -> cleaned text). An empty list is
        returned when the file is missing, is not valid JSON, or does not
        contain a list of entries.
    """
    print(f"Loading raw data from: {input_path}...")
    try:
        with open(input_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"Error: The file '{input_path}' was not found.")
        return []
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from '{input_path}'.")
        return []

    entries = data.get('results', data) if isinstance(data, dict) else data
    if not isinstance(entries, list):
        print("Error: The JSON data is not in the expected list format.")
        return []

    # Raw entry key -> human-readable section title. Built once because it
    # does not depend on the entry being processed.
    sections_to_extract = {
        "indications_and_usage": "Indications and Usage",
        "adverse_reactions": "Adverse Reactions",
        "drug_interactions": "Drug Interactions",
        "contraindications": "Contraindications",
        "warnings": "Warnings",
        "boxed_warning": "Boxed Warning",
        "mechanism_of_action": "Mechanism of Action",
        "pharmacokinetics": "Pharmacokinetics",
        "dosage_and_administration": "Dosage and Administration",
        "how_supplied": "How Supplied",
        "storage_and_handling": "Storage and Handling",
        "information_for_patients": "Information for Patients",
        "pregnancy": "Pregnancy",
        "nursing_mothers": "Nursing Mothers",
        "pediatric_use": "Pediatric Use",
        "geriatric_use": "Geriatric Use",
    }

    organized_data = []
    print("Filtering, cleaning, and organizing drug data...")
    for entry in tqdm(entries, desc="Processing drug entries"):
        if not isinstance(entry, dict):
            continue

        # "openfda" may be present but null or malformed; such an entry has
        # no usable names, so it is skipped instead of raising on .get().
        openfda = entry.get("openfda")
        if not isinstance(openfda, dict):
            continue

        brand_name_list = openfda.get("brand_name")
        generic_name_list = openfda.get("generic_name")
        if not brand_name_list and not generic_name_list:
            continue
        if "indications_and_usage" not in entry:
            continue

        brand_name = brand_name_list[0] if brand_name_list else "Unknown Brand"
        generic_name = generic_name_list[0] if generic_name_list else "Unknown Generic"

        processed_sections = {}
        for key, section_name in sections_to_extract.items():
            text_list = entry.get(key)
            # Only the first element of each section list is used, and only
            # when it is a string; anything else cannot be cleaned.
            if not isinstance(text_list, list) or not text_list:
                continue
            if not isinstance(text_list[0], str):
                continue
            cleaned_text = clean_text(text_list[0])
            if cleaned_text:
                processed_sections[section_name] = cleaned_text

        if processed_sections:
            organized_data.append({
                "brand_name": brand_name,
                "generic_name": generic_name,
                "sections": processed_sections,
            })

    print(f"Found {len(organized_data)} high-quality drug entries.")
    return organized_data
# --- Functions from deduplicate_drugs.py ---
def deduplicate_drugs(data):
    """
    Drops repeated drugs, keeping the first occurrence of each
    (brand_name, generic_name) pair.

    Names are compared case-insensitively. A name given as a list is
    represented by its first element; an empty or missing name counts as
    None.
    """
    print(f"Deduplicating {len(data)} drugs...")

    def _normalise(value):
        # Reduce a name (string, list of strings, or nothing) to its
        # lowercase form, or None when there is no usable name.
        if isinstance(value, list):
            value = value[0] if value else None
        return value.lower() if value else None

    unique_records = []
    seen_keys = set()
    for record in data:
        key = (
            _normalise(record.get('brand_name')),
            _normalise(record.get('generic_name')),
        )
        if key in seen_keys:
            continue
        seen_keys.add(key)
        unique_records.append(record)

    print(f"Deduplication complete. Found {len(unique_records)} unique drugs.")
    return unique_records
# --- Functions from format_fda_data.py ---
def generate_section_id(section_title):
    """Builds a lowercase, underscore-joined ID from at most the first two words of a section title."""
    # '/', '-' and '&' act as word separators; every other character that
    # is not a letter, digit or whitespace is dropped.
    spaced = re.sub(r'[/\-&]', ' ', section_title)
    alnum_only = re.sub(r'[^a-zA-Z0-9\s]', '', spaced)
    leading_words = alnum_only.lower().split()[:2]
    if not leading_words:
        # Nothing usable survived the filtering.
        return "section"
    return '_'.join(leading_words)
def transform_drug_data(drugs, output_file_path):
    """
    Transforms drug data to a JSON Lines format.

    Every non-empty section of every drug becomes one JSON object with the
    keys "doc_id", "generic_name" (upper-cased), "section" and "content".
    Drugs without a generic name or without a dict of sections are skipped.

    Args:
        drugs: List of dicts with "generic_name" (a string or a list of
            strings) and "sections" (section title -> text).
        output_file_path: Destination file. Its parent directory is
            created when missing; a bare file name is written to the
            current working directory.
    """
    print(f"Transforming {len(drugs)} drugs to JSONL format...")
    processed_records = []
    for drug in drugs:
        generic_name = drug.get('generic_name')
        sections = drug.get('sections')
        if not generic_name or not isinstance(sections, dict):
            continue
        if isinstance(generic_name, list):
            generic_name = generic_name[0] if generic_name else None
        if not generic_name:
            continue

        generic_name_upper = generic_name.upper()
        # NOTE(review): doc_id is built from the generic name and the
        # section only, so two drugs sharing a generic name produce
        # identical doc_ids -- confirm downstream consumers tolerate that.
        doc_id_prefix = generic_name_upper.replace(' ', '_')
        for section_title, section_content in sections.items():
            if not section_title or not section_content:
                continue
            section_id = generate_section_id(section_title)
            record = {
                "doc_id": f"{doc_id_prefix}_{section_id}",
                "generic_name": generic_name_upper,
                "section": section_title,
                "content": section_content.strip()
            }
            processed_records.append(json.dumps(record))

    # os.path.dirname() returns '' for a bare file name and os.makedirs('')
    # raises FileNotFoundError, so the directory is only created when the
    # path actually names one.
    output_dir = os.path.dirname(output_file_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(output_file_path, 'w', encoding='utf-8') as f_out:
        f_out.write('\n'.join(processed_records))
    print(f"Transformation complete. {len(processed_records)} records created.")
    print(f"Transformed data saved to: {output_file_path}")
if __name__ == '__main__':
    # Define file paths using config
    raw_data_path = config.RAW_DATA_PATH
    cleaned_data_path = config.CLEANED_DATA_PATH

    # --- Run the full pipeline ---
    print("--- Starting Data Preparation Pipeline ---")

    # Step 1: Organize and clean the raw data in memory
    organized_data = organize_drug_data(raw_data_path)
    if not organized_data:
        # organize_drug_data returns an empty list when the input is
        # missing, unreadable, or contains no usable entries. Stop here so
        # the output file is not replaced with an empty one.
        raise SystemExit(
            "Error: No drug entries were produced; the output file was not written."
        )

    # Step 2: Deduplicate the cleaned data in memory
    deduplicated_data = deduplicate_drugs(organized_data)

    # Step 3: Transform the deduplicated data and write to the final file
    transform_drug_data(deduplicated_data, cleaned_data_path)
    print("--- Data Preparation Pipeline Finished ---")
|