Tech Paper Reading

Apache Spark

  1. Spark SQL – Relational Data Processing In Spark

3. Scaling Spark in the Real World : Performance and Usability

Apache Kafka

  1. Kafka : a Distributed Messaging System for Log Processing
  2. Streams and Tables : Two Sides of the Same Coin

Kubernetes

  1. Kubernetes – Scheduling the Future at Cloud Scale 

Machine Learning

  1. MMLSpark: Unifying Machine Learning Ecosystems at Massive Scales

Infrastructure As Code – a brief synopsis

Introduction

Infrastructure as code, otherwise known as programmable infrastructure, is one of the key practices utilized in implementing data projects in the cloud.

In this article I explain what is meant by infrastructure as code. The motivations for implementing infrastructure as code are discussed as well as the mechanics.

Motivation

The advent of virtualized computing aka virtualization heralded a new dawn in computing and the provisioning of hardware resources. Prior to virtualization, this was a process that could take weeks, if not months for projects that required significant
computing power. For example, In my own career, I remember waiting for 6 months while the needed  hardware for a key data warehouse was being provisioned !!

Virtualization has helped to change all that. Suddenly the balance of power shifted from the system administrators who provide these resources to the developers who request these resources for their projects. With virtualized server farms, developers could provision and configure high-end computing nodes for a project in minutes rather than days or weeks.

This trend has only accelerated with the move to cloud computing and the public cloud in the past 5 years.
In public cloud computing scalable information resources are provided as a service to multiple external customers via the Internet.
In fact the cloud computing era has only been made possible because of virtualization – so much so that it is often dubbed as Virtualization 2.0.

Evolution to the cloud

The top 3 public cloud platforms are : Google Cloud Platform, Amazon AWS & Microsoft Azure.

Each of them provide a range of services including but not limited to:

  • Virtual Servers & Scalable Computing on Demand
  • Durable Storage
  • Relational Databases
  • Analytics & Big Data processing

and so on. A more detailed view of the services offered by the various cloud providers is available in the Appendix.

Each cloud customer can request services through a web service api that is accessible via their accounts.
In the pre-virtualization and pre-cloud era the provision and management of computing resources was done in a rather manual fashion. Once the hardware was provisioned, a system administrator would install an operating system and needed software, setup networking and databases based on requirements from the development teams. This was appropriate and feasible given the length of time it took to get resources provisioned.
In the cloud era, however, the situation is very different. Developers can request and provision services at will and in the case of distributed computing at a high volume. Sticking to the old manual approach becomes infeasible and error prone.
For example, imagine a large project involving Hadoop MapReduce and a cluster of 5 nodes for the development and 50 nodes for scale testing and QA. To keep costs in check, the development team may wish to provision machines repeatedly and shut them down after use.

While such a request can be fulfilled via the Google Cloud console each time, it can be extremely error prone and inconsistent. This speaks to the need for some kind of automated approach and this is where Infrastructure as Code comes in.

Description

Infrastructure-as-code (IAC) is an approach to the automation of infrastructure based on software development practices.

 With the IAC approach, service provisioning and management along with configuration changes are expressed in an automated fashion through code. This ensures that changes are made in a consistent and enables easier validation of these changes.

For example, using a tool such as Ansible, developers can provision an entire set of virtual servers running Centos OS, install and configure Spark on them to form a cluster for distributed computing and run an entire ETL pipeline as a batch process and then terminate the cluster.

The idea is that modern tools can manage infrastructure like software and data.
Infrastructure can be managed via version control systems, automated testing
libraries and deployment orchestration tools. Software development
practices such as continuous integration and delivery (CICD) and test-driven-development (TDD) can be applied to the management of infrastructure.

Thus IAC is a way to represent your environment using software/config files so one can replicate it multiple times.

It consists of 2 main components:

  • a declarative template that enables us to provision resources from our cloud
    provider. Such resources could be load balancers, auto-scaling groups,
    VM instances, RDBMS etc.
  • configuration management component – code that enables us to configure
    and deploy software on the resource we have provisioned via our declarative
    template.

Benefits

  1. In configuration management we’re describing every aspect of our system that
    is configurable. The intent is to eliminate the need to make emergency changes
    amd prevent application configuration drift.
    This is because if manual changes are made to the configuration, its original
    state has been codified via configuration management scripts and thus can
    easily be restored by execution of those scripts.
  2. IAC eases friction between app developers and operations by requiring operations engineers to adhere more closely to traditional software development practices such as CICD, automated testing and source code version control.This has given rise to what is known as DevOps with operations engineers implementing a workflow akin to a traditional software development life cycle.
Devops Life Cycle

Tools for Implementation

There are multiple tools used to implement the IAC process.
Some of these tools are Ansible, SaltStack, Puppet and Chef.

IAC tools are often divided into 2 groups based on their functionality :

  1. Provisioning tools
    These tools focus solely on provisioning virtual servers on premise or within a cloud environment. Examples include Terraform, AWS Cloud Formation, Google Deployment Manager and Azure Resource Manager.
  2. Configuration Management tools
    These tools install and manage software on existing servers.
    Examples are: Ansible, Chef, Puppet, Salt.

Most of the more popular configuration management tools such as Ansible do increasing offer provisioning capabilities, blurring the distinction between the 2 groups.

This has led to a robust debate about their capabilities, with some commentators emphasizing the distinction. My opinion is that for more complex infrastructural requirements such a distinction may have merit and necessitate usage of a different tool for each capability. I feel such a distinction will not last for long as the vendors of these configuration management tools will increasing add features to make their tools just as capable when it comes to provisioning and orchestration.

Thus tools such as Terraform, AWS Cloud Formation, Google Deployment Manager and Azure Resource Manager which are solely resource provisioning tools need configuration management tools such as Chef, Puppet or Ansible in order for to have the full IAC stack.
A brief synopsis/comparison of each can be found in the Appendix.
For our relatively small size project, we will focus on using Ansible as a full stack IAC tool.

Code Examples

The intention is not to dive in depth into any one tool, but to give the reader an idea of what implementing infrastructure-as-code looks like.

Ansible
Here is a simple code snippet that illustrates how one can provision virtual servers on Google cloud platform :
– name: Launch instances
     gce:
         instance_names: dev
         machine_type: n1-standard-1
         image: debian-9
         service_account_email: [email protected]
         credentials_file: mycredentials.json
         project_id: Test Project
Assuming one has a Google Cloud account with the necessary credentials,  we can save the above script to a playbook file (provision_gce_instance.yml),
and run
ansible-playbook provision_gce_instance.yml
to create a new virtual server instance on the Google Cloud platform.

References

Appendix


Public Cloud Provider Services

Service TypeService Name
Virtual servers, scalable computing on demandAmazon EC2
Google Compute Engine
Azure Virtual Machines.
Durable StorageGoogle Cloud Storage Amazon S3
Azure Storage
Relational DatabaseGoogle CloudSQL
Amazon RDS
Azure SQL Database
Analytics & Big Data processingGoogle DataProc & DataFlow
Amazon EMR
Azure HDInsight
Data WarehouseGoogle BigQuery
Amazon Redshift
Azure SQL Data Warehouse
Networking - DNSGoogle Cloud DNS
Amazon Route 53
Microsoft Azure DNS
Networking - Virtual Private CloudGoogle Cloud VPC
Amazon VPC
Azure Virtual Network
NoSQL DatabaseGoogle Cloud Datastore & Bigtable
Amazon DynamoDB
Azure Cosmos DB
MessagingGoogle Cloud Pub/Sub
Amazon SNS
Azure Notification Hubs
Deployment/ProvisioningGoogle Cloud Deployment Manager
AWS CloudFormation
Azure Resource Manager


Cloud Provisioning Tools

ToolMain FeaturesDomain-Specific Language (DSL)
AnsibleWorkflow orchestration
Configuration
Management
Provisioning
App deployment
CICD
Python, YAML
SaltStackCloud orchestration and automation
CICD
Configuration management
DevOps toolchain workflow automation
Python, YAML
PuppetConfiguration management
Provisioning
Ruby
ChefConfiguration management
CICD
Provisioning
Ruby
TerraformProvisioningGo
AWS Cloud FormationProvisioningJSON/YAML
Google Cloud Deployment ManagerProvisioningJSON/YAML
Azure Resource ManagerProvisioningJSON/YAML

 

Upload a file to Google Drive using gdrive

Steps

Quick tip on using gdrive to upload to Google Drive:

gdrive upload <path-to-local-file>

e.g.

gdrive upload mydir/myfile.txt

This uploads the file to the home directory on Google Drive which is My Drive

To upload to a specific directory, do the following:

List the directories on Google Drive showing directory ids:

gdrive list

Obtain the directory id for the directory you wish to upload to.

Then do

gdrive upload --parent <id> mydir/myfile.txt

to upload the file to the directory in question

You can also search for specific folder in Google Drive by doing:

gdrive list -q "name contains 'BigData'"

References

Google Drive CLI Client

 

GCP Adventures : Instance creation using Ansible

This is the first of many posts in a series I am embarking chronicling my work on using Google Cloud Platform (GCP) for creating Big Data applications ? Why Google Cloud and not AWS you may ask ? Well, I already use AWS extensively at work, so at home I’m deciding to do something different so I can broaden my expertise. On a secondary note, it seems as if GCP may be a cheaper alternative than AWS for compute intensive workloads – and that matters when you pay for compute resources out of pocket.

So I already had a GCP account I had created a few years back. My first task was to figure out how to create VM instances on GC via the command line API gcloud and then via Ansible.

Via Command Line API

I followed the instructions for setting up the gcloud client on my Ubuntu laptop.

I was subsequently able to create an instance with the following command:
gcloud compute instances create test-instance --image-family ubuntu-1710 --image-project ubuntu-os-cloud
I was prompted to select a region and I chose us-east-1

Via Ansible

My goal is fully automating the provisioning of resources in GCP, so the next step for me would be to figure out how to provision a VM instance using cloud automation software such as Puppet, Ansible.
Ansible is what I am most familiar with from work so Ansible it is.

The simplest and quickest way to get started is by reading the example in the Ansible Google Cloud Platform Guide.

 

For a more expansive example, I some googling, and stumbled on this  Using Ansible with Google video and its associated Github repo.

The video is instructive, and subsequently I cloned the repo and attempted to follow the instructions to create my instance. I was able to create instances via the following :

ansible-playbook site.yml

and subsequently terminate them as follows:

ansible-playbook cleanup.yml

Git Tip: checkout tag of repository

How to checkout a specific tag of a repository

  1. Check out the repo:
 git clone https://github.com/miguelgrinberg/microblog.git
  1. Determine the list of available tags:
cd microblog
git tag -l
  1. Checkout the tag as a new branch:
microblog$ git checkout tags/v0.3 -b tag_v0.3
Switched to a new branch 'tag_v0.3'
  1. The reason for checking out the tag as a new branch is that we wish to keep the tagged version separate from the latest branch.

Reference: http://bit.ly/2ANbo90

Spark Code Cheatsheet

Word count in single line

 rdd.flatMap(line => line.split(" "))
 .map(word => (word,1)
 .reduceByKey((count,accum)=>(count+accum))

Count number of lines in file

rdd.count

Display Spark version

print sc.version

Read csv file into dataframe in spark with header

df=spark.read.option("header","true")
    .option("delimiter",",").csv(csv_path)

Read csv file into dataframe in spark without header
First you need to specify a schema:

schema = StructType([
    StructField("Field0", IntegerType()),
    StructField("Field1", StringType()),
    StructField("Field2", StringType())
    ])

Then read in the data:

df=spark.read.format('com.databricks.spark.csv')
   .load(csv_path, schema = schema)


Handle commas in embedded quoted strings when reading in data

This is done by specifying an escape option:

df_raw=spark.read.format('com.databricks.spark.csv')
     .load(csv_path, schema = schema, escape='"')

Obtain distinct values of column in Spark DF

student_ids=df_raw.select('student_id').distinct().take(200)

Convert list of DataFrame Rows to Python list

student_ids=[x.student_id for x in df_raw
            .select('student_id').distinct().take(200)]

Filter out rows based on values in list

filtered_df=df_raw.where(col('student_id').isin(student_ids))

Save dataframe to csv (only for small datasets)

# Many Files
filtered_df.write.csv(output_csv) 
# Single File
filtered_df.coalesce(1).write
.option("header","true").csv(output_csv) 

Count number of occurrences of composite key in data frame:

df.select('major','gender').distinct().count()

Start pyspark in python notebook mode

export PYSPARK_DRIVER_PYTHON=ipython;pyspark

Display spark dataframe with all columns using pandas

import pandas as pd
pd.options.display.max_columns = None
pd.set_option('max_colwidth',100)
df.limit(5).toPandas()

How I got Cloudera Quickstart VM running on Google Compute Engine

Introduction

This is a brief synopsis of how I got the Cloudera Quickstart VM running via Docker on Google Compute Engine, which is Google’s cloud equivalent to Amazon’s AWS Cloud Computing Service.

The Cloudera Quickstart VM is a basic “Hadoop-In-A-Box” virtual machine solution which provides a Hadoop ecosystem for developers who wish to quickly test out the basic features of Hadoop without having to deploy an entire cluster. Since it doesn’t entail setting up a cluster, certain features provided by a cluster are missing.

Detailed Steps

Step 0.

Install gcloud on your local workstation as per these instructions:

https://cloud.google.com/sdk/

Step 1. Create a container optimized VM on GCE:

https://cloud.google.com/compute/docs/containers/container_vms

$ gcloud compute --project "myproject-1439" \
instances create "quickstart-instance-1" \
--image container-vm --zone "us-central1-a" \
 --machine-type "n1-standard-2"
Created [https://www.googleapis.com/compute/v1/projects/gcebook-1039/zones/us-central1-a/instances/quickstart-instance-1].NAME ZONE MACHINE_TYPE PREEMPTIBLE INTERNAL_IP EXTERNAL_IP STATUS
quickstart-instance-1 us-central1-a n1-standard-2 10.240.0.2 146.148.92.36 RUNNING

I created an n1-standard-2 VM on GCE which has 2vCPUs and 7.GB RAM. It will already have docker pre-installed.

Step 3. Let’s check the image size:

$ docker images
REPOSITORY TAG IMAGE ID CREATED VIRTUAL SIZE
cloudera/quickstart latest 2cda82941cb7 41 hours ago 6.336 GB

Given that our VM has total disk size of 10GB this is cutting it a bit close for the long term, if we wish to install other software.

So let’s create a persistent disk and make that available for storing our docker image:

https://cloud.google.com/compute/docs/disks/persistent-disks

I was able to create a 200GB persistent disk: bigdisk1

Step 4.  Switch docker image installation directory to use the big persistent disk.

There are a couple of ways to do this as per this article:

https://forums.docker.com/t/how-do-i-change-the-docker-image-installation-directory/1169

The least trouble free way IMO was to mount the bigdisk1 persistent disk to it would be available for use by my VM, and move the default docker image installation directory to it.

First, create a mountpoint for the bigdisk:

$ sudo mkdir /mnt/bigdisk1

Next, mount it:
On GCE, the raw disk can be found at /dev/disk/by-id/google-<diskid>
i.e. /dev/disk/by-id/google-bigdisk1

$ sudo mount -o discard,defaults /dev/disk/by-id/google-bigdisk1 \
  /mnt/bigdisk1

Finally, symlink it back to the default image installation directory:

$ sudo ln -s /mnt/bigdisk1/docker/ /var/lib/docker

Presto, we’re now ready. If we run docker pull on any image, the image will be written to the large persistent disk:

$ ls -ltr /var/lib/docker
lrwxrwxrwx 1 root root 21 Apr 9 03:20 /var/lib/docker -> /mnt/bigdisk1/docker/

Step 5. Run the Cloudera Quickstart VM and get access to all the goodies:

http://www.cloudera.com/documentation/enterprise/5-5-x/topics/quickstart_docker_container.html

$ sudo docker run --hostname=quickstart.cloudera --privileged=true \
-t -i cloudera/quickstart  /usr/bin/docker-quickstart 
...
Starting zookeeper ... STARTED
starting datanode, logging to /var/log/hadoop-hdfs/hadoop-hdfs-datanode-quickstart.cloudera.out

...
starting rest, logging to /var/log/hbase/hbase-hbase-rest-quickstart.cloudera.out
Started HBase rest daemon (hbase-rest): [ OK ]
starting thrift, logging to /var/log/hbase/hbase-hbase-thrift-quickstart.cloudera.out
Started HBase thrift daemon (hbase-thrift): [ OK ]
Starting Hive Metastore (hive-metastore): [ OK ]
Started Hive Server2 (hive-server2): [ OK ]
Starting Sqoop Server: [ OK ]
Sqoop home directory: /usr/lib/sqoop2
Setting SQOOP_HTTP_PORT: 12000
...
Started Impala Catalog Server (catalogd) : [ OK ]
Started Impala Server (impalad): [ OK ]
[root@quickstart /]#