Автоматический веб-скрапинг с помощью Python и Celery
Это вторая часть руководства по созданию инструмента для скрапинга веб-страниц с помощью Python. Мы будем использовать интеграцию Celery и системы управления задачами.
В части 1 «Создание скрапера RSS-каналов с помощью Python» показано, как можно использовать Requests и Beautiful Soup.
В части 3 этой серии статей «Создание приложения для скрапинга веб-страниц с помощью Python, Celery и Django» я продемонстрирую, как интегрировать инструмент для скрапинга веб-страниц в приложения.

В предыдущей статье я создал простую программу скрапинга RSS-каналов, которая извлекает информацию с помощью Requests и BeautifulSoup (смотрите код на GitHub). Теперь мы будем использовать этот код как основу для создания системы управления задачами и запланированного скрапинга.
Следующим логическим шагом в скрапинге данных с веб-сайтов, которые часто меняются (то есть RSS-канала, отображающего X элементов за раз), является регулярный скрапинг. В предыдущем примере парсинга мы использовали командную строку для выполнения кода по команде. Однако это не масштабируемое решение. Чтобы автоматизировать его, мы добавим Celery для создания системы очереди задач с периодом выполнения.
Я буду использовать следующие инструменты:
- Python 3.7+;
- Requests;
- BeautifulSoup 4;
- Текстовый редактор (я использую Visual Studio Code);
- Celery- распределенная очередь задач;
- RabbitMQ- брокер сообщений.
Примечание. Все зависимости библиотеки перечислены в файлах requirements.txtи Pipfile / Pipfile.lock.
Краткое пояснение по Celery
Celery - это система управления задачами, она работает совместно с брокером сообщений для выполнения асинхронных задач.

Выше показано, что инициатор задач (наше приложение для скрапинга веб-страниц) передает информацию о задаче в очередь (Celery) для выполнения. Планировщик (Celery beat) выполняет их как задачи cron, без какой-либо дополнительной обработки или взаимодействий вне запуска приложения Celery.
Статьи
- Создание скрапера RSS-канала с помощью Python
- Автоматический парсинг веб-страниц с помощью Python и Celery (это руководство)
- Создание приложения для парсинга веб-страниц с помощью Python, Celery и Django
Краткое содержание проекта
Вот схема шагов, которые мы предпримем для создания окончательного проекта:
- Установка Celery и RabbitMQ - Celery управляет постановкой задач в очередь и их выполнением, а RabbitMQ обрабатывает сообщения.
- Начало работы с RabbitMQ и обработка логов.
- Создание доказательства концепции «Hello World» с помощью Celery, чтобы убедиться в том, что он работает.
- Регистрация функции скрапинга py с помощью Celery.
- Дальнейшее развитие и управление задачами скрапинга.
- Создание и выполнение расписания для задач скрапинга.
Примечание. Введение в RabbitMQ и Celery довольно длинное, если у вас есть опыт работы с ними, я рекомендую сразу перейти к шагу 4.
Приступаем к работе
Мы начнем с открытия каталога предыдущего проекта, в данном случае это web_scraping_example из предыдущей статьи. Если хотите, его можно клонировать с GitHub.
Примечание. Я использую Ubuntu, поэтому мои команды могут отличаться от ваших. Кроме того, для краткости я пропустил повторение кода с помощью ...
Кроме того, требования к проекту могут задавать для этой установки использование pip, как в примере, приведенном ниже.
$ pip install celery
Почему Celery & RabbitMQ?
Мы используем Celery и RabbitMQ, потому что они довольно просты в настройке, тестировании и масштабировании в производственной среде. Хотя мы могли бы выполнять периодические задачи с помощью других библиотек, или просто заданий cron, в целом, я хотел, чтобы в следующей статье этой серии мы основывались на этом.
В долгосрочной перспективе нам будет намного проще, если мы будем использовать что-то, что сможем масштабировать в следующем проекте, а также изучим некоторые ключевые команды и инструменты по мере постепенного увеличения сложности.
Настройка RabbitMQ
Настроить и запустить сервер RabbitMQ в Ubuntu значительно проще, чем в операционной системе Windows. Я буду следовать официальному руководству по установке.
Ниже приведены команды установки для Debian, Ubuntu.
$ sudo apt-get update -y
$ sudo apt-get install curl gnupg -y
$ curl -fsSl https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo apt-key add -
$ sudo apt-get install apt-transport https
$ sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list <<EOF
$ deb https://dl.bintray.com/rabbitmq-erlang/debian bionic erlang
$ deb https://dl.bintray.com/rabbitmq/debian bionic main
$ EOF
$ sudo apt-get update -y
$ sudo apt-get install rabbitmq-server -y --fix-missing
Когда я впервые установил RabbitMQ в виртуальной среде, он запустился автоматически. Чтобы проверить, что команда rabbitmq-server
работает (ее мы будем использовать при работе с Celery), мне пришлось закрыть службу.
Я также заметил, что в разрешениях по умолчанию для установки указано Only root or rabbitmq should run rabbitmqctl shutdown
, что мне показалось странным. Вместо того, чтобы решить эту проблему, я решил просто запустить sudo
.
$ sudo rabbitmqctl shutdown
Затем я смог протестировать сервер, используя rabbitmq-server
, я привожу команду и вывод ниже.
$ sudo rabbitmq-server

Вывод sudo rabbitmq-server в терминале
К вашему сведению - вы можете завершить команду rabbitmq-server, используя клавиатурную комбинацию Ctrl + C.
Для настройки RabbitMQ в операционной системе Windows требуются дополнительные действия. В официальной документации есть руководство по ручной установке.
Тестирование Celery с RabbitMQ
Прежде чем перейти к написанию кода проекта, я обычно начинаю с тестирования базовых примеров в стиле «Hello World», которые есть в пакетах и фреймворках. Это дает мне общее представление о том, чего я могу ожидать, а также несколько команд для терминала, которые нужно добавить в набор инструментов для каждой конкретной технологии.
В данном случае работа с Celery будет сопровождаться их собственным подтверждением концепции «Hello World» в виде задачи, выполняющей базовое добавление. Оно доступно в официальной документации Celery. Я собираюсь вкратце проиллюстрировать его. Однако, если вам нужны пояснения или подробный обзор, пожалуйста, ознакомьтесь с официальной документаций.
Теперь, когда у нас установлен и проверен брокер RabbitMQ, можно приступить к созданию файла tasks.py. Он будет содержать задачи, которые мы будем выполнять, будь то добавление, скрапинг веб-страниц или сохранение пользователей в базе данных. Теперь я внесу изменения в каталог проекта.
$ touch tasks.py
Чтобы создать задачу добавления, мы будем импортировать Celery,
и создавать функцию с флагом @app.task
, позволяющую Celery workers получать задачу в системе очереди.
# tasks.py
from celery import Celery
app = Celery('tasks') # определяем название приложения, которое будет использоваться во флаге
@app.task # регистрация задачи для приложения
def add(x, y):
return x + y
Используя add этой задачи, мы можем начать тестирование исполнения. Здесь все может немного запутаться, так как на следующем шаге у меня будут одновременно открыты три терминала.
Я начну с краткого объяснения, затем углублюсь в код и предоставлю снимки экрана.
Пояснение
Чтобы завершить тест, мы будем выполнять задачу Celery с помощью командной строки, импортировав файл tasks.py и вызвав его. Чтобы задачи были получены в очередь, нам нужно, чтобы Celery worker и сервисы RabbitMQ были активными. Сервер RabbitMQ будет действовать как брокер сообщений, в то время как Celery worker будет выполнять задачи.
Я буду обозначать каждый шаг номерами терминалов:
- RabbitMQ
- Celery worker
- Выполнение задачи
Терминал 1
Мы начнем с запуска сервера RabbitMQ в терминале №1.
# RabbitMQ
$ sudo rabbitmq-server

Запуск сервера RabbitMQ
Терминал 2
Впоследствии мы можем начать процесс Celery worker в терминале №2. Я добавил подробные настройки для worker, чтобы проиллюстрировать, как будет выглядеть результат.
Примечание: это необходимо выполнить из каталога проекта.
# Celery worker
$ celery worker -A tasks -l INFO
Разберем приведенную выше команду:
- celery - пакет, который мы вызываем.
- worker - запуск процесса worker.
- -A tasks - явно объявляем, что нам нужно приложение
- -l INFO- задает наличие подробных событий ведения журнала консоли (нам нужно много деталей).
Чтобы проверить, правильно ли загружается worker, найдите в терминале строку concurrency: 4 (prefork).
Кроме того, мы замечаем, что приложение [tasks] было импортировано вместе с регистрацией задач из файла tasks.py. worker зарегистрировал единственную задачу (1) tasks.add.

Запуск Celery worker с подробной информацией
Терминал №3
Затем мы можем начать выполнение теста в терминале №3. Я буду выполнять цикл, чтобы проиллюстрировать, что служба worker перехватывает несколько задач. Мы добьемся этого, введя add из файла tasks.py, а затем выполнив цикл for. Примечание. После строки add.delay(i, i) вам нужно будет использовать клавиатурную комбинацию Ctrl + Enter длявыполнения команды.
$ python
>>> from tasks import add # pulling in add from tasks.py
>>> for i in range(1000):
... add.delay(i, i) # delay calls the task
Теперь вы должны увидеть большой блок вывода в терминале № 3 (выполнение задачи Celery). Это продемонстрирует, что worker получает результат задачи от терминала №2.

Запуск выполнения задачи Celery
Терминал 2
Если мы проверим Celery worker в терминале № 2, процесс, выполняющий задачу add, мы увидим, что он перехватывает каждое из выполнений задачи.

Celery worker, получающий и выполняющий задачи
Теперь мы успешно доказали, что Celery и RabbitMQ установлены правильно. Это помогает заложить основу для других задач, которые мы будем реализовывать (например, скрапинг веб-страниц), демонстрируя, как взаимодействуют Celery, Celery worker и RabbitMQ.
Теперь, когда мы рассмотрели установку и основы, мы перейдем к файлу tasks.py, чтобы создать задачи скрапинга веб-страниц.
Создание tasks.py с помощью Celery
Приведенный выше пример помог проверить процесс, который мы будем использовать для выполнения задач с помощью Celery, а также продемонстрировал, как задачи регистрируются с помощью Celery worker.
Основываясь на приведенном выше примере, мы начнем с создания задач скрапинга. Сейчас я собираюсь отказаться от файла scraping.py, так как он будет просто скопирован в файл tasks.py для простоты.
Я начну с удаления из примера функции def add(x, y) и копирования зависимостей (Requests и BeautifulSoup) вместе с самими функциями.
Примечание: я буду использовать те же функции, но в файле tasks.py.
# tasks.py
from celery import Celery
import requests # ввод данных
from bs4 import BeautifulSoup # парсинг xml
import json # экспорт в файлы
app = Celery('tasks')
# функция сохранения
def save_function(article_list):
with open('articles.txt', 'w' as outfile:
json.dump(article_list, outfile)
# функция скрапинга
def hackernews_rss():
article_list = []
try:
# выполняем запрос, разбираем данные с помощью XML
# разбираем данные в BS4
r = requests.get('https://news.ycombinator.com/rss')
soup = BeautifulSoup(r.content, features='xml')
# выбираем только "item", которые нам нужны из данных
articles = soup.findAll('item')
# для каждого "item" разбираем его в список
for a in articles:
title = a.find('title').text
link = a.find('link').text
published = a.find('pubDate').text
# создаем объект "article" с данными
# из каждого "item"
article = {
'title': title,
'link': link,
'published': published
}
# добавляем "article_list" с каждым объектом "article"
article_list.append(article)
# после цикла вносим сохраненные объекты в файл .txt
return save_function(article_list)
except Exception as e:
print('The scraping job failed. See exception: ')
print(e)
Упомянутые выше функции парсинга веб-страниц теперь доступны в файле tasks.py
вместе с их зависимостями. Следующим шагом является регистрация задач в приложении Celery, для этого просто размещаем @app.task
над каждой функцией.
# tasks.py...
# то же, что и выше
@app.task
def save_function(article_list):
...@app.task
def hackernews_rss():
Дальнейшее развитие функций скрапинга
Хотя функции скрапинга, которые мы определили, доказали свою эффективность для извлечения данных из RSS-канала, у нас все еще есть возможности для улучшения. Ниже я в общих чертах обрисовал, что мы будем изменять в наборе инструментов для скрапинга, прежде чем автоматизируем его.
Улучшения
- Сохранение результатов в файлах .json с отметками даты и времени.
- Добавление даты и времени created_at для каждой статьи.
- Добавление строки source, если мы хотим извлекать данные и с других сайтов.
Упомянутые выше изменения невелики, так как мы уже выполнили основную часть работы в рамках первой статьи о реализации. Хотя это несущественное изменение, .json будет читать немного удобнее, чем .txt. Два дополнительных столбца также помогут сделать его более «масштабируемым» при добавлении других каналов, а также при последующем анализе данных.
Давайте начнем с save_function: обновим ее для вывода файла .json и добавим временную метку, чтобы улучшить качество при обращении к ранее извлеченным данным.
# tasks.py
from datetime import datetime # for time stamps
...
def save_function(articles_list):
# временная метка и имя файла
timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
filename = 'articles-{}.json'.format(timestamp)
# создаем файл статьи с временной меткой
with open(filename, 'w').format(timestamp) as outfile:
json.dump(article_list, outfile)
Я использую функцию datetime.now().strftime(...)
для создания метки даты и времени, используя .format(timestamp)
.
Переходя к двум изменениям в функции hackernews_rss()
- мы добавим некоторую информацию об источнике и отметку времени created_at
. Это два простых изменения, которые помогут нам оставаться в курсе, если мы добавим дополнительные функции скрапинга.
# tasks.py
...
def hackernews_rss():
...
for a in articles:
...
article = {
...
'created_at': str(datetime.now()),
'source': 'HackerNews RSS'
}
...
Указанные выше изменения иллюстрируют добавление столбцов created_at
и source
. К вашему сведению - если вы опустите переход str()
, вы не сможете его выполнить из-за Object of type datetime is not JSON serializable
.
Планирование задач с помощью Celery
Теперь мы будем использовать возможности Celery по планированию задач, опираясь на то, что beat_schedule
поставляется из коробки. Это позволяет нам регистрировать задачи на определенное время с помощью агента планирования.
Отличное описание примеров планирования можно найти в официальной документации. Я также включил несколько дополнительных примеров расписания в файле tasks.py
врепозитории GitHub.
Я собираюсь выполнять задачу скрапинга каждую минуту, так как это продемонстрирует, как Celery worker взаимодействует с запланированными задачами. Это не приведет к разнице в данных, поскольку используемый нами RSS-канал не обновляется ежеминутно.
Моя цель в этой демонстрации - показать выходные файлы статей и простое расписание задач.
Создание расписания
# tasks.py
...
from celery.schedules import crontab # scheduler
# выполнение запланированных задач
app.conf.beat_schedule = {
# выполняется каждую минуту
'scraping-task-one-min': {
'task': 'tasks.hackernews_rss',
'schedule': crontab()
}
}
...
Приведенная выше конфигурация будет регистрировать расписание задач в самом приложении Celery. После запуска мы сможем вызвать расписание Celery, чтобы дважды проверить, что он поставил в очередь.
Выполняем задачи
Теперь, когда расписание создано, пришло время включить сервер RabbitMQ и запустить процессы Celery worker.
В этом примере мы будем использовать две вкладки терминала:
- Сервер RabbitMQ
- Celery worker
Терминал 1
Чтобы запустить сервер RabbitMQ (наш брокер сообщений), мы будем использовать ту же команду, что и раньше.
Примечание: мне также нравится дважды проверять, что узел, созданный при запуске, выключен, поскольку он не запускается с журналом и выдает ошибку, если мы сначала не отключим его.
$ sudo rabbitmqctl shutdown
$ sudo rabbitmq-server
Теперь вы должны видеть результат, аналогичный предыдущему (смотрите скриншот экрана, приведенный ниже).

Запуск сервера RabbitMQ
После запуска сервера RabbitMQ мы можем начать с терминала №2.
Терминал 2
Мы немного изменим команду, так как теперь она будет включать в себя обозначение -Bдля того, какие вызовы worker выполняет расписание.
$ celery -A tasks worker -B -l INFO
Вывод консоли будет иллюстрировать запуск приложения и (в зависимости от того, какое у вас расписание) будет выводить информацию о выполнении задачи.
На скриншоте ниже:
- Зарегистрированы [tasks].
- Начинается расписание
- Наш worker MainProcess получает задачу Received task: tasks.hackernews_rss.
- Запускается ForkPoolWorker и выполняет задачу, а затем возвращает результат.

Выполнение запланированной задачи
К вашему сведению - мы можем остановить выполнение запланированной задачи с помощью клавиатурной комбинации Ctrl + C, так как это будет работать бесконечно.
Теперь, когда мы успешно выполнили задачу save_function(), созданный ранее файл вывел файл .json.

Вывод .json файла задачи
Заключение
Мы успешно расширили наш простой инструмент для скрапинга веб-страниц, чтобы создать расписание. Это гарантирует, что нам больше не нужно вручную выполнять задачи скрапинга, и мы можем просто «включить и оставить». После планировки задач, проект сможет скрапить сайты на предмет наличия данных, которые изменяются по заданному расписанию (скажем, каждые 15 минут), и каждый раз возвращать новые данные.
Так что же нам делать дальше?
В третьей части этой серии статей я продемонстрирую приложение Django с интеграцией Celery и скрапингом веб-страниц. Это будет отличный пример веб-приложения, которое извлекает данные на сайт и заполняет его информацией. Нашим конечным продуктом будет агрегатор новостей, который будет извлекать информацию сразу из нескольких RSS-каналов.