#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Simplified AI chatbot.

Uses the Doubao-Seed-1.6-Flash-250828 model to answer user questions coming
from the chat window of the front-end website. The module can run either as
a small JSON-over-HTTP server (default) or as an interactive command-line
session (``python <this file> test``).
"""

import http.server
import json
import logging
import os
import re
import socketserver
import sqlite3
import sys
import urllib.parse
from contextlib import closing
from datetime import datetime
from threading import Thread
from typing import List, Dict, Optional, Tuple

import requests

# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DoubaoAPI:
    """Client for the Doubao chat-completions HTTP endpoint."""

    def __init__(self, api_key: str, model_name: str = "doubao-seed-1-6-flash-250828"):
        self.api_key = api_key
        self.model_name = model_name
        self.api_url = "https://ark.cn-beijing.volces.com/api/v3/chat/completions"

    def chat(self, messages: List[Dict], temperature: float = 0.7, max_tokens: int = 2000) -> str:
        """Send ``messages`` to the model and return the text of its reply.

        Args:
            messages: Chat messages as ``{"role": ..., "content": ...}`` dicts.
            temperature: Sampling temperature forwarded to the API.
            max_tokens: Forwarded to the API as ``max_completion_tokens``.

        Returns:
            The content of the first choice. On any failure (network error,
            non-2xx status, unexpected response shape) the error is logged and
            a fixed Chinese apology string is returned; this method does not
            raise.
        """
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }
        data = {
            "model": self.model_name,
            "messages": messages,
            "temperature": temperature,
            "max_completion_tokens": max_tokens
        }

        try:
            # Disable proxies for this request.
            proxies = {
                'http': None,
                'https': None
            }
            response = requests.post(
                self.api_url,
                headers=headers,
                json=data,
                timeout=30,
                proxies=proxies,
            )
            response.raise_for_status()
            result = response.json()
            return result["choices"][0]["message"]["content"]
        except Exception as e:
            # Top-level boundary: the caller always gets a displayable reply.
            logger.error(f"豆包API调用失败: {e}")
            return "抱歉,我现在无法回答您的问题,请稍后再试。"


class QADatabase:
    """Lookup helper for the ``qa_pairs`` table of the Q&A SQLite database."""

    # Words too generic to be useful as search keywords.
    _STOP_WORDS = frozenset({
        '的', '了', '在', '是', '我', '你', '他', '她', '它', '们', '这', '那',
        '什么', '怎么', '如何', '为什么', '吗', '呢', '吧',
    })

    # Topic words tried by the last-resort search stage.
    _CORE_WORDS = ('公司', '产品', '服务', '做什么', '业务')

    def __init__(self, db_path: str = '/root/老业务网站8.30/backend/ai_agent_qa.db'):
        self.db_path = db_path

    def search_qa(self, query: str, language: str = 'zh', limit: int = 5) -> List[Dict]:
        """Search Q&A pairs related to ``query``.

        Stages run in order, each only filling the slots still free:

        1. exact match on the question,
        2. ``LIKE`` match of the whole query against the question,
        3. ``LIKE`` match of each extracted keyword against question, answer
           and keywords,
        4. only if nothing was found: ``LIKE`` match of the first core topic
           word contained in the query.

        Args:
            query: The user's text.
            language: Value matched against the ``language`` column.
            limit: Maximum number of rows to return.

        Returns:
            Up to ``limit`` dicts with ``question``, ``answer`` and
            ``category`` keys, duplicates removed, in discovery order. Any
            database error is logged and yields an empty list.
        """
        try:
            # closing() guarantees the connection is released even when a
            # query raises; the original only closed it on the success path.
            with closing(sqlite3.connect(self.db_path)) as conn:
                conn.row_factory = sqlite3.Row
                cursor = conn.cursor()

                results = []

                # 1. Exact match
                cursor.execute(
                    "SELECT question, answer, category FROM qa_pairs "
                    "WHERE question = ? AND language = ? LIMIT ?",
                    (query, language, limit)
                )
                results.extend(dict(row) for row in cursor.fetchall())

                # 2. Fuzzy match on the question
                if len(results) < limit:
                    cursor.execute(
                        "SELECT question, answer, category FROM qa_pairs "
                        "WHERE question LIKE ? AND language = ? LIMIT ?",
                        (f'%{query}%', language, limit - len(results))
                    )
                    results.extend(dict(row) for row in cursor.fetchall())

                # 3. Keyword match
                if len(results) < limit:
                    for keyword in self._extract_keywords(query):
                        if len(results) >= limit:
                            break
                        pattern = f'%{keyword}%'
                        cursor.execute(
                            "SELECT question, answer, category FROM qa_pairs "
                            "WHERE (question LIKE ? OR answer LIKE ? OR keywords LIKE ?) "
                            "AND language = ? LIMIT ?",
                            (pattern, pattern, pattern, language, limit - len(results))
                        )
                        results.extend(dict(row) for row in cursor.fetchall())

                # 4. Still nothing: fall back to a broader topic-word search
                if not results:
                    for word in self._CORE_WORDS:
                        if word not in query:
                            continue
                        pattern = f'%{word}%'
                        cursor.execute(
                            "SELECT question, answer, category FROM qa_pairs "
                            "WHERE (question LIKE ? OR answer LIKE ?) "
                            "AND language = ? LIMIT ?",
                            (pattern, pattern, language, limit)
                        )
                        results.extend(dict(row) for row in cursor.fetchall())
                        if results:
                            break

            # De-duplicate while keeping discovery order (the stages overlap).
            seen = set()
            unique_results = []
            for result in results:
                key = (result['question'], result['answer'])
                if key not in seen:
                    seen.add(key)
                    unique_results.append(result)

            return unique_results[:limit]

        except Exception as e:
            logger.error(f"数据库搜索失败: {e}")
            return []

    def _extract_keywords(self, text: str) -> List[str]:
        """Return up to five whitespace-separated keywords from ``text``.

        Punctuation is stripped first; stop words and single-character tokens
        are dropped. Text without whitespace (typical for Chinese) is not
        segmented and therefore yields at most one token.
        """
        # Remove punctuation
        text = re.sub(r'[^\w\s\u4e00-\u9fff]', '', text)

        # Drop stop words and tokens of a single character
        keywords = [
            word for word in text.split()
            if word not in self._STOP_WORDS and len(word) > 1
        ]

        return keywords[:5]


class LanguageDetector:
    """Heuristic detector for the four supported languages (zh/en/th/vi)."""

    _CHINESE_CHAR = re.compile(r'[\u4e00-\u9fff]')
    _ENGLISH_WORD = re.compile(r'\b[a-zA-Z]+\b')
    _THAI_CHAR = re.compile(r'[\u0e00-\u0e7f]')
    # Vietnamese letters carrying diacritics (matched on lower-cased text)
    _VIETNAMESE_CHAR = re.compile(
        r'[àáạảãâầấậẩẫăằắặẳẵèéẹẻẽêềếệểễìíịỉĩòóọỏõôồốộổỗơờớợởỡùúụủũưừứựửữỳýỵỷỹđ]'
    )

    @staticmethod
    def detect_language(text: str) -> str:
        """Return ``'zh'``, ``'th'``, ``'vi'`` or ``'en'`` for ``text``.

        Character ratios are computed against the text with spaces removed;
        the English ratio is the share of whitespace-separated tokens that
        are purely ASCII-alphabetic words. Checks run in the order zh, th,
        vi, en; anything undecided (including empty text) defaults to
        ``'zh'``.
        """
        chinese_chars = len(LanguageDetector._CHINESE_CHAR.findall(text))
        english_words = len(LanguageDetector._ENGLISH_WORD.findall(text))
        thai_chars = len(LanguageDetector._THAI_CHAR.findall(text))
        vietnamese_chars = len(LanguageDetector._VIETNAMESE_CHAR.findall(text.lower()))

        total_chars = len(text.replace(' ', ''))
        if total_chars == 0:
            return 'zh'  # default to Chinese

        # Share of each language in the text
        tokens = text.split()
        chinese_ratio = chinese_chars / total_chars
        english_ratio = english_words / len(tokens) if tokens else 0
        thai_ratio = thai_chars / total_chars
        vietnamese_ratio = vietnamese_chars / total_chars

        if chinese_ratio > 0.3:
            return 'zh'
        if thai_ratio > 0.3:
            return 'th'
        if vietnamese_ratio > 0.1:
            return 'vi'
        if english_ratio > 0.5:
            return 'en'
        return 'zh'  # default to Chinese


class IntentClassifier:
    """Keyword-based intent classifier."""

    INTENT_KEYWORDS = {
        'greeting': {
            'zh': ['你好', '您好', '嗨', '哈喽', '早上好', '下午好', '晚上好'],
            'en': ['hello', 'hi', 'hey', 'good morning', 'good afternoon', 'good evening'],
            'th': ['สวัสดี', 'หวัดดี'],
            'vi': ['xin chào', 'chào', 'hello']
        },
        'company_info': {
            'zh': ['公司', '企业', '你们', '介绍', '是什么', '做什么'],
            'en': ['company', 'business', 'what do you do', 'about'],
            'th': ['บริษัท', 'ธุรกิจ'],
            'vi': ['công ty', 'doanh nghiệp']
        },
        'service_inquiry': {
            'zh': ['服务', '产品', '功能', '提供', '解决方案'],
            'en': ['service', 'product', 'solution', 'offer'],
            'th': ['บริการ', 'ผลิตภัณฑ์'],
            'vi': ['dịch vụ', 'sản phẩm']
        },
        'price_inquiry': {
            'zh': ['价格', '费用', '多少钱', '收费', '报价'],
            'en': ['price', 'cost', 'fee', 'how much'],
            'th': ['ราคา', 'ค่าใช้จ่าย'],
            'vi': ['giá', 'chi phí']
        },
        'contact': {
            'zh': ['联系', '电话', '邮箱', '微信', '地址'],
            'en': ['contact', 'phone', 'email', 'address'],
            'th': ['ติดต่อ', 'โทรศัพท์'],
            'vi': ['liên hệ', 'điện thoại']
        },
        'technical': {
            'zh': ['技术', '部署', '开发', 'SEO', '安全', '性能'],
            'en': ['technical', 'technology', 'deployment', 'development'],
            'th': ['เทคนิค', 'เทคโนโลยี'],
            'vi': ['kỹ thuật', 'công nghệ']
        }
    }

    # Languages written with spaces between words, where word boundaries are
    # meaningful. Chinese and Thai are not, so they keep substring matching.
    _SPACE_DELIMITED_LANGUAGES = frozenset({'en', 'vi'})
    # Keywords this short must start at a word boundary ...
    _ANCHORED_MAX_LEN = 3
    # ... and keywords this short must additionally be a whole word.
    _WHOLE_WORD_MAX_LEN = 2

    @classmethod
    def _keyword_matches(cls, keyword: str, text_lower: str, language: str) -> bool:
        """Tell whether the lower-cased ``keyword`` occurs in ``text_lower``.

        Very short keywords are too ambiguous as raw substrings in
        space-delimited languages ("hi" inside "this" or "shipping", "hey"
        inside "they"), so they are anchored to word boundaries. Longer
        keywords, and all keywords in other languages, use plain substring
        matching.
        """
        if (language not in cls._SPACE_DELIMITED_LANGUAGES
                or len(keyword) > cls._ANCHORED_MAX_LEN):
            return keyword in text_lower

        pattern = r'(?<!\w)' + re.escape(keyword)
        if len(keyword) <= cls._WHOLE_WORD_MAX_LEN:
            pattern += r'(?!\w)'
        return re.search(pattern, text_lower) is not None

    @classmethod
    def classify_intent(cls, text: str, language: str) -> str:
        """Return the first intent whose keyword list matches ``text``.

        Intents are tried in the order they appear in ``INTENT_KEYWORDS``,
        using only the keywords registered for ``language``. Matching is
        case-insensitive. Returns ``'general'`` when nothing matches or the
        language has no keywords.
        """
        text_lower = text.lower()

        for intent, lang_keywords in cls.INTENT_KEYWORDS.items():
            for keyword in lang_keywords.get(language, []):
                if cls._keyword_matches(keyword.lower(), text_lower, language):
                    return intent

        return 'general'  # default intent


class ConversationMemory:
    """Bounded in-memory conversation history."""

    def __init__(self, max_history: int = 10):
        self.max_history = max_history
        self.history = []

    def add_message(self, role: str, content: str):
        """Append a timestamped message and trim the history if needed."""
        self.history.append({
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat()
        })

        # One turn is a user message plus an assistant message, hence * 2.
        max_messages = self.max_history * 2
        if len(self.history) > max_messages:
            self.history = self.history[-max_messages:]

    def get_history(self) -> List[Dict]:
        """Return the stored history list (the live object, not a copy)."""
        return self.history

    def clear(self):
        """Drop all stored messages."""
        self.history = []


# System prompts per language. The line breaks inside these literals follow
# the evident section/bullet structure of the text.
_SYSTEM_PROMPTS = {
    'zh': """你是一个友好专业的客服代表,正在与客户进行对话。你的任务是理解客户的问题,查找相关信息,然后用自然、拟人化的方式回答。

**重要提醒**:
- 不要直接复制粘贴数据库中的内容
- 要像真正理解内容后,用自己的话自然地表达出来
- 保持对话的流畅性和友好感
- 可以添加适当的语气词和表达,让回答更像真人在说话

**回答风格**:
- 像好朋友在解释一样,自然亲切
- 可以用"是这样的"、"让我来介绍一下"、"简单来说"等自然表达
- 避免"您好,根据..."这种官方开场白
- 直接回答核心内容,就像面对面聊天一样

**内容组织**:
- 如果信息较多,可以用简单的数字标题(1、2、3)来组织
- 每个点不要超过2句话
- 总长度控制在150字以内,保持简洁

**特别要求**:
- 看到"我们提供"这种表达时,改成"我们有"或"我们可以帮你"
- 不要出现"核心优势包括:"这种列表式表述
- 用"我可以帮您"代替"我们可以为您提供"
- 让回答有温度、有感情、有人情味

示例:
❌ 不好的回答:"我们提供三大核心产品:销售/客服Copilot、经营数据分析Agent、内容生成&自动发布Agent。"
✅ 好的回答:"我们有三大产品可以帮你。销售助手可以实时跟进客户,数据分析工具能让你一眼看清业务状况,还有内容生成器自动写文案发稿子,挺省事的。"

记住:你就是客服本人,不要告诉客户"数据库显示"或"根据资料"。""",

    'en': """You are a friendly and professional customer service representative having a conversation with customers. Your task is to understand customer questions, find relevant information, and respond naturally like a real person.

**Important**:
- Do NOT copy and paste database content directly
- Rewrite the information in your own words after understanding it
- Keep conversations flowing and friendly
- Add appropriate tone and expressions to make responses sound like a real person speaking

**Response Style**:
- Explain like talking to a friend, natural and warm
- Use natural phrases like "So here's the thing", "Let me explain", "Simply put"
- Avoid official greetings like "Hello, according to..."
- Answer core content directly, like face-to-face chatting

**Content Organization**:
- If there's a lot of information, use simple numbered headings (1, 2, 3)
- No more than 2 sentences per point
- Keep total length under 150 words, stay concise

**Special Requirements**:
- Change "We provide" to "We have" or "We can help you"
- Avoid list-style expressions like "Core advantages include:"
- Use "I can help you" instead of "We can provide you with"
- Make responses warm, emotional, and human

Example:
❌ Bad: "We provide three core products: Sales/Customer Service Copilot, Business Data Analysis Agent, and Content Generation & Auto-Publish Agent."
✅ Good: "We have three products to help you. The sales assistant tracks customers in real-time, the data analysis tool lets you see your business at a glance, and there's a content generator that writes copy and publishes automatically. Pretty handy."

Remember: You ARE the customer service person, don't tell customers "database shows" or "according to the information".""",

    # The final quotation's closing quote is escaped so it cannot merge with
    # the terminating triple quote.
    'th': """คุณเป็นพนักงานบริการลูกค้าที่เป็นมิตรและมืออาชีพ กำลังสนทนากับลูกค้า งานของคุณคือทำความเข้าใจคำถามของลูกค้า ค้นหาข้อมูลที่เกี่ยวข้อง แล้วตอบกลับอย่างเป็นธรรมชาติเหมือนคนจริงๆ

**สำคัญมาก**:
- อย่าคัดลอกเนื้อหาจากฐานข้อมูลโดยตรง
- เขียนใหม่ด้วยคำพูดของคุณเองหลังจากเข้าใจแล้ว
- รักษาการสนทนาให้ราบรื่นและเป็นมิตร
- เพิ่มน้ำเสียงและสำนวนที่เหมาะสมเพื่อให้คำตอบฟังดูเหมือนคนจริงๆ

**สไตล์การตอบ**:
- อธิบายเหมือนคุยกับเพื่อน เป็นธรรมชาติและอบอุ่น
- ใช้สำนวนธรรมชาติ เช่น "ก็คือว่า"、"ให้ผมอธิบายนะ"、"ง่ายๆ ก็คือ"
- หลีกเลี่ยงการทักทายอย่างเป็นทางการ เช่น "สวัสดี ตามที่..."
- ตอบเนื้อหาหลักโดยตรง เหมือนพูดคุยต่อหน้า

**การจัดเนื้อหา**:
- ถ้ามีข้อมูลมาก ใช้หัวข้อตัวเลขง่ายๆ (1, 2, 3)
- ไม่เกิน 2 ประโยคต่อหัวข้อ
- เก็บความยาวรวมไว้ต่ำกว่า 150 คำ รักษาความกระชับ

**ตัวอย่าง**:
❌ ไม่ดี: "เรามีผลิตภัณฑ์หลักสามอย่าง: Sales/Customer Service Copilot, Business Data Analysis Agent, และ Content Generation & Auto-Publish Agent"
✅ ดี: "เรามีผลิตภัณฑ์สามอย่างให้คุณ แอสซิสแทนต์ขายติดตามลูกค้าแบบเรียลไทม์ เครื่องมือวิเคราะห์ข้อมูลให้เห็นสถานะธุรกิจได้ในทีเดียว และยังมีตัวสร้างเนื้อหาที่เขียนคัดข้อความและเผยแพร่อัตโนมัติ ค่อนข้างสะดวก"

จำไว้: คุณคือพนักงานบริการลูกค้า อย่าบอกลูกค้าว่า "ฐานข้อมูลแสดงว่า" หรือ "ตามข้อมูล\"""",

    'vi': """Bạn là nhân viên chăm sóc khách hàng thân thiện và chuyên nghiệp, đang trò chuyện với khách hàng. Nhiệm vụ của bạn là hiểu câu hỏi của khách hàng, tìm thông tin liên quan, rồi trả lời một cách tự nhiên như người thật.

**Quan trọng**:
- KHÔNG sao chép trực tiếp nội dung cơ sở dữ liệu
- Viết lại bằng lời của chính bạn sau khi hiểu
- Giữ cuộc trò chuyện mượt mà và thân thiện
- Thêm giọng điệu và biểu cảm phù hợp để câu trả lời nghe như người thật nói

**Phong cách trả lời**:
- Giải thích như nói chuyện với bạn bè, tự nhiên và ấm áp
- Sử dụng cụm từ tự nhiên như "Vấn đề là", "Để tôi giải thích", "Nói đơn giản là"
- Tránh lời chào chính thức như "Xin chào, theo..."
- Trả lời nội dung chính trực tiếp, như nói chuyện trực tiếp

**Tổ chức nội dung**:
- Nếu có nhiều thông tin, dùng tiêu đề số đơn giản (1, 2, 3)
- Không quá 2 câu mỗi điểm
- Giữ tổng độ dài dưới 150 từ, giữ súc tích

**Yêu cầu đặc biệt**:
- Thay "Chúng tôi cung cấp" bằng "Chúng tôi có" hoặc "Chúng tôi có thể giúp bạn"
- Tránh biểu hiện kiểu danh sách như "Ưu điểm cốt lõi bao gồm:"
- Dùng "Tôi có thể giúp bạn" thay vì "Chúng tôi có thể cung cấp cho bạn"
- Làm câu trả lời ấm áp, có cảm xúc, có tính người

Ví dụ:
❌ Tệ: "Chúng tôi cung cấp ba sản phẩm cốt lõi: Sales/Customer Service Copilot, Business Data Analysis Agent, và Content Generation & Auto-Publish Agent."
✅ Tốt: "Chúng tôi có ba sản phẩm có thể giúp bạn. Trợ lý bán hàng theo dõi khách hàng theo thời gian thực, công cụ phân tích dữ liệu để bạn thấy tình hình kinh doanh trong nháy mắt, và còn có trình tạo nội dung tự viết copy và đăng tự động. Khá tiện lợi."

Hãy nhớ: Bạn là nhân viên chăm sóc khách hàng, đừng nói với khách hàng "cơ sở dữ liệu cho thấy" hoặc "theo thông tin".""",
}


class AIAgent:
    """Chat agent combining language detection, Q&A lookup and the model."""

    def __init__(self, api_key: str):
        # Components
        self.doubao_api = DoubaoAPI(api_key)
        self.qa_db = QADatabase()
        self.language_detector = LanguageDetector()
        self.intent_classifier = IntentClassifier()
        self.memory = ConversationMemory()

        # System prompts, keyed by language code
        self.system_prompts = dict(_SYSTEM_PROMPTS)

    def chat(self, user_input: str) -> Dict:
        """Process one user message and produce a reply.

        Returns:
            A dict with ``response``, ``language``, ``intent``,
            ``context_used`` (whether any Q&A rows were found),
            ``related_qa`` (at most two Q&A dicts) and ``timestamp``. If an
            unexpected error occurs the dict carries a localized error
            message and ``intent == "error"``; this method does not raise.
        """
        # Default used by the error path when detection itself fails.
        language = 'zh'
        try:
            # 1. Detect the language
            language = self.language_detector.detect_language(user_input)

            # 2. Classify the intent
            intent = self.intent_classifier.classify_intent(user_input, language)

            # 3. Look up related Q&A pairs
            qa_results = self.qa_db.search_qa(user_input, language, limit=3)

            # 4. Build the message list
            messages = self._build_messages(user_input, qa_results, language, intent)

            # 5. Call the Doubao API
            response = self.doubao_api.chat(messages)

            # 6. Record the exchange
            self.memory.add_message("user", user_input)
            self.memory.add_message("assistant", response)

            # 7. Return the result
            return {
                "response": response,
                "language": language,
                "intent": intent,
                "context_used": len(qa_results) > 0,
                "related_qa": qa_results[:2],  # first two related Q&A pairs
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            logger.error(f"聊天处理失败: {e}")
            return {
                "response": self._get_error_message(language),
                "language": language,
                "intent": "error",
                "context_used": False,
                "related_qa": [],
                "timestamp": datetime.now().isoformat()
            }

    def _build_messages(self, user_input: str, qa_results: List[Dict],
                        language: str, intent: str) -> List[Dict]:
        """Assemble the message list sent to the model.

        Order: system prompt for ``language`` (Chinese prompt as fallback),
        an optional second system message carrying the Q&A context, the last
        six stored history messages, then the current user input.
        ``intent`` is accepted but not currently used.
        """
        messages = []

        # System message
        system_prompt = self.system_prompts.get(language, self.system_prompts['zh'])
        messages.append({"role": "system", "content": system_prompt})

        # Context (stressing that the model should paraphrase, not copy)
        if qa_results:
            context = "【重要】:以下是客户问题的相关信息,请你理解后用自己的话自然地回答,不要照搬原文。\n\n相关信息:\n"
            for qa in qa_results:
                context += f"问题:{qa['question']}\n答案:{qa['answer']}\n分类:{qa['category']}\n\n"
            context += "\n⚠️ 切记:理解这些信息后,用你自己的话表达,就像在跟朋友解释一样自然!"
            messages.append({"role": "system", "content": context})

        # Recent history: last 3 turns (6 messages), without timestamps
        history = self.memory.get_history()
        for msg in history[-6:]:
            messages.append({"role": msg["role"], "content": msg["content"]})

        # Current user input
        messages.append({"role": "user", "content": user_input})

        return messages

    def _get_error_message(self, language: str) -> str:
        """Return the technical-error apology in ``language``.

        Falls back to English for unknown language codes.
        """
        error_messages = {
            'zh': "抱歉,我现在遇到了一些技术问题。请稍后再试,或通过网站上的联系表单与我们联系。",
            'en': "Sorry, I'm experiencing some technical issues right now. Please try again later or contact us through the website contact form.",
            'th': "ขออภัย ขณะนี้ระบบเกิดปัญหาทางเทคนิค กรุณาลองใหม่อีกครั้งในภายหลัง หรือติดต่อเราผ่านแบบฟอร์มติดต่อบนเว็บไซต์",
            'vi': "Xin lỗi, hiện tại tôi đang gặp sự cố kỹ thuật. Vui lòng thử lại sau hoặc liên hệ với chúng tôi qua biểu mẫu liên hệ trên trang web."
        }
        return error_messages.get(language, error_messages['en'])

    def reset_conversation(self):
        """Forget the conversation history."""
        self.memory.clear()


# Simple HTTP server

class ChatbotHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler exposing the chatbot as a JSON API.

    Routes: ``POST /api/chat``, ``POST /api/reset``, ``GET /api/health``.
    All handlers use the module-level ``ai_agent``.
    """

    def do_POST(self):
        """Dispatch POST requests."""
        if self.path == '/api/chat':
            self._handle_chat()
        elif self.path == '/api/reset':
            self._handle_reset()
        else:
            self._send_error(404, "Not Found")

    def do_GET(self):
        """Dispatch GET requests."""
        if self.path == '/api/health':
            self._handle_health()
        else:
            self._send_error(404, "Not Found")

    def _read_json_body(self):
        """Read and decode the JSON request body.

        Raises:
            ValueError: If ``Content-Length`` is missing, non-numeric or not
                positive, or the body is not valid UTF-8 JSON
                (``UnicodeDecodeError`` and ``json.JSONDecodeError`` are both
                ``ValueError`` subclasses).
        """
        content_length = int(self.headers.get('Content-Length') or 0)
        if content_length <= 0:
            raise ValueError("missing or empty request body")
        return json.loads(self.rfile.read(content_length).decode('utf-8'))

    def _handle_chat(self):
        """Handle a chat request.

        Malformed requests (no body, invalid JSON, non-object JSON, missing
        or non-string ``message``) are answered with HTTP 400; failures while
        producing the reply are answered with HTTP 500.
        """
        try:
            try:
                data = self._read_json_body()
            except ValueError as e:
                logger.warning(f"聊天请求格式错误: {e}")
                self._send_json_response({"error": "请求格式错误", "success": False}, 400)
                return

            message = data.get('message') if isinstance(data, dict) else None
            user_input = message.strip() if isinstance(message, str) else ''

            if not user_input:
                self._send_json_response({"error": "消息不能为空", "success": False}, 400)
                return

            # Produce the reply
            result = ai_agent.chat(user_input)

            response_data = {
                "success": True,
                "data": {
                    "response": result["response"],
                    "language": result["language"],
                    "intent": result["intent"],
                    "context_used": result["context_used"],
                    "related_questions": [qa["question"] for qa in result["related_qa"]],
                    "timestamp": result["timestamp"]
                }
            }

            self._send_json_response(response_data)

        except Exception as e:
            logger.error(f"处理聊天请求失败: {e}")
            self._send_json_response({"error": "服务器内部错误", "success": False}, 500)

    def _handle_reset(self):
        """Handle a conversation-reset request."""
        try:
            ai_agent.reset_conversation()
            self._send_json_response({"success": True, "message": "对话已重置"})
        except Exception as e:
            logger.error(f"重置对话失败: {e}")
            self._send_json_response({"error": "重置失败", "success": False}, 500)

    def _handle_health(self):
        """Handle a health-check request."""
        self._send_json_response({
            "status": "healthy",
            "service": "AI Agent Chatbot",
            "version": "1.0.0",
            "timestamp": datetime.now().isoformat()
        })

    def _send_json_response(self, data: dict, status_code: int = 200):
        """Send ``data`` as a UTF-8 JSON response with permissive CORS headers."""
        self.send_response(status_code)
        self.send_header('Content-type', 'application/json')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.end_headers()

        response_json = json.dumps(data, ensure_ascii=False, indent=2)
        self.wfile.write(response_json.encode('utf-8'))

    def _send_error(self, code: int, message: str):
        """Send a JSON error response."""
        self._send_json_response({"error": message, "success": False}, code)

    def do_OPTIONS(self):
        """Answer CORS preflight requests."""
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.end_headers()


# SECURITY: this key is committed in source control and should be treated as
# exposed; rotate it and provide the replacement through the ARK_API_KEY
# environment variable. The literal is kept only so existing deployments that
# rely on it keep working.
_LEGACY_API_KEY = "2249d4c5-2e08-4787-9df6-cf5beee474a5"


def _resolve_api_key() -> str:
    """Return the API key from ``ARK_API_KEY``, else the legacy literal."""
    api_key = os.environ.get("ARK_API_KEY")
    if api_key:
        return api_key
    logger.warning("ARK_API_KEY is not set; falling back to the key embedded in the source file")
    return _LEGACY_API_KEY


# The single agent instance used by both the HTTP handler and the CLI.
# NOTE(review): its ConversationMemory is therefore shared by every HTTP
# client; per-visitor history would need a session identifier in the API.
ai_agent = AIAgent(api_key=_resolve_api_key())


# Command-line test interface
def test_chatbot():
    """Run an interactive chat session on the terminal."""
    print("🤖 企业经营AI智能体聊天机器人")
    print("=" * 50)
    print("输入 'quit' 或 'exit' 退出")
    print("输入 'reset' 重置对话")
    print("=" * 50)

    while True:
        try:
            user_input = input("\n您: ").strip()

            if user_input.lower() in ['quit', 'exit', '退出', 'q']:
                print("再见!")
                break

            if user_input.lower() in ['reset', '重置']:
                ai_agent.reset_conversation()
                print("对话已重置")
                continue

            if not user_input:
                continue

            print("🤖 正在思考...")
            result = ai_agent.chat(user_input)

            print(f"AI智能体 [{result['language']}]: {result['response']}")

            if result['related_qa']:
                print("\n💡 相关问题:")
                for qa in result['related_qa']:
                    print(f"  - {qa['question']}")

        except KeyboardInterrupt:
            print("\n\n程序被用户中断")
            break
        except Exception as e:
            print(f"发生错误: {e}")


class _ReusableTCPServer(socketserver.TCPServer):
    """TCPServer that can rebind its port right after a restart."""

    allow_reuse_address = True


def start_server(port: int = 6001):
    """Serve the chatbot API on ``port`` until interrupted with Ctrl+C.

    Requests are handled one at a time (plain ``TCPServer``).
    """
    with _ReusableTCPServer(("", port), ChatbotHandler) as httpd:
        print("🚀 AI智能体聊天机器人服务已启动")
        print(f"API地址: http://localhost:{port}")
        print(f"健康检查: http://localhost:{port}/api/health")
        print(f"聊天接口: POST http://localhost:{port}/api/chat")
        print("按 Ctrl+C 停止服务")

        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\n服务已停止")


if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "test":
        # Command-line test mode
        test_chatbot()
    else:
        # HTTP server mode
        start_server()