В предыдущем сообщении о платформе распределенных вычислений Spark мы рассмотрели основы разведочного анализа данных, который является обязательным начальным этапом любого аналитического проекта. Подобные проекты редко заканчиваются лишь описанием свойств данных и обычно включают также формализацию обнаруженных закономерностей в виде предсказательных моделей. В состав платформы Spark входит библиотека MLlib, в которой реализован целый ряд распространенных алгоритмов машинного обучения для построения подобных моделей. Ниже приведено описание основ работы с этой библиотекой через интерфейс, предоставляемый пакетом sparklyr.
Типичный процесс построения моделей
Процесс построения предсказательных моделей с использованием Spark обычно включает следующие шаги:
- Выполнение SQL-запросов для извлечения необходимых данных (средствами пакета dplyr и/или функций Hive Query Language).
- Применение функций из пакета sparklyr, названия которых начинаются с приставок ft_ и sdf_, для преобразования исходных переменных в новые признаки и для разбиения данных на обучающую и проверочные выборки.
- Построение модели с помощью подходящего ситуации алгоритма машинного обучения. Для этого применяются функции из пакета sparklyr, названия которых начинаются с приставки ml_.
- Проверка качества модели (или нескольких альтернативных моделей) на проверочной выборке.
- Экспорт полученных результатов в среду R для дальнейшего анализа (например, для построения графиков, создания отчетов и т.п.)
Алгоритмы
В пакете sparklyr есть целое семейство функций, названия которых начинаются с приставки ml_. Эти функции предоставляют удобный интерфейс для работы с алгоритмами машинного обучения, реализованными в библиотеке MLlib. Ниже приведен список ml-функций и соответствующих им алгоритмов:
- ml_als() - коллаборативная фильтрация по методу "наименьших изменяющихся квадратов" (alternating least squares).
- ml_approx_nearest_neighbors() - метод приблизительных ближайших соседей с использованием LSH-понижения размерности данных (от "locality-sensitive hashing").
- ml_bisecting_kmeans() - кластеризация по методу k средних с "рассечением надвое".
- ml_decision_tree() - деревья решений.
- ml_generalized_linear_regression() - обобщенная линейная регрессия.
- ml_fpgrowth() - нахождение ассоциативных правил по методу FP-роста.
- ml_gradient_boosted_trees() - деревья решений на основе градиентного бустинга.
- ml_isotonic_regression() - изотоническая регрессия.
- ml_kmeans() - кластеризация по методу k средних.
- ml_lda() - анализ тем документов по методу латентного размещения Дирихле.
- ml_linear_regression() - линейная регрессия.
- ml_linear_svc() - классификатор на основе метода опорных векторов.
- ml_logistic_regression() - логистическая регрессия.
- ml_multilayer_perceptron() - многослойная нейронная сеть (многослойный перцептрон).
- ml_naive_bayes() - наивный байесовский классификатор.
- ml_one_vs_rest() - классификатор "один против всех" (для многоклассовых случаев).
- ml_pca() - анализ главных компонент.
- ml_random_forest() - метод "случайного леса".
- ml_survival_regression() - параметрическая модель выживания (по методу "ускоренного времени отказа").
В случае с алгоритмами, выполняющими обучение с учителем, отклик и предикторы задаются либо с помощью обычной для R формулы, либо через аргументы response и features соответственно. Например, для подгонки логистической регрессии мы могли бы воспользоваться любым из приведенных ниже способов:
# Способ 1:
ml_logistic_regression(y ~ x1 + x2 + x3)
# Способ 2:
ml_logistic_regression(response = "y", features = c("x1", "x2", "x3"))
Пример построения модели
Продолжим работать с описанными ранее данными по авиарейсам, вылетевшим из аэропортов Нью-Йорка в 2013 г., и построим простую модель, которая предсказывает вероятность прибытия задержанного рейса без опоздания (при условии задержки вылета на 15-30 минут). Будем рассматривать эту задачу как случай бинарной классификации: отклик принимает значение 1, если задержанный рейс прибыл без опоздания, и 0 в противоположном случае. В приведенном ниже коде мы запускаем локальный Spark-кластер, загружаем в него необходимые данные, удаляем из них строки с пропущенными значениями и создаем новую переменную со значениями отклика:
В качестве алгоритма для построения модели воспользуемся логистической регрессией. В эту модель войдут три предиктора: distance - расстояние между аэропортом вылета и аэропортом прибытия, dep_delay - задержка вылета (в минутах) и carrier - обозначение авиакомпании, выполняющей рейс.
Проще всего подгонку модели можно выполнить, подав необходимые переменные на функцию ml_logistic_regression() в виде формулы. Однако перед тем, как мы это сделаем, разобьем исходные данные на обучающую и проверочную выборки (в соотношении 80% / 20%) с помощью функции sdf_random_split():
Как видим, функция sdf_random_split() возвращает список с двумя элементами - training и testing, которые содержат указатели к соответствующим таблицам, хранящимся в памяти Spark-кластера.
Для подгонки модели теперь достаточно выполнить следующую простую команду:
Как отмечено выше, другой способ спецификации модели заключается в использовании аргументов response и features:
Как и следовало ожидать, оба описанных способа спецификации модели привели к идентичным оценкам ее коэффициентов. Эти два способа очень удобны, и если вы только начинаете работать с платформой Spark, то рекомендуется использовать именно их, поскольку они скрывают от пользователя многие этапы подготовки предикторов к подгонке модели в соответствии с требованиями Spark. Чтобы лучше понять, в чем заключаются некоторые из этих этапов, взглянем на таблицу с данными, которые в действительности были использованы при подгонке модели:
Приведенная таблица включает исходные данные и несколько новых столбцов, добавленных в ходе вычислений. Особый интерес для нас представляет столбец features. Он является векторным столбцом (vector column), каждый элемент которого содержит вектор со значениями предикторов для соответствующего наблюдения из обучающей выборки:
В этих примерах первое значение каждого вектора соответствует расстоянию до аэропорта назначения (distance), следующие 15 значений - индикаторным переменным, созданным в соответствии с уровнями факторной переменной carrier, а последнее значение - длительности задержки рейса (dep_delay). Согласно требованиям Spark, значения всех предикторов должны быть представлены именно в таком виде, т.е. поданы на тот или иной алгоритм в виде единого векторного столбца. В следующем сообщении мы рассмотрим, как подготовку такого векторного столбца можно выполнить самостоятельно с помощью т.н. "преобразователей признаков" ("feature transformers").
Полученные нами модельные объекты представляют собой сложные списки, из которых можно извлечь много полезных вещей. В частности, в элементе summary этих списков хранятся различные метрики качества моделей, рассчитанные по обучающим данным:
Так, например, мы можем извлечь значения площади под ROC-кривой и (взвешенную) F1-меру:
Знать подобные метрики для обучающих данных - хорошо, но, конечно, больше всего нас интересует качество предсказаний модели на независимых наборах данных. Для расчета этих показателей по проверочной выборке необходимо воспользоваться функцией ml_evaluate() из пакета sparklyr:
Площадь под ROC-кривой и F1-мера на проверочных данных составляют
...что вполне неплохо, учитывая, насколько простой является построенная нами модель.
В соответствии с рассмотренным ранее принципом "push compute / collect results", мы можем также импортировать результаты вычислений метрик качества модели в среду R для дальнейшего анализа. Так, например, мы можем построить ROC-кривую для проверочных данных:
Для расчета предсказаний с помощью построенной модели необходимо воспользоваться функцией ml_predict(), которая подобно стандартному методу predict(), принимает модельный объект и таблицу с новыми данными, по которым необходимо выполнить предсказания:
В данном случае мы имеем дело с предсказанием принадлежности к одному из двух классов. Соответствующие вероятности хранятся в столбцах probability_0 и probability_1.
С помощью команды ml_save() построенную модель можно сохранить для дальнейшего использования, а команда ml_load() позволяет загрузить сохраненную ранее модель в текущую Spark-сессию:
require(sparklyr)
require(tidyverse)
require(nycflights13)
# подключение к кластеру:
sc <- spark_connect(master = "local", version = "2.3")
# загрузка данных в кластер:
flights_tbl <- copy_to(sc, flights, "flights") # данные по рейсам
# подготовка данных к моделированию:
flights <- flights_tbl %>%
na.omit() %>%
filter(dep_delay >= 15, dep_delay <= 30) %>%
mutate(target = as.integer(arr_delay <= 0))
В качестве алгоритма для построения модели воспользуемся логистической регрессией. В эту модель войдут три предиктора: distance - расстояние между аэропортом вылета и аэропортом прибытия, dep_delay - задержка вылета (в минутах) и carrier - обозначение авиакомпании, выполняющей рейс.
Проще всего подгонку модели можно выполнить, подав необходимые переменные на функцию ml_logistic_regression() в виде формулы. Однако перед тем, как мы это сделаем, разобьем исходные данные на обучающую и проверочную выборки (в соотношении 80% / 20%) с помощью функции sdf_random_split():
flights_splits <- flights %>%
select(target, carrier, distance, dep_delay) %>%
sdf_random_split(training = 0.8, testing = 0.2, seed = 1984)
flights_splits
## $training
## # Source: spark<?> [?? x 4]
## target carrier distance dep_delay
## <int> <chr> <dbl> <dbl>
## 1 0 9E 94 15
## 2 0 9E 94 15
## 3 0 9E 94 16
## 4 0 9E 94 16
## 5 0 9E 94 16
## 6 0 9E 94 17
## 7 0 9E 94 18
## 8 0 9E 94 19
## 9 0 9E 94 19
## 10 0 9E 94 19
## # ... with more rows
##
## $testing
## # Source: spark<?> [?? x 4]
## target carrier distance dep_delay
## <int> <chr> <dbl> <dbl>
## 1 0 9E 94 15
## 2 0 9E 94 19
## 3 0 9E 94 19
## 4 0 9E 94 20
## 5 0 9E 94 23
## 6 0 9E 94 26
## 7 0 9E 184 16
## 8 0 9E 184 17
## 9 0 9E 184 18
## 10 0 9E 184 18
## # ... with more rows
Как видим, функция sdf_random_split() возвращает список с двумя элементами - training и testing, которые содержат указатели к соответствующим таблицам, хранящимся в памяти Spark-кластера.
Для подгонки модели теперь достаточно выполнить следующую простую команду:
m_a <- flights_splits$training %>%
ml_logistic_regression(target ~ distance + carrier + dep_delay)
m_a
## Formula: target ~ distance + carrier + dep_delay
##
## Coefficients:
## (Intercept) distance carrier_UA carrier_EV carrier_B6
## -0.5952734683 0.0005239528 1.3848849108 0.6757064463 0.7684918531
## carrier_DL carrier_MQ carrier_AA carrier_9E carrier_WN
## 1.3375316352 0.3111300687 1.4387708009 1.9621412008 1.3693916205
## carrier_US carrier_VX carrier_FL carrier_F9 carrier_YV
## 0.0394677580 1.6989611819 -0.0947876393 1.0030570688 0.2357373400
## carrier_AS carrier_HA dep_delay
## 1.4780191427 0.8431874600 -0.1263543212
Как отмечено выше, другой способ спецификации модели заключается в использовании аргументов response и features:
m_b <- flights_splits$training %>%
ml_logistic_regression(response = "target",
features = c("distance", "carrier", "dep_delay"))
m_b
## Formula: target ~ distance + carrier + dep_delay
##
## Coefficients:
## (Intercept) distance carrier_UA carrier_EV carrier_B6
## -0.5952734683 0.0005239528 1.3848849108 0.6757064463 0.7684918531
## carrier_DL carrier_MQ carrier_AA carrier_9E carrier_WN
## 1.3375316352 0.3111300687 1.4387708009 1.9621412008 1.3693916205
## carrier_US carrier_VX carrier_FL carrier_F9 carrier_YV
## 0.0394677580 1.6989611819 -0.0947876393 1.0030570688 0.2357373400
## carrier_AS carrier_HA dep_delay
## 1.4780191427 0.8431874600 -0.1263543212
Как и следовало ожидать, оба описанных способа спецификации модели привели к идентичным оценкам ее коэффициентов. Эти два способа очень удобны, и если вы только начинаете работать с платформой Spark, то рекомендуется использовать именно их, поскольку они скрывают от пользователя многие этапы подготовки предикторов к подгонке модели в соответствии с требованиями Spark. Чтобы лучше понять, в чем заключаются некоторые из этих этапов, взглянем на таблицу с данными, которые в действительности были использованы при подгонке модели:
(r <- m_a$summary$predictions() %>% head() %>% collect())
## # A tibble: 6 x 9
## target carrier distance dep_delay features label rawPrediction
## <int> <chr> <dbl> <dbl> <list> <dbl> <list>
## 1 0 9E 94 15 <dbl [1~ 0 <dbl [2]>
## 2 0 9E 94 15 <dbl [1~ 0 <dbl [2]>
## 3 0 9E 94 16 <dbl [1~ 0 <dbl [2]>
## 4 0 9E 94 16 <dbl [1~ 0 <dbl [2]>
## 5 0 9E 94 16 <dbl [1~ 0 <dbl [2]>
## 6 0 9E 94 17 <dbl [1~ 0 <dbl [2]>
## # ... with 2 more variables: probability <list>, prediction <dbl>
Приведенная таблица включает исходные данные и несколько новых столбцов, добавленных в ходе вычислений. Особый интерес для нас представляет столбец features. Он является векторным столбцом (vector column), каждый элемент которого содержит вектор со значениями предикторов для соответствующего наблюдения из обучающей выборки:
r$features[1]
## [[1]]
## [1] 94 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 15
r$features[3]
## [[1]]
## [1] 94 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 16
r$features[6]
## [[1]]
## [1] 94 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 17
В этих примерах первое значение каждого вектора соответствует расстоянию до аэропорта назначения (distance), следующие 15 значений - индикаторным переменным, созданным в соответствии с уровнями факторной переменной carrier, а последнее значение - длительности задержки рейса (dep_delay). Согласно требованиям Spark, значения всех предикторов должны быть представлены именно в таком виде, т.е. поданы на тот или иной алгоритм в виде единого векторного столбца. В следующем сообщении мы рассмотрим, как подготовку такого векторного столбца можно выполнить самостоятельно с помощью т.н. "преобразователей признаков" ("feature transformers").
Полученные нами модельные объекты представляют собой сложные списки, из которых можно извлечь много полезных вещей. В частности, в элементе summary этих списков хранятся различные метрики качества моделей, рассчитанные по обучающим данным:
m_a$summary
BinaryLogisticRegressionTrainingSummaryImpl
Access the following via `$` or `ml_summary()`.
- features_col()
- label_col()
- predictions()
- probability_col()
- area_under_roc()
- f_measure_by_threshold()
- pr()
- precision_by_threshold()
- recall_by_threshold()
- roc()
- prediction_col()
- accuracy()
- f_measure_by_label()
- false_positive_rate_by_label()
- labels()
- precision_by_label()
- recall_by_label()
- true_positive_rate_by_label()
- weighted_f_measure()
- weighted_false_positive_rate()
- weighted_precision()
- weighted_recall()
- weighted_true_positive_rate()
Так, например, мы можем извлечь значения площади под ROC-кривой и (взвешенную) F1-меру:
m_a$summary$area_under_roc()
## [1] 0.7344739
m_a$summary$weighted_f_measure()
## [1] 0.7493676
Знать подобные метрики для обучающих данных - хорошо, но, конечно, больше всего нас интересует качество предсказаний модели на независимых наборах данных. Для расчета этих показателей по проверочной выборке необходимо воспользоваться функцией ml_evaluate() из пакета sparklyr:
(m_a_eval <- ml_evaluate(m, flights_splits$testing))
## BinaryLogisticRegressionSummaryImpl
## Access the following via `$` or `ml_summary()`.
## - features_col()
## - label_col()
## - predictions()
## - probability_col()
## - area_under_roc()
## - f_measure_by_threshold()
## - pr()
## - precision_by_threshold()
## - recall_by_threshold()
## - roc()
## - prediction_col()
## - accuracy()
## - f_measure_by_label()
## - false_positive_rate_by_label()
## - labels()
## - precision_by_label()
## - recall_by_label()
## - true_positive_rate_by_label()
## - weighted_f_measure()
## - weighted_false_positive_rate()
## - weighted_precision()
## - weighted_recall()
## - weighted_true_positive_rate()
Площадь под ROC-кривой и F1-мера на проверочных данных составляют
m_a_eval$area_under_roc()
## [1] 0.7205595
m_a_eval$weighted_f_measure()
## [1] 0.7636891
...что вполне неплохо, учитывая, насколько простой является построенная нами модель.
В соответствии с рассмотренным ранее принципом "push compute / collect results", мы можем также импортировать результаты вычислений метрик качества модели в среду R для дальнейшего анализа. Так, например, мы можем построить ROC-кривую для проверочных данных:
roc_vals <- m_a_eval$roc() %>% collect()
ggplot(roc_vals, aes(FPR, TPR)) +
geom_line() + geom_abline(linetype = 2) +
theme_minimal()
Для расчета предсказаний с помощью построенной модели необходимо воспользоваться функцией ml_predict(), которая подобно стандартному методу predict(), принимает модельный объект и таблицу с новыми данными, по которым необходимо выполнить предсказания:
(pr <- ml_predict(m_a, flights_splits$testing) %>%
head(100) %>% collect())
## # A tibble: 100 x 11
## target carrier distance dep_delay features label rawPrediction
## <int> <chr> <dbl> <dbl> <list> <dbl> <list>
## 1 0 9E 94 15 <dbl [1~ 0 <dbl [2]>
## 2 0 9E 94 19 <dbl [1~ 0 <dbl [2]>
## 3 0 9E 94 19 <dbl [1~ 0 <dbl [2]>
## 4 0 9E 94 20 <dbl [1~ 0 <dbl [2]>
## 5 0 9E 94 23 <dbl [1~ 0 <dbl [2]>
## 6 0 9E 94 26 <dbl [1~ 0 <dbl [2]>
## 7 0 9E 184 16 <dbl [1~ 0 <dbl [2]>
## 8 0 9E 184 17 <dbl [1~ 0 <dbl [2]>
## 9 0 9E 184 18 <dbl [1~ 0 <dbl [2]>
## 10 0 9E 184 18 <dbl [1~ 0 <dbl [2]>
## # ... with 90 more rows, and 4 more variables: probability <list>,
## # prediction <dbl>, probability_0 <dbl>, probability_1 <dbl>
В данном случае мы имеем дело с предсказанием принадлежности к одному из двух классов. Соответствующие вероятности хранятся в столбцах probability_0 и probability_1.
С помощью команды ml_save() построенную модель можно сохранить для дальнейшего использования, а команда ml_load() позволяет загрузить сохраненную ранее модель в текущую Spark-сессию:
ml_save(m_a, path = "<директория_для_сохранения>", overwrite = TRUE)
m_loaded <- ml_load(sc = sc, path = "<директория_для_сохранения>")
# Отключение кластера:
# spark_disconnect(sc)
***
В этом сообщении мы рассмотрели основы построения предсказательных моделей с помощью реализованных в Spark алгоритмов. Для спецификации моделей мы применили удобные способы, которые абстрагируют от пользователя многие этапы подготовки данных к моделированию и выполнения подгонки моделей. Совокупность соответствующих этапов принято называть "конвейером" (англ. pipeline). Понятие "конвейера вычислительных команд" является очень важным для работы с платформой Spark и другими современными системами распределенных вычислений. В следующем сообщении мы познакомимся с этой темой подробнее.
Отправить комментарий