------------------------------------------------------------------------
Title:  Functional Model of Hadoop MapReduce and Their Application to Scan
Author: Kiminori Matsuzaki
------------------------------------------------------------------------

> module MRModel where
>
> import Data.List
> import Data.Ord

------------------------------------------------------------------------
1. Introduction

This Haskell program develops and tests functional models of Hadoop
MapReduce. It is a supplement to the paper submitted to HLPP2015.

------------------------------------------------------------------------
2. Preliminaries

Text following "--" on a line is a comment in Haskell. In this
literate-programming style, only the lines starting with ">" are
program code; all other lines are treated as comments.

2.1 Basic Notations

Function composition is denoted by "." and (f . g) x = f (g x) holds.
The identity function is id. An application such as f (g a) can be
written without parentheses using the "$" operator: f (g a) = f $ g a.

Anonymous functions (lambda expressions) are written with "\" and
"->"; the following two definitions are the same.

  f = \x -> 2 * x
  f x = 2 * x

We may use _ for a parameter to show that we do not care about its
value.

In this paper, we use two type classes, Eq and Ord, to clarify the
conditions on a datatype. For any datatype that belongs to Eq, we
have the operator "==". For any datatype a that belongs to Ord, we
have the function compare of type

  compare :: a -> a -> Ordering

Here the type Ordering has three constructors, defined as follows.

  data Ordering = LT | EQ | GT

Tuples consist of a finite number of values: for example, a pair
(a, b) or a triple (a, b, c). Function fst takes the first element of
a pair, and snd the second element.

The following function applyW applies the parameter function and
pairs the output with the input.

> applyW :: (a->b) -> a -> (b, a)
> applyW f a = (f a, a)

2.2 Lists

We use the following functions, which are defined in Haskell.

  head, init, last, take, drop, (!!), map, foldl

Instead of scanl in Haskell, we will use the following definition of
scan (a prescan).

> scan op e as = init $ scanl op e as

Function flatten takes a list of lists (a nested list) and
concatenates all the inner lists. Function sortBy takes a comparison
function and a list, and sorts the elements of the list in terms of
the comparison function. Function partition takes a predicate p and a
list, and splits the list into two lists: the former contains all the
elements that satisfy the predicate, and the latter contains all the
elements that do not.

  sortBy    :: (a -> a -> Ordering) -> [a] -> [a]
  partition :: (a -> Bool) -> [a] -> ([a], [a])

> flatten :: [[a]] -> [a]
> flatten = foldl (++) []

2.3 Bags (Multi-sets)

We define the data structure of bags (multi-sets) and functions for
manipulating them. Here, we define the type Bag for bags in the same
way as that for lists.

> data Bag a = NilB | ConsB a (Bag a)
>            deriving (Show, Eq)

The following functions list2bag and bag2list convert a list to a bag
and vice versa.

> bag2list :: Bag a -> [a]
> bag2list NilB = []
> bag2list (ConsB a ab) = a : bag2list ab
>
> list2bag :: [a] -> Bag a
> list2bag [] = NilB
> list2bag (a : as) = ConsB a (list2bag as)
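As a quick sanity check of scan and applyW (exampleScan and
examplePair are our own names, not part of the original program), the
following definitions can be evaluated in GHCi.

> exampleScan :: [Int]
> exampleScan = scan (+) 0 [1, 2, 3, 4]   -- evaluates to [0, 1, 3, 6]
>
> examplePair :: (Int, Int)
> examplePair = applyW (* 2) 21           -- evaluates to (42, 21)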
Function mapB is the bag version of the map function on lists: it
takes a unary function and a bag, and applies the function to every
element in the bag.

> mapB :: (a -> b) -> Bag a -> Bag b
> mapB f NilB = NilB
> mapB f (ConsB a x) = ConsB (f a) (mapB f x)

Function flattenB takes a bag of lists of elements and flattens it
into a bag of elements.

> flattenB :: Bag [a] -> Bag a
> flattenB = list2bag . flatten . bag2list

Functions headB, partitionB, and sortByB are the bag versions of
head, partition, and sortBy, respectively.

> headB :: Bag a -> a
> headB (ConsB a x) = a
>
> sortByB :: (a -> a -> Ordering) -> Bag a -> [a]
> sortByB comp xb = sortBy comp (bag2list xb)
>
> partitionB :: (a -> Bool) -> Bag a -> (Bag a, Bag a)
> partitionB f x = let (y1, y2) = partition f $ bag2list x
>                  in (list2bag y1, list2bag y2)

------------------------------------------------------------------------
3. A Low-Level Model of Hadoop MapReduce

3.1 Outline

The two most important classes in MapReduce programs are Mapper and
Reducer, which specify the main computation of a MapReduce program.
We start by giving their types.

In Hadoop MapReduce, a large input file is divided in two stages
(Fig. 3 in the paper). First, the whole data is divided into splits
of a size that a single computer can deal with; then, each split is
divided into smaller records (e.g., by lines). Since splits often
correspond to data chunks on the distributed file system, we cannot
rely on the order among them. In contrast, since a split is processed
on a single computer, we can rely on the order among its records.

A mapper (Mapper) takes a split as its input: a list of key-value
pairs. The output of the mapper is a list of key-value pairs where
the types of keys and/or values may differ from those of the input.

  mapper :: [(k1, v1)] -> [(k2, v2)]

Before the Reduce phase, values of the same key are arranged into a
list (we will discuss later how keys are compared and merged). In a
similar way to the mapper, a part of the intermediate result is given
to a reducer: the input type of the reducer is a list of pairs of a
key and a list of values. The output of the reducer is also a list of
key-value pairs with possibly different types.

  reducer :: [(k2, [v2])] -> [(k3, v3)]

The other important parameters in Hadoop MapReduce programs are those
controlling the Shuffle phase. We can specify three classes
(functions) through setPartitionerClass, setSortComparatorClass, and
setGroupingComparatorClass. Hereafter, we denote the three functions
set by these methods as hashP, compS, and compG, respectively. From
their signatures in Hadoop, we give their types as follows.

  hashP :: (k2, v2) -> Integer
  compS :: k2 -> k2 -> Ordering
  compG :: k2 -> k2 -> Ordering

With these parameter functions, we give a rough model of Hadoop
MapReduce.

> mapReduceL :: ([(k1, v1)] -> [(k2, v2)])    -- mapper
>            -> ([(k2, [v2])] -> [(k3, v3)])  -- reducer
>            -> ((k2, v2) -> Integer)         -- hashP
>            -> (k2 -> k2 -> Ordering)        -- compS
>            -> (k2 -> k2 -> Ordering)        -- compG
>            -> Bag [(k1, v1)]                -- input
>            -> Bag [(k3, v3)]                -- output
> mapReduceL mapper reducer hashP compS compG input
>   = let aftMap = mapB mapper input
>         bfrRed = shuffleMR hashP compS compG aftMap
>     in mapB reducer bfrRed

3.2 Shuffle phase

In the shuffle phase, the key-value pairs output by the mappers are
basically grouped together based on their keys. There is also a
sorting mechanism in Hadoop MapReduce. The shuffle phase in Hadoop
MapReduce consists of the following three processing steps, in this
order:

1. Partitioning: For each key-value pair, the ID of the reducer is
   computed using the parameter function hashP.

2. Sorting: All the key-value pairs with the same reducer ID are
   sorted in terms of the comparator function compS.

3. Grouping: After the sorting, the key-value pairs that have the
   same key in terms of the comparator function compG are grouped
   together to be passed to the reducer.
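To make these three steps concrete, here is a small example of our
own (shuffleDemo is not part of the original program). Using the
definition of shuffleMR given below, with the partitioner k `mod` 2
and compare for both comparators, the two pairs with key 2 end up in
the same partition and the same group.

> shuffleDemo :: Bag [(Int, [Char])]
> shuffleDemo = list2bag [[(1, "a"), (2, "b")], [(3, "c"), (2, "d")]]
>
> -- shuffleMR (\(k, _) -> toInteger (k `mod` 2)) compare compare shuffleDemo
> --   evaluates to the bag whose elements are [(1, ["a"]), (3, ["c"])]
> --   and [(2, ["b", "d"])].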
Here is a definition of shuffleMR.

> shuffleMR :: ((k2, v2) -> Integer)     -- hashP
>           -> (k2 -> k2 -> Ordering)    -- compS
>           -> (k2 -> k2 -> Ordering)    -- compG
>           -> Bag [(k2, v2)]            -- input
>           -> Bag [(k2, [v2])]          -- output
> shuffleMR hashP compS compG input
>   = let aftP = grpByID $ mapB (applyW hashP) $ flattenB input
>         aftS = mapB (sortByKey compS) aftP
>     in mapB (grpByKey compG) aftS

> grpByID :: Eq a => Bag (a, b) -> Bag (Bag b)
> grpByID NilB = NilB
> grpByID (ConsB a x) = let (xa, xo) = partitionB (\b -> fst a == fst b) x
>                       in ConsB (mapB snd (ConsB a xa)) (grpByID xo)

> sortByKey :: (k2 -> k2 -> Ordering) -> Bag (k2, v2) -> [(k2, v2)]
> sortByKey compS = sortByB (\(k, v) (k', v') -> compS k k')

> grpByKey :: (k2 -> k2 -> Ordering) -> [(k2, v2)] -> [(k2, [v2])]
> grpByKey compG []           = []
> grpByKey compG ((k, v) : x) = grpByKey' compG [] k [v] x

> grpByKey' compG rs k vs [] = rs ++ [(k, vs)]
> grpByKey' compG rs k vs ((k', v') : x)
>   | compG k k' == EQ = grpByKey' compG rs k' (vs ++ [v']) x
>   | otherwise        = grpByKey' compG (rs ++ [(k, vs)]) k' [v'] x

3.3 Map and Reduce Phases

In Hadoop MapReduce, the mapper and the reducer are parameter
functions that take a list of key-value pairs, and users develop the
corresponding classes by overriding the following three methods.

- setup is called once for each split, before processing the
  key-value pairs.
- map (or reduce) is called for each key-value pair.
- cleanup is called once for each split, after processing the
  key-value pairs.

Usually (and in the simplest case), users only provide the map (or
reduce) function f_map. Then, the computation of the Mapper class is
specified by the following function mkMapper1.

> mkMapper1 :: ((k1, v1) -> [(k2, v2)])   -- f_map
>           -> [(k1, v1)] -> [(k2, v2)]   -- input/output
> mkMapper1 f_map = flatten . map f_map

We can also declare attributes in the Mapper class and use them in
the computation. For example, we can start from an initial value set
in the setup function, update the value at every call of the map
function, and finally output the accumulated value in the cleanup
function. The computation of such a Mapper class is specified by the
following function mkMapper2, which takes three parameter functions
corresponding to setup, map, and cleanup.

> mkMapper2 :: att                        -- f_setup
>           -> (att -> (k1, v1) -> att)   -- f_map
>           -> (att -> [(k2, v2)])        -- f_cleanup
>           -> [(k1, v1)] -> [(k2, v2)]   -- input/output
> mkMapper2 f_setup f_map f_cleanup
>   = f_cleanup . foldl f_map f_setup

We can combine these two styles: we use attributes to accumulate some
information over the list of key-value pairs and, at the same time,
output key-value pairs from the map function.

> mkMapper3 :: att                                    -- f_setup
>           -> (att -> (k1, v1) -> (att, [(k2, v2)])) -- f_map
>           -> (att -> [(k2, v2)])                    -- f_cleanup
>           -> [(k1, v1)] -> [(k2, v2)]               -- input/output
> mkMapper3 f_setup f_map f_cleanup xs
>   = let a = f_setup
>         (a', ys) = aux f_map a [] xs
>     in ys ++ f_cleanup a'
>   where aux f_map a ys [] = (a, ys)
>         aux f_map a ys (kv : xs) = let (a', ys') = f_map a kv
>                                    in aux f_map a' (ys ++ ys') xs
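As a small illustration of mkMapper2 (sumMapper is our own example,
not from the paper): a mapper that sums all the values in its split
and emits the total once, under key 0, from the cleanup function.

> sumMapper :: [(Int, Int)] -> [(Int, Int)]
> sumMapper = mkMapper2 0                    -- setup: accumulator starts at 0
>                       (\s (_, v) -> s + v) -- map: add each value to it
>                       (\s -> [(0, s)])     -- cleanup: emit the total once
>
> -- sumMapper [(0, 3), (1, 1), (2, 4)] evaluates to [(0, 8)].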
We can develop the function for the reducer in the same manner. For
example, the simplest case mkReducer1 can be defined as follows. Note
that the difference from mkMapper1 is only in the type of f_reduce.

> mkReducer1 :: ((k2, [v2]) -> [(k3, v3)])  -- f_reduce
>            -> [(k2, [v2])] -> [(k3, v3)]  -- input/output
> mkReducer1 f_reduce = flatten . map f_reduce

3.4 Including Combiner

In MapReduce programming, a combiner may be inserted between the Map
phase and the Shuffle phase. It merges some key-value pairs output by
the preceding mapper, and improves the performance by reducing the
amount of intermediate data. We can embed the combiner into our model
simply by composing mapper and combiner: mapper' = combiner . mapper.
To support this, the type of the combiner should be as follows.

  combiner :: [(k2, v2)] -> [(k2, v2)]

Here is a simplified definition of mkCombiner1 that shows the
computation with a combiner. To keep the definition simple, we assume
that we can check equality on keys; in the real implementation, this
grouping is performed with Comparators, as shuffleMR does.

> mkCombiner1 :: Eq k2
>             => ((k2, [v2]) -> [(k2, v2)])  -- f_combine
>             -> [(k2, v2)] -> [(k2, v2)]
> mkCombiner1 f_combine input
>   = flatten $ map f_combine $ grpByIDC input
>
> grpByIDC :: Eq k => [(k, v)] -> [(k, [v])]
> grpByIDC [] = []
> grpByIDC ((k, v) : x) = let (xa, xo) = partition (\(k', v') -> k == k') x
>                         in (k, v : map snd xa) : grpByIDC xo
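For instance, a word-count combiner (combinerWC is our own example,
not from the paper) merges the pairs of each word locally before the
Shuffle phase.

> combinerWC :: [([Char], Int)] -> [([Char], Int)]
> combinerWC = mkCombiner1 (\(w, ns) -> [(w, sum ns)])
>
> -- combinerWC [("a", 1), ("b", 1), ("a", 1)] evaluates to
> -- [("a", 2), ("b", 1)].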
3.5 Word Count Example on the Low-Level Model

> wc :: Bag [(Int, [Char])] -> Bag [([Char], Int)]
> wc = mapReduceL mapperWC reducerWC partitionWC compare compare
>
> mapperWC  = mkMapper1  (\(_, w) -> [(w, 1)])
> reducerWC = mkReducer1 (\(w, as) -> [(w, foldl (+) 0 as)])
> partitionWC (_, _) = 1

------------------------------------------------------------------------
4. A High-Level Model of Hadoop MapReduce

The model in Section 3 was developed based on the implementation of
Hadoop MapReduce. Its Shuffle phase, however, is not easy to handle,
for two reasons. The first reason is that we require two comparison
functions compS and compG for a single key, and they should be
consistent: compS a b = EQ ==> compG a b = EQ. The second reason
concerns the order of the subphases: it is more intuitive and easier
to understand if the three subphases are executed as follows:

1. we partition the whole data over the reducer tasks,
2. we then group the key-value pairs inside a single reducer task, and
3. finally we sort the key-value pairs inside each group.

These steps of execution are often called secondary sorting. Note
that in the implementation of Hadoop MapReduce there is no sorting
subphase after the grouping, and thus there is a gap between the
above steps and the real implementation.

In this section, we propose another model of the Shuffle phase (and
of MapReduce) to bridge this gap. The key idea is to introduce three
keys instead of a single key k2 for the intermediate data: kP for
partitioning, kG for grouping, and kS for sorting. Therefore, we now
assume that the intermediate data passed from mapper to reducer are
of the form ((kP, kG, kS), v2). Here, we assume that kP belongs to
the Eq class and that kG and kS belong to the Ord class. The
following three functions extract the keys.

> getP ((kP, kG, kS), v2) = kP
> getG ((kP, kG, kS), v2) = kG
> getS ((kP, kG, kS), v2) = kS

With the extended key-value pairs, we can develop a new definition of
the Shuffle phase. Note that in this definition the key-value pairs
are sorted after the grouping.

> shuffleMR2 :: (Eq kP, Ord kG, Ord kS)      -- requirements
>            => Bag [((kP, kG, kS), v2)]     -- input
>            -> Bag [((kP, kG, kS), [v2])]   -- output
> shuffleMR2 input
>   = let aftP = grpByID $ mapB (applyW getP) $ flattenB input
>         aftG = mapB (sortByB compGS . grpByID . mapB (applyW getG)) aftP
>     in mapB (map sortMerge) aftG
>   where compGS a b = compare (getG $ headB a) (getG $ headB b)
>         sortMerge xs = let ss = sortByB compSS xs
>                        in (fst $ last ss, map snd ss)
>         compSS a b = compare (getS a) (getS b)

This shuffleMR2 can work as a high-level model of the Shuffle phase,
because we can derive the functions required by the low-level model
as follows. In this program, we assume that only Integers are used
for kP.

> hashP2 ((kP, kG, kS), v2) = kP
> compS2 (kP, kG, kS) (kP', kG', kS') | kG == kG' = compare kS kS'
>                                     | otherwise = compare kG kG'
> compG2 (kP, kG, kS) (kP', kG', kS') = compare kG kG'

A high-level functional model of Hadoop MapReduce is given with this
shuffleMR2. In this model, we have requirements in the form of type
classes, but fewer parameters are required than in the low-level
model.

> mapReduceH :: (Eq kP, Ord kG, Ord kS)
>            => ([(k1, v1)] -> [((kP, kG, kS), v2)])    -- mapper
>            -> ([((kP, kG, kS), [v2])] -> [(k3, v3)])  -- reducer
>            -> Bag [(k1, v1)]                          -- input
>            -> Bag [(k3, v3)]                          -- output
> mapReduceH mapper reducer input
>   = let aftMap = mapB mapper input
>         bfrRed = shuffleMR2 aftMap
>     in mapB reducer bfrRed

Here we briefly show the program for the well-known word-count
application. The mapper and reducer are almost the same as those for
the low-level model. Since we now need three explicit keys, the map
function sets (1, w, 1) as the key of an intermediate result.

> wc2 :: Bag [(Int, [Char])] -> Bag [([Char], Int)]
> wc2 = mapReduceH mapperWC2 reducerWC2
>
> mapperWC2  = mkMapper1  (\(_, w) -> [((1, w, 1), 1)])
> reducerWC2 = mkReducer1 (\((_, w, _), as) -> [(w, sum as)])

------------------------------------------------------------------------
5. Implementing List Scan on the MapReduce Model

5.1 A Two-Pass MapReduce Implementation of scanDist
    (3-phase distributed scan)

Here is the original definition of the 3-phase distributed scan. It
consists of the following three phases:

1. local reduce
2. global scan
3. local scan

> scanDist :: (a -> a -> a) -> a -> a -> [[a]] -> [[a]]
> scanDist op i_op e xss
>   = let ys = map (foldl op i_op) xss   -- local reduce
>         zs = scan op e ys              -- global scan
>     in map scanl' (zip xss zs)         -- local scan
>   where scanl' (xs, z) = scan op z xs

Now we discuss the MapReduce implementation. The input is a bag of
lists of key-value pairs, where a pair consists of a global index and
a value. (As in Section 5.2, the number of key-value pairs per split
is hard-coded to 3, hence the div k 3, for the simplicity of the
program.)

> scanDist_MR op i_op e input
>   = let ConsB [(_, gs)] _ = mapReduceH mapper1 reducer1 input
>     in mapReduceH (mapper2 gs) reducer2 input
>   where
>     mapper1 = mkMapper2 (0, i_op)                         -- f_setup
>                         (\(l, s) (k, v) -> (k, s `op` v)) -- f_map
>                         (\(l, s) -> [((1, 1, l), s)])     -- f_cleanup
>     reducer1 = mkReducer1 (\(_, ss) -> [(1, scan op e ss)])
>     mapper2 gs = mkMapper3 i_op                           -- f_setup
>                    (\s (k, v) -> let p  = div k 3         -- f_map
>                                      v' = (gs !! p) `op` s
>                                  in (s `op` v, [((p, k, 1), v')]))
>                    (\s -> [])                             -- f_cleanup
>     reducer2 = mkReducer1 (\((_, k, _), vs) -> [(k, head vs)])

5.2 A One-Pass MapReduce Implementation of scanBSP (BSP scan)

> scanBSP :: (a -> a -> a) -> a -> a -> Int -> [[a]] -> [[a]]
> scanBSP op i_op e p xss
>   = let ys  = map (foldl op i_op) xss
>         zss = map (\i -> take i ys) [0..(p-1)]  -- 1st superstep
>     in map scan' (zip xss zss)                  -- 2nd superstep
>   where scan' (xs, zs) = scan op (foldl op e zs) xs
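For intuition, here is a worked trace of our own (traceBSP is not
part of the paper) with op = (+), i_op = 0, e = 100, and p = 3;
scanDist produces the same result on this input.

> traceBSP :: [[Int]]
> traceBSP = scanBSP (+) 0 100 3 [[3, 1, 4], [1, 5, 9], [2, 6, 5]]
>
> -- traceBSP evaluates to [[100,103,104], [108,109,114], [123,125,131]]:
> --   ys  = [8, 15, 13]        (local reductions in the 1st superstep)
> --   zss = [[], [8], [8, 15]] (partial sums received by each process)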
Here is the one-pass MapReduce implementation of scanBSP. The input
is the same as that for scanDist_MR: a bag of lists of key-value
pairs, where a pair consists of a global index and a value. The
parameter pp is the number of BSP processes. We hard-coded the number
of key-value pairs in an element of the bag to 3, for the simplicity
of the program. The Map phase corresponds to the local computation in
the first superstep, the Shuffle phase corresponds to the
communication in the first superstep, and the Reduce phase
corresponds to the second superstep. Note that in fc the sort key
-1000 + p is negative (assuming fewer than 1000 processes), so the
partial sums received from processes 0, ..., p-1 are sorted, in
process order, before the data records in each group; this is why the
reducer can split them off with take p.

> scanBSP_MR op i_op e pp xb
>   = mapReduceH mapper reducer xb
>   where
>     mapper = mkMapper3 fs fm fc
>     fs = (0, i_op)                                          -- f_setup
>     fm (_, s) (k, v) = let p = k `div` 3
>                        in ((p, s `op` v), [((p, 1, k), v)]) -- f_map
>     fc (p, s) = map (\p' -> ((p', 1, -1000+p), s)) [p+1..pp-1]  -- f_cleanup
>     reducer = mkReducer1 fr
>     fr ((p, _, _), vs) = let zs = take p vs
>                              xs = drop p vs
>                          in zip [(3*p)..] (scan op (foldl op e zs) xs)

------------------------------------------------------------------------
Appendix A. Code for testing

A.1 Test for bags

> bagTest  = ConsB 'a' (ConsB 'b' NilB)
> bagTest2 = list2bag ['a', 'b', 'c']

A.2 Test for Ordering

> comp :: Int -> Int -> Ordering
> comp a b | a <  b = LT
> comp a b | a == b = EQ
> comp a b | a >  b = GT

A.3 Test for Shuffling

> testForShuffleMR :: Bag [(Int, [Char])]
> testForShuffleMR = list2bag [[(1, "a"), (4, "a"), (3, "a"), (2, "a")],
>                              [(3, "b"), (4, "b"), (3, "b")]]

> fp_1 (k, v) = k `mod` 2
> fs_1 :: Int -> Int -> Ordering
> fs_1 k1 k2 | k1 <  k2 = LT
>            | k1 == k2 = EQ
>            | k1 >  k2 = GT
> fg_1 = fs_1

We test that the results of the Shuffle phase computed by shuffleMR
(in the low-level model) and by shuffleMR2 (in the high-level model)
are the same.

> testSMR2 = list2bag [[((1, 1, 1), "a"), ((1, 3, 2), "b"),
>                       ((2, 2, 1), "c"), ((1, 3, 1), "d")],
>                      [((2, 1, 3), "e"), ((1, 1, 0), "f"),
>                       ((2, 2, 3), "g"), ((2, 1, 1), "h")]]
>
> testForTwoShuffle = (shuffleMR hashP2 compS2 compG2 testSMR2
>                        == shuffleMR2 testSMR2)

A.4 Test for scanDist

Inputs

> nested_list = [[3, 1, 4], [1, 5, 9], [2, 6, 5]]
>
> bag_nested_list :: Bag [(Int, Int)]
> bag_nested_list = list2bag [[(3, 1), (4, 5), (5, 9)],
>                             [(6, 2), (7, 6), (8, 5)],
>                             [(0, 3), (1, 1), (2, 4)]]

Test

> result_scanDist    = scanDist    (+) 0 100 nested_list
> result_scanDist_MR = scanDist_MR (+) 0 100 bag_nested_list

A.5 Test for scanBSP

Test

> result_scanBSP    = scanBSP    (+) 0 100 3 nested_list
> result_scanBSP_MR = scanBSP_MR (+) 0 100 3 bag_nested_list
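Finally, two convenience checks of our own (checkScanDist and
checkScanBSP are not part of the paper): they flatten the MapReduce
outputs, restore the global order by the index keys, and compare the
values against the flattened results of the list versions. Both
should evaluate to True.

> checkScanDist :: Bool
> checkScanDist =
>   map snd (sortBy (comparing fst) (flatten (bag2list result_scanDist_MR)))
>     == flatten result_scanDist
>
> checkScanBSP :: Bool
> checkScanBSP =
>   map snd (sortBy (comparing fst) (flatten (bag2list result_scanBSP_MR)))
>     == flatten result_scanBSP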