Building a Free Proxy IP Pool
Problems to solve:
- How should the IPs be stored?
  - File storage
    Drawback: opening and editing files is cumbersome
  - MySQL
    Drawback: queries are relatively slow
  - MongoDB
    Drawback: queries are relatively slow, and there is no deduplication
  - Redis --> the best fit for this job
  So -> use Redis's zset (sorted set) as the data structure (a short sketch follows)
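A minimal sketch of the zset scoring model this choice enables, assuming a local Redis on db 9 to match the project code below; the sample address is made up:

# A quick sketch of the zset scoring model; the sample IP is a placeholder.
from redis import Redis

red = Redis(host='localhost', port=6379, db=9, decode_responses=True)

red.zadd('proxy_ip', {'1.2.3.4:8080': 10})       # new IP enters with a starting score
red.zadd('proxy_ip', {'1.2.3.4:8080': 100})      # verified IP: score raised to the max
red.zincrby('proxy_ip', -1, '1.2.3.4:8080')      # failed check: deduct one point
print(red.zrangebyscore('proxy_ip', 100, 100))   # fetch only full-score IPs

Because zset members are unique, re-adding an existing IP cannot create a duplicate, which covers exactly the deduplication MongoDB was missing.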
- Sites to fetch IPs from
  - https://ip.jiangxianli.com/
  - https://free.kuaidaili.com/free/intr/
- What should the project architecture be?
Project Architecture
- Fetch proxy IPs
- Filter the IPs
- Verify IP availability
- Serve the IPs through an API
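In the code below, these steps map onto three long-running modules plus a shared storage layer: ip_collection.py fetches and filters IPs into Redis, ip_verify.py re-checks their availability and adjusts scores, ip_api.py serves a usable IP over HTTP, and redis_proxy.py wraps all Redis access.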
Project Structure
The project structure is as follows: (structure diagram omitted)
Project Code
The code folder contains the following files:
redis_proxy.py
# -*- encoding:utf-8 -*-
# @time: 2022/7/4 11:32
# @author: Maxs_hu
"""
Redis middleman: controls all interaction between Redis and the proxy IPs.
"""
from redis import Redis
import random


class RedisProxy:
    def __init__(self):
        # Connect to the Redis database
        self.red = Redis(
            host='localhost',
            port=6379,
            db=9,
            password='123456',
            decode_responses=True
        )

    # 1. Store IPs in Redis. Check whether an IP already exists first,
    #    so we do not overwrite the score of an existing IP.
    # 2. Query all stored IPs so they can be re-verified.
    # 3. Verify availability: a working IP gets full score, a failing one loses a point.
    # 4. Return a usable IP to the user:
    #    prefer full-score IPs, fall back to IPs that still have some score,
    #    and return nothing if no IP qualifies.

    def add_ip(self, ip):
        # Called externally with a new IP.
        # zscore returns None when the member does not exist.
        if self.red.zscore('proxy_ip', ip) is None:
            self.red.zadd('proxy_ip', {ip: 10})
            print('proxy_ip stored', ip)
        else:
            print('duplicate, skipped', ip)

    def get_all_proxy(self):
        # Return every stored IP
        return self.red.zrange('proxy_ip', 0, -1)

    def set_max_score(self, ip):
        self.red.zadd('proxy_ip', {ip: 100})  # note the dict syntax

    def deduct_score(self, ip):
        # Look up the current score first
        score = self.red.zscore('proxy_ip', ip)
        # If there is still a positive score, deduct one point
        if score and score > 0:
            self.red.zincrby('proxy_ip', -1, ip)
        else:
            # Score has dropped to 0 or below: remove the IP entirely
            self.red.zrem('proxy_ip', ip)

    def effect_ip(self):
        # Filter IPs by score: full-score IPs first
        ips = self.red.zrangebyscore('proxy_ip', 100, 100, 0, -1)
        if ips:
            return random.choice(ips)
        else:
            # No full-score IP: fall back to IPs that still have some score (11-99)
            ips = self.red.zrangebyscore('proxy_ip', 11, 99, 0, -1)
            if ips:
                return random.choice(ips)
            else:
                print('no usable IP')
                return None
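A quick usage sketch for the class above, assuming the Redis instance configured in __init__ is reachable; the sample address is made up:

# Minimal RedisProxy usage sketch; the sample IP is a placeholder.
from redis_proxy import RedisProxy

red = RedisProxy()
red.add_ip('1.2.3.4:8080')         # enters the pool with a score of 10
red.set_max_score('1.2.3.4:8080')  # simulate a successful verification
print(red.get_all_proxy())         # ['1.2.3.4:8080']
print(red.effect_ip())             # returns a full-score IP when one exists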
ip_collection.py
# -*- encoding:utf-8 -*-
# @time: 2022/7/4 11:32
# @author: Maxs_hu
"""
Collects proxy IPs from several free-proxy sites and stores them in Redis.
"""
from redis_proxy import RedisProxy
import requests
from lxml import html
from multiprocessing import Process
import time
import random


def get_kuai_ip(red):
    url = "https://free.kuaidaili.com/free/intr/"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
    }
    resp = requests.get(url, headers=headers)
    etree = html.etree
    et = etree.HTML(resp.text)
    trs = et.xpath('//table//tr')
    for tr in trs:
        ip = tr.xpath('./td[1]/text()')
        port = tr.xpath('./td[2]/text()')
        if not ip:  # skip rows without an IP value (e.g. the header row)
            continue
        proxy_ip = ip[0] + ":" + port[0]
        red.add_ip(proxy_ip)


def get_unknown_ip(red):
    url = "https://ip.jiangxianli.com/"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
    }
    resp = requests.get(url, headers=headers)
    etree = html.etree
    et = etree.HTML(resp.text)
    trs = et.xpath('//table//tr')
    for tr in trs:
        ip = tr.xpath('./td[1]/text()')
        port = tr.xpath('./td[2]/text()')
        if not ip:  # skip rows without an IP value
            continue
        proxy_ip = ip[0] + ":" + port[0]
        red.add_ip(proxy_ip)


def get_happy_ip(red):
    page = random.randint(1, 5)
    url = f'http://www.kxdaili.com/dailiip/2/{page}.html'
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
    }
    resp = requests.get(url, headers=headers)
    etree = html.etree
    et = etree.HTML(resp.text)
    trs = et.xpath('//table//tr')
    for tr in trs:
        ip = tr.xpath('./td[1]/text()')
        port = tr.xpath('./td[2]/text()')
        if not ip:  # skip rows without an IP value
            continue
        proxy_ip = ip[0] + ":" + port[0]
        red.add_ip(proxy_ip)


def get_nima_ip(red):
    url = 'http://www.nimadaili.com/'
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
    }
    resp = requests.get(url, headers=headers)
    etree = html.etree
    et = etree.HTML(resp.text)
    trs = et.xpath('//table//tr')
    for tr in trs:
        # Some cells here are empty, so do not index [0] before checking
        ip = tr.xpath('./td[1]/text()')
        if not ip:
            continue
        red.add_ip(ip[0])


def get_89_ip(red):
    page = random.randint(1, 26)
    url = f'https://www.89ip.cn/index_{page}.html'
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
    }
    resp = requests.get(url, headers=headers)
    etree = html.etree
    et = etree.HTML(resp.text)
    trs = et.xpath('//table//tr')
    for tr in trs:
        ip = tr.xpath('./td[1]/text()')
        if not ip:
            continue
        red.add_ip(ip[0].strip())


def main():
    # Create a RedisProxy instance
    red = RedisProxy()
    print("starting IP collection")
    while 1:
        try:
            # More source sites can be plugged in here
            print('>>> collecting kuaidaili IPs')
            get_kuai_ip(red)
            # get_unknown_ip(red)  # collect from ip.jiangxianli.com
            print(">>> collecting kxdaili IPs")
            get_happy_ip(red)
            print(">>> collecting nimadaili IPs")
            # get_nima_ip(red)
            print(">>> collecting 89ip IPs")
            get_89_ip(red)
            time.sleep(60)
        except Exception as e:
            print('error while storing IPs', e)
            time.sleep(60)


if __name__ == '__main__':
    main()
    # Or run in a child process (pass the function itself, do not call it):
    # p = Process(target=main)
    # p.start()
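The five collectors repeat the same fetch-parse-store pattern, so they could optionally be folded into one helper. This is only a refactor sketch; the function name and join_port flag are my own, not from the project:

# Hypothetical helper that factors out the shared fetch-parse-store pattern.
import requests
from lxml import html

HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
}

def collect_table_ips(red, url, join_port=True):
    et = html.etree.HTML(requests.get(url, headers=HEADERS).text)
    for tr in et.xpath('//table//tr'):
        ip = tr.xpath('./td[1]/text()')
        port = tr.xpath('./td[2]/text()')
        if not ip:  # header rows and empty cells
            continue
        if join_port:
            red.add_ip(ip[0].strip() + ":" + port[0].strip())
        else:
            red.add_ip(ip[0].strip())  # sites whose first cell is already "ip:port"

Each get_*_ip function would then reduce to a single collect_table_ips call with its URL.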
ip_verify.py
# -*- encoding:utf-8 -*-
# @time: 2022/7/4 11:34
# @author: Maxs_hu
"""
Verifies IP availability. Uses coroutines (asyncio + aiohttp) to send
requests concurrently for better efficiency.
"""
from redis_proxy import RedisProxy
from multiprocessing import Process
import asyncio
import aiohttp
import time


async def verify_ip(ip, red, sem):
    timeout = aiohttp.ClientTimeout(total=10)  # wait at most ten seconds per request
    try:
        async with sem:
            async with aiohttp.ClientSession() as session:
                async with session.get(url='http://www.baidu.com/',
                                       proxy='http://' + ip,
                                       timeout=timeout) as resp:
                    page_source = await resp.text()
                    if resp.status in [200, 302]:
                        # Usable: raise the score to the maximum
                        red.set_max_score(ip)
                        print('verified OK, score set to max', ip)
                    else:
                        # Not usable: deduct a point
                        red.deduct_score(ip)
                        print('bad IP, one point deducted', ip)
    except Exception as e:
        print('error', e)
        red.deduct_score(ip)
        print('bad IP, one point deducted', ip)


async def task(red):
    ips = red.get_all_proxy()
    sem = asyncio.Semaphore(30)  # allow at most 30 concurrent checks
    tasks = []
    for ip in ips:
        tasks.append(asyncio.create_task(verify_ip(ip, red, sem)))
    if tasks:
        await asyncio.wait(tasks)


def main():
    red = RedisProxy()
    time.sleep(5)  # initial wait so the collector has stored some IPs
    print("starting availability checks")
    while 1:
        try:
            asyncio.run(task(red))
            time.sleep(100)
        except Exception as e:
            print("ip_verify error", e)
            time.sleep(100)


if __name__ == '__main__':
    main()
    # Or run in a child process (pass the function itself, do not call it):
    # p = Process(target=main)
    # p.start()
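To spot-check a single proxy outside the verification loop, the same coroutine can be run once; the address below is a placeholder:

# One-off spot check of a single proxy, reusing verify_ip from above.
import asyncio
from redis_proxy import RedisProxy
from ip_verify import verify_ip

async def check_one(ip):
    await verify_ip(ip, RedisProxy(), asyncio.Semaphore(1))

asyncio.run(check_one('1.2.3.4:8080'))  # made-up address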
ip_api.py
# -*- encoding:utf-8 -*-
# @time: 2022/7/4 11:35
# @author: Maxs_hu
"""
Provides the IP API for users. We run a small web server; users hit our
server and get a usable proxy IP back. Framework options:
 1. flask
 2. sanic --> used here, slightly simpler
"""
from redis_proxy import RedisProxy
from sanic import Sanic, json
from sanic_cors import CORS
from multiprocessing import Process

# Create the app (any name will do)
app = Sanic('ip')
# Allow cross-origin requests
CORS(app)
red = RedisProxy()


@app.route('/maxs_hu_ip')  # register the route
async def api(req):  # the first parameter is always the request object
    ip = red.effect_ip()
    return json({"ip": ip})


def main():
    # Start the sanic server
    app.run(host='127.0.0.1', port=1234)


if __name__ == '__main__':
    main()
    # Or run in a child process (pass the function itself, do not call it):
    # p = Process(target=main)
    # p.start()
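Once the server is up, any HTTP client can pull an IP from it; the response shape follows the handler above:

# Pull one proxy from the running service.
import requests

resp = requests.get('http://127.0.0.1:1234/maxs_hu_ip')
print(resp.json())  # e.g. {'ip': '1.2.3.4:8080'}; 'ip' is None when the pool is empty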
runner.py
# -*- encoding:utf-8 -*-
# @time: 2022/7/5 17:36
# @author: Maxs_hu
from ip_api import main as api_run
from ip_collection import main as coll_run
from ip_verify import main as veri_run
from multiprocessing import Process


def main():
    # Three processes that run independently of one another
    p1 = Process(target=api_run)  # pass the function object itself, not a call
    p2 = Process(target=coll_run)
    p3 = Process(target=veri_run)
    p1.start()
    p2.start()
    p3.start()


if __name__ == '__main__':
    main()
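One judgment call: main() returns as soon as the three start() calls finish. A hypothetical variant, not in the original, joins the children to keep the parent process in the foreground:

# Hypothetical variant of runner.main() that also waits on the children
# (they normally loop forever).
from multiprocessing import Process
from ip_api import main as api_run
from ip_collection import main as coll_run
from ip_verify import main as veri_run

def main():
    procs = [Process(target=f) for f in (api_run, coll_run, veri_run)]
    for p in procs:
        p.start()  # launch the three modules as independent processes
    for p in procs:
        p.join()   # block until they exit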
测试ip是否可用.py
# -*- encoding:utf-8 -*-
# @time: 2022/7/5 18:15
# @author: Maxs_hu
import requests


def get_proxy():
    url = "http://127.0.0.1:1234/maxs_hu_ip"
    resp = requests.get(url)
    return resp.json()


def main():
    # Fetch one proxy and reuse it, so the lookup URL and the proxy
    # settings all refer to the same address
    ip = get_proxy()["ip"]
    url = 'http://mip.chinaz.com/?query=' + ip
    proxies = {
        "http": 'http://' + ip,
        "https": 'http://' + ip  # the free proxies currently only support http requests
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36",
    }
    resp = requests.get(url, proxies=proxies, headers=headers)
    resp.encoding = 'utf-8'
    print(resp.text)  # shows the geolocation of the IP


if __name__ == '__main__':
    main()
Run Results
Project run screenshot: (screenshot omitted)
Redis storage screenshot: (screenshot omitted)
Summary
- Free proxy IPs only support http pages and are not very reliable; if you have real needs, buy paid proxies and add them to the pool.
- The service can be deployed on your own server for others to access; after learning full-stack development, login and payment features could be added to extend it further.
- The project architecture is a producer-consumer model: the three modules run simultaneously, each in its own process, without interfering with one another.
- Some design details of the proxy handling still need polish, but the overall result is decent; problems can be fixed as they surface.
This article comes from cnblogs, author: Max, for learning and reference only.
Source: https://www.cnblogs.com/Max-message/p/16448795.html