agent功能开发增加MCP后端

This commit is contained in:
2025-11-07 17:42:06 +08:00
parent 3574f5391f
commit a1c76a257c
5 changed files with 2329 additions and 0 deletions

361
kimi_integration.py Normal file
View File

@@ -0,0 +1,361 @@
"""
Kimi API 集成示例
演示如何将MCP工具与Kimi大模型结合使用
"""
from openai import OpenAI
import json
from typing import List, Dict, Any
from mcp_client_example import MCPClient
# Kimi API配置
KIMI_API_KEY = "sk-TzB4VYJfCoXGcGrGMiewukVRzjuDsbVCkaZXi2LvkS8s60E5"
KIMI_BASE_URL = "https://api.moonshot.cn/v1"
KIMI_MODEL = "kimi-k2-turbo-preview"
# 初始化Kimi客户端
kimi_client = OpenAI(
api_key=KIMI_API_KEY,
base_url=KIMI_BASE_URL,
)
# 初始化MCP客户端
mcp_client = MCPClient()
def convert_mcp_tools_to_kimi_format() -> tuple[List[Dict], Dict]:
"""
将MCP工具转换为Kimi API的tools格式
Returns:
tools: Kimi格式的工具列表
tool_map: 工具名称到执行函数的映射
"""
# 获取所有MCP工具
mcp_tools_response = mcp_client.list_tools()
mcp_tools = mcp_tools_response["tools"]
# 转换为Kimi格式
kimi_tools = []
tool_map = {}
for tool in mcp_tools:
# Kimi工具格式
kimi_tool = {
"type": "function",
"function": {
"name": tool["name"],
"description": tool["description"],
"parameters": tool["parameters"]
}
}
kimi_tools.append(kimi_tool)
# 创建工具执行函数
tool_name = tool["name"]
tool_map[tool_name] = lambda args, name=tool_name: execute_mcp_tool(name, args)
return kimi_tools, tool_map
def execute_mcp_tool(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""
执行MCP工具
Args:
tool_name: 工具名称
arguments: 工具参数
Returns:
工具执行结果
"""
print(f"[工具调用] {tool_name}")
print(f"[参数] {json.dumps(arguments, ensure_ascii=False, indent=2)}")
result = mcp_client.call_tool(tool_name, arguments)
print(f"[结果] 成功: {result.get('success', False)}")
return result
def chat_with_kimi(user_message: str, verbose: bool = True) -> str:
"""
与Kimi进行对话支持工具调用
Args:
user_message: 用户消息
verbose: 是否打印详细信息
Returns:
Kimi的回复
"""
# 获取Kimi格式的工具
tools, tool_map = convert_mcp_tools_to_kimi_format()
if verbose:
print(f"\n{'='*60}")
print(f"加载了 {len(tools)} 个工具")
print(f"{'='*60}\n")
# 初始化对话
messages = [
{
"role": "system",
"content": """你是一个专业的金融数据分析助手,由 Moonshot AI 提供支持。
你可以使用各种工具来帮助用户查询和分析金融数据,包括:
- 新闻搜索(全球新闻、中国新闻、医疗新闻)
- 公司研究(路演信息、研究报告)
- 概念板块分析
- 股票分析(涨停分析、财务数据、交易数据)
- 财务报表(资产负债表、现金流量表)
请根据用户的问题,选择合适的工具来获取信息,并提供专业的分析和建议。"""
},
{
"role": "user",
"content": user_message
}
]
if verbose:
print(f"[用户]: {user_message}\n")
# 对话循环,处理工具调用
finish_reason = None
iteration = 0
max_iterations = 10 # 防止无限循环
while finish_reason is None or finish_reason == "tool_calls":
iteration += 1
if iteration > max_iterations:
print("[警告] 达到最大迭代次数")
break
if verbose and iteration > 1:
print(f"\n[轮次 {iteration}]")
# 调用Kimi API
completion = kimi_client.chat.completions.create(
model=KIMI_MODEL,
messages=messages,
temperature=0.6, # Kimi推荐的temperature值
tools=tools,
)
choice = completion.choices[0]
finish_reason = choice.finish_reason
if verbose:
print(f"[Kimi] finish_reason: {finish_reason}")
# 处理工具调用
if finish_reason == "tool_calls":
# 将Kimi的消息添加到上下文
messages.append(choice.message)
# 执行每个工具调用
for tool_call in choice.message.tool_calls:
tool_name = tool_call.function.name
tool_arguments = json.loads(tool_call.function.arguments)
# 执行工具
tool_result = tool_map[tool_name](tool_arguments)
# 将工具结果添加到消息中
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"name": tool_name,
"content": json.dumps(tool_result, ensure_ascii=False),
})
if verbose:
print() # 空行分隔
# 返回最终回复
final_response = choice.message.content
if verbose:
print(f"\n[Kimi]: {final_response}\n")
print(f"{'='*60}")
return final_response
def demo_simple_query():
"""演示1: 简单查询"""
print("\n" + "="*60)
print("演示1: 简单新闻查询")
print("="*60)
response = chat_with_kimi("帮我查找关于人工智能的最新新闻")
return response
def demo_stock_analysis():
"""演示2: 股票分析"""
print("\n" + "="*60)
print("演示2: 股票财务分析")
print("="*60)
response = chat_with_kimi("帮我分析贵州茅台600519的财务状况")
return response
def demo_concept_research():
"""演示3: 概念研究"""
print("\n" + "="*60)
print("演示3: 概念板块研究")
print("="*60)
response = chat_with_kimi("查找新能源汽车相关的概念板块,并告诉我涨幅最高的是哪些")
return response
def demo_industry_comparison():
"""演示4: 行业对比"""
print("\n" + "="*60)
print("演示4: 行业内股票对比")
print("="*60)
response = chat_with_kimi("帮我找出半导体行业的龙头股票,并对比它们的财务指标")
return response
def demo_comprehensive_analysis():
"""演示5: 综合分析"""
print("\n" + "="*60)
print("演示5: 综合分析")
print("="*60)
response = chat_with_kimi("""
我想投资白酒行业,请帮我:
1. 搜索白酒行业的主要上市公司
2. 对比贵州茅台和五粮液的财务数据
3. 查看最近的行业新闻
4. 给出投资建议
""")
return response
def interactive_chat():
"""交互式对话"""
print("\n" + "="*60)
print("Kimi 金融助手 - 交互模式")
print("="*60)
print("提示:输入 'quit''exit' 退出")
print("="*60 + "\n")
while True:
try:
user_input = input("你: ").strip()
if not user_input:
continue
if user_input.lower() in ['quit', 'exit', '退出']:
print("\n再见!")
break
response = chat_with_kimi(user_input)
except KeyboardInterrupt:
print("\n\n再见!")
break
except Exception as e:
print(f"\n[错误] {str(e)}\n")
def test_kimi_connection():
"""测试Kimi API连接"""
print("\n" + "="*60)
print("测试 Kimi API 连接")
print("="*60 + "\n")
try:
# 简单的测试请求
response = kimi_client.chat.completions.create(
model=KIMI_MODEL,
messages=[
{"role": "user", "content": "你好,请介绍一下你自己"}
],
temperature=0.6
)
print("[✓] 连接成功!")
print(f"[✓] 模型: {KIMI_MODEL}")
print(f"[✓] 回复: {response.choices[0].message.content}\n")
return True
except Exception as e:
print(f"[✗] 连接失败: {str(e)}\n")
return False
def show_available_tools():
"""显示所有可用工具"""
print("\n" + "="*60)
print("可用工具列表")
print("="*60 + "\n")
tools, _ = convert_mcp_tools_to_kimi_format()
for i, tool in enumerate(tools, 1):
func = tool["function"]
print(f"{i}. {func['name']}")
print(f" 描述: {func['description'][:80]}...")
print()
print(f"总计: {len(tools)} 个工具\n")
if __name__ == "__main__":
import sys
# 首先测试连接
if not test_kimi_connection():
print("请检查API Key和网络连接")
sys.exit(1)
# 显示可用工具
show_available_tools()
# 运行演示
print("\n选择运行模式:")
print("1. 简单查询演示")
print("2. 股票分析演示")
print("3. 概念研究演示")
print("4. 行业对比演示")
print("5. 综合分析演示")
print("6. 交互式对话")
print("7. 运行所有演示")
try:
choice = input("\n请选择 (1-7): ").strip()
if choice == "1":
demo_simple_query()
elif choice == "2":
demo_stock_analysis()
elif choice == "3":
demo_concept_research()
elif choice == "4":
demo_industry_comparison()
elif choice == "5":
demo_comprehensive_analysis()
elif choice == "6":
interactive_chat()
elif choice == "7":
demo_simple_query()
demo_stock_analysis()
demo_concept_research()
demo_industry_comparison()
demo_comprehensive_analysis()
else:
print("无效选择")
except KeyboardInterrupt:
print("\n\n程序已退出")
finally:
mcp_client.close()

248
mcp_client_example.py Normal file
View File

@@ -0,0 +1,248 @@
"""
MCP客户端使用示例
演示如何调用MCP服务器的各种工具
"""
import httpx
import json
from typing import Dict, Any
class MCPClient:
"""MCP客户端"""
def __init__(self, base_url: str = "http://localhost:8900"):
self.base_url = base_url
self.client = httpx.Client(timeout=60.0)
def list_tools(self):
"""列出所有可用工具"""
response = self.client.get(f"{self.base_url}/tools")
response.raise_for_status()
return response.json()
def get_tool(self, tool_name: str):
"""获取特定工具的定义"""
response = self.client.get(f"{self.base_url}/tools/{tool_name}")
response.raise_for_status()
return response.json()
def call_tool(self, tool_name: str, arguments: Dict[str, Any]):
"""调用工具"""
payload = {
"tool": tool_name,
"arguments": arguments
}
response = self.client.post(f"{self.base_url}/tools/call", json=payload)
response.raise_for_status()
return response.json()
def close(self):
"""关闭客户端"""
self.client.close()
def print_result(title: str, result: Dict[str, Any]):
"""打印结果"""
print(f"\n{'=' * 60}")
print(f"{title}")
print(f"{'=' * 60}")
print(json.dumps(result, ensure_ascii=False, indent=2))
def main():
"""主函数 - 演示各种工具的使用"""
client = MCPClient()
try:
# 1. 列出所有工具
print("\n示例1: 列出所有可用工具")
tools = client.list_tools()
print(f"可用工具数量: {len(tools['tools'])}")
for tool in tools['tools']:
print(f" - {tool['name']}: {tool['description'][:50]}...")
# 2. 搜索中国新闻
print("\n示例2: 搜索中国新闻(关键词:人工智能)")
result = client.call_tool(
"search_china_news",
{
"query": "人工智能",
"top_k": 5
}
)
if result['success']:
print_result("中国新闻搜索结果", result['data'])
# 3. 搜索概念板块(按涨跌幅排序)
print("\n示例3: 搜索概念板块(关键词:新能源,按涨跌幅排序)")
result = client.call_tool(
"search_concepts",
{
"query": "新能源",
"size": 5,
"sort_by": "change_pct"
}
)
if result['success']:
print_result("概念搜索结果", result['data'])
# 4. 获取股票的相关概念
print("\n示例4: 获取股票相关概念股票代码600519")
result = client.call_tool(
"get_stock_concepts",
{
"stock_code": "600519",
"size": 10
}
)
if result['success']:
print_result("股票概念结果", result['data'])
# 5. 搜索涨停股票
print("\n示例5: 搜索涨停股票(关键词:锂电池)")
result = client.call_tool(
"search_limit_up_stocks",
{
"query": "锂电池",
"mode": "hybrid",
"page_size": 5
}
)
if result['success']:
print_result("涨停股票搜索结果", result['data'])
# 6. 搜索研究报告
print("\n示例6: 搜索研究报告(关键词:投资策略)")
result = client.call_tool(
"search_research_reports",
{
"query": "投资策略",
"mode": "hybrid",
"size": 3
}
)
if result['success']:
print_result("研究报告搜索结果", result['data'])
# 7. 获取概念统计数据
print("\n示例7: 获取概念统计最近7天")
result = client.call_tool(
"get_concept_statistics",
{
"days": 7,
"min_stock_count": 3
}
)
if result['success']:
print_result("概念统计结果", result['data'])
# 8. 搜索路演信息
print("\n示例8: 搜索路演信息(关键词:业绩)")
result = client.call_tool(
"search_roadshows",
{
"query": "业绩",
"size": 3
}
)
if result['success']:
print_result("路演搜索结果", result['data'])
# 9. 获取股票基本信息
print("\n示例9: 获取股票基本信息股票600519")
result = client.call_tool(
"get_stock_basic_info",
{
"seccode": "600519"
}
)
if result['success']:
print_result("股票基本信息", result['data'])
# 10. 获取股票财务指标
print("\n示例10: 获取股票财务指标股票600519最近5期")
result = client.call_tool(
"get_stock_financial_index",
{
"seccode": "600519",
"limit": 5
}
)
if result['success']:
print_result("财务指标", result['data'])
# 11. 获取股票交易数据
print("\n示例11: 获取股票交易数据股票600519最近10天")
result = client.call_tool(
"get_stock_trade_data",
{
"seccode": "600519",
"limit": 10
}
)
if result['success']:
print_result("交易数据", result['data'])
# 12. 按行业搜索股票
print("\n示例12: 按行业搜索股票(行业:半导体)")
result = client.call_tool(
"search_stocks_by_criteria",
{
"industry": "半导体",
"limit": 10
}
)
if result['success']:
print_result("行业股票", result['data'])
# 13. 股票对比分析
print("\n示例13: 股票对比分析600519 vs 000858")
result = client.call_tool(
"get_stock_comparison",
{
"seccodes": ["600519", "000858"],
"metric": "financial"
}
)
if result['success']:
print_result("股票对比", result['data'])
except Exception as e:
print(f"\n错误: {str(e)}")
finally:
client.close()
def test_single_tool():
"""测试单个工具(用于快速测试)"""
client = MCPClient()
try:
# 修改这里来测试不同的工具
result = client.call_tool(
"search_china_news",
{
"query": "芯片",
"exact_match": True,
"top_k": 3
}
)
print_result("测试结果", result)
except Exception as e:
print(f"错误: {str(e)}")
finally:
client.close()
if __name__ == "__main__":
# 运行完整示例
main()
# 或者测试单个工具
# test_single_tool()

108
mcp_config.py Normal file
View File

@@ -0,0 +1,108 @@
"""
MCP服务器配置文件
集中管理所有配置项
"""
from typing import Dict
from pydantic import BaseSettings
class Settings(BaseSettings):
"""应用配置"""
# 服务器配置
SERVER_HOST: str = "0.0.0.0"
SERVER_PORT: int = 8900
DEBUG: bool = True
# 后端API服务端点
NEWS_API_URL: str = "http://222.128.1.157:21891"
ROADSHOW_API_URL: str = "http://222.128.1.157:19800"
CONCEPT_API_URL: str = "http://222.128.1.157:16801"
STOCK_ANALYSIS_API_URL: str = "http://222.128.1.157:8811"
# HTTP客户端配置
HTTP_TIMEOUT: float = 60.0
HTTP_MAX_RETRIES: int = 3
# 日志配置
LOG_LEVEL: str = "INFO"
LOG_FORMAT: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# CORS配置
CORS_ORIGINS: list = ["*"]
CORS_CREDENTIALS: bool = True
CORS_METHODS: list = ["*"]
CORS_HEADERS: list = ["*"]
# LLM配置如果需要集成
LLM_PROVIDER: str = "openai" # openai, anthropic, etc.
LLM_API_KEY: str = ""
LLM_MODEL: str = "gpt-4"
LLM_BASE_URL: str = ""
# 速率限制
RATE_LIMIT_ENABLED: bool = False
RATE_LIMIT_PER_MINUTE: int = 60
# 缓存配置
CACHE_ENABLED: bool = True
CACHE_TTL: int = 300 # 秒
class Config:
env_file = ".env"
case_sensitive = True
# 全局设置实例
settings = Settings()
# 工具类别映射(用于组织和展示)
TOOL_CATEGORIES: Dict[str, list] = {
"新闻搜索": [
"search_news",
"search_china_news",
"search_medical_news"
],
"公司研究": [
"search_roadshows",
"search_research_reports"
],
"概念板块": [
"search_concepts",
"get_concept_details",
"get_stock_concepts",
"get_concept_statistics"
],
"股票分析": [
"search_limit_up_stocks",
"get_daily_stock_analysis"
]
}
# 工具优先级用于LLM选择工具时的提示
TOOL_PRIORITIES: Dict[str, int] = {
"search_china_news": 10, # 最常用
"search_concepts": 9,
"search_limit_up_stocks": 8,
"search_research_reports": 8,
"get_stock_concepts": 7,
"search_news": 6,
"get_daily_stock_analysis": 5,
"get_concept_statistics": 5,
"search_medical_news": 4,
"search_roadshows": 4,
"get_concept_details": 3,
}
# 默认参数配置
DEFAULT_PARAMS = {
"top_k": 20,
"page_size": 20,
"size": 10,
"sort_by": "change_pct",
"mode": "hybrid",
"exact_match": False,
}

546
mcp_database.py Normal file
View File

@@ -0,0 +1,546 @@
"""
MySQL数据库查询模块
提供股票财务数据查询功能
"""
import aiomysql
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime, date
from decimal import Decimal
import json
logger = logging.getLogger(__name__)
# MySQL连接配置
MYSQL_CONFIG = {
'host': '222.128.1.157',
'port': 33060,
'user': 'root',
'password': 'Zzl5588161!',
'db': 'stock',
'charset': 'utf8mb4',
'autocommit': True
}
# 全局连接池
_pool = None
class DateTimeEncoder(json.JSONEncoder):
"""JSON编码器处理datetime和Decimal类型"""
def default(self, obj):
if isinstance(obj, (datetime, date)):
return obj.isoformat()
if isinstance(obj, Decimal):
return float(obj)
return super().default(obj)
async def get_pool():
"""获取MySQL连接池"""
global _pool
if _pool is None:
_pool = await aiomysql.create_pool(
host=MYSQL_CONFIG['host'],
port=MYSQL_CONFIG['port'],
user=MYSQL_CONFIG['user'],
password=MYSQL_CONFIG['password'],
db=MYSQL_CONFIG['db'],
charset=MYSQL_CONFIG['charset'],
autocommit=MYSQL_CONFIG['autocommit'],
minsize=1,
maxsize=10
)
logger.info("MySQL connection pool created")
return _pool
async def close_pool():
"""关闭MySQL连接池"""
global _pool
if _pool:
_pool.close()
await _pool.wait_closed()
_pool = None
logger.info("MySQL connection pool closed")
def convert_row(row: Dict) -> Dict:
"""转换数据库行,处理特殊类型"""
if not row:
return {}
result = {}
for key, value in row.items():
if isinstance(value, Decimal):
result[key] = float(value)
elif isinstance(value, (datetime, date)):
result[key] = value.isoformat()
else:
result[key] = value
return result
async def get_stock_basic_info(seccode: str) -> Optional[Dict[str, Any]]:
"""
获取股票基本信息
Args:
seccode: 股票代码
Returns:
股票基本信息字典
"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
query = """
SELECT
SECCODE, SECNAME, ORGNAME,
F001V as english_name,
F003V as legal_representative,
F004V as registered_address,
F005V as office_address,
F010D as establishment_date,
F011V as website,
F012V as email,
F013V as phone,
F015V as main_business,
F016V as business_scope,
F017V as company_profile,
F030V as industry_level1,
F032V as industry_level2,
F034V as sw_industry_level1,
F036V as sw_industry_level2,
F026V as province,
F028V as city,
F041V as chairman,
F042V as general_manager,
UPDATE_DATE as update_date
FROM ea_baseinfo
WHERE SECCODE = %s
LIMIT 1
"""
await cursor.execute(query, (seccode,))
result = await cursor.fetchone()
if result:
return convert_row(result)
return None
async def get_stock_financial_index(
seccode: str,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
limit: int = 10
) -> List[Dict[str, Any]]:
"""
获取股票财务指标
Args:
seccode: 股票代码
start_date: 开始日期 YYYY-MM-DD
end_date: 结束日期 YYYY-MM-DD
limit: 返回条数
Returns:
财务指标列表
"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
# 构建查询
query = """
SELECT
SECCODE, SECNAME, ENDDATE, STARTDATE,
F069D as report_year,
F003N as eps, -- 每股收益
F004N as basic_eps,
F008N as bps, -- 每股净资产
F014N as roe, -- 净资产收益率
F016N as roa, -- 总资产报酬率
F017N as net_profit_margin, -- 净利润率
F022N as receivable_turnover, -- 应收账款周转率
F023N as inventory_turnover, -- 存货周转率
F025N as total_asset_turnover, -- 总资产周转率
F041N as debt_ratio, -- 资产负债率
F042N as current_ratio, -- 流动比率
F043N as quick_ratio, -- 速动比率
F052N as revenue_growth, -- 营业收入增长率
F053N as profit_growth, -- 净利润增长率
F089N as revenue, -- 营业收入
F090N as operating_cost, -- 营业成本
F101N as net_profit, -- 净利润
F102N as net_profit_parent -- 归母净利润
FROM ea_financialindex
WHERE SECCODE = %s
"""
params = [seccode]
if start_date:
query += " AND ENDDATE >= %s"
params.append(start_date)
if end_date:
query += " AND ENDDATE <= %s"
params.append(end_date)
query += " ORDER BY ENDDATE DESC LIMIT %s"
params.append(limit)
await cursor.execute(query, params)
results = await cursor.fetchall()
return [convert_row(row) for row in results]
async def get_stock_trade_data(
seccode: str,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
limit: int = 30
) -> List[Dict[str, Any]]:
"""
获取股票交易数据
Args:
seccode: 股票代码
start_date: 开始日期 YYYY-MM-DD
end_date: 结束日期 YYYY-MM-DD
limit: 返回条数
Returns:
交易数据列表
"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
query = """
SELECT
SECCODE, SECNAME, TRADEDATE,
F002N as prev_close, -- 昨日收盘价
F003N as open_price, -- 开盘价
F005N as high_price, -- 最高价
F006N as low_price, -- 最低价
F007N as close_price, -- 收盘价
F004N as volume, -- 成交量
F011N as turnover, -- 成交金额
F009N as change_amount, -- 涨跌额
F010N as change_pct, -- 涨跌幅
F012N as turnover_rate, -- 换手率
F013N as amplitude, -- 振幅
F026N as pe_ratio, -- 市盈率
F020N as total_shares, -- 总股本
F021N as circulating_shares -- 流通股本
FROM ea_trade
WHERE SECCODE = %s
"""
params = [seccode]
if start_date:
query += " AND TRADEDATE >= %s"
params.append(start_date)
if end_date:
query += " AND TRADEDATE <= %s"
params.append(end_date)
query += " ORDER BY TRADEDATE DESC LIMIT %s"
params.append(limit)
await cursor.execute(query, params)
results = await cursor.fetchall()
return [convert_row(row) for row in results]
async def get_stock_balance_sheet(
seccode: str,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
limit: int = 8
) -> List[Dict[str, Any]]:
"""
获取资产负债表数据
Args:
seccode: 股票代码
start_date: 开始日期
end_date: 结束日期
limit: 返回条数
Returns:
资产负债表数据列表
"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
query = """
SELECT
SECCODE, SECNAME, ENDDATE,
F001D as report_year,
F006N as cash, -- 货币资金
F009N as receivables, -- 应收账款
F015N as inventory, -- 存货
F019N as current_assets, -- 流动资产合计
F023N as long_term_investment, -- 长期股权投资
F025N as fixed_assets, -- 固定资产
F037N as noncurrent_assets, -- 非流动资产合计
F038N as total_assets, -- 资产总计
F039N as short_term_loan, -- 短期借款
F042N as payables, -- 应付账款
F052N as current_liabilities, -- 流动负债合计
F053N as long_term_loan, -- 长期借款
F060N as noncurrent_liabilities, -- 非流动负债合计
F061N as total_liabilities, -- 负债合计
F062N as share_capital, -- 股本
F063N as capital_reserve, -- 资本公积
F065N as retained_earnings, -- 未分配利润
F070N as total_equity -- 所有者权益合计
FROM ea_asset
WHERE SECCODE = %s
"""
params = [seccode]
if start_date:
query += " AND ENDDATE >= %s"
params.append(start_date)
if end_date:
query += " AND ENDDATE <= %s"
params.append(end_date)
query += " ORDER BY ENDDATE DESC LIMIT %s"
params.append(limit)
await cursor.execute(query, params)
results = await cursor.fetchall()
return [convert_row(row) for row in results]
async def get_stock_cashflow(
seccode: str,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
limit: int = 8
) -> List[Dict[str, Any]]:
"""
获取现金流量表数据
Args:
seccode: 股票代码
start_date: 开始日期
end_date: 结束日期
limit: 返回条数
Returns:
现金流量表数据列表
"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
query = """
SELECT
SECCODE, SECNAME, ENDDATE, STARTDATE,
F001D as report_year,
F009N as operating_cash_inflow, -- 经营活动现金流入
F014N as operating_cash_outflow, -- 经营活动现金流出
F015N as net_operating_cashflow, -- 经营活动现金流量净额
F021N as investing_cash_inflow, -- 投资活动现金流入
F026N as investing_cash_outflow, -- 投资活动现金流出
F027N as net_investing_cashflow, -- 投资活动现金流量净额
F031N as financing_cash_inflow, -- 筹资活动现金流入
F035N as financing_cash_outflow, -- 筹资活动现金流出
F036N as net_financing_cashflow, -- 筹资活动现金流量净额
F039N as net_cash_increase, -- 现金及现金等价物净增加额
F044N as net_profit, -- 净利润
F046N as depreciation, -- 固定资产折旧
F060N as net_operating_cashflow_adjusted -- 经营活动现金流量净额(补充)
FROM ea_cashflow
WHERE SECCODE = %s
"""
params = [seccode]
if start_date:
query += " AND ENDDATE >= %s"
params.append(start_date)
if end_date:
query += " AND ENDDATE <= %s"
params.append(end_date)
query += " ORDER BY ENDDATE DESC LIMIT %s"
params.append(limit)
await cursor.execute(query, params)
results = await cursor.fetchall()
return [convert_row(row) for row in results]
async def search_stocks_by_criteria(
industry: Optional[str] = None,
province: Optional[str] = None,
min_market_cap: Optional[float] = None,
max_market_cap: Optional[float] = None,
limit: int = 50
) -> List[Dict[str, Any]]:
"""
按条件搜索股票
Args:
industry: 行业名称
province: 省份
min_market_cap: 最小市值(亿元)
max_market_cap: 最大市值(亿元)
limit: 返回条数
Returns:
股票列表
"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
query = """
SELECT DISTINCT
b.SECCODE,
b.SECNAME,
b.F030V as industry_level1,
b.F032V as industry_level2,
b.F034V as sw_industry_level1,
b.F026V as province,
b.F028V as city,
b.F015V as main_business,
t.F007N as latest_price,
t.F010N as change_pct,
t.F026N as pe_ratio,
t.TRADEDATE as latest_trade_date
FROM ea_baseinfo b
LEFT JOIN (
SELECT SECCODE, MAX(TRADEDATE) as max_date
FROM ea_trade
GROUP BY SECCODE
) latest ON b.SECCODE = latest.SECCODE
LEFT JOIN ea_trade t ON b.SECCODE = t.SECCODE
AND t.TRADEDATE = latest.max_date
WHERE 1=1
"""
params = []
if industry:
query += " AND (b.F030V LIKE %s OR b.F032V LIKE %s OR b.F034V LIKE %s)"
pattern = f"%{industry}%"
params.extend([pattern, pattern, pattern])
if province:
query += " AND b.F026V = %s"
params.append(province)
if min_market_cap or max_market_cap:
# 市值 = 最新价 * 总股本 / 100000000转换为亿元
if min_market_cap:
query += " AND (t.F007N * t.F020N / 100000000) >= %s"
params.append(min_market_cap)
if max_market_cap:
query += " AND (t.F007N * t.F020N / 100000000) <= %s"
params.append(max_market_cap)
query += " ORDER BY t.TRADEDATE DESC LIMIT %s"
params.append(limit)
await cursor.execute(query, params)
results = await cursor.fetchall()
return [convert_row(row) for row in results]
async def get_stock_comparison(
seccodes: List[str],
metric: str = "financial"
) -> Dict[str, Any]:
"""
股票对比分析
Args:
seccodes: 股票代码列表
metric: 对比指标类型 (financial/trade)
Returns:
对比数据
"""
pool = await get_pool()
if not seccodes or len(seccodes) < 2:
return {"error": "至少需要2个股票代码进行对比"}
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
placeholders = ','.join(['%s'] * len(seccodes))
if metric == "financial":
# 对比最新财务指标
query = f"""
SELECT
f.SECCODE, f.SECNAME, f.ENDDATE,
f.F003N as eps,
f.F008N as bps,
f.F014N as roe,
f.F017N as net_profit_margin,
f.F041N as debt_ratio,
f.F052N as revenue_growth,
f.F053N as profit_growth,
f.F089N as revenue,
f.F101N as net_profit
FROM ea_financialindex f
INNER JOIN (
SELECT SECCODE, MAX(ENDDATE) as max_date
FROM ea_financialindex
WHERE SECCODE IN ({placeholders})
GROUP BY SECCODE
) latest ON f.SECCODE = latest.SECCODE
AND f.ENDDATE = latest.max_date
"""
else: # trade
# 对比最新交易数据
query = f"""
SELECT
t.SECCODE, t.SECNAME, t.TRADEDATE,
t.F007N as close_price,
t.F010N as change_pct,
t.F012N as turnover_rate,
t.F026N as pe_ratio,
t.F020N as total_shares,
t.F021N as circulating_shares
FROM ea_trade t
INNER JOIN (
SELECT SECCODE, MAX(TRADEDATE) as max_date
FROM ea_trade
WHERE SECCODE IN ({placeholders})
GROUP BY SECCODE
) latest ON t.SECCODE = latest.SECCODE
AND t.TRADEDATE = latest.max_date
"""
await cursor.execute(query, seccodes)
results = await cursor.fetchall()
return {
"comparison_type": metric,
"stocks": [convert_row(row) for row in results]
}

1066
mcp_server.py Normal file

File diff suppressed because it is too large Load Diff