- 经验
- 382
- 分贝
- 0
- 家园分
- 1692
- 在线时间:
- 682 小时
- 最后登录:
- 2024-10-23
- 帖子:
- 185
- 精华:
- 0
- 注册时间:
- 2008-9-20
- UID:
- 273058
注册:2008-9-20
|
发表于 2013-7-14 15:02:07
|显示全部楼层
嵌入式 Web 服务器需要做压力测试,正好可以用 Python 快速写一个小脚本,只花了半天时间,推荐给大家。脚本还可以继续完善,可以多线程发包,使用长连接或者短连接。
import socket #for sockets
import sys #for exit
import time
import thread,threading
# Address of the web server under test; connToServer() connects to
# (remote_ip, port).
remote_ip = '60.1.21.163'
# TCP port of the server under test.
port = 80
# Declared `global` inside connToServer() but never read or modified in the
# visible code.
sockIndex = 1
def _open_connection(address):
    """Return a new TCP socket connected to *address*; close it if connecting fails."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect(address)
    except socket.error:
        sock.close()
        raise
    return sock


def connToServer(x, times, address=None, interval=1):
    """Repeatedly send HTTP GET requests to the server over one TCP connection.

    Each iteration sends the same GET request twice on the current socket and
    then sleeps for *interval* seconds. When sending (or reconnecting) fails,
    the next iteration closes the socket, waits *interval* seconds and opens a
    fresh connection before sending again.

    Args:
        x: Unused; kept so existing callers that pass it keep working.
        times: Number of send iterations to perform.
        address: Optional ``(host, port)`` tuple to connect to. Defaults to
            the module-level ``(remote_ip, port)``.
        interval: Seconds to sleep after each send and before each reconnect.

    Raises:
        SystemExit: If the initial connection cannot be established. When the
            function runs in a worker thread this ends only that thread.
    """
    if address is None:
        address = (remote_ip, port)

    # Keep-alive variant: add a 'connection: keep-alive\r\n' header line
    # before the terminating blank line.
    request = b'GET /OMC_TOOL?action=UDT/ACS_OAM_DATA_REPORT HTTP/1.1\r\n\r\n'

    try:
        s = _open_connection(address)
    except socket.error as exc:
        print('Failed to create socket. Error code: %s' % (exc,))
        sys.exit()

    reconnect = False
    try:
        # range(times), not range(1, times): the latter ran one iteration short.
        for _ in range(times):
            try:
                if reconnect:
                    print('Reconnect')
                    s.close()
                    time.sleep(interval)
                    s = _open_connection(address)
                    print('Reconnect success')
                    reconnect = False
                s.sendall(request)
                s.sendall(request)
                print('socket s Senddata Successfully!')
                time.sleep(interval)
            except socket.error as exc:
                # Not every socket error carries (errno, message); timeouts,
                # for instance, do not, so fall back to the exception text.
                reconnect = True
                message = getattr(exc, 'strerror', None) or exc
                print('fail:%s,errno=%s' % (message, getattr(exc, 'errno', None)))
    finally:
        # Always release the socket, even if an unexpected error escapes.
        s.close()
def test(): #Use thread.start_new_thread() to create 2 new threads
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
thread.start_new_thread(connToServer,(1,10000))
if __name__ == '__main__':
    # Script entry point: run the stress test, then report completion.
    test()
    print('Message test over')
|
|