В предыдущем сообщении о платформе распределенных вычислений Spark мы рассмотрели основы разведочного анализа данных, который является обязательным начальным этапом любого аналитического проекта. Подобные проекты редко заканчиваются лишь описанием свойств данных и обычно включают также формализацию обнаруженных закономерностей в виде предсказательных моделей. В состав платформы Spark входит библиотека MLlib, в которой реализован целый ряд распространенных алгоритмов машинного обучения для построения подобных моделей. Ниже приведено описание основ работы с этой библиотекой через интерфейс, предоставляемый пакетом sparklyr.

Типичный процесс построения моделей

Процесс построения предсказательных моделей с использованием Spark обычно включает следующие шаги:
  1. Выполнение SQL-запросов для извлечения необходимых данных (средствами пакета dplyr и/или функций Hive Query Language).
  2. Применение функций из пакета sparklyr, названия которых начинаются с приставок ft_ и sdf_, для преобразования исходных переменных в новые признаки и для разбиения данных на обучающую и проверочные выборки.
  3. Построение модели с помощью подходящего ситуации алгоритма машинного обучения. Для этого применяются функции из пакета sparklyr, названия которых начинаются с приставки ml_.
  4. Проверка качества модели (или нескольких альтернативных моделей) на проверочной выборке.
  5. Экспорт полученных результатов в среду R для дальнейшего анализа (например, для построения графиков, создания отчетов и т.п.)

Алгоритмы

В пакете sparklyr есть целое семейство функций, названия которых начинаются с приставки ml_. Эти функции предоставляют удобный интерфейс для работы с алгоритмами машинного обучения, реализованными в библиотеке MLlib. Ниже приведен список ml-функций и соответствующих им алгоритмов:
В случае с алгоритмами, выполняющими обучение с учителем, отклик и предикторы задаются либо с помощью обычной для R формулы, либо через аргументы response и features соответственно. Например, для подгонки логистической регрессии мы могли бы воспользоваться любым из приведенных ниже способов:

# Способ 1:
ml_logistic_regression(y ~ x1 + x2 + x3)
# Способ 2:
ml_logistic_regression(response = "y", features = c("x1", "x2", "x3"))

Пример построения модели

Продолжим работать с описанными ранее данными по авиарейсам, вылетевшим из аэропортов Нью-Йорка в 2013 г., и построим простую модель, которая предсказывает вероятность прибытия задержанного рейса без опоздания (при условии задержки вылета на 15-30 минут). Будем рассматривать эту задачу как случай бинарной классификации: отклик принимает значение 1, если задержанный рейс прибыл без опоздания, и 0 в противоположном случае. В приведенном ниже коде мы запускаем локальный Spark-кластер, загружаем в него необходимые данные, удаляем из них строки с пропущенными значениями и создаем новую переменную со значениями отклика:

require(sparklyr)
require(tidyverse)
require(nycflights13)

# подключение к кластеру:
sc <- spark_connect(master = "local", version = "2.3")

# загрузка данных в кластер:
flights_tbl <- copy_to(sc, flights, "flights") # данные по рейсам

# подготовка данных к моделированию:
flights <- flights_tbl %>% 
  na.omit() %>% 
  filter(dep_delay >= 15, dep_delay <= 30) %>% 
  mutate(target = as.integer(arr_delay <= 0))

В качестве алгоритма для построения модели воспользуемся логистической регрессией. В эту модель войдут три предиктора: distance - расстояние между аэропортом вылета и аэропортом прибытия, dep_delay - задержка вылета (в минутах) и carrier - обозначение авиакомпании, выполняющей рейс.

Проще всего подгонку модели можно выполнить, подав необходимые переменные на функцию ml_logistic_regression() в виде формулы. Однако перед тем, как мы это сделаем, разобьем исходные данные на обучающую и проверочную выборки (в соотношении 80% / 20%) с помощью функции sdf_random_split():

flights_splits <- flights %>% 
  select(target, carrier, distance, dep_delay) %>% 
  sdf_random_split(training = 0.8, testing = 0.2, seed = 1984)

flights_splits

## $training
## # Source: spark<?> [?? x 4]
##    target carrier distance dep_delay
##     <int> <chr>      <dbl>     <dbl>
##  1      0 9E            94        15
##  2      0 9E            94        15
##  3      0 9E            94        16
##  4      0 9E            94        16
##  5      0 9E            94        16
##  6      0 9E            94        17
##  7      0 9E            94        18
##  8      0 9E            94        19
##  9      0 9E            94        19
## 10      0 9E            94        19
## # ... with more rows
## 
## $testing
## # Source: spark<?> [?? x 4]
##    target carrier distance dep_delay
##     <int> <chr>      <dbl>     <dbl>
##  1      0 9E            94        15
##  2      0 9E            94        19
##  3      0 9E            94        19
##  4      0 9E            94        20
##  5      0 9E            94        23
##  6      0 9E            94        26
##  7      0 9E           184        16
##  8      0 9E           184        17
##  9      0 9E           184        18
## 10      0 9E           184        18
## # ... with more rows

Как видим, функция sdf_random_split() возвращает список с двумя элементами - training и testing, которые содержат указатели к соответствующим таблицам, хранящимся в памяти Spark-кластера.

Для подгонки модели теперь достаточно выполнить следующую простую команду:

m_a <- flights_splits$training %>%
  ml_logistic_regression(target ~ distance + carrier + dep_delay)

m_a

## Formula: target ~ distance + carrier + dep_delay
## 
## Coefficients:
##   (Intercept)      distance    carrier_UA    carrier_EV    carrier_B6 
## -0.5952734683  0.0005239528  1.3848849108  0.6757064463  0.7684918531 
##    carrier_DL    carrier_MQ    carrier_AA    carrier_9E    carrier_WN 
##  1.3375316352  0.3111300687  1.4387708009  1.9621412008  1.3693916205 
##    carrier_US    carrier_VX    carrier_FL    carrier_F9    carrier_YV 
##  0.0394677580  1.6989611819 -0.0947876393  1.0030570688  0.2357373400 
##    carrier_AS    carrier_HA     dep_delay 
##  1.4780191427  0.8431874600 -0.1263543212

Как отмечено выше, другой способ спецификации модели заключается в использовании аргументов response и features:

m_b <- flights_splits$training %>%
  ml_logistic_regression(response = "target", 
                         features = c("distance", "carrier", "dep_delay"))

m_b

## Formula: target ~ distance + carrier + dep_delay
## 
## Coefficients:
##   (Intercept)      distance    carrier_UA    carrier_EV    carrier_B6 
## -0.5952734683  0.0005239528  1.3848849108  0.6757064463  0.7684918531 
##    carrier_DL    carrier_MQ    carrier_AA    carrier_9E    carrier_WN 
##  1.3375316352  0.3111300687  1.4387708009  1.9621412008  1.3693916205 
##    carrier_US    carrier_VX    carrier_FL    carrier_F9    carrier_YV 
##  0.0394677580  1.6989611819 -0.0947876393  1.0030570688  0.2357373400 
##    carrier_AS    carrier_HA     dep_delay 
##  1.4780191427  0.8431874600 -0.1263543212

Как и следовало ожидать, оба описанных способа спецификации модели привели к идентичным оценкам ее коэффициентов. Эти два способа очень удобны, и если вы только начинаете работать с платформой Spark, то рекомендуется использовать именно их, поскольку они скрывают от пользователя многие этапы подготовки предикторов к подгонке модели в соответствии с требованиями Spark. Чтобы лучше понять, в чем заключаются некоторые из этих этапов, взглянем на таблицу с данными, которые в действительности были использованы при подгонке модели:

(r <- m_a$summary$predictions() %>% head() %>% collect())

## # A tibble: 6 x 9
##   target carrier distance dep_delay features label rawPrediction
##    <int> <chr>      <dbl>     <dbl> <list>   <dbl> <list>       
## 1      0 9E            94        15 <dbl [1~     0 <dbl [2]>    
## 2      0 9E            94        15 <dbl [1~     0 <dbl [2]>    
## 3      0 9E            94        16 <dbl [1~     0 <dbl [2]>    
## 4      0 9E            94        16 <dbl [1~     0 <dbl [2]>    
## 5      0 9E            94        16 <dbl [1~     0 <dbl [2]>    
## 6      0 9E            94        17 <dbl [1~     0 <dbl [2]>    
## # ... with 2 more variables: probability <list>, prediction <dbl>

Приведенная таблица включает исходные данные и несколько новых столбцов, добавленных в ходе вычислений. Особый интерес для нас представляет столбец features. Он является векторным столбцом (vector column), каждый элемент которого содержит вектор со значениями предикторов для соответствующего наблюдения из обучающей выборки:

r$features[1]
## [[1]]
##  [1] 94  0  0  0  0  0  0  1  0  0  0  0  0  0  0  0 15

r$features[3]
## [[1]]
##  [1] 94  0  0  0  0  0  0  1  0  0  0  0  0  0  0  0 16

r$features[6]
## [[1]]
##  [1] 94  0  0  0  0  0  0  1  0  0  0  0  0  0  0  0 17

В этих примерах первое значение каждого вектора соответствует расстоянию до аэропорта назначения (distance), следующие 15 значений - индикаторным переменным, созданным в соответствии с уровнями факторной переменной carrier, а последнее значение - длительности задержки рейса (dep_delay). Согласно требованиям Spark, значения всех предикторов должны быть представлены именно в таком виде, т.е. поданы на тот или иной алгоритм в виде единого векторного столбца. В следующем сообщении мы рассмотрим, как подготовку такого векторного столбца можно выполнить самостоятельно с помощью т.н. "преобразователей признаков" ("feature transformers").

Полученные нами модельные объекты представляют собой сложные списки, из которых можно извлечь много полезных вещей. В частности, в элементе summary этих списков хранятся различные метрики качества моделей, рассчитанные по обучающим данным:

m_a$summary
BinaryLogisticRegressionTrainingSummaryImpl 
 Access the following via `$` or `ml_summary()`. 
 - features_col() 
 - label_col() 
 - predictions() 
 - probability_col() 
 - area_under_roc() 
 - f_measure_by_threshold() 
 - pr() 
 - precision_by_threshold() 
 - recall_by_threshold() 
 - roc() 
 - prediction_col() 
 - accuracy() 
 - f_measure_by_label() 
 - false_positive_rate_by_label() 
 - labels() 
 - precision_by_label() 
 - recall_by_label() 
 - true_positive_rate_by_label() 
 - weighted_f_measure() 
 - weighted_false_positive_rate() 
 - weighted_precision() 
 - weighted_recall() 
 - weighted_true_positive_rate()

Так, например, мы можем извлечь значения площади под ROC-кривой и (взвешенную) F1-меру:

m_a$summary$area_under_roc()
## [1] 0.7344739

m_a$summary$weighted_f_measure()
## [1] 0.7493676

Знать подобные метрики для обучающих данных - хорошо, но, конечно, больше всего нас интересует качество предсказаний модели на независимых наборах данных. Для расчета этих показателей по проверочной выборке необходимо воспользоваться функцией ml_evaluate() из пакета sparklyr:

(m_a_eval <- ml_evaluate(m, flights_splits$testing))

## BinaryLogisticRegressionSummaryImpl 
##  Access the following via `$` or `ml_summary()`. 
##  - features_col() 
##  - label_col() 
##  - predictions() 
##  - probability_col() 
##  - area_under_roc() 
##  - f_measure_by_threshold() 
##  - pr() 
##  - precision_by_threshold() 
##  - recall_by_threshold() 
##  - roc() 
##  - prediction_col() 
##  - accuracy() 
##  - f_measure_by_label() 
##  - false_positive_rate_by_label() 
##  - labels() 
##  - precision_by_label() 
##  - recall_by_label() 
##  - true_positive_rate_by_label() 
##  - weighted_f_measure() 
##  - weighted_false_positive_rate() 
##  - weighted_precision() 
##  - weighted_recall() 
##  - weighted_true_positive_rate()

Площадь под ROC-кривой и F1-мера на проверочных данных составляют

m_a_eval$area_under_roc()
## [1] 0.7205595

m_a_eval$weighted_f_measure()
## [1] 0.7636891

...что вполне неплохо, учитывая, насколько простой является построенная нами модель.

В соответствии с рассмотренным ранее принципом "push compute / collect results", мы можем также импортировать результаты вычислений метрик качества модели в среду R для дальнейшего анализа. Так, например, мы можем построить ROC-кривую для проверочных данных:

roc_vals <- m_a_eval$roc() %>% collect()

ggplot(roc_vals, aes(FPR, TPR)) +
  geom_line() + geom_abline(linetype = 2) +
  theme_minimal()



Для расчета предсказаний с помощью построенной модели необходимо воспользоваться функцией ml_predict(), которая подобно стандартному методу predict(), принимает модельный объект и таблицу с новыми данными, по которым необходимо выполнить предсказания:

(pr <- ml_predict(m_a, flights_splits$testing) %>%
  head(100) %>% collect())

## # A tibble: 100 x 11
##    target carrier distance dep_delay features label rawPrediction
##     <int> <chr>      <dbl>     <dbl> <list>   <dbl> <list>       
##  1      0 9E            94        15 <dbl [1~     0 <dbl [2]>    
##  2      0 9E            94        19 <dbl [1~     0 <dbl [2]>    
##  3      0 9E            94        19 <dbl [1~     0 <dbl [2]>    
##  4      0 9E            94        20 <dbl [1~     0 <dbl [2]>    
##  5      0 9E            94        23 <dbl [1~     0 <dbl [2]>    
##  6      0 9E            94        26 <dbl [1~     0 <dbl [2]>    
##  7      0 9E           184        16 <dbl [1~     0 <dbl [2]>    
##  8      0 9E           184        17 <dbl [1~     0 <dbl [2]>    
##  9      0 9E           184        18 <dbl [1~     0 <dbl [2]>    
## 10      0 9E           184        18 <dbl [1~     0 <dbl [2]>    
## # ... with 90 more rows, and 4 more variables: probability <list>,
## #   prediction <dbl>, probability_0 <dbl>, probability_1 <dbl>

В данном случае мы имеем дело с предсказанием принадлежности к одному из двух классов. Соответствующие вероятности хранятся в столбцах probability_0 и probability_1.

С помощью команды ml_save() построенную модель можно сохранить для дальнейшего использования, а команда ml_load() позволяет загрузить сохраненную ранее модель в текущую Spark-сессию:

ml_save(m_a, path = "<директория_для_сохранения>", overwrite = TRUE)

m_loaded <- ml_load(sc = sc, path = "<директория_для_сохранения>")

# Отключение кластера:
# spark_disconnect(sc)


***
В этом сообщении мы рассмотрели основы построения предсказательных моделей с помощью реализованных в Spark алгоритмов. Для спецификации моделей мы применили удобные способы, которые абстрагируют от пользователя многие этапы подготовки данных к моделированию и выполнения подгонки моделей. Совокупность соответствующих этапов принято называть "конвейером" (англ. pipeline). Понятие "конвейера вычислительных команд" является очень важным для работы с платформой Spark и другими современными системами распределенных вычислений. В следующем сообщении мы познакомимся с этой темой подробнее.

2 Комментарии

edvardoss написал(а)…
Сергей, спасибо за статью. Довольно неплохо знаю H2O и совсем не знаю MLlib. Уже не помню в какой статье но было сравнение якобы MLlib проигрывал H2O по части простоты использования, масштабируемости и функциональности на своей родной площадке (H2O запускался в Sparkling Water). У Вас есть какой-то опыт в обоих платформах? Интересно узнать Ваше мнение...
Sergey Mastitsky написал(а)…
Согласен, MLlib проигрывает H2O по простоте и удобству использования. По производительности, в принципе, тоже, на такого рода заключения всегда неоднозначны и требуют хорошо продуманного "дизайна эксперимента" при выполнении сравнений.
Новые Старые