Сквозной идентификатор: решение проблемы мэтчинга персональных данных студентов Refocus

Время чтения текста – 19 минут

В системах сквозной аналитики ключевую роль играет правильная модель атрибуции. Без нее данные невозможно интерпретировать, и их ценность для бизнеса невелика. При этом важно понимать, что любая модель напрямую зависит от качества данных.

Частая проблема с сырыми данными в том, что информация об одном клиенте дублируется или, напротив, противоречит друг другу в разных источниках.

Кроме того, что предобработка данных — база для аналитика, без правильного объединения персональных данных в принципе сложно отследить клиентский путь. Значит, нужно настраивать процессы объединения неоднородных персональных данных.

Сегодня в любом клиентском бизнесе воронки регистрации устроены таким образом, что клиенты попадают в базу множеством способов — часто через маркетинговые каналы, которых всегда много (рассылки, реклама, соцсети). В каждом таком канале может быть ссылка на форму подписки, регистрацию на платформе или чат, и один клиент часто проходит все эти этапы. Сразу же образуется путаница в идентификации, которая сильно влияет на качество данных и результаты аналитики, если ее не лечить.

Мы столкнулись с этой проблемой, работая с одним из наших клиентов, и решили ее, создав сквозной идентификатор. Это уникальный номер, который присваивается реальному клиенту и дублируется во все источники, где есть данные об этом клиенте, тем самым избавляя от путаницы.

Кейс Refocus: данные и путь клиента

Мы разрабатывали кастомную систему сквозной аналитики для эдтех-стартапа Refocus. Данные каждого студента в системы Refocus попадали из нескольких источников и были записаны несколько раз — как минимум при регистрации на курс, при первом входе на образовательную платформу и при входе в чат сопровождения.

В нашем случае мэтчинг был важнее всего по трем источникам из тринадцати:

  • amoCRM, где фиксируется весь клиентский путь студента;
  • Discord, где проходило сопровождение студентов;
  • Thinkific, сама образовательная платформа с курсами.

Остальные источники, с которыми мы работали, либо не содержали данных студентов (например, цифры эффективности работы sales-менеджеров были завязаны на данных сотрудников и трекались через другие системы), либо дублировали информацию из указанных трех.

В Discord и Thinkific данные попадали напрямую, от студентов при регистрации в системах, а затем подтягивались в amoCRM. Основные причины несовпадения клиентских данных как у Refocus, так и в похожих случаях — человеческий фактор (опечатки), наличие у людей более чем одного телефона или адреса почты и ограничения самих платформ, с которых приходят данные: разный заданный формат полей и их количество.

Часть этих факторов может решаться корректировкой самой клиентской воронки. Правда, не все платформы позволяют одинаково настроить вводные поля, а просьбы вводить данные в конкретном формате не всегда работают и не страхуют от ошибок. Плюс, задача аналитиков — получить чистые данные в любом случае.

Задача и поиск решения

Данные в Refocus мы подгружали в хранилище в BigQuery напрямую из интересующих нас источников (рекламных кабинетов, LMS и т. д.), используя Python. В дальнейшем на этих данных строились дашборды в Tableau.

Обнаружить проблему несложно — при создании хранилища и дальнейшей выгрузке данных из него мы в любом случае чистим датасет от дубликатов и несовпадений.

Поля, в которых возникали ошибки и для которых нам важен был мэтчинг, чтобы правильно отследить клиентский путь:

  • имя — да, люди иногда вводят разные вариации ФИО (Юлия, Юля и Бля — на деле один человек!);
  • телефон — с кодом страны или без, с пробелами, дефисами или слитно;
  • электронная почта — длинные строки сложного формата, в которых легко опечататься.

Поначалу, пока количество студентов Refocus было относительно небольшим, достаточно было скриптов, которые объединяли данные по одному из этих полей. В полученных таблицах в Tableau проводился поиск строк с пустым значением в соответствующем поле — и вот видно всех студентов, чьи данные не сошлись.

Количество таких строк было в пределах пары десятков, и трекать и объединять их было несложно вручную. Это делалось прямо в первоисточниках сотрудниками Refocus, которые могли поправить опечатки и ошибки у себя в системах. После этого наш код выгрузки в хранилище перезапускался и тянул уже чистые данные. Если после этого что-то не сходилось, то наши аналитики правили информацию на уровне базы данных.

Но при росте компании в какой-то момент число студентов, потерянных при мэтчинге, могло достигать сотни за месяц. Пока ошибка обнаружится, данные поправят в источниках, а мы перезапустим код выгрузки, могло пройти несколько часов — а это критичный интервал. Да и перезапускать выгрузку каждый день ради нескольких несовпадений — неэффективно. Стало понятно, что масштаб проблемы требует более точного и универсального решения.

Вообще, в такой ситуации возможны несколько вариантов. Можно бесконечно править скрипты мэтчинга, учитывая новые и новые случаи и создавая костыли. А можно, например, настроить алерты в оркестраторе процессов (в нашем случае  — Airflow), которые позволят моментально узнавать о появившемся несовпадении и объединять “потерянные” клиентские сущности по паре за раз. Но это все еще неполная автоматизация, и она только ускоряет, а не упрощает процесс.

Руководствуясь соображениями эффективности, мы предложили ввести сквозной идентификатор — одно значение ID, присваиваемое одному клиенту после автоматической интеграции его данных из разных источников.

Реализация решения и рабочий процесс

Чтобы понять масштаб проблемы, мы начали с того, что создали таблицы несовпадающих персональных данных. Для этого мы использовали скрипты на Python. Эти скрипты объединяли данные из разных источников и создавали из них большую сводную таблицу. Для того, чтобы свести данные о студенте в одну сущность, использовался мэтчинг по адресу электронной почты. Мы попробовали мэтчить по имени, фамилии, телефону (который сначала надо было привести к одному формату!) и почте, и именно последний вариант показал самую высокую точность. Возможно, дело в том, что из всех данных почта имеет самый однородный формат, поэтому остается учитывать только опечатки.

Например, нам нужно было мэтчить данные для создания дашборда по возвратам, о которых информация объединялась как раз из наших трех основных источников. В ранней версии скрипта данные отбирались таким образом:

WITH snapshot_ AS (
      SELECT DISTINCT s.*,
        IFNULL(ae.name, ap.name) as contact_name,
        ap.phone, ae.email,
        split(replace(trim(lower(ae.email)),' ',''),'@')[OFFSET(0)] as email_first_part,
        ai.thinkific_id, ai.intercom_id, ac.student_id
      FROM (
        SELECT *,
          ROW_NUMBER() OVER(PARTITION BY lead_id ORDER BY updated_at DESC) as num_,
        FROM `Differture.amocrm_leads_snapshot`
      ) s
      LEFT JOIN (
        SELECT DISTINCT lead_id, contact_id, name, email,
          ROW_NUMBER() OVER(PARTITION BY lead_id ORDER BY contact_id) as num1_
        FROM `Differture.amo_emails`
      ) ae using(lead_id)
      LEFT JOIN (
        SELECT DISTINCT lead_id, contact_id, name, phone,
          ROW_NUMBER() OVER(PARTITION BY lead_id ORDER BY contact_id) as num2_
        FROM `Differture.amo_phones`
      ) ap using(lead_id)
      LEFT JOIN `Differture.amo_contact_thinkific_intercom_match` ai using(lead_id)
      LEFT JOIN `Differture.AmoContacts` ac on cast(ae.contact_id as string)=ac.amo_id
      WHERE (num_=1 or num_ is null) and (num1_=1 or num1_ is null) and (num2_=1 or num2_ is null)
        and s.pipeline_id in (4920421,5245535) and s.status_id=142 and lower(s.lead_name) not like '%test%'
    )

Как можно заметить, идея сквозного идентификатора здесь уже присутствует — фигурирует student_id. На самом деле, в этой версии скрипта это графа из AmoContacts — таблицы, в которой хранятся только данные из amoCRM. Никаких джойнов по student_id пока не происходит. А происходят по email_first_part, адресу почты до символа @:

select distinct * from th_amo_ds_rf
    left join calendly ce using(email_first_part)
    left join typeform_live tfl on email_first_part=tf_email_first_part
    left join typeform tf using(email_first_part)
    left join csat using(email_first_part)

Первым шагом по практическому введению идентификатора была таблица students_main_info, созданная в BigQuery in-house специалистом Refocus. К сожалению, у нас нет доступа к коду, который использовался для присвоения идентификатора. Зато мы можем показать вид этой таблицы:

student_full_name string
student_email string
student_country_id string
student_country_name string
student_courses_ids array
student_courses_names array
student_cohort_id string
student_cohort_name string
cohort_community_manager_name string
cohort_community_manager_email string
student_onboarding_live_session_id string
student_onboarding_live_session_time string
student_onboarding_live_session_zoom_url string
amo_contact_id string
intercom_contact_id string
thinkific_student_id string
discord_user_id string
discord_user_discord_id string
discord_guild_id string
discord_channel_id string
discord_roles string

В students_main_info хранились данные из нужных источников с общим идентификатором в первой строке, и объединение проходило через сравнение этого поля.

При этом поле student_id использовалось пока не везде; также использовались другие поля этой таблицы — например, thinkific_student_id или discord_user_id.

После выгрузки и мэтчинга данных с помощью students_main_info студентов, которые потерялись при объединении, стало меньше, чем при первой схеме мэтчинга. Так мы убедились, что движемся в верном направлении. Тем не менее, использование одной таблицы, которая содержит больше десятка полей обо всех имеющихся персональных данных, не очень эффективно. Данные в ней уже обработаны скриптом специалиста Refocus, и если надо сверить их с сырыми источниками или ввести новый критерий отслеживания, все придется менять на бэкенде.

Что получилось в итоге

После теста сквозного идентификатора через одну большую таблицу мы продолжили улучшать структуру данных на бэке. Вместо students_main_info усилиями специалиста Refocus появилась подробная сеть более мелких таблиц, которые могут обращаться друг к другу и лежат в одном хранилище с нашими таблицами сырых данных.

Вот так выглядела схема соотношения этих таблиц:

А вот так выглядела основная таблица Students:

В ней-то и находились основные персональные данные студентов с присвоенным идентификатором, и к ней можно было обращаться для мэтчинга из остальных источников.

Остальные таблицы выглядели похоже: всегда было поле с идентификатором и информация о какой-то характеристике студента — когорта, курс, роль в дискорде и так далее.

Финальный код, написанный нашими аналитиками,  объединял данные при выгрузке из хранилища, и больше не опирался на ненадежный мэтчинг через имейл.

Сначала он отбирал собранные нами данные из amoCRM (amocrm_leads_snapshot) и объединял их с контактной информацией клиентов. Затем в таблицу добавлялось поле student_id и отбирались данные, которые понадобятся нам дальше.

WITH snapshot_ AS (
      SELECT DISTINCT s.*,
        ac.name as contact_name, ac.phone, ac.email,
        split(replace(trim(lower(ac.email)),' ',''),'@')[OFFSET(0)] as email_first_part,
        ac.intercom_id, ac.student_id
      FROM (
        SELECT *,
          ROW_NUMBER() OVER(PARTITION BY lead_id ORDER BY updated_at DESC) as num_,
        FROM `Differture.amocrm_leads_snapshot`
      ) s
      LEFT JOIN (
        select cast(al.amo_id as INT64) as lead_id, cast(ac.amo_id as INT64) as contact_id,
          ac.name, emails as email, phone, student_id, ic.intercom_id,
          ROW_NUMBER() OVER(PARTITION BY al.amo_id ORDER BY ac.amo_id) as num1_
        from `Differture.AmoContacts` ac
        left join `Differture.AmoLeads` al on al.amo_contact_id=ac.id
        left join `Differture.IntercomContacts` ic using(student_id)
        , unnest(ac.emails) emails
      ) ac using(lead_id)
      WHERE (num_=1 or num_ is null) and (num1_=1 or num1_ is null)
        and s.pipeline_id in (4920421,5245535) and s.status_id=142 and lower(s.lead_name) not like '%test%'
    )

Теперь при создании общей таблицы о возвратах с данными из amo, Thinkific и Discord объединение проходило через student_id:

th_amo_ds_rf as (
      select distinct * except (channel_id, channel),
        ifnull(channel_id, 'Not in discord') as channel_id,
        ifnull(channel, 'Not in discord') as channel
      from thinkific_amo_refunds
      full outer join discord using(student_id)
    )

Когда объединенные таблицы данных студентов были созданы, получить таблицы несовпадений можно было простой строкой кода в Tableau:

Пустое значение поля student_id означает, что мэтча не случилось — где-то информация расходилась слишком сильно и не подтянулась в таблицы с идентификатором. Раньше, до введения идентификатора, поиск был таким же, но обращался к полям почты, телефона или имени-фамилии.

Ниже можно увидеть таблицу, где данные из Thinkific не совпадали с amoCRM после перехода на Student ID. В этом случае студент есть в LMS, значит, на курсе учится — но его либо нет в системе учета, либо данные в ней разнятся с LMS.

А вот таблица, где данные из Discord не совпадали с amoCRM. Все так же, как выше — студент есть в чатах сопровождения, но не ищется по своим данным в amoCRM.

Оба скриншота показывают количество несовпадений примерно за месяц. Как видно по этим таблицам, количество несовпадений уменьшилось с 80-90 до пары десятков — примерно на 75%. Это позволило сократить количество перезапусков кода выгрузки вручную и уменьшить затраты времени и технических ресурсов на поддержание системы.

Выводы

Сквозной идентификатор — эффективное решение проблемы мэтчинга персональных данных. Он позволяет максимально автоматизировать процесс отслеживания и устранения несовпадений или дубликатов клиентских сущностей при выгрузке данных для анализа. В случаях, когда объем данных в системе невелик, а у компании нет возможности выделить ресурсы на реализацию такого решения, можно воспользоваться и другими вариантами. Например, алерты в оркестраторе процессов хорошо справятся в ситуации, когда объединить данные — вопрос ручного запуска одного скрипта раз в неделю. Но сквозной идентификатор — наверное, самое универсальное из доступных решений, которое покроет большинство ошибок и заметно уменьшит погрешность в качестве данных.

 Нет комментариев    1624   6 мес   Analytics Engineering   Data Analytics   sql   tableau

Эффективное логирование в Python

Время чтения текста – 5 минут

В Python существует встроенный модуль logging, который позволяет журналировать этапы выполнения программы. Логирование полезно когда, например, нужно оставить большой скрипт сбора / обработки данных на длительное время, а в случае возникновения непредвиденных ошибок выяснить, с чем они могут быть связаны. Анализ логов позволяет быстро и эффективно выявлять проблемные места в коде, но для удобного использования модуля следует написать несколько функций по взаимодействию с ним и вынести их в отдельный файл — сегодня мы этим и займёмся.

Пишем логгер

Создадим файл loggers.py. Для начала импортируем модули и задаём пару значений по умолчанию — директорию для файла с логом и наименование конфигурационного файла, содержащего шаблоны логирования. Его мы опишем следом.

import os
import json
import logging
import logging.config

FOLDER_LOG = "log"
LOGGING_CONFIG_FILE = 'loggers.json'

Опишем функцию для создания папки с логом: она принимает наименование для папки, но по умолчанию будет называть её «log». Директорию создаём при помощи модуля os и только в том случае, если такой директории ещё не существует.

def create_log_folder(folder=FOLDER_LOG):
    if not os.path.exists(folder):
        os.mkdir(folder)

Теперь опишем функцию создания нового логгера по заданному шаблону. Функция должна создать директорию для логирования, открыть конфигурационный файл и достать нужный шаблон. Затем по шаблону при помощи модуля logging создаём новый логгер:

def get_logger(name, template='default'):
    create_log_folder()
    with open(LOGGING_CONFIG_FILE, "r") as f:
        dict_config = json.load(f)
        dict_config["loggers"][name] = dict_config["loggers"][template]
    logging.config.dictConfig(dict_config)
    return logging.getLogger(name)

Для удобства опишем ещё одну функцию — получение стандартного лога. Она ничего не принимает и нужна только для инициализации лога с шаблоном default:

def get_default_logger():
    create_log_folder()
    with open(LOGGING_CONFIG_FILE, "r") as f:
        logging.config.dictConfig(json.load(f))

    return logging.getLogger("default")

Описываем конфигурационный файл

Создадим по соседству файл loggers.json — он будет содержать настройки логгера. Внутри указываем такие настройки, как версию логгера, форматы логирования для разных уровней, наименование выходного файла и его максимальный размер:

{
    "version": 1,
    "disable_existing_loggers": false,
    "formatters": {
        "default": {
            "format": "%(asctime)s - %(processName)-10s - %(name)-10s - %(levelname)-8s - %(message)s"
        }
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "default"
        },
        "rotating_file": {
            "class": "logging.handlers.RotatingFileHandler",
            "level": "DEBUG",
            "formatter": "default",
            "filename": "log/main.log",
            "maxBytes": 10485760,
            "backupCount": 20
        }
    },
    "loggers": {
        "default": {
            "handlers": ["console", "rotating_file"],
            "level": "DEBUG"
        }
    }
}

Использование логгера

Теперь давайте представим, что вы выгружаете данные по API и складываете их в базу данных на примере нашего материала про транзакции в SQLAlchemy. Рассмотрим заключительную часть кода: добавим строку с инициализацией стандартного логгера и изменим код так, чтобы сначала в лог выводился offset, затем в случае успеха предложение «Successfully inserted data», а в случае ошибки выводилась сама ошибка и предложение: «Error: tried to insert data but got an error».

logger = get_logger('main')

offset = 0
subs_count = get_subs_count(group_id)

while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            logger.info(f"{offset} / {subs_count}")
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            if offset == 10:
                raise(ValueError("This is a test errror"))
            transaction.commit()
            logger.info(f"Successfully inserted data")
        except Exception as E:
            transaction.rollback()
            logger.error(f"Error: tried to insert {df} but got an error: {E}")
    time.sleep(1)
    offset += 10

Теперь во время работы программы будет отображаться такой вывод, который также будет записан в файл main.log папки log в директории проекта. После завершения работы программы можно исследовать логи, посмотреть, на каких offset возникли проблемы, какие данные не удалось вставить и прочитать текст ошибки:

 Нет комментариев    602   2021   Analytics Engineering   python   логирование

Бот для преобразования данных из Coinkeeper

Время чтения текста – 6 минут

Coinkeeper — кроссплатформенное приложение для учёта финансов. Внутри можно выпустить виртуальную банковскую карту Visa с бесплатным годовым обслуживанием, которая будет присылать уведомления, если вы тратите больше, чем запланировали. Помимо уведомлений, приложение ведёт историю трат и позволяет выгрузить сводный отчёт в формате csv. Данные, которое выгружает приложение ещё не готовы к анализу и выглядят так:

Азат Шарипов сделал скрипт обработки данных в пригодный для Tableau вид и подготовил Tableau Public книгу, а Рома Бунин в рамках своего проекта «Переверстка» переработал дашборд.

Мы решили тоже поучаствовать, и с нашей стороны Елизавета Мазурова сделала чат-бота.

Чат-бот крутой! Помимо того, что он может как и прежде отдавать обратно .csv-файл, он позволяет автоматизировать рутину по обновлению отчета через Google-таблицы. Как, наверное, многие помнят, Tableau Public может работать на гугл-таблицах или csv файлах, но не разрешает подключение к данным. Бот умный: он создаст за вас гугл-таблицу и когда вы повторно отправите ему новый файл обновит ее.

Использование бота

Перейдите в диалог с ботом и введите команду /start — в ответе бот расскажет немного о себе. Для продолжения работы нажмите на кнопку «Начать».

Сразу после можно отправить csv-файл, выгруженный из Coinkeeper:

Выберите тип файла — csv или таблицу в Google Spreadsheets.

В случае выбора csv-файла бот пришлёт его:

А в случае ссылки в первый раз нужно будет пройти небольшую регистрацию — указать почту и наименование для файла.

Затем бот пришлёт ссылку на файл:

Скрипт преобразовал данные, и таблицу можно указать в качестве источника данных в Tableau. А благодаря тому, что в случае загрузки нового файла создаётся не новая таблица, а обновляется старая, отчёт в Tableau тоже обновится. В результате открывается возможность еженедельно присылать боту новую таблицу и сразу переходить в обновлённый отчёт.

 Нет комментариев    132   2021   Analytics Engineering   coinkeeper   google analytics   tableau   telegram

Python и тексты нового альбома Земфиры: анализируем суть песен

Время чтения текста – 18 минут

Неделю назад вышёл первый за 8 лет студийный альбом Земфиры «Бордерлайн». К работе помимо рок-певицы приложили руку разные люди, в том числе и её родственники — рифф для песни «таблетки» написал её племянник из Лондона. Альбом получился разнообразным: например, песня «остин» посвящена главному персонажу игры Homescapes российской студии Playrix (кстати, посмотрите свежие Бизнес-секреты с братьями Бухманами, там они тоже про это рассказывают) — Земфире нравится игра, и для трека она связалась со студией. А сингл «крым» был написан в качестве саундтрека к новой картине соратницы Земфиры — Ренаты Литвиновой.

Послушать альбом в Apple Music / Яндекс.Музыке / Spotify

Тем не менее, дух всего альбома довольно мрачен — в песнях часто повторяются слова «боль», «ад», «бесишь» и прочие по смыслу. Мы решили провести разведочный анализ нового альбома, а затем при помощи модели Word2Vec и косинусной меры посмотреть на семантическую близость песен между собой и вычислить общее настроение альбома.

Для тех, кому скучно читать про подготовку данных и шаги анализа можно перейти сразу к результатам.

Подготовка данных

Для начала работы напишем скрипт обработки данных. Цель скрипта — из множества текстовых файлов, в каждом из которых лежит по песне, собрать единую csv-таблицу. При этом текст треков очищаем от знаков пунктуации и ненужных слов.

import pandas as pd
import re
import string
import pymorphy2
from nltk.corpus import stopwords

Создаём морфологический анализатор и расширяем список всего, что нужно отбросить:

morph = pymorphy2.MorphAnalyzer()
stopwords_list = stopwords.words('russian')
stopwords_list.extend(['куплет', 'это', 'я', 'мы', 'ты', 'припев', 'аутро', 'предприпев', 'lyrics', '1', '2', '3', 'то'])
string.punctuation += '—'

Названия песен приведены на английском — создадим словарь для перевода на русский и словарь, из которого позднее сделаем таблицу:

result_dict = dict()

songs_dict = {
    'snow':'снег идёт',
    'crimea':'крым',
    'mother':'мама',
    'ostin':'остин',
    'abuse':'абьюз',
    'wait_for_me':'жди меня',
    'tom':'том',
    'come_on':'камон',
    'coat':'пальто',
    'this_summer':'этим летом',
    'ok':'ок',
    'pills':'таблетки'
}

Опишем несколько функций. Первая читает целиком песню из файла и удаляет переносы строки, вторая очищает текст от ненужных символов и слов, а третья при помощи морфологического анализатора pymorphy2 приводит слова к нормальной форме. Модуль pymorphy2 не всегда хорошо справляется с неоднозначностью — для слов «ад» и «рай» потребуется дополнительная обработка.

def read_song(filename):
    f = open(f'{filename}.txt', 'r').read()
    f = f.replace('\n', ' ')
    return f

def clean_string(text):
    text = re.split(' |:|\.|\(|\)|,|"|;|/|\n|\t|-|\?|\[|\]|!', text)
    text = ' '.join([word for word in text if word not in string.punctuation])
    text = text.lower()
    text = ' '.join([word for word in text.split() if word not in stopwords_list])
    return text

def string_to_normal_form(string):
    string_lst = string.split()
    for i in range(len(string_lst)):
        string_lst[i] = morph.parse(string_lst[i])[0].normal_form
        if (string_lst[i] == 'аду'):
            string_lst[i] = 'ад'
        if (string_lst[i] == 'рая'):
            string_lst[i] = 'рай'
    string = ' '.join(string_lst)
    return string

Проходим по каждой песне и читаем файл с соответствующим названием:

name_list = []
text_list = []
for song, name in songs_dict.items():
    text = string_to_normal_form(clean_string(read_song(song)))
    name_list.append(name)
    text_list.append(text)

Затем объединяем всё в DataFrame и сохраняем в виде csv-файла.

df = pd.DataFrame()
df['name'] = name_list
df['text'] = text_list
df['time'] = [290, 220, 187, 270, 330, 196, 207, 188, 269, 189, 245, 244]
df.to_csv('borderline.csv', index=False)

Результат:

Облако слов по всему альбому

Начнём анализ с построения облака слов — оно отобразит, какие слова чаще всего встречаются в песнях. Импортируем нужные библиотеки, читаем csv-файл и устанавливаем конфигурации:

import nltk
from wordcloud import WordCloud
import pandas as pd
import matplotlib.pyplot as plt
from nltk import word_tokenize, ngrams

%matplotlib inline
nltk.download('punkt')
df = pd.read_csv('borderline.csv')

Теперь создаём новую фигуру, устанавливаем параметры оформления и при помощи библиотеки wordcloud отображаем слова с размером прямо пропорциональным частоте упоминания слова. Над каждым графиком дополнительно указываем название песни.

fig = plt.figure()
fig.patch.set_facecolor('white')
plt.subplots_adjust(wspace=0.3, hspace=0.2)
i = 1
for name, text in zip(df.name, df.text):
    tokens = word_tokenize(text)
    text_raw = " ".join(tokens)
    wordcloud = WordCloud(colormap='PuBu', background_color='white', contour_width=10).generate(text_raw)
    plt.subplot(4, 3, i, label=name,frame_on=True)
    plt.tick_params(labelsize=10)
    plt.imshow(wordcloud)
    plt.axis("off")
    plt.title(name,fontdict={'fontsize':7,'color':'grey'},y=0.93)
    plt.tick_params(labelsize=10)
    i += 1

EDA текстов альбома

Теперь проанализируем тексты песен — импортируем библиотеки для работы с данными и визуализации:

import plotly.graph_objects as go
import plotly.figure_factory as ff
from scipy import spatial
import collections
import pymorphy2
import gensim

morph = pymorphy2.MorphAnalyzer()

Сначала посчитаем число слов в каждой песне, число уникальных слов и процентное соотношение:

songs = []
total = []
uniq = []
percent = []

for song, text in zip(df.name, df.text):
    songs.append(song)
    total.append(len(text.split()))
    uniq.append(len(set(text.split())))
    percent.append(round(len(set(text.split())) / len(text.split()), 2) * 100)

А теперь составим из этого DataFrame и дополнительно посчитаем число слов в минуту для каждой песни:

df_words = pd.DataFrame()
df_words['song'] = songs
df_words['total words'] = total
df_words['uniq words'] = uniq
df_words['percent'] = percent
df_words['time'] = df['time']
df_words['words per minute'] = round(total / (df['time'] // 60))
df_words = df_words[::-1]

Данные хорошо бы визуализировать — построим две столбиковые диаграммы: одну для числа слов в песне, а другую для числа слов в минуту.

colors_1 = ['rgba(101,181,205,255)'] * 12
colors_2 = ['rgba(62,142,231,255)'] * 12

fig = go.Figure(data=[
    go.Bar(name='📝 Всего слов',
           text=df_words['total words'],
           textposition='auto',
           x=df_words.song,
           y=df_words['total words'],
           marker_color=colors_1,
           marker=dict(line=dict(width=0)),),
    go.Bar(name='🌀 Уникальных слов',
           text=df_words['uniq words'].astype(str) + '<br>'+ df_words.percent.astype(int).astype(str) + '%' ,
           textposition='inside',
           x=df_words.song,
           y=df_words['uniq words'],
           textfont_color='white',
           marker_color=colors_2,
           marker=dict(line=dict(width=0)),),
])

fig.update_layout(barmode='group')

fig.update_layout(
    title = 
        {'text':'<b>Соотношение числа уникальных слов к общему количеству</b><br><span style="color:#666666"></span>'},
    showlegend = True,
    height=650,
    font={
        'family':'Open Sans, light',
        'color':'black',
        'size':14
    },
    plot_bgcolor='rgba(0,0,0,0)',
)
fig.update_layout(legend=dict(
    yanchor="top",
    xanchor="right",
))

fig.show()
colors_1 = ['rgba(101,181,205,255)'] * 12
colors_2 = ['rgba(238,85,59,255)'] * 12

fig = go.Figure(data=[
    go.Bar(name='⏱️ Длина трека, мин.',
           text=round(df_words['time'] / 60, 1),
           textposition='auto',
           x=df_words.song,
           y=-df_words['time'] // 60,
           marker_color=colors_1,
           marker=dict(line=dict(width=0)),
          ),
    go.Bar(name='🔄 Слов в минуту',
           text=df_words['words per minute'],
           textposition='auto',
           x=df_words.song,
           y=df_words['words per minute'],
           marker_color=colors_2,
           textfont_color='white',
           marker=dict(line=dict(width=0)),
          ),
])

fig.update_layout(barmode='overlay')

fig.update_layout(
    title = 
        {'text':'<b>Длина трека и число слов в минуту</b><br><span style="color:#666666"></span>'},
    showlegend = True,
    height=650,
    font={
        'family':'Open Sans, light',
        'color':'black',
        'size':14
    },
    plot_bgcolor='rgba(0,0,0,0)'
)


fig.show()

Работа с Word2Vec моделью

При помощи модуля gensim загружаем модель, указывая на бинарный файл:

model = gensim.models.KeyedVectors.load_word2vec_format('model.bin', binary=True)

Для материала мы использовали готовую обученную на Национальном Корпусе Русского Языка модель от сообщества RusVectōrēs

Модель Word2Vec основана на нейронных сетях и позволяет представлять слова в виде векторов, учитывая семантическую составляющую. Это означает, что если мы возьмём два слова — например, «мама» и «папа», представим их в виде двух векторов и посчитаем косинус, значения будет близко к 1. Аналогично, у двух слов, не имеющих ничего общего по смыслу косинусная мера близка к 0.

Опишем функцию get_vector: она будет принимать список слов, распознавать для каждого часть речи, а затем получать и суммировать вектора — так мы сможем находить вектора не для одного слова, а для целых предложений и текстов.

def get_vector(word_list):
    vector = 0
    for word in word_list:
        pos = morph.parse(word)[0].tag.POS
        if pos == 'INFN':
            pos = 'VERB'
        if pos in ['ADJF', 'PRCL', 'ADVB', 'NPRO']:
            pos = 'NOUN'
        if word and pos:
            try:
                word_pos = word + '_' + pos
                this_vector = model.word_vec(word_pos)
                vector += this_vector
            except KeyError:
                continue
    return vector

Для каждой песни находим вектор и собираем соответствующий столбец в DataFrame:

vec_list = []
for word in df['text']:
    vec_list.append(get_vector(word.split()))
df['vector'] = vec_list

Теперь сравним вектора между собой, посчитав их косинусную близость. Те песни, у которых косинусная метрика выше 0,5 запомним отдельно — так мы получим самые близкие пары песен. Данные о сравнении векторов запишем в двумерный список result.

similar = dict()
result = []
for song_1, vector_1 in zip(df.name, df.vector):
    sub_list = []
    for song_2, vector_2 in zip(df.name.iloc[::-1], df.vector.iloc[::-1]):
        res = 1 - spatial.distance.cosine(vector_1, vector_2)
        if res > 0.5 and song_1 != song_2 and (song_1 + ' / ' + song_2 not in similar.keys() and song_2 + ' / ' + song_1 not in similar.keys()):
            similar[song_1 + ' / ' + song_2] = round(res, 2)
        sub_list.append(round(res, 2))
    result.append(sub_list)

Самые похожие треки соберём в отдельный DataFrame:

df_top_sim = pd.DataFrame()
df_top_sim['name'] = list(similar.keys())
df_top_sim['value'] = list(similar.values())
df_top_sim.sort_values(by='value', ascending=False)

И построим такой же bar chart:

colors = ['rgba(101,181,205,255)'] * 5

fig = go.Figure([go.Bar(x=df_top_sim['name'],
                        y=df_top_sim['value'],
                        marker_color=colors,
                        width=[0.4,0.4,0.4,0.4,0.4],
                        text=df_top_sim['value'],
                        textfont_color='white',
                        textposition='auto')])

fig.update_layout(
    title = 
        {'text':'<b>Топ-5 схожих песен</b><br><span style="color:#666666"></span>'},
    showlegend = False,
    height=650,
    font={
        'family':'Open Sans, light',
        'color':'black',
        'size':14
    },
    plot_bgcolor='rgba(0,0,0,0)',
    xaxis={'categoryorder':'total descending'}
)

fig.show()

Имея вектор каждой песни, давайте посчитаем вектор всего альбома — сложим вектора песен. Затем для такого вектора при помощи модели получим самые близкие по духу и смыслу слова.

def get_word_from_tlist(lst):
    for word in lst:
        word = word[0].split('_')[0]
        print(word, end=' ')

vec_sum = 0
for vec in df.vector:
    vec_sum += vec
sim_word = model.similar_by_vector(vec_sum)
get_word_from_tlist(sim_word)

небо тоска тьма пламень плакать горе печаль сердце солнце мрак

Наверное, это ключевой результат и описание альбома Земфиры всего лишь в 10 словах.

Наконец, построим общую тепловую карту, каждая ячейка которой — результат сравнения косинусной мерой текстов двух треков.

colorscale=[[0.0, "rgba(255,255,255,255)"],
            [0.1, "rgba(229,232,237,255)"],
            [0.2, "rgba(216,222,232,255)"],
            [0.3, "rgba(205,214,228,255)"],
            [0.4, "rgba(182,195,218,255)"],
            [0.5, "rgba(159,178,209,255)"],
            [0.6, "rgba(137,161,200,255)"],
            [0.7, "rgba(107,137,188,255)"],
            [0.8, "rgba(96,129,184,255)"],
            [1.0, "rgba(76,114,176,255)"]]

font_colors = ['black']
x = list(df.name.iloc[::-1])
y = list(df.name)
fig = ff.create_annotated_heatmap(result, x=x, y=y, colorscale=colorscale, font_colors=font_colors)
fig.show()

Результаты анализа и интерпретация данных

Давайте ещё раз посмотрим на всё, что у нас получилось — начнём с облака слов. Нетрудно заметить, что у слов «боль», «невозможно», «сорваться», «растерзаны», «сложно», «терпеть», «любить» размер весьма приличный — всё потому, что такие слова встречаются часто на протяжении всего текста песен:

Одной из самых «разнообразных» песен оказался сингл «крым» — в нём 74% уникальных слов. А в песне «снег идёт» слов совсем мало, поэтому большинство — 82% уникальны. Самой большой песней в альбоме получился трек «таблетки» — суммарно там около 150 слов.

Как было выяснено на прошлом графике, самый «динамичный» трек — «таблетки», целых 37 слов в минуту — практически по слову на каждые две секунды. А самый длинный трек — «абъюз», в нём же и согласно предыдущему графику практически самый низкий процент уникальных слов — 46%.

Топ-5 самых семантически похожих пар текстов:

Ещё мы получили вектор всего альбома и подобрали самые близкие слова. Только посмотрите на них — «тьма», «тоска», «плакать», «горе», «печаль», «сердце» — это же ведь и есть тот перечень слов, который характеризует лирику Земфиры!

небо тоска тьма пламень плакать горе печаль сердце солнце мрак

Финал — тепловая карта. По визуализации заметно, что практически все песни достаточно схожи между собой — косинусная мера у многих пар превышает значение в 0.4.

Выводы

В материале мы провели EDA всего текста нового альбома и при помощи предобученной модели Word2Vec доказали гипотезу — большинство песен «бордерлайна» пронизывают довольно мрачные и тексты. И это нормально, ведь Земфиру мы любим именно за искренность и прямолинейность.

 1 комментарий    1270   2021   analysis   Analytics Engineering   Data Analytics   plotly   python   бордерлайн   земфира

Экспорт исторических данных Apple Health в Google Sheets

Время чтения текста – 9 минут

Для устройств на базе iOS и watchOS существует приложение Health, которое ежедневно записывает все данные о здоровье носителя и синхронизирует их со сторонними приложениями. Все эти данные в любой момент можно получить прямо из приложения в виде XML-документа. Сегодня мы выгрузим исторические данные о здоровье из приложения Apple Health, обработаем их и отправим в Google Sheets для анализа и визуализации в будущем.

Экспорт архива из приложения

Зайдите в приложение Health на iPhone. Нажмите на аватарку своего профиля в верхнем правом углу — откроется меню приложения.

Внизу нажмите на кнопку «Экспортировать медданные». Через некоторое время откроется меню экспорта — отправьте архив себе на компьютер любым способом, можно по AirDrop или даже по почте в письме самому себе. Из архива нужен только один файл — «экспорт.xml». Достаньте его и положите в папку с ноутбуком jupyter.

Парсер XML в DataFrame

При помощи библиотеки XML составляем дерево на основе документа из Health. Собирать в словарь будем следующие атрибуты: тип, единица измерения, дата создания, дата начала, дата конца, значение. Проходим по всему дереву и отправляем полученные значения атрибутов в records_dict.

from xml.etree import ElementTree
import pandas as pd
import datetime

tree = ElementTree.parse('экспорт.xml')
root = tree.getroot()
records = root.findall('Record')

records_dict = {
    'type':[],
    'unit':[],
    'creationDate':[],
    'startDate':[],
    'endDate':[],
    'value':[]
}

for record in records:
    for attribute in records_dict.keys():
        attribute_value = record.get(attribute)
        records_dict[attribute].append(attribute_value)

События записаны в нечитабельном виде — для перевода составим специальный словарь с нужными типами, где ключ — старое название, а значение — новое. Мы возьмём только 11 событий: минуты осознанности, дистанция на велосипеде, дистанция заплыва, дистанция ходьбы и бега, пройдено пролётов, пульс, пульс в покое, шаги, активная энергия, энергия покоя и средний пульс при ходьбе.

types_dict = {
    'HKCategoryTypeIdentifierMindfulSession': 'Mindful Session',
    'HKQuantityTypeIdentifierDistanceCycling': 'Cycling Distance',
    'HKQuantityTypeIdentifierDistanceSwimming': 'Swimming Distance',
    'HKQuantityTypeIdentifierDistanceWalkingRunning': 'Walking + Running Distance',
    'HKQuantityTypeIdentifierFlightsClimbed': 'Flights Climbed',
    'HKQuantityTypeIdentifierHeartRate': 'Heart Rate',
    'HKQuantityTypeIdentifierRestingHeartRate': 'Resting Heart Rate',
    'HKQuantityTypeIdentifierStepCount': 'Steps',
    'HKQuantityTypeIdentifierActiveEnergyBurned': 'Active Calories',
    'HKQuantityTypeIdentifierBasalEnergyBurned': 'Resting Calories',
    'HKQuantityTypeIdentifierWalkingHeartRateAverage': 'Walking Heart Rate Average'
}

Для минут осознанности в поле значения записей нет — мы сами посчитаем позже это поле как разницу даты окончания и начала события. Разница будет представлена как timedelta, поэтому напишем функцию перевода timedelta в минуты:

def td_to_m(td):
    seconds = td.seconds + td.days * 24 * 60 * 60
    return seconds // 60

Из словаря создаём DataFrame и задаём названия колонок. Оставляем только те 11 событий, которые есть в словаре types_dict и приводим все колонки к нужным типам данных:

df = pd.DataFrame(records_dict)
df.columns = ['type', 'unit', 'date', 'start', 'end', 'value']
df = df[df['type'].isin(types_dict.keys())]
df['value'] = df['value'].astype(float)
df['date'] = df['date'].astype('datetime64')
df['date'] = df['date'].dt.date
df['start'] = df['start'].astype('datetime64')
df['end'] = df['end'].astype('datetime64')
df['unit'] = df['unit'].astype(str)

Данные Health при экспорте никак не группируются — мы сделаем это самостоятельно. DataFrame можно поделить на три: в первом будут события, у которых единица измерения «количество в минуту» — для таких событий нужно искать среднее значение. В другой группе будут минуты осознанности — считаем число минут в каждой записи и суммируем. В последней группе находятся все остальные записи, связанные с количественными событиями — шаги, дистанция ходьбы и бега и так далее. Их тоже суммируем.

df_1 = df[df['unit'] == 'count/min']
df_1 = df_1.groupby(by=['date', 'type', 'unit'], as_index=False).agg({'start':'min',
                                                                      'end':'max',
                                                                      'value':'mean'})

df_2 = df[df['type'] == 'HKCategoryTypeIdentifierMindfulSession']
df_2['value'] = df_2['end'] - df_2['start']
df_2['value'] = df_2['value'].map(td_to_m)
df_2 = df_2.groupby(by=['date', 'type', 'unit'], as_index=False).agg({'start':'min',
                                                                     'end':'max',
                                                                     'value':'sum'})
df_3 = df[(df['unit'] != 'count/min') & (df['type'] != 'HKCategoryTypeIdentifierMindfulSession')]
df_3 = df_3.groupby(by=['date', 'type', 'unit'], as_index=False).agg({'start':'min',
                                                                      'end':'max',
                                                                      'value':'sum'})
df = pd.concat([df_1, df_2, df_3])

Дату создания записи переводим в строковый тип. Все наименования типов событий заменяем согласно словарю types_dict. В переменную dates записываем все уникальные даты.

df['date'] = df['date'].astype(str)
df['type'] = df['type'].apply(lambda x: types_dict[x])
dates = df['date'].unique()

В результате нужен словарь с колонкой даты и отдельной колонкой под каждое из 11 событий:

result = {
    'date': [],
    'Steps': [],
    'Walking + Running Distance': [],
    'Swimming Distance': [],
    'Cycling Distance': [],
    'Resting Calories': [],
    'Active Calories': [],
    'Flights Climbed': [],
    'Heart Rate': [],
    'Resting Heart Rate': [],
    'Walking Heart Rate Average': [],
    'Mindful Session': []
}

Проходим по каждой дате и получаем кусок DataFrame за эту дату. Добавляем её в словарь и проходим по каждому ключу, пробуя добавить значение:

for date in dates:
    part = df[df['date'] == date]
    result['date'].append(date)
    for key in result.keys():
        if key == 'date':
            continue
        else:
            field = 'value'
        try:
            result[key].append(part[part['type'] == key][field].values[0])
        except IndexError:
            result[key].append(None)

Из полученного словаря создаём DataFrame, округляем всё до двух знаков после запятой и сортируем по дате:

result_df = pd.DataFrame(result)
result_df = result_df.round(2)
result_df = result_df.sort_values(by='date')

В результате получается такая таблица с историческими данными по 11 событиям:

Экспорт DataFrame в Google Sheets

Для экспорта в Google Docs необходим сервисный аккаунт и json-файл с ключом. О том, как его получить, мы писали в материале «Собираем данные по рекламным кампаниям ВКонтакте»

Создайте новый документ в Google Sheets. Весь DataFrame можно вставить одним действием при помощи методов библиотеки gspread. Импортируйте её, а также укажите идентификатор документа и json-файл с ключом. В методе get_worksheet указывается порядковый номер листа в файле начиная с нуля.

import pandas as pd
import gspread
from gspread_dataframe import set_with_dataframe
gc = gspread.service_account(filename='serviceAccount.json')
sh = gc.open_by_key('1osKA63LQkUC0FC0eIZ63jEJwn1TeIkUvqCV6ur')
worksheet = sh.get_worksheet(0)

В итоге в Google Spreadsheets появится такая таблица:

А в следующем материале посмотрим, как наладить ежедневный экспорт данных Здоровья в эту таблицу при помощи шорткатов и Google AppScript!

 Нет комментариев    326   2021   Analytics Engineering   apple health   Data Analytics   pandas   python

Транзакции в SQLAlchemy

Время чтения текста – 5 минут

Транзакция — последовательность действий, связанных с базой данных. Их основная польза заключается в том, что при возникновении какой-то ошибки или достижении других нужных условий всю транзакцию можно отменить, и все изменения, примененные к базе данных, будут отменены. Сегодня мы напишем небольшой скрипт, который при помощи транзакций SQLAlchemy пишет информацию о подписчиках сообщества в базу данных MySQL, а при возникновении ошибки отменяет текущую транзакцию.

Сбор информации об участниках через VK API

Для начала напишем пару маленьких функций — первая будет возвращать число подписчиков сообщества, а вторая — отправлять запрос и формировать датафрейм с информацией о подписчиках сообщества.

Подробнее о том, как получить токен, можно прочитать в материале «Собираем данные по рекламным кампаниям ВКонтакте»

from sqlalchemy import create_engine
import pandas as pd
import requests
import time

token = '42hj2ehd3djdournf48fjurhf9r9o2eurnf48fjurhf9r9734'
group_id = 'leftjoin'

Чтобы узнать число подписчиков достаточно отправить метод groups.getMembers с любыми параметрами — в ответе всегда возвращается количество в поле count.

def get_subs_count(group_id):
    count = requests.get('https://api.vk.com/method/groups.getMembers', params={
        'access_token':token,
        'v':5.103,
        'group_id':group_id
    }).json()['response']['count']
    return count

Для примера будем брать имена, id, фамилии подписчиков, некоторую расширенную информацию и получать только по 10 подписчиков за раз, чтобы рассмотреть работу транзакций детально — каждые 10 подписчиков будут вставляться одной транзакцией. Введём дополнительное поле offset, чтобы знать, в какой итерации добавлены строки.

def get_subs_info(group_id, offset):
    response = requests.get('https://api.vk.com/method/groups.getMembers', params={
        'access_token':token,
        'v':5.103,
        'group_id':group_id,
        'offset':offset,
        'count':10,
        'fields':'sex, has_mobile, relation, can_post'
    }).json()['response']['items']
    df = pd.DataFrame(response)
    df['offset'] = offset
    return df

Транзакции

Наконец, можем подсоединиться к базе данных при помощи SQLAlchemy:

engine = create_engine('mysql+mysqlconnector://' +
                           'root' + ':' + '' + '@' +
                           'localhost' + '/' +
                           'transaction', echo=False)

У транзакций всегда должно быть начало — begin, и конец — commit. В случае, если произошла какая-то ошибка, можно сделать откат — rollback. Сперва получаем число подписчиков сообщество, и в каждой итерации цикла при помощи контекстного менеджера with ... as создаём новое подключение. Сразу после объявляем начало транзакции по этому подключению и с обработчиком исключений пробуем получить информацию о десяти подписчиках через функцию get_subs_info. Вставляем полученный датафрейм в таблицу методом to_sql и завершаем транзакцию при помощи метода commit(). В случае, если возникла какая-то ошибка — печатаем её на экран и отменяем транзакцию.

offset = 0
subs_count = get_subs_count(group_id)
while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            transaction.commit()
        except Exception as E:
            print(E)
            transaction.rollback()
    time.sleep(1)
    offset += 10

Чтобы протестировать работу транзакций слегка обновим последний блок кода — добавим вызов ошибки ValueError после вставки данных в базу, если текущий offset равен 10.

offset = 0
subs_count = get_subs_count(group_id)
while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            if offset == 10:
                raise(ValueError)
            transaction.commit()
        except Exception as E:
            print(E)
            transaction.rollback()
    time.sleep(1)
    offset += 10

Как и планировалось, данные за итерацию с offset = 10 не занесены в таблицу. Несмотря на то, что ошибка возникла уже после добавления новых данных, транзакция была прервана методом rollback() и завершение транзакции было отменено.

 Нет комментариев    463   2021   Analytics Engineering   python   sql   sqlalchemy

Сбор информации о подписчиках Telegram-канала

Время чтения текста – 6 минут

На 2021 год боты в Telegram так и не имеют метода, позволяющего получать информацию о подписчиках канала. Тем не менее, существует достаточно сложное в освоении Telegram API и построенная на нём библиотека Telethon. Сегодня мы посмотрим, как при помощи библиотеки выгрузить информацию о подписчиках своего канала.

Создание приложения

Для начала необходимо создать приложение, через которое будут отправляться запросы к API. Перейдите на https://my.telegram.org и авторизуйтесь в Telegram-аккаунте:

После успешной авторизации перейдите на страницу API development tools:

Заполните все поля и жмите на создание приложения:

Из полученной конфигурации нам необходим app api_id и app api_hash:

Запрос к API

Импортируем telethon — он поможет сформировать запрос, и pandas — полученный ответ мы запишем в DataFrame.

from telethon import TelegramClient
import pandas as pd

Вводим api_id, api_hash, наш номер телефона и ссылку на канал, информацию о подписчиках которого хотим получить. Доступ к информации о подписчиках есть только у администраторов канала.

api_id = 1234567
api_hash = '1b42hj25kd8jw42b234kwj242c'
phone = '+71234567890'
channel_href = 'https://t.me/leftjoin'

Создаём новую сессию — вместо session_name можно подставить любое другое название. Методы в библиотеке работают асинхронно, поэтому ответа от них требуется ожидать:

client = TelegramClient('session_name', api_id, api_hash)
client = await client.start()
dialogs = await client.get_dialogs()

Собираем все каналы текущего пользователя. Из ссылки забираем часть с именем канала и вытаскиваем из словаря нужный:

channels = {d.entity.username: d.entity
            for d in dialogs
            if d.is_channel}
my_channel = channel_href.split('/')[-1]
channel = channels[my_channel]

Подписчиков, доступ к которым не ограничен приватностью, можно получить методом get_participants. С 20 июля 2018 года Telegram установил ограничение в 200 подписчиков для вызова метода, и установка параметра aggressive на True поможет получить всех подписчиков за раз.

members_telethon_list = await client.get_participants(channel, aggressive=True)

Из полученных библиотечных структур извлекаем информацию о пользователях — их имена и телефоны:

username_list = [member.username for member in members_telethon_list]
first_name_list = [member.first_name for member in members_telethon_list]
last_name_list = [member.last_name for member in members_telethon_list]
phone_list = [member.phone for member in members_telethon_list]

Из четырёх списков собираем DataFrame и пишем его в csv-таблицу:

df = pd.DataFrame()
df['username'] = username_list
df['first_name'] = first_name_list
df['last_name'] = last_name_list
df['phone'] = phone_list
df.to_csv('subscribers.csv', index=False)

Результат работы — такая таблица:

Для запуска в Jupyter Notebook описанный ниже код можно просто вставить в ячейку, но при запуске из Python-файла будет такая ошибка:

SyntaxError: 'await' outside function

Устранить проблему можно, записав весь код в асинхронную функцию. Целиком выглядеть код будет так:

from telethon import TelegramClient
import pandas as pd
import asyncio

async def main():
        api_id = 1234567
        api_hash = '1b42hj25kd8jw42b234kwj242c'
        phone = '+71234567890'
        channel_href = 'https://t.me/leftjoin'

	client = TelegramClient('session_name', api_id, api_hash)
	client = await client.start()
	dialogs = await client.get_dialogs()

	channels = {d.entity.username: d.entity
				for d in dialogs
				if d.is_channel}
	my_channel = channel_href.split('/')[-1]
	channel = channels[my_channel]

	members_telethon_list = await client.get_participants(channel, aggressive=True)

	username_list = [member.username for member in members_telethon_list]
	first_name_list = [member.first_name for member in members_telethon_list]
	last_name_list = [member.last_name for member in members_telethon_list]
	phone_list = [member.phone for member in members_telethon_list]

	df = pd.DataFrame()
	df['username'] = username_list
	df['first_name'] = first_name_list
	df['last_name'] = last_name_list
	df['phone'] = phone_list
	df.to_csv('subscribers.csv', index=False)

if __name__ == '__main__':
	loop = asyncio.get_event_loop()
	loop.run_until_complete(main())
 32 комментария    11066   2021   Analytics Engineering   python   telegram   telethon

Матемаркетинг: современный облачный Data Stack

Время чтения текста – 1 минута

С 9 по 13 ноября в онлайн-формате прошёл Матемаркетинг — крупнейшая конференция по маркетинговой аналитике в России, и в этом году мне посчастливилось стать одним из спикеров. Я выступил с двумя докладами, в этом материале обсудим первый — о современном облачном Data Stack.

Внутри объясняю подход к проектированию аналитической инфраструктуры, обосновываю использование Clickhouse при построении облачной аналитики и рассказываю о его же нюансах и говорю про Redash с точки зрения инструмента для визуализации.

 Нет комментариев    34   2021   Analytics Engineering   clickhouse   Data Analytics   data stack   reda

Робот для автоматизированного просмотра Instagram на Python и Selenium

Время чтения текста – 13 минут

Недавно мы начали вести Instagram — подписывайтесь, чтобы не пропустить контент, которого нет в блоге и Telegram!

Многие из нас ежедневно заходят в Instagram, чтобы посмотреть истории друзей и полистать ленту постов и рекомендаций. Предлагаем действенный способ сохранить своё время — напишем на Python и Selenium робота, который возьмёт на себя рутинную задачу проверки свежих новостей друзей и подсчитает число новых историй и входящих сообщений.

Авторизация в аккаунт

При переходе в браузерную версию сайта, нас встречает такое окно:

Но просто вставить логин, пароль и нажать на кнопку «Войти» недостаточно: впереди будет ещё два окна. Во-первых, предложение сохранить данные — здесь мы тактично жмём «Не сейчас». Instagram тщательно следит за каждым нашим действием и малейшие аномалии в поведении приводят к блокировке, поэтому любые предложения по сохранению данных будем на всякий случай пропускать.

Следующим препятствием будет предложение включить уведомление, которое мы тоже пропустим:

Первым делом импортируем библиотеки:

from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup as bs
import time
import random

И описываем функцию authorize — она будет принимать driver в качестве аргумента, отправлять в нужные поля логин и пароль, нажимать на кнопку «Войти», затем ждать десять секунд на загрузку страницы, нажимать на кнопку «Не сейчас», снова ждать загрузки страницы и пропускать уведомления:

def authorize(driver):
    username = 'login'
    password = 'password'
    driver.get('https://www.instagram.com')
    time.sleep(5)
    driver.find_element_by_name("username").send_keys(username)
    driver.find_element_by_name("password").send_keys(password)
    driver.execute_script("document.getElementsByClassName('sqdOP  L3NKy   y3zKF     ')[0].click()")
    time.sleep(10)
    driver.execute_script("document.getElementsByClassName('sqdOP  L3NKy   y3zKF     ')[0].click()")
    time.sleep(10)
    driver.execute_script("document.getElementsByClassName('aOOlW   HoLwm ')[0].click()")

Новые сообщения

В Instagram могут прийти сообщения двух видов. В случае, если вы не подписаны на отправителя — придёт запрос на диалог. Если подписаны — придёт входящее сообщения. Оба случая обрабатываются по-разному. Число входящих сообщений можно получить с главной страницы — это число над иконкой бумажного самолётика:

А число запросов можно забрать текстом заголовка h5 из раздела «Сообщения». Сперва перейдём в этот раздел и попробуем найти строку с запросами на сообщение. Затем вернёмся на главную страницу и возьмём то самое число новых сообщений.

def messages_count(driver):
    driver.get('https://www.instagram.com/direct/inbox/')
    time.sleep(2)
    inbox = bs(driver.page_source)
    try:
        queries_text = inbox.find_all('h5')[0].text
    except Exception:
        queries_text = None
    driver.get('https://www.instagram.com')
    time.sleep(2)
    content = bs(driver.page_source)
    try:
        messages_count = int(content.find_all('div', attrs={'class':'KdEwV'})[0].text)
    except Exception:
        messages_count = 0
    return queries_text, messages_count

Подсчёт числа новых сторис

Все истории хранятся в одном блоке:

Это список с одинаковым классом, но в каждом элементе списка лежит ещё один div-блок. У новых историй это класс eebAO h_uhZ, у просмотренных — eebAO.

Ещё есть такая кнопка, которая показывает следующую пачку историй:

При этом Instagram динамически прогружает код страницы, и в нём не найти те элементы, которые вы не видите своими глазами. Поэтому мы возьмём первые 8 видимых новых историй, добавим в список, нажмём на кнопку «Показать следующие истории» и будем продолжать так, пока кнопка ещё отображается. А затем подсчитаем число уникальных элементов, чтобы избежать возможных дубликатов.

def get_stories_count(driver):
    stories_divs = []
    scroll = True
    while scroll:
        try:
            content = bs(driver.page_source)
            stories_divs.extend(content.find_all('div', attrs={'class':'eebAO h_uhZ'}))
            driver.execute_script("document.getElementsByClassName('  _6CZji oevZr  ')[0].click()")
            time.sleep(1)
        except Exception as E:
            scroll = False
    return len(set(stories_divs))

Просмотр сторис

Следующее, чем может заняться реальный пользователь после авторизации — просмотр свежих историй. Для того, чтобы зайти в блок историй, нужно просто нажать на кнопку класса OE3OK:

Есть еще две кнопки, о которых мы должны знать. Это кнопка для переключения на следующую историю — она в классе FhutL и кнопка закрытия блока историй — класс wpO6b. Пускай одна история будет отнимать у нас от 10 до 15 секунд, и с вероятностью 1/5 мы переключим на следующую. При этом зададим переменные counter и limit — пусть сейчас мы хотим посмотреть случайное число историй от 5 до 45, и если мы уже посмотрели столько, то выходим из функции и историй.

def watch_stories(driver):
    watching = True
    counter = 0
    limit = random.randint(5, 45)
    driver.execute_script("document.getElementsByClassName('OE3OK ')[0].click()")
    try:
        while watching:
            time.sleep(random.randint(10, 15))
            if random.randint(1, 5) == 5:
                driver.execute_script("document.getElementsByClassName('FhutL')[0].click()")
            counter += 1
            if counter > limit:
                driver.execute_script("document.getElementsByClassName('wpO6b ')[1].click()")
                watching = False
    except Exception as E:
        print(E)
        watching = False

Скроллинг ленты

После просмотра актуальных историй можно поскроллить ленту — это действие ничем не отличается от классического скроллинга страниц в Selenium. Запоминаем последнюю доступную длину страницы, скроллим до неё, ожидаем прогрузки, получаем новую. Прекратим просматривать ленту в двух случаях — если в random.randint() сгенерировалась единица или если лента кончилась.

def scroll_feed(driver):
    scrolling = True
    last_height = driver.execute_script("return document.body.scrollHeight")
    while scrolling:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(random.randint(4,10))
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height or random.randint(1, 10) == 1:
            scrolling = False
        last_height = new_height

Просмотр рекомендуемых аккаунтов

Instagram в заглавной странице сам рекомендует нам для подписки некоторые аккаунты. Выглядит она так:

И на ней тоже придётся скроллить, чтобы дойти до конца. Заходим на страницу и ожидаем 5 секунд прогрузки, затем снова получаем длину страницы и скроллим вниз. Выходим тоже с вероятностью 1/10 или если страница кончилась, но ещё с вероятностью 1/2 подписываемся на некоторые из первых 100 аккаунтов рекомендаций:

def scroll_recomendations(driver):
   driver.get('https://www.instagram.com/explore/people/suggested/')
    time.sleep(5)
    scrolling = True
    last_height = driver.execute_script("return document.body.scrollHeight")
    while scrolling:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(random.randint(4,10))
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height or random.randint(1, 10) == 1:
            scrolling = False
        last_height = new_height
        if random.randint(0, 1):
            try:
                driver.execute_script(f"document.getElementsByClassName('sqdOP  L3NKy   y3zKF     ')[{random.randint(1,100)}].click()")
            except Exception as E:
                print(E)

Просмотр рекомендуемых постов

Помимо ленты, которая сформирована из наших подписок, Instagram собирает ленту рекомендаций. Туда входят все посты, которые потенциально могут вам понравиться — мы просто пройдём вниз по этой ленте. Выйдем с вероятностью 1/5 или когда кончится, чтобы долго не засиживаться.

def scroll_explore(driver):
    driver.get('https://www.instagram.com/explore')
    time.sleep(3)
    scrolling = True
    last_height = driver.execute_script("return document.body.scrollHeight")
    while scrolling:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(random.randint(4,10))
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height or random.randint(1, 5) == 1:
            scrolling = False
        last_height = new_height

Итог

Теперь можно собрать все функции вместе — создаём новый driver, проводим авторизацию, считаем число новых сторис и сообщений, просматриваем сторис, переходим в рекомендуемые подписки и листаем ленту. В конце печатаем полученные данные — число новых сообщений, запросов и историй друзей.

driver = webdriver.Chrome(ChromeDriverManager().install())
authorize(driver)
queries_text, messages_count = messages_count(driver)
stories_count = get_stories_count(driver)
watch_stories(driver)
scroll_recomendations(driver)
scroll_feed(driver)
scroll_explore(driver)

if queries_text is not None:
    print(queries_text)
else:
    print('Нет новых запросов на диалог')
print('Новых сообщений:', messages_count)

print('Новых историй:', stories_count)
 Нет комментариев    181   2021   analysis   Analytics Engineering   instagram   python   selenium

Парсинг целевой аудитории ВКонтакте

Время чтения текста – 7 минут

При размещении рекламы некоторые площадки в настройках аудитории позволяют загрузить список конкретных людей, которые увидят рекламу. Для парсинга id по конкретным пабликам существуют специальные инструменты, но куда интереснее (и дешевле) сделать это собственноручно при помощи Python и VK API. Сегодня расскажем, как для рекламной кампании LEFTJOIN мы спарсили целевую аудиторию и загрузили её в рекламный кабинет.

В материале «Собираем данные по рекламным кампаниям ВКонтакте» подробно описан процесс получения токена пользователя для VK API

Парсинг пользователей

Для отправки запросов потребуется токен пользователя и список пабликов, чьих участников мы хотим получить. Мы собрали около 30 сообществ, посвящённых аналитике, BI-инструментам и Data Science.

import requests
import time

group_list =  ['datacampus', '185023286', 'data_mining_in_action', '223456', '187222444', 'nta_ds_ai', 'business__intelligence', 'club1981711', 'datascience', 'ozonmasters', 'businessanalysts', 'datamining.team', 'club.shad', '174278716', 'sqlex', 'sql_helper', 'odssib', 'sapbi', 'sql_learn', 'hsespbcareer', 'smartdata', 'pomoshch_s_spss', 'dwhexpert', 'k0d_ds', 'sql_ex_ru', 'datascience_ai', 'data_club', 'mashinnoe_obuchenie_ai_big_data', 'womeninbigdata', 'introstats', 'smartdata', 'data_mining_in_action', 'dlschool_mipt']

token = 'ваш_токен'

Запрос на получение участников сообщества к API ВКонтакте вернёт максимум 1000 строк — для получения последующих тысяч потребуется смещать параметр offset на единицу. Но нужно знать, до какого момента это делать — поэтому опишем функцию, которая принимает id сообщества, получает информацию о числе участников сообщества и возвращает максимальное значение для offset — отношение числа участников к 1000, ведь мы можем получить ровно тысячу человек за раз.

def get_offset(group_id):
    count = requests.get('https://api.vk.com/method/groups.getMembers', params={
            'access_token':token,
            'v':5.103,
            'group_id': group_id,
            'sort':'id_desc',
            'offset':0,
            'fields':'last_seen'
        }).json()['response']['count']
    return count // 1000

Следующим этапом опишем функцию, которая принимает id сообщества, собирает в один список id всех подписчиков и возвращает его. Для этого отправляем запросы на получение 1000 человек, пока не кончается offset, вносим данные в список и возвращаем его. Проходя по каждому человеку дополнительно проверяем дату его последнего посещения социальной сети — если он не заходил с середины ноября, добавлять его не будем. Время указывается в формате unixtime.

def get_users(group_id):
    good_id_list = []
    offset = 0
    max_offset = get_offset(group_id)
    while offset < max_offset:
        response = requests.get('https://api.vk.com/method/groups.getMembers', params={
            'access_token':token,
            'v':5.103,
            'group_id': group_id,
            'sort':'id_desc',
            'offset':offset,
            'fields':'last_seen'
        }).json()['response']
        offset += 1
        for item in response['items']:
            try:
                if item['last_seen']['time'] >= 1605571200:
                    good_id_list.append(item['id'])
            except Exception as E:
                continue
    return good_id_list

Теперь пройдём по всем сообществам из списка и для каждого соберём участников, а затем внесём их в общий список all_users. В конце переводим сначала список в множество, а затем опять в список, чтобы избавиться от возможных дубликатов: одни и те же люди могли быть участниками разных пабликов. Лишним не будет после каждого паблика приостановить работу программы на секунду, чтобы не столкнуться с ограничениями на число запросов.

all_users = []

for group in group_list:
    print(group)
    try:
        users = get_users(group)
        all_users.extend(users)
        time.sleep(1)
    except KeyError as E:
        print(group, E)
        continue

all_users = list(set(all_users))

Последним шагом записываем каждого пользователя в файл с новой строки.

with open('users.txt', 'w') as f:
    for item in all_users:
        f.write("%s\n" % item)

Аудитория в рекламном кабинете из файла

Переходим в свой рекламный кабинет ВКонтакте и заходим во вкладку «Ретаргетинг». Там будем кнопка «Создать аудиторию»:

После нажатия на неё откроется новое окно, где можно будет выбрать в качестве источника файл и указать название для аудитории:

После загрузки пройдёт несколько секунд и аудитория будет доступна. Первые минут 10 будет указано, что аудитория слишком мала: это не так и панель вскоре обновится, если в вашей аудитории действительно более 100 человек.

Итоги

Сравним среднюю стоимость привлечённого в наше сообщество участника в объявлении с автоматической настройкой аудитории и в объявлении, аудиторию для которого мы спарсили. В первом случае получаем среднюю стоимость в 52,4 рубля, а во втором — в 33,2 рубля. Подбор качественной аудитории при помощи методов парсинга данных из ВКонтакте помог снизить среднюю стоимость на 37%.

Для рекламной кампании мы подготовили такой пост (нажмите на картинку, чтобы перейти к нему):

 3 комментария    320   2020   Analytics Engineering   api   python   vk   vk api

Итоги прохождения курса по dbt

Время чтения текста – 4 минуты

Недавно прошёл курс по dbt от команды dbt. Курс классный, в нем много практики. Я использовал Google BigQuery и публичные датасеты от dbt для решения описанных примеров, а в обучающих материалах все построено на Snowflake.

В целом, узнал много нового и полезного о dbt, кратко summary:

  1. Во введении ребята объясняют роль Analytics Engineer, о котором так много разговоров и ссылаются на их пост блога
  2. Дается исчерпывающая информация о том, как подключить dbt к хранилищу и .git
  3. В dbt довольно тривиальными запросами реализовано тестирование данных на предмет уникальности и соответствия значениям. Это реально базовые SQL-запросы, которые проверяют наличие или отсутствие поля или значений. И тут интересно следующее: когда пишешь самостоятельно похожие запросы иногда думаешь, что во всем остальном мире так никто не делает, ну, к примеру, как в запросе ниже. А оказывается еще как делают, вот даже публично внутри dbt все эти тесты так и реализованы. И, кстати, крайне удобно, что SQL-код каждого теста можно изучить и скомпилировать.
SELECT sum(amount) FROM ... HAVING sum(amount) > 0
  1. Круто и удобно формируется документация и DAG (Directed Acyclic Graph), который показывает все шаги преобразований модели
  2. Поскольку dbt построен на Liquid и использовании Jinja (движок шаблонов в Python), то можно делать всякие невероятные вещи вроде написания внутреннего макроса (читай, условный операторы, циклы или создание функций) и применять этот макрос для автоматизации однотипных частей запроса. Это прям вау 🙂
  3. Многие вещи уже придуманы и разработаны коммьюнити, поэтому существует dbt hub, через который можно подключить интересующие пакеты и не изобретать велосипед.
  4. Отдельного упоминания достойны алгоритмы формирования инкрементального наполнения таблиц и создания снэпшотов. Для одного из проектов абсолютно такой же алгоритм по созданию снэпшотов с date_form / date_to мне доводилось проектировать самостоятельно.
    Было приятно увидеть, что у ребят из dbt это работает абсолютно аналогичным образом.
  1. Разумеется, используя Jinja и dbt, можно автоматизировать построение аналитических запросов, это так и называется Analyses. Скомпилированный код запроса, можно имплементировать в любимую BI-систему и наслаждаться результатами.

Общие впечатления очень положительные: dbt ждет большое будущее и развитие, ведь коммьюнити растет вместе с возможностями и ресурсами компании. Ждем коннекторов к другим СУБД помимо PostgreSQL, BigQuery, Snowflake, Redshift.

 Нет комментариев    96   2020   Analytics Engineering   dbt   sql

Строим Motion chart по индексу Биг Мака на Python

Время чтения текста – 14 минут

Одной из самых знаменитых визуализаций, конечно же, является работа Hans Rosling и его знаменитое выступление про изменение уровня экономики в странах. Посмотрите это видео, если вдруг еще не видели:

Иногда у экономистов возникает желание сравнить уровень жизни в разных странах. Одной из таких опций считается индекс Биг Мака, учёт которого журнал «The Economist» ведёт с 1986 года. Основная мысль — изучить паритет покупательской способности в разных странах, максимально учитывая стоимость внутреннего производства. В производстве Биг Мака участвует стандартный набор ингредиентов, одинаковый во всех странах: сыр, мясо, хлеб и овощи. Считается, что все эти ингредиенты произведены локально, а, значит, цена на Биг Мак позволяет сравнивать покупательскую способность в разных странах на данный товар. Помимо этого, McDonalds — глобальный бренд и его рестораны есть в огромном количестве стран, что обеспечивает широкий охват Биг Маком.

Сегодня при помощи библиотеки Plotly построим Motion Chart для индекса Биг Мака. Мы, следуя за Hann Rosling, хотим получить Motion Chart, где по оси X будет численность населения, по Y — ВВП на душу населения в долларах, а размер точек будет обозначать индекс Биг Мака в данной стране. Кроме того, цвет точки будет обозначать континент, на котором расположилась страна.

Подготовка данных

Хотя «The Economist» ведёт учёт уже более 30 лет и делится своими наблюдениями в интернете, датасет содержит множество пропусков по разным странам. В то же время в датасете журнала не представлены названия континентов, к которым принадлежат страны и численность населения. Поэтому мы дополним данные журнала тремя другими датасетами, представленными в нашем репозитории.

Начнём с импорта библиотек:

import pandas as pd
from pandas.errors import ParserError
import plotly.graph_objects as go
import numpy as np
import requests
import io

Прочитаем все 4 датасета прямо из GitHub. Для этого опишем функцию, которая отправляет GET-запрос к csv-файлу и формирует из него DataFrame. По двум датасетам может возникнуть ошибка ParseError из-за наличия подписи в заглавии: пропустим несколько строк, если это произошло.

def read_raw_file(link):
    raw_csv = requests.get(link).content
    try:
        df = pd.read_csv(io.StringIO(raw_csv.decode('utf-8')))
    except ParserError:
        df = pd.read_csv(io.StringIO(raw_csv.decode('utf-8')), skiprows=3)
    return df

bigmac_df = read_raw_file('https://github.com/valiotti/leftjoin/raw/master/motion-chart-big-mac/big-mac.csv')
population_df = read_raw_file('https://github.com/valiotti/leftjoin/raw/master/motion-chart-big-mac/population.csv')
dgp_df = read_raw_file('https://github.com/valiotti/leftjoin/raw/master/motion-chart-big-mac/gdp.csv')
continents_df = read_raw_file('https://github.com/valiotti/leftjoin/raw/master/motion-chart-big-mac/continents.csv')

От датасета «The Economist» оставим только название страны, местную цену, курс доллара, код страны и дату записи. После оставим строки, записанные между 2005 и 2020 годом: данные за этот период наиболее полные. Последним действием посчитаем цену на Биг Мак в долларах: для этого цену в местной валюте поделим на валютный курс.

bigmac_df = bigmac_df[['name', 'local_price', 'dollar_ex', 'iso_a3', 'date']]
bigmac_df = bigmac_df[bigmac_df['date'] >= '2005-01-01']
bigmac_df = bigmac_df[bigmac_df['date'] < '2020-01-01']
bigmac_df['date'] = pd.DatetimeIndex(bigmac_df['date']).year
bigmac_df = bigmac_df.drop_duplicates(['date', 'name'])
bigmac_df = bigmac_df.reset_index(drop=True)
bigmac_df['dollar_price'] = bigmac_df['local_price'] / bigmac_df['dollar_ex']

Взглянем на наш DataFrame:

У нас есть датасет с континентами и странами, и нужно к bigmac_df добавить колонку «continents». Для удобства оставим от continents_df только колонки с названием континента и трёхбуквенным кодом страны, а затем для каждой страны в bigmac_df найдём континент. В случае, например, с Россией или с Турцией может произойти ошибка, ведь нельзя однозначно сказать, Европа это или Азия, так что такие страны будем определять как европейские.

continents_df = continents_df[['Continent_Name', 'Three_Letter_Country_Code']]
continents_list = []
for country in bigmac_df['iso_a3']:
    try:
        continents_list.append(continents_df.loc[continents_df['Three_Letter_Country_Code'] == country]['Continent_Name'].item())
    except ValueError:
        continents_list.append('Europe')
bigmac_df['continent'] = continents_list

Удалим использованные колонки, отсортируем для удобства по названиям стран и дате, переведём дату в числовой тип и снова взглянем на промежуточный результат:

bigmac_df = bigmac_df.drop(['local_price', 'iso_a3', 'dollar_ex'], axis=1)
bigmac_df = bigmac_df.sort_values(by=['name', 'date'])
bigmac_df['date'] = bigmac_df['date'].astype(int)

Заполним пробелы: по тем годам, где нет данных и установим цену в 0 долларов. Ещё придётся удалить Китайскую Республику — Тайвань: это частично признанное государство отсутствует в датасетах World Bank. А Арабские Эмираты повторяются дважды, с этим тоже могут возникнуть проблемы.

countries_list = list(bigmac_df['name'].unique())
years_set = {i for i in range(2005, 2020)}
for country in countries_list:
    if len(bigmac_df[bigmac_df['name'] == country]) < 15:
        this_continent = bigmac_df[bigmac_df['name'] == country].continent.iloc[0]
        years_of_country = set(bigmac_df[bigmac_df['name'] == country]['date'])
        diff = years_set - years_of_country
        dict_to_df = pd.DataFrame({
                      'name':[country] * len(diff),
                      'date':list(diff),
                      'dollar_price':[0] * len(diff),
                      'continent': [this_continent] * len(diff)
                     })
        bigmac_df = bigmac_df.append(dict_to_df)
bigmac_df = bigmac_df[bigmac_df['name'] != 'Taiwan']
bigmac_df = bigmac_df[bigmac_df['name'] != 'United Arab Emirates']

Осталось добавить ВВП на душу населения и численность населения из других датасетов. В обоих датасетах многие страны записаны иначе, поэтому пропишем словарь и переименуем все страны в обоих датасетах методом replace().

years = [str(i) for i in range(2005, 2020)]

countries_replace_dict = {
    'Russian Federation': 'Russia',
    'Egypt, Arab Rep.': 'Egypt',
    'Hong Kong SAR, China': 'Hong Kong',
    'United Kingdom': 'Britain',
    'Korea, Rep.': 'South Korea',
    'United Arab Emirates': 'UAE',
    'Venezuela, RB': 'Venezuela'
}
for key, value in countries_replace_dict.items():
    population_df['Country Name'] = population_df['Country Name'].replace(key, value)
    gdp_df['Country Name'] = gdp_df['Country Name'].replace(key, value)

Наконец, соберём данные по численности и ВВП за нужные года и добавим в основной DataFrame:

countries_list = list(bigmac_df['name'].unique())

population_list = []
gdp_list = []
for country in countries_list:
    population_for_country_df = population_df[population_df['Country Name'] == country][years]
    population_list.extend(list(population_for_country_df.values[0]))
    gdp_for_country_df = gdp_df[gdp_df['Country Name'] == country][years]
    gdp_list.extend(list(gdp_for_country_df.values[0]))
    
bigmac_df['population'] = population_list
bigmac_df['gdp'] = gdp_list
bigmac_df['gdp_per_capita'] = bigmac_df['gdp'] / bigmac_df['population']

В итоге получили такой датасет:

Формируем график в plotly

Логарифмируем значения по оси X. В Китае и Индии, например, население в 10 раз больше, чем в среднем в других странах: из-за этого получим сложно интерпретируемую визуализацию, в которой у нас будет много наблюдений около оси и несколько наблюдений справа. Логарифмирование — часто используемый экономистами прием для учета эффекта масштаба в данных.

fig_dict = {
    "data": [],
    "layout": {},
    "frames": []
}

fig_dict["layout"]["xaxis"] = {"title": "Численность населения", "type": "log"}
fig_dict["layout"]["yaxis"] = {"title": "ВВП на душу населения (в $)", "range":[-10000, 120000]}
fig_dict["layout"]["hovermode"] = "closest"
fig_dict["layout"]["updatemenus"] = [
    {
        "buttons": [
            {
                "args": [None, {"frame": {"duration": 500, "redraw": False},
                                "fromcurrent": True, "transition": {"duration": 300,
                                                                    "easing": "quadratic-in-out"}}],
                "label": "Play",
                "method": "animate"
            },
            {
                "args": [[None], {"frame": {"duration": 0, "redraw": False},
                                  "mode": "immediate",
                                  "transition": {"duration": 0}}],
                "label": "Pause",
                "method": "animate"
            }
        ],
        "direction": "left",
        "pad": {"r": 10, "t": 87},
        "showactive": False,
        "type": "buttons",
        "x": 0.1,
        "xanchor": "right",
        "y": 0,
        "yanchor": "top"
    }
]

Помимо кнопок у нас будет Slider, позволяющий получать данные за определённый год:

sliders_dict = {
    "active": 0,
    "yanchor": "top",
    "xanchor": "left",
    "currentvalue": {
        "font": {"size": 20},
        "prefix": "Год: ",
        "visible": True,
        "xanchor": "right"
    },
    "transition": {"duration": 300, "easing": "cubic-in-out"},
    "pad": {"b": 10, "t": 50},
    "len": 0.9,
    "x": 0.1,
    "y": 0,
    "steps": []
}

Для статичного графика до нажатия на кнопку «Start» возьмём данные за 2005 год и заполним ими поле data фигуры.

continents_list_from_df = list(bigmac_df['continent'].unique())
year = 2005
for continent in continents_list_from_df:
    dataset_by_year = bigmac_df[bigmac_df["date"] == year]
    dataset_by_year_and_cont = dataset_by_year[dataset_by_year["continent"] == continent]
    
    data_dict = {
        "x": dataset_by_year_and_cont["population"],
        "y": dataset_by_year_and_cont["gdp_per_capita"],
        "mode": "markers",
        "text": dataset_by_year_and_cont["name"],
        "marker": {
            "sizemode": "area",
            "sizeref": 200000,
            "size":  np.array(dataset_by_year_and_cont["dollar_price"]) * 20000000
        },
        "name": continent,
        "customdata": np.array(dataset_by_year_and_cont["dollar_price"]).round(1),
        "hovertemplate": '<b>%{text}</b>' + '<br>' +
                         'ВВП на душу населения: %{y}' + '<br>' +
                         'Численность населения: %{x}' + '<br>' +
                         'Стоимость Биг Мака: %{customdata}$' +
                         '<extra></extra>'
    }
    fig_dict["data"].append(data_dict)

А для анимации заполним поле frames. Каждый frame — данные за год с 2005 по 2019.

for year in years:
    frame = {"data": [], "name": str(year)}
    for continent in continents_list_from_df:
        dataset_by_year = bigmac_df[bigmac_df["date"] == int(year)]
        dataset_by_year_and_cont = dataset_by_year[dataset_by_year["continent"] == continent]

        data_dict = {
            "x": list(dataset_by_year_and_cont["population"]),
            "y": list(dataset_by_year_and_cont["gdp_per_capita"]),
            "mode": "markers",
            "text": list(dataset_by_year_and_cont["name"]),
            "marker": {
                "sizemode": "area",
                "sizeref": 200000,
                "size": np.array(dataset_by_year_and_cont["dollar_price"]) * 20000000
            },
            "name": continent,
            "customdata": np.array(dataset_by_year_and_cont["dollar_price"]).round(1),
            "hovertemplate": '<b>%{text}</b>' + '<br>' +
                             'ВВП на душу населения: %{y}' + '<br>' +
                             'Численность населения: %{x}' + '<br>' +
                             'Стоимость Биг Мака: %{customdata}$' +
                             '<extra></extra>'
        }
        frame["data"].append(data_dict)

    fig_dict["frames"].append(frame)
    slider_step = {"args": [
        [year],
        {"frame": {"duration": 300, "redraw": False},
         "mode": "immediate",
         "transition": {"duration": 300}}
    ],
        "label": year,
        "method": "animate"}
    sliders_dict["steps"].append(slider_step)

Наконец, создадим объект графика, поправим цвета, шрифты и добавим описание.

fig_dict["layout"]["sliders"] = [sliders_dict]

fig = go.Figure(fig_dict)

fig.update_layout(
    title = 
        {'text':'<b>Motion chart</b><br><span style="color:#666666"> Биг Мака для стран мира с 2005 по 2019 год </span>'},
    font={
        'family':'Open Sans, light',
        'color':'black',
        'size':14
    },
    plot_bgcolor='rgba(0,0,0,0)'
)
fig.update_yaxes(nticks=4)
fig.update_xaxes(tickfont=dict(family='Open Sans, light', color='black', size=12), nticks=4, gridcolor='lightgray', gridwidth=0.5)
fig.update_yaxes(tickfont=dict(family='Open Sans, light', color='black', size=12), nticks=4, gridcolor='lightgray', gridwidth=0.5)

fig.show()

В итоге получаем такой Motion Chart:

Полный код проекта доступен на GitHub

 Нет комментариев    58   2020   Analytics Engineering   Data Analytics   plotly

Собираем топ-10 аккаунтов Instagram по теме аналитики и машинного обучения

Время чтения текста – 11 минут

В некоторых телеграм-каналах (раз, два) уже говорилось про другие интересные паблики в телеграме, однако по Instagram такого топа пока не было. Вероятно, это не самая популярная сеть для контента в нашей индустрии, тем не менее, можно проверить эту гипотезу, используя Python и данные. В этом материале рассказываем, как собрать данные по аккаунтам Instagram без API.

Метод сбора данных
Instagram API не позволит вам просто так собирать данные о других пользователях, но есть и другой метод. Можно отправить такой request-запрос:

https://instagram.com/leftjoin/?__a=1

И получить в ответе JSON-объект со всей информацией о пользователе, которую можно посмотреть самому: имя аккаунта, количество постов, подписок и подписчиков, а также первые десять постов с информацией про них: количество лайков, комментарии и прочее. Именно на таких request-запросах устроена библиотека pyInstagram.

Схема данных
Будем собирать данные в три таблицы Clickhouse: пользователи, посты и комментарии. В таблицу пользователей собираем всю информацию о них: идентификатор, наименование аккаунта, имя и фамилия человека, описание профиля, количество подписок и подписчиков, количество постов, суммарное количество комментариев и лайков, наличие верификации, география пользователя и ссылки на аватарку и Facebook.

CREATE TABLE instagram.users
(
    `added_at` DateTime,
    `user_id` UInt64,
    `user_name` String,
    `full_name` String,
    `base_url` String,
    `biography` String,
    `followers_count` UInt64,
    `follows_count` UInt64,
    `media_count` UInt64,
    `total_comments` UInt64,
    `total_likes` UInt64,
    `is_verified` UInt8,
    `country_block` UInt8,
    `profile_pic_url` Nullable(String),
    `profile_pic_url_hd` Nullable(String),
    `fb_page` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

В таблицу с постами сохраняем автора поста, идентификатор записи, текст, количество комментариев и прочее. is_ad, is_album и is_video — поля, проверяющие, является ли запись рекламной, «каруселью» изображений или видеозаписью.

CREATE TABLE instagram.posts
(
    `added_at` DateTime,
    `owner` String,
    `post_id` UInt64,
    `caption` Nullable(String),
    `code` String,
    `comments_count` UInt64,
    `comments_disabled` UInt8,
    `created_at` DateTime,
    `display_url` String,
    `is_ad` UInt8,
    `is_album` UInt8,
    `is_video` UInt8,
    `likes_count` UInt64,
    `location` Nullable(String),
    `recources` Array(String),
    `video_url` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

В таблице с комментариями храним отдельно каждый комментарий к записи с автором и текстом.

CREATE TABLE instagram.comments
(
    `added_at` DateTime,
    `comment_id` UInt64,
    `post_id` UInt64,
    `comment_owner` String,
    `comment_text` String
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

Скрипт
Из библиотеки pyInstagram нам понадобятся классы Account, Media, WebAgent и Comment.

from instagram import Account, Media, WebAgent, Comment
from datetime import datetime
from clickhouse_driver import Client
import requests
import pandas as pd

Создаем экземпляр класса WebAgent — он необходим для вызова некоторых методов и обновления аккаунтов. В начале нам нужно иметь хотя бы названия профилей пользователей, информацию о которых мы хотим собрать, поэтому отправим другой request-запрос для поиска пользователей по ключевым словам, их список ниже в фрагменте кода. В выдаче будут аккаунты, у которых название или описание профиля совпало с ключевым словом.

agent = WebAgent()
queries_list = ['machine learning', 'data science', 'data analytics', 'analytics', 'business intelligence',
                'data engineering', 'computer science', 'big data', 'artificial intelligence',
                'deep learning', 'data scientist','machine learning engineer', 'data engineer']
client = Client(host='12.34.56.789', user='default', password='', port='9000', database='instagram')
url = 'https://www.instagram.com/web/search/topsearch/?context=user&count=0'

Проходим по всем ключевым словам и собираем все аккаунты. Так как в списке могли образоваться дубликаты, переведём список в множество и обратно в список.

response_list = []
for query in queries_list:
    response = requests.get(url, params={
        'query': query
    }).json()
    response_list.extend(response['users'])
instagram_pages_list = []
for item in response_list:
    instagram_pages_list.append(item['user']['username'])
instagram_pages_list = list(set(instagram_pages_list))

Теперь проходим по списку аккаунтов, и если аккаунта с таким наименованием ещё не было в базе, то получаем расширенную информацию о нём. Для этого пробуем создать экземпляр класса Account, передав username параметром. После при помощи объекта agent обновляем информацию об аккаунте. Будем собирать только первые 100 постов, чтобы сбор не задерживался. Создадим список media_list — он при помощи метода get_media будет хранить код каждого поста, который затем можно будет получить при помощи класса Media.


Сбор медиа аккаунта

all_posts_list = []
username_count = 0
for username in instagram_pages_list:
    if client.execute(f"SELECT count(1) FROM users WHERE user_name='{username}'")[0][0] == 0:
        print('username:', username_count, '/', len(instagram_pages_list))
        username_count += 1
        account_total_likes = 0
        account_total_comments = 0
        try:
            account = Account(username)
        except Exception as E:
            print(E)
            continue
        try:
            agent.update(account)
        except Exception as E:
            print(E)
            continue
        if account.media_count < 100:
            post_count = account.media_count
        else:
            post_count = 100
        print(account, post_count)
        media_list, _ = agent.get_media(account, count=post_count, delay=1)
        count = 0

Мы начинаем с постов и комментариев, потому что для занесения в базу нового пользователя нам нужно подсчитать сперва суммарное количество комментариев и лайков в его аккаунте. Практически все интересующие поля являются атрибутами класса Media.


Сбор постов пользователя

for media_code in media_list:
            if client.execute(f"SELECT count(1) FROM posts WHERE code='{media_code}'")[0][0] == 0:
                print('posts:', count, '/', len(media_list))
                count += 1

                post_insert_list = []
                post = Media(media_code)
                agent.update(post)
                post_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                post_insert_list.append(str(post.owner))
                post_insert_list.append(post.id)
                if post.caption is not None:
                    post_insert_list.append(post.caption.replace("'","").replace('"', ''))
                else:
                    post_insert_list.append("")
                post_insert_list.append(post.code)
                post_insert_list.append(post.comments_count)
                post_insert_list.append(int(post.comments_disabled))
                post_insert_list.append(datetime.fromtimestamp(post.date).strftime('%Y-%m-%d %H:%M:%S'))
                post_insert_list.append(post.display_url)
                try:
                    post_insert_list.append(int(post.is_ad))
                except TypeError:
                    post_insert_list.append('cast(Null as Nullable(UInt8))')
                post_insert_list.append(int(post.is_album))
                post_insert_list.append(int(post.is_video))
                post_insert_list.append(post.likes_count)
                if post.location is not None:
                    post_insert_list.append(post.location)
                else:
                    post_insert_list.append('')
                post_insert_list.append(post.resources)
                if post.video_url is not None:
                    post_insert_list.append(post.video_url)
                else:
                    post_insert_list.append('')
                account_total_likes += post.likes_count
                account_total_comments += post.comments_count
                try:
                    client.execute(f'''
                        INSERT INTO posts VALUES {tuple(post_insert_list)}
                    ''')
                except Exception as E:
                    print('posts:')
                    print(E)
                    print(post_insert_list)

Чтобы собрать комментарии необходимо вызвать метод get_comments и передать параметром экземпляр класса Media.


Сбор комментариев из поста

comments = agent.get_comments(media=post)
                for comment_id in comments[0]:
                    comment_insert_list = []
                    comment = Comment(comment_id)
                    comment_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                    comment_insert_list.append(comment.id)
                    comment_insert_list.append(post.id)
                    comment_insert_list.append(str(comment.owner))
                    comment_insert_list.append(comment.text.replace("'","").replace('"', ''))
                    try:
                        client.execute(f'''
                            INSERT INTO comments VALUES {tuple(comment_insert_list)}
                        ''')
                    except Exception as E:
                        print('comments:')
                        print(E)
                        print(comment_insert_list)


Наконец, когда все посты и комментарии пройдены, можем занести информацию о пользователе.

Сбор информации о пользователе

user_insert_list = []
        user_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        user_insert_list.append(account.id)
        user_insert_list.append(account.username)
        user_insert_list.append(account.full_name)
        user_insert_list.append(account.base_url)
        user_insert_list.append(account.biography)
        user_insert_list.append(account.followers_count)
        user_insert_list.append(account.follows_count)
        user_insert_list.append(account.media_count)
        user_insert_list.append(account_total_comments)
        user_insert_list.append(account_total_likes)
        user_insert_list.append(int(account.is_verified))
        user_insert_list.append(int(account.country_block))
        user_insert_list.append(account.profile_pic_url)
        user_insert_list.append(account.profile_pic_url_hd)
        if account.fb_page is not None:
            user_insert_list.append(account.fb_page)
        else:
            user_insert_list.append('')
        try:
            client.execute(f'''
                INSERT INTO users VALUES {tuple(user_insert_list)}
            ''')
        except Exception as E:
            print('users:')
            print(E)
            print(user_insert_list)

Результаты
Таким методом нам удалось собрать 500 пользователей, 20 тысяч постов и 40 тысяч комментариев. Теперь можем написать простой запрос к базе и получить топ-10 Instagram-аккаунтов по теме аналитики и машинного обучения за последнее время:

SELECT *
FROM users
ORDER BY followers_count DESC
LIMIT 10

А вот и приятный бонус, для тех, кто искал на какие аккаунты в Instagram подписаться по релевантной тематике:

  1. @ai_machine_learning
  2. @neuralnine
  3. @datascienceinfo
  4. @compscistuff
  5. @computersciencelife
  6. @welcome.ai
  7. @papa_programmer
  8. @data_science_learn
  9. @neuralnet.ai
  10. @techno_thinkers

Полный код проекта доступен на GitHub

 Нет комментариев    108   2020   Analytics Engineering   clickhouse   Data Analytics   instagram   python

Анализ рынка вакансий аналитики и BI: дашборд в Tableau

Время чтения текста – 16 минут

По данным рейтинга SimilarWeb, hh.ru — третий по популярности сайт о трудоустройстве в мире. В одном из разговоров с Ромой Буниным у нас появилась идея сделать совместный проект: собрать данные из открытого HeadHunter API и визуализировать их при помощи Tableau Public. Нам захотелось понять, как меняется зарплата в зависимости от указанных в вакансии навыков, наименования позиции и сравнить, как обстоят дела в Москве, Санкт-Петербурге и регионах.

Как мы собирали данные?

Схема данных основана на коротком представлении вакансии, которую возвращает метод GET /vacancies. Из представления собираются следующие поля: тип вакансии, идентификатор, премиальность вакансии, необходимость прохождения тестирования, адрес компании, информация о зарплате, график работы и другие. Соответствующий CREATE-запрос для таблицы:


Запрос создания таблицы vacancies_short

CREATE TABLE headhunter.vacancies_short
(
    `added_at` DateTime,
    `query_string` String,
    `type` String,
    `level` String,
    `direction` String,
    `vacancy_id` UInt64,
    `premium` UInt8,
    `has_test` UInt8,
    `response_url` String,
    `address_city` String,
    `address_street` String,
    `address_building` String,
    `address_description` String,
    `address_lat` String,
    `address_lng` String,
    `address_raw` String,
    `address_metro_stations` String,
    `alternate_url` String,
    `apply_alternate_url` String,
    `department_id` String,
    `department_name` String,
    `salary_from` Nullable(Float64),
    `salary_to` Nullable(Float64),
    `salary_currency` String,
    `salary_gross` Nullable(UInt8),
    `name` String,
    `insider_interview_id` Nullable(UInt64),
    `insider_interview_url` String,
    `area_url` String,
    `area_id` UInt64,
    `area_name` String,
    `url` String,
    `published_at` DateTime,
    `employer_url` String,
    `employer_alternate_url` String,
    `employer_logo_urls_90` String,
    `employer_logo_urls_240` String,
    `employer_logo_urls_original` String,
    `employer_name` String,
    `employer_id` UInt64,
    `response_letter_required` UInt8,
    `type_id` String,
    `type_name` String,
    `archived` UInt8,
    `schedule_id` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY vacancy_id

Первый скрипт собирает данные с HeadHunter по API и отправляет их в Clickhouse. Он использует следующие библиотеки:

import requests
from clickhouse_driver import Client
from datetime import datetime
import pandas as pd
import re

Далее загружаем таблицу с запросами и подключаемся к CH:

queries = pd.read_csv('hh_data.csv')
client = Client(host='1.234.567.890', user='default', password='', port='9000', database='headhunter')

Таблица queries хранит список поисковых запросов. Она содержит следующие колонки: тип запроса, уровень вакансии для поиска, направление вакансии и саму поисковую фразу. В строку с запросом можно помещать логические операторы: например, чтобы найти вакансии, в которых должны присутствовать ключевые слова «Python», «data» и «анализ» между ними можно указать логическое «И».

Не всегда вакансии в выдаче соответствуют ожиданиям: случайно в базу могут попасть повара, маркетологи и администраторы магазина. Чтобы этого не произошло, опишем функцию check_name(name) — она будет принимать наименование вакансии и возвращать True в случае, если вакансия не подошла по названию.

def check_name(name):
    bad_names = [r'курьер', r'грузчик', r'врач', r'менеджер по закупу',
           r'менеджер по продажам', r'оператор', r'повар', r'продавец',
          r'директор магазина', r'директор по продажам', r'директор по маркетингу',
          r'кабельщик', r'начальник отдела продаж', r'заместитель', r'администратор магазина', 
          r'категорийный', r'аудитор', r'юрист', r'контент', r'супервайзер', r'стажер-ученик', 
          r'су-шеф', r'маркетолог$', r'региональный', r'ревизор', r'экономист', r'ветеринар', 
          r'торговый', r'клиентский', r'начальник цеха', r'территориальный', r'переводчик', 
          r'маркетолог /', r'маркетолог по']
    for item in bad_names:
        if re.match(item, name):
            return True

Затем объявляем бесконечный цикл — мы собираем данные без перерыва. Идём по DataFrame queries и сразу забираем оттуда тип вакансии, уровень, направление и поисковый запрос в отдельные переменные. Сначала по ключевому слову отправляем один запрос к методу /GET vacancies и получаем количество страниц. После идём от нулевой до последней страницы, отправляем те же запросы и заполняем список vacancies_from_response с полученными в выдаче короткими представлениями всех вакансий. В параметрах указываем 10 вакансий на страницу — больше ограничения HH API получить не позволяют. Так как мы не указали параметр area, API возвращает вакансии по всему миру.

while True:
   for query_type, level, direction, query_string in zip(queries['Тип'], queries['Уровень'], queries['Направление'], queries['Ключевое слово']):
           print(f'ключевое слово: {query_string}')
           url = 'https://api.hh.ru/vacancies'
           par = {'text': query_string, 'per_page':'10', 'page':0}
           r = requests.get(url, params=par).json()
           added_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
           pages = r['pages']
           found = r['found']
           vacancies_from_response = []

           for i in range(0, pages + 1):
               par = {'text': query_string, 'per_page':'10', 'page':i}
               r = requests.get(url, params=par).json()
               try:
                   vacancies_from_response.append(r['items'])
               except Exception as E:
                   continue

Теперь проходим по каждой вакансии на каждой странице двойным итератором. Сперва отправим запрос к Clickhouse и проверим, нет ли уже в базе вакансии с таким идентификатором и таким поисковым запросом. Если проверка пройдена — проверяем название вакансии. В случае неудачи переходим к следующей.

for item in vacancies_from_response:
               for vacancy in item:
                   if client.execute(f"SELECT count(1) FROM vacancies_short WHERE vacancy_id={vacancy['id']} AND query_string='{query_string}'")[0][0] == 0:
                       name = vacancy['name'].replace("'","").replace('"','')
                       if check_name(name):
                           continue

Теперь проходим по вакансии и собираем все нужные поля. В случае отсутствия некоторых данных будем отправлять пустые строки:


Код для сбора данных о вакансии

vacancy_id = vacancy['id']
                       is_premium = int(vacancy['premium'])
                       has_test = int(vacancy['has_test'])
                       response_url = vacancy['response_url']
                       try:
                           address_city = vacancy['address']['city']
                           address_street = vacancy['address']['street']
                           address_building = vacancy['address']['building']
                           address_description = vacancy['address']['description']
                           address_lat = vacancy['address']['lat']
                           address_lng = vacancy['address']['lng']
                           address_raw = vacancy['address']['raw']
                           address_metro_stations = str(vacancy['address']['metro_stations']).replace("'",'"')
                       except TypeError:
                           address_city = ""
                           address_street = ""
                           address_building = ""
                           address_description = ""
                           address_lat = ""
                           address_lng = ""
                           address_raw = ""
                           address_metro_stations = ""
                       alternate_url = vacancy['alternate_url']
                       apply_alternate_url = vacancy['apply_alternate_url']
                       try:
                           department_id = vacancy['department']['id']
                       except TypeError as E:
                           department_id = ""
                       try:
                           department_name = vacancy['department']['name']
                       except TypeError as E:
                           department_name = ""
                       try:
                           salary_from = vacancy['salary']['from']
                       except TypeError as E:
                           salary_from = "cast(Null as Nullable(UInt64))"
                       try:
                           salary_to = vacancy['salary']['to']
                       except TypeError as E:
                           salary_to = "cast(Null as Nullable(UInt64))"
                       try:
                           salary_currency = vacancy['salary']['currency']
                       except TypeError as E:
                           salary_currency = ""
                       try:
                           salary_gross = int(vacancy['salary']['gross'])
                       except TypeError as E:
                           salary_gross = "cast(Null as Nullable(UInt8))"
                       try:
                           insider_interview_id = vacancy['insider_interview']['id']
                       except TypeError:
                           insider_interview_id = "cast(Null as Nullable(UInt64))"
                       try:
                           insider_interview_url = vacancy['insider_interview']['url']
                       except TypeError:
                           insider_interview_url = ""
                       area_url = vacancy['area']['url']
                       area_id = vacancy['area']['id']
                       area_name = vacancy['area']['name']
                       url = vacancy['url']
                       published_at = vacancy['published_at']
                       published_at = datetime.strptime(published_at,'%Y-%m-%dT%H:%M:%S%z').strftime('%Y-%m-%d %H:%M:%S')
                       try:
                           employer_url = vacancy['employer']['url']
                       except Exception as E:
                           print(E)
                           employer_url = ""
                       try:
                           employer_alternate_url = vacancy['employer']['alternate_url']
                       except Exception as E:
                           print(E)
                           employer_alternate_url = ""
                       try:
                           employer_logo_urls_90 = vacancy['employer']['logo_urls']['90']
                           employer_logo_urls_240 = vacancy['employer']['logo_urls']['240']
                           employer_logo_urls_original = vacancy['employer']['logo_urls']['original']
                       except Exception as E:
                           print(E)
                           employer_logo_urls_90 = ""
                           employer_logo_urls_240 = ""
                           employer_logo_urls_original = ""
                       employer_name = vacancy['employer']['name'].replace("'","").replace('"','')
                       try:
                           employer_id = vacancy['employer']['id']
                       except Exception as E:
                           print(E)
                       response_letter_required = int(vacancy['response_letter_required'])
                       type_id = vacancy['type']['id']
                       type_name = vacancy['type']['name']
                       is_archived = int(vacancy['archived'])

Последнее поле — график работы. В случае, если вакансия подразумевает вахтовый метод работы она нам точно не подходит.

try:
    schedule = vacancy['schedule']['id']
except Exception as E:
    print(E)
    schedule = ''
if schedule == 'flyInFlyOut':
    continue

Теперь формируем список из полученных переменных, заменяем в нём None-значения на пустые строки во избежании конфликтов с Clickhouse и вставляем строку в таблицу.

vacancies_short_list = [added_at, query_string, query_type, level, direction, vacancy_id, is_premium, has_test, response_url, address_city, address_street, address_building, address_description, address_lat, address_lng, address_raw, address_metro_stations, alternate_url, apply_alternate_url, department_id, department_name,
salary_from, salary_to, salary_currency, salary_gross, insider_interview_id, insider_interview_url, area_url, area_name, url, published_at, employer_url, employer_logo_urls_90, employer_logo_urls_240,  employer_name, employer_id, response_letter_required, type_id, type_name, is_archived, schedule]
for index, item in enumerate(vacancies_short_list):
    if item is None:
        vacancies_short_list[index] = ""
tuple_to_insert = tuple(vacancies_short_list)
print(tuple_to_insert)
client.execute(f'INSERT INTO vacancies_short VALUES {tuple_to_insert}')

Как подключили Tableau к данным?

Tableau Public не умеет работать с базами данных, поэтому мы написали коннектор Clickhouse к Google Sheets. Он использует библиотеки gspread и oauth2client для авторизации в Google Spreadsheets API и библиотеку schedule для ежедневной работы по графику.

Работа с Google Spreadseets API подробно разобрана в материале «Собираем данные по рекламным кампаниям ВКонтакте»

import schedule
from clickhouse_driver import Client
import gspread
import pandas as pd
from oauth2client.service_account import ServiceAccountCredentials
from datetime import datetime

scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']
client = Client(host='54.227.137.142', user='default', password='', port='9000', database='headhunter')
creds = ServiceAccountCredentials.from_json_keyfile_name('credentials.json', scope)
gc = gspread.authorize(creds)

Опишем функцию update_sheet() — она будет брать все данные из Clickhouse и вставлять их в таблицу Google Docs.

def update_sheet():
   print('Updating cell at', datetime.now())
   columns = []
   for item in client.execute('describe table headhunter.vacancies_short'):
       columns.append(item[0])
   vacancies = client.execute('SELECT * FROM headhunter.vacancies_short')
   df_vacancies = pd.DataFrame(vacancies, columns=columns)
   df_vacancies.to_csv('vacancies_short.csv', index=False)
   content = open('vacancies_short.csv', 'r').read()
   gc.import_csv('1ZWS2kqraPa4i72hzp0noU02SrYVo0teD7KZ0c3hl-UI', content.encode('utf-8'))

Чтобы скрипт запускался в 16:00 по МСК каждый день используем библиотеку schedule:

schedule.every().day.at("13:00").do(update_sheet)
while True:
   schedule.run_pending()

А что в результате?

Рома построил на полученных данных дашборд.

И в youtube-ролике рассказывает о том, как эффективно использовать дашборд

Инсайты, которые можно извлечь из дашборда

  1. Аналитики с навыком бизнес-аналитики востребованы на рынке больше всего: по такому запросу нашлось больше всего вакансий. Тем не менее, средняя зарплата выше у продуктовых аналитиков и аналитиков BI.
  2. В Москве средние зарплаты выше на 10-30 тысяч рублей, чем в Санкт-Петербурге и на 30-40 тысячи рублей, чем в регионах. Там же работы нашлось больше всего в России.
  3. Самые высокооплачиваемые должности: руководитель отдела аналитики (в среднем, 110 тыс. руб. в месяц), инженер баз данных (138 тыс. руб. в месяц) и директор по машинному обучению (250 тыс. руб. в месяц).
  4. Самые полезные навыки на рынке — владение Python c библиотеками pandas и numpy, Tableau, Power BI, Etl и Spark. Вакансий с такими требованиями больше и зарплаты в них указаны выше прочих. Для Python-программистов знание matplotlib ценится на рынке выше, чем владение plotly.

Полный код проекта доступен на GitHub

 4 комментария    4708   2020   Analytics Engineering   bi   BI-инструменты   clickhouse   Data Analytics   headhunter

Семантический анализ мнений о поправках к Конституции на основе данных ВКонтакте

Время чтения текста – 14 минут

Сегодня поработаем с открытыми данными из ВКонтакте и получим семантическую оценку на популярное и актуальное событие — поправки к Конституции Российской Федерации.

Обзор методов API

Воспользуемся методом newsfeed.search: он позволяет получить до тысячи последних постов из новостной ленты по ключевому слову. В результате приходит много полей: среди них идентификаторы записи и пользователя или сообщества, текст поста, количество лайков, комментарии, приложения, геопозиция и прочее. Нас интересуют только идентификаторы и текст.
Для аналитики пригодится расширенная информация об авторе поста: его город, пол и возраст можно получить методом users.get, причём в запросе будем отправлять сразу до тысячи пользователей.

Создаём таблицы в Clickhouse

Данные нужно будет где-то хранить, в качестве СУБД подойдёт Clickhouse. Создадим две таблицы: для постов и для пользователей. В первой будем хранить идентификаторы и текст поста, во второй — данные о пользователе: его id, пол, возраст и город. Движок ReplacingMergeTree() будет удалять дубликаты.

Мы уже писали о том, как установить Clickhouse на бесплатную машину AWS, создавать в нём внешние словари и материализованные представления

CREATE TABLE vk_posts(
   post_id UInt64,
   post_date DateTime,
   owner_id UInt64,
   from_id UInt64,
   text String
) ENGINE ReplacingMergeTree()
ORDER BY post_date

CREATE TABLE vk_users(
   user_id UInt64,
   user_sex Nullable(UInt8),
   user_city String,
   user_age Nullable(UInt16)
) ENGINE ReplacingMergeTree()
ORDER BY user_id

Сбор постов через API ВКонтакте

Перейдём к написанию скрипта. Импортируем библиотеки и задаём несколько константных значений:

В материале «Собираем данные по рекламным кампаниям ВКонтакте» подробно описан процесс получения токена пользователя для VK API

from clickhouse_driver import Client
from datetime import datetime
import requests
import pandas as pd
import time

token = 'your_token'
version = 5.103
client = Client(host='ec1-23-456-789-1011.us-east-2.compute.amazonaws.com', user='default', password='', port='9000', database='default')      
data_list = []
start_from = 0
query_string = 'конституция'

Опишем функцию get_and_insert_info_by_user — она будет принимать список идентификаторов пользователей, получать расширенную информацию о них и отправлять в таблицу vk_users. Так как параметр user_ids метода принимает список как строку, переводим структуру в тип str и отсекаем квадратные скобки. Многие пользователи скрывают пол, возраст или город — в таком случае вставляет Nullable значения. Для получения возраста берём текущий год и вычитаем год из даты рождения, если он представлен — проверку делаем регулярным выражением по четырём цифрам.


Функция get_and_insert_info_by_user

def get_and_insert_info_by_user(users):
    try:
        r = requests.get('https://api.vk.com/method/users.get', params={
            'access_token':token,
            'v':version,
            'user_ids':str(users)[1:-2],
            'fields':'sex, city, bdate'
        }).json()['response']
        for user in r:
            user_list = []
            user_list.append(user['id'])
            if client.execute(f"SELECT count(1) FROM vk_users where user_id={user['id']}")[0][0] == 0:
                print(user['id'])
                try:
                    user_list.append(user['sex'])
                except Exception:
                    user_list.append('cast(Null as Nullable(UInt8))')
                try:
                    user_list.append(user['city']['title'])
                except Exception:
                    user_list.append('')
                try:
                    now = datetime.now()
    			    year = item.split('.')[-1]
    			    if re.match(r'\d\d\d\d', year):
        		        age = now.year - int(year)
			    	   user_list.append(age)
                except Exception:
                    user_list.append('cast(Null as Nullable(UInt16))')
                user_insert_tuple = tuple(user_list)
                client.execute(f'INSERT INTO vk_users VALUES {user_insert_tuple}')
    except KeyError:
        pass


Наш скрипт будет работать в вечном цикле, чтобы постоянно добирать новые данные, ведь мы можем получать только тысячу последних. Метод newsfeed.search за раз возвращает двести постов, так что нужно вызывать его пять раз подряд и собирать все ответы.


Цикл сбора новых постов

while True:
    for i in range(5):
        r = requests.get('https://api.vk.com/method/newsfeed.search', params={
            'access_token':token,
            'v':version,
            'q':query_string,
            'count':200,
            'start_from': start_from
        })
        data_list.append(r.json()['response'])
        try:
            start_from = r.json()['response']['next_from']
        except KeyError:
            pass

Полученные в ответе данные можно распарсить. В ВКонтакте у пользователей id всегда положительный, а у сообществ идёт со знаком минус. Чтобы получить данные только от пользователей, будем собирать только те, где from_id больше нуля. Следующая проверка — на отсутствие текста в посте, такие нам тоже не нужны. Наконец, будем собирать данные только если таких ещё нет — для этого обращаемся к таблице vk_posts по текущему id. В конце приостановим скрипт на 180 секунд, чтобы дождаться новых постов и не столкнуться с ограничениями по запросам VK API.


Занесение новых данных в Clickhouse

user_ids = []
    for data in data_list:
        for data_item in data['items']:
            if data_item['from_id'] > 0:
                post_list = []
                if not data_item['text']:
                    continue
                if client.execute(f"SELECT count(1) FROM vk_posts WHERE post_id={data_item['id']} AND from_id={data_item['from_id']}")[0][0] == 0:
                    user_ids.append(data_item['from_id'])
                    date = datetime.fromtimestamp(data_item['date'])
                    date = datetime.strftime(date, '%Y-%m-%d %H:%M:%S')
                    post_list.append(date)
                    post_list.append(data_item['id'])
                    post_list.append(data_item['owner_id'])
                    post_list.append(data_item['from_id'])
post_list.append(data_item['text'].replace("'","").replace('"','').replace("\n",""))
                    post_list.append(query_string)
                    post_tuple = tuple(post_list)
                    print(post_list)
                    try:
                        client.execute(f'INSERT INTO vk_posts VALUES {post_tuple}')
                    except Exception as E:
                        print('!!!!! try to insert into vk_post but got', E)
    try:
        get_and_insert_info_by_user(user_ids)
    except Exception as E:
        print("Try to insert user list:", user_ids, "but got:", E)
    time.sleep(180)

Анализ постов через Dostoevsky

Этот скрипт мы оставили работать на неделю: за это время он набрал почти 20000 постов из ВКонтакте, в которых упоминается ключевое слово «конституция». Напишем второй скрипт — для аналитики и визуализации данных. Для начала соберём данные из таблицы, сформируем DataFrame и для каждого поста получим значения тональности: насколько он положителен, отрицателен и нейтрален. Для оценки тональности текста будем использовать библиотеку Dostoevsky.

from dostoevsky.tokenization import RegexTokenizer
from dostoevsky.models import FastTextSocialNetworkModel
from clickhouse_driver import Client
import pandas as pd
client = Client(host='ec1-23-456-789-1011.us-east-2.compute.amazonaws.com', user='default', password='', port='9000', database='default')

Простым запросом содержимое всей таблицы с постами занесём в переменную vk_posts. Пройдём все посты, выберем те посты, где есть текст помимо пробелов и положим их в DataFrame.

vk_posts = client.execute('SELECT * FROM vk_posts')
list_of_posts = []
list_of_ids = []
for post in vk_posts:
    if str(post[-2]).replace(" ", ""):
        list_of_posts.append(str(post[-2]).replace("\n",""))
        list_of_ids.append(int(post[2]))
df_posts = pd.DataFrame()
df_posts['post'] = list_of_posts
df_posts['id'] = list_of_ids

Обходим моделью весь список постов с текстом и получаем к оценку тональности для каждой записи.

tokenizer = RegexTokenizer()
model = FastTextSocialNetworkModel(tokenizer=tokenizer)
sentiment_list = []
results = model.predict(list_of_posts, k=2)
for sentiment in results:
    sentiment_list.append(sentiment)

Для каждой строки в DataFrame заведём ещё три колонки: насколько запись положительна, отрицательна и нейтральна. В случае, если по одному из трёх параметров ничего не вернулось, будем заносить ноль.

neutral_list = []
negative_list = []
positive_list = []
speech_list = []
skip_list = []
for sentiment in sentiment_list:
    neutral = sentiment.get('neutral')
    negative = sentiment.get('negative')
    positive = sentiment.get('positive')
    if neutral is None:
        neutral_list.append(0)
    else:
        neutral_list.append(sentiment.get('neutral'))
    if negative is None:
        negative_list.append(0)
    else:
        negative_list.append(sentiment.get('negative'))
    if positive is None:
        positive_list.append(0)
    else:
        positive_list.append(sentiment.get('positive'))
df_posts['neutral'] = neutral_list
df_posts['negative'] = negative_list
df_posts['positive'] = positive_list

Посмотрим, как выглядит наш DataFrame теперь:

Можем посмотреть примеры самых негативных постов:

df_posts[df_posts.negative > 0.9]

Нашей таблице не хватает данных об авторах постов. Возьмём их из таблицы vk_users и сольём обе таблицы по полю «id».

vk_users = client.execute('SELECT * FROM vk_users')
vk_user_ids_list = []
vk_user_sex_list = []
vk_user_city_list = []
vk_user_age_list = []
for user in vk_users:
    vk_user_ids_list.append(user[0])
    vk_user_sex_list.append(user[1])
    vk_user_city_list.append(user[2])
    vk_user_age_list.append(user[3])
df_users = pd.DataFrame()
df_users['id'] = vk_user_ids_list
df_users['sex'] = vk_user_sex_list
df_users['city'] = vk_user_city_list
df_users['age'] = vk_user_age_list
df = df_posts.merge(df_users, on='id')

Теперь таблица выглядит так:

Анализируем графики от plotly

В материале «Как построить красивый waterfall chart в Python?» мы уже строили графики библиотекой plotly

Для начала посчитаем процентное соотношение постов с положительной, отрицательной и нейтральной тональностью: пройдём все три столбца и подсчитаем для каждого случая строки, отличные от нуля. Затем проделаем то же самое для разных возрастных категорий и половых принадлежностей.

Из графика следует, что 46% постов по запросу «конституция» за последнюю неделю имеют негативный окрас. Другие 52% высказываются нейтрально. Чуть позже узнаем, насколько мнения в интернете совпадают с официальными результатами голосования.

Заметно, что доля положительных постов среди мужской аудитории составляет 2%, среди женской — вдвое больше, 4%. Впрочем, негативных постов в обоих группах практически поровну: 47% среди мужской и 44% среди женской.

Наконец, оценка постов по возрастным группам: больше всего доля позитивного текста наблюдается в группе 18 — 25 лет, это 3%. Меньше всего позитивных постов в группе до 18 лет, но это может происходить и в связи с тем, что многие пользователи моложе 18 лет предпочитают скрывать возраст, и точные данные по такой группе получить не удастся. Негативных постов во всех группах кроме 18 — 25 поровну: 46%.
Заметно, что на всех трёх графиках данные распределены приблизительно одинаково. Это говорит о том, что за последнюю неделю практически половина всех постов по ключевому слову «конституция» в новостной ленте ВКонтакте имела негативный окрас.

 Нет комментариев    827   2020   Analytics Engineering   Data Analytics   plotly

Обрабатываем нажатие кнопки в Selenium

Время чтения текста – 10 минут

В материале «Парсим данные, используя Buetiful Soup и Selenium» мы уже рассмотрели, как быть, когда данные на странице динамически подгружаются при скролле страницы. Но бывают ситуации, когда новые данные можно получить, только нажав на кнопку «Показать ещё» — сегодня узнаем, как через Selenium сымитировать нажатие кнопки для полного открытия страницы, соберём идентификаторы пива, оценки к каждому продукту и отправим данные в Clickhouse

Структура страницы

Возьмём случайную пивоварню — у неё 105 чекинов, то есть, отзывов. Страница с чекинами пивоварни показывает не более 25 чекинов и выглядит так:

Если попробуем промотать в самый низ, столкнёмся с той самой кнопкой, мешающей нам взять все 105 за раз:

Мы поступим так: выясним, к какому классу относится элемент кнопки и будем на неё нажимать, пока это возможно. Так как Selenium запускает браузер, следующая кнопка «Показать ещё» может не успеть прогрузиться, поэтому между нажатиями поставим интервал в пару секунд. Как только страница раскроется полностью — мы возьмём её содержимое и распарсим нужные данные из чекинов. Зайдём в код страницы и найдём кнопку — она относится к классу more_checkins.

У кнопки есть свойства стиля, а именно — display. В случае, если кнопка должна отображаться, display принимает значение block. Но когда промотаем страницу до самого конца, кнопку не нужно будет показывать, ведь открывать больше нечего — поэтому display кнопки примет значение none. В случае, если мы запросим у кнопки display и вернётся none будем знать, что открывать больше нечего и можно перестать жать на кнопку.

Пишем код

Начнём с импорта библиотек:

import time
from selenium import webdriver
from bs4 import BeautifulSoup as bs
import re
from datetime import datetime
from clickhouse_driver import Client

Chromedriver, необходимый для запуска браузера через Selenium, можно установить с официальной страницы

Подключимся к базе данных, зададим cookies:

client = Client(host='ec1-23-456-789-10.us-east-2.compute.amazonaws.com', user='', password='', port='9000', database='')
count = 0
cookies = {
    'domain':'untappd.com',
    'expiry':1594072726,
    'httpOnly':True,
    'name':'untappd_user_v3_e',
    'path':'/',
    'secure':False,
    'value':'your_value'
}

О том, как запускать Selenium с cookies можно прочитать в материале «Парсим данные каталога сайта, используя Beautiful Soup и Selenium». Нам нужен параметр untappd_user_v3_e.

Так как мы планируем работать с пивоварнями, у которых более сотни тысяч чекинов, может оказаться так, что страница будет чересчур тяжёлой, и нагрузка на машину будет огромна. Чтобы этого избежать, отключим всё лишнее, а затем подключим cookie для авторизации:

options = webdriver.ChromeOptions()
prefs = {'profile.default_content_setting_values': {'images': 2, 
                            'plugins': 2, 'fullscreen': 2}}
options.add_experimental_option('prefs', prefs)
options.add_argument("start-maximized")
options.add_argument("disable-infobars")
options.add_argument("--disable-extensions")
driver = webdriver.Chrome(options=options)
driver.get('https://untappd.com/TooSunnyBrewery')
driver.add_cookie(cookies)

Напишем функцию, которая принимает ссылку, переходит по ней, полностью раскрывает страницу и возвращает нам soup, который можно будет распарсить. Получим display кнопки и запишем в переменную more_checkins: пока он не равен none будем нажимать на кнопку и снова получать её display. Сделаем интервал в две секунды между нажатиями, чтобы подождать прогрузку страницы. Как только будет получена вся страница, переведём её в soup библиотекой bs4.

def get_html_page(url):
    driver.get(url)
    driver.maximize_window()
    more_checkins = driver.execute_script("var more_checkins=document.getElementsByClassName('more_checkins_logged')[0].style.display;return more_checkins;")
    print(more_checkins)
    while more_checkins != "none":
        driver.execute_script("document.getElementsByClassName('more_checkins_logged')[0].click()")
        time.sleep(2)
        more_checkins = driver.execute_script("var more_checkins=document.getElementsByClassName('more_checkins_logged')[0].style.display;return more_checkins;")
        print(more_checkins)
    source_data = driver.page_source
    soup = bs(source_data, 'lxml')
    return soup

Напишем следующую функцию: она тоже будет принимать url страницы, передавать его в get_html_page, получать soup и парсить его. Функция вернёт запакованные списки с идентификатором пива и оценкой к нему.

О том, как парсить элементы страницы мы уже говорили в материале «Парсим данные каталога сайта, используя Beautiful Soup».

def parse_html_page(url):
    soup = get_html_page(url)
    brewery_id = soup.find_all('a', {'class':'label',
                                     'href':re.compile('https://untappd.com/brewery/*')})[0]['href'][28:]
    items = soup.find_all('div', {'class':'item',
                                  'id':re.compile('checkin_*')})
    checkin_rating_list = []
    beer_id_list = []
    count = 0
    print('Заполняю списки')
    for checkin in items:
        print(count, '/', len(items))
        try:
            checkin_rating_list.append(float(checkin.find('div', {'class':'caps'})['data-rating']))
        except Exception:
            checkin_rating_list.append('cast(Null as Nullable(Float32))')
        try:
            beer_id_list.append(int(checkin.find('a', {'class':'label'})['href'][-7:]))
        except Exception:
            beer_id_list.append('cast(Null as Nullable(UInt64))')
        count += 1 
    return zip(checkin_rating_list, beer_id_list)

Наконец, напишем вызов функций по пивоварням. В материале «Использование словарей в Clickhouse на примере данных Untappd» мы уже рассмотрели, как получить список идентификаторов российских пивоварен — обратимся к нему через таблицу в Clickhouse

brewery_list = client.execute('SELECT brewery_id FROM brewery_info')

Если посмотрим на brewery_list, то узнаем, что данные вернулись в неудобном формате: это список кортежей.

Небольшое лямбда-выражение позволит его «выпрямить»:

flatten = lambda lst: [item for sublist in lst for item in sublist]
brewery_list = flatten(brewery_list)

Работать с таким списком значительно комфортнее:

Для каждой пивоварни в списке сформируем url — он состоит из стандартной ссылки и идентификатора пивоварни в конце. Отправим url в функцию parse_html_page, которая сама вызовет get_html_page и вернёт списки с beer_id и rating_score. Так как два списка вернутся упакованными можем пройти по ним итератором, сформировав кортеж и отправив его в Clickhouse.

for brewery_id in brewery_list:
    print('Беру пивоварню с id', brewery_id, count, '/', len(brewery_list))
    url = 'https://untappd.com/brewery/' + str(brewery_id)
    returned_checkins = parse_html_page(url)
    for rating, beer_id in returned_checkins:
        tuple_to_insert = (rating, beer_id)
        try:
            client.execute(f'INSERT INTO beer_reviews VALUES {tuple_to_insert}')
        except errors.ServerException as E:
            print(E)
    count += 1

С кнопками на этом всё. Постепенно мы формируем отличный датасет для последующего анализа, который рассмотрим в следующем цикле статей, как только завершим сбор данных — возможно, через месяц.

 Нет комментариев    358   2020   Amazon Web Services   Analytics Engineering   AWS   clickhouse   python

Использование словарей в Clickhouse на примере данных Untappd

Время чтения текста – 15 минут

В Clickhouse реализована возможность использования внутренних и внешних словарей, которые могут быть альтернативой JOIN (которые, к сожалению, не всегда здорово работают). Словари хранят информацию в памяти и к ним можно обратиться командой dictGet. Рассмотрим как создать словарь в Clickhouse и как его можно использовать в запросах.

Будем изучать функционал на примере данных из API Untappd. Untappd — социальная сеть любителей крафтового пива. Мы сфокусируемся на чекинах российких крафтовых пивоварен, начнем собирать информацию о них, чтобы в следующих постах проанализировать данные и сделать некоторые выводы. В рамках этого поста разберем получение мета-информации о российских пивоварнях на Untappd, а полученные данные сохраним в словаре Clickhouse.

Собираем данные с Untappd

Для обращений к API нужны client_id и  client_secret_key — их можно получить, создав приложение. Для этого переходим в раздел создания приложения в документации и указываем некоторые данные:

После отправления заявки нужно будет подождать некоторое время: от 1 до 3 недель.

import requests
import pandas as pd
import time

Отправлять запросы к API будем через requests, а в pandas посмотрим на результаты и выгрузим в csv, чтобы отправить в словарь Clickhouse. У Untappd строгие ограничения на количество запросов: всего в час можно отправить 100 запросов, поэтому будем библиотекой time ставить скрипт в ожидание на 38 секунд, чтобы число запросов в час не превосходило 100.

client_id = 'ваш_client_id'
client_secret = 'ваш_client_secret'
all_brewery_of_russia = []

Мы хотим собрать всю тысячу российских пивоварен. Один запрос к методу Brewery Search позволяет получить до 50 пивоварен. При поиске вручную на сайте Untappd по слову «Russia» сайт выдаст 3369 пивоварен:

Проверим это: пролистаем страницу до самого низа и откроем код страницы.

Каждая полученная пивоварня в поиске находится в классе beer-item. Значит, можем в поиске посчитать количество упоминаний beer-item:

И выясняем, что на самом деле их здесь ровно 1000, а не 3369. По запросу Russia в выборку попадают и американские пивоварни, а некоторые были удалены. Значит, придётся отправить 20 запросов, будем получать по 50 пивоварен за раз:

for offset in range(0, 1000, 50):
    try:
        print('offset = ', offset)
        print('осталось:', 1000 - offset, '\n')
        response = requests.get(f'https://api.untappd.com/v4/search/brewery?client_id={client_id}&client_secret={client_secret}',
                               params={
                                   'q':'Russia',
                                   'offset':offset,
                                   'limit':50
                               })
        item = response.json()
        print(item, '\n')
        all_brewery_of_russia.append(item)
        time.sleep(37)
    except Exception:
        print(Exception)
        continue

В параметрах метод Brewery Search принимает q — строку, по которой будем осуществлять поиск на сервисе. Укажем в ней «Russia», чтобы получить все пивоварни, связанные с Россией. Другой параметр — offset — отвечает за смещение. Получив первые 50 пивоварен мы смещаемся на 50 строк в поиске, чтобы получить следующие 50 пивоварен. limit отвечает за количество получаемых пивоварен и не может быть больше 50.
Преобразовываем ответ в формат json и добавляем полученные данные в список all_brewery_of_russia. Объект item будет содержать такие данные:

Но в полученных данных могли затесаться и пивоварни других стран. Отфильтруем их: пройдём итератором по всему списку all_brewery_of_russia и добавим в итоговый только те пивоварни, у которых параметр country_name принимает значение Russia.

brew_list = []
for element in all_brewery_of_russia:
    brew = element['response']['brewery']
    for i in range(brew['count']):
        if brew['items'][i]['brewery']['country_name'] == 'Russia':
            brew_list.append(brew['items'][i])

Посмотрим на первый элемент списка brew_list:

print(brew_list[0])

Соберём из списка DataFrame с колонками brewery_id, beer_count, brewery_name, brewery_slug, brewery_page_url, brewery_city, lat и  lng. Получим в отдельные списки данные из  brewery_list:

df = pd.DataFrame()
brewery_id_list = []
beer_count_list = []
brewery_name_list = []
brewery_slug_list = []
brewery_page_url_list = []
brewery_location_city = []
brewery_location_lat = []
brewery_location_lng = []
for brewery in brew_list:
    brewery_id_list.append(brewery['brewery']['brewery_id'])
    beer_count_list.append(brewery['brewery']['beer_count'])
    brewery_name_list.append(brewery['brewery']['brewery_name'])
    brewery_slug_list.append(brewery['brewery']['brewery_slug'])
    brewery_page_url_list.append(brewery['brewery']['brewery_page_url'])
 brewery_location_city.append(brewery['brewery']['location']['brewery_city'])
    brewery_location_lat.append(brewery['brewery']['location']['lat'])
    brewery_location_lng.append(brewery['brewery']['location']['lng'])

И отправим их в DataFrame:

df['brewery_id'] = brewery_id_list
df['beer_count'] = beer_count_list
df['brewery_name'] = brewery_name_list
df['brewery_slug'] = brewery_slug_list
df['brewery_page_url'] = brewery_page_url_list
df['brewery_city'] = brewery_location_city
df['brewery_lat'] = brewery_location_lat
df['brewery_lng'] = brewery_location_lng

Посмотрим, как выглядит наша таблица:

df.head()

Отсортируем значения по  brewery_id и выгрузим таблицу в формате csv без столбца с индексами и заголовков колонок:

df = df.sort_values(by='brewery_id')
df.to_csv('brewery_data.csv', index=False, header=False)

Создаём словарь Clickhouse

Словари для Clickhouse можно создавать по-разному. Мы попробуем задать его структуру в xml-файле, настроить конфигурационные файлы сервера и обращаться к нему через клиент. Наш xml будет иметь следующую структуру:

Со всеми способами создания словарей можно ознакомиться в документации

<yandex>
<dictionary>
        <name>breweries</name>
        <source>
                <file>
                        <path>/home/ubuntu/brewery_data.csv</path>
                        <format>CSV</format>
                </file>
        </source>
        <layout>
                <flat />
        </layout>
        <structure>
                <id>
                        <name>brewery_id</name>
                </id>
                <attribute>
                        <name>beer_count</name>
                        <type>UInt64</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_name</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_slug</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_page_url</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_city</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>lat</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>lng</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
        </structure>
        <lifetime>300</lifetime>
</dictionary>
</yandex>

Под  идёт имя словаря. В  указываем свойства колонок. Под тегом идёт ключевое поле, а под тегом укажем путь и формат файла. Скоро мы положим его в папку /home/ubuntu, поэтому так и укажем.

Загрузим нашу csv-таблицу и xml-файл на сервер, это можно сделать, например, по ftp через FileZilla. В одном из материалов мы учились ставить Clickhouse на бесплатную машину от Amazon, в этот раз будем работать там же. В FileZilla заходим в настройки SFTP и добавляем файл с ключом:

И подключаемся к серверу по адресу, который указан в консоли EC2 машины на AWS. Укажем протокол SFTP, свой Host и в качестве User — Ubuntu:

В случае перезагрузки машины через консоль Public DNS мог измениться

После подсоединения мы попадём в папку /home/ubuntu сервера. Положим файлы туда же. Теперь подключимся по SSH через Termius. Чтобы Clickhouse увидел файл со структурой словаря, его нужно положить в папку /etc/clickhouse-server:

О том, как подключаться в серверу на AWS через SSH-клиент мы рассказывали в материале «Устанавливаем Clickhouse на AWS»

sudo mv breweries_dictionary.xml /etc/clickhouse server/

Переходим в конфигурационный файл:

cd /etc/clickhouse-server
sudo nano config.xml

Нам нужен тег  — он указывает путь к файлу, который описывает структуру словарей. Укажем путь к нашему xml:

<dictionaries_config>/etc/clickhouse-server/breweries_dictionary.xml</dictionaries_config>

Сохраняем файл и запускаем клиент Clickhouse:

clickhouse client

Проверим, что наш словарь действительно загрузился:

SELECT * FROM system.dictionaries\G

В случае успеха получим подобное:

Напишем запрос к функции dictGet, чтобы получить название пивоварни под ID 999. Указываем первым аргументом наименование словаря, затем поле, значение которого хотим получить и ID.

SELECT dictGet('breweries', 'brewery_name', toUInt64(999))

Если сделаем всё правильно, то выясним, что под ID 999 находится Балтика:

Аналогичным образом удобно использовать функцию, когда в таблице с фактами хранится только ID измерения для получения понятного наименования.

 Нет комментариев    187   2020   Amazon Web Services   Analytics Engineering   clickhouse   Data Analytics   python

Шпаргалка по оконным функциям в SQL

Время чтения текста – 1 минута

Перевели увесистую шпаргалку по оконным функциям в SQL от learnsql.com: вспоминаем синтаксис, функции распределения, ранжирования и многое другое.

Версия в  pdf

За cheatsheet спасибо Telegram-каналу DataEng

 Нет комментариев    649   2020   Analytics Engineering   sql

Собираем данные по рекламным кампаниям ВКонтакте

Время чтения текста – 14 минут

В пятничном лонгриде проделаем большую работу: возьмём информацию по рекламным кампаниям ВКонтакте и сопоставим их с данными Google Analytics в Redash. Чтобы снова не поднимать сервер, будем передавать данные через Google Docs, используя Spreadsheet API.

Получение access token
Для получение пользовательского ключа ВКонтакте нужно создать приложение. Идём в раздел «Разработчики» по https://vk.com/apps?act=manage, жмём на кнопку «Создать приложение». В поле «Тип приложения» выбираем «Standalone-приложение» и даём любое название. После этого в меню слева идём в настройки и сохраняем себе ID приложения.

Актуальную информацию о ключах можно посмотреть в статье «Получение ключа доступа»

Теперь копируем себе эту ссылку:

https://oauth.vk.com/authorize?client_id=YourClientID&scope=ads&response_type=token

Но вместо YourClientID вставляем ID своего созданного приложения. В scope у этой ссылки только ads, так что с этим ключом можно будет получать только информацию о рекламном кабинете. Вставляем её в браузер и нас скидывает на другую страницу — в адресе этой странице будет указан ваш сгенерированный access token.

Срок жизни токена — 86400 секунд: ровно сутки. Чтобы получить токен без временных ограничений можно добавить в scope параметр offline. Если токен понадобилось отозвать — смените пароль от страницы или в настройках безопасности завершите активные сессии.

Ещё для запросов к API нам пригодится ID рекламного кабинета — проходим по https://vk.com/ads?act=settings и копируем «номер кабинета».

Сбор данных через запросы к API
Напишем скрипт, который обращается к серверу ВКонтакте с нашим access token и номером рекламного кабинета и берёт информацию о всех кампаниях пользователя: количество просмотров на рекламах, кликов и затрат. Затем скрипт будет формировать из него DataFrame и отправлять в Google Docs.

from oauth2client.service_account import ServiceAccountCredentials
from pandas import DataFrame
import requests
import gspread
import time

Зададим несколько константных значений: access token, ID рекламного кабинета и версию API ВКонтакте, которую будем использовать. Актуальной является версия 5.103.

token = 'fa258683fd418fafcab1fb1d41da4ec6cc62f60e152a63140c130a730829b1e0bc'
version = 5.103
id_rk = 123456789

За получение статистики по рекламе отвечает метод ads.getStatistics, но один из обязательных параметров при его вызове — ’ids’, ID рекламного объявления, статистику по которому мы хотим получить. Так как ID у нас пока нет, придётся сначала воспользоваться методов ads.getAds, который возвращает ID объявлений и кампаний.

Подробнее со всеми методами ВКонтакте API можно ознакомиться в документации

Библиотекой requests отправляем запрос к серверу и передаём свои параметры. Полученный ответ сразу переведём в формат json


campaign_ids = []
ads_ids = []
r = requests.get('https://api.vk.com/method/ads.getAds', params={
    'access_token': token,
    'v': version,
    'account_id': id_rk
})
data = r.json()['response']

Вот, как выглядит объект data: нам вернулся обычный список словарей, с которым мы уже имели дело в материале “Передаём и анализируем собранные данные по рекламным капманиям в Redash”.

Заполняем словарь ad_campaign_dict. Ключом будет ID объявления, а значением — ID кампании, к которой принадлежит объявление. Так будет удобнее присваивать к объявлению ID кампании, к которой оно принадлежало.

ad_campaign_dict = {}
for i in range(len(data)):
    ad_campaign_dict[data[i]['id']] = data[i]['campaign_id']

Теперь, имея ID каждого нужного объявления, можно обратиться к методу ads.getStatistics. Мы будем собирать количество просмотров, кликов, затрат и даты начала и конца объявления, поэтому заблаговременно заведём пустые списки.

ads_campaign_list = []
ads_id_list = []
ads_impressions_list = []
ads_clicks_list = []
ads_spent_list = []
ads_day_start_list = []
ads_day_end_list = []

Вызывать getStatistics нужно отдельно для каждого объявления — будем делать это в итераторе по ad_campaign_dict. Отправляем запрос, передавая в ‘period’ значение ‘overall’ — берём данные за всё время. У некоторых объявлений могут отсутствовать данные по полю «Просмотры» или «Клики» если они не были запущены, и, потребовав их, мы словим KeyError — во избежание этого добавим обработчик try — except, который заставит скрипт не обращать внимания на эту ошибку.

for ad_id in ad_campaign_dict:
        r = requests.get('https://api.vk.com/method/ads.getStatistics', params={
            'access_token': token,
            'v': version,
            'account_id': id_rk,
            'ids_type': 'ad',
            'ids': ad_id,
            'period': 'overall',
            'date_from': '0',
            'date_to': '0'
        })
        try:
            data_stats = r.json()['response']
            for i in range(len(data_stats)):
                for j in range(len(data_stats[i]['stats'])):
                    ads_impressions_list.append(data_stats[i]['stats'][j]['impressions'])
                    ads_clicks_list.append(data_stats[i]['stats'][j]['clicks'])
                    ads_spent_list.append(data_stats[i]['stats'][j]['spent'])
                    ads_day_start_list.append(data_stats[i]['stats'][j]['day_from'])
                    ads_day_end_list.append(data_stats[i]['stats'][j]['day_to'])
                    ads_id_list.append(data_stats[i]['id'])
                    ads_campaign_list.append(ad_campaign_dict[ad_id])
        except KeyError:
            continue

Теперь сформируем из списков DataFrame и выведем первые 5 элементов:

df = DataFrame()
df['campaign_id'] = ads_campaign_list
df['ad_id'] = ads_id_list
df['impressions'] = ads_impressions_list
df['clicks'] = ads_clicks_list
df['spent'] = ads_spent_list
df['day_start'] = ads_day_start_list
df['day_end'] = ads_day_end_list
print(df.head())

Экспорт данных в Google Docs
Для экспорта DataFrame в таблицу Google Sheets необходим ключ доступа Google API. Пройдём по https://console.developers.google.com и создадим новый проект. Даём ему любое имя и в Dashboard жмём на кнопку “Подключить API и сервисы”. Нужно включить два API — Google Drive API и Google Sheets API. Ищем первый в поиске, нажимаем на “Включить API”, затем ищем второй и проделываем то же самое.

После включения нас отправят на панель управления API. Жмём на «Создать учётные данные» — по ним будем проводить авторизацию в скрипте. Отмечаем, что используем Google Sheets API из веб-сервера и обращаемся к данным пользователя. Нажимаем на «Выбрать тип учётных данных» и создаем сервисный аккаунт. В поле «Роль» выбираем Проект — Редактор, а тип ключа оставим JSON.

После этого нам отправят файл в формате JSON с нашими учетными данными — назовём его «credentials.json» — и перенаправят на страницу с сервисными аккаунтами. Ниже будет поле с почтой — копируем её себе.

Переходим по https://docs.google.com/spreadsheets и создаем пустой файл с названием data, в который будут отправляться данные из DataFrame. В настройках доступа даём доступ по почте, скопированной ранее из сервисных аккаунтов — от неё будут приходить данные из скрипта.

Закинем файл credentials.json в директорию со скриптом и продолжим писать код. Перечисляем область видимости в виде ссылок:

scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']

И при помощи библиотек oauth2client и gspread проводим авторизацию методами ServiceAccountCredentials.from_json_keyfile_name и gspread.authorize, указывая в параметрах первого наш файл и переменную scope. Через переменную sheet будем обращаться к нашему файлу в Google Docs.

creds = ServiceAccountCredentials.from_json_keyfile_name('credentials.json', scope)
client = gspread.authorize(creds)
sheet = client.open('data').sheet1

Для ввода значений в ячейку таблички есть метод update_cell. Важно: нумерация индексов ячеек при обращении начинается не с нуля, а с единицы. Первым циклом пройдём по первой строке и перенесем туда заголовки нашего DataFrame. Во втором будем идти по каждой ячейке и вставлять соответствующие значения DataFrame. По умолчанию стоит ограничение — 100 запросов в 100 секунд. Это ограничение может остановить наш скрипт на полпути: чтобы избежать ошибки пропишем time.sleep, чтобы после каждой вставки скрипт секунду выжидал.

count_of_rows = len(df)
count_of_columns = len(df.columns)
for i in range(count_of_columns):
    sheet.update_cell(1, i + 1, list(df.columns)[i])
for i in range(1, count_of_rows + 1):
    for j in range(count_of_columns):
        sheet.update_cell(i + 1, j + 1, str(df.iloc[i, j]))
        time.sleep(1)

Если всё сделаем правильно — получим таблицу такого вида:

Экспорт данных в Redash

Подключение Google Analytics к Redash описано в статье «Как подключить Google Analytics как Redash?».

Имея в Redash таблицу с Google Analytics и рекламным кампаниям ВКонтакте, можем сопоставить их друг другу. Напишем такой запрос:

SELECT
    query_50.day_start,
    CASE WHEN ga_source LIKE '%vk%' THEN 'vk.com' END AS source,
    query_50.spent,
    query_50.impressions,
    query_50.clicks,
    SUM(query_49.ga_sessions) AS sessions,
    SUM(query_49.ga_newUsers) AS users
FROM query_49
JOIN query_50
ON query_49.ga_date = query_50.day_start
WHERE query_49.ga_source LIKE '%vk%' AND DATE(query_49.ga_date) BETWEEN '2020-05-16' AND '2020-05-20'
GROUP BY query_49.ga_date, source

ga_source — источник, с которого человек пришел на сайт. Всё, что похоже на vk оператором CASE объединяем в столбец «vk.com». Оператором JOIN добавляем таблицу с данными из ВКонтакте, объединяя по полю даты. Отсеиваем данные — возьмём день последней рекламной кампании и посмотрим на несколько дней после него. На выходе получим таблицу такого вида:

Итоги
Получилась таблица, сообщающая, сколько всего было затрачено на объявления в этот день, сколько человек его посмотрели, зашли к нам на сайт и стали нашими новыми пользователями.

Создаём материализованное представление в Clickhouse

Время чтения текста – 10 минут

В этот раз разберёмся, как с помощью Python передавать в Clickhouse данные по рекламным кампаниям и построим агрегат, используя материализованное представление.
Для чего нам материализованные представления? Часто Clickhouse используется для работы с огромными объемами данных, а время получения ответа на запрос к таблице с сырыми данными постоянно растёт. Стандартно, чтобы решить такую задачу эффективным способом, чаще всего используют ETL-процессы или создают таблицы агрегатов, что не очень удобно, ведь их необходимо регулярно пересчитывать. Clickhouse обладает встроенной и эффективной возможностью для решения задачи — материализованными представлениями.
Материализованные представления физически хранят и обновляют данные на диске в соответствии с запросом SELECT, на основе которого представление было создано. При вставке данных в искомую таблицу SELECT преобразовывает данные и вставляет их в представление.

Настройка машины
Наш скрипт на Python из предыдущих материалов необходимо подключить к Clickhouse — он будет отправлять запросы, поэтому нужно открыть несколько портов. В Dashboard AWS переходим в Network & Security — Security Groups. Наша машина входит в группу launch-wizard-1. Переходим в неё и смотрим на Inbound rules: нам нужно добавить правила как на скриншоте.

Настройка Clickhouse
Теперь настроим Clickhouse. Отредактируем файл config.xml в редакторе nano:

cd /etc/clickhouse-server
sudo nano config.xml

Воспользуйтесь мануалом по горячим клавишам, если тоже не сразу поняли, как выйти из nano.

Раскоментируем строку

<listen_host>0.0.0.0</listen_host>

чтобы доступ к базе данных был с любого IP-адреса:

Создание таблицы и материализованного представления
Зайдём в клиент и создадим нашу базу данных, в которой впоследствии создадим таблицы:

CREATE DATABASE db1
USE db1

Мы проиллюстрируем всё тот же пример сбора данных с Facebook. Информация по кампаниям может часто обновляться, и мы, в целях упражнения, хотим создать материализованное представление, которое будет автоматически пересчитывать агрегаты на основе собранных данных по затратам. Таблица в Clickhouse будет практически такой же, как DataFrame из прошлого материала. В качестве движка таблицы используем CollapsingMergeTree: он будет удалять дубликаты по ключу сортировки:

CREATE TABLE facebook_insights(
	campaign_id UInt64,
	clicks UInt32,
	spend Float32,
	impressions UInt32,
	date_start Date,
	date_stop	 Date,
	sign Int8
) ENGINE = CollapsingMergeTree
ORDER BY (date_start, date_stop)

И сразу создадим материализованное представление:

CREATE MATERIALIZED VIEW fb_aggregated
ENGINE = SummingMergeTree()
ORDER BY date_start
	AS
	SELECT campaign_id,
		      date_start,
		      sum(spend * sign) as spent,
		      sum(impressions * sign) as impressions,
		      sum(clicks * sign) as clicks
	FROM facebook_insights
	GROUP BY date_start, campaign_id

Подробности рецепта можно посмотреть в блоге Clickhouse.

К сожалению, в Clickhouse UPDATE отсутствует, поэтому необходимо придумывать некоторые ухищрения. Мы воспользовались рецептом от команды Яндекса для обходного пути команды UPDATE. Идея состоит в том, чтобы в начале вставить строки, которые уже были в таблице с отрицательным Sign, а затем использовать Sign для сторнирования. Следуя этому рецепту старые данные не будут учитываться при суммировании.

Скрипт
Начнём писать скрипт. Понадобится новая библиотека — clickhouse_driver, позволяющая отправлять запросы к Clickhouse из скрипта на Python:

В материале приведена только доработка скрипта, описанного в статье «Собираем данные по рекламным кампаниям в Facebook». Всё будет работать, если вы просто вставите код из текущего материала в скрипт предыдущего.

from datetime import datetime, timedelta
from clickhouse_driver import Client
from clickhouse_driver import errors

Объект класса Client позволит отправлять запросы методом execute(). В host вводим свой public dns, user ставим default, port — 9000 и в database базу данных для подключения.

client = Client(host='ec1-2-34-56-78.us-east-2.compute.amazonaws.com', user='default', password=' ', port='9000', database='db1')

Чтобы удостовериться, что всё нормально, можно написать следующий запрос, который должен вывести наименования всех баз данных на сервере:

client.execute('SHOW DATABASES')

В случае успеха получим на экране такой список:

[('_temporary_and_external_tables',), ('db1',), ('default',), ('system',)]

Пусть, например, мы хотим рассматривать данные за последние три дня. Получим эти даты библиотекой datetime и переведём в нужный формат методом strftime():

date_start = datetime.now() - timedelta(days=3)
date_end = datetime.now() - timedelta(days=1)
date_start_str = date_start.strftime("%Y-%m-%d")
date_end_str = date_end.strftime("%Y-%m-%d")

Напишем вот такой запрос, получающий все колонки таблицы за это время:

SQL_select = f"select campaign_id, clicks, spend, impressions, date_start, date_stop, sign from facebook_insights where date_start > '{date_start_str}' AND date_start < '{date_end_str}'"

И выполним запрос, поместив информацию в список old_data_list. А затем поменяем всем sign на -1 и добавим в new_data_list:

new_data_list = []
old_data_list = []
old_data_list = client.execute(SQL_select)

for elem in old_data_list:
    elem = list(elem)
    elem[len(elem) - 1] = -1
    new_data_list.append(elem)

Наконец, напишем наш алгоритм: вставляем те же самые данные с sign = −1, оптимизируем для удаления дубликатов движком CollapsingMergeTree и выполняем INSERT новых данных со знаком sign = 1.

SQL_query = 'INSERT INTO facebook_insights VALUES'
client.execute(SQL_query, new_data_list)
SQL_optimize = "OPTIMIZE TABLE facebook_insights"
client.execute(SQL_optimize)
for i in range(len(insight_campaign_id_list)):
    client.execute(SQL_query, [[insight_campaign_id_list[i],
                                insight_clicks_list[i],
                                insight_spend_list[i],
                                insight_impressions_list[i],
                                datetime.strptime(insight_date_start_list[i], '%Y-%m-%d').date(),
                                datetime.strptime(insight_date_start_list[i], '%Y-%m-%d').date(),
                                1]])
    client.execute(SQL_optimize)

Вернёмся в Clickhouse. Выполним SELECT * FROM facebook_insights LIMIT 20, чтобы посмотреть первые 20 строк таблицы:

И SELECT * FROM fb_aggregated LIMIT 20, чтобы проверить наше представление:

Отлично! Мы сделали материализованное представление — теперь новые данные, поступающие в таблицу facebook_insights будут поступать и в материализованное представление fb_aggregated и каждый раз пересчитываться благодаря SummingMergeTree. При этом трюк с sign позволяет отлавливать уже обработанные записи и не допускать их суммирования, а CollapsingMergeTree — чистить дубликаты.

 1 комментарий    445   2020   Amazon Web Services   Analytics Engineering   AWS   clickhouse   Data Analytics   python

Устанавливаем Clickhouse на AWS

Время чтения текста – 7 минут

Сегодня поработаем с Clickhouse — поставим его на личную бесплатную машину с Amazon Web Services.

Аккаунт на AWS и машина на Ubuntu
Проще всего Clickhouse установить из deb пакетов на машину под управлением Ubuntu. Сегодня вовсе необязательно иметь такую под рукой — достаточно создать годовой бесплатный аккаунт на Amazon Web Services, который выделит нам нужную машину. Переходим на https://aws.amazon.com и создаём аккаунт и попадаем в Dashboard. В разделе «Build a solution» идём в «Launch a virtual machine» и подбираем виртуальную машину. Нам подойдёт та, на которой установлен Ubuntu Server.

Заодно создаём key pair — пара состоит из публичного ключа AWS и приватного ключа-файла. Последний нужно сохранить у себя на компьютере для подключения к машине.

После откроется консоль EC2, где уже будет запущен наш Instance виртуальной машины. У него есть public dns — сохраняем его.

Подключаемся через Termius
Подключаться к виртуальной машине будем через протокол удалённого доступа ssh. Есть много клиентов, поддерживающих этот протокол — я буду использовать Termius. Жмём на «+ NEW HOST» и заполняем информацию.
В поле address вводим наш public dns, который заранее сохранили. В Username вводим «ubuntu», поле пароля оставляем пустым. Чтобы заполнить Key нужно добавить новый ключ — то есть, указать путь к файлу с расширением .pem, который мы получили при создании новой виртуальной машины. Должно получиться что-то такое:

Сразу после авторизации подключаемся к машине и получаем новый экран с консолью:

Для начала установим Clickhouse — это быстро. Выполним следующую команду, чтобы добавить репозиторий Clickhouse к нашим APT репозиториям:

Подробнее со всеми способами установки Clickhouse можно ознакомиться в документации

echo "deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" | sudo tee /etc/apt/sources.list.d/clickhouse.list

Обязательно обновляем пакеты:

sudo apt-get update

Наконец, установим клиент и сервер командой:

sudo apt-get install -y clickhouse-server clickhouse-client

Готово! Клиент и сервер Clickhouse установлены на виртуальной машине. Запустим сервер, чтобы подсоединиться позже через клиент:

sudo service clickhouse-server start

И проверим успешность запуска командой:

sudo service clickhouse-server status

Получим примерно такой результат:

Clickhouse установлен! Для подключения к клиенту необходимо ввести:

clickhouse-client

Чтобы убедиться, что всё работает как надо, документация предлагает ввести запрос

SELECT 1

Вот, что мы должны получить:

Готово! А в следующем материале поработаем в связке Python + Clickhouse: вернёмся к нашему скрипту, который получает информацию по рекламным кампаниям и сделаем так, чтобы эти данные собирались в таблицу и материализованное представление.

 4 комментария    60   2020   Amazon Web Services   Analytics Engineering   AWS   clickhouse

Clickhouse в качестве consumer для Amazon MSK

Время чтения текста – 6 минут

Disclaimer: заметка носит технический характер, поэтому может быть интересна меньшему числу лиц с бизнес-бэкграундом.

В этом блоге еще ни разу не затрагивалась тема Clickhouse, одной из самых быстрых баз данных от компании Яндекс. Краткая справка без деталей: Clickhouse — наиболее эффективно написанная СУБД колоночного типа с точки зрения программного кода, информация о СУБД довольно подробно описана в документации и во множестве видео на Youtube (раз, два, три).

В своей практике последние четыре года я использовал Clickhouse в качестве аналитика и эксперта по построению аналитической отчетности. В основном для решения задачи визуализации отчетности / отчетов с параметрами / дашбордов использовался Redash как наиболее удобный интерфейс для доступа к данным Clickhouse.
Однако совсем недавно в Looker, о котором я рассказывал ранее, появилась возможность подключить Clickhouse в качестве источника данных. Следует заметить, что подключение к Clickhouse в Tableau существует уже довольно давно.

Архитектура аналитического сервиса, в основе которого лежит Clickhouse, в основном облачная. И в рассматриваемой задаче было именно так. Предположим, у вас существует выделенный instance EC2 в Amazon (на который вы установили Clickhouse) и отдельный Kafka-кластер (решение Amazon MSK).

Задача: подключить Clickhouse в качестве consumer для получения информации с брокеров вашего кластера Kafka. На самом деле, в документации на сайте Amazon MSK довольно подробно описано как именно подключаться к кластеру Kafka, не буду дублировать эту информацию. В моем случае гайд помог: топики создавались продюсером с машины, на которой установлен Clickhouse и с неё читались консюмером.

Но возникла проблема: при подключении Clickhouse к Kafka в качестве консюмера, происходила следующая ошибка:

020.02.02 18:01:56.209132 [ 46 ] {e7124cd5-2144-4b1d-bd49-8a410cdbd607} <Error> executeQuery: std::exception. Code: 1001, type: cppkafka::HandleException, e.what() = Local: Timed out, version = 20.1.2.4 (official build) (from 127.0.0.1:46586) (in query: SELECT * FROM events), Stack trace (when copying this message, always include the lines below):

Продолжительное время я искал информацию в документации Clickhouse о том, что может вызывать эту ошибку, но не смог ничего найти. Следующей мыслью стала проверка работы локального брокера Kafka с той же машины. Установил клиент Kafka, подключил Clickhouse, отправил данные в топик и Clickhouse с легкостью их прочитал, т. е. консюмер Clickhouse работает с локальным брокером, а значит и вовсе работает.

Поговорив со всеми своими знакомыми экспертами в области инфраструктуры и Clickhouse (Вася, Макс, привет!), с ходу мы не смогли определить в чем проблема. Проверили firewall, настройки сети, все было открыто. Подтверждалось также тем, что с локальной машины можно было отправить в топик удаленного брокера Kafka сообщения командой bin/kafka-console-producer.sh и прочитать оттуда же bin/kafka-console-consumer.sh.

Затем мне пришла в голову мысль обратиться к главному гуру и разработчику Clickhouse — Алексею Миловидову. Алексей с радостью постарался ответить на возникшие вопросы и предложил ряд гипотез, которые мы проверили (трассировку сетевых подключений и т. п.), однако и после более низкоуровневого аудита проблему локализовать не удалось. Тогда Алексей посоветовал обратиться к Михаилу Филимонову из компании Altinity. Михаил оказался очень отзывчивым экспертом и одну за другой предлагал гипотезы для проверки (параллельно подсказывая как именно будет лучше их проверить).

В итоге в ходе совместных усилий мы обнаружили, что проблема возникает у библиотеки librdkafka, так как другой пакет kafkacat, который использует эту же библиотеку отваливается от подключения к брокеру с той же проблемой (Local: timed out).

После изучения подключения через bin/kafka-console-consumer.sh и параметров подключения, Михаил посоветовал добавить такую строку в /etc/clickhouse-server/config.xml:

<kafka><security_protocol>ssl</security_protocol></kafka>

И, о чудо! Clickhouse подключился к кластеру и вытянул необходимые данные с брокера.

Надеюсь, этот рецепт и мой опыт позволит вам сэкономить время и силы на изучение похожей проблемы :)

 Нет комментариев    67   2020   Analytics Engineering   clickhouse   expert   troubleshooting   yandex

Обзор Looker

Время чтения текста – 13 минут

Сегодня поговорим о BI-платформе Looker, над которой мне удалось поработать в течение 2019-го года.

Представляю краткое содержание статьи для удобной и быстрой навигации:

  1. Что такое Looker?
  2. Как и к каким СУБД можно подключиться через Looker?
  3. Построение Looker ML модели данных
  4. Режим Explore (исследование данных на построенной модели)
  5. Построение отчетов и сохранение их в Look
  6. Примеры дашбордов в Looker

Что такое Looker?

Создатели Looker позиционируют его как программное обеспечение класса business intelligence и платформу big data аналитики, которая помогает исследовать, анализировать и делиться аналитикой бизнеса в режиме реального времени.
Looker — это действительно удобный инструмент и один из немногих продуктов класса BI, который позволяет в режиме онлайн работать с преднастроенными кубами данных (на самом деле, реляционными таблицами, которые описаны в Look ML-модели).
Инженеру, работающему над Looker, требуется описать модель данных на языке Look ML (что-то среднее между CSS и SQL), опубликовать эту модель данных и далее настроить отчетность и дашборды.
Сам Look ML достаточно прост, взаимосвязи между объектами данных задаются data-инженером, что впоследствии позволяет использовать данные без знания языка SQL (если быть точным: движок Looker сам за пользователя генерирует код на языке SQL).

Буквально недавно, в июне 2019-го года Google объявил о покупке платформы Looker за $2.6 млрд.

Как и к каким СУБД можно подключиться через Looker?

Выбор СУБД, с которыми работает Looker довольно богатый. На скриншоте ниже перечислены всевозможные подключения на Октябрь 2019 г.:

Доступные СУБД для подключения

Настроить подключение к базе данных несложно через веб-интерфейс:

Веб-интерфейс подключения к СУБД

В вопросе соединений с базами данных отдельно хочется отметить два факта: к сожалению, на текущий момент и в ближайшем будущем отсутствует поддержка Clickhouse от Яндекса. Скорее всего поддержка не появится, учитывая тот факт, что Looker был приобретен конкурирующей компанией Google.
Второй досадный факт состоит в том, что построить одну модель данных, которая бы обращалась в разные СУБД нельзя. В Looker нет встроенного хранилища, которое могло бы объединять результаты запроса (кстати, в отличии даже от того же Redash).
Это означает, что аналитическая архитектура данных должна быть построена в рамках одной СУБД (желательно с высоким быстродействием или на агрегированных данных).

Построение Looker ML модели данных

Для того чтобы построить отчет или дашборд в Looker, предварительно необходимо настроить модель данных. Синтаксис языка Look ML достаточно подробно описан в документации. От себя могу добавить, что описание модели не требует долгого погружения для специалиста со знанием SQL. Скорее, необходимо перестроить подход к подготовке модели данных. Язык Look ML очень похож на CSS:

Консоль создания Look ML модели

В модели данных прописываются связи с таблицами, ключи, гранулярность, информация о том, какие поля являются фактами, какие измерениями. Для фактов прописывается агрегация. Разумеется, при создании модели можно использовать различные IF / CASE выражения.

Режим Explore

Наверное, это самая главная киллер-фича Looker, поскольку позволяет любым бизнес-поздразделениям самостоятельно получать данные без привлечения аналитиков / инженеров данных. И, видимо, поэтому использование аккаунтов с использованием режиме Explore тарифицируется отдельно.

Фактически, режим Explore это интерфейс, который позволяет использовать настроенную Look ML модель данных, выбрать необходимые метрики и измерения и построить кастомный отчет / визуализацию.
К примеру, мы хотим понять сколько каких действий в интерфейсе Лукера было совершено на прошлой неделе. Для этого, используя режим Explore, мы выберем поле Date и поставим на него фильтр: прошлая неделя (Looker в этом смысле достаточно умный и в фильтре будет достаточно написать ‘Last week’), затем из измерений выберем «Категорию» и в качестве метрики — Количество. После нажатия кнопки Run сформируется готовый отчет.

Построение отчета в Looker

Затем, используя полученные данные в табличной форме можно настроить визуализацию любого вида. Например, Pie chart:

Применение визуализации к отчету

Построение отчетов и сохранение их в Look

Полученный набор данных / визуализацию в режиме Explore иногда хочется сохранить и поделиться с коллегами, для этого в Looker существует отдельная сущность — Look. Это готовый построенный отчет с выбранными фильтрами / измерениями / фактами.

Пример сохраненного Look

Примеры дашбордов в Looker

Систематизируя склад созданных Look зачастую хочется получить готовую композицию / overview ключевых метрик, которые было бы видно на одном листе.
Для этих целей отлично подходит создание дашборда. Дашборд создается либо на лету, либо используя ранее созданные Look. Одной из «фишек» дашборда является установка параметров, которые меняются на всем дашборде и могут быть применены ко всем Look сразу.

Интересные фишки одной строкой

  • В Looker можно ссылаться на другие отчеты и, используя данных функционал, можно создать динамический параметр, который передается по ссылке.
    Например, построили отчет с разделениям выручки по странам и в этот отчете можем ссылаться на дашборд по отдельно взятой стране. Переходя по ссылке, пользователь видит дашборд по конкретной стране, на которую перешел
  • На каждой странице Looker существует чат, в котором оперативно отвечает поддержка
  • Looker не умеет работать с merge данных на уровне разных СУБД, однако может объединить данные на уровне готовых Look (в нашем случае эта функциональность работает очень странно).
  • В рамках работы с различными моделями данных, я обнаружил крайне нетривиальное использование SQL для подсчета уникальных значений в ненормализованный таблице данных, Looker называет это симметричными агрегатами.
    SQL действительно выглядит очень нетривиально:
SELECT 
 order_items.order_id AS "order_items.order_id",
 order_items.sale_price AS "order_items.sale_price",
 (COALESCE(CAST( ( SUM(DISTINCT (CAST(FLOOR(COALESCE(users.age ,0)
 *(1000000*1.0)) AS DECIMAL(38,0))) + 
 CAST(STRTOL(LEFT(MD5(CONVERT(VARCHAR,users.id )),15),16) AS DECIMAL(38,0))
 * 1.0e8 + CAST(STRTOL(RIGHT(MD5(CONVERT(VARCHAR,users.id )),15),16) AS DECIMAL(38,0)) ) 
 - SUM(DISTINCT CAST(STRTOL(LEFT(MD5(CONVERT(VARCHAR,users.id )),15),16) AS DECIMAL(38,0))
 * 1.0e8 + CAST(STRTOL(RIGHT(MD5(CONVERT(VARCHAR,users.id )),15),16) AS DECIMAL(38,0))) ) 
 AS DOUBLE PRECISION) 
 / CAST((1000000*1.0) AS DOUBLE PRECISION), 0) 
 / NULLIF(COUNT(DISTINCT CASE WHEN users.age IS NOT NULL THEN users.id 
 ELSE NULL END), 0)) AS "users.average_age"
FROM order_items AS order_items
LEFT JOIN users AS users ON order_items.user_id = users.id

GROUP BY 1,2
ORDER BY 3 DESC
LIMIT 500
  • При внедрении Looker к покупке обязателен JumpStart Kit, который стоит не менее $6k, в рамках которого вы получаете поддержку и консультации от Looker при внедрении инструмента.
 Нет комментариев    357   2020   analysis   Analytics Engineering   BI-инструменты   looker   sql

Как посчитать Retention?

Время чтения текста – 8 минут

В этой заметке разберём как правильно построить отчет по Retention с использованием Redash и языка SQL.
Для начала, в двух словах, что это за метрика Retention rate, почему она важна, какой бывает и каким образом считается.

Retention rate

Метрика Retention rate довольно широко распространена и особенно известна в мобильной индустрии, поскольку позволяет понять насколько хорошо продукт вовлекает пользователей в ежедневное использование. Вспомним (или узнаем) как рассчитывается Retention:
Retention дня X — это N% процентов пользователей, которые вернутся к продукту в день X. Иными словами, если в какой-то конкретный день (день 0) пришло 100 новых пользователей, а на 1-ый день вернулось 15, то Retention 1-го дня составит 15 / 100 = 15%.
Чаще всего выделяют Retention дней 1, 3, 7 и 30 как наиболее описательные метрики продукта, однако полезно в целом рассматривать кривую Retention и делать выводы исходя из нее.

Кривая retention

В конечном итоге нас интересует построение такой кривой, которая показывает удержание пользователей с 0-го дня до 30-го.

Кривая Retention rate c 0-го по 30-ый день

Rolling Retention (RR)

Кроме классического Retention rate выделяют также Rolling Retention (далее RR). При расчете Rolling Retention помимо дня X учитываются также все последующие дни. Таким образом RR 1-го дня — количество пользователей, которые вернулись в 1-ый и последующие дни.

Сравним Retention и Rolling Retention 10-го дня:
Retention10 — количество пользователей, вернувшихся в 10-ый день / количество пользователей, установивших приложение 10 дней назад * 100%.
Rolling Retention10 — количество пользователей, вернувшихся в 10-ый день или позже / количество пользователей, установивших приложение 10 дней назад * 100%.

Гранулярность (retention временных отрезков)

В некоторых отраслях и соответствующих задачах полезно понимать Retention конкретного дня (чаще всего в мобильной индустрии), в других случаях полезно понимать удержание пользователя на разных временных интервалах: например, недельные отрезки или месячные (часто полезно в e-commerce, ретейле).

Пример когорт по месяцам и соответствующий им месячный Retention

Как построить Retention отчет на языке SQL?

Выше мы разобрали как посчитать Retention в формулах. Теперь приложим это к языку SQL.
Допустим, что у нас есть две таблицы: user — храним данные об идентификаторах пользователей и мета-информацию, client_session — информация о посещениях пользователями мобильного приложения.
В запросе будут фигурировать только две эти таблицы, поэтому вы с легкостью сможете адаптировать запрос под себя.
примечание: в рамках данного кода я использую Impala в качестве СУБД.

Собираем размер когорт

SELECT from_unixtime(user.installed_at, "yyyy-MM-dd") AS reg_date,
          ndv(user.id) AS users
   FROM USER
   WHERE from_unixtime(user.installed_at)>=date_add(now(), -60)
     AND from_unixtime(user.installed_at)<=date_add(now(), -31)
   GROUP BY 1

Разберем этот довольно несложный запрос: по каждому дню мы считаем количество уникальных пользователей для отрезка [60 дней назад; 31 день назад].
Чтобы не лезть в документацию: команда ndv() в Impala аналог команды count(distinct).

Считаем количество вернувшихся пользователей по каждой когорте

SELECT from_unixtime(user.installed_at, "yyyy-MM-dd") AS reg_date,
          datediff(cast(cs.created_at AS TIMESTAMP), cast(installed_at AS TIMESTAMP)) AS date_diff,
          ndv(user.id) AS ret_base
   FROM USER
   LEFT JOIN client_session cs ON cs.user_id=user.id
   WHERE 1=1
     AND datediff(cast(cs.created_at AS TIMESTAMP), cast(installed_at AS TIMESTAMP)) between 0 and 30
     AND from_unixtime(user.installed_at)>=date_add(now(), -60)
     AND from_unixtime(user.installed_at)<=date_add(now(), -31)
   GROUP BY 1, 2

В этом запросе ключевая часть содержится в команде datediff: теперь мы считаем для каждой когорты и для каждого datediff количество уникальных пользователей все той же командой ndv() (фактически, количество пользователей которые вернулись в дни от 0-го до 30-го).

Отлично, теперь у нас есть размер когорт и количество вернувшихся пользователей

Собираем все вместе

SELECT reg.reg_date AS date_registration,
       reg.users AS cohort_size,
       cohort.date_diff AS day_difference,
       cohort.ret_base AS retention_base,
       cohort.ret_base/reg.users AS retention_rate
FROM
  (SELECT from_unixtime(user.installed_at, "yyyy-MM-dd") AS reg_date,
          ndv(user.id) AS users
   FROM USER
   WHERE from_unixtime(user.installed_at)>=date_add(now(), -60)
     AND from_unixtime(user.installed_at)<=date_add(now(), -31)
   GROUP BY 1) reg
LEFT JOIN
  (SELECT from_unixtime(user.installed_at, "yyyy-MM-dd") AS reg_date,
          datediff(cast(cs.created_at AS TIMESTAMP), cast(installed_at AS TIMESTAMP)) AS date_diff,
          ndv(user.id) AS ret_base
   FROM USER
   LEFT JOIN client_session cs ON cs.user_id=user.id
   WHERE 1=1
     AND datediff(cast(cs.created_at AS TIMESTAMP), cast(installed_at AS TIMESTAMP)) between 0 and 30
     AND from_unixtime(user.installed_at)>=date_add(now(), -60)
     AND from_unixtime(user.installed_at)<=date_add(now(), -31)
   GROUP BY 1, 2) cohort ON reg.reg_date=cohort.reg_date
    ORDER BY 1,3

Мы получили запрос, который для каждой когорты считает Retention, в итоге результат можно отобразить в таком виде:

Retention rate, посчитанный для каждой когорты пользователей

Построение единственной кривой Retention

Несколько модифицируем наш запрос и получим требуемые данные для построения одной кривой Retention:

SELECT 
       cohort.date_diff AS day_difference,
       avg(reg.users) AS cohort_size,
       avg(cohort.ret_base) AS retention_base,
       avg(cohort.ret_base)/avg(reg.users)*100 AS retention_rate
FROM
  (SELECT from_unixtime(user.installed_at, "yyyy-MM-dd") AS reg_date,
          ndv(user.id) AS users
   FROM USER
   WHERE from_unixtime(user.installed_at)>=date_add(now(), -60)
     AND from_unixtime(user.installed_at)<=date_add(now(), -31)
   GROUP BY 1) reg
LEFT JOIN
  (SELECT from_unixtime(user.installed_at, "yyyy-MM-dd") AS reg_date,
          datediff(cast(cs.created_at AS TIMESTAMP), cast(installed_at AS TIMESTAMP)) AS date_diff,
          ndv(user.id) AS ret_base
   FROM USER
   LEFT JOIN client_session cs ON cs.user_id=user.id
   WHERE 1=1
     AND datediff(cast(cs.created_at AS TIMESTAMP), cast(installed_at AS TIMESTAMP)) between 0 and 30
     AND from_unixtime(user.installed_at)>=date_add(now(), -60)
     AND from_unixtime(user.installed_at)<=date_add(now(), -31)
   GROUP BY 1,2) cohort ON reg.reg_date=cohort.reg_date
    GROUP BY 1        
    ORDER BY 1

Теперь у нас для каждого дня посчитан средний по всем когортам Retention rate.

Больше по теме