python实现ping程序
ICMP检查(ICMP ping)是一种常见的网络主机检查方式。当需要判断网络上的某台主机是否在线时,只需要简单是使用ping检查一下就可以了,当然这也可以检查本地网络状态。基于ping命令的网络扫描,也是扫描器中经常使用的一种方式。比如大名鼎鼎的nmap。今天就和大家分享一下,如何使用python来进行ICMP ping检查。
关于ICMP协议以及ping程序原理,可以看看我的上一篇博文:《ICMP协议与ping原理》
方法一、使用python脚本调用系统中的ping命令简单实现:
import subprocess import shlex cmd = "ping -c 1 www.baidu.com" args = shlex.split(cmd) try: subprocess.check_call(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) print "baidu server is up!" except subprocess.CalledProcessError: print "Failed to get ping."
但是,很多情况下,系统中的ping可执行文件是不可用,或者无法访问。这时,就需要使用一个纯python的检查脚本了。下面是ICMP ping的python实现脚本。这个脚本中定义了一个Pinger类,使用的一个校验检验和的do_checksum()方法,一个发送ping数据报文的send_ping()方法,接受ping数据报文的receive_ping()方法和一个执行这个类的ping()主方法。下面是具体的代码:
# 定义Pinger类,初始化创建实例 import os import argparse import socket import struct import select import time ICMP_ECHO_REQUEST = 8 # Platform specific DEFAULT_TIMEOUT = 2 DEFAULT_COUNT = 4 class Pinger(object): """ Pings to a host -- the Pythonic way""" def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT): self.target_host = target_host self.count = count self.timeout = timeout
下面定义了do_checksum()方法,进行检验和的校验,校验方法如下:
- 把校验和字段置为0
- 将icmp包(包括header和data)以16bit(2个字节)为一组,并将所有组相加(二进制求和)
- 若高16bit不为0,则将高16bit与低16bit反复相加,直到高16bit的值为0,从而获得一个只有16bit长度的值
- 将此16bit值进行按位求反操作,将所得值替换到校验和字段
具体算法可以参考:【TCP/IP】检验和算法
def do_checksum(self, source_string):
""" Verify the packet integritity """
sum = 0
max_count = (len(source_string)/2)*2
count = 0
while count < max_count: # 分割数据每两比特(16bit)为一组
val = ord(source_string[count + 1])*256 + ord(source_string[count])
sum = sum + val
sum = sum & 0xffffffff
count = count + 2
if max_count<len(source_string): # 如果数据长度为基数,则将最后一位单独相加
sum = sum + ord(source_string[len(source_string) - 1])
sum = sum & 0xffffffff
sum = (sum >> 16) + (sum & 0xffff) # 将高16位与低16位相加直到高16位为0
sum = sum + (sum >> 16)
answer = ~sum
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return answer # 返回的是十进制整数
下面是接受ICMP类型码为8的ICMP回应报文的方法。在未到达超时时间之前socket处于阻塞状态一直等待响应,当有数据传回时就接受响应,然后提取包含标识符ID的ICMP报文首部和包含发送时间值的ICMP内容部分,计算请求-响应的延时间隔。
def receive_ping(self, sock, ID, timeout): """ Receive ping from the socket. """ time_remaining = timeout while True: start_time = time.time() readable = select.select([sock], [], [], time_remaining) time_spent = (time.time() - start_time) if readable[0] == []: # Timeout return time_received = time.time() recv_packet, addr = sock.recvfrom(1024) icmp_header = recv_packet[20:28] type, code, checksum, packet_ID, sequence = struct.unpack( "bbHHh", icmp_header ) if packet_ID == ID: bytes_In_double = struct.calcsize("d") time_sent = struct.unpack("d", recv_packet[28:28 + bytes_In_double])[0] return time_received - time_sent time_remaining = time_remaining - time_spent if time_remaining <= 0: return
下面定义的send_ping()方法,获取远程主机的DNS主机名,然后使用struct模块创建一个ICMP_ECHO_REQUEST数据包,将查验请求的数据发送到目标主机。在此发送前也需要进行do_checksum()方法的校验。
def send_ping(self, sock, ID): """ Send ping to the target host """ target_addr = socket.gethostbyname(self.target_host) my_checksum = 0 # Create a dummy heder with a 0 checksum. header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1) bytes_In_double = struct.calcsize("d") data = (192 - bytes_In_double) * "Q" data = struct.pack("d", time.time()) + data # Get the checksum on the data and the dummy header. my_checksum = self.do_checksum(header + data) header = struct.pack( "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1 ) packet = header + data sock.sendto(packet, (target_addr, 1))
下面定义了一个ping_once()方法,向远程主机发送一次查验:将ICMP协议传给socket()方法,创建一个原始的ICMP套接字。由于ping程序需要使用SOCK_RAW来构建数据包,所以需要root权限才能运行这个程序。因此,本程序需要使用root权限运行,下面的异常处理部分就是来负责未使用root运行时抛出的异常。
def ping_once(self): """ Returns the delay (in seconds) or none on timeout. """ icmp = socket.getprotobyname("icmp") try: sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp) except socket.error, (errno, msg): if errno == 1: # Not superuser, so operation not permitted msg += "ICMP messages can only be sent from root user processes" raise socket.error(msg) except Exception, e: print "Exception: %s" %(e) my_ID = os.getpid() & 0xFFFF self.send_ping(sock, my_ID) delay = self.receive_ping(sock, my_ID, self.timeout) sock.close() return delay
下面这个ping()是执行这个类的主要方法。在for循环中调用ping_once()方法,发送ping数据报文,并返回结果。
def ping(self): """ Run the ping process """ for i in xrange(self.count): print "Ping to %s..." % self.target_host, try: delay = self.ping_once() except socket.gaierror, e: print "Ping failed. (socket error: '%s')" % e[1] break if delay == None: print "Ping failed. (timeout within %ssec.)" % self.timeout else: delay = delay * 1000 print "Get ping in %0.4fms" % delay if __name__ == '__main__': parser = argparse.ArgumentParser(description='Python ping') parser.add_argument('--target-host', action="store", dest="target_host", required=True) given_args = parser.parse_args() target_host = given_args.target_host pinger = Pinger(target_host=target_host) pinger.ping()
完整代码链接: icmp_ping_tool.py
参考:
《TCP/IP详解 卷2:实现》
《Python网络编程攻略》
我想用Python实现ping的服务器端,该怎么写呢?就是截获别的机器ping我的ICMP数据包,然后用Python生成响应数据包发送回去。
2016-12-20 下午4:42需要管理员权限才可以运行 except socket.error, (errno, msg): if errno == 10013: # Not superuser, so operation not permitted msg += “ICMP messages can only be sent from root user processes” raise socket.error(msg)我用windows测试错误代码是10013不是1工具写的不错~
2017-01-11 上午2:45