0%

sqlmap_MCP

github:https://github.com/ilikeoyt/sqlmap-mcp

MCP核心架构

  • MCP 主机(MCP Hosts):发起请求的 LLM 应用程序(例如 Claude Desktop、IDE 或 AI 工具)。
  • MCP 客户端(MCP Clients):在主机程序内部,与 MCP server 保持 1:1 的连接。
  • MCP 服务器(MCP Servers):为 MCP client 提供上下文、工具和 prompt 信息。
  • 本地资源(Local Resources):本地计算机中可供 MCP server 安全访问的资源(例如文件、数据库)。
  • 远程资源(Remote Resources):MCP server 可以连接到的远程资源(例如通过 API)。

image-20250525153638895

MCP client 充当 LLM 和 MCP server 之间的桥梁,MCP client 的工作流程如下:

  • MCP client 首先从 MCP server 获取可用的工具列表。
  • 将用户的查询连同工具描述通过 function calling 一起发送给 LLM。
  • LLM 决定是否需要使用工具以及使用哪些工具。
  • 如果需要使用工具,MCP client 会通过 MCP server 执行相应的工具调用。
  • 工具调用的结果会被发送回 LLM。
  • LLM 基于所有信息生成自然语言响应。
  • 最后将响应展示给用户。

MCP 通信机制

MCP 协议支持两种主要的通信机制:基于标准输入输出 stdio 的本地通信和基于SSE(Server-Sent Events)的远程通信。这两种机制都使用 JSON-RPC 2.0 格式进行消息传输,确保了通信的标准化和可扩展性。

  • 本地通信:通过 stdio 传输数据,适用于在同一台机器上运行的客户端和服务器之间的通信。
  • 远程通信:利用 SSE 与 HTTP 结合,实现跨网络的实时数据传输,适用于需要访问远程资源或分布式部署的场景。

MCP Server

MCP Server 是 MCP 架构中的关键组件,它可以提供 3 种主要类型的功能:

  • 资源(Resources):类似文件的数据,可以被客户端读取,如 API 响应或文件内容。
  • 工具(Tools):可以被 LLM 调用的函数(需要用户批准)。
  • 提示(Prompts):预先编写的模板,帮助用户完成特定任务。

这些功能使 MCP server 能够为 AI 应用提供丰富的上下文信息和操作能力,从而增强 LLM 的实用性和灵活性。

Demo

安装uv命令

1
curl -LsSf https://astral.sh/uv/install.sh | sh

初始化项目

1
2
3
4
5
6
7
8
9
10
11
12
13
# 给项目创建一个文件夹
uv init weather
cd weather

# 创建一个虚拟环境并激活
uv venv
source .venv/bin/activate

# 安装依赖
uv add "mcp[cli]" httpx

# 创建 server 文件
touch weather.py

编写server代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP

# Initialize FastMCP server
mcp = FastMCP("weather")

# Constants
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"

async def make_nws_request(url: str) -> dict[str, Any] | None:
"""Make a request to the NWS API with proper error handling."""
headers = {
"User-Agent": USER_AGENT,
"Accept": "application/geo+json"
}
async with httpx.AsyncClient() as client:
try:
response = await client.get(url, headers=headers, timeout=30.0)
response.raise_for_status()
return response.json()
except Exception:
return None

def format_alert(feature: dict) -> str:
"""Format an alert feature into a readable string."""
props = feature["properties"]
return f"""
Event: {props.get('event', 'Unknown')}
Area: {props.get('areaDesc', 'Unknown')}
Severity: {props.get('severity', 'Unknown')}
Description: {props.get('description', 'No description available')}
Instructions: {props.get('instruction', 'No specific instructions provided')}
"""

@mcp.tool()
async def get_alerts(state: str) -> str:
"""Get weather alerts for a US state.

Args:
state: Two-letter US state code (e.g. CA, NY)
"""
url = f"{NWS_API_BASE}/alerts/active/area/{state}"
data = await make_nws_request(url)

if not data or "features" not in data:
return "Unable to fetch alerts or no alerts found."

if not data["features"]:
return "No active alerts for this state."

alerts = [format_alert(feature) for feature in data["features"]]
return "\n---\n".join(alerts)

@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
"""Get weather forecast for a location.

Args:
latitude: Latitude of the location
longitude: Longitude of the location
"""
# First get the forecast grid endpoint
points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
points_data = await make_nws_request(points_url)

if not points_data:
return "Unable to fetch forecast data for this location."

# Get the forecast URL from the points response
forecast_url = points_data["properties"]["forecast"]
forecast_data = await make_nws_request(forecast_url)

if not forecast_data:
return "Unable to fetch detailed forecast."

# Format the periods into a readable forecast
periods = forecast_data["properties"]["periods"]
forecasts = []
for period in periods[:5]: # Only show next 5 periods
forecast = f"""
{period['name']}:
Temperature: {period['temperature']}°{period['temperatureUnit']}
Wind: {period['windSpeed']} {period['windDirection']}
Forecast: {period['detailedForecast']}
"""
forecasts.append(forecast)

return "\n---\n".join(forecasts)

if __name__ == "__main__":
# Initialize and run the server
mcp.run(transport='stdio')

可以通过mcp dev进行调试

1
mcp dev weather.py

image-20250525155107399

image-20250525155146762

测试提示词效果

image-20250525155225642

接入cherry Studio

image-20250525155655390

参数和环境变量都能在调试的命令行中看见

image-20250525155755077

运行测试

image-20250525155932619

sqlmap-mcp

在了解完mcp的一些基本知识,尝试编写这样的MCPserver(中间确实踩了很多坑)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import asyncio
import subprocess
from mcp.server.fastmcp import FastMCP
import logging

# 初始化 FastMCP 服务器
mcp = FastMCP("sqlmap")


async def run_sqlmap(url: str, sqlmap_args: list[str]) -> str:
"""异步执行 SQLMap 并返回字符串结果"""
try:
print(f"[DEBUG] 开始执行 SQLMap: {url}")
process = await asyncio.create_subprocess_exec(
"python3",
"/Users/yangxing/Desktop/sqlmap/sqlmap.py",
"-u", url,
*sqlmap_args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
text=False # 必须为 False,手动处理编码
)

# 使用 asyncio.wait_for 设置超时
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=7200 # 2小时超时
)

# 手动解码输出
stdout_str = stdout.decode('utf-8', errors='replace') if stdout else ""
stderr_str = stderr.decode('utf-8', errors='replace') if stderr else ""

if process.returncode == 0:
print(f"[DEBUG] SQLMap 执行成功 (返回码: {process.returncode})")
return stdout_str if stdout_str else stderr_str
else:
print(f"[DEBUG] SQLMap 执行失败 (返回码: {process.returncode})")
return f"SQLMap 执行失败 (返回码: {process.returncode})\n{stderr_str}"

except asyncio.TimeoutError:
print("[DEBUG] SQLMap 执行超时")
return "SQLMap 执行超时 (7200秒)"

except Exception as e:
print(f"[DEBUG] 执行 SQLMap 时发生异常: {str(e)}")
return f"执行异常: {str(e)}"


@mcp.tool()
async def sqlmap_scan(url: str, sqlmap_args: list[str] = []) -> str:
"""使用 SQLMap 对目标 URL 进行 SQL 注入扫描"""
# 添加 --batch 参数确保非交互式执行
full_args = ["--batch"] + sqlmap_args

# 执行 SQLMap 并获取结果
output = await run_sqlmap(url, full_args)

# 确保始终返回字符串
return output if isinstance(output, str) else str(output)


if __name__ == "__main__":
# 使用正确的异步运行方式
try:
print("[INFO] 启动 SQLMap MCP 服务器...")
mcp.run()
except KeyboardInterrupt:
print("[INFO] 服务器已停止")

以sse的方式启动

1
mcp run sqlmap_mcp.py --transport sse

配置:

image-20250525161241390

测试效果(待完善,还有比较多的bug)

image-20250525161814615

stdio模式(目前只能在cursor中使用,cherry中不知道为什么会出现前后任务断联的情况。。。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
from typing import Any, Dict, Optional, List
import uuid
import asyncio
import subprocess
import os
import re
from enum import Enum
from mcp.server.fastmcp import FastMCP

# 初始化FastMCP服务器
mcp = FastMCP("sqlmap")

# 全局存储扫描任务
tasks: Dict[str, Dict[str, Any]] = {}
SQLMAP_PATH = "D:\\tools\\sqlmap\\sqlmap.py" # 使用您指定的SQLMap路径


class ScanStatus(Enum):
QUEUED = "queued" # 已排队
RUNNING = "running" # 运行中
COMPLETED = "completed"# 已完成
FAILED = "failed" # 已失败


async def run_sqlmap_scan(task_id: str, target_url: str, options: Dict[str, Any]) -> None:
"""异步执行sqlmap扫描并实时捕获输出"""
try:
# 确保任务存在
if task_id not in tasks:
return

# 构建sqlmap命令
cmd = [
"python",
SQLMAP_PATH,
"-u", target_url,
"--batch" # 非交互模式
]

# 添加额外选项
if options:
for key, value in options.items():
if isinstance(value, bool) and value:
cmd.append(f"--{key}")
elif isinstance(value, (int, float)):
cmd.append(f"--{key}={value}")
elif isinstance(value, str):
cmd.append(f"--{key}={value}")

# 更新任务状态
tasks[task_id]["status"] = ScanStatus.RUNNING.value
tasks[task_id]["command"] = " ".join(cmd) # 保存命令用于调试
tasks[task_id]["output"] = "" # 初始化输出缓冲区
tasks[task_id]["critical_lines"] = [] # 存储关键发现
tasks[task_id]["vulnerabilities"] = [] # 存储结构化漏洞信息

# 创建进程
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)

# 实时读取输出
while True:
# 逐行读取标准输出
stdout_line = await process.stdout.readline()
if stdout_line:
line = stdout_line.decode('utf-8', errors='ignore').rstrip()
tasks[task_id]["output"] += line + "\n"

# 立即提取关键发现
if "[CRITICAL]" in line:
tasks[task_id]["critical_lines"].append(line)

# 实时提取漏洞信息
if "is vulnerable" in line and "parameter" in line:
parts = line.split()
if len(parts) > 3:
param = parts[1].strip("'")
vuln_type = " ".join(parts[3:])
tasks[task_id]["vulnerabilities"].append({
"parameter": param,
"type": vuln_type
})

# 逐行读取错误输出
stderr_line = await process.stderr.readline()
if stderr_line:
line = stderr_line.decode('utf-8', errors='ignore').rstrip()
tasks[task_id]["output"] += "[ERROR] " + line + "\n"
if "errors" not in tasks[task_id]:
tasks[task_id]["errors"] = []
tasks[task_id]["errors"].append(line)

# 检查进程是否已退出
if process.stdout.at_eof() and process.stderr.at_eof():
break

# 等待进程完成
return_code = await process.wait()

# 更新任务状态
if return_code == 0:
tasks[task_id]["status"] = ScanStatus.COMPLETED.value
# 解析最终结果
parse_scan_results_from_output(task_id)
else:
tasks[task_id]["status"] = ScanStatus.FAILED.value

# 保存最终信息
tasks[task_id]["return_code"] = return_code
tasks[task_id]["end_time"] = asyncio.get_event_loop().time()

except Exception as e:
if task_id in tasks:
tasks[task_id].update({
"status": ScanStatus.FAILED.value,
"error": str(e),
"end_time": asyncio.get_event_loop().time()
})


def parse_scan_results_from_output(task_id: str) -> None:
"""改进的sqlmap输出解析器,处理各种格式"""
try:
if "output" not in tasks[task_id]:
return

output = tasks[task_id]["output"]
results = []

# 1. 使用更灵活的正则表达式提取注入点
injection_points = re.findall(
r"Parameter: (.+?) \(.+?\)\n((?:\s+Type: .+?\n\s+Title: .+?\n\s+Payload: .+?\n)+)",
output,
re.DOTALL
)

for param, vuln_block in injection_points:
# 从块中提取每个漏洞
vulns = re.findall(
r"\s+Type: (.+?)\n\s+Title: (.+?)\n\s+Payload: (.+?)\n",
vuln_block,
re.DOTALL
)
for vuln in vulns:
vuln_type, title, payload = vuln
results.append({
"parameter": param,
"type": vuln_type.strip(),
"title": title.strip(),
"payload": payload.strip()
})

# 2. 处理新版本SQLMap的替代模式
if not results:
alt_points = re.findall(
r"(\w+) parameter '(.+?)' (is vulnerable.+)",
output
)
for method, param, details in alt_points:
results.append({
"parameter": param,
"type": f"{method} - {details}"
})

# 3. 提取数据库信息
db_info = re.search(
r"back-end DBMS: (.+?)\n",
output
)
if db_info:
results.append({
"type": "DBMS",
"info": db_info.group(1).strip()
})

# 4. 添加关键行作为后备
if "critical_lines" in tasks[task_id]:
for line in tasks[task_id]["critical_lines"]:
if "is vulnerable" in line:
results.append({
"type": "CRITICAL",
"info": line.replace("[CRITICAL] ", "")
})

# 5. 添加实时检测到的漏洞
if "vulnerabilities" in tasks[task_id]:
for vuln in tasks[task_id]["vulnerabilities"]:
# 避免重复
if not any(r.get("parameter") == vuln["parameter"] for r in results):
results.append(vuln)

if results:
tasks[task_id]["results"] = results

except Exception as e:
if task_id in tasks:
tasks[task_id]["parse_error"] = str(e)


@mcp.tool()
async def start_scan(target_url: str, options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""启动一个新的SQLMap扫描任务

Args:
target_url: 要扫描的URL
options: 额外的sqlmap选项 (例如 {"level": 3, "risk": 2})
"""
task_id = str(uuid.uuid4())

try:
# 初始化任务
tasks[task_id] = {
"status": ScanStatus.QUEUED.value,
"target_url": target_url,
"options": options or {},
"start_time": asyncio.get_event_loop().time(),
"output": "",
"results": None
}

# 在后台启动扫描
asyncio.create_task(run_sqlmap_scan(task_id, target_url, options or {}))

return {
"task_id": task_id,
"message": f"已开始扫描 {target_url}",
"status_url": f"/scan/status/{task_id}"
}

except Exception as e:
return {
"task_id": task_id,
"error": f"无法启动扫描: {str(e)}",
"status": ScanStatus.FAILED.value
}


@mcp.tool()
async def get_scan_status(task_id: str) -> Dict[str, Any]:
"""获取扫描任务的状态

Args:
task_id: 扫描任务的ID
"""
if task_id not in tasks:
return {"error": "无效的任务ID"}

task = tasks[task_id]
status = {
"task_id": task_id,
"status": task["status"],
"target_url": task["target_url"],
}

# 添加时间信息
current_time = asyncio.get_event_loop().time()
if "start_time" in task:
elapsed = current_time - task["start_time"]
status["elapsed_time"] = f"{elapsed:.2f}s"

# 添加结果或错误信息
if "results" in task and task["results"]:
status["results"] = task["results"]

# 对于运行中的任务,显示部分输出
if task["status"] == ScanStatus.RUNNING.value and "output" in task:
# 显示最后20行输出
lines = task["output"].splitlines()
status["partial_output"] = "\n".join(lines[-20:])

# 对于已完成的任务,显示摘要
if task["status"] == ScanStatus.COMPLETED.value:
if "output" in task:
# 提取摘要信息
summary = re.search(
r"sqlmap identified the following injection point\(s\):(.+?)\n\n",
task["output"],
re.DOTALL
)
if summary:
status["summary"] = summary.group(1).strip()
else:
status["summary"] = "未发现漏洞" if not task.get("results") else "发现漏洞"

# 对于失败的任务,显示错误详情
if task["status"] == ScanStatus.FAILED.value:
if "error" in task:
status["error"] = task["error"]
elif "errors" in task and task["errors"]:
status["error"] = task["errors"][-1] # 显示最后一个错误
elif "output" in task:
# 尝试在输出中查找错误
error_match = re.search(r"\[ERROR\] (.+)", task["output"])
if error_match:
status["error"] = error_match.group(1)

# 添加调试信息
if "command" in task:
status["command"] = task["command"]

return status


@mcp.tool()
async def list_scans(include_completed: bool = True) -> Dict[str, Any]:
"""列出所有扫描任务

Args:
include_completed: 是否包含已完成的任务
"""
active_tasks = []
completed_tasks = []

for task_id, task in tasks.items():
task_info = {
"task_id": task_id,
"status": task["status"],
"target_url": task["target_url"],
"start_time": task.get("start_time", 0)
}

if task["status"] in [ScanStatus.QUEUED.value, ScanStatus.RUNNING.value]:
active_tasks.append(task_info)
elif include_completed and task["status"] in [ScanStatus.COMPLETED.value, ScanStatus.FAILED.value]:
task_info["end_time"] = task.get("end_time", 0)
completed_tasks.append(task_info)

return {
"active_tasks": active_tasks,
"completed_tasks": completed_tasks
}


if __name__ == "__main__":
# 初始化并运行服务器
mcp.run(transport='stdio')

image-20250602210035491