51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

python3数据处理脚本模板

# python3数据处理脚本模板 {#python3数据处理脚本模板}

本文给出一些数据处理脚本的模板。平时在我们上线新的功能时,可能会涉及到处理历史数据,那么一般情况下,建议使用python脚本来处理,原因是linux服务器默认内置python环境,不需要为处理脚本额外搭建环境,另外运维人员也比较熟悉python脚本,所以运维人员也可以把控到风险。向运维提交的数据处理脚本,优先考虑sql语句,其次考虑python脚本。

# 1. 示例1: 生成sql插入语句 {#_1-示例1-生成sql插入语句}

本示例实现的数据处理需求:将已关注公众号、且没有入库的用户,保存到数据库中(实际上是生成insert语句)。

本示例用到的知识点:

  • 数据库读取
    借助pymysql。
  • 请求接口
    借助requests,可以自动将接口返回的数据转换为json。
  • snowflake
    生成snowflake ID。
  • 日志打印
    封装了一个通用的日志记录器。
  • 读取命令行参数
  • 变量值注入到字符串
    借助python3的字符串格式化语法:f''

该示例之所以生成了sql语句,而没有直接插入到数据库中。原因有3点:

  • 给sql的方式沟通更简单高效,也利于一旦出错时的sql回滚

  • 运行该脚本,还需要启动一个snowflake服务,运维还得多一步操作

    This is a sample Python script.

    Press ⌃R to execute it or replace it with your code.

    Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.

    import datetime import logging import sys

    import requests import pymysql import snowflake.client

    全局变量

    logger: logging.Logger = None mysql_connect: pymysql.connections.Connection = None mysql_host = None mysql_user = None mysql_pwd = None mysql_port = None mysql_db = None

    初始化日志记录器

    def init_logger(): global logger logger = logging.getLogger("data_process") logger.setLevel(logging.DEBUG) # 建立一个filehandler来把日志记录在文件里,级别为debug以上 fh = logging.FileHandler("data_process.log") fh.setLevel(logging.DEBUG) # 建立一个streamhandler来把日志打在CMD窗口上,级别为debug以上 ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) # 设置日志格式 formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") ch.setFormatter(formatter) fh.setFormatter(formatter) # 将相应的handler添加在logger对象中 logger.addHandler(ch) logger.addHandler(fh)

    初始化mysql连接

    def init_mysql_connect(): global mysql_host, mysql_user, mysql_pwd, mysql_port, mysql_db, mysql_connect

      mysql_connect = pymysql.connect(host=mysql_host, user=mysql_user, password=mysql_pwd, port=mysql_port,
                                      database=mysql_db)
    

    请求微信服务号的关注用户的openid列表

    def request_fuwuhao_openids(access_token): api = f'https://api.weixin.qq.com/cgi-bin/user/get?access_token={access_token}' wx_openids = requests.get(api).json()

      logger.info("请求微信服务号的关注用户的openid列表, 总数:%s", wx_openids['total'])
      return wx_openids["data"]["openid"]
    

    请求服务号的用户列表

    def request_new_fuwuhao_users(): global mysql_connect, access_token

      weixin_users = []
    
      openids = request_fuwuhao_openids(access_token)
      index = 0
      for openid in openids:
          logger.info("请求服务号的用户列表, 当前进度:%s, 总数: %s", index, len(openids))
          index = index + 1
    
          # 若当前openid在数据库中不存在,则补充到weixin_users数组
          if get_fuwuhao_user_from_db(openid) is None:
              logger.info("请求服务号的用户列表, 当前openid在数据库中不存在: %s", openid)
              weixin_user = request_weixin_user(openid)
              weixin_users.append(weixin_user)
          else:
              logging.info("请求服务号的用户列表, 当前openid在数据库中已存在: %s", openid)
    
      logger.info("请求服务号的用户列表, 完成, total: %s, count_will_process:%s", len(openids), len(weixin_users))
      return weixin_users
    

    请求某微信用户的信息

    def request_weixin_user(openid): global access_token

      api = f'https://api.weixin.qq.com/cgi-bin/user/info?lang=zh_CN&access_token={access_token}&openid={openid}'
      wx_user = requests.get(api).json()
      logger.info("请求某微信用户的信息: %s", wx_user)
      return wx_user
    

    从数据库中查询某服务号用户

    def get_fuwuhao_user_from_db(openid): global mysql_connect cursor = mysql_connect.cursor()

      sql = f"select 1 from fuwuhao_users where openid = '{openid}'"
      cursor.execute(sql)
      return cursor.fetchone()
    

    生成插入sql

    def generate_insert_sql(): global mysql_connect, access_token

      # 请求服务号关注用户列表
      wx_users = request_new_fuwuhao_users()
    
      # 生成sql
      sql = "INSERT INTO fuwuhao_users (id, is_subscribe, openid, lang, subscribe_time, unionid, remark, groupid, tagids, subscribe_scene, qr_scene, qr_scene_str, created_at, updated_at, deleted_at) VALUES "
      cur_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
      index = 0
      for elem in wx_users:
          sql = sql + f"({generate_snowflake_id()}, '{elem['subscribe']}', '{elem['openid']}', '{elem['language']}', {elem['subscribe_time']}, '{elem['unionid']}', '{elem['remark']}', {elem['groupid']}, '{elem['tagid_list']}', '{elem['subscribe_scene']}', {elem['qr_scene']}, '{elem['qr_scene_str']}', '{cur_time}', '{cur_time}', null)"
    
          if index != len(wx_users) - 1:
              sql = sql + ","
    
          index = index + 1
    
      return sql
    

    生成一个snowflake id

    python3生成snowflake的步骤:

    1. pip3 install pysnowflake

    2. 运行snowflake_start_server

    3. 代码中调用snowflake.client.get_guid()获取一个snowflake ID

    def generate_snowflake_id(): # 配置snowflake服务的主机配置,若不配置, 则默认使用localhost:8910 snowflake_server_host = 'localhost' snowflake_server_port = 8910 snowflake.client.setup(snowflake_server_host, snowflake_server_port)

      return snowflake.client.get_guid()
    

    Press the green button in the gutter to run the script.

    if name == 'main': # 从命令行读取参数 # 程序运行示例:python3 ./main.py weixin_api_access_token database_1 127.0.0.1 root 123456 3306 cmd_args = sys.argv access_token = cmd_args[1] mysql_db = cmd_args[2] mysql_host = cmd_args[3] mysql_user = cmd_args[4] mysql_pwd = cmd_args[5] mysql_port = int(cmd_args[6])

      # 初始化
      init_logger()
      init_mysql_connect()
    
      # 生成数据库表插入语句
      sql = generate_insert_sql()
      logger.info("main, 生成sql语句完成: %s", sql)
    
赞(1)
未经允许不得转载:工具盒子 » python3数据处理脚本模板