MIT 6.5840(6.824) | Lab1: MapReduce

本篇文章主要讲解我的无显式锁MapReduce实现思路。

开始之前

在开始之前建议先熟悉Go语言并阅读过MapReduce的论文或了解过MapReduce的原理。

使用IDE编写代码可以很方便的进行调试,这里使用Goland。关于项目的配置可以参考这篇知乎文章(并不建议阅读他的代码,很显然作者是从Java语言转过来的,一些数据结构显得比较多余)。

无显式锁的思路主要受2021年Russ Cox的guest lecture启发, 其中的scheduler模式十分适用于实现MapReduce.

建议使用最新版本的代码模板开始你的任务,相较于之前的散装代码,这个版本使用了gomod,并且减少了IDE中烦人的报错。同时,建议读者应该仔细阅读lab的rules与hint,对理解任务与代码编写都有很大的帮助。

实现

Task的定义

Task是需要通过RPC进行传递的,因此首先定义一下Task.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type TaskType int

const (
MapState TaskType = iota
ReduceState
WaitState
CompleteState
)

type TaskStatus int

const (
Init TaskStatus = iota
Working
Complete
)

type Task struct {
FileName string
TaskType TaskType
TaskStatus TaskStatus
NReduce int
FileId int
StartTime time.Time
}

相较于区分map任务与reduce任务,我认为两者共用一个struct即可,无需为两者做出区分。在编写代码时,你应该随着你的编码进度来修改task的结构,而不是直接照抄,这样会大大限制你的思考。

Worker

Worker的实现比较简单,只需要对分配到的任务做出相应处理即可。注意,你应该有一个wait task,来应对一些需要worker等待的情况,例如hints里提到在开始reduce任务之前你需要等待map任务结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {

task := AskTask(&Task{})
for {
switch task.TaskType {
case MapState:
task = DoMap(&task, mapf)
case ReduceState:
task = DoReduce(&task, reducef)
case WaitState:
fmt.Printf("wait for task\n")
time.Sleep(2 * time.Second)
task = Task{TaskType: WaitState}
case CompleteState:
return
}
task = AskTask(&task)
}
}

在中间文件的创建与读取中有一些细节需要注意,这里参考hints里提到的就好了。在worker完成当前任务并请求下一个任务的过程中,我将当前完成的任务作为参数发送给coordinator,方便coordinator对task进行状态维护(从workingChan转移至DoneChan)。

Coordinator

在包括空行的情况下我仅仅用了133行代码实现了coordinator, 这主要归功于channel的使用与不维护worker状态的设计。

我对于Coordinator的定义如下:

1
2
3
4
5
6
7
8
type Coordinator struct {
taskChan chan Task
workingChan chan Task
DoneChan chan Task
files []string
phase CoordinatorPhase
nReduce int
}

可以看到我定义了三个channel,这次大规模使用channel让我体会到对于多线程分布式的任务,它的存在会简化我们的许多工作。DoneChan实际上有点多余,由于不需要知道DoneChan中task任务的具体信息,只需要知道task的数量,使用一个bool类型的数组也可以完成它的工作。

Coordinator的工作流程主要如下:

  1. 将任务打入taskChan
  2. 接到worker任务请求,从workingChan取出worker发来的上一轮已完成任务并打入DoneChan
  3. 遍历workingChan查看是否有超时任务,如果超时则取出并放回taskChan供重新分配
  4. 若taskChan中还有任务,则从taskChan取出任务分配给worker后,将其打入workingChan
  5. 若taskChan中无任务,则分配waitTask

可以发现只需要维护task的状态即可,并不需要关注worker的状态,如果worker没能完成这个任务,交给其他worker完成就可以了。

至于map阶段,reduce阶段与所有任务完成阶段的转换,只需要在每次call Done函数时进行判断,如果此时DoneChan中的任务数量与需要完成的数量相同,就可以判断此阶段完成,可以开启下一阶段了。


MIT 6.5840(6.824) | Lab1: MapReduce
http://bustdot.github.io/2023/08/07/MIT-6-5840-6-824-Lab1-MapReduce/
作者
BustDot
发布于
2023年8月7日
许可协议