Saturday, July 15, 2017

Installing CouchDB on Red Hat Enterprise Linux

     I got this up and running on RHEL 7.3. It took a while to get through all the dependencies. Despite what it says on the CouchDB installation page, there is no RPM for this. You're going to have to download, compile, and punch things to get it installed. Hopefully this guide will help expedite the process and get your document database up and running.

1. Get the RPM dependencies that are actually available

sudo yum install autoconf autoconf-archive automake \
curl-devel libicu-devel libtool gcc-c++


2. Install the wxWidgets libraries 

wget http://downloads.naulinux.ru/pub/NauLinux/6x/x86_64/sites/School/RPMS//wxBase-2.8.12-1.el6.x86_64.rpm

yum install wxBase-2.8.12-1.el6.x86_64.rpm

wget https://yum.postgresql.org/9.1/redhat/rhel-6-x86_64/wxGTK-2.8.12-1.el6.x86_64.rpm

yum install wxGTK-2.8.12-1.el6.x86_64.rpm

wget http://dl.fedoraproject.org/pub/epel/6/x86_64//wxGTK-gl-2.8.12-1.el6.x86_64.rpm

yum install wxGTK-gl-2.8.12-1.el6.x86_64.rpm

3. Install Erlang

wget http://packages.erlang-solutions.com/site/esl/esl-erlang/FLAVOUR_1_general/esl-erlang_19.0~centos~6_amd64.rpm

yum install esl-erlang_19.0~centos~6_amd64.rpm

4. Install Mozilla SpiderMonkey 1.8.5

wget http://ftp.mozilla.org/pub/mozilla.org/js/js185-1.0.0.tar.gz

tar -xvf js185-1.0.0.tar.gz

cd js-1.8.5/js/src

./configure

make && make install

This is optional; it's just where I personally chose to put these files.

cd back to the directory that contains the js-1.8.5 folder, then copy it to /var/lib/js-1.8.5:

cp -r js-1.8.5 /var/lib

5. Time to install CouchDB

wget http://apache.mirrors.ionfish.org/couchdb/source/2.0.0/apache-couchdb-2.0.0.tar.gz

tar -xzf apache-couchdb-2.0.0.tar.gz

cd apache-couchdb-2.0.0

./configure

Change the below line in ./apache-couchdb-2.0.0/src/couch/rebar.config.script to point to where you put the SpiderMonkey files. For this example I changed:

{"linux",  CouchJSPath, CouchJSSrc, [{env, [{"CFLAGS", JS_CFLAGS ++ " -DXP_UNIX -I/usr/local/include/js"}, 

 to 

{"linux",  CouchJSPath, CouchJSSrc, [{env, [{"CFLAGS", JS_CFLAGS ++ " -DXP_UNIX -I/var/lib/js-1.8.5/js/src"},

Then make

make release

This is another optional step; it's just where I chose to put these files:

cp -r apache-couchdb-2.0.0/rel /var/lib/apache-couchdb-2.0.0/

We need to change default.ini so we can access Fauxton from another server:

vi /var/lib/apache-couchdb-2.0.0/rel/couchdb/etc/default.ini

Change the bind_address under [chttpd] to 0.0.0.0.

Now we can start up CouchDB:

nohup /var/lib/apache-couchdb-2.0.0/rel/couchdb/bin/couchdb &

If you want to make this run at startup, add

/var/lib/apache-couchdb-2.0.0/rel/couchdb/bin/couchdb

to

/etc/rc.d/rc.local
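For example (a minimal sketch; note that on RHEL 7, /etc/rc.d/rc.local must be executable or it will not be run at boot):

# append the start command and make rc.local executable
echo '/var/lib/apache-couchdb-2.0.0/rel/couchdb/bin/couchdb &' | sudo tee -a /etc/rc.d/rc.local
sudo chmod +x /etc/rc.d/rc.local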

6. Create the default databases

curl -X PUT http://127.0.0.1:5984/_users

curl -X PUT http://127.0.0.1:5984/_replicator

curl -X PUT http://127.0.0.1:5984/_global_changes


That should do it. You can then go to http://<servername>:5984/_utils to complete the configuration:

Figure 1. CouchDB Web UI

(Pro tip... if you are going to be using remote replication, I found the replication page in the web UI does not work properly; it's best to use the REST API for this. Here is a good example of how to do this.)
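As a rough illustration (the database names and remote host below are placeholders), a one-off pull replication through the _replicate endpoint looks something like this:

# replicate a remote database into a local one; create_target builds the
# local database if it does not exist yet
curl -X POST http://127.0.0.1:5984/_replicate \
     -H "Content-Type: application/json" \
     -d '{"source": "http://remote-server:5984/somedb", "target": "somedb", "create_target": true}'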

Saturday, July 8, 2017

Using SQL Server Change Tracking with the Streamsets Data Collector Kudu Destination

     Just started using Apache Kudu for a few projects that require near-real-time data, as well as for our ETL logging for analytics. It's a really great storage engine if you like working with relational data and can get around its limitations. My favorite thing about it is that not only can you interact with Kudu tables using Impala and SQL, but it also has an API that lets you insert/update/delete/upsert to tables directly. This means I can send data to Kudu with instructions on what to do with that data. For traditional data warehousing, you would most likely send your deltas to a staging table before running batched upserts into the destination table to get it in sync with the source system. With Kudu you could still do this, but thanks to the Java API and Streamsets Data Collector, there really is no need to.

     In Streamsets Data Collector they have a Kudu Destination that takes advantage of the Java API to load data. This is tightly coupled with the CDC feature they have on their JDBC Origin. The way this works is that the CDC operations like:

1 for INSERT
2 for DELETE
3 for UPDATE
4 for UPSERT

are assigned to the Data Collector header variable sdc.operation.type, which in turn tells the Kudu destination what to do with the data. Unfortunately for those of us using change tracking, the operation codes come in as I, D, or U. So in order to take advantage of Kudu, we have to translate these change tracking operations and assign the result to the header variable.

     Before we dive into how we do this, I wanted to list a couple of gotchas I ran into while using Data Collector with Kudu:

    -Unlike a Hive destination, which has the Hive Metadata Processor, the Kudu destination has no equivalent and will not create the table for you. You will have to create the table before kicking off the pipeline or it will fail (see the sketch below).
    -Kudu will not accept datetime data types from SQL Server; you will have to convert them to a Unix epoch representation and store them in a bigint. I will demonstrate how to do this.
    -Make sure the Max Batch Size (Records) on the JDBC origin does not exceed the Mutation Buffer Space (records) on the Kudu destination or the pipeline will fail.
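Here is a rough sketch of pre-creating the target table through impala-shell (syntax for Impala 2.8+/Kudu 1.3+). The column types, the hash partitioning, and the impala-host name are assumptions based on the query later in this post; adjust them for your own schema:

# create the Kudu-backed target table up front; the datetime column is
# stored as BIGINT (epoch milliseconds)
impala-shell -i impala-host -q "
CREATE TABLE abn_orders (
  abn_note_id BIGINT,
  line INT,
  cm_log_owner_id STRING,
  cm_phy_owner_id STRING,
  order_id BIGINT,
  abn_check_from_date BIGINT,
  sys_change_version BIGINT,
  sys_change_operation STRING,
  PRIMARY KEY (abn_note_id, line)
)
PARTITION BY HASH (abn_note_id) PARTITIONS 4
STORED AS KUDU;"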

     Ok, for this example we will use a pipeline that looks like this:


Figure 1. JDBC to Kudu Pipeline

For the JDBC Origin make sure that:

    -The Incremental Mode checkbox is checked
    -Initial Offset is set to 0
    -The Offset Column is set to sys_change_version

The source query is going to pull from this table:


Figure 2. Source System Table

using this query:

DECLARE @offset INT =${offset} 
DECLARE @unixepoch2 datetime2 = '1970-01-01 00:00:00.0000' 
IF (@offset =0 OR @offset IS NULL) 
   SELECT 
        sct.abn_note_id AS abn_note_id, 
        sct.line AS line, 
        sct.cm_log_owner_id AS cm_log_owner_id, 
        sct.cm_phy_owner_id AS cm_phy_owner_id, 
        sct.order_id AS order_id, 
 ((cast (datediff(day,@unixepoch2,sct.abn_check_from_date) AS float) * 86400) + (cast(datediff(millisecond,dateadd(day,datediff(day,@unixepoch2,sct.abn_check_from_date),@unixepoch2),sct.abn_check_from_date) AS float ) / 1000))*1000 AS abn_check_from_date,
        c.sys_change_version AS sys_change_version, 
        'I' AS sys_change_operation 
FROM 
        dbo.abn_orders sct 
CROSS APPLY (
   SELECT 
        coalesce(max(sys_change_version), 0) AS sys_change_version 
    FROM 
        changetable(changes dbo.abn_orders, 0) c) AS c 
ELSE 
   SELECT 
        ct.abn_note_id AS abn_note_id, 
        ct.line AS line, 
        sct.cm_log_owner_id AS cm_log_owner_id, 
        sct.cm_phy_owner_id AS cm_phy_owner_id, 
        sct.order_id AS order_id,
((cast (datediff(day,@unixepoch2,sct.abn_check_from_date) AS float) * 86400) + (cast(datediff(millisecond,dateadd(day,datediff(day,@unixepoch2,sct.abn_check_from_date),@unixepoch2),sct.abn_check_from_date) AS float ) / 1000))*1000 AS abn_check_from_date, 
        sys_change_version AS sys_change_version, 
        sys_change_operation AS sys_change_operation 
  FROM 
        dbo.abn_orders AS sct 
   RIGHT OUTER JOIN changetable(changes dbo.abn_orders, ${offset}) AS ct 
   ON ct.abn_note_id = sct.abn_note_id AND ct.line = sct.line 
  WHERE 
        sys_change_version > ${offset} 
  ORDER BY 
        sys_change_version 

     In this query you can see we are using an IF-ELSE statement. The first branch is used when the offset is 0, triggering an initial load from the table and grabbing the latest offset to store in Data Collector. We have to do this to protect ourselves in case we are loading the data from outside the change tracking retention window, or change tracking was not enabled from the table's inception. The branch after the ELSE is used for delta loading; it pulls data from the table since the last recorded offset. Pro tip... if you are using incremental pipelines, make sure to periodically back up /var/lib/sdc/data on your Data Collector server in order to save the offset states of your pipelines. It can definitely come in handy when disaster strikes.
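Something like the following is enough to snapshot that directory (the backup location and schedule are placeholders):

# one-off snapshot of the Data Collector offset/state directory
sudo tar -czf /backups/sdc-data-$(date +%F).tar.gz /var/lib/sdc/data
# or schedule it nightly from root's crontab (% must be escaped in cron):
# 0 2 * * * tar -czf /backups/sdc-data-$(date +\%F).tar.gz /var/lib/sdc/data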

     You will also notice the functions we apply to the abn_check_from_date field. This is to convert the SQL Server datetime to a unix epoch with millisecond precision that we can store in a bigint column over in the Kudu destination table. We can later throw a view on top of this value to display it as a timestamp in Hive/Impala.
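As a rough illustration (the view name and impala-host are placeholders; Impala interprets a numeric cast to TIMESTAMP as seconds since the epoch), the view could look something like this:

# expose the epoch-millisecond BIGINT column as a readable timestamp
impala-shell -i impala-host -q "
CREATE VIEW abn_orders_v AS
SELECT
  abn_note_id,
  line,
  order_id,
  CAST(abn_check_from_date / 1000 AS TIMESTAMP) AS abn_check_from_date
FROM abn_orders;"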

     Now that we have taken care of the JDBC origin, we need to set the header variable in order to tell Kudu what to do with the record(s) coming through the pipeline. We can accomplish this using a JavaScript Evaluator processor. In the script text area, put:

// Loop through the records in the batch
for (var i = 0; i < records.length; i++) {
 try {
  //We either delete
  if (records[i].value.sys_change_operation == "D") {
   records[i].attributes['sdc.operation.type'] = '2'
  }
  //Or we upsert
  else {
   records[i].attributes['sdc.operation.type'] = '4'
  }

  // Write record to processor output
  output.write(records[i]);
 } catch (e) {
  // Send record to error
  error.write(records[i], e);
 }

}

     You can see that if the operation is "D" we set the header variable to 2; otherwise we set it to 4 for an upsert. I did that for simplicity's sake; you can change this to add cases specifically for inserts and updates if need be.

     That's it! You now have an incremental pipeline that takes advantage of SQL Server Change Tracking and Kudu. The devs over at Streamsets have promised that they will support Change Tracking natively in a future release, but for now this solution will get you by.


Tuesday, July 4, 2017

Recovering From an Avro to Parquet Conversion Failure in Streamsets Data Collector

     In version 2.6 of Streamsets Data Collector, they added data drift support for the parquet file format. This means that if your source system changes, i.e. a new column has been added to the table/flat file/etc., Data Collector will update the target Hive table with this new column before the pipeline starts sending data to it. They have a walkthrough here that will help explain how it all works. In previous versions this was only available for the avro file format.

     To make the data drift solution work for parquet, Data Collector will first dump the data in Avro format, then use a MapReduce executor to kick off a MapReduce job, via YARN, that converts the Avro file to parquet. It does this asynchronously: the pipeline reports it has finished while the MapReduce job goes off to run on the cluster. This is fine and all if it works. But... sometimes it doesn't. There are a myriad of reasons why your MapReduce job could fail, none of which matter at the moment you realize that you just finished an initial load of some monster from the source system that may have taken hours to days to pull down into Hadoop, and now you have to run it all over again. Well my friend... put down the gun... we have one more shot to save the patient here.

     For this fix, we are going to have to convert the file from Avro to parquet "manually". To do this we are going to use the code from this github project. The unfortunate thing is that this project is not being maintained, and you will get some dependency issues when attempting to build it in Maven. So I did some surgery on the project and updated some of the dependencies for Avro and Hadoop to get it working with CDH 5.11. If you want to build the project yourself, here is the pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.cloudera.science</groupId>
  <artifactId>avro2parquet</artifactId>
  <version>0.1.0</version>
  <packaging>jar</packaging>
  <name>avro2parquet</name>
  <properties>
    <hadoop.version>2.0.0-cdh4.1.2</hadoop.version>
    <guava.version>13.0.1</guava.version>
    <junit.version>4.8.2</junit.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-mapred</artifactId>
      <version>1.7.7</version>
      <classifier>hadoop2</classifier>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>${guava.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>${junit.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-all</artifactId>
      <version>1.3</version>
    </dependency>
    <dependency>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-core-asl</artifactId>
      <version>1.9.12</version>
    </dependency>
    <dependency>
      <groupId>com.twitter</groupId>
      <artifactId>parquet-avro</artifactId>
      <version>1.0.0</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.avro</groupId>
          <artifactId>avro</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.twitter</groupId>
      <artifactId>parquet-column</artifactId>
      <version>1.0.0</version>
    </dependency>
    <dependency>
      <groupId>com.twitter</groupId>
      <artifactId>parquet-hadoop</artifactId>
      <version>1.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.0-mr1-cdh5.11.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.6.0-cdh5.11.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.0-cdh5.11.0</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.avro</groupId>
          <artifactId>avro</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>
  <repositories>
    <repository>
      <id>cloudera-repos</id>
      <name>Cloudera Repos</name>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.2.1</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id> <!-- this is used for inheritance merges -->
            <phase>package</phase> <!-- bind to the packaging phase -->
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
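If you do build it yourself, a standard package build should produce the fat jar, since the assembly plugin above is bound to the package phase:

# run from the project root; the assembly plugin writes
# target/avro2parquet-0.1.0-jar-with-dependencies.jar
mvn clean package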

   If you are way too busy to be building Java projects, you can download the jar from here. So now that we have the jar built, here is a walkthrough of how to use it.


   In this example we will use a pipeline that takes data from a single table in a relational database and moves it to an external table in Hive; in other words, it creates parquet files in HDFS.

Figure 1. Streamsets Pipeline

     When the pipeline finishes and the MapReduce executor kicks off, the conversion fails mid-flight and your HDFS directory has something like this in it:

Figure 2. Failed Parquet Conversion
     
     If the conversion had been successful, the Avro file would be gone (if you made that selection in the MapReduce executor) and you would see a file with a .parquet extension. Here you see the avro_to_parquet_tmp… file, a relic of a failed MapReduce job. From here, we need to look at the Data Collector log for this pipeline to retrieve the YARN application ID for the MapReduce job:

Figure 3. Streamsets Data Collector Log
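(If you prefer the command line to Hue and log aggregation is enabled on your cluster, you can also pull the logs for that application directly; the Hue route is shown below.)

yarn logs -applicationId <application id from the Data Collector log>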

Log into Hue and click on the Job Browser in the top right of the screen:

Figure 4. Hue Job Browser


In that screen, filter on username sdc; the text filter should be the application ID from the log:


Figure 5. Hue Job Browser Application Filter


Click on the log icon to access the logs for this failed job. On the syslog tab you should see something like this:


Figure 6. Syslog for Application


This is the JSON representation of the avro schema for the file. We will need to copy this text:
{"type":"record","name":"order_results_target","namespace":"clarity_dbo","fields":[{"name":"line","type":["null","int"],"default":null},{"name":"ord_date_real","type":["null","double"],"default":null},{"name":"order_proc_id","type":["null","double"],"default":null},{"name":"cm_ct_owner_id","type":["null","string"],"default":null},{"name":"comp_anl_inst_tm","type":["null","double"],"default":null},{"name":"comp_obs_inst_tm","type":["null","double"],"default":null},{"name":"comp_res_technicia","type":["null","string"],"default":null},{"name":"comp_snomed_src_c","type":["null","int"],"default":null},{"name":"compon_lnc_id","type":["null","double"],"default":null},{"name":"compon_lnc_src_c","type":["null","int"],"default":null},{"name":"component_comment","type":["null","string"],"default":null},{"name":"component_id","type":["null","double"],"default":null},{"name":"component_type_c","type":["null","int"],"default":null},{"name":"data_type_c","type":["null","int"],"default":null},{"name":"interface_yn","type":["null","string"],"default":null},{"name":"lab_status_c","type":["null","int"],"default":null},{"name":"lrr_based_organ_id","type":["null","double"],"default":null},{"name":"numeric_precision","type":["null","double"],"default":null},{"name":"ord_end_date_real","type":["null","double"],"default":null},{"name":"ord_num_value","type":["null","double"],"default":null},{"name":"ord_raw_value","type":["null","string"],"default":null},{"name":"ord_value","type":["null","string"],"default":null},{"name":"organism_quantity","type":["null","string"],"default":null},{"name":"organism_quantity_unit","type":["null","string"],"default":null},{"name":"pat_enc_csn_id","type":["null","double"],"default":null},{"name":"pat_enc_date_real","type":["null","double"],"default":null},{"name":"pat_id","type":["null","string"],"default":null},{"name":"raw_high","type":["null","string"],"default":null},{"name":"raw_low","type":["null","string"],"default":null},{"name":"raw_ref_vals","type":["null","string"],"default":null},{"name":"ref_normal_vals","type":["null","string"],"default":null},{"name":"ref_range_type","type":["null","string"],"default":null},{"name":"ref_unit_uom_id","type":["null","double"],"default":null},{"name":"reference_high","type":["null","string"],"default":null},{"name":"reference_low","type":["null","string"],"default":null},{"name":"reference_unit","type":["null","string"],"default":null},{"name":"result_cmt_end_ln","type":["null","int"],"default":null},{"name":"result_cmt_start_ln","type":["null","int"],"default":null},{"name":"result_date","type":["null","double"],"default":null},{"name":"result_flag_c","type":["null","int"],"default":null},{"name":"result_in_range_yn","type":["null","string"],"default":null},{"name":"result_status_c","type":["null","int"],"default":null},{"name":"result_sub_idn","type":["null","string"],"default":null},{"name":"result_time","type":["null","double"],"default":null},{"name":"result_val_end_ln","type":["null","int"],"default":null},{"name":"result_val_start_ln","type":["null","int"],"default":null},{"name":"resulting_lab_id","type":["null","double"],"default":null},{"name":"rslt_reportable_yn","type":["null","string"],"default":null},{"name":"serv_area_id","type":["null","double"],"default":null},{"name":"value_normalized","type":["null","string"],"default":null},{"name":"verify_user_id","type":["null","string"],"default":null},{"name":"sys_change_version","type":["null","long"],"default":null},{"name":"sys_change_operation","type":["null","string"],"default":null},{"name":"loaddate","type":["null","long"],"default":null}]}

Now that we have our Avro schema, we need to create an Avro schema file. For this example the table is clarity_dbo.order_results, so we will create a file called order_results.avsc, copy the Avro schema into it, change the file to Unix EOL (this can be done in Notepad++ via Edit->EOL Conversion->Unix/OSX Format), and upload it to a directory in HDFS like /schemas/clarity_dbo/order_results:


Figure 7. Avro Schema File in HDFS

Figure 8. Avro Schema File Text
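From the command line, the same steps might look like this (the dos2unix utility and the exact paths are assumptions; adjust to your environment):

# convert line endings, create the target directory, and upload the schema
dos2unix order_results.avsc
hadoop fs -mkdir -p /schemas/clarity_dbo/order_results
hadoop fs -put -f order_results.avsc /schemas/clarity_dbo/order_results/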
If you cannot find this in the logs, you can generate the Avro schema using Avro Tools. The jar file is available here. From the directory on your local Linux file system where you downloaded Avro Tools, run this command:
hadoop jar avro-tools-1.8.2.jar getschema /tmp/parquetfix/sdc-4dbc5be5-5809-4faa-9e58-8e9a8b77de84_4c546953-99fd-43f1-a1f4-2eea27b7bbe5 | hadoop fs -put -f - /schemas/clarity_dbo/order_results/order_results.avsc

Parameter breakdown:
1. getschema tells Avro Tools to grab the schema from...
2. The full path, including file name, of the Avro file we are going to generate the schema from
3. The path in HDFS where we want to put the Avro schema file
Now we need to copy the Avro file from /sources/clarity_dbo/order_results to a directory we can use for this repair; in this example we use /tmp/parquetfix (if there are any files already in that directory, delete them before attempting the repair!). Check the box next to the file and click Actions->Copy:


Figure 9. HDFS Copy Screen

Figure 10. Avro File in Tmp Dir
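If you would rather do the copy from the command line than through Hue, something like this works (the file name is the temporary SDC Avro file from this example; yours will differ):

# stage the failed pipeline's Avro output into the working directory
hadoop fs -mkdir -p /tmp/parquetfix
hadoop fs -cp /sources/clarity_dbo/order_results/sdc-4dbc5be5-5809-4faa-9e58-8e9a8b77de84_4c546953-99fd-43f1-a1f4-2eea27b7bbe5 /tmp/parquetfix/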
We're good to begin the conversion now. Log on to a server in the cluster where you put the conversion jar. If you have Kerberos on your cluster, make sure to kinit as a user who has rights to execute this job. In this example I put the avro2parquet.jar file in a subdirectory of /var/lib, so:

cd /var/lib/avro2parquet

For the files/directories cited in this example, this is the command you would want to execute:
hadoop jar avro2parquet.jar  \
-D mapreduce.map.memory.mb=27120 \
hdfs:///schemas/clarity_dbo/order_results/order_results.avsc \
hdfs:///tmp/parquetfix \
hdfs:///sources/clarity_dbo/order_results/fix
The first part of the command calls the jar.
  • The first parameter determines how much memory should be allocated to the job. In this example we give it 27GB of memory (which is overkill for this, but I had a lot of memory free on the server at the time). Make sure the job has enough memory to run or it will fail. You can see the current memory load for the server in Cloudera Manager-->Hosts.
  • The second parameter is the path to the Avro schema file we created in the previous step.
  • The third parameter is the location of the Avro file we want converted.
  • The fourth parameter is the location where we want to dump the converted parquet file(s). Make sure this directory doesn't already exist or the job will fail.

Kick off the job. When it is done, you should (hopefully) see a success message:

Figure 11. Mapreduce Job Results
Now we go to /sources/clarity_dbo/order_results/fix to see our parquet files:
Figure 12. Parquet Files
Check the box next to every file with the .parquet extension, click Actions->Move, and enter the parent folder:

Figure 13. Move Parquet Files

Now that the parquet files are there, you can delete the Avro file, the tmp parquet file, and the fix directory. This should leave just the .parquet files. To verify the fix took, go to Hive and select some records from the table. If you can see the data now, congratulations!! You saved the data!
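For the command-line inclined, the same cleanup and a quick sanity check might look like this (the paths, file name patterns, and impala-host name follow the example above and are assumptions for your environment):

# move the converted files up a level, then clear out the leftovers
hadoop fs -mv /sources/clarity_dbo/order_results/fix/*.parquet /sources/clarity_dbo/order_results/
hadoop fs -rm /sources/clarity_dbo/order_results/sdc-*                  # the original Avro file
hadoop fs -rm /sources/clarity_dbo/order_results/avro_to_parquet_tmp*   # the failed temp file
hadoop fs -rm -r /sources/clarity_dbo/order_results/fix
# tell Impala to pick up the new files, then spot check the data
impala-shell -i impala-host -q "REFRESH clarity_dbo.order_results"
impala-shell -i impala-host -q "SELECT count(*) FROM clarity_dbo.order_results"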