12 заметок с тегом

clickhouse

Три способа рассчитать накопленную сумму в SQL

Время чтения текста – 7 минут

Расчет накопленной (или кумулятивной, что то же самое) суммы SQL — это очень распространенный запрос, который часто используют в анализе финансов, динамики прибыли и прочих показателей компании. В сегодняшней статье вы узнаете, что такое накопленная сумма и как можно написать SQL-запрос для ее вычисления.

Если вы вдруг являетесь начинающим пользователем SQL, то давайте, как в школьной задаче, поймем, что нам дано и что нам необходимо найти. Накопленная сумма — это совокупная сумма предыдущих чисел в столбце. Давайте посмотрим на пример ниже, чтобы точно знать, какой результат мы ожидаем увидеть в итоге. Итак, существует таблица leftjoin.daily_sales_sample, в которой есть всего два столбца date и revenue. По столбцу revenue нам нужно рассчитать накопленную сумму и записать результат в отдельный столбец.

Что у нас есть?

Date Revenue
10.11.2021 1200
11.11.2021 1600
12.11.2021 800
13.11.2021 3000

Что мы хотим найти?

Date Revenue Cumulative Revenue
10.11.2021 1200 1200 ↓
11.11.2021 1600 2800↓
12.11.2021 800 3600 ↓
13.11.2021 3000 6600

На графике две этих переменных выглядят следующим образом:

Итак, без лишних слов, давайте приступать к решению задачи.

Способ 1 — Идеальный — Используем оконные функции

Итак, если в базе данных можно пользоваться оконными функциями, то жизнь хороша и прекрасна. С их помощью можно написать простой запрос, который будет суммировать значения из столбца revenue по мере увеличения даты и сразу вернет нам таблицу с кумулятивной суммой в столбце, который мы назвали total.

SELECT
	date,
	revenue,
	SUM(revenue) OVER (ORDER BY date asc) as total
FROM leftjoin.daily_sales_sample 
ORDER BY date;

Способ 2 — Хитрый — Решение без оконных функций

Вполне возможно, что вам понадобится решить такую задачу без использования оконных функций. К примеру, если вы используете MySQL (до 8 версии) или любую другую БД, в которой оконных функций нет. Тогда решение задачи чуть усложняется. Однако, вы ведь знаете, что нет ничего невозможного?
Чтобы провернуть все то же самое без оконных функций, нужно использовать INNER JOIN для присоединения таблицы к себе самой. Так, к каждой строке таблицы мы присоединяем строки, которые соответствуют всем предыдущим датам до текущей даты включительно. В нашем примере, для 10 ноября — 10 ноября, для 11 ноября — 10 и 11 ноября и так далее. Промежуточный запрос будет выглядеть вот так:

SELECT * 
FROM leftjoin.daily_sales_sample ds1 
INNER JOIN leftjoin.daily_sales_sample ds2 on ds1.date>=ds2.date
ORDER BY ds1.date, ds2.date;

А его результат:

Date 1 Revenue 1 Date 2 Revenue 2
10.11.2021 1200 10.11.2021 1200
11.11.2021 1600 10.11.2021 1200
11.11.2021 1600 11.11.2021 1600
12.11.2021 800 10.11.2021 1200
12.11.2021 800 11.11.2021 1600
12.11.2021 800 12.11.2021 800
13.11.2021 300 10.11.2021 1200
13.11.2021 300 11.11.2021 1600
13.11.2021 300 12.11.2021 800
13.11.2021 300 13.11.2021 300

А затем, нужно просуммировать прибыли, группируя их по каждой дате. Если собрать все в единый запрос, то он будет выглядеть вот так:

SELECT
	ds1.date,
	ds1.revenue,
	SUM(ds2.revenue) as total
FROM leftjoin.daily_sales_sample ds1 
INNER JOIN leftjoin.daily_sales_sample ds2 on ds1.date>=ds2.date
GROUP BY ds1.date, ds1.revenue
ORDER BY ds1.date;

Способ 3 — Специфический — Решение с помощью массивов в ClickHouse

Если вы используете Clickhouse, то в этой системе есть специальная функция, которая может помочь рассчитать кумулятивную сумму. Для начала, нам нужно преобразовать все столбцы таблицы в массивы и рассчитать показатель «Moving Sum» для столбца revenue.

SELECT groupArray(date) dates, groupArray(revenue) as revs, 
groupArrayMovingSum(revenue) AS total
FROM (SELECT date, revenue FROM leftjoin.daily_sales_sample
	  ORDER BY date)

Спасибо Дмитрию Титову из Altinity за комментарий про сортировку в подзапросе

Так, мы получим три массива значений:

dates revs total
[’10.11.2021’,’11.11.2021’,’12.11.2021’,’13.11.2021’] [1200, 1600, 800, 300] [1200, 2800, 3600, 3900]

Но три массива, которые записаны в ячейки — это не то, что мы хотим получить, хотя значения этих массивов уже абсолютно соответствуют искомому результату. Теперь массивы нужно привести обратно к табличному виду с помощью функции ARRAY JOIN.

SELECT dates, revs, total FROM
(SELECT groupArray(date) dates, groupArray(revenue) as revs, 
groupArrayMovingSum(revenue) AS total
FROM (SELECT date, revenue FROM leftjoin.daily_sales_sample
	  ORDER BY date)) as t
ARRAY JOIN dates, revs, total;

Бонус — Оконные функции в Clickhouse

Если вам не хочется иметь дело с массивами, что иногда и правда бывает затратно по времени, то есть еще один вариант решения задачи. Можно использовать оконные функции, например функцию runningAccumulate(), которая суммирует значения всех ячеек с первой до текущей.

SELECT date, runningAccumulate(revenue)
  FROM 
  (
    SELECT date, sumState(revenue) AS revenue
    FROM leftjoin.daily_sales_sample
    GROUP BY date 
    ORDER BY date ASC
  )
ORDER BY date

Если вы столкнетесь с необходимостью рассчитать кумулятивную сумму в SQL, то теперь вы сможете решить эту задачу, в какой бы системе управления баз данных ни была организована работа :)

 2 комментария    8877   2021   clickhouse   mysql   postgresql   sql

Тренинг по Clickhouse от Altinity

Время чтения текста – 7 минут

Буквально на днях закончил обучение Clickhouse от Altinity (101 Series Training). Для тех, кто только знакомится с Clickhouse Altinity предлагает базовый бесплатный тренинг: Data Warehouse Basics. Рекомендую начать с него, если планируете погружаться в обучение.

Сертификация от Altinity

Хочу поделиться своими впечатлениями об обучении и поделиться своим конспектом с тренинга.
Обучение стоит $500 и длится четыре дня по два часа, проводится в наше вечернее время (начиная с 19:00 GMT+3).

Сессия №1

Первый день в бОльшей степени повторяет пройденное в Data Warehouse Basics, однако в нем есть несколько новых идей, например о том, как можно получить полезную информацию о запросах из системных таблиц.

Например, такой query выдаст какие команды запущены и в каком они статусе:

SELECT command, is_done
FROM system.mutations
WHERE table = 'ontime'

Помимо этого, для меня было очень полезно узнать про компрессию колонок с использованием кодеков:

ALTER TABLE ontime
 MODIFY COLUMN TailNum LowCardinality(String) CODEC(ZSTD(1))

Для тех, кто начинает погружение в Clickhouse первый день будет супер-полезным в том, чтобы разобраться с движками таблиц и синатксисом их создания, партициями, вставкой данных (к примеру, напрямую из S3).

INSERT INTO sdata
SELECT * FROM s3(
 'https://s3.us-east-1.amazonaws.com/d1-altinity/data/sdata*.csv.gz',
 'aws_access_key_id',
 'aws_secret_access_key',
 'Parquet',
 'DevId Int32, Type String, MDate Date, MDatetime
DateTime, Value Float64')

Сессия №2

Второй день мне представляется максимально насыщенным и полезным, потому что в рамках него Robert из Altinity подробно рассказывает про агрегирующие функции в Clickhouse и про создание материализованных представлений (подробно по шагам разбирается схема создания материализованного представления).

Отдельное внимание устройству джойнов в Clickhouse

Мне было супер-полезно узнать про типы индексов в CH

Сессия №3

В рамках третьего дня коллеги делятся знаниями о том как работать с Kafka, JSON-объектами, которые хранятся в таблицах.
Интересно было узнать, что работа с типами данных массив в Clickhouse очень похоже на работу с массивами в Python:

WITH [1, 2, 4] AS array
SELECT
 array[1] AS First,
 array[2] AS Second,
 array[3] AS Third,
 array[-1] AS Last,
 length(array) AS Length

И при работе с массивами крутая фича это ARRAY JOIN, который «разворачивает» массив в плоскую реляционную таблицу:

Clickhouse позволяет эффективно взаимодействовать с JSON-объектами, которые хранятся в таблице:

-- Get a JSON string value
SELECT JSONExtractString(row, 'request') AS request
FROM log_row LIMIT 3
-- Get a JSON numeric value
SELECT JSONExtractInt(row, 'status') AS status
FROM log_row LIMIT 3

На примере этого кусочка кода отдельно извлекаются элементы JSON-массива ’request’ и ’status’.

Их можно сложить в ту же таблицу:

ALTER TABLE log_row
 ADD COLUMN
status Int16 DEFAULT
 JSONExtractInt(row, 'status')
ALTER TABLE log_row
UPDATE status = status WHERE 1 = 1

Сессия №4

А на заключительный четвертый день оставлена самая трудная тема с моей точки зрения: построение шардированных и реплицированных кластеров, построение запросов на распределенных серверах Clickhouse.

Отдельный респект Altinity за отличную подборку лабораторных заданий в ходе обучения.

Ссылки:

 Нет комментариев    509   2021   clickhouse   sql

Парсим вакансии для аналитиков из Indeed

Время чтения текста – 8 минут

В этом материале мы расскажем, как парсить вакансии с сайта Indeed. Indeed — это крупнейший в мире поисковик вакансий. Этим текстом мы начинаем большой проект по анализу и визуализации показателей оплаты труда в области Data Science в разных странах.
Подобный анализ рынка вакансий, но только в России, мы проводили в материале Анализ рынка вакансий аналитики и BI: дашборд в Tableau, когда парсили данные с сайта HeadHunter.

А еще у нас можно почитать материал Парсим данные каталога сайта, используя Beautiful Soup и Selenium

Импорт библиотек
Библиотека fake_useragent имитирует реальный User-Agent, чтобы преодолеть защиту сайта от парсинга. Таким образом мы сможем пройти проверку HTTP заголовка User-Agent.
Модуль urllib.parse разбирает URL-адрес на компоненты и записывает его как кортеж. Он пригодится для перехода на карточки вакансий. BeautifulSoup поможет разобраться в структуре html-страницы и добыть нужную нам информацию.

import requests
from datetime import timedelta, datetime
import urllib.parse
from fake_useragent import UserAgent
from bs4 import BeautifulSoup
import pandas as pd
import time
from lxml.html import fromstring
from clickhouse_driver import Client
from clickhouse_driver import errors
import numpy as np
from funcs import check_title, get_skills_row, parse_salary, get_sheetname, create_table

Создадим таблицу в Clickhouse
Данные, которые мы собираемся собрать, будем хранить в базе Clickhouse.

create_table = '''CREATE TABLE if not exists indeed.vacancies (
    row_idx UInt16,
    query_string String,
    country String,
    title String,
    company String,
    city String,
    job_added Date,
    easy_apply UInt8,
    company_rating Nullable(Float32),
    remote UInt8,
    job_id String,
    job_link String,
    sheet String,
    skills String,
    added_date Date,
    month_salary_from_USD Float64,
    month_salary_to_USD Float64,
    year_salary_from_USD Float64,
    year_salary_to_USD Float64,
)
ENGINE = ReplacingMergeTree
SETTINGS index_granularity = 8192'''

Обход блокировок
Нам нужно обойти защиту Indeed и избежать блокировки по IP. Для этого используем анонимные прокси адреса на сайте free-proxy-list.net. Как собрать свежие прокси, мы писали в нашем предыдущем тексте «Пишем парсер свежих прокси на Python для Selenium». Прокси адреса мы запишем в массив, который понадобится в момент обращения к Indeed, когда запрос будет проверять User-Agent.

Данный метод удаляет IP из списка с прокси в том случае, если ответ от Indeed через него так и не пришел.

def remove_proxy_from_list_and_update_if_required(proxy):
    global _proxies
    _proxies.remove(proxy)
    if len(_proxies) == 0:
        update_proxy_list()

Функция, используя прокси, возвращает нам страницу Indeed, из которой мы впоследствии спарсим данные.

def get_page(updated_url, session):
    proxy = get_proxy()
    proxy_dict = {"http": proxy, "https": proxy}
    logger.info(f'try with proxy: {proxy}')
    try:
        session.proxies = proxy_dict
        return session.get(updated_url, timeout=15)
    except (requests.exceptions.RequestException, requests.exceptions.ProxyError, requests.exceptions.ConnectTimeout,
            requests.exceptions.ReadTimeout, requests.exceptions.SSLError,
            requests.exceptions.ConnectionError, url_ex.MaxRetryError, ConnectionResetError,
            socket.timeout, url_ex.ReadTimeoutError):
        remove_proxy_from_list_and_update_if_required(proxy)
        logger.info(f'try with proxy {proxy}')
        return get_page(updated_url, session)

Методы для парсера
Искомые данные нужно будет искать по тегам и атрибутам верстки с помощью BeautifulSoup. Мы заранее собрали ключевые слова, которые нас будут интересовать в вакансиях, и подготовили с ними отдельный датасет.

В карточках вакансий нет точной даты публикации, указано лишь сколько дней назад она была опубликована. Сохраним точную дату публикации в традиционном формате с помощью timedelta.

def raw_date_to_str(raw_date):
    raw_date = raw_date.lower()
    if '+' in raw_date or "более" in raw_date:
        delta = timedelta(days=32)
        return (datetime.now() - delta).strftime("%Y-%m-%d")
    else:
        parts = raw_date.split()
        for part in parts:
            if part.isdigit():
                delta = timedelta(days=part.isdigit())
                return (datetime.now() - delta).strftime("%Y-%m-%d")
    return ""

Сохраним id вакансии в системе Indeed. Подставляя id в URL страницы, мы сможем получить доступ к полному описанию вакансий.

def get_job_id_from_card(card):
    try:
        return card['id'].split('_')[1]
    except:
        return ""

Данный метод соберет названия вакансий.

def get_title_from_card(card):
    try:
        job_title = card.find('a', {'class': 'jobtitle'}).text
        return job_title.replace('\n', '')
    except:
        return ''

Аналогичным образом напишем методы, которые будут собирать данные о названии компании, времени публикации объявления, местоположении работодателя и рейтинге работодателя на портале.

URL сайта Indeed пишется для разных стран по-разному. Для США это будет просто indeed.com, а локализации для других стран получают префиксом xx.indeed.com. Список с префиксами мы собрали в массив заранее из https://opensource.indeedeng.io/api-documentation/docs/supported-countries/ списка Indeed.

def get_link_from_card(card, card_country):
    try:
        if card_country == 'us':
            return f"https://indeed.com{card.find('a', {'class': 'jobtitle'})['href']}"
        else:
            return f"https://{card_country}.indeed.com{card.find('a', {'class': 'jobtitle'})['href']}"
    except:
        return ""

Спарсим описание вакансии, которое можно найти по тегу ’summary’. Именно там содержатся требования, которые предъявляют к кандидату.

def get_summary_from_card_and_transform_to_skills(card):
    try:
        smr = card.find('div', {'class': 'summary'}).text
        return get_skills_row(smr)
    except:
        return ""
Необходимые hard-skills из описания вакансий будем сверять со списком 'skills'. 
skills = ["python", "tableau", "etl", "power bi", "d3.js", "qlik", "qlikview", "qliksense",
          "redash", "metabase", "numpy", "pandas", "congos", "superset", "matplotlib", "plotly",
          "airflow", "spark", "luigi", "machine learning", "amplitude", "sql", "nosql", "clickhouse",
          'sas', "hadoop", "pytorch", "tensorflow", "bash", "scala", "git", "aws", "docker",
          "linux", "kafka", "nifi", "ozzie", "ssas", "ssis", "redis", 'olap', ' r ', 'bigquery', 'api', 'excel']

Эта функция разобьет ’summary’ на слова пробелом и проверит их на соответствие нашему списку. В датасет будут возвращаться совпадения с нашим списком hard-skills.

def get_skills_row(summary):
    summary = summary.lower()
    row = []
    for sk in skills:
        if sk in summary:
            row.append(sk)
    return ','.join(row)

На выходе мы получим таблицу с примерно 30 тысячами строк.

Полный код проекта можно посмотреть в нашем репозитории на GitHub.

Матемаркетинг: современный облачный Data Stack

Время чтения текста – 1 минута

С 9 по 13 ноября в онлайн-формате прошёл Матемаркетинг — крупнейшая конференция по маркетинговой аналитике в России, и в этом году мне посчастливилось стать одним из спикеров. Я выступил с двумя докладами, в этом материале обсудим первый — о современном облачном Data Stack.

Внутри объясняю подход к проектированию аналитической инфраструктуры, обосновываю использование Clickhouse при построении облачной аналитики и рассказываю о его же нюансах и говорю про Redash с точки зрения инструмента для визуализации.

Частотный словарь и биграммы по постам инвесторов

Время чтения текста – 16 минут

Тинькофф Инвестиции — сервис от Тинькофф Банка для инвестирования на Московской и Санкт-Петербургской биржах. Внутри сервиса есть социальная сеть «Пульс», где инвесторы любого уровня могут делиться своими опытом, мыслями и планами, комментировать и оценивать чужие посты. Сегодня решим такую задачу:
построим частотный словарь и биграммы по постам пользователей, разделив их по объёму портфеля, чтобы понять, чем отличаются посты людей с разным объёмом инвестиций.

Лента по ценной бумаге в Пульсе выглядит вот так:

У каждого инвестора есть личный профиль. Внутри отображается объём портфеля, статистика прироста за год и число сделок за последний месяц.

Схема в Clickhouse

Для биграмм и частотного словаря достаточно собрать только тексты постов, логины пользователей и объёмы портфеля, но ради спортивного интереса будем хранить ещё и рост портфеля, число сделок человека за месяц и количество оценок под его постом. Для хранения данных получится две таблицы posts и users:

CREATE TABLE tinkoff.posts
(
    `login` String,
    `post` String,
    `likes` Int16
)
ENGINE = MergeTree
ORDER BY login

 CREATE TABLE tinkoff.users
(
    `login` String,
    `volume_prefix` String,
    `volume` String,
    `year_stats_prefix` String,
    `year_stats` String
)
ENGINE = MergeTree()
ORDER BY login

В таблице с пользователями volume_prefix — это префикс «до» или «от», стоящий в объёме портфеля, а volume — сам объём портфеля. Соответственно years_stats_prefix обозначает, портфель за год упал или вырос, а year_stats — на сколько он упал или вырос. Такая схема из двух таблиц с ключом сортировки таблиц по полю login позволит их соединить позднее.

Пишем парсер постов

У Пульса нет своего API, поэтому для парсинга постов будем использовать Selenium.

Мы уже писали про то, как парсить сайты с прокруткой при помощи Selenium

Нам понадобятся следующие библиотеки:

from selenium import webdriver
import time
from webdriver_manager.chrome import ChromeDriverManager
from clickhouse_driver import Client
from bs4 import BeautifulSoup as bs
import pandas as pd
import requests
import os
from lxml import html
import re

Сразу составим список интересующих ценных бумаг: это акции Сбербанка, Газпрома, Яндекса, Лукойла, MailRu, Аэрофлота, Киви, ВТБ, Детского Мира и Ленты. Для каждой бумаги будем переходить на страницу с постами, в которых она упоминается и проматывать страницу, пока длина страницы не станет более 300000: это около одной недели.

driver = webdriver.Chrome(ChromeDriverManager().install())

securities = ['SBER', 'GAZP', 'YNDX', 'LKOH', 'MAIL', 'AFLT', 'QIWI', 'VTBR', 'DSKY', 'LNTA']
        
for security in securities:
    try:
        print(security)
        driver.get(f'https://www.tinkoff.ru/invest/stocks/{security}/pulse/')
        
        page_length = driver.execute_script("return document.body.scrollHeight")
        while page_length < 300000:
            driver.execute_script(f"window.scrollTo(0, {page_length - 1000});")
            page_length = driver.execute_script("return document.body.scrollHeight")

После забираем себе весь код полученной страницы и извлекаем нужную информацию: логины, текст постов и число лайков. Формируем из них DataFrame и записываем его в папку data.

source_data = driver.page_source
        soup = bs(source_data, 'lxml')
        
        posts = soup.find_all('div', {'class':'PulsePostCollapsed__text_1ypMP'})
        logins = soup.find_all('div', {'class':'PulsePostAuthor__nicknameLink_19Aca'})
        likes = soup.find_all('div', {'class':'PulsePostBody__likes_3qcu0'})
        
        logins = [login.text for login in logins]
        posts = [post.text for post in posts]
        likes = [like.text.split()[0] for like in likes]
        
        df_posts = pd.DataFrame()
        df_posts['login'] = logins
        df_posts['post'] = posts
        df_posts['likes'] = likes
        
        df_posts.to_csv(f'data/{security}.csv', index=False)
        
        print(f'SAVED {security}')
    except Exception as E:
        print(E)

После того, как нужные посты собраны, отправим их в таблицу posts в Clickhouse. При помощи модуля os переходим в директорию data и собираем в список all_files названия всех файлов в ней — это все csv-таблицы, которые мы спарсили. Затем по очереди читаем файл в DataFrame и вставляем в posts.

client = Client(host='', user='', password='', port='9000', database='tinkoff')

os.chdir('data')
all_files = os.listdir()

for file in all_files:
    df = pd.read_csv(file)
    client.execute("INSERT INTO posts VALUES", df.to_dict('records'))

Собираем информацию о профилях

Чтобы собрать все профили, получим уникальный список логинов из базы:

flatten = lambda t: [item for sublist in t for item in sublist]
logins = flatten(client.execute("SELECT DISTINCT login FROM posts"))

Получать посты будем request-запросом, без Selenium, ведь ничего листать уже не нужно. Но иконка падения или роста портфеля за год не является текстом и получить её нельзя, зато внутри CSS-стилей можно увидеть её цвет — его мы и будем сохранять себе.

Поэтому опишем такую функцию: она примет объект soup и извлечёт цвет иконки.

headers = {'accept': '*/*',
           'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36'}

def up_or_down_color(soup):
    string = str(soup.find('div', {'class':'Icon__container_3u7WK'}))
    start_index = string.find('style')
    color = string[start_index + 13:start_index + 20]
    return color

А теперь напишем парсер, который проходит по списку логинов, для каждого отправляет запрос и собирает всю статистику профиля. Причём если цвет иконки роста зеленый, то в поле year_stats_prefix добавим «+», иначе «-». В конце сделаем паузу на 0.2 секунды на всякий случай, чтобы не напороться на неявные ограничения.

session = requests.Session()

login_list = []
volume_list = []
volume_prefix_list = []
year_stats_list = []
year_stats_prefix_list = []

count = 0
for login in logins:
    print(count, '/', len(logins))
    
    try:
        count += 1
        if login == 'blocked_user':
            continue

        url = f'https://www.tinkoff.ru/invest/social/profile/{login}'
        request = session.get(url, headers=headers)
        soup = bs(request.content, 'lxml')

        try:
            login = soup.find('div', {'class':'ProfileHeader__nickname_1oynx'}).text
            volume = soup.find('span', {'class':'Money__money_3_Tn4'}).text
            _to = soup.find('div', {'class':'ProfileHeader__statistics_11-DO'}).text.find('До')
            _from = soup.find('div', {'class':'ProfileHeader__statistics_11-DO'}).text.find('От')
            year_stats = soup.find('div', {'class':'ProfileHeader__statisticsItem_1HPLt'}).text
            color = up_or_down_color(soup)
        except AttributeError as E:
            print(login, E)
            continue
        volume_list.append(volume)
        login_list.append(login)

        if _to == -1:
            volume_prefix_list.append('до')
        else:
            volume_prefix_list.append('от')

        year_stats_list.append(re.findall(r'\d+.+', year_stats)[0])

        if color == '#22a053':
            year_stats_prefix_list.append('-')
        elif color == '#dd5656':
            year_stats_prefix_list.append('+')
        else:
            year_stats_prefix_list.append('')
    except Exception as E:
        print(E)
        continue
    time.sleep(0.2)

Собираем все аккаунты и статистику по ним в DataFrame. Их тоже сохраним себе в базу.

df_users = pd.DataFrame()
df_users['login'] = login_list
df_users['volume_prefix'] = volume_prefix_list
df_users['volume'] = volume_list
df_users['year_stats_prefix'] = year_stats_prefix_list
df_users['year_stats'] = year_stats_list

client.execute("INSERT INTO users VALUES", df_users.to_dict('records'))

А теперь сделаем LEFT JOIN таблицы с постами к таблице с пользователями, чтобы у каждой строки с постом была ещё статистика по аккаунту автора. Запишем результат в DataFrame.

posts_with_users = client.execute('''
    SELECT login, post, likes, volume_prefix, volume, year_stats_prefix, year_stats FROM posts
    LEFT JOIN users
    ON posts.login = users.login
''')
posts_with_users_df = pd.DataFrame(posts_with_users, columns=['login', 'post', 'likes', 'volume_prefix', 'volume', 'year_stats_prefix', 'year_stats'])

Полученный результат будет выглядеть так:

Частотный словарь и биграммы

Для начала составим частотный словарь по постам без разделений на группы.

posts_with_users_df.post.str.split(expand=True).stack().value_counts()

Получим, что предлоги и союзы превалируют над остальными словами:

Значит, их нужно убрать из сета данных. Но даже в том случае, если данные будут очищены, получим что-то подобное. Такие данные ни о чём не говорят, и никаких выводов сделать не удастся.

В таком случае попробуем построить биграммы. Одна биграмма — последовательность из двух элементов, то есть два слова, стоящие рядом друг с другом. Существует много алгоритмов построения n-грамм разной степени оптимизации, мы воспользуемся встроенной функцией в nltk и разберём пример построения биграмм для одной группы. Первым делом импортируем дополнительные библиотеки, загружаем stopwords для русского языка и чистим данные. В список стоп-слов вносим дополнительные: среди них будут и тикеры акций, которые встречаются в каждом посте.

import nltk
from nltk.corpus import stopwords
from pymystem3 import Mystem
from string import punctuation
import unicodedata
import collections

nltk.download("stopwords")
nltk.download('punkt')
russian_stopwords = stopwords.words("russian")
append_stopword = ['это', 'sber', 'акция', 'компания', 'aflt', 'gazp', 'yndx', 'lkoh', 'mail', 'год', 'рынок', 'https', 'млрд', 'руб', 'www', 'кв']
russian_stopwords.extend(append_stopword)

Опишем функцию для подготовки текста, которая переведёт все слова в нижний регистр, приведёт к нормальной форме, удалит стоп-слова и пунктуацию:

mystem = Mystem() 

def preprocess_text(text):
    tokens = mystem.lemmatize(text.lower())
    tokens = [token for token in tokens if token not in russian_stopwords\
              and token != " " \
              and token.strip() not in punctuation]
    
    text = " ".join(tokens)
    
    return text

posts_with_users_df.post = posts_with_users_df.post.apply(preprocess_text)

Для примера возьмём посты группы инвесторов с объёмом портфеля до 10 тысяч рублей и построим биграммы, а затем выведем самые частые:

up_to_10k_df = posts_with_users_df[(posts_with_users_df['volume_prefix'] == 'от') & (posts_with_users_df['volume'] == '10 000 ₽')]

up_to_10k_counts = collections.Counter()
for sent in up_to_10k_df["post"]:
    words = nltk.word_tokenize(sent)
    up_to_10k_counts.update(nltk.bigrams(words))
up_to_10k_counts.most_common()

Получаем такой список:

Результаты исследования биграмм

В группе с объёмом портфеля до 10 и 100 тысяч руб. инвесторы чаще пишут о личном опыте и полученной прибыли: на это указывают биграммы «чистая прибыль» и «финансовый результат».

До 10 000 руб:

  1. добрый утро, 44
  2. цена нефть, 36
  3. неквалифицированный инвестор, 36
  4. чистый прибыль, 32
  5. шапка профиль, 30
  6. московский биржа, 30
  7. совет директор, 28

До 100 000 руб:

  1. чистый прибыль, 80
  2. финансовый результат, 67
  3. добрый утро, 66
  4. индекс мосбиржа, 63
  5. цена нефть, 58
  6. квартал 2020, 42
  7. мочь становиться 41

В группе до 500 тысяч руб. впервые появляются биграммы со словами «подписываться», «выкладывать», «новость» — инвесторы с такими характеристиками портфеля часто заводят собственные блоги об инвестировании и продвигают их через посты в Пульсе.

До 500 000 руб:

  1. чистый прибыль, 169
  2. квартал 2020, 154
  3. отчетность 3, 113
  4. выкладывать новость, 113
  5. 🤝подписываться, 80
  6. подписываться выкладывать, 80
  7. публиковать отчёт, 80
  8. цена нефть, 76
  9. колво бумага, 69
  10. вес портфель, 68

В биграммах группы с объёмом портфеля до и от 1 миллиона руб. появляется «фьючерс», что логично — это сложный инструмент, который обычно не рекомендуется новичкам. Кроме того, в постах группы проходит больше обсуждений отчётностей компаний — это биграммы «финансовый отчетность», «опубликовать финансовый», «отчетность мсфо».

До 1 000 000 руб:

  1. 3 квартал, 183
  2. квартал 2020, 157
  3. фьчерс утро, 110
  4. финансовый отчетность, 107
  5. опубликовать финансовый, 104
  6. чистый прибыль, 75
  7. наш биржа, 72
  8. отчетность мсфо, 69
  9. цена нефть, 67
  10. операционный результат, 61
  11. ноябрьский фьючерс, 54
  12. азиатский площадка, 51

От 1 000 000 руб:

  1. октябрь опубликовывать, 186
  2. 3 квартал, 168
  3. квартал 2020, 168
  4. финансовый отчетность, 159,
  5. опубликовывать финансовый, 95
  6. чистый прибыль, 94
  7. операционный результат, 86
  8. целевой цена, 74
  9. опубликовывать операционный, 63
  10. цена повышать, 60
 2 комментария    120   2020   clickhouse   Data Analytics   data science   nltk   python

Собираем топ-10 аккаунтов Instagram по теме аналитики и машинного обучения

Время чтения текста – 11 минут

В некоторых телеграм-каналах (раз, два) уже говорилось про другие интересные паблики в телеграме, однако по Instagram такого топа пока не было. Вероятно, это не самая популярная сеть для контента в нашей индустрии, тем не менее, можно проверить эту гипотезу, используя Python и данные. В этом материале рассказываем, как собрать данные по аккаунтам Instagram без API.

Метод сбора данных
Instagram API не позволит вам просто так собирать данные о других пользователях, но есть и другой метод. Можно отправить такой request-запрос:

https://instagram.com/leftjoin/?__a=1

И получить в ответе JSON-объект со всей информацией о пользователе, которую можно посмотреть самому: имя аккаунта, количество постов, подписок и подписчиков, а также первые десять постов с информацией про них: количество лайков, комментарии и прочее. Именно на таких request-запросах устроена библиотека pyInstagram.

Схема данных
Будем собирать данные в три таблицы Clickhouse: пользователи, посты и комментарии. В таблицу пользователей собираем всю информацию о них: идентификатор, наименование аккаунта, имя и фамилия человека, описание профиля, количество подписок и подписчиков, количество постов, суммарное количество комментариев и лайков, наличие верификации, география пользователя и ссылки на аватарку и Facebook.

CREATE TABLE instagram.users
(
    `added_at` DateTime,
    `user_id` UInt64,
    `user_name` String,
    `full_name` String,
    `base_url` String,
    `biography` String,
    `followers_count` UInt64,
    `follows_count` UInt64,
    `media_count` UInt64,
    `total_comments` UInt64,
    `total_likes` UInt64,
    `is_verified` UInt8,
    `country_block` UInt8,
    `profile_pic_url` Nullable(String),
    `profile_pic_url_hd` Nullable(String),
    `fb_page` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

В таблицу с постами сохраняем автора поста, идентификатор записи, текст, количество комментариев и прочее. is_ad, is_album и is_video — поля, проверяющие, является ли запись рекламной, «каруселью» изображений или видеозаписью.

CREATE TABLE instagram.posts
(
    `added_at` DateTime,
    `owner` String,
    `post_id` UInt64,
    `caption` Nullable(String),
    `code` String,
    `comments_count` UInt64,
    `comments_disabled` UInt8,
    `created_at` DateTime,
    `display_url` String,
    `is_ad` UInt8,
    `is_album` UInt8,
    `is_video` UInt8,
    `likes_count` UInt64,
    `location` Nullable(String),
    `recources` Array(String),
    `video_url` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

В таблице с комментариями храним отдельно каждый комментарий к записи с автором и текстом.

CREATE TABLE instagram.comments
(
    `added_at` DateTime,
    `comment_id` UInt64,
    `post_id` UInt64,
    `comment_owner` String,
    `comment_text` String
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

Скрипт
Из библиотеки pyInstagram нам понадобятся классы Account, Media, WebAgent и Comment.

from instagram import Account, Media, WebAgent, Comment
from datetime import datetime
from clickhouse_driver import Client
import requests
import pandas as pd

Создаем экземпляр класса WebAgent — он необходим для вызова некоторых методов и обновления аккаунтов. В начале нам нужно иметь хотя бы названия профилей пользователей, информацию о которых мы хотим собрать, поэтому отправим другой request-запрос для поиска пользователей по ключевым словам, их список ниже в фрагменте кода. В выдаче будут аккаунты, у которых название или описание профиля совпало с ключевым словом.

agent = WebAgent()
queries_list = ['machine learning', 'data science', 'data analytics', 'analytics', 'business intelligence',
                'data engineering', 'computer science', 'big data', 'artificial intelligence',
                'deep learning', 'data scientist','machine learning engineer', 'data engineer']
client = Client(host='12.34.56.789', user='default', password='', port='9000', database='instagram')
url = 'https://www.instagram.com/web/search/topsearch/?context=user&count=0'

Проходим по всем ключевым словам и собираем все аккаунты. Так как в списке могли образоваться дубликаты, переведём список в множество и обратно в список.

response_list = []
for query in queries_list:
    response = requests.get(url, params={
        'query': query
    }).json()
    response_list.extend(response['users'])
instagram_pages_list = []
for item in response_list:
    instagram_pages_list.append(item['user']['username'])
instagram_pages_list = list(set(instagram_pages_list))

Теперь проходим по списку аккаунтов, и если аккаунта с таким наименованием ещё не было в базе, то получаем расширенную информацию о нём. Для этого пробуем создать экземпляр класса Account, передав username параметром. После при помощи объекта agent обновляем информацию об аккаунте. Будем собирать только первые 100 постов, чтобы сбор не задерживался. Создадим список media_list — он при помощи метода get_media будет хранить код каждого поста, который затем можно будет получить при помощи класса Media.


Сбор медиа аккаунта

all_posts_list = []
username_count = 0
for username in instagram_pages_list:
    if client.execute(f"SELECT count(1) FROM users WHERE user_name='{username}'")[0][0] == 0:
        print('username:', username_count, '/', len(instagram_pages_list))
        username_count += 1
        account_total_likes = 0
        account_total_comments = 0
        try:
            account = Account(username)
        except Exception as E:
            print(E)
            continue
        try:
            agent.update(account)
        except Exception as E:
            print(E)
            continue
        if account.media_count < 100:
            post_count = account.media_count
        else:
            post_count = 100
        print(account, post_count)
        media_list, _ = agent.get_media(account, count=post_count, delay=1)
        count = 0

Мы начинаем с постов и комментариев, потому что для занесения в базу нового пользователя нам нужно подсчитать сперва суммарное количество комментариев и лайков в его аккаунте. Практически все интересующие поля являются атрибутами класса Media.


Сбор постов пользователя

for media_code in media_list:
            if client.execute(f"SELECT count(1) FROM posts WHERE code='{media_code}'")[0][0] == 0:
                print('posts:', count, '/', len(media_list))
                count += 1

                post_insert_list = []
                post = Media(media_code)
                agent.update(post)
                post_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                post_insert_list.append(str(post.owner))
                post_insert_list.append(post.id)
                if post.caption is not None:
                    post_insert_list.append(post.caption.replace("'","").replace('"', ''))
                else:
                    post_insert_list.append("")
                post_insert_list.append(post.code)
                post_insert_list.append(post.comments_count)
                post_insert_list.append(int(post.comments_disabled))
                post_insert_list.append(datetime.fromtimestamp(post.date).strftime('%Y-%m-%d %H:%M:%S'))
                post_insert_list.append(post.display_url)
                try:
                    post_insert_list.append(int(post.is_ad))
                except TypeError:
                    post_insert_list.append('cast(Null as Nullable(UInt8))')
                post_insert_list.append(int(post.is_album))
                post_insert_list.append(int(post.is_video))
                post_insert_list.append(post.likes_count)
                if post.location is not None:
                    post_insert_list.append(post.location)
                else:
                    post_insert_list.append('')
                post_insert_list.append(post.resources)
                if post.video_url is not None:
                    post_insert_list.append(post.video_url)
                else:
                    post_insert_list.append('')
                account_total_likes += post.likes_count
                account_total_comments += post.comments_count
                try:
                    client.execute(f'''
                        INSERT INTO posts VALUES {tuple(post_insert_list)}
                    ''')
                except Exception as E:
                    print('posts:')
                    print(E)
                    print(post_insert_list)

Чтобы собрать комментарии необходимо вызвать метод get_comments и передать параметром экземпляр класса Media.


Сбор комментариев из поста

comments = agent.get_comments(media=post)
                for comment_id in comments[0]:
                    comment_insert_list = []
                    comment = Comment(comment_id)
                    comment_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                    comment_insert_list.append(comment.id)
                    comment_insert_list.append(post.id)
                    comment_insert_list.append(str(comment.owner))
                    comment_insert_list.append(comment.text.replace("'","").replace('"', ''))
                    try:
                        client.execute(f'''
                            INSERT INTO comments VALUES {tuple(comment_insert_list)}
                        ''')
                    except Exception as E:
                        print('comments:')
                        print(E)
                        print(comment_insert_list)


Наконец, когда все посты и комментарии пройдены, можем занести информацию о пользователе.

Сбор информации о пользователе

user_insert_list = []
        user_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        user_insert_list.append(account.id)
        user_insert_list.append(account.username)
        user_insert_list.append(account.full_name)
        user_insert_list.append(account.base_url)
        user_insert_list.append(account.biography)
        user_insert_list.append(account.followers_count)
        user_insert_list.append(account.follows_count)
        user_insert_list.append(account.media_count)
        user_insert_list.append(account_total_comments)
        user_insert_list.append(account_total_likes)
        user_insert_list.append(int(account.is_verified))
        user_insert_list.append(int(account.country_block))
        user_insert_list.append(account.profile_pic_url)
        user_insert_list.append(account.profile_pic_url_hd)
        if account.fb_page is not None:
            user_insert_list.append(account.fb_page)
        else:
            user_insert_list.append('')
        try:
            client.execute(f'''
                INSERT INTO users VALUES {tuple(user_insert_list)}
            ''')
        except Exception as E:
            print('users:')
            print(E)
            print(user_insert_list)

Результаты
Таким методом нам удалось собрать 500 пользователей, 20 тысяч постов и 40 тысяч комментариев. Теперь можем написать простой запрос к базе и получить топ-10 Instagram-аккаунтов по теме аналитики и машинного обучения за последнее время:

SELECT *
FROM users
ORDER BY followers_count DESC
LIMIT 10

А вот и приятный бонус, для тех, кто искал на какие аккаунты в Instagram подписаться по релевантной тематике:

  1. @ai_machine_learning
  2. @neuralnine
  3. @datascienceinfo
  4. @compscistuff
  5. @computersciencelife
  6. @welcome.ai
  7. @papa_programmer
  8. @data_science_learn
  9. @neuralnet.ai
  10. @techno_thinkers

Полный код проекта доступен на GitHub

Анализ рынка вакансий аналитики и BI: дашборд в Tableau

Время чтения текста – 16 минут

По данным рейтинга SimilarWeb, hh.ru — третий по популярности сайт о трудоустройстве в мире. В одном из разговоров с Ромой Буниным у нас появилась идея сделать совместный проект: собрать данные из открытого HeadHunter API и визуализировать их при помощи Tableau Public. Нам захотелось понять, как меняется зарплата в зависимости от указанных в вакансии навыков, наименования позиции и сравнить, как обстоят дела в Москве, Санкт-Петербурге и регионах.

Как мы собирали данные?

Схема данных основана на коротком представлении вакансии, которую возвращает метод GET /vacancies. Из представления собираются следующие поля: тип вакансии, идентификатор, премиальность вакансии, необходимость прохождения тестирования, адрес компании, информация о зарплате, график работы и другие. Соответствующий CREATE-запрос для таблицы:


Запрос создания таблицы vacancies_short

CREATE TABLE headhunter.vacancies_short
(
    `added_at` DateTime,
    `query_string` String,
    `type` String,
    `level` String,
    `direction` String,
    `vacancy_id` UInt64,
    `premium` UInt8,
    `has_test` UInt8,
    `response_url` String,
    `address_city` String,
    `address_street` String,
    `address_building` String,
    `address_description` String,
    `address_lat` String,
    `address_lng` String,
    `address_raw` String,
    `address_metro_stations` String,
    `alternate_url` String,
    `apply_alternate_url` String,
    `department_id` String,
    `department_name` String,
    `salary_from` Nullable(Float64),
    `salary_to` Nullable(Float64),
    `salary_currency` String,
    `salary_gross` Nullable(UInt8),
    `name` String,
    `insider_interview_id` Nullable(UInt64),
    `insider_interview_url` String,
    `area_url` String,
    `area_id` UInt64,
    `area_name` String,
    `url` String,
    `published_at` DateTime,
    `employer_url` String,
    `employer_alternate_url` String,
    `employer_logo_urls_90` String,
    `employer_logo_urls_240` String,
    `employer_logo_urls_original` String,
    `employer_name` String,
    `employer_id` UInt64,
    `response_letter_required` UInt8,
    `type_id` String,
    `type_name` String,
    `archived` UInt8,
    `schedule_id` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY vacancy_id

Первый скрипт собирает данные с HeadHunter по API и отправляет их в Clickhouse. Он использует следующие библиотеки:

import requests
from clickhouse_driver import Client
from datetime import datetime
import pandas as pd
import re

Далее загружаем таблицу с запросами и подключаемся к CH:

queries = pd.read_csv('hh_data.csv')
client = Client(host='1.234.567.890', user='default', password='', port='9000', database='headhunter')

Таблица queries хранит список поисковых запросов. Она содержит следующие колонки: тип запроса, уровень вакансии для поиска, направление вакансии и саму поисковую фразу. В строку с запросом можно помещать логические операторы: например, чтобы найти вакансии, в которых должны присутствовать ключевые слова «Python», «data» и «анализ» между ними можно указать логическое «И».

Не всегда вакансии в выдаче соответствуют ожиданиям: случайно в базу могут попасть повара, маркетологи и администраторы магазина. Чтобы этого не произошло, опишем функцию check_name(name) — она будет принимать наименование вакансии и возвращать True в случае, если вакансия не подошла по названию.

def check_name(name):
    bad_names = [r'курьер', r'грузчик', r'врач', r'менеджер по закупу',
           r'менеджер по продажам', r'оператор', r'повар', r'продавец',
          r'директор магазина', r'директор по продажам', r'директор по маркетингу',
          r'кабельщик', r'начальник отдела продаж', r'заместитель', r'администратор магазина', 
          r'категорийный', r'аудитор', r'юрист', r'контент', r'супервайзер', r'стажер-ученик', 
          r'су-шеф', r'маркетолог$', r'региональный', r'ревизор', r'экономист', r'ветеринар', 
          r'торговый', r'клиентский', r'начальник цеха', r'территориальный', r'переводчик', 
          r'маркетолог /', r'маркетолог по']
    for item in bad_names:
        if re.match(item, name):
            return True

Затем объявляем бесконечный цикл — мы собираем данные без перерыва. Идём по DataFrame queries и сразу забираем оттуда тип вакансии, уровень, направление и поисковый запрос в отдельные переменные. Сначала по ключевому слову отправляем один запрос к методу /GET vacancies и получаем количество страниц. После идём от нулевой до последней страницы, отправляем те же запросы и заполняем список vacancies_from_response с полученными в выдаче короткими представлениями всех вакансий. В параметрах указываем 10 вакансий на страницу — больше ограничения HH API получить не позволяют. Так как мы не указали параметр area, API возвращает вакансии по всему миру.

while True:
   for query_type, level, direction, query_string in zip(queries['Тип'], queries['Уровень'], queries['Направление'], queries['Ключевое слово']):
           print(f'ключевое слово: {query_string}')
           url = 'https://api.hh.ru/vacancies'
           par = {'text': query_string, 'per_page':'10', 'page':0}
           r = requests.get(url, params=par).json()
           added_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
           pages = r['pages']
           found = r['found']
           vacancies_from_response = []

           for i in range(0, pages + 1):
               par = {'text': query_string, 'per_page':'10', 'page':i}
               r = requests.get(url, params=par).json()
               try:
                   vacancies_from_response.append(r['items'])
               except Exception as E:
                   continue

Теперь проходим по каждой вакансии на каждой странице двойным итератором. Сперва отправим запрос к Clickhouse и проверим, нет ли уже в базе вакансии с таким идентификатором и таким поисковым запросом. Если проверка пройдена — проверяем название вакансии. В случае неудачи переходим к следующей.

for item in vacancies_from_response:
               for vacancy in item:
                   if client.execute(f"SELECT count(1) FROM vacancies_short WHERE vacancy_id={vacancy['id']} AND query_string='{query_string}'")[0][0] == 0:
                       name = vacancy['name'].replace("'","").replace('"','')
                       if check_name(name):
                           continue

Теперь проходим по вакансии и собираем все нужные поля. В случае отсутствия некоторых данных будем отправлять пустые строки:


Код для сбора данных о вакансии

vacancy_id = vacancy['id']
                       is_premium = int(vacancy['premium'])
                       has_test = int(vacancy['has_test'])
                       response_url = vacancy['response_url']
                       try:
                           address_city = vacancy['address']['city']
                           address_street = vacancy['address']['street']
                           address_building = vacancy['address']['building']
                           address_description = vacancy['address']['description']
                           address_lat = vacancy['address']['lat']
                           address_lng = vacancy['address']['lng']
                           address_raw = vacancy['address']['raw']
                           address_metro_stations = str(vacancy['address']['metro_stations']).replace("'",'"')
                       except TypeError:
                           address_city = ""
                           address_street = ""
                           address_building = ""
                           address_description = ""
                           address_lat = ""
                           address_lng = ""
                           address_raw = ""
                           address_metro_stations = ""
                       alternate_url = vacancy['alternate_url']
                       apply_alternate_url = vacancy['apply_alternate_url']
                       try:
                           department_id = vacancy['department']['id']
                       except TypeError as E:
                           department_id = ""
                       try:
                           department_name = vacancy['department']['name']
                       except TypeError as E:
                           department_name = ""
                       try:
                           salary_from = vacancy['salary']['from']
                       except TypeError as E:
                           salary_from = "cast(Null as Nullable(UInt64))"
                       try:
                           salary_to = vacancy['salary']['to']
                       except TypeError as E:
                           salary_to = "cast(Null as Nullable(UInt64))"
                       try:
                           salary_currency = vacancy['salary']['currency']
                       except TypeError as E:
                           salary_currency = ""
                       try:
                           salary_gross = int(vacancy['salary']['gross'])
                       except TypeError as E:
                           salary_gross = "cast(Null as Nullable(UInt8))"
                       try:
                           insider_interview_id = vacancy['insider_interview']['id']
                       except TypeError:
                           insider_interview_id = "cast(Null as Nullable(UInt64))"
                       try:
                           insider_interview_url = vacancy['insider_interview']['url']
                       except TypeError:
                           insider_interview_url = ""
                       area_url = vacancy['area']['url']
                       area_id = vacancy['area']['id']
                       area_name = vacancy['area']['name']
                       url = vacancy['url']
                       published_at = vacancy['published_at']
                       published_at = datetime.strptime(published_at,'%Y-%m-%dT%H:%M:%S%z').strftime('%Y-%m-%d %H:%M:%S')
                       try:
                           employer_url = vacancy['employer']['url']
                       except Exception as E:
                           print(E)
                           employer_url = ""
                       try:
                           employer_alternate_url = vacancy['employer']['alternate_url']
                       except Exception as E:
                           print(E)
                           employer_alternate_url = ""
                       try:
                           employer_logo_urls_90 = vacancy['employer']['logo_urls']['90']
                           employer_logo_urls_240 = vacancy['employer']['logo_urls']['240']
                           employer_logo_urls_original = vacancy['employer']['logo_urls']['original']
                       except Exception as E:
                           print(E)
                           employer_logo_urls_90 = ""
                           employer_logo_urls_240 = ""
                           employer_logo_urls_original = ""
                       employer_name = vacancy['employer']['name'].replace("'","").replace('"','')
                       try:
                           employer_id = vacancy['employer']['id']
                       except Exception as E:
                           print(E)
                       response_letter_required = int(vacancy['response_letter_required'])
                       type_id = vacancy['type']['id']
                       type_name = vacancy['type']['name']
                       is_archived = int(vacancy['archived'])

Последнее поле — график работы. В случае, если вакансия подразумевает вахтовый метод работы она нам точно не подходит.

try:
    schedule = vacancy['schedule']['id']
except Exception as E:
    print(E)
    schedule = ''
if schedule == 'flyInFlyOut':
    continue

Теперь формируем список из полученных переменных, заменяем в нём None-значения на пустые строки во избежании конфликтов с Clickhouse и вставляем строку в таблицу.

vacancies_short_list = [added_at, query_string, query_type, level, direction, vacancy_id, is_premium, has_test, response_url, address_city, address_street, address_building, address_description, address_lat, address_lng, address_raw, address_metro_stations, alternate_url, apply_alternate_url, department_id, department_name,
salary_from, salary_to, salary_currency, salary_gross, insider_interview_id, insider_interview_url, area_url, area_name, url, published_at, employer_url, employer_logo_urls_90, employer_logo_urls_240,  employer_name, employer_id, response_letter_required, type_id, type_name, is_archived, schedule]
for index, item in enumerate(vacancies_short_list):
    if item is None:
        vacancies_short_list[index] = ""
tuple_to_insert = tuple(vacancies_short_list)
print(tuple_to_insert)
client.execute(f'INSERT INTO vacancies_short VALUES {tuple_to_insert}')

Как подключили Tableau к данным?

Tableau Public не умеет работать с базами данных, поэтому мы написали коннектор Clickhouse к Google Sheets. Он использует библиотеки gspread и oauth2client для авторизации в Google Spreadsheets API и библиотеку schedule для ежедневной работы по графику.

Работа с Google Spreadseets API подробно разобрана в материале «Собираем данные по рекламным кампаниям ВКонтакте»

import schedule
from clickhouse_driver import Client
import gspread
import pandas as pd
from oauth2client.service_account import ServiceAccountCredentials
from datetime import datetime

scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']
client = Client(host='54.227.137.142', user='default', password='', port='9000', database='headhunter')
creds = ServiceAccountCredentials.from_json_keyfile_name('credentials.json', scope)
gc = gspread.authorize(creds)

Опишем функцию update_sheet() — она будет брать все данные из Clickhouse и вставлять их в таблицу Google Docs.

def update_sheet():
   print('Updating cell at', datetime.now())
   columns = []
   for item in client.execute('describe table headhunter.vacancies_short'):
       columns.append(item[0])
   vacancies = client.execute('SELECT * FROM headhunter.vacancies_short')
   df_vacancies = pd.DataFrame(vacancies, columns=columns)
   df_vacancies.to_csv('vacancies_short.csv', index=False)
   content = open('vacancies_short.csv', 'r').read()
   gc.import_csv('1ZWS2kqraPa4i72hzp0noU02SrYVo0teD7KZ0c3hl-UI', content.encode('utf-8'))

Чтобы скрипт запускался в 16:00 по МСК каждый день используем библиотеку schedule:

schedule.every().day.at("13:00").do(update_sheet)
while True:
   schedule.run_pending()

А что в результате?

Рома построил на полученных данных дашборд.

И в youtube-ролике рассказывает о том, как эффективно использовать дашборд

Инсайты, которые можно извлечь из дашборда

  1. Аналитики с навыком бизнес-аналитики востребованы на рынке больше всего: по такому запросу нашлось больше всего вакансий. Тем не менее, средняя зарплата выше у продуктовых аналитиков и аналитиков BI.
  2. В Москве средние зарплаты выше на 10-30 тысяч рублей, чем в Санкт-Петербурге и на 30-40 тысячи рублей, чем в регионах. Там же работы нашлось больше всего в России.
  3. Самые высокооплачиваемые должности: руководитель отдела аналитики (в среднем, 110 тыс. руб. в месяц), инженер баз данных (138 тыс. руб. в месяц) и директор по машинному обучению (250 тыс. руб. в месяц).
  4. Самые полезные навыки на рынке — владение Python c библиотеками pandas и numpy, Tableau, Power BI, Etl и Spark. Вакансий с такими требованиями больше и зарплаты в них указаны выше прочих. Для Python-программистов знание matplotlib ценится на рынке выше, чем владение plotly.

Полный код проекта доступен на GitHub

Обрабатываем нажатие кнопки в Selenium

Время чтения текста – 10 минут

В материале «Парсим данные, используя Buetiful Soup и Selenium» мы уже рассмотрели, как быть, когда данные на странице динамически подгружаются при скролле страницы. Но бывают ситуации, когда новые данные можно получить, только нажав на кнопку «Показать ещё» — сегодня узнаем, как через Selenium сымитировать нажатие кнопки для полного открытия страницы, соберём идентификаторы пива, оценки к каждому продукту и отправим данные в Clickhouse

Структура страницы

Возьмём случайную пивоварню — у неё 105 чекинов, то есть, отзывов. Страница с чекинами пивоварни показывает не более 25 чекинов и выглядит так:

Если попробуем промотать в самый низ, столкнёмся с той самой кнопкой, мешающей нам взять все 105 за раз:

Мы поступим так: выясним, к какому классу относится элемент кнопки и будем на неё нажимать, пока это возможно. Так как Selenium запускает браузер, следующая кнопка «Показать ещё» может не успеть прогрузиться, поэтому между нажатиями поставим интервал в пару секунд. Как только страница раскроется полностью — мы возьмём её содержимое и распарсим нужные данные из чекинов. Зайдём в код страницы и найдём кнопку — она относится к классу more_checkins.

У кнопки есть свойства стиля, а именно — display. В случае, если кнопка должна отображаться, display принимает значение block. Но когда промотаем страницу до самого конца, кнопку не нужно будет показывать, ведь открывать больше нечего — поэтому display кнопки примет значение none. В случае, если мы запросим у кнопки display и вернётся none будем знать, что открывать больше нечего и можно перестать жать на кнопку.

Пишем код

Начнём с импорта библиотек:

import time
from selenium import webdriver
from bs4 import BeautifulSoup as bs
import re
from datetime import datetime
from clickhouse_driver import Client

Chromedriver, необходимый для запуска браузера через Selenium, можно установить с официальной страницы

Подключимся к базе данных, зададим cookies:

client = Client(host='ec1-23-456-789-10.us-east-2.compute.amazonaws.com', user='', password='', port='9000', database='')
count = 0
cookies = {
    'domain':'untappd.com',
    'expiry':1594072726,
    'httpOnly':True,
    'name':'untappd_user_v3_e',
    'path':'/',
    'secure':False,
    'value':'your_value'
}

О том, как запускать Selenium с cookies можно прочитать в материале «Парсим данные каталога сайта, используя Beautiful Soup и Selenium». Нам нужен параметр untappd_user_v3_e.

Так как мы планируем работать с пивоварнями, у которых более сотни тысяч чекинов, может оказаться так, что страница будет чересчур тяжёлой, и нагрузка на машину будет огромна. Чтобы этого избежать, отключим всё лишнее, а затем подключим cookie для авторизации:

options = webdriver.ChromeOptions()
prefs = {'profile.default_content_setting_values': {'images': 2, 
                            'plugins': 2, 'fullscreen': 2}}
options.add_experimental_option('prefs', prefs)
options.add_argument("start-maximized")
options.add_argument("disable-infobars")
options.add_argument("--disable-extensions")
driver = webdriver.Chrome(options=options)
driver.get('https://untappd.com/TooSunnyBrewery')
driver.add_cookie(cookies)

Напишем функцию, которая принимает ссылку, переходит по ней, полностью раскрывает страницу и возвращает нам soup, который можно будет распарсить. Получим display кнопки и запишем в переменную more_checkins: пока он не равен none будем нажимать на кнопку и снова получать её display. Сделаем интервал в две секунды между нажатиями, чтобы подождать прогрузку страницы. Как только будет получена вся страница, переведём её в soup библиотекой bs4.

def get_html_page(url):
    driver.get(url)
    driver.maximize_window()
    more_checkins = driver.execute_script("var more_checkins=document.getElementsByClassName('more_checkins_logged')[0].style.display;return more_checkins;")
    print(more_checkins)
    while more_checkins != "none":
        driver.execute_script("document.getElementsByClassName('more_checkins_logged')[0].click()")
        time.sleep(2)
        more_checkins = driver.execute_script("var more_checkins=document.getElementsByClassName('more_checkins_logged')[0].style.display;return more_checkins;")
        print(more_checkins)
    source_data = driver.page_source
    soup = bs(source_data, 'lxml')
    return soup

Напишем следующую функцию: она тоже будет принимать url страницы, передавать его в get_html_page, получать soup и парсить его. Функция вернёт запакованные списки с идентификатором пива и оценкой к нему.

О том, как парсить элементы страницы мы уже говорили в материале «Парсим данные каталога сайта, используя Beautiful Soup».

def parse_html_page(url):
    soup = get_html_page(url)
    brewery_id = soup.find_all('a', {'class':'label',
                                     'href':re.compile('https://untappd.com/brewery/*')})[0]['href'][28:]
    items = soup.find_all('div', {'class':'item',
                                  'id':re.compile('checkin_*')})
    checkin_rating_list = []
    beer_id_list = []
    count = 0
    print('Заполняю списки')
    for checkin in items:
        print(count, '/', len(items))
        try:
            checkin_rating_list.append(float(checkin.find('div', {'class':'caps'})['data-rating']))
        except Exception:
            checkin_rating_list.append('cast(Null as Nullable(Float32))')
        try:
            beer_id_list.append(int(checkin.find('a', {'class':'label'})['href'][-7:]))
        except Exception:
            beer_id_list.append('cast(Null as Nullable(UInt64))')
        count += 1 
    return zip(checkin_rating_list, beer_id_list)

Наконец, напишем вызов функций по пивоварням. В материале «Использование словарей в Clickhouse на примере данных Untappd» мы уже рассмотрели, как получить список идентификаторов российских пивоварен — обратимся к нему через таблицу в Clickhouse

brewery_list = client.execute('SELECT brewery_id FROM brewery_info')

Если посмотрим на brewery_list, то узнаем, что данные вернулись в неудобном формате: это список кортежей.

Небольшое лямбда-выражение позволит его «выпрямить»:

flatten = lambda lst: [item for sublist in lst for item in sublist]
brewery_list = flatten(brewery_list)

Работать с таким списком значительно комфортнее:

Для каждой пивоварни в списке сформируем url — он состоит из стандартной ссылки и идентификатора пивоварни в конце. Отправим url в функцию parse_html_page, которая сама вызовет get_html_page и вернёт списки с beer_id и rating_score. Так как два списка вернутся упакованными можем пройти по ним итератором, сформировав кортеж и отправив его в Clickhouse.

for brewery_id in brewery_list:
    print('Беру пивоварню с id', brewery_id, count, '/', len(brewery_list))
    url = 'https://untappd.com/brewery/' + str(brewery_id)
    returned_checkins = parse_html_page(url)
    for rating, beer_id in returned_checkins:
        tuple_to_insert = (rating, beer_id)
        try:
            client.execute(f'INSERT INTO beer_reviews VALUES {tuple_to_insert}')
        except errors.ServerException as E:
            print(E)
    count += 1

С кнопками на этом всё. Постепенно мы формируем отличный датасет для последующего анализа, который рассмотрим в следующем цикле статей, как только завершим сбор данных — возможно, через месяц.

Использование словарей в Clickhouse на примере данных Untappd

Время чтения текста – 15 минут

В Clickhouse реализована возможность использования внутренних и внешних словарей, которые могут быть альтернативой JOIN (которые, к сожалению, не всегда здорово работают). Словари хранят информацию в памяти и к ним можно обратиться командой dictGet. Рассмотрим как создать словарь в Clickhouse и как его можно использовать в запросах.

Будем изучать функционал на примере данных из API Untappd. Untappd — социальная сеть любителей крафтового пива. Мы сфокусируемся на чекинах российких крафтовых пивоварен, начнем собирать информацию о них, чтобы в следующих постах проанализировать данные и сделать некоторые выводы. В рамках этого поста разберем получение мета-информации о российских пивоварнях на Untappd, а полученные данные сохраним в словаре Clickhouse.

Собираем данные с Untappd

Для обращений к API нужны client_id и  client_secret_key — их можно получить, создав приложение. Для этого переходим в раздел создания приложения в документации и указываем некоторые данные:

После отправления заявки нужно будет подождать некоторое время: от 1 до 3 недель.

import requests
import pandas as pd
import time

Отправлять запросы к API будем через requests, а в pandas посмотрим на результаты и выгрузим в csv, чтобы отправить в словарь Clickhouse. У Untappd строгие ограничения на количество запросов: всего в час можно отправить 100 запросов, поэтому будем библиотекой time ставить скрипт в ожидание на 38 секунд, чтобы число запросов в час не превосходило 100.

client_id = 'ваш_client_id'
client_secret = 'ваш_client_secret'
all_brewery_of_russia = []

Мы хотим собрать всю тысячу российских пивоварен. Один запрос к методу Brewery Search позволяет получить до 50 пивоварен. При поиске вручную на сайте Untappd по слову «Russia» сайт выдаст 3369 пивоварен:

Проверим это: пролистаем страницу до самого низа и откроем код страницы.

Каждая полученная пивоварня в поиске находится в классе beer-item. Значит, можем в поиске посчитать количество упоминаний beer-item:

И выясняем, что на самом деле их здесь ровно 1000, а не 3369. По запросу Russia в выборку попадают и американские пивоварни, а некоторые были удалены. Значит, придётся отправить 20 запросов, будем получать по 50 пивоварен за раз:

for offset in range(0, 1000, 50):
    try:
        print('offset = ', offset)
        print('осталось:', 1000 - offset, '\n')
        response = requests.get(f'https://api.untappd.com/v4/search/brewery?client_id={client_id}&client_secret={client_secret}',
                               params={
                                   'q':'Russia',
                                   'offset':offset,
                                   'limit':50
                               })
        item = response.json()
        print(item, '\n')
        all_brewery_of_russia.append(item)
        time.sleep(37)
    except Exception:
        print(Exception)
        continue

В параметрах метод Brewery Search принимает q — строку, по которой будем осуществлять поиск на сервисе. Укажем в ней «Russia», чтобы получить все пивоварни, связанные с Россией. Другой параметр — offset — отвечает за смещение. Получив первые 50 пивоварен мы смещаемся на 50 строк в поиске, чтобы получить следующие 50 пивоварен. limit отвечает за количество получаемых пивоварен и не может быть больше 50.
Преобразовываем ответ в формат json и добавляем полученные данные в список all_brewery_of_russia. Объект item будет содержать такие данные:

Но в полученных данных могли затесаться и пивоварни других стран. Отфильтруем их: пройдём итератором по всему списку all_brewery_of_russia и добавим в итоговый только те пивоварни, у которых параметр country_name принимает значение Russia.

brew_list = []
for element in all_brewery_of_russia:
    brew = element['response']['brewery']
    for i in range(brew['count']):
        if brew['items'][i]['brewery']['country_name'] == 'Russia':
            brew_list.append(brew['items'][i])

Посмотрим на первый элемент списка brew_list:

print(brew_list[0])

Соберём из списка DataFrame с колонками brewery_id, beer_count, brewery_name, brewery_slug, brewery_page_url, brewery_city, lat и  lng. Получим в отдельные списки данные из  brewery_list:

df = pd.DataFrame()
brewery_id_list = []
beer_count_list = []
brewery_name_list = []
brewery_slug_list = []
brewery_page_url_list = []
brewery_location_city = []
brewery_location_lat = []
brewery_location_lng = []
for brewery in brew_list:
    brewery_id_list.append(brewery['brewery']['brewery_id'])
    beer_count_list.append(brewery['brewery']['beer_count'])
    brewery_name_list.append(brewery['brewery']['brewery_name'])
    brewery_slug_list.append(brewery['brewery']['brewery_slug'])
    brewery_page_url_list.append(brewery['brewery']['brewery_page_url'])
 brewery_location_city.append(brewery['brewery']['location']['brewery_city'])
    brewery_location_lat.append(brewery['brewery']['location']['lat'])
    brewery_location_lng.append(brewery['brewery']['location']['lng'])

И отправим их в DataFrame:

df['brewery_id'] = brewery_id_list
df['beer_count'] = beer_count_list
df['brewery_name'] = brewery_name_list
df['brewery_slug'] = brewery_slug_list
df['brewery_page_url'] = brewery_page_url_list
df['brewery_city'] = brewery_location_city
df['brewery_lat'] = brewery_location_lat
df['brewery_lng'] = brewery_location_lng

Посмотрим, как выглядит наша таблица:

df.head()

Отсортируем значения по  brewery_id и выгрузим таблицу в формате csv без столбца с индексами и заголовков колонок:

df = df.sort_values(by='brewery_id')
df.to_csv('brewery_data.csv', index=False, header=False)

Создаём словарь Clickhouse

Словари для Clickhouse можно создавать по-разному. Мы попробуем задать его структуру в xml-файле, настроить конфигурационные файлы сервера и обращаться к нему через клиент. Наш xml будет иметь следующую структуру:

Со всеми способами создания словарей можно ознакомиться в документации

<yandex>
<dictionary>
        <name>breweries</name>
        <source>
                <file>
                        <path>/home/ubuntu/brewery_data.csv</path>
                        <format>CSV</format>
                </file>
        </source>
        <layout>
                <flat />
        </layout>
        <structure>
                <id>
                        <name>brewery_id</name>
                </id>
                <attribute>
                        <name>beer_count</name>
                        <type>UInt64</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_name</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_slug</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_page_url</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_city</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>lat</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>lng</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
        </structure>
        <lifetime>300</lifetime>
</dictionary>
</yandex>

Под  идёт имя словаря. В  указываем свойства колонок. Под тегом идёт ключевое поле, а под тегом укажем путь и формат файла. Скоро мы положим его в папку /home/ubuntu, поэтому так и укажем.

Загрузим нашу csv-таблицу и xml-файл на сервер, это можно сделать, например, по ftp через FileZilla. В одном из материалов мы учились ставить Clickhouse на бесплатную машину от Amazon, в этот раз будем работать там же. В FileZilla заходим в настройки SFTP и добавляем файл с ключом:

И подключаемся к серверу по адресу, который указан в консоли EC2 машины на AWS. Укажем протокол SFTP, свой Host и в качестве User — Ubuntu:

В случае перезагрузки машины через консоль Public DNS мог измениться

После подсоединения мы попадём в папку /home/ubuntu сервера. Положим файлы туда же. Теперь подключимся по SSH через Termius. Чтобы Clickhouse увидел файл со структурой словаря, его нужно положить в папку /etc/clickhouse-server:

О том, как подключаться в серверу на AWS через SSH-клиент мы рассказывали в материале «Устанавливаем Clickhouse на AWS»

sudo mv breweries_dictionary.xml /etc/clickhouse server/

Переходим в конфигурационный файл:

cd /etc/clickhouse-server
sudo nano config.xml

Нам нужен тег  — он указывает путь к файлу, который описывает структуру словарей. Укажем путь к нашему xml:

<dictionaries_config>/etc/clickhouse-server/breweries_dictionary.xml</dictionaries_config>

Сохраняем файл и запускаем клиент Clickhouse:

clickhouse client

Проверим, что наш словарь действительно загрузился:

SELECT * FROM system.dictionaries\G

В случае успеха получим подобное:

Напишем запрос к функции dictGet, чтобы получить название пивоварни под ID 999. Указываем первым аргументом наименование словаря, затем поле, значение которого хотим получить и ID.

SELECT dictGet('breweries', 'brewery_name', toUInt64(999))

Если сделаем всё правильно, то выясним, что под ID 999 находится Балтика:

Аналогичным образом удобно использовать функцию, когда в таблице с фактами хранится только ID измерения для получения понятного наименования.

Создаём материализованное представление в Clickhouse

Время чтения текста – 10 минут

В этот раз разберёмся, как с помощью Python передавать в Clickhouse данные по рекламным кампаниям и построим агрегат, используя материализованное представление.
Для чего нам материализованные представления? Часто Clickhouse используется для работы с огромными объемами данных, а время получения ответа на запрос к таблице с сырыми данными постоянно растёт. Стандартно, чтобы решить такую задачу эффективным способом, чаще всего используют ETL-процессы или создают таблицы агрегатов, что не очень удобно, ведь их необходимо регулярно пересчитывать. Clickhouse обладает встроенной и эффективной возможностью для решения задачи — материализованными представлениями.
Материализованные представления физически хранят и обновляют данные на диске в соответствии с запросом SELECT, на основе которого представление было создано. При вставке данных в искомую таблицу SELECT преобразовывает данные и вставляет их в представление.

Настройка машины
Наш скрипт на Python из предыдущих материалов необходимо подключить к Clickhouse — он будет отправлять запросы, поэтому нужно открыть несколько портов. В Dashboard AWS переходим в Network & Security — Security Groups. Наша машина входит в группу launch-wizard-1. Переходим в неё и смотрим на Inbound rules: нам нужно добавить правила как на скриншоте.

Настройка Clickhouse
Теперь настроим Clickhouse. Отредактируем файл config.xml в редакторе nano:

cd /etc/clickhouse-server
sudo nano config.xml

Воспользуйтесь мануалом по горячим клавишам, если тоже не сразу поняли, как выйти из nano.

Раскоментируем строку

<listen_host>0.0.0.0</listen_host>

чтобы доступ к базе данных был с любого IP-адреса:

Создание таблицы и материализованного представления
Зайдём в клиент и создадим нашу базу данных, в которой впоследствии создадим таблицы:

CREATE DATABASE db1
USE db1

Мы проиллюстрируем всё тот же пример сбора данных с Facebook. Информация по кампаниям может часто обновляться, и мы, в целях упражнения, хотим создать материализованное представление, которое будет автоматически пересчитывать агрегаты на основе собранных данных по затратам. Таблица в Clickhouse будет практически такой же, как DataFrame из прошлого материала. В качестве движка таблицы используем CollapsingMergeTree: он будет удалять дубликаты по ключу сортировки:

CREATE TABLE facebook_insights(
	campaign_id UInt64,
	clicks UInt32,
	spend Float32,
	impressions UInt32,
	date_start Date,
	date_stop	 Date,
	sign Int8
) ENGINE = CollapsingMergeTree
ORDER BY (date_start, date_stop)

И сразу создадим материализованное представление:

CREATE MATERIALIZED VIEW fb_aggregated
ENGINE = SummingMergeTree()
ORDER BY date_start
	AS
	SELECT campaign_id,
		      date_start,
		      sum(spend * sign) as spent,
		      sum(impressions * sign) as impressions,
		      sum(clicks * sign) as clicks
	FROM facebook_insights
	GROUP BY date_start, campaign_id

Подробности рецепта можно посмотреть в блоге Clickhouse.

К сожалению, в Clickhouse UPDATE отсутствует, поэтому необходимо придумывать некоторые ухищрения. Мы воспользовались рецептом от команды Яндекса для обходного пути команды UPDATE. Идея состоит в том, чтобы в начале вставить строки, которые уже были в таблице с отрицательным Sign, а затем использовать Sign для сторнирования. Следуя этому рецепту старые данные не будут учитываться при суммировании.

Скрипт
Начнём писать скрипт. Понадобится новая библиотека — clickhouse_driver, позволяющая отправлять запросы к Clickhouse из скрипта на Python:

В материале приведена только доработка скрипта, описанного в статье «Собираем данные по рекламным кампаниям в Facebook». Всё будет работать, если вы просто вставите код из текущего материала в скрипт предыдущего.

from datetime import datetime, timedelta
from clickhouse_driver import Client
from clickhouse_driver import errors

Объект класса Client позволит отправлять запросы методом execute(). В host вводим свой public dns, user ставим default, port — 9000 и в database базу данных для подключения.

client = Client(host='ec1-2-34-56-78.us-east-2.compute.amazonaws.com', user='default', password=' ', port='9000', database='db1')

Чтобы удостовериться, что всё нормально, можно написать следующий запрос, который должен вывести наименования всех баз данных на сервере:

client.execute('SHOW DATABASES')

В случае успеха получим на экране такой список:

[('_temporary_and_external_tables',), ('db1',), ('default',), ('system',)]

Пусть, например, мы хотим рассматривать данные за последние три дня. Получим эти даты библиотекой datetime и переведём в нужный формат методом strftime():

date_start = datetime.now() - timedelta(days=3)
date_end = datetime.now() - timedelta(days=1)
date_start_str = date_start.strftime("%Y-%m-%d")
date_end_str = date_end.strftime("%Y-%m-%d")

Напишем вот такой запрос, получающий все колонки таблицы за это время:

SQL_select = f"select campaign_id, clicks, spend, impressions, date_start, date_stop, sign from facebook_insights where date_start > '{date_start_str}' AND date_start < '{date_end_str}'"

И выполним запрос, поместив информацию в список old_data_list. А затем поменяем всем sign на -1 и добавим в new_data_list:

new_data_list = []
old_data_list = []
old_data_list = client.execute(SQL_select)

for elem in old_data_list:
    elem = list(elem)
    elem[len(elem) - 1] = -1
    new_data_list.append(elem)

Наконец, напишем наш алгоритм: вставляем те же самые данные с sign = −1, оптимизируем для удаления дубликатов движком CollapsingMergeTree и выполняем INSERT новых данных со знаком sign = 1.

SQL_query = 'INSERT INTO facebook_insights VALUES'
client.execute(SQL_query, new_data_list)
SQL_optimize = "OPTIMIZE TABLE facebook_insights"
client.execute(SQL_optimize)
for i in range(len(insight_campaign_id_list)):
    client.execute(SQL_query, [[insight_campaign_id_list[i],
                                insight_clicks_list[i],
                                insight_spend_list[i],
                                insight_impressions_list[i],
                                datetime.strptime(insight_date_start_list[i], '%Y-%m-%d').date(),
                                datetime.strptime(insight_date_start_list[i], '%Y-%m-%d').date(),
                                1]])
    client.execute(SQL_optimize)

Вернёмся в Clickhouse. Выполним SELECT * FROM facebook_insights LIMIT 20, чтобы посмотреть первые 20 строк таблицы:

И SELECT * FROM fb_aggregated LIMIT 20, чтобы проверить наше представление:

Отлично! Мы сделали материализованное представление — теперь новые данные, поступающие в таблицу facebook_insights будут поступать и в материализованное представление fb_aggregated и каждый раз пересчитываться благодаря SummingMergeTree. При этом трюк с sign позволяет отлавливать уже обработанные записи и не допускать их суммирования, а CollapsingMergeTree — чистить дубликаты.

Устанавливаем Clickhouse на AWS

Время чтения текста – 7 минут

Сегодня поработаем с Clickhouse — поставим его на личную бесплатную машину с Amazon Web Services.

Аккаунт на AWS и машина на Ubuntu
Проще всего Clickhouse установить из deb пакетов на машину под управлением Ubuntu. Сегодня вовсе необязательно иметь такую под рукой — достаточно создать годовой бесплатный аккаунт на Amazon Web Services, который выделит нам нужную машину. Переходим на https://aws.amazon.com и создаём аккаунт и попадаем в Dashboard. В разделе «Build a solution» идём в «Launch a virtual machine» и подбираем виртуальную машину. Нам подойдёт та, на которой установлен Ubuntu Server.

Заодно создаём key pair — пара состоит из публичного ключа AWS и приватного ключа-файла. Последний нужно сохранить у себя на компьютере для подключения к машине.

После откроется консоль EC2, где уже будет запущен наш Instance виртуальной машины. У него есть public dns — сохраняем его.

Подключаемся через Termius
Подключаться к виртуальной машине будем через протокол удалённого доступа ssh. Есть много клиентов, поддерживающих этот протокол — я буду использовать Termius. Жмём на «+ NEW HOST» и заполняем информацию.
В поле address вводим наш public dns, который заранее сохранили. В Username вводим «ubuntu», поле пароля оставляем пустым. Чтобы заполнить Key нужно добавить новый ключ — то есть, указать путь к файлу с расширением .pem, который мы получили при создании новой виртуальной машины. Должно получиться что-то такое:

Сразу после авторизации подключаемся к машине и получаем новый экран с консолью:

Для начала установим Clickhouse — это быстро. Выполним следующую команду, чтобы добавить репозиторий Clickhouse к нашим APT репозиториям:

Подробнее со всеми способами установки Clickhouse можно ознакомиться в документации

echo "deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" | sudo tee /etc/apt/sources.list.d/clickhouse.list

Обязательно обновляем пакеты:

sudo apt-get update

Наконец, установим клиент и сервер командой:

sudo apt-get install -y clickhouse-server clickhouse-client

Готово! Клиент и сервер Clickhouse установлены на виртуальной машине. Запустим сервер, чтобы подсоединиться позже через клиент:

sudo service clickhouse-server start

И проверим успешность запуска командой:

sudo service clickhouse-server status

Получим примерно такой результат:

Clickhouse установлен! Для подключения к клиенту необходимо ввести:

clickhouse-client

Чтобы убедиться, что всё работает как надо, документация предлагает ввести запрос

SELECT 1

Вот, что мы должны получить:

Готово! А в следующем материале поработаем в связке Python + Clickhouse: вернёмся к нашему скрипту, который получает информацию по рекламным кампаниям и сделаем так, чтобы эти данные собирались в таблицу и материализованное представление.

Clickhouse в качестве consumer для Amazon MSK

Время чтения текста – 6 минут

Disclaimer: заметка носит технический характер, поэтому может быть интересна меньшему числу лиц с бизнес-бэкграундом.

В этом блоге еще ни разу не затрагивалась тема Clickhouse, одной из самых быстрых баз данных от компании Яндекс. Краткая справка без деталей: Clickhouse — наиболее эффективно написанная СУБД колоночного типа с точки зрения программного кода, информация о СУБД довольно подробно описана в документации и во множестве видео на Youtube (раз, два, три).

В своей практике последние четыре года я использовал Clickhouse в качестве аналитика и эксперта по построению аналитической отчетности. В основном для решения задачи визуализации отчетности / отчетов с параметрами / дашбордов использовался Redash как наиболее удобный интерфейс для доступа к данным Clickhouse.
Однако совсем недавно в Looker, о котором я рассказывал ранее, появилась возможность подключить Clickhouse в качестве источника данных. Следует заметить, что подключение к Clickhouse в Tableau существует уже довольно давно.

Архитектура аналитического сервиса, в основе которого лежит Clickhouse, в основном облачная. И в рассматриваемой задаче было именно так. Предположим, у вас существует выделенный instance EC2 в Amazon (на который вы установили Clickhouse) и отдельный Kafka-кластер (решение Amazon MSK).

Задача: подключить Clickhouse в качестве consumer для получения информации с брокеров вашего кластера Kafka. На самом деле, в документации на сайте Amazon MSK довольно подробно описано как именно подключаться к кластеру Kafka, не буду дублировать эту информацию. В моем случае гайд помог: топики создавались продюсером с машины, на которой установлен Clickhouse и с неё читались консюмером.

Но возникла проблема: при подключении Clickhouse к Kafka в качестве консюмера, происходила следующая ошибка:

020.02.02 18:01:56.209132 [ 46 ] {e7124cd5-2144-4b1d-bd49-8a410cdbd607} <Error> executeQuery: std::exception. Code: 1001, type: cppkafka::HandleException, e.what() = Local: Timed out, version = 20.1.2.4 (official build) (from 127.0.0.1:46586) (in query: SELECT * FROM events), Stack trace (when copying this message, always include the lines below):

Продолжительное время я искал информацию в документации Clickhouse о том, что может вызывать эту ошибку, но не смог ничего найти. Следующей мыслью стала проверка работы локального брокера Kafka с той же машины. Установил клиент Kafka, подключил Clickhouse, отправил данные в топик и Clickhouse с легкостью их прочитал, т. е. консюмер Clickhouse работает с локальным брокером, а значит и вовсе работает.

Поговорив со всеми своими знакомыми экспертами в области инфраструктуры и Clickhouse (Вася, Макс, привет!), с ходу мы не смогли определить в чем проблема. Проверили firewall, настройки сети, все было открыто. Подтверждалось также тем, что с локальной машины можно было отправить в топик удаленного брокера Kafka сообщения командой bin/kafka-console-producer.sh и прочитать оттуда же bin/kafka-console-consumer.sh.

Затем мне пришла в голову мысль обратиться к главному гуру и разработчику Clickhouse — Алексею Миловидову. Алексей с радостью постарался ответить на возникшие вопросы и предложил ряд гипотез, которые мы проверили (трассировку сетевых подключений и т. п.), однако и после более низкоуровневого аудита проблему локализовать не удалось. Тогда Алексей посоветовал обратиться к Михаилу Филимонову из компании Altinity. Михаил оказался очень отзывчивым экспертом и одну за другой предлагал гипотезы для проверки (параллельно подсказывая как именно будет лучше их проверить).

В итоге в ходе совместных усилий мы обнаружили, что проблема возникает у библиотеки librdkafka, так как другой пакет kafkacat, который использует эту же библиотеку отваливается от подключения к брокеру с той же проблемой (Local: timed out).

После изучения подключения через bin/kafka-console-consumer.sh и параметров подключения, Михаил посоветовал добавить такую строку в /etc/clickhouse-server/config.xml:

<kafka><security_protocol>ssl</security_protocol></kafka>

И, о чудо! Clickhouse подключился к кластеру и вытянул необходимые данные с брокера.

Надеюсь, этот рецепт и мой опыт позволит вам сэкономить время и силы на изучение похожей проблемы :)