Sunday, September 1, 2019

Querying Athena tables without the limits of Athena

When AWS introduced Athena, we started moving more and more of our business use to it. Athena meant that we no longer had to raise an EMR cluster to query our Parquet files, and we could easily play with writing queries anytime and even save them. We even introduced CloudWatch scheduled jobs to run queries and email out reports.

However, when we started building a pipeline for processing data, we quickly hit the ceiling of maximum concurrent queries allowed by Athena, which is defined per account! You can see the limits here. Notice how low they are! We soon realized that Athena may be problematic for automated pipelines with concurrent processes.

The good news is that once you have defined tables in Athena, these tables automatically appear in the Glue Catalog of your AWS environment. This means we have Hive tables that are globally accessible from any EMR cluster we raise. This is much more efficient than having Spark read a path on S3 where our files are stored, since with a large number of files Spark needs to scan the metadata of each file before you can get to work. Instead, we can create the EMR cluster with one extra parameter and then create our Spark session with Hive enabled. Once we do this, we can access all our Athena tables directly from Spark code.

The first necessary change is to add the following to the EMR creation script:

--configurations '[
  {"Classification":"hive-site",
   "Properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}},
  {"Classification":"spark-hive-site",
   "Properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}}
]'

The next step is to change your code where you create your Spark session:

spark = SparkSession.builder.appName("SimpleApp").enableHiveSupport().getOrCreate()
After this you can select which Athena database you want to use:

   spark.sql("use dev")
   spark.sql("show tables").show()
Then you can easily query your tables:

   spark.sql("SELECT * FROM myTable LIMIT 10").show()
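Putting the pieces together, the steps above can be sketched as one small job. This is illustrative only: the database name "dev", the table "myTable", and the catalog_queries/run helpers are assumptions for the example, not part of any AWS or Spark API.

```python
def catalog_queries(database, table, limit=10):
    """Build the SQL statements the job runs; pure Python, no Spark needed."""
    return [
        f"use {database}",                       # switch to the Athena database
        f"SELECT * FROM {table} LIMIT {limit}",  # sample a few rows
    ]


def run(spark):
    """spark: an active SparkSession built with .enableHiveSupport()
    on an EMR cluster configured with the Glue metastore factory above."""
    use_db, select = catalog_queries("dev", "myTable")
    spark.sql(use_db)
    spark.sql("show tables").show()  # lists the tables the Glue Catalog exposes
    spark.sql(select).show()
```

Submitted with spark-submit on such a cluster, run(spark) reads the Athena-defined tables without touching Athena's concurrency limits at all.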