IT 프로그래밍/빅데이터

[Bigdata] MapReduce구현 완벽정리

잉휴 2021. 3. 10. 17:34

1. 구성요소

HDFS의 구성요소는 클라이언트,네임 노드,데이터 노드 3가지가 있다.


클라이언트는 아시다시피 데이터 송수신을 요청하는 곳이고,
네임노드는 config, mongos같은 역할을 하고
데이터노드는 shard처럼 실제 데이터 저장역할을 한다.

(저번에 올린 글에서 config,mongos,shard의 역할을 다루었습니다)
*노드=컴퓨터

 

2. Finalize & 맵리듀스(mapReduce)

하나의 key,value방식으로 다 나누는 것을 맵 리듀스라고 한다
key값이 같은 것들을 모아서=> map( )함수단계

key별로 계산한다=>reduce( )함수단계

 


다음 mongodb쿼리를 cmd창에 하나씩 입력해준다(//는 설명)

use test
db.words.save({text:'read a book'})
db.words.save({text:'write a book'})
db.words.find()

var map=function(){
var res=this.text.split(' ')
for(var i in res){
	key={word:res[i]};
	value={count:1};
	emit(key,value);
	}
}
//this는 map을 사용할 컬렉션
//res[i]sms read,a,book3개임을 나타냄
//여기까지 map()과정-shuffling함
//read[1]
//a[1,1]
//book[1,1]
//write[1]

var reduce=function(key,values){
	var totalcount=0;
	for(var i in values){
		totalcount=values[i].count+totalcount;
	}
	return{count: totalcount}
}
//totalcount=values[i].count+totalcount; 이거는
//totalcount+=values[i].count; 요거랑 같은의미
//count는 총 네 번이 이루어짐(read,a,book,write)이렇게 4개

db.words.mapReduce(map,reduce,'wordcount'); 
//wordcount라는 컬렉션을 만들어서 map,reduce를 넣겟다라는 뜻

출력결과

여기서는 단순히 map과 reduce와 'wordcount'라는 이름으로 result값을 얻어냈다면

여러 데이터를 Finalize함수로 추가적인 데이터 처리를 할 수도 있고,

증분 맵리듀스(reduce,replace,merge)로 document가 추가될 때 기존결과와 병합하는 방식을 쓸 수 도 있다.

 

1. Finalize

var order_map=function(){
for(var i=0;i<this.items.length;i++){
	var key=this.items[i].item_name;
	var value={count:1,qty:this.items[i].gty} 
	emit(key,value);
	}
}

var order_reduce=function(key,values){
	reduceValue={count:0,qty:0}
	for(var i=0;i<values.length;i++){
		reduceValue.count+=values[i].count;
		reduceValue.qty+=values[i].qty;
	}
	return reduceValue;
}

var order_finalize=function(key,value){
	value.avg=value.qty/value.count;
	return value;
}

db.order.mapReduce(order_map,order_reduce,{out:'order_out',finalize:order_finalize} 는 이처럼

map(order_map)과 reduce(order_reduce)함수를 쓰고 난후 추가적으로 avg(평균)값을 필요로 할 때 쓰인다.

 

2. 증분 맵리듀스

 

다음은 merge를 이용한 방법이다.

merge>

emp_out2 avg잘못써서 NaN..

db.order.mapReduce(order_map,order_reduce,{out:'order_out',finalize:order_finalize};

 

db.employees.mapReduce(mapper,reducer,{out:{merge:'emp_out2'},finalize:Finalizer});

이 부분이 추가 되었음을 알 수 있다.

 

db.login_history.insert({name : "matt", login_date : ISODate('2017-01-03 14:17:00')});
db.login_history.insert({name : "lara", login_date : ISODate('2017-01-09 10:07:00')});
db.login_history.insert({name : "todd", login_date : ISODate('2017-01-12 23:21:00')});
db.login_history.insert({name : "matt", login_date : ISODate('2017-01-22 02:54:00')});
db.login_history.insert({name : "todd", login_date : ISODate('2017-01-28 22:21:00')});

db.login_history.insert({name : "matt", login_date : ISODate('2017-02-03 14:17:00')});
db.login_history.insert({name : "todd", login_date : ISODate('2017-02-12 20:21:00')});
db.login_history.insert({name : "matt", login_date : ISODate('2017-02-22 12:54:00')});
db.login_history.insert({name : "todd", login_date : ISODate('2017-02-28 12:00:00')});

위 예시를

1월에 matt:2, lara:1, todd:2

2월에 matt:2, lara:0, todd:2

로 lara만 삭제한 상태를 더했을 때 reduce,replace,merge의 각각 출력했을 때 나오는 값이다.

결과>reduce

{ "_id" : "matt", "value" : { "login_count" : 2 } }
{ "_id" : "todd", "value" : { "login_count" : 2 } }

결과>replace
{ "_id" : "matt", "value" : { "login_count" : 1 } }
{ "_id" : "todd", "value" : { "login_count" : 1 } }

결과>merge
{ "_id" : "lara", "value" : { "login_count" : 1 } }
{ "_id" : "matt", "value" : { "login_count" : 2 } }
{ "_id" : "todd", "value" : { "login_count" : 2 } }