Spaces:
Sleeping
Sleeping
| """ | |
| DNA-Diffusion Gradio Application | |
| Interactive DNA sequence generation with slot machine visualization and protein analysis | |
| """ | |
| import gradio as gr | |
| import logging | |
| import json | |
| import os | |
| from typing import Dict, Any, Tuple | |
| import html | |
| import requests | |
| import time | |
# Configure logging
# Runs at import time: root logger at INFO, each record prefixed with
# timestamp, logger name and severity.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
# Try to import spaces for GPU decoration
try:
    import spaces
    SPACES_AVAILABLE = True
except ImportError:
    SPACES_AVAILABLE = False

    # Create a dummy decorator if spaces is not available
    class spaces:
        """No-op stand-in for the optional ``spaces`` package.

        Only the ``GPU`` decorator is provided; it returns the decorated
        function unchanged.
        """

        @staticmethod
        def GPU(func=None, duration=60):
            """Return the decorated function untouched.

            Works both as a bare decorator (``@spaces.GPU``) and as a
            decorator factory (``@spaces.GPU(duration=120)``).

            Args:
                func: The function being decorated when used bare; anything
                    non-callable (or ``None``) selects factory mode.
                duration: Accepted for signature compatibility and ignored.

            Returns:
                ``func`` itself in bare mode, otherwise an identity decorator.
            """
            def decorator(wrapped):
                return wrapped

            # Bare use passes the target function as the first argument.
            if callable(func):
                return func
            return decorator
# The model package is optional so the UI can be developed without it;
# MODEL_AVAILABLE records whether the import succeeded.
try:
    from dna_diffusion_model import DNADiffusionModel, get_model
except ImportError as e:
    logger.warning(f"DNA-Diffusion model not available: {e}")
    MODEL_AVAILABLE = False
else:
    MODEL_AVAILABLE = True
    logger.info("DNA-Diffusion model module loaded successfully")
# Load the HTML interface
# The path is relative, so it is resolved against the current working directory.
HTML_FILE = "dna-slot-machine.html"
if not os.path.exists(HTML_FILE):
    raise FileNotFoundError(f"HTML interface file '{HTML_FILE}' not found. Please ensure it exists in the same directory as app.py")
# Decode explicitly as UTF-8 instead of the platform default encoding, so the
# page reads the same everywhere (presumably it contains non-ASCII text, since
# the app offers a Korean language option — verify against the HTML file).
with open(HTML_FILE, "r", encoding="utf-8") as f:
    SLOT_MACHINE_HTML = f.read()
class ProteinAnalyzer:
    """Handles protein translation and analysis using LLM.

    Both methods are static: they can be called on the class or on an
    instance and use no instance state.
    """

    # Genetic code table for DNA to amino acid translation ('*' = stop codon)
    CODON_TABLE = {
        'TTT': 'F', 'TTC': 'F', 'TTA': 'L', 'TTG': 'L',
        'TCT': 'S', 'TCC': 'S', 'TCA': 'S', 'TCG': 'S',
        'TAT': 'Y', 'TAC': 'Y', 'TAA': '*', 'TAG': '*',
        'TGT': 'C', 'TGC': 'C', 'TGA': '*', 'TGG': 'W',
        'CTT': 'L', 'CTC': 'L', 'CTA': 'L', 'CTG': 'L',
        'CCT': 'P', 'CCC': 'P', 'CCA': 'P', 'CCG': 'P',
        'CAT': 'H', 'CAC': 'H', 'CAA': 'Q', 'CAG': 'Q',
        'CGT': 'R', 'CGC': 'R', 'CGA': 'R', 'CGG': 'R',
        'ATT': 'I', 'ATC': 'I', 'ATA': 'I', 'ATG': 'M',
        'ACT': 'T', 'ACC': 'T', 'ACA': 'T', 'ACG': 'T',
        'AAT': 'N', 'AAC': 'N', 'AAA': 'K', 'AAG': 'K',
        'AGT': 'S', 'AGC': 'S', 'AGA': 'R', 'AGG': 'R',
        'GTT': 'V', 'GTC': 'V', 'GTA': 'V', 'GTG': 'V',
        'GCT': 'A', 'GCC': 'A', 'GCA': 'A', 'GCG': 'A',
        'GAT': 'D', 'GAC': 'D', 'GAA': 'E', 'GAG': 'E',
        'GGT': 'G', 'GGC': 'G', 'GGA': 'G', 'GGG': 'G'
    }

    @staticmethod
    def dna_to_protein(dna_sequence: str) -> str:
        """Translate DNA sequence to protein sequence.

        The input is upper-cased and every character other than A/T/C/G is
        dropped. Translation reads frame 0 in whole codons and stops at the
        first stop codon; a trailing partial codon is ignored.

        Args:
            dna_sequence: Raw DNA text (any case, may contain other characters).

        Returns:
            One-letter amino-acid string, possibly empty.
        """
        bases = ''.join(c for c in dna_sequence.upper() if c in 'ATCG')

        protein = []
        # Stopping at len - 2 guarantees every slice is a full 3-base codon.
        for i in range(0, len(bases) - 2, 3):
            amino_acid = ProteinAnalyzer.CODON_TABLE.get(bases[i:i + 3], 'X')
            if amino_acid == '*':  # Stop codon
                break
            protein.append(amino_acid)
        return ''.join(protein)

    @staticmethod
    def analyze_protein_with_llm(protein_sequence: str, cell_type: str, language: str = "en") -> str:
        """Analyze protein structure and function using Friendli LLM API.

        Args:
            protein_sequence: One-letter amino-acid sequence to describe.
            cell_type: Cell type name inserted into the prompt as context.
            language: "ko" selects the Korean prompt and system message; any
                other value uses the English ones.

        Returns:
            The model's analysis text, or a human-readable message when the
            FRIENDLI_TOKEN environment variable is missing or the request
            fails. Exceptions are logged and converted to messages, never
            raised to the caller.
        """
        # Get API token from environment
        token = os.getenv("FRIENDLI_TOKEN")
        if not token:
            logger.warning("FRIENDLI_TOKEN not found in environment variables")
            if language == "ko":
                return "단백질 분석 불가: API 토큰이 설정되지 않았습니다"
            return "Protein analysis unavailable: API token not configured"

        url = "https://api.friendli.ai/dedicated/v1/chat/completions"
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }

        # Pick prompt and system message from the same branch so both are
        # always in the same language.
        if language == "ko":
            system_message = "당신은 단백질 구조와 기능 예측을 전문으로 하는 지식이 풍부한 생물정보학 어시스턴트입니다."
            prompt = f"""당신은 생물정보학 전문가입니다. 다음 단백질 서열을 분석하고 잠재적인 구조와 기능에 대한 통찰력을 제공해주세요.
단백질 서열: {protein_sequence}
세포 유형: {cell_type}
다음 내용을 포함해주세요:
1. 서열 패턴을 기반으로 예측되는 단백질 패밀리 또는 도메인
2. 잠재적인 구조적 특징 (알파 나선, 베타 시트, 루프)
3. 가능한 생물학적 기능
4. {cell_type} 세포 유형과의 관련성
5. 주목할 만한 서열 모티프나 특성
과학 애플리케이션에 표시하기에 적합하도록 간결하면서도 유익한 응답을 작성해주세요."""
        else:
            system_message = "You are a knowledgeable bioinformatics assistant specializing in protein structure and function prediction."
            prompt = f"""You are a bioinformatics expert. Analyze the following protein sequence and provide insights about its potential structure and function.
Protein sequence: {protein_sequence}
Cell type context: {cell_type}
Please provide:
1. Predicted protein family or domain based on sequence patterns
2. Potential structural features (alpha helices, beta sheets, loops)
3. Possible biological functions
4. Relevance to the {cell_type} cell type
5. Any notable sequence motifs or characteristics
Keep the response concise but informative, suitable for display in a scientific application."""

        payload = {
            "model": "dep86pjolcjjnv8",
            "messages": [
                {"role": "system", "content": system_message},
                {"role": "user", "content": prompt}
            ],
            "max_tokens": 1000,
            "temperature": 0.7,
            "top_p": 0.8,
            "stream": False  # Disable streaming for simplicity
        }

        try:
            response = requests.post(url, json=payload, headers=headers, timeout=30)
            response.raise_for_status()
            result = response.json()
            return result['choices'][0]['message']['content']
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to analyze protein with LLM: {e}")
            return f"Protein analysis failed: {str(e)}"
        except Exception as e:
            # Best-effort feature: e.g. an unexpected response shape must not
            # break sequence generation, so report it as a message instead.
            logger.error(f"Unexpected error during protein analysis: {e}")
            return "Protein analysis unavailable due to an error"
class DNADiffusionApp:
    """Main application class for DNA-Diffusion Gradio interface.

    Holds the (optionally loaded) model plus its loading/error state and
    exposes the callbacks wired into the Gradio UI.
    """

    def __init__(self):
        self.model = None            # set by initialize_model() on success
        self.model_loading = False   # True while get_model() is running
        self.model_error = None      # last initialization error message, if any
        self.protein_analyzer = ProteinAnalyzer()

    def initialize_model(self):
        """Initialize the DNA-Diffusion model.

        Does nothing when a model is already loaded or a load is in progress.
        This method is registered as a page-load callback, so without that
        guard every page load would call get_model() again. Failures are
        logged and stored in ``self.model_error`` rather than raised.
        """
        if not MODEL_AVAILABLE:
            self.model_error = "DNA-Diffusion model module not available. Please install dependencies."
            return
        if self.model is not None or self.model_loading:
            return

        self.model_loading = True
        try:
            logger.info("Starting model initialization...")
            self.model = get_model()
            logger.info("Model initialized successfully!")
            self.model_error = None
        except Exception as e:
            logger.error(f"Failed to initialize model: {e}")
            self.model_error = str(e)
            self.model = None
        finally:
            self.model_loading = False

    def generate_sequence(self, cell_type: str, guidance_scale: float = 1.0) -> Tuple[str, Dict[str, Any]]:
        """Generate a DNA sequence using the model or mock data.

        Args:
            cell_type: Cell type passed to the model and echoed in metadata.
            guidance_scale: Guidance value passed to the model.

        Returns:
            ``(sequence, metadata)``. Without a loaded model the sequence is
            200 random bases, returned after a 2-second sleep, and the
            metadata carries ``'mock': True``. With a model, both values are
            read from the ``'sequence'`` and ``'metadata'`` keys of
            ``self.model.generate(...)``.

        Raises:
            Exception: Whatever the model raises is logged and re-raised.
        """
        # Use mock generation if model is not available
        if not MODEL_AVAILABLE or self.model is None:
            logger.warning("Using mock sequence generation")
            import random
            sequence = ''.join(random.choice(['A', 'T', 'C', 'G']) for _ in range(200))
            metadata = {
                'cell_type': cell_type,
                'guidance_scale': guidance_scale,
                'generation_time': 2.0,
                'mock': True
            }
            # Simulate generation time
            time.sleep(2.0)
            return sequence, metadata

        # Use real model
        try:
            result = self.model.generate(cell_type, guidance_scale)
            return result['sequence'], result['metadata']
        except Exception as e:
            logger.error(f"Generation failed: {e}")
            raise

    def handle_generation_request(self, cell_type: str, guidance_scale: float, language: str = "en"):
        """Handle sequence generation request from Gradio.

        Generates a sequence, translates it to protein, asks the LLM for an
        analysis and adds ``protein_sequence``, ``protein_length`` and
        ``protein_analysis`` to the metadata.

        Returns:
            ``(sequence, metadata_json)`` on success; on any failure
            ``("", '{"error": "<message>"}')`` so the error reaches the
            front end as data instead of an exception.
        """
        try:
            logger.info(f"Generating sequence for cell type: {cell_type}, language: {language}")
            # Generate DNA sequence
            sequence, metadata = self.generate_sequence(cell_type, guidance_scale)

            # Translate to protein
            logger.info("Translating DNA to protein sequence...")
            protein_sequence = self.protein_analyzer.dna_to_protein(sequence)
            # Add protein sequence to metadata
            metadata['protein_sequence'] = protein_sequence
            metadata['protein_length'] = len(protein_sequence)

            # Analyze protein with LLM
            logger.info("Analyzing protein structure and function...")
            protein_analysis = self.protein_analyzer.analyze_protein_with_llm(
                protein_sequence, cell_type, language
            )
            # Add analysis to metadata
            metadata['protein_analysis'] = protein_analysis

            logger.info("Generation and analysis complete")
            return sequence, json.dumps(metadata)
        except Exception as e:
            error_msg = str(e)
            logger.error(f"Generation request failed: {error_msg}")
            return "", json.dumps({"error": error_msg})
# Create single app instance
# Module-level so the callbacks registered in create_demo()
# (app.handle_generation_request, app.initialize_model) share one model/state.
app = DNADiffusionApp()
def create_demo():
    """Build and return the Gradio Blocks app.

    The visible UI is the slot-machine page embedded in an iframe. A hidden
    column of regular Gradio controls acts as the backend bridge: the page
    posts a 'generate_request' message, the bootstrap script copies its
    values into the hidden radios and clicks the hidden button, and the
    result is posted back into the iframe.
    """
    # Stylesheet: hide the bridge controls, black background, no scrolling.
    page_css = """
#hidden-controls { display: none !important; }
.gradio-container {
overflow: hidden;
background-color: #000000 !important;
}
#dna-frame { overflow: hidden; position: relative; }
body {
background-color: #000000 !important;
}
"""
    # Bootstrap script: iframe -> Gradio message listener, plus the
    # sendSequenceToIframe() helper used for the Gradio -> iframe direction.
    bridge_js = """
function() {
console.log('Initializing DNA-Diffusion Gradio interface...');
// Set up message listener to receive requests from iframe
window.addEventListener('message', function(event) {
console.log('Parent received message:', event.data);
if (event.data.type === 'generate_request') {
console.log('Triggering generation for cell type:', event.data.cellType);
console.log('Language:', event.data.language);
// Update the hidden cell type input
const radioInputs = document.querySelectorAll('#cell-type-input input[type="radio"]');
radioInputs.forEach(input => {
if (input.value === event.data.cellType) {
input.checked = true;
// Trigger change event
input.dispatchEvent(new Event('change'));
}
});
// Update the language input
const langInputs = document.querySelectorAll('#language-input input[type="radio"]');
langInputs.forEach(input => {
if (input.value === event.data.language) {
input.checked = true;
input.dispatchEvent(new Event('change'));
}
});
// Small delay to ensure radio button update is processed
setTimeout(() => {
document.querySelector('#generate-btn').click();
}, 100);
}
});
// Function to send sequence to iframe
window.sendSequenceToIframe = function(sequence, metadata) {
console.log('Sending sequence to iframe:', sequence);
const iframe = document.querySelector('#dna-frame iframe');
if (iframe && iframe.contentWindow) {
try {
const meta = JSON.parse(metadata);
if (meta.error) {
iframe.contentWindow.postMessage({
type: 'generation_error',
error: meta.error
}, '*');
} else {
iframe.contentWindow.postMessage({
type: 'sequence_generated',
sequence: sequence,
metadata: meta
}, '*');
}
} catch (e) {
console.error('Failed to parse metadata:', e);
// If parsing fails, still send the sequence
iframe.contentWindow.postMessage({
type: 'sequence_generated',
sequence: sequence,
metadata: {}
}, '*');
}
} else {
console.error('Could not find iframe');
}
};
}
"""
    # The page is inlined through srcdoc, so it has to be attribute-escaped.
    escaped_html = html.escape(SLOT_MACHINE_HTML, quote=True)
    iframe_html = f'<iframe srcdoc="{escaped_html}" style="width: 100%; height: 800px; border: none; display: block;"></iframe>'

    with gr.Blocks(css=page_css, js=bridge_js, theme=gr.themes.Base()) as demo:
        # Bridge controls, driven by the bootstrap script rather than the user.
        with gr.Column(elem_id="hidden-controls", visible=False):
            cell_type_input = gr.Radio(
                choices=["K562", "GM12878", "HepG2"],
                value="K562",
                label="Cell Type",
                elem_id="cell-type-input",
            )
            language_input = gr.Radio(
                choices=["en", "ko"],
                value="en",
                label="Language",
                elem_id="language-input",
            )
            guidance_input = gr.Slider(
                minimum=1.0,
                maximum=10.0,
                value=1.0,
                step=0.5,
                label="Guidance Scale",
                elem_id="guidance-input",
            )
            generate_btn = gr.Button("Generate", elem_id="generate-btn")
            sequence_output = gr.Textbox(label="Sequence", elem_id="sequence-output")
            metadata_output = gr.Textbox(label="Metadata", elem_id="metadata-output")

        # Main interface - the slot machine in an iframe
        gr.HTML(value=iframe_html, elem_id="dna-frame")

        # Step 1: run generation on the server and fill the hidden textboxes.
        generation_event = generate_btn.click(
            fn=app.handle_generation_request,
            inputs=[cell_type_input, guidance_input, language_input],
            outputs=[sequence_output, metadata_output],
        )
        # Step 2: client-side only, forward both values into the iframe.
        generation_event.then(
            fn=None,
            inputs=[sequence_output, metadata_output],
            outputs=None,
            js="(seq, meta) => sendSequenceToIframe(seq, meta)",
        )

        # Initialize model on load
        demo.load(fn=app.initialize_model, inputs=None, outputs=None)

    return demo
# Script entry point: build the demo, read CLI options, launch the server.
if __name__ == "__main__":
    import argparse

    demo = create_demo()

    # Command-line options
    parser = argparse.ArgumentParser(description="DNA-Diffusion Gradio App")
    parser.add_argument("--share", action="store_true", help="Create a public shareable link")
    parser.add_argument("--port", type=int, default=7860, help="Port to run the app on")
    parser.add_argument("--host", type=str, default="0.0.0.0", help="Host to run the app on")
    args = parser.parse_args()

    # A set SPACE_ID is treated as "running on Hugging Face Spaces": the
    # CLI values are overridden with fixed settings and no browser is opened.
    running_on_spaces = bool(os.getenv("SPACE_ID"))
    if running_on_spaces:
        args.host, args.port, args.share = "0.0.0.0", 7860, False
    inbrowser = not running_on_spaces

    logger.info(f"Starting DNA-Diffusion Gradio app on {args.host}:{args.port}")
    demo.launch(
        share=args.share,
        server_name=args.host,
        server_port=args.port,
        inbrowser=inbrowser,
    )