Smokeping 中使用 Python 处理 Targets文件

概述

smokeping 是一个开源免费的监控网络性能的工具,功能例如 ping、dig、echoping、curl 、dns等,绘制图形使用了 RRDtools 。

本文使用 docker 部署,操作比较简单,最主要的是收集全国各运营商的监测 IP ,以及维护这些 IP 的可用性,这里使用 Python 检测测试 IP 的可用性,并自动生成配置文件。

Smokeping 应用部署与配置

  1. 创建数据目录

    # 创建存放smokeping数据的目录mkdir -p /data/smokeping/config mkdir -p /data/smokeping/data
  2. 启动容器

    docker create  --name=smokeping  -e PUID=1000  -e PGID=1000  -e TZ=Asia/Shanghai  -p 80:80  -v /path/to/smokeping/config:/config  -v /path/to/smokeping/data:/data  --restart unless-stopped  linuxserver/smokeping
  3. 修改检测时间间隔

    这部分是修改 RRD 数据库的配置

    vim /data/smokeping/config/Database*** Database ***step   = 120 pings  = 20# 表示每 120 秒执行 20 次ping操作
  4. 修改 Targets 文件

    Targets 是 ping 测试的目标对象,日常修改主要是这个,我们这里通过 Python 检测测试 IP 的可用性,并自动生成配置文件

Python 处理 Targets 文件

  1. 收集测试 IP

    我主要是从 ipip.net收集的

    • 收集全国电信、联通、移动的测试 IP

    • 收集国外的测试 IP

  2. 将收集好的 IP 放到数据库

    我用的是 PostgreSQL ,新建两张表,分别是 ip_test、ip_test_international

    表结构如下图所示

    Smokeping 中使用 Python 处理 Targets文件Smokeping 中使用 Python 处理 Targets文件

  3. 收集完成后,记得备份 table,可使用如下 SQL 语句

    create table ip_test as table backup_ip_test;create table ip_test_international as table backup_ip_test_international;

    代码如下

    将对代码分段解释其作用

    使用到的模块的解释

  4. # 导入所需的模块 import osimport smtplibimport datetimeimport psycopg2from pypinyin import lazy_pinyinfrom email.utils import formataddrfrom email.mime.text import MIMEText
    • OS:执行 ping 操作

    • smtplib、email 相关模块:发送邮件使用

    • psycopg2:操作 PostgreSQL 数据库

    • Pypinyin:将汉字转换为拼音

定义一个邮件的列表

mail_down_ip_body = []

操作IP数据库,测试不通的设置为 is down

def ping_ip():    # 用 os.system ping 太慢了,可以优化一下,使用 python 封装 icmp,并使用多线程   conn = psycopg2.connect(dbname="数据库名称", user="数据库用户名", password="数据库密码", host="填写数据库地址", port="5432")    cursor = conn.cursor()# 为节省时间,这里就不合并到一起了cursor.execute("select china_unicom from ip_test;")ip_list_unicom = cursor.fetchall()for i in ip_list_unicom:  response = os.system('ping -c 8 ' + i[0])  if response == 0:      print(i[0], 'is up')  else:      print(i[0], 'is down')      cursor.execute("select state, city from ip_test where china_unicom = (%s) and china_unicom != 'is down';",                      (i[0],))      china_unicom_down_name = cursor.fetchall()      for z in china_unicom_down_name:          mail_down_ip_body.append(('联通', z[0].rstrip() + z[1].rstrip(), i[0].rstrip()))      cursor.execute("update ip_test set china_unicom = 'is down' where china_unicom = (%s);",                      (i[0],))cursor.execute("select china_mobile from ip_test;")ip_list_mobile = cursor.fetchall()for i in ip_list_mobile:  response = os.system('ping -c 8 ' + i[0])  if response == 0:      print(i[0], 'is up')  else:      print(i[0], 'is down')      cursor.execute("select state, city from ip_test where china_mobile = (%s) and china_mobile != 'is down';",                      (i[0],))      china_mobile_down_name = cursor.fetchall()      for z in china_mobile_down_name:          mail_down_ip_body.append(('移动', z[0].rstrip() + z[1].rstrip(), i[0].rstrip()))      cursor.execute("update ip_test set china_mobile = 'is down' where china_mobile = (%s);",                      (i[0],))cursor.execute("select china_telecom from ip_test;")ip_list_telecom = cursor.fetchall()for i in ip_list_telecom:  response = os.system('ping -c 8 ' + i[0])  if response == 0:      print(i[0], 'is up')  else:      print(i[0], 'is down')      cursor.execute("select state, city from ip_test where china_telecom = (%s) and china_telecom != 'is down';",                      (i[0],))      china_telecom_down_name = cursor.fetchall()      for z in china_telecom_down_name:          mail_down_ip_body.append(('电信', z[0].rstrip() + z[1].rstrip(), i[0].rstrip()))      cursor.execute("update ip_test set china_telecom = 'is down' where china_telecom = (%s);",                      (i[0],))cursor.execute("select ip from ip_test_international;")ip_list_int = cursor.fetchall()for i in ip_list_int:  response = os.system('ping -c 8 ' + i[0])  if response == 0:      print(i[0], 'is up')  else:      print(i[0], 'is down')      cursor.execute("select name from ip_test_international where ip = (%s) and ip != 'is down';",                      (i[0],))      ip_int_down_name = cursor.fetchall()      try:          mail_down_ip_body.append((ip_int_down_name[0][0], i[0]))      except IndexError:          pass      cursor.execute("update ip_test_international set ip = 'is down' where ip = (%s);",                      (i[0],))conn.commit()cursor.close()conn.close()

功能解释

  • 从数据库取到所有 ip,执行 ping 操作,每个 ip ping 8 个包,如果全部 timeout,将存放 IP 的位置更新为 is down

  • 由于原始数据库是从 csv 中导入的,所以有些空格,用了 .rstrip() 处理

从数据库拿到国内和国际的IP测试数据

def get_data():          conn = psycopg2.connect(dbname="数据库名称", user="数据库用户名", password="数据库密码",host="填写数据库地址", port="5432")    cursor = conn.cursor()    cursor.execute("select state, city, china_telecom, china_unicom, china_mobile, china_edu from ip_test;")    ip_test = cursor.fetchall()    cursor.execute("select name, name_en, ip from ip_test_international;")    ip_test_int = cursor.fetchall()    cursor.close()    conn.close()    return ip_test, ip_test_intdef output_config():    china_telcom = []    china_unicom = []    china_mobile = []    int_ip = []    for i in get_data()[0]:        if 'is down' != (i[2].rstrip()):            china_telcom.append(('+++ dianxin-' + ''.join(x[0] for x in lazy_pinyin(i[0].rstrip().rstrip('市'))) + '-' + str(random.randint(1, 10000)), 'menu = ' + i[0].rstrip().rstrip('市') + i[1].rstrip().rstrip('市') + '电信', 'title = ' + ''.join(lazy_pinyin(i[0].rstrip().rstrip('市'))) + '-' + ''.join(lazy_pinyin(i[1].rstrip().rstrip('市'))) + '-' + i[2].rstrip(), 'host = ' + i[2].rstrip()))for i in get_data()[0]:  # print(i[3].rstrip())  if 'is down' != (i[3].rstrip()):      china_unicom.append(('+++ liantong-' + ''.join(x[0] for x in lazy_pinyin(i[0].rstrip().rstrip('市'))) + '-' + str(random.randint(1, 10000)), 'menu = ' + i[0].rstrip().rstrip('市') + i[1].rstrip().rstrip('市') + '联通', 'title = ' + ''.join(lazy_pinyin(i[0].rstrip().rstrip('市'))) + '-' + ''.join(lazy_pinyin(i[1].rstrip().rstrip('市'))) + '-' + i[3].rstrip(), 'host = ' + i[3].rstrip()))for i in get_data()[0]:  if 'is down' != (i[4].rstrip()):      china_mobile.append(('+++ yidong-' + ''.join(x[0] for x in lazy_pinyin(i[0].rstrip().rstrip('市'))) + '-' + str(random.randint(1, 10000)), 'menu = ' + i[0].rstrip().rstrip('市') + i[1].rstrip().rstrip('市') + '移动', 'title = ' + ''.join(lazy_pinyin(i[0].rstrip().rstrip('市'))) + '-' + ''.join(lazy_pinyin(i[1].rstrip().rstrip('市'))) + '-' + i[4].rstrip(), 'host = ' + i[4].rstrip()))for i in get_data()[1]:  if 'is down' != (i[2]):      int_ip.append(('++ ' + ''.join(i[1]), 'menu = ' + i[0], 'title = ' + i[1] + '-' + i[2], 'host = ' + i[2]))return china_telcom, china_unicom, china_mobile, int_ip

功能解释

从数据库取到 ping 处理后的数据,处理后输出为如下格式的列表

[(+++ yidong-sd-2637,menu = 山东青岛移动,title = shandong-qingdao-218.201.98.33,host = 218.201.98.33)]

将之前所有获取的信息整理成smokeping所需的”Targets”配置文件

 #这里变量设置的比较乱def finally_target():  with open('ip_target.txt', 'r+') as f:      content = f.read()      f.seek(0, 0)      with open('begin_text', 'r') as file:          f.write(file.read() + 'n' + content)          print(f.read())  with open('ip_target.txt', 'a') as f:      with open('end_text', 'r') as file:          f.write(file.read())  with open('ip_target.txt', 'r+') as f:      aa = f.read()      pos = aa.find('+++ liantong')      cc = aa[:pos] + """++ liantong #联通menu = 联通网络监控 title = China Unicom #host = /Other/liantong/liantong-bj /Other/liantong/liantong-sh       /Other/liantong/liantong-gz       """ + 'n' + aa[pos:]         dd = cc.find('+++ yidong')      ee = cc[:dd] + """++ yidong #移动menu = 移动网络监控 title = China mobile""" + 'n' + cc[dd:]      ff = ee.find('++ Tokyo-Japan')      gg = ee[:ff] + """+ Internetmenu = 国际线路title = 国际线路      """ + 'n' + ee[ff:]      with open('Targets', 'w') as finally_txt:          finally_txt.write(gg)

功能解释

  • 得到之前处理后的数据,将 Target 文件拼装起来

    • Targets 包含了开头和结尾的文件如下

开头文件

*** Targets ***probe = FPingmenu = Toptitle = Network Latency Grapherremark = Welcome to the SmokePing website of WORKS Company.         Here you will learn all about the latency of our network.Other # 第一层级menu = 国内线路    title =  国内线路    ++ dianxin #电信    menu = 电信网络监控    title = 中国电信------------# 结尾文件+ InternetSitesmenu = Internet Sitestitle = Internet Sites++ JupiterBroadcastingmenu = JupiterBroadcastingtitle = JupiterBroadcastinghost = jupiterbroadcasting.com

这里省略了一些

def mail():    my_sender = 'houm01@foxmail.com'    my_user = 'houm01@foxmail.com'    my_pass = 'qq邮件授权码'with open('text.txt', 'w') as f:  for i in mail_down_ip_body:      f.write(str(i).replace('(', '').replace(')', '').replace(''', '').replace(',', ' -- ') + 'n')with open('text.txt', 'r') as f:  mail_txt_str = f.read()if len(mail_txt_str) != 0:  mail_text = '''{} 检测到新增 down ip 如下nn{}n已生成Targets文件,请管理员判断是否处理'''.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M'), mail_txt_str)  scan_state = '有新增的down ip,请关注'else:  mail_text = '''{} 经过检测,没有发现有新增down的IPn          {}          '''.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M'), mail_txt_str)  scan_state = '无新增down ip'ret = Truetry:  msg = MIMEText(mail_text, 'plain', 'utf-8')  msg['From'] = formataddr(["Smokeping 测试", my_sender]) # 括号里的对应发件人邮箱昵称、发件人邮箱账号  msg['To'] = formataddr(["Service", my_user]) # 括号里的对应收件人邮箱昵称、收件人邮箱账号  msg['Subject'] = "Smokeping 测试结果报告 - {} - {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M'), scan_state) # 邮件的主题,也可以说是标题  server = smtplib.SMTP_SSL("smtp.qq.com", 465) # 发件人邮箱中的SMTP服务器,端口是25  server.login(my_sender, my_pass) # 括号中对应的是发件人邮箱账号、邮箱密码  server.sendmail(my_sender, [my_user, ], msg.as_string()) # 括号中对应的是发件人邮箱账号、收件人邮箱账号、发送邮件  server.quit() # 关闭连接except Exception: # 如果 try 中的语句没有执行,则会执行下面的 ret=False  ret = Falseif ret:  print("邮件发送成功")else:  print("邮件发送失败")

功能解释

将之前处理好的文件,构造成邮件,发出

需要做一个判断,如果检测结果没有新增的不通的 IP,邮件内容要说明无新增 IP

如果有新增 down 的 IP,列出是哪些 IP,并列出 IP 的归属地

if name == 'main':    ping_ip()    with open('ip_target.txt', 'w') as f:        for i in output_config():            for y in i:                for z in y:                    f.write(z + 'n')    finally_target()mail()

功能解释

  • 将之前几个函数拼起来

Crontab 调度之前写的脚本

crontab 的调度脚本如下,每天9点45分开始执行脚本

crontab -lSHELL=/bin/bash45 09 * * * cd /data/python_script/smokeping_test/ && python3 ip_test.py

总结

目前脚本还有很多不完善的地方,例如 邮件发送时没有带上附件、排列国际站点时没有对同一国家按顺序排列、ping 检测较慢,没有多进程处理 等问题,还需要持续优化一下.

但基本实现了 Smokeping 最麻烦的步骤,也就是处理维护 Targets 文件的问题.

在一个站点部署后,可以放使用 nginx 将 Target 文件发布出去,其他站点通过 wget 的方式获取.