Возможность работать с платформой Spark локально - это большое ее преимущество, позволяющее выполнять разработку и отладку кода перед его исполнением на удаленном кластере, который может состоять из нескольких (возможно, дорогостоящих) компьютеров. В этом сообщении мы рассмотрим как установить Spark на локальной машине, как подключиться к локальному кластеру из среды R, а также попробуем выполнить некоторые простые вычисления. 

Инсталляция Spark и sparklyr

Программная платформа Spark написана на Scala и Java и требует Java Virtual Machine (JVM). Поэтому перед инсталляцией Spark необходимо убедиться, что на вашем компьютере установлен также язык Java версии не ниже 1.8. Поскольку Java требуется для выполнения многих приложений, скорее всего он на вашем компьютере уже есть. Из среды R это можно проверить c помощью команды system("java -version"). На моей Windows-машине результат выполнения этой команды выглядит так:

system("java -version")
## java version "1.8.0_241"
## Java(TM) SE Runtime Environment (build 1.8.0_241-b07)
## Java HotSpot(TM) Client VM (build 25.241-b07, mixed mode, sharing)

Если Java у вас еще нет, то воспользуйтесь официальной инструкцией по инсталляции.

Если на вашем компьютере есть несколько версий Java, то, возможно, вам придется изменить значение переменной среды JAVA_HOME на нужную версию. Из R это можно сделать с помощью команды Sys.setenv(), например:

Sys.Setenv(JAVA_HOME = "<путь к Java нужной версии>")

Внимание: не используйте пробелы в названиях файлов и папок, работая на компьютерах под управлением Windows - это может привести к разного рода проблемам. Проблемы могут возникнуть также при употреблении кириллицы в названиях файлов и папок.

Далее установим пакет sparklyr обычным способом:

install.packages("sparklyr")

Программную платформу Spark можно установить как вручную, так и с помощью предназначенной для этого команды spark_install() из пакета sparklyr. Второй способ более удобный и именно им мы и воспользуемся. При этом во избежание "сюрпризов" в ходе рассмотрения последующих примеров, мы установим стабильную версию Spark 2.3 (несмотря на то, что на момент написания этой статьи уже была доступна версия 2.4 и даже версия для предварительного ознакомления 3.0):

require(sparklyr)
spark_install("2.3")

Выполнение последней команды приведет к инсталляции как запрошенной версии Spark, так и необходимой для ее корректной работы платформы Hadoop. С помощью команды spark_installed_versions() можно проверить, какие версии Spark и Hadoop есть на вашем компьютере. У меня это выглядит так:

spark_installed_versions()
##   spark hadoop                                                              dir
## 1 2.3.3    2.7 C:\\Users\\masti\\AppData\\Local/spark/spark-2.3.3-bin-hadoop2.7

По умолчанию платформа Spark будет установлена в "...\AppData\Local\spark" на Windows-машинах и в "~/spark" на Mac и Linux-машинах. Чтобы изменить эти места установки можно воспользоваться командой options(spark.install.dir = "<желаемый путь>")  перед выполнением spark_install(). Место инсталляции Spark называется "домашней директорией" и задается с помощью переменной среды SPARK_HOME. При инсталляции Spark с помощью команды spark_install() значение этой переменной будет присвоено автоматически и R будет "знать", где искать Spark для выполнения локальных вычислений. Однако позже, когда мы будем рассматривать работу с удаленным Spark-кластером (в облаке Amazon), эту переменную нам нужно будет настроить самостоятельно.

Подключение к локальному Spark-кластеру

Для подключения к Spark-кластеру служит функция spark_connect() из пакета sparklyr. У этой функции есть несколько аргументов, но на данный момент нам потребуются лишь два из них - master и version:

sc <- spark_connect(master = "local", version = "2.3")

Параметр master используется для указания IP адреса или URL "главного компьютера" в кластере, известного также как "управляющий узел" (driver node). Эта терминология связана с тем, что в "настоящих" кластерах есть один управляющий узел, который распределяет вычислительные задачи по многочисленными "рабочим узлам" (workers, или worker nodes). Поскольку сейчас нас интересует работа с локальным кластером, то мы просто присваиваем этому параметру значение "local".

Как следует из его названия, параметр version служит для указания необходимой версии Spark. Этот параметр используется только для работы с локальным кластером и позволяет автоматически обнаружить домашнюю директорию соответствующей версии Spark и запустить кластер.

Объект, возвращаемый функцией spark_connect(), содержит разного рода служебную информацию о кластере. Этому объекту принято давать имя sc - вы можете встретиться с этим обозначением в будущем при чтении кода, написанного другими людьми.

Первые простые примеры

Здесь и в последующих сообщениях мы будем использовать данные из пакета nycflights13, который содержит несколько таблиц с описанием 336776 авиарейсов, стартовавших из аэропортов Нью-Йорка в 2013 г. Это ни в коей мере не "большие данные". Тем не менее "нативная" обработка такого количества наблюдений в R может оказаться проблематичной на компьютерах с небольшим объемом памяти и Spark, как мы уже знаем, хорошо подходит для таких ситуаций. Установить пакет nycflights13 можно обычным способом:

install.packages("nycflights13")

require(nycflights13)
require(dplyr)

glimpse(flights)

## Observations: 336,776
## Variables: 19
## $ year           <int> 2013, 2013, 2013, 2013, 2013, ...
## $ month          <int> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...
## $ day            <int> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...
## $ dep_time       <int> 517, 533, 542, 544, 554, 554, ...
## $ sched_dep_time <int> 515, 529, 540, 545, 600, 558, ...
## $ dep_delay      <dbl> 2, 4, 2, -1, -6, -4, -5, -3, -...
## $ arr_time       <int> 830, 850, 923, 1004, 812, 740,...
## $ sched_arr_time <int> 819, 830, 850, 1022, 837, 728,...
## $ arr_delay      <dbl> 11, 20, 33, -18, -25, 12, 19, ...
## $ carrier        <chr> "UA", "UA", "AA", "B6", "DL", ...
## $ flight         <int> 1545, 1714, 1141, 725, 461, 16...
## $ tailnum        <chr> "N14228", "N24211", "N619AA", ...
## $ origin         <chr> "EWR", "LGA", "JFK", "JFK", "L...
## $ dest           <chr> "IAH", "IAH", "MIA", "BQN", "A...
## $ air_time       <dbl> 227, 227, 160, 183, 116, 150, ...
## $ distance       <dbl> 1400, 1416, 1089, 1576, 762, 7...
## $ hour           <dbl> 5, 5, 5, 5, 6, 5, 6, 6, 6, 6, ...
## $ minute         <dbl> 15, 29, 40, 45, 0, 58, 0, 0, 0...
## $ time_hour      <dttm> 2013-01-01 05:00:00, 2013-01-...

Для начала нужно загрузить таблицу flights в наш локальный Spark-кластер. Для этого служит функция copy_to(), на которую мы подаем созданный ранее объект sc и копируемую таблицу flights:

fl <- copy_to(sc, flights)

fl

## # Source: spark<flights> [?? x 19]
##     year month   day dep_time sched_dep_time dep_delay
##    <int> <int> <int>    <int>          <int>     <dbl>
##  1  2013     1     1      517            515         2
##  2  2013     1     1      533            529         4
##  3  2013     1     1      542            540         2
##  4  2013     1     1      544            545        -1
##  5  2013     1     1      554            600        -6
##  6  2013     1     1      554            558        -4
##  7  2013     1     1      555            600        -5
##  8  2013     1     1      557            600        -3
##  9  2013     1     1      557            600        -3
## 10  2013     1     1      558            600        -2
## # ... with more rows, and 13 more variables:
## #   arr_time <int>, sched_arr_time <int>,
## #   arr_delay <dbl>, carrier <chr>, flight <int>,
## #   tailnum <chr>, origin <chr>, dest <chr>,
## #   air_time <dbl>, distance <dbl>, hour <dbl>,
## #   minute <dbl>, time_hour <dttm>

Важно подчеркнуть отличие объекта fl от исходной таблицы flights. Объект flights - это обычная таблица класса tibble. В объекте же fl хранится только служебная информация по загруженной нами в Spark таблице с данными - самих данных в этом объекте нет (выполните str(fl), чтобы убедиться в этом). Тем не менее, введя в консоли R имя этого объекта и нажав клавишу Enter, мы получим первые несколько наблюдений из скопированной таблицы, как если бы это была обычная таблица с данными. Такое поведение объекта fl обусловлено тем, что при его вызове из консоли R Spark автоматически собирает (collect) первые несколько наблюдений из скопированной таблицы и выводит их на экран. Возможность такого рода интерактивного анализа, к которому пользователи R очень привычны, - это еще одно большое преимущество Spark.

Мониторинг процесса выполнения команд, посылаемых из консоли R, можно вести из веб-приложения, которое является частью дистрибутива Spark. Для его вызова необходимо выполнить следующую команду:

spark_web(sc)

На рис. ниже приведен пример того, как это приложение выглядит (подробности здесь обсуждаться не будут):



Для выполнения запросов к данным в Spark-кластере из среды R можно использовать либо SQL, либо функции из пакета dplyr. В качестве примера предположим, что мы хотим выяснить общее количество рейсов из каждого аэропорта Нью-Йорка (переменная origin).

Для выполнения SQL-запроса потребуется пакет DBI и входящая в его состав функция dbGetQuery(). Если вы использовали этот пакет раньше для работы с удаленными базами данных, то не встретите ничего необычного: на функцию dbGetQuery() нужно подать объект sc с информацией для подключения к Spark-кластеру, а также сам запрос:

require(DBI)

q <- "SELECT `origin`, count(*) AS `N`
      FROM `flights`
      GROUP BY `origin`"

dbGetQuery(sc, q)

## origin      N
## 1    LGA 104662
## 2    JFK 111279
## 3    EWR 120835

Использование SQL почти наверняка потребуется при выполнении сложных запросов, которые должны быть оптимизированы для обеспечения быстродействия. Однако в большинстве случаев для выполнения стандартных операций над данными использование функций из пакета dplyr будет более удобным. Более того, если вы раньше работали с dplyr  (а сегодня вряд ли найдутся пользователи R, которые не знают об этом пакете), то вы не увидите почти никаких отличий:

result <- fl %>% 
    group_by(origin) %>% 
    summarise(N = n())

result

## # Source: spark<?> [?? x 2]
##   origin      N
##   <chr>   <dbl>
## 1 LGA    104662
## 2 JFK    111279
## 3 EWR    120835

Следует лишь помнить о том, что приведенный в примере объект result (подобно созданному нами ранее объекту fl) не является таблицей данных: он лишь хранит информацию о том, как такие данные извлечь, или "собрать", из Spark-кластера. Для извлечения данных из Spark в среду R следует воспользоваться функцией collect():

true_result <- result %>% collect()

true_result

## # A tibble: 3 x 2
##  origin      N
##  <chr>   <dbl>
## 1 LGA    104662
## 2 JFK    111279
## 3 EWR    120835

# класс объектов result и true_result:
class(result)
## [1] "tbl_spark" "tbl_sql"   "tbl_lazy"  "tbl"

class(true_result)
## [1] "tbl_df"     "tbl"        "data.frame"

По завершении работы необходимо отключиться от кластера. Для этого служит команда spark_disconnect():

spark_disconnect(sc)

Приведенная команда также удалит кластер и все хранящиеся в нем данные.

В следующем сообщении мы подробнее рассмотрим примеры работы с платформой Spark из среды R средствами пакета dplyr.

4 Комментарии

Unknown написал(а)…
С интересом жду продолжения.
Unknown написал(а)…
Очень актуальный и полезный материал.Спасибо.
edvardoss написал(а)…
Сергей, спасибо за статью!
А помимо отладки какое еще есть преимущество при установке локально?
Sergey Mastitsky написал(а)…
Еще до определенного предела (т.е. если объем данных и вычислений позволяют) помогает не тратить деньги на инфраструктуру в облаке :)
Новые Старые