Pyshark 使用手册(Python网络抓包分析库)¶
Pyshark 是基于Wireshark的Python封装库,提供强大的网络数据包捕获和分析功能,在CTF中用于流量分析和取证。
目录¶
概述¶
Pyshark 是Wireshark/Tshark的Python接口,提供:
- 实时抓包: 捕获网络接口流量
- 离线分析: 读取PCAP/PCAPNG文件
- 协议解析: 支持数千种网络协议
- 数据过滤: BPF/Display过滤器
- 深度解析: 逐层分析数据包结构
CTF应用场景:
- 网络流量分析
- 协议还原
- 敏感信息提取
- 加密流量分析
- 恶意流量检测
安装方法¶
# 1. Install Wireshark/TShark (required: pyshark drives the tshark binary)
# macOS
brew install wireshark
# Ubuntu/Debian
sudo apt-get install tshark
# Windows: download the installer
# https://www.wireshark.org/download.html
# 2. Install pyshark
pip install pyshark
# Verify the installation.
# pyshark does not reliably expose a __version__ attribute, so read the
# installed version from the package metadata instead (Python 3.8+).
python3 -c "import pyshark; from importlib.metadata import version; print(version('pyshark'))"
tshark --version
权限配置¶
# Linux: allow non-root users to capture packets
sudo usermod -aG wireshark $USER
sudo chmod +x /usr/bin/dumpcap
# Or grant dumpcap the capture capabilities directly
# (setcap assigns file capabilities; it does not set the SUID bit)
sudo setcap cap_net_raw,cap_net_admin=eip /usr/bin/dumpcap
# Log out and back in so the new group membership takes effect
基础概念¶
Capture对象类型¶
import pyshark
# Each assignment below is an independent example of one capture class.
# 1. LiveCapture - capture live traffic from a local interface
capture = pyshark.LiveCapture(interface='eth0')
# 2. FileCapture - read packets from a PCAP/PCAPNG file
capture = pyshark.FileCapture('traffic.pcap')
# 3. RemoteCapture - capture from a remote host running rpcapd.
#    The positional arguments are (remote_host, remote_interface): the second
#    one names the interface on the remote machine, not the daemon. The rpcapd
#    port is passed separately via remote_port.
capture = pyshark.RemoteCapture('192.168.1.1', 'eth0')
# 4. InMemCapture - in-memory capture
capture = pyshark.InMemCapture()
数据包结构¶
import pyshark
capture = pyshark.FileCapture('sample.pcap')
try:
    packet = capture[0]
    # Layer stack of the packet
    print(packet.layers)  # every protocol layer
    # Access a specific layer by name
    eth_layer = packet.eth  # Ethernet layer
    ip_layer = packet.ip    # IP layer
    tcp_layer = packet.tcp  # TCP layer
    # Fields of a layer
    print(packet.ip.src)       # source IP
    print(packet.ip.dst)       # destination IP
    print(packet.tcp.srcport)  # source port
finally:
    # Always release the capture, even if the packet lacks one of the layers
    capture.close()
抓包操作¶
实时抓包¶
import pyshark
# Minimal live capture
capture = pyshark.LiveCapture(interface='en0')
separator = '-' * 50
# Stop after the first 10 packets
live_packets = capture.sniff_continuously(packet_count=10)
for pkt in live_packets:
    print(f'Time: {pkt.sniff_time}')
    print(f'Protocol: {pkt.highest_layer}')
    print(f'Length: {pkt.length}')
    print(separator)
capture.close()
带过滤的抓包¶
import pyshark
# Capture with a BPF filter: only TCP port 80 traffic is captured
capture = pyshark.LiveCapture(
    interface='en0',
    bpf_filter='tcp port 80',
)
try:
    # Capture for a fixed duration (30 seconds)
    capture.sniff(timeout=30)
    # Walk the packets collected by sniff()
    for packet in capture:
        try:
            print(f'HTTP: {packet.http.request_uri}')
        except AttributeError:
            # No HTTP layer or no request URI on this packet: skip it
            continue
finally:
    # Release the capture even if sniffing or printing fails
    capture.close()
保存抓包结果¶
import pyshark
# Capture live traffic and write it to a file
capture_options = {'interface': 'en0', 'output_file': 'captured.pcap'}
capture = pyshark.LiveCapture(**capture_options)
# Stop after 100 packets
capture.sniff(packet_count=100)
capture.close()
print("抓包完成,已保存到 captured.pcap")
PCAP文件分析¶
读取PCAP文件¶
import pyshark
# Open the capture file
capture = pyshark.FileCapture('traffic.pcap')
# Tally the packet total and the highest-layer protocol of each packet
protocols = {}
packet_count = 0
for pkt in capture:
    layer_name = pkt.highest_layer
    if layer_name in protocols:
        protocols[layer_name] += 1
    else:
        protocols[layer_name] = 1
    packet_count += 1
print(f"总数据包数: {packet_count}")
print(f"协议分布: {protocols}")
capture.close()
使用Display过滤器¶
import pyshark
# Display filter (Wireshark syntax): keep only HTTP POST requests
post_filter = 'http.request.method == "POST"'
capture = pyshark.FileCapture('traffic.pcap', display_filter=post_filter)
for pkt in capture:
    http = pkt.http
    print(f'POST to: {http.request_uri}')
    # The body is only present on some requests
    if hasattr(http, 'file_data'):
        print(f'Data: {http.file_data}')
capture.close()
常用Display过滤器¶
# Reference list of display-filter expressions; each string can be passed as
# the display_filter argument of a capture.
# HTTP filters
'http' # all HTTP traffic
'http.request' # HTTP requests
'http.response' # HTTP responses
'http.request.method == "GET"' # GET requests
'http.host == "example.com"' # a specific host
# TCP filters
'tcp.port == 80' # port 80
'tcp.flags.syn == 1' # SYN packets
'tcp.stream eq 0' # TCP stream 0
# IP filters
'ip.addr == 192.168.1.1' # IP address (source or destination)
'ip.src == 10.0.0.1' # source IP
'ip.dst == 10.0.0.2' # destination IP
# DNS filters
'dns.qry.name == "example.com"' # DNS query name
'dns.flags.response == 1' # DNS responses
# Combined filters
'http and ip.addr == 192.168.1.1'
'tcp.port == 80 or tcp.port == 443'
协议解析¶
HTTP协议分析¶
import pyshark
capture = pyshark.FileCapture('http_traffic.pcap', display_filter='http')
for pkt in capture:
    try:
        http = pkt.http
        # Request side
        if hasattr(http, 'request_method'):
            print("\n[REQUEST]")
            print(f"Method: {http.request_method}")
            print(f"URI: {http.request_uri}")
            print(f"Host: {http.host}")
            # Optional request headers, printed only when present
            for attr, label in (('user_agent', 'User-Agent'), ('cookie', 'Cookie')):
                if hasattr(http, attr):
                    print(f"{label}: {getattr(http, attr)}")
        # Response side
        if hasattr(http, 'response_code'):
            print("\n[RESPONSE]")
            print(f"Code: {http.response_code}")
            print(f"Phrase: {http.response_phrase}")
            if hasattr(http, 'content_type'):
                print(f"Content-Type: {http.content_type}")
    except AttributeError:
        # A field was missing on this packet; move on to the next one
        pass
capture.close()
DNS协议分析¶
import pyshark
capture = pyshark.FileCapture('dns_traffic.pcap', display_filter='dns')
for pkt in capture:
    try:
        dns = pkt.dns
        # Query name
        if hasattr(dns, 'qry_name'):
            print(f"[Query] {dns.qry_name}")
        # A-record answer
        if hasattr(dns, 'a'):
            print(f"[Answer] {dns.qry_name} -> {dns.a}")
    except AttributeError:
        # A field was missing on this packet; move on to the next one
        pass
capture.close()
TCP流分析¶
import pyshark
capture = pyshark.FileCapture('tcp_traffic.pcap')
# Group packets by TCP stream index and total their lengths
tcp_streams = {}
for pkt in capture:
    if not hasattr(pkt, 'tcp'):
        continue
    entry = tcp_streams.setdefault(pkt.tcp.stream, {'packets': [], 'bytes': 0})
    entry['packets'].append(pkt)
    entry['bytes'] += int(pkt.length)
# Per-stream summary
for stream_id, data in tcp_streams.items():
    packet_total = len(data['packets'])
    print(f"Stream {stream_id}: {packet_total} packets, {data['bytes']} bytes")
capture.close()
数据提取¶
提取HTTP文件¶
import pyshark
import base64
def extract_http_files(pcap_file, output_dir='extracted'):
    """Save every HTTP body found in ``pcap_file`` as a file in ``output_dir``.

    Files are named ``file_<n><ext>``; the extension is chosen from the
    Content-Type header and falls back to ``.bin``. Packets whose body cannot
    be decoded or written are reported and skipped, and are not counted.

    Args:
        pcap_file: Path of the capture file to read.
        output_dir: Directory to write into; created if it does not exist.
    """
    import os

    # Built once instead of once per packet
    ext_map = {
        'image/png': '.png',
        'image/jpeg': '.jpg',
        'image/gif': '.gif',
        'text/html': '.html',
        'application/pdf': '.pdf',
        'application/zip': '.zip',
    }
    os.makedirs(output_dir, exist_ok=True)
    capture = pyshark.FileCapture(pcap_file, display_filter='http')
    file_count = 0
    try:
        for packet in capture:
            try:
                # Only packets that carry a body are of interest
                if not hasattr(packet.http, 'file_data'):
                    continue
                file_data = packet.http.file_data
                content_type = (packet.http.content_type
                                if hasattr(packet.http, 'content_type') else 'unknown')
                # Drop parameters such as "; charset=utf-8" and normalise case
                mime_type = content_type.split(';')[0].strip().lower()
                ext = ext_map.get(mime_type, '.bin')
                # Assumes the field is a (colon-separated) hex string; a value
                # that is not hex raises ValueError and is reported below.
                file_bytes = bytes.fromhex(file_data.replace(':', ''))
                filename = os.path.join(output_dir, f'file_{file_count + 1}{ext}')
                with open(filename, 'wb') as f:
                    f.write(file_bytes)
                # Count only after the file was actually written
                file_count += 1
                print(f"提取文件: {filename} ({content_type})")
            except Exception as e:
                # Best effort: report the problem and continue with the next packet
                print(f"错误: {e}")
    finally:
        capture.close()
    print(f"\n总共提取 {file_count} 个文件")
# Usage example
extract_http_files('web_traffic.pcap')
提取FTP文件¶
import pyshark
def extract_ftp_files(pcap_file):
    """Print a preview of the FTP data-channel content of each packet.

    Args:
        pcap_file: Path of the capture file to read.
    """
    capture = pyshark.FileCapture(pcap_file, display_filter='ftp-data')
    try:
        for packet in capture:
            if not hasattr(packet, 'ftp_data'):
                continue
            # NOTE(review): the layer/field names used here depend on the
            # tshark dissector version; confirm them against a real capture.
            data = getattr(packet.ftp_data, 'command_data', None)
            if data is None:
                # Field absent on this packet: skip instead of aborting the loop
                continue
            print(f"FTP Data: {data[:100]}...")  # first 100 characters
    finally:
        capture.close()
extract_ftp_files('ftp_traffic.pcap')
提取凭据信息¶
import pyshark
def extract_credentials(pcap_file):
    """Print credentials sent in clear text over HTTP, FTP and SMTP.

    Reports HTTP Basic ``Authorization`` headers (raw and base64-decoded),
    FTP ``USER``/``PASS`` commands and SMTP ``AUTH`` requests.

    Args:
        pcap_file: Path of the capture file to read.
    """
    import base64

    capture = pyshark.FileCapture(pcap_file)
    print("正在搜索凭据...")
    try:
        for packet in capture:
            try:
                # HTTP Basic authentication
                if hasattr(packet, 'http') and hasattr(packet.http, 'authorization'):
                    auth = str(packet.http.authorization)
                    if 'Basic' in auth:
                        print(f"[HTTP Auth] {auth}")
                        # Basic credentials are just base64("user:password")
                        token = auth.split('Basic', 1)[1].strip()
                        try:
                            decoded = base64.b64decode(token).decode('utf-8', errors='replace')
                        except ValueError:
                            # Malformed base64 (binascii.Error is a ValueError)
                            decoded = ''
                        if decoded:
                            print(f"[HTTP Auth] decoded: {decoded}")
                # FTP login
                if hasattr(packet, 'ftp'):
                    if hasattr(packet.ftp, 'request_command'):
                        cmd = packet.ftp.request_command
                        if cmd in ['USER', 'PASS']:
                            arg = packet.ftp.request_arg
                            print(f"[FTP] {cmd}: {arg}")
                # SMTP authentication. The verb is expected in the request
                # command field and its argument in the parameter field, so
                # check the command as well as the parameter.
                # NOTE(review): field names follow Wireshark's smtp.req.* naming; confirm.
                if hasattr(packet, 'smtp'):
                    if hasattr(packet.smtp, 'req_parameter'):
                        param = packet.smtp.req_parameter
                        command = str(getattr(packet.smtp, 'req_command', ''))
                        if 'AUTH' in command.upper() or 'AUTH' in str(param):
                            print(f"[SMTP Auth] {param}")
            except AttributeError:
                # A field was missing on this packet; move on to the next one
                pass
    finally:
        capture.close()
extract_credentials('network.pcap')
CTF常见场景¶
场景1: USB键盘流量分析¶
import pyshark
# USB HID keyboard usage codes -> unshifted character
usb_codes = {
    0x04: 'a', 0x05: 'b', 0x06: 'c', 0x07: 'd', 0x08: 'e',
    0x09: 'f', 0x0a: 'g', 0x0b: 'h', 0x0c: 'i', 0x0d: 'j',
    0x0e: 'k', 0x0f: 'l', 0x10: 'm', 0x11: 'n', 0x12: 'o',
    0x13: 'p', 0x14: 'q', 0x15: 'r', 0x16: 's', 0x17: 't',
    0x18: 'u', 0x19: 'v', 0x1a: 'w', 0x1b: 'x', 0x1c: 'y',
    0x1d: 'z', 0x1e: '1', 0x1f: '2', 0x20: '3', 0x21: '4',
    0x22: '5', 0x23: '6', 0x24: '7', 0x25: '8', 0x26: '9',
    0x27: '0', 0x28: '\n', 0x2c: ' '
}
# Characters produced by the digit keys while Shift is held (US layout)
_usb_shifted = {
    '1': '!', '2': '@', '3': '#', '4': '$', '5': '%',
    '6': '^', '7': '&', '8': '*', '9': '(', '0': ')',
}
# Modifier-byte bits for Left Shift (0x02) and Right Shift (0x20)
_USB_SHIFT_MASK = 0x22


def parse_usb_keyboard(pcap_file):
    """Reconstruct the text typed on a USB keyboard from a capture file.

    Each report is read as ``[modifiers][reserved][key code]...``; only the
    first key code of a report is decoded. Shift turns letters into upper
    case and digits into their US-layout symbols.

    Args:
        pcap_file: Path of the capture file to read.

    Returns:
        The decoded keystrokes as a single string (also printed).
    """
    capture = pyshark.FileCapture(pcap_file)
    keystrokes = []
    try:
        for packet in capture:
            try:
                # NOTE(review): depending on the tshark version the report may
                # live in another layer/field than usb.capdata; confirm.
                if not (hasattr(packet, 'usb') and hasattr(packet.usb, 'capdata')):
                    continue
                data = packet.usb.capdata.replace(':', '')
                # Need at least modifier + reserved + one key code (3 bytes = 6 hex digits)
                if len(data) < 6:
                    continue
                modifiers = int(data[0:2], 16)
                key_code = int(data[4:6], 16)
            except (AttributeError, ValueError):
                # Missing field or non-hex data: skip this report
                continue
            char = usb_codes.get(key_code)
            if char is None:
                continue
            if modifiers & _USB_SHIFT_MASK:
                # Digits map to symbols; letters are upper-cased; '\n' and ' ' are unchanged
                char = _usb_shifted.get(char, char.upper())
            keystrokes.append(char)
    finally:
        capture.close()
    typed_text = ''.join(keystrokes)
    print(f"键盘输入: {typed_text}")
    return typed_text
# Usage example
parse_usb_keyboard('usb_keyboard.pcap')
场景2: TCP流重组¶
import pyshark
def follow_tcp_stream(pcap_file, stream_id):
    """Collect the TCP payload of one stream from a capture file.

    Segments are taken in capture order and both directions are concatenated
    into one byte string; retransmissions and out-of-order segments are not
    corrected.

    Args:
        pcap_file: Path of the capture file to read.
        stream_id: TCP stream index, as used by the ``tcp.stream`` filter.

    Returns:
        The concatenated payload bytes of every segment in the stream.
    """
    capture = pyshark.FileCapture(
        pcap_file,
        display_filter=f'tcp.stream eq {stream_id}'
    )
    stream_data = []
    try:
        for packet in capture:
            if not (hasattr(packet, 'tcp') and hasattr(packet.tcp, 'payload')):
                continue
            # The payload field is a colon-separated hex string
            payload_bytes = bytes.fromhex(packet.tcp.payload.replace(':', ''))
            # Support IPv6 as well as IPv4 instead of crashing on packet.ip
            net_layer = packet.ip if hasattr(packet, 'ip') else packet.ipv6
            stream_data.append({
                'time': packet.sniff_time,
                'src': f"{net_layer.src}:{packet.tcp.srcport}",
                'dst': f"{net_layer.dst}:{packet.tcp.dstport}",
                'data': payload_bytes
            })
    finally:
        capture.close()
    # Print a per-segment preview
    print(f"TCP流 {stream_id} 重组:")
    for segment in stream_data:
        print(f"\n{segment['time']} {segment['src']} -> {segment['dst']}")
        print(f"Data: {segment['data'][:100]}")
    # Join all segments
    full_data = b''.join(s['data'] for s in stream_data)
    return full_data
# Usage example
data = follow_tcp_stream('traffic.pcap', stream_id=0)
with open('stream_0.bin', 'wb') as f:
    f.write(data)
场景3: 无线流量分析¶
import pyshark
def analyze_wifi_traffic(pcap_file):
    """Print the SSIDs and the number of source addresses seen in a WiFi capture.

    Args:
        pcap_file: Path of the capture file to read.
    """
    capture = pyshark.FileCapture(pcap_file)
    ssids = set()
    clients = set()
    try:
        for packet in capture:
            # Only 802.11 frames are of interest
            if not hasattr(packet, 'wlan'):
                continue
            wlan = packet.wlan
            # NOTE(review): on some tshark versions the SSID is exposed by a
            # management layer rather than by wlan; confirm.
            ssid = getattr(wlan, 'ssid', None)
            if ssid:
                ssids.add(str(ssid))
            # Source address of the frame; counted as a client, although it
            # may equally belong to an access point
            source = getattr(wlan, 'sa', None)
            if source is not None:
                clients.add(str(source))
    finally:
        capture.close()
    print("发现的SSID:")
    # Sorted so the output is stable between runs
    for ssid in sorted(ssids):
        print(f" - {ssid}")
    print(f"\n客户端数量: {len(clients)}")
analyze_wifi_traffic('wifi.pcap')
场景4: ICMP隐蔽通道¶
import pyshark
def detect_icmp_covert_channel(pcap_file):
    """Extract printable ASCII carried in ICMP payloads.

    Args:
        pcap_file: Path of the capture file to read.

    Returns:
        All printable payloads joined in capture order (also printed).
    """
    capture = pyshark.FileCapture(pcap_file, display_filter='icmp')
    data_extracted = []
    try:
        for packet in capture:
            try:
                if not hasattr(packet.icmp, 'data'):
                    continue
                # The data field is a colon-separated hex string
                data_bytes = bytes.fromhex(packet.icmp.data.replace(':', ''))
                text = data_bytes.decode('ascii')
            except (AttributeError, ValueError):
                # Missing field, non-hex data or non-ASCII bytes
                # (UnicodeDecodeError is a ValueError): skip this packet
                continue
            # Ignore empty payloads; keep only fully printable text
            if text and text.isprintable():
                data_extracted.append(text)
                print(f"发现可疑数据: {text}")
    finally:
        capture.close()
    full_message = ''.join(data_extracted)
    print(f"\n提取的消息: {full_message}")
    return full_message
detect_icmp_covert_channel('icmp_covert.pcap')
实战案例¶
案例1: 提取图片¶
import pyshark
import re
def extract_images_from_pcap(pcap_file, output_dir='images'):
    """Save images transferred over HTTP in ``pcap_file`` into ``output_dir``.

    Files are named ``image_<n><ext>``, with the extension derived from the
    Content-Type header. Packets that cannot be decoded or written are
    reported and skipped, and are not counted.

    Args:
        pcap_file: Path of the capture file to read.
        output_dir: Directory to write into; created if it does not exist.
    """
    import os

    os.makedirs(output_dir, exist_ok=True)
    capture = pyshark.FileCapture(pcap_file, display_filter='http')
    image_count = 0
    try:
        for packet in capture:
            try:
                if not hasattr(packet.http, 'content_type'):
                    continue
                content_type = packet.http.content_type
                # Keep only image responses that actually carry a body
                if 'image' not in content_type:
                    continue
                if not hasattr(packet.http, 'file_data'):
                    continue
                # Assumes the field is a (colon-separated) hex string
                image_bytes = bytes.fromhex(packet.http.file_data.replace(':', ''))
                # Pick the extension from the content type
                if 'png' in content_type:
                    ext = '.png'
                elif 'jpeg' in content_type or 'jpg' in content_type:
                    ext = '.jpg'
                elif 'gif' in content_type:
                    ext = '.gif'
                else:
                    ext = '.bin'
                filename = os.path.join(output_dir, f'image_{image_count + 1}{ext}')
                with open(filename, 'wb') as f:
                    f.write(image_bytes)
                # Count only after the file was actually written
                image_count += 1
                print(f"提取图片: {filename}")
            except Exception as e:
                # Best effort: report instead of silently dropping the packet
                print(f"错误: {e}")
    finally:
        capture.close()
    print(f"总共提取 {image_count} 张图片")
extract_images_from_pcap('web.pcap')
案例2: 密码爆破检测¶
import pyshark
from collections import defaultdict
def detect_brute_force(pcap_file, threshold=10):
    """Report source IPs with a suspiciously high number of login-like packets.

    Every SSH packet and every HTTP POST whose URI contains ``login`` is
    counted for its source IP. This counts packets, not verified
    authentication attempts, so treat the result as a heuristic.

    Args:
        pcap_file: Path of the capture file to read.
        threshold: An IP is reported when its count exceeds this value.
    """
    capture = pyshark.FileCapture(pcap_file)
    # Timestamps of counted packets, per source IP
    login_attempts = defaultdict(list)
    try:
        for packet in capture:
            try:
                # SSH traffic
                if hasattr(packet, 'ssh'):
                    login_attempts[packet.ip.src].append(packet.sniff_time)
                # HTTP POST to a URI that looks like a login endpoint
                if hasattr(packet, 'http'):
                    if hasattr(packet.http, 'request_method'):
                        if packet.http.request_method == 'POST':
                            if 'login' in packet.http.request_uri.lower():
                                login_attempts[packet.ip.src].append(packet.sniff_time)
            except AttributeError:
                # Missing layer or field (for example no IPv4 layer): skip the packet
                pass
    finally:
        capture.close()
    # Report
    print("可疑登录活动:")
    for ip, attempts in login_attempts.items():
        if len(attempts) > threshold:
            print(f"\n[!] {ip}: {len(attempts)} 次尝试")
            print(f" 时间跨度: {attempts[0]} 到 {attempts[-1]}")
detect_brute_force('auth.pcap')
案例3: DNS隧道检测¶
import pyshark
def detect_dns_tunneling(pcap_file):
    """Flag DNS query names that look like tunnelled data.

    A name is flagged when one of its labels is longer than 30 characters or
    when the Shannon entropy of the whole name exceeds 3.5 bits per character.
    Both checks are heuristics and can flag legitimate long names.

    Args:
        pcap_file: Path of the capture file to read.
    """
    import math
    from collections import Counter

    def entropy(s):
        """Return the Shannon entropy of ``s`` in bits per character."""
        p, lns = Counter(s), float(len(s))
        return -sum(count / lns * math.log(count / lns, 2) for count in p.values())

    capture = pyshark.FileCapture(pcap_file, display_filter='dns')
    suspicious_domains = []
    try:
        for packet in capture:
            try:
                if not hasattr(packet.dns, 'qry_name'):
                    continue
                domain = packet.dns.qry_name
            except AttributeError:
                # No DNS layer on this packet
                continue
            flagged = False
            # 1. Abnormally long label
            if any(len(label) > 30 for label in domain.split('.')):
                flagged = True
                print(f"[!] 异常长子域名: {domain}")
            # 2. High entropy (random-looking name); computed once per name
            score = entropy(domain)
            if score > 3.5:
                flagged = True
                print(f"[!] 高熵值域名: {domain} (熵={score:.2f})")
            # Record each packet's name at most once
            if flagged:
                suspicious_domains.append(domain)
    finally:
        capture.close()
    print(f"\n发现 {len(set(suspicious_domains))} 个可疑域名")
detect_dns_tunneling('dns.pcap')
高级技巧¶
自定义解析器¶
import pyshark
class CustomProtocolAnalyzer:
    """Skeleton for a custom analyzer: override ``process_packet``.

    The analyzer owns a file capture; release it with ``close()`` or use the
    analyzer as a context manager.
    """

    def __init__(self, pcap_file):
        """Open ``pcap_file`` for reading."""
        self.capture = pyshark.FileCapture(pcap_file)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Release the underlying capture."""
        self.capture.close()

    def analyze(self):
        """Feed every packet of the capture to ``process_packet``."""
        for packet in self.capture:
            self.process_packet(packet)

    def process_packet(self, packet):
        """Handle one packet; the base implementation does nothing."""
        pass


# The context manager closes the capture even if analysis fails
with CustomProtocolAnalyzer('custom.pcap') as analyzer:
    analyzer.analyze()
并发处理¶
import pyshark
from concurrent.futures import ThreadPoolExecutor
def process_packet(packet):
    """Process a single packet and return its highest-layer protocol name."""
    return packet.highest_layer


def analyze_pcap_concurrent(pcap_file, max_workers=4):
    """Run ``process_packet`` over every packet using a thread pool.

    ``executor.map`` reads the capture in the calling thread, so only the
    per-packet processing is spread across the workers.

    Args:
        pcap_file: Path of the capture file to read.
        max_workers: Size of the thread pool.

    Returns:
        The ``process_packet`` results as a list, in packet order.
    """
    capture = pyshark.FileCapture(pcap_file)
    try:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            return list(executor.map(process_packet, capture))
    finally:
        # Release the capture whether or not processing succeeded
        capture.close()
# Usage example
results = analyze_pcap_concurrent('large.pcap')
常见问题解决¶
问题1: tshark未找到¶
# 确认tshark路径
which tshark
# Point pyshark at a specific tshark binary.
# Capture classes accept a tshark_path argument; assigning an attribute on
# pyshark.config is not read when tshark is located.
import pyshark
capture = pyshark.FileCapture('traffic.pcap', tshark_path='/usr/local/bin/tshark')
问题2: 权限不足¶
# Linux: add the current user to the wireshark group
sudo usermod -aG wireshark $USER
# Grant dumpcap the capabilities it needs to capture packets
sudo setcap cap_net_raw,cap_net_admin=eip /usr/bin/dumpcap
问题3: 内存不足¶
# Large files: keep_packets=False stops pyshark from accumulating every packet
# it yields. (use_json / include_raw select a different parsing mode and do
# not reduce memory use, so they are not used here.)
def handle_packet(packet):
    """Process one packet; nothing is retained after the call returns."""
    pass


# Option 1: event-loop style, pyshark invokes the callback for each packet
capture = pyshark.FileCapture('large.pcap', keep_packets=False)
capture.apply_on_packets(handle_packet)
capture.close()
# Option 2: plain iteration with keep_packets=False
capture = pyshark.FileCapture('large.pcap', keep_packets=False)
for packet in capture:
    # Process the packet here
    pass  # nothing is kept once the iteration moves on
capture.close()
参考资源¶
官方资源¶
- GitHub: https://github.com/KimiNewt/pyshark
- 文档: https://pyshark.readthedocs.io/
- Wireshark: https://www.wireshark.org/
学习资源¶
- Wireshark用户指南: https://www.wireshark.org/docs/wsug_html_chunked/
- Display过滤器参考: https://www.wireshark.org/docs/dfref/
- CTF流量分析: https://ctf-wiki.org/misc/traffic/
相关工具¶
- Scapy: Python数据包操作库
- Tshark: Wireshark命令行版本
- NetworkMiner: 网络取证工具
文档版本: v1.0 | 更新日期: 2025-01 | 适用版本: Pyshark v0.6+