# Реализация MapReduce в Hadoop. Лекция 4
## Кластер Hadoop

Ресурсы кластера Hadoop находятся под управлением менеджера ресурсов YARN, который предоставляет API для запроса ресурсов кластера и работы с ними. Но эти API обычно не используются непосредственно пользовательским кодом, вместо этого пользователи пишут с использованием API более высокого уровня, предоставляемых платформами более высоких вычислений, которые сами построены на YARN и скрывают от пользователя сведения об управлении ресурсами.
Фреймворк Hadoop MapReduce является примером такой платформы.
YARN предоставляет свои основные службы через 2 типа длительно работающих демонов:
1. **Resource Manager** - диспетчер ресурсов, который координирует выделение вычислительных ресурсов кластера.
2. **Node Manager** - диспетчер узлов, который запускает и наблюдает за работой вычислительных контейнеров на машинах кластера. Контейнер выполняет специфичный для приложения процесс с ограниченным набором ресурсов: память, процессорное время. В зависимости от конфигурации YARN контейнер может быть процессом UNIX или cgroup в линукс.
## Задание MapReduce

**Задание MapReduce** - работа, которую хочет выполнить пользователь. Она состоит из:
1. Входных данных
2. Программы MapReduce
3. Конфигурационной информации
Hadoop выполняет работу путем деления её на задачи 2 типов, которые распределяются при помощи YARN и выполняются на узлах кластера:
1. Map
2. Reduce
Hadoop делит данные задания MapReduce на фрагменты фиксированного размера, называемые **входными сплитами**, оптимальный размер которых совпадает с размерами блоков HDFS (128 Мб, такой размер данных может гарантированно храниться на 1 узле).
Hadoop создает для каждого сплита задачу Map, которая выполняет определяемую пользователем функцию map для каждой записи в сплите, т.е для каждой пары ключ-значение. Hadoop старается по возможности выполнять задачи Map на узле, хранящем входные данные в HDFS, не используя при этом ценную пропускную способность сети кластера - **это принцип оптимизации локальности данных**.
Задачи Map записывают свои выходные данные на локальный диск, а не в HDFS, это обусловлено тем, что вывод Map является промежуточным - он обрабатывается задачами Reduce для получения финального вывода и после завершения задания вывод Map может быть удален.
> Если на узле, на котором работает задача Map произойдет сбой, Hadoop автоматически выполнит задачу на другом узле заново.
Задачи Reduce не пользуются преимуществами локальности данных, поскольку входные данные для них образуются из выходных данных всех задач Map.
## Механизм выполнения приложения YARN

1. Клиент связывается с диспетчером ресурсов и просит его запустить главный процесс приложения.
2. Диспетчер ресурсов находит диспетчер узлов, который может запустить главный процесс приложения в контейнере. **(Шаги 2a, 2b)**
> То, что главный процесс приложения делает после его запуска зависит от приложения: он может просто выполнить вычисления в контейнере, в котором он выполняется, и вернуть результат клиенту.
3. Или он может запросить дополнительные контейнеры у диспетчера ресурсов... **(Шаг 3)**
4. ... и использовать их для выполнения распределенных вычислений. **(Шаги 4a, 4b)**
**Главный процесс MapReduce** - координирует задачи, ответственные за выполнение задания MapReduce.
**Главный процесс и задачи MapReduce** - выполняются в контейнерах, которые планируются диспетчером ресурсов и управляются диспетчером узлов.
## Отправка задания MapReduce

Процесс отправки задания MapReduce включает следующие шаги:
1. Клиент инициирует отправку задания методами **класса job**.
2. Происходит запрос у диспетчера ресурсов нового идентификатора для задания MapReduce.
3. В общую файловую систему копируются ресурсы, необходимые для выполнения заданий: jar-файл задания, файл конфигурации и, вычисленные на стороне клиента, входные сплиты.
4. Инициируется запуск приложения YARN с привлечение менеджера ресурсов.
5. Планировщик YARN выделяет контейнер, в котором диспетчер ресурсов запускает главный процесс приложения под управлением диспетчера узла. **(Шаги 5a, 5b)**
6. Главный процесс инициализирует работу путём создания ряда объектьов учёта, которые отслеживают ход выполнения задания.
7. Главный процесс получает входные сплиты из общей файловой системы, затем он создает объект задачи Map для каждого сплита, а также объект задачи Reduce, количество которых определяет пользователь. Задачам присваиваются идентификаторы.
8. Главный процесс запрашивает контейнеры для всех задач Map и Reduce у диспетчера ресурсов.
| Задачи Map | Задачи Reduce |
| -------- | -------- |
| Имеют ограничение на локальность данных, которые планировщик старается выполнить. | Могут выполняться где угодно в кластере. |
9. Когда выполнена задача выделения ресурсов для контейнера на конкретном узле, главный процесс приложения запускает контейнер, связываясь с диспетчером узла. **(Шаги 9a, 9b)**
10. Задача выполняется java-приложением, основным классом которого является YARN-child, прежде, чем он сможет выполнить задачу, он локалиует ресурсы, необходимые для выполнения задачи, включая конфигурацию задания и jar-файл.
11. YARN-child запускает задачу Map или Reduce. YARN-child работает в выделенной JVM, поэтому любые ошибки в определенных пользователем функциях map и reduce не влияют на работу узла.
## Hadoop Streaming

Утилита Streaming запускает специальные задачи Map и Reduce.
Streaming задача осуществляет связь с процессом, который может быть написан на любом ЯП, используя стандартные потоки ввода и вывода. Во время выполнения задачи Java-процесс передает входные пары ключ-значение внешнему процессу, который пропускает его через пользовательскую функцию map или reduce и передает выходные пары ключ-значение обратно в Java-процесс.
На вход функции map данные
подаются через стандартный ввод:
1. Обработка выполняется построчно;
2. map пишет пары ключ-значение, разделяемые через символ табуляции, в стандартный вывод.
На вход функции reduce данные
подаются через стандартный ввод,
отсортированный по ключам:
1. Обработка выполняется построчно;
2. reduce пишет пары ключ-значение, разделяемые через символ табуляции, в стандартный вывод.
## Сплиты и записи

Каждый сплит делится на записи, задача Map поочерёдно обрабатывает каждую запись, т.е пару ключ-значение.
Сплиты и записи определяются на логическом уровне, ничто не требует их привязки к файлам. В контексте базы данных сплит может соответствовать строк из таблицы, а запись - одной строке этого диапазона.
✓ Входные сплиты представлены java-классом InputSplit, с InputSplit связана длина в байтах и набор мест хранения данных, которые представляют собой обычные строки с именами хостов.
✓ Сплит не хранит входные данные, это всего лишь ссылка на них.
✓ Информация о местах хранения используется MapReduce для размещения задач Map как можно ближе к данным сплита, а размер используется для упорядочивания сплитов, чтобы самые больше обрабатывались в первую очередь.
InputSplit создаются с помощью InputFormat, который отвечает за создание входных сплитов и деления их на записи.
Задача Map передаёт сплит методу InputFormat.createRecordReader, чтобы получить RecordReader для этого сплита. **RecordReader** - итератор по записям и задача Map использует его для генерации пар ключ-значение, которые передаются в пользовательскую функцию map.
## Форматы **входных** данных Hadoop

Hadoop может обрабатывать множество входных форматов данных: от текстовых файлов до баз данных.
В иерархии классов InputFormat представлены часто применяемые форматы:
1. InputFormat <K,V> - используется по умолчанию. Каждая запись представляет собой текстовую строку: **ключ** определяет смещение начала строки файла в байтах, **значение** это содержимое строки.
2. SequenceFileInputFormat <K,V> - хранятся последовательности двоичных пар ключ-значение: **ключи и значения** определяются последовательным файлом.
3. SequenceFileAsTextInputFormat - **преобразует ключи и значения** последовательного файла в текстовый объект.
4. SequenceFileAsBinaryInputFormat - **позволяет получить ключи и значения** последовательного файла в виде неструктурированных двоичных объектов.
## Форматы **выходных** данных Hadoop

В иерархии классов InputFormat представлены часто применяемые форматы:
1. OutputFormat <K,V> - используется по умолчанию. Выводит записи в виде строк текста. Ключи и значения могут относиться к любому типу, т.к текст OutputFormat преобразует их в строки вызовом toString.
2. SequenceFileAsBinaryOutputFormat <K,V> - записывает ключи и значения в низкоуровневом двоичном формате.
## Shuffle and Sort в Hadoop

Когда функция map начинает производить выходные данные, они не просто записываются на диск.
1. Каждая задача Map имеет цикличный буффер, который она записывает в выходные данные, когда содержимое буффера достигает определенного размера, фоновый поток начинает выгружать содержимое на диск.
2. Перед записью на диск поток с помощью функции partitions делит данные на разделы, соответствующие каждому экземпляру Reduce, в которые они в итоге будут отправлены.
3. Внутри каждого раздела фоновый поток выполняет в памяти сортировку по ключу, если есть функция combine, она запускается над выводом функции сортировки, чтобы обеспечить более компактный вывод Map.
4. Каждый раз, как буффер достигает порога выгрузки, создается новый файл выгрузки, перед завершением задачи файлы выгрузки объединяются в один разделенный и отсртированный выходной файл.
5. Файл вывода Map находится на локальном диске машины, выполнившей задачу Map.
6. Задаче Reduce для каждого конкретного раздела неоходимы выходные данные из нескольких задач Map со всего кластера.
7. Поскольку задачи Map завершаются не одновременно, задача Reduce копирует их выходные данные сразу после их выполнения.
8. Когда все выходные данные задачи Map были скопированы, задача Reduce переходит в фазу слияния (объединение выходных данных Map с сохранением порядка после сортировки).
9. Функция Reduce вызывается для каждого ключа в отсортированном виде, вывод этой фазы записывается на файловую систему.