#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Intelligent chatbot built on the LangChain architecture.

Uses the Doubao "Doubao-Seed-1.6-Flash-250828" model, serves the chat window
of the front-end website and replies to user questions.
"""

import json
import logging
import os
import re
import sqlite3
from contextlib import closing
from typing import Any, Dict, List, Optional, Tuple

import requests
from flask import Flask, request, jsonify
from flask_cors import CORS
from langchain.chains import ConversationChain, LLMChain
from langchain.llms.base import LLM
from langchain.memory import ConversationBufferMemory
from langchain.prompts import PromptTemplate
from langchain.schema import BaseMessage, HumanMessage, AIMessage

# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DoubaoLLM(LLM):
    """LangChain LLM wrapper that sends prompts to the Doubao chat API."""

    # These are declared as model fields because the LangChain ``LLM`` base
    # class is a pydantic model: assigning attributes that are not declared
    # on the class (as a plain ``self.x = ...`` in ``__init__``) is rejected.
    api_key: str
    model_name: str = "Doubao-Seed-1.6-Flash-250828"
    api_url: str = "https://ark.cn-beijing.volces.com/api/v3/chat/completions"

    def __init__(
        self,
        api_key: str,
        model_name: str = "Doubao-Seed-1.6-Flash-250828",
        **kwargs: Any,
    ):
        """Create the wrapper.

        Args:
            api_key: Bearer token sent in the ``Authorization`` header.
            model_name: Model identifier placed in the request payload.
            **kwargs: Extra keyword arguments forwarded to the LLM base class.
        """
        super().__init__(api_key=api_key, model_name=model_name, **kwargs)

    @property
    def _llm_type(self) -> str:
        """Return the identifier LangChain uses for this LLM type."""
        return "doubao"

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[Any] = None,
        **kwargs: Any,
    ) -> str:
        """Send ``prompt`` as a single user message and return the reply text.

        ``stop``, ``run_manager`` and ``**kwargs`` are accepted so the method
        can be invoked by LangChain versions that pass them; they are not
        forwarded to the HTTP API.

        Returns:
            The content of the first choice, or a fixed apology message when
            the request fails or the response has an unexpected shape.
        """
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
        }
        payload = {
            "model": self.model_name,
            "messages": [
                {"role": "user", "content": prompt},
            ],
            "temperature": 0.7,
            "max_tokens": 1000,
        }

        try:
            response = requests.post(
                self.api_url, headers=headers, json=payload, timeout=30
            )
            response.raise_for_status()
            result = response.json()
            return result["choices"][0]["message"]["content"]
        except (
            requests.RequestException,
            KeyError,
            IndexError,
            TypeError,
            ValueError,
        ) as e:
            # Best effort: the caller always gets a displayable string.
            logger.error("豆包API调用失败: %s", e)
            return "抱歉,我现在无法回答您的问题,请稍后再试。"


class QADatabase:
    """Read-only access to the ``qa_pairs`` table of the Q&A SQLite database."""

    # LIKE patterns are escaped with a backslash so that ``%`` and ``_`` typed
    # by the user are matched literally instead of acting as wildcards.
    _SQL_EXACT = (
        "SELECT question, answer, category FROM qa_pairs "
        "WHERE question = ? AND language = ? LIMIT ?"
    )
    _SQL_FUZZY = (
        "SELECT question, answer, category FROM qa_pairs "
        "WHERE question LIKE ? ESCAPE '\\' AND language = ? LIMIT ?"
    )
    _SQL_KEYWORD = (
        "SELECT question, answer, category FROM qa_pairs "
        "WHERE (question LIKE ? ESCAPE '\\' OR answer LIKE ? ESCAPE '\\' "
        "OR keywords LIKE ? ESCAPE '\\') AND language = ? LIMIT ?"
    )

    _STOP_WORDS = frozenset({
        '的', '了', '在', '是', '我', '你', '他', '她', '它', '们', '这', '那',
        '什么', '怎么', '如何', '为什么', '吗', '呢', '吧',
    })

    def __init__(self, db_path: str = '/root/老业务网站8.30/ai_agent_qa.db'):
        """Remember the database path; no connection is opened here."""
        self.db_path = db_path

    @staticmethod
    def _like_pattern(term: str) -> str:
        """Return a ``%term%`` LIKE pattern with LIKE metacharacters escaped."""
        escaped = (
            term.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
        )
        return f'%{escaped}%'

    def search_qa(self, query: str, language: str = 'zh', limit: int = 5) -> List[Dict]:
        """Search Q&A pairs related to ``query``.

        Three strategies run in order until ``limit`` unique rows are found:
        exact question match, substring match on the question, then a
        per-keyword match against question, answer and keywords columns.

        Args:
            query: The user's text.
            language: Value compared against the ``language`` column.
            limit: Maximum number of rows to return.

        Returns:
            Up to ``limit`` dicts with ``question``, ``answer`` and
            ``category`` keys, unique by (question, answer). An empty list is
            returned when ``limit`` is not positive or the database fails.
        """
        if limit <= 0:
            return []

        results: List[Dict] = []
        seen = set()

        def add_unique(rows) -> None:
            # De-duplicate while collecting so that repeated rows (e.g. the
            # exact match showing up again in the fuzzy search) do not use up
            # slots of the limit.
            for row in rows:
                if len(results) >= limit:
                    return
                record = dict(row)
                key = (record['question'], record['answer'])
                if key not in seen:
                    seen.add(key)
                    results.append(record)

        try:
            # closing() guarantees the connection is released even when a
            # query raises.
            with closing(sqlite3.connect(self.db_path)) as conn:
                conn.row_factory = sqlite3.Row
                cursor = conn.cursor()

                # 1. Exact match
                cursor.execute(self._SQL_EXACT, (query, language, limit))
                add_unique(cursor.fetchall())

                # 2. Substring match on the question
                if len(results) < limit:
                    cursor.execute(
                        self._SQL_FUZZY,
                        (self._like_pattern(query), language, limit),
                    )
                    add_unique(cursor.fetchall())

                # 3. Keyword match
                if len(results) < limit:
                    for keyword in self._extract_keywords(query):
                        if len(results) >= limit:
                            break
                        pattern = self._like_pattern(keyword)
                        cursor.execute(
                            self._SQL_KEYWORD,
                            (pattern, pattern, pattern, language, limit),
                        )
                        add_unique(cursor.fetchall())
        except sqlite3.Error as e:
            logger.error("数据库搜索失败: %s", e)
            return []

        return results

    def _extract_keywords(self, text: str) -> List[str]:
        """Return up to five whitespace-separated keywords from ``text``.

        Punctuation is stripped first; stop words and single-character tokens
        are dropped.
        """
        # Remove punctuation
        cleaned = re.sub(r'[^\w\s\u4e00-\u9fff]', '', text)
        # Keep tokens that are neither stop words nor a single character
        keywords = [
            word
            for word in cleaned.split()
            if word not in self._STOP_WORDS and len(word) > 1
        ]
        return keywords[:5]


class LanguageDetector:
    """Heuristic language detector for zh / en / th / vi text."""

    _CHINESE_RE = re.compile(r'[\u4e00-\u9fff]')
    _ENGLISH_WORD_RE = re.compile(r'\b[a-zA-Z]+\b')
    _THAI_RE = re.compile(r'[\u0e00-\u0e7f]')
    # Vietnamese letters carrying diacritics (matched on lower-cased text)
    _VIETNAMESE_RE = re.compile(
        r'[àáạảãâầấậẩẫăằắặẳẵèéẹẻẽêềếệểễìíịỉĩòóọỏõôồốộổỗơờớợởỡùúụủũưừứựửữỳýỵỷỹđ]'
    )

    @staticmethod
    def detect_language(text: str) -> str:
        """Return ``'zh'``, ``'th'``, ``'vi'`` or ``'en'`` for ``text``.

        Character ratios are checked in that order against fixed thresholds;
        ``'zh'`` is the default when nothing matches or the text is empty.
        """
        total_chars = len(text.replace(' ', ''))
        if total_chars == 0:
            return 'zh'  # default to Chinese

        tokens = text.split()

        # Share of each script among the non-space characters
        chinese_ratio = len(LanguageDetector._CHINESE_RE.findall(text)) / total_chars
        thai_ratio = len(LanguageDetector._THAI_RE.findall(text)) / total_chars
        vietnamese_ratio = (
            len(LanguageDetector._VIETNAMESE_RE.findall(text.lower())) / total_chars
        )
        # English is measured per whitespace-separated token instead
        english_words = len(LanguageDetector._ENGLISH_WORD_RE.findall(text))
        english_ratio = english_words / len(tokens) if tokens else 0

        if chinese_ratio > 0.3:
            return 'zh'
        if thai_ratio > 0.3:
            return 'th'
        if vietnamese_ratio > 0.1:
            return 'vi'
        if english_ratio > 0.5:
            return 'en'
        return 'zh'  # default to Chinese


class IntentClassifier:
    """Keyword-based intent classifier."""

    INTENT_KEYWORDS = {
        'greeting': {
            'zh': ['你好', '您好', '嗨', '哈喽', '早上好', '下午好', '晚上好'],
            'en': ['hello', 'hi', 'hey', 'good morning', 'good afternoon', 'good evening'],
            'th': ['สวัสดี', 'หวัดดี'],
            'vi': ['xin chào', 'chào', 'hello']
        },
        'company_info': {
            'zh': ['公司', '企业', '你们', '介绍', '是什么', '做什么'],
            'en': ['company', 'business', 'what do you do', 'about'],
            'th': ['บริษัท', 'ธุรกิจ'],
            'vi': ['công ty', 'doanh nghiệp']
        },
        'service_inquiry': {
            'zh': ['服务', '产品', '功能', '提供', '解决方案'],
            'en': ['service', 'product', 'solution', 'offer'],
            'th': ['บริการ', 'ผลิตภัณฑ์'],
            'vi': ['dịch vụ', 'sản phẩm']
        },
        'price_inquiry': {
            'zh': ['价格', '费用', '多少钱', '收费', '报价'],
            'en': ['price', 'cost', 'fee', 'how much'],
            'th': ['ราคา', 'ค่าใช้จ่าย'],
            'vi': ['giá', 'chi phí']
        },
        'contact': {
            'zh': ['联系', '电话', '邮箱', '微信', '地址'],
            'en': ['contact', 'phone', 'email', 'address'],
            'th': ['ติดต่อ', 'โทรศัพท์'],
            'vi': ['liên hệ', 'điện thoại']
        },
        'technical': {
            'zh': ['技术', '部署', '开发', 'SEO', '安全', '性能'],
            'en': ['technical', 'technology', 'deployment', 'development'],
            'th': ['เทคนิค', 'เทคโนโลยี'],
            'vi': ['kỹ thuật', 'công nghệ']
        }
    }

    # Languages written with spaces between words. For these a keyword must
    # start at a word boundary; plain substring matching made 'hi' fire inside
    # "this" or "which", turning most English questions into greetings.
    _SPACE_DELIMITED_LANGUAGES = frozenset({'en', 'vi'})
    # Keywords shorter than this must match as whole words; longer ones may be
    # followed by more letters so that e.g. 'service' still matches "services".
    _MIN_PREFIX_MATCH_LEN = 3

    @classmethod
    def _keyword_matches(cls, keyword: str, text_lower: str, language: str) -> bool:
        """Tell whether the lower-cased ``keyword`` occurs in ``text_lower``."""
        if language not in cls._SPACE_DELIMITED_LANGUAGES:
            # zh / th text has no word separators: substring match is the
            # only option without a tokenizer.
            return keyword in text_lower
        pattern = r'(?<!\w)' + re.escape(keyword)
        if len(keyword) < cls._MIN_PREFIX_MATCH_LEN:
            pattern += r'(?!\w)'
        return re.search(pattern, text_lower) is not None

    @classmethod
    def classify_intent(cls, text: str, language: str) -> str:
        """Return the first intent whose keyword list matches ``text``.

        Intents are tried in the order they appear in ``INTENT_KEYWORDS``;
        only the keyword list for ``language`` is used. ``'general'`` is
        returned when nothing matches.
        """
        text_lower = text.lower()

        for intent, lang_keywords in cls.INTENT_KEYWORDS.items():
            for keyword in lang_keywords.get(language, []):
                if cls._keyword_matches(keyword.lower(), text_lower, language):
                    return intent

        return 'general'  # default generic intent


class AIAgent:
    """Chat agent combining language detection, intent, Q&A lookup and the LLM."""

    def __init__(self, api_key: str):
        """Build all components.

        Args:
            api_key: API key handed to ``DoubaoLLM``.
        """
        # Components
        self.llm = DoubaoLLM(api_key)
        self.qa_db = QADatabase()
        self.language_detector = LanguageDetector()
        self.intent_classifier = IntentClassifier()

        # Conversation memory. ``input_key`` tells the memory which of the
        # several chain inputs is the user's turn; ``return_messages=False``
        # yields the history as text, which is what a string PromptTemplate
        # needs.
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            input_key="user_input",
            return_messages=False,
        )

        # Prompt template
        self.prompt_template = PromptTemplate(
            input_variables=["chat_history", "user_input", "context", "language"],
            template=self._get_prompt_template(),
        )

        # Conversation chain. LLMChain is used because ConversationChain
        # requires the prompt variables to be exactly the memory key plus one
        # input key, which this four-variable prompt does not satisfy.
        self.conversation = LLMChain(
            llm=self.llm,
            memory=self.memory,
            prompt=self.prompt_template,
            verbose=True,
        )

    def _get_prompt_template(self) -> str:
        """Return the prompt template text with its four placeholders."""
        return """你是一个专业的AI智能体,代表企业经营AI智能体网站为用户提供服务。

你的角色和任务:
1. 你是企业AI智能体解决方案的专业顾问
2. 专注于AI自动运行的知识型网站、营销型网站、问答型网站
3. 帮助企业实现营销自动化、提升客户体验、增强品牌影响力
4. 使用用户的语言进行回复(中文、英文、泰语、越南语)

回复原则:
1. 专业、友好、有帮助
2. 基于提供的上下文信息回答
3. 如果没有相关信息,诚实说明并引导用户联系我们
4. 保持回复简洁明了,重点突出
5. 使用用户使用的语言回复

对话历史:
{chat_history}

相关上下文信息:
{context}

用户语言:{language}
用户问题:{user_input}

请基于上下文信息,用{language}语言专业地回答用户问题:"""

    def chat(self, user_input: str) -> Dict:
        """Process one user message and produce a reply.

        Returns:
            A dict with keys ``response``, ``language``, ``intent``,
            ``context_used`` and ``related_qa``. On any failure ``intent`` is
            ``"error"`` and ``response`` is a localized error message.
        """
        # Bound before the try block so the error path can always use it,
        # even when language detection itself is what failed.
        language = 'zh'
        try:
            # 1. Detect language
            language = self.language_detector.detect_language(user_input)

            # 2. Classify intent
            intent = self.intent_classifier.classify_intent(user_input, language)

            # 3. Search related Q&A
            qa_results = self.qa_db.search_qa(user_input, language, limit=3)

            # 4. Build context
            context = self._build_context(qa_results, intent, language)

            # 5. Generate reply
            response = self.conversation.predict(
                user_input=user_input,
                context=context,
                language=language,
            )

            # 6. Return result
            return {
                "response": response,
                "language": language,
                "intent": intent,
                "context_used": len(qa_results) > 0,
                "related_qa": qa_results[:2],  # first two related Q&A pairs
            }

        except Exception as e:
            logger.error("聊天处理失败: %s", e)
            return {
                "response": self._get_error_message(language),
                "language": language,
                "intent": "error",
                "context_used": False,
                "related_qa": [],
            }

    def _build_context(self, qa_results: List[Dict], intent: str, language: str) -> str:
        """Format Q&A rows as prompt context, or fall back to a default text."""
        if not qa_results:
            return self._get_default_context(intent, language)

        return "\n\n".join(
            f"问题:{qa['question']}\n答案:{qa['answer']}\n分类:{qa['category']}"
            for qa in qa_results
        )

    def _get_default_context(self, intent: str, language: str) -> str:
        """Return the canned context for ``intent``.

        Only ``zh`` and ``en`` texts exist; other languages use the ``zh``
        set, and unknown intents get a generic sentence.
        """
        default_contexts = {
            'zh': {
                'greeting': "我们是专注于AI自动运行知识型网站的专业团队,很高兴为您服务!",
                'company_info': "我们专注于打造AI自动运行的知识型网站,帮助企业实现营销自动化。",
                'service_inquiry': "我们提供营销型网站、知识型网站、问答型网站三种核心解决方案。",
                'price_inquiry': "我们的服务价格根据项目规模和需求定制,请联系我们获取详细报价。",
                'contact': "您可以通过网站联系表单或联系我们页面获取详细联系方式。",
                'technical': "我们拥有强大的AI技术团队,提供全面的技术解决方案。"
            },
            'en': {
                'greeting': "We are a professional team focused on AI-automated knowledge websites, happy to serve you!",
                'company_info': "We specialize in creating AI-automated knowledge websites to help enterprises achieve marketing automation.",
                'service_inquiry': "We provide three core solutions: marketing websites, knowledge websites, and Q&A websites.",
                'price_inquiry': "Our service prices are customized based on project scale and requirements. Please contact us for detailed quotes.",
                'contact': "You can contact us through the website contact form or contact page for detailed contact information.",
                'technical': "We have a strong AI technical team providing comprehensive technical solutions."
            }
        }

        return default_contexts.get(language, default_contexts['zh']).get(intent, "我们很乐意为您提供帮助!")

    def _get_error_message(self, language: str) -> str:
        """Return the error message for ``language`` (``zh`` when unknown)."""
        error_messages = {
            'zh': "抱歉,我现在遇到了一些技术问题。请稍后再试,或者通过网站联系表单联系我们。",
            'en': "Sorry, I'm experiencing some technical issues right now. Please try again later or contact us through the website contact form.",
            'th': "ขออภัย ขณะนี้มีปัญหาทางเทคนิค กรุณาลองใหม่อีกครั้งหรือติดต่อเราผ่านแบบฟอร์มติดต่อ",
            'vi': "Xin lỗi, hiện tại tôi đang gặp một số vấn đề kỹ thuật. Vui lòng thử lại sau hoặc liên hệ với chúng tôi qua biểu mẫu liên hệ."
        }

        return error_messages.get(language, error_messages['zh'])

    def reset_conversation(self):
        """Clear the conversation memory."""
        self.memory.clear()


# Flask Web API
app = Flask(__name__)
CORS(app)  # allow cross-origin requests

# Initialize the AI agent.
# NOTE(review): a credential is committed in source. DOUBAO_API_KEY now takes
# precedence; the literal is kept only as a fallback so existing deployments
# keep working. Rotate the key and remove the fallback.
# NOTE(review): this single agent (and its memory) is shared by every HTTP
# client, so conversation histories of different users are mixed.
ai_agent = AIAgent(
    api_key=os.environ.get("DOUBAO_API_KEY", "2249d4c5-2e08-4787-9df6-cf5beee474a5")
)


@app.route('/api/chat', methods=['POST'])
def chat_api():
    """Chat endpoint: expects a JSON object with a non-empty ``message`` string.

    Responds 400 when the body is not such an object, 500 on unexpected
    errors, otherwise 200 with the agent's reply.
    """
    try:
        # silent=True: a missing or malformed JSON body yields None instead of
        # raising, so it is reported as a client error below.
        data = request.get_json(silent=True)
        message = data.get('message', '') if isinstance(data, dict) else ''
        user_input = message.strip() if isinstance(message, str) else ''

        if not user_input:
            return jsonify({
                "error": "消息不能为空",
                "success": False
            }), 400

        # Run the chat pipeline
        result = ai_agent.chat(user_input)

        return jsonify({
            "success": True,
            "data": {
                "response": result["response"],
                "language": result["language"],
                "intent": result["intent"],
                "context_used": result["context_used"],
                "related_questions": [qa["question"] for qa in result["related_qa"]]
            }
        })

    except Exception as e:
        logger.error("API错误: %s", e)
        return jsonify({
            "error": "服务器内部错误",
            "success": False
        }), 500


@app.route('/api/reset', methods=['POST'])
def reset_conversation():
    """Reset endpoint: clears the shared agent's conversation memory."""
    try:
        ai_agent.reset_conversation()
        return jsonify({
            "success": True,
            "message": "对话已重置"
        })
    except Exception as e:
        logger.error("重置对话错误: %s", e)
        return jsonify({
            "error": "重置失败",
            "success": False
        }), 500


@app.route('/api/health', methods=['GET'])
def health_check():
    """Health-check endpoint returning a static status document."""
    return jsonify({
        "status": "healthy",
        "service": "AI Agent Chatbot",
        "version": "1.0.0"
    })


# Command-line test interface
def test_chatbot():
    """Interactive console loop for trying the agent without the web API."""
    print("🤖 企业经营AI智能体聊天机器人")
    print("=" * 50)
    print("输入 'quit' 或 'exit' 退出")
    print("输入 'reset' 重置对话")
    print("=" * 50)

    while True:
        try:
            user_input = input("\n您: ").strip()

            if user_input.lower() in ['quit', 'exit', '退出', 'q']:
                print("再见!")
                break

            if user_input.lower() in ['reset', '重置']:
                ai_agent.reset_conversation()
                print("对话已重置")
                continue

            if not user_input:
                continue

            print("🤖 正在思考...")
            result = ai_agent.chat(user_input)

            print(f"AI智能体 [{result['language']}]: {result['response']}")

            if result['related_qa']:
                print(f"\n💡 相关问题:")
                for qa in result['related_qa']:
                    print(f" - {qa['question']}")

        except KeyboardInterrupt:
            print("\n\n程序被用户中断")
            break
        except Exception as e:
            print(f"发生错误: {e}")


if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1 and sys.argv[1] == "test":
        # Command-line test mode
        test_chatbot()
    else:
        # Web API mode
        print("🚀 启动AI智能体聊天机器人服务...")
        print("API地址: http://localhost:5001")
        print("健康检查: http://localhost:5001/api/health")
        # The interactive debugger allows code execution from the browser, so
        # it must not be on by default for a server bound to all interfaces.
        # Set FLASK_DEBUG=1 to opt in during local development.
        debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
        app.run(host='0.0.0.0', port=5001, debug=debug_enabled)