Как обсуждалось ранее, при работе с большими данными, непосредственный анализ которых в системе R невозможен, мы вынуждены обращаться к специализированным платформам для выполнения распределенных вычислений, в частности Spark. Тем не менее, благодаря таким пакетам, как sparklyr, мы можем использовать R в качестве клиента, который отправляет вычислительные задачи (англ. "push compute") на Spark-кластер, а затем собирает результаты вычислений (англ. "collect results") для дальнейшего анализа уже в самой системе R. Есть два способа формального представления подобных вычислительных задач перед их отправкой на кластер: либо с использованием SQL-команд, либо с помощью функций из пакета dplyr. Хотя SQL (точнее, Spark SQL, который в свою очередь основан на диалекте HiveQL) позволяет формулировать сколь угодно сложные операции над данными, в большинстве стандартных случаев dplyr более удобен, поскольку он хорошо знаком большинству современных пользователей R и обладает намного более лаконичным синтаксисом. В этом сообщении мы рассмотрим особенности анализа данных с помощью платформы Spark и пакета dplyr.

Постановка задачи

В приведенных ниже примерах использованы данные из пакета nycflights13, который содержит несколько таблиц с описанием 336776 авиарейсов, вылетевших из аэропортов Нью-Йорка в 2013 г. Запустим локальный Spark-кластер и загрузим в него необходимые таблицы:

require(sparklyr)
require(nycflights13)

# подключение к кластеру:
sc <- spark_connect(master = "local", version = "2.3")

# загрузка данных в кластер:
flights_tbl <- copy_to(sc, flights, "flights") # данные по рейсам
airlines_tbl <- copy_to(sc, airlines, "airlines") # данные по авиакомпаниям

Предположим, что перед нами стоит задача построить модель, которая предсказывает вероятность прибытия задержанного рейса без опоздания (при условии задержки вылета на 15-30 минут). Будем рассматривать эту задачу как случай бинарной классификации: интересующий нас отклик принимает значение 1 если задержанный рейс прибыл без опоздания, и 0 если нет.

Процесс построения предсказательных моделей обычно включает несколько шагов, первым из которых является подготовка и разведочный анализ данных. Сам разведочный анализ также может состоять из нескольких шагов, таких как выявление и устранение проблем с качеством анализируемых данных (пропущенные наблюдение, выбросы и др.), расчет описательных статистик, обнаружение наиболее перспективных предикторов для последующего включения в модель и т.п. Посмотрим, как с этим могут помочь команды из пакета dplyr.

Что можно делать с помощью dplyr и как это работает

Автоматический перевод команд dplyr на SQL

С помощью пакета dplyr можно выполнять следующие стандартные виды вычислений на Spark-кластере:
  • выбор, фильтрация и агрегирование переменных;
  • использование оконных функций;
  • объединение нескольких таблиц с помощью join-операторов;
  • импорт результатов вычислений из Spark в среду R.
К наиболее важным командам dplyr (их еще часто называют "глаголами") относятся select(), filter(), mutate(), summarise() и arrange(). Кроме того, для выполнения групповых операций используется команда group_by().

Эти и другие команды dplyr можно обычным образом объединять в "цепочки" с помощью оператора %>% из пакета magrittr (подгружается одновременно с dplyr и поэтому отдельно его вызывать не нужно). Так, в следующем примере мы подсчитываем общее количество полетов, выполненных каждой авиакомпанией за 2013 г., а затем выбираем 5 авиакомпаний с наибольшим числом полетов:

require(dplyr)

flights_tbl %>% 
  group_by(carrier) %>% 
  summarise(N = n()) %>% 
  arrange(desc(N)) %>% 
  head(5)

## # Source:     spark<?> [?? x 2]
## # Ordered by: desc(N)
##   carrier     N
##   <chr>   <dbl>
## 1 UA      58665
## 2 B6      54635
## 3 EV      54173
## 4 DL      48110
## 5 AA      32729

Поскольку Spark "понимает" только SQL, то в действительности все вычисления, заданные в R с помощью команд dplyr, перед отправкой на Spark-кластер автоматически переводятся на SQL следующим образом:
  • select() -> SELECT
  • filter() -> WHERE
  • mutate() -> операторы +, -, /, *, log и др.
  • summarise() -> функции агрегирования SUM, MIN, MAX и др.
  • arrange() -> ORDER
  • group_by() -> GROUP BY
Кроме того, dplyr автоматически переводит на SQL следующие базовые команды R:

# Математические операторы:
+, -, *, /, %%, ^
  
# Математические функции:
abs, acos, asin, asinh, atan, atan2, ceiling, cos, cosh, exp,
floor, log, log10, round, sign, sin, sinh, sqrt, tan, tanh

# Логические сравнения:
<, <=, !=, >=, >, ==, %in%

# Булевы операторы:
&, &&, |, ||, !

# Функции для работы с символьными строками:
paste, tolower, toupper, nchar

# Функции для преобразования типа переменной:
as.double, as.integer, as.logical, as.character, as.date

# Функции агрегирования:
mean, sum, min, max, sd, var, cor, cov, n

Удобная функция show_query() из пакета dplyr позволяет просмотреть SQL-запрос, который формируется из соответствующего кода R. Для приведенного выше примера получаем:

flights_tbl %>% 
    group_by(carrier) %>% 
    summarise(N = n()) %>% 
    arrange(desc(N)) %>% 
    head(5) %>% 
    show_query()

## <SQL>
## SELECT `carrier`, count(*) AS `N`
## FROM `flights`
## GROUP BY `carrier`
## ORDER BY `N` DESC
## LIMIT 5

Ленивые вычисления

Важно понимать, что как и в случае с базами данных, при работе со Spark'ом dplyr следует принципу "ленивых вычислений" (англ. "lazy evaluation"). Это значит, что все вычисления на Spark-кластере откладываются до последнего момента, пока не понадобится их результат. Кроме того, итоговый результат вычислений будет импортирован в среду R только если пользователь запросит его в явном виде. Например, приведенную выше последовательность команд мы могли бы прописать следующим образом:

command_1 <- group_by(flights_tbl, carrier) 
command_2 <- summarise(command_1, N = n())
command_3 <- arrange(command_2, desc(N)) 
command_4 <- head(command_3, n = 5)

На самом деле, ни одна из этих последовательных команд не будет выполнена до тех пор, пока мы не запросим результат вычислений. Как отмечалось ранее, в объектах command_1, command_2 ... command_4 хранится только информация, необходимая для подключения к Spark-кластеру, а также инструкции для соответствующих вычислений (выполните, например, str(command_1), чтобы убедиться в этом).

Под "запросом результата вычислений в явном виде" понимаются две ситуации: либо вывод результата на экран, либо сохранение его в локальный объект R:

# вывод на экран:
command_4

## # Source:     spark<?> [?? x 2]
## # Ordered by: desc(N)
##   carrier     N
##   <chr>   <dbl>
## 1 UA      58665
## 2 B6      54635
## 3 EV      54173
## 4 DL      48110
## 5 AA      32729

# импорт результата и сохранение в объект R:
result <- collect(command_4)
result

## # A tibble: 5 x 2
##   carrier     N
##   <chr>   <dbl>
## 1 UA      58665
## 2 B6      54635
## 3 EV      54173
## 4 DL      48110
## 5 AA      32729

Обратите внимание: во втором случае мы применили специальную команду collect() из пакета dplyr. В некотором смысле collect() противоположна copy_to(), которая была использована выше для импорта данных из R в Spark.

Объединение таблиц

В пакете dplyr есть несколько функций для выполнения стандартных JOIN-операций над двумя таблицами (подробнее см. справочный файл, доступный по команде ?join, а также здесь): inner_join(), left_join(), right_join(), full_join(), semi_join(), nested_join() и anti_join(). Так, в следующем примере выполнен LEFT JOIN таблицы flights_tbl с таблицей airlines_tbl по полю carrier:

flights_tbl %>% 
    left_join(airlines_tbl, by = "carrier") %>% 
    glimpse()

## Observations: ??
## Variables: 20
## Database: spark_connection
## $ year           <int> 2013, 2013, 2013, 2013, 2013, 201...
## $ month          <int> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...
## $ day            <int> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...
## $ dep_time       <int> 517, 533, 542, 544, 554, 554, 555...
## $ sched_dep_time <int> 515, 529, 540, 545, 600, 558, 600...
## $ dep_delay      <dbl> 2, 4, 2, -1, -6, -4, -5, -3, -3, ...
## $ arr_time       <int> 830, 850, 923, 1004, 812, 740, 91...
## $ sched_arr_time <int> 819, 830, 850, 1022, 837, 728, 85...
## $ arr_delay      <dbl> 11, 20, 33, -18, -25, 12, 19, -14...
## $ carrier        <chr> "UA", "UA", "AA", "B6", "DL", "UA...
## $ flight         <int> 1545, 1714, 1141, 725, 461, 1696,...
## $ tailnum        <chr> "N14228", "N24211", "N619AA", "N8...
## $ origin         <chr> "EWR", "LGA", "JFK", "JFK", "LGA"...
## $ dest           <chr> "IAH", "IAH", "MIA", "BQN", "ATL"...
## $ air_time       <dbl> 227, 227, 160, 183, 116, 150, 158...
## $ distance       <dbl> 1400, 1416, 1089, 1576, 762, 719,...
## $ hour           <dbl> 5, 5, 5, 5, 6, 5, 6, 6, 6, 6, 6, ...
## $ minute         <dbl> 15, 29, 40, 45, 0, 58, 0, 0, 0, 0...
## $ time_hour      <dttm> 2013-01-01 10:00:00, 2013-01-01 ...
## $ name           <chr> "United Air Lines Inc.", "United ...

Функции Hive Query Language

Хотя стандартные команды dplyr и некоторые базовые функции R без труда автоматически переводятся на SQL (см. выше), их будет недостаточно для разработки более сложных вычислений над данными с помощью Spark. К счастью, Spark SQL основан на языке запросов Hive Query Language (HiveQL) и все функции этого языка можно использовать в сочетании с командами dplyr. Полный перечень функций HiveQL можно найти в официальной документации.

Например, для вычисления медианного значения переменной dep_delay (задержка рейса, мин) из таблицы flights_tbl мы не можем просто воспользоваться базовыми функциями R median() или quantile() - это приведет к ошибке. Но мы без проблем можем применить Hive-функцию percentile():

flights_tbl %>% 
  summarise(median = percentile(dep_delay, 0.5))

## # Source: spark<?> [?? x 1]
##   median
##    <dbl>
## 1     -2

Когда при переводе кода R на SQL dplyr встречает незнакомую функцию, то он просто включает ее в SQL-запрос "как есть":

flights_tbl %>% 
  summarise(median = percentile(dep_delay, 0.5)) %>% 
  show_query()

## <SQL>
## SELECT percentile(`dep_delay`, 0.5) AS `median`
## FROM `flights`

Кстати, Hive-функция percentile() позволяет одновременно вычислить несколько процентилей. Для этого на нее нужно подать массив (array()) с требуемыми значениями процентилей:

flights_tbl %>% 
  summarise(perc = percentile(dep_delay, array(0.25, 0.5, 0.75)))

# Source: spark<?> [?? x 1]
  perc    
  <list>    
1 <list [3]>

Как видим, результатом выполнения приведенной команды является список с тремя значениями. Чтобы автоматически извлечь эти значения из списка служит Hive-функция explode():

flights_tbl %>% 
  summarise(perc = percentile(dep_delay, array(0.25, 0.5, 0.75))) %>% 
  mutate(perc = explode(perc))

## # Source: spark<?> [?? x 1]
##    perc
##   <dbl>
## 1    -5
## 2    -2
## 3    11

Пример разведочного анализа данных

Итак, мы разобрались с тем, как использовать пакет dplyr для формирования вычислительных задач для Spark. Применим теперь полученные знания в ходе разведочного анализа данных из таблицы flights_tbl.

Для начала выясним, есть ли в наших данных пропущенные значения. Это можно сделать несколькими способами. Ниже подсчет пропущенных значений выполнен для всех столбцов таблицы flights_tbl с помощью команды summarise_each() из пакета dplyr в сочетании с анонимной функцией, которая задает логику вычислений:

flights_tbl %>% 
  summarise_each(list(~sum(as.integer(is.na(.))))) %>% 
  glimpse

## Observations: ??
## Variables: 19
## Database: spark_connection
## $ year           <dbl> 0
## $ month          <dbl> 0
## $ day            <dbl> 0
## $ dep_time       <dbl> 8255
## $ sched_dep_time <dbl> 0
## $ dep_delay      <dbl> 8255
## $ arr_time       <dbl> 8713
## $ sched_arr_time <dbl> 0
## $ arr_delay      <dbl> 9430
## $ carrier        <dbl> 0
## $ flight         <dbl> 0
## $ tailnum        <dbl> 2512
## $ origin         <dbl> 0
## $ dest           <dbl> 0
## $ air_time       <dbl> 9430
## $ distance       <dbl> 0
## $ hour           <dbl> 0
## $ minute         <dbl> 0
## $ time_hour      <dbl> 124

Как видим, пропущенные значения действительно имеют место в некоторых переменных. Максимальное количество пропущенных значений достигает 9430, что составляет менее 3% от общего числа наблюдений в таблице (336776). Кстати, размерность таблицы можно выяснить с помощью команды sdf_dim() из пакета sparklyr, которая является аналогом базовой R-функции dim(). В пакете sparklyr есть и целый ряд других команд, имя которых начинается на sdf_ (от "Spark data frame"), например sdf_nrow(), sdf_ncol(), sdf_bind_rows(), sdf_pivot() и др. - мы познакомимся с некоторыми из этих функций по мере необходимости.

Так как доля пропущенных значений невелика, мы можем удалить соответствующие строки из таблицы без особого риска повлиять на качество последующего анализа. Для этого можно воспользоваться базовой функцией na.omit():

flights_full <- flights_tbl %>% na.omit()
## * Dropped 9554 rows with 'na.omit' (336776 => 327222)

flights_full %>% sdf_dim()
## [1] 327222     19

Поскольку нас интересуют рейсы, задержка которых составила от 15 до 30 мин (включительно), далее нам нужно отфильтровать данные соответствующим образом. Параллельно добавим новый столбец target со значениями зависимой переменной (см. разд. "Постановка задачи" выше):

flights <- flights_full %>% 
  filter(dep_delay >= 15, dep_delay <= 30) %>% 
  mutate(target = as.integer(arr_delay <= 0))

# размерность таблицы:
flights %>% sdf_dim()
## [1] 24507    20

В итоге мы получили таблицу с 24507 строками и 20 столбцами. Это совсем небольшая таблица по меркам любого современного компьютера и уже сейчас ее можно было бы импортировать из Spark в R (с помощью команды collect()) для дальнейшего анализа. Однако в целях демонстрации возможностей работы со Spark'ом из R мы оставим эту таблицу в памяти кластера.

Попробуем теперь выяснить, какие из имеющихся переменных коррелируют с зависимой переменной target. Логично было бы ожидать, что вероятность прибытия задержанного рейса по расписанию в значительной мере определяется расстоянием между аэропортом вылета и аэропортом прибытия (столбец distance, выражается в милях). Рассчитаем медианные значения этого расстояния для обоих классов зависимой переменной:

flights %>% 
  group_by(target) %>% 
  summarise(median_dist = percentile(distance, 0.5))

## # Source: spark<?> [?? x 2]
##  target median_dist
##   <int>       <dbl>
## 1      0         820
## 2      1        1089

И действительно: чем больше расстояние между аэропортами, тем больше шанс у задержанного рейса наверстать упущенное время и прибыть без опоздания.

Возможность наверстать упущенное время может также определяться различными факторами, связанными с авиакомпанией, которая выполняет тот или иной рейс (опыт пилотов, характеристики самолета и т.п.). Поэтому категориальная переменная carrier, содержащая сокращенные названия авиакомпаний, также может оказаться полезным предиктором. Чтобы понять так ли это, в приведенном ниже коде мы сгруппируем данные по уровням переменной carrier и подсчитаем количество задержанных рейсов, прибывших с опозданием и без. Далее мы визуализируем данные из полученной таблицы сопряженности с помощью мозаичной диаграммы (для этого потребуется пакет ggmosaic, который легко установить обычным образом из хранилища CRAN):

require(ggmosaic)

flights %>% 
  group_by(target, carrier) %>%
  tally() %>% # здесь заканчиваются вычисления на стороне Spark-кластера
  collect %>% # импорт результатов вычислений в R
  ggplot() +  # визуализация данных нативными средствами R
  geom_mosaic(aes(product(target, carrier), weight = n)) +
  labs(x = "Авиакомпания", y = "Отклик")


Как видим, авиакомпании действительно довольно заметно различаются по доле рейсов, прибывших без опоздания, в связи с чем переменную carrier можно рассматривать как потенциально полезный предиктор.

Последний пример кода является еще одной хорошей иллюстрацией упомянутого в начале этой статьи принципа "push compute / collect results": сначала мы оправляем из R инструкции для выполнения определенных вычислений на Spark-кластере, а затем импортируем полученный  результат в среду R и уже продолжаем анализ обычными для R средствами (в данном случае - изображаем данные графически с помощью ggplot2 и ggmosaic).

В таблице flights есть много других потенциально полезных предикторов, но я оставлю читателям возможность самостоятельно продолжить начатый выше разведочный анализ и поупражняться в работе с платформой Spark. В следующем сообщении мы рассмотрим реализованные в Spark алгоритмы машинного обучения и попробуем построить несложную модель, предсказывающую вероятность прибытия задержанного рейса без опоздания.


Послать комментарий

Новые Старые