# NOTE(review): this file was scraped from a Hugging Face Space whose status
# was "Build error" — likely causes are the missing "Frame 1.png" logo asset
# or unset AZURE_OPENAI_* environment variables; verify both before deploying.
| import os | |
| import pandas as pd | |
| import json | |
| import gradio as gr | |
| from openai import AzureOpenAI | |
| from PyPDF2 import PdfReader | |
| from gradio.themes.base import Base | |
| from gradio.themes.utils import colors, fonts, sizes | |
| import base64 | |
class BaseTheme(Base):
    """Gradio theme preset: orange primary, blue secondary, large base text."""

    def __init__(
        self,
        *,
        primary_hue: colors.Color | str = colors.orange,
        secondary_hue: colors.Color | str = colors.blue,
        neutral_hue: colors.Color | str = colors.gray,
        spacing_size: sizes.Size | str = sizes.spacing_md,
        radius_size: sizes.Size | str = sizes.radius_md,
        text_size: sizes.Size | str = sizes.text_lg,
    ):
        # Collect the chosen settings and hand them straight to the base theme.
        theme_settings = {
            "primary_hue": primary_hue,
            "secondary_hue": secondary_hue,
            "neutral_hue": neutral_hue,
            "spacing_size": spacing_size,
            "radius_size": radius_size,
            "text_size": text_size,
        }
        super().__init__(**theme_settings)
basetheme = BaseTheme()

# Client-side JS injected into the page: forces the Gradio UI into dark mode
# by rewriting the URL's __theme query parameter on load.
js_func = """
function refresh() {
    const url = new URL(window.location);
    if (url.searchParams.get('__theme') !== 'dark') {
        url.searchParams.set('__theme', 'dark');
        window.location.href = url.href;
    }
}
"""

# ===============================
# Azure OpenAI setup
# ===============================
# Fail fast with a clear message when credentials are missing.  The previous
# pattern `os.environ[...] = os.getenv(...)` raised an opaque
# `TypeError: str expected, not NoneType` whenever the variable was unset.
_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
_api_key = os.getenv("AZURE_OPENAI_API_KEY")  # Set these in your environment; never hardcode keys
if not _endpoint or not _api_key:
    raise RuntimeError(
        "AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY environment variables must be set."
    )
os.environ["AZURE_OPENAI_ENDPOINT"] = _endpoint
os.environ["AZURE_OPENAI_API_KEY"] = _api_key

client = AzureOpenAI(
    api_version="2023-05-15",
    azure_deployment="gpt-4o",  # Replace with your actual model deployment name
)
| # =============================== | |
| # Helper Functions | |
| # =============================== | |
def parse_field_definitions(field_text):
    """
    Converts user-entered lines in the format:
        Field Name: Description
    into a dictionary { "Field Name": "Description", ... }.
    Lines without a colon are added with an empty description; blank
    lines are skipped.
    """
    parsed = {}
    for raw_line in field_text.split("\n"):
        entry = raw_line.strip()
        if not entry:
            continue
        # partition() splits on the first colon only; when the line has no
        # colon, the description part comes back as an empty string.
        name, _, description = entry.partition(":")
        parsed[name.strip()] = description.strip()
    return parsed
def read_file_metadata(file_path):
    """Return (column_names, sample_data) for a CSV file.

    sample_data maps at most the first two column names to the values of
    the first data row; it is empty when the file has no data rows.
    """
    frame = pd.read_csv(file_path)
    columns = list(frame.columns)
    preview = {}
    if len(frame) > 0:
        preview = frame[columns[:2]].iloc[0].to_dict()
    return columns, preview
def read_excel_metadata(file_path):
    """Return (column_names, sample_data) for an Excel workbook.

    Mirrors read_file_metadata: sample_data maps at most the first two
    column names to the first row's values, empty when there are no rows.
    """
    frame = pd.read_excel(file_path)
    columns = list(frame.columns)
    preview = {}
    if len(frame) > 0:
        preview = frame[columns[:2]].iloc[0].to_dict()
    return columns, preview
def create_column_mapping_prompt(file_metadata, user_fields):
    """Build the LLM prompt asking for a desired-field -> source-column mapping.

    file_metadata is a list of (file_path, column_names, sample_data)
    tuples; user_fields maps desired field names to their descriptions.
    """
    sections = [
        "You are given CSV/Excel data from different sources. The files contain columns with similar content but with different names.\n"
        "The user has provided the following desired fields and their descriptions:\n"
        f"{json.dumps(user_fields, indent=2)}\n\n"
        "For each file, here are the details (showing example data from the first two columns):\n\n"
    ]
    # One summary section per file so the model can match columns in context.
    for path, columns, sample in file_metadata:
        sections.append(
            f"File: {path}\n"
            f"Columns: {columns}\n"
            f"Example Data (first two columns): {sample}\n\n"
        )
    sections.append(
        "Your task is to map the existing column names from each file to the desired fields provided by the user. "
        "For each desired field, decide which column name in each file best represents it. "
        "If a field cannot be found, map it to an empty string.\n\n"
        "Return the mapping in JSON format with the following structure:\n"
        "{\n"
        ' "desired_field1": { "source_file1": "matched_column_name_or_empty", "source_file2": "matched_column_name_or_empty", ... },\n'
        ' "desired_field2": { ... },\n'
        " ...\n"
        "}\n\n"
        "Do not include any additional text in your response."
    )
    return "".join(sections)
def get_column_mapping(file_metadata, user_fields):
    """Ask the LLM for a column mapping and return it as a parsed dict.

    Raises ValueError (including the raw response text) when the model's
    reply cannot be parsed as JSON.
    """
    prompt = create_column_mapping_prompt(file_metadata, user_fields)
    # Low temperature + json_object response format keep the reply parseable.
    completion = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,
        response_format={"type": "json_object"},
    )
    try:
        return json.loads(completion.choices[0].message.content.strip())
    except Exception as e:
        raise ValueError(
            f"Error parsing LLM response: {e}\n\nResponse:\n{completion.choices[0].message.content}"
        )
def merge_files_with_mapping(file_paths, user_fields):
    """Merge CSV/Excel files into one DataFrame shaped by user_fields.

    Gathers lightweight metadata per file, asks the LLM for a
    desired-field -> source-column mapping, renames each file's columns
    accordingly, concatenates everything, restricts/reorders the columns
    to the user's fields, and writes the result to merged_data.csv.

    Raises ValueError when none of the paths point at a CSV/Excel file.
    """

    def _load(path):
        # Returns a DataFrame for supported extensions, None otherwise.
        lowered = path.lower()
        if lowered.endswith(".csv"):
            return pd.read_csv(path)
        if lowered.endswith((".xlsx", ".xls")):
            return pd.read_excel(path)
        return None

    file_metadata = []
    for path in file_paths:
        lowered = path.lower()
        if lowered.endswith(".csv"):
            columns, sample = read_file_metadata(path)
        elif lowered.endswith((".xlsx", ".xls")):
            columns, sample = read_excel_metadata(path)
        else:
            continue  # silently skip unsupported file types
        file_metadata.append((path, columns, sample))

    # Ask the LLM for a column mapping (skip the call when nothing is usable).
    mapping = get_column_mapping(file_metadata, user_fields) if file_metadata else {}

    frames = []
    for path in file_paths:
        frame = _load(path)
        if frame is None:
            continue
        # Invert the mapping into {source_column: desired_field} for this file.
        # The LLM may key its per-field mapping by the full path or by the
        # bare file name, so try both.
        rename_map = {}
        for desired_field, per_file in mapping.items():
            if path in per_file:
                source_column = per_file[path]
            else:
                source_column = per_file.get(os.path.basename(path), "")
            if source_column and source_column in frame.columns:
                rename_map[source_column] = desired_field
        frames.append(frame.rename(columns=rename_map))

    if not frames:
        raise ValueError("No valid CSV/Excel files to merge.")

    merged = pd.concat(frames, ignore_index=True)
    # Only keep columns in the order the user specified.
    merged = merged.reindex(columns=list(user_fields.keys()))
    merged.to_csv("merged_data.csv", index=False)
    return merged
def combine_all_data(file_paths, user_fields):
    """Merge the uploads and save them to merged_all_data.csv.

    Returns (final_df, absolute_path): the merged DataFrame for display
    and the absolute path of the CSV written for download.
    """
    merged = merge_files_with_mapping(file_paths, user_fields)
    # Re-apply the user's column selection/order before saving the download copy.
    merged = merged.reindex(columns=list(user_fields.keys()))
    output_name = "merged_all_data.csv"
    merged.to_csv(output_name, index=False)
    return merged, os.path.abspath(output_name)
| # =============================== | |
| # Gradio Interface Function | |
| # =============================== | |
def process_data(files, field_text):
    """
    Gradio handler: merge the uploaded files according to the user's fields.

    Args:
        files: list of uploaded CSV/Excel file objects (may be None/empty).
        field_text: multiline text, one line per field in the form
            "Field Name: Description".

    Returns:
        (final_df, absolute_path): the merged DataFrame for the Dataframe
        output and the generated CSV path for the File output.

    Raises:
        gr.Error: displayed in the UI when no valid fields were entered or
            when merging fails.  (The previous code returned a bare string
            on these paths, which crashed Gradio because this function is
            wired to exactly two outputs.)
    """
    # Parse the user's desired fields from multiline text.
    user_fields = parse_field_definitions(field_text)
    if not user_fields:
        raise gr.Error("No valid fields found. Please use the format:\n\nField Name: Description")
    file_paths = [f.name for f in files] if files else []
    try:
        final_df, absolute_path = combine_all_data(file_paths, user_fields)
    except Exception as e:
        raise gr.Error(f"Error during processing: {e}")
    return final_df, absolute_path
| with open("Frame 1.png", "rb") as logo_file: | |
| base64_logo = base64.b64encode(logo_file.read()).decode("utf-8") | |
| # =============================== | |
| # Gradio UI | |
| # =============================== | |
# ===============================
# Gradio UI
# ===============================
# Layout: a header row with the Base64-embedded logo, then a gr.Interface
# bound to process_data (two inputs -> two outputs).
with gr.Blocks(theme=basetheme, js=js_func, fill_height=True) as demo:
    # Add logo at the top using Base64 HTML (three-column grid keeps the
    # title centered regardless of the logo width).
    with gr.Row():
        gr.HTML(
            f"""
            <div style="display: grid; grid-template-columns: 1fr 2fr 1fr; align-items: center;">
                <div style="justify-self: start;">
                    <img src="data:image/png;base64,{base64_logo}" alt="Logo" style="width: 150px; height: auto;">
                </div>
                <div style="justify-self: center;">
                    <h2 style="margin: 0; text-align: center;">AI Data Transformation with User-Selected Fields</h2>
                </div>
                <div></div>
            </div>
            """
        )
    gr.Interface(
        fn=process_data,
        inputs=[
            gr.File(label="Upload CSV/Excel files", file_count="multiple", file_types=[".csv", ".xlsx", ".xls"]),
            gr.Textbox(
                label="Desired Fields (one per line, use 'Field Name: Description' format)",
                placeholder="Example:\nName: Full name\nDOB: Date of birth\nAddress: Full address\n",
                lines=6,
            ),
        ],
        # Two outputs: the merged table for display, and the CSV for download.
        outputs=[gr.Dataframe(label="Final Merged Data"), gr.File(label="Download CSV")],
        description=(
            "Upload one or more CSV/Excel files and enter your desired fields below. "
            "Type each field on a new line in the format:\n"
            "'Field Name: Description'\n\n"
            "The AI will automatically map and merge columns from your files to these fields."
        ),
    )

if __name__ == "__main__":
    # Launch the Gradio app
    demo.launch()