6.824 Lab 1: MapReduce

 

Part I: Map/Reduce input and output


These two functions glue the user-defined map/reduce functions to the MapReduce library. When writing the intermediate K/V pairs to an intermediate file, the entire file should be a single JSON value (one array holding all the pairs), rather than one JSON object per K/V pair. Since I was not very familiar with JSON handling in Go, this took me quite a while.
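
Since the JSON detail cost me some time, here is a minimal standalone sketch of the two styles I weighed (the KeyValue mirror and the file names are invented for the demo): marshaling the whole slice as one JSON array, which is what doMap below does, versus streaming one JSON object per pair with json.Encoder.

package main

import (
	"encoding/json"
	"io/ioutil"
	"os"
)

type KeyValue struct {
	Key   string
	Value string
}

func main() {
	kvs := []KeyValue{{"a", "1"}, {"b", "2"}}

	// Style 1: the whole file is a single JSON array (the doMap approach).
	data, err := json.Marshal(kvs)
	if err != nil {
		panic(err)
	}
	if err := ioutil.WriteFile("demo-array.json", data, 0644); err != nil {
		panic(err)
	}
	// demo-array.json: [{"Key":"a","Value":"1"},{"Key":"b","Value":"2"}]

	// Style 2: one JSON object per pair via a streaming encoder.
	f, err := os.Create("demo-stream.json")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	enc := json.NewEncoder(f)
	for _, kv := range kvs {
		if err := enc.Encode(&kv); err != nil {
			panic(err)
		}
	}
	// demo-stream.json: {"Key":"a","Value":"1"} then {"Key":"b","Value":"2"}
}
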

func doMap(
	jobName string, // the name of the MapReduce job
	mapTask int, // which map task this is
	inFile string,
	nReduce int, // the number of reduce tasks that will be run ("R" in the paper)
	mapF func(filename string, contents string) []KeyValue,
) {
	// Read the entire input file.
	content, err := ioutil.ReadFile(inFile)
	if err != nil {
		panic(err)
	}
	// Run the user-defined map function to get the intermediate k/v pairs.
	kvs := mapF(inFile, string(content))
	// Partition the pairs into nReduce buckets by hashing each key.
	tasks := make([][]KeyValue, nReduce)
	for _, kv := range kvs {
		num := ihash(kv.Key) % nReduce
		tasks[num] = append(tasks[num], kv)
	}
	// Write each bucket to its intermediate file in one shot,
	// as a single JSON array.
	for i, task := range tasks {
		filename := reduceName(jobName, mapTask, i)
		file, err := os.Create(filename)
		if err != nil {
			panic(err)
		}
		kvsJson, err := json.Marshal(task)
		if err != nil {
			panic(err)
		}
		if _, err := file.Write(kvsJson); err != nil {
			panic(err)
		}
		// Close the file now rather than deferring: a defer inside a
		// loop would keep every file open until doMap returns.
		if err := file.Close(); err != nil {
			panic(err)
		}
	}
}
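
One detail worth noting: ihash(kv.Key) % nReduce is only a valid bucket index if ihash never returns a negative number. As I recall the scaffold (reproduced from memory, so treat the exact code as an assumption), it guarantees this by masking off the sign bit of an FNV-1a hash:

func ihash(s string) int {
	h := fnv.New32a() // from "hash/fnv"
	h.Write([]byte(s))
	// Clearing the sign bit keeps the result non-negative, so
	// ihash(key) % nReduce is always in [0, nReduce).
	return int(h.Sum32() & 0x7fffffff)
}
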
func doReduce(
	jobName string, // the name of the whole MapReduce job
	reduceTask int, // which reduce task this is
	outFile string, // write the output here
	nMap int, // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	// Collect the intermediate k/v pairs from every map task's
	// output file for this reduce partition.
	kvs := make([]KeyValue, 0)
	for i := 0; i < nMap; i++ {
		imFileName := reduceName(jobName, i, reduceTask)
		content, err := ioutil.ReadFile(imFileName)
		if err != nil {
			panic(err)
		}
		// Each intermediate file is a single JSON array of KeyValue pairs.
		kv := make([]KeyValue, 0)
		if err := json.Unmarshal(content, &kv); err != nil {
			panic(err)
		}
		kvs = append(kvs, kv...)
	}
	// Group the values by key.
	kvMap := make(map[string][]string)
	for _, v := range kvs {
		kvMap[v.Key] = append(kvMap[v.Key], v.Value)
	}
	// Call the user-defined reduce function once per key and write
	// the results to the output file as JSON.
	outFilePtr, err := os.Create(outFile)
	if err != nil {
		panic(err)
	}
	enc := json.NewEncoder(outFilePtr)
	for k, v := range kvMap {
		if err := enc.Encode(KeyValue{k, reduceF(k, v)}); err != nil {
			panic(err)
		}
	}
	if err := outFilePtr.Close(); err != nil {
		panic(err)
	}
}
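
A non-essential observation: Go randomizes map iteration order, so the lines of each output file land in a different order on every run. The framework's merge step sorts the final results, so the tests pass either way; if per-file determinism were wanted anyway, a sketch of the change (it needs the "sort" import) is:

// Replace the `for k, v := range kvMap` loop above with a sorted pass.
keys := make([]string, 0, len(kvMap))
for k := range kvMap {
	keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
	if err := enc.Encode(KeyValue{k, reduceF(k, kvMap[k])}); err != nil {
		panic(err)
	}
}
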
$ go test -run Sequential
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
master: Map/Reduce task completed
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
Merge: read mrtmp.test-res-2
master: Map/Reduce task completed
PASS
ok  	6.824/src/mapreduce	1.964s

 

Part II: Single-worker word count


This part is straightforward and mirrors the example in the paper: write a mapF and a reduceF that do word-frequency counting. Related reading:

The first article is Rob Pike's piece on strings in Go. It is lucid and accessible, and well worth a read.
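
The takeaway from that article which matters here: a Go string is a sequence of bytes, while for ... range decodes it as UTF-8 and yields runes; strings.FieldsFunc likewise hands runes to its predicate, which is why splitting on unicode.IsLetter works for non-ASCII text too. A tiny illustration (my own example, not from the article):

package main

import "fmt"

func main() {
	s := "héllo"
	fmt.Println(len(s)) // 6: len counts bytes, and 'é' is two bytes in UTF-8
	for i, r := range s {
		// range decodes UTF-8: i is a byte offset, r is a rune.
		fmt.Printf("byte offset %d: %c\n", i, r)
	}
}
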

func mapF(filename string, contents string) []mapreduce.KeyValue {
	// Your code here (Part II).
	// Split the contents into words. strings.FieldsFunc splits at every
	// rune for which f returns true, so a word is a maximal run of runes
	// satisfying unicode.IsLetter.
	f := func(r rune) bool {
		return !unicode.IsLetter(r)
	}
	s := strings.FieldsFunc(contents, f)
	// Pre-aggregate the counts per word (a local combiner), then emit
	// one KeyValue per distinct word with its count as the value.
	wordMap := make(map[string]int)
	for _, str := range s {
		wordMap[str]++
	}
	wordKV := make([]mapreduce.KeyValue, 0, len(wordMap))
	for k, v := range wordMap {
		wordKV = append(wordKV, mapreduce.KeyValue{Key: k, Value: strconv.Itoa(v)})
	}
	return wordKV
}
func reduceF(key string, values []string) string {
	// Your code here (Part II).
	// reduceF() is called once per key; values holds every count
	// that the map tasks emitted for this key.
	cnt := 0
	for _, v := range values {
		i, err := strconv.Atoi(v)
		if err != nil {
			panic(err)
		}
		cnt += i
	}
	// Return a string containing the total number of occurrences of the key.
	return strconv.Itoa(cnt)
}
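
For context, mapF and reduceF get plugged into the driver in main/wc.go. As best I recall the scaffold (treat the exact call as an assumption), sequential mode wires them up roughly like this:

// In main/wc.go (scaffold, from memory):
// "wcseq" is the job name, os.Args[3:] the input files,
// and 3 the number of reduce tasks.
mr := mapreduce.Sequential("wcseq", os.Args[3:], 3, mapF, reduceF)
mr.Wait()
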
$ bash ./test-wc.sh
master: Starting Map/Reduce task wcseq
Merge: read mrtmp.wcseq-res-0
Merge: read mrtmp.wcseq-res-1
Merge: read mrtmp.wcseq-res-2
master: Map/Reduce task completed
Passed test

 

Part III: Distributing MapReduce tasks


This part requires using RPC to simulate MapReduce's distributed execution on a single machine.

func schedule(jobName string,
	mapFiles []string,
	nReduce int,
	phase jobPhase,
	registerChan chan string) {
	... // elided: compute ntasks and n_other for this phase
	fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
	var wg sync.WaitGroup
	// Hand out every task of this phase.
	for i := 0; i < ntasks; i++ {
		// File is only meaningful during the map phase; guard the index
		// so a reduce phase can never read past the end of mapFiles.
		file := ""
		if phase == mapPhase {
			file = mapFiles[i]
		}
		arg := DoTaskArgs{
			JobName:       jobName,
			Phase:         phase,
			TaskNumber:    i,
			File:          file,
			NumOtherPhase: n_other}

		// Block until a worker is idle, then run the task on it.
		worker := <-registerChan
		wg.Add(1)
		go func(worker string, arg DoTaskArgs) {
			call(worker, "Worker.DoTask", arg, nil)
			// Return the worker from a fresh goroutine (see below).
			go func() {
				registerChan <- worker
			}()
			wg.Done()
		}(worker, arg)
	}
	wg.Wait()
	fmt.Printf("Schedule: %v done\n", phase)
}
go func() {
    registerChan <- worker
}()

A fresh goroutine is spawned to return the worker to the pool. registerChan is an unbuffered channel, so when the goroutines for the final tasks of a phase execute registerChan <- worker, the scheduling loop has already handed out every task and no longer receives from the channel. An inline send would therefore block forever, wg.Done() would never run, and wg.Wait() would deadlock.
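
A stripped-down demonstration of the hazard (all names invented for the example): on an unbuffered channel a send completes only when a receiver is ready, so the worker must be returned asynchronously once the receiving loop has exited.

package main

import (
	"fmt"
	"sync"
)

func main() {
	pool := make(chan string) // unbuffered, like registerChan
	var wg sync.WaitGroup

	// A "registration" goroutine feeds one worker into the pool.
	go func() { pool <- "worker0" }()

	const ntasks = 3
	for i := 0; i < ntasks; i++ {
		w := <-pool // take an idle worker
		wg.Add(1)
		go func(w string, task int) {
			defer wg.Done()
			fmt.Printf("%s ran task %d\n", w, task)
			// If this were an inline `pool <- w`, the goroutine for
			// the last task would block forever: the loop has exited,
			// nobody receives, and wg.Done() would never be reached.
			go func() { pool <- w }()
		}(w, i)
	}
	wg.Wait() // completes because the returns happen asynchronously
}
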

 

Part IV: Handling worker failures


This part is fairly simple. Although it is titled handling worker failures, it is quite different from tolerating a master failure; that kind of fault-tolerance exercise is saved for the later Raft labs. So when a worker fails, the task just has to be executed again.

for !call(worker, "Worker.DoTask", arg, nil) {
	// The RPC failed or the worker died: take another worker and retry.
	worker = <-registerChan
}
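
Putting it together, the per-task goroutine from Part III becomes the following (a sketch of just the changed body; everything else in schedule stays the same). Re-running a task is safe because mapF and reduceF are deterministic functions of their input, so a retry simply rewrites the same output files.

go func(worker string, arg DoTaskArgs) {
	// Keep retrying on a fresh worker until the RPC reports success.
	for !call(worker, "Worker.DoTask", arg, nil) {
		worker = <-registerChan
	}
	go func() {
		registerChan <- worker
	}()
	wg.Done()
}(worker, arg)
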
$ bash ./test-mr.sh

==> Part I
ok  	mapreduce	1.966s

==> Part II
Passed test

==> Part III
ok  	mapreduce	3.444s

==> Part IV
ok  	mapreduce	3.881s