集蜂云是一个可以让开发者在上面构建、部署、运行、发布采集器的数据采集云平台。加入到数百名开发者中,将你的采集器发布到市场,从而给你带来被动收入吧!
大多数 Python 应用程序依赖外部数据或需要与其他服务器通过 HTTP 进行通信。然而,由于 HTTP 调用速度通常较慢,这可能会显著降低应用程序的性能。为了避免这种情况,我们可以利用 Python 中的并行请求技术,通过线程同时发出多个网络请求。
在接下来的指南中,您将学习如何使用线程在 Python 中实现并行网络请求,以达到显著的性能优化效果。
让我们开始吧!
在Python中,实现并行请求意味着同时执行多个HTTP调用。这通过启动多个线程并允许每个线程并行处理网络请求来实现。利用多核处理器,这种方法能够显著提升性能和网络利用率。
在Web开发中,并行请求有多种应用场景:
通过以上优化方法,利用Python的并行请求技术可以在网络通信和数据处理方面获得显著的优势,使得应用程序更加高效和响应迅速。
ThreadPoolExecutor是Python中进行并行请求的简单方法之一,它是concurrent.futures库中的线程池管理类。其设计目的是在多线程环境中并行执行任务,特别适用于I/O密集型场景,如网络请求。由于可以有效利用并行性,无需启动额外的进程,因此非常有价值。
该类提供了直观的API,使任务并行化变得简单,无需处理复杂的线程管理。ThreadPoolExecutor提供的主要方法包括:
利用这些方法进行并行请求可以显著节省时间。例如,假设平均每个请求需要250毫秒,您需要执行8次请求。按顺序执行这些请求将需要总计2000毫秒(250毫秒 * 8)。
相比之下,使用ThreadPoolExecutor在管理的8个工作线程上并行执行这些请求,几乎可以将执行时间缩短到大约250毫秒,再加上线程处理的少量开销。这几乎是节省了8倍的时间!
现在,让我们按以下步骤学习如何构建Python并行请求的脚本!
为了构建并行请求的Python脚本,我们将使用两个关键的Python包:
我们将利用这两个包来并行执行以下操作:
使用以下命令安装所需的库:
pip install requests beautifulsoup4
然后,通过在 Python 脚本顶部添加以下两行来导入:
import requests
from bs4 import BeautifulSoup
首先,我们需要定义一个任务函数,该函数将被ThreadPoolExecutor并行执行。这个函数将接收一个URL作为参数,使用Requests库发送HTTP请求,并从返回的HTML页面中提取标题信息。
def parse_page(url):
# perform the HTTP request to the specified URL
response = requests.get(url)
# parse the HTML content returned by server
soup = BeautifulSoup(response.text, "html.parser")
# extract the title from the page and print it
title_element = soup.find('title')
title = title_element.text
print(title)
接下来,定义 URL 列表:
urls = [
'https://www.scrapingcourse.com/ecommerce/page/1/',
'https://www.scrapingcourse.com/ecommerce/page/2/',
'https://www.scrapingcourse.com/ecommerce/page/3/',
'https://www.scrapingcourse.com/ecommerce/page/4/',
'https://www.scrapingcourse.com/ecommerce/page/5/',
'https://www.scrapingcourse.com/ecommerce/page/6/',
'https://www.scrapingcourse.com/ecommerce/page/7/',
'https://www.scrapingcourse.com/ecommerce/page/8/'
]
初始化一个ThreadPoolExecutor对象,并使用它并行执行parse_page()函数来处理4个不同的URL。
MAX_THREADS = 4
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
executor.map(parse_page, urls)
完整代码如下:
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
def parse_page(url):
# perform the HTTP request to the specified URL
response = requests.get(url)
# parse the HTML content returned by server
soup = BeautifulSoup(response.text, "html.parser")
# extract the title from the page and print it
title_element = soup.find('title')
title = title_element.text
print(title)
## list of URLs to parse concurrently
urls = [
'https://www.scrapingcourse.com/ecommerce/page/1/',
'https://www.scrapingcourse.com/ecommerce/page/2/',
'https://www.scrapingcourse.com/ecommerce/page/3/',
'https://www.scrapingcourse.com/ecommerce/page/4/',
'https://www.scrapingcourse.com/ecommerce/page/5/',
'https://www.scrapingcourse.com/ecommerce/page/6/',
'https://www.scrapingcourse.com/ecommerce/page/7/',
'https://www.scrapingcourse.com/ecommerce/page/8/'
]
# max number of threads to use
MAX_THREADS = 4
# initialize ThreadPoolExecutor and use it to call parse_page() in parallel
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
executor.map(parse_page, urls)
启动该脚本,会看到程序打印了以下类似内容:
Ecommerce Test Site to Learn Web Scraping – Page 4 – ScrapingCourse.com
Ecommerce Test Site to Learn Web Scraping – Page 3 – ScrapingCourse.com
Ecommerce Test Site to Learn Web Scraping – ScrapingCourse.com
Ecommerce Test Site to Learn Web Scraping – Page 2 – ScrapingCourse.com
Ecommerce Test Site to Learn Web Scraping – Page 6 – ScrapingCourse.com
Ecommerce Test Site to Learn Web Scraping – Page 5 – ScrapingCourse.com
Ecommerce Test Site to Learn Web Scraping – Page 7 – ScrapingCourse.com
Ecommerce Test Site to Learn Web Scraping – Page 8 – ScrapingCourse.com
每次运行时,打印顺序都会发生变化。这意味着脚本按预期执行 Python 并行请求。
当前的脚本可以并行发出请求,但存在一个主要问题:如果parse_page()函数在处理某个URL时失败,整个过程将会中断。为了避免这种情况,可以使用try...except块将代码包装在函数内部。
def parse_page(url):
try:
# ...
except Exception as e:
print('Request failed due to error:', e)
另一个限制是任务函数目前仅在终端中打印提取的数据。如果希望将这些信息存储在Python数据结构中以备将来使用,可以通过以下方式修改parse_page()函数,使其返回抓取的数据:
def parse_page(url):
try:
# perform the HTTP request to the specified URL
response = requests.get(url)
# parse the HTML content returned by server
soup = BeautifulSoup(response.text, "html.parser")
# extract the title from the page
title_element = soup.find('title')
title = title_element.text
# return the scraped data in a dictionary
return {'title': title}
except Exception as e:
print('Request failed due to error:', e)
现在,可以通过调用返回的值来初始化一个Python列表,这可以通过对parse_page()函数进行修改实现。由于ThreadPoolExecutor.map()返回一个迭代器,您可以通过将其包装为列表来将其转换为列表,即使用list()函数:
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
titles = list(executor.map(parse_page, urls))
新的并行请求 Python 代码将是:
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
def parse_page(url):
try:
# perform the HTTP request to the specified URL
response = requests.get(url)
# parse the HTML content returned by server
soup = BeautifulSoup(response.text, "html.parser")
# extract the title from the page
title_element = soup.find('title')
title = title_element.text
# return the scraped data in a dictionary
return {'title': title}
except Exception as e:
print('Request failed due to error:', e)
## list of URLs to parse concurrently
urls = [
'https://www.scrapingcourse.com/ecommerce/page/1/',
'https://www.scrapingcourse.com/ecommerce/page/2/',
'https://www.scrapingcourse.com/ecommerce/page/3/',
'https://www.scrapingcourse.com/ecommerce/page/4/',
'https://www.scrapingcourse.com/ecommerce/page/5/',
'https://www.scrapingcourse.com/ecommerce/page/6/',
'https://www.scrapingcourse.com/ecommerce/page/7/',
'https://www.scrapingcourse.com/ecommerce/page/8/'
]
# max number of threads to use
MAX_THREADS = 4
# initialize ThreadPoolExecutor and use it to call parse_page() in parallel
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
titles = list(executor.map(parse_page, urls))
print(titles)
执行它,它将始终打印:
[
{'title': 'Ecommerce Test Site to Learn Web Scraping – ScrapingCourse.com'},
{'title': 'Ecommerce Test Site to Learn Web Scraping – Page 2 – ScrapingCourse.com'},
{'title': 'Ecommerce Test Site to Learn Web Scraping – Page 3 – ScrapingCourse.com'},
{'title': 'Ecommerce Test Site to Learn Web Scraping – Page 4 – ScrapingCourse.com'},
{'title': 'Ecommerce Test Site to Learn Web Scraping – Page 5 – ScrapingCourse.com'},
{'title': 'Ecommerce Test Site to Learn Web Scraping – Page 6 – ScrapingCourse.com'},
{'title': 'Ecommerce Test Site to Learn Web Scraping – Page 7 – ScrapingCourse.com'},
{'title': 'Ecommerce Test Site to Learn Web Scraping – Page 8 – ScrapingCourse.com'}
]
请注意,结果列表中的元素顺序与URL列表中的顺序相同。这是因为ThreadPoolExecutor.map()函数始终按照任务函数调用的顺序返回结果,即使它在评估任务函数时是无序的。
恭喜!您刚刚学会了如何使用Python并行发出多个请求的方法。
现在是时候探索一些技巧和窍门,将您的 Python 并行请求能力提升到一个新的水平。
性能主要取决于使用的线程数ThreadPoolExecutor。如果打开的线程数过少,会限制脚本的并行化潜力。相反,使用过多线程会降低脚本的速度,因为创建和管理线程会带来大量的时间开销。
由于确定最佳性能的最佳线程数并不总是容易的,因此建议进行一些实验。您可以按照以下方式包装Python请求并行逻辑,并跟踪执行时间:
import time
start_time = time.time()
# parallel ThreadPoolExecutor logic...
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time:.3f}s")
如果urls不包含太多元素,则可以使用列表的长度作为线程数:
MAX_THREADS = len(urls)
然而,可能会出现 len(urls) 大于可用的逻辑 CPU 数量的情况。因此,最佳做法是取 len(urls) 和 os.cpu_count() 之间的较小值:
MAX_THREADS = min(os.cpu_count(), len(urls))
ThreadPoolExecutor现在将始终使用合理数量的线程!
在大规模流程中,找到正确的批处理大小变得尤为重要。在这种情况下,并不总是最佳选择利用所有可用的核心资源。以下是几个原因:
因此,小批处理更适合扩展并行 Python 请求。这样可以实现更快、响应更快的进程,同时也能避免耗尽所有资源。
专业提示:在进行大规模操作时,请始终跟踪 CPU、内存和网络使用情况,以避免系统过载。整合应用程序监控工具到脚本中可以简化任务的跟踪和管理。
我们将深入研究两种方法,使并行逻辑更有效、更高效。
在短时间内向同一台服务器发送过多请求可能会触发速率限制措施。速率限制器定义了在特定时间窗口内可以发送的最大请求数。一旦超过这一限制,服务器将返回429 Too Many Requests错误响应。
您可以通过以下方式了解服务器对传入请求设置的限制:
request_counter = 0
def parse_page(url):
global request_counter
# make the request....
# increment the request counter
request_counter += 1
# if it is the 10th request, wait 30 seconds
if request_counter % 10 == 0:
time.sleep(30)
# return the scraped data...
速率限制将不再是问题。
异步编程依赖于并发任务,而不是线程的ThreadPoolExecutor。换句话说,任务可以启动而无需等待线程完成其操作并进入空闲状态。这种方式可以减少空闲时间,提高系统性能。
Python中的asyncio模块使得单个线程能够管理多个并发任务。这种方法不仅可以提升整体性能,还能有效利用网络资源。此外,相比于创建和管理多个线程所带来的开销,asyncio也能显著减少系统负担。
因此,对于Python的并行请求来说,利用asyncio方法可能比使用ThreadPoolExecutor更为优越,尤其是在线程未完全利用其分配资源的情况下。
在使用Python进行并行请求时,面临的主要挑战是防止被反机器人系统阻止。随着发出的请求数量增加,系统检测到机器人的可能性也随之增加。
一种有效的对抗方法是尽可能随机化请求。可以通过轮流使用真实的User-Agent设置头部,并添加随机延迟来实现这一点。然而,这些步骤可能仍然不够。例如,如果退出IP地址始终相同,这一细节很容易被检测到。
为了规避这些问题,可以考虑使用支持Python服务的代理请求。然而,对于像Cloudflare这样的高级解决方案,这种方法可能并不总是有效。绕过Web应用程序防火墙(WAF)一直是一个艰巨的挑战,因为它们可能会使用CAPTCHAs和JavaScript挑战来识别和阻止自动化请求。
ThreadPoolExecutor并非Python中唯一用于构建并行请求脚本的工具。事实上,还有许多其他用于执行并发HTTP请求的软件包可供选择。这些工具有些与标准API方法集成,而另一些则直接支持并行请求。
以下是一些流行的用于实现Python并行请求的工具:
这些工具各有其独特的优势和适用场景,可以根据具体的需求选择合适的工具来实现并行请求。
在本教程中,您掌握了Python中并行请求的概念。从基础到高级技术,您已经成为了Python并行请求的专家。
您现在了解到:
通过这些知识和技能,您现在能够更有效地处理并发请求,提高应用程序的性能和响应速度。
集蜂云是一个可以让开发者在上面构建、部署、运行、发布采集器的数据采集云平台。平台提供了海量任务调度、三方应用集成、数据存储、监控告警、运行日志查看等功能,能够提供稳定的数据采集环境。平台提供丰富的采集模板,简单配置就可以直接运行,快来试一下吧。