Lab1 MapReduce

相关资料

重点理解MapReduce的机制

MapReduce(2004)翻译

MapReduce 深入浅出

思路

首先看一下 Lab 提供的 MapReduce 执行流程图

lab1-思路

  • 可以看出 MapReduce 的大致流程:启动一个Master( Coordinator 协调者)分配多个任务给 worker 做 Map 任务。
  • 然后 Worker 完成 Map 任务后返回中间值一组 KV ,接着协调者再将这些 KV 分发给后继的Worker 根据 KV 进行 Reduce 任务,最后对 Reduce 进行一个总的处理进行返回。

实现

完成 worker 与 Coordinator 之间的交互,处理 map 任务

从实现来看我们可以先完成 worker 与Coordinator之间的交互,首先可以来看看给的 RPC 例子:首先运行 main/mrworker.go 会进入到 mr/Worker 的这个方法中。

lab1-实现-1

可以在这个方法中调用RPC的例子方法:CallExample()。

lab1-实现-2

然后 CallExample() 这个方法中会有一行:

1
ok := call("Coordinator.Example", &args, &reply)

调用 Coordinator 包的 Example 方法(方法名开头为大写的代表可以为外包所调用),然后得到传修改后的 reply ,得到rpc返回值。这就是 coordinator 与 worker 的简单交互。

接下来,自定义 coordinator 与 worker 的交互,使其能处理 map 任务。

完善 rpc.go

自定义用于 RPC 的结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Task worker 向 coordinator 获取 task
type Task struct {
TaskType TaskType // 任务类型 map / reduce
TaskId int // 任务id
ReducerCnt int // 传入的 reducer 数量,用于hash
FileName string // 输入文件
}

// TaskArgs rpc应该传入的参数,实际上应该什么都不用传,因为worker只是获取一个任务
type TaskArgs struct {
}

// TaskType 任务类型
type TaskType int

const (
MapTask TaskType = iota // Map 任务
ReduceTask // Reduce 任务
WaitingTask // 任务分发完成,等待完成
ExitTask // 任务完成,退出
)

// Phase 分发状态
type Phase int

const (
MapPhase Phase = iota // 分发 MapTask
ReducePhase // 分发 ReduceTask
AllDone // 分发完成
)

// State 任务状态
type State int

const (
Working State = iota // 任务执行ing
Waiting // 任务等待执行ing
Done // 任务完成
)

完善 worker.go

在 worker 里面构造发送请求 RPC 的方法,获取 Map 任务,并请求 coordinator 处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {

// Your worker implementation here.
flag := true
for flag {
task := GetTask()
switch task.TaskType {
case MapTask:
{
DoMapTask(mapf, &task)
callDone()
}
case WaitingTask:
{
fmt.Println("All tasks are in progress,Please wait...")
time.Sleep(time.Second * 5)
}
case ExitTask:
{
time.Sleep(time.Second)
fmt.Println("All tasks are Done,will be exit...")
flag = false
}
}

}
// uncomment to send the Example RPC to the coordinator.
// CallExample()

}

获取任务

1
2
3
4
5
6
7
8
9
10
11
12
13
// GetTask 获取任务
// 调用 RPC 拉取协调者的任务
func GetTask() Task {
args := TaskArgs{}
reply := Task{}
ok := call("Coordinator.PullTask", &args, &reply)
if ok {
fmt.Println("worker get [", reply.TaskType, "] , TaskId = [", reply.TaskId, "]")
} else {
fmt.Printf("call failed!\n")
}
return reply
}

参考给定的 wc.go 、mrsequential.go 的 map 方法,编写自定义 map 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
func DoMapTask(mapf func(string, string) []KeyValue, response *Task) {
var intermediate []KeyValue

fileName := response.FileName

// 打开指定文件
file, err := os.Open(fileName)
if err != nil {
log.Fatalf("cannot open file %v", fileName)
}

// 获取文件内容
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read file %v", fileName)
}

// 关闭文件,释放资源
err = file.Close()
if err != nil {
log.Fatalf("close err file %v", fileName)
}

// mapf 返回一组 KV 结构体
intermediate = mapf(fileName, string(content))

reducerCount := response.ReducerCnt

// 创建一个长度为 n 的 Reduce 二维切片
HashedKV := make([][]KeyValue, reducerCount)

// 将中间键值对(intermediate)按照它们的键进行哈希分区,将它们发送到不同的 Reducer 中进行处理
for _, kv := range intermediate {
// append(HashedKV[ihash(kv.Key)%reducerCount], kv) 将当前键值对 kv 添加到指定的 Reducer 数组中
// 具体来说,它使用哈希值计算出的 Reducer 索引 ihash(kv.Key)%reducerCount 来获取对应的 Reducer 数组
// 然后将当前键值对添加到该数组中

// 最终,代码将所有中间键值对哈希分区后的结果保存在 HashedKV 数组中
// 其中每个元素都是一个 Reducer 数组,用于存储该 Reducer 负责处理的键值对。
HashedKV[ihash(kv.Key)%reducerCount] = append(HashedKV[ihash(kv.Key)%reducerCount], kv)
}

// 将分区后的键值对写入到文件中
for i := 0; i < reducerCount; i++ {
// 对于每个 Reducer,使用 response.TaskId 和 i 拼接出一个文件名 oname
// 例如 mr-tmp-1-0 表示任务 ID 为 1 的 Reduce 任务的第一个输出文件。
fileName := "mr-tmp-" + strconv.Itoa(response.TaskId) + "-" + strconv.Itoa(i)

// 创建文件
ofile, err := os.Create(fileName)
if err != nil {
log.Fatalf("cannot create file %v", fileName)
}

// 创建 JSON Encoder,将 kv 编码为 JSON 格式并写入到输出文件中
encoder := json.NewEncoder(ofile)
for _, kv := range HashedKV[i] {
err := encoder.Encode(kv)
if err != nil {
log.Fatalf("encode err kv %v", kv)
}
}

// 关闭文件,释放资源
err = ofile.Close()
if err != nil {
log.Fatalf("close err file %v", ofile)
}
}
}

做完任务也需要调用 RPC 在 coordinator 中将任务状态标记为已完成,以方便 coordinator 确认任务已完成,worker 与 coordinator 程序能正常退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// callDone 调用 RPC 将 Task 标记为已完成
func callDone() Task {
args := Task{}
reply := Task{}

ok := call("Coordinator.MarkCompleted", &args, &reply)
if ok {
fmt.Println(reply)
} else {
fmt.Printf("call failed!\n")
}

return reply
}

完善 coordinator.go

定义 coordinator 结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type Coordinator struct {
// Your definitions here.
ReducerCnt int // Reducer 数量
TaskId int // 任务Id
DistPhase Phase // 分发阶段 map / reduce
ReduceTaskChannel chan *Task // Reduce 任务 使用 chan 保证并发安全
MapTaskChannel chan *Task // map 任务 使用 chan 保证并发安全
TaskMetaHolder TaskMetaHolder // 任务元数据
files []string // 传入文件
}

// TaskMetaInfo 保存任务的元数据
type TaskMetaInfo struct {
state State // 任务当前状态
task *Task // 任务信息,传入指针是为了取出来后能通过地址正常修改其状态,标记为已完成
}

// TaskMetaHolder 保存全部任务的元数据
type TaskMetaHolder struct {
taskMap map[int]*TaskMetaInfo // 通过下标[taskId]快速定位
}

初始化 coordinator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//
// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{
files: files,
ReducerCnt: nReduce,
DistPhase: MapPhase,
MapTaskChannel: make(chan *Task, len(files)),
ReduceTaskChannel: make(chan *Task, nReduce),
TaskMetaHolder: TaskMetaHolder{
taskMap: make(map[int]*TaskMetaInfo, len(files)+nReduce), // 任务总数 = map + reduce 即 len(files)+nReduce
},
}
// 将 Map 任务放到 Map 管道中,对应的 taskMetaInfo 放到 taskMetaHolder 中
c.makeMapTasks(files)

// Your code here.

c.server()
return &c
}

将 Map 任务放到 Map 管道中,对应的 taskMetaInfo 放到 taskMetaHolder 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 对 map 任务进行处理,初始化 map 任务
func (c *Coordinator) makeMapTasks(files []string) {
for _, file := range files {
id := c.generateTaskId()
task := Task{
TaskId: id,
TaskType: MapTask,
ReducerCnt: c.ReducerCnt,
FileName: file,
}

// 初始化任务
taskMetaInfo := TaskMetaInfo{
state: Waiting, // 等待被执行
task: &task, // 任务地址
}

c.TaskMetaHolder.addMetaInfo(&taskMetaInfo)

fmt.Println("make a map task : ", &task)
c.MapTaskChannel <- &task
}
}

// 通过结构体的TaskId自增来获取唯一的任务id
func (c *Coordinator) generateTaskId() int {
id := c.TaskId
c.TaskId++
return id
}

// 对应的 taskMetaInfo 放到 taskMetaHolder 中
// 以 [taskId,*TaskMetaInfo] 的键值对存储
func (t *TaskMetaHolder) addMetaInfo(info *TaskMetaInfo) bool {
taskId := info.task.TaskId
meta := t.taskMap[taskId]
if meta != nil {
fmt.Println("meta contains task which id = ", taskId)
return false
} else {
t.taskMap[taskId] = info
}
return true
}

实现 worker 中的 调用 coordinator 的 RPC 方法 PullTask -> 分配任务

将 map 任务管道中的任务取出,判断是否正在执行,并将等待状态变为执行状态。如果取完管道中的 map 任务,表示此时的任务要么已经完成,要么正在执行。判断任务map任务是否全部完成,如果完成那么应该进入下一个任务处理阶段(ReducePhase),因为此时我们先验证map则直接跳过reduce直接allDone全部完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// PullTask 分发任务
func (c *Coordinator) PullTask(args *TaskArgs, reply *Task) error {
// 分发任务上锁,防止多个 worker 竞争,并用 defer 在函数执行完毕后,自动释放互斥锁
// 即使函数因为某种原因而出现了 panic ,也会确保互斥锁能够被正确释放
mu.Lock()
defer mu.Unlock()

// 判断 coordinator 分发阶段,根据 map / reduce 类型分发
switch c.DistPhase {
case MapPhase:
{
if len(c.MapTaskChannel) > 0 {
// 取出 map 任务
*reply = *<-c.MapTaskChannel
// 判断是否正在执行,并将等待状态变为执行状态
if !c.TaskMetaHolder.isWorking(reply.TaskId) {
fmt.Printf("task [ %d ] is running\n", reply.TaskId)
}
} else {
// MapTaskChannel == 0 表示 任务分发完成
// 但此时仍处于 map 阶段,此时将任务设为 等待执行状态
reply.TaskType = WaitingTask

// 检查任务完成状态,是否可以进入下一阶段
if c.TaskMetaHolder.checkTaskDone() {
c.toNextPhase()
}
return nil
}
}
default:
{
reply.TaskType = ExitTask
}
}
return nil
}

// 判断当前任务是否执行,若处于等待执行状态则转换为执行状态
func (t *TaskMetaHolder) isWorking(taskId int) bool {
metaInfo, ok := t.taskMap[taskId]
if !ok || metaInfo.state != Waiting {
return false
}
metaInfo.state = Working
return true
}

// 检查任务完成状态,是否可以进入下一阶段
func (t *TaskMetaHolder) checkTaskDone() bool {
var (
mapDoneNum = 0 // map 任务完成数量
mapUnDoneNum = 0 // map 任务未完成数量
reduceDoneNum = 0 // reduce 任务完成数量
reduceUnDoneNum = 0 // reduce 任务未完成数量
)

for _, taskMeta := range t.taskMap {
if taskMeta.task.TaskType == MapTask {
if taskMeta.state == Done {
mapDoneNum++
} else {
mapUnDoneNum++
}
} else if taskMeta.task.TaskType == ReduceTask {
if taskMeta.state == Done {
reduceDoneNum++
} else {
reduceUnDoneNum++
}
}
}

fmt.Printf("map tasks are finished %d/%d, reduce task are finished %d/%d \n",
mapDoneNum, mapDoneNum+mapUnDoneNum, reduceDoneNum, reduceDoneNum+reduceUnDoneNum)

// map 阶段
if (mapDoneNum > 0 && mapUnDoneNum == 0) && (reduceDoneNum == 0 && reduceUnDoneNum == 0) {
return true
} else { // reduce 阶段
if reduceDoneNum > 0 && reduceUnDoneNum == 0 {
return true
}
}

return false
}

func (c *Coordinator) toNextPhase() {
if c.DistPhase == MapPhase {
// TODO 进入 Reduce 阶段
c.DistPhase = AllDone
} else if c.DistPhase == ReducePhase {
c.DistPhase = AllDone
}
}

实现 worder 中调用 coordinator 的 RPC 方法 MarkCompleted -> 将任务标记为完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// MarkCompleted 将指定任务标记为已完成
func (c *Coordinator) MarkCompleted(args *Task, reply *Task) error {
mu.Lock()
defer mu.Unlock()
switch args.TaskType {
case MapTask:
info, ok := c.TaskMetaHolder.taskMap[args.TaskId]
// 防止另一个 worker 重复工作
if ok && info.state == Working {
info.state = Done
fmt.Printf("Map task [ %d ] is completed.\n", args.TaskId)
} else {
fmt.Printf("Map task [ %d ] is already completed.\n", args.TaskId)
}
break
default:
panic("The task type is not defined!")
}
return nil
}

完善 coordinator 的 RPC 方法 Done 方法

如果任务全部完成 即 coordinator 处于 AllDone 状态,返回 true 使其能正常退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//
// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
//
func (c *Coordinator) Done() bool {
mu.Lock()
defer mu.Unlock()
if c.DistPhase == AllDone {
fmt.Println("All tasks are completed , the coordinator will be exit")
return true
} else {
return false
}
// Your code here.
}

运行结果

至此map阶段已经能暂且构成一个循环,先运行 mrcoordinator.go 、再运行 mrworker.go 查看效果。

mrcoordinator.go 运行效果

lab1-运行结果-1

mrworker.go 运行效果

lab1-运行结果-2

生成的文件

lab1-运行结果-3

问题

问题1:这里不太明白为什么将设为等待执行,不就阻塞了吗 ,可能还处于 map 阶段,后面有其他考虑

lab1-问题-1

问题2:这里无法处理多个文件,处理完第一个文件后,无法处理后面的文件,原因暂时没找到,也许是问题1造成的?

lab1-问题-2

补充处理 reduce 任务的逻辑

大部分逻辑其实和 map 阶段类似。

完善 coordinator.go

初始化 reduce 任务

将 Reduce 任务放到 Reduce 管道中,对应的 taskMetaInfo 放到 taskMetaHolder 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// 对 reduce 任务进行处理,初始化 reduce 任务
func (c *Coordinator) makeReduceTasks() {
for i := 0; i < c.ReducerCnt; i++ {
id := c.generateTaskId()
task := Task{
TaskId: id,
TaskType: ReduceTask,
FileSlice: findReduceTemp(i),
}

taskMetaInfo := TaskMetaInfo{
state: Waiting,
task: &task,
}

c.TaskMetaHolder.addMetaInfo(&taskMetaInfo)

fmt.Println("make a reduce task : ", &task)
c.ReduceTaskChannel <- &task
}
}

// 获取指定 reduce task 对应的中间文件名列表
func findReduceTemp(reduceNum int) []string {
var s []string
// 获取当前工作目录
dir, err := os.Getwd()
if err != nil {
log.Fatalf("cannot get wd")
}
// 读取该目录下的所有文件列表。
files, err := ioutil.ReadDir(dir)
if err != nil {
log.Fatalf("cannot read dir %v", dir)
}

// 遍历所有文件,如果文件名以 "mr-tmp" 为前缀且以指定的 reduce 编号为后缀,则将该文件名添加到字符串切片 s 中
for _, file := range files {
if strings.HasPrefix(file.Name(), "mr-tmp") && strings.HasSuffix(file.Name(), strconv.Itoa(reduceNum)) {
s = append(s, file.Name())
}
}

// 返回 s 切片
return s
}

这里需要对 Task 结构做些改变,由 Filename 字符串 变为 一个 FileSlice 字符串切片数组

1
2
3
4
5
6
7
// Task worker 向 coordinator 获取 task
type Task struct {
TaskType TaskType // 任务类型 map / reduce
TaskId int // 任务id
ReducerCnt int // 传入的 reducer 数量,用于hash
FileSlice []string // 输入文件, map 对应一个输入文件,reduce 对应多个 temp 中间文件
}

因为对于重新理了以下 MapReduce 框架就可知,输入阶段时,初始化一个 map 任务其实是对应一个输入文件,但是经过 map 过程来看,我们其实一个任务切分成了很多 tmp 文件,那么 reduce 任务输入则应该是一组哈希相同的中间文件。

PullTask(分配任务)补充分发 reduce 任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// PullTask 分发任务
func (c *Coordinator) PullTask(args *TaskArgs, reply *Task) error {
// 分发任务上锁,防止多个 worker 竞争,并用 defer 在函数执行完毕后,自动释放互斥锁
// 即使函数因为某种原因而出现了 panic ,也会确保互斥锁能够被正确释放
mu.Lock()
defer mu.Unlock()

// 判断 coordinator 分发阶段,根据 map / reduce 类型分发
switch c.DistPhase {
case MapPhase:
{
if len(c.MapTaskChannel) > 0 {
// 取出 map 任务
*reply = *<-c.MapTaskChannel
// 判断是否正在执行,并将等待状态变为执行状态
if !c.TaskMetaHolder.isWorking(reply.TaskId) {
fmt.Printf("Map task [ %d ] is running\n", reply.TaskId)
}
} else {
// MapTaskChannel == 0 表示 任务分发完成
// 但此时仍处于 map 阶段,此时将任务设为 等待执行状态
reply.TaskType = WaitingTask

// 检查任务完成状态,是否可以进入下一阶段
if c.TaskMetaHolder.checkTaskDone() {
c.toNextPhase()
}
return nil
}
}
case ReducePhase:
{
fmt.Printf("start reduce phase")
if len(c.ReduceTaskChannel) > 0 {
// 取出 reduce 任务
*reply = *<-c.ReduceTaskChannel
// 判断是否正在执行,并将等待状态变为执行状态
if !c.TaskMetaHolder.isWorking(reply.TaskId) {
fmt.Printf("Reduce task [ %d ] is running\n", reply.TaskId)
}
} else {
// ReduceTaskChannel == 0 表示 任务分发完成
// 但此时仍处于 reduce 阶段,此时将任务设为 等待执行状态
reply.TaskType = WaitingTask

// 检查任务完成状态,是否可以进入下一阶段
if c.TaskMetaHolder.checkTaskDone() {
c.toNextPhase()
}
return nil
}
}
case AllDone:
{
reply.TaskType = ExitTask
}
default:
{
panic("The phase is not defined!")
}
}
return nil
}

补充 toNextPhase 进入 Reduce 阶段的逻辑

1
2
3
4
5
6
7
8
func (c *Coordinator) toNextPhase() {
if c.DistPhase == MapPhase {
c.makeReduceTasks()
c.DistPhase = ReducePhase
} else if c.DistPhase == ReducePhase {
c.DistPhase = AllDone
}
}

补充 MarkCompleted 将 Reduce 任务标记为已完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// MarkCompleted 将指定任务标记为已完成
func (c *Coordinator) MarkCompleted(args *Task, reply *Task) error {
mu.Lock()
defer mu.Unlock()
switch args.TaskType {
case MapTask:
info, ok := c.TaskMetaHolder.taskMap[args.TaskId]
// 防止另一个 worker 重复工作
if ok && info.state == Working {
info.state = Done
fmt.Printf("Map task [ %d ] is completed.\n", args.TaskId)
} else {
fmt.Printf("Map task [ %d ] is already completed.\n", args.TaskId)
}
break
case ReduceTask:
info, ok := c.TaskMetaHolder.taskMap[args.TaskId]
// 防止另一个 worker 重复工作
if ok && info.state == Working {
info.state = Done
fmt.Printf("Reduce task [ %d ] is completed.\n", args.TaskId)
} else {
fmt.Printf("Reduce task [ %d ] is already completed.\n", args.TaskId)
}
break
default:
panic("The task type is not defined!")
}
return nil
}

完善 worker.go

在 worker 里面构造发送请求 RPC 的方法,获取 Reduce 任务,并请求 coordinator 处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {

// Your worker implementation here.
flag := true
for flag {
task := GetTask()
switch task.TaskType {
case MapTask:
{
DoMapTask(mapf, &task)
callDone()
}
case ReduceTask:
{
DoReduceTask(reducef, &task)
callDone()
}
case WaitingTask:
{
fmt.Println("All tasks are in progress,Please wait...")
time.Sleep(time.Second * 5)
}
case ExitTask:
{
time.Sleep(time.Second)
fmt.Println("All tasks are Done,will be exit...")
flag = false
}
}

}

参考给定的 wc.go 、mrsequential.go 的 reduce 方法,编写自定义 reduce 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
func DoReduceTask(reducef func(string, []string) string, response *Task) {
reduceFileNum := response.TaskId
// 将对应 tmp 文件的所有 kv 存放在一个数组中,并按照 Key 进行排序,得到 kv 数组 intermediate。
intermediate := shuffle(response.FileSlice)

// 获取当前工作目录
dir, err := os.Getwd()
if err != nil {
log.Fatalf("cannot get wd")
}
// 创建一个临时文件 tempFile,作为 Reduce Task 的输出文件
tempFile, err := ioutil.TempFile(dir, "mr-tmp-*")
if err != nil {
log.Fatalf("temp file is not exist")
}
i := 0
// 遍历 intermediate 数组,将具有相同 Key 的 Value 进行合并
for i < len(intermediate) {
j := i + 1
// 当 数组 中有多个键值相同时,使用指针 j 跳过相同的键,直到找到新的键为止
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
// 将与相同键相关联的值收集到一个字符串数组values中
var values []string
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}

// 调用 reducef 计算出对应 key 出现的次数
output := reducef(intermediate[i].Key, values)

// "Key output\n" 格式的字符串写入 tempFile 中。
_, _ = fmt.Fprintf(tempFile, "%v %v\n", intermediate[i].Key, output)
i = j
}
err = tempFile.Close()
if err != nil {
log.Fatalf("close err file %v", tempFile)
}
outFile := fmt.Sprintf("mr-out-%d", reduceFileNum)
// 通过 os.Rename 函数将临时文件 tempFile 重命名为 mr-out-{reduceFileNum},作为 Reduce Task 的输出文件
_ = os.Rename(tempFile.Name(), outFile)
}

func shuffle(files []string) []KeyValue {
var res []KeyValue

// 遍历文件列表
for _, filePath := range files {
file, err := os.Open(filePath)
if err != nil {
log.Fatalf("cannot open file %v", filePath)
}

decoder := json.NewDecoder(file)
for {
var kv KeyValue
// JSON 解码器 Decoder 读取 kv 键值对
if err := decoder.Decode(&kv); err != nil {
break
}
// 将所有的 kv 键值对 放到 res 列表中。
res = append(res, kv)
}
err = file.Close()
if err != nil {
log.Fatalf("close err file %v", file)
}
}
// 对 res 列表按照 key 进行排序
sort.Sort(SortedKey(res))
return res
}

这里使用 自定义类型 SortedKey,实现三个方法:Len()、Swap(i, j int) 和 Less(i, j int) 对 SortedKey 进行排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type SortedKey []KeyValue

// 实现 sort 包下的 Len,Swap,Less 方法才能排序
// 切片长度
func (k SortedKey) Len() int {
return len(k)
}

// 交换切片元素位置
func (k SortedKey) Swap(i, j int) {
k[i], k[j] = k[j], k[i]
}

// 判断哪个元素更小
func (k SortedKey) Less(i, j int) bool {
return k[i].Key < k[j].Key
}

运行结果

至此reduce阶段也基本完成,最后得到的结果:

lab1-运行结果-4

问题

问题1:依然是多个文件无法处理

lab1-问题-3

这里已经处理了两个 map 任务,但是日志打印却显示 只完成了一个,也就无法进入 reduce 阶段。

初步认为是 checkTaskDone 出现了问题。

lab1-问题-4

再往上走可能是因为 MarkCompleted

lab1-问题-5

也不排除是因为 PullTask 的 WaitingTask 导致 无法标记已完成

lab1-问题-6

// TODO 整个流程图