#pm_agent.py import json import re import logging from langchain_openai import ChatOpenAI from app.schemas.spec import ProjectSpec from app.llm.prompts import PM_AGENT_PROMPT from app.core.config import settings logger = logging.getLogger(__name__) async def run_pm_agent(user_input: str, history: list | None = None) -> ProjectSpec: """Agent PM qui remplit ProjectSpec en analysant le besoin utilisateur.""" llm = ChatOpenAI( base_url=settings.llm_base_url, api_key=settings.llm_api_key, model="local-model", temperature=0.3, ) messages = [ {"role": "system", "content": PM_AGENT_PROMPT}, ] if history: messages.extend(history) messages.append({ "role": "user", "content": f"""En te basant sur TOUT l'historique de notre conversation et sur cette dernière précision, génère le JSON ProjectSpec valide et complètement rempli. DERNIÈRE PRÉCISION / INPUT UTILISATEUR : {user_input} IMPORTANT : Tu dois impérativement combiner les contraintes énoncées précédemment (comme le langage de programmation choisi, l'OS, etc.) avec cette nouvelle information pour enrichir la spécification existante. Réponds UNIQUEMENT avec du JSON valide. Aucun texte avant le JSON, aucun texte après le JSON.""" }) try: response = await llm.ainvoke(messages) raw_response = response.content if hasattr(response, 'content') else str(response) logger.debug(f"[PM Agent] Réponse brute: {raw_response}") parsed_spec = _extract_json_robust(raw_response) project_spec = ProjectSpec(**parsed_spec) logger.info(f"[PM Agent] ✅ Tentative réussie (parsing JSON manuel)") project_spec = _validate_and_sanitize(project_spec, user_input) return project_spec except Exception as e2: logger.error(f"[PM Agent] ❌ Tentative échouée: {type(e2).__name__}: {str(e2)}") logger.info("[PM Agent] Fallback: Génération automatique du spec") return _generate_fallback_spec(user_input) def _extract_json_robust(response_text: str) -> dict: """ Extrait et parse le JSON depuis la réponse du LLM. Robuste pour llama.cpp qui ajoute souvent du texte avant/après le JSON. """ logger.debug(f"[JSON Extract] Réponse brute: {response_text}") # Stratégie 1: Chercher un bloc JSON entre ```json et ``` json_match = re.search(r'```json\s*(.*?)\s*```', response_text, re.DOTALL) if json_match: json_str = json_match.group(1).strip() logger.debug(f"[JSON Extract] ✓ Trouvé via ```json block") return json.loads(json_str) # Stratégie 2: Chercher un bloc JSON entre ``` et ``` (sans spécifier json) json_match = re.search(r'```\s*(.*?)\s*```', response_text, re.DOTALL) if json_match: json_str = json_match.group(1).strip() logger.debug(f"[JSON Extract] ✓ Trouvé via ``` block") try: return json.loads(json_str) except json.JSONDecodeError: logger.debug(f"[JSON Extract] ✗ Code block trouvé mais pas JSON valide") # Stratégie 3: Chercher directement un objet JSON { ... } en étant très prudent first_brace = response_text.find('{') last_brace = response_text.rfind('}') if first_brace != -1 and last_brace != -1 and last_brace > first_brace: json_str = response_text[first_brace:last_brace+1].strip() logger.debug(f"[JSON Extract] Tentative extraction (longueur: {len(json_str)})") try: parsed = json.loads(json_str) logger.debug(f"[JSON Extract] ✓ Trouvé via extraction") return parsed except json.JSONDecodeError as e: logger.debug(f"[JSON Extract] ✗ Extraction invalide: {e}") # Stratégie 4: Parser tout le texte comme JSON (dernier recours) try: parsed = json.loads(response_text.strip()) logger.debug(f"[JSON Extract] ✓ Trouvé via parsing direct") return parsed except json.JSONDecodeError: logger.debug(f"[JSON Extract] ✗ Parsing direct échoué") # Aucune stratégie n'a fonctionné logger.error(f"[JSON Extract] ❌ Impossible d'extraire JSON valide") logger.error(f"[JSON Extract] Réponse brute complète:\n{response_text}") raise ValueError(f"Impossible d'extraire un JSON valide de la réponse du LLM.\nRéponse: {response_text}") def _validate_and_sanitize(spec: ProjectSpec, user_input: str) -> ProjectSpec: """ Fait confiance au LLM pour la logique métier. Se contente de s'assurer de la cohérence technique minimale. """ logger.debug(f"[Sanitize] Validation du spec généré pour : {spec.title}") # 1. Sécurité minimale sur les chaînes de texte vides if not spec.title or spec.title.lower() in ["null", "none", ""]: spec.title = "Automation Script" if not spec.description: spec.description = f"Script d'automatisation basé sur : {user_input[:50]}..." # 2. Gestion de la complétude : if not spec.is_complete or (spec.clarifying_question and spec.clarifying_question.strip()): spec.is_complete = False if not spec.clarifying_question: spec.clarifying_question = "Pourriez-vous apporter plus de précisions sur les actions que doit effectuer ce script ?" logger.info(f"[Sanitize] Le besoin est qualifié d'INCOMPLET par l'agent PM.") else: spec.is_complete = True spec.clarifying_question = None logger.info(f"[Sanitize] Le besoin est qualifié de COMPLET par l'agent PM.") return spec def _generate_fallback_spec(user_input: str) -> ProjectSpec: """Génère un ProjectSpec par défaut en cas d'erreur complète.""" logger.warning(f"[Fallback] Génération d'un spec par défaut pour: {user_input}") return ProjectSpec( title="Automation Script", description=f"Automatisation du besoin utilisateur: {user_input}", requirements=[ f"Exécuter l'action demandée: {user_input}", "Gérer les erreurs", "Générer des logs" ], constraints=["PEP 8 strict"], language="Python", target_stack="", io_config={ "has_inputs": False, "input_type": "none", "input_paths_or_sources": [], "has_outputs": False, "output_type": "log_only", "output_formats": [] }, auth_config={ "requires_auth": False, "auth_method": "none", "target_tools_and_apis": [] }, env_config={ "target_os": "cross_platform", "language_version": "3.11", "critical_dependencies": ["logging"] }, error_handling_strategy="log_and_continue", is_complete=False, clarifying_question="Pourriez-vous donner plus de détails sur votre demande ?" )