# python3数据处理脚本模板 {#python3数据处理脚本模板}
本文给出一些数据处理脚本的模板。平时在我们上线新的功能时,可能会涉及到处理历史数据,那么一般情况下,建议使用python脚本来处理,原因是linux服务器默认内置python环境,不需要为处理脚本额外搭建环境,另外运维人员也比较熟悉python脚本,所以运维人员也可以把控到风险。向运维提交的数据处理脚本,优先考虑sql语句,其次考虑python脚本。
# 1. 示例1: 生成sql插入语句 {#_1-示例1-生成sql插入语句}
本示例实现的数据处理需求:将已关注公众号、且没有入库的用户,保存到数据库中(实际上是生成insert语句)。
本示例用到的知识点:
- 数据库读取
借助pymysql。 - 请求接口
借助requests,可以自动将接口返回的数据转换为json。 - snowflake
生成snowflake ID。 - 日志打印
封装了一个通用的日志记录器。 - 读取命令行参数
- 变量值注入到字符串
借助python3的字符串格式化语法:f''
。
该示例之所以生成了sql语句,而没有直接插入到数据库中。原因有3点:
-
给sql的方式沟通更简单高效,也利于一旦出错时的sql回滚
-
运行该脚本,还需要启动一个snowflake服务,运维还得多一步操作
This is a sample Python script.
Press ⌃R to execute it or replace it with your code.
Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.
import datetime import logging import sys
import requests import pymysql import snowflake.client
全局变量
logger: logging.Logger = None mysql_connect: pymysql.connections.Connection = None mysql_host = None mysql_user = None mysql_pwd = None mysql_port = None mysql_db = None
初始化日志记录器
def init_logger(): global logger logger = logging.getLogger("data_process") logger.setLevel(logging.DEBUG) # 建立一个filehandler来把日志记录在文件里,级别为debug以上 fh = logging.FileHandler("data_process.log") fh.setLevel(logging.DEBUG) # 建立一个streamhandler来把日志打在CMD窗口上,级别为debug以上 ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) # 设置日志格式 formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") ch.setFormatter(formatter) fh.setFormatter(formatter) # 将相应的handler添加在logger对象中 logger.addHandler(ch) logger.addHandler(fh)
初始化mysql连接
def init_mysql_connect(): global mysql_host, mysql_user, mysql_pwd, mysql_port, mysql_db, mysql_connect
mysql_connect = pymysql.connect(host=mysql_host, user=mysql_user, password=mysql_pwd, port=mysql_port, database=mysql_db)
请求微信服务号的关注用户的openid列表
def request_fuwuhao_openids(access_token): api = f'https://api.weixin.qq.com/cgi-bin/user/get?access_token={access_token}' wx_openids = requests.get(api).json()
logger.info("请求微信服务号的关注用户的openid列表, 总数:%s", wx_openids['total']) return wx_openids["data"]["openid"]
请求服务号的用户列表
def request_new_fuwuhao_users(): global mysql_connect, access_token
weixin_users = [] openids = request_fuwuhao_openids(access_token) index = 0 for openid in openids: logger.info("请求服务号的用户列表, 当前进度:%s, 总数: %s", index, len(openids)) index = index + 1 # 若当前openid在数据库中不存在,则补充到weixin_users数组 if get_fuwuhao_user_from_db(openid) is None: logger.info("请求服务号的用户列表, 当前openid在数据库中不存在: %s", openid) weixin_user = request_weixin_user(openid) weixin_users.append(weixin_user) else: logging.info("请求服务号的用户列表, 当前openid在数据库中已存在: %s", openid) logger.info("请求服务号的用户列表, 完成, total: %s, count_will_process:%s", len(openids), len(weixin_users)) return weixin_users
请求某微信用户的信息
def request_weixin_user(openid): global access_token
api = f'https://api.weixin.qq.com/cgi-bin/user/info?lang=zh_CN&access_token={access_token}&openid={openid}' wx_user = requests.get(api).json() logger.info("请求某微信用户的信息: %s", wx_user) return wx_user
从数据库中查询某服务号用户
def get_fuwuhao_user_from_db(openid): global mysql_connect cursor = mysql_connect.cursor()
sql = f"select 1 from fuwuhao_users where openid = '{openid}'" cursor.execute(sql) return cursor.fetchone()
生成插入sql
def generate_insert_sql(): global mysql_connect, access_token
# 请求服务号关注用户列表 wx_users = request_new_fuwuhao_users() # 生成sql sql = "INSERT INTO fuwuhao_users (id, is_subscribe, openid, lang, subscribe_time, unionid, remark, groupid, tagids, subscribe_scene, qr_scene, qr_scene_str, created_at, updated_at, deleted_at) VALUES " cur_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') index = 0 for elem in wx_users: sql = sql + f"({generate_snowflake_id()}, '{elem['subscribe']}', '{elem['openid']}', '{elem['language']}', {elem['subscribe_time']}, '{elem['unionid']}', '{elem['remark']}', {elem['groupid']}, '{elem['tagid_list']}', '{elem['subscribe_scene']}', {elem['qr_scene']}, '{elem['qr_scene_str']}', '{cur_time}', '{cur_time}', null)" if index != len(wx_users) - 1: sql = sql + "," index = index + 1 return sql
生成一个snowflake id
python3生成snowflake的步骤:
1. pip3 install pysnowflake
2. 运行snowflake_start_server
3. 代码中调用snowflake.client.get_guid()获取一个snowflake ID
def generate_snowflake_id(): # 配置snowflake服务的主机配置,若不配置, 则默认使用localhost:8910 snowflake_server_host = 'localhost' snowflake_server_port = 8910 snowflake.client.setup(snowflake_server_host, snowflake_server_port)
return snowflake.client.get_guid()
Press the green button in the gutter to run the script.
if name == 'main': # 从命令行读取参数 # 程序运行示例:python3 ./main.py weixin_api_access_token database_1 127.0.0.1 root 123456 3306 cmd_args = sys.argv access_token = cmd_args[1] mysql_db = cmd_args[2] mysql_host = cmd_args[3] mysql_user = cmd_args[4] mysql_pwd = cmd_args[5] mysql_port = int(cmd_args[6])
# 初始化 init_logger() init_mysql_connect() # 生成数据库表插入语句 sql = generate_insert_sql() logger.info("main, 生成sql语句完成: %s", sql)