import os
import random
import spacy
import nltk
from nltk.corpus import wordnet
from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM
from textblob import TextBlob
import requests
import json
from pathlib import Path
import torch
class AdvancedTextAugmenter:
    def __init__(self):
        self.setup_dependencies()
        self.setup_models()

    def setup_dependencies(self):
        """Set up the required dependencies."""
        try:
            # Download the required NLTK resources
            nltk.download('wordnet', quiet=True)
            nltk.download('omw-1.4', quiet=True)  # needed for French (lang='fra') WordNet lookups
            nltk.download('averaged_perceptron_tagger', quiet=True)
            nltk.download('punkt', quiet=True)
            # Load the French spaCy model
            try:
                self.nlp = spacy.load("fr_core_news_sm")
            except OSError:
                print("French spaCy model not found. Installing it...")
                os.system("python -m spacy download fr_core_news_sm")
                self.nlp = spacy.load("fr_core_news_sm")
        except Exception as e:
            print(f"Error during setup: {e}")
            print("Install the dependencies with: pip install spacy nltk transformers textblob torch")
    def setup_models(self):
        """Set up the transformation models."""
        try:
            # Text2text model used as a paraphraser (French T5 checkpoint)
            self.paraphraser = pipeline(
                "text2text-generation",
                model="plguillou/t5-base-fr-sum-cnndm",
                tokenizer="plguillou/t5-base-fr-sum-cnndm",
                device=0 if torch.cuda.is_available() else -1
            )
            # Translation models for back-translation
            self.translator_fr_en = pipeline(
                "translation_fr_to_en",
                model="Helsinki-NLP/opus-mt-fr-en",
                device=0 if torch.cuda.is_available() else -1
            )
            self.translator_en_fr = pipeline(
                "translation_en_to_fr",
                model="Helsinki-NLP/opus-mt-en-fr",
                device=0 if torch.cuda.is_available() else -1
            )
        except Exception as e:
            print(f"Error while loading the models: {e}")
            print("Falling back to alternative methods...")
            self.paraphraser = None
            self.translator_fr_en = None
            self.translator_en_fr = None
    def get_wordnet_synonyms(self, word, pos_tag):
        """Look up synonyms through WordNet (French lemmas via the Open Multilingual Wordnet)."""
        synonyms = set()
        # Map the spaCy POS tag to a WordNet POS constant
        wordnet_pos = self.get_wordnet_pos(pos_tag)
        if wordnet_pos:
            for syn in wordnet.synsets(word, pos=wordnet_pos, lang='fra'):
                for lemma in syn.lemmas(lang='fra'):
                    synonym = lemma.name().replace('_', ' ')
                    if synonym.lower() != word.lower():
                        synonyms.add(synonym)
        return list(synonyms)

    def get_wordnet_pos(self, spacy_pos):
        """Map spaCy coarse (UPOS) tags to the WordNet POS format."""
        mapping = {
            'ADJ': wordnet.ADJ,
            'VERB': wordnet.VERB,
            'NOUN': wordnet.NOUN,
            'ADV': wordnet.ADV,
        }
        return mapping.get(spacy_pos)
    def synonym_replacement(self, text, replace_ratio=0.3):
        """Method 1: synonym replacement via WordNet and spaCy (whitespace-preserving, fixed)."""
        doc = self.nlp(text)
        result_tokens = []
        for token in doc:
            # Preserve the whitespace that precedes the token
            if token.i > 0:
                prev_token = doc[token.i - 1]
                spaces_between = text[prev_token.idx + len(prev_token.text):token.idx]
                result_tokens.append(spaces_between)
            if (not token.is_stop and not token.is_punct and
                    token.pos_ in ['NOUN', 'ADJ', 'VERB', 'ADV'] and
                    random.random() < replace_ratio):
                # Try WordNet first, using the coarse POS tag
                synonyms = self.get_wordnet_synonyms(token.lemma_, token.pos_)
                if synonyms:
                    synonym = random.choice(synonyms)
                    # Preserve capitalisation
                    if token.text[0].isupper():
                        synonym = synonym.capitalize()
                    result_tokens.append(synonym)
                else:
                    result_tokens.append(token.text)
            else:
                result_tokens.append(token.text)
        # Main fix: plain join, since whitespace was already preserved token by token
        return ''.join(result_tokens)
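    # Illustrative call (hypothetical example, not from the original script); the actual output
    # depends on French WordNet coverage and on the random replace_ratio draws. Synonyms are raw
    # WordNet lemmas, so verbs may come back uninflected:
    #   augmenter.synonym_replacement("Le chat dort sur le canapé.", replace_ratio=0.5)
    #   -> e.g. "Le chat dort sur le divan."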
    def back_translation(self, text):
        """Method 2: back-translation FR -> EN -> FR."""
        if not self.translator_fr_en or not self.translator_en_fr:
            return self.fallback_paraphrase(text)
        try:
            # Translate to English
            english = self.translator_fr_en(text, max_length=512)[0]['translation_text']
            # Translate back to French
            back_translated = self.translator_en_fr(english, max_length=512)[0]['translation_text']
            return back_translated
        except Exception as e:
            print(f"Back-translation error: {e}")
            return self.fallback_paraphrase(text)

    def neural_paraphrasing(self, text):
        """Method 3: neural paraphrasing with T5."""
        if not self.paraphraser:
            return self.fallback_paraphrase(text)
        try:
            # Task prefix for paraphrasing
            input_text = f"paraphrase: {text}"
            result = self.paraphraser(
                input_text,
                max_length=len(text.split()) * 2,
                num_return_sequences=1,
                temperature=0.8,
                do_sample=True
            )
            return result[0]['generated_text']
        except Exception as e:
            print(f"Neural paraphrasing error: {e}")
            return self.fallback_paraphrase(text)
    def fallback_paraphrase(self, text):
        """Fallback method based on simple linguistic substitutions (whitespace-preserving, fixed)."""
        doc = self.nlp(text)
        # Work sentence by sentence
        sentences = [sent.text.strip() for sent in doc.sents]
        paraphrased_sentences = []
        for sentence in sentences:
            sent_doc = self.nlp(sentence)
            # Simple syntactic substitutions while preserving whitespace
            result_tokens = []
            for token in sent_doc:
                # Preserve the whitespace that precedes the token
                if token.i > 0:
                    prev_token = sent_doc[token.i - 1]
                    spaces_between = sentence[prev_token.idx + len(prev_token.text):token.idx]
                    result_tokens.append(spaces_between)
                if token.pos_ == 'ADP':  # Prepositions
                    prep_alternatives = {
                        'dans': 'à travers', 'sur': 'au-dessus de',
                        'avec': 'en compagnie de', 'pour': 'en faveur de'
                    }
                    result_tokens.append(prep_alternatives.get(token.text.lower(), token.text))
                else:
                    result_tokens.append(token.text)
            paraphrased_sentences.append(''.join(result_tokens))
        return ' '.join(paraphrased_sentences)
    def contextual_word_insertion(self, text, insert_ratio=0.1):
        """Method 4: contextual word insertion (whitespace-preserving, fixed)."""
        doc = self.nlp(text)
        result = ""
        adverb_intensifiers = ['vraiment', 'particulièrement', 'extrêmement', 'assez', 'plutôt']
        conjunctions = ['également', 'aussi', 'de plus', 'par ailleurs']
        for i, token in enumerate(doc):
            # Add the whitespace that precedes the token, if any
            if token.i > 0:
                prev_token = doc[token.i - 1]
                spaces_between = text[prev_token.idx + len(prev_token.text):token.idx]
                result += spaces_between
            # Insert an intensifying adverb before adjectives
            if token.pos_ == 'ADJ' and random.random() < insert_ratio:
                result += random.choice(adverb_intensifiers) + " "
            result += token.text
            # Insert a linking word at sentence boundaries
            if (token.text in ['.', '!', '?'] and i < len(doc) - 1 and
                    random.random() < insert_ratio):
                result += " " + random.choice(conjunctions) + ","
        return result
    def process_single_file(self, file_path, output_counter, output_prefix="template"):
        """Process a single file and generate its variations."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                original_text = f.read().strip()
            if not original_text:
                return output_counter
            print(f"Processing: {file_path.name}")
            # First variation: synonyms + contextual insertion
            print("  → Generating variation 1 (synonyms + insertion)...")
            variation_1 = self.synonym_replacement(original_text)
            variation_1 = self.contextual_word_insertion(variation_1)
            # Second variation: back-translation OR neural paraphrasing
            print("  → Generating variation 2 (back-translation/paraphrase)...")
            if random.choice([True, False]):
                variation_2 = self.back_translation(original_text)
            else:
                variation_2 = self.neural_paraphrasing(original_text)
            # Save the variations
            output_file_1 = f"{output_prefix}{output_counter}.txt"
            with open(output_file_1, 'w', encoding='utf-8') as f:
                f.write(variation_1)
            output_file_2 = f"{output_prefix}{output_counter + 1}.txt"
            with open(output_file_2, 'w', encoding='utf-8') as f:
                f.write(variation_2)
            print(f"  ✓ Created: {output_file_1}, {output_file_2}")
            return output_counter + 2
        except Exception as e:
            print(f"Error while processing {file_path}: {e}")
            return output_counter
    def augment_dataset(self, input_directory=".", output_prefix="template", start_number=419):
        """Process every .txt file in the directory."""
        print("=== ADVANCED TEXT DATA AUGMENTATION ===\n")
        # Find all .txt files
        text_files = sorted(list(Path(input_directory).glob("*.txt")))
        if not text_files:
            print("❌ No .txt file found in the directory.")
            return
        print(f"📁 Found {len(text_files)} files to process...")
        print(f"🚀 Starting generation from {output_prefix}{start_number}.txt\n")
        output_counter = start_number
        processed_files = 0
        for file_path in text_files:
            output_counter = self.process_single_file(file_path, output_counter, output_prefix)
            processed_files += 1
            if processed_files % 50 == 0:
                print(f"📊 Progress: {processed_files}/{len(text_files)} files processed\n")
        total_generated = output_counter - start_number
        print("\n🎉 DONE!")
        print("📈 Statistics:")
        print(f"  • Original files: {len(text_files)}")
        print(f"  • New files generated: {total_generated}")
        print(f"  • Final total: {len(text_files) + total_generated}")
        print(f"  • Multiplication factor: x{(len(text_files) + total_generated) / len(text_files):.1f}")
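# A minimal usage sketch (not part of the original script): applying each strategy to a single
# in-memory string instead of a directory of files. It only relies on the methods defined above.
def augment_single_text(text):
    """Return one variation per augmentation strategy for a single string (illustrative only)."""
    augmenter = AdvancedTextAugmenter()
    return {
        "synonyms": augmenter.synonym_replacement(text),
        "back_translation": augmenter.back_translation(text),
        "neural_paraphrase": augmenter.neural_paraphrasing(text),
        "contextual_insertion": augmenter.contextual_word_insertion(text),
    }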
# Automatic dependency installation
def install_dependencies():
    """Install the required packages."""
    import subprocess
    import sys
    packages = [
        "spacy", "nltk", "transformers", "textblob", "torch", "sentencepiece"
    ]
    for package in packages:
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        except subprocess.CalledProcessError:
            print(f"Could not install {package}")
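# Equivalent manual installation (assumption: unpinned versions, same packages as above):
#   pip install spacy nltk transformers textblob torch sentencepiece
#   python -m spacy download fr_core_news_sm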
# Usage
if __name__ == "__main__":
    print("Checking dependencies...")
    try:
        augmenter = AdvancedTextAugmenter()
        # Run the augmentation
        augmenter.augment_dataset(
            input_directory="data_txt",  # directory containing the source .txt files
            output_prefix="template",
            start_number=419
        )
    except ImportError as e:
        print(f"Missing dependencies: {e}")
        print("Installing them automatically...")
        install_dependencies()
        print("Re-run the script once the installation has finished.")