43 заметки с тегом

python

Анализируем речь с помощью Python: Сколько раз в минуту матерятся на интервью YouTube-канала «вДудь»?

Время чтения текста – 19 минут

Выход практически каждого ролика на канале «вДудь» считается событием, а некоторые из этих релизов даже сопровождаются скандалами из-за неосторожных высказываний его гостей.
Сегодня при помощи статистических подходов и алгоритмов ML мы будем анализировать прямую речь. В качестве данных используем интервью, которые журналист Юрий Дудь (признан иностранным агентом на территории РФ) берет для своего YouTube-канала. Посмотрим с помощью Python, о чем таком интересном говорили в интервью на канале «вДудь».

Сбор данных

C помощью YouTube API мы получили список всех видео с канала Юрия Дудя, а также их метаинформацию. О том, как это сделать, вы можете узнать, например, из статьи нашего блога.
Если вы уже слышали знаменитое “Юрий будет дуть, дуть будет Юрий”, то наверняка знаете, что на этом канале есть документальные фильмы, а также интервью, в которых участвуют сразу несколько гостей. Нас заинтересовали только те выпуски, в которых преимущественно говорит только один гость. Поэтому нам пришлось провести фильтрацию всех видео вручную.
Для дальнейшего анализа нам необходимо было получить длительности роликов. Это мы сделали с помощью GET-запросов к YouTube API. Результаты приходили в специфическом формате (для примера: “PT1H49M35S”), поэтому их нам пришлось распарсить и перевести в секунды.
Итак, мы получили датафрейм, состоящий из 122 записей:

На основе метаинформации по лайкам, комментариям и просмотрам мы построили следующий Bubble Chart:

Так как наша цель — проанализировать речь в интервью, нам необходимо было получить текстовые составляющие роликов. В этом нам помог API-интерфейс youtube_transcript_api, который скачивает субтитры из видео на YouTube. Для каких-то роликов субтитры были прописаны вручную, но для большинства они были сгенерированы автоматически. К сожалению, для 10 видео субтитров не оказалось: беседы с L’one, Шнуром, Ресторатором, Амираном, Ильичом, Ильей Найшуллером, Соболевым, Иваном Дорном, Навальным, Noize MC. Причину их отсутствия мы, к сожалению, понять не смогли.

А гости кто?

Спектр рода деятельности гостей канала «вДудь» достаточно обширен, поэтому было решено пополнить исходные данные информацией о том, чем же в основном занимается приглашенный участник каждого интервью. К сожалению, ролики не сопровождаются четкими метками профессиональной принадлежности гостя, поэтому мы прописали эту информацию сами. На момент выгрузки данных последним видео на канале был разговор с комиком Дмитрием Романовым.
Если с идентификацией профессии каждого гостя мы не ошиблись, то вот такое распределение в итоге получается:

Музыканты, рэперы и актеры — самые частые гости Юрия, скорее всего, они являются самыми интересными для автора и аудитории. Представителей научного сообщества (астрофизик, историк, экономист и т.д) наоборот, гораздо меньше, ведь научно-популярные интервью — прерогатива других интервьюеров.

Обработка текста

Анализ текстовой информации сложен в той степени, в какой сложен язык, на котором написан текст. Подробно о подготовке текста к анализу мы рассказывали в материале «Python и тексты нового альбома Земфиры». Тут была проведена идентичная работа.
Как и раньше, для решения аналитической задачи мы решили использовать такой подход как лемматизация, т. е. приведение слова к его словарной форме. Проведя лемматизацию текстовых данных по правилам русского языка, мы получим существительные в именительном падеже единственного числа (кошками — кошка), прилагательные в именительном падеже мужского рода (пушистая — пушистый), а глаголы в инфинитиве (бежит — бежать). В этом проекте мы опять воспользовались библиотекой Pymorphy, представляющую собой морфологический анализатор.
Помимо приведения к словарной форме нам потребовалось убрать из текстов часто встречающиеся слова, которые не несут ценности для анализа. Это было необходимо, потому что так называемые стоп-слова могут повлиять на работу используемой модели машинного обучения. Список таких слов мы взяли из пакета ntlk.corpus, а после расширили его, изучив тексты интервью. Конечно, мы также убрали все знаки пунктуации.

Анализ словарного запаса

После обработки текста мы посчитали для каждого интервью количество всех слов, а также абсолютное и относительное количество уникальных слов. Конечно, полученные значения неидеальны, так как, во-первых, для большинства интервью были получены автоматически сгенерированные субтитры, которые являются неточными, а во-вторых, тексты были очищены от лишней информации.
Сперва мы решили наглядно представить основной массив лексики, которая звучит в интервью. После группировки интервью по роду деятельности гостя нам удалось это сделать и в этом нам помогла библиотека wordcloud. У нас получились такие облака слов:

Лейтмотивом всех интервью Юрия являются обсуждение России (политики, социальной жизни и других особенностей), уровня заработка гостей, а также непосредственно профессиональной деятельности гостя (это особенно заметно у представителей индустрии кинопроизводства).
Далее мы решили построить боксплот для количества слов для каждого рода деятельности (профессии, которые были представлены единственным гостем, мы не стали учитывать):

Наиболее разговорчивыми гостями оказались блогеры. По медиане, они наговорили больше всего слов. Чуть поодаль от них журналисты и комики, а вот самыми немногословными оказались рэперы.

Что касается количества уникальных слов, то тут ситуация аналогичная. И рэперы опять в аутсайдерах…

Если говорить об отношении уникальных слов к общему количеству, то тут можно увидеть совершенно иную картину. Теперь впереди оказываются, рэперы, музыканты и бизнесмены. Предыдущие же лидеры, наоборот, становятся самыми последними.
Конечно, стоит отметить, что такие сравнения могут быть несправедливыми, так как длительность интервью у каждого гостя Дудя разная, а потому кто-то просто мог успеть наговорить больше слов, чем остальные. Наглядно в этом можно убедиться, взглянув на распределение длительности интервью по роду деятельности (для построения использовался тот же пул гостей, что и для боксплотов выше):

К тому же, разные роды деятельности представляет разное количество человек, это тоже могло сказаться на результатах.
Далее мы составили список слов, появление которых в интервью было бы интересно отследить, и посмотрели как часто они упоминаются для каждого рода деятельности. Также мы решили учесть дисбаланс среди представителей разных профессиональных категорий и разделили полученные частоты на соответствующее количество гостей.

Первое место по упоминаниям очевидно занимает Россия. Что касается Запада, то про США гости говорили в 2,5 раза меньше. Что касается лидера РФ, то про него речь заходила достаточно часто. Его оппонент, Алексей Навальный, в этой словесной “баталии” потерпел поражение. Интересно, что политики далеко не в топе по упоминаниям Путина. Впереди оказался экономист Сергей Гуриев, после него ведущий Александр Гордон, а тройку замкнули журналисты.
Глагол “любить” чаще использовали люди, имеющие отношение к искусству, творчеству и гуманитарным наукам — кинокритик Антон Долин, мультипликатор Олег Куваев, историк Тамара Эйдельман, актеры, рэперы, художник Федор Павлов-Андреевич, комики, музыканты, режиссеры. Про страхи (если судить по глаголу “бояться”) гости говорили реже, чем о любви. В топ вошли историк Эйдельман, дизайнер Артемий Лебедев, кинокритик Долин и политики. Может быть в этом кроется ответ на вопрос, почему же политики не так охотно произносили имя президента России.
Что касается денег, то о них говорили все. Ну, за исключением человека науки, астрофизика Константина Батыгина. С церковью же имеем совершенно обратную ситуацию. О ней по большей части говорили только писатели и художник Павлов-Андреевич.

Анализ мата

Далее мы решили проанализировать то, как часто гости Юрия Дудя ругались матом. С помощью регулярных выражений мы составили словарь матерных слов со всех интервью. После этого, для каждого ролика было подсчитано суммарное количество вхождений элементов составленного словаря.
Мы построили диаграммы, отражающие топ-10 любителей нецензурно выражаться по количеству “запрещенных” слов в минуту.

Как видим, рэперы и музыканты почти полностью захватили топ. Помимо них очень часто ругались такие гости как блогер Данила Поперечный и комики Иван Усович и Алексей Щербаков. Первое место в рейтинге с большим отрывом от остальных держит Morgenstern (признан иностранным агентом на территории РФ), а вот Олег Тиньков в своем последнем интервью матерился не так много, чтобы попасть в Топ-10.
Зато, как искрометно!

После персонального анализа мы решили узнать, насколько насыщена нецензурными словами речь представителей разных профессиональных групп. Нулевые показатели при этом были опущены.

Ожидаемо, что больше всех матерились рэперы. На втором месте оказались блогеры (по большей части за счет Поперечного). За ними следует Артемий Лебедев, единственный дизайнер в нашей выборке, благодаря разнообразия речи которого, представители этой профессии и попали в топ-3 этого распределения. Кстати, если вы еще не знакомы с нашим анализом телеграм-канала Лебедева, то мы не понимаем, чего же вы ждете! Несмотря на то что генератор постов Артемия Лебедева сейчас выключен, исследование его телеграм-канала все равно заслуживает вашего внимания.

Ограничения анализа

Стоит отметить, что в нашем небольшом исследовании есть два недостатка:

  1. Как уже говорилось ранее, мы не смогли отделить слова гостей Дудя от речи Юрия, который и сам зачастую не брезгует использовать нецензурные выражения. Однако, задача интервьюера — подстроиться под стиль речи гостя, поэтому, скорее всего, результаты бы не сильно изменились.
  2. В автосгенерированных субтитрах нам встретилось некое подобие цензуры — некоторые слова были заменены на ‘[ __ ]’. Тут можно выделить несколько интересных моментов:
    • действительно некоторые матерные слова были зацензурены (по большей части слово “бл**ь”);
    • остальные матерные слова остались нетронутыми;
    • под чистку попали некоторые другие грубые слова, при этом не являющиеся матерными (“мудак”, “гавно”).

Продемонстрируем наглядно на примере следующего диалога:
Дудь: Почему твои треки такое гавно?
Гнойный: Мои треки ох**тельные, Юра, просто ты любишь гавно.

Такие замены встречались в субтитрах роликов с людьми, которые не употребляли нецензурные выражения в своей речи (по крайней мере на протяжении интервью). Однозначное решение, что же делать с ‘[ __ ]’, мы не смогли принять, поэтому для некоторых гостей какая-то часть матерных слов была, увы, не подсчитана.

Работа с Word2vec

После статистического анализа интервью мы перешли к определению их контекста. Для этого мы, как и раньше, воспользовались моделью Word2vec. Она основана на нейронной сети и позволяет представлять слова в виде векторов с учетом семантической составляющей. Косинусная мера семантически схожих слов будет стремиться к 1, а у двух слов, не имеющих ничего общего по смыслу, она близка к 0. Модель можно обучать самостоятельно на подготовленном корпусе текстов, но мы решили взять готовую — от RusVectores. Для ее использования нам понадобилась библиотека gensim.
Мы рассчитали векторы-представления для каждой профессиональной группы. Наверное, можно ожидать, что режиссёры обсуждали кино и все, что с ним связано, а музыканты — музыку. Поэтому для каждого рода деятельности мы получили список слов, описывающих тематику текстов соответствующих роликов. Также мы раскрасили ячейки в зависимости от того, насколько каждое полученное слово было близко к текстам соответствующей категории гостей.

Можно сказать, что в целом каждая профессиональная категория описывается вполне соответствующими терминами. Конечно, некоторые слова могут показаться спорными. К примеру, на первом месте для рэперов стоит слово “джазовый”, хотя ни с 1 представителем хип-хоп течения речь о джазе не заходила. Тем не менее модель посчитала, что это слово достаточно близко к общему смыслу интервью людей, относящихся к этой категории (видимо, за счет непосредственного отношения рэперов к музыке).

P.S. Мистическое число 25.000000

Как мы уже говорили, среди скачанных субтитров некоторые были написаны вручную. Интересно, что все они начинаются с числа 25.000000, причем оно нигде не озвучивается.

Что же это за мистическое число? Если уйти в конспирологию, то можно вспомнить про 25-й кадр. К сожалению, нам об этом ничего неизвестно, мы просто оставим это как пищу для размышлений…

 Нет комментариев    1023   2022   api   Data Analytics   nltk   python   word cloud   YouTube

Где поесть? Куда сходить? Ищем ответ на вопрос с помощью пары рекомендаций и скрипта Python

Время чтения текста – 12 минут

Поскольку наш блог придерживается технологий, аналитики и IT-тематики, то обсуждение политики мы здесь, естественно опустим. Однако, сложно не заметить, что многие сейчас целенаправленно или волей случая оказываются в незнакомых городах и странах. Или планируют переезд, или просто путешествуют. В любом случае, где бы вы ни оказались, всегда хочется выстроить быт, сходить на завтрак или выпить вкусный кофе. Если вам интересно, чем в этих вопросах может помочь наш скрипт и проверить его в действии — продолжайте читать.

Что мы придумали?

Итак, предположим, что вы оказались в незнакомом городе. У вас есть несколько рекомендаций от друзей или просто несколько проверенных мест, где вам вкусно и красиво.
Наш алгоритм может быстро увеличить этот список в несколько раз, дополнив его 5-10 рекомендациями того же качества или уровня. Звучит здорово, да?

Как мы это реализовали?

Мы все еще не умеем колдовать, поэтому решили прибегнуть к более простому способу — написать Python-скрипт для решения этой задачи.
Начинаем, как всегда, с подготовки. Самая важная шестеренка в нашем скрипте — Instagram API (деятельность социальной сети признана экстремистской и запрещена в Российской Федерации).

from instagrapi import Client
import time
import pandas as pd

Затем, подключаемся к API, чтобы приступить к обработке данных.

cl = Client()
cl.login("username", "password")

Наша задача реализуется двумя небольшими скриптами. Первый собирает и обрабатывает геометки из Instagram (деятельность этой социальной сети признана экстремистской и запрещена в Российской Федерации), а также находит людей, которые отмечали эти геометки на фотографиях. Мы знаем, что эти места наверняка нравятся не только нам и предполагаем, что мы нашли тех людей, которые разделяют наши вкусы и ценности. Поэтому, мы собираем все недавние геометки в их профиле и так получаем список рекомендаций. Затем, с помощью второго скрипта мы узнаем точные адреса этих мест в Яндекс.Картах, чтобы определиться с выбором.

Сбор и обработка данных

Начинаем работу. Для того чтобы получить список рекомендаций нам нужны три подходящих примера, например кофейни Санкт-Петербурга: Смена, ТЧК, Civil. Наш скрипт принимает на вход идентификаторы геоточек заведений.
Как их получить?

  1. Переходим по ссылке в профиль заведения, например:
    https://www.instagram.com/smenacafe/
  2. Анализируем геометки в постах (считаем, что официальный профиль содержит правильные геометки)
  3. Находим ссылку на геометку, например:
    https://www.instagram.com/explore/locations/727911037416015/smenacafe/
  4. Цифры в ссылке и есть идентификатор геоточки
    727911037416015
    После того как мы нашли геометки первичных рекомендаций, нам нужно найти тех пользователей, кто ходит в это заведение (= отмечает его на своих фотографиях). Для этого мы берем последние 150 отметок заведения.
    Конечно, это должны быть реальные пользователи, а не бизнес-аккаунты, потому что там могут присутствовать рекламные интеграции, а мы в них не заинтересованы.
pk_place_ids = [541835746291313, 2103750586526340, 100059475]
 
print('Getting users started')
 
# получаем пользователей, которые отметили заведение на фото
users = []
for i in pk_place_ids:
    # получаем 150 последних постов с выбранной геометкой
    medias = cl.location_medias_recent(i, amount=150)
    
    for m in medias:
        user_id = m.dict()['user']['pk']
        if not user_id in users:
            users.append(user_id)
    
count_users = len(users)
 
print(f'Getting {count_users} users finished')
 
print('Getting not business users started')
 
# отбираем тех пользователей, чьи аккаунты не являются комерческими
users_not_business = []
for u in users:
    try:        
        u_info = cl.user_info(u).dict()
        
        if not u_info['is_business']:
            users_not_business.append(u)
    except:
        next
 
count_nb_users = len(users_not_business)
 
print(f'Getting {count_nb_users} not business users finished')
 
print('Getting location started')

Мы нашли людей, которые отмечали эти места у себя в профиле. Теперь мы собираем все места, которые отмечены в профилях этих людей и выводим эти места списком, сортируя по числу упоминаний.

# получаем места, которые посетил пользователь
locations = {}
 
end_cursor = None
for u in users_not_business:
    # скрипт работает довольно медленно, поэтому анализируем только 100 последних постов пользователя
    # посты получаем порциями 20 раз по 5 с сохранением курсора
    for page in range(20):
        u_medias, end_cursor = cl.user_medias_paginated(u, 5, end_cursor=end_cursor)
        for m in u_medias:
            # обернул в обработку исключений, т.к. иногда парсер падает
            try:
                # задержка для снижения чатсоты запросов в инстаграм
                time.sleep(1)
                # по идентификатору поста получаем данные поста (важно, что есть имя места, но нет его координат)
                info = cl.media_info(m.dict()['pk']).dict()
                if 'location' in info:               
                    loc_key = info['location']['pk']
                    
                    # вывод имени отмеченного места (для лога)
                    #print(info['location']['name'])
                    
                    # если место встретилось первый раз, то узнаем его координаты
                    if loc_key not in locations:
                        
                        # для того, чтобы узнать координаты, берем последний пост с такой геометкой
                        loc_data = cl.location_medias_recent(loc_key, amount=1)[0].dict()
                        
                        lng=''
                        lat=''
                        
                        if 'location' in loc_data:
                            lng=loc_data['location']['lng']
                            lat=loc_data['location']['lat']
                        
                        locations[info['location']['pk']] = [info['location']['name'],1,lng,lat] 
                    else:
                        locations[info['location']['pk']][1] = locations[info['location']['pk']][1] + 1
                    
                    # сохраняем результат в csv файл
                    ids = [i for i in locations]
                    names = [locations[i][0] for i in locations]
                    vizits = [locations[i][1] for i in locations]
                    lngs = [locations[i][2] for i in locations]
                    lats = [locations[i][3] for i in locations]
 
                    df = pd.DataFrame(
                        {'id': ids,
                        'name': names,
                        'vizit': vizits,     
                        'lng': lngs,
                        'lat': lats     
                        })
 
                    df.sort_values('vizit', ascending=False).to_csv('places.csv', index=False)                  
                    
            except:
                next
                
count_locations = len(locations)
 
print(f'Getting {count_locations} location finished')

Второй скрипт должен находить координаты новых рекомендаций в Яндекс.Картах.

# получим категории из справочника организаций сервиса Яндекс.Карты
import requests
 
# ключ API можно найти в кабинете разработчика
api_key = "---"
 
addrs = []
urls = []
cat1s = []
cat2s = []
cat3s = []
 
df = pd.read_csv("places.csv")
 
for i, row in df.iterrows():
    
    time.sleep(1)
    
    lng = row['lng']
    lat = row['lat']
    name = row['name']
    print(name)
        
    req = f"https://search-maps.yandex.ru/v1/?text={name}&results=1&type=biz&lang=ru_RU&ll={lng},{lat}&spn=0.01,0.01&apikey={api_key}"
 
    response = requests.get(req)
 
    addr = ''
    url = ''
    cat1 = ''
    cat2 = ''
    cat3 = ''
 
    if response.status_code == 200:
        # обернули в обработку исключений, т.к. иногда падает
        try:
            company_data = response.json()['features'][0]['properties']['CompanyMetaData']
                        
            addr = company_data['address']
            url = company_data['url']
            
            count_categories = len(company_data['Categories'])
            
            # у организации может быть до 3 категорий, сохраняем их все
            if count_categories > 0:
                if count_categories == 1:
                    cat1 = company_data['Categories'][0]['name']
                elif count_categories == 2:
                    cat1 = company_data['Categories'][0]['name']
                    cat2 = company_data['Categories'][1]['name']
                elif count_categories == 3:
                    cat1 = company_data['Categories'][0]['name']
                    cat2 = company_data['Categories'][1]['name']
                    cat3 = company_data['Categories'][2]['name']             
            
        except:
            pass
        
    addrs.append(addr)
    urls.append(url)
    cat1s.append(cat1)
    cat2s.append(cat2)
    cat3s.append(cat3)
    
df['address'] = addrs
df['url'] = urls
df['cat1'] = cat1s
df['cat2'] = cat2s
df['cat3'] = cat3s
df.to_csv('places_24.csv', index=False)

Результаты

Выпить кофе в Петербурге
Мы решили проверить, как работает скрипт на примере трех наших любимых кофеен в Петербурге. Получилось следующее:

Поужинать в Петербурге
Также мы протестировали наш скрипт, задав три рекомендации классных ресторанов Питера: Chang, Tiger Lilly, Jack and Chan.

Ограничения скрипта

  1. Скрипт сбора данных работает недостаточно быстро (примерно 100 геоточек в час).
  2. У геоточек может быть несколько дублей. Бывают геоточки с одинаковыми названиями, но отличающимися по координатам. Если честно, то по сути в геоточках просто хаос. Поэтому остается много ручной работы на этапе обработки и получения результатов.
  3. Яндекс отдает данные хорошо: база организаций очень большая, но для большинства объектов заполнена не одна, а целых три категории. У кафе может быть первая категория бар, затем кафе, затем ресторан. В общем, приходится опять же проверять многое вручную.
  4. Сделать единую процедуру пока не очень получается.
    Пока со скриптом можно работать в два этапа:
    — Первый этап — получение данных из инстаграма (название, частота посещений, координаты) и затем полуавтоматическая чистка от ненужных объектов + объединение дублей (это важно, так как бесплатная квота Яндекс.Карт — 500 запросов в день),
    — Второй этап — получение из Яндекс.Карт категорий, адреса, сайта и затем ручная сверка категорий и выбор наиболее точной категории из трех.
 Нет комментариев    294   2022   python   recommendation

Решение головоломок Wordle с помощью Basic Python

Время чтения текста – 21 минута

Перевод статьи “Solving Wordle Puzzles with Basic Python” автора Mickey Petersen

Вы наверняка слышали о Wordle? Это словесная головоломка не так проста, как кажется на первый взгляд. Вас просят угадать английское «слово дня», которое состоит из пяти букв. Если вы ошибетесь, вам дадут несколько подсказок: буква в слове будет зеленой, если вы правильно угадали нужную букву в нужном месте; желтой, если эта буква присутствует в слове, но не на этом месте; и серой, если буквы вообще нет в слове.

На самом деле, решать эту головоломку довольно сложно! Вот как вы можете написать Wordle Solver на Python, с использованием множеств, представления списков (list comprehension) и капелькой удачи!

Суть головоломки

Каждый день Wordle генерирует новое слово, которое нужно угадать. Поскольку у нас есть только шесть попыток — сайт использует файлы cookie для отслеживания вашего прогресса — попытки нужно использовать аккуратно!

На первый взгляд, есть несколько подсказок, которые упрощают решение:

  1. Слово состоит ровно из пяти букв.
  1. Слово из английского языка и использовать можно только алфавит — никаких знаков препинания, цифр или других символов.
  1. Любая попытка дает подсказки:
  • Зеленая буква, если буква и её место в слове правильные.
  • Желтая буква, если буква присутствует в слове, но было выбрано не то место.
  • Серая буква, если буквы вообще нет в слове.
  1. Существует конечное число слов, и их количество дополнительно ограничено словарем, используемым Wordle.

Поскольку я не хочу пытаться извлечь тот же словарь, что использует Wordle (это слишком просто), вместо этого я буду использовать свободно доступный словарь, который устанавливается через Linux в директорию /usr/share/dict/american-english. Словарь — это текстовый файл с одним словом в каждой строке.

Загрузка и генерация слов

Для начала нам понадобится словарь — вы можете установить любой удобный вам или использовать уже установленный, если такой есть.

Далее нам нужно закодировать правила игры:

import string

DICT = "/usr/share/dict/american-english"

ALLOWABLE_CHARACTERS = set(string.ascii_letters)
ALLOWED_ATTEMPTS = 6
WORD_LENGTH = 5

У нас есть всего шесть попыток; длина слова равна пяти, и мы можем использовать все доступные буквы английского алфавита.

Я преобразовываю допустимые символы в Python set(), чтобы использовать функции, которые доступны для работы с множествами, для проверки наличия букв в слове — но об этом чуть позже.

Теперь я могу сгенерировать множество тех слов, которые соответствуют правилам:

from pathlib import Path

WORDS = {
  word.lower()
  for word in Path(DICT).read_text().splitlines()
  if len(word) == WORD_LENGTH and set(word) < ALLOWABLE_CHARACTERS
}

Здесь я использую представление списков (list comprehension) для создания множества допустимых слов. Я использую отличный класс Path для чтения непосредственно из файла. Если вы еще не знакомы с Path, я рекомендую вам узнать о нем, поскольку это очень удобный инструмент.

Так, я фильтрую слова из словаря, которые имеют правильную длину и в которых набор символов в слове является подмножеством ALLOWABLE_CHARACTERS. Другими словами, выбираются только слова, которые составлены из набора допустимых символов.

Алфавитно-частотный анализ на английского языка

Особенность английского языка заключается в неравномерном распределении букв, используемых в словах. Например, буква E используется гораздо чаще, чем X. Поэтому, если мы сможем генерировать слова с наиболее распространенными буквами, у нас больше шансов угадать некоторые или даже все буквы в слове. Таким образом, выигрышная стратегия состоит в том, чтобы придумать систему, которая вычислит наиболее популярные буквы английского языка.

К счастью, у нас есть словарь английских слов!

from collections import Counter
from itertools import chain

LETTER_COUNTER = Counter(chain.from_iterable(WORDS))

Класс Counter — полезное изобретение! Это модифицированный словарь, который считает количество повторений каждого элемента. Когда вы передаете ему список элементов, они становятся ключами, а затем он сохраняет количество появлений каждого ключа в его значение. Это как раз то, что нам нужно, чтобы посчитать популярность каждой буквы среди всех английских слов из 5 букв.

Для этого я использую малоизвестную функцию chain из модуля itertools. Эта функция имеет один скрытый метод from_iterable, который берет один элемент и итерирует его:

Я думаю, что лучше всего объяснить на конкретном примере:

>>> list(chain.from_iterable(["inspired", "python"]))
['i', 'n', 's', 'p', 'i', 'r', 'e', 'd', 'p', 'y', 't', 'h', 'o', 'n']

Поскольку строки также можно итерировать, а WORDS — это множество строк, то мы можем разбить это множество (или список и т. д.) на составные элементы. Это очень полезное свойство строк; вы можете прогнать строку через set() и получить все уникальные символы строки:

>>> set("hello")
{'e', 'h', 'l', 'o'}

Множества созданы по образцу своих математических тезок.

Это означает, что множества могут содержать только уникальные значения — без дубликатов — и они неупорядочены. Вот почему порядок в множестве и в строке получился разным.

Множества обладают многими полезными функциями, такими как проверка, содержится ли одно множество полностью в другом множестве (подмножестве); получение элементов, содержащихся в обоих множествах (пересечение); совмещение элементов двух множеств (объединение) и так далее.

Итак, мы посчитали количество букв во всем словаре, и вот что получилось:

>>> LETTER_COUNTER
Counter({'h': 828,
         'o': 1888,
         'n': 1484,
         'e': 3106,
         's': 2954,
         'v': 338,
         # ...etc...
        })

Так, мы получили только абсолютное количество букв в словаре. Теперь нужно разделить его на общее количество букв, чтобы перейти к относительным величинам. К счастью, в классе Counter есть удобный метод total, который может посчитать общее количество букв всех слов словаря.

Затем составляем таблицу частот:

LETTER_FREQUENCY = {
    character: value / LETTER_COUNTER.total()
    for character, value in LETTER_COUNTER.items()
}

Метод Counter.total() был добавлен в Python 3.10, поэтому, если вы используете более старую версию Python, вы можете заменить его на sum(LETTER_COUNTER.values()), который делает то же самое.

Здесь я использую представление словарей (dictionary comprehension) для обработки каждого ключа и значения LETTER_COUNTER (это модифицированный словарь) и деления каждого значения на общее количество символов:

>>> LETTER_FREQUENCY
{ 'h': 0,02804403048264183,
  'o': 0,06394580863674852,
  'n': 0,050262489415749366,
  'e': 0,10519898391193903,
  's': 0.10005080440304827,
  # ...etc...
  }

И теперь у нас есть идеальный счетчик букв в подмножестве словаря, которые мы считаем существующими словами Wordle. Обратите внимание, что я не делал эти операции для всего словаря — я обработал только те части, которые нам интересны. Маловероятно, что это сильно повлияло бы на ранжирование популярности букв, но в конечном итоге мы руководствуемся именно этим набором слов.

Теперь нам нужен способ оценки каждого слова, чтобы мы могли предположить, какие слова встретятся с наибольшей вероятностью. Итак, нам нужно взять нашу таблицу частоты и создать функцию подсчета популярности слов, которая оценивает, насколько «популярны» буквы в этом слове:

def calculate_word_commonality(word):
    score = 0.0
    for char in word:
        score += LETTER_FREQUENCY[char]
    return score / (WORD_LENGTH - len(set(word)) + 1)

Снова вспоминаем, что строка является итерируемой, и перебираем каждую букву в слове. Затем получаем популярность каждой буквы и суммируем их. Потом общее количество делится на длину слова минус количество уникальных символов (плюс один, чтобы предотвратить деление на ноль).

Это не то что бы удивительная функция оценки слов, но она проста и взвешивает слова таким образом, что более уникальным символам придается больший вес. В идеале нам нужно как можно больше уникальных, часто встречающихся символов, чтобы максимизировать вероятность получения зеленых или желтых совпадений в Wordle.

Быстрый тест подтверждает, что слова с редкими и повторяющимися символами имеют более низкий вес, чем слова с частыми и более уникальными символами.

>>> calculate_word_commonality("fuzzy")
0.04604572396274344
>>> calculate_word_commonality("arose")
0.42692633361558

Все, что нам сейчас нужно — придумать способ сортировки и отображения этих слов, чтобы игрок мог выбирать из них:

import operator

def sort_by_word_commonality(words):
    sort_by = operator.itemgetter(1)
    return sorted(
        [(word, calculate_word_commonality(word)) for word in words],
        key=sort_by,
        reverse=True,
    )

def display_word_table(word_commonalities):
    for (word, freq) in word_commonalities:
        print(f"{word:<10} | {freq:<5.2}")

С помощью sort_by_word_commonality я создаю отсортированный (от большего к меньшему) список кортежей, где каждый кортеж содержит слово и рассчитанный балл для этого слова. Ключ, по которому я сортирую, — это относительная популярность слова в словаре.

Я не использую лямбду для получения первого элемента; для таких простых вещей я предпочитаю operator.itemgetter, который делает то же самое.

Также, я добавил функцию быстрого отображения для форматирования слов и их оценки в простую таблицу.

Теперь поговорим о самом решении головоломки.

Решение головоломки Wordle

Поскольку я создаю его как простое консольное приложение, я собираюсь использовать input() и print().

def input_word():
    while True:
        word = input("Input the word you entered> ")
        if len(word) == WORD_LENGTH and word.lower() in WORDS:
            break
    return word.lower()


def input_response():
    print("Type the color-coded reply from Wordle:")
    print("  G for Green")
    print("  Y for Yellow")
    print("  ? for Gray")
    while True:
        response = input("Response from Wordle> ")
        if len(response) == WORD_LENGTH and set(response) <= {"G", "Y", "?"}:
            break
        else:
            print(f"Error - invalid answer {response}")
    return response

Функционал приложения очень прост. Нужно узнать у пользователя слово WORD_LENGTH, которое он ввел в игре Wordle, и записать ответ от Wordle. Поскольку есть только три возможных цвета для буквы (зеленый, желтый и серый), ответ закодирован в простую строку из трех символов: G, Y и ?.

Я также добавил обработку ошибок на тот случай, если пользователь ошибается при вводе данных, повторяя цикл до тех пор, пока не будет задана правильная последовательность. Я делаю это, снова преобразовывая полученную информацию в множество, а затем проверяя, является ли это множество подмножеством допустимых ответов.

Фильтрация зеленых, желтых и серых букв с помощью вектора слова

Зеленая буква указывает на правильность буквы и ее места в слове. Желтый означает, что место неправильное, но буква в слове присутствует; а серый что буквы нигде нет.

Другой способ интерпретации этой информации заключается в том, что пока Wordle не сообщит нам, какие буквы зеленые, желтые или серые, существуют все варианты.

word_vector = [set(string.ascii_lowercase) for _ in range(WORD_LENGTH)]

Я создаю список из пяти множеств, так как нам нужно определить 5 букв слова. Каждый элемент списка — это множество всех строчных английских букв. Проходясь по каждому множеству, я могу удалять буквы, в соответствии с тем, как окрашены буквы после попытки:

  • Зеленая буква дает нам информацию только по текущему множеству
    Это означает, что если я встречу зеленую букву на втором месте, то я могу изменить второе множество и оставить только эту букву.
  • Желтые буквы исключают возможность использовать эту букву на этом месте
    Таким образом, все буквы, кроме этой, технически могут оказаться на этом месте. Удаление буквы из набора в этой позиции гарантирует, что мы не сможем выбрать слова, в которых эта буква стоит на этом месте.
  • Серые буквы подразумевают исключение буквы из всего вектора.
    Следовательно, эта буква должна быть удалена из всех множеств в векторе слова.

Теперь нам нужна функция, которая выбирает слова, которые соответствуют текущему вектору слова. Есть несколько способов сделать это, но я предлагаю вот такой красивый и простой вариант:

def match_word_vector(word, word_vector):
    assert len(word) == len(word_vector)
    for letter, v_letter in zip(word, word_vector):
        if letter not in v_letter:
            return False
    return True

Этот подход использует метод zip для попарного сопоставления каждого символа в слове и каждого символа в векторе слова.

Проходимся по каждому множеству, и, если буквы на определенном месте нет в соответствующем множестве, то цикл прерывается и слово исключается. Если все в порядке и цикл отрабатывает до последней буквы, то мы получаем ответ True и выходим из цикла, отмечая совпадение слова с вектором.

Проверяем слова на соответствие

Принимая во внимание все правила игры, теперь можем написать функцию поиска, которая фильтрует список слов с учетом ответов, которые мы получили от Wordle.

def match(word_vector, possible_words):
    return [word for word in possible_words if match_word_vector(word, word_vector)]

Эта функция объединяет в себе все правила, которые мы обсудили выше, и ищет с помощью представления списков (list comprehension). Каждое слово проверяется на соответствие с word_vector с помощью match_word_vector.

Сортировка результатов

Наконец, нам нужно создать небольшой UI, который может многократно запрашивать нужный нам ответ.

def solve():
    possible_words = WORDS.copy()
    word_vector = [set(string.ascii_lowercase) for _ in range(WORD_LENGTH)]
    for attempt in range(1, ALLOWED_ATTEMPTS + 1):
        print(f"Attempt {attempt} with {len(possible_words)} possible words")
        display_word_table(sort_by_word_commonality(possible_words)[:15])
        word = input_word()
        response = input_response()
        for idx, letter in enumerate(response):
            if letter == "G":
                word_vector[idx] = {word[idx]}
            elif letter == "Y":
                try:
                    word_vector[idx].remove(word[idx])
                except KeyError:
                    pass
            elif letter == "?":
                for vector in word_vector:
                    try:
                        vector.remove(word[idx])
                    except KeyError:
                        pass
        possible_words = match(word_vector, possible_words)

Функция solve() включает в себя некоторые элементы, которые мы уже обсудили. Затем попадаем в цикл от 1 до ALLOWED_ATTEMPTS + 1, и после каждой попытки мы отображаем номер текущей попытки и количество оставшихся возможных слов. Затем мы вызываем функцию display_word_table, которая выводит красивую таблицу из 15 совпадений с наибольшим рейтингом популярности слова. Затем функции нужно узнать слово, которое пользователь в итоге ввел и ответ Wordle.

После этого мы проходимся по ответу Wordle, отмечая какой букве соответствует ответ (последовательность цветов). Код прост: мы сопоставляем каждый из трех возможных цветов с соответствующим контейнером (зеленый — с word_vector и т. д.) и применяем правила, которые мы обсудили выше.

Наконец, мы заново берем possible_words и проверяем совпадение с новым вектором слова с помощью match, сокращая множество потенциальных вариантов.

Давайте проверим, как это работает

Все начинается с запуска функции solve() (для краткости часть вывода опущена):

>>> Attempt 1 with 5905 possible words
arose      | 0.43
raise      | 0.42

   ... etc ...

Input the word you entered> arose
Type the color-coded reply from Wordle:
  G for Green
  Y for Yellow
  ? for Gray
Response from Wordle> ?Y??Y
Attempt 2 with 829 possible words
liter      | 0.34
liner      | 0.34

   ... etc ...

Input the word you entered> liter
Response from Wordle> ???YY
Attempt 3 with 108 possible words
nerdy      | 0.29
nehru      | 0.28

   ... etc ...

Input the word you entered> nerdy
Response from Wordle> ?YY?G
Attempt 4 with 25 possible words
query      | 0.24
chewy      | 0.21

   ... etc ...

Input the word you entered> query
Response from Wordle> GGGGG
Attempt 5 with 1 possible words
query      | 0.24

Резюме

  • Представления (Comprehensions) — мощный инструмент Python
    Они могут совмещать итерацию с фильтрацией, но если вы злоупотребите этой функцией, добавляя слишком много циклов for или слишком много условных операторов, код может стать нечитаемым. Избегайте сильной вложенности этих операторов, если это возможно.
  • Множества — полезный тип объекта в Python
    Множества их их верное использование делают код более стабильным, более математически правильным и более кратким. Главное — знать, когда и как их использовать. В нашем решении множества сыграли существенную роль — не пренебрегайте ими!
  • Регулярные выражения могут помочь описать все требования к поиску
    Хотя это не было использовано в коде, совпадение (или несовпадение) шаблона и слова — это то, что регулярные выражения делают круче всего. Подумайте, как можно переписать мэтчинг и векторизацию слов с помощью регулярных выражений.
  • Модули itertools и collections содержат очень полезные инструменты
    Вы можете многого добиться с базовым знанием Python, если знаете, как именно можно использовать встроенные модули. Модуль itertools особенно полезен, если вы вам нужно провернуть итеративные вычисления.
 Нет комментариев    222   2022   python   solver   wordle

How to: YouTube API

Время чтения текста – 10 минут

Современным аналитикам необходимо обладать навыком сбора информации из социальных сетей, ведь сейчас контент социальных сетей очень точно отражает реальную ситуацию в мире, помогает быстро распространить новости и позволяет анализировать аудиторию — подписчиков. В предыдущих постах мы уже описывали кейсы с использованием различных API: Vkontakte API, Facebook API, GitHub API. Сегодня мы расскажем вам о том, что представляет из себя YouTube API, как получить ключ API, а также наглядно покажем, какую информацию можно собрать с его помощью. В двух словах, с помощью YouTube API можно находить каналы по ключевым словам, выгружать данные канала, а также статистику по видео, опубликованным на этих каналах.

Подготовительный этап для работы с YouTube API

Для начала, нужно разобраться в том, как получить доступ к API. Этот процесс подробно изложен на сайте для разработчиков, на который вы можете перейти по ссылке. Если коротко, то необходимо иметь или завести аккаунт Google, войти в профиль для разработчиков, создать проект, получить ключ API и подключить к нему API YouTube Data API v3. Далее, с использованием этого ключа вам будет доступен весь необходимый функционал.
После того, как вы успешно получили ключ, можно открывать любой удобный ноутбук (Jupyter Notebook, Collab и т. д.), устанавливать и подключать нужные для работы библиотеки.

# установка библиотек
	pip install --upgrade google-api-python-client
	pip install --upgrade google-auth-oauthlib google-auth-httplib2
	# импорт необходимых библиотек
import googleapiclient.discovery
import time

Квоты

Один важный момент, который важно знать при использовании Youtube API — это наличие дневных квот на использование функций YouTube API в бесплатном режиме. На день дается квота 10000 юнитов, вызов функции поиска стоит 100 юнитов, вызов информации по объекту — 1 юнит, загрузка видео на YouTube стоит 1600 юнитов. Если вам недостаточно дневной квоты, то вы можете подать запрос в Google на её увеличение, в котором нужно подробно указать цели вашей деятельности c YouTube API.

Поиск YouTube-каналов по ключевым словам

Для начала заведем несколько переменных, которые понадобятся нам в процессе сбора информации.

channels_data = {}
channels_data_full = {}
video_data = {}

Дальше написан скрипт, который можно использовать для поиска перечня каналов по ключевым словам. Мы искали каналы, в названии или описании которых используются следующие слова: s_query = ’аналитика данных data’. Сначала выводятся каналы, в названии или описании которых присутствуют все три слова, затем хотя бы любые два, затем хотя бы одно. Чем больше ключевых слов по теме мы укажем, тем точнее будет результат.

api_service_name = "youtube"
api_version = "v3"
DEVELOPER_KEY = "" #тут нужно указать ключ, который вы получите при подключении YouTube API
 
youtube = googleapiclient.discovery.build(
   api_service_name, api_version, developerKey = DEVELOPER_KEY)
#строка поиска
s_query = 'аналитика данных data'
next_token = ''
 
while(True): 
 time.sleep(0.2)
 request = youtube.search().list(
     part="snippet",
     q=s_query,
     relevanceLanguage="ru",
     type="channel",
     maxResults=25,
     access_token=DEVELOPER_KEY,
     pageToken = next_token
 )
 response = request.execute()
 for item in response['items']:
   channels_data[item['snippet']['channelId']] = [item['snippet']['title'], item['snippet']['description']
   ]
 #берем только первые 25 результатов
 break

Добавим пару важных пояснений относительно скрипта. В начале цикла в этом скрипте (как и в двух последующих) мы вызываем функцию time.sleep(), чтобы инициировать двухсекундную задержку между вызовом функций. Это нужно для того, чтобы запросы к YouTube не были чересчур частыми (и вообще, это считается правилом хорошего тона в программировании, так что советуем взять на заметку).
Для простоты нашего примера мы сохранили только 25 первых каналов из всех подходящих под условия поиска. Если вам хочется найти все каналы, в которых упоминается хотя бы одно из ключевых слов, то нужно использовать следующее свойство:

try:
    next_token = response["nextPageToken"]
  except:
    break

Сбор полной информации по всем выбранным каналам

Теперь, когда названия и описания выбранных каналов собраны, можно переходить к следующему этапу, а именно — выгрузке всей информации об этих каналах, в том числе: количество подписчиков канала, количество видео, общее количество просмотров всех видео канала и страна в которой живет, автор канала.

scount = ''
for channel in channels_data:
   #получаем данные по каждому каналу
   time.sleep(0.2)
   r = youtube.channels().list(
         part="snippet, statistics",
         id=channel,
         access_token=DEVELOPER_KEY
   )
   resp = r.execute()
        
   try:
     if resp['items'][0]['statistics']['hiddenSubscriberCount']:
       scount = 'hidden'
     else:
       scount = resp['items'][0]['statistics']['subscriberCount']
  
     channels_data_full[channel] = [resp['items'][0]['snippet']['title'],
                                  resp['items'][0]['snippet']['description'],
                                  scount,
                                  resp['items'][0]['statistics']['videoCount'],
                                  resp['items'][0]['statistics']['viewCount'],
                                  resp['items'][0]['snippet']['country']
     ]
      
   except:
     pass

Теперь вся нужная информация о канале хранится в переменнной channels_data_full.

Получение информации о видео

Если у вас есть необходимость получить статистику по видео из выбранных каналов, то ниже приведен скрипт на этот случай. В итоге, вы получите словарь video_data с подробной информацией о каждом видео из плейлиста (список всех видео каждого канала): название канала, дата публикации, название и описание видео, количество просмотров, лайков/дизлайков и комментариев.

# получаем информацию по всем видео ранее найденных каналов
for channel in channels_data:
   #анализируем каналы
   time.sleep(0.2)
   r = youtube.channels().list(
           part="contentDetails",
           id=channel,
           access_token=DEVELOPER_KEY
     )
   resp = r.execute()           
   try:
     #получаем плейлист из видео для одного канала из списка
     id_playlist = resp['items'][0]['contentDetails']['relatedPlaylists']['uploads']     
     #получаем набор элементов плейлиста (видео)
     next_token = ''
     while(True):     
       time.sleep(0.2)
       r = youtube.playlistItems().list(
             part="contentDetails",
             playlistId=id_playlist,
             access_token=DEVELOPER_KEY,
             pageToken = next_token
       )
       resp = r.execute()
       for i in resp['items']:
         id_videos = i['contentDetails']['videoId']
         r = youtube.videos().list(
               part="snippet, statistics",
               id=id_videos,               
               access_token=DEVELOPER_KEY
         )
         resp1 = r.execute()       
         video_data[id_videos] = [channel,
                                   resp1['items'][0]['snippet']['publishedAt'],
                                   resp1['items'][0]['snippet']['title'],
                                   resp1['items'][0]['snippet']['description'],
                                   resp1['items'][0]['statistics']['viewCount'],
                                   resp1['items'][0]['statistics']['likeCount'],
                                   resp1['items'][0]['statistics']['dislikeCount'],
                                   resp1['items'][0]['statistics']['commentCount']
          ]
       break

В конце мы ставим break, то есть обрабатываем только одну часть видео из плейлиста. Если вы хотите обработать все видео, то нужно использовать функцию nextpagetoken, которую мы предложили в конце первого скрипта.
В итоге, если трансформировать словарь в привычный датафрейм, мы получим таблицу, которая содержит подробную информацию про все обработанные видео.

d = {'id': [x for x in video_data],
      'channel_id': [video_data[x][0] for x in video_data],
       'published_at': [video_data[x][1] for x in video_data],
      'title': [video_data[x][2] for x in video_data],
      'description': [video_data[x][3] for x in video_data],
      'viewCount': [video_data[x][4] for x in video_data],
      'likeCount': [video_data[x][5] for x in video_data],
      'dislikeCount': [video_data[x][6] for x in video_data],
      'commentCount': [video_data[x][7] for x in video_data]
   }
df = pd.DataFrame(d)
df.head()

Выводы

Конечно, это не все способы работы с YouTube API, однако, мы надеемся, что вы получили представление о том, как сильно расширяются возможности аналитика для получения и обработки информации с помощью этого инструмента.

 1 комментарий    621   2021   api   python   аналитика

Как и для чего экспортировать красивые отчеты из Jupyter Notebook в PDF

Время чтения текста – 7 минут

Если вы специалист по анализу данных и вам нужно представить отчет для заказчика, если вы ищете работу и не знаете, как оформить тестовое задание так, чтобы на вас обратили внимание, если у вас много учебных проектов, связанных с аналитикой и визуализацией данных, то сегодняшний пост будет вам очень и очень полезен. Дело в том, что смотреть на чужой код в Jupyter Notebook бывает проблематично, ведь результат часто теряется между множеством строк кода с подготовкой данных, импортом нужных библиотек и серией попыток реализовать ту или иную идею. Именно поэтому такой метод, как экспорт результатов в PDF-файл в формате LaTeX — это отличный вариант для итоговой визуализации, который сэкономит время и будет выглядеть презентабельно. В научных кругах статьи и отчеты очень часто оформляются именно с использованием LaTeX, поскольку он имеет ряд преимуществ:

  • Математические уравнения и формулы выглядят аккуратнее.
  • Библиография создается автоматически, на основе всех использованных в документе ссылок.
  • Автор может сосредоточиться на содержании, а не на внешнем виде документа, так как верстка текста и других данных происходит автоматически с помощью указания необходимых параметров в коде.

Сегодня мы подробно расскажем о том, как научиться экспортировать вот такие красивые отчеты из Jupyter Notebook в PDF с использованием LaTeX.

Установка LaTeX

Самый важный момент в формировании отчета из Jupyter Notebook на Python — это его экспорт в финальный файл. Для этого применяется одна библиотека — nbconvert — которая конвертирует ваш ноутбук в любой удобный формат документа: pdf (как в нашем случае), html, latex или другой. Эту библиотеку нужно не просто установить, а провести некоторую процедуру по предустановке нескольких других пакетов: Pandoc, TeX и Chromium. По ссылке на библиотеку весь процесс описан очень подробно для каждого программного обеспечения, поэтому подробно мы на нем останавливаться не будем.
Как только вы завершили все предварительные шаги, нужно установить и импортировать библиотеку в ваш Jupyter Notebook.

!pip install nbconvert
import nbconvert

Экспорт таблиц в Markdown формат

Обычно, таблицы не представляют в отчетах, поскольку их бывает трудно быстро прочесть, но иногда все-таки необходимо добавить небольшую таблицу в итоговый документ. Для того, чтобы таблица выглядела аккуратно, нужно представить ее в Markdown формате. Это можно сделать вручную, но если в таблице много данных, то лучше придумать более удобный метод. Мы предлагаем использовать следующую простую функцию pandas_df_to_markdown_table(), которая преобразует любой датафрейм в markdown-table. Единственный нюанс: после преобразования исчезают строчные индексы, потому, если они важны (как в нашем примере), то стоит записать их в переменную в первой колонке датафрейма.

data_g = px.data.gapminder()
summary = round(data_g.describe(),2)
summary.insert(0, 'metric', summary.index)

# Функция для преобразования dataframe в Markdown Table
def pandas_df_to_markdown_table(df):
    from IPython.display import Markdown, display
    fmt = ['---' for i in range(len(df.columns))]
    df_fmt = pd.DataFrame([fmt], columns=df.columns)
    df_formatted = pd.concat([df_fmt, df])
    display(Markdown(df_formatted.to_csv(sep="|", index=False)))

pandas_df_to_markdown_table(summary)

Экспорт изображения в отчет

В этом примере мы будем строить bubble-chart, про методику построения которых рассказывали в недавнем посте. В прошлый раз мы использовали пакет Seaborn, наглядно показывая, что отображение данных размером кругов на графике происходит корректно. Такие же графики можно построить и при помощи пакета Plotly.
Для того чтобы отобразить график, построенный в Plotly в отчете тоже нужно немного постараться. Дело в том, что plt.show() не поможет отобразить график при экспорте. Поэтому, нужно сохранить получившийся график в рабочей директории, а затем, используя библиотеку iPython.display, отобразить его с помощью функции Image().

from IPython.display import Image
import plotly.express as px
fig = px.scatter(data_g.query("year==2007"), x="gdpPercap", y="lifeExp",
                 size="pop", color="continent",
                 log_x=True, size_max=70)
fig.write_image('figure_1.jpg')
Image(data = 'figure_1.jpg', width = 1000)

Формирование и экспорт отчета

Когда все этапы анализа данных завершены, отчет можно экспортировать. Если вам нужны заголовки или текст в отчете, то пишите его в ячейках ноутбука, сменив формат Code на Markdown. Для экспорта можно использовать терминал, запуская там вторую строку без восклицательного знака, либо можно запустить код, написанный ниже, в ячейке ноутбука. Мы советуем не загружать отчет кодом, поэтому используем параметр TemplateExporter.exclude_input=True, чтобы ячейки с кодом не экспортировались. Также, при запуске этой ячейки код выдает стандартный поток (standard output) и, чтобы в отчете его не было видно, в начале ячейки нужно написать %%capture.

%%capture
!jupyter nbconvert --to pdf --TemplateExporter.exclude_input=True ~/Desktop/VALIOTTI/Reports/Sample\LaTeX\ Report.ipynb
!open ~/Desktop/VALIOTTI/Reports/Sample\ LaTeX\ Report.pdf

Если вы все сделали верно и методично, то в итоге получится вот такой отчет! Презентуйте данные красиво :)

 Нет комментариев    1314   2021   jupyter notebook   python

Граф телеграм-каналов по теме аналитики

Время чтения текста – 4 минуты

Авторы самых разных блогов в телеграме часто публикуют подборки любимых каналов, которыми они хотят поделиться со своей аудиторией. Идея, конечно, не новая, но я решил не просто составить рейтинг интересных аналитических телеграм-блогов, а решить эту задачу аналитически.

В рамках текущего курса моей учебы, я изучаю много современных подходов к анализу и визуализации данных. В самом начале курса было разминочное упражнение: объектно-ориентированное программирование на Python для сбора и итеративного построения графа с TMDB API. В задаче этот метод применяется для построения графа связи актеров, где связь — игра в одном и том же фильме. Но я решил, что можно применить его и к другой задаче: построению графа связей аналитического сообщества.

Поскольку последнее время мой временной ресурс особенно ограничен, а аналогичную задачу для курса я уже выполнил, то я решил передать эти знания кому-то еще, кто интересуется аналитикой. К счастью, в этот момент, ко мне в личку постучался кандидат на вакансию младшего аналитика данных — Андрей. Он сейчас находится в процессе постижения всех тонкостей аналитики, поэтому мы договорились на стажировку, в рамках которой Андрей спарсил данные с telegram-каналов.

Основной задачей Андрея был сбор всех текстов с телеграм-канала Интернет-аналитика, выделение каналов, на которые ссылался Алексей Никушин, сбор текстов из этих телеграм-каналов и ссылок на этих каналах. Под “ссылкой” подразумевается любое упоминание канала: через @, через ссылку или репостом. В результате парсинга, у Андрея получилось два файла: nodes и edges.
Теперь я представлю вам граф, который получился у меня на основе этих данных и прокомментирую результаты.

Пользуясь случаем, хочу выразить мое почтение команде karpov.courses, поскольку у Андрея отличное знание языка Python!

В результате топ-10 каналов по показателю degree (количество связей) выглядит так:

  1. Интернет-аналитика
  2. Reveal The Data
  3. Инжиниринг Данных
  4. Data Events
  5. Datalytics
  6. Чартомойка
  7. LEFT JOIN
  8. Epic Growth
  9. RTD: ссылки и репосты
  10. Дашбордец

По-моему, получилось супер-круто и визуально интересно, а Андрей — большой молодец! Кстати, он тоже начал свой канал ”Это разве аналитика?”, где публикуются новости аналитики.

Забегая вперед: у этой задачи имеется продолжение. С помощью Марковской цепи мы смоделировали в каком канале окажется пользователь, если будет переходить итеративно по всем упоминаниям в каналах. Получилось очень интересно, но об этом мы расскажем в следующий раз!

 1 комментарий    592   2021   Data Analytics   python

Принципы построения bubble-charts: площадь VS радиус

Время чтения текста – 6 минут

Такой навык как визуализация данных применяется в любой отрасли, где присутствуют данные, ведь таблицы хороши лишь для хранения информации. Когда есть необходимость презентовать данные, точнее определенные выводы, полученные на их основе — данные необходимо представить на графиках подходящего типа. И тут перед вами встает две задачи: первая — правильно подобрать тип графика, вторая — правдоподобно отразить результаты на диаграмме. Сегодня мы расскажем вам об одной ошибке, которую иногда допускают дизайнеры при визуализации данных на bubble-charts и о том, как эту ошибку можно избежать.

Суть построения bubble-чарта

Немного скучной теории перед тем, как мы приступим к анализу данных. Bubble-chart — удобный способ показать три параметра наблюдения без построения трехмерной модели. По привычным осям X и Y указываются значения двух параметров, а третий показан размером круга, который соответствует каждому наблюдению. Именно это позволяет избежать необходимости построения сложного 3D графика, то есть любой, кто видит bubble-chart, гораздо быстрее сможет сделать выводы о данных изображенных на одной плоскости.

Ошибка, которую может допустить дизайнер, но не аналитик данных

С метриками, которые отображены на осях графика не возникает никаких вопросов, это привычный способ их визуализации, а вот с размерами возникает некоторая трудность: как грамотно и точно отобразить изменения в значениях переменной, если управление идет не точкой на оси, а размером этой точки?
Дело в том, что при построении такого графика без использования аналитических средств, например, в графическом редакторе, автор может нарисовать круги, принимая радиус круга за его размер. На первый взгляд, все кажется абсолютно корректным — чем больше значение переменной, тем больше радиус круга. Однако, в таком случае, площадь круга будет увеличиваться не как линейная, а как степенная функция, ведь S = π × r2. Например, на рисунке ниже показано, что, если увеличить радиус круга в два раза, то площадь увеличится в 4 раза.


Построение круга в Matplotlib

fig = plt.figure(figsize=(10, 10))
ax = fig.add_subplot(1, 1, 1)
s = 4*10e3


ax.scatter(100, 100, s=s, c='r')
ax.scatter(100, 100, s=s/4 ,c='b')
ax.scatter(100, 100, s=10, c='g')
plt.axvline(99, c='black')
plt.axvline(101, c='black')
plt.axvline(98, c='black')
plt.axvline(102, c='black')


ax.set_xticks(np.arange(95, 106, 1))
ax.grid(alpha=1)

plt.show()

Это значит, что график будет выглядеть неправдоподобно, ведь размеры не будут отражать реальное изменение переменной, а человек обращает внимание и сравнивает именно площадь кругов на графике.

Как построить такой график правильно?

К счастью, если строить bubble-charts с помощью библиотек Python (Matplotlib и Seaborn), то размер круга будет определяться именно площадью, что абсолютно корректно и грамотно с точки зрения визуализации.
Сейчас на примере реальных данных, найденных на Kaggle, покажем, как построить bubble-chart правильно. В данных присутствуют следующие переменные: страна, численность населения, процент грамотного населения. Для того чтобы диаграмма была читаемой, возьмем подвыборку из 10 первых стран после сортировки всех данных по возрастанию ВВП.

Для начала, загрузим все нужные библиотеки:

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

Затем, загрузим данные, очистим и от всех строк с пропущенными значениями и приведем данные по численности населения стран в миллионы:

data = pd.read_csv('countries of the world.csv', sep = ',')
data = data.dropna()
data = data.sort_values(by = 'Population', ascending = False)
data = data.head(10)
data['Population'] = data['Population'].apply(lambda x: x/1000000)

Теперь, когда все подготовка завершена, можно построить bubble-chart:

sns.set(style="darkgrid")    
fig, ax = plt.subplots(figsize=(10, 10))    
g = sns.scatterplot(data=data, x="Literacy (%)", y="GDP ($ per capita)", size = "Population", sizes=(10,1500), alpha=0.5)
plt.xlabel("Literacy (Percentage of literate citizens)")
plt.ylabel("GDP per Capita")
plt.title('Chart with bubbles as area', fontdict= {'fontsize': 'x-large'})

def label_point(x, y, val, ax):
    a = pd.concat({'x': x, 'y': y, 'val': val}, axis=1)
    for i, point in a.iterrows():
        ax.text(point['x'], point['y']+500, str(point['val']))

label_point(data['Literacy (%)'], data['GDP ($ per capita)'], data['Country'], plt.gca()) 

ax.legend(loc='upper left', fontsize = 'medium', title = 'Population (in mln)', title_fontsize = 'large', labelspacing = 1)

plt.show()

На этом графике получилось понятным образом отобразить три метрики: уровень ВВП на душу населения по оси Y, процент грамотного населения по оси X и численность населения — площадью круга.

Мы рекомендуем использовать площадь в качестве переменной, которая отвечает за размер фигуры, если есть необходимость показать несколько переменных на одном графике.

 Нет комментариев    131   2021   Data Analytics   python

Обзор дашборда в Dash

Время чтения текста – 2 минуты

Посмотрите и другие наши материалы про plotly

Сегодня публикуем не совсем классический выпуск обзора BI-инструментов — потому что речь пойдёт о Dash, фреймворке для Python от plotly. Dash — гибкий инструмент, который предоставляет набор компонентов для работы с HTML и Bootstrap для создания дашбордов с графиками plotly. Дашборд, созданный при помощи Dash — это веб-страница, написанная на Python. Любую диаграмму можно настроить, изменив передаваемые параметры прямо в коде. А работать с самими данными можно любым удобным в Python способом — например, при помощи датафреймов pandas.

В новом обзоре посмотрим на работу коллбэков и фильтров в Dash, а также на реализацию таблиц и диаграмм дашборда Superstore в plotly и Dash.

Внутри команды мы оценили дашборд и получили следующие средние оценки (1 — худшая оценка, 10 — лучшая):
Отвечает ли заданным вопросам — 8,83
Порог входа в инструмент — 4,83
Функциональность инструмента — 8,66
Удобство пользования — 7,83
Соответствие результата макету — 9,00
Визуальная составляющая — 8,16

Итог: дашборд получает 8,05 баллов из 10. Посмотрите на полученный результат.

Автор дашборда, член команды Valiotti Analytics — Елизавета Мазурова

Анализ альбомов Земфиры: дашборд в Tableau

Время чтения текста – 2 минуты

В марте мы опубликовали исследование «Python и тексты нового альбома Земфиры: анализируем суть песен», в котором при помощи Word2Vec-модели проанализировали близость песен альбома «бордерлайн» и получили самые близкие слова по духу альбома — ими оказались «пламень», «гореть», «тоска», «печаль», «сердце», «солнце» и другие.

Мы продолжили работу над альбомами Земфиры и проанализировали семь из них, а затем результаты собрали в один дашборд и опубликовали его в Tableau Public. Посмотрите, что получилось.

Заглавная страница — общий анализ семи альбомов Земфиры. Переключиться на конкретный альбом можно по нажатию на его иконку внизу страницы. Для каждого альбома представлена матрица семантической близости песен, облако слов и топ схожих слов для альбома.

Парсим вакансии для аналитиков из Indeed

Время чтения текста – 8 минут

В этом материале мы расскажем, как парсить вакансии с сайта Indeed. Indeed — это крупнейший в мире поисковик вакансий. Этим текстом мы начинаем большой проект по анализу и визуализации показателей оплаты труда в области Data Science в разных странах.
Подобный анализ рынка вакансий, но только в России, мы проводили в материале Анализ рынка вакансий аналитики и BI: дашборд в Tableau, когда парсили данные с сайта HeadHunter.

А еще у нас можно почитать материал Парсим данные каталога сайта, используя Beautiful Soup и Selenium

Импорт библиотек
Библиотека fake_useragent имитирует реальный User-Agent, чтобы преодолеть защиту сайта от парсинга. Таким образом мы сможем пройти проверку HTTP заголовка User-Agent.
Модуль urllib.parse разбирает URL-адрес на компоненты и записывает его как кортеж. Он пригодится для перехода на карточки вакансий. BeautifulSoup поможет разобраться в структуре html-страницы и добыть нужную нам информацию.

import requests
from datetime import timedelta, datetime
import urllib.parse
from fake_useragent import UserAgent
from bs4 import BeautifulSoup
import pandas as pd
import time
from lxml.html import fromstring
from clickhouse_driver import Client
from clickhouse_driver import errors
import numpy as np
from funcs import check_title, get_skills_row, parse_salary, get_sheetname, create_table

Создадим таблицу в Clickhouse
Данные, которые мы собираемся собрать, будем хранить в базе Clickhouse.

create_table = '''CREATE TABLE if not exists indeed.vacancies (
    row_idx UInt16,
    query_string String,
    country String,
    title String,
    company String,
    city String,
    job_added Date,
    easy_apply UInt8,
    company_rating Nullable(Float32),
    remote UInt8,
    job_id String,
    job_link String,
    sheet String,
    skills String,
    added_date Date,
    month_salary_from_USD Float64,
    month_salary_to_USD Float64,
    year_salary_from_USD Float64,
    year_salary_to_USD Float64,
)
ENGINE = ReplacingMergeTree
SETTINGS index_granularity = 8192'''

Обход блокировок
Нам нужно обойти защиту Indeed и избежать блокировки по IP. Для этого используем анонимные прокси адреса на сайте free-proxy-list.net. Как собрать свежие прокси, мы писали в нашем предыдущем тексте «Пишем парсер свежих прокси на Python для Selenium». Прокси адреса мы запишем в массив, который понадобится в момент обращения к Indeed, когда запрос будет проверять User-Agent.

Данный метод удаляет IP из списка с прокси в том случае, если ответ от Indeed через него так и не пришел.

def remove_proxy_from_list_and_update_if_required(proxy):
    global _proxies
    _proxies.remove(proxy)
    if len(_proxies) == 0:
        update_proxy_list()

Функция, используя прокси, возвращает нам страницу Indeed, из которой мы впоследствии спарсим данные.

def get_page(updated_url, session):
    proxy = get_proxy()
    proxy_dict = {"http": proxy, "https": proxy}
    logger.info(f'try with proxy: {proxy}')
    try:
        session.proxies = proxy_dict
        return session.get(updated_url, timeout=15)
    except (requests.exceptions.RequestException, requests.exceptions.ProxyError, requests.exceptions.ConnectTimeout,
            requests.exceptions.ReadTimeout, requests.exceptions.SSLError,
            requests.exceptions.ConnectionError, url_ex.MaxRetryError, ConnectionResetError,
            socket.timeout, url_ex.ReadTimeoutError):
        remove_proxy_from_list_and_update_if_required(proxy)
        logger.info(f'try with proxy {proxy}')
        return get_page(updated_url, session)

Методы для парсера
Искомые данные нужно будет искать по тегам и атрибутам верстки с помощью BeautifulSoup. Мы заранее собрали ключевые слова, которые нас будут интересовать в вакансиях, и подготовили с ними отдельный датасет.

В карточках вакансий нет точной даты публикации, указано лишь сколько дней назад она была опубликована. Сохраним точную дату публикации в традиционном формате с помощью timedelta.

def raw_date_to_str(raw_date):
    raw_date = raw_date.lower()
    if '+' in raw_date or "более" in raw_date:
        delta = timedelta(days=32)
        return (datetime.now() - delta).strftime("%Y-%m-%d")
    else:
        parts = raw_date.split()
        for part in parts:
            if part.isdigit():
                delta = timedelta(days=part.isdigit())
                return (datetime.now() - delta).strftime("%Y-%m-%d")
    return ""

Сохраним id вакансии в системе Indeed. Подставляя id в URL страницы, мы сможем получить доступ к полному описанию вакансий.

def get_job_id_from_card(card):
    try:
        return card['id'].split('_')[1]
    except:
        return ""

Данный метод соберет названия вакансий.

def get_title_from_card(card):
    try:
        job_title = card.find('a', {'class': 'jobtitle'}).text
        return job_title.replace('\n', '')
    except:
        return ''

Аналогичным образом напишем методы, которые будут собирать данные о названии компании, времени публикации объявления, местоположении работодателя и рейтинге работодателя на портале.

URL сайта Indeed пишется для разных стран по-разному. Для США это будет просто indeed.com, а локализации для других стран получают префиксом xx.indeed.com. Список с префиксами мы собрали в массив заранее из https://opensource.indeedeng.io/api-documentation/docs/supported-countries/ списка Indeed.

def get_link_from_card(card, card_country):
    try:
        if card_country == 'us':
            return f"https://indeed.com{card.find('a', {'class': 'jobtitle'})['href']}"
        else:
            return f"https://{card_country}.indeed.com{card.find('a', {'class': 'jobtitle'})['href']}"
    except:
        return ""

Спарсим описание вакансии, которое можно найти по тегу ’summary’. Именно там содержатся требования, которые предъявляют к кандидату.

def get_summary_from_card_and_transform_to_skills(card):
    try:
        smr = card.find('div', {'class': 'summary'}).text
        return get_skills_row(smr)
    except:
        return ""
Необходимые hard-skills из описания вакансий будем сверять со списком 'skills'. 
skills = ["python", "tableau", "etl", "power bi", "d3.js", "qlik", "qlikview", "qliksense",
          "redash", "metabase", "numpy", "pandas", "congos", "superset", "matplotlib", "plotly",
          "airflow", "spark", "luigi", "machine learning", "amplitude", "sql", "nosql", "clickhouse",
          'sas', "hadoop", "pytorch", "tensorflow", "bash", "scala", "git", "aws", "docker",
          "linux", "kafka", "nifi", "ozzie", "ssas", "ssis", "redis", 'olap', ' r ', 'bigquery', 'api', 'excel']

Эта функция разобьет ’summary’ на слова пробелом и проверит их на соответствие нашему списку. В датасет будут возвращаться совпадения с нашим списком hard-skills.

def get_skills_row(summary):
    summary = summary.lower()
    row = []
    for sk in skills:
        if sk in summary:
            row.append(sk)
    return ','.join(row)

На выходе мы получим таблицу с примерно 30 тысячами строк.

Полный код проекта можно посмотреть в нашем репозитории на GitHub.

В Python 3.10 появился pattern matching

Время чтения текста – 11 минут

Этот материал — перевод статьи «How to use structural pattern matching in Python»

В новом релизе Python 3.10 появились операторы case/match, которые отвечают за реализацию в языке синтаксиса pattern-matching.

Python, несмотря на его простоту и популярность, в отличие от других языков, не имел отдельной формы управления потоком (form of flow control) — способа взять значение и элегантно сопоставить его с одним из множества возможных условий. В C и C++ эта функция реализована конструкцией switch/case, а в Rust она называется pattern matching.

Изящных способов реализовать это в Python, кроме как воспользоваться конструкцией if/elif/else и поиском по словарю, до этого момента не существовало. Оба способа работают, но из-за своей громоздкости они могли затруднить читабельность кода.

За последние годы были предприняты несколько попыток включить синтаксис типа switch/case в Python, но все они провалились. Это первая реализация структурного сопоставления шаблонов (structural pattern matching), которая сейчас доступна только в версии для разработчиков.

Введение в pattern matching на Python

Структурное сопоставление шаблонов (structural pattern matching) вводит оператор match/case, который работает по той же схеме, что и switch/case. Оператор проверяет объект на соответствие одному или нескольким шаблонам и, если совпадение найдено, выполняет действие.

match command:
    case "quit":
        quit()
    case "reset":
        reset()
    case unknown_command:
        print (f"Unknown command'{unknown_command}'")

За каждым выражением case следует шаблон для сопоставления. В данном примере сверху вниз идет сопоставление строк с оператором, и если такое сопоставление найдено, оператор выполняется. Также можно захватить все или часть совпадения и повторно использовать их. В нашем примере в случае с шаблоном сопоставления unknown_command мы использовали его повторно внутри f-строки.

Сопоставление переменных с помощью pattern matching

Если вы хотите сопоставить значение с константами, то константы следует отнести к полям класса:

class Command:
    QUIT = 0
    RESET = 1

command = 0

match command:
    case Command.QUIT:
        quit()
    case Command.RESET:
        reset()

Если вы попробуете сделать это не прибегая к классам, например, так:

QUIT = 0
RESET = 1

command = 0
match command:
    case QUIT:
        quit()
    case RESET:
        reset()

Получите в ответ ошибку, связанную с тем, что имя не относится к известному паттерну:

name capture 'QUIT' makes remaining patterns unreachable

Сопоставление нескольких элементов с помощью pattern matching

Pattern matching используется не только как замена поиска по словарю. Оно используется для описания самой структуры того, что вы хотите сопоставить. Таким образом, вы можете выполнять сопоставления на основе количества сопоставляемых элементов или их комбинаций.

Вот более сложный пример. Здесь пользователь вводит команду, за которой, возможно, следует имя файла:

command = input()
match command.split():
    case ["quit"]:
        quit()
    case ["load", filename]:
        load_from(filename)
    case ["save", filename]:
        save_to(filename)
    case _:
        print (f"Command '{command}' not understood")

Давайте рассмотрим варианты case по порядку:

  • case [’quit’]: проверяет, соответствует ли то, что мы сопоставляем, списку только с элементом ’quit’, полученных после разделения введенных данных с помощью split().
  • case [’load’, filename]: проверяет, является ли первый разделенный элемент строкой ’load’, и следует ли за ней вторая строка. Если вторая строка есть, то вторая строка сохраняется в переменной filename и используется для дальнейшей работы. Аналогично проверяется case [«save», filename]:.
  • case _: это совпадение с подстановочным знаком (wildcard match). Происходит совпадение, если до этого момента не происходило никакого другого совпадения. Обратите внимание, что символ нижнего подчеркивания ( _ ) ни к чему не привязан, в данном случае нижнее подчеркивание используется как сигнал команде match, что рассматриваемый случай является подстановочным знаком (wildcard). (Вот почему мы ссылаемся на команду переменной в теле блока case, ведь ничего не было захвачено.)

Шаблоны в structural pattern matching

Шаблоны могут быть простыми значениями или содержать более сложную логику сопоставления.
Вот несколько примеров:

  • case ’a’: сопоставить с единственным значением ’a’.
  • case [’a’,’b’]: сопоставить с коллекцией (collection) [’a’,’b’].
  • case [’a’, value1]: сопоставить с коллекцией, в которой два значения, и поместить второе значение в переменную value1.
  • case [’a’, *values]: сопоставить с коллекцией, в которой как минимум одно значение. Остальные значения, если они есть, хранить в values. Обратите внимание, что вы можете включить только один элемент со звездочкой в шаблон.
  • case (’a’|’b’|’c’): Оператор or, он же |, может использоваться для обработки нескольких обращений в одном блоке case. Здесь мы сопоставляем ’a’, ’b’, или ’c’.
  • case (’a’|’b’|’c’) as letter: То же, что и выше, за исключением того, что теперь мы помещаем соответствующий элемент в переменную letter.
  • case [’a’, value] if : Переменная связывается только если expression истинно. Переменные, которые мы хотим связать, можно использовать в . Например, если мы используем if value in valid_values, то case будет действительным только в том случае, если захваченное значение value был на самом деле в коллекции valid_values.
  • case [’z’, _]: будет соответствовать любая коллекция элементов, которая начинается с ’z’.

Сопоставление с объектами с помощью pattern matching

Самая продвинутая функция pattern matching в Python — это возможность сопоставлять объекты с определенными свойствами. Рассмотрим приложение, в котором мы работаем с объектом media_object. Этот объект мы хотим преобразовать в файл .jpg и вернуть из функции.

match media_object:
    case Image(type="jpg"):
        # Return as-is
        return media_object
    case Image(type="png") | Image(type="gif"):
        return render_as(media_object, "jpg")
    case Video():
        raise ValueError("Can't extract frames from video yet")
    case other_type:
        raise Exception(f"Media type {media_object} can't be handled yet")

В каждом из описанных выше case мы ищем объект определенного типа, иногда с определенными атрибутами. В первом case ищем соответствие объекта Image , у которого type атрибутирован как ’jpg’. Во втором case идет сопоставление, если type соответствует ’png’> или ’gif’. В третьем case идет проверка на соответствие объекта типу Video, при этом атрибут не имеет значения. И в последнем случае мы получаем все, что не было выбрано ранее.

Вы также можете выполнять захват с сопоставлением объектов:

match media_object:
    case Image(type=media_type):
        print (f"Image of type {media_type}")

Эффективное использование pattern matching

Ключевой момент при работе с match/case в Python заключается в написании шаблонов, в которых будет описана структура того, с чем вы хотите работать. Простые тесты на константы хороши, но если это все, что вы делаете, то лучше просто сделать поиск по словарю. Настоящая ценность структурного сопоставления с шаблоном (structural pattern matching) в Python заключается в возможности сопоставления с шаблоном объекта, а не только с каким-то одним объектом или даже с их набором.

Еще одна важная деталь, которую нужно иметь в виду, это порядок написания сопоставлений. То, какие сопоставления вы проверите в первую очередь, повлияет на эффективность и точность вашего сопоставления в целом. Размещайте наиболее конкретные сопоставления на первом месте, а наиболее общие — на последнем.

В конечном счете, если у вас есть проблема, которую можно решить с помощью if/elif/else или поиска по словарю, то используйте их вместо match/case. Pattern matching является мощным, но не универсальным решением. Используйте его, когда это наиболее целесообразно.

Подробнее с документацией по pattern matching в Python (PEP 622) можно ознакомиться тут.

 2 комментария    841   2021   english   python   перевод

Пишем скрипт для автоматизации коммитов GitHub

Время чтения текста – 9 минут

У GitHub есть API, позволяющий делать всё, что можно сделать руками: создавать репозитории, делать коммиты файлов и переключаться между ветками. Не все используют GUI при работе с системой контроля версий, и для коммита файлов приходится вводить не одну команду в терминал, а смена веток нередко приводит к запутанности действий. Сегодня мы поэкспериментируем с GitHub API и напишем скрипт, который сам собирает все файлы текущей директории раз в час и отправляет в отдельную ветку на GitHub при помощи get, post и put-запросов к методу /repos.

Получение access token

Для работы с GitHub API нужно получить токен, который выдаётся каждому пользователю в настройках. Для создания нового токена нажимаем на «Generate new token»:

В процессе работы с GitHub API токен может просрочиться и в ответ придёт ошибка «Bad credentials». Для решения проблемы можно создать новый токен или получить токен приложения

Следом нужно оставить название для токена и отметить области доступа ключу. Для наших целей достаточно дать доступ только к разделу repo.

После в зелёном окошке появится токен:

Отправляем запросы

Подключим нужные библиотеки и введём некоторые константные значения: логин, токен, название для репозитория и ветки, в которую в автоматическом режиме будут отправляться файлы:

import requests
import base64
import json
import os

username = 'leftjoin'
repo = 'leftjoin'
token = "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"
new_branch_name = 'automatic'

Создадим новый репозиторий post-запросом. В поле data передаем название репозитория, в auth — логин и токен:

r = requests.post('https://api.github.com/user/repos',
                  data=json.dumps({'name':repo}),
                  auth=(username, token),
                  headers={"Content-Type": "application/json"})

Теперь загрузим файл README.md, в котором будет строка “Hello, world!”. Для этого строку нужно перекодировать в base64 — GitHub принимает данные в такой кодировке:

content = "Hello, world!"

b_content = content.encode('utf-8')
base64_content = base64.b64encode(b_content)
base64_content_str = base64_content.decode('utf-8')

Теперь создадим коммит в формате словаря: он состоит из пути (пока его оставим пустым), сообщения коммита и содержания:

f = {'path':'',
     'message': 'Automatic update',
     'content': base64_content_str}

Отправим файл в репозиторий put-запросом, передав путь до README.md прямо в ссылке запроса:

f_resp = requests.put(f'https://api.github.com/repos/{username}/{repo}/contents/README.md',
                auth=(username, token),
                headers={ "Content-Type": "application/json" },
                data=json.dumps(f))

Теперь в репозитории в ветке main есть один файл README.md, который содержит строку «Hello, world!»:

Работа с ветками

Так как наш скрипт подразумевает работу в фоновом режиме, не всегда будет возможность контролировать, какие версии файлов он отправляет. Чтобы не засорять основную ветку будем отправлять все файлы в специальную, из которой при надобности можно будет достать обновления. Логика такая: проверяем, есть ли в репозитории такая ветка и, если нет, создаём её. Затем получаем все файлы текущей директории, построчно считываем, перекодируем в base64, коммитим и отправляем в ветку для автоматических обновлений.

Начнём с функции, которая определяет, существует ли ветка с таким названием. Она отправит запрос на получение всех веток репозитория, и вернёт False, если ветки не существует:

def branch_exist(branch_name):
    branches = requests.get(f'https://api.github.com/repos/{username}/{repo}/git/refs').json()
    try:
        for branch in branches:
            if branch['ref'] == 'refs/heads/' + branch_name:
                return True
        return False
    except Exception:
        return False

У каждой ветки, файлов и репозиториев на GitHub есть уникальный идентификатор, получаемый путём хэш-суммы алгоритмом SHA. Он необходим как для модификации файлов, так и для смены веток — опишем функцию, которая по названию ветки получает SHA для неё:

def get_branch_sha(branch_name):
    try:
        branches = requests.get(f'https://api.github.com/repos/{username}/{repo}/git/refs').json()
        for branch in branches:
            sha = None
            if branch['ref'] == 'refs/heads/' + branch_name:
                sha = branch['object']['sha']
            return sha
    except Exception:
        return None

Следом опишем функцию создания новой ветки с названием из переменной new_branch_name:

def create_branch():
    main_branch_sha = get_branch_sha('main')
    requests.post(f'https://api.github.com/repos/{username}/{repo}/git/refs',
             auth=(username, token),
             data=json.dumps({
                 'ref':f'refs/heads/{new_branch_name}',
                 'sha':main_branch_sha
             })).json()

Для модификации файлов тоже необходимо знать его идентификатор:

def get_sha(path):
    r = requests.get(f'https://api.github.com/repos/{username}/{repo}/contents/{path}',
                    auth=(username, token),
                    data=json.dumps({
                        'branch': new_branch_name
                    }))
    sha = r.json()['sha']
    return sha

Наконец, опишем ряд главных функций — первая по аналогии с примером из начала материала принимает содержимое файла и формирует коммит в отдельную ветку, а вторая отправляет файл в указанный репозиторий:

def make_file_and_commit(content, sha=None):
    b_content = content.encode('utf-8')
    base64_content = base64.b64encode(b_content)
    base64_content_str = base64_content.decode('utf-8')
    f = {'path':'',
     'message': 'Automatic update',
     'content': base64_content_str,
     'sha':sha,
     'branch': new_branch_name}
    return f

def send_file(path, data):
    f_resp = requests.put(f'https://api.github.com/repos/{username}/{repo}/contents/{path}',
                    auth=(username, token),
                    headers={ "Content-Type": "application/json" },
                    data=json.dumps(data))
    print(f_resp.json())
    return f_resp

И ещё две функции напоследок: первая нужна, потому что мы должны знать, модифицируем мы уже существующий файл или новый. В случае с новым нет необходимости получать SHA для файла — его просто не существует. Вторая функция считывает всё содержимое файла в строку.

def file_exist(path):
    r = requests.get(f'https://api.github.com/repos/{username}/{repo}/contents/{path}',
                    auth=(username, token))
    return r.ok

def file_reader(path):
    lines = ""
    with open(path, 'r') as f:
        for line in f:
            lines += line
    return lines

Соберём всё вместе в функцию main(). Проверяем, существует ли ветка для автоматических коммитов, затем получаем все файлы директории, проверяем их существование в репозитории и отправляем:

def main():
    if not branch_exist(new_branch_name):
        create_branch()
    files = [os.path.join(dp, f) for dp, dn, fn in os.walk(os.path.expanduser(".")) for f in fn]
    for path in files:
        print(path)
        sha = None
        if file_exist(path):
            sha = get_sha(path)
        lines = file_reader(path)
        f = make_file_and_commit(lines, sha)
        r = send_file(path, f)

Перевести скрипт в автоматический режим может помочь библиотека schedule, которую мы уже использовали в материале «Анализ рынка вакансий аналитики и BI: дашборд в Tableau»

import schedule

schedule.every().hour.do(main)

while True:
    schedule.run_pending()

Вот и всё: мы написали скрипт, который самостоятельно каждый час отправляет все файлы текущей директории в ветку на GitHub.

 Нет комментариев    276   2021   api   github   python

Эффективное логирование в Python

Время чтения текста – 5 минут

В Python существует встроенный модуль logging, который позволяет журналировать этапы выполнения программы. Логирование полезно когда, например, нужно оставить большой скрипт сбора / обработки данных на длительное время, а в случае возникновения непредвиденных ошибок выяснить, с чем они могут быть связаны. Анализ логов позволяет быстро и эффективно выявлять проблемные места в коде, но для удобного использования модуля следует написать несколько функций по взаимодействию с ним и вынести их в отдельный файл — сегодня мы этим и займёмся.

Пишем логгер

Создадим файл loggers.py. Для начала импортируем модули и задаём пару значений по умолчанию — директорию для файла с логом и наименование конфигурационного файла, содержащего шаблоны логирования. Его мы опишем следом.

import os
import json
import logging
import logging.config

FOLDER_LOG = "log"
LOGGING_CONFIG_FILE = 'loggers.json'

Опишем функцию для создания папки с логом: она принимает наименование для папки, но по умолчанию будет называть её «log». Директорию создаём при помощи модуля os и только в том случае, если такой директории ещё не существует.

def create_log_folder(folder=FOLDER_LOG):
    if not os.path.exists(folder):
        os.mkdir(folder)

Теперь опишем функцию создания нового логгера по заданному шаблону. Функция должна создать директорию для логирования, открыть конфигурационный файл и достать нужный шаблон. Затем по шаблону при помощи модуля logging создаём новый логгер:

def get_logger(name, template='default'):
    create_log_folder()
    with open(LOGGING_CONFIG_FILE, "r") as f:
        dict_config = json.load(f)
        dict_config["loggers"][name] = dict_config["loggers"][template]
    logging.config.dictConfig(dict_config)
    return logging.getLogger(name)

Для удобства опишем ещё одну функцию — получение стандартного лога. Она ничего не принимает и нужна только для инициализации лога с шаблоном default:

def get_default_logger():
    create_log_folder()
    with open(LOGGING_CONFIG_FILE, "r") as f:
        logging.config.dictConfig(json.load(f))

    return logging.getLogger("default")

Описываем конфигурационный файл

Создадим по соседству файл loggers.json — он будет содержать настройки логгера. Внутри указываем такие настройки, как версию логгера, форматы логирования для разных уровней, наименование выходного файла и его максимальный размер:

{
    "version": 1,
    "disable_existing_loggers": false,
    "formatters": {
        "default": {
            "format": "%(asctime)s - %(processName)-10s - %(name)-10s - %(levelname)-8s - %(message)s"
        }
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "default"
        },
        "rotating_file": {
            "class": "logging.handlers.RotatingFileHandler",
            "level": "DEBUG",
            "formatter": "default",
            "filename": "log/main.log",
            "maxBytes": 10485760,
            "backupCount": 20
        }
    },
    "loggers": {
        "default": {
            "handlers": ["console", "rotating_file"],
            "level": "DEBUG"
        }
    }
}

Использование логгера

Теперь давайте представим, что вы выгружаете данные по API и складываете их в базу данных на примере нашего материала про транзакции в SQLAlchemy. Рассмотрим заключительную часть кода: добавим строку с инициализацией стандартного логгера и изменим код так, чтобы сначала в лог выводился offset, затем в случае успеха предложение «Successfully inserted data», а в случае ошибки выводилась сама ошибка и предложение: «Error: tried to insert data but got an error».

logger = get_logger('main')

offset = 0
subs_count = get_subs_count(group_id)

while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            logger.info(f"{offset} / {subs_count}")
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            if offset == 10:
                raise(ValueError("This is a test errror"))
            transaction.commit()
            logger.info(f"Successfully inserted data")
        except Exception as E:
            transaction.rollback()
            logger.error(f"Error: tried to insert {df} but got an error: {E}")
    time.sleep(1)
    offset += 10

Теперь во время работы программы будет отображаться такой вывод, который также будет записан в файл main.log папки log в директории проекта. После завершения работы программы можно исследовать логи, посмотреть, на каких offset возникли проблемы, какие данные не удалось вставить и прочитать текст ошибки:

Обнаружение статистических выбросов в Python

Время чтения текста – 7 минут

Параллельно с выходом материала «Обнаружение выбросов в R» предлагаем посмотреть, как те же методы обнаружения выбросов реализовать в Python.

Данные

Для наглядности эксперимента возьмём тот же пакет данных mpg — скачать его в виде csv-таблицы можно с GitHub. Импортируем библиотеки и читаем таблицу в DataFrame:

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

df = pd.read_csv('mpg.csv')

Минимальные и максимальные значения

Тут всё просто. Выводим описание всего датасета методом describe():

df.describe()

Гистограмма

Такой график тоже можно построить в одну строку, используя внутренние средства библиотеки pandas:

df.hwy.plot(kind='hist', density=1, bins=20, stacked=False, alpha=.5, color='grey')

Box plot

В случае ящика с усами далеко идти тоже не приходится — в pandas есть метод и для этого:

_, bp = df.hwy.plot.box(return_type='both')

Получим точки с графика и выведем их в таблице, используя объект bp:

outliers = [flier.get_ydata() for flier in bp["fliers"]][0]
df[df.hwy.isin(outliers)]

Процентили

При помощи метода quantile получаем соответствующую нижнюю и верхнюю границы, а затем выводим всё, что выходит за их рамки:

lower_bound = df.hwy.quantile(q=0.025)
upper_bound = df.hwy.quantile(q=0.975)
df[(df.hwy < lower_bound) | (df.hwy > upper_bound)]

Фильтр Хэмпеля

Мы используем реализацию фильтра Хэмпеля, найденную на StackOverflow

Опишем функцию, которая заменяет на nan все значения, у которых разница с медианой больше, чем три медианных абсолютных отклонения.

def hampel(vals_orig):
    vals = vals_orig.copy()    
    difference = np.abs(vals.median()-vals)
    median_abs_deviation = difference.median()
    threshold = 3 * median_abs_deviation
    outlier_idx = difference > threshold
    vals[outlier_idx] = np.nan
    return(vals)

И применим к нашему набору данных:

hampel(df.hwy)

0      29.0
1      29.0
2      31.0
3      30.0
4      26.0
       ... 
229    28.0
230    29.0
231    26.0
232    26.0
233    26.0
Name: hwy, Length: 234, dtype: float64

В выводе нет nan-значений, а значит и выбросов фильтр Хэмпеля не обнаружил.

Тест Граббса

Автор реализации теста Граббса и теста Рознера для Python

Опишем три функции: первая находит значение критерия Граббса и максимальное значение в наборе данных, вторая — критическое значение с учётом объёма выборки и уровня значимости, а третья проверяет, является ли значение с максимальным индексом выбросом:

import numpy as np
from scipy import stats

def grubbs_stat(y):
    std_dev = np.std(y)
    avg_y = np.mean(y)
    abs_val_minus_avg = abs(y - avg_y)
    max_of_deviations = max(abs_val_minus_avg)
    max_ind = np.argmax(abs_val_minus_avg)
    Gcal = max_of_deviations / std_dev
    print(f"Grubbs Statistics Value: {Gcal}")
    return Gcal, max_ind

def calculate_critical_value(size, alpha):
    t_dist = stats.t.ppf(1 - alpha / (2 * size), size - 2)
    numerator = (size - 1) * np.sqrt(np.square(t_dist))
    denominator = np.sqrt(size) * np.sqrt(size - 2 + np.square(t_dist))
    critical_value = numerator / denominator
    print(f"Grubbs Critical Value: {critical_value}")
    return critical_value

def check_G_values(Gs, Gc, inp, max_index):
    if Gs > Gc:
        print(f"{inp[max_index]} is an outlier")
    else:
        print(f"{inp[max_index]} is not an outlier")

Заменим значение в 34 строке на 212:

df.hwy[34] = 212

И выполним три функции:

Gcritical = calculate_critical_value(len(df.hwy), 0.05)
Gstat, max_index = grubbs_stat(df.hwy)
check_G_values(Gstat, Gcritical, df.hwy, max_index)

Grubbs Critical Value: 3.652090929984981
Grubbs Statistics Value: 13.745808761040397
212 is an outlier

Тест Рознера

Для теста Рознера достаточно дописать одну функцию, которая принимает набор данных, уровень значимости и число потенциальных выбросов:

def ESD_test(input_series, alpha, max_outliers):
    for iteration in range(max_outliers):
        Gcritical = calculate_critical_value(len(input_series), alpha)
        Gstat, max_index = grubbs_stat(input_series)
        check_G_values(Gstat, Gcritical, input_series, max_index)
        input_series = np.delete(input_series, max_index)

Используя функцию на нашем наборе данных получаем, что значение 212 является выбросом, а 44 — нет:

ESD_test(np.array(df.hwy), 0.05, 3)

Grubbs Critical Value: 3.652090929984981
Grubbs Statistics Value: 13.745808761040408
212 is an outlier
Grubbs Critical Value: 3.6508358337727187
Grubbs Statistics Value: 3.455960616168714
44 is not an outlier
Grubbs Critical Value: 3.649574509044683
Grubbs Statistics Value: 3.5561478280392245
44 is not an outlier

Python и тексты нового альбома Земфиры: анализируем суть песен

Время чтения текста – 18 минут

Неделю назад вышёл первый за 8 лет студийный альбом Земфиры «Бордерлайн». К работе помимо рок-певицы приложили руку разные люди, в том числе и её родственники — рифф для песни «таблетки» написал её племянник из Лондона. Альбом получился разнообразным: например, песня «остин» посвящена главному персонажу игры Homescapes российской студии Playrix (кстати, посмотрите свежие Бизнес-секреты с братьями Бухманами, там они тоже про это рассказывают) — Земфире нравится игра, и для трека она связалась со студией. А сингл «крым» был написан в качестве саундтрека к новой картине соратницы Земфиры — Ренаты Литвиновой.

Послушать альбом в Apple Music / Яндекс.Музыке / Spotify

Тем не менее, дух всего альбома довольно мрачен — в песнях часто повторяются слова «боль», «ад», «бесишь» и прочие по смыслу. Мы решили провести разведочный анализ нового альбома, а затем при помощи модели Word2Vec и косинусной меры посмотреть на семантическую близость песен между собой и вычислить общее настроение альбома.

Для тех, кому скучно читать про подготовку данных и шаги анализа можно перейти сразу к результатам.

Подготовка данных

Для начала работы напишем скрипт обработки данных. Цель скрипта — из множества текстовых файлов, в каждом из которых лежит по песне, собрать единую csv-таблицу. При этом текст треков очищаем от знаков пунктуации и ненужных слов.

import pandas as pd
import re
import string
import pymorphy2
from nltk.corpus import stopwords

Создаём морфологический анализатор и расширяем список всего, что нужно отбросить:

morph = pymorphy2.MorphAnalyzer()
stopwords_list = stopwords.words('russian')
stopwords_list.extend(['куплет', 'это', 'я', 'мы', 'ты', 'припев', 'аутро', 'предприпев', 'lyrics', '1', '2', '3', 'то'])
string.punctuation += '—'

Названия песен приведены на английском — создадим словарь для перевода на русский и словарь, из которого позднее сделаем таблицу:

result_dict = dict()

songs_dict = {
    'snow':'снег идёт',
    'crimea':'крым',
    'mother':'мама',
    'ostin':'остин',
    'abuse':'абьюз',
    'wait_for_me':'жди меня',
    'tom':'том',
    'come_on':'камон',
    'coat':'пальто',
    'this_summer':'этим летом',
    'ok':'ок',
    'pills':'таблетки'
}

Опишем несколько функций. Первая читает целиком песню из файла и удаляет переносы строки, вторая очищает текст от ненужных символов и слов, а третья при помощи морфологического анализатора pymorphy2 приводит слова к нормальной форме. Модуль pymorphy2 не всегда хорошо справляется с неоднозначностью — для слов «ад» и «рай» потребуется дополнительная обработка.

def read_song(filename):
    f = open(f'{filename}.txt', 'r').read()
    f = f.replace('\n', ' ')
    return f

def clean_string(text):
    text = re.split(' |:|\.|\(|\)|,|"|;|/|\n|\t|-|\?|\[|\]|!', text)
    text = ' '.join([word for word in text if word not in string.punctuation])
    text = text.lower()
    text = ' '.join([word for word in text.split() if word not in stopwords_list])
    return text

def string_to_normal_form(string):
    string_lst = string.split()
    for i in range(len(string_lst)):
        string_lst[i] = morph.parse(string_lst[i])[0].normal_form
        if (string_lst[i] == 'аду'):
            string_lst[i] = 'ад'
        if (string_lst[i] == 'рая'):
            string_lst[i] = 'рай'
    string = ' '.join(string_lst)
    return string

Проходим по каждой песне и читаем файл с соответствующим названием:

name_list = []
text_list = []
for song, name in songs_dict.items():
    text = string_to_normal_form(clean_string(read_song(song)))
    name_list.append(name)
    text_list.append(text)

Затем объединяем всё в DataFrame и сохраняем в виде csv-файла.

df = pd.DataFrame()
df['name'] = name_list
df['text'] = text_list
df['time'] = [290, 220, 187, 270, 330, 196, 207, 188, 269, 189, 245, 244]
df.to_csv('borderline.csv', index=False)

Результат:

Облако слов по всему альбому

Начнём анализ с построения облака слов — оно отобразит, какие слова чаще всего встречаются в песнях. Импортируем нужные библиотеки, читаем csv-файл и устанавливаем конфигурации:

import nltk
from wordcloud import WordCloud
import pandas as pd
import matplotlib.pyplot as plt
from nltk import word_tokenize, ngrams

%matplotlib inline
nltk.download('punkt')
df = pd.read_csv('borderline.csv')

Теперь создаём новую фигуру, устанавливаем параметры оформления и при помощи библиотеки wordcloud отображаем слова с размером прямо пропорциональным частоте упоминания слова. Над каждым графиком дополнительно указываем название песни.

fig = plt.figure()
fig.patch.set_facecolor('white')
plt.subplots_adjust(wspace=0.3, hspace=0.2)
i = 1
for name, text in zip(df.name, df.text):
    tokens = word_tokenize(text)
    text_raw = " ".join(tokens)
    wordcloud = WordCloud(colormap='PuBu', background_color='white', contour_width=10).generate(text_raw)
    plt.subplot(4, 3, i, label=name,frame_on=True)
    plt.tick_params(labelsize=10)
    plt.imshow(wordcloud)
    plt.axis("off")
    plt.title(name,fontdict={'fontsize':7,'color':'grey'},y=0.93)
    plt.tick_params(labelsize=10)
    i += 1

EDA текстов альбома

Теперь проанализируем тексты песен — импортируем библиотеки для работы с данными и визуализации:

import plotly.graph_objects as go
import plotly.figure_factory as ff
from scipy import spatial
import collections
import pymorphy2
import gensim

morph = pymorphy2.MorphAnalyzer()

Сначала посчитаем число слов в каждой песне, число уникальных слов и процентное соотношение:

songs = []
total = []
uniq = []
percent = []

for song, text in zip(df.name, df.text):
    songs.append(song)
    total.append(len(text.split()))
    uniq.append(len(set(text.split())))
    percent.append(round(len(set(text.split())) / len(text.split()), 2) * 100)

А теперь составим из этого DataFrame и дополнительно посчитаем число слов в минуту для каждой песни:

df_words = pd.DataFrame()
df_words['song'] = songs
df_words['total words'] = total
df_words['uniq words'] = uniq
df_words['percent'] = percent
df_words['time'] = df['time']
df_words['words per minute'] = round(total / (df['time'] // 60))
df_words = df_words[::-1]

Данные хорошо бы визуализировать — построим две столбиковые диаграммы: одну для числа слов в песне, а другую для числа слов в минуту.

colors_1 = ['rgba(101,181,205,255)'] * 12
colors_2 = ['rgba(62,142,231,255)'] * 12

fig = go.Figure(data=[
    go.Bar(name='📝 Всего слов',
           text=df_words['total words'],
           textposition='auto',
           x=df_words.song,
           y=df_words['total words'],
           marker_color=colors_1,
           marker=dict(line=dict(width=0)),),
    go.Bar(name='🌀 Уникальных слов',
           text=df_words['uniq words'].astype(str) + '<br>'+ df_words.percent.astype(int).astype(str) + '%' ,
           textposition='inside',
           x=df_words.song,
           y=df_words['uniq words'],
           textfont_color='white',
           marker_color=colors_2,
           marker=dict(line=dict(width=0)),),
])

fig.update_layout(barmode='group')

fig.update_layout(
    title = 
        {'text':'<b>Соотношение числа уникальных слов к общему количеству</b><br><span style="color:#666666"></span>'},
    showlegend = True,
    height=650,
    font={
        'family':'Open Sans, light',
        'color':'black',
        'size':14
    },
    plot_bgcolor='rgba(0,0,0,0)',
)
fig.update_layout(legend=dict(
    yanchor="top",
    xanchor="right",
))

fig.show()
colors_1 = ['rgba(101,181,205,255)'] * 12
colors_2 = ['rgba(238,85,59,255)'] * 12

fig = go.Figure(data=[
    go.Bar(name='⏱️ Длина трека, мин.',
           text=round(df_words['time'] / 60, 1),
           textposition='auto',
           x=df_words.song,
           y=-df_words['time'] // 60,
           marker_color=colors_1,
           marker=dict(line=dict(width=0)),
          ),
    go.Bar(name='🔄 Слов в минуту',
           text=df_words['words per minute'],
           textposition='auto',
           x=df_words.song,
           y=df_words['words per minute'],
           marker_color=colors_2,
           textfont_color='white',
           marker=dict(line=dict(width=0)),
          ),
])

fig.update_layout(barmode='overlay')

fig.update_layout(
    title = 
        {'text':'<b>Длина трека и число слов в минуту</b><br><span style="color:#666666"></span>'},
    showlegend = True,
    height=650,
    font={
        'family':'Open Sans, light',
        'color':'black',
        'size':14
    },
    plot_bgcolor='rgba(0,0,0,0)'
)


fig.show()

Работа с Word2Vec моделью

При помощи модуля gensim загружаем модель, указывая на бинарный файл:

model = gensim.models.KeyedVectors.load_word2vec_format('model.bin', binary=True)

Для материала мы использовали готовую обученную на Национальном Корпусе Русского Языка модель от сообщества RusVectōrēs

Модель Word2Vec основана на нейронных сетях и позволяет представлять слова в виде векторов, учитывая семантическую составляющую. Это означает, что если мы возьмём два слова — например, «мама» и «папа», представим их в виде двух векторов и посчитаем косинус, значения будет близко к 1. Аналогично, у двух слов, не имеющих ничего общего по смыслу косинусная мера близка к 0.

Опишем функцию get_vector: она будет принимать список слов, распознавать для каждого часть речи, а затем получать и суммировать вектора — так мы сможем находить вектора не для одного слова, а для целых предложений и текстов.

def get_vector(word_list):
    vector = 0
    for word in word_list:
        pos = morph.parse(word)[0].tag.POS
        if pos == 'INFN':
            pos = 'VERB'
        if pos in ['ADJF', 'PRCL', 'ADVB', 'NPRO']:
            pos = 'NOUN'
        if word and pos:
            try:
                word_pos = word + '_' + pos
                this_vector = model.word_vec(word_pos)
                vector += this_vector
            except KeyError:
                continue
    return vector

Для каждой песни находим вектор и собираем соответствующий столбец в DataFrame:

vec_list = []
for word in df['text']:
    vec_list.append(get_vector(word.split()))
df['vector'] = vec_list

Теперь сравним вектора между собой, посчитав их косинусную близость. Те песни, у которых косинусная метрика выше 0,5 запомним отдельно — так мы получим самые близкие пары песен. Данные о сравнении векторов запишем в двумерный список result.

similar = dict()
result = []
for song_1, vector_1 in zip(df.name, df.vector):
    sub_list = []
    for song_2, vector_2 in zip(df.name.iloc[::-1], df.vector.iloc[::-1]):
        res = 1 - spatial.distance.cosine(vector_1, vector_2)
        if res > 0.5 and song_1 != song_2 and (song_1 + ' / ' + song_2 not in similar.keys() and song_2 + ' / ' + song_1 not in similar.keys()):
            similar[song_1 + ' / ' + song_2] = round(res, 2)
        sub_list.append(round(res, 2))
    result.append(sub_list)

Самые похожие треки соберём в отдельный DataFrame:

df_top_sim = pd.DataFrame()
df_top_sim['name'] = list(similar.keys())
df_top_sim['value'] = list(similar.values())
df_top_sim.sort_values(by='value', ascending=False)

И построим такой же bar chart:

colors = ['rgba(101,181,205,255)'] * 5

fig = go.Figure([go.Bar(x=df_top_sim['name'],
                        y=df_top_sim['value'],
                        marker_color=colors,
                        width=[0.4,0.4,0.4,0.4,0.4],
                        text=df_top_sim['value'],
                        textfont_color='white',
                        textposition='auto')])

fig.update_layout(
    title = 
        {'text':'<b>Топ-5 схожих песен</b><br><span style="color:#666666"></span>'},
    showlegend = False,
    height=650,
    font={
        'family':'Open Sans, light',
        'color':'black',
        'size':14
    },
    plot_bgcolor='rgba(0,0,0,0)',
    xaxis={'categoryorder':'total descending'}
)

fig.show()

Имея вектор каждой песни, давайте посчитаем вектор всего альбома — сложим вектора песен. Затем для такого вектора при помощи модели получим самые близкие по духу и смыслу слова.

def get_word_from_tlist(lst):
    for word in lst:
        word = word[0].split('_')[0]
        print(word, end=' ')

vec_sum = 0
for vec in df.vector:
    vec_sum += vec
sim_word = model.similar_by_vector(vec_sum)
get_word_from_tlist(sim_word)

небо тоска тьма пламень плакать горе печаль сердце солнце мрак

Наверное, это ключевой результат и описание альбома Земфиры всего лишь в 10 словах.

Наконец, построим общую тепловую карту, каждая ячейка которой — результат сравнения косинусной мерой текстов двух треков.

colorscale=[[0.0, "rgba(255,255,255,255)"],
            [0.1, "rgba(229,232,237,255)"],
            [0.2, "rgba(216,222,232,255)"],
            [0.3, "rgba(205,214,228,255)"],
            [0.4, "rgba(182,195,218,255)"],
            [0.5, "rgba(159,178,209,255)"],
            [0.6, "rgba(137,161,200,255)"],
            [0.7, "rgba(107,137,188,255)"],
            [0.8, "rgba(96,129,184,255)"],
            [1.0, "rgba(76,114,176,255)"]]

font_colors = ['black']
x = list(df.name.iloc[::-1])
y = list(df.name)
fig = ff.create_annotated_heatmap(result, x=x, y=y, colorscale=colorscale, font_colors=font_colors)
fig.show()

Результаты анализа и интерпретация данных

Давайте ещё раз посмотрим на всё, что у нас получилось — начнём с облака слов. Нетрудно заметить, что у слов «боль», «невозможно», «сорваться», «растерзаны», «сложно», «терпеть», «любить» размер весьма приличный — всё потому, что такие слова встречаются часто на протяжении всего текста песен:

Одной из самых «разнообразных» песен оказался сингл «крым» — в нём 74% уникальных слов. А в песне «снег идёт» слов совсем мало, поэтому большинство — 82% уникальны. Самой большой песней в альбоме получился трек «таблетки» — суммарно там около 150 слов.

Как было выяснено на прошлом графике, самый «динамичный» трек — «таблетки», целых 37 слов в минуту — практически по слову на каждые две секунды. А самый длинный трек — «абъюз», в нём же и согласно предыдущему графику практически самый низкий процент уникальных слов — 46%.

Топ-5 самых семантически похожих пар текстов:

Ещё мы получили вектор всего альбома и подобрали самые близкие слова. Только посмотрите на них — «тьма», «тоска», «плакать», «горе», «печаль», «сердце» — это же ведь и есть тот перечень слов, который характеризует лирику Земфиры!

небо тоска тьма пламень плакать горе печаль сердце солнце мрак

Финал — тепловая карта. По визуализации заметно, что практически все песни достаточно схожи между собой — косинусная мера у многих пар превышает значение в 0.4.

Выводы

В материале мы провели EDA всего текста нового альбома и при помощи предобученной модели Word2Vec доказали гипотезу — большинство песен «бордерлайна» пронизывают довольно мрачные и тексты. И это нормально, ведь Земфиру мы любим именно за искренность и прямолинейность.

Экспорт исторических данных Apple Health в Google Sheets

Время чтения текста – 9 минут

Для устройств на базе iOS и watchOS существует приложение Health, которое ежедневно записывает все данные о здоровье носителя и синхронизирует их со сторонними приложениями. Все эти данные в любой момент можно получить прямо из приложения в виде XML-документа. Сегодня мы выгрузим исторические данные о здоровье из приложения Apple Health, обработаем их и отправим в Google Sheets для анализа и визуализации в будущем.

Экспорт архива из приложения

Зайдите в приложение Health на iPhone. Нажмите на аватарку своего профиля в верхнем правом углу — откроется меню приложения.

Внизу нажмите на кнопку «Экспортировать медданные». Через некоторое время откроется меню экспорта — отправьте архив себе на компьютер любым способом, можно по AirDrop или даже по почте в письме самому себе. Из архива нужен только один файл — «экспорт.xml». Достаньте его и положите в папку с ноутбуком jupyter.

Парсер XML в DataFrame

При помощи библиотеки XML составляем дерево на основе документа из Health. Собирать в словарь будем следующие атрибуты: тип, единица измерения, дата создания, дата начала, дата конца, значение. Проходим по всему дереву и отправляем полученные значения атрибутов в records_dict.

from xml.etree import ElementTree
import pandas as pd
import datetime

tree = ElementTree.parse('экспорт.xml')
root = tree.getroot()
records = root.findall('Record')

records_dict = {
    'type':[],
    'unit':[],
    'creationDate':[],
    'startDate':[],
    'endDate':[],
    'value':[]
}

for record in records:
    for attribute in records_dict.keys():
        attribute_value = record.get(attribute)
        records_dict[attribute].append(attribute_value)

События записаны в нечитабельном виде — для перевода составим специальный словарь с нужными типами, где ключ — старое название, а значение — новое. Мы возьмём только 11 событий: минуты осознанности, дистанция на велосипеде, дистанция заплыва, дистанция ходьбы и бега, пройдено пролётов, пульс, пульс в покое, шаги, активная энергия, энергия покоя и средний пульс при ходьбе.

types_dict = {
    'HKCategoryTypeIdentifierMindfulSession': 'Mindful Session',
    'HKQuantityTypeIdentifierDistanceCycling': 'Cycling Distance',
    'HKQuantityTypeIdentifierDistanceSwimming': 'Swimming Distance',
    'HKQuantityTypeIdentifierDistanceWalkingRunning': 'Walking + Running Distance',
    'HKQuantityTypeIdentifierFlightsClimbed': 'Flights Climbed',
    'HKQuantityTypeIdentifierHeartRate': 'Heart Rate',
    'HKQuantityTypeIdentifierRestingHeartRate': 'Resting Heart Rate',
    'HKQuantityTypeIdentifierStepCount': 'Steps',
    'HKQuantityTypeIdentifierActiveEnergyBurned': 'Active Calories',
    'HKQuantityTypeIdentifierBasalEnergyBurned': 'Resting Calories',
    'HKQuantityTypeIdentifierWalkingHeartRateAverage': 'Walking Heart Rate Average'
}

Для минут осознанности в поле значения записей нет — мы сами посчитаем позже это поле как разницу даты окончания и начала события. Разница будет представлена как timedelta, поэтому напишем функцию перевода timedelta в минуты:

def td_to_m(td):
    seconds = td.seconds + td.days * 24 * 60 * 60
    return seconds // 60

Из словаря создаём DataFrame и задаём названия колонок. Оставляем только те 11 событий, которые есть в словаре types_dict и приводим все колонки к нужным типам данных:

df = pd.DataFrame(records_dict)
df.columns = ['type', 'unit', 'date', 'start', 'end', 'value']
df = df[df['type'].isin(types_dict.keys())]
df['value'] = df['value'].astype(float)
df['date'] = df['date'].astype('datetime64')
df['date'] = df['date'].dt.date
df['start'] = df['start'].astype('datetime64')
df['end'] = df['end'].astype('datetime64')
df['unit'] = df['unit'].astype(str)

Данные Health при экспорте никак не группируются — мы сделаем это самостоятельно. DataFrame можно поделить на три: в первом будут события, у которых единица измерения «количество в минуту» — для таких событий нужно искать среднее значение. В другой группе будут минуты осознанности — считаем число минут в каждой записи и суммируем. В последней группе находятся все остальные записи, связанные с количественными событиями — шаги, дистанция ходьбы и бега и так далее. Их тоже суммируем.

df_1 = df[df['unit'] == 'count/min']
df_1 = df_1.groupby(by=['date', 'type', 'unit'], as_index=False).agg({'start':'min',
                                                                      'end':'max',
                                                                      'value':'mean'})

df_2 = df[df['type'] == 'HKCategoryTypeIdentifierMindfulSession']
df_2['value'] = df_2['end'] - df_2['start']
df_2['value'] = df_2['value'].map(td_to_m)
df_2 = df_2.groupby(by=['date', 'type', 'unit'], as_index=False).agg({'start':'min',
                                                                     'end':'max',
                                                                     'value':'sum'})
df_3 = df[(df['unit'] != 'count/min') & (df['type'] != 'HKCategoryTypeIdentifierMindfulSession')]
df_3 = df_3.groupby(by=['date', 'type', 'unit'], as_index=False).agg({'start':'min',
                                                                      'end':'max',
                                                                      'value':'sum'})
df = pd.concat([df_1, df_2, df_3])

Дату создания записи переводим в строковый тип. Все наименования типов событий заменяем согласно словарю types_dict. В переменную dates записываем все уникальные даты.

df['date'] = df['date'].astype(str)
df['type'] = df['type'].apply(lambda x: types_dict[x])
dates = df['date'].unique()

В результате нужен словарь с колонкой даты и отдельной колонкой под каждое из 11 событий:

result = {
    'date': [],
    'Steps': [],
    'Walking + Running Distance': [],
    'Swimming Distance': [],
    'Cycling Distance': [],
    'Resting Calories': [],
    'Active Calories': [],
    'Flights Climbed': [],
    'Heart Rate': [],
    'Resting Heart Rate': [],
    'Walking Heart Rate Average': [],
    'Mindful Session': []
}

Проходим по каждой дате и получаем кусок DataFrame за эту дату. Добавляем её в словарь и проходим по каждому ключу, пробуя добавить значение:

for date in dates:
    part = df[df['date'] == date]
    result['date'].append(date)
    for key in result.keys():
        if key == 'date':
            continue
        else:
            field = 'value'
        try:
            result[key].append(part[part['type'] == key][field].values[0])
        except IndexError:
            result[key].append(None)

Из полученного словаря создаём DataFrame, округляем всё до двух знаков после запятой и сортируем по дате:

result_df = pd.DataFrame(result)
result_df = result_df.round(2)
result_df = result_df.sort_values(by='date')

В результате получается такая таблица с историческими данными по 11 событиям:

Экспорт DataFrame в Google Sheets

Для экспорта в Google Docs необходим сервисный аккаунт и json-файл с ключом. О том, как его получить, мы писали в материале «Собираем данные по рекламным кампаниям ВКонтакте»

Создайте новый документ в Google Sheets. Весь DataFrame можно вставить одним действием при помощи методов библиотеки gspread. Импортируйте её, а также укажите идентификатор документа и json-файл с ключом. В методе get_worksheet указывается порядковый номер листа в файле начиная с нуля.

import pandas as pd
import gspread
from gspread_dataframe import set_with_dataframe
gc = gspread.service_account(filename='serviceAccount.json')
sh = gc.open_by_key('1osKA63LQkUC0FC0eIZ63jEJwn1TeIkUvqCV6ur')
worksheet = sh.get_worksheet(0)

В итоге в Google Spreadsheets появится такая таблица:

А в следующем материале посмотрим, как наладить ежедневный экспорт данных Здоровья в эту таблицу при помощи шорткатов и Google AppScript!

Транзакции в SQLAlchemy

Время чтения текста – 5 минут

Транзакция — последовательность действий, связанных с базой данных. Их основная польза заключается в том, что при возникновении какой-то ошибки или достижении других нужных условий всю транзакцию можно отменить, и все изменения, примененные к базе данных, будут отменены. Сегодня мы напишем небольшой скрипт, который при помощи транзакций SQLAlchemy пишет информацию о подписчиках сообщества в базу данных MySQL, а при возникновении ошибки отменяет текущую транзакцию.

Сбор информации об участниках через VK API

Для начала напишем пару маленьких функций — первая будет возвращать число подписчиков сообщества, а вторая — отправлять запрос и формировать датафрейм с информацией о подписчиках сообщества.

Подробнее о том, как получить токен, можно прочитать в материале «Собираем данные по рекламным кампаниям ВКонтакте»

from sqlalchemy import create_engine
import pandas as pd
import requests
import time

token = '42hj2ehd3djdournf48fjurhf9r9o2eurnf48fjurhf9r9734'
group_id = 'leftjoin'

Чтобы узнать число подписчиков достаточно отправить метод groups.getMembers с любыми параметрами — в ответе всегда возвращается количество в поле count.

def get_subs_count(group_id):
    count = requests.get('https://api.vk.com/method/groups.getMembers', params={
        'access_token':token,
        'v':5.103,
        'group_id':group_id
    }).json()['response']['count']
    return count

Для примера будем брать имена, id, фамилии подписчиков, некоторую расширенную информацию и получать только по 10 подписчиков за раз, чтобы рассмотреть работу транзакций детально — каждые 10 подписчиков будут вставляться одной транзакцией. Введём дополнительное поле offset, чтобы знать, в какой итерации добавлены строки.

def get_subs_info(group_id, offset):
    response = requests.get('https://api.vk.com/method/groups.getMembers', params={
        'access_token':token,
        'v':5.103,
        'group_id':group_id,
        'offset':offset,
        'count':10,
        'fields':'sex, has_mobile, relation, can_post'
    }).json()['response']['items']
    df = pd.DataFrame(response)
    df['offset'] = offset
    return df

Транзакции

Наконец, можем подсоединиться к базе данных при помощи SQLAlchemy:

engine = create_engine('mysql+mysqlconnector://' +
                           'root' + ':' + '' + '@' +
                           'localhost' + '/' +
                           'transaction', echo=False)

У транзакций всегда должно быть начало — begin, и конец — commit. В случае, если произошла какая-то ошибка, можно сделать откат — rollback. Сперва получаем число подписчиков сообщество, и в каждой итерации цикла при помощи контекстного менеджера with ... as создаём новое подключение. Сразу после объявляем начало транзакции по этому подключению и с обработчиком исключений пробуем получить информацию о десяти подписчиках через функцию get_subs_info. Вставляем полученный датафрейм в таблицу методом to_sql и завершаем транзакцию при помощи метода commit(). В случае, если возникла какая-то ошибка — печатаем её на экран и отменяем транзакцию.

offset = 0
subs_count = get_subs_count(group_id)
while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            transaction.commit()
        except Exception as E:
            print(E)
            transaction.rollback()
    time.sleep(1)
    offset += 10

Чтобы протестировать работу транзакций слегка обновим последний блок кода — добавим вызов ошибки ValueError после вставки данных в базу, если текущий offset равен 10.

offset = 0
subs_count = get_subs_count(group_id)
while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            if offset == 10:
                raise(ValueError)
            transaction.commit()
        except Exception as E:
            print(E)
            transaction.rollback()
    time.sleep(1)
    offset += 10

Как и планировалось, данные за итерацию с offset = 10 не занесены в таблицу. Несмотря на то, что ошибка возникла уже после добавления новых данных, транзакция была прервана методом rollback() и завершение транзакции было отменено.

Сбор информации о подписчиках Telegram-канала

Время чтения текста – 6 минут

На 2021 год боты в Telegram так и не имеют метода, позволяющего получать информацию о подписчиках канала. Тем не менее, существует достаточно сложное в освоении Telegram API и построенная на нём библиотека Telethon. Сегодня мы посмотрим, как при помощи библиотеки выгрузить информацию о подписчиках своего канала.

Создание приложения

Для начала необходимо создать приложение, через которое будут отправляться запросы к API. Перейдите на https://my.telegram.org и авторизуйтесь в Telegram-аккаунте:

После успешной авторизации перейдите на страницу API development tools:

Заполните все поля и жмите на создание приложения:

Из полученной конфигурации нам необходим app api_id и app api_hash:

Запрос к API

Импортируем telethon — он поможет сформировать запрос, и pandas — полученный ответ мы запишем в DataFrame.

from telethon import TelegramClient
import pandas as pd

Вводим api_id, api_hash, наш номер телефона и ссылку на канал, информацию о подписчиках которого хотим получить. Доступ к информации о подписчиках есть только у администраторов канала.

api_id = 1234567
api_hash = '1b42hj25kd8jw42b234kwj242c'
phone = '+71234567890'
channel_href = 'https://t.me/leftjoin'

Создаём новую сессию — вместо session_name можно подставить любое другое название. Методы в библиотеке работают асинхронно, поэтому ответа от них требуется ожидать:

client = TelegramClient('session_name', api_id, api_hash)
client = await client.start()
dialogs = await client.get_dialogs()

Собираем все каналы текущего пользователя. Из ссылки забираем часть с именем канала и вытаскиваем из словаря нужный:

channels = {d.entity.username: d.entity
            for d in dialogs
            if d.is_channel}
my_channel = channel_href.split('/')[-1]
channel = channels[my_channel]

Подписчиков, доступ к которым не ограничен приватностью, можно получить методом get_participants. С 20 июля 2018 года Telegram установил ограничение в 200 подписчиков для вызова метода, и установка параметра aggressive на True поможет получить всех подписчиков за раз.

members_telethon_list = await client.get_participants(channel, aggressive=True)

Из полученных библиотечных структур извлекаем информацию о пользователях — их имена и телефоны:

username_list = [member.username for member in members_telethon_list]
first_name_list = [member.first_name for member in members_telethon_list]
last_name_list = [member.last_name for member in members_telethon_list]
phone_list = [member.phone for member in members_telethon_list]

Из четырёх списков собираем DataFrame и пишем его в csv-таблицу:

df = pd.DataFrame()
df['username'] = username_list
df['first_name'] = first_name_list
df['last_name'] = last_name_list
df['phone'] = phone_list
df.to_csv('subscribers.csv', index=False)

Результат работы — такая таблица:

Для запуска в Jupyter Notebook описанный ниже код можно просто вставить в ячейку, но при запуске из Python-файла будет такая ошибка:

SyntaxError: 'await' outside function

Устранить проблему можно, записав весь код в асинхронную функцию. Целиком выглядеть код будет так:

from telethon import TelegramClient
import pandas as pd
import asyncio

async def main():
        api_id = 1234567
        api_hash = '1b42hj25kd8jw42b234kwj242c'
        phone = '+71234567890'
        channel_href = 'https://t.me/leftjoin'

	client = TelegramClient('session_name', api_id, api_hash)
	client = await client.start()
	dialogs = await client.get_dialogs()

	channels = {d.entity.username: d.entity
				for d in dialogs
				if d.is_channel}
	my_channel = channel_href.split('/')[-1]
	channel = channels[my_channel]

	members_telethon_list = await client.get_participants(channel, aggressive=True)

	username_list = [member.username for member in members_telethon_list]
	first_name_list = [member.first_name for member in members_telethon_list]
	last_name_list = [member.last_name for member in members_telethon_list]
	phone_list = [member.phone for member in members_telethon_list]

	df = pd.DataFrame()
	df['username'] = username_list
	df['first_name'] = first_name_list
	df['last_name'] = last_name_list
	df['phone'] = phone_list
	df.to_csv('subscribers.csv', index=False)

if __name__ == '__main__':
	loop = asyncio.get_event_loop()
	loop.run_until_complete(main())
 32 комментария    11066   2021   Analytics Engineering   python   telegram   telethon

Робот для автоматизированного просмотра Instagram на Python и Selenium

Время чтения текста – 13 минут

Недавно мы начали вести Instagram — подписывайтесь, чтобы не пропустить контент, которого нет в блоге и Telegram!

Многие из нас ежедневно заходят в Instagram, чтобы посмотреть истории друзей и полистать ленту постов и рекомендаций. Предлагаем действенный способ сохранить своё время — напишем на Python и Selenium робота, который возьмёт на себя рутинную задачу проверки свежих новостей друзей и подсчитает число новых историй и входящих сообщений.

Авторизация в аккаунт

При переходе в браузерную версию сайта, нас встречает такое окно:

Но просто вставить логин, пароль и нажать на кнопку «Войти» недостаточно: впереди будет ещё два окна. Во-первых, предложение сохранить данные — здесь мы тактично жмём «Не сейчас». Instagram тщательно следит за каждым нашим действием и малейшие аномалии в поведении приводят к блокировке, поэтому любые предложения по сохранению данных будем на всякий случай пропускать.

Следующим препятствием будет предложение включить уведомление, которое мы тоже пропустим:

Первым делом импортируем библиотеки:

from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup as bs
import time
import random

И описываем функцию authorize — она будет принимать driver в качестве аргумента, отправлять в нужные поля логин и пароль, нажимать на кнопку «Войти», затем ждать десять секунд на загрузку страницы, нажимать на кнопку «Не сейчас», снова ждать загрузки страницы и пропускать уведомления:

def authorize(driver):
    username = 'login'
    password = 'password'
    driver.get('https://www.instagram.com')
    time.sleep(5)
    driver.find_element_by_name("username").send_keys(username)
    driver.find_element_by_name("password").send_keys(password)
    driver.execute_script("document.getElementsByClassName('sqdOP  L3NKy   y3zKF     ')[0].click()")
    time.sleep(10)
    driver.execute_script("document.getElementsByClassName('sqdOP  L3NKy   y3zKF     ')[0].click()")
    time.sleep(10)
    driver.execute_script("document.getElementsByClassName('aOOlW   HoLwm ')[0].click()")

Новые сообщения

В Instagram могут прийти сообщения двух видов. В случае, если вы не подписаны на отправителя — придёт запрос на диалог. Если подписаны — придёт входящее сообщения. Оба случая обрабатываются по-разному. Число входящих сообщений можно получить с главной страницы — это число над иконкой бумажного самолётика:

А число запросов можно забрать текстом заголовка h5 из раздела «Сообщения». Сперва перейдём в этот раздел и попробуем найти строку с запросами на сообщение. Затем вернёмся на главную страницу и возьмём то самое число новых сообщений.

def messages_count(driver):
    driver.get('https://www.instagram.com/direct/inbox/')
    time.sleep(2)
    inbox = bs(driver.page_source)
    try:
        queries_text = inbox.find_all('h5')[0].text
    except Exception:
        queries_text = None
    driver.get('https://www.instagram.com')
    time.sleep(2)
    content = bs(driver.page_source)
    try:
        messages_count = int(content.find_all('div', attrs={'class':'KdEwV'})[0].text)
    except Exception:
        messages_count = 0
    return queries_text, messages_count

Подсчёт числа новых сторис

Все истории хранятся в одном блоке:

Это список с одинаковым классом, но в каждом элементе списка лежит ещё один div-блок. У новых историй это класс eebAO h_uhZ, у просмотренных — eebAO.

Ещё есть такая кнопка, которая показывает следующую пачку историй:

При этом Instagram динамически прогружает код страницы, и в нём не найти те элементы, которые вы не видите своими глазами. Поэтому мы возьмём первые 8 видимых новых историй, добавим в список, нажмём на кнопку «Показать следующие истории» и будем продолжать так, пока кнопка ещё отображается. А затем подсчитаем число уникальных элементов, чтобы избежать возможных дубликатов.

def get_stories_count(driver):
    stories_divs = []
    scroll = True
    while scroll:
        try:
            content = bs(driver.page_source)
            stories_divs.extend(content.find_all('div', attrs={'class':'eebAO h_uhZ'}))
            driver.execute_script("document.getElementsByClassName('  _6CZji oevZr  ')[0].click()")
            time.sleep(1)
        except Exception as E:
            scroll = False
    return len(set(stories_divs))

Просмотр сторис

Следующее, чем может заняться реальный пользователь после авторизации — просмотр свежих историй. Для того, чтобы зайти в блок историй, нужно просто нажать на кнопку класса OE3OK:

Есть еще две кнопки, о которых мы должны знать. Это кнопка для переключения на следующую историю — она в классе FhutL и кнопка закрытия блока историй — класс wpO6b. Пускай одна история будет отнимать у нас от 10 до 15 секунд, и с вероятностью 1/5 мы переключим на следующую. При этом зададим переменные counter и limit — пусть сейчас мы хотим посмотреть случайное число историй от 5 до 45, и если мы уже посмотрели столько, то выходим из функции и историй.

def watch_stories(driver):
    watching = True
    counter = 0
    limit = random.randint(5, 45)
    driver.execute_script("document.getElementsByClassName('OE3OK ')[0].click()")
    try:
        while watching:
            time.sleep(random.randint(10, 15))
            if random.randint(1, 5) == 5:
                driver.execute_script("document.getElementsByClassName('FhutL')[0].click()")
            counter += 1
            if counter > limit:
                driver.execute_script("document.getElementsByClassName('wpO6b ')[1].click()")
                watching = False
    except Exception as E:
        print(E)
        watching = False

Скроллинг ленты

После просмотра актуальных историй можно поскроллить ленту — это действие ничем не отличается от классического скроллинга страниц в Selenium. Запоминаем последнюю доступную длину страницы, скроллим до неё, ожидаем прогрузки, получаем новую. Прекратим просматривать ленту в двух случаях — если в random.randint() сгенерировалась единица или если лента кончилась.

def scroll_feed(driver):
    scrolling = True
    last_height = driver.execute_script("return document.body.scrollHeight")
    while scrolling:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(random.randint(4,10))
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height or random.randint(1, 10) == 1:
            scrolling = False
        last_height = new_height

Просмотр рекомендуемых аккаунтов

Instagram в заглавной странице сам рекомендует нам для подписки некоторые аккаунты. Выглядит она так:

И на ней тоже придётся скроллить, чтобы дойти до конца. Заходим на страницу и ожидаем 5 секунд прогрузки, затем снова получаем длину страницы и скроллим вниз. Выходим тоже с вероятностью 1/10 или если страница кончилась, но ещё с вероятностью 1/2 подписываемся на некоторые из первых 100 аккаунтов рекомендаций:

def scroll_recomendations(driver):
   driver.get('https://www.instagram.com/explore/people/suggested/')
    time.sleep(5)
    scrolling = True
    last_height = driver.execute_script("return document.body.scrollHeight")
    while scrolling:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(random.randint(4,10))
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height or random.randint(1, 10) == 1:
            scrolling = False
        last_height = new_height
        if random.randint(0, 1):
            try:
                driver.execute_script(f"document.getElementsByClassName('sqdOP  L3NKy   y3zKF     ')[{random.randint(1,100)}].click()")
            except Exception as E:
                print(E)

Просмотр рекомендуемых постов

Помимо ленты, которая сформирована из наших подписок, Instagram собирает ленту рекомендаций. Туда входят все посты, которые потенциально могут вам понравиться — мы просто пройдём вниз по этой ленте. Выйдем с вероятностью 1/5 или когда кончится, чтобы долго не засиживаться.

def scroll_explore(driver):
    driver.get('https://www.instagram.com/explore')
    time.sleep(3)
    scrolling = True
    last_height = driver.execute_script("return document.body.scrollHeight")
    while scrolling:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(random.randint(4,10))
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height or random.randint(1, 5) == 1:
            scrolling = False
        last_height = new_height

Итог

Теперь можно собрать все функции вместе — создаём новый driver, проводим авторизацию, считаем число новых сторис и сообщений, просматриваем сторис, переходим в рекомендуемые подписки и листаем ленту. В конце печатаем полученные данные — число новых сообщений, запросов и историй друзей.

driver = webdriver.Chrome(ChromeDriverManager().install())
authorize(driver)
queries_text, messages_count = messages_count(driver)
stories_count = get_stories_count(driver)
watch_stories(driver)
scroll_recomendations(driver)
scroll_feed(driver)
scroll_explore(driver)

if queries_text is not None:
    print(queries_text)
else:
    print('Нет новых запросов на диалог')
print('Новых сообщений:', messages_count)

print('Новых историй:', stories_count)

Парсинг целевой аудитории ВКонтакте

Время чтения текста – 7 минут

При размещении рекламы некоторые площадки в настройках аудитории позволяют загрузить список конкретных людей, которые увидят рекламу. Для парсинга id по конкретным пабликам существуют специальные инструменты, но куда интереснее (и дешевле) сделать это собственноручно при помощи Python и VK API. Сегодня расскажем, как для рекламной кампании LEFTJOIN мы спарсили целевую аудиторию и загрузили её в рекламный кабинет.

В материале «Собираем данные по рекламным кампаниям ВКонтакте» подробно описан процесс получения токена пользователя для VK API

Парсинг пользователей

Для отправки запросов потребуется токен пользователя и список пабликов, чьих участников мы хотим получить. Мы собрали около 30 сообществ, посвящённых аналитике, BI-инструментам и Data Science.

import requests
import time

group_list =  ['datacampus', '185023286', 'data_mining_in_action', '223456', '187222444', 'nta_ds_ai', 'business__intelligence', 'club1981711', 'datascience', 'ozonmasters', 'businessanalysts', 'datamining.team', 'club.shad', '174278716', 'sqlex', 'sql_helper', 'odssib', 'sapbi', 'sql_learn', 'hsespbcareer', 'smartdata', 'pomoshch_s_spss', 'dwhexpert', 'k0d_ds', 'sql_ex_ru', 'datascience_ai', 'data_club', 'mashinnoe_obuchenie_ai_big_data', 'womeninbigdata', 'introstats', 'smartdata', 'data_mining_in_action', 'dlschool_mipt']

token = 'ваш_токен'

Запрос на получение участников сообщества к API ВКонтакте вернёт максимум 1000 строк — для получения последующих тысяч потребуется смещать параметр offset на единицу. Но нужно знать, до какого момента это делать — поэтому опишем функцию, которая принимает id сообщества, получает информацию о числе участников сообщества и возвращает максимальное значение для offset — отношение числа участников к 1000, ведь мы можем получить ровно тысячу человек за раз.

def get_offset(group_id):
    count = requests.get('https://api.vk.com/method/groups.getMembers', params={
            'access_token':token,
            'v':5.103,
            'group_id': group_id,
            'sort':'id_desc',
            'offset':0,
            'fields':'last_seen'
        }).json()['response']['count']
    return count // 1000

Следующим этапом опишем функцию, которая принимает id сообщества, собирает в один список id всех подписчиков и возвращает его. Для этого отправляем запросы на получение 1000 человек, пока не кончается offset, вносим данные в список и возвращаем его. Проходя по каждому человеку дополнительно проверяем дату его последнего посещения социальной сети — если он не заходил с середины ноября, добавлять его не будем. Время указывается в формате unixtime.

def get_users(group_id):
    good_id_list = []
    offset = 0
    max_offset = get_offset(group_id)
    while offset < max_offset:
        response = requests.get('https://api.vk.com/method/groups.getMembers', params={
            'access_token':token,
            'v':5.103,
            'group_id': group_id,
            'sort':'id_desc',
            'offset':offset,
            'fields':'last_seen'
        }).json()['response']
        offset += 1
        for item in response['items']:
            try:
                if item['last_seen']['time'] >= 1605571200:
                    good_id_list.append(item['id'])
            except Exception as E:
                continue
    return good_id_list

Теперь пройдём по всем сообществам из списка и для каждого соберём участников, а затем внесём их в общий список all_users. В конце переводим сначала список в множество, а затем опять в список, чтобы избавиться от возможных дубликатов: одни и те же люди могли быть участниками разных пабликов. Лишним не будет после каждого паблика приостановить работу программы на секунду, чтобы не столкнуться с ограничениями на число запросов.

all_users = []

for group in group_list:
    print(group)
    try:
        users = get_users(group)
        all_users.extend(users)
        time.sleep(1)
    except KeyError as E:
        print(group, E)
        continue

all_users = list(set(all_users))

Последним шагом записываем каждого пользователя в файл с новой строки.

with open('users.txt', 'w') as f:
    for item in all_users:
        f.write("%s\n" % item)

Аудитория в рекламном кабинете из файла

Переходим в свой рекламный кабинет ВКонтакте и заходим во вкладку «Ретаргетинг». Там будем кнопка «Создать аудиторию»:

После нажатия на неё откроется новое окно, где можно будет выбрать в качестве источника файл и указать название для аудитории:

После загрузки пройдёт несколько секунд и аудитория будет доступна. Первые минут 10 будет указано, что аудитория слишком мала: это не так и панель вскоре обновится, если в вашей аудитории действительно более 100 человек.

Итоги

Сравним среднюю стоимость привлечённого в наше сообщество участника в объявлении с автоматической настройкой аудитории и в объявлении, аудиторию для которого мы спарсили. В первом случае получаем среднюю стоимость в 52,4 рубля, а во втором — в 33,2 рубля. Подбор качественной аудитории при помощи методов парсинга данных из ВКонтакте помог снизить среднюю стоимость на 37%.

Для рекламной кампании мы подготовили такой пост (нажмите на картинку, чтобы перейти к нему):

 3 комментария    320   2020   Analytics Engineering   api   python   vk   vk api

Бесплатные курсы математики для аналитиков и инженеров данных

Время чтения текста – 6 минут

Сейчас в интернете доступно огромное количество платных образовательных материалов, которые обещают сделать из вас аналитика. Иногда это так, и некоторые программы действительно дают хороший набор скилл-сетов, которые необходимы аналитику.

Порой можно встретить точку зрения, что аналитику и вовсе не надо знать SQL и Python. С моей точки зрения, наоборот, следует стараться прокачивать себя в разных направлениях, в том числе и в хард-скиллах. Другой большой спор предполагает, что аналитику не нужна математическая база, и он может эффективно решать задачи, набравшись исключительно хард-скиллов. Я считаю, что это громадное заблуждение. Спойлер — далее в тексте у меня есть таблетка от этого.

К примеру, на мой взгляд, тяжело рассуждать о вероятности оттока, не понимая, что такое теория вероятностей. Сложно обсуждать медиану и нормальность распределения, не понимая математической статистики. Не разобраться в SVD-разложении, не понимая линейную алгебру, и не посчитать градиент функции, не разбираясь в математическом анализе. Некоторые возразят: ну так аналитику это и не требуется, ведь в Python / R / Matlab всё уже реализовано. Действительно, для стартовой позиции, наверное, всё так, можно взять готовый алгоритм, написать пару команд, и, вау, ты построил модель линейной регрессии. Но что делать дальше? Как разобраться в сути математического аппарата при изменении конкретных деталей модели?

Сегодня интернет позволяет нам подарить себе бесплатно второе высшее образование, и начинающему аналитику следует начать именно с него, прежде чем покупать курсы по анализу данных. Буквально недавно я прослушал ещё раз все фундаментальные курсы, которыми хочу поделиться. Прелесть в том, что все эти материалы абсолютно бесплатны. И несмотря на то, что всё нижеизложенное я изучал в ВУЗе 15 лет назад, повторить изученные материалы крайне полезно (все же за 15 лет многое забывается). Кроме того, получение подобных знаний тренирует тот самый аналитический склад ума и математическую подготовку.

В посте предлагаю вам бесплатные фундаментальные курсы от американских именитых ВУЗов, с которыми вы можете стартовать обучение по направлению аналитика данных. Единственное требование — знание английского языка. Но если у вас до сих пор нет этого, обязательно сперва изучите английский и после возвращайтесь к посту :)

Предполагаю, что совершенно точно можно собрать аналогичный пост из русскоязычных лекций на Youtube, но мне очень нравится подача материала не одним видео на полтора часа, а отрезками с решением прикладных задач, дублированием информации текстом, именно поэтому я рекомендую эти материалы.

МАТЕМАТИЧЕСКИЙ АНАЛИЗ (M.I.T.)

В английском языке имеет название Calculus, совершенно потрясающий по подбору материалов и объяснению курс от MIT в трех частях:

  1. Дифференцирование
  2. Интегрирование
  3. Система координат и ряды

ЛИНЕЙНАЯ АЛГЕБРА (Georgia Tech)

Курс в четырех частях от одного из ведущих мировых ВУЗов в Computer Science: Georgia Tech.

  1. Линейные уравнения
  2. Матричная алгебра
  3. Детерминанты и собственные значения
  4. Ортогональность, симметричные матрицы и SVD

ТЕОРИЯ ВЕРОЯТНОСТЕЙ И МАТЕМАТИЧЕСКАЯ СТАТИСТИКА (Georgia Tech)

Курс в четырех частях от одного из ведущих мировых ВУЗов в Computer Science: Georgia Tech.

  1. Введение в теорию вероятностей
  2. Случайные величины
  3. Введение в статистику
  4. Доверительные интервалы и гипотезы

ВЫЧИСЛЕНИЯ В PYTHON (Georgia Tech)

Курс в четырех частях от одного из ведущих мировых ВУЗов в Computer Science: Georgia Tech.

  1. Основы
  2. Управляющие структуры
  3. Структуры данных
  4. Объектные алгоритмы

R ДЛЯ АНАЛИЗА ДАННЫХ (Harvard)

Курс в семи частях от профессора из Гарварда

  1. Основы R
  2. Визуализация
  1. Теория вероятностей
  2. Вывод и моделирование
  3. Полезные инструменты
  4. Форматирование данных
  5. Линейная регрессия
  6. Машинное обучение
  7. Итоги курса
 Нет комментариев    478   2020   analysis   maths   probability   python   r   statistics

Частотный словарь и биграммы по постам инвесторов

Время чтения текста – 16 минут

Тинькофф Инвестиции — сервис от Тинькофф Банка для инвестирования на Московской и Санкт-Петербургской биржах. Внутри сервиса есть социальная сеть «Пульс», где инвесторы любого уровня могут делиться своими опытом, мыслями и планами, комментировать и оценивать чужие посты. Сегодня решим такую задачу:
построим частотный словарь и биграммы по постам пользователей, разделив их по объёму портфеля, чтобы понять, чем отличаются посты людей с разным объёмом инвестиций.

Лента по ценной бумаге в Пульсе выглядит вот так:

У каждого инвестора есть личный профиль. Внутри отображается объём портфеля, статистика прироста за год и число сделок за последний месяц.

Схема в Clickhouse

Для биграмм и частотного словаря достаточно собрать только тексты постов, логины пользователей и объёмы портфеля, но ради спортивного интереса будем хранить ещё и рост портфеля, число сделок человека за месяц и количество оценок под его постом. Для хранения данных получится две таблицы posts и users:

CREATE TABLE tinkoff.posts
(
    `login` String,
    `post` String,
    `likes` Int16
)
ENGINE = MergeTree
ORDER BY login

 CREATE TABLE tinkoff.users
(
    `login` String,
    `volume_prefix` String,
    `volume` String,
    `year_stats_prefix` String,
    `year_stats` String
)
ENGINE = MergeTree()
ORDER BY login

В таблице с пользователями volume_prefix — это префикс «до» или «от», стоящий в объёме портфеля, а volume — сам объём портфеля. Соответственно years_stats_prefix обозначает, портфель за год упал или вырос, а year_stats — на сколько он упал или вырос. Такая схема из двух таблиц с ключом сортировки таблиц по полю login позволит их соединить позднее.

Пишем парсер постов

У Пульса нет своего API, поэтому для парсинга постов будем использовать Selenium.

Мы уже писали про то, как парсить сайты с прокруткой при помощи Selenium

Нам понадобятся следующие библиотеки:

from selenium import webdriver
import time
from webdriver_manager.chrome import ChromeDriverManager
from clickhouse_driver import Client
from bs4 import BeautifulSoup as bs
import pandas as pd
import requests
import os
from lxml import html
import re

Сразу составим список интересующих ценных бумаг: это акции Сбербанка, Газпрома, Яндекса, Лукойла, MailRu, Аэрофлота, Киви, ВТБ, Детского Мира и Ленты. Для каждой бумаги будем переходить на страницу с постами, в которых она упоминается и проматывать страницу, пока длина страницы не станет более 300000: это около одной недели.

driver = webdriver.Chrome(ChromeDriverManager().install())

securities = ['SBER', 'GAZP', 'YNDX', 'LKOH', 'MAIL', 'AFLT', 'QIWI', 'VTBR', 'DSKY', 'LNTA']
        
for security in securities:
    try:
        print(security)
        driver.get(f'https://www.tinkoff.ru/invest/stocks/{security}/pulse/')
        
        page_length = driver.execute_script("return document.body.scrollHeight")
        while page_length < 300000:
            driver.execute_script(f"window.scrollTo(0, {page_length - 1000});")
            page_length = driver.execute_script("return document.body.scrollHeight")

После забираем себе весь код полученной страницы и извлекаем нужную информацию: логины, текст постов и число лайков. Формируем из них DataFrame и записываем его в папку data.

source_data = driver.page_source
        soup = bs(source_data, 'lxml')
        
        posts = soup.find_all('div', {'class':'PulsePostCollapsed__text_1ypMP'})
        logins = soup.find_all('div', {'class':'PulsePostAuthor__nicknameLink_19Aca'})
        likes = soup.find_all('div', {'class':'PulsePostBody__likes_3qcu0'})
        
        logins = [login.text for login in logins]
        posts = [post.text for post in posts]
        likes = [like.text.split()[0] for like in likes]
        
        df_posts = pd.DataFrame()
        df_posts['login'] = logins
        df_posts['post'] = posts
        df_posts['likes'] = likes
        
        df_posts.to_csv(f'data/{security}.csv', index=False)
        
        print(f'SAVED {security}')
    except Exception as E:
        print(E)

После того, как нужные посты собраны, отправим их в таблицу posts в Clickhouse. При помощи модуля os переходим в директорию data и собираем в список all_files названия всех файлов в ней — это все csv-таблицы, которые мы спарсили. Затем по очереди читаем файл в DataFrame и вставляем в posts.

client = Client(host='', user='', password='', port='9000', database='tinkoff')

os.chdir('data')
all_files = os.listdir()

for file in all_files:
    df = pd.read_csv(file)
    client.execute("INSERT INTO posts VALUES", df.to_dict('records'))

Собираем информацию о профилях

Чтобы собрать все профили, получим уникальный список логинов из базы:

flatten = lambda t: [item for sublist in t for item in sublist]
logins = flatten(client.execute("SELECT DISTINCT login FROM posts"))

Получать посты будем request-запросом, без Selenium, ведь ничего листать уже не нужно. Но иконка падения или роста портфеля за год не является текстом и получить её нельзя, зато внутри CSS-стилей можно увидеть её цвет — его мы и будем сохранять себе.

Поэтому опишем такую функцию: она примет объект soup и извлечёт цвет иконки.

headers = {'accept': '*/*',
           'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36'}

def up_or_down_color(soup):
    string = str(soup.find('div', {'class':'Icon__container_3u7WK'}))
    start_index = string.find('style')
    color = string[start_index + 13:start_index + 20]
    return color

А теперь напишем парсер, который проходит по списку логинов, для каждого отправляет запрос и собирает всю статистику профиля. Причём если цвет иконки роста зеленый, то в поле year_stats_prefix добавим «+», иначе «-». В конце сделаем паузу на 0.2 секунды на всякий случай, чтобы не напороться на неявные ограничения.

session = requests.Session()

login_list = []
volume_list = []
volume_prefix_list = []
year_stats_list = []
year_stats_prefix_list = []

count = 0
for login in logins:
    print(count, '/', len(logins))
    
    try:
        count += 1
        if login == 'blocked_user':
            continue

        url = f'https://www.tinkoff.ru/invest/social/profile/{login}'
        request = session.get(url, headers=headers)
        soup = bs(request.content, 'lxml')

        try:
            login = soup.find('div', {'class':'ProfileHeader__nickname_1oynx'}).text
            volume = soup.find('span', {'class':'Money__money_3_Tn4'}).text
            _to = soup.find('div', {'class':'ProfileHeader__statistics_11-DO'}).text.find('До')
            _from = soup.find('div', {'class':'ProfileHeader__statistics_11-DO'}).text.find('От')
            year_stats = soup.find('div', {'class':'ProfileHeader__statisticsItem_1HPLt'}).text
            color = up_or_down_color(soup)
        except AttributeError as E:
            print(login, E)
            continue
        volume_list.append(volume)
        login_list.append(login)

        if _to == -1:
            volume_prefix_list.append('до')
        else:
            volume_prefix_list.append('от')

        year_stats_list.append(re.findall(r'\d+.+', year_stats)[0])

        if color == '#22a053':
            year_stats_prefix_list.append('-')
        elif color == '#dd5656':
            year_stats_prefix_list.append('+')
        else:
            year_stats_prefix_list.append('')
    except Exception as E:
        print(E)
        continue
    time.sleep(0.2)

Собираем все аккаунты и статистику по ним в DataFrame. Их тоже сохраним себе в базу.

df_users = pd.DataFrame()
df_users['login'] = login_list
df_users['volume_prefix'] = volume_prefix_list
df_users['volume'] = volume_list
df_users['year_stats_prefix'] = year_stats_prefix_list
df_users['year_stats'] = year_stats_list

client.execute("INSERT INTO users VALUES", df_users.to_dict('records'))

А теперь сделаем LEFT JOIN таблицы с постами к таблице с пользователями, чтобы у каждой строки с постом была ещё статистика по аккаунту автора. Запишем результат в DataFrame.

posts_with_users = client.execute('''
    SELECT login, post, likes, volume_prefix, volume, year_stats_prefix, year_stats FROM posts
    LEFT JOIN users
    ON posts.login = users.login
''')
posts_with_users_df = pd.DataFrame(posts_with_users, columns=['login', 'post', 'likes', 'volume_prefix', 'volume', 'year_stats_prefix', 'year_stats'])

Полученный результат будет выглядеть так:

Частотный словарь и биграммы

Для начала составим частотный словарь по постам без разделений на группы.

posts_with_users_df.post.str.split(expand=True).stack().value_counts()

Получим, что предлоги и союзы превалируют над остальными словами:

Значит, их нужно убрать из сета данных. Но даже в том случае, если данные будут очищены, получим что-то подобное. Такие данные ни о чём не говорят, и никаких выводов сделать не удастся.

В таком случае попробуем построить биграммы. Одна биграмма — последовательность из двух элементов, то есть два слова, стоящие рядом друг с другом. Существует много алгоритмов построения n-грамм разной степени оптимизации, мы воспользуемся встроенной функцией в nltk и разберём пример построения биграмм для одной группы. Первым делом импортируем дополнительные библиотеки, загружаем stopwords для русского языка и чистим данные. В список стоп-слов вносим дополнительные: среди них будут и тикеры акций, которые встречаются в каждом посте.

import nltk
from nltk.corpus import stopwords
from pymystem3 import Mystem
from string import punctuation
import unicodedata
import collections

nltk.download("stopwords")
nltk.download('punkt')
russian_stopwords = stopwords.words("russian")
append_stopword = ['это', 'sber', 'акция', 'компания', 'aflt', 'gazp', 'yndx', 'lkoh', 'mail', 'год', 'рынок', 'https', 'млрд', 'руб', 'www', 'кв']
russian_stopwords.extend(append_stopword)

Опишем функцию для подготовки текста, которая переведёт все слова в нижний регистр, приведёт к нормальной форме, удалит стоп-слова и пунктуацию:

mystem = Mystem() 

def preprocess_text(text):
    tokens = mystem.lemmatize(text.lower())
    tokens = [token for token in tokens if token not in russian_stopwords\
              and token != " " \
              and token.strip() not in punctuation]
    
    text = " ".join(tokens)
    
    return text

posts_with_users_df.post = posts_with_users_df.post.apply(preprocess_text)

Для примера возьмём посты группы инвесторов с объёмом портфеля до 10 тысяч рублей и построим биграммы, а затем выведем самые частые:

up_to_10k_df = posts_with_users_df[(posts_with_users_df['volume_prefix'] == 'от') & (posts_with_users_df['volume'] == '10 000 ₽')]

up_to_10k_counts = collections.Counter()
for sent in up_to_10k_df["post"]:
    words = nltk.word_tokenize(sent)
    up_to_10k_counts.update(nltk.bigrams(words))
up_to_10k_counts.most_common()

Получаем такой список:

Результаты исследования биграмм

В группе с объёмом портфеля до 10 и 100 тысяч руб. инвесторы чаще пишут о личном опыте и полученной прибыли: на это указывают биграммы «чистая прибыль» и «финансовый результат».

До 10 000 руб:

  1. добрый утро, 44
  2. цена нефть, 36
  3. неквалифицированный инвестор, 36
  4. чистый прибыль, 32
  5. шапка профиль, 30
  6. московский биржа, 30
  7. совет директор, 28

До 100 000 руб:

  1. чистый прибыль, 80
  2. финансовый результат, 67
  3. добрый утро, 66
  4. индекс мосбиржа, 63
  5. цена нефть, 58
  6. квартал 2020, 42
  7. мочь становиться 41

В группе до 500 тысяч руб. впервые появляются биграммы со словами «подписываться», «выкладывать», «новость» — инвесторы с такими характеристиками портфеля часто заводят собственные блоги об инвестировании и продвигают их через посты в Пульсе.

До 500 000 руб:

  1. чистый прибыль, 169
  2. квартал 2020, 154
  3. отчетность 3, 113
  4. выкладывать новость, 113
  5. 🤝подписываться, 80
  6. подписываться выкладывать, 80
  7. публиковать отчёт, 80
  8. цена нефть, 76
  9. колво бумага, 69
  10. вес портфель, 68

В биграммах группы с объёмом портфеля до и от 1 миллиона руб. появляется «фьючерс», что логично — это сложный инструмент, который обычно не рекомендуется новичкам. Кроме того, в постах группы проходит больше обсуждений отчётностей компаний — это биграммы «финансовый отчетность», «опубликовать финансовый», «отчетность мсфо».

До 1 000 000 руб:

  1. 3 квартал, 183
  2. квартал 2020, 157
  3. фьчерс утро, 110
  4. финансовый отчетность, 107
  5. опубликовать финансовый, 104
  6. чистый прибыль, 75
  7. наш биржа, 72
  8. отчетность мсфо, 69
  9. цена нефть, 67
  10. операционный результат, 61
  11. ноябрьский фьючерс, 54
  12. азиатский площадка, 51

От 1 000 000 руб:

  1. октябрь опубликовывать, 186
  2. 3 квартал, 168
  3. квартал 2020, 168
  4. финансовый отчетность, 159,
  5. опубликовывать финансовый, 95
  6. чистый прибыль, 94
  7. операционный результат, 86
  8. целевой цена, 74
  9. опубликовывать операционный, 63
  10. цена повышать, 60
 2 комментария    120   2020   clickhouse   Data Analytics   data science   nltk   python

Создаём дашборд на Bootstrap (Часть 2)

Время чтения текста – 16 минут

В последнем материале мы подготовили базовый макет дашборда при помощи библиотеки dash-bootstrap-components с двумя графиками: scatter plot и российской картой, которые подробно разбирали ранее. Сегодня продолжим наполнять дашборд информацией: встроим в него таблицы и фильтр данных по пивоварням.

Получение таблиц

Сами таблицы будем описывать в макете в файле application.py, но информацию, которую они отображают лаконичнее будет получить в отдельном модуле. Создадим файл get_tables.py: в нём будет функция, передающая готовую таблицу класса Table библиотеки dbc в application.py. В этом материале мы опишем только таблицу лучших пивоварен России, но на GithHub будут представлены все три.

В таблицах по заведениям и пивоварням мы реализуем фильтр по городам, но изначально города в собранных с Untappd данных записаны на латинице. Для запросов мы будем переводить русскоязычные наименования городов на английский при помощи библиотеки Google Translate. Кроме того, одни и те же города могут называться по-разному — например, «Москва» на латинице где-то записана как «Moskva», а где-то как «Moscow». Для этого дополнительно настроим маппинг наименований города и заранее создадим словарь с корректными наименованиями основных городов. Он пригодится в самом конце.

import pandas as pd
import dash_bootstrap_components as dbc
from clickhouse_driver import Client
import numpy as np
from googletrans import Translator

translator = Translator()

client = Client(host='12.34.56.78', user='default', password='', port='9000', database='')

city_names = {
   'Moskva': 'Москва',
   'Moscow': 'Москва',
   'СПБ': 'Санкт-Петербург',
   'Saint Petersburg': 'Санкт-Петербург',
   'St Petersburg': 'Санкт-Петербург',
   'Nizhnij Novgorod': 'Нижний Новгород',
   'Tula': 'Тула',
   'Nizhniy Novgorod': 'Нижний Новгород',
}

Таблица лучших пивоварен

Таблица, о которой идёт речь в материале, будет показывать топ-10 лучших российских пивоварен с изменением рейтинга. То есть мы сравниваем данные за два периода: [30 дней назад; сегодня] и [60 дней назад; 30 дней назад] и смотрим, как менялось место пивоварни в рейтинге. Соответственно, мы опишем следующие колонки: место в рейтинге, название пивоварни, ассортимент сортов пива, рейтинг пивоварни на untappd, изменение места и количество чекинов у этой пивоварни.
Опишем функцию get_top_russian_breweries, которая отправляет запрос к Clickhouse, получает общий топ пивоварен России, формирует данные и возвращает готовый для вывода DataFrame. Отправим два запроса — топ пивоварен за последние 30 дней и топ пивоварен за предыдущие 30 дней. Следующий запрос будет отбирать лучшие пивоварни, основываясь на количестве отзывов о пиве данной пивоварни.


Забираем данные из базы

def get_top_russian_breweries(checkins_n=250):
   top_n_brewery_today = client.execute(f'''
      SELECT  rt.brewery_id,
              rt.brewery_name,
              beer_pure_average_mult_count/count_for_that_brewery as avg_rating,
              count_for_that_brewery as checkins FROM (
      SELECT           
              brewery_id,
              dictGet('breweries', 'brewery_name', toUInt64(brewery_id)) as brewery_name,
              sum(rating_score) AS beer_pure_average_mult_count,
              count(rating_score) AS count_for_that_brewery
          FROM beer_reviews t1
          ANY LEFT JOIN venues AS t2 ON t1.venue_id = t2.venue_id
          WHERE isNotNull(venue_id) AND (created_at >= (today() - 30)) AND (venue_country = 'Россия') 
          GROUP BY           
              brewery_id,
              brewery_name) rt
      WHERE (checkins>={checkins_n})
      ORDER BY avg_rating DESC
      LIMIT 10
      '''
   )

top_n_brewery_n_days = client.execute(f'''
  SELECT  rt.brewery_id,
          rt.brewery_name,
          beer_pure_average_mult_count/count_for_that_brewery as avg_rating,
          count_for_that_brewery as checkins FROM (
  SELECT           
          brewery_id,
          dictGet('breweries', 'brewery_name', toUInt64(brewery_id)) as brewery_name,
          sum(rating_score) AS beer_pure_average_mult_count,
          count(rating_score) AS count_for_that_brewery
      FROM beer_reviews t1
      ANY LEFT JOIN venues AS t2 ON t1.venue_id = t2.venue_id
      WHERE isNotNull(venue_id) AND (created_at >= (today() - 60) AND created_at <= (today() - 30)) AND (venue_country = 'Россия')
      GROUP BY           
          brewery_id,
          brewery_name) rt
  WHERE (checkins>={checkins_n})
  ORDER BY avg_rating DESC
  LIMIT 10
  '''
)

Формируем из полученных строк два DataFrame:

top_n = len(top_n_brewery_today)
column_names = ['brewery_id', 'brewery_name', 'avg_rating', 'checkins']

top_n_brewery_today_df = pd.DataFrame(top_n_brewery_today, columns=column_names).replace(np.nan, 0)
top_n_brewery_today_df['brewery_pure_average'] = round(top_n_brewery_today_df.avg_rating, 2)
top_n_brewery_today_df['brewery_rank'] = list(range(1, top_n + 1))

top_n_brewery_n_days = pd.DataFrame(top_n_brewery_n_days, columns=column_names).replace(np.nan, 0)
top_n_brewery_n_days['brewery_pure_average'] = round(top_n_brewery_n_days.avg_rating, 2)
top_n_brewery_n_days['brewery_rank'] = list(range(1, len(top_n_brewery_n_days) + 1))

А затем в итераторе считаем, как изменилось место за последнее время у пивоварни. Обработаем исключение на случай, если 60 дней назад этой пивоварни в нашей базе ещё не было.

rank_was_list = []
for brewery_id in top_n_brewery_today_df.brewery_id:
   try:
       rank_was_list.append(
           top_n_brewery_n_days[top_n_brewery_n_days.brewery_id == brewery_id].brewery_rank.item())
   except ValueError:
       rank_was_list.append('–')
top_n_brewery_today_df['rank_was'] = rank_was_list

Теперь пройдёмся по полученным колонкам с текущими местами и изменениями. Если они не пустые, то при положительном изменении добавим к записи стрелочку вверх. При отрицательном — стрелочку вниз.

diff_rank_list = []
for rank_was, rank_now in zip(top_n_brewery_today_df['rank_was'], top_n_brewery_today_df['brewery_rank']):
   if rank_was != '–':
       difference = rank_was - rank_now
       if difference > 0:
           diff_rank_list.append(f'↑ +{difference}')
       elif difference < 0:
           diff_rank_list.append(f'↓ {difference}')
       else:
           diff_rank_list.append('–')
   else:
       diff_rank_list.append(rank_was)

Наконец, разметим итоговый DataFrame и вставим в него колонку с текущим местом. При этом у топ-3 будет отображаться эмодзи с золотым кубком.

df = top_n_brewery_today_df[['brewery_name', 'avg_rating', 'checkins']].round(2)
df.insert(2, 'Изменение', diff_rank_list)
df.columns = ['НАЗВАНИЕ', 'РЕЙТИНГ', 'ИЗМЕНЕНИЕ', 'ЧЕКИНОВ']
df.insert(0, 'МЕСТО',
         list('🏆 ' + str(i) if i in [1, 2, 3] else str(i) for i in range(1, len(df) + 1)))

return df

#Выбор пивоварен с фильтром по городам
Одна из функций нашего дашборда — просмотр топа пивоварен по конкретному городу. Для корректной работы напишем скрипт, который для каждого из списка российских городов получает топ пивоварен по числу чекинов и записывает данные по каждому городу в отдельные csv-файлы. В сущности, он мало чем отличается от предыдущего — рассмотрим главные отличия.

Прежде всего, функция принимает конкретный город. Мы уже отметили, что города в базе данных записаны на латинице — поэтому сначала переводим наименование города. В случае с Санкт-Петербургом, Нижним Новгородом и Пермью придётся перевести вручную: например, Санкт-Петербург переводится в Google Translate как St. Petersburg вместо ожидаемого Saint Petersburg.

ru_city = venue_city
if ru_city == 'Санкт-Петербург':
   en_city = 'Saint Petersburg'
elif ru_city == 'Нижний Новгород':
   en_city = 'Nizhnij Novgorod'
elif ru_city == 'Пермь':
   en_city = 'Perm'
else:
   en_city = translator.translate(ru_city, dest='en').text

Следующее отличие — запрос к базе. Нам нужно добавить в него условие совпадения по городу, чтобы получать чекины только в запрошенном городе:

WHERE (rt.venue_city='{ru_city}' OR rt.venue_city='{en_city}')

Наконец, сформированный DataFrame мы не возвращаем, а сохраняем в директорию data/cities.

df = top_n_brewery_today_df[['brewery_name', 'venue_city', 'avg_rating', 'checkins']].round(2)
df.insert(3, 'Изменение', diff_rank_list)
df.columns = ['НАЗВАНИЕ', 'ГОРОД', 'РЕЙТИНГ', 'ИЗМЕНЕНИЕ', 'ЧЕКИНОВ']
df.to_csv(f'data/cities/{en_city}.csv', index=False)  # saving DF
print(f'{en_city}.csv updated!')

Обновление таблиц по расписанию

Наш дашборд будет использовать библиотеку apscheduler для вызова последней функции по расписанию и обновления таблиц по городам. Следующие строки добавим в файл application.py: scheduler будет обновлять данные для каждого города из списка all_cities ежедневно в 13:30 по МСК.

from apscheduler.schedulers.background import BackgroundScheduler
from get_tables import update_best_breweries

all_cities = sorted(['Москва', 'Сергиев Посад', 'Санкт-Петербург', 'Владимир',
             'Красная Пахра', 'Воронеж', 'Екатеринбург', 'Ярославль', 'Казань',
             'Ростов-на-Дону', 'Краснодар', 'Тула', 'Курск', 'Пермь', 'Нижний Новгород'])

scheduler = BackgroundScheduler()
@scheduler.scheduled_job('cron', hour=10, misfire_grace_time=30)
def update_data():
   for city in all_cities:
       update_best_breweries(city)
scheduler.start()

Формирование таблицы

Наконец, опишем заключительную функцию get_top_russian_breweries_table(venue_city, checkins_n=250) — она будет принимать город, количество чекинов и будет возвращать сформированную таблицу dbc. Второй параметр — checkins_n будет отсеивать пивоварни, у которых чекинов меньше значения этой переменной. Если город не указан, сразу вызываем ранее описанную get_top_russian_breweries(checkins_n) — она вернёт общую статистику за последнее время. В противном случае снова переводим города на латиницу.

if venue_city == None: 
   selected_df = get_top_russian_breweries(checkins_n)
else: 
   ru_city = venue_city
   if ru_city == 'Санкт-Петербург':
       en_city = 'Saint Petersburg'
   elif ru_city == 'Нижний Новгород':
       en_city = 'Nizhnij Novgorod'
   elif ru_city == 'Пермь':
       en_city = 'Perm'
   else:
       en_city = translator.translate(ru_city, dest='en').text

Читаем все строки из таблицы с нужным городом и проверяем количество чекинов каждой пивоварни. В самом начале материала мы завели словарь city_names. При помощи функции map() мы пишем лямбда-выражение, которое возвращает значение ключа словаря city_names только если входной аргумент из колонки df[‘ГОРОД’] совпадает с каким-либо из ключей в city_names. В случае, если совпадения не будет возвращает просто x во избежание np.Nan.

Например, для наименования «СПБ» в колонке df[‘ГОРОД’] вернётся значение «Санкт-Петербург», так как такой ключ есть в city_names. Для «Воронеж» название таким и останется, так как совпадающий ключ не найден. В конце удаляем возможные дубликаты из DataFrame, добавляем колонку с номером места пивоварни и забираем себе первые 10 строк — это и будет топ-10 пивоварен по нужному городу.

df = pd.read_csv(f'data/cities/{en_city}.csv')
df = df.loc[df['ЧЕКИНОВ'] >= checkins_n]
df['ГОРОД'] = df['ГОРОД'].map(lambda x: city_names[x] if (x in city_names) else x)
df.drop_duplicates(subset=['НАЗВАНИЕ', 'ГОРОД'], keep='first', inplace=True) 
df.insert(0, 'МЕСТО', list('🏆 ' + str(i) if i in [1, 2, 3] else str(i) for i in range(1, len(df) + 1)))
selected_df = df.head(10)

Вне зависимости от того, получали мы DataFrame общей функцией get_top_russian_breweries() или по конкретному городу, собираем таблицу, задаём стили и возвращаем готовый dbc-объект.


Вёрстка в Dash Bootstrap Components

table = dbc.Table.from_dataframe(selected_df, striped=False,
                                bordered=False, hover=True,
                                size='sm',
                                style={'background-color': '#ffffff',
                                       'font-family': 'Proxima Nova Regular',
                                       'text-align':'center',
                                       'fontSize': '12px'},
                                className='table borderless'
                                )

return table

Структура вёрстки

Опишем в application.py слайдер, таблицу и Dropdown-фильтр с выбором города.

О вёрстке дашборда при помощи Dash Bootstrap Components мы говорили в предыдущем материале цикла

checkins_slider_tab_1 = dbc.CardBody(
                           dbc.FormGroup(
                               [
                                   html.H6('Количество чекинов', style={'text-align': 'center'})),
                                   dcc.Slider(
                                       id='checkin_n_tab_1',
                                       min=0,
                                       max=250,
                                       step=25,
                                       value=250,  
                                       loading_state={'is_loading': True},
                                       marks={i: i for i in list(range(0, 251, 25))}
                                   ),
                               ],
                           ),
                           style={'max-height': '80px', 
                                  'padding-top': '25px'
                                  }
                       )

top_breweries = dbc.Card(
       [
           dbc.CardBody(
               [
                   dbc.FormGroup(
                       [
                           html.H6('Фильтр городов', style={'text-align': 'center'}),
                           dcc.Dropdown(
                               id='city_menu',
                               options=[{'label': i, 'value': i} for i in all_cities],
                               multi=False,
                               placeholder='Выберите город',
                               style={'font-family': 'Proxima Nova Regular'}
                           ),
                       ],
                   ),
                   html.P(id="tab-1-content", className="card-text"),
               ],
           ),
   ],
)

И для обновления таблицы по фильтру и слайдеру с минимальным количеством чекинов опишем callback с вызовом get_top_russian_breweries_table(city, checkin_n):

@app.callback(
   Output("tab-1-content", "children"), [Input("city_menu", "value"),
                                         Input("checkin_n_tab_1", "value")]
)
def table_content(city, checkin_n):
   return get_top_russian_breweries_table(city, checkin_n)

Готово! Напомню, в материале описан пример создания только одной таблицы. На данный момент дашборд помимо лучших пивоварен выдаёт лучшие и худшие сорта пива, а также средний рейтинг пива по регионам и отношение количества чекинов каждой пивоварни к её средней оценке.

Полный код проекта доступен на GitHub

Собираем топ-10 аккаунтов Instagram по теме аналитики и машинного обучения

Время чтения текста – 11 минут

В некоторых телеграм-каналах (раз, два) уже говорилось про другие интересные паблики в телеграме, однако по Instagram такого топа пока не было. Вероятно, это не самая популярная сеть для контента в нашей индустрии, тем не менее, можно проверить эту гипотезу, используя Python и данные. В этом материале рассказываем, как собрать данные по аккаунтам Instagram без API.

Метод сбора данных
Instagram API не позволит вам просто так собирать данные о других пользователях, но есть и другой метод. Можно отправить такой request-запрос:

https://instagram.com/leftjoin/?__a=1

И получить в ответе JSON-объект со всей информацией о пользователе, которую можно посмотреть самому: имя аккаунта, количество постов, подписок и подписчиков, а также первые десять постов с информацией про них: количество лайков, комментарии и прочее. Именно на таких request-запросах устроена библиотека pyInstagram.

Схема данных
Будем собирать данные в три таблицы Clickhouse: пользователи, посты и комментарии. В таблицу пользователей собираем всю информацию о них: идентификатор, наименование аккаунта, имя и фамилия человека, описание профиля, количество подписок и подписчиков, количество постов, суммарное количество комментариев и лайков, наличие верификации, география пользователя и ссылки на аватарку и Facebook.

CREATE TABLE instagram.users
(
    `added_at` DateTime,
    `user_id` UInt64,
    `user_name` String,
    `full_name` String,
    `base_url` String,
    `biography` String,
    `followers_count` UInt64,
    `follows_count` UInt64,
    `media_count` UInt64,
    `total_comments` UInt64,
    `total_likes` UInt64,
    `is_verified` UInt8,
    `country_block` UInt8,
    `profile_pic_url` Nullable(String),
    `profile_pic_url_hd` Nullable(String),
    `fb_page` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

В таблицу с постами сохраняем автора поста, идентификатор записи, текст, количество комментариев и прочее. is_ad, is_album и is_video — поля, проверяющие, является ли запись рекламной, «каруселью» изображений или видеозаписью.

CREATE TABLE instagram.posts
(
    `added_at` DateTime,
    `owner` String,
    `post_id` UInt64,
    `caption` Nullable(String),
    `code` String,
    `comments_count` UInt64,
    `comments_disabled` UInt8,
    `created_at` DateTime,
    `display_url` String,
    `is_ad` UInt8,
    `is_album` UInt8,
    `is_video` UInt8,
    `likes_count` UInt64,
    `location` Nullable(String),
    `recources` Array(String),
    `video_url` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

В таблице с комментариями храним отдельно каждый комментарий к записи с автором и текстом.

CREATE TABLE instagram.comments
(
    `added_at` DateTime,
    `comment_id` UInt64,
    `post_id` UInt64,
    `comment_owner` String,
    `comment_text` String
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

Скрипт
Из библиотеки pyInstagram нам понадобятся классы Account, Media, WebAgent и Comment.

from instagram import Account, Media, WebAgent, Comment
from datetime import datetime
from clickhouse_driver import Client
import requests
import pandas as pd

Создаем экземпляр класса WebAgent — он необходим для вызова некоторых методов и обновления аккаунтов. В начале нам нужно иметь хотя бы названия профилей пользователей, информацию о которых мы хотим собрать, поэтому отправим другой request-запрос для поиска пользователей по ключевым словам, их список ниже в фрагменте кода. В выдаче будут аккаунты, у которых название или описание профиля совпало с ключевым словом.

agent = WebAgent()
queries_list = ['machine learning', 'data science', 'data analytics', 'analytics', 'business intelligence',
                'data engineering', 'computer science', 'big data', 'artificial intelligence',
                'deep learning', 'data scientist','machine learning engineer', 'data engineer']
client = Client(host='12.34.56.789', user='default', password='', port='9000', database='instagram')
url = 'https://www.instagram.com/web/search/topsearch/?context=user&count=0'

Проходим по всем ключевым словам и собираем все аккаунты. Так как в списке могли образоваться дубликаты, переведём список в множество и обратно в список.

response_list = []
for query in queries_list:
    response = requests.get(url, params={
        'query': query
    }).json()
    response_list.extend(response['users'])
instagram_pages_list = []
for item in response_list:
    instagram_pages_list.append(item['user']['username'])
instagram_pages_list = list(set(instagram_pages_list))

Теперь проходим по списку аккаунтов, и если аккаунта с таким наименованием ещё не было в базе, то получаем расширенную информацию о нём. Для этого пробуем создать экземпляр класса Account, передав username параметром. После при помощи объекта agent обновляем информацию об аккаунте. Будем собирать только первые 100 постов, чтобы сбор не задерживался. Создадим список media_list — он при помощи метода get_media будет хранить код каждого поста, который затем можно будет получить при помощи класса Media.


Сбор медиа аккаунта

all_posts_list = []
username_count = 0
for username in instagram_pages_list:
    if client.execute(f"SELECT count(1) FROM users WHERE user_name='{username}'")[0][0] == 0:
        print('username:', username_count, '/', len(instagram_pages_list))
        username_count += 1
        account_total_likes = 0
        account_total_comments = 0
        try:
            account = Account(username)
        except Exception as E:
            print(E)
            continue
        try:
            agent.update(account)
        except Exception as E:
            print(E)
            continue
        if account.media_count < 100:
            post_count = account.media_count
        else:
            post_count = 100
        print(account, post_count)
        media_list, _ = agent.get_media(account, count=post_count, delay=1)
        count = 0

Мы начинаем с постов и комментариев, потому что для занесения в базу нового пользователя нам нужно подсчитать сперва суммарное количество комментариев и лайков в его аккаунте. Практически все интересующие поля являются атрибутами класса Media.


Сбор постов пользователя

for media_code in media_list:
            if client.execute(f"SELECT count(1) FROM posts WHERE code='{media_code}'")[0][0] == 0:
                print('posts:', count, '/', len(media_list))
                count += 1

                post_insert_list = []
                post = Media(media_code)
                agent.update(post)
                post_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                post_insert_list.append(str(post.owner))
                post_insert_list.append(post.id)
                if post.caption is not None:
                    post_insert_list.append(post.caption.replace("'","").replace('"', ''))
                else:
                    post_insert_list.append("")
                post_insert_list.append(post.code)
                post_insert_list.append(post.comments_count)
                post_insert_list.append(int(post.comments_disabled))
                post_insert_list.append(datetime.fromtimestamp(post.date).strftime('%Y-%m-%d %H:%M:%S'))
                post_insert_list.append(post.display_url)
                try:
                    post_insert_list.append(int(post.is_ad))
                except TypeError:
                    post_insert_list.append('cast(Null as Nullable(UInt8))')
                post_insert_list.append(int(post.is_album))
                post_insert_list.append(int(post.is_video))
                post_insert_list.append(post.likes_count)
                if post.location is not None:
                    post_insert_list.append(post.location)
                else:
                    post_insert_list.append('')
                post_insert_list.append(post.resources)
                if post.video_url is not None:
                    post_insert_list.append(post.video_url)
                else:
                    post_insert_list.append('')
                account_total_likes += post.likes_count
                account_total_comments += post.comments_count
                try:
                    client.execute(f'''
                        INSERT INTO posts VALUES {tuple(post_insert_list)}
                    ''')
                except Exception as E:
                    print('posts:')
                    print(E)
                    print(post_insert_list)

Чтобы собрать комментарии необходимо вызвать метод get_comments и передать параметром экземпляр класса Media.


Сбор комментариев из поста

comments = agent.get_comments(media=post)
                for comment_id in comments[0]:
                    comment_insert_list = []
                    comment = Comment(comment_id)
                    comment_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                    comment_insert_list.append(comment.id)
                    comment_insert_list.append(post.id)
                    comment_insert_list.append(str(comment.owner))
                    comment_insert_list.append(comment.text.replace("'","").replace('"', ''))
                    try:
                        client.execute(f'''
                            INSERT INTO comments VALUES {tuple(comment_insert_list)}
                        ''')
                    except Exception as E:
                        print('comments:')
                        print(E)
                        print(comment_insert_list)


Наконец, когда все посты и комментарии пройдены, можем занести информацию о пользователе.

Сбор информации о пользователе

user_insert_list = []
        user_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        user_insert_list.append(account.id)
        user_insert_list.append(account.username)
        user_insert_list.append(account.full_name)
        user_insert_list.append(account.base_url)
        user_insert_list.append(account.biography)
        user_insert_list.append(account.followers_count)
        user_insert_list.append(account.follows_count)
        user_insert_list.append(account.media_count)
        user_insert_list.append(account_total_comments)
        user_insert_list.append(account_total_likes)
        user_insert_list.append(int(account.is_verified))
        user_insert_list.append(int(account.country_block))
        user_insert_list.append(account.profile_pic_url)
        user_insert_list.append(account.profile_pic_url_hd)
        if account.fb_page is not None:
            user_insert_list.append(account.fb_page)
        else:
            user_insert_list.append('')
        try:
            client.execute(f'''
                INSERT INTO users VALUES {tuple(user_insert_list)}
            ''')
        except Exception as E:
            print('users:')
            print(E)
            print(user_insert_list)

Результаты
Таким методом нам удалось собрать 500 пользователей, 20 тысяч постов и 40 тысяч комментариев. Теперь можем написать простой запрос к базе и получить топ-10 Instagram-аккаунтов по теме аналитики и машинного обучения за последнее время:

SELECT *
FROM users
ORDER BY followers_count DESC
LIMIT 10

А вот и приятный бонус, для тех, кто искал на какие аккаунты в Instagram подписаться по релевантной тематике:

  1. @ai_machine_learning
  2. @neuralnine
  3. @datascienceinfo
  4. @compscistuff
  5. @computersciencelife
  6. @welcome.ai
  7. @papa_programmer
  8. @data_science_learn
  9. @neuralnet.ai
  10. @techno_thinkers

Полный код проекта доступен на GitHub

Обзор библиотеки pandas-profiling на примере датасета Superstore Sales

Время чтения текста – 10 минут

Перед тем как работать с данными, необходимо составить представление, с чем мы имеем дело. В материале будем рассматривать датасет SuperStore Sales, а именно его лист Orders. В нём собраны данные о покупках клиентов канадского интернет-супермаркета: идентификаторы заказа, товаров, клиента, тип доставки, цены, категории и названия продуктов и прочее. Подробнее с датасетом можно ознакомиться на GitHub. Например, если мы создадим из датасета DataFrame, можем воспользоваться стандартным методом describe() библиотеки pandas для описания данных:

import pandas as pd

df = pd.read_csv('superstore_sales_orders.csv', decimal=',')
df.describe(include='all')

И во многих случаях получим такую кашу:

Код библиотеки доступен на GitHub

Если постараться и потратить время, можно извлечь полезную информацию. Например, можем узнать, что люди чаще выбирают «Regular air» в качестве доставки или что большинство заказов поступило из провинции Онтарио. Тем не менее, есть и другое решение, которое подробнее и качественнее описывает датасет — библиотека pandas-profiling. Вы отдаёте ей DataFrame, а она генерирует html-страницу с подробным описанием сета данных:

import pandas_profiling
profile = pandas_profiling.ProfileReport(df)
profile.to_file("output.html")

Всего Pandas Profiling возвращает 6 разделов: обзор датасета, переменные, отношения и корреляцию между ними, количество пропущенных значений и примеры из датасета.

Web-версия отчёта доступна по ссылке

Обзор данных

Рассмотрим первый подраздел — «Overview». Библиотека собрала следующую статистику: количество переменных, наблюдений, пропущенных ячеек, дубликатов и общий вес файла. В колонке Variable types описаны типы переменных: здесь 12 качественных и 9 числовых.

В подразделе «Reproduction» собрана техническая информация библиотеки: сколько времени занял анализ сета данных, версия библиотеки и прочее.

А подраздел «Warnings» сообщает о возможных проблемах в структуре датасета: сейчас он, например, предупреждает, что у поля «Order Date» — слишком большое количество уникальных значений.

Переменные

Двигаемся ниже. В этом разделе содержится подробное описание каждой переменной: сколько возможных уникальных значений она принимает, сколько значений пропущено, сколько памяти занимает поле. Справа от статистики присутствует гистограмма с распределением значений поля.

При нажатии на Toggle details откроется расширенная информация: квартили, медиана и прочая полезная описательная статистика. В остальных вкладках находятся гистограмма из основного экрана, топ-10 значений по частоте и экстремальные значения.

Отношения переменных

В этом разделе визуализированы отношения переменных при помощи hexbin plot: выглядит это не очень очевидно и понятно. Особенно усугубляет положение отсутствие легенды к графику.

Корреляция переменных

В этом разделе представлена по-разному посчитананя корреляция переменных: например, первым указано r-value Пирсона. Заметно, что переменная Profit положительно коррелирует с переменной Sales. При нажатии на Toggle correlation descriptions открывается подробное пояснение к каждому коэффициенту.

Пропущенные значения

Тут всё просто — bar chart, матрица и дендрограмма с количеством заполненных полей в каждой переменной. Заметно, что в колонке Product Base Margin отсутствуют три значения.

Примеры

И, наконец, последний раздел представляет первые и последние 10 значений в качестве примера кусков сета данных — аналог метода head() из pandas.

Что в итоге?

Библиотека уделяет больше внимания статистике, чем pandas: можно получить подробную описательную статистику по каждой переменной, посмотреть, как коррелируют между собой столбцы датасета. В совокупности с генерацией простого и удобного интерфейса библиотека строит полноценный отчёт по датасету, уже на основании которого можно делать выводы и сформировать представление о данных.
И всё же, у библиотеки есть и минусы. На генерацию отчётов к громадным датасетам может уйти много времени вплоть до нескольких часов. Это безусловно хороший инструмент для автоматического проектирования, но он не может сделать полноценный анализ за вас и добавить больше деталей в графики. Кроме того, если вы только начали практиковаться с анализом данных лучше будет начать с pandas — это закрепит ваши навыки и придаст уверенности при работе с данными.

Пишем клиент для нового API nalog.ru

Время чтения текста – 6 минут

UPD 29-09-2021: Мы обновили клиент. Теперь проходить аутентификацию можно по номеру телефона и подтверждению по SMS. Репозиторий на GitHub

Ранее в блоге мы рассказывали, как благодаря открытому API можно собирать данные от ФНС по нашим чекам из магазинов, обращаясь к приложению налоговой. С прошлой недели способ стал нерабочим: ФНС обновили методы приложения. Сегодня напишем свой клиент для nalog.ru, который проходит авторизацию и отправляет чеки на проверку.

Авторизация

Прежде чем начать пользоваться приложением, наш профиль необходимо авторизовать. В отличии от предыдущей версии, текущая требует прохождение капчи для авторизации по мобильному телефону — такой способ нам не подходит. Проще всего будет входить в профиль по ИНН и паролю от личного кабинета налогоплательщика. Для хранения этих данных создадим файл credentials.env. Переменную CLIENT_SECRET зададим согласно коду: она отвечает за авторизацию приложения. А ИНН и пароль подставляем свои.

INN = 
PASSWORD = 
CLIENT_SECRET=IyvrAbKt9h/8p6a7QPh8gpkXYQ4=

Теперь создадим файл nalog_python.py, в котором будет описан клиент. Библиотека dotenv используется для загрузки нашего логина и пароля из .env файла.

import os
import json
import requests
from dotenv import load_dotenv

Опишем класс NalogRuPython, и начнём с глобальных переменных класса. Здесь перечисляем headers, необходимые для отправки запроса.

class NalogRuPython:
    HOST = 'irkkt-mobile.nalog.ru:8888'
    DEVICE_OS = 'iOS'
    CLIENT_VERSION = '2.9.0'
    DEVICE_ID = '7C82010F-16CC-446B-8F66-FC4080C66521'
    ACCEPT = '*/*'
    USER_AGENT = 'billchecker/2.9.0 (iPhone; iOS 13.6; Scale/2.00)'
    ACCEPT_LANGUAGE = 'ru-RU;q=1, en-US;q=0.9'

В конструкторе класса прочитаем данные из нашего окружения методом load_dotenv и вызовем метод set_session_id для получения __session_id, который сейчас опишем. Идентификатор сессии необходим для отправки прочих запросов к серверу налоговой, поэтому его получаем первым делом.

def __init__(self):
        load_dotenv()
        self.__session_id = None
        self.set_session_id()

Метод set_session_id проводит авторизацию пользователя по ИНН и паролю от личного кабинета налогоплательщика и ничего не возвращает, только задаёт значение переменной __session_id. Отправляем по указанному в глобальных переменных HOST запрос с нашими данными от аккаунта и получаем идентификатор сессии в ответе.

def set_session_id(self) -> None:
        if os.getenv('CLIENT_SECRET') is None:
            raise ValueError('OS environments not content "CLIENT_SECRET"')
        if os.getenv('INN') is None:
            raise ValueError('OS environments not content "INN"')
        if os.getenv('PASSWORD') is None:
            raise ValueError('OS environments not content "PASSWORD"')

        url = f'https://{self.HOST}/v2/mobile/users/lkfl/auth'
        payload = {
            'inn': os.getenv('INN'),
            'client_secret': os.getenv('CLIENT_SECRET'),
            'password': os.getenv('PASSWORD')
        }
        headers = {
            'Host': self.HOST,
            'Accept': self.ACCEPT,
            'Device-OS': self.DEVICE_OS,
            'Device-Id': self.DEVICE_ID,
            'clientVersion': self.CLIENT_VERSION,
            'Accept-Language': self.ACCEPT_LANGUAGE,
            'User-Agent': self.USER_AGENT,
        }

        resp = requests.post(url, json=payload, headers=headers)
        self.__session_id = resp.json()['sessionId']

Получение идентификатора чека

Следующий шаг — получение ticket_id. Прежде чем отправить сам чек, необходимо получить его идентификатор для проверки. Опишем метод _get_ticket_id, который принимает строку с расшифрованным QR-кодом чека, отправляет соответствующий запрос на сервер и возвращает идентификатор для этой строки. В headers помимо указания глобальных переменных появился также __session_id, который требует метод /v2/ticket.

def _get_ticket_id(self, qr: str) -> str:
        url = f'https://{self.HOST}/v2/ticket'
        payload = {'qr': qr}
        headers = {
            'Host': self.HOST,
            'Accept': self.ACCEPT,
            'Device-OS': self.DEVICE_OS,
            'Device-Id': self.DEVICE_ID,
            'clientVersion': self.CLIENT_VERSION,
            'Accept-Language': self.ACCEPT_LANGUAGE,
            'sessionId': self.__session_id,
            'User-Agent': self.USER_AGENT,
        }
        resp = requests.post(url, json=payload, headers=headers)
        return resp.json()["id"]

Расшифровка чека

Последний шаг — проверка чека. Формируем по ticket_id запрос к серверу и получаем подробную расшифровку чека в ответе. На этом клиент полностью описан и готов к работе.

def get_ticket(self, qr: str) -> dict:
        ticket_id = self._get_ticket_id(qr)
        url = f'https://{self.HOST}/v2/tickets/{ticket_id}'
        headers = {
            'Host': self.HOST,
            'sessionId': self.__session_id,
            'Device-OS': self.DEVICE_OS,
            'clientVersion': self.CLIENT_VERSION,
            'Device-Id': self.DEVICE_ID,
            'Accept': self.ACCEPT,
            'User-Agent': self.USER_AGENT,
            'Accept-Language': self.ACCEPT_LANGUAGE,
        }
        resp = requests.get(url, headers=headers)
        return resp.json()

Наконец, для удобства опишем пример работы клиента. Создадим экземпляр класса NalogRuPython, зададим для примера строку QR-кода и получим расшифрованный ticket, который затем напечатаем на экране.

if __name__ == '__main__':
    client = NalogRuPython()
    qr_code = "t=20200727T1117&s=4850.00&fn=9287440300634471&i=13571&fp=3730902192&n=1"
    ticket = client.get_ticket(qr_code)
    print(json.dumps(ticket, indent=4))

Клиент можно использовать и в своих скриптах: для этого нужно импортировать класс, создать экземпляр и, как и в примере, вызвать метод get_ticket.

from nalog_python import NalogRuPython

client = NalogRuPython()
qr_code = "t=20200727T1117&s=4850.00&fn=9287440300634471&i=13571&fp=3730902192&n=1"
ticket = client.get_ticket(qr_code)

Полный код проекта на GitHub

 34 комментария    6421   2020   Data Analytics   nalog.ru   python

Визуализация данных на российской карте библиотекой Plotly

Время чтения текста – 11 минут

Часто для визуализации данных подходит карта: например, когда нужно показать, как статистика ведёт себя в определённых городах, областях, регионах. В таких случаях каждый регион или другая административная единица кодируется: ее границы преобразуют в полигоны и мультиполигоны с координатами широты и долготы относительно карты мира. Для Америки и Европы не составит труда найти встроенное в библиотеку Plotly решение, но в случае с картой России такого реализованного решения сходу найти не удалось. Сегодня мы разметим готовый geojson файл с административными границами регионов России, напишем парсер последних данных по коронавирусу и визуализируем статистику на карте при помощи библиотеки Plotly.

from urllib.request import urlopen
import json
import requests
import pandas as pd
from selenium import webdriver
from bs4 import BeautifulSoup as bs
import plotly.graph_objects as go

Правим geojson

Скачаем открытый geojson с границами российских регионов, найденный по одной из первых ссылок в Google по запросу «russia geojson». В нём уже есть кое-какие данные: например, наименования регионов. Но этот geojson-файл пока ещё не подходит под требуемый формат Plotly: в нём не размечены идентификаторы регионов.

with urlopen('https://raw.githubusercontent.com/codeforamerica/click_that_hood/master/public/data/russia.geojson') as response:
    counties = json.load(response)

Кроме разметки идентификаторов есть различия в наименовании регионов. Например, на сайте стопкоронавирус.рф, откуда мы будем брать свежие данные о заболевших, республика Башкортостан занесена как «Республика Башкортостан», а в нашем geojson-файле — просто «Башкортостан». Все эти различия необходимо устранить во избежание конфликтов. Кроме того, все первые буквы в названиях регионов должны начинаться с верхнего регистра.

regions_republic_1 = ['Бурятия', 'Тыва', 'Адыгея', 'Татарстан', 'Марий Эл',
                      'Чувашия', 'Северная Осетия – Алания', 'Алтай',
                      'Дагестан', 'Ингушетия', 'Башкортостан']
regions_republic_2 = ['Удмуртская республика', 'Кабардино-Балкарская республика',
                      'Карачаево-Черкесская республика', 'Чеченская республика']
for k in range(len(counties['features'])):
    counties['features'][k]['id'] = k
    if counties['features'][k]['properties']['name'] in regions_republic_1:
        counties['features'][k]['properties']['name'] = 'Республика ' + counties['features'][k]['properties']['name']
    elif counties['features'][k]['properties']['name'] == 'Ханты-Мансийский автономный округ - Югра':
        counties['features'][k]['properties']['name'] = 'Ханты-Мансийский АО'
    elif counties['features'][k]['properties']['name'] in regions_republic_2:
        counties['features'][k]['properties']['name'] = counties['features'][k]['properties']['name'].title()

Из получившегося geojson-файла сформируем DataFrame с регионами России: возьмём идентификаторы и наименования.

region_id_list = []
regions_list = []
for k in range(len(counties['features'])):
    region_id_list.append(counties['features'][k]['id'])
    regions_list.append(counties['features'][k]['properties']['name'])
df_regions = pd.DataFrame()
df_regions['region_id'] = region_id_list
df_regions['region_name'] = regions_list

Если сделаем всё правильно, получим такой DataFrame:

Собираем данные

Будем парсить эту таблицу:

Воспользуемся библиотекой Selenium. Перейдём на сайт и получим всю страницу, а затем преобразуем её в Soup для парсинга.

driver = webdriver.Chrome()
driver.get('https://стопкоронавирус.рф/information/')
source_data = driver.page_source
soup = bs(source_data, 'lxml')

На сайте наименования регионов находятся под тегом <th>, а свежие данные по регионам под тегом <td>. Для начала получим данные.

divs_data = soup.find_all('td')

Данные в divs_data выглядят следующим образом:

Вся информация идёт в одну строчку: и новые случаи, и активные, и все прочие. Тем не менее, заметно, что каждому региону соответствует пять значений: для Москвы это первые пять, для Московской области — вторые пять и так далее. Воспользуемся этим: в каждый из пяти списков будем класть значения согласно индексу. Если это первое значение, то оно войдёт в список выявленных случаев, если второе — в список новых случаев и так далее. После пяти индекс будет обнуляться.

count = 1
for td in divs_data:
    if count == 1:
        sick_list.append(int(td.text))
    elif count == 2:
        new_list.append(int(td.text))
    elif count == 3:
        cases_list.append(int(td.text))
    elif count == 4:
        healed_list.append(int(td.text))
    elif count == 5:
        died_list.append(int(td.text))
        count = 0
    count += 1

Следующим шагом соберём названия регионов из таблицы — они лежат под классом col-region. Из названий нужно убрать лишние двойные пробелы и символы переноса строки.

divs_region_names = soup.find_all('th', {'class':'col-region'})
region_names_list = []
for i in range(1, len(divs_region_names)):
    region_name = divs_region_names[i].text
    region_name = region_name.replace('\n', '').replace('  ', '')
    region_names_list.append(region_name)

Соберём DataFrame:

df = pd.DataFrame()
df['region_name'] = region_names_list
df['sick'] = sick_list
df['new'] = new_list
df['cases'] = cases_list
df['healed'] = healed_list
df['died'] = died_list

И посмотрим на Челябинскую область под десятым индексом — в конце наименования остался пробел! Этот пробел в конце строки может причинить много бед, ведь тогда название не будет соответствовать названию региона в geojson-файле. Уберём его — благо, все остальные наименования на сайте в порядке.

df.loc[10, 'region_name'] = df[df.region_name == 'Челябинская область '].region_name.item().strip(' ')

Наконец, сделаем merge двух DataFrame по колонке названия региона, чтобы в получившейся таблице с данными по коронавирусу появилась колонка с идентификатором региона, необходимая для генерации карты.

df = df.merge(df_regions, on='region_name')

Визуализация данных на карте Plotly

Создадим новую фигуру — она будет являться объектом Choroplethmapbox. В параметр geojson передаём переменную counties с geojson-файлом, в параметр locations вставляем идентификаторы регионов. Параметр z — значения, которые мы хотим визуализировать. Для примера возьмём количество новых случаев в каждом регионе — они лежат в колонке new таблицы. В text передаём названия регионов. Другой параметр — colorscale — нужен для цветового сопровождения данных. Он принимает списки со значениями от 0 до 1, которые являются позициями цветов в градиенте. Чем меньше заболевших, тем зеленее будет регион. С увеличением числа заболевших цвет переходит от желтого к красному. Параметр hovertemplate — шаблон панели, появляющейся при наведении на регион. С тултипом связан ещё один аргумент — customdata. Он принимает объединенные вдоль оси объекты, которые затем можно использовать в hovertemplate для отображения новых данных.

fig = go.Figure(go.Choroplethmapbox(geojson=counties,
                           locations=df['region_id'],
                           z=df['new'],
                           text=df['region_name'],
                           colorscale=[[0, 'rgb(34, 150, 79)'],
                                       [0.2, 'rgb(249, 247, 174)'],
                                       [0.8, 'rgb(253, 172, 99)'],
                                       [1, 'rgb(212, 50, 44)']],
                           colorbar_thickness=20,
                           customdata=np.stack([df['cases'], df['died'], df['sick'], df['healed']], axis=-1),
                           hovertemplate='<b>%{text}</b>'+ '<br>' +
                                         'Новых случаев: %{z}' + '<br>' +
                                         'Активных: %{customdata[0]}' + '<br>' +
                                         'Умерло: %{customdata[1]}' + '<br>' +
                                         'Всего случаев: %{customdata[2]}' + '<br>' +
                                         'Выздоровело: %{customdata[3]}' +
                                         '<extra></extra>',
                           hoverinfo='text, z'))

Теперь зададим стиль карты — возьмём готовую carto-positron, нейтральный и минималистичный шаблон, который не отвлекает от основных данных. Аргумент mapbox_zoom отвечает за приближение карты, а mapbox_center принимает координаты начального центра карты. Зададим marker_line_width равный нулю, чтобы убрать границы между регионами. После зададим всем отступам в margin значение 0, чтобы карта была визуально шире. Сразу после выведем фигуру методом show().

fig.update_layout(mapbox_style="carto-positron",
                  mapbox_zoom=1, mapbox_center = {"lat": 66, "lon": 94})
fig.update_traces(marker_line_width=0)
fig.update_layout(margin={"r":0,"t":0,"l":0,"b":0})
fig.show()

Получилась такая карта. Из диаграммы следует, что больше всего заболевших за прошедшие сутки появилось в Москве — 608 случаев, что существенно относительно остальных регионов. Особенно в сравнении с Ненецким автономным округом, где число случаев новых заражений, на удивление, равняется нулю.

Полный код проекта на GitHub

 4 комментария    683   2020   dash   Data Analytics   plotly   python

Деплой дашборда на AWS Elastic Beanstalk

Время чтения текста – 7 минут

Если под рукой имеется машина на Amazon Web Services и стоит задача развернуть веб-приложение, можно воспользоваться сервисом Elastic Beanstalk от AWS: он позволяет развертывать приложения под другими сервисами от Amazon, включая EC2.

Готовим приложение

В материале «Делаем дашборд с параметром на Python» мы создали проект с двумя файлами: application.py — скрипт с генерацией локального дашборда и get_plots.py — скрипт, возвращающий scatter plot с пивоварнями Untappd из материала «Строим scatter plot по пивоварням Untappd». Немного подкорректируем файл application.py: чтобы приложение запускалось на Elastic Beanstalk, app.server в конце файла присвоим переменной application. Должно получиться вот так:

application = app.server

if __name__ == '__main__':
   application.run(debug=True, port=8080)

Перед тем, как развернуть приложение, нужно собрать его в архив. В архиве должны присутствовать все необходимые файлы, включая requirements.txt — перечень зависимостей приложения. В нём перечислены пакеты и версии, необходимые для запуска приложения. Чтобы его создать, достаточно в директории с проектом и окружением ввести команду pip freeze и отправить вывод в файл:

pip freeze > requirements.txt

Теперь соберём архив. В unix для архивации и сжатия предусмотрена встроенная утилита zip.

zip deploy_v0 application.py get_plots.py requirements.txt

Создаём приложение и окружение

Переходим на Elastic Beanstalk в раздел «Applications». Жмём на «Create a new application».

В открывшейся странице заполняем наименование приложения и описание. Ниже предлагается присвоить приложению теги для упрощенной категоризации ресурсов. Формат вводимого тега похож на словарь Python: это пара ключ — значение, ключ должен быть уникален. После заполнения данных жмём на оранжевую кнопку «Create».

Сразу после нам покажут список окружений для приложения: изначально он пустой, поэтому нажимаем на «Create a new environment».

Так как мы работаем с веб-приложением, выбираем окружение веб-сервера:

После предлагают ввести информацию о приложении, включая домен. Можно ввести свой домен, если таковой будет свободен:

Следом выбираем платформу веб-приложения. Наше написано на Python.

Теперь загружаем само приложение: так как код мы уже написали, выбираем «Upload your code» и прикрепляем файл с архивом. После жмём «Create environment».

Следом откроется окно с логами создания окружения. Пару минут придётся подождать.

Если все сделали правильно, увидим экран с галочкой и подписью «OK»: это означает, что наше приложение успешно загружено и доступно. Если захотим загрузить новую версию, достаточно пересобрать архив с файлами и загрузить его по кнопке «Upload and deploy».

По ссылке, представленной выше можем пройти на сайт, где лежит дашборд. При помощи тега <iframe> этот дашборд можно также встроить на другой сайт:

<iframe id="igraph" scrolling="no" style="border:none;"seamless="seamless" src="http://dashboardleftjoin-env.eba-qxzgfj64.us-east-2.elasticbeanstalk.com" height="1100" width="800"></iframe>

В итоге получим такой дашборд на сайте:

Полный код проекта на GitHub

Делаем дашборд с параметром на Python

Время чтения текста – 3 минуты

В прошлом материале мы подготовили scatter plot, используя библиотеку plotly: он отображал отношение количества отзывов пивоварни к её рейтингу в социальной сети Untappd. Ещё мы добавили каждому маркеру характеристики: дату регистрации пивоварни и количество сортов пива в её ассортименте. Сегодня воспользуемся другим инструментом plotly — Dash, и построим дашборд с двумя параметрами для этого графика. Создадим новый файл — application.py, который будет импортировать функцию get_scatter_plot(n_days, top_n) из последнего материала.

import dash
import dash_core_components as dcc
import dash_html_components as html
from get_plots import get_scatter_plot

После импорта библиотек загружаем css-стили и инициируем веб-приложение:

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']
app = dash.Dash(__name__, external_stylesheets=external_stylesheets)

Опишем структуру дашборда:

app.layout = html.Div(children=[
       html.Div([
           dcc.Graph(id='fig1'),
       ]) ,
       html.Div([
           html.H6('Временной период, дней'),
           dcc.Slider(
               id='slider-day1',
               min=0,
               max=100,
               step=1,
               value=30,
               marks={i: str(i) for i in range(0, 100, 10)}
           ),
           html.H6('Количество пивоварен в топе'),
           dcc.Slider(
               id='slider-top1',
               min=0,
               max=500,
               step=50,
               value=500,
               marks={i: str(i) for i in range(0, 500, 50)})
       ])
])

Мы обозначили на панели график и два слайдера. У каждого слайдера есть свой идентификатор и параметры: минимальное значение, максимальное, шаг изменения, начальное значение. Так как данные из слайдеров будут передаваться в график, опишем callback для них: первый аргумент — Output — график, который будет изменяться, это наш вывод. Следующие два — Input — параметры, от которых график зависит.

@app.callback(
   dash.dependencies.Output('fig1', 'figure'),
   [dash.dependencies.Input('slider-day1', 'value'),
    dash.dependencies.Input('slider-top1', 'value')])
def output_fig(n_days, top_n):
    fig = get_scatter_plot(n_days, top_n)
    return fig

В конце файла добавим вызов локального сервера:

if __name__ == '__main__':
   app.run_server(debug=True)

Теперь при запуске файла в терминале появится адрес локального сервера. Пройдя по нему, в браузере откроем наш интерактивный дашборд, который самостоятельно обновляется при изменении значений слайдеров.

 2 комментария    186   2020   dash   Data Analytics   plotly   python   untappd

Строим scatter plot по пивоварням Untappd

Время чтения текста – 15 минут

Сегодня построим scatter plot, который отобразит отношение количества отзывов российских пивоварен к их средней оценке за последние 30 дней. В качестве данных будем использовать чекины социальной сети Untappd, которые пользователи оставляют для оценки пива. Маркеры на графике будут иметь две характеристики: цвет и размер. Цвет будет зависеть от даты регистрации пивоварни на сервисе (то есть показывать сколько лет пивоварне в Untappd), а размер — от количества сортов пива в её ассортименте. Этот материал — первая часть цикла материалов, посвященных построению дашборда с библиотекой dash от plotly.

Пишем запрос к Clickhouse

Данные, по которым мы хотим построить дашборд для начала нужно обработать. Мы использовали открытые данные, собранные с сайта Untappd в материалах «Обрабатываем нажатие кнопки в Selenium» и «Использование словарей в Clickhouse на примере данных Untappd».

from datetime import datetime, timedelta
from clickhouse_driver import Client
import plotly.graph_objects as go
import pandas as pd
import numpy as np
client = Client(host='ec1-2-34-567-89.us-east-2.compute.amazonaws.com', user='default', password='', port='9000', database='default')

График будет строиться в функции get_scatter_plot(n_days, top_n). Первый аргумент будет отвечать за временной период, который нужно обработать. Второй — какое количество пивоварен из топа отобразить на графике. Для начала напишем SQL-запрос, который возьмёт данные из таблицы Clickhouse и посчитает Brewery Pure Average. Для каждой пивоварни на сервисе он считается так: умножаем рейтинг сорта пива на количество оценок этого сорта и делим на общее число оценок пивоварни. Ещё запрос возьмёт наименование пивоварни и количество сортов пива у пивоварни из словаря, с которым мы работали ранее: при помощи функции dictGet обратимся к нему прямо в запросе и возьмём нужные колонки. Зададим ограничение: нас интересуют только те пивоварни, у которых Brewery Pure Average отличен от нуля, а количество отзывов более 100.

brewery_pure_average = client.execute(f"""
SELECT
       t1.brewery_id,
       sum(t1.beer_pure_average_mult_count / t2.count_for_that_brewery) AS brewery_pure_average,
       t2.count_for_that_brewery,
       dictGet('breweries', 'brewery_name', toUInt64(t1.brewery_id)),
       dictGet('breweries', 'beer_count', toUInt64(t1.brewery_id)),
       t3.stats_age_on_service / 365
   FROM
   (
       SELECT
           beer_id,
           brewery_id,
           sum(rating_score) AS beer_pure_average_mult_count
       FROM beer_reviews
       WHERE created_at >= today()-{n_days}
       GROUP BY
           beer_id,
           brewery_id
   ) AS t1
   ANY LEFT JOIN
   (
       SELECT
           brewery_id,
           count(rating_score) AS count_for_that_brewery
       FROM beer_reviews
       WHERE created_at >= today()-{n_days}
       GROUP BY brewery_id
   ) AS t2 ON t1.brewery_id = t2.brewery_id
   ANY LEFT JOIN
   (
       SELECT
           brewery_id,
           stats_age_on_service
       FROM brewery_info
   ) AS t3 ON t1.brewery_id = t3.brewery_id
   GROUP BY
       t1.brewery_id,
       t2.count_for_that_brewery,
       t3.stats_age_on_service
   HAVING t2.count_for_that_brewery >= 150
   ORDER BY brewery_pure_average
   LIMIT {top_n}
    """)

scatter_plot_df_with_age = pd.DataFrame(brewery_pure_average, columns=['brewery_id', 'brewery_pure_average', 'rating_count', 'brewery_name', 'beer_count'])

Обрабатываем значения из DataFrame

Добавим на график две пунктирные линии: они будут проходить в медианных значениях каждой оси. Так мы будем знать, какие пивоварни находятся выше медианного значения. Лучшими будут те, что находятся в верхнем правом квадрате.

dict_list = []
dict_list.append(dict(type="line",
                     line=dict(
                         color="#666666",
                         dash="dot"),
                     x0=0,
                     y0=np.median(scatter_plot_df_with_age.brewery_pure_average),
                     x1=7000,
                     y1=np.median(scatter_plot_df_with_age.brewery_pure_average),
                     line_width=1,
                     layer="below"))
dict_list.append(dict(type="line",
                     line=dict(
                         color="#666666",
                         dash="dot"),
                     x0=np.median(scatter_plot_df_with_age.rating_count),
                     y0=0,
                     x1=np.median(scatter_plot_df_with_age.rating_count),
                     y1=5,
                     line_width=1,
                     layer="below"))

Добавим аннотации: они будут сообщать пользователю медианные значения.

annotations_list = []
annotations_list.append(
    dict(
        x=8000,
        y=np.median(scatter_plot_df_with_age.brewery_pure_average) - 0.1,
        xref="x",
        yref="y",
        text=f"Медианное значение: {round(np.median(scatter_plot_df_with_age.brewery_pure_average), 2)}",
        showarrow=False,
        font={
            'family':'Roboto, light',
            'color':'#666666',
            'size':12
        }
    )
)
annotations_list.append(
    dict(
        x=np.median(scatter_plot_df_with_age.rating_count) + 180,
        y=0.8,
        xref="x",
        yref="y",
        text=f"Медианное значение: {round(np.median(scatter_plot_df_with_age.rating_count), 2)}",
        showarrow=False,
        font={
            'family':'Roboto, light',
            'color':'#666666',
            'size':12
        },
        textangle=-90
    )
)

Прибавим графику информативности: поделим пивоварни на 4 группы по сортам пива. В первой группе будут пивоварни, у которых менее 10 сортов пива, во второй 10 — 30 сортов, в третьей 30 — 50 и в четвертой те, у кого сортов более 50. Значения списка bucket_beer_count — размеры маркеров.

bucket_beer_count = []
for beer_count in scatter_plot_df_with_age.beer_count:
   if beer_count < 10:
       bucket_beer_count.append(7)
   elif 10 <= beer_count <= 30:
       bucket_beer_count.append(9)
   elif 31 <= beer_count <= 50:
       bucket_beer_count.append(11)
   else:
       bucket_beer_count.append(13)
scatter_plot_df_with_age['bucket_beer_count'] = bucket_beer_count

Следом поделим график ещё на четыре группы: теперь уже по возрасту.

bucket_age = []
for age in scatter_plot_df_with_age.age_on_service:
   if age < 4:
       bucket_age.append(0)
   elif 4 <= age <= 6:
       bucket_age.append(1)
   elif 6 < age < 8:
       bucket_age.append(2)
   else:
       bucket_age.append(3)
scatter_plot_df_with_age['bucket_age'] = bucket_age

Разделим один DataFrame на четыре, чтобы по каждому построить отдельный scatter plot со своим цветом и размером.

scatter_plot_df_0 = scatter_plot_df[scatter_plot_df.bucket == 0]
scatter_plot_df_1 = scatter_plot_df[scatter_plot_df.bucket == 1]
scatter_plot_df_2 = scatter_plot_df[scatter_plot_df.bucket == 2]
scatter_plot_df_3 = scatter_plot_df[scatter_plot_df.bucket == 3]

Описываем график

Построим график: поочередно добавим четыре группы пивоварен. Для каждой зададим своё имя и цвет маркера, название, прозрачность и текст.

fig = go.Figure()
fig.add_trace(go.Scatter(
    x=scatter_plot_df_0.rating_count,
    y=scatter_plot_df_0.brewery_pure_average,
    name='< 4',
    mode='markers',
    opacity=0.85,
    text=scatter_plot_df_0.name_count,
    marker_color='rgb(114, 183, 178)',
    marker_size=scatter_plot_df_0.bucket_beer_count,
    textfont={"family":"Roboto, light",
              "color":"black"
             }
))

fig.add_trace(go.Scatter(
    x=scatter_plot_df_1.rating_count,
    y=scatter_plot_df_1.brewery_pure_average,
    name='4 – 6',
    mode='markers',
    opacity=0.85,
    marker_color='rgb(76, 120, 168)',
    text=scatter_plot_df_1.name_count,
    marker_size=scatter_plot_df_1.bucket_beer_count,
    textfont={"family":"Roboto, light",
              "color":"black"
             }
))

fig.add_trace(go.Scatter(
    x=scatter_plot_df_2.rating_count,
    y=scatter_plot_df_2.brewery_pure_average,
    name='6 – 8',
    mode='markers',
    opacity=0.85,
    marker_color='rgb(245, 133, 23)',
    text=scatter_plot_df_2.name_count,
    marker_size=scatter_plot_df_2.bucket_beer_count,
    textfont={"family":"Roboto, light",
              "color":"black"
             }
))

fig.add_trace(go.Scatter(
    x=scatter_plot_df_3.rating_count,
    y=scatter_plot_df_3.brewery_pure_average,
    name='8+',
    mode='markers',
    opacity=0.85,
    marker_color='rgb(228, 87, 86)',
    text=scatter_plot_df_3.name_count,
    marker_size=scatter_plot_df_3.bucket_beer_count,
    textfont={"family":"Roboto, light",
              "color":"black"
             }
))

fig.update_layout(
    title=f"Отношение количества отзывов к средней оценке пивоварни<br>за последние {n_days} дней, топ-{top_n} пивоварен",
    font={
            'family':'Roboto, light',
            'color':'black',
            'size':14
        },
    plot_bgcolor='rgba(0,0,0,0)',
    yaxis_title="Средняя оценка",
    xaxis_title="Количество отзывов",
    legend_title_text='Возраст пивоварни<br>на Untappd, лет:',
    height=750,
    shapes=dict_list,
    annotations=annotations_list
)

Получили такой график. Каждый маркер — отдельная пивоварня. Цвет характеризует количество сортов пива этой пивоварни, а при наведении увидим среднюю оценку по данным за последние 30 дней, количество отзывов, название и количество сортов пива у пивоварни. Две пунктирные линии проходят в соответствующих медианных значениях, рассчитанных модулем numpy: пивоварни, оказавшиеся в верхнем правом квадрате — самые успешные. В следующем материале построим дашборд, и сделаем количество последних дней и количество пивоварен в топе динамически изменяемым параметром.


Код функции get_scatter_plot

def get_scatter_plot(n_days, top_n):
    brewery_pure_average = client.execute(f'''
    SELECT 
        t1.brewery_id, 
        sum(t1.beer_pure_average_mult_count / t2.count_for_that_brewery) AS brewery_pure_average, 
        t2.count_for_that_brewery, 
        dictGet('breweries', 'brewery_name', toUInt64(t1.brewery_id)), 
        dictGet('breweries', 'beer_count', toUInt64(t1.brewery_id)),
        t3.stats_age_on_service / 365

    FROM 
    (
        SELECT 
            beer_id, 
            brewery_id, 
            sum(rating_score) AS beer_pure_average_mult_count
        FROM beer_reviews
        WHERE created_at >= today()-{n_days}
        GROUP BY 
            beer_id, 
            brewery_id
    ) AS t1
    ANY LEFT JOIN 
    (
        SELECT 
            brewery_id, 
            count(rating_score) AS count_for_that_brewery
        FROM beer_reviews
        WHERE created_at >= today()-{n_days}
        GROUP BY brewery_id
    ) AS t2 ON t1.brewery_id = t2.brewery_id
    ANY LEFT JOIN
    (
        SELECT
            brewery_id,
            stats_age_on_service
        FROM brewery_info_new
    ) AS t3 ON t1.brewery_id = t3.brewery_id
    GROUP BY 
        t1.brewery_id, 
        t2.count_for_that_brewery,
        t3.stats_age_on_service
    HAVING t2.count_for_that_brewery >= 150
    ORDER BY brewery_pure_average
    LIMIT {top_n}
    ''')

    scatter_plot_df_with_age = pd.DataFrame(brewery_pure_average, columns=['brewery_id', 'brewery_pure_average', 'rating_count', 'brewery_name', 'beer_count', 'age_on_service'])
    brewery_name_and_beer_count = []
    for name, beer_count in zip(scatter_plot_df_with_age.brewery_name, scatter_plot_df_with_age.beer_count):
        brewery_name_and_beer_count.append(f'{name},<br>количество сортов пива: {beer_count}')
    scatter_plot_df_with_age['name_count'] = brewery_name_and_beer_count
    dict_list = []
    dict_list.append(dict(type="line",
        line=dict(
             color="#666666",
             dash="dot"),
        x0=0,
        y0=np.median(scatter_plot_df_with_age.brewery_pure_average),
        x1=9000,
        y1=np.median(scatter_plot_df_with_age.brewery_pure_average),
        line_width=1,
        layer="below"))
    dict_list.append(dict(type="line",
        line=dict(
             color="#666666",
             dash="dot"),
        x0=np.median(scatter_plot_df_with_age.rating_count),
        y0=0,
        x1=np.median(scatter_plot_df_with_age.rating_count),
        y1=5,
        line_width=1,
        layer="below"))
    annotations_list = []
    annotations_list.append(
        dict(
            x=8000,
            y=np.median(scatter_plot_df_with_age.brewery_pure_average) - 0.1,
            xref="x",
            yref="y",
            text=f"Медианное значение: {round(np.median(scatter_plot_df_with_age.brewery_pure_average), 2)}",
            showarrow=False,
            font={
                'family':'Roboto, light',
                'color':'#666666',
                'size':12
            }
        )
    )
    annotations_list.append(
        dict(
            x=np.median(scatter_plot_df_with_age.rating_count) + 180,
            y=0.8,
            xref="x",
            yref="y",
            text=f"Медианное значение: {round(np.median(scatter_plot_df_with_age.rating_count), 2)}",
            showarrow=False,
            font={
                'family':'Roboto, light',
                'color':'#666666',
                'size':12
            },
            textangle=-90
        )
    )
    bucket = []
    for beer_count in scatter_plot_df_with_age.beer_count:
        if beer_count < 10:
            bucket.append(7)
        elif 10 <= beer_count <= 30:
            bucket.append(9)
        elif 31 <= beer_count <= 50:
            bucket.append(11)
        else:
            bucket.append(13)
    scatter_plot_df_with_age['bucket_beer_count'] = bucket
    bucket_age = []
    for age in scatter_plot_df_with_age.age_on_service:
        if age < 4:
            bucket_age.append(0)
        elif 4 <= age <= 6:
            bucket_age.append(1)
        elif 6 < age < 8:
            bucket_age.append(2)
        else:
            bucket_age.append(3)
    scatter_plot_df_with_age['bucket_age'] = bucket_age
    scatter_plot_df_0 = scatter_plot_df_with_age[(scatter_plot_df_with_age.bucket_age == 0)]
    scatter_plot_df_1 = scatter_plot_df_with_age[scatter_plot_df_with_age.bucket_age == 1]
    scatter_plot_df_2 = scatter_plot_df_with_age[scatter_plot_df_with_age.bucket_age == 2]
    scatter_plot_df_3 = scatter_plot_df_with_age[scatter_plot_df_with_age.bucket_age == 3]
    
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=scatter_plot_df_0.rating_count,
        y=scatter_plot_df_0.brewery_pure_average,
        name='< 4',
        mode='markers',
        opacity=0.85,
        text=scatter_plot_df_0.name_count,
        marker_color='rgb(114, 183, 178)',
        marker_size=scatter_plot_df_0.bucket_beer_count,
        textfont={"family":"Roboto, light",
                  "color":"black"
                 }
    ))

    fig.add_trace(go.Scatter(
        x=scatter_plot_df_1.rating_count,
        y=scatter_plot_df_1.brewery_pure_average,
        name='4 – 6',
        mode='markers',
        opacity=0.85,
        marker_color='rgb(76, 120, 168)',
        text=scatter_plot_df_1.name_count,
        marker_size=scatter_plot_df_1.bucket_beer_count,
        textfont={"family":"Roboto, light",
                  "color":"black"
                 }
    ))

    fig.add_trace(go.Scatter(
        x=scatter_plot_df_2.rating_count,
        y=scatter_plot_df_2.brewery_pure_average,
        name='6 – 8',
        mode='markers',
        opacity=0.85,
        marker_color='rgb(245, 133, 23)',
        text=scatter_plot_df_2.name_count,
        marker_size=scatter_plot_df_2.bucket_beer_count,
        textfont={"family":"Roboto, light",
                  "color":"black"
                 }
    ))

    fig.add_trace(go.Scatter(
        x=scatter_plot_df_3.rating_count,
        y=scatter_plot_df_3.brewery_pure_average,
        name='8+',
        mode='markers',
        opacity=0.85,
        marker_color='rgb(228, 87, 86)',
        text=scatter_plot_df_3.name_count,
        marker_size=scatter_plot_df_3.bucket_beer_count,
        textfont={"family":"Roboto, light",
                  "color":"black"
                 }
    ))

    fig.update_layout(
        title=f"Отношение количества отзывов к средней оценке пивоварни<br>за последние {n_days} дней, топ-{top_n} пивоварен",
        font={
                'family':'Roboto, light',
                'color':'black',
                'size':14
            },
        plot_bgcolor='rgba(0,0,0,0)',
        yaxis_title="Средняя оценка",
        xaxis_title="Количество отзывов",
        legend_title_text='Возраст пивоварни<br>на Untappd, лет:',
        height=750,
        shapes=dict_list,
        annotations=annotations_list
    )
    fig.show()
    return fig

 2 комментария    83   2020   dash   plotly   python   untappd
Ранее Ctrl + ↓