Wednesday, July 23, 2014

Spark play with HBase's Result object: handling HBase KeyValue and ByteArray in Scala with Spark -- Real World Examples

This is the second part of "Lighting a Spark With HBase Full Edition".

You should first read the previous part about HBase dependencies and Spark classpaths: http://www.abcn.net/2014/07/lighting-spark-with-hbase-full-edition.html

And you had better read this post for some background knowledge about combining HBase and Spark: http://www.vidyasource.com/blog/Programming/Scala/Java/Data/Hadoop/Analytics/2014/01/25/lighting-a-spark-with-hbase

This post aims to provide some additional, more complicated real-world examples on top of the post above.

First, you can put your hbase-site.xml into Spark's conf folder; otherwise you have to specify the full (absolute) path of hbase-site.xml in your code.
ln -s /etc/hbase/conf/hbase-site.xml $SPARK_HOME/conf/
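If you do not want to touch Spark's conf folder, here is a minimal sketch (my own, not from the original post) of the alternative: loading the file explicitly through the standard Hadoop Configuration API. The ZooKeeper hostnames below are placeholders.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration

val conf = HBaseConfiguration.create()
// read HBase settings from the absolute path of hbase-site.xml
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))
// or set the essential properties directly (hostnames here are placeholders)
// conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")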

Now, to warm up, we use a very simple HBase table with a string rowkey and string values.

table contents:
hbase(main):001:0> scan 'tmp'
ROW                   COLUMN+CELL
 abc                  column=cf:test, timestamp=1401466636075, value=789
 abc                  column=cf:val, timestamp=1401466435722, value=789
 bar                  column=cf:val, timestamp=1396648974135, value=bb
 sku_2                column=cf:val, timestamp=1401464467396, value=999
 test                 column=cf:val, timestamp=1396649021478, value=bb
 tmp                  column=cf:val, timestamp=1401466616160, value=test

In the post from vidyasource.com we can find how to get values out of an HBase Result tuple, but not the row keys.

The following code shows how to create an RDD of key-value pairs, RDD[(key, value)], from HBase Results:
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.spark.rdd.NewHadoopRDD

// needed for .asScala on the java.util.List returned by Result.getColumn
import scala.collection.JavaConverters._

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tmp")

var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("cf".getBytes(), "val".getBytes()))).map(row => {
(
  // rowkey bytes -> String
  row._1.map(_.toChar).mkString,
  // keep the KeyValue with the newest timestamp and take its value (Array[Byte])
  row._2.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getValue
)
}).take(10)
you will get
Array[(String, Array[Byte])] = Array((abc,Array(55, 56, 57)), (bar,Array(98, 98)), (sku_2,Array(57, 57, 57)), (test,Array(98, 98)), (tmp,Array(116, 101, 115, 116)))

In Scala, we can use map(_.toChar).mkString to convert an Array[Byte] to a String (because, as said above, the HBase table in this warm-up example has only string values):
hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("cf".getBytes(), "val".getBytes()))).map(row => {
(
  row._1.map(_.toChar).mkString,
  row._2.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getValue.map(_.toChar).mkString
)
}).take(10)
then we get
Array[(String, String)] = Array((abc,789), (bar,bb), (sku_2,999), (test,bb), (tmp,test))
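As a side note (not from the original post), HBase's own org.apache.hadoop.hbase.util.Bytes utility does the same conversion and is a bit more explicit:

import org.apache.hadoop.hbase.util.Bytes

// decodes the byte array as a string, equivalent to map(_.toChar).mkString for ASCII values
Bytes.toString(Array[Byte](55, 56, 57))   // "789"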
=======================================================================

After warming up, let us look at a more complicated HBase table example:

This table stores the UUIDs/cookies (or whatever identifiers) of a user's different devices; you can imagine this table as part of some kind of platform used for cross-device user tracking and/or analyzing user behavior across different devices.

the userid, used as the rowkey, is a string (such as some kind of hashed value)
the column family is lf (the device family)
the column qualifiers are the names or ids of the devices (such as internal ids of User-Agent strings; in this example we use simple strings like app1, app2 for mobile apps, and pc1, ios2 for different browsers on different devices)
the cell value is an 8-byte long (an Array[Byte] of length 8)

it looks like this:
hbase(main):001:0> scan 'test1'
ROW                   COLUMN+CELL
 user1                column=lf:app1, timestamp=1401645690042, value=\x00\x00\x00\x00\x00\x00\x00\x0F
 user1                column=lf:app2, timestamp=1401645690093, value=\x00\x00\x00\x00\x00\x00\x00\x10
 user2                column=lf:app1, timestamp=1401645690142, value=\x00\x00\x00\x00\x00\x00\x00\x11
 user2                column=lf:pc1,  timestamp=1401645690170, value=\x00\x00\x00\x00\x00\x00\x00\x12
 user3                column=lf:ios2, timestamp=1401645690180, value=\x00\x00\x00\x00\x00\x00\x00\x02

To create such a table, run puts like these in the HBase shell (after creating it with create 'test1', 'lf'):
put 'test1', 'user1', 'lf:app1', "\x00\x00\x00\x00\x00\x00\x00\x0F"
put 'test1', 'user1', 'lf:app2', "\x00\x00\x00\x00\x00\x00\x00\x10"
put 'test1', 'user2', 'lf:app1', "\x00\x00\x00\x00\x00\x00\x00\x11"
put 'test1', 'user2', 'lf:pc1',  "\x00\x00\x00\x00\x00\x00\x00\x12"
put 'test1', 'user3', 'lf:ios2', "\x00\x00\x00\x00\x00\x00\x00\x02"
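For completeness, here is a rough sketch (my own, not part of the original post) of how the same cells could be written from Scala with the HBase 0.98 client API, so that each value really is an 8-byte long:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
val table = new HTable(conf, "test1")

// Bytes.toBytes(15L) produces the 8-byte big-endian representation \x00\x00\x00\x00\x00\x00\x00\x0F
val put = new Put(Bytes.toBytes("user1"))
put.add(Bytes.toBytes("lf"), Bytes.toBytes("app1"), Bytes.toBytes(15L))
table.put(put)
table.close()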

OK, then, how can we read/scan this table from Spark?

let us see this code:
conf.set(TableInputFormat.INPUT_TABLE, "test1")

var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("lf".getBytes(), "app1".getBytes()))).map(row => if (row._2.size > 0) {
(
  row._1.map(_.toChar).mkString,
  row._2.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getValue.map(_.toInt).mkString
)
}).take(10)

Why is it map(_.toInt) this time? Because in this Array[Byte] the bytes are numbers, not chars.

but we get
Array((user1,000000015), (user2,000000017), ())
What? 000000015?... Yes, because _.toInt converts each element of the Array[Byte] to an Int and mkString just concatenates them. To get the original 8-byte long back, we can use java.nio.ByteBuffer.

this code should be changed to
import java.nio.ByteBuffer
hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("lf".getBytes(), "app1".getBytes()))).map(row => if (row._2.size > 0) {
(
  row._1.map(_.toChar).mkString,
  ByteBuffer.wrap(row._2.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getValue).getLong
)
}).take(10)
then we get
Array((user1,15), (user2,17), ())
Finally it looks better, but what is that last ()?!...

It is because rowkey user3 has no value in column lf:app1 (the if without an else branch returns () for that row). So, again, we can do better: in the HBase configuration we can set TableInputFormat.SCAN_COLUMNS to restrict the scan to a particular column qualifier, so we change the code to the FINAL EDITION...
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.NewHadoopRDD

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "test1")
conf.set(TableInputFormat.SCAN_COLUMNS, "lf:app1")

var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

import java.nio.ByteBuffer
hBaseRDD.map(tuple => tuple._2).map(result => {
  ( result.getRow.map(_.toChar).mkString,
    // result.value returns the value of the first (and, with SCAN_COLUMNS set, only) cell
    ByteBuffer.wrap(result.value).getLong
  )
}).take(10)

and now, finally we get:
Array[(String, Long)] = Array((user1,15), (user2,17))
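By the way, instead of java.nio.ByteBuffer you could also use HBase's Bytes utility for the byte-to-long conversion; a minimal equivalent sketch (my own variant of the code above):

import org.apache.hadoop.hbase.util.Bytes

// Bytes.toLong expects exactly 8 bytes, the same layout the table stores
hBaseRDD.map(tuple => tuple._2).map(result =>
  (Bytes.toString(result.getRow), Bytes.toLong(result.value))
).take(10)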

=======================================================================

FINAL FULL EDITION

Now, if you want to get all key-value pairs of an HBase table (all versions of the values from all column qualifiers), you can try this code (for the string-valued table "tmp"):
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.spark.rdd.NewHadoopRDD

import java.nio.ByteBuffer

// needed for .asScala on the java.util.NavigableMap returned by Result.getMap
import scala.collection.JavaConverters._

type HBaseRow = java.util.NavigableMap[Array[Byte],
  java.util.NavigableMap[Array[Byte], java.util.NavigableMap[java.lang.Long, Array[Byte]]]]

type CFTimeseriesRow = Map[Array[Byte], Map[Array[Byte], Map[Long, Array[Byte]]]]

def navMapToMap(navMap: HBaseRow): CFTimeseriesRow =
  navMap.asScala.toMap.map(cf =>
    (cf._1, cf._2.asScala.toMap.map(col =>
      (col._1, col._2.asScala.toMap.map(elem => (elem._1.toLong, elem._2))))))

type CFTimeseriesRowStr = Map[String, Map[String, Map[Long, String]]]

def rowToStrMap(navMap: CFTimeseriesRow): CFTimeseriesRowStr =
  navMap.map(cf =>
    (cf._1.map(_.toChar).mkString, cf._2.map(col =>
      (col._1.map(_.toChar).mkString, col._2.map(elem => (elem._1, elem._2.map(_.toChar).mkString))))))

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tmp")

val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

hBaseRDD.map(kv => (kv._1.get(), navMapToMap(kv._2.getMap))).map(kv => (kv._1.map(_.toChar).mkString, rowToStrMap(kv._2))).take(10)

For the long values in column family "lf" of table "test1", you can define CFTimeseriesRowStr and rowToStrMap as follows instead:
type CFTimeseriesRowStr = Map[String, Map[String, Map[Long, Long]]]

def rowToStrMap(navMap: CFTimeseriesRow): CFTimeseriesRowStr =
  navMap.map(cf =>
    (cf._1.map(_.toChar).mkString, cf._2.map(col =>
      (col._1.map(_.toChar).mkString, col._2.map(elem => (elem._1, ByteBuffer.wrap(elem._2).getLong))))))
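One caveat (my own note, not from the original post): by default the scan built by TableInputFormat returns only the latest version of each cell, so if you really want all versions in the nested map, you probably also have to raise the scan's max versions before creating the RDD, roughly like this (the value 3 is just an example):

// should match the VERSIONS setting of your column family
conf.set(TableInputFormat.SCAN_MAXVERSIONS, "3")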


=======================================================================

Beyond all of this code, there are more particulars to think about when querying an HBase table, such as scan caching, whether to enable the block cache, and whether to use bloom filters.
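For example, here is a sketch (my own, using the scan-related keys exposed by TableInputFormat) of how scan caching and block caching can be tuned through the same configuration object:

// rows fetched per RPC by the scanner (scan caching)
conf.set(TableInputFormat.SCAN_CACHEDROWS, "500")
// avoid polluting the region server block cache with a full table scan
conf.set(TableInputFormat.SCAN_CACHEBLOCKS, "false")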

And most important: Spark still uses org.apache.hadoop.hbase.mapreduce.TableInputFormat to read from HBase, the same as a MapReduce program or a Hive HBase table mapping. So there is a big problem: your job will fail when one of the HBase regions of the target table is splitting, because the original region is taken offline by the split.

So if your HBase regions must remain splittable, be careful about using Spark or Hive to read from the HBase table; maybe you should write a coprocessor instead of using the hbase.mapreduce API.

If not, you should disable automatic region splitting. The following slides summarize all the HBase configuration properties related to controlling region splits.


Tuesday, July 22, 2014

Lighting a Spark With HBase Full Edition with real world examples ~ dependencies, classpaths, handling ByteArray in HBase KeyValue object

First of all, there are many resources on the internet about integrating HBase and Spark,

such as

Spark has its own example: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala

MapR also has a cool sample: http://www.mapr.com/developercentral/code/loading-hbase-tables-spark

and here, a more detailed code snippet: http://www.vidyasource.com/blog/Programming/Scala/Java/Data/Hadoop/Analytics/2014/01/25/lighting-a-spark-with-hbase

but none of them has information about:
  • which jar libraries are needed (let us call it the dependency problem)
  • how to set the classpath when starting a Spark job/application that connects to HBase
  • sc.newAPIHadoopRDD uses the class org.apache.hadoop.hbase.client.Result as its value type, but the objects inside a Result are org.apache.hadoop.hbase.KeyValue, a core client-side Java API of HBase; sometimes it is really not enough to use it just with getColumn("columnFamily".getBytes(), "columnQualifier".getBytes()), and, more importantly, using this KeyValue object from Scala is even more complicated.
Therefore this post aims to create a "Full" version...

Assuming you have already read the samples above, I will go directly to solving these three problems.

If you only want to see some code, jump to the next part of this doc: http://www.abcn.net/2014/07/spark-hbase-result-keyvalue-bytearray.html

1. Dependency problem

It is similar to an HBase client program.

for maven:

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.0.1</version>
</dependency>

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>

sbt:

libraryDependencies ++= Seq(
        "org.apache.spark" % "spark-core_2.10" % "1.0.1",
        "org.apache.hbase" % "hbase" % "0.98.2-hadoop2",
        "org.apache.hbase" % "hbase-client" % "0.98.2-hadoop2",
        "org.apache.hbase" % "hbase-common" % "0.98.2-hadoop2",
        "org.apache.hbase" % "hbase-server" % "0.98.2-hadoop2"
)

Change the Spark and HBase versions to match yours.

2. Classpath

In the days of Spark 0.9.x, you just needed to set the SPARK_CLASSPATH environment variable to HBase's jars. For example, to start spark-shell in local mode on the CDH5 Hadoop distribution:
export SPARK_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/hbase-server.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-hadoop2-compat.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar
and then
./bin/spark-shell --master local[2]
or just
SPARK_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/hbase-server.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-hadoop2-compat.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar ./bin/spark-shell --master local[2]

On your cluster, change the paths of those jars to match your HBase installation; on other Hadoop distributions it will be something like /usr/lib/xxx (Hortonworks HDP) or /opt/mapr/hbase-xxx (MapR).

but, but... this lovely SPARK_CLASSPATH is deprecated in the new era of Spark 1.x  !!! -_-

so, in Spark 1.x

there is one conf property and one command-line argument for this:
spark.executor.extraClassPath
and
--driver-class-path

WTF... but yes, you must give the whole list of jar paths twice!... and spark.executor.extraClassPath must be set in a conf file; it cannot be set via the command line...

so, you need to do this:

edit conf/spark-defaults.conf

add this:
spark.executor.extraClassPath  /opt/cloudera/parcels/CDH/lib/hive/lib/hive-hbase-handler.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-server.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-hadoop2-compat.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar
and then start the Spark shell or submit your Spark job with the --driver-class-path command-line argument for the driver:
./bin/spark-shell --master local[2]  --driver-class-path  /opt/cloudera/parcels/CDH/lib/hbase/hbase-server.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-hadoop2-compat.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar
unbelievable, but that is how it is in Spark 1.x ...

3. How to use org.apache.hadoop.hbase.KeyValue in Scala with Spark

It seems this post is already long enough, so let us take a break. To see the code of the real-world examples, go to the next part of this doc: http://www.abcn.net/2014/07/spark-hbase-result-keyvalue-bytearray.html

Thursday, May 22, 2014

Install and config Graphite on Debian/Ubuntu

Install graphite server using python-pip


apt-get install gcc python-dev python-pip

sudo pip install https://github.com/graphite-project/ceres/tarball/master
sudo pip install whisper
sudo pip install carbon
sudo pip install graphite-web

cd /opt/graphite/conf
sudo cp carbon.conf.example carbon.conf
sudo cp storage-schemas.conf.example storage-schemas.conf
sudo cp graphite.wsgi.example graphite.wsgi


===========================================

Run carbon service and test

===========================================

sudo /opt/graphite/bin/carbon-cache.py start

if you get a Python error like "ImportError: cannot import name daemonize"
take a look at: Can't Start Carbon - 12.04 - Python Error - ImportError: cannot import name daemonize
try:
sudo pip install 'Twisted<12.0'

check if the port of carbon service is opened:
netstat -naep | grep 2003

test:
perl -e '$ts = time(); for (1..1000) { printf "foo.bar %d %d\n", int(rand(10000)), $ts - 90 * $_ }' | nc -c localhost 2003


===========================================

Run Graphite web UI (apache2 with mod_python, mod_wsgi)

===========================================

sudo apt-get install apache2 libapache2-mod-python libapache2-mod-wsgi

sudo chown -R www-data:www-data /opt/graphite/storage/

sudo mv /etc/apache2/sites-available/default /etc/apache2/sites-available/default.bak
sudo cp /opt/graphite/examples/example-graphite-vhost.conf /etc/apache2/sites-available/default
sudo mkdir /etc/httpd
sudo mkdir /etc/httpd/wsgi

cd /opt/graphite/webapp/graphite/
sudo cp local_settings.py.example local_settings.py

restart apache2
sudo service apache2 restart


===========================================

Initialize database after installed apache2

===========================================

sudo apt-get install python-django
sudo pip install django-tagging==0.3.1

cd /opt/graphite/webapp/graphite/
sudo python manage.py syncdb
Username: root
Password: xxx
E-mail: info@fxlive.de

Edit
/opt/graphite/webapp/graphite/local_settings.py
add:
SECRET_KEY
TIME_ZONE = 'Europe/Berlin'
LOG_RENDERING_PERFORMANCE = True
LOG_CACHE_PERFORMANCE = True
LOG_METRIC_ACCESS = True


===========================================

Troubleshooting

===========================================

1) Edit the following file:
/etc/apache2/sites-available/default

Make sure that the configuration for WSGISocketPrefix is set as follows:
WSGISocketPrefix /var/run/apache2/wsgi

Otherwise, you will get the following error:

    [Tue Jun 19 13:21:28 2012] [error] [client 192.168.xxx.xxx] (2)No such file or directory: mod_wsgi (pid=19506): Unable to connect to WSGI daemon process 'graphite' on '/etc/apache2/run/wsgi.19365.1.1.sock' after multiple attempts.


2) if you changed the VirtualHost port, do not forget to add this port in
/etc/apache2/ports.conf
add something like this:
NameVirtualHost *:8000
Listen 8000


3) if you cannot see the images and get something like this:
ViewDoesNotExist: Could not import graphite.render.views. Error was: No module named cairo

you need to install:
sudo apt-get install python-cairo-dev


4) watch the logs if something is still not working right:
cd /opt/graphite/storage/log/webapp
find . -name '*.log' | xargs tail -F

5) location of Graphite's Whisper database (RRD-like storage)
/opt/graphite/storage/whisper


===========================================

example: storage-schemas.conf

===========================================

[stats_1day]
pattern = ^stats_1day\.
retentions = 1d:1y

[stats_1hour]
pattern = ^stats_1hour\.
retentions = 1h:90d


===========================================

example: /etc/apache2/sites-available/default

===========================================

<IfModule !wsgi_module.c>
    LoadModule wsgi_module modules/mod_wsgi.so
</IfModule>

WSGISocketPrefix /var/run/apache2/wsgi

<VirtualHost *:8000>
        ServerName graphite
        DocumentRoot "/opt/graphite/webapp"
        ErrorLog /opt/graphite/storage/log/webapp/error.log
        CustomLog /opt/graphite/storage/log/webapp/access.log common

        # enable CORS (Cross-Origin Resource Sharing), see below
        Header set Access-Control-Allow-Origin "*"

        # I've found that an equal number of processes & threads tends
        # to show the best performance for Graphite (ymmv).
        WSGIDaemonProcess graphite processes=5 threads=5 display-name='%{GROUP}' inactivity-timeout=120
        WSGIProcessGroup graphite
        WSGIApplicationGroup %{GLOBAL}
        WSGIImportScript /opt/graphite/conf/graphite.wsgi process-group=graphite application-group=%{GLOBAL}

        # XXX You will need to create this file! There is a graphite.wsgi.example
        # file in this directory that you can safely use, just copy it to graphite.wgsi
        WSGIScriptAlias / /opt/graphite/conf/graphite.wsgi

        Alias /content/ /opt/graphite/webapp/content/
        <Location "/content/">
                SetHandler None
        </Location>

        # XXX In order for the django admin site media to work you
        # must change @DJANGO_ROOT@ to be the path to your django
        # installation, which is probably something like:
        # /usr/lib/python2.6/site-packages/django
        Alias /media/ "@DJANGO_ROOT@/contrib/admin/media/"
        <Location "/media/">
                SetHandler None
        </Location>

        # The graphite.wsgi file has to be accessible by apache. It won't
        # be visible to clients because of the DocumentRoot though.
        <Directory /opt/graphite/conf/>
                Order deny,allow
                Allow from all
        </Directory>

</VirtualHost>


you may want to enable CORS (Cross-Origin Resource Sharing) so that pages served from other origins can query Graphite directly

you should add this line
Header set Access-Control-Allow-Origin "*"
into
/etc/apache2/sites-available/default

and then restart apache2


===========================================

bugfix

===========================================

there is a small bug in Graphite's Dashboard JS; take a look at this post:
http://www.abcn.net/2014/01/graphites-dashboard-set-auto-hide-navbar.html


Tuesday, May 20, 2014

Install and config Ganglia on CDH5

Install


Server node


1. install packages

sudo apt-get install ganglia-monitor ganglia-webfrontend gmetad

2. edit /etc/ganglia/gmond.conf

sample gmond.conf setting:
cluster {
  name = "CDH5"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}

udp_send_channel {
/*  mcast_join = xxx */
  host = xxx.xxx.xxx.1 /* your ganglia server ip */
  port = 8649
  ttl = 1
}

udp_recv_channel {
/*  mcast_join = xxx */
  port = 8649
/*  bind = xxx */
}

3. scp /etc/ganglia/gmond.conf to all the other nodes


4. edit /etc/ganglia/gmetad.conf

sample gmetad.conf setting:
data_source "CDH5" xxx.xxx.xxx.1 xxx.xxx.xxx.2 xxx.xxx.xxx.3 ...


Other nodes


1. install ganglia monitor

sudo apt-get install ganglia-monitor

2. copy gmond.conf from server node to /etc/ganglia

sudo mv gmond.conf /etc/ganglia/

3. restart ganglia monitor service

sudo service ganglia-monitor restart


Config: CDH5 components Hadoop Metrics2 setting

hadoop-metrics2.properties

HDFS

NameNode

*.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31
*.period=10
namenode.sink.ganglia.servers=10.10.114.120:8649

DataNode

*.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31
*.period=10
datanode.sink.ganglia.servers=10.10.114.120:8649

SecondaryNameNode

*.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31
*.period=10
secondarynamenode.sink.ganglia.servers=10.10.114.120:8649

YARN

ResourceManager

*.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31
*.period=10
resourcemanager.sink.ganglia.servers=10.10.114.120:8649
maptask.sink.ganglia.servers=10.10.114.120:8649 
reducetask.sink.ganglia.servers=10.10.114.120:8649

NodeManager

*.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31
*.period=10
nodemanager.sink.ganglia.servers=10.10.114.120:8649
maptask.sink.ganglia.servers=10.10.114.120:8649 
reducetask.sink.ganglia.servers=10.10.114.120:8649

HBase

Master Server

*.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31
*.sink.ganglia.period=10
hbase.sink.ganglia.period=10
hbase.sink.ganglia.servers=10.10.114.120:8649

Region Server

*.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31
*.sink.ganglia.period=10
hbase.sink.ganglia.period=10
hbase.sink.ganglia.servers=10.10.114.120:8649

*(There are a lot of metrics generated by the RegionServer; every region of every table generates its own metrics... be careful if you turn on the Ganglia Hadoop Metrics2 sink on an HBase RegionServer.)


Thursday, April 10, 2014

Install Shark 0.9.1 on CDH 5.0.0 GA (hadoop 2.3.0) + Spark Configuration on CDH5


How to install Shark in CDH 5.0.0 GA

Requirements for Shark

1. CDH5
2. Spark
Spark should already be installed in CDH 5, under /opt/cloudera/parcels/CDH/lib/spark.

By following these steps you will install Shark 0.9.1 into /var/lib/spark on CDH 5.0.0 GA with Hadoop 2.3.0.

/var/lib/spark is the default home directory of the spark user in CDH 5.0.0 GA.

You need to run these scripts as root or as the spark user (you need to change the shell of the spark user to /bin/bash; by default it is nologin).

1. Download Shark source code

export SPARK_USER_HOME=/var/lib/spark

cd $SPARK_USER_HOME

wget http://www.scala-lang.org/files/archive/scala-2.10.3.tgz

tar zxf scala-2.10.3.tgz

wget https://github.com/amplab/shark/archive/v0.9.1.tar.gz

tar zxf v0.9.1.tar.gz

OR YOU CAN DOWNLOAD MY shark-0.9.1 version, which is compiled against the CDH 5.0.0 packages:

http://user.cs.tu-berlin.de/~tqiu/fxlive/dataset/shark-0.9.1-cdh-5.0.0.tar.gz

2. Configure Shark

we can use the Hive 0.12 that ships with CDH5, so we do not need to download the Spark/Shark build of Hive 0.11

set following configs in $SPARK_USER_HOME/shark-0.9.1/conf/shark-env.sh :
export SPARK_USER_HOME=/var/lib/spark
export SPARK_MEM=2g
export SHARK_MASTER_MEM=1g
export SCALA_HOME="$SPARK_USER_HOME/scala-2.10.3"
export HIVE_HOME="/opt/cloudera/parcels/CDH/lib/hive"
export HIVE_CONF_DIR="$HIVE_HOME/conf"
export HADOOP_HOME="/opt/cloudera/parcels/CDH/lib/hadoop"
export SPARK_HOME="/opt/cloudera/parcels/CDH/lib/spark"
export MASTER="spark://test01:7077"

SPARK_JAVA_OPTS=" -Dspark.local.dir=/tmp "
SPARK_JAVA_OPTS+="-Dspark.kryoserializer.buffer.mb=10 "
SPARK_JAVA_OPTS+="-verbose:gc -XX:-PrintGCDetails -XX:+PrintGCTimeStamps "
export SPARK_JAVA_OPTS

(change the host name test01 in MASTER="spark://test01:7077" to your host)

3. Build Shark with Hadoop 2.3.0-cdh5.0.0

if you downloaded my shark-0.9.1 version above (http://user.cs.tu-berlin.de/~tqiu/fxlive/dataset/shark-0.9.1-cdh-5.0.0.tar.gz), you do not need to build it and can jump to Step 5. Otherwise, you need to compile shark-0.9.1 against hadoop 2.3.0-cdh5.0.0:

cd $SPARK_USER_HOME/shark-0.9.1/
SHARK_HADOOP_VERSION=2.3.0-cdh5.0.0 ./sbt/sbt package

it takes a long time, depending on your network... normally it will be very slow... -_-

so maybe now you want to download the pre-built shark-0.9.1 package for CDH 5.0.0 GA ...

again, it is here:
http://user.cs.tu-berlin.de/~tqiu/fxlive/dataset/shark-0.9.1-cdh-5.0.0.tar.gz

4. Parquet support

wget http://repo1.maven.org/maven2/com/twitter/parquet-hive/1.2.8/parquet-hive-1.2.8.jar -O $SPARK_USER_HOME/shark-0.9.1/lib/parquet-hive-1.2.8.jar

ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-hadoop.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-common.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-encoding.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-format.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-avro.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-column.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-thrift.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-generator.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-cascading.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-hadoop-bundle.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-scrooge.jar $SPARK_USER_HOME/shark-0.9.1/lib/

I am not sure whether all of these jars are needed, but it works with these Parquet jars...

and if you enable this Parquet support, you need to set SPARK_MEM in $SPARK_USER_HOME/shark-0.9.1/conf/shark-env.sh to at least 2 GB.

5. Deploy shark to all the worker nodes

#MASTER

cd $SPARK_USER_HOME
tar zcf shark.tgz shark-0.9.1

scp this file to each worker, or

#WORKER

sudo ln -s /usr/bin/java /bin/java

export SPARK_USER_HOME=/var/lib/spark
cd $SPARK_USER_HOME
scp shark@test01:$SPARK_USER_HOME/shark.tgz $SPARK_USER_HOME/
tar zxf shark.tgz

6. Configure Spark

if your Spark service cannot be started in CM5 (Cloudera Manager 5), you may need to remove the "noexec" option from the /var or /var/run mount point, using the command:
mount -o remount,exec /var/run

and change the mount parameter in the line for /var or /var/run in /lib/init/fstab for a permanent solution.

You may need to go back to #MASTER and add the workers to /etc/spark/conf/slaves.

for example, if you have 2 worker nodes:
echo "test02" >> /etc/spark/conf/slaves
echo "test03" >> /etc/spark/conf/slaves

and, in /etc/spark/conf/spark-env.sh you may need to change
export STANDALONE_SPARK_MASTER_HOST=`hostname`
to
export STANDALONE_SPARK_MASTER_HOST=`hostname -f`

7. Run it!

finally, I believe you can run the Shark shell now!

go back to #MASTER
$SPARK_USER_HOME/shark-0.9.1/bin/shark-withinfo -skipRddReload

this -skipRddReload flag is only needed when you have tables with a Hive/HBase mapping, because of some issues in PassthroughOutputFormat from the Hive HBase handler.

the error message is something like:
"Property value must not be null"
or
"java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat"

8. Issues

ref.: http://bigdataanalyze.blogspot.de/2014/03/installing-shark-in-cdh5-beta2.html

it is a good guide for installing Shark on CDH5 beta2.

The author has also collected some common issues with Shark on CDH5 beta2: http://bigdataanalyze.blogspot.de/2014/03/issues-on-shark-with-cdh5-beta2-1.html

© Chutium / Teng Qiu @ ABC Netz Group