lab1 要求按照论文实现一个mapReduce 框架
lab1 :https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
论文:https://zhuanlan.zhihu.com/p/122571315
在mrsequential.go文件中有个单机版mapReduce实现很简单建议阅读。
整体框架流程:
Coordinator 是协调器,负责
① 给woker分发任务
② 合并由map任务执行产生的中间文件
③ 任务超时重新分配任务
woker 是工作器,负责
①循环申请map 或reduce任务
先看woker:
worker 向 Coordinator 发送任务申请后,判断得到的是什么样类型的任务
//申请任务 for { args := Args{} args.Signal = REQUEST_WORKER reply := RpcCall(args) switch reply.STATUS { case COORDINATOR_MAP: //获得map任务 MapHandle(&reply,mapf)
case COORDINATOR_REDUCE: //获得reduce任务 ReduceHandle(&reply,reducef)
case COORDINATOR__MAP_END: //没申请到任务重新获取
continue
case END: //结束
return
}
Recude任务
处理方式和mrsequential.go中几乎是一样的不多说了。
map任务
会从Coordinator 获得文件名、任务id、Nreduce(中间文件个数)
kva是通过mapf 对文件处理得到的数据。
我开启两个任务分发器,和Nreduce 个文件写入器,进行并发处理数据。将数据写入到Nreduce个中间文件中,分发依据为ihash函数。
kva := MapMachingFile(reply.FileName, mapf) midFileName := \"mr-out-\" + reply.FileName chanArray := make([]chan KeyValue, 10) for i := 0; i < 10; i++ { chanArray[i] = make(chan KeyValue, 10) } //开启reduceNumber个文件写入线程 var w sync.WaitGroup var mapW sync.WaitGroup w.Add(reply.Neduce) mapW.Add(2) for i := 0; i < 10; i++ { go GoMakeMidFile(midFileName+strconv.Itoa(i), chanArray[i], &w) } // 开启分发线程,分发数据到文件写入线程 lenght := len(kva) go MapDistributeMidData(chanArray, kva[:lenght/2], &mapW) go MapDistributeMidData(chanArray, kva[lenght/2:], &mapW) //所有分发线程结束 mapW.Wait() for cIndex := 0; cIndex < 10; cIndex++ { close(chanArray[cIndex]) } //所有文件写入线程结束 w.Wait()
worker结束剩下看Coordinator 。
1 type Coordinator struct { 2 filebit //数据分发记录 3 Nreduce int 4 midFileMergeC chan int 5 Mergefiled //已处理数据记录 6 monitorC []chan int //监听每个worker是否按时完成 7 STATUS 8 RedeceS 9 *sync.Mutex 10 End bool 11 }
Coordinator 结构记录的信息主要为三部分
2、3、4、5行记录map相关
6 为监听chan,监听任务是否超时
7位Coordinator 当前的状态,通过状态判断要分发map任务、reduce任务、结束
判断worker的目的,请求任务就分发任务处理,完成map任务就将所有map产生中间数据一一对应合并到Nreduce个文件中。
//信号处理 func (c *Coordinator) SignalTask (args *Args, reply *Reply) error { switch args.Signal { case REQUEST_WORKER: c.distributeTask(args,reply) //中间文件处理 case COMPLETE: c.midFileMerge(args,reply) } return nil }
在初始化Coordinator时,还会打开一些线程。本线程会开启10个中间文件写入线程,当每个worker处理完map任务后,会将自己处理的map文件相关信息传给Coordinator,Coordinator通过chan将数据发给每个文件合并线程StartMergeFile。
举个例子
workerMap A产生了 1,2,3 个中间文件
1号文件 合并到 mr-out-m-1
2号文件 合并到 mr-out-m-2
3号文件 合并到 mr-out-m-3
workerMap B 又产生1、2、3个中间文件
1号文件 合并到 mr-out-m-1
2号文件 合并到 mr-out-m-2
3号文件 合并到 mr-out-m-3
//开启Nreduce个中间文件写入线程 //返回文件写入chan 切片 func (c *Coordinator)runFileWorker () []chan int { cLi := make([]chan int,c.Nreduce) for i := 0 ; i < c.Nreduce ; i ++ { cLi[i] = make(chan int,10) } for fid := 0 ; fid < c.Nreduce ; fid ++ { go c.StartMergeFile(fid,cLi[fid]) } return cLi }
记录信息是否已处理的结构:
filebit 、ReduceS 核心是通过一个简单的bitmap实现的
type filebit struct{ rw *sync.Mutex bitMap file []string } type RedeceS struct { filebit }
type bitMap struct { bit int16 size int } //获取一个未使用位置 func (b *bitMap) GetOne() int { for i := int(0) ; i < b.size ; i ++ { if b.isZero(i) { b.seTUsed(i) return i } } //这里超过size限制会直接报错 return -1 } //第i位是否为0 //为0未使用 func (b *bitMap) isZero (index int) bool { return ((1 << index) & b.bit) == 0 } //设置index位已使用 func (b *bitMap) seTUsed (index int) { b.bit = (1 << index) | b.bit } func (b *bitMap) setEnUsed (index int) { b.bit = (0 << index) | b.bit }
任务超时处理:
func (c *Coordinator) monitorWorker (id int) { timer := time.NewTimer(time.Duration(time.Second*10)) select { case <-c.monitorC[id]: return case <-timer.C: //超时设置为未分配,重新分配 c.SetEnUsed(id) } }
每次分发一个任务出去,就会开启一个线程监听刚发送出去的任务。
当Coordinator 接收到任务完成信号,就会给任务id对应的信号监听函数发送信息,结束监听函数。
当未在规定时间内发送信号给监听函数,则将当前监听的任务id在filebit结构中标记在为未分发,重新轮循分发给下一个到来的worker。
如果这个未按时完成任务的worker后来完成任务并且发送信号过来,当这个任务已经还是为未分发状态则舍弃这个worker请求。
如果这个任务同时分发给了其他worker,则接收这个worker,舍弃最后来的。(这里设计的不太好)
来源:https://www.cnblogs.com/thotf/p/16458901.html
本站部分图文来源于网络,如有侵权请联系删除。