# Requirement:
# Load-test the service with concurrent requests.
import threading
import random
import requests
import time
# Cookies attached to each request.
cookies = {"Cookie_2": "value"}

# HTTP headers for the JSON request body. The cookie is carried by the
# ``cookies`` mapping above rather than by a raw ``Cookie`` header.
headers = {"Content-Type": "application/json"}
# Thread class that generates a random number
class RandomNumberThread(threading.Thread):
    """Worker thread that draws a random number and POSTs one JSON payload.

    ``run`` picks an integer in [1, 100], appends it to a fixed ``suno_id``
    prefix and sends the payload to the ``processCustomVideoDirect``
    endpoint. After ``join()`` the chosen number, the payload and the outcome
    of the request can be read from the instance attributes.

    Attributes:
        random_number: Number drawn by ``run``; ``None`` until it has run.
        json_data: Payload sent by ``run``; ``None`` until it has run.
        response: Response object returned by ``requests.post``, or ``None``
            if the request has not completed or failed.
        error: Exception raised by the request, or ``None``.
    """

    # Seconds to wait for the server before giving up; without a timeout a
    # stalled connection would block ``join()`` indefinitely.
    REQUEST_TIMEOUT = 60

    def __init__(self, name):
        """Set up the thread.

        Args:
            name: Thread name; it prefixes the log line printed by ``run``.
        """
        # Python only invokes a constructor spelled ``__init__``, and the
        # base-class constructor must run before the thread can be started.
        super().__init__()
        self.name = name
        self.random_number = None
        self.cookies = cookies
        self.headers = headers
        self.json_data = None
        self.response = None
        self.error = None

    def run(self):
        """Draw the random number, build the payload and send it."""
        self.random_number = random.randint(1, 100)
        print(f"{self.name} generated random number: {self.random_number}")
        self.json_data = {
            'duration': 120,
            'image_url': 'https://kt-smarthome.oss-cn-beijing.aliyuncs.com/musicFile/img/738bea0984de48dea02c06d272bcf48e.png',
            'logotype': 2,
            'songtype': 'singing',
            # The random number is appended to vary the id between requests.
            'suno_id': 'f511aa1e-a3f3-4df1-9b44-f3cf0944d2' + str(self.random_number),
            'title': '海之宁静',
            'video_url': 'https://cdn1.suno.ai/1fb5fcd4-82d4-4ee7-803d-346cf6560d9e.mp4',
            'write_lyrics': '你好红红火火恍恍惚惚哈哈哈哈哈哈哈哈哈哈',
            'write_music': '路人饼',
        }
        try:
            self.response = requests.post(
                'http://music2.baimeidashu.com/api/processCustomVideoDirect',
                cookies=self.cookies,
                # Pass the header dict here (the cookie dict used to be sent
                # as headers by mistake, so Content-Type was never set).
                headers=self.headers,
                json=self.json_data,
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException as exc:
            # Record the failure instead of letting the thread die with a
            # traceback, so the caller can inspect it after join().
            self.error = exc
            print(f"{self.name} request failed: {exc}")
        print(self.json_data)
# perf_counter is a monotonic, high-resolution clock intended for measuring
# elapsed time; it is unaffected by system clock adjustments.
start_time = time.perf_counter()

# Create and start the worker threads.
threads = []
for i in range(100):
    thread = RandomNumberThread(f"Thread-{i+1}")
    threads.append(thread)
    thread.start()

# Wait for all threads to finish.
for thread in threads:
    thread.join()

# Collect and print the random numbers. They are read only after join()
# because each thread assigns random_number inside run().
random_numbers = [thread.random_number for thread in threads]
print(f"All random numbers: {random_numbers}")

end_time = time.perf_counter()
execution_time = end_time - start_time
print("代码执行时间:", execution_time)
# Not sure why.