В предыдущем сообщении был приведен небольшой обзор основных алгоритмов машинного обучения, реализованных в библиотеке MLlib, которая является частью платформы распределенных вычислений Spark. В состав пакета sparklyr для R, как мы выяснили, входят функции, предоставляющие удобный интерфейс к соответствующим алгоритмам. В частности, при работе с этими функциями пользователи могут задавать структуру подгоняемой модели с помощью обычного для R синтаксиса формул (например, y ~ x). Такой способ спецификации моделей удобен, поскольку он абстрагирует от пользователя ряд важных этапов подготовки данных и непосредственной подгонки модели. Однако при работе над сложными проектами пользователю почти наверняка потребуется больший контроль над вычислительными этапами и последовательностью их выполнения. В платформе Spark такой контроль реализован в виде концепции "конвейера вычислительных задач" (англ. "pipeline"). Ниже мы рассмотрим, из чего подобные конвейеры состоят, и как их создавать и использовать. В примерах по-прежнему используются данные по авиарейсам, вылетевшим из аэропортов Нью-Йорка в 2013 г.:
require(sparklyr)
require(tidyverse)
require(nycflights13)

# подключение к кластеру:
sc <- spark_connect(master = "local", version = "2.3")

# загрузка данных в кластер и предварительная обработка:
flights <- copy_to(sc, flights, "flights") %>% 
  na.omit() %>% 
  filter(dep_delay >= 15, dep_delay <= 30)

# разбиение на обучающую и проверочную выборки:
splits <- flights %>% 
  sdf_random_split(train = 0.8, test = 0.2, seed = 1984)

Структурные блоки вычислительных конвейеров

Конвейер представляет собой последовательность вычислений, определяемых объектами двух типов (см. также рис. 1):
  • transformer ("преобразователь", или "модификатор") - алгоритм, который выполняет определенные преобразования входной таблицы данных и возвращает другую, модифицированную таблицу. Часто к входной таблице просто добавляются столбцы с новыми переменными, рассчитанными в соответствии с логикой, определяемой объектом-преобразователем. Обученные модели также являются преобразователями, поскольку они  добавляют в таблицу c предикторами столбцы с предсказанными значениями зависимой переменной.
  • estimator ("оценщик", или "алгоритм оценки") - алгоритм, который создает из входной таблицы данных объект типа transformer. Так, все рассмотренные в предыдущем сообщении алгоритмы машинного обучения в Spark являются подобными "оценщиками", поскольку из таблицы с обучающими данными они создают объект предсказательной модели (который, как отмечено выше, является "преобразователем").


Рис. 1. Иллюстрация концепций "преобразователь" и "алгоритм оценки". Непосредственная модификация таблиц преобразователями выполняется путем вызова метода             ml_transform(). Превращение же таблицы в преобразователь происходит с помощью метода ml_fit(). См. примеры в тексте

Преобразователи

В пакете sparklyr есть целый ряд функций-преобразователей, имена которых начинаются с приставки ft_ (от "feature transformer"; см. полный список здесь). Эти функции обычно принимают одну или несколько переменных и возвращают результат в виде одной или нескольких новых переменных.

В предыдущем сообщении мы применили обычную функцию mutate() из пакета dplyr для создания бинарной переменной target (1 - задержанный рейс прибыл без опоздания, 0 - с опозданием). Тот же результат можно было бы получить с помощью функции-преобразователя ft_binarizer(), которая превращает числовые переменные в бинарные в соответствии с заданным пользователем пороговым значением:
binarizer <- ft_binarizer(sc, 
                          input_col = "arr_delay", 
                          output_col = "target",
                          threshold = 0)

binarizer

## Binarizer (Transformer)
## <binarizer_2fc428d78c0> 
##  (Parameters -- Column Names)
##   input_col: arr_delay
##   output_col: target
Как видим, объект binarizer является преобразователем (Transformer), который на входе ожидает переменную arr_delay (input_col: arr_delay), и который в случае успешного завершения вычислений сохранит результат в переменную target (output_col: target). Чтобы применить хранящиеся в этом объекте инструкции к конкретным данным необходимо воспользоваться функцией ml_transform():
binarizer %>% 
  ml_transform(splits$train) %>% 
  select(arr_delay, target)

## # Source: spark<?> [?? x 2]
##    arr_delay target
##        <dbl>  <dbl>
##  1        12      1
##  2        14      1
##  3        10      1
##  4         9      1
##  5        42      1
##  6        -6      0
##  7        13      1
##  8         4      1
##  9        43      1
## 10        36      1
# ... with more rows
Хотя в приведенном примере мы сначала создали объект binarizer и только потом применили его к данным с помощью ml_transform(), функцию ft_binarizer() можно также непосредственно применять к данным без этих дополнительных шагов (вызов ml_transform() при этом произойдет автоматически незаметно для пользователя):
flights %>% select(arr_delay) %>% 
  ft_binarizer(input_col = "arr_delay", 
               output_col = "target", 
               threshold = 0)

## # Source: spark<?> [?? x 2]
##    arr_delay target
##        <dbl>  <dbl>
##  1        12      1
##  2        14      1
##  3        10      1
##  4         9      1
##  5        42      1
##  6        -6      0
##  7        13      1
##  8         4      1
##  9        43      1
## 10        36      1
# ... with more rows

Способы создания конвейеров

В пакете sparklyr реализовано два способа создания конвейеров вычислительных задач и оба они предполагают использование функции ml_pipeline(). Первый способ заключается инициализации "пустого" конвейера с дальнейшим добавлением в него необходимых вычислительных этапов, или задач (англ. "stages"), с помощью функции ml_add_stages():
pl <- ml_pipeline(sc)

pl
## Pipeline (Estimator) with no stages
## <pipeline_2fc416c22fbb>

pl <- pl %>% 
  ml_add_stage(binarizer)

pl
## Pipeline (Estimator) with 1 stage
## <pipeline_2fc417d4f5b> 
##   Stages 
##   |--1 Binarizer (Transformer)
##   |    <binarizer_2fc428d78c0> 
##   |     (Parameters -- Column Names)
##   |      input_col: arr_delay
##   |      output_col: target
В приведенном примере объект pl сначала представлял собой пустой конвейер, не содержащий ни одного вычислительного этапа (Pipeline (Estimator) with no stages). Затем в этот пустой конвейер был добавлен первый этап, логика которого хранилась в созданном нами выше объекте binarizer. В результате объект pl стал конвейером с одной вычислительной задачей.

Аналогичного результата можно было бы добиться также следующим образом:
pl <-  ml_pipeline(sc) %>% 
  ft_binarizer(input_col = "arr_delay", 
               output_col = "target", 
               threshold = 0)

pl
## Pipeline (Estimator) with 1 stage
## <pipeline_2fc42e9a420c> 
##   Stages 
##   |--1 Binarizer (Transformer)
##   |    <binarizer_2fc428d78c0> 
##   |     (Parameters -- Column Names)
##   |      input_col: arr_delay
##   |      output_col: target
Второй способ создания конвейеров состоит в указании всех необходимых  вычислительных этапов в виде аргументов функции ml_pipeline():
pl <- ml_pipeline(binarizer)

pl
## Pipeline (Estimator) with 1 stage
## <pipeline_2fc42e9a420c> 
##   Stages 
##   |--1 Binarizer (Transformer)
##   |    <binarizer_2fc428d78c0> 
##   |     (Parameters -- Column Names)
##   |      input_col: arr_delay
##   |      output_col: target

Обратите внимание на то, что полученный нами конвейер является "оценщиком" (Pipeline (Estimator) with 1 stage). Эта справедливо для всех Spark-конвейеров, даже если они включают только этапы модификации данных. Как следствие, для превращения в преобразователь к конвейеру нужно применить метод ml_fit(). Результатом "обучения" конвейера на некотором наборе данных является т.н. "конвейерная модель" (pipeline model). Такую модель далее целиком можно применять к новым наборам данных для выполнения всей цепочки соответствующих вычислений. Рассмотрим это на более подробном примере.

Пример конвейера, выполняющего обучение предсказательной модели

В предыдущем сообщении мы уже построили простую модель, которая предсказывает вероятность прибытия задержанного рейса без опоздания. Теперь повторим создание этой модели, объединив последовательность необходимых этапов подготовки данных и подгонки логистической регрессии в один конвейер. В частности, этот конвейер будет включать следующие этапы:
  1. Преобразование значений уровней категориальной переменной carrier (авиакомпания) в целочисленные метки;
  2. Преобразование полученных на первом этапе целочисленных меток в набор индикаторных переменных;
  3. Объединение значений всех предикторов в один векторный столбец (каждый элемент такого столбца содержит вектор со значениями предикторов);
  4. Стандартизация значений предикторов (ранее подобная стандартизация не выполнялась, но здесь мы применим ее в целях иллюстрации);
  5. Добавление бинарной переменной-отклика target (1 - задержанный рейс прибыл без опоздания, 0 - с опозданием);
  6. Подгонка логистической регрессии.
Конвейер с перечисленными этапами можно создать следующим образом:
pm <- ml_pipeline(sc) %>% 
  ft_string_indexer(input_col = "carrier", 
                    output_col ="carrier_idx") %>% 
  ft_one_hot_encoder(input_cols = "carrier_idx", 
                     output_cols = "carrier_ohe") %>% 
  ft_vector_assembler(input_cols = c("dep_delay",
                                     "distance",
                                     "carrier_ohe"),
                      output_col = "features") %>% 
  ft_standard_scaler(input_col = "features", 
                     output_col = "features_scaled",
                     with_mean = TRUE) %>%
  ft_binarizer(input_col = "arr_delay", 
               output_col = "target", 
               threshold = 0) %>% 
  ml_logistic_regression(label_col = "target",
                         features_col = "features_scaled")

pm
## Pipeline (Estimator) with 6 stages
## <pipeline_2fc41c2e4890> 
##   Stages 
##   |--1 StringIndexer (Estimator)
##   |    <string_indexer_2fc44cc94662> 
##   |     (Parameters -- Column Names)
##   |      input_col: carrier
##   |      output_col: carrier_idx
##   |     (Parameters)
##   |      handle_invalid: error
##   |      string_order_type: frequencyDesc
##   |--2 OneHotEncoder (Transformer)
##   |    <one_hot_encoder_2fc462d2dfe> 
##   |     (Parameters -- Column Names)
##   |      input_col: carrier_idx
##   |      output_col: carrier_ohe
##   |--3 VectorAssembler (Transformer)
##   |    <vector_assembler_2fc4170319ee> 
##   |     (Parameters -- Column Names)
##   |      input_cols: dep_delay, distance, carrier_ohe
##   |      output_col: features
##   |--4 StandardScaler (Estimator)
##   |    <standard_scaler_2fc416462ef3> 
##   |     (Parameters -- Column Names)
##   |      input_col: features
##   |      output_col: features_scaled
##   |     (Parameters)
##   |      with_mean: TRUE
##   |      with_std: TRUE
##   |--5 Binarizer (Transformer)
##   |    <binarizer_2fc435b37cd7> 
##   |     (Parameters -- Column Names)
##   |      input_col: arr_delay
##   |      output_col: target
##   |--6 LogisticRegression (Estimator)
##   |    <logistic_regression_2fc45bc43ee1> 
##   |     (Parameters -- Column Names)
##   |      features_col: features_scaled
##   |      label_col: target
##   |      prediction_col: prediction
##   |      probability_col: probability
##   |      raw_prediction_col: rawPrediction
##   |     (Parameters)
##   |      aggregation_depth: 2
##   |      elastic_net_param: 0
##   |      family: auto
##   |      fit_intercept: TRUE
##   |      max_iter: 100
##   |      reg_param: 0
##   |      standardization: TRUE
##   |      threshold: 0.5
##   |      tol: 1e-06
Полученный конвейер с 6-ю вычислительными этапами пока лишь хранит логику и последовательность выполнения этих этапов. Никаких вычислений не произошло и логичестическая регрессия пока построена не была - для этого необходимо подать полученный конвейер на функцию ml_fit() в сочетании с обучающими данными:
pm <- 6="" gt="" ml_fit="" pipeline_2fc41c2e4890="" pipelinemodel="" pm="" ransformer="" splits="" stages="" train="" with=""> 
##   Stages 
##   |--1 StringIndexerModel (Transformer)
##   |     
##   |     (Parameters -- Column Names)
##   |      input_col: carrier
##   |      output_col: carrier_idx
##   |     (Transformer Info)
##   |      labels:  chr [1:16] "UA" "EV" "B6" "DL" "MQ" "AA" "WN" ... 
##   |--2 OneHotEncoder (Transformer)
##   |     
##   |     (Parameters -- Column Names)
##   |      input_col: carrier_idx
##   |      output_col: carrier_ohe
##   |--3 VectorAssembler (Transformer)
##   |     
##   |     (Parameters -- Column Names)
##   |      input_cols: dep_delay, distance, carrier_ohe
##   |      output_col: features
##   |--4 StandardScalerModel (Transformer)
##   |     
##   |     (Parameters -- Column Names)
##   |      input_col: features
##   |      output_col: features_scaled
##   |     (Transformer Info)
##   |      mean:  num [1:17] 21.579 1071.736 0.206 0.177 0.176 ... 
##   |      std:  num [1:17] 4.6 736.34 0.404 0.381 0.381 ... 
##   |--5 Binarizer (Transformer)
##   |     
##   |     (Parameters -- Column Names)
##   |      input_col: arr_delay
##   |      output_col: target
##   |--6 LogisticRegressionModel (Transformer)
##   |     
##   |     (Parameters -- Column Names)
##   |      features_col: features_scaled
##   |      label_col: target
##   |      prediction_col: prediction
##   |      probability_col: probability
##   |      raw_prediction_col: rawPrediction
##   |     (Transformer Info)
##   |      coefficient_matrix:  num [1, 1:17] 0.586 -0.394 -5.798 -5.215 -5.241 ... 
##   |      coefficients:  num [1:17] 0.586 -0.394 -5.798 -5.215 -5.241 ... 
##   |      intercept:  num 1.73 
##   |      intercept_vector:  num 1.73 
##   |      num_classes:  int 2 
##   |      num_features:  int 17 
##   |      threshold:  num 0.5 
##   |      thresholds:  num [1:2] 0.5 0.5
Теперь мы получили "конвейерную модель" - объект-преобразователь (PipelineModel (Transformer) with 6 stages), который можно применять к новым наборам данных. Например, предсказания по данным из проверочной выборки легко получить следующим образом:
predictions <- pm %>% 
  ml_transform(splits$test)

predictions %>% select(target, prediction)
## # Source: spark<?> [?? x 2]
##    target prediction
##     <dbl>      <dbl>
##  1      1          1
##  2      1          1
##  3      1          1
##  4      1          1
##  5      1          1
##  6      1          1
##  7      1          1
##  8      1          1
##  9      1          1
## 10      1          1
## # ... with more rows
Мы можем также оценить качество полученной модели, рассчитав площадь под ROC-кривой с помощью функции ml_binary_classification_evaluator():
ml_binary_classification_evaluator(predictions, 
                                   label_col = "target")
## [1] 0.7302388
Spark позволяет сохранять для дальнейшего использования не только модельные объекты, но и целые конвейеры. Для этого служат те же команды, что и рассмотренные нами ранее - ml_save() для сохранения на диск и ml_load() для загрузки с диска.

1 Комментарии

edvardoss написал(а)…
Спасибо. А будут ли в будущем статьи по развертыванию кластера?
Новые Старые