Автоматический веб-скрапинг с помощью Python и Celery

Это вторая часть руководства по созданию инструмента для скрапинга веб-страниц с помощью Python. Мы будем использовать интеграцию Celery и системы управления задачами.

В части 1 «Создание скрапера RSS-каналов с помощью Python» показано, как можно использовать Requests и Beautiful Soup.

В части 3 этой серии статей «Создание приложения для скрапинга веб-страниц с помощью Python, Celery и Django» я продемонстрирую, как интегрировать инструмент для скрапинга веб-страниц в приложения.

В предыдущей статье я создал простую программу скрапинга RSS-каналов, которая извлекает информацию с помощью Requests и BeautifulSoup (смотрите код на GitHub). Теперь мы будем использовать этот код как основу для создания системы управления задачами и запланированного скрапинга.

Следующим логическим шагом в скрапинге данных с веб-сайтов, которые часто меняются (то есть RSS-канала, отображающего X элементов за раз), является регулярный скрапинг. В предыдущем примере парсинга мы использовали командную строку для выполнения кода по команде. Однако это не масштабируемое решение. Чтобы автоматизировать его, мы добавим Celery для создания системы очереди задач с периодом выполнения.

Я буду использовать следующие инструменты:

  • Python 3.7+;
  • Requests;
  • BeautifulSoup 4;
  • Текстовый редактор (я использую Visual Studio Code);
  • Celery- распределенная очередь задач;
  • RabbitMQ- брокер сообщений.

Примечание. Все зависимости библиотеки перечислены в файлах requirements.txtи Pipfile / Pipfile.lock.

Краткое пояснение по Celery

Celery - это система управления задачами, она работает совместно с брокером сообщений для выполнения асинхронных задач.

Краткое пояснение по Celery

Выше показано, что инициатор задач (наше приложение для скрапинга веб-страниц) передает информацию о задаче в очередь (Celery) для выполнения. Планировщик (Celery beat) выполняет их как задачи cron, без какой-либо дополнительной обработки или взаимодействий вне запуска приложения Celery.

Статьи

  1. Создание скрапера RSS-канала с помощью Python
  2. Автоматический парсинг веб-страниц с помощью Python и Celery (это руководство)
  3. Создание приложения для парсинга веб-страниц с помощью Python, Celery и Django

Краткое содержание проекта

Вот схема шагов, которые мы предпримем для создания окончательного проекта:

  1. Установка Celery и RabbitMQ - Celery управляет постановкой задач в очередь и их выполнением, а RabbitMQ обрабатывает сообщения.
  2. Начало работы с RabbitMQ и обработка логов.
  3. Создание доказательства концепции «Hello World» с помощью Celery, чтобы убедиться в том, что он работает.
  4. Регистрация функции скрапинга py с помощью Celery.
  5. Дальнейшее развитие и управление задачами скрапинга.
  6. Создание и выполнение расписания для задач скрапинга.

Примечание. Введение в RabbitMQ и Celery довольно длинное, если у вас есть опыт работы с ними, я рекомендую сразу перейти к шагу 4.

Приступаем к работе

Мы начнем с открытия каталога предыдущего проекта, в данном случае это web_scraping_example из предыдущей статьи. Если хотите, его можно клонировать с GitHub.

Примечание. Я использую Ubuntu, поэтому мои команды могут отличаться от ваших. Кроме того, для краткости я пропустил повторение кода с помощью ...

Кроме того, требования к проекту могут задавать для этой установки использование pip, как в примере, приведенном ниже.

$ pip install celery

Почему Celery & RabbitMQ?

Мы используем Celery и RabbitMQ, потому что они довольно просты в настройке, тестировании и масштабировании в производственной среде. Хотя мы могли бы выполнять периодические задачи с помощью других библиотек, или просто заданий cron, в целом, я хотел, чтобы в следующей статье этой серии мы основывались на этом.

В долгосрочной перспективе нам будет намного проще, если мы будем использовать что-то, что сможем масштабировать в следующем проекте, а также изучим некоторые ключевые команды и инструменты по мере постепенного увеличения сложности.

Настройка RabbitMQ

Настроить и запустить сервер RabbitMQ в Ubuntu значительно проще, чем в операционной системе Windows. Я буду следовать официальному руководству по установке.

Ниже приведены команды установки для Debian, Ubuntu.

$ sudo apt-get update -y
$ sudo apt-get install curl gnupg -y
$ curl -fsSl https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo apt-key add -
$ sudo apt-get install apt-transport https
$ sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list <<EOF
$ deb https://dl.bintray.com/rabbitmq-erlang/debian bionic erlang 
$ deb https://dl.bintray.com/rabbitmq/debian bionic main
$ EOF
$ sudo apt-get update -y
$ sudo apt-get install rabbitmq-server -y --fix-missing

Когда я впервые установил RabbitMQ в виртуальной среде, он запустился автоматически. Чтобы проверить, что команда rabbitmq-server работает (ее мы будем использовать при работе с Celery), мне пришлось закрыть службу.

Я также заметил, что в разрешениях по умолчанию для установки указано Only root or rabbitmq should run rabbitmqctl shutdown, что мне показалось странным. Вместо того, чтобы решить эту проблему, я решил просто запустить sudo.

$ sudo rabbitmqctl shutdown

Затем я смог протестировать сервер, используя rabbitmq-server, я привожу команду и вывод ниже.

$ sudo rabbitmq-server
Настройка RabbitMQ

Вывод sudo rabbitmq-server в терминале

К вашему сведению - вы можете завершить команду rabbitmq-server, используя клавиатурную комбинацию Ctrl + C.

Для настройки RabbitMQ в операционной системе Windows требуются дополнительные действия. В официальной документации есть руководство по ручной установке.

Тестирование Celery с RabbitMQ

Прежде чем перейти к написанию кода проекта, я обычно начинаю с тестирования базовых примеров в стиле «Hello World», которые есть в пакетах и ​​фреймворках. Это дает мне общее представление о том, чего я могу ожидать, а также несколько команд для терминала, которые нужно добавить в набор инструментов для каждой конкретной технологии.

В данном случае работа с Celery будет сопровождаться их собственным подтверждением концепции «Hello World» в виде задачи, выполняющей базовое добавление. Оно доступно в официальной документации Celery. Я собираюсь вкратце проиллюстрировать его. Однако, если вам нужны пояснения или подробный обзор, пожалуйста, ознакомьтесь с официальной документаций.

Теперь, когда у нас установлен и проверен брокер RabbitMQ, можно приступить к созданию файла tasks.py. Он будет содержать задачи, которые мы будем выполнять, будь то добавление, скрапинг веб-страниц или сохранение пользователей в базе данных. Теперь я внесу изменения в каталог проекта.

$ touch tasks.py

Чтобы создать задачу добавления, мы будем импортировать Celery, и создавать функцию с флагом @app.task, позволяющую Celery workers получать задачу в системе очереди.

# tasks.py
from celery import Celery
app = Celery('tasks') # определяем название приложения, которое будет использоваться во флаге
@app.task # регистрация задачи для приложения
def add(x, y):
    return x + y

Используя add этой задачи, мы можем начать тестирование исполнения. Здесь все может немного запутаться, так как на следующем шаге у меня будут одновременно открыты три терминала.

Я начну с краткого объяснения, затем углублюсь в код и предоставлю снимки экрана.

Пояснение

Чтобы завершить тест, мы будем выполнять задачу Celery с помощью командной строки, импортировав файл tasks.py и вызвав его. Чтобы задачи были получены в очередь, нам нужно, чтобы Celery worker и сервисы RabbitMQ были активными. Сервер RabbitMQ будет действовать как брокер сообщений, в то время как Celery worker будет выполнять задачи.

Я буду обозначать каждый шаг номерами терминалов:

  1. RabbitMQ
  2. Celery worker
  3. Выполнение задачи

Терминал 1

Мы начнем с запуска сервера RabbitMQ в терминале №1.

# RabbitMQ 
$ sudo rabbitmq-server
Терминал 1

Запуск сервера RabbitMQ

Терминал 2

Впоследствии мы можем начать процесс Celery worker в терминале №2. Я добавил подробные настройки для worker, чтобы проиллюстрировать, как будет выглядеть результат.

Примечание: это необходимо выполнить из каталога проекта.

# Celery worker 
$ celery worker -A tasks -l INFO

Разберем приведенную выше команду:

  • celery - пакет, который мы вызываем.
  • worker - запуск процесса worker.
  • -A tasks - явно объявляем, что нам нужно приложение
  • -l INFO- задает наличие подробных событий ведения журнала консоли (нам нужно много деталей).

Чтобы проверить, правильно ли загружается worker, найдите в терминале строку concurrency: 4 (prefork).

Кроме того, мы замечаем, что приложение [tasks] было импортировано вместе с регистрацией задач из файла tasks.py. worker зарегистрировал единственную задачу (1) tasks.add.

Терминал 2

Запуск Celery worker с подробной информацией

Терминал №3

Затем мы можем начать выполнение теста в терминале №3. Я буду выполнять цикл, чтобы проиллюстрировать, что служба worker перехватывает несколько задач. Мы добьемся этого, введя add из файла tasks.py, а затем выполнив цикл for. Примечание. После строки add.delay(i, i) вам нужно будет использовать клавиатурную комбинацию Ctrl + Enter длявыполнения команды.

$ python
>>> from tasks import add # pulling in add from tasks.py
>>> for i in range(1000):
... add.delay(i, i) # delay calls the task 

Теперь вы должны увидеть большой блок вывода в терминале № 3 (выполнение задачи Celery). Это продемонстрирует, что worker получает результат задачи от терминала №2.

Терминал №3

Запуск выполнения задачи Celery

Терминал 2

Если мы проверим Celery worker в терминале № 2, процесс, выполняющий задачу add, мы увидим, что он перехватывает каждое из выполнений задачи.

Терминал 2

Celery worker, получающий и выполняющий задачи

Теперь мы успешно доказали, что Celery и RabbitMQ установлены правильно. Это помогает заложить основу для других задач, которые мы будем реализовывать (например, скрапинг веб-страниц), демонстрируя, как взаимодействуют Celery, Celery worker и RabbitMQ.

Теперь, когда мы рассмотрели установку и основы, мы перейдем к файлу tasks.py, чтобы создать задачи скрапинга веб-страниц.

Создание tasks.py с помощью Celery

Приведенный выше пример помог проверить процесс, который мы будем использовать для выполнения задач с помощью Celery, а также продемонстрировал, как задачи регистрируются с помощью Celery worker.

Основываясь на приведенном выше примере, мы начнем с создания задач скрапинга. Сейчас я собираюсь отказаться от файла scraping.py, так как он будет просто скопирован в файл tasks.py для простоты.

Я начну с удаления из примера функции def add(x, y) и копирования зависимостей (Requests и BeautifulSoup) вместе с самими функциями.

Примечание: я буду использовать те же функции, но в файле tasks.py.

# tasks.py
from celery import Celery
import requests # ввод данных
from bs4 import BeautifulSoup # парсинг xml
import json # экспорт в файлы
app = Celery('tasks')
# функция сохранения
def save_function(article_list):
    with open('articles.txt', 'w' as outfile:
        json.dump(article_list, outfile)
# функция скрапинга
def hackernews_rss():
    article_list = []
    try:
        # выполняем запрос, разбираем данные с помощью XML 
        # разбираем данные в BS4
        r = requests.get('https://news.ycombinator.com/rss')
        soup = BeautifulSoup(r.content, features='xml')
        # выбираем только "item", которые нам нужны из данных
        articles = soup.findAll('item')
        # для каждого "item" разбираем его в список
        for a in articles:
            title = a.find('title').text
            link = a.find('link').text
            published = a.find('pubDate').text
            # создаем объект "article" с данными
            # из каждого "item"
            article = {
                'title': title,
                'link': link,
                'published': published
                }
            # добавляем "article_list" с каждым объектом "article"
            article_list.append(article)
        # после цикла вносим сохраненные объекты в файл .txt
        return save_function(article_list)
    except Exception as e:
        print('The scraping job failed. See exception: ')
        print(e) 

Упомянутые выше функции парсинга веб-страниц теперь доступны в файле tasks.py вместе с их зависимостями. Следующим шагом является регистрация задач в приложении Celery, для этого просто размещаем @app.task над каждой функцией.

# tasks.py... 
# то же, что и выше
@app.task
def save_function(article_list):
    ...@app.task
def hackernews_rss():

Дальнейшее развитие функций скрапинга

Хотя функции скрапинга, которые мы определили, доказали свою эффективность для извлечения данных из RSS-канала, у нас все еще есть возможности для улучшения. Ниже я в общих чертах обрисовал, что мы будем изменять в наборе инструментов для скрапинга, прежде чем автоматизируем его.

Улучшения

  1. Сохранение результатов в файлах .json с отметками даты и времени.
  2. Добавление даты и времени created_at для каждой статьи.
  3. Добавление строки source, если мы хотим извлекать данные и с других сайтов.

Упомянутые выше изменения невелики, так как мы уже выполнили основную часть работы в рамках первой статьи о реализации. Хотя это несущественное изменение, .json будет читать немного удобнее, чем .txt. Два дополнительных столбца также помогут сделать его более «масштабируемым» при добавлении других каналов, а также при последующем анализе данных.

Давайте начнем с save_function: обновим ее для вывода файла .json и добавим временную метку, чтобы улучшить качество при обращении к ранее извлеченным данным.

# tasks.py
from datetime import datetime # for time stamps
... 
def save_function(articles_list):
    # временная метка и имя файла
    timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    filename = 'articles-{}.json'.format(timestamp)
    # создаем файл статьи с временной меткой
    with open(filename, 'w').format(timestamp) as outfile:
        json.dump(article_list, outfile)

Я использую функцию datetime.now().strftime(...) для создания метки даты и времени, используя .format(timestamp).

Переходя к двум изменениям в функции hackernews_rss() - мы добавим некоторую информацию об источнике и отметку времени created_at. Это два простых изменения, которые помогут нам оставаться в курсе, если мы добавим дополнительные функции скрапинга.

# tasks.py
...
def hackernews_rss():
    ...
        for a in articles:
            ...
            article = {
                ...
                'created_at': str(datetime.now()),
                'source': 'HackerNews RSS'
                }
        ...

Указанные выше изменения иллюстрируют добавление столбцов created_at и source. К вашему сведению - если вы опустите переход str(), вы не сможете его выполнить из-за Object of type datetime is not JSON serializable.

Планирование задач с помощью Celery

Теперь мы будем использовать возможности Celery по планированию задач, опираясь на то, что beat_schedule поставляется из коробки. Это позволяет нам регистрировать задачи на определенное время с помощью агента планирования.

Отличное описание примеров планирования можно найти в официальной документации. Я также включил несколько дополнительных примеров расписания в файле tasks.py врепозитории GitHub.

Я собираюсь выполнять задачу скрапинга каждую минуту, так как это продемонстрирует, как Celery worker взаимодействует с запланированными задачами. Это не приведет к разнице в данных, поскольку используемый нами RSS-канал не обновляется ежеминутно.

Моя цель в этой демонстрации - показать выходные файлы статей и простое расписание задач.

Создание расписания

# tasks.py
... 
from celery.schedules import crontab # scheduler
# выполнение запланированных задач
app.conf.beat_schedule = {
    # выполняется каждую минуту
    'scraping-task-one-min': {
        'task': 'tasks.hackernews_rss',
        'schedule': crontab()
    }
}
...

Приведенная выше конфигурация будет регистрировать расписание задач в самом приложении Celery. После запуска мы сможем вызвать расписание Celery, чтобы дважды проверить, что он поставил в очередь.

Выполняем задачи

Теперь, когда расписание создано, пришло время включить сервер RabbitMQ и запустить процессы Celery worker.

В этом примере мы будем использовать две вкладки терминала:

  1. Сервер RabbitMQ
  2. Celery worker

Терминал 1

Чтобы запустить сервер RabbitMQ (наш брокер сообщений), мы будем использовать ту же команду, что и раньше.

Примечание: мне также нравится дважды проверять, что узел, созданный при запуске, выключен, поскольку он не запускается с журналом и выдает ошибку, если мы сначала не отключим его.

$ sudo rabbitmqctl shutdown
$ sudo rabbitmq-server

Теперь вы должны видеть результат, аналогичный предыдущему (смотрите скриншот экрана, приведенный ниже).

Терминал 1

Запуск сервера RabbitMQ

После запуска сервера RabbitMQ мы можем начать с терминала №2.

Терминал 2

Мы немного изменим команду, так как теперь она будет включать в себя обозначение -Bдля того, какие вызовы worker выполняет расписание.

$ celery -A tasks worker -B -l INFO

Вывод консоли будет иллюстрировать запуск приложения и (в зависимости от того, какое у вас расписание) будет выводить информацию о выполнении задачи.

На скриншоте ниже:

  1. Зарегистрированы [tasks].
  2. Начинается расписание
  3. Наш worker MainProcess получает задачу Received task: tasks.hackernews_rss.
  4. Запускается ForkPoolWorker и выполняет задачу, а затем возвращает результат.
Терминал 2

Выполнение запланированной задачи

К вашему сведению - мы можем остановить выполнение запланированной задачи с помощью клавиатурной комбинации Ctrl + C, так как это будет работать бесконечно.

Теперь, когда мы успешно выполнили задачу save_function(), созданный ранее файл вывел файл .json.

Терминал 2 - 2

Вывод .json файла задачи

Заключение

Мы успешно расширили наш простой инструмент для скрапинга веб-страниц, чтобы создать расписание. Это гарантирует, что нам больше не нужно вручную выполнять задачи скрапинга, и мы можем просто «включить и оставить». После планировки задач, проект сможет скрапить сайты на предмет наличия данных, которые изменяются по заданному расписанию (скажем, каждые 15 минут), и каждый раз возвращать новые данные.

Так что же нам делать дальше?

В третьей части этой серии статей я продемонстрирую приложение Django с интеграцией Celery и скрапингом веб-страниц. Это будет отличный пример веб-приложения, которое извлекает данные на сайт и заполняет его информацией. Нашим конечным продуктом будет агрегатор новостей, который будет извлекать информацию сразу из нескольких RSS-каналов.

Пожалуйста, оставляйте ваши комментарии по текущей теме статьи. Мы очень благодарим вас за ваши комментарии, лайки, дизлайки, подписки, отклики!

Пожалуйста, оставьте свои мнения по текущей теме статьи. За комментарии, дизлайки, подписки, лайки, отклики низкий вам поклон!

Вадим Дворниковавтор-переводчик статьи «Automated web scraping with Python and Celery»