【Python爬虫实例学习篇】——6、获取免费IP代理进阶: 在服务器上部署一个高可用代理池(1)

昨天碰巧得到了一个服务器,又恰逢之前的免费代理网站失效了,当即决定在服务器上部署一个代理池并让形成一个api接口。由于这篇文章内容太多,就分成 两篇 文章依次上传。目前在博主电脑上的验证效率约为:10w + /天。 如:
个人博客地址:https://www.asyu17.cn
另外,稍后我也会将代码上传至我的 Github仓库 内:https://github.com/asyu17/ProxyPool
效果图

工具:

  1. python3.6
  2. MongoDB 数据库(用于保存数据)安装可参考教程:https://blog.csdn.net/u011262253/article/details/74858211
  3. pymongo 库 (在python中调用MongoDB数据库)
  4. requests 库
  5. time 库(用于记录数据保存时间)
  6. threading 库 (提高代理验证效率)
  7. lxml 库
  8. 服务器

目录

  1. 获取代理模块
  2. 验证代理模块
  3. 存储模块
  4. 调用模块(本地调用
  5. proxy_pool.py
  6. 调用示例
  7. 服务器配置(见下篇文章
  8. 形成API接口(见下篇文章

获取代理模块

首先确定要爬的代理网站:

OK,现在来构建获取代理模块,由于之前的文章有讲过如何使用xpath抓数据,而且这次要抓的代理网站确实是太多了,就不赘述如何获取标签抓数据了,直接上代码~
整体思路: 构建一个总的 GetFreeProxyList 模块,然后在此之下再依次构建如快代理模块,云代理模块等等,统一返回的的是一个 list型数据.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# 感觉获取代理用不了多少时间,就采用单线程吧
import requests
from urllib import parse

requests.timeout=20

# 就不传数据了,数据在各个子函数内定义
# 就不传数据了,数据在各个子函数内定义
def GetFreeProxyList():
headers = {
'User-Agent': 'Mozilla/5.0WindowsNT6.1;rv:2.0.1)Gecko/20100101Firefox/4.0.1',
'Accept' : "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8"
}

# 各个代理网站

# 合并了四个代理网站
def kuai_yun_qinghua_superfast():

url_kuai='https://www.kuaidaili.com/free/inha/{num}'
url_yun='http://www.ip3366.net/free/?stype=1&page={num}'
url_qinghua='http://www.qinghuadaili.com/free/{num}/'
url_superfast='http://www.superfastip.com/welcome/freeip/{num}'

ip_list=[]
for j in range(4):
if j==0:
url=url_kuai
elif j==1:
url=url_yun
elif j==2:
url=url_qinghua
elif j==3:
url = url_superfast
for i in range(10):
temp_ip_list=[]
url=url.format(num=str(i+1))
res=requests.get(url=url,headers=headers)
html=etree.HTML(res.content)
ip=html.xpath("//table//td[1]")
port=html.xpath("//table//td[2]")
temp_ip_list.extend(list(map(lambda ip, port: (ip.text + ':' + port.text), ip, port)))
if len(temp_ip_list):
ip_list[len(ip_list):len(ip_list)+len(temp_ip_list)-1]=temp_ip_list
# 重置url
if j == 0:
url = url_kuai
elif j == 1:
url = url_yun
elif j == 2:
url = url_qinghua
elif j == 3:
url = url_superfast

# 去除重复元素
ip_list = list(set(ip_list))
return ip_list

def _89():
data = {
"num": "200",
"port": "",
"address": "",
"isp": ""
}
url = "http://www.89ip.cn/tqdl.html?api=1&" + str(parse.urlencode(data))
res = requests.get(url=url, headers=headers)
html = etree.HTML(res.content.decode(encoding='utf-8'))
ip_list = html.xpath("//body/text()")[2:-1]
ip_list[0] = ip_list[0].strip()

return ip_list

# 合并了两个
def xila_nima():
url_xila='http://www.xiladaili.com/gaoni/{num}'
url_nima='http://www.nimadaili.com/gaoni/{num}'

ip_list=[]
for j in range(2):
if j == 0:
url = url_xila
elif j == 1:
url = url_nima
for i in range(10):
url = url.format(num=str(i+1))
res = requests.get(url=url, headers=headers)
html = etree.HTML(res.content)
temp_ip_list=html.xpath("//table//td[1]/text()")
if len(temp_ip_list):
ip_list[len(ip_list):len(ip_list)+len(temp_ip_list)-1]=temp_ip_list
# 重置url
if j == 0:
url = url_xila
elif j == 1:
url = url_nima

return ip_list

# 将各个子模块的数据传给ip_list
ip_list=kuai_yun_qinghua_superfast()
temp_ip_list=_89()
ip_list[len(ip_list):len(temp_ip_list)+len(ip_list)-1]=temp_ip_list
temp_ip_list = xila_nima()
ip_list[len(ip_list):len(temp_ip_list) + len(ip_list) - 1] = temp_ip_list

代理获取模块效果图(由于设置了timeout参数,最终获取数量会取决与网络状态,理论上单次获取最高量为1500+):
ip获取模块效果图

验证代理模块

这里采用多线程进行ip验证:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import threading
import requests
import time

def Check_ippool(ip_list,
test_url='https://www.baidu.com', threading_num=10):
# 多线程要用到的一些全局变量初始化
global ip_list_new, index, success, fail, gLock, done
ip_list_new = []
index = -1
success = 0
fail = 0
gLock = threading.Lock()
done = 0

# 加入数据库内已有代理
db_ip_list = self.get_iplist(self.get_count())
ip_list[len(ip_list):len(ip_list) + len(db_ip_list) - 1] = db_ip_list
# 去除重复元素
ip_list = list(set(ip_list))
total = len(ip_list)

def Muti_Check_ip(ip_list, test_url):
# 多线程检查ip

global ip_list_new, index, success, fail, gLock, done
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3970.5 Safari/537.36'
}
url = test_url
total = len(ip_list)
while True:
gLock.acquire()
if total > index + 1:
index = index + 1
ip = ip_list[index]
gLock.release()
proxies = {
'http': 'http://' + ip,
'https': 'https://' + ip
}
try:
res = requests.get(url=url, headers=headers, timeout=15, proxies=proxies, allow_redirects=False)
if res.status_code == 200:
flag = 1
else:
flag = -1
except (requests.exceptions.ReadTimeout, requests.exceptions.ChunkedEncodingError,
requests.exceptions.ConnectionError) as e:
erro = e
flag = 0
gLock.acquire()
print('-' * 60)
if flag == 1:
success = success + 1
print("代理ip:" + ip + " 连接测试成功!|| 累计成功:" + str(success) +
" || 累计失败:" + str(fail) + " || 剩余:" + str(
total - success - fail))
ip_list_new.append(ip)
elif flag == 0:
fail = fail + 1
print(erro)
print("代理ip:" + ip + " 连接测试失败!|| 累计成功:" + str(success) +
" || 累计失败:" + str(fail) + " || 剩余:" + str(
total - success - fail))
elif flag == -1:
fail = fail + 1
print("代理ip:" + ip + " 连接测试失败!|| 累计成功:" + str(success) +
" || 累计失败:" + str(fail) + " || 剩余:" + str(
total - success - fail))
gLock.release()
else:
done = done + 1
gLock.release()
break

for x in range(threading_num):
# 创建线程
t = threading.Thread(target=Muti_Check_ip, args=([ip_list, test_url, ]))
# 启动线程
t.start()

while True:
if done == threading_num:
print('程序执行完成! 成功数为:%i' % success + '|| 失败数为:%i' % fail + '|| 总数为:%i' % total)
# 清空已有数据,同时存入新数据
self.delete()
self.insert(ip_list_new)
return ip_list_new
else:
time.sleep(0.5)

存储模块

使用存储模块前需要先安装MongoDB,安装完后还需要继续安装pymongo模块。
MongoDB下载地址:https://www.mongodb.com/download-center
pymongo模块安装命令:

1
$ pip install pymongo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import pymongo
# 使用MongoDB存储代理
# 初始化

# 连接MongoDB
self.client=pymongo.MongoClient(host='127.0.0.1',port=27017)
# 指定数据库
self.db=self.client['ProxyPool']
# 指定集合
self.proxy=self.db['proxy']

# 初始化索引
self.proxy.ensure_index('ip',unique=True)

# 插入
def insert(ip_list):
try:
list=[]
current_time=time.time()
for item in ip_list:
a={'ip':item,'add_time':current_time}
list.append(a)
self.proxy.insert(list)
except Exception as e:
print(e)

# 删除
def delete(conditions=None):
try:
self.proxy.remove(conditions)
except Exception as e:
print(e)

# 获取
def get(count,conditions=None):
try:
raw_data=self.proxy.find(conditions,limit=count)
return raw_data
except Exception as e:
print(e)
pass

# 获取当前数据库内的代理数量
def get_count():
return self.proxy.count()

调用模块

该调用模块是满足本地调用的需要!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 设置调用接口
def get_oneip():
proxy=self.get(1)
return proxy[0]['ip']

def get_iplist(num):
current_num=self.get_count()
if current_num<num:
num=current_num
ip_list=[]
proxies=self.get(num)
for item in proxies:
ip_list.append(item['ip'])
return ip_list

proxy_pool.py

在完成各个子模块后,我们在这里将上述各个全部封装到一个类里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# 感觉获取代理用不了多少时间,就采用单线程吧
import requests
from lxml import etree
from urllib import parse
import time
import threading
import pymongo

requests.timeout=20

# 就不传数据了,数据在各个子函数内定义
class proxyPool(object):
def GetFreeProxyList(self):
headers = {
'User-Agent': 'Mozilla/5.0WindowsNT6.1;rv:2.0.1)Gecko/20100101Firefox/4.0.1',
'Accept' : "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8"
}

# 各个代理网站

# 合并了四个代理网站
def kuai_yun_qinghua_superfast():

url_kuai='https://www.kuaidaili.com/free/inha/{num}'
url_yun='http://www.ip3366.net/free/?stype=1&page={num}'
url_qinghua='http://www.qinghuadaili.com/free/{num}/'
url_superfast='http://www.superfastip.com/welcome/freeip/{num}'

ip_list=[]
for j in range(4):
if j==0:
url=url_kuai
elif j==1:
url=url_yun
elif j==2:
url=url_qinghua
elif j==3:
url = url_superfast
for i in range(10):
temp_ip_list=[]
url=url.format(num=str(i+1))
res=requests.get(url=url,headers=headers)
html=etree.HTML(res.content)
ip=html.xpath("//table//td[1]")
port=html.xpath("//table//td[2]")
temp_ip_list.extend(list(map(lambda ip, port: (ip.text + ':' + port.text), ip, port)))
if len(temp_ip_list):
ip_list[len(ip_list):len(ip_list)+len(temp_ip_list)-1]=temp_ip_list
# 重置url
if j == 0:
url = url_kuai
elif j == 1:
url = url_yun
elif j == 2:
url = url_qinghua
elif j == 3:
url = url_superfast

# 去除重复元素
ip_list = list(set(ip_list))
return ip_list

def _89():
data = {
"num": "200",
"port": "",
"address": "",
"isp": ""
}
url = "http://www.89ip.cn/tqdl.html?api=1&" + str(parse.urlencode(data))
res = requests.get(url=url, headers=headers)
html = etree.HTML(res.content.decode(encoding='utf-8'))
ip_list = html.xpath("//body/text()")[2:-1]
ip_list[0] = ip_list[0].strip()

return ip_list

# 合并了两个
def xila_nima():
url_xila='http://www.xiladaili.com/gaoni/{num}'
url_nima='http://www.nimadaili.com/gaoni/{num}'

ip_list=[]
for j in range(2):
if j == 0:
url = url_xila
elif j == 1:
url = url_nima
for i in range(10):
url = url.format(num=str(i+1))
res = requests.get(url=url, headers=headers)
html = etree.HTML(res.content)
temp_ip_list=html.xpath("//table//td[1]/text()")
if len(temp_ip_list):
ip_list[len(ip_list):len(ip_list)+len(temp_ip_list)-1]=temp_ip_list
# 重置url
if j == 0:
url = url_xila
elif j == 1:
url = url_nima

return ip_list

# 将各个子模块的数据传给ip_list
ip_list=kuai_yun_qinghua_superfast()
temp_ip_list=_89()
ip_list[len(ip_list):len(temp_ip_list)+len(ip_list)-1]=temp_ip_list
temp_ip_list = xila_nima()
ip_list[len(ip_list):len(temp_ip_list) + len(ip_list) - 1] = temp_ip_list


return ip_list


def Check_ippool(self, ip_list,
test_url='https://www.baidu.com', threading_num=10):
# 多线程要用到的一些全局变量初始化
global ip_list_new, index, success, fail, gLock, done
ip_list_new = []
index = -1
success = 0
fail = 0
gLock = threading.Lock()
done = 0

# 加入数据库内已有代理
db_ip_list = self.get_iplist(self.get_count())
ip_list[len(ip_list):len(ip_list) + len(db_ip_list) - 1] = db_ip_list
# 去除重复元素
ip_list = list(set(ip_list))
total = len(ip_list)

def Muti_Check_ip(ip_list, test_url):
# 多线程检查ip

global ip_list_new, index, success, fail, gLock, done
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3970.5 Safari/537.36'
}
url = test_url
total = len(ip_list)
while True:
gLock.acquire()
if total > index + 1:
index = index + 1
ip = ip_list[index]
gLock.release()
proxies = {
'http': 'http://' + ip,
'https': 'https://' + ip
}
try:
res = requests.get(url=url, headers=headers, timeout=15, proxies=proxies, allow_redirects=False)
if res.status_code == 200:
flag = 1
else:
flag = -1
except (requests.exceptions.ReadTimeout, requests.exceptions.ChunkedEncodingError,
requests.exceptions.ConnectionError) as e:
erro = e
flag = 0
gLock.acquire()
print('-' * 60)
if flag == 1:
success = success + 1
print("代理ip:" + ip + " 连接测试成功!|| 累计成功:" + str(success) +
" || 累计失败:" + str(fail) + " || 剩余:" + str(
total - success - fail))
ip_list_new.append(ip)
elif flag == 0:
fail = fail + 1
print(erro)
print("代理ip:" + ip + " 连接测试失败!|| 累计成功:" + str(success) +
" || 累计失败:" + str(fail) + " || 剩余:" + str(
total - success - fail))
elif flag == -1:
fail = fail + 1
print("代理ip:" + ip + " 连接测试失败!|| 累计成功:" + str(success) +
" || 累计失败:" + str(fail) + " || 剩余:" + str(
total - success - fail))
gLock.release()
else:
done = done + 1
gLock.release()
break

for x in range(threading_num):
# 创建线程
t = threading.Thread(target=Muti_Check_ip, args=([ip_list, test_url, ]))
# 启动线程
t.start()

while True:
if done == threading_num:
print('程序执行完成! 成功数为:%i' % success + '|| 失败数为:%i' % fail + '|| 总数为:%i' % total)
# 清空已有数据,同时存入新数据
self.delete()
self.insert(ip_list_new)
return ip_list_new
else:
time.sleep(0.5)
# 使用MongoDB存储代理
# 初始化
def __init__(self,host='127.0.0.1',port=27017):
# 连接MongoDB
self.client=pymongo.MongoClient(host=host,port=port)
# 指定数据库
self.db=self.client['ProxyPool']
# 指定集合
self.proxy=self.db['proxy']

# 初始化索引
self.proxy.ensure_index('ip',unique=True)

# 插入
def insert(self,ip_list):
try:
list=[]
current_time=time.time()
for item in ip_list:
a={'ip':item,'add_time':current_time}
list.append(a)
self.proxy.insert(list)
except Exception as e:
print(e)

# 删除
def delete(self,conditions=None):
try:
self.proxy.remove(conditions)
except Exception as e:
print(e)

# 获取
def get(self,count,conditions=None):
try:
raw_data=self.proxy.find(conditions,limit=count)
return raw_data
except Exception as e:
print(e)
pass

# 获取当前数据库内的代理数量
def get_count(self):
return self.proxy.count()

# 设置调用接口
def get_oneip(self):
proxy=self.get(1)
return proxy[0]['ip']

def get_iplist(self,num):
current_num=self.get_count()
if current_num<num:
num=current_num
ip_list=[]
proxies=self.get(num)
for item in proxies:
ip_list.append(item['ip'])
return ip_list

调用示例

在调用之前需要先启动MongoDB,具体方法是:先以管理员身份运行cmd,然后输入下面的命令
运行cmd

1
$ net start mongodb

效果图

启动成功
启动完成后,就可以开始调用我们的代理池了,如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import time
from Proxy_Pool import proxy_pool

if __name__ == '__main__':
num = 0
proxy_pool = proxy_pool.proxyPool()
test_url = 'https://www.baidu.com'
while True:
start_time = time.time()
current_num=proxy_pool.get_count()
ip_list = proxy_pool.GetFreeProxyList()
good_ip_list = proxy_pool.Check_ippool(ip_list=ip_list, test_url=test_url, threading_num=20)
end_time = time.time()
d_time = end_time - start_time

print("第 " + str(num) + " 次" + "运行时间为:%.6s 秒" % d_time)
print('当前数据库内可用数:%i' % proxy_pool.get_count()+'|| 本次新加代理数:%i'%(proxy_pool.get_count()-current_num))
num = num + 1
time.sleep(60)

效果图:
效果图!

结语

未完待续·····,敬请期待下篇文章:《【Python爬虫实例学习篇】——6、获取免费IP代理进阶: 在服务器上部署一个高可用代理池(2)》

==微信公众号:==

小术快跑