相关资料 重点理解MapReduce的机制
MapReduce(2004)翻译
MapReduce 深入浅出
思路 首先看一下 Lab 提供的 MapReduce 执行流程图
可以看出 MapReduce 的大致流程:启动一个Master( Coordinator 协调者)分配多个任务给 worker 做 Map 任务。
然后 Worker 完成 Map 任务后返回中间值一组 KV ,接着协调者再将这些 KV 分发给后继的Worker 根据 KV 进行 Reduce 任务,最后对 Reduce 进行一个总的处理进行返回。
实现 完成 worker 与 Coordinator 之间的交互,处理 map 任务 从实现来看我们可以先完成 worker 与Coordinator之间的交互,首先可以来看看给的 RPC 例子:首先运行 main/mrworker.go 会进入到 mr/Worker 的这个方法中。
可以在这个方法中调用RPC的例子方法:CallExample()。
然后 CallExample() 这个方法中会有一行:
1 ok := call("Coordinator.Example" , &args, &reply)
调用 Coordinator 包的 Example 方法(方法名开头为大写的代表可以为外包所调用),然后得到传修改后的 reply ,得到rpc返回值。这就是 coordinator 与 worker 的简单交互。
接下来,自定义 coordinator 与 worker 的交互,使其能处理 map 任务。
完善 rpc.go 自定义用于 RPC 的结构体 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 type Task struct { TaskType TaskType TaskId int ReducerCnt int FileName string } type TaskArgs struct {} type TaskType int const ( MapTask TaskType = iota ReduceTask WaitingTask ExitTask ) type Phase int const ( MapPhase Phase = iota ReducePhase AllDone ) type State int const ( Working State = iota Waiting Done )
完善 worker.go 在 worker 里面构造发送请求 RPC 的方法,获取 Map 任务,并请求 coordinator 处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func Worker (mapf func (string , string ) []KeyValue, reducef func (string , []string ) string ) { flag := true for flag { task := GetTask() switch task.TaskType { case MapTask: { DoMapTask(mapf, &task) callDone() } case WaitingTask: { fmt.Println("All tasks are in progress,Please wait..." ) time.Sleep(time.Second * 5 ) } case ExitTask: { time.Sleep(time.Second) fmt.Println("All tasks are Done,will be exit..." ) flag = false } } } }
获取任务 1 2 3 4 5 6 7 8 9 10 11 12 13 func GetTask () Task { args := TaskArgs{} reply := Task{} ok := call("Coordinator.PullTask" , &args, &reply) if ok { fmt.Println("worker get [" , reply.TaskType, "] , TaskId = [" , reply.TaskId, "]" ) } else { fmt.Printf("call failed!\n" ) } return reply }
参考给定的 wc.go 、mrsequential.go 的 map 方法,编写自定义 map 方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 func DoMapTask (mapf func (string , string ) []KeyValue, response *Task) { var intermediate []KeyValue fileName := response.FileName file, err := os.Open(fileName) if err != nil { log.Fatalf("cannot open file %v" , fileName) } content, err := ioutil.ReadAll(file) if err != nil { log.Fatalf("cannot read file %v" , fileName) } err = file.Close() if err != nil { log.Fatalf("close err file %v" , fileName) } intermediate = mapf(fileName, string (content)) reducerCount := response.ReducerCnt HashedKV := make ([][]KeyValue, reducerCount) for _, kv := range intermediate { HashedKV[ihash(kv.Key)%reducerCount] = append (HashedKV[ihash(kv.Key)%reducerCount], kv) } for i := 0 ; i < reducerCount; i++ { fileName := "mr-tmp-" + strconv.Itoa(response.TaskId) + "-" + strconv.Itoa(i) ofile, err := os.Create(fileName) if err != nil { log.Fatalf("cannot create file %v" , fileName) } encoder := json.NewEncoder(ofile) for _, kv := range HashedKV[i] { err := encoder.Encode(kv) if err != nil { log.Fatalf("encode err kv %v" , kv) } } err = ofile.Close() if err != nil { log.Fatalf("close err file %v" , ofile) } } }
做完任务也需要调用 RPC 在 coordinator 中将任务状态标记为已完成,以方便 coordinator 确认任务已完成,worker 与 coordinator 程序能正常退出 1 2 3 4 5 6 7 8 9 10 11 12 13 14 func callDone () Task { args := Task{} reply := Task{} ok := call("Coordinator.MarkCompleted" , &args, &reply) if ok { fmt.Println(reply) } else { fmt.Printf("call failed!\n" ) } return reply }
完善 coordinator.go 定义 coordinator 结构体 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 type Coordinator struct { ReducerCnt int TaskId int DistPhase Phase ReduceTaskChannel chan *Task MapTaskChannel chan *Task TaskMetaHolder TaskMetaHolder files []string } type TaskMetaInfo struct { state State task *Task } type TaskMetaHolder struct { taskMap map [int ]*TaskMetaInfo }
初始化 coordinator 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func MakeCoordinator (files []string , nReduce int ) *Coordinator { c := Coordinator{ files: files, ReducerCnt: nReduce, DistPhase: MapPhase, MapTaskChannel: make (chan *Task, len (files)), ReduceTaskChannel: make (chan *Task, nReduce), TaskMetaHolder: TaskMetaHolder{ taskMap: make (map [int ]*TaskMetaInfo, len (files)+nReduce), }, } c.makeMapTasks(files) c.server() return &c }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 func (c *Coordinator) makeMapTasks(files []string ) { for _, file := range files { id := c.generateTaskId() task := Task{ TaskId: id, TaskType: MapTask, ReducerCnt: c.ReducerCnt, FileName: file, } taskMetaInfo := TaskMetaInfo{ state: Waiting, task: &task, } c.TaskMetaHolder.addMetaInfo(&taskMetaInfo) fmt.Println("make a map task : " , &task) c.MapTaskChannel <- &task } } func (c *Coordinator) generateTaskId() int { id := c.TaskId c.TaskId++ return id } func (t *TaskMetaHolder) addMetaInfo(info *TaskMetaInfo) bool { taskId := info.task.TaskId meta := t.taskMap[taskId] if meta != nil { fmt.Println("meta contains task which id = " , taskId) return false } else { t.taskMap[taskId] = info } return true }
实现 worker 中的 调用 coordinator 的 RPC 方法 PullTask -> 分配任务 将 map 任务管道中的任务取出,判断是否正在执行,并将等待状态变为执行状态。如果取完管道中的 map 任务,表示此时的任务要么已经完成,要么正在执行。判断任务map任务是否全部完成,如果完成那么应该进入下一个任务处理阶段(ReducePhase),因为此时我们先验证map则直接跳过reduce直接allDone全部完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 func (c *Coordinator) PullTask(args *TaskArgs, reply *Task) error { mu.Lock() defer mu.Unlock() switch c.DistPhase { case MapPhase: { if len (c.MapTaskChannel) > 0 { *reply = *<-c.MapTaskChannel if !c.TaskMetaHolder.isWorking(reply.TaskId) { fmt.Printf("task [ %d ] is running\n" , reply.TaskId) } } else { reply.TaskType = WaitingTask if c.TaskMetaHolder.checkTaskDone() { c.toNextPhase() } return nil } } default : { reply.TaskType = ExitTask } } return nil } func (t *TaskMetaHolder) isWorking(taskId int ) bool { metaInfo, ok := t.taskMap[taskId] if !ok || metaInfo.state != Waiting { return false } metaInfo.state = Working return true } func (t *TaskMetaHolder) checkTaskDone() bool { var ( mapDoneNum = 0 mapUnDoneNum = 0 reduceDoneNum = 0 reduceUnDoneNum = 0 ) for _, taskMeta := range t.taskMap { if taskMeta.task.TaskType == MapTask { if taskMeta.state == Done { mapDoneNum++ } else { mapUnDoneNum++ } } else if taskMeta.task.TaskType == ReduceTask { if taskMeta.state == Done { reduceDoneNum++ } else { reduceUnDoneNum++ } } } fmt.Printf("map tasks are finished %d/%d, reduce task are finished %d/%d \n" , mapDoneNum, mapDoneNum+mapUnDoneNum, reduceDoneNum, reduceDoneNum+reduceUnDoneNum) if (mapDoneNum > 0 && mapUnDoneNum == 0 ) && (reduceDoneNum == 0 && reduceUnDoneNum == 0 ) { return true } else { if reduceDoneNum > 0 && reduceUnDoneNum == 0 { return true } } return false } func (c *Coordinator) toNextPhase() { if c.DistPhase == MapPhase { c.DistPhase = AllDone } else if c.DistPhase == ReducePhase { c.DistPhase = AllDone } }
实现 worder 中调用 coordinator 的 RPC 方法 MarkCompleted -> 将任务标记为完成 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func (c *Coordinator) MarkCompleted(args *Task, reply *Task) error { mu.Lock() defer mu.Unlock() switch args.TaskType { case MapTask: info, ok := c.TaskMetaHolder.taskMap[args.TaskId] if ok && info.state == Working { info.state = Done fmt.Printf("Map task [ %d ] is completed.\n" , args.TaskId) } else { fmt.Printf("Map task [ %d ] is already completed.\n" , args.TaskId) } break default : panic ("The task type is not defined!" ) } return nil }
完善 coordinator 的 RPC 方法 Done 方法 如果任务全部完成 即 coordinator 处于 AllDone 状态,返回 true 使其能正常退出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func (c *Coordinator) Done() bool { mu.Lock() defer mu.Unlock() if c.DistPhase == AllDone { fmt.Println("All tasks are completed , the coordinator will be exit" ) return true } else { return false } }
运行结果 至此map阶段已经能暂且构成一个循环,先运行 mrcoordinator.go 、再运行 mrworker.go 查看效果。
mrcoordinator.go 运行效果
mrworker.go 运行效果
生成的文件
问题 问题1:这里不太明白为什么将设为等待执行,不就阻塞了吗 ,可能还处于 map 阶段,后面有其他考虑
问题2:这里无法处理多个文件,处理完第一个文件后,无法处理后面的文件,原因暂时没找到,也许是问题1造成的?
补充处理 reduce 任务的逻辑 大部分逻辑其实和 map 阶段类似。
完善 coordinator.go 初始化 reduce 任务 将 Reduce 任务放到 Reduce 管道中,对应的 taskMetaInfo 放到 taskMetaHolder 中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 func (c *Coordinator) makeReduceTasks() { for i := 0 ; i < c.ReducerCnt; i++ { id := c.generateTaskId() task := Task{ TaskId: id, TaskType: ReduceTask, FileSlice: findReduceTemp(i), } taskMetaInfo := TaskMetaInfo{ state: Waiting, task: &task, } c.TaskMetaHolder.addMetaInfo(&taskMetaInfo) fmt.Println("make a reduce task : " , &task) c.ReduceTaskChannel <- &task } } func findReduceTemp (reduceNum int ) []string { var s []string dir, err := os.Getwd() if err != nil { log.Fatalf("cannot get wd" ) } files, err := ioutil.ReadDir(dir) if err != nil { log.Fatalf("cannot read dir %v" , dir) } for _, file := range files { if strings.HasPrefix(file.Name(), "mr-tmp" ) && strings.HasSuffix(file.Name(), strconv.Itoa(reduceNum)) { s = append (s, file.Name()) } } return s }
这里需要对 Task 结构做些改变,由 Filename 字符串 变为 一个 FileSlice 字符串切片数组
1 2 3 4 5 6 7 type Task struct { TaskType TaskType TaskId int ReducerCnt int FileSlice []string }
因为对于重新理了以下 MapReduce 框架就可知,输入阶段时,初始化一个 map 任务其实是对应一个输入文件,但是经过 map 过程来看,我们其实一个任务切分成了很多 tmp 文件,那么 reduce 任务输入则应该是一组哈希相同的中间文件。
PullTask(分配任务)补充分发 reduce 任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 func (c *Coordinator) PullTask(args *TaskArgs, reply *Task) error { mu.Lock() defer mu.Unlock() switch c.DistPhase { case MapPhase: { if len (c.MapTaskChannel) > 0 { *reply = *<-c.MapTaskChannel if !c.TaskMetaHolder.isWorking(reply.TaskId) { fmt.Printf("Map task [ %d ] is running\n" , reply.TaskId) } } else { reply.TaskType = WaitingTask if c.TaskMetaHolder.checkTaskDone() { c.toNextPhase() } return nil } } case ReducePhase: { fmt.Printf("start reduce phase" ) if len (c.ReduceTaskChannel) > 0 { *reply = *<-c.ReduceTaskChannel if !c.TaskMetaHolder.isWorking(reply.TaskId) { fmt.Printf("Reduce task [ %d ] is running\n" , reply.TaskId) } } else { reply.TaskType = WaitingTask if c.TaskMetaHolder.checkTaskDone() { c.toNextPhase() } return nil } } case AllDone: { reply.TaskType = ExitTask } default : { panic ("The phase is not defined!" ) } } return nil }
补充 toNextPhase 进入 Reduce 阶段的逻辑 1 2 3 4 5 6 7 8 func (c *Coordinator) toNextPhase() { if c.DistPhase == MapPhase { c.makeReduceTasks() c.DistPhase = ReducePhase } else if c.DistPhase == ReducePhase { c.DistPhase = AllDone } }
补充 MarkCompleted 将 Reduce 任务标记为已完成 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func (c *Coordinator) MarkCompleted(args *Task, reply *Task) error { mu.Lock() defer mu.Unlock() switch args.TaskType { case MapTask: info, ok := c.TaskMetaHolder.taskMap[args.TaskId] if ok && info.state == Working { info.state = Done fmt.Printf("Map task [ %d ] is completed.\n" , args.TaskId) } else { fmt.Printf("Map task [ %d ] is already completed.\n" , args.TaskId) } break case ReduceTask: info, ok := c.TaskMetaHolder.taskMap[args.TaskId] if ok && info.state == Working { info.state = Done fmt.Printf("Reduce task [ %d ] is completed.\n" , args.TaskId) } else { fmt.Printf("Reduce task [ %d ] is already completed.\n" , args.TaskId) } break default : panic ("The task type is not defined!" ) } return nil }
完善 worker.go 在 worker 里面构造发送请求 RPC 的方法,获取 Reduce 任务,并请求 coordinator 处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 func Worker (mapf func (string , string ) []KeyValue, reducef func (string , []string ) string ) { flag := true for flag { task := GetTask() switch task.TaskType { case MapTask: { DoMapTask(mapf, &task) callDone() } case ReduceTask: { DoReduceTask(reducef, &task) callDone() } case WaitingTask: { fmt.Println("All tasks are in progress,Please wait..." ) time.Sleep(time.Second * 5 ) } case ExitTask: { time.Sleep(time.Second) fmt.Println("All tasks are Done,will be exit..." ) flag = false } } }
参考给定的 wc.go 、mrsequential.go 的 reduce 方法,编写自定义 reduce 方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 func DoReduceTask (reducef func (string , []string ) string , response *Task) { reduceFileNum := response.TaskId intermediate := shuffle(response.FileSlice) dir, err := os.Getwd() if err != nil { log.Fatalf("cannot get wd" ) } tempFile, err := ioutil.TempFile(dir, "mr-tmp-*" ) if err != nil { log.Fatalf("temp file is not exist" ) } i := 0 for i < len (intermediate) { j := i + 1 for j < len (intermediate) && intermediate[j].Key == intermediate[i].Key { j++ } var values []string for k := i; k < j; k++ { values = append (values, intermediate[k].Value) } output := reducef(intermediate[i].Key, values) _, _ = fmt.Fprintf(tempFile, "%v %v\n" , intermediate[i].Key, output) i = j } err = tempFile.Close() if err != nil { log.Fatalf("close err file %v" , tempFile) } outFile := fmt.Sprintf("mr-out-%d" , reduceFileNum) _ = os.Rename(tempFile.Name(), outFile) } func shuffle (files []string ) []KeyValue { var res []KeyValue for _, filePath := range files { file, err := os.Open(filePath) if err != nil { log.Fatalf("cannot open file %v" , filePath) } decoder := json.NewDecoder(file) for { var kv KeyValue if err := decoder.Decode(&kv); err != nil { break } res = append (res, kv) } err = file.Close() if err != nil { log.Fatalf("close err file %v" , file) } } sort.Sort(SortedKey(res)) return res }
这里使用 自定义类型 SortedKey,实现三个方法:Len()、Swap(i, j int) 和 Less(i, j int) 对 SortedKey 进行排序。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 type SortedKey []KeyValuefunc (k SortedKey) Len() int { return len (k) } func (k SortedKey) Swap(i, j int ) { k[i], k[j] = k[j], k[i] } func (k SortedKey) Less(i, j int ) bool { return k[i].Key < k[j].Key }
运行结果 至此reduce阶段也基本完成,最后得到的结果:
问题 问题1:依然是多个文件无法处理
这里已经处理了两个 map 任务,但是日志打印却显示 只完成了一个,也就无法进入 reduce 阶段。
初步认为是 checkTaskDone 出现了问题。
再往上走可能是因为 MarkCompleted
也不排除是因为 PullTask 的 WaitingTask 导致 无法标记已完成
// TODO 整个流程图