В
предыдущем сообщении
был приведен небольшой обзор основных алгоритмов машинного обучения,
реализованных в библиотеке MLlib, которая является частью платформы
распределенных вычислений Spark. В состав пакета
sparklyr для R, как мы выяснили, входят функции,
предоставляющие удобный интерфейс к соответствующим алгоритмам. В частности,
при работе с этими функциями пользователи могут задавать структуру подгоняемой
модели с помощью обычного для R синтаксиса формул (например,
y ~ x). Такой способ спецификации моделей удобен,
поскольку он абстрагирует от пользователя ряд важных этапов подготовки данных
и непосредственной подгонки модели. Однако при работе над сложными проектами
пользователю почти наверняка потребуется больший контроль над
вычислительными этапами и последовательностью их выполнения. В платформе Spark
такой контроль реализован в виде концепции "конвейера вычислительных задач" (англ. "pipeline"). Ниже мы рассмотрим, из чего подобные конвейеры состоят,
и как их создавать и использовать. В примерах по-прежнему используются данные
по авиарейсам, вылетевшим из аэропортов Нью-Йорка в 2013 г.:
require(sparklyr)
require(tidyverse)
require(nycflights13)
# подключение к кластеру:
sc <- spark_connect(master = "local", version = "2.3")
# загрузка данных в кластер и предварительная обработка:
flights <- copy_to(sc, flights, "flights") %>%
na.omit() %>%
filter(dep_delay >= 15, dep_delay <= 30)
# разбиение на обучающую и проверочную выборки:
splits <- flights %>%
sdf_random_split(train = 0.8, test = 0.2, seed = 1984)
Структурные блоки вычислительных конвейеров
Конвейер представляет собой последовательность вычислений, определяемых объектами двух типов (см. также рис. 1):
- transformer ("преобразователь", или "модификатор") - алгоритм, который выполняет определенные преобразования входной таблицы данных и возвращает другую, модифицированную таблицу. Часто к входной таблице просто добавляются столбцы с новыми переменными, рассчитанными в соответствии с логикой, определяемой объектом-преобразователем. Обученные модели также являются преобразователями, поскольку они добавляют в таблицу c предикторами столбцы с предсказанными значениями зависимой переменной.
- estimator ("оценщик", или "алгоритм оценки") - алгоритм, который создает из входной таблицы данных объект типа transformer. Так, все рассмотренные в предыдущем сообщении алгоритмы машинного обучения в Spark являются подобными "оценщиками", поскольку из таблицы с обучающими данными они создают объект предсказательной модели (который, как отмечено выше, является "преобразователем").
Рис. 1. Иллюстрация концепций "преобразователь" и "алгоритм оценки". Непосредственная модификация таблиц преобразователями выполняется путем вызова метода ml_transform(). Превращение же таблицы в преобразователь происходит с помощью метода ml_fit(). См. примеры в тексте |
Преобразователи
В пакете sparklyr есть целый ряд
функций-преобразователей, имена которых начинаются с приставки
ft_ (от "feature transformer"; см. полный список
здесь). Эти функции обычно принимают одну или несколько переменных и возвращают
результат в виде одной или нескольких новых переменных.
В
предыдущем сообщении
мы применили обычную функцию mutate() из пакета
dplyr для создания бинарной переменной
target (1 - задержанный рейс прибыл без
опоздания, 0 - с опозданием). Тот же результат можно было бы получить с
помощью функции-преобразователя ft_binarizer(), которая превращает числовые переменные в бинарные в соответствии с
заданным пользователем пороговым значением:
binarizer <- ft_binarizer(sc,
input_col = "arr_delay",
output_col = "target",
threshold = 0)
binarizer
## Binarizer (Transformer)
## <binarizer_2fc428d78c0>
## (Parameters -- Column Names)
## input_col: arr_delay
## output_col: target
Как видим, объект binarizer является
преобразователем (Transformer), который на входе
ожидает переменную arr_delay (input_col: arr_delay), и который в случае успешного завершения вычислений сохранит результат
в переменную target (output_col: target). Чтобы применить хранящиеся в этом объекте инструкции к конкретным данным необходимо воспользоваться функцией
ml_transform():
binarizer %>%
ml_transform(splits$train) %>%
select(arr_delay, target)
## # Source: spark<?> [?? x 2]
## arr_delay target
## <dbl> <dbl>
## 1 12 1
## 2 14 1
## 3 10 1
## 4 9 1
## 5 42 1
## 6 -6 0
## 7 13 1
## 8 4 1
## 9 43 1
## 10 36 1
# ... with more rows
Хотя в приведенном примере мы сначала создали объект
binarizer и только потом применили его к данным с
помощью ml_transform(), функцию
ft_binarizer() можно также непосредственно
применять к данным без этих дополнительных шагов (вызов
ml_transform() при этом произойдет автоматически
незаметно для пользователя):
flights %>% select(arr_delay) %>%
ft_binarizer(input_col = "arr_delay",
output_col = "target",
threshold = 0)
## # Source: spark<?> [?? x 2]
## arr_delay target
## <dbl> <dbl>
## 1 12 1
## 2 14 1
## 3 10 1
## 4 9 1
## 5 42 1
## 6 -6 0
## 7 13 1
## 8 4 1
## 9 43 1
## 10 36 1
# ... with more rows
Способы создания конвейеров
В пакете sparklyr реализовано два
способа создания конвейеров вычислительных задач и оба они предполагают
использование функции ml_pipeline(). Первый
способ заключается инициализации "пустого" конвейера с дальнейшим
добавлением в него необходимых вычислительных этапов, или
задач (англ. "stages"), с помощью функции ml_add_stages():
pl <- ml_pipeline(sc)
pl
## Pipeline (Estimator) with no stages
## <pipeline_2fc416c22fbb>
pl <- pl %>%
ml_add_stage(binarizer)
pl
## Pipeline (Estimator) with 1 stage
## <pipeline_2fc417d4f5b>
## Stages
## |--1 Binarizer (Transformer)
## | <binarizer_2fc428d78c0>
## | (Parameters -- Column Names)
## | input_col: arr_delay
## | output_col: target
В приведенном примере объект pl сначала
представлял собой пустой конвейер, не содержащий ни одного вычислительного
этапа (Pipeline (Estimator) with no stages).
Затем в этот пустой конвейер был добавлен первый этап, логика которого хранилась в созданном нами выше объекте binarizer. В
результате объект pl стал конвейером с одной
вычислительной задачей.
Аналогичного результата можно было бы добиться также следующим образом:
pl <- ml_pipeline(sc) %>%
ft_binarizer(input_col = "arr_delay",
output_col = "target",
threshold = 0)
pl
## Pipeline (Estimator) with 1 stage
## <pipeline_2fc42e9a420c>
## Stages
## |--1 Binarizer (Transformer)
## | <binarizer_2fc428d78c0>
## | (Parameters -- Column Names)
## | input_col: arr_delay
## | output_col: target
Второй способ создания конвейеров состоит в указании всех необходимых
вычислительных этапов в виде аргументов функции
ml_pipeline():
pl <- ml_pipeline(binarizer)
pl
## Pipeline (Estimator) with 1 stage
## <pipeline_2fc42e9a420c>
## Stages
## |--1 Binarizer (Transformer)
## | <binarizer_2fc428d78c0>
## | (Parameters -- Column Names)
## | input_col: arr_delay
## | output_col: target
Обратите внимание на то, что полученный нами конвейер является "оценщиком"
(Pipeline (Estimator) with 1 stage). Эта справедливо для всех Spark-конвейеров, даже если они включают только
этапы модификации данных. Как следствие, для превращения в преобразователь к конвейеру нужно применить метод ml_fit().
Результатом "обучения" конвейера на некотором наборе данных является т.н.
"конвейерная модель" (pipeline model). Такую модель далее целиком можно
применять к новым наборам данных для выполнения всей цепочки соответствующих
вычислений. Рассмотрим это на более подробном примере.
Пример конвейера, выполняющего обучение предсказательной модели
В предыдущем сообщении мы уже построили простую модель, которая
предсказывает вероятность прибытия задержанного рейса без опоздания. Теперь
повторим создание этой модели, объединив последовательность необходимых этапов подготовки данных и подгонки логистической регрессии в один
конвейер. В частности, этот конвейер будет включать следующие этапы:
- Преобразование значений уровней категориальной переменной carrier (авиакомпания) в целочисленные метки;
- Преобразование полученных на первом этапе целочисленных меток в набор индикаторных переменных;
- Объединение значений всех предикторов в один векторный столбец (каждый элемент такого столбца содержит вектор со значениями предикторов);
- Стандартизация значений предикторов (ранее подобная стандартизация не выполнялась, но здесь мы применим ее в целях иллюстрации);
- Добавление бинарной переменной-отклика target (1 - задержанный рейс прибыл без опоздания, 0 - с опозданием);
- Подгонка логистической регрессии.
pm <- ml_pipeline(sc) %>%
ft_string_indexer(input_col = "carrier",
output_col ="carrier_idx") %>%
ft_one_hot_encoder(input_cols = "carrier_idx",
output_cols = "carrier_ohe") %>%
ft_vector_assembler(input_cols = c("dep_delay",
"distance",
"carrier_ohe"),
output_col = "features") %>%
ft_standard_scaler(input_col = "features",
output_col = "features_scaled",
with_mean = TRUE) %>%
ft_binarizer(input_col = "arr_delay",
output_col = "target",
threshold = 0) %>%
ml_logistic_regression(label_col = "target",
features_col = "features_scaled")
pm
## Pipeline (Estimator) with 6 stages
## <pipeline_2fc41c2e4890>
## Stages
## |--1 StringIndexer (Estimator)
## | <string_indexer_2fc44cc94662>
## | (Parameters -- Column Names)
## | input_col: carrier
## | output_col: carrier_idx
## | (Parameters)
## | handle_invalid: error
## | string_order_type: frequencyDesc
## |--2 OneHotEncoder (Transformer)
## | <one_hot_encoder_2fc462d2dfe>
## | (Parameters -- Column Names)
## | input_col: carrier_idx
## | output_col: carrier_ohe
## |--3 VectorAssembler (Transformer)
## | <vector_assembler_2fc4170319ee>
## | (Parameters -- Column Names)
## | input_cols: dep_delay, distance, carrier_ohe
## | output_col: features
## |--4 StandardScaler (Estimator)
## | <standard_scaler_2fc416462ef3>
## | (Parameters -- Column Names)
## | input_col: features
## | output_col: features_scaled
## | (Parameters)
## | with_mean: TRUE
## | with_std: TRUE
## |--5 Binarizer (Transformer)
## | <binarizer_2fc435b37cd7>
## | (Parameters -- Column Names)
## | input_col: arr_delay
## | output_col: target
## |--6 LogisticRegression (Estimator)
## | <logistic_regression_2fc45bc43ee1>
## | (Parameters -- Column Names)
## | features_col: features_scaled
## | label_col: target
## | prediction_col: prediction
## | probability_col: probability
## | raw_prediction_col: rawPrediction
## | (Parameters)
## | aggregation_depth: 2
## | elastic_net_param: 0
## | family: auto
## | fit_intercept: TRUE
## | max_iter: 100
## | reg_param: 0
## | standardization: TRUE
## | threshold: 0.5
## | tol: 1e-06
Полученный конвейер с 6-ю вычислительными этапами пока лишь хранит логику и
последовательность выполнения этих этапов. Никаких вычислений не
произошло и логичестическая регрессия пока построена не была - для этого необходимо
подать полученный конвейер на функцию ml_fit() в
сочетании с обучающими данными:
pm <- 6="" gt="" ml_fit="" pipeline_2fc41c2e4890="" pipelinemodel="" pm="" ransformer="" splits="" stages="" train="" with="">
## Stages
## |--1 StringIndexerModel (Transformer)
## |
## | (Parameters -- Column Names)
## | input_col: carrier
## | output_col: carrier_idx
## | (Transformer Info)
## | labels: chr [1:16] "UA" "EV" "B6" "DL" "MQ" "AA" "WN" ...
## |--2 OneHotEncoder (Transformer)
## |
## | (Parameters -- Column Names)
## | input_col: carrier_idx
## | output_col: carrier_ohe
## |--3 VectorAssembler (Transformer)
## |
## | (Parameters -- Column Names)
## | input_cols: dep_delay, distance, carrier_ohe
## | output_col: features
## |--4 StandardScalerModel (Transformer)
## |
## | (Parameters -- Column Names)
## | input_col: features
## | output_col: features_scaled
## | (Transformer Info)
## | mean: num [1:17] 21.579 1071.736 0.206 0.177 0.176 ...
## | std: num [1:17] 4.6 736.34 0.404 0.381 0.381 ...
## |--5 Binarizer (Transformer)
## |
## | (Parameters -- Column Names)
## | input_col: arr_delay
## | output_col: target
## |--6 LogisticRegressionModel (Transformer)
## |
## | (Parameters -- Column Names)
## | features_col: features_scaled
## | label_col: target
## | prediction_col: prediction
## | probability_col: probability
## | raw_prediction_col: rawPrediction
## | (Transformer Info)
## | coefficient_matrix: num [1, 1:17] 0.586 -0.394 -5.798 -5.215 -5.241 ...
## | coefficients: num [1:17] 0.586 -0.394 -5.798 -5.215 -5.241 ...
## | intercept: num 1.73
## | intercept_vector: num 1.73
## | num_classes: int 2
## | num_features: int 17
## | threshold: num 0.5
## | thresholds: num [1:2] 0.5 0.5 ->
Теперь мы получили "конвейерную модель" - объект-преобразователь (PipelineModel (Transformer) with 6 stages), который можно применять к новым наборам данных. Например, предсказания по данным из проверочной выборки легко получить следующим образом:
predictions <- pm %>%
ml_transform(splits$test)
predictions %>% select(target, prediction)
## # Source: spark<?> [?? x 2]
## target prediction
## <dbl> <dbl>
## 1 1 1
## 2 1 1
## 3 1 1
## 4 1 1
## 5 1 1
## 6 1 1
## 7 1 1
## 8 1 1
## 9 1 1
## 10 1 1
## # ... with more rows
Мы можем также оценить качество полученной модели, рассчитав площадь под ROC-кривой с помощью функции ml_binary_classification_evaluator():
ml_binary_classification_evaluator(predictions,
label_col = "target")
## [1] 0.7302388
Spark позволяет сохранять для дальнейшего использования не только модельные объекты, но и целые конвейеры. Для этого служат те же команды, что и рассмотренные нами ранее - ml_save() для сохранения на диск и ml_load() для загрузки с диска.
Отправить комментарий