Pig Combined Input Format

2014-08-08

A lot of the data processing I've been doing recently has been ETL work using Pig. Pig is great in that it automatically compiles your joins, groups, etc. into efficient MapReduce jobs. However, by default it creates a mapper for each part file you read in as input (from HDFS, S3, etc.). This usually isn't a problem when you're processing the output of another MapReduce job: that job probably wrote x GB across y part files, so the work is fairly evenly distributed, with each mapper processing roughly x / y GB of data. As long as y isn't too large or too small, this is fine.
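As a rough sketch of the kind of job I mean (the paths and field names here are made up for illustration, not from a real pipeline):

-- Hypothetical ETL job: paths and schema are illustrative only.
raw = LOAD 'hdfs:///logs/requests/part-*' USING PigStorage('\t')
      AS (publisher:chararray, network:chararray, requests:long);
-- Pig compiles the GROUP/SUM below into a MapReduce job; by default,
-- every part file matched by the LOAD gets its own map task.
by_pub = GROUP raw BY publisher;
totals = FOREACH by_pub GENERATE group AS publisher, SUM(raw.requests) AS total_requests;
STORE totals INTO 'hdfs:///output/request_totals' USING PigStorage('\t');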

One of the issues I ran into was processing raw log data. These are logs of ad requests from numerous publishers on different ad networks, and although they have been normalized and simplified into a standard format, they are still of radically different sizes. In fact, most of the part files are tiny, around 5 KB. The entire Hadoop / MapReduce ecosystem is built around the concept of "big data", so a certain amount of overhead in starting a job or a mapper is considered acceptable because each mapper is assumed to process a lot of data. When each mapper is only processing ~5 KB, that overhead becomes longer than the actual run time of the map task, which is a glaring inefficiency. And we're not talking about a couple thousand files; we're talking on the order of tens or hundreds of thousands of files, so the overhead really adds up.

My solution was to use combined input splits, which is very easy to enable in Pig. Simply add the following lines to your Pig script:
set pig.maxCombinedSplitSize <size in bytes per split>;
set pig.splitCombination true;
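
For example, to target roughly 128 MB of input per combined split (the size here is just an illustrative choice; tune it to your cluster and data, and note that split combination may already be on by default in newer Pig versions):

set pig.splitCombination true;
-- 134217728 bytes = 128 MB per combined split (illustrative value)
set pig.maxCombinedSplitSize 134217728;

With this in place, Pig packs many small part files into each map task, so the number of mappers is driven by total input size rather than by the number of files.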