#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 企业经营AI智能体问答数据库演示脚本 展示如何使用问答数据库为AI智能体提供智能回答 """ from ai_agent_api import ask_question, get_related_questions, get_categories import time def demo_qa_system(): """演示问答系统功能""" print("🤖 企业经营AI智能体问答系统演示") print("=" * 60) print("本系统基于SQLite数据库,包含102条问答记录") print("支持中文和英文两种语言,涵盖多个业务领域") print("=" * 60) # 演示问题列表 demo_questions = [ { "question": "你们公司是做什么的?", "language": "zh", "description": "公司介绍类问题" }, { "question": "What does your company do?", "language": "en", "description": "Company introduction question" }, { "question": "营销功能具体包括什么?", "language": "zh", "description": "产品功能类问题" }, { "question": "有成功案例吗?", "language": "zh", "description": "成功案例类问题" }, { "question": "How do you deploy the website?", "language": "en", "description": "Technical deployment question" }, { "question": "如何联系你们?", "language": "zh", "description": "联系方式类问题" } ] print("\n📋 演示问答过程:") print("-" * 60) for i, demo in enumerate(demo_questions, 1): print(f"\n{i}. {demo['description']}") print(f" 问题: {demo['question']}") print(f" 语言: {demo['language']}") # 获取答案 start_time = time.time() answer = ask_question(demo['question'], demo['language']) end_time = time.time() print(f" 答案: {answer}") print(f" 响应时间: {(end_time - start_time)*1000:.2f}ms") # 获取相关问题 related = get_related_questions(demo['question'], demo['language'], 2) if related: print(f" 相关问题: {', '.join(related)}") print("-" * 60) # 显示分类信息 print("\n📊 数据库统计信息:") print("-" * 60) zh_categories = get_categories('zh') en_categories = get_categories('en') print(f"中文分类数量: {len(zh_categories)}") print(f"英文分类数量: {len(en_categories)}") print(f"总分类数量: {len(set(zh_categories + en_categories))}") print("\n主要中文分类:") for category in zh_categories[:10]: print(f" - {category}") print("\n主要英文分类:") for category in en_categories[:10]: print(f" - {category}") # 交互式问答演示 print("\n🎯 交互式问答演示:") print("-" * 60) print("请输入您的问题(输入 'quit' 退出):") while True: try: user_input = input("\n您: ").strip() if user_input.lower() in ['quit', 'exit', '退出', 'q']: print("感谢使用!再见!") break if not user_input: continue # 简单判断语言 if any(word in user_input.lower() for word in ['what', 'how', 'do', 'does', 'is', 'are', 'can', 'will']): language = 'en' else: language = 'zh' print(f"语言检测: {language}") # 获取答案 start_time = time.time() answer = ask_question(user_input, language) end_time = time.time() print(f"AI智能体: {answer}") print(f"响应时间: {(end_time - start_time)*1000:.2f}ms") # 获取相关问题 related = get_related_questions(user_input, language, 3) if related and len(related) > 1: # 避免显示相同问题 print(f"相关问题: {', '.join(related[:3])}") except KeyboardInterrupt: print("\n\n程序被用户中断") break except Exception as e: print(f"发生错误: {e}") def show_database_info(): """显示数据库信息""" print("\n📊 数据库详细信息:") print("-" * 60) import sqlite3 try: conn = sqlite3.connect('/root/老业务网站8.30/ai_agent_qa.db') cursor = conn.cursor() # 总记录数 cursor.execute('SELECT COUNT(*) FROM qa_pairs') total_count = cursor.fetchone()[0] print(f"总记录数: {total_count}") # 按语言统计 cursor.execute('SELECT language, COUNT(*) FROM qa_pairs GROUP BY language') language_stats = cursor.fetchall() print("\n语言分布:") for lang, count in language_stats: print(f" {lang}: {count} 条") # 按分类统计(前10个) cursor.execute('SELECT category, COUNT(*) FROM qa_pairs GROUP BY category ORDER BY COUNT(*) DESC LIMIT 10') category_stats = cursor.fetchall() print("\n热门分类 (前10):") for category, count in category_stats: print(f" {category}: {count} 条") # 数据库文件大小 import os db_size = os.path.getsize('/root/老业务网站8.30/ai_agent_qa.db') print(f"\n数据库文件大小: {db_size / 1024:.2f} KB") conn.close() except Exception as e: print(f"获取数据库信息失败: {e}") if __name__ == "__main__": # 显示数据库信息 show_database_info() # 运行演示 demo_qa_system()