logo

理解Python中的并行请求

2024-06-24 12:44
本文介绍了python中并行请求技术和使用场景

集蜂云是一个可以让开发者在上面构建、部署、运行、发布采集器的数据采集云平台。加入到数百名开发者中,将你的采集器发布到市场,从而给你带来被动收入吧!

大多数 Python 应用程序依赖外部数据或需要与其他服务器通过 HTTP 进行通信。然而,由于 HTTP 调用速度通常较慢,这可能会显著降低应用程序的性能。为了避免这种情况,我们可以利用 Python 中的并行请求技术,通过线程同时发出多个网络请求。

在接下来的指南中,您将学习如何使用线程在 Python 中实现并行网络请求,以达到显著的性能优化效果。

让我们开始吧!

Python 中的并行请求是什么

在Python中,实现并行请求意味着同时执行多个HTTP调用。这通过启动多个线程并允许每个线程并行处理网络请求来实现。利用多核处理器,这种方法能够显著提升性能和网络利用率。

在Web开发中,并行请求有多种应用场景:

  • AJAX 数据加载:通过在页面加载时同时发起多个AJAX请求,可以减少页面加载时间,改善用户体验。
  • 后端API调用:在后端服务中,同时调用多个微服务可以优化系统的整体效率,特别是在处理复杂逻辑或大量数据时尤为重要。
  • 网页抓取:在进行网页抓取时,由于网页服务器响应通常较慢,串行请求可能导致效率低下。通过并行化请求,将网络请求分发到多个线程中,可以减少等待时间,提升抓取效率。
  • 大规模数据提取:在数据挖掘或网页爬虫项目中,频繁进行大量的HTTP调用是常见的需求。通过并行化处理这些请求,可以显著加快数据提取和处理的速度,从而提高整体项目的效率和性能。

通过以上优化方法,利用Python的并行请求技术可以在网络通信和数据处理方面获得显著的优势,使得应用程序更加高效和响应迅速。

如何实现并行请求

ThreadPoolExecutor是Python中进行并行请求的简单方法之一,它是concurrent.futures库中的线程池管理类。其设计目的是在多线程环境中并行执行任务,特别适用于I/O密集型场景,如网络请求。由于可以有效利用并行性,无需启动额外的进程,因此非常有价值。

该类提供了直观的API,使任务并行化变得简单,无需处理复杂的线程管理。ThreadPoolExecutor提供的主要方法包括:

  • submit(task_func):提交一个任务函数作为参数,并返回一个表示结果的Future对象。调用submit()方法会阻塞执行,直到任务函数终止。
  • map(task_func, iterable):将任务函数应用于可迭代参数中的每个项目,并返回结果的迭代器。

利用这些方法进行并行请求可以显著节省时间。例如,假设平均每个请求需要250毫秒,您需要执行8次请求。按顺序执行这些请求将需要总计2000毫秒(250毫秒 * 8)。

相比之下,使用ThreadPoolExecutor在管理的8个工作线程上并行执行这些请求,几乎可以将执行时间缩短到大约250毫秒,再加上线程处理的少量开销。这几乎是节省了8倍的时间!

现在,让我们按以下步骤学习如何构建Python并行请求的脚本!

步骤 1:安装必要的库

为了构建并行请求的Python脚本,我们将使用两个关键的Python包:

  • Requests:这是一个流行的Python HTTP客户端库,用于执行Web请求。它提供了简单且强大的API,使得向网站发送GET或POST请求变得非常容易。
  • Beautiful Soup:这是一个用于解析HTML和XML文档的库,特别适合从网页中提取数据。它能够帮助我们解析和操作HTML结构,从而提取我们需要的信息。

我们将利用这两个包来并行执行以下操作:

  1. 使用Requests库发送GET请求获取网站的HTML内容。
  2. 并行处理多个网页,每个网页都使用Requests来获取HTML文档。
  3. 使用Beautiful Soup解析每个HTML文档的内容,从中提取所需的数据。

使用以下命令安装所需的库:

pip install requests beautifulsoup4

然后,通过在 Python 脚本顶部添加以下两行来导入:

import requests
from bs4 import BeautifulSoup

第 2 步:编写第一个并行请求代码

首先,我们需要定义一个任务函数,该函数将被ThreadPoolExecutor并行执行。这个函数将接收一个URL作为参数,使用Requests库发送HTTP请求,并从返回的HTML页面中提取标题信息。

def parse_page(url):
    # perform the HTTP request to the specified URL
    response = requests.get(url)

    # parse the HTML content returned by server
    soup = BeautifulSoup(response.text, "html.parser")

    # extract the title from the page and print it
    title_element = soup.find('title')
    title = title_element.text
    print(title) 

接下来,定义 URL 列表:

urls = [
    'https://www.scrapingcourse.com/ecommerce/page/1/',
    'https://www.scrapingcourse.com/ecommerce/page/2/',
    'https://www.scrapingcourse.com/ecommerce/page/3/',
    'https://www.scrapingcourse.com/ecommerce/page/4/',
    'https://www.scrapingcourse.com/ecommerce/page/5/',
    'https://www.scrapingcourse.com/ecommerce/page/6/',
    'https://www.scrapingcourse.com/ecommerce/page/7/',
    'https://www.scrapingcourse.com/ecommerce/page/8/'
]

初始化一个ThreadPoolExecutor对象,并使用它并行执行parse_page()函数来处理4个不同的URL。

MAX_THREADS = 4
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    executor.map(parse_page, urls)

完整代码如下:

import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor

def parse_page(url):
    # perform the HTTP request to the specified URL
    response = requests.get(url)

    # parse the HTML content returned by server
    soup = BeautifulSoup(response.text, "html.parser")

    # extract the title from the page and print it
    title_element = soup.find('title')
    title = title_element.text
    print(title)

## list of URLs to parse concurrently
urls = [
    'https://www.scrapingcourse.com/ecommerce/page/1/',
    'https://www.scrapingcourse.com/ecommerce/page/2/',
    'https://www.scrapingcourse.com/ecommerce/page/3/',
    'https://www.scrapingcourse.com/ecommerce/page/4/',
    'https://www.scrapingcourse.com/ecommerce/page/5/',
    'https://www.scrapingcourse.com/ecommerce/page/6/',
    'https://www.scrapingcourse.com/ecommerce/page/7/',
    'https://www.scrapingcourse.com/ecommerce/page/8/'
]

# max number of threads to use
MAX_THREADS = 4

# initialize ThreadPoolExecutor and use it to call parse_page() in parallel
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    executor.map(parse_page, urls)

启动该脚本,会看到程序打印了以下类似内容:

Ecommerce Test Site to Learn Web Scraping – Page 4 – ScrapingCourse.com
Ecommerce Test Site to Learn Web Scraping – Page 3 – ScrapingCourse.com
Ecommerce Test Site to Learn Web Scraping – ScrapingCourse.com
Ecommerce Test Site to Learn Web Scraping – Page 2 – ScrapingCourse.com
Ecommerce Test Site to Learn Web Scraping – Page 6 – ScrapingCourse.com
Ecommerce Test Site to Learn Web Scraping – Page 5 – ScrapingCourse.com
Ecommerce Test Site to Learn Web Scraping – Page 7 – ScrapingCourse.com
Ecommerce Test Site to Learn Web Scraping – Page 8 – ScrapingCourse.com

每次运行时,打印顺序都会发生变化。这意味着脚本按预期执行 Python 并行请求。

步骤 3:处理响应和异常

当前的脚本可以并行发出请求,但存在一个主要问题:如果parse_page()函数在处理某个URL时失败,整个过程将会中断。为了避免这种情况,可以使用try...except块将代码包装在函数内部。

def parse_page(url):
    try:
        # ...
    except Exception as e:
        print('Request failed due to error:', e)

另一个限制是任务函数目前仅在终端中打印提取的数据。如果希望将这些信息存储在Python数据结构中以备将来使用,可以通过以下方式修改parse_page()函数,使其返回抓取的数据:

def parse_page(url):
    try:
        # perform the HTTP request to the specified URL
        response = requests.get(url)

        # parse the HTML content returned by server
        soup = BeautifulSoup(response.text, "html.parser")

        # extract the title from the page
        title_element = soup.find('title')
        title = title_element.text

        # return the scraped data in a dictionary
        return {'title': title}
    except Exception as e:
        print('Request failed due to error:', e)

现在,可以通过调用返回的值来初始化一个Python列表,这可以通过对parse_page()函数进行修改实现。由于ThreadPoolExecutor.map()返回一个迭代器,您可以通过将其包装为列表来将其转换为列表,即使用list()函数:

with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    titles = list(executor.map(parse_page, urls))

新的并行请求 Python 代码将是:

import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor

def parse_page(url):
    try:
        # perform the HTTP request to the specified URL
        response = requests.get(url)

        # parse the HTML content returned by server
        soup = BeautifulSoup(response.text, "html.parser")

        # extract the title from the page
        title_element = soup.find('title')
        title = title_element.text

        # return the scraped data in a dictionary
        return {'title': title}
    except Exception as e:
        print('Request failed due to error:', e)


## list of URLs to parse concurrently
urls = [
    'https://www.scrapingcourse.com/ecommerce/page/1/',
    'https://www.scrapingcourse.com/ecommerce/page/2/',
    'https://www.scrapingcourse.com/ecommerce/page/3/',
    'https://www.scrapingcourse.com/ecommerce/page/4/',
    'https://www.scrapingcourse.com/ecommerce/page/5/',
    'https://www.scrapingcourse.com/ecommerce/page/6/',
    'https://www.scrapingcourse.com/ecommerce/page/7/',
    'https://www.scrapingcourse.com/ecommerce/page/8/'
]

# max number of threads to use
MAX_THREADS = 4

# initialize ThreadPoolExecutor and use it to call parse_page() in parallel
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    titles = list(executor.map(parse_page, urls))

print(titles)

执行它,它将始终打印:

[
    {'title': 'Ecommerce Test Site to Learn Web Scraping – ScrapingCourse.com'}, 
    {'title': 'Ecommerce Test Site to Learn Web Scraping – Page 2 – ScrapingCourse.com'}, 
    {'title': 'Ecommerce Test Site to Learn Web Scraping – Page 3 – ScrapingCourse.com'}, 
    {'title': 'Ecommerce Test Site to Learn Web Scraping – Page 4 – ScrapingCourse.com'},
    {'title': 'Ecommerce Test Site to Learn Web Scraping – Page 5 – ScrapingCourse.com'}, 
    {'title': 'Ecommerce Test Site to Learn Web Scraping – Page 6 – ScrapingCourse.com'}, 
    {'title': 'Ecommerce Test Site to Learn Web Scraping – Page 7 – ScrapingCourse.com'}, 
    {'title': 'Ecommerce Test Site to Learn Web Scraping – Page 8 – ScrapingCourse.com'}
]

请注意,结果列表中的元素顺序与URL列表中的顺序相同。这是因为ThreadPoolExecutor.map()函数始终按照任务函数调用的顺序返回结果,即使它在评估任务函数时是无序的。

恭喜!您刚刚学会了如何使用Python并行发出多个请求的方法。

优化性能和扩展

现在是时候探索一些技巧和窍门,将您的 Python 并行请求能力提升到一个新的水平。

性能优化

性能主要取决于使用的线程数ThreadPoolExecutor。如果打开的线程数过少,会限制脚本的并行化潜力。相反,使用过多线程会降低脚本的速度,因为创建和管理线程会带来大量的时间开销。

由于确定最佳性能的最佳线程数并不总是容易的,因此建议进行一些实验。您可以按照以下方式包装Python请求并行逻辑,并跟踪执行时间:

import time

start_time = time.time()

# parallel ThreadPoolExecutor logic...

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time:.3f}s")

如果urls不包含太多元素,则可以使用列表的长度作为线程数:

MAX_THREADS = len(urls)

然而,可能会出现 len(urls) 大于可用的逻辑 CPU 数量的情况。因此,最佳做法是取 len(urls) 和 os.cpu_count() 之间的较小值:

MAX_THREADS = min(os.cpu_count(), len(urls))

ThreadPoolExecutor现在将始终使用合理数量的线程!

高效扩展

在大规模流程中,找到正确的批处理大小变得尤为重要。在这种情况下,并不总是最佳选择利用所有可用的核心资源。以下是几个原因:

  • 资源饱和:并行利用所有核心容易导致系统资源饱和,使得在同一台服务器上无法同时运行其他进程。
  • 并行化开销:大规模批处理可能因为争用共享资源(例如磁盘 I/O、网络带宽或数据库连接)而效率低下。这通常会造成瓶颈,从而降低整体性能。
  • 错误处理缓慢:当请求失败时,可能需要重复整个批处理。但是,如果批量过大,这将导致较长的执行时间。因此,在处理容易出错的请求时,大批处理效率会受到影响。

因此,小批处理更适合扩展并行 Python 请求。这样可以实现更快、响应更快的进程,同时也能避免耗尽所有资源。

专业提示:在进行大规模操作时,请始终跟踪 CPU、内存和网络使用情况,以避免系统过载。整合应用程序监控工具到脚本中可以简化任务的跟踪和管理。

高级技术

我们将深入研究两种方法,使并行逻辑更有效、更高效。

速率限制处理

在短时间内向同一台服务器发送过多请求可能会触发速率限制措施。速率限制器定义了在特定时间窗口内可以发送的最大请求数。一旦超过这一限制,服务器将返回429 Too Many Requests错误响应。

您可以通过以下方式了解服务器对传入请求设置的限制:

  1. 检查robots.txt文件以查看是否有相关限制。
  2. 验证服务器响应中是否包含RateLimit-X标头字段,其中X是具体的限制参数。
  3. 通过在指定的时间范围内发送一定数量的并行请求,直到出现429错误,来测试服务器的限制。
  4. 确定了适当的请求速率后,请确保在发出新的Python并行请求之前等待一段时间。例如,如果您需要在发送10个请求后等待30秒,您可以使用以下全局计数器来实现这一点:
request_counter = 0

def parse_page(url):
    global request_counter

    # make the request....

    # increment the request counter
    request_counter += 1
    # if it is the 10th request, wait 30 seconds
    if request_counter % 10 == 0:
        time.sleep(30)

    # return the scraped data... 

速率限制将不再是问题。

异步编程

异步编程依赖于并发任务,而不是线程的ThreadPoolExecutor。换句话说,任务可以启动而无需等待线程完成其操作并进入空闲状态。这种方式可以减少空闲时间,提高系统性能。

Python中的asyncio模块使得单个线程能够管理多个并发任务。这种方法不仅可以提升整体性能,还能有效利用网络资源。此外,相比于创建和管理多个线程所带来的开销,asyncio也能显著减少系统负担。

因此,对于Python的并行请求来说,利用asyncio方法可能比使用ThreadPoolExecutor更为优越,尤其是在线程未完全利用其分配资源的情况下。

Web 抓取并行请求的常见问题

在使用Python进行并行请求时,面临的主要挑战是防止被反机器人系统阻止。随着发出的请求数量增加,系统检测到机器人的可能性也随之增加。

一种有效的对抗方法是尽可能随机化请求。可以通过轮流使用真实的User-Agent设置头部,并添加随机延迟来实现这一点。然而,这些步骤可能仍然不够。例如,如果退出IP地址始终相同,这一细节很容易被检测到。

为了规避这些问题,可以考虑使用支持Python服务的代理请求。然而,对于像Cloudflare这样的高级解决方案,这种方法可能并不总是有效。绕过Web应用程序防火墙(WAF)一直是一个艰巨的挑战,因为它们可能会使用CAPTCHAs和JavaScript挑战来识别和阻止自动化请求。

使用 Python 进行并行请求的其他工具

ThreadPoolExecutor并非Python中唯一用于构建并行请求脚本的工具。事实上,还有许多其他用于执行并发HTTP请求的软件包可供选择。这些工具有些与标准API方法集成,而另一些则直接支持并行请求。

以下是一些流行的用于实现Python并行请求的工具:

  1. asyncio:这是Python标准库中的一个包,使用async/await语法编写并发代码。它非常适合处理I/O密集型任务,如HTTP请求。
  2. multiprocessing:这是Python的原生库,用于在多个CPU核心上执行任务。它特别适合处理CPU密集型任务。
  3. aiohttp:这是基于asyncio构建的异步Python HTTP客户端。它能够轻松地进行并行HTTP调用,并支持高效的异步操作。
  4. twisted:这是一个基于事件驱动的框架,用于实现Python Web应用程序中的多任务处理。它提供了强大的网络编程能力和异步支持。
  5. tornado:这是一个基于非阻塞网络I/O的Python Web框架和异步网络库。它适合构建高性能的Web应用程序和服务。
  6. httpx:这是一个功能全面的HTTP客户端,提供了同步和异步的API。它支持多种请求方法和灵活的配置选项。

这些工具各有其独特的优势和适用场景,可以根据具体的需求选择合适的工具来实现并行请求。

结论

在本教程中,您掌握了Python中并行请求的概念。从基础到高级技术,您已经成为了Python并行请求的专家。

您现在了解到:

  1. 在Python中,发出并行请求意味着同时处理多个网络请求,以提高效率和性能。
  2. 您学会了如何使用ThreadPoolExecutor来执行并发请求,这是一种管理线程池的工具,适用于I/O密集型任务。
  3. 您探索了如何优化同时进行的HTTP调用,包括减少等待时间和优化网络连接。
  4. 您已经了解到了几种最佳的Python工具,用于实现并行网络请求,如asyncio、aiohttp、multiprocessing、twisted和httpx。

通过这些知识和技能,您现在能够更有效地处理并发请求,提高应用程序的性能和响应速度。

集蜂云是一个可以让开发者在上面构建、部署、运行、发布采集器的数据采集云平台。平台提供了海量任务调度、三方应用集成、数据存储、监控告警、运行日志查看等功能,能够提供稳定的数据采集环境。平台提供丰富的采集模板,简单配置就可以直接运行,快来试一下吧。

导航目录