Tips for Using Hive on EMR

How to set up an Elastic MapReduce (EMR) cluster on Amazon is a topic for a different post. The focus here is on how to interface with Hive, how to load data from S3, and some tips about using partitioning.

A few interfaces for accessing data

(First, ssh into the master node.)

Hive

> hive    (drop into Hive just by calling 'hive' at the command line)
>> show tables;

Impala (must be configured on the EMR cluster at startup)

> impala-shell
>> invalidate metadata;

The 'invalidate metadata' command is important for syncing tables and metadata between Hive and Impala. Impala caches table metadata, so if you create or alter a table in Hive and want to query it in Impala, you must run this command after each change to the table structure to update what Impala knows about the tables.
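As a small sketch of the sync step, the statements below are run inside impala-shell after a table has been changed in Hive. The table name clicks_summary is a placeholder for illustration, not a table from this post.

```sql
-- Run inside impala-shell. "clicks_summary" is a hypothetical table name.

-- Reload metadata for one table that was created or altered in Hive,
-- instead of invalidating the cached metadata for every table.
INVALIDATE METADATA clicks_summary;

-- If the table already exists in Impala and only new data files were
-- added to it, REFRESH is the lighter-weight option.
REFRESH clicks_summary;

-- Confirm that Impala can now see the table.
SHOW TABLES;
```

Naming the table keeps the reload cheap on clusters with many tables; the bare 'invalidate metadata' form shown above still works when you are not sure which tables changed.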

Note that, at the time of writing, Impala queries on EMR cannot run on data sitting in S3. One approach is to load a pre-aggregated set of data (if the raw data wouldn't fit on your nodes) from S3 into the cluster, and query that with Impala.

Beeline

> beeline -u jdbc:hive2://localhost:10000 -n hadoop

(This is simply another interface to Hive. Unlike the vanilla 'hive' client, though, it lets a user run more than one query on the cluster at a time.) There are some additional startup options, but this command works for connecting to a local cluster with 'hadoop' as the user and no password.

Hive partitioning

Partitioning in Hive is useful because it allows you to query subsets of your dataset without having to read in and scan all of the data. Rather than loading all of the data for one year and then filtering down to the month of April, with data that's partitioned by year and month I can read in and scan only the data pertaining to April. Partitioning actually changes the way the data is stored (each partition gets its own directory), which is why it's so powerful.

Partitions are also hierarchical. For example, I could have my data organized by 'vertical' and then 'month', which stores one directory per vertical, each containing one subdirectory per month. The order you declare the partition columns in determines this layout. Hive records every partition in its metastore and can prune on any partition column in the where-clause, whatever its position in the hierarchy: a query for one month across all verticals reads only that month's subdirectory under each vertical, and a query for one vertical reads only that vertical's directory. What forces a full table scan is a query that doesn't filter on any partition column at all. When choosing which columns to partition on, and in what order, it's important to think about what types of queries you'll need to run most often, and to partition on the columns you filter by.
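To make the layout concrete, here is a minimal sketch with a hypothetical table (clicks_by_vertical and its columns are made up for illustration). The EXPLAIN DEPENDENCY statement at the end is one way to check which partitions a given query would actually read, rather than taking the layout on trust.

```sql
-- Hypothetical table, partitioned first by vertical and then by month.
CREATE TABLE clicks_by_vertical (page STRING, clicks INT)
PARTITIONED BY (vertical STRING, month INT);

-- Resulting directory layout under the table location (example values):
--   .../vertical=news/month=3/
--   .../vertical=news/month=4/
--   .../vertical=sports/month=4/

-- A query that filters on both partition columns.
SELECT page, SUM(clicks) AS clicks
FROM clicks_by_vertical
WHERE vertical = 'news' AND month = 4
GROUP BY page;

-- Ask Hive which tables and partitions a query depends on, here for a
-- filter on month only.
EXPLAIN DEPENDENCY
SELECT page, SUM(clicks) AS clicks
FROM clicks_by_vertical
WHERE month = 4
GROUP BY page;
```

Running the EXPLAIN DEPENDENCY variant against your own tables, with the filters you use most, is a quick sanity check before settling on a partition scheme.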

Loading data from S3 into EMR with partitioning

Helpful external posts:

  • http://dev.bizo.com/2011/12/4-tips-from-trenches-of-amazon-elastic.html
  • http://stackoverflow.com/questions/22220837/how-to-add-partition-using-hive-by-a-specific-date

How to load partitioned data from S3 into a Hive table is easiest to illustrate with an example.

Assume that your S3 bucket has data stored in this format:

s3://product_clicks/data/year=YYYY/month=mm/day=dd/hour=HH/[file.csv]

Each file contains only two columns: the page and the number of clicks for that hour.

Then, you can load it into a Hive table like so:

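CREATE EXTERNAL TABLE all_clicks (page STRING, clicks INT)
    PARTITIONED BY (year INT, month INT, day INT, hour INT)
    -- assumes comma-separated files; change the delimiter to match your data
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    LOCATION 's3://product_clicks/data';

-- scan the table location and register the partitions found there
MSCK REPAIR TABLE all_clicks;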

Important notes:

  • It must be an 'EXTERNAL TABLE'; otherwise, dropping the table in Hive will also delete all of the data from S3.
  • Partition columns must not be repeated in the column definition; they appear only in the PARTITIONED BY section.
  • The MSCK REPAIR TABLE [tablename] command is what makes the data visible to Hive: it scans the table's location and registers the partitions it finds in the metastore. After running it, you can run the command show partitions [tablename] to see all of the partitions that Hive is aware of.
  • For MSCK REPAIR TABLE to find them, partition names (e.g. year, month, day) must appear in the S3 key in the format year=YYYY/month=mm/day=dd/[file.csv / file.gz], including the '='. If you want the partitions to be named something different, you have to rename the S3 keys to reflect the new name.
  • Files can be compressed or uncompressed (if they are compressed with gzip, Hive will automatically detect and unzip them at query time).
  • When querying, filter on the partition columns in your where-clause so that Hive can skip the partitions you don't need. A condition like month = 1 on its own prunes to January, but to January of every year, so add year = 2015 as well if you only want January 2015.
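The notes above can be sketched as a few statements against the all_clicks table. The ALTER TABLE ... ADD PARTITION form is an alternative to MSCK REPAIR TABLE that registers one partition at a time and points it at an explicit location; the S3 path used for it below is a made-up example, not a path from this post.

```sql
-- List the partitions Hive has registered for the table.
SHOW PARTITIONS all_clicks;

-- Register a single partition by hand. Because the location is given
-- explicitly, the key does not have to follow the year=/month=/ layout.
-- The path below is hypothetical.
ALTER TABLE all_clicks ADD IF NOT EXISTS
  PARTITION (year = 2015, month = 1, day = 1, hour = 0)
  LOCATION 's3://product_clicks/other/2015/01/01/00/';

-- Query a single day by filtering on the partition columns.
SELECT page, SUM(clicks) AS clicks
FROM all_clicks
WHERE year = 2015 AND month = 1 AND day = 1
GROUP BY page;
```

Adding partitions explicitly is more typing than MSCK REPAIR TABLE, but it is handy when new data arrives on a schedule and you only want to register the latest hour or day.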

Useful Hive settings to be aware of:

If your mappers or reducers are throwing 'java heap space' / out-of-memory errors, those containers don't have enough memory to complete the intermediate stage. You can change the memory allocated to each of them with these settings (in the hive/beeline client):

SET mapreduce.reduce.memory.mb=4096;
SET mapreduce.map.memory.mb=2048;

This will reduce the number of tasks that can run at one time (since each task is allocated more memory, and the total amount of memory on the cluster is bounded). Other settings to look into include block size and split size (http://stackoverflow.com/questions/30549261/split-size-vs-block-size-in-hadoop).
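A slightly fuller sketch of the same idea follows. It assumes Hive is running on the MapReduce engine; the heap values (about 80% of each container) and the 128 MB split size are illustrative assumptions, not recommendations from this post. The container size caps the whole process, while the JVM heap is set separately through the java.opts properties, so the two are usually raised together.

```sql
-- Container sizes, as above.
SET mapreduce.map.memory.mb=2048;
SET mapreduce.reduce.memory.mb=4096;

-- JVM heap inside each container. Keeping it at roughly 80% of the
-- container size is a common rule of thumb (an assumption here).
SET mapreduce.map.java.opts=-Xmx1638m;
SET mapreduce.reduce.java.opts=-Xmx3276m;

-- Upper bound on the input split size in bytes (128 MB here, chosen only
-- as an example). Smaller splits mean more, lighter map tasks.
SET mapreduce.input.fileinputformat.split.maxsize=134217728;
```

These settings last only for the current hive/beeline session, which makes it easy to experiment on one heavy query without changing the cluster configuration.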

Creating partitioned tables with dynamic partitioning

Dynamic partitioning is very useful for creating intermediate aggregate summaries (monthly, daily, or by some other factor like language or site). The table that results from these types of queries will itself be partitioned, so you can look at only one partition at a time rather than scan the whole table.

-- the target table to load data into
CREATE TABLE clicks_intermediate (
  item    STRING,
  clicks  BIGINT   -- SUM() returns a BIGINT
)
PARTITIONED BY (year INT, month INT, day INT);

-- these settings must be used to enable fully dynamic partitioning
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- this query inserts records into the table, partitioned by year, month and day
INSERT OVERWRITE TABLE clicks_intermediate PARTITION (year, month, day)
SELECT page AS item, SUM(clicks) AS clicks, year, month, day
FROM all_clicks
GROUP BY page, year, month, day;

Important notes:

  • If your insert query doesn't use a 'group by' statement (for example, you just want to select right into another table), then you should use a 'distribute by' clause over the columns that you're partitioning on. This sends all rows that share the same values for those columns to the same reducer, so each partition is written by one reducer instead of every task writing a small file into every partition it happens to see.
  • The partition columns must come last in the select list, in the order of the partition hierarchy. Hive matches columns by position rather than by name, so the order of all columns in the select statement must match up with the order of the columns in the target table.
  • The insert overwrite only applies to the partitions that your insert/select statement touches. For example, if you already have data in the table for 2015 and then run an 'insert overwrite' into the table that only produces rows for 2016, none of the partitions from 2015 will be touched. This is great for intermediate aggregate tables, since you only have to re-compute aggregates for the current month; none of the prior months' aggregates need to be touched unless the data changes.
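The first and last notes can be sketched together. The target table clicks_by_day is hypothetical; it simply copies rows from all_clicks without aggregating, so there is no 'group by' and a 'distribute by' on the partition columns is used instead. The where-clause limits the insert to one month, so only that month's partitions are overwritten.

```sql
-- Hypothetical target table: same columns as all_clicks, but partitioned
-- down to the day rather than the hour.
CREATE TABLE IF NOT EXISTS clicks_by_day (page STRING, clicks INT)
PARTITIONED BY (year INT, month INT, day INT);

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- Partition columns come last, in the same order as the PARTITION clause.
-- DISTRIBUTE BY routes all rows for one day to the same reducer.
INSERT OVERWRITE TABLE clicks_by_day PARTITION (year, month, day)
SELECT page, clicks, year, month, day
FROM all_clicks
WHERE year = 2016 AND month = 1
DISTRIBUTE BY year, month, day;
```

Re-running this statement for a later month leaves the January 2016 partitions as they are, which is the behaviour the last note relies on.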

Written by sideprojects in Notes on Sat 09 January 2016. Tags: hive, aws