#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Optimized AI chatbot service.

1. Avoids mixed-language replies by using a system prompt written in the
   language detected from the user's message.
2. Keeps latency low: one API call per turn, a small reply cache and a single
   database query per turn.
3. Talks to the Doubao-Seed-1.6-Flash-250828 model.
"""

import hashlib
import http.server
import json
import logging
import os
import re
import socketserver
import sqlite3
import time
from datetime import datetime
from functools import lru_cache
from threading import Thread
from typing import Dict, List, Optional, Tuple

import requests

# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# SECURITY(review): this credential is committed to source control. It is kept
# only so existing deployments keep working; rotate it and supply the key via
# the ARK_API_KEY environment variable or the command line instead.
DEFAULT_API_KEY = "2249d4c5-2e08-4787-9df6-cf5beee474a5"


class DoubaoAPI:
    """Small client for the Doubao chat-completions HTTP endpoint."""

    # The reply cache is trimmed (oldest entry first) once it exceeds this size.
    MAX_CACHE_ENTRIES = 100

    def __init__(self, api_key: str, model_name: str = "doubao-seed-1-6-flash-250828"):
        self.api_key = api_key
        self.model_name = model_name
        self.api_url = "https://ark.cn-beijing.volces.com/api/v3/chat/completions"
        self.request_timeout = 15  # seconds; short on purpose so failures surface quickly
        self.cache = {}  # md5(last user message) -> reply text, in insertion order

    def chat(self, messages: List[Dict], temperature: float = 0.5,
             max_tokens: int = 1000, use_cache: bool = True) -> str:
        """Send ``messages`` to the model and return the reply text.

        Args:
            messages: Chat messages, each a dict with ``role`` and ``content``.
            temperature: Sampling temperature forwarded to the API.
            max_tokens: Forwarded as ``max_completion_tokens``.
            use_cache: When true, replies are cached by the last user message.

        Returns:
            The model's reply. On a timeout or any other failure a fixed
            apology string is returned instead of raising.
        """
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }

        # Must be bound before the request: the success path below tests it
        # even when no user message was found (previously an UnboundLocalError
        # there turned a successful call into the generic apology).
        cache_key = None

        # NOTE(review): the key is derived from the last user message only, so
        # the same message asked with a different history or context gets the
        # first cached reply. Kept as-is because it is a deliberate latency
        # trade-off; confirm that this is acceptable.
        if use_cache and messages:
            last_user_msg = next(
                (m["content"] for m in reversed(messages) if m["role"] == "user"),
                None,
            )
            if last_user_msg:
                cache_key = hashlib.md5(last_user_msg.encode()).hexdigest()
                if cache_key in self.cache:
                    logger.info("使用缓存结果")
                    return self.cache[cache_key]

        data = {
            "model": self.model_name,
            "messages": messages,
            "temperature": temperature,
            "max_completion_tokens": max_tokens,
            "top_p": 0.8
        }

        try:
            # Explicitly bypass any configured proxy.
            proxies = {
                'http': None,
                'https': None
            }

            start_time = time.time()
            response = requests.post(
                self.api_url,
                headers=headers,
                json=data,
                timeout=self.request_timeout,
                proxies=proxies
            )
            elapsed_time = time.time() - start_time
            logger.info(f"API 调用耗时: {elapsed_time:.2f}秒")

            response.raise_for_status()
            result = response.json()
            content = result["choices"][0]["message"]["content"]

            if use_cache and cache_key:
                self.cache[cache_key] = content
                # Dicts preserve insertion order, so the first key is the oldest.
                if len(self.cache) > self.MAX_CACHE_ENTRIES:
                    oldest_key = next(iter(self.cache))
                    del self.cache[oldest_key]

            return content

        except requests.Timeout:
            logger.error("API 调用超时")
            return "抱歉,响应有点慢,请稍后重试。"
        except Exception as e:
            logger.error(f"豆包API调用失败: {e}")
            return "抱歉,我现在无法回答您的问题,请稍后再试。"


class QADatabase:
    """Read-only access to the ``qa_pairs`` table of the Q&A SQLite database."""

    def __init__(self, db_path: str = '/root/老业务网站8.30/backend/ai_agent_qa.db'):
        self.db_path = db_path
        self.conn = None
        self._init_db()

    def _init_db(self):
        """Open the SQLite connection; on failure log it and leave ``conn`` as None."""
        try:
            self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
            self.conn.row_factory = sqlite3.Row
        except Exception as e:
            logger.error(f"数据库连接失败: {e}")

    @staticmethod
    def _escape_like(term: str) -> str:
        """Escape LIKE wildcards so ``term`` is matched literally (ESCAPE '\\')."""
        return term.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')

    def search_qa(self, query: str, language: str = 'zh', limit: int = 3) -> List[Dict]:
        """Return up to ``limit`` Q&A rows for ``language`` that match ``query``.

        A row matches when its question equals the query, its question contains
        the query, or its keywords contain the query. Exact question matches
        are ordered first so that ``limit`` cannot cut them off.

        Returns:
            A list of dicts with ``question``, ``answer`` and ``category``;
            an empty list when nothing matches or the query fails.
        """
        if self.conn is None:
            logger.error("数据库查询失败: 数据库未连接")
            return []

        try:
            # User input is matched literally: '%' and '_' must not act as
            # wildcards (a bare '%' used to match every row).
            like_term = f"%{self._escape_like(query)}%"
            rows = self.conn.execute(
                """SELECT question, answer, category
                   FROM qa_pairs
                   WHERE language = ? AND (
                       question = ? OR
                       question LIKE ? ESCAPE '\\' OR
                       keywords LIKE ? ESCAPE '\\'
                   )
                   ORDER BY CASE WHEN question = ? THEN 0 ELSE 1 END
                   LIMIT ?""",
                (language, query, like_term, like_term, query, limit)
            ).fetchall()

            results = [dict(row) for row in rows]

            if not results:
                logger.debug(f"未找到匹配的问答 - 查询: {query}, 语言: {language}")

            return results

        except Exception as e:
            logger.error(f"数据库查询失败: {e}")
            return []


class LanguageDetector:
    """Script-ratio based detector for Chinese, English, Thai and Vietnamese."""

    # For each language: the characters counted and the minimum share of the
    # text they must make up for the language to be considered.
    LANGUAGE_PATTERNS = {
        'zh': {'min_ratio': 0.3, 'chars': r'[\u4e00-\u9fff]'},  # Chinese
        'en': {'min_ratio': 0.5, 'chars': r'[a-zA-Z]'},  # English
        'th': {'min_ratio': 0.3, 'chars': r'[\u0e00-\u0e7f]'},  # Thai
        'vi': {'min_ratio': 0.2, 'chars': r'[àáảãạăằắẳẵặâầấẩẫậèéẻẽẹêềếểễệìíỉĩịòóỏõọôồốổỗộơờớởỡợùúủũụưừứửữựỳýỷỹỵđ]'}  # Vietnamese
    }

    @classmethod
    def detect_language(cls, text: str) -> str:
        """Return the language code (``zh``/``en``/``th``/``vi``) for ``text``.

        Falls back to ``'zh'`` for empty text or when no language reaches its
        minimum ratio.
        """
        text = text.strip()
        if not text:
            return 'zh'

        # The Vietnamese character class lists lower-case letters only, so
        # match against a lower-cased copy to count upper-case letters too.
        lowered = text.lower()
        total = max(len(lowered), 1)

        scores = {}
        for lang, pattern in cls.LANGUAGE_PATTERNS.items():
            ratio = len(re.findall(pattern['chars'], lowered)) / total
            if ratio >= pattern['min_ratio']:
                scores[lang] = ratio

        if not scores:
            return 'zh'  # default: Chinese

        detected_lang = max(scores.items(), key=lambda x: x[1])[0]
        # Vietnamese is written mostly with plain Latin letters, so its 'en'
        # ratio is almost always the larger one. The diacritics counted for
        # 'vi' do not occur in English, so 'vi' wins whenever it qualifies.
        if detected_lang == 'en' and 'vi' in scores:
            detected_lang = 'vi'

        logger.debug(f"检测语言: {detected_lang} (文本: {text[:20]}...)")
        return detected_lang


class IntentClassifier:
    """Keyword based intent classifier."""

    INTENT_KEYWORDS = {
        'greeting': {
            'zh': ['你好', '怎么样', '问候'],
            'en': ['hello', 'hi', 'hey'],
            'th': ['สวัสดี', 'ท่านผู้'],
            'vi': ['xin chào', 'bạn khỏe']
        },
        'company_info': {
            'zh': ['公司', '企业', '关于'],
            'en': ['company', 'business', 'about'],
            'th': ['บริษัท', 'องค์กร'],
            'vi': ['công ty', 'tổ chức']
        },
        'service_inquiry': {
            'zh': ['服务', '产品', '功能'],
            'en': ['service', 'product', 'solution'],
            'th': ['บริการ', 'ผลิตภัณฑ์'],
            'vi': ['dịch vụ', 'sản phẩm']
        }
    }

    # Languages whose words are separated by spaces; the others are matched by
    # plain substring because they have no usable word boundaries.
    _WORD_DELIMITED_LANGUAGES = frozenset({'en', 'vi'})

    @staticmethod
    @lru_cache(maxsize=128)
    def _keyword_pattern(keyword: str):
        """Compile the word-aware pattern for one lower-cased keyword.

        The keyword must start at a word start. Keywords of three characters
        or fewer must also end at a word end, so that 'hi' no longer matches
        inside 'this' or 'history'; longer keywords still match their
        inflected forms ('service' -> 'services').
        """
        trailing_boundary = r'(?!\w)' if len(keyword) <= 3 else ''
        return re.compile(r'(?<!\w)' + re.escape(keyword) + trailing_boundary)

    @classmethod
    def classify_intent(cls, text: str, language: str) -> str:
        """Return the first intent whose keyword occurs in ``text``.

        Intents are checked in the order they appear in ``INTENT_KEYWORDS``;
        ``'general'`` is returned when nothing matches or the language has no
        keywords.
        """
        text_lower = text.lower()
        word_aware = language in cls._WORD_DELIMITED_LANGUAGES

        for intent, lang_keywords in cls.INTENT_KEYWORDS.items():
            for keyword in lang_keywords.get(language, []):
                keyword = keyword.lower()
                if word_aware:
                    matched = cls._keyword_pattern(keyword).search(text_lower) is not None
                else:
                    matched = keyword in text_lower
                if matched:
                    return intent

        return 'general'


class ConversationMemory:
    """Bounded list of the most recent chat messages."""

    def __init__(self, max_history: int = 6):
        # The default of 6 messages corresponds to 3 user/assistant rounds.
        self.max_history = max_history
        self.history = []

    def add_message(self, role: str, content: str):
        """Append a message and drop the oldest ones beyond ``max_history``."""
        self.history.append({"role": role, "content": content})

        if len(self.history) > self.max_history:
            self.history = self.history[-self.max_history:]

    def get_history(self) -> List[Dict]:
        """Return the stored messages (the internal list, not a copy)."""
        return self.history

    def clear(self):
        """Forget all stored messages."""
        self.history = []


class AIAgent:
    """Chat agent: language detection, Q&A lookup, prompt building, model call."""

    def __init__(self, api_key: str):
        self.doubao_api = DoubaoAPI(api_key)
        self.qa_db = QADatabase()
        self.language_detector = LanguageDetector()
        self.intent_classifier = IntentClassifier()
        # NOTE(review): a single memory per agent; the HTTP layer uses one
        # global agent, so all clients appear to share this history — confirm.
        self.memory = ConversationMemory()

        # One system prompt per language, each written in that language so the
        # model does not mix languages in its reply.
        self.system_prompts = {
            'zh': """你是一个友好专业的客服代表。理解客户问题后,用自己的话自然地回答,不要照搬。

回答原则:
- 像朋友交谈一样自然亲切
- 不要出现"数据库显示"或"根据资料"
- 信息多时用数字标题(1、2、3)组织
- 总长度控制在150字以内

示例:
❌ 不好:"我们提供三大产品"
✅ 好的:"我们有三大产品可以帮你"
""",

            'en': """You are a friendly professional customer service representative. After understanding customer questions, answer in your own words naturally. Do NOT copy database content.

Principles:
- Chat naturally and warmly like talking to a friend
- Don't say "database shows" or "according to information"
- Use numbered points (1, 2, 3) if there's much information
- Keep total under 150 words
""",

            'th': """คุณเป็นพนักงานบริการลูกค้าที่เป็นมิตรและมืออาชีพ หลังจากเข้าใจคำถามลูกค้า ให้ตอบด้วยคำพูดของคุณเองอย่างเป็นธรรมชาติ อย่าคัดลอกเนื้อหาจากฐานข้อมูล

หลักการ:
- สนทนาอย่างเป็นธรรมชาติและอบอุ่นเหมือนพูดคุยกับเพื่อน
- อย่าพูดว่า "ฐานข้อมูลแสดง" หรือ "ตามข้อมูล"
- ใช้ลำดับเลข (1, 2, 3) หากมีข้อมูลมาก
- เก็บไว้ภายใต้ 150 คำ
""",

            'vi': """Bạn là một đại diện dịch vụ khách hàng thân thiện và chuyên nghiệp. Sau khi hiểu câu hỏi của khách hàng, hãy trả lời bằng cách của riêng bạn một cách tự nhiên. KHÔNG sao chép nội dung cơ sở dữ liệu.

Nguyên tắc:
- Trò chuyện tự nhiên và ấm áp như nói chuyện với bạn bè
- Đừng nói "cơ sở dữ liệu cho thấy" hoặc "theo thông tin"
- Sử dụng điểm đánh số (1, 2, 3) nếu có nhiều thông tin
- Giữ dưới 150 từ
"""
        }

    def chat(self, user_input: str) -> Dict:
        """Handle one user turn and return the reply with its metadata.

        Returns:
            A dict with ``response``, ``language``, ``intent``,
            ``context_used``, ``related_qa`` and ``timestamp``; successful
            turns also carry ``elapsed_time`` (seconds). If any step raises,
            a localized error message is returned with ``intent`` ``"error"``.
        """
        # Bound up front so the error path can always report a language, even
        # when detection itself is the step that failed.
        language = 'zh'
        try:
            start_time = time.time()

            # 1. Detect the language
            language = self.language_detector.detect_language(user_input)

            # 2. Classify the intent
            intent = self.intent_classifier.classify_intent(user_input, language)

            # 3. Look up related Q&A entries (single query, at most 2 rows)
            qa_results = self.qa_db.search_qa(user_input, language, limit=2)

            # 4. Build the prompt
            messages = self._build_messages(user_input, qa_results, language)

            # 5. Call the model (cached)
            response = self.doubao_api.chat(messages, temperature=0.5, max_tokens=800, use_cache=True)

            # 6. Remember this turn
            self.memory.add_message("user", user_input)
            self.memory.add_message("assistant", response)

            elapsed = time.time() - start_time
            logger.info(f"完整处理耗时: {elapsed:.2f}秒")

            return {
                "response": response,
                "language": language,
                "intent": intent,
                "context_used": len(qa_results) > 0,
                "related_qa": qa_results,
                "timestamp": datetime.now().isoformat(),
                "elapsed_time": elapsed
            }

        except Exception as e:
            logger.error(f"聊天处理失败: {e}")
            return {
                "response": self._get_error_message(language),
                "language": language,
                "intent": "error",
                "context_used": False,
                "related_qa": [],
                "timestamp": datetime.now().isoformat()
            }

    def _build_messages(self, user_input: str, qa_results: List[Dict], language: str) -> List[Dict]:
        """Assemble the message list sent to the model.

        Order: system prompt in ``language`` (Chinese when unknown), an
        optional second system message with the matched Q&A pairs, the last
        two remembered messages, then the new user input.
        """
        messages = []

        system_prompt = self.system_prompts.get(language, self.system_prompts['zh'])
        messages.append({"role": "system", "content": system_prompt})

        if qa_results:
            # The heading is localized as well, to keep the prompt in one language.
            context_templates = {
                'zh': "【相关信息】\n",
                'en': "【Related Information】\n",
                'th': "【ข้อมูลที่เกี่ยวข้อง】\n",
                'vi': "【Thông Tin Liên Quan】\n"
            }
            heading = context_templates.get(language, context_templates['zh'])
            pairs = "".join(
                f"Q: {qa['question']}\nA: {qa['answer']}\n\n" for qa in qa_results
            )
            messages.append({"role": "system", "content": heading + pairs})

        # Only the most recent round (two messages) is replayed to the model.
        history = self.memory.get_history()
        for msg in history[-2:]:
            messages.append({"role": msg["role"], "content": msg["content"]})

        messages.append({"role": "user", "content": user_input})

        return messages

    def _get_error_message(self, language: str) -> str:
        """Return the localized technical-error message (English when unknown)."""
        error_messages = {
            'zh': "抱歉,我现在遇到技术问题。请稍后重试或通过网站联系我们。",
            'en': "Sorry, I'm experiencing technical issues. Please try again later or contact us through the website.",
            'th': "ขออภัยที่ฉันมีปัญหาด้านเทคนิค โปรดลองใหม่ในภายหลังหรือติดต่อเราผ่านเว็บไซต์",
            'vi': "Xin lỗi, tôi đang gặp sự cố kỹ thuật. Vui lòng thử lại sau hoặc liên hệ với chúng tôi qua trang web."
        }
        return error_messages.get(language, error_messages['en'])

    def reset_conversation(self):
        """Clear the conversation memory."""
        self.memory.clear()


# ==================== HTTP server ====================

class ChatbotHandler(http.server.BaseHTTPRequestHandler):
    """HTTP handler exposing /api/chat, /api/reset (POST) and /api/health (GET)."""

    # Largest request body accepted by /api/chat.
    MAX_BODY_BYTES = 1024 * 1024

    def do_POST(self):
        """Route POST requests."""
        if self.path == '/api/chat':
            self._handle_chat()
        elif self.path == '/api/reset':
            self._handle_reset()
        else:
            self._send_error(404, "Not Found")

    def do_GET(self):
        """Route GET requests."""
        if self.path == '/api/health':
            self._handle_health()
        else:
            self._send_error(404, "Not Found")

    def do_OPTIONS(self):
        """Answer CORS preflight requests.

        Responses carry ``Access-Control-Allow-Origin: *``, and a browser
        sending a cross-origin JSON POST issues a preflight first; without
        this method the base class rejects it with 501.
        """
        self.send_response(204)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.send_header('Content-Length', '0')
        self.end_headers()

    def _handle_chat(self):
        """Validate the JSON body, run the agent and send its reply.

        Malformed requests get a 400 (413 when too large); unexpected
        failures get a 500.
        """
        try:
            try:
                content_length = int(self.headers.get('Content-Length', 0))
            except (TypeError, ValueError):
                self._send_error(400, "Invalid Content-Length")
                return

            # "<= 0" rather than "== 0": a negative length would make
            # rfile.read() block until the client closes the connection.
            if content_length <= 0:
                self._send_json_response({"error": "Empty request", "success": False}, 400)
                return
            if content_length > self.MAX_BODY_BYTES:
                self._send_error(413, "Request too large")
                return

            post_data = self.rfile.read(content_length)
            try:
                data = json.loads(post_data.decode('utf-8'))
            except (UnicodeDecodeError, json.JSONDecodeError):
                self._send_error(400, "Invalid JSON")
                return
            if not isinstance(data, dict):
                self._send_error(400, "Invalid JSON")
                return

            raw_message = data.get('message', '')
            if not isinstance(raw_message, str):
                self._send_error(400, "Message must be a string")
                return

            user_input = raw_message.strip()
            if not user_input:
                self._send_json_response({"error": "Message cannot be empty", "success": False}, 400)
                return

            result = ai_agent.chat(user_input)

            response_data = {
                "success": True,
                "data": {
                    "response": result["response"],
                    "language": result["language"],
                    "intent": result["intent"],
                    "context_used": result["context_used"],
                    "related_questions": [qa["question"] for qa in result["related_qa"]],
                    "elapsed_time": result.get("elapsed_time", 0),
                    "timestamp": result["timestamp"]
                }
            }

            self._send_json_response(response_data)

        except Exception as e:
            logger.error(f"处理聊天请求失败: {e}")
            self._send_json_response({"error": "Server error", "success": False}, 500)

    def _handle_reset(self):
        """Clear the agent's conversation memory."""
        try:
            ai_agent.reset_conversation()
            self._send_json_response({"success": True, "message": "Conversation reset"})
        except Exception as e:
            logger.error(f"重置对话失败: {e}")
            self._send_json_response({"error": "Reset failed", "success": False}, 500)

    def _handle_health(self):
        """Report service status."""
        self._send_json_response({
            "status": "healthy",
            "service": "AI Agent Chatbot",
            "version": "2.0-optimized",
            "timestamp": datetime.now().isoformat()
        })

    def _send_json_response(self, data, status_code=200):
        """Serialize ``data`` as JSON and send it with ``status_code``."""
        body = json.dumps(data).encode('utf-8')
        self.send_response(status_code)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(body)

    def _send_error(self, code, message):
        """Send a JSON error payload with HTTP status ``code``."""
        self._send_json_response({"error": message, "success": False}, code)

    def log_message(self, format, *args):
        """Route the base class's access log through the module logger."""
        logger.info(f"{self.client_address[0]} - {format % args}")


class _ReusableTCPServer(socketserver.TCPServer):
    """TCPServer that can rebind its port right after a restart."""

    allow_reuse_address = True


# Global agent instance; created by start_server()
ai_agent = None


def start_server(port=6001, api_key=DEFAULT_API_KEY):
    """Create the global agent and serve HTTP on ``port`` until interrupted.

    Args:
        port: TCP port to listen on (all interfaces).
        api_key: Doubao API key handed to the agent.
    """
    global ai_agent

    ai_agent = AIAgent(api_key)

    server = _ReusableTCPServer(("0.0.0.0", port), ChatbotHandler)

    logger.info(f"AI智能体服务启动 - 端口: {port}")
    logger.info(f"模型: {ai_agent.doubao_api.model_name}")
    logger.info("版本: 2.0-optimized (性能优化版)")

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        logger.info("服务器已停止")
    finally:
        # Release the listening socket however the serve loop ended.
        server.server_close()


if __name__ == '__main__':
    import sys

    # Precedence: command-line argument, then ARK_API_KEY, then the built-in
    # default (see the SECURITY note on DEFAULT_API_KEY).
    api_key = os.environ.get("ARK_API_KEY") or DEFAULT_API_KEY
    port = 6001

    if len(sys.argv) > 1:
        api_key = sys.argv[1]
    if len(sys.argv) > 2:
        port = int(sys.argv[2])

    start_server(port, api_key)