Автоматический веб-скрапинг с помощью Python и Celery

Это вторая часть руководства по созданию инструмента для скрапинга веб-страниц с помощью Python. Мы будем использовать интеграцию Celery и системы управления задачами.

В части 1 «Создание скрапера RSS-каналов с помощью Python» показано, как можно использовать Requests и Beautiful Soup.

В части 3 этой серии статей «Создание приложения для скрапинга веб-страниц с помощью Python, Celery и Django» я продемонстрирую, как интегрировать инструмент для скрапинга веб-страниц в приложения.

В предыдущей статье я создал простую программу скрапинга RSS-каналов, которая извлекает информацию с помощью Requests и BeautifulSoup (смотрите код на GitHub). Теперь мы будем использовать этот код как основу для создания системы управления задачами и запланированного скрапинга.

Следующим логическим шагом в скрапинге данных с веб-сайтов, которые часто меняются (то есть RSS-канала, отображающего X элементов за раз), является регулярный скрапинг. В предыдущем примере парсинга мы использовали командную строку для выполнения кода по команде. Однако это не масштабируемое решение. Чтобы автоматизировать его, мы добавим Celery для создания системы очереди задач с периодом выполнения.

Я буду использовать следующие инструменты:

  • Python 3.7+;
  • Requests;
  • BeautifulSoup 4;
  • Текстовый редактор (я использую Visual Studio Code);
  • Celery- распределенная очередь задач;
  • RabbitMQ- брокер сообщений.

Примечание. Все зависимости библиотеки перечислены в файлах requirements.txtи Pipfile / Pipfile.lock.

Краткое пояснение по Celery

Celery - это система управления задачами, она работает совместно с брокером сообщений для выполнения асинхронных задач.

Краткое пояснение по Celery

Выше показано, что инициатор задач (наше приложение для скрапинга веб-страниц) передает информацию о задаче в очередь (Celery) для выполнения. Планировщик (Celery beat) выполняет их как задачи cron, без какой-либо дополнительной обработки или взаимодействий вне запуска приложения Celery.

Статьи

  1. Создание скрапера RSS-канала с помощью Python
  2. Автоматический парсинг веб-страниц с помощью Python и Celery (это руководство)
  3. Создание приложения для парсинга веб-страниц с помощью Python, Celery и Django

Краткое содержание проекта

Вот схема шагов, которые мы предпримем для создания окончательного проекта:

  1. Установка Celery и RabbitMQ - Celery управляет постановкой задач в очередь и их выполнением, а RabbitMQ обрабатывает сообщения.
  2. Начало работы с RabbitMQ и обработка логов.
  3. Создание доказательства концепции «Hello World» с помощью Celery, чтобы убедиться в том, что он работает.
  4. Регистрация функции скрапинга py с помощью Celery.
  5. Дальнейшее развитие и управление задачами скрапинга.
  6. Создание и выполнение расписания для задач скрапинга.

Примечание. Введение в RabbitMQ и Celery довольно длинное, если у вас есть опыт работы с ними, я рекомендую сразу перейти к шагу 4.

Приступаем к работе

Мы начнем с открытия каталога предыдущего проекта, в данном случае это web_scraping_example из предыдущей статьи. Если хотите, его можно клонировать с GitHub.

Примечание. Я использую Ubuntu, поэтому мои команды могут отличаться от ваших. Кроме того, для краткости я пропустил повторение кода с помощью ...

Кроме того, требования к проекту могут задавать для этой установки использование pip, как в примере, приведенном ниже.

$ pip install celery

Почему Celery & RabbitMQ?

Мы используем Celery и RabbitMQ, потому что они довольно просты в настройке, тестировании и масштабировании в производственной среде. Хотя мы могли бы выполнять периодические задачи с помощью других библиотек, или просто заданий cron, в целом, я хотел, чтобы в следующей статье этой серии мы основывались на этом.

В долгосрочной перспективе нам будет намного проще, если мы будем использовать что-то, что сможем масштабировать в следующем проекте, а также изучим некоторые ключевые команды и инструменты по мере постепенного увеличения сложности.

Настройка RabbitMQ

Настроить и запустить сервер RabbitMQ в Ubuntu значительно проще, чем в операционной системе Windows. Я буду следовать официальному руководству по установке.

Ниже приведены команды установки для Debian, Ubuntu.

$ sudo apt-get update -y
$ sudo apt-get install curl gnupg -y
$ curl -fsSl https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo apt-key add -
$ sudo apt-get install apt-transport https
$ sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list <<EOF
$ deb https://dl.bintray.com/rabbitmq-erlang/debian bionic erlang 
$ deb https://dl.bintray.com/rabbitmq/debian bionic main
$ EOF
$ sudo apt-get update -y
$ sudo apt-get install rabbitmq-server -y --fix-missing

Когда я впервые установил RabbitMQ в виртуальной среде, он запустился автоматически. Чтобы проверить, что команда rabbitmq-server работает (ее мы будем использовать при работе с Celery), мне пришлось закрыть службу.

Я также заметил, что в разрешениях по умолчанию для установки указано Only root or rabbitmq should run rabbitmqctl shutdown, что мне показалось странным. Вместо того, чтобы решить эту проблему, я решил просто запустить sudo.

$ sudo rabbitmqctl shutdown

Затем я смог протестировать сервер, используя rabbitmq-server, я привожу команду и вывод ниже.

$ sudo rabbitmq-server
Настройка RabbitMQ

Вывод sudo rabbitmq-server в терминале

К вашему сведению - вы можете завершить команду rabbitmq-server, используя клавиатурную комбинацию Ctrl + C.

Для настройки RabbitMQ в операционной системе Windows требуются дополнительные действия. В официальной документации есть руководство по ручной установке.

Тестирование Celery с RabbitMQ

Прежде чем перейти к написанию кода проекта, я обычно начинаю с тестирования базовых примеров в стиле «Hello World», которые есть в пакетах и ​​фреймворках. Это дает мне общее представление о том, чего я могу ожидать, а также несколько команд для терминала, которые нужно добавить в набор инструментов для каждой конкретной технологии.

В данном случае работа с Celery будет сопровождаться их собственным подтверждением концепции «Hello World» в виде задачи, выполняющей базовое добавление. Оно доступно в официальной документации Celery. Я собираюсь вкратце проиллюстрировать его. Однако, если вам нужны пояснения или подробный обзор, пожалуйста, ознакомьтесь с официальной документаций.

Теперь, когда у нас установлен и проверен брокер RabbitMQ, можно приступить к созданию файла tasks.py. Он будет содержать задачи, которые мы будем выполнять, будь то добавление, скрапинг веб-страниц или сохранение пользователей в базе данных. Теперь я внесу изменения в каталог проекта.

$ touch tasks.py

Чтобы создать задачу добавления, мы будем импортировать Celery, и создавать функцию с флагом @app.task, позволяющую Celery workers получать задачу в системе очереди.

# tasks.py
from celery import Celery
app = Celery('tasks') # определяем название приложения, которое будет использоваться во флаге
@app.task # регистрация задачи для приложения
def add(x, y):
    return x + y

Используя add этой задачи, мы можем начать тестирование исполнения. Здесь все может немного запутаться, так как на следующем шаге у меня будут одновременно открыты три терминала.

Я начну с краткого объяснения, затем углублюсь в код и предоставлю снимки экрана.

Пояснение

Чтобы завершить тест, мы будем выполнять задачу Celery с помощью командной строки, импортировав файл tasks.py и вызвав его. Чтобы задачи были получены в очередь, нам нужно, чтобы Celery worker и сервисы RabbitMQ были активными. Сервер RabbitMQ будет действовать как брокер сообщений, в то время как Celery worker будет выполнять задачи.

Я буду обозначать каждый шаг номерами терминалов:

  1. RabbitMQ
  2. Celery worker
  3. Выполнение задачи

Терминал 1

Мы начнем с запуска сервера RabbitMQ в терминале №1.

# RabbitMQ 
$ sudo rabbitmq-server
Терминал 1

Запуск сервера RabbitMQ

Терминал 2

Впоследствии мы можем начать процесс Celery worker в терминале №2. Я добавил подробные настройки для worker, чтобы проиллюстрировать, как будет выглядеть результат.

Примечание: это необходимо выполнить из каталога проекта.

# Celery worker 
$ celery worker -A tasks -l INFO

Разберем приведенную выше команду:

  • celery - пакет, который мы вызываем.
  • worker - запуск процесса worker.
  • -A tasks - явно объявляем, что нам нужно приложение
  • -l INFO- задает наличие подробных событий ведения журнала консоли (нам нужно много деталей).

Чтобы проверить, правильно ли загружается worker, найдите в терминале строку concurrency: 4 (prefork).

Кроме того, мы замечаем, что приложение [tasks] было импортировано вместе с регистрацией задач из файла tasks.py. worker зарегистрировал единственную задачу (1) tasks.add.

Терминал 2

Запуск Celery worker с подробной информацией

Терминал №3

Затем мы можем начать выполнение теста в терминале №3. Я буду выполнять цикл, чтобы проиллюстрировать, что служба worker перехватывает несколько задач. Мы добьемся этого, введя add из файла tasks.py, а затем выполнив цикл for. Примечание. После строки add.delay(i, i) вам нужно будет использовать клавиатурную комбинацию Ctrl + Enter длявыполнения команды.

$ python
>>> from tasks import add # pulling in add from tasks.py
>>> for i in range(1000):
... add.delay(i, i) # delay calls the task 

Теперь вы должны увидеть большой блок вывода в терминале № 3 (выполнение задачи Celery). Это продемонстрирует, что worker получает результат задачи от терминала №2.

Терминал №3

Запуск выполнения задачи Celery

Терминал 2

Если мы проверим Celery worker в терминале № 2, процесс, выполняющий задачу add, мы увидим, что он перехватывает каждое из выполнений задачи.

Терминал 2

Celery worker, получающий и выполняющий задачи

Теперь мы успешно доказали, что Celery и RabbitMQ установлены правильно. Это помогает заложить основу для других задач, которые мы будем реализовывать (например, скрапинг веб-страниц), демонстрируя, как взаимодействуют Celery, Celery worker и RabbitMQ.

Теперь, когда мы рассмотрели установку и основы, мы перейдем к файлу tasks.py, чтобы создать задачи скрапинга веб-страниц.

Создание tasks.py с помощью Celery

Приведенный выше пример помог проверить процесс, который мы будем использовать для выполнения задач с помощью Celery, а также продемонстрировал, как задачи регистрируются с помощью Celery worker.

Основываясь на приведенном выше примере, мы начнем с создания задач скрапинга. Сейчас я собираюсь отказаться от файла scraping.py, так как он будет просто скопирован в файл tasks.py для простоты.

Я начну с удаления из примера функции def add(x, y) и копирования зависимостей (Requests и BeautifulSoup) вместе с самими функциями.

Примечание: я буду использовать те же функции, но в файле tasks.py.

# tasks.py
from celery import Celery
import requests # ввод данных
from bs4 import BeautifulSoup # парсинг xml
import json # экспорт в файлы
app = Celery('tasks')
# функция сохранения
def save_function(article_list):
    with open('articles.txt', 'w' as outfile:
        json.dump(article_list, outfile)
# функция скрапинга
def hackernews_rss():
    article_list = []
    try:
        # выполняем запрос, разбираем данные с помощью XML 
        # разбираем данные в BS4
        r = requests.get('https://news.ycombinator.com/rss')
        soup = BeautifulSoup(r.content, features='xml')
        # выбираем только "item", которые нам нужны из данных
        articles = soup.findAll('item')
        # для каждого "item" разбираем его в список
        for a in articles:
            title = a.find('title').text
            link = a.find('link').text
            published = a.find('pubDate').text
            # создаем объект "article" с данными
            # из каждого "item"
            article = {
                'title': title,
                'link': link,
                'published': published
                }
            # добавляем "article_list" с каждым объектом "article"
            article_list.append(article)
        # после цикла вносим сохраненные объекты в файл .txt
        return save_function(article_list)
    except Exception as e:
        print('The scraping job failed. See exception: ')
        print(e) 

Упомянутые выше функции парсинга веб-страниц теперь доступны в файле tasks.py вместе с их зависимостями. Следующим шагом является регистрация задач в приложении Celery, для этого просто размещаем @app.task над каждой функцией.

# tasks.py... 
# то же, что и выше
@app.task
def save_function(article_list):
    ...@app.task
def hackernews_rss():

Дальнейшее развитие функций скрапинга

Хотя функции скрапинга, которые мы определили, доказали свою эффективность для извлечения данных из RSS-канала, у нас все еще есть возможности для улучшения. Ниже я в общих чертах обрисовал, что мы будем изменять в наборе инструментов для скрапинга, прежде чем автоматизируем его.

Улучшения

  1. Сохранение результатов в файлах .json с отметками даты и времени.
  2. Добавление даты и времени created_at для каждой статьи.
  3. Добавление строки source, если мы хотим извлекать данные и с других сайтов.

Упомянутые выше изменения невелики, так как мы уже выполнили основную часть работы в рамках первой статьи о реализации. Хотя это несущественное изменение, .json будет читать немного удобнее, чем .txt. Два дополнительных столбца также помогут сделать его более «масштабируемым» при добавлении других каналов, а также при последующем анализе данных.

Давайте начнем с save_function: обновим ее для вывода файла .json и добавим временную метку, чтобы улучшить качество при обращении к ранее извлеченным данным.

# tasks.py
from datetime import datetime # for time stamps
... 
def save_function(articles_list):
    # временная метка и имя файла
    timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    filename = 'articles-{}.json'.format(timestamp)
    # создаем файл статьи с временной меткой
    with open(filename, 'w').format(timestamp) as outfile:
        json.dump(article_list, outfile)

Я использую функцию datetime.now().strftime(...) для создания метки даты и времени, используя .format(timestamp).

Переходя к двум изменениям в функции hackernews_rss() - мы добавим некоторую информацию об источнике и отметку времени created_at. Это два простых изменения, которые помогут нам оставаться в курсе, если мы добавим дополнительные функции скрапинга.

# tasks.py
...
def hackernews_rss():
    ...
        for a in articles:
            ...
            article = {
                ...
                'created_at': str(datetime.now()),
                'source': 'HackerNews RSS'
                }
        ...

Указанные выше изменения иллюстрируют добавление столбцов created_at и source. К вашему сведению - если вы опустите переход str(), вы не сможете его выполнить из-за Object of type datetime is not JSON serializable.

Планирование задач с помощью Celery

Теперь мы будем использовать возможности Celery по планированию задач, опираясь на то, что beat_schedule поставляется из коробки. Это позволяет нам регистрировать задачи на определенное время с помощью агента планирования.

Отличное описание примеров планирования можно найти в официальной документации. Я также включил несколько дополнительных примеров расписания в файле tasks.py врепозитории GitHub.

Я собираюсь выполнять задачу скрапинга каждую минуту, так как это продемонстрирует, как Celery worker взаимодействует с запланированными задачами. Это не приведет к разнице в данных, поскольку используемый нами RSS-канал не обновляется ежеминутно.

Моя цель в этой демонстрации - показать выходные файлы статей и простое расписание задач.

Создание расписания

# tasks.py
... 
from celery.schedules import crontab # scheduler
# выполнение запланированных задач
app.conf.beat_schedule = {
    # выполняется каждую минуту
    'scraping-task-one-min': {
        'task': 'tasks.hackernews_rss',
        'schedule': crontab()
    }
}
...

Приведенная выше конфигурация будет регистрировать расписание задач в самом приложении Celery. После запуска мы сможем вызвать расписание Celery, чтобы дважды проверить, что он поставил в очередь.

Выполняем задачи

Теперь, когда расписание создано, пришло время включить сервер RabbitMQ и запустить процессы Celery worker.

В этом примере мы будем использовать две вкладки терминала:

  1. Сервер RabbitMQ
  2. Celery worker

Терминал 1

Чтобы запустить сервер RabbitMQ (наш брокер сообщений), мы будем использовать ту же команду, что и раньше.

Примечание: мне также нравится дважды проверять, что узел, созданный при запуске, выключен, поскольку он не запускается с журналом и выдает ошибку, если мы сначала не отключим его.

$ sudo rabbitmqctl shutdown
$ sudo rabbitmq-server

Теперь вы должны видеть результат, аналогичный предыдущему (смотрите скриншот экрана, приведенный ниже).

Терминал 1

Запуск сервера RabbitMQ

После запуска сервера RabbitMQ мы можем начать с терминала №2.

Терминал 2

Мы немного изменим команду, так как теперь она будет включать в себя обозначение -Bдля того, какие вызовы worker выполняет расписание.

$ celery -A tasks worker -B -l INFO

Вывод консоли будет иллюстрировать запуск приложения и (в зависимости от того, какое у вас расписание) будет выводить информацию о выполнении задачи.

На скриншоте ниже:

  1. Зарегистрированы [tasks].
  2. Начинается расписание
  3. Наш worker MainProcess получает задачу Received task: tasks.hackernews_rss.
  4. Запускается ForkPoolWorker и выполняет задачу, а затем возвращает результат.
Терминал 2

Выполнение запланированной задачи

К вашему сведению - мы можем остановить выполнение запланированной задачи с помощью клавиатурной комбинации Ctrl + C, так как это будет работать бесконечно.

Теперь, когда мы успешно выполнили задачу save_function(), созданный ранее файл вывел файл .json.

Терминал 2 - 2

Вывод .json файла задачи

Заключение

Мы успешно расширили наш простой инструмент для скрапинга веб-страниц, чтобы создать расписание. Это гарантирует, что нам больше не нужно вручную выполнять задачи скрапинга, и мы можем просто «включить и оставить». После планировки задач, проект сможет скрапить сайты на предмет наличия данных, которые изменяются по заданному расписанию (скажем, каждые 15 минут), и каждый раз возвращать новые данные.

Так что же нам делать дальше?

В третьей части этой серии статей я продемонстрирую приложение Django с интеграцией Celery и скрапингом веб-страниц. Это будет отличный пример веб-приложения, которое извлекает данные на сайт и заполняет его информацией. Нашим конечным продуктом будет агрегатор новостей, который будет извлекать информацию сразу из нескольких RSS-каналов.