#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ AI智能体问答API接口 为AI智能体提供简单的问答查询接口 """ import sqlite3 import json import re from typing import Optional, List class AIAgentAPI: """AI智能体问答API""" def __init__(self, db_path: str = '/root/老业务网站8.30/ai_agent_qa.db'): self.db_path = db_path def get_answer(self, question: str, language: str = 'zh') -> str: """ 根据问题获取答案 Args: question: 用户问题 language: 语言代码 ('zh' 或 'en') Returns: 答案字符串 """ try: conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() # 清理问题文本 clean_question = self._clean_question(question) # 1. 尝试精确匹配问题 cursor.execute(''' SELECT answer FROM qa_pairs WHERE question = ? AND language = ? ''', (clean_question, language)) result = cursor.fetchone() if result: conn.close() return result['answer'] # 2. 尝试模糊匹配问题 cursor.execute(''' SELECT answer FROM qa_pairs WHERE question LIKE ? AND language = ? ORDER BY LENGTH(question) ASC LIMIT 1 ''', (f'%{clean_question}%', language)) result = cursor.fetchone() if result: conn.close() return result['answer'] # 3. 尝试关键词匹配 keywords = self._extract_keywords(clean_question) for keyword in keywords: cursor.execute(''' SELECT answer FROM qa_pairs WHERE (question LIKE ? OR answer LIKE ? OR keywords LIKE ?) AND language = ? ORDER BY LENGTH(question) ASC LIMIT 1 ''', (f'%{keyword}%', f'%{keyword}%', f'%{keyword}%', language)) result = cursor.fetchone() if result: conn.close() return result['answer'] # 4. 返回默认回答 conn.close() return self._get_default_answer(language) except Exception as e: return f"查询过程中出现错误: {str(e)}" def get_related_questions(self, question: str, language: str = 'zh', limit: int = 3) -> List[str]: """ 获取相关问题 Args: question: 用户问题 language: 语言代码 limit: 返回数量限制 Returns: 相关问题列表 """ try: conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() clean_question = self._clean_question(question) keywords = self._extract_keywords(clean_question) # 构建查询条件 conditions = [] params = [] for keyword in keywords: conditions.append('(question LIKE ? OR answer LIKE ? OR keywords LIKE ?)') params.extend([f'%{keyword}%', f'%{keyword}%', f'%{keyword}%']) if conditions: where_clause = ' OR '.join(conditions) query = f''' SELECT DISTINCT question FROM qa_pairs WHERE {where_clause} AND language = ? ORDER BY LENGTH(question) ASC LIMIT ? ''' params.extend([language, limit]) else: query = ''' SELECT DISTINCT question FROM qa_pairs WHERE language = ? ORDER BY RANDOM() LIMIT ? ''' params = [language, limit] cursor.execute(query, params) results = cursor.fetchall() conn.close() return [row['question'] for row in results] except Exception as e: return [f"查询相关问题失败: {str(e)}"] def _clean_question(self, question: str) -> str: """清理问题文本""" # 移除多余空格 question = re.sub(r'\s+', ' ', question.strip()) # 移除标点符号 question = re.sub(r'[^\w\s\u4e00-\u9fff]', '', question) return question def _extract_keywords(self, question: str) -> List[str]: """提取关键词""" # 移除停用词 stop_words = {'的', '了', '在', '是', '我', '你', '他', '她', '它', '们', '这', '那', '什么', '怎么', '如何', '为什么', 'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'} # 分割文本 words = re.findall(r'\b\w+\b', question.lower()) # 过滤停用词和短词 keywords = [word for word in words if word not in stop_words and len(word) > 1] return keywords[:5] # 最多返回5个关键词 def _get_default_answer(self, language: str) -> str: """获取默认回答""" if language == 'en': return "I'm sorry, I couldn't find a specific answer to your question. Please try rephrasing your question or contact us through our website for more detailed assistance." else: return "抱歉,我没有找到相关问题的具体答案。您可以尝试重新表述问题,或者通过网站联系表单获取更详细的帮助。" def get_categories(self, language: str = 'zh') -> List[str]: """获取所有分类""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' SELECT DISTINCT category FROM qa_pairs WHERE language = ? ORDER BY category ''', (language,)) results = cursor.fetchall() conn.close() return [row[0] for row in results] except Exception as e: return [f"获取分类失败: {str(e)}"] def get_qa_by_category(self, category: str, language: str = 'zh', limit: int = 5) -> List[dict]: """根据分类获取问答""" try: conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(''' SELECT question, answer FROM qa_pairs WHERE category = ? AND language = ? ORDER BY id LIMIT ? ''', (category, language, limit)) results = cursor.fetchall() conn.close() return [dict(row) for row in results] except Exception as e: return [{"question": "查询失败", "answer": str(e)}] # 全局API实例 ai_api = AIAgentAPI() def ask_question(question: str, language: str = 'zh') -> str: """ 简单的问答接口 Args: question: 用户问题 language: 语言代码 ('zh' 或 'en') Returns: 答案字符串 """ return ai_api.get_answer(question, language) def get_related_questions(question: str, language: str = 'zh', limit: int = 3) -> List[str]: """ 获取相关问题 Args: question: 用户问题 language: 语言代码 limit: 返回数量限制 Returns: 相关问题列表 """ return ai_api.get_related_questions(question, language, limit) def get_categories(language: str = 'zh') -> List[str]: """ 获取所有分类 Args: language: 语言代码 Returns: 分类列表 """ return ai_api.get_categories(language) # 测试函数 def test_api(): """测试API功能""" print("AI智能体问答API测试") print("=" * 40) # 测试问题 test_questions = [ "你们公司是做什么的?", "What does your company do?", "营销功能包括什么?", "有成功案例吗?", "如何联系你们?" ] for question in test_questions: print(f"问题: {question}") # 判断语言 language = 'en' if any(word in question.lower() for word in ['what', 'how', 'do', 'does', 'is', 'are']) else 'zh' answer = ask_question(question, language) print(f"答案: {answer[:100]}...") # 获取相关问题 related = get_related_questions(question, language, 2) print(f"相关问题: {', '.join(related)}") print("-" * 40) if __name__ == "__main__": test_api()