Kubernetes中从头开始构建MapReduce

译自 MapReduce from Scratch,作者 Michal Pitr。

在过去的几周里,我一直在从头开始构建 MapReduce。

这将是一篇很长的文章:我们将了解分布式计算的必要性,重新发现为什么 MapReduce 是对许多问题进行建模的自然方式,构建我们自己的版本,了解各个部分如何组合在一起,并用它解决一个实际问题!

动机问题

假设我们想计算海量数据集中的词频。我们尝试在单台机器上处理它,但发现需要一个月以上的时间。我们能做什么?

第一个想法应该是获得一台更快的机器,但它可能不存在或太昂贵。相反,让我们看看如何将问题分布到 N 台商品机器上。我们希望有一种简单的方法来使用简单的查询查找任何单词的频率,即 grep over a file。

让我们首先将数据集拆分为 N 个分区,并使用不同的机器计算每个子集的词频。这已经给了我们 Nx 的加速,减去了一些固定的开销。为了合并结果,我们可以添加一台最终机器,它获取这 N 个部分结果文件并对相应单词的频率求和。

这与 MapReduce 背后的核心思想相差不远。上述过程有两个主要步骤:首先,我们将单词映射到输入数据子集中的频率,然后减少中间结果以获得最终答案。

请注意,这是非常通用的,想象一下我们有一个大型照片数据集,我们希望对其进行分类:我们可以将图像分类任务作为映射操作,然后在归约阶段将具有相同类别的图像分组。

另一个观察结果是,映射部分通常是两个部分中更昂贵的阶段,因此,通常映射器比归约器多。

希望已经让你相信 MapReduce 是一个合理的想法,让我们看看 MapReduce 论文如何解决词频问题。

使用 MapReduce

下面,我从 论文中复制粘贴了 WordCounter MapReduce 程序。让我们看看它是如何工作的。稍后,当我们实现我们的版本时,我们的目标是保持使用语义相同。

#include "mapreduce/mapreduce.h"

// User’s map function
class WordCounter : public Mapper
{
public:
    virtual void Map(const MapInput &input)
    {
        const string &text = input.value();
        const int n = text.size();
        for (int i = 0; i < n;)
        {
            // Skip past leading whitespace
            while ((i < n) && isspace(text[i]))
                i++;
            // Find word end
            int start = i;
            while ((i < n) && !isspace(text[i]))
                i++;
            if (start < i)
                Emit(text.substr(start, i - start), "1");
        }
    }
};
REGISTER_MAPPER(WordCounter);

// User’s reduce function
class Adder : public Reducer
{
    virtual void Reduce(ReduceInput *input)
    {
        // Iterate over all entries with the
        // same key and add the values
        int64 value = 0;
        while (!input->done())
        {
            value += StringToInt(input->value());
            input->NextValue();
        }
        // Emit sum for input->key()
        Emit(IntToString(value));
    }
};
REGISTER_REDUCER(Adder);

int main(int argc, char **argv)
{
    ParseCommandLineFlags(argc, argv);
    MapReduceSpecification spec;
    
    // Store list of input files into "spec"
    for (int i = 1; i < argc; i++)
    {
        MapReduceInput *input = spec.add_input();
        input->set_format("text");
        input->set_filepattern(argv[i]);
        input->set_mapper_class("WordCounter");
    }
    
    MapReduceOutput *out = spec.output();
    out->set_filebase("/gfs/test/freq");
    out->set_num_tasks(100);
    out->set_format("text");
    out->set_reducer_class("Adder");
    
    // Tuning parameters: use at most 2000
    // machines and 100 MB of memory per task
    spec.set_machines(2000);
    spec.set_map_megabytes(100);
    spec.set_reduce_megabytes(100);
    
    // Now run it
    MapReduceResult result;
    if (!MapReduce(spec, &result))
        abort();
    // Done: ’result’ structure contains info
    // about counters, time taken, number of
    // machines used, etc.
    return 0;
}

用户程序有 3 部分:map 函数、reduce 函数和配置。大部分繁重的工作由导入的 mapreduce 库处理。

map 函数将输入文本拆分为单词并发出键值对。例如,“the quick brown fox jumps over the lazy dog” 映射到以下键值对:[“the”:1, “quick”:1, “brown”:1, “fox”:1, “jumps”: 1, “over”:1, “the”:1, “lazy”:1, “dog”:1]。请注意,对 (“the”: 1) 发射了两次。

reduce 函数按键操作。例如,给定输入 [“the”:1, “the”:1”, “the”:1],它将发出 (“the”:3)。

配置处理输入输出、格式以及可用于 MapReduce 作业的资源数量。

在不到 100 行代码中,我们可以通过利用 1000 台机器来解决单词计数问题!这非常简洁,展示了 MapReduce 作为一种抽象的强大功能。

收集需求

本文将成为我们的灵感,让我们开始勾画实施要求:

  • 用户导入我们的 MAPREDUCE 库,执行 MAP 和 REDUCE 函数,并提供配置。
  • 当用户执行二进制文件时,它将被复制到多台机器上,在这些机器上以 Mapper 或 Reducer 的模式执行。
  • 这些机器必须有权访问输入数据。
  • Mapper 的输出必须对 Reducer 可用。
  • 用户可以访问最终结果。

基础设施

当我开始研究这项工作时,这些需求提出了两个主要未知:如何将二进制文件分发给其他计算机以及如何向它们提供输入数据。

经过一番研究,我决定在我的输入数据上托管一个网络存储服务器——我选择 NFS。我们可以将网络目录挂载到每台计算机,并允许计算机对该目录进行读写。它可能不像 Google 文件系统或 HDFS 那样具有高性能或高可扩展性,但它可以满足我们的目的。

为了将二进制文件分发到多台计算机,执行并监控其状态,我们可能会通过 SCP 复制二进制文件、通过 SSH 登录到计算机、执行二进制文件,然后等待其完成。我们甚至可以实施运行状况 ping 和进度报告。

不过,在我闲置一段时间后,我意识到这听起来非常像集群编排,并决定利用 Kubernetes 让我的生活更轻松。我们可以将二进制文件构建为 docker 镜像,并将其作为 Kubernetes 任务执行。通过持久卷和持久卷声明,可以轻松地将 NFS 与 Kubernetes 集成。不用担心,您不必理解这些概念,但我提到它们是为了完整起见。

由于我是出于教育目的开发这个概念,所以我使用 minikube 在我的笔记本电脑上启动一个本地虚拟集群。我在集群外将我的 NFS 服务器作为一个 docker 容器运行,并通过 docker 网络将其连接到该集群。此设置的优点是它可以轻松迁移到多个物理或虚拟机。

在基础架构到位的情况下,让我们开始编写我们的MapReduce框架!

使用我的 MapReduce

首先,我们将探讨如何使用我的 MapReduce 实现解决字数统计问题。然后,我们将深入研究该实现,以了解各个组件的功能。

正如我承诺的,我试图保持与原始论文相似的用法语义。使用我的 Go 实现比较此 MapReduce 程序和我们之前看到的 C++ 示例!

// main.go

import (
  …

  "github.com/MichalPitr/map_reduce/pkg/config"
  "github.com/MichalPitr/map_reduce/pkg/interfaces"
  "github.com/MichalPitr/map_reduce/pkg/mapreduce"
)

type WordCounter struct {
  wordRegex *regexp.Regexp
}

func (wc *WordCounter) Map(input interfaces.MapInput, emit func(key, value string)) {
  text := input.Value()
  text = strings.ToLower(text)
  words := wc.wordRegex.FindAllString(text, -1)
  for _, word := range words {
    emit(word, "1")
  }
}

type Adder struct{}

func (a *Adder) Reduce(input interfaces.ReducerInput, emit func(value string)) {
  val := 0
  for !input.Done() {
    num, err := strconv.Atoi(input.Value())
    if err != nil {
      log.Printf("Failed converting input to integer, skipping: %q", input.Value())
      input.NextValue()
      continue
    }
  val += num
  input.NextValue()
  }
  emit(strconv.Itoa(val))
}

func main() {
  cfg := config.SetupJobConfig()
  log.Printf("cfg: %+v", cfg)
  cfg.NumReducers = 2
  cfg.NumMappers = 4


  cfg.Mapper = &WordCounter{wordRegex: regexp.MustCompile(`\b\w+\b`)}
  cfg.Reducer = &Adder{}

  mapreduce.Execute(cfg)
}

让我们花点时间理解一下我的解决方案是如何在幕后工作的。我们可以参考该论文中一张方便的图表来说明这一点。

用户的程序在三个模式下执行,分别是 Master、Mapper 和 Reducer,在不同的机器上执行。在高层次上,master 处理整个作业编排,mappers 对输入文件执行昂贵的 map 操作,reducers 联合来自 mappers 的中间结果。在我的解决方案中,模式由 CLI 标志决定,例如 --mode=master 代表 master 模式。

Master

主模式将输入文件分割成子集、准备 NFS 目录、启动带有已分配文件的映射器作业,并等待它们完成。然后,针对还原器重复此过程。主函数非常简单,我可以将其列出如下:

// /pkg/master/master.go

func Run(cfg *config.Config) {
  clientset := createKubernetesClient()
  numNodes := getNumberOfNodes(clientset)
  mustValidateConfig(cfg, numNodes)

  jobId := fmt.Sprintf("job-%s", time.Now().Format("2006-01-02-15-04-05"))
  log.Printf("Running master: %s", jobId)
	
  mustCreateJobDir(cfg.NfsPath, jobId)
  fileRanges := partitionInputFiles(cfg.InputDir, cfg.NumMappers)

  t0 := time.Now()
  launchMappers(cfg, clientset, jobId, fileRanges)
  waitForJobsToComplete(clientset, jobId, "mapper")
  log.Printf("Mappers took %v to finish", time.Since(t0))

  t1 := time.Now()
  launchReducers(cfg, clientset, jobId)
  waitForJobsToComplete(clientset, jobId, "reducer")
  log.Printf("Reducers took %v to finish", time.Since(t1))
  log.Printf("Total runtime: %v", time.Since(t0))
}

让我们来理解 launchMappers 函数。它为每个映射器创建 Kubernetes 作业。作业规范指定:

  • 包含我们的二进制文件的 Docker 映像。
  • mapper 必需的 CLI 参数:mapper 模式、输入/输出目录和要处理的文件。
  • 如何装载 NFS 存储。

等等……Docker镜像?我们从未创建过!当我们最终运行我们的解决方案时我会更详细地展示这一点,但 Kubernetes 围绕容器,所以我们对我们的二进制文件进行 docker 化,以便 Kubernetes pod 可以下载该二进制文件。

在 launchMappers 创建 Mappers 作业后,Kubernetes会将作业分配给群集内的节点,启动容器,并监视其状态。现在我们可以运行 Mappers 了,是时候了解它们的工作原理了!

Mappers

在 map 模式中,二进制文件逐行扫描输入文件,并将该行传递给用户的 map 函数。Map 处理文本,并使用注入的 emit 函数发出键值对。

在我的实现中,emit 函数非常简单 - 它将键值对存储在一个动态列表的 map 中。更完整的实现将进行一些缓冲,然后将缓冲区刷新到磁盘。

当 mapper 完成所有输入的处理后,它将已排序的键值对保存到 NFS 存储中的中间文件中,reducer 将从该中间文件中读取这些键值对进行最终处理。让我们在这里缩小视野,看看从中间文件到 reducer 的这种映射如何工作。

我们希望按照键为中间文件分区,这样所有相同的键都由一个 reduce 任务处理。例如,”brown“和“the”出现在 Mapper 1 和 Mapper 2 中间文件。我们希望确保每个最终由同一个 reduce 任务处理。我们来看看一个例子。

为了实现这一点,当保存 mappers 中的中间结果时,我们根据 reducers R 的数量使用公式对键进行分区

例如,使用 FNV 哈希和 R = 2,我们得到

(数学笔记:这可以解释为“1 与 FNV(brown) mod 2 同余”)。

基于该内容,我们将“brown”分配到第二个中间文件,该文件将由 reducer 2 处理。请注意在上面的示例中,所有“the”都落入蓝色文件中,而所有“brown”都落入红色文件中!

这总结了关于 mappers 的讨论——接下来,让我们看看 reducer 如何工作。

Reducers

如下先前所突出显示,还原程序的工作是从分配的中介文件中读取键值对,然后使用用户定义的还原函数来处理它们。

有两件事我们可以确信:中介文件中的键按键排序,如果某中介文件中存在键 A,则我们可以保证键 A 不会出现在分配给其他 reducers 程序的任何文件中。

我尝试以更聪明的方式实现我的 reducers 程序,并避免将所有中介文件加载到内存中。这会带来一个有趣的算法问题:

假设我们要处理 3 个中介文件,一次处理一个键值对,而无需将所有内容加载到内存中。

我们可以借助最小堆即时合并键值对!我们将来自每个文件的第一个键值对加载到堆中。每当我们从堆中弹出数据时,我们从对应文件中读取下一行并将其推入堆中。这为我们提供了一种高效内存读取键值对流的方式!你可以在此处找到实现。

处理完所有中间文件后,Reducer 将结果保存在 NFS 存储中。

例如,上述示例将产生: [“abby”: 1, “alice”: 3, “bob”: 2, “car”:2, “dog”:1, …]。

MapReduce 论文提出了我已经在我的实现中跳过的几个额外的优化。聪明的读者可能已经能够提出一些优化 - 例如,我们可以在 mapper 中选择性地进行一些缩减,不是吗?

恭喜你,你现在对 MapReduce 有一个非常透彻的了解!最困难的部分已经结束了!让我们将它们组合在一起,并使用我们的 MapReduce 来统计 Project Gutenberg 上最流行的 100 本图书中的单词频率!

MapReduce 实战

我下载了 Project Gutenberg 上最流行的 100 本书到 /mnt/nfs/input/,使用的是这个脚本。书本的文件被标记为 book-0 到 book-99。我还创建了一个可以访问我的 NFS 存储的 4 节点 minikube 集群。

michal@michal-ThinkPad-T490s:/mnt/nfs$ kubectl get nodes
NAME            STATUS   ROLES           AGE    VERSION
multinode       Ready    control-plane   27d    v1.28.3
multinode-m02   Ready    <none>          7d2h   v1.28.3
multinode-m04   Ready    <none>          7d1h   v1.28.3
multinode-m05   Ready    <none>          7d1h   v1.28.3

我将重用前面展示的 Word-Frequencies Go 主文件。我们必须使用此 Dockerfile 准备 Docker 镜像,并将其推送到我的注册表。mapper 和 reducer 节点将提取此镜像来运行我们的工作负载。

docker build -t michalpitr/mapreduce:latest .
docker push michalpitr/mapreduce:latest

现在我们只需要在本地运行 master!

go build .
./map_reduce --mode=master --image=michalpitr/mapreduce:latest --input-dir /mnt/input/ --nfs-path /mnt/nfs/

让我们检查执行日志。

michal@michal-ThinkPad-T490s:~/code/map_reduce$ ./map_reduce --mode=ma
ster --image=michalpitr/mapreduce:latest --input-dir /mnt/nfs/input/ -
-nfs-path /mnt/nfs/
2024/04/28 17:13:53 Running master job-2024-04-28-17-13-53
2024/04/28 17:13:53 Creating mapper-0 for book-0-30
2024/04/28 17:13:53 Creating mapper-1 for book-31-53
2024/04/28 17:13:53 Creating mapper-2 for book-54-76
2024/04/28 17:13:53 Creating mapper-3 for book-77-99
2024/04/28 17:13:53 Waiting for jobs to finish.
2024/04/28 17:14:03 Waiting for jobs to finish.
2024/04/28 17:14:13 Waiting for jobs to finish.
2024/04/28 17:14:23 All jobs completed.
2024/04/28 17:14:23 Mappers took 30.1843218s to finish
2024/04/28 17:14:23 Creating reducer-0
2024/04/28 17:14:23 Creating reducer-1
2024/04/28 17:14:23 Waiting for jobs to finish.
2024/04/28 17:14:33 All jobs completed.
2024/04/28 17:14:33 Reducers took 10.106014703s to finish
2024/04/28 17:14:33 Total runtime: 40.290376921s

整个作业花费了 40 秒。我们在此处并未期望出现任何疯狂的情况,毕竟,我是在一台动力不足的笔记本电脑上运行此作业,而不是一个真正的集群。

让我们简单了解一下。master 创建了 4 个 mappers,每个 mappers 都分配了一个书籍范围。

使用kubectl,我们可以看到集群正在创建 4 个容器。这是集群拉取我们之前推送的镜像的时候!

michal@michal-ThinkPad-T490s:/mnt/nfs$ kubectl get pods
NAME             READY   STATUS              RESTARTS   AGE
mapper-0-zgz24   0/1     ContainerCreating   0          11s
mapper-1-m824k   0/1     ContainerCreating   0          11s
mapper-2-ctqqt   0/1     ContainerCreating   0          11s
mapper-3-l2ddh   0/1     ContainerCreating   0          11s

这些容器启动并完成运行所需的时间并不长。

michal@michal-ThinkPad-T490s:/mnt/nfs$ kubectl get pods
NAME             READY   STATUS      RESTARTS   AGE
mapper-0-zgz24   1/1     Running     0          18s
mapper-1-m824k   0/1     Completed   0          18s
mapper-2-ctqqt   0/1     Completed   0          18s
mapper-3-l2ddh   0/1     Completed   0          18s

master 定期轮询 Kubernetes API 服务器以获取这些作业的状态。一旦所有作业都完成,它就会启动归约器。

在进入归约器之前,让我们看看其中一个映射器生成的中间文件:

michal@michal-ThinkPad-T490s:/mnt/nfs/job-2024-04-28-17-13-53/mapper-3$ ls
partition-0  partition-1

检查分区-1 的末尾,显示了陀思妥耶夫斯基的《地下室手记》中一个角色的键值对。到目前为止,一切都好!

michal@michal-ThinkPad-T490s:/mnt/nfs/job-2024-04-28-17-13-53/mapper-3$ tail partition-1
zverkov,1
zverkov,1
zverkov,1
zverkov,1
zverkov,1
…

接下来,master 启动归约器,

michal@michal-ThinkPad-T490s:/mnt/nfs$ kubectl get pods
NAME              READY   STATUS              RESTARTS   AGE
…
reducer-0-4xt58   0/1     ContainerCreating   0          2s
reducer-1-xvhfk   0/1     ContainerCreating   0          2s

它们很快完成。

michal@michal-ThinkPad-T490s:/mnt/nfs$ kubectl get pods
NAME              READY   STATUS      RESTARTS   AGE
…
reducer-0-4xt58   0/1     Completed   0          7s
reducer-1-xvhfk   0/1     Completed   0          7s

就是这样 - 我们刚刚使用我们自己的 MapReduce 框架计算了单词频率!让我们看看我们可以从结果中学到什么!

我可以在作业文件夹的根目录中看到输出文件:

michal@michal-ThinkPad-T490s:/mnt/nfs/job-2024-04-28-17-13-53$ ls
…  reducer-0  reducer-1

让我们使用 grep 来查找一些单词的频率。

michal@michal-ThinkPad-T490s:/mnt/nfs/job-2024-04-28-17-13-53$ grep -w zverkov reducer-0 reducer-1
reducer-1:zverkov,112

michal@michal-ThinkPad-T490s:/mnt/nfs/job-2024-04-28-17-13-53$ grep -w the reducer-0 reducer-1
reducer-1:the,822175

michal@michal-ThinkPad-T490s:/mnt/nfs/job-2024-04-28-17-13-53$ grep -w mapreduce reducer-0 reducer-1
michal@michal-ThinkPad-T490s:/mnt/nfs/job-2024-04-28-17-13-53$ 

michal@michal-ThinkPad-T490s:/mnt/nfs/job-2024-04-28-17-13-53$ grep -w map reducer-0 reducer-1
reducer-1:map,188

michal@michal-ThinkPad-T490s:/mnt/nfs/job-2024-04-28-17-13-53$ grep -w reduce reducer-0 reducer-1
reducer-0:reduce,123

遗憾的是,在古腾堡计划最流行的 100 本书中没有提到 MapReduce……也许有一天?

最后一点,请注意这些输出文件如何按键对结果空间进行分区。当我们查找一个单词时,它只存在于一个文件中!这几乎就像我们做对了什么!

如果您已经读到这里,您不妨查看GitHub 仓库。您已经熟悉SQLite 存储格式内部。研究和撰写这些文章需要大量时间和精力。为确保您不会错过下一篇文章,请考虑订阅或关注我。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注