搞安全的应该都知道端口扫描在渗透测试 、漏洞扫描过程中的重要性,其与URL爬虫等技术构成了漏洞扫描的第一阶段 ,即目标信息收集。因此能否开发出一款高效稳定的端口扫描器,往往决定了漏洞扫描器的好坏 。那么说到端口扫描器,我们往往会先想到nmap、masscan等神器 ,它们是这个领域的标杆。但本篇并不是为了介绍这几款工具,而是谈谈如何自研一款高效稳定的端口扫描器。
端口扫描器,顾名思义就是为了探测服务器上的某个端口是否开放 ,究其原理可以分为很多种探测方式,比如tcp三次握手扫描,syn扫描等等 ,本篇并不打算详细介绍这些扫描方式的区别 ,有兴趣的可以看下nmap的文档,对这几种扫描方式有详细的介绍 。
那么说下本文重点,基于这几天我研究并尝试利用python、go开发tcp扫描器 、tcp-syn扫描器 ,以及对比它们之间的速度性能、稳定性差异情况,将测试结果在此做个记录,并分享一下代码以及方案。
说明:文章结尾将给出本篇所使用代码的Github地址 ,可供大家测试,代码测试环境为centos7。
scan for Python Socket
Python的Socket模块可以创建套接字,创建tcp三次握手连接 ,以此探测目标端口是否存活 。本篇将使用socket模块编写tcp扫描以及syn扫描,并对比两者的差异。
tcp scan
快来看代码:
1 #! -*- coding:utf-8 -*- 2 import time 3 import socket 4 socket_timeout = 0.1 5 def tcp_scan(ip,port): 6 try: 7 s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) 8 s.settimeout(socket_timeout) 9 c=s.connect_ex((ip,port)) 10 if c==0: 11 print “%s:%s is open” % (ip,port) 12 else : 13 # print “%s:%s is not open ” % (ip,port) 14 pass 15 except Exception,e: 16 print e 17 s.close() 18 if __name__== “__main__” : 19 s_time = time.time() 20 ip = “14.215.177.38” 21 for port in range(0,1024): 22 ” ‘ 此处可用协作 ‘ ” 23 tcp_scan(ip,port) 24 e_time = time.time() 25 print “scan time is “ ,e_time-s_time
运行结果:
说明一下:可以看到此代码扫描1024个端口用了102s,当然代码并没有用多线程、协程等方式提高扫描效率(使用协程测试过扫65535个端口用时400s左右) ,因为python在这方面的能力比较弱;由于扫描过程中会建立tcp三次握手,因此比较消耗资源。
tcp syn scan
相对tcp扫描,tcp syn扫描方式更为隐蔽 ,也更节省资源 ,那么如何利用socket模块实现tcp syn扫描呢?这里需要用到SOCK_RAW,这个在socket编程中相对少用,资料也不多 。
1 # -*- coding: UTF-8 -*- 2 import time 3 import random 4 import socket 5 import sys 6 from struct import * 7 ” ‘ 8 Warning:must run it as root 9 yum install python-devel libpcap-devel 10 pip install pcap 11 ‘ ” 12 def checksum(msg): 13 ” ‘ Check Summing ‘ ” 14 s = 0 15 for i in range(0,len(msg),2): 16 w = (ord(msg[i]) << 8) + (ord(msg[i+1])) 17 s = s+w 18 s = (s>>16) + (s & 0xffff) 19 s = ~s & 0xffff 20 return s 21 def CreateSocket(source_ip,dest_ip): 22 ” ‘ create socket connection ‘ ” 23 try: 24 s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_TCP) 25 except socket.error, msg: 26 print ‘Socket create error: ‘ ,str(msg[0]), ‘message: ‘ ,msg[1] 27 sys.exit() 28 ” ‘ Set the IP header manually ‘ ” 29 s.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) 30 return s 31 def CreateIpHeader(source_ip, dest_ip): 32 ” ‘ create ip header ‘ ” 33 # packet = ” 34 # ip header option 35 headerlen = 5 36 version = 4 37 tos = 0 38 tot_len = 20 + 20 39 id = random.randrange(18000,65535,1) 40 frag_off = 0 41 ttl = 255 42 protocol = socket.IPPROTO_TCP 43 check = 10 44 saddr = socket.inet_aton ( source_ip ) 45 daddr = socket.inet_aton ( dest_ip ) 46 hl_version = (version << 4) + headerlen 47 ip_header = pack( ‘!BBHHHBBH4s4s’ , hl_version, tos, tot_len, id, frag_off, ttl, protocol, check, saddr, daddr) 48 return ip_header 49 def create_tcp_syn_header(source_ip, dest_ip, dest_port): 50 ” ‘ create tcp syn header function ‘ ” 51 source = random.randrange(32000,62000,1) # randon select one source_port 52 seq = 0 53 ack_seq = 0 54 doff = 5 55 ” ‘ tcp flags ‘ ” 56 fin = 0 57 syn = 1 58 rst = 0 59 psh = 0 60 ack = 0 61 urg = 0 62 window = socket.htons (8192) # max windows size 63 check = 0 64 urg_ptr = 0 65 offset_res = (doff << 4) + 0 66 tcp_flags = fin + (syn<<1) + (rst<<2) + (psh<<3) + (ack<<4) + (urg<<5) 67 tcp_header = pack( ‘!HHLLBBHHH’ , source , dest_port, seq, ack_seq, offset_res, tcp_flags, window, check, urg_ptr) 68 ” ‘ headers option ‘ ” 69 source_address = socket.inet_aton( source_ip ) 70 dest_address = socket.inet_aton( dest_ip ) 71 placeholder = 0 72 protocol = socket.IPPROTO_TCP 73 tcp_length = len(tcp_header) 74 psh = pack( ‘!4s4sBBH’ , source_address, dest_address, placeholder, protocol, tcp_length); 75 psh = psh + tcp_header; 76 tcp_checksum = checksum(psh) 77 ” ‘ Repack the TCP header and fill in the correct checksum ‘ ” 78 tcp_header = pack( ‘!HHLLBBHHH’ , source , dest_port, seq, ack_seq, offset_res, tcp_flags, window, tcp_checksum, urg_ptr) 79 return tcp_header 80 def syn_scan(source_ip, dest_ip, des_port) : 81 s = CreateSocket(source_ip, dest_ip) 82 ip_header = CreateIpHeader(source_ip, dest_ip) 83 tcp_header = create_tcp_syn_header(source_ip, dest_ip, des_port) 84 packet = ip_header + tcp_header 85 s.sendto(packet, (dest_ip, 0)) 86 data = s.recvfrom(1024) [0][0:] 87 ip_header_len = (ord(data[0]) & 0x0f) * 4 88 # ip_header_ret = data[0: ip_header_len – 1] 89 tcp_header_len = (ord(data[32]) & 0xf0)>>2 90 tcp_header_ret = data[ip_header_len:ip_header_len+tcp_header_len – 1] 91 ” ‘ SYN/ACK flags ‘ ” 92 if ord(tcp_header_ret[13]) == 0x12: 93 print “%s:%s is open” % (dest_ip,des_port) 94 else : 95 print “%s:%s is not open” % (dest_ip,des_port) 96 if __name__== “__main__” : 97 t_s = time.time() 98 source_ip = ” # 填写本机ip 99 dest_ip = ‘14.215.177.38’ 100 for des_port in range(1024): 101 syn_scan(source_ip, dest_ip, des_port) 102 t_e = time.time() 103 print “time is “ ,(t_e-t_s)
有一点需要注意的 ,运行这段代码前,需要在系统上安装依赖:
yum install python-devel libpcap-devel pip install pcap
运行结果:
说明:从运行结果上来看,并没有很准确 ,而且速度也不快,不清楚是不是代码上有问题 。
scan for Python scapy
除了socket模块外,python还有一个scapy模块 ,可以用来模拟发包,但只能在linux下使用,其他操作系统不建议使用此模块。
tcp syn csan
代码在这里:
1 #! -*- coding:utf-8 -*- 2 import time 3 from scapy.all import * 4 ip = “14.215.177.38” 5 TIMEOUT = 0.5 6 threads = 500 7 port_range = 1024 8 retry = 1 9 def is_up(ip): 10 “” ” Tests if host is up “ “” 11 icmp = IP(dst=ip)/ICMP() 12 resp = sr1(icmp, timeout=TIMEOUT) 13 if resp == None: 14 return False 15 else : 16 return True 17 def reset_half_open(ip, ports): 18 # Reset the connection to stop half-open connections from pooling up 19 sr(IP(dst=ip)/TCP(dport=ports, flags= ‘AR’ ), timeout=TIMEOUT) 20 def is_open(ip, ports): 21 to_reset = [] 22 results = [] 23 p = IP(dst=ip)/TCP(dport=ports, flags= ‘S’ ) # Forging SYN packet 24 answers, un_answered = sr(p, verbose=False, retry=retry ,timeout=TIMEOUT) # Send the packets 25 for req, resp in answers: 26 if not resp.haslayer(TCP): 27 continue 28 tcp_layer = resp.getlayer(TCP) 29 if tcp_layer.flags == 0x12: 30 # port is open 31 to_reset.append(tcp_layer.sport) 32 results.append(tcp_layer.sport) 33 elif tcp_layer.flags == 0x14: 34 # port is open 35 pass 36 reset_half_open(ip, to_reset) 37 return results 38 def chunks(l, n): 39 “” “Yield successive n-sized chunks from l. ” “” 40 for i in range(0, len(l), n): 41 yield l[i:i + n] 42 if __name__ == ‘__main__’ : 43 start_time = time.time() 44 open_port_list = [] 45 for ports in chunks(list(range(port_range)), threads): 46 results = is_open(ip, ports) 47 if results: 48 open_port_list += results 49 end_time = time.time() 50 print “%s %s” % (ip,open_port_list) 51 print “%s Scan Completed in %fs ” % (ip, end_time-start_time)
运行结果:
说明:由于scapy可以一次性发多个syn包 ,因此速度相对socket更快一些,但稳定性没有很好。
scan for python+nmap
文章开头提到了nmap,其实在python中也可以直接调用nmap ,看代码:
1 #! -*- coding:utf-8 -*- 2 ” ‘ 3 pip install python-nmap 4 ‘ ” 5 import nmap 6 nm =nmap.PortScanner() 7 def scan(ip,port,arg): 8 try: 9 nm.scan(ip, arguments=arg+str(port)) 10 except nmap.nmap.PortScannerError: 11 print “Please run -O method for root privileges ” 12 else : 13 for host in nm.all_hosts(): 14 for proto in nm[host].all_protocols(): 15 lport = nm[host][proto].keys() 16 lport.sort() 17 for port in lport: 18 print ( ‘port : %ststate : %s’ % (port, nm[host][proto][port][ ‘state’ ])) 19 if __name__== “__main__” : 20 port= “80,443,22,21” 21 scan(ip= “14.215.177.38” ,port=port,arg= “-sS -Pn -p ” ) 22 # tcp scan -sT 23 # tcp syn scan -sS
运行结果:
由于nmap扫描速度相对比较慢 ,因此这里只演示扫描4个端口,不做速度的对比,当然其稳定性还是可以的 。
scan for go
前文一直在介绍使用python语言开发端口扫描器 ,然而由于python在多线程方面的弱势,扫描器的性能可想而知,因此我又利用go语言的高并发性优势 ,尝试开发端口扫描器。(题外话:为此我花了半天时间看了下go语言的基础,勉强看懂了扫描代码,并做了一些修改)
tcp scan
直接看代码吧:
1 package main 2 // port tcp scan 3 import ( 4 “fmt” 5 “net” 6 “os ” 7 “runtime” 8 “strconv” 9 “sync ” 10 “time” 11 ) 12 func loop(inport chan int, startport, endport int) { 13 for i := startport; i <= endport; i++ { 14 inport <- i 15 } 16 close(inport) 17 } 18 type ScanSafeCount struct { 19 // 结构体 20 count int 21 mux sync.Mutex 22 } 23 var scanCount ScanSafeCount 24 func scanner(inport int, outport chan int, ip string, endport int) { 25 // 扫描函数 26 in := inport // 定义要扫描的端口号 27 // fmt.Printf( ” %d “ , in ) // 输出扫描的端口 28 host := fmt.Sprintf( “%s:%d ” , ip, in ) // 类似(ip,port) 29 tcpAddr, err := net.ResolveTCPAddr( “tcp4” , host) // 根据域名查找ip 30 if err != nil { 31 // 域名解析ip失败 32 outport <- 0 33 } else { 34 conn, err := net.DialTimeout( “tcp” , tcpAddr.String(), 10*time.Second) //建立tcp连接 35 if err != nil { 36 // tcp连接失败 37 outport <- 0 38 } else { 39 // tcp连接成功 40 outport <- in // 将端口写入outport信号 41 fmt.Printf( “n *************( %d 可以 )*****************n ” , in ) 42 conn.Close() 43 } 44 } 45 // 线程锁 46 scanCount.mux.Lock() 47 scanCount.count = scanCount.count – 1 48 if scanCount.count <= 0 { 49 close(outport) 50 } 51 scanCount.mux.Unlock() 52 } 53 func main () { <本文版权归趣快排营销www.seoguRubloG.com 所有,如有转发请注明来出,竞价开户托管,seo优化请联系✚Qq61910465
- •selenium中关于鼠标下滑window.scrollTo方法知乎爬虫数据采集实例
- •全平台短视频无水印解析源码(支持超30好几个平台)
- •第一个django运用--简易的 hello world 项目
- •Python WEB开发 用Python开展web开发必须学习什么?
- •Python用27行代码绘制一幅满天星
- •Django模板
- •pip安装难题:There was a problem confirming ssl certificate
- •Python XLRDError: Excel xlsx file; not supported解决方法
- •Django view视图
- •Django中的auth模块
SEO优化/竞价信息流托管/百度360搜狗推广开户/网站定制开发/建设推广流程