Hadoop에서 여러 MapReduce 작업 연결
MapReduce를 적용하는 많은 실제 상황에서 최종 알고리즘은 여러 MapReduce 단계입니다.
즉, Map1, Reduce1, Map2, Reduce2 등이 있습니다.
따라서 다음 맵의 입력으로 필요한 마지막 감소 출력이 있습니다.
중간 데이터는 파이프 라인이 성공적으로 완료되면 일반적으로 유지하고 싶지 않은 것입니다. 또한이 중간 데이터는 일반적으로 일부 데이터 구조 (예 : '맵'또는 '세트')이므로 키-값 쌍을 쓰고 읽는 데 너무 많은 노력을 기울이고 싶지 않습니다.
하둡에서 권장되는 방법은 무엇입니까?
나중에 정리를 포함 하여이 중간 데이터를 올바른 방식으로 처리하는 방법을 보여주는 간단한 예제가 있습니까?
Yahoo의 개발자 네트워크에 대한이 튜토리얼이 도움이 될 것이라고 생각합니다. Chaining Jobs
을 사용합니다 JobClient.runJob()
. 첫 번째 작업의 데이터 출력 경로는 두 번째 작업의 입력 경로가됩니다. 이것들은 구문 분석하고 작업 매개 변수를 설정하기 위해 적절한 코드로 작업에 인수로 전달되어야합니다.
그러나 위의 방법은 이전의 매핑 된 API가했던 방식 일 수 있지만 여전히 작동해야한다고 생각합니다. 새로운 mapreduce API에는 비슷한 방법이 있지만 그것이 무엇인지 잘 모르겠습니다.
작업이 완료된 후 중간 데이터를 제거하는 한 코드에서이를 수행 할 수 있습니다. 내가 전에 한 방법은 다음과 같은 것을 사용하는 것입니다.
FileSystem.delete(Path f, boolean recursive);
경로는 데이터의 HDFS 위치입니다. 다른 작업에서 필요하지 않은 경우에만이 데이터를 삭제해야합니다.
당신이 그것을 할 수있는 많은 방법이 있습니다.
(1) 계단식 작업
첫 번째 작업에 대한 JobConf 오브젝트 "job1"을 작성하고 "input"을 입력 디렉토리로, "temp"를 출력 디렉토리로 사용하여 모든 매개 변수를 설정하십시오. 이 작업을 실행하십시오.
JobClient.run(job1).
바로 아래에 두 번째 작업에 대한 JobConf 오브젝트 "job2"를 작성하고 "temp"를 입력 디렉토리로, "output"을 출력 디렉토리로 사용하여 모든 매개 변수를 설정하십시오. 이 작업을 실행하십시오.
JobClient.run(job2).
(2) 두 개의 JobConf 객체를 생성 하고 JobClient.run을 사용하지 않는 것을 제외하고는 (1) 과 같이 모든 매개 변수를 설정하십시오 .
그런 다음 jobconf를 매개 변수로 사용하여 두 개의 Job 오브젝트를 작성하십시오.
Job job1=new Job(jobconf1);
Job job2=new Job(jobconf2);
jobControl 오브젝트를 사용하여 작업 종속성을 지정한 후 작업을 실행하십시오.
JobControl jbcntrl=new JobControl("jbcntrl");
jbcntrl.addJob(job1);
jbcntrl.addJob(job2);
job2.addDependingJob(job1);
jbcntrl.run();
(3) Map + | 축소 | Map *에서는 Hadoop 버전 0.19 이상과 함께 제공되는 ChainMapper 및 ChainReducer 클래스를 사용할 수 있습니다.
실제로 여러 가지 방법으로이 작업을 수행 할 수 있습니다. 두 가지에 집중하겠습니다.
하나는 Riffle ( http://github.com/cwensel/riffle )을 통해 종속물을 식별하고 의존성 (토폴로지) 순서로 '실행'하는 주석 라이브러리입니다.
또는 Cascading ( http://www.cascading.org/ ) 에서 Cascade (및 MapReduceFlow)를 사용할 수 있습니다 . 향후 버전에서는 Riffle 어노테이션을 지원하지만 원시 MR JobConf 작업에서 잘 작동합니다.
이에 대한 변형은 MR 작업을 전혀 관리하지 않고 Cascading API를 사용하여 애플리케이션을 개발하는 것입니다. 그런 다음 JobConf 및 작업 체인은 Cascading planner 및 Flow 클래스를 통해 내부적으로 처리됩니다.
이런 식으로 하둡 작업 등을 관리하는 메커니즘이 아니라 문제에 집중할 수 있습니다. 클로저 나 jruby와 같은 다른 언어를 맨 위에 배치하여 개발 및 애플리케이션을 더욱 단순화 할 수도 있습니다. http://www.cascading.org/modules.html
JobConf 객체를 사용하여 작업 체인을 차례로 수행했습니다. 작업을 연결하기 위해 WordCount 예제를 사용했습니다. 한 직업은 주어진 출력에서 단어가 몇 번 반복되는지 알아냅니다. 두 번째 작업은 첫 번째 작업 출력을 입력으로 가져와 주어진 입력에서 총 단어를 계산합니다. 다음은 Driver 클래스에 배치해야하는 코드입니다.
//First Job - Counts, how many times a word encountered in a given file
JobConf job1 = new JobConf(WordCount.class);
job1.setJobName("WordCount");
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
job1.setMapperClass(WordCountMapper.class);
job1.setCombinerClass(WordCountReducer.class);
job1.setReducerClass(WordCountReducer.class);
job1.setInputFormat(TextInputFormat.class);
job1.setOutputFormat(TextOutputFormat.class);
//Ensure that a folder with the "input_data" exists on HDFS and contains the input files
FileInputFormat.setInputPaths(job1, new Path("input_data"));
//"first_job_output" contains data that how many times a word occurred in the given file
//This will be the input to the second job. For second job, input data name should be
//"first_job_output".
FileOutputFormat.setOutputPath(job1, new Path("first_job_output"));
JobClient.runJob(job1);
//Second Job - Counts total number of words in a given file
JobConf job2 = new JobConf(TotalWords.class);
job2.setJobName("TotalWords");
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
job2.setMapperClass(TotalWordsMapper.class);
job2.setCombinerClass(TotalWordsReducer.class);
job2.setReducerClass(TotalWordsReducer.class);
job2.setInputFormat(TextInputFormat.class);
job2.setOutputFormat(TextOutputFormat.class);
//Path name for this job should match first job's output path name
FileInputFormat.setInputPaths(job2, new Path("first_job_output"));
//This will contain the final output. If you want to send this jobs output
//as input to third job, then third jobs input path name should be "second_job_output"
//In this way, jobs can be chained, sending output one to other as input and get the
//final output
FileOutputFormat.setOutputPath(job2, new Path("second_job_output"));
JobClient.runJob(job2);
Command to run these jobs is:
bin/hadoop jar TotalWords.
We need to give final jobs name for the command. In the above case, it is TotalWords.
You may run MR chain in the manner as given in the code.
PLEASE NOTE: Only the driver code has been provided
public class WordCountSorting {
// here the word keys shall be sorted
//let us write the wordcount logic first
public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException {
//THE DRIVER CODE FOR MR CHAIN
Configuration conf1=new Configuration();
Job j1=Job.getInstance(conf1);
j1.setJarByClass(WordCountSorting.class);
j1.setMapperClass(MyMapper.class);
j1.setReducerClass(MyReducer.class);
j1.setMapOutputKeyClass(Text.class);
j1.setMapOutputValueClass(IntWritable.class);
j1.setOutputKeyClass(LongWritable.class);
j1.setOutputValueClass(Text.class);
Path outputPath=new Path("FirstMapper");
FileInputFormat.addInputPath(j1,new Path(args[0]));
FileOutputFormat.setOutputPath(j1,outputPath);
outputPath.getFileSystem(conf1).delete(outputPath);
j1.waitForCompletion(true);
Configuration conf2=new Configuration();
Job j2=Job.getInstance(conf2);
j2.setJarByClass(WordCountSorting.class);
j2.setMapperClass(MyMapper2.class);
j2.setNumReduceTasks(0);
j2.setOutputKeyClass(Text.class);
j2.setOutputValueClass(IntWritable.class);
Path outputPath1=new Path(args[1]);
FileInputFormat.addInputPath(j2, outputPath);
FileOutputFormat.setOutputPath(j2, outputPath1);
outputPath1.getFileSystem(conf2).delete(outputPath1, true);
System.exit(j2.waitForCompletion(true)?0:1);
}
}
THE SEQUENCE IS
(JOB1)MAP->REDUCE-> (JOB2)MAP
This was done to get the keys sorted yet there are more ways such as using a treemap
Yet I want to focus your attention onto the way the Jobs have been chained!!
Thank you
You can use oozie for barch processing your MapReduce jobs. http://issues.apache.org/jira/browse/HADOOP-5303
There are examples in Apache Mahout project that chains together multiple MapReduce jobs. One of the examples can be found at:
RecommenderJob.java
We can make use of waitForCompletion(true)
method of the Job to define the dependency among the job.
In my scenario I had 3 jobs which were dependent on each other. In the driver class I used the below code and it works as expected.
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
CCJobExecution ccJobExecution = new CCJobExecution();
Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]);
Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]);
Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]);
System.out.println("****************Started Executing distanceTimeFraudJob ================");
distanceTimeFraudJob.submit();
if(distanceTimeFraudJob.waitForCompletion(true))
{
System.out.println("=================Completed DistanceTimeFraudJob================= ");
System.out.println("=================Started Executing spendingFraudJob ================");
spendingFraudJob.submit();
if(spendingFraudJob.waitForCompletion(true))
{
System.out.println("=================Completed spendingFraudJob================= ");
System.out.println("=================Started locationFraudJob================= ");
locationFraudJob.submit();
if(locationFraudJob.waitForCompletion(true))
{
System.out.println("=================Completed locationFraudJob=================");
}
}
}
}
The new Class org.apache.hadoop.mapreduce.lib.chain.ChainMapper help this scenario
Although there are complex server based Hadoop workflow engines e.g., oozie, I have a simple java library that enables execution of multiple Hadoop jobs as a workflow. The job configuration and workflow defining inter job dependency is configured in a JSON file. Everything is externally configurable and does not require any change in existing map reduce implementation to be part of a workflow.
Details can be found here. Source code and jar is available in github.
http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/
Pranab
I think oozie helps the consequent jobs to receive the inputs directly from the previous job. This avoids the I/o operation performed with jobcontrol.
If you want to programmatically chain your jobs, you will wnat to use JobControl. The usage is quite simple:
JobControl jobControl = new JobControl(name);
After that you add ControlledJob instances. ControlledJob defines a job with it's dependencies, thus automatically pluging inputs and outputs to fit a "chain" of jobs.
jobControl.add(new ControlledJob(job, Arrays.asList(controlledjob1, controlledjob2));
jobControl.run();
starts the chain. You will want to put that in a speerate thread. This allows to check the status of your chain whil it runs:
while (!jobControl.allFinished()) {
System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size());
System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size());
System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size());
List<ControlledJob> successfulJobList = jobControl.getSuccessfulJobList();
System.out.println("Jobs in success state: " + successfulJobList.size());
List<ControlledJob> failedJobList = jobControl.getFailedJobList();
System.out.println("Jobs in failed state: " + failedJobList.size());
}
As you have mentioned in your requirement that you want o/p of MRJob1 to be the i/p of MRJob2 and so on, you can consider using oozie workflow for this usecase. Also you might consider writing your intermediate data to HDFS since it will used by the next MRJob. And after the job completes you can clean-up your intermediate data.
<start to="mr-action1"/>
<action name="mr-action1">
<!-- action for MRJob1-->
<!-- set output path = /tmp/intermediate/mr1-->
<ok to="end"/>
<error to="end"/>
</action>
<action name="mr-action2">
<!-- action for MRJob2-->
<!-- set input path = /tmp/intermediate/mr1-->
<ok to="end"/>
<error to="end"/>
</action>
<action name="success">
<!-- action for success-->
<ok to="end"/>
<error to="end"/>
</action>
<action name="fail">
<!-- action for fail-->
<ok to="end"/>
<error to="end"/>
</action>
<end name="end"/>
참고URL : https://stackoverflow.com/questions/2499585/chaining-multiple-mapreduce-jobs-in-hadoop
'Programming' 카테고리의 다른 글
특정 유형의 모든 이벤트 리스너 제거 (0) | 2020.07.15 |
---|---|
Google Colaboratory로 데이터 가져 오기 (0) | 2020.07.15 |
gem 파일에서 ~>은 무엇을 의미합니까? (0) | 2020.07.14 |
ASP.NET MVC 응용 프로그램을 지역화하는 방법은 무엇입니까? (0) | 2020.07.14 |
size_t와 std :: size_t의 차이점 (0) | 2020.07.14 |