43 заметки с тегом

python

Позднее Ctrl + ↑

Пишем парсер свежих прокси на Python для Selenium

Время чтения текста – 6 минут

Случается такое, что во время парсинга страниц через Selenium можно словить бан по IP-адресу. Чтобы этого избежать, лучше использовать прокси. Сегодня напишем скрипт, который сам спарсит новые прокси, проверит их и в случае успеха передаст в Selenium.

Парсинг новых прокси

Начнём с импортирования библиотек — нам понадобятся модули для отправления запросов, для парсинга и хранения данных.

import requests_html
from bs4 import BeautifulSoup
import pickle
import requests

Все прокси будем хранить в множестве px_list, а также отправлять в pickle-файл proxis.pickle. В случае, если он не будет пустым, попробуем взять из него данные.

px_list = set()
try:
    with open('proxis.pickle', 'rb') as f:
            px_list = pickle.load(f)
except:
    pass

Функция scrap_proxy() будет заходить на сайт free-proxy-list.net и собирать оттуда 20 последних прокси. На сайте новые адреса появляются ежеминутно. Вот, как выглядит интересующая нас область сайта:

Из всего этого будем собирать ID Address и Port. Посмотрим, как элементы расположены в коде страницы:

Все нужные данные являются ячейками таблицы. В цикле будем брать первые 20 строк, обращаясь к IP-адресу и порту по xpath. В конце функция будет отправлять свежие прокси в pickle-файл и возвращать список прокси.

def scrap_proxy():  
    global px_list
    px_list = set()

    session = requests_html.HTMLSession()
    r = session.get('https://free-proxy-list.net/')
    r.html.render()
    for i in range(1, 21):
        add=r.html.xpath('/html/body/section[1]/div/div[2]/div/div[2]/div/table/tbody/tr[{}]/td[1]/text()'.format(i))[0]
        port=r.html.xpath('/html/body/section[1]/div/div[2]/div/div[2]/div/table/tbody/tr[{}]/td[2]/text()'.format(i))[0]
        px_list.add(':'.join([add, port]))

    print("---New proxy scraped, left: " + str(len(px_list)))
    with open('proxis.pickle', 'wb') as f:
        pickle.dump(px_list, f)
    return px_list

Проверка полученных прокси

Не всегда свежие прокси оказываются рабочими: мы напишем функцию, которая сама отправит get-запрос к сайту Google с прокси и в случае появления любой ошибки будет возвращать False. В случае, если прокси оказался рабочим, функция вернёт True.

def check_proxy(px):
    try:
        requests.get("https://www.google.com/", proxies = {"https": "https://" + px}, timeout = 3)
    except Exception as x:
        print('--'+px + ' is dead: '+ x.__class__.__name__)
        return False
    return True

Основная функция

Главная функция скрипта будет принимать в аргумент переменную scrap, по умолчанию принимающую False. Мы будем собирать новые прокси только в том случае, если scrap == True или длина списка прокси менее 6. Затем в цикле while True собираем новые прокси, берём последний, проверяем его и в случае, если check_proxy вернёт True, отправляем прочие прокси в pickle-файл и возвращаем рабочий адрес и порт.

def get_proxy(scrap = False):
    global px_list
    if scrap or len(px_list) < 6:
            px_list = scrap_proxy()
    while True:
        if len(px_list) < 6:
            px_list = scrap_proxy()
        px = px_list.pop()
        if check_proxy(px):
            break
    print('-'+px+' is alive. ({} left)'.format(str(len(px_list))))
    with open('proxis.pickle', 'wb') as f:
            pickle.dump(px_list, f)
    return px

Используем скрипт с Selenium

А ещё мы писали, как через Selenium имитировать нажатие кнопки и скроллинг каталога интернет-магазина

Чтобы к скрипту Selenium подключить прокси, импортируем функцию get_proxy. Заходим в бесконечный цикл, в переменную PROXY запишем свежие полученные прокси и, используя опции браузера, добавим наши прокси и инициируем новый webdriver с обновленными опциями. Затем пробуем зайти на сайт, добавить свои cookie и в случае успеха выходим из цикла оператором break. Если новый прокси всё равно оказался нерабочим или вылезла капча, в цикле получим новые прокси и повторим, пока не получится.

from px_scrap import get_proxy

while True:
    PROXY = get_proxy(scrap=True)
    options.add_argument('--proxy-server=%s' % PROXY)
    driver = webdriver.Chrome(chrome_options=options, executable_path=os.path.abspath("chromedriver"))
    try:
        driver.get('https://google.com')
        driver.add_cookie(cookies)
    except:
        print('Captcha!')
 Нет комментариев    1231   2020   proxy   python   selenium

Как построить красивый waterfall chart в Python?

Время чтения текста – 6 минут

Когда-то давно в 2014ом году для одной из презентаций о рынке e-commerce в Юлмарте мы строили широко известную во всем мире консалтинга диаграмму Waterfall, средствами Excel. В этом материале построим Waterfall chart средствами Python — она наглядно демонстрирует изменения с появлением нового положительного или отрицательного фактора. Для построения диаграммы будем использовать библиотеку plotly.
Для тех, кто пропустил: в цикле материалов о визуализации данных на Python мы уже пробовали строить диаграмму Градусник — она полезна, когда мы хотим сравнить, как соотносятся ожидаемые и реальные данные.

В качестве данных используем сведенную в Юлмарте информацию об изменении объёма рынка e-commerce с 2013 по 2014 год. Данные по оси X — подписи к каждому столбцу, по Y — начальные, итоговые значения и их изменения. Функцией sum() посчитаем итог и добавим его в конец списка. Тег <br> в списке x_list означает перенос строки.

import plotly.graph_objects as go

x_list = ['2013','Макроэкономика РФ','Сокращение численности<br>трудоспобного населения','Проникновение интернета','Развитие трансграничной<br>торговли', 'Федеральные компании', '2014']
y_list = [738.5, 48.7, -7.4, 68.7, 99.7, 48.0]
total = round(sum(y_list))
y_list.append(total)

Создадим список text_list — это те самые значения столбцов. Они берутся из списка y_list, но сперва их нужно немного обработать: переведём все числа в строки и если это столбец с изменением, то есть любой столбец, кроме первого и последнего, добавим к строке знак «плюс» для наглядности. А ещё в случае положительного изменения поменяем цвет на зелёный и на красный в обратном случае. Первому и последнему значению прибавим жирности к шрифту тегом <b>.

text_list = []
for index, item in enumerate(y_list):
    if item > 0 and index != 0 and index != len(y_list) - 1:
        text_list.append(f'+{str(y_list[index])}')
    else:
        text_list.append(str(y_list[index]))
for index, item in enumerate(text_list):
    if item[0] == '+' and index != 0 and index != len(text_list) - 1:
        text_list[index] = '<span style="color:#2ca02c">' + text_list[index] + '</span>'
    elif item[0] == '-' and index != 0 and index != len(text_list) - 1:
        text_list[index] = '<span style="color:#d62728">' + text_list[index] + '</span>'
    if index == 0 or index == len(text_list) - 1:
        text_list[index] = '<b>' + text_list[index] + '</b>'

Для того, чтобы поместить на фон пунктирные линии, необходимо задать их параметры. Сделаем список словарей и положим в него пунктирные линии светло-серого цвета с положением по Y от 0 до 1000 с шагом в 200.

dict_list = []
for i in range(0, 1200, 200):
    dict_list.append(dict(
            type="line",
            line=dict(
                 color="#666666",
                 dash="dot"
            ),
            x0=-0.5,
            y0=i,
            x1=6,
            y1=i,
            line_width=1,
            layer="below"))

Теперь зададим диаграмму — она лежит в методе Waterfall(). У каждого столбца есть тип — total, absolute или relative. Колонки с итоговыми значениями получают тип total или absolute, с промежуточными — relative. Кроме того, задаём цвета: делаем соединяющую линию прозрачной, положительные изменения — зелёными, отрицательные — красными, а итоговые колонки — фиолетовыми. Для текста выберем шрифт Open Sans.

О том, как подобрать хорошие шрифты для своей визуализации данных, можно узнать в материале «Choosing Fonts for Your Data Visualization»

fig = go.Figure(go.Waterfall(
    name = "e-commerce", orientation = "v",
    measure = ["absolute", "relative", "relative", "relative", "relative", "relative", "total"],
    x = x_list,
    y = y_list,
    text = text_list,
    textposition = "outside",
    connector = {"line":{"color":'rgba(0,0,0,0)'}},
    increasing = {"marker":{"color":"#2ca02c"}},
    decreasing = {"marker":{"color":"#d62728"}},
    totals={'marker':{"color":"#9467bd"}},
    textfont={"family":"Open Sans, light",
              "color":"black"
             }
))

Наконец, добавим заголовок и описание графика, уберём легенду, подпишем ось Y и внесём пунктирные линии на график.

fig.update_layout(
    title = 
        {'text':'<b>Waterfall chart</b><br><span style="color:#666666">Изменение объема рынка e-commerce с 2013 по 2014 год</span>'},
    showlegend = False,
    height=650,
    font={
        'family':'Open Sans, light',
        'color':'black',
        'size':14
    },
    plot_bgcolor='rgba(0,0,0,0)',
    yaxis_title="млрд руб.",
    shapes=dict_list
)
fig.update_xaxes(tickangle=-45, tickfont=dict(family='Open Sans, light', color='black', size=14))
fig.update_yaxes(tickangle=0, tickfont=dict(family='Open Sans, light', color='black', size=14))

fig.show()

Получим такую диаграмму:

Обрабатываем нажатие кнопки в Selenium

Время чтения текста – 10 минут

В материале «Парсим данные, используя Buetiful Soup и Selenium» мы уже рассмотрели, как быть, когда данные на странице динамически подгружаются при скролле страницы. Но бывают ситуации, когда новые данные можно получить, только нажав на кнопку «Показать ещё» — сегодня узнаем, как через Selenium сымитировать нажатие кнопки для полного открытия страницы, соберём идентификаторы пива, оценки к каждому продукту и отправим данные в Clickhouse

Структура страницы

Возьмём случайную пивоварню — у неё 105 чекинов, то есть, отзывов. Страница с чекинами пивоварни показывает не более 25 чекинов и выглядит так:

Если попробуем промотать в самый низ, столкнёмся с той самой кнопкой, мешающей нам взять все 105 за раз:

Мы поступим так: выясним, к какому классу относится элемент кнопки и будем на неё нажимать, пока это возможно. Так как Selenium запускает браузер, следующая кнопка «Показать ещё» может не успеть прогрузиться, поэтому между нажатиями поставим интервал в пару секунд. Как только страница раскроется полностью — мы возьмём её содержимое и распарсим нужные данные из чекинов. Зайдём в код страницы и найдём кнопку — она относится к классу more_checkins.

У кнопки есть свойства стиля, а именно — display. В случае, если кнопка должна отображаться, display принимает значение block. Но когда промотаем страницу до самого конца, кнопку не нужно будет показывать, ведь открывать больше нечего — поэтому display кнопки примет значение none. В случае, если мы запросим у кнопки display и вернётся none будем знать, что открывать больше нечего и можно перестать жать на кнопку.

Пишем код

Начнём с импорта библиотек:

import time
from selenium import webdriver
from bs4 import BeautifulSoup as bs
import re
from datetime import datetime
from clickhouse_driver import Client

Chromedriver, необходимый для запуска браузера через Selenium, можно установить с официальной страницы

Подключимся к базе данных, зададим cookies:

client = Client(host='ec1-23-456-789-10.us-east-2.compute.amazonaws.com', user='', password='', port='9000', database='')
count = 0
cookies = {
    'domain':'untappd.com',
    'expiry':1594072726,
    'httpOnly':True,
    'name':'untappd_user_v3_e',
    'path':'/',
    'secure':False,
    'value':'your_value'
}

О том, как запускать Selenium с cookies можно прочитать в материале «Парсим данные каталога сайта, используя Beautiful Soup и Selenium». Нам нужен параметр untappd_user_v3_e.

Так как мы планируем работать с пивоварнями, у которых более сотни тысяч чекинов, может оказаться так, что страница будет чересчур тяжёлой, и нагрузка на машину будет огромна. Чтобы этого избежать, отключим всё лишнее, а затем подключим cookie для авторизации:

options = webdriver.ChromeOptions()
prefs = {'profile.default_content_setting_values': {'images': 2, 
                            'plugins': 2, 'fullscreen': 2}}
options.add_experimental_option('prefs', prefs)
options.add_argument("start-maximized")
options.add_argument("disable-infobars")
options.add_argument("--disable-extensions")
driver = webdriver.Chrome(options=options)
driver.get('https://untappd.com/TooSunnyBrewery')
driver.add_cookie(cookies)

Напишем функцию, которая принимает ссылку, переходит по ней, полностью раскрывает страницу и возвращает нам soup, который можно будет распарсить. Получим display кнопки и запишем в переменную more_checkins: пока он не равен none будем нажимать на кнопку и снова получать её display. Сделаем интервал в две секунды между нажатиями, чтобы подождать прогрузку страницы. Как только будет получена вся страница, переведём её в soup библиотекой bs4.

def get_html_page(url):
    driver.get(url)
    driver.maximize_window()
    more_checkins = driver.execute_script("var more_checkins=document.getElementsByClassName('more_checkins_logged')[0].style.display;return more_checkins;")
    print(more_checkins)
    while more_checkins != "none":
        driver.execute_script("document.getElementsByClassName('more_checkins_logged')[0].click()")
        time.sleep(2)
        more_checkins = driver.execute_script("var more_checkins=document.getElementsByClassName('more_checkins_logged')[0].style.display;return more_checkins;")
        print(more_checkins)
    source_data = driver.page_source
    soup = bs(source_data, 'lxml')
    return soup

Напишем следующую функцию: она тоже будет принимать url страницы, передавать его в get_html_page, получать soup и парсить его. Функция вернёт запакованные списки с идентификатором пива и оценкой к нему.

О том, как парсить элементы страницы мы уже говорили в материале «Парсим данные каталога сайта, используя Beautiful Soup».

def parse_html_page(url):
    soup = get_html_page(url)
    brewery_id = soup.find_all('a', {'class':'label',
                                     'href':re.compile('https://untappd.com/brewery/*')})[0]['href'][28:]
    items = soup.find_all('div', {'class':'item',
                                  'id':re.compile('checkin_*')})
    checkin_rating_list = []
    beer_id_list = []
    count = 0
    print('Заполняю списки')
    for checkin in items:
        print(count, '/', len(items))
        try:
            checkin_rating_list.append(float(checkin.find('div', {'class':'caps'})['data-rating']))
        except Exception:
            checkin_rating_list.append('cast(Null as Nullable(Float32))')
        try:
            beer_id_list.append(int(checkin.find('a', {'class':'label'})['href'][-7:]))
        except Exception:
            beer_id_list.append('cast(Null as Nullable(UInt64))')
        count += 1 
    return zip(checkin_rating_list, beer_id_list)

Наконец, напишем вызов функций по пивоварням. В материале «Использование словарей в Clickhouse на примере данных Untappd» мы уже рассмотрели, как получить список идентификаторов российских пивоварен — обратимся к нему через таблицу в Clickhouse

brewery_list = client.execute('SELECT brewery_id FROM brewery_info')

Если посмотрим на brewery_list, то узнаем, что данные вернулись в неудобном формате: это список кортежей.

Небольшое лямбда-выражение позволит его «выпрямить»:

flatten = lambda lst: [item for sublist in lst for item in sublist]
brewery_list = flatten(brewery_list)

Работать с таким списком значительно комфортнее:

Для каждой пивоварни в списке сформируем url — он состоит из стандартной ссылки и идентификатора пивоварни в конце. Отправим url в функцию parse_html_page, которая сама вызовет get_html_page и вернёт списки с beer_id и rating_score. Так как два списка вернутся упакованными можем пройти по ним итератором, сформировав кортеж и отправив его в Clickhouse.

for brewery_id in brewery_list:
    print('Беру пивоварню с id', brewery_id, count, '/', len(brewery_list))
    url = 'https://untappd.com/brewery/' + str(brewery_id)
    returned_checkins = parse_html_page(url)
    for rating, beer_id in returned_checkins:
        tuple_to_insert = (rating, beer_id)
        try:
            client.execute(f'INSERT INTO beer_reviews VALUES {tuple_to_insert}')
        except errors.ServerException as E:
            print(E)
    count += 1

С кнопками на этом всё. Постепенно мы формируем отличный датасет для последующего анализа, который рассмотрим в следующем цикле статей, как только завершим сбор данных — возможно, через месяц.

Использование словарей в Clickhouse на примере данных Untappd

Время чтения текста – 15 минут

В Clickhouse реализована возможность использования внутренних и внешних словарей, которые могут быть альтернативой JOIN (которые, к сожалению, не всегда здорово работают). Словари хранят информацию в памяти и к ним можно обратиться командой dictGet. Рассмотрим как создать словарь в Clickhouse и как его можно использовать в запросах.

Будем изучать функционал на примере данных из API Untappd. Untappd — социальная сеть любителей крафтового пива. Мы сфокусируемся на чекинах российких крафтовых пивоварен, начнем собирать информацию о них, чтобы в следующих постах проанализировать данные и сделать некоторые выводы. В рамках этого поста разберем получение мета-информации о российских пивоварнях на Untappd, а полученные данные сохраним в словаре Clickhouse.

Собираем данные с Untappd

Для обращений к API нужны client_id и  client_secret_key — их можно получить, создав приложение. Для этого переходим в раздел создания приложения в документации и указываем некоторые данные:

После отправления заявки нужно будет подождать некоторое время: от 1 до 3 недель.

import requests
import pandas as pd
import time

Отправлять запросы к API будем через requests, а в pandas посмотрим на результаты и выгрузим в csv, чтобы отправить в словарь Clickhouse. У Untappd строгие ограничения на количество запросов: всего в час можно отправить 100 запросов, поэтому будем библиотекой time ставить скрипт в ожидание на 38 секунд, чтобы число запросов в час не превосходило 100.

client_id = 'ваш_client_id'
client_secret = 'ваш_client_secret'
all_brewery_of_russia = []

Мы хотим собрать всю тысячу российских пивоварен. Один запрос к методу Brewery Search позволяет получить до 50 пивоварен. При поиске вручную на сайте Untappd по слову «Russia» сайт выдаст 3369 пивоварен:

Проверим это: пролистаем страницу до самого низа и откроем код страницы.

Каждая полученная пивоварня в поиске находится в классе beer-item. Значит, можем в поиске посчитать количество упоминаний beer-item:

И выясняем, что на самом деле их здесь ровно 1000, а не 3369. По запросу Russia в выборку попадают и американские пивоварни, а некоторые были удалены. Значит, придётся отправить 20 запросов, будем получать по 50 пивоварен за раз:

for offset in range(0, 1000, 50):
    try:
        print('offset = ', offset)
        print('осталось:', 1000 - offset, '\n')
        response = requests.get(f'https://api.untappd.com/v4/search/brewery?client_id={client_id}&client_secret={client_secret}',
                               params={
                                   'q':'Russia',
                                   'offset':offset,
                                   'limit':50
                               })
        item = response.json()
        print(item, '\n')
        all_brewery_of_russia.append(item)
        time.sleep(37)
    except Exception:
        print(Exception)
        continue

В параметрах метод Brewery Search принимает q — строку, по которой будем осуществлять поиск на сервисе. Укажем в ней «Russia», чтобы получить все пивоварни, связанные с Россией. Другой параметр — offset — отвечает за смещение. Получив первые 50 пивоварен мы смещаемся на 50 строк в поиске, чтобы получить следующие 50 пивоварен. limit отвечает за количество получаемых пивоварен и не может быть больше 50.
Преобразовываем ответ в формат json и добавляем полученные данные в список all_brewery_of_russia. Объект item будет содержать такие данные:

Но в полученных данных могли затесаться и пивоварни других стран. Отфильтруем их: пройдём итератором по всему списку all_brewery_of_russia и добавим в итоговый только те пивоварни, у которых параметр country_name принимает значение Russia.

brew_list = []
for element in all_brewery_of_russia:
    brew = element['response']['brewery']
    for i in range(brew['count']):
        if brew['items'][i]['brewery']['country_name'] == 'Russia':
            brew_list.append(brew['items'][i])

Посмотрим на первый элемент списка brew_list:

print(brew_list[0])

Соберём из списка DataFrame с колонками brewery_id, beer_count, brewery_name, brewery_slug, brewery_page_url, brewery_city, lat и  lng. Получим в отдельные списки данные из  brewery_list:

df = pd.DataFrame()
brewery_id_list = []
beer_count_list = []
brewery_name_list = []
brewery_slug_list = []
brewery_page_url_list = []
brewery_location_city = []
brewery_location_lat = []
brewery_location_lng = []
for brewery in brew_list:
    brewery_id_list.append(brewery['brewery']['brewery_id'])
    beer_count_list.append(brewery['brewery']['beer_count'])
    brewery_name_list.append(brewery['brewery']['brewery_name'])
    brewery_slug_list.append(brewery['brewery']['brewery_slug'])
    brewery_page_url_list.append(brewery['brewery']['brewery_page_url'])
 brewery_location_city.append(brewery['brewery']['location']['brewery_city'])
    brewery_location_lat.append(brewery['brewery']['location']['lat'])
    brewery_location_lng.append(brewery['brewery']['location']['lng'])

И отправим их в DataFrame:

df['brewery_id'] = brewery_id_list
df['beer_count'] = beer_count_list
df['brewery_name'] = brewery_name_list
df['brewery_slug'] = brewery_slug_list
df['brewery_page_url'] = brewery_page_url_list
df['brewery_city'] = brewery_location_city
df['brewery_lat'] = brewery_location_lat
df['brewery_lng'] = brewery_location_lng

Посмотрим, как выглядит наша таблица:

df.head()

Отсортируем значения по  brewery_id и выгрузим таблицу в формате csv без столбца с индексами и заголовков колонок:

df = df.sort_values(by='brewery_id')
df.to_csv('brewery_data.csv', index=False, header=False)

Создаём словарь Clickhouse

Словари для Clickhouse можно создавать по-разному. Мы попробуем задать его структуру в xml-файле, настроить конфигурационные файлы сервера и обращаться к нему через клиент. Наш xml будет иметь следующую структуру:

Со всеми способами создания словарей можно ознакомиться в документации

<yandex>
<dictionary>
        <name>breweries</name>
        <source>
                <file>
                        <path>/home/ubuntu/brewery_data.csv</path>
                        <format>CSV</format>
                </file>
        </source>
        <layout>
                <flat />
        </layout>
        <structure>
                <id>
                        <name>brewery_id</name>
                </id>
                <attribute>
                        <name>beer_count</name>
                        <type>UInt64</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_name</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_slug</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_page_url</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_city</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>lat</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>lng</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
        </structure>
        <lifetime>300</lifetime>
</dictionary>
</yandex>

Под  идёт имя словаря. В  указываем свойства колонок. Под тегом идёт ключевое поле, а под тегом укажем путь и формат файла. Скоро мы положим его в папку /home/ubuntu, поэтому так и укажем.

Загрузим нашу csv-таблицу и xml-файл на сервер, это можно сделать, например, по ftp через FileZilla. В одном из материалов мы учились ставить Clickhouse на бесплатную машину от Amazon, в этот раз будем работать там же. В FileZilla заходим в настройки SFTP и добавляем файл с ключом:

И подключаемся к серверу по адресу, который указан в консоли EC2 машины на AWS. Укажем протокол SFTP, свой Host и в качестве User — Ubuntu:

В случае перезагрузки машины через консоль Public DNS мог измениться

После подсоединения мы попадём в папку /home/ubuntu сервера. Положим файлы туда же. Теперь подключимся по SSH через Termius. Чтобы Clickhouse увидел файл со структурой словаря, его нужно положить в папку /etc/clickhouse-server:

О том, как подключаться в серверу на AWS через SSH-клиент мы рассказывали в материале «Устанавливаем Clickhouse на AWS»

sudo mv breweries_dictionary.xml /etc/clickhouse server/

Переходим в конфигурационный файл:

cd /etc/clickhouse-server
sudo nano config.xml

Нам нужен тег  — он указывает путь к файлу, который описывает структуру словарей. Укажем путь к нашему xml:

<dictionaries_config>/etc/clickhouse-server/breweries_dictionary.xml</dictionaries_config>

Сохраняем файл и запускаем клиент Clickhouse:

clickhouse client

Проверим, что наш словарь действительно загрузился:

SELECT * FROM system.dictionaries\G

В случае успеха получим подобное:

Напишем запрос к функции dictGet, чтобы получить название пивоварни под ID 999. Указываем первым аргументом наименование словаря, затем поле, значение которого хотим получить и ID.

SELECT dictGet('breweries', 'brewery_name', toUInt64(999))

Если сделаем всё правильно, то выясним, что под ID 999 находится Балтика:

Аналогичным образом удобно использовать функцию, когда в таблице с фактами хранится только ID измерения для получения понятного наименования.

Обработка изображения с чеком для поиска QR-кода через библиотеку skimage

Время чтения текста – 9 минут

Есть много разных сканеров для QR, но не всегда изображение обладает хорошим качеством. В компьютерном зрении для этого используется Image Pre-processing: предобработка изображения. Сегодня рассмотрим, как средствами библиотеки scikit-image помочь QR-сканеру найти код на картинке.

from matplotlib import pyplot as plt
import skimage
from skimage import util, exposure, io, measure, feature
from scipy import ndimage as ndi
import numpy as np
import cv2

Проблема

Попробуем просканировать чек из материала «Собираем данные с чеков гипермаркетов на Python». Прочтём картинку методом imread библиотеки matplotlib и покажем его на экране:

img = plt.imread('чек.jpg')
plt.imshow(img)

Кажется, в такой каше сложно что-либо разобрать. Воспользуемся готовой функцией для чтения чтения QR-кода из библиотеки opencv:

def qr_reader(img):
    detector = cv2.QRCodeDetector()
    data, bbox, _ = detector.detectAndDecode(img)
    if data:
        print(data)
    else:
        print('Ничего не нашлось!')

И обратимся к ней, чтобы просканировать наше изображение:

qr_reader(img)
Ничего не нашлось!

И это можно понять: обилие лишних пикселей мешает сканеру распознать здесь QR-код. Тем не менее, мы можем помочь сканеру, указав где находится искомая область.

Решение

Сделаем так: уберём с картинки всё лишнее, найдём координаты прямоугольника с QR-кодом, чтобы затем передать в функцию qr_reader не исходное изображение, а исключительно QR-код. Первым делом уменьшим шум, используя медианный фильтр и сконвертируем изображение из rgb в gray: QR-код состоит всего из двух цветов, так что работать с остальными нам не нужно.

image = ndi.median_filter(util.img_as_float(img), size=9)
image = skimage.color.rgb2gray(image)
plt.imshow(image, cmap='gray')

Медианный фильтр размыл изображение, и разбросанные одинокие пиксели стали менее отчётливыми, а QR теперь выделяется на их фоне. Попробуем применить adjust_gamma к изображению. Эта функция возводит в степень gamma значение каждого пикселя: чем меньше будет этот параметр — тем меньше будет значение пикселя и тем ближе к белому он будет становиться. Попробуем взять gamma за 0.5.

pores_gamma = exposure.adjust_gamma(image, gamma=0.5)
plt.imshow(pores_gamma, cmap='gray')

Заметно, что QR стал ещё отчетливее прочего на фото. Воспользуемся этим: все пиксели, значение которых меньше 0.3 сделаем 0, а остальных — 1.

thresholded = (pores_gamma <= 0.3)
plt.imshow(thresholded, cmap='gray')

А теперь воспользуемся детектором границ canny для полученного изображения thresholded. Этот оператор сам сглаживает изображение и ищет градиенты: границы находятся там, где градиент принимает максимальное значение. С повышением параметра sigma детектор canny перестает замечать менее отчетливые границы.

edge = feature.canny(thresholded, sigma=6)
plt.imshow(edge)

Наконец, получим координаты границ: для этого нарисуем контуры. Получаем их методом find_contours и рисуем поверх изображения edge. Объекты массива contours — координаты по осям X и Y.

contours = measure.find_contours(edge, 0.5)
plt.imshow(edge)
for contour in contours:
    plt.plot(contour[:,1], contour[:,0], linewidth=2)

Возьмём максимальные и минимальные координаты по X и по Y: это будут границы видимого прямоугольника.

positions = np.concatenate(contours, axis=0)
min_pos_x = int(min(positions[:,1]))
max_pos_x = int(max(positions[:,1]))
min_pos_y = int(min(positions[:,0]))
max_pos_y = int(max(positions[:,0]))

Теперь, имея координаты, можем на исходном изображении обвести область с кодом:

start = (min_pos_x, min_pos_y)
end = (max_pos_x, max_pos_y)
cv2.rectangle(img, start, end, (255, 0, 0), 5)
io.imshow(img)

Попробуем срезать оригинальное изображение по этим координатам:

new_img = img[min_pos_y:max_pos_y, min_pos_x:max_pos_x]
plt.imshow(new_img)

И передадим новое изображение в функцию qr_reader:

qr_reader(new_img)

Получаем в ответе:

t=20190320T2303&s=5803.00&fn=9251440300007971&i=141637&fp=4087570038&n=1

Это то, чего мы и хотели. Конечно, скрипт не будет универсальным, ведь в каждом изображении будут свои недостатки: где-то шума будет больше, где-то фотография размыта, где-то не будет хватать контраста. Поэтому в отдельных случаях потребуется вносить и иные корректировки в изображение. На следующем этапе обработки фотографии мы воспользуемся уже готовой библиотекой.

Строим модель для предсказания категории продуктов

Время чтения текста – 12 минут

Эта статья — продолжение серии материалов «Собираем данные с чеков гипермаркетов на Python» и «Парсим данные каталога сайта». В этот раз построим модель, которая обучится на датасете из собранного каталога и классифицирует товарные позиции чека из гипермаркета на продуктовые категории. Суть проблемы: в чеке мы видим данные о каждом товаре отдельно, а иногда хочется быстро понять сколько сегодня потратили денег на «Сладкое».

Предобработка датасета

Импортируем библиотеку pandas и прочитаем csv-файл с каталогом igoods (мы сформировали его, когда парсили каталог). Заодно посмотрим, как он выглядит:

Подробнее о том, как программе эмулировать поведение человека на сайте и собрать датасет из каталога можно прочитать в материале «Парсим данные каталога сайта»

import pandas as pd
sku = pd.read_csv('SKU_igoods.csv',sep=';')
sku.head()

После парсинга в таблице осталось несколько ненужных колонок: например, нам ни к чему знать цену на продукт и его вес, чтобы построить модель предсказания категории товара. Избавляемся от этих колонок методом drop(), а остальные переименуем через rename() и снова смотрим на таблицу:

sku.drop(columns=['Unnamed: 0', 'Weight','Price'],inplace=True)
sku.rename(columns={"SKU": "SKU", "Category": "Group"},inplace=True)
sku.head()

Сгруппируем товары по их категории и посчитаем количество функциями groupby() и agg():

sku.groupby('Group').agg(['count'])

Наша модель должна обучиться на каталоге и, увидев наименование товара, предсказать его категорию. Но в каталоге многие названия будут непонятны модели. В русском языке, например, много предлогов, союзов и других стоп-слов: мы хотим, чтобы модель понимала, что «Мангал с ребрами жесткости» и «Мангал с 6 шампурами» — продукты одной и той же категории. Для этого почистим все названия: уберём из них союзы, предлоги, междометия, частицы и приведём слова к своим основам при помощи стеммера.

Стеммер — программа, которая находит для заданного слова его основу.

import nltk
from nltk.corpus import stopwords
from pymystem3 import Mystem
from string import punctuation
nltk.download('stopwords')

Для стемминга будем использовать стеммер Яндекса из библиотеки pymystem3. Список стоп-слов необходимо расширить — каталог товаров из магазина немного отличается от бытовых ситуаций, в которых базовый набор актуален.

mystem = Mystem() 
russian_stopwords = stopwords.words("russian")
russian_stopwords.extend(['лента','ассорт','разм','арт','что', 'это', 'так', 'вот', 'быть', 'как', 'в', '—', 'к', 'на'])

Опишем функцию подготовки текста. Она приводит текст стеммером к своей основе, убирает из него знаки пунктуации, цифры и стоп-слова. Этот код был найден в одном из kernel на kaggle.

def preprocess_text(text):
    text = str(text)
    tokens = mystem.lemmatize(text.lower())
    tokens = [token for token in tokens if token not in russian_stopwords\
              and token != " " \
              and len(token)>=3 \
              and token.strip() not in punctuation \
              and token.isdigit()==False]
    text = " ".join(tokens)
    return text

Проверим, как работает функция:

preprocess_text("Мой дядя самых честных правил, Когда не в шутку занемог, Он уважать себя заставил И лучше выдумать не мог.")

Получаем:

'дядя самый честный правило шутка занемогать уважать заставлять выдумывать мочь'

А значит всё работает как надо — все слова в своей морфологической основе и переведены в нижний регистр, отсутствует пунктуация и предлоги. Теперь опробуем функцию на одном из наименований товара из каталога:

print(‘Было:’, sku['SKU'][0])
print(‘Стало:’, preprocess_text(sku['SKU'][0]))

Получаем:

Было: Фисташки соленые жареные ТМ 365 дней
Стало: фисташка соленый жареный день

Функция справляется отлично, теперь можем применить её ко всем наименованиям и вынести обработанные названия в новый столбец processed. Посмотрим, как выглядит датасет теперь:

sku['processed']=sku['SKU'].apply(preprocess_text)
sku.head()

Строим модель предсказания категории

Для предсказания категории товара будем использовать CountVectorizer и наивный байесовский классификатор. Первый разобьёт текст на токены и посчитает их количество, а второй — простейший мультикатегорийный классификатор, позволит обучить модель предсказывать категорию товара. Также нам потребуются TfidfTransformer для подсчета весов вхождения каждого токена. Поскольку мы хотим запустить все функции одну за другой, обратимся к библиотеке Pipeline.

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.naive_bayes import MultinomialNB
from imblearn.pipeline import Pipeline

Поделим наш датасет на X — обработанные наименования товаров и на Y — их категории. Разделим на обучающую и тестовую выборку, отдав под тесты 33% от общего числа данных.

x = sku.processed
y = sku.Group
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.33)

Пройдём пайплайном следующие команды:

  • CountVectorizer() — вернет матрицу с количеством вхождений каждого токена
  • TfidfTransformer() — преобразует эту матрицу в нормализованное представление tf-idf
  • MultinomialNB() — наивный байесовский классификатор для предсказания категории товара
text_clf = Pipeline([('vect', CountVectorizer(ngram_range=(1,2))),
                     ('tfidf', TfidfTransformer()), 
                    ('clf', MultinomialNB())])

На выходе получим модель в text_clf, которую затем обучим по обучающей выборке и посчитаем предсказания по тестовой выборке:

text_clf = text_clf.fit(X_train, y_train)
y_pred = text_clf.predict(X_test)

А теперь оценим модель:

print('Score:', text_clf.score(X_test, y_test))

Получим такую точность:

Score: 0.923949864498645

Верификация на реальных данных

Можем проверить, как работает модель на реальных данных из свежего чека. В материале о том, как получить продукты из чека гипермаркета, на выходе мы получали DataFrame с продуктами — возьмём его и применим к названиям товаров функцию preprocess_text.

my_products['processed']=my_products['name'].apply(preprocess_text)
my_products.head()

Заполним новый столбец prediction — он будет предсказывать категорию товара по его названию. Передаем ему колонку с обработанными названиями и создаём новую колонку с предсказаниями.

prediction = text_clf.predict(my_products['processed'])
my_products['prediction']=prediction
my_products[['name', 'prediction']]

DataFrame станет таким:

И посчитаем сумму по каждой категории:

my_products.groupby('prediction').sum()

В целом, модель справляется неплохо с предсказаниями: сосиски уходят в мясную гастрономию, творог — в молочные продукты, багет — в хлеб и выпечку. И всё же заметно, что киви почему-то относится к молочным продуктам, а груши — к эко-продуктам. Проблема в том, что в каталоге в этих разделах много товаров «со вкусом груши» или «со вкусом киви», из-за чего наивный байесовский классификатор отдаёт предпочтение тому классу, экземпляров которого в датасете больше. Это известная проблема несбалансированных классов, которую можно победить ресемплингом исходного датасета или задав нужные веса в модели.

Красивая визуализация в Python. Диаграмма Градусник

Время чтения текста – 7 минут

Очень часто диаграммы, построенные стандартными средствами Matplotlib, выглядят некрасиво и неинформативно. В 2011ом году для целей одного из отчетов телеком-компании в Excel мы построили полезную симпатичную диаграмму «Градусник», рецепт которой стал известен из популярного в тот момент блога Chandoo про приемы визуализации в Excel.
Вот как она выглядела в Excel:

Времена меняются, и мы попробуем восстановить знание о построении этой полезной диаграммы, используя штатные средства библиотеки matplotlib в Python.

Для каких случаев подойдет диаграмма «Градусник»?
Лучше всего использовать данный тип для сравнения плановых и фактических значений, таким образом наглядно можно увидеть недовыполнение и перевыполнение показателей. При этом план / факт может быть как в процентах, так и в фактических значениях. Мы рассмотрим пример с фактическими значениями в условных единицах.

В этот раз возьмем данные из excel-файла. Используем типичный состав библиотек для работы с данными (и соответствующие им типичные alias):

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

И считываем в DataFrame таблицу:

df = pd.read_excel('data.xlsx')

Посмотрим, как она выглядит:

Начнем извлекать из таблицы колонки. Первый столбец «Продажи» будет вертикальной подписью к каждому столбцу на графике. «План» — статичный столбец, относительно которого измеряется «Факт». Некоторые данные могут приходить в виде вещественных чисел — такие столбцы будут считаны как тип str, если в данных будет встречена запятая. Чтобы работать с такими числами, будем сначала менять в них запятую на точку, а затем переводить в тип float.

xticks = df.iloc[:,0]
try:
    bars2 = df.iloc[:,1].str.replace(',','.').astype('float')
except AttributeError:
    bars2 = df.iloc[:,1].astype('float')
try:
    bars1 = df.iloc[:,2].str.replace(',','.').astype('float')
except AttributeError:
    bars1 = df.iloc[:,2].astype('float')

Так как мы не знаем наверняка, будут ли в данных такие числа, можем словить AttributeError в случае их отсутствия, ведь будем обращаться к методу str, который есть только у строк. Поэтому напишем обработчик исключений try — except, который будет на всякий случай переводить данные в тип float.

Построим из этого классический barchart — график со столбцами. Зададим массив положения на оси Х для bars1 функцией np.arange и bars2, смещённый на ширину столбца:

barWidth = 0.2
r1 = np.arange(len(bars1))
r2 = [x + barWidth for x in r1]
 
plt.bar(r1, bars1, width=barWidth)
plt.bar(r2, bars2, width=barWidth)

И посмотрим, что получилось:

Очевидно, это не совсем то, чего мы ожидали. Зададим разную ширину для графиков, ведь один будет наложен на другой. Массив с расположением по оси X тоже теперь возьмём единый, ведь оба столбца будут идти из одних точек.

barWidth1 = 0.065
barWidth2 = 0.032
x_range = np.arange(len(bars1) / 8, step=0.125)

А теперь отобразим столбцы на графике, задав им положение, ширину, значения, цвет, легенду и подписи к диаграммам bars2:

plt.bar(x_range, bars1, color='#dce6f2', width=barWidth1/2, edgecolor='#c3d5e8', label='План')
plt.bar(x_range, bars2, color='#ffc001', width=barWidth2/2, edgecolor='#c3d5e8', label='Факт')
for i, bar in enumerate(bars2):
    plt.text(i / 8 - 0.015, bar + 1, bar, fontsize=14)

Наконец, сделаем несколько визуальных штрихов — уберём лишние рамки, чёрточки, добавим серую линию под столбцами, поправим размер и шрифт легенде, сделаем диаграмму шире, выведем её на экран и сохраним как plt.png в директории скрипта:

plt.xticks(x_range, xticks)
plt.tick_params(
    bottom=False,
    left=False,
    labelsize=15
)
plt.rcParams['figure.figsize'] = [25, 7]
plt.axhline(y=0, color='gray')
plt.legend(frameon=False, loc='lower center', bbox_to_anchor=(0.25, -0.3, 0.5, 0.5), prop={'size':20})
plt.box(False)
plt.savefig('plt', bbox_inches = "tight")
plt.show()

Получили такую диаграмму:

Создаём материализованное представление в Clickhouse

Время чтения текста – 10 минут

В этот раз разберёмся, как с помощью Python передавать в Clickhouse данные по рекламным кампаниям и построим агрегат, используя материализованное представление.
Для чего нам материализованные представления? Часто Clickhouse используется для работы с огромными объемами данных, а время получения ответа на запрос к таблице с сырыми данными постоянно растёт. Стандартно, чтобы решить такую задачу эффективным способом, чаще всего используют ETL-процессы или создают таблицы агрегатов, что не очень удобно, ведь их необходимо регулярно пересчитывать. Clickhouse обладает встроенной и эффективной возможностью для решения задачи — материализованными представлениями.
Материализованные представления физически хранят и обновляют данные на диске в соответствии с запросом SELECT, на основе которого представление было создано. При вставке данных в искомую таблицу SELECT преобразовывает данные и вставляет их в представление.

Настройка машины
Наш скрипт на Python из предыдущих материалов необходимо подключить к Clickhouse — он будет отправлять запросы, поэтому нужно открыть несколько портов. В Dashboard AWS переходим в Network & Security — Security Groups. Наша машина входит в группу launch-wizard-1. Переходим в неё и смотрим на Inbound rules: нам нужно добавить правила как на скриншоте.

Настройка Clickhouse
Теперь настроим Clickhouse. Отредактируем файл config.xml в редакторе nano:

cd /etc/clickhouse-server
sudo nano config.xml

Воспользуйтесь мануалом по горячим клавишам, если тоже не сразу поняли, как выйти из nano.

Раскоментируем строку

<listen_host>0.0.0.0</listen_host>

чтобы доступ к базе данных был с любого IP-адреса:

Создание таблицы и материализованного представления
Зайдём в клиент и создадим нашу базу данных, в которой впоследствии создадим таблицы:

CREATE DATABASE db1
USE db1

Мы проиллюстрируем всё тот же пример сбора данных с Facebook. Информация по кампаниям может часто обновляться, и мы, в целях упражнения, хотим создать материализованное представление, которое будет автоматически пересчитывать агрегаты на основе собранных данных по затратам. Таблица в Clickhouse будет практически такой же, как DataFrame из прошлого материала. В качестве движка таблицы используем CollapsingMergeTree: он будет удалять дубликаты по ключу сортировки:

CREATE TABLE facebook_insights(
	campaign_id UInt64,
	clicks UInt32,
	spend Float32,
	impressions UInt32,
	date_start Date,
	date_stop	 Date,
	sign Int8
) ENGINE = CollapsingMergeTree
ORDER BY (date_start, date_stop)

И сразу создадим материализованное представление:

CREATE MATERIALIZED VIEW fb_aggregated
ENGINE = SummingMergeTree()
ORDER BY date_start
	AS
	SELECT campaign_id,
		      date_start,
		      sum(spend * sign) as spent,
		      sum(impressions * sign) as impressions,
		      sum(clicks * sign) as clicks
	FROM facebook_insights
	GROUP BY date_start, campaign_id

Подробности рецепта можно посмотреть в блоге Clickhouse.

К сожалению, в Clickhouse UPDATE отсутствует, поэтому необходимо придумывать некоторые ухищрения. Мы воспользовались рецептом от команды Яндекса для обходного пути команды UPDATE. Идея состоит в том, чтобы в начале вставить строки, которые уже были в таблице с отрицательным Sign, а затем использовать Sign для сторнирования. Следуя этому рецепту старые данные не будут учитываться при суммировании.

Скрипт
Начнём писать скрипт. Понадобится новая библиотека — clickhouse_driver, позволяющая отправлять запросы к Clickhouse из скрипта на Python:

В материале приведена только доработка скрипта, описанного в статье «Собираем данные по рекламным кампаниям в Facebook». Всё будет работать, если вы просто вставите код из текущего материала в скрипт предыдущего.

from datetime import datetime, timedelta
from clickhouse_driver import Client
from clickhouse_driver import errors

Объект класса Client позволит отправлять запросы методом execute(). В host вводим свой public dns, user ставим default, port — 9000 и в database базу данных для подключения.

client = Client(host='ec1-2-34-56-78.us-east-2.compute.amazonaws.com', user='default', password=' ', port='9000', database='db1')

Чтобы удостовериться, что всё нормально, можно написать следующий запрос, который должен вывести наименования всех баз данных на сервере:

client.execute('SHOW DATABASES')

В случае успеха получим на экране такой список:

[('_temporary_and_external_tables',), ('db1',), ('default',), ('system',)]

Пусть, например, мы хотим рассматривать данные за последние три дня. Получим эти даты библиотекой datetime и переведём в нужный формат методом strftime():

date_start = datetime.now() - timedelta(days=3)
date_end = datetime.now() - timedelta(days=1)
date_start_str = date_start.strftime("%Y-%m-%d")
date_end_str = date_end.strftime("%Y-%m-%d")

Напишем вот такой запрос, получающий все колонки таблицы за это время:

SQL_select = f"select campaign_id, clicks, spend, impressions, date_start, date_stop, sign from facebook_insights where date_start > '{date_start_str}' AND date_start < '{date_end_str}'"

И выполним запрос, поместив информацию в список old_data_list. А затем поменяем всем sign на -1 и добавим в new_data_list:

new_data_list = []
old_data_list = []
old_data_list = client.execute(SQL_select)

for elem in old_data_list:
    elem = list(elem)
    elem[len(elem) - 1] = -1
    new_data_list.append(elem)

Наконец, напишем наш алгоритм: вставляем те же самые данные с sign = −1, оптимизируем для удаления дубликатов движком CollapsingMergeTree и выполняем INSERT новых данных со знаком sign = 1.

SQL_query = 'INSERT INTO facebook_insights VALUES'
client.execute(SQL_query, new_data_list)
SQL_optimize = "OPTIMIZE TABLE facebook_insights"
client.execute(SQL_optimize)
for i in range(len(insight_campaign_id_list)):
    client.execute(SQL_query, [[insight_campaign_id_list[i],
                                insight_clicks_list[i],
                                insight_spend_list[i],
                                insight_impressions_list[i],
                                datetime.strptime(insight_date_start_list[i], '%Y-%m-%d').date(),
                                datetime.strptime(insight_date_start_list[i], '%Y-%m-%d').date(),
                                1]])
    client.execute(SQL_optimize)

Вернёмся в Clickhouse. Выполним SELECT * FROM facebook_insights LIMIT 20, чтобы посмотреть первые 20 строк таблицы:

И SELECT * FROM fb_aggregated LIMIT 20, чтобы проверить наше представление:

Отлично! Мы сделали материализованное представление — теперь новые данные, поступающие в таблицу facebook_insights будут поступать и в материализованное представление fb_aggregated и каждый раз пересчитываться благодаря SummingMergeTree. При этом трюк с sign позволяет отлавливать уже обработанные записи и не допускать их суммирования, а CollapsingMergeTree — чистить дубликаты.

Собираем данные по рекламным кампаниям в Facebook

Время чтения текста – 10 минут

Давайте узнаем, как получить информацию о затратах, кликах и показах рекламных кампаний из Facebook.

from facebook_business.api import FacebookAdsApi
from facebook_business.exceptions import FacebookRequestError
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.adobjects.adreportrun import AdReportRun
from facebook_business.adobjects.adsinsights import AdsInsights
from facebook_business.adobjects.campaign import Campaign
from facebook_business.adobjects.adset import AdSet
from facebook_business.adobjects.adaccountuser import AdAccountUser as AdUser
from facebook_business import adobjects
from matplotlib import pyplot as plt
from pandas import DataFrame
import time

Получение ключа

Первое, что необходимо сделать для работы с Facebook API — создать приложение. Для этого заходим на https://developers.facebook.com — My Apps — Create App. Заполняем название приложения, контактный Email и жмём на «Create App ID»

Перед нами открывается Dashboard приложения. Переходим в Settings — Basic и записываем себе App ID и App Secret. Эти данные понадобятся для авторизации.

Теперь переходим в Tools — Graph API Explorer и оказываемся в меню для создания нового access token.

Токены выдаются под разные нужды, и нам нужно задать определенные права для нашего. Понадобится только на ads_managment — это право позволяет получать информацию о рекламных кампаниях вашего аккаунта Facebook. Добавляем его и нажимаем на «Generate Access Token».

Осторожно — user access token изначально выдается как short-lived, уже через 1-2 часа он перестанет действовать. Для получения long-lived Token можно нажать на синюю кнопку, чтобы открыть Access Token Info, а затем на «Open in Access Token Tool». Откроется новая страница, внизу которой появится синяя кнопка «Extend Access Token». После этого сгенерируется новый long-lived Access Token, действующий 60 дней.

Написание скрипта

Создадим три переменных, в которые запишем access token, App ID и App Secret. Авторизуемся через метод init() класса FacebooksAdsApi и добавляем пользователя. Метод get_ad_accounts() вернёт нам данные по всем нашим рекламным аккаунтам в виде словаря. По этим же данным можем получить информацию о кампаниях методом get_campaigns().


my_access_token = ''
my_app_id = ''
my_app_secret = ''
FacebookAdsApi.init(my_app_id, my_app_secret, my_access_token)

me = AdUser(fbid='me')
my_accounts = list(me.get_ad_accounts())
my_accounts

my_account = my_accounts[0]
campaigns = my_account.get_campaigns()
print(campaigns)

Попробуем получить amount spent через my_account. Для этого воспользуемся методом api_get(), передав в параметр fields поле AdAccount.Field.amount_spent. Теперь, чтобы получить желаемые данные, выведем поле у переменной my_account, поделив на 100, чтобы обрубить копейки. Расходы получаем в валюте аккаунта, в нашем случае это RUB. То, ради чего мы всё это затеваем — получить данные о расходах на рекламные кампании для последующего анализа.

my_account.api_get(fields=[AdAccount.Field.amount_spent])
print(int(my_account[AdAccount.Field.amount_spent])/100)

Объявим переменную fields — в этом списке будут храниться поля, которые мы хотим получать: id кампании, количество кликов, затрат и просмотров. Теперь опишем две функции. Первая будет асинхронно отправлять запросы к Facebook и возвращать результаты. Вторая — формирует эти запросы и передает в первую функцию. В результате будем получать список словарей.

fields = [
    AdsInsights.Field.campaign_id,
    AdsInsights.Field.clicks,
    AdsInsights.Field.spend,
    AdsInsights.Field.impressions]

count = 0

def wait_for_async_job(async_job):
    global count
    async_job = async_job.api_get()
    while async_job[AdReportRun.Field.async_status] != 'Job Completed' or async_job[
        AdReportRun.Field.async_percent_completion] < 100:
        time.sleep(2)
        async_job = async_job.api_get()
    else:
        print("Job " + str(count) + " completed")
        count += 1
    return async_job.get_result(params={"limit": 1000})

def get_insights(account, date_preset='last_3d'):
    account = AdAccount(account["id"])
    i_async_job = account.get_insights(
        params={
            'level': 'ad',
            'date_preset': date_preset,
            'time_increment': 1},
            fields=fields,
            is_async=True)
    results = [dict(item) for item in wait_for_async_job(i_async_job)]
    return results

Следующий шаг — получение искомых данных о затратах. Будем собирать данные за всё время, поэтому заведём переменную date_preset, значение которой поставим lifetime. И для каждого аккаунта вызовем функцию get_insights(), а список, который она возвращает, положим в insights_lists.
Создадим DataFrame и вытащим из insights_lists интересующие данные — это id кампании, количество кликов, затраты и просмотры.

elem_insights = []
insights_lists = []
date_preset = 'last_year'
for elem in my_accounts:
            elem_insights = get_insights(elem, date_preset)
            insights_lists.append(elem_insights)

insight_campaign_id_list = []
insight_clicks_list = []
insight_spend_list = []
insight_impressions_list = []
insight_date_start_list = []
insight_date_stop_list = []
for elem1 in insights_lists:
    for elem2 in elem1:
        insight_campaign_id_list.append(int(elem2['campaign_id']))
        insight_clicks_list.append(int(elem2['clicks']))
        insight_spend_list.append(float(elem2['spend']))
        insight_impressions_list.append(int(elem2['impressions']))
        insight_date_start_list.append(elem2['date_start'])
        insight_date_stop_list.append(elem2['date_stop'])

df = DataFrame()
df['campaign_id'] = insight_campaign_id_list
df['clicks'] = insight_clicks_list
df['spend'] = insight_spend_list
df['impressions'] = insight_impressions_list
df['date_start'] = insight_date_start_list
df['date_stop'] = insight_date_stop_list

Получим такой DataFrame:

Давайте подытожим эти данные — сгруппируем по кампаниям и посчитаем суммы каждой колонки для групп. Для этого в pandas реализованы методы groupby() и sum() — достаточно указать, по какой колонке проводить группировку.

df.groupby(['campaign_id']).sum()

Теперь построим два графика — по количеству кликов и просмотров относительно дат. Воспользуемся атрибутом rcParams, чтобы задать размеры графиков.

plt.rcParams['figure.figsize'] = [20, 5]
plt.plot(df.date_start.str.replace('2019-', ''), df.clicks)
plt.rcParams['figure.figsize'] = [20, 5]
plt.plot(df.date_start.str.replace('2019-', ''), df.impressions)

Вот и всё! Получили всю информацию о затратах, просмотрах и кликах. В следующем материале посмотрим, как выгрузить её в формате json и передать в Redash для последующего анализа и визуализации.

Пост написан при участии команды Valiotti Analytics: Елисей Рыков (младший аналитик), Георгий Ушаков (технический консультант)

 1 комментарий    261   2020   Data Analytics   Facebook   python

Парсим данные каталога сайта, используя Beautiful Soup и Selenium (часть 2)

Время чтения текста – 7 минут

Продолжение предыдущей статьи о сборе данных с известного онлайн каталога товаров.

Если внимательно проанализировать поведение страницы с товарами, можно заметить, что товары подгружаются динамически, то есть при скролле страницы вниз мы получим новый набор товаров, и, тем самым, код из предыдущей статьи для данной задачи окажется бесполезным.

Динамическая подгрузка товаров на странице

Для таких случае в python также имеется решение — библиотека Selenium, она запускает движок браузера и эмулирует поведение человека.

В первой части скрипта мы соберем дерево категорий аналогично прошлой статье, но уже с использованием Selenium.

import time
from selenium import webdriver
from bs4 import BeautifulSoup as bs

browser = webdriver.Chrome()
browser.get("https://i****.ru/products?category_id=1-ovoschi-frukty-griby-yagody&from_category=true")
cookies_1= {'domain': '.i****.ru', 'expiry': 1962580137, 'httpOnly': False, 'name': '_igooods_session_cross_domain', 'path': '/', 'secure': False, 'value': 'WWJFaU8wMTBMSE9uVlR2YnRLKzlvdHE3MVgyTjVlS1JKVm1qMjVNK2JSbEYxcVZNQk9OR3A4VU1LUzZwY1lCeVlTNDVsSkFmUFNSRWt3cXdUYytxQlhnYk5BbnVoZktTMUJLRWQyaWxFeXRsR1ZCVzVnSGJRU0tLVVR0MjRYR2hXbXpaZnRnYWRzV0VnbmpjdjA5T1RzZEFkallmMEVySVA3ZkV3cjU5dVVaZjBmajU5bDIxVkEwbUQvSUVyWGdqaTc5WEJyT2tvNTVsWWx1TEZhQXB1L3dKUXl5aWpOQllEV245VStIajFDdXphWFQxVGVpeGJDV3JseU9lbE1vQmxhRklLa3BsRm9XUkNTakIrWXlDc3I5ZjdZOGgwYmplMFpGRGRxKzg3QTJFSGpkNWh5RmdxZzhpTXVvTUV5SFZnM2dzNHVqWkJRaTlwdmhkclEyNVNDSHJsVkZzeVpBaGc1ZmQ0NlhlSG43YnVHRUVDL0ZmUHVIelNhRkRZSVFYLS05UkJqM24yM0d4bjFBRWFVQjlYSzJnPT0%3D--e17089851778bedd374f240c353f399027fe0fb1'}
cookies_2= {'domain': '.i****.ru', 'expiry': 1962580137, 'httpOnly': False, 'name': 'sa_current_city_coordinates_cross_domain', 'path': '/', 'secure': False, 'value': '%5B59.91815364%2C30.305578%5D'}
cookies_3= {'domain': '.i****.ru', 'expiry': 1962580137, 'httpOnly': False, 'name': 'sa_current_city_cross_domain', 'path': '/', 'secure': False, 'value': '%D0%A1%D0%B0%D0%BD%D0%BA%D1%82-%D0%9F%D0%B5%D1%82%D0%B5%D1%80%D0%B1%D1%83%D1%80%D0%B3'}
browser.add_cookie(cookies_1)
browser.add_cookie(cookies_2)
browser.add_cookie(cookies_3)
browser.get("https://i****.ru/products?category_id=1-ovoschi-frukty-griby-yagody&from_category=true")
source_data = browser.page_source
soup = bs(source_data)
categories=soup.find_all('div', {'class':['with-children']})
tree = {}
for x in categories:
    tree[x.findNext('span').text]=x.findNext('a').get('href')

В этом сниппете мы как и раньше get-запросом с параметрами вызываем желаемую страницу браузера и скачиваем данные, затем у нас появляется объект класса bs, с которым мы проделываем аналогичные операции. Таким образом, мы получили словарь tree, в котором для каждой категории хранится URL страницы для данной категории, в дальнейшем этот словарь нам пригодится для перебора в цикле.

Приступим к сбору данных о товарах. Для этого импортируем библиотеку pandas и создадим новый dataframe с четырьмя колонками.

import pandas as pd
df = pd.DataFrame(columns=['SKU', 'Weight', 'Price','Category'])

Далее, воспользуемся нашим словарем tree и для каждой категории получим данные страницы. Ниже приведен код. Мы по-прежнему должны установить куки, которые установлены у пользователя, а также выполнить некоторые хитрые команды для работы движка браузеры, которые сэмулируют перемещение курсора вниз страницы.

for cat, link in tree.items():
    browser.maximize_window()
    browser.get('https://i****.ru'+link)
    cookies_1= {'domain': '.i****.ru', 'expiry': 1962580137, 'httpOnly': False, 'name': '_i****_session_cross_domain', 'path': '/', 'secure': False, 'value': 'WWJFaU8wMTBMSE9uVlR2YnRLKzlvdHE3MVgyTjVlS1JKVm1qMjVNK2JSbEYxcVZNQk9OR3A4VU1LUzZwY1lCeVlTNDVsSkFmUFNSRWt3cXdUYytxQlhnYk5BbnVoZktTMUJLRWQyaWxFeXRsR1ZCVzVnSGJRU0tLVVR0MjRYR2hXbXpaZnRnYWRzV0VnbmpjdjA5T1RzZEFkallmMEVySVA3ZkV3cjU5dVVaZjBmajU5bDIxVkEwbUQvSUVyWGdqaTc5WEJyT2tvNTVsWWx1TEZhQXB1L3dKUXl5aWpOQllEV245VStIajFDdXphWFQxVGVpeGJDV3JseU9lbE1vQmxhRklLa3BsRm9XUkNTakIrWXlDc3I5ZjdZOGgwYmplMFpGRGRxKzg3QTJFSGpkNWh5RmdxZzhpTXVvTUV5SFZnM2dzNHVqWkJRaTlwdmhkclEyNVNDSHJsVkZzeVpBaGc1ZmQ0NlhlSG43YnVHRUVDL0ZmUHVIelNhRkRZSVFYLS05UkJqM24yM0d4bjFBRWFVQjlYSzJnPT0%3D--e17089851778bedd374f240c353f399027fe0fb1'}
    cookies_2= {'domain': '.i****.ru', 'expiry': 1962580137, 'httpOnly': False, 'name': 'sa_current_city_coordinates_cross_domain', 'path': '/', 'secure': False, 'value': '%5B59.91815364%2C30.305578%5D'}
    cookies_3= {'domain': '.i****.ru', 'expiry': 1962580137, 'httpOnly': False, 'name': 'sa_current_city_cross_domain', 'path': '/', 'secure': False, 'value': '%D0%A1%D0%B0%D0%BD%D0%BA%D1%82-%D0%9F%D0%B5%D1%82%D0%B5%D1%80%D0%B1%D1%83%D1%80%D0%B3'}
    browser.add_cookie(cookies_1)
    browser.add_cookie(cookies_2)
    browser.add_cookie(cookies_3)
    browser.get('https://i****.ru'+link)
    
    # Скрипт, который через каждые 3 секунды ищет конец страницы и выполняется пока не закончится получение свежих данных
    lenOfPage = browser.execute_script("window.scrollTo(0, document.body.scrollHeight);var lenOfPage=document.body.scrollHeight;return lenOfPage;")
    match=False
    while(match==False):
        lastCount = lenOfPage
        time.sleep(3)
        lenOfPage = browser.execute_script("window.scrollTo(0, document.body.scrollHeight);var lenOfPage=document.body.scrollHeight;return lenOfPage;")
        if lastCount==lenOfPage:
             match=True

Теперь мы дошли до конца страницы и можем собрать данные для работы библиотеки beautifulsoup.

# Собираем данные со страницы
    source_data = browser.page_source
    soup = bs(source_data)
    skus=soup.find_all('div', {'class':['b-product-small-card']})
    last_value=len(df)+1 if len(df)>0 else 0
    for i,x in enumerate(skus):
        df.loc[last_value+i]=[x.findNext('a').contents[0],\
                   x.findNext('div',{'class':'product-weight'}).contents[0],\
                   x.findNext('div',{'class':'g-cart-action small'})['data-price'],\
                   cat]
browser.close()

В приведенном выше фрагменте кода мы ищем все элементы <div>, у которых класс — b-product-small-card, а далее для каждого найденного товара собираем значения полей веса и цены.

Исходный код сайта продуктовой карточки

Ставим выполняться скрипт и пьем кофе. Вуа-ля, теперь у нас есть pandas dataframe с данными всех товаров:

DataFrame с товарами, собранными с сайта

Теперь, у нас есть отличные данные для обучения NLP модели — наименования товаров и их принадлежность к различным категориям.

Парсим данные каталога сайта, используя Beautiful Soup (часть 1)

Время чтения текста – 8 минут

Продолжим разбираться с Python и чековыми данными. У нас имеется информация о чековых записях, полученная с сайта налоговой, это хорошо. Теперь нужно научиться определять категорию, к которой относится товар. Мы будем это делать в автоматическом режиме, предварительно построив модель машинного обучения.

Ключевая «боль» / сложность — нам требуется так называемый сет для обучения (training set). Забегая вперед скажу, что успел протестировать такой сет для обучения на реальных данных из одной продуктовой сети, результаты, к сожалению, оказались не самые радужные, поэтому попробуем собрать другой честный сет для обучения, используя открытые источники в интернете.

Для работы возьмем популярный сайт по доставке еды из гипермаркетов. Сегодня будем использовать простую, удобную и функциональную библиотеку в Python для парсинга данных с .html страниц — Beatiful Soup.

Целевая страница, на которой интересующие нас данные имеет следующий вид.

Страница каталога сайта по доставке еды из гипермаркетов

Однако если мы впервые зайдем на сайт нас перебросит на главную, поскольку мы не выбрали ближайший гипермаркет.

Главная страница сайта по доставке еды из гипермаркетов

Проблема понятна, скорее всего, ее можно решить установлением cookies, подобных тем, что сохраняются локально у пользователя.

Теперь важно определить порядок действий и структуру сбора данных.

Основная задача:

  1. Собрать наименования всех верхнеуровневых категорий и соответствующие им URL сайта
  2. Используя полученный в пункте 1 список, собрать данные о товарах с каждой страницы категории.

Сегодня будем разбирать первую часть. Приступим, в теории нам должно хватить таких команд:

from bs4 import BeautifulSoup #импортируем модуль BeautifulSoup
import requests
r = requests.get('https://i****.ru/products?category_id=1-ovoschi-frukty-griby-yagody&from_category=true')
soup = BeautifulSoup(r.text)

Однако если мы посмотрим содержимое soup, то обнаружим, что мы получили исходный код главной страницы, а не той, что интересует нас. Главная страница не подходит для целей нашего анализа и сбора информации.

Получили исходный код главной страницы сайта

Поэтому воспользуемся методом Session библиотеки requests, которым можно передать cookies в качестве параметра. Итого наш код выглядит так:

import requests
s = requests.Session()
r = s.get('https://i****.ru/products?category_id=1-ovoschi-frukty-griby-yagody&from_category=true', \
       cookies = {'_igooods_session_cross_domain':'WWJFaU8wMTBMSE9uVlR2YnRLKzlvdHE3MVgyTjVlS1JKVm1qMjVNK2JSbEYxcVZNQk9OR3A4VU1LUzZwY1lCeVlTNDVsSkFmUFNSRWt3cXdUYytxQlhnYk5BbnVoZktTMUJLRWQyaWxFeXRsR1ZCVzVnSGJRU0tLVVR0MjRYR2hXbXpaZnRnYWRzV0VnbmpjdjA5T1RzZEFkallmMEVySVA3ZkV3cjU5dVVaZjBmajU5bDIxVkEwbUQvSUVyWGdqaTc5WEJyT2tvNTVsWWx1TEZhQXB1L3dKUXl5aWpOQllEV245VStIajFDdXphWFQxVGVpeGJDV3JseU9lbE1vQmxhRklLa3BsRm9XUkNTakIrWXlDc3I5ZjdZOGgwYmplMFpGRGRxKzg3QTJFSGpkNWh5RmdxZzhpTXVvTUV5SFZnM2dzNHVqWkJRaTlwdmhkclEyNVNDSHJsVkZzeVpBaGc1ZmQ0NlhlSG43YnVHRUVDL0ZmUHVIelNhRkRZSVFYLS05UkJqM24yM0d4bjFBRWFVQjlYSzJnPT0%3D--e17089851778bedd374f240c353f399027fe0fb1', \
               'sa_current_city_coordinates_cross_domain' : '%5B59.91815364%2C30.305578%5D', \
                  'sa_current_city_cross_domain' : '%D0%A1%D0%B0%D0%BD%D0%BA%D1%82-%D0%9F%D0%B5%D1%82%D0%B5%D1%80%D0%B1%D1%83%D1%80%D0%B3',\
                 'lazy_loader_last_url' : '/products?category_id=1-ovoschi-frukty-griby-yagody&from_category=true'})
soup = BeautifulSoup(r.text)

Мы устанавливаем 4 cookies, которые эмулируют поведение человека и выбранный гипермаркет (их мы установили эмпирическим путем из cookies, установленных браузером, при выборе соответствующего гипермаркета):

Cookies, влияющие на отображение главной страницы сайта

Прекрасно, осталось лишь собрать нужные нам категории и ссылки на них. Для этого напишем следующий код:

categories=soup.find_all('div', {'class':['with-children']})
tree = {}
for x in categories:
    tree[x.findNext('span').text]=x.findNext('a').get('href')

В этом сниппете мы как и раньше GET-запросом с параметрами (cookies) вызываем желаемую страницу браузера и скачиваем данные, затем у нас появляется объект класса BeautifulSoup.
Мы используем команду:

categories=soup.find_all('div', {'class':['with-children']})

И находим все элементы <div>, у которых имеется класс with-children, вот то же самое в коде сайта:

Элементы, которые содержат название категории.

Далее, объявляем пустой объект класса dict и для каждого найденного выше элемента в цикле собираем:

tree[x.findNext('span').text]=x.findNext('a').get('href’)

Что фактически означает: мы берем текст следующего, за найденным <div> элемента <span> и адрес ссылки, следующей за найденным <div>.
Именно это нам и требовалось получить. Таким образом мы получили словарь вида {Категория: URL сайта}:

Справочник категорий и соответствующих им URL

В следующей статье — продолжение о сборе информации по карточкам товаров каталога.

 1 комментарий    273   2019   Data Analytics   data science   python

Собираем данные с чеков гипермаркетов на Python

Время чтения текста – 7 минут

Update: к сожалению, информация в данном посте устарела. Рекомендуем изучить наш новый пост.

Недавно, покупая в очередной раз продукты в гипермаркете, вспомнил, что согласно ФЗ-54 любой оператор торговли, который пробивает кассовый чек, обязан отправлять данные чека в налоговую.

Чек из гипермаркета «Лента», QR-код, который нас интересует, обведен

Что это значит для нас, аналитиков данных? Что мы можем лучше узнать себя, свои потребности и получить интересные данные о собственных покупках.

Попробуем в рамках серии постов собрать небольшой прототип приложения, которое позволит строить динамику своих покупок. Итак, начнем с того, что в каждом чеке есть QR-code, если его распознать, то мы получим следующую строку:

t=20190320T2303&s=5803.00&fn=9251440300007971&i=141637&fp=4087570038&n=1

В данной строке содержатся:

t — timestamp, время, когда вы осуществили покупку
s — сумма чека
fn — кодовый номер fss, потребуется далее в запросе к API
i — номер чека, он нам потребуется далее в запросе к API
fp — параметр fiscalsign, потребуется далее в запросе к API

В рамках решения первого шага нашей задачи мы будем парсить данные чека и собирать их в pandas dataframe, используя модули Python.

Мы воспользуемся API, который отдает данные по чеку с сайта налоговой.

В начале получим аутентификационные данные:

import requests
your_phone = '+7XXXYYYZZZZ' #нужно указать ваш телефон, на него придет СМС с паролем
r = requests.post('https://proverkacheka.nalog.ru:9999/v1/mobile/users/signup', json = {"email":"email@email.com","name":"USERNAME","phone":your_phone})

В результате выполнения POST-запроса мы получим пароль в виде SMS на указанный мобильный телефон. Далее, мы будем использовать его в переменной pwd

Теперь распарсим нашу строку со значениями из QR-кода:

import re
qr_string='t=20190320T2303&s=5803.00&fn=9251440300007971&i=141637&fp=4087570038&n=1'
t=re.findall(r't=(\w+)', qr_string)[0]
s=re.findall(r's=(\w+)', qr_string)[0]
fn=re.findall(r'fn=(\w+)', qr_string)[0]
i=re.findall(r'i=(\w+)', qr_string)[0]
fp=re.findall(r'fp=(\w+)', qr_string)[0]

Будем использовать полученные переменные для извлечения данных.
В посте на Хабре довольно подробно изучены статусы ошибок при формировании запроса к API, не буду повторять эту информацию.

В начале необходимо проверить, что по данному чеку есть данные, формируем GET-запрос.

headers = {'Device-Id':'', 'Device-OS':''}
payload = {'fiscalSign': fp, 'date': t,'sum':s}
check_request=requests.get('https://proverkacheka.nalog.ru:9999/v1/ofds/*/inns/*/fss/'+fn+'/operations/1/tickets/'+i,params=payload, headers=headers,auth=(your_phone, pwd))
print(check_request.status_code)

В запросе необходимо указать headers, хотя бы пустые. В моем случае GET-запрос возвращает ошибку 406, из чего я понимаю, что такой чек находится (почему GET-запрос возвращает 406 для меня загадка, буду рад подсказкам в комментариях). Если не указать сумму или дату, то GET-запрос вернет ошибку 400 — bad request.

Переходим к самому интересному, получаем данные чека:

request_info=requests.get('https://proverkacheka.nalog.ru:9999/v1/inns/*/kkts/*/fss/'+fn+'/tickets/'+i+'?fiscalSign='+fp+'&sendToEmail=no',headers=headers,auth=(your_phone, pwd))
print(request_info.status_code)
products=request_info.json()

Должны получить код 200 (успешное выполнение GET-запроса), а в переменной products — все, что относится к нашему чеку.

Чтобы работать с этими данными воспользуемся pandas и преобразуем все в dataframe.

import pandas as pd
from datetime import datetime
my_products=pd.DataFrame(products['document']['receipt']['items'])
my_products['price']=my_products['price']/100
my_products['sum']=my_products['sum']/100
datetime_check = datetime.strptime(t, '%Y%m%dT%H%M') #((https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior отформатируем дату))
my_products['date']=datetime_check
my_products.set_index('date',inplace=True)

Теперь мы имеем рабочий pandas.dataframe с чеками, визуально это выглядит так:

«Шапка» чековых данных

Можно построить гистограмму покупок или посмотреть на все в виде «ящика с усами»:

import matplotlib.pyplot as plt
%matplotlib inline
my_products['sum'].plot(kind='hist', bins=20)
plt.show()
my_products['sum'].plot(kind='box')
plt.show()

В завершение элементарно получим описательные статистики в текстовом виде командой .describe():

my_products.describe()

Данные удобно записать в .csv-файл, чтобы в следующий раз дополнить статистику:

with open('hyper_receipts.csv', 'a') as f:
             my_products.to_csv(f, header=True)