Wednesday, February 4, 2015

Apache Pig UDF: Call a PDI transformation

For my latest fun side project, I looked at the integration of Pentaho Data Integration (PDI) and Apache Pig.  From the website: "Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs." If you substitute "graphical" for "high-level" and "PDI" for "Apache Pig", you get a pretty accurate description of the Pentaho Data Integration product.  For this reason I thought it natural to look at the ways PDI and Pig could play together in the same pigpen, so to speak :)

Pentaho Data Integration has long offered a "Pig Script Executor" job entry, which lets the user submit a Pig script to a Hadoop cluster (or a local Pig instance) and so orchestrate data analysis programs written in Pig. However, it doesn't integrate with other PDI capabilities (such as transformations) that are also data analysis programs.

My idea was to kind of turn the integration idea inside-out, so instead of PDI orchestrating Pig jobs, I wanted to leverage PDI transformations as data analysis programs inside of Pig.  I didn't want to have to include a PDI deployment inside a Pig deployment or vice versa; rather I envisioned a system where both Pig and PDI were installed, and the former could locate and use the latter.  This involved creating the following:
  1. A Pig UDF that can inject data into a PDI transformation and collect it on the other side, without needing PDI as a compile-time dependency
  2. A way to bridge the Pig UDF to a PDI deployment
  3. A way to transform Pig data types/values to/from PDI data types/values

For #2, I noticed that there are many places where this bridge could be leveraged (Hive, Spark, e.g.), so I created a project called pdi-bridge that could be used generally in other places. The project does two things:

First, it supplies classes that will run a transformation, inject rows, and collect result rows using an intermediate data model.

Second, there is a Java file called KettleBridge that is not compiled or included in the pdi-bridge JAR. This file needs to be copied into whatever integration project needs it, which in this case was my custom Pig UDF project.  The KettleBridge looks for a system property (then an environment variable) called KETTLE_HOME, which needs to point at a valid PDI deployment. It then uses classloading and reflection to wire up its static API methods to the PDI instance:

addRow - injects a row to the given transformation
finishTransformation - signals for the given transformation to stop running
getFieldHolder - returns the field holder (intermediate data model) for the given field name
getKettleClassloader - returns a classloader that includes the necessary PDI JARs
init - initializes the bridge
nextRow - retrieves a row from the PDI transformation "result set"
startTransformation - starts the given transformation
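
To make the lookup order concrete, here is a minimal Java sketch of the "system property first, then environment variable" rule, plus one way a classloader over the PDI JARs could be built. This is not the actual KettleBridge source: the class name KettleHomeLocator, its method names, and the assumption that the JARs live in a lib folder under KETTLE_HOME are all illustrative.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not the real KettleBridge.
final class KettleHomeLocator {

  // The system property wins; the environment variable is the fallback.
  static String resolve(String propertyValue, String envValue) {
    if (propertyValue != null && !propertyValue.trim().isEmpty()) {
      return propertyValue.trim();
    }
    if (envValue != null && !envValue.trim().isEmpty()) {
      return envValue.trim();
    }
    return null; // neither is set
  }

  static String resolve() {
    return resolve(System.getProperty("KETTLE_HOME"), System.getenv("KETTLE_HOME"));
  }

  // Builds a classloader over every *.jar in <kettleHome>/lib (assumed layout).
  static URLClassLoader classLoaderFor(File kettleHome, ClassLoader parent)
      throws MalformedURLException {
    List<URL> urls = new ArrayList<>();
    File[] jars = new File(kettleHome, "lib").listFiles((dir, name) -> name.endsWith(".jar"));
    if (jars != null) {
      for (File jar : jars) {
        urls.add(jar.toURI().toURL());
      }
    }
    return new URLClassLoader(urls.toArray(new URL[0]), parent);
  }

  public static void main(String[] args) throws MalformedURLException {
    String home = resolve();
    if (home == null) {
      System.out.println("KETTLE_HOME is not set");
      return;
    }
    URLClassLoader loader = classLoaderFor(new File(home), KettleHomeLocator.class.getClassLoader());
    System.out.println("JARs found under " + home + "/lib: " + loader.getURLs().length);
  }
}
```

Classes loaded through such a classloader can then be reached by reflection, which is what keeps PDI out of the compile-time dependencies.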

The project that uses the bridge is responsible for using getFieldHolder() and other methods to translate from the project's data types to PDI's data types. In my Pig UDF example I have methods like getFieldHolderList() and getKettleType(), which translate a Pig schema into a form which will ultimately be translated into PDI row metadata.  When calling addRow(), the objects passed in must be able to be used by PDI directly, so it is best to translate them to Java types such as String or Integer so the Injector step (see below) can convert the values.
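
As a rough illustration of the type translation, the sketch below maps Pig type names to PDI type names using plain strings, so it needs neither library on the classpath. The particular mapping is an assumption made for this example (for instance, sending both int and long to PDI's Integer), and the class PigToKettleTypes is hypothetical; the real getKettleType() in pdi-pig-udfs may differ.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical mapping for illustration; not the code from pdi-pig-udfs.
final class PigToKettleTypes {

  private static final Map<String, String> TYPES = new HashMap<>();

  static {
    TYPES.put("chararray", "String");
    TYPES.put("int", "Integer");
    TYPES.put("long", "Integer");   // PDI's Integer type holds a 64-bit value
    TYPES.put("float", "Number");
    TYPES.put("double", "Number");
    TYPES.put("boolean", "Boolean");
    TYPES.put("datetime", "Date");
    TYPES.put("bytearray", "Binary");
    TYPES.put("biginteger", "BigNumber");
    TYPES.put("bigdecimal", "BigNumber");
  }

  // Unknown or missing Pig types fall back to String (an assumption of this sketch).
  static String kettleTypeFor(String pigType) {
    if (pigType == null) {
      return "String";
    }
    return TYPES.getOrDefault(pigType.toLowerCase(Locale.ROOT), "String");
  }

  public static void main(String[] args) {
    for (String pigType : new String[] {"chararray", "int", "double", "datetime"}) {
      System.out.println(pigType + " -> " + kettleTypeFor(pigType));
    }
  }
}
```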

You can see the code in my pdi-pig-udfs project, including the copy of KettleBridge and the RunKettleTrans class, the former providing for goal #2 above, and the latter providing for goals #1 and #3 above.

Next, I needed to get my JARs into a place where they could be used by Pig. Getting a Pig script to find and load the UDF JAR was easy, using the REGISTER command (see Pig script below). In order to keep things simple, the KettleBridge class expects to find the pdi-bridge JAR in the same directory as the JAR that contains the KettleBridge class itself. So in this case the pdi-pig-udfs JAR and the pdi-bridge JAR need to be in the same folder.  You can get the pdi-bridge JAR by building from source (see my project link above) or by downloading it here. The same goes for the pdi-pig-udfs JAR, which can be downloaded here.  For my proof-of-concept, I built the pdi-pig-udfs JAR from source, then manually copied the pdi-bridge JAR into the build/libs folder next to the other JAR.

Now that the plumbing was in place, I needed a transformation that would be executed as the UDF.  This transformation by convention needs the following things:
  • An Injector step called INPUT to be used to get rows into the transformation
  • A step called OUTPUT used to return rows to the UDF

For my example, I wanted to take firstname and lastname from a Pig script and inject them into a transformation that would uppercase the firstname, then concatenate the two and return a single field called fullname.  The transformation looks like this (don't mind the Text File Input step, that is for local testing outside Pig):



You can find the actual transformation on Gist.  Lastly, I needed a Pig script that registers the UDF JAR, loads in the data, then calls the UDF and dumps the output:

REGISTER '/Users/mburgess/git/pdi-pig-udfs/build/libs/pdi-pig-udfs-1.0.jar';

A = LOAD '../customers-100.txt' USING org.apache.pig.piggybank.storage.CSVExcelStorage( ';', 'NO_MULTILINE', 'NOCHANGE', 'SKIP_INPUT_HEADER') AS (id: int, lastname: chararray, firstname: chararray);

B = FOREACH A GENERATE pdi.pig.RunKettleTrans('/Users/mburgess/pdi-pig.ktr', firstname, lastname) AS fullname;

DUMP B;


Notice I am using an absolute pathname to my UDF JAR; that directory also contains the pdi-bridge JAR.  Then I'm loading customers-100.txt, getting the first three fields, and calling them "id", "lastname" and "firstname".  The FOREACH..GENERATE command will pass in the tuples to the UDF (called pdi.pig.RunKettleTrans). In this case it will pass in a tuple including the transformation filename, then firstname and lastname from the A dataset.  The transformation returns a single field called fullname, and the Pig output (from DUMP B) looks like this:

((FSJ-FIRSTNAME jwcdf-name))
((TUM-FIRSTNAME flhxu-name))
((GFE-FIRSTNAME xthfg-name))
((BNL-FIRSTNAME ulzrz-name))
((ONX-FIRSTNAME oxhyr-name))
...

Here is the command I ran, from the Pig 0.14 directory, to set KETTLE_HOME to PDI EE 5.2 and execute the above script with Pig in local mode:

KETTLE_HOME=~/pdi-ee-5.2.0.1-218/pdi-ee/data-integration bin/pig -x local ~/pdi-udf.pig


This all might appear terribly complicated, but if you'd like to call PDI transformations from Pig, you should only need to do the following:

1) Download the pdi-pig-udfs and pdi-bridge JARs into a single location. If you're running on Hadoop you might need to put them in HDFS or in some common location on the cluster where Pig can find them while running MapReduce (I only tested in local mode)

2) Create a transformation according to the rules above (Injector step called INPUT, e.g.)

3) Create a Pig script that calls pdi.pig.RunKettleTrans and passes in the location of the transformation and whatever other fields you have identified in your Injector step. You should also be able to use the results within the Pig script as well.

As this was just a proof-of-concept, there are probably a few bugs in there, and I wouldn't be surprised if more work is needed to get it going on a Hadoop cluster running Pig, but I wanted to show that Pig and other technologies are very approachable and amenable to PDI integration.

Tuesday, December 30, 2014

SuperScript PDI plugin

As readers of my blog know, I'm a huge fan of scripting languages on the JVM (especially Groovy), and of course I'm a huge fan of Pentaho Data Integration :)  While using the (experimental) Script step to do various things, I saw a few places where a script step could be improved for easier use and more powerful features.  Specifically I wanted:


  • A drop-down UI for selecting the scripting engine
  • Allow non-compilable JSR-223 scripting (such as AppleScript)
  • Enable the use of the script's returned value as an output field
  • Enable the use of the script step as an input step (doesn't need a trigger)
To that end, I set out to refactor the existing Script step, and I'm happy to announce that the SuperScript step is now available in the PDI Marketplace:


As you can see from the screenshot above, I get a list of the output fields, which are the input fields plus the fields I specified in the table at the bottom of the dialog. Also notice that I did not define the variable/field "c" in the script, but I set "Script result?" to Y for "c", and thus "c" will contain the result of the script evaluation (in this case, b * rowNumber).

The following variables are available to each script:

  • step - A reference to the SuperScript step object
  • stepName - the name of the SuperScript step
  • transName - the name of the transformation
  • row - the current row's data
  • lastRow - the last row's data
  • rowMeta - the metadata about the rows (field types, e.g.)
  • rowNumber (starts with 1 like the rest of PDI)
  • SKIP_TRANSFORMATION
  • ABORT_TRANSFORMATION
  • ERROR_TRANSFORMATION
  • CONTINUE_TRANSFORMATION

These last 4 function the same way as they do in the Script step. Users of the Script (or Modified JavaScript) step will notice that I removed leading underscores from some of the variables; this is to accommodate script engines that don't allow leading underscores in variable names.

A noticeable addition is the "lastRow" variable, which will contain null (or be undefined) for the first row but will contain the previous row's data for all subsequent rows. This opens the door for more powerful processing, such as filling empty fields with the previous row's value, changing script behavior based on whether a field value has changed since the last row, etc.  UPDATE: Here is a screenshot of an example script that will fill the field (if null) with the previous row's value for that field (if not null):
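
The fill-from-previous-row idea can be sketched outside PDI in a few lines of Java. This is only an illustration of the pattern: the class ForwardFill is hypothetical, rows are plain Object arrays, and it assumes that the "previous row" is the already-filled one, so a run of several nulls is filled from the last non-null value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the lastRow pattern; not SuperScript code.
final class ForwardFill {

  // Returns copies of the rows in which a null at fieldIndex is replaced by
  // the value the previous (already filled) row holds at that index.
  static List<Object[]> fill(List<Object[]> rows, int fieldIndex) {
    List<Object[]> out = new ArrayList<>();
    Object[] lastRow = null; // stays null for the first row
    for (Object[] row : rows) {
      Object[] copy = row.clone();
      if (copy[fieldIndex] == null && lastRow != null && lastRow[fieldIndex] != null) {
        copy[fieldIndex] = lastRow[fieldIndex];
      }
      out.add(copy);
      lastRow = copy;
    }
    return out;
  }

  public static void main(String[] args) {
    List<Object[]> rows = new ArrayList<>();
    rows.add(new Object[] {1, "red"});
    rows.add(new Object[] {2, null});
    rows.add(new Object[] {3, "blue"});
    for (Object[] row : fill(rows, 1)) {
      System.out.println(Arrays.toString(row));
    }
  }
}
```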



Other helpful API calls include the following:

step.getTrans() (or just step.trans in Groovy) - gets a reference to the Transformation object
step.getTrans().findStepInterface("another step") - Gets a reference to a different step
step.putRow(rowMeta, rowData) - Adds a row to the output stream

As I mentioned before, the existing Script step must be "primed" so that it will run at least once. Usually this is done with a Generate Rows step that puts out 1 or more rows. Sometimes the script itself will generate rows, so I wanted SuperScript to run at least once, whether there was an incoming row or not:



Perhaps the most fun and powerful addition is the ability of SuperScript to execute any JSR-223 Script Engine. The existing Script step requires that the Script Engine produce CompiledScript(s), which of course is the fastest but not always available. To that end, SuperScript will attempt to compile the script first, and if it cannot, it will fall back to evaluating (i.e. interpreting) the script(s).  This opens the door for a lot of new scripting languages, such as Jython, AppleScript, and Renjin (an R ScriptEngine for the JVM).
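
The compile-first, interpret-as-fallback approach can be expressed with the standard javax.script API roughly as follows. This is a sketch of the general technique rather than the SuperScript source; the class CompileOrInterpret and its methods are names invented for this example, and treating a compile failure as "fall back to eval" (which defers any syntax error to evaluation time) is a simplification.

```java
import javax.script.Compilable;
import javax.script.CompiledScript;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;

// Hypothetical sketch of "compile if possible, otherwise interpret".
final class CompileOrInterpret {

  // Holds either a compiled script or just the source to interpret.
  static final class Prepared {
    final CompiledScript compiled; // null when compilation was not possible
    final String source;

    Prepared(CompiledScript compiled, String source) {
      this.compiled = compiled;
      this.source = source;
    }

    boolean isCompiled() {
      return compiled != null;
    }
  }

  static Prepared prepare(ScriptEngine engine, String source) {
    if (engine instanceof Compilable) {
      try {
        return new Prepared(((Compilable) engine).compile(source), source);
      } catch (ScriptException | UnsupportedOperationException e) {
        // Fall through and interpret the source instead.
      }
    }
    return new Prepared(null, source);
  }

  static Object run(ScriptEngine engine, Prepared script) {
    try {
      return script.isCompiled()
          ? script.compiled.eval(engine.getContext())
          : engine.eval(script.source);
    } catch (ScriptException e) {
      throw new IllegalStateException("Script failed", e);
    }
  }

  public static void main(String[] args) {
    // Whether an engine is found depends on what is on the classpath.
    ScriptEngine engine = new ScriptEngineManager().getEngineByName("groovy");
    if (engine == null) {
      System.out.println("No 'groovy' script engine on the classpath");
      return;
    }
    Prepared script = prepare(engine, "1 + 1");
    System.out.println("compiled=" + script.isCompiled() + ", result=" + run(engine, script));
  }
}
```

Preparing once and running per row keeps the fast path for engines that can compile, while engines that only interpret still work.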

To illustrate this, and to have "Fun with Pentaho Data Integration",  I created a transformation with two SuperScripts running AppleScript, one just after a Generate Rows step, and one just before the last step. The first AppleScript starts iTunes, and the last one quits it. For long-running transformations, this pattern can be used to provide some musical entertainment while you wait for the transformation to finish :)


NOTE: The AppleScript script engine is afaik only available on the Mac and comes with its JDK.

I hope you find this plugin helpful, and I'd love to know how/if you are using it, and also how to improve it. One improvement I hope to add is the ability to choose different plugin(s) to link to, in order to use their capabilities. This would likely be done by creating a chained self-first classloader, and could be useful for things like Big Data capabilities from inside the SuperScript step.

The code is Apache 2.0-licensed and available on GitHub at https://github.com/mattyb149/pdi-scriptengine-plugin

As always, I welcome all comments, questions, and suggestions. Until next time, have fun with Pentaho Data Integration :)

Cheers!

Monday, December 8, 2014

How sorted (or sordid) is your data?

I've spent quite a bit of time looking at Pentaho Data Integration (aka Kettle) and trying to make it do things with external technologies and idioms, anywhere from Groovy, Drill, memcached, Redis, Hazelcast, and even Markov Chains. Recently though, I've started to focus on the data coming in and out of PDI, and what I could learn from it (#datadiscovery). I'll be spending a lot more time with R and Drill soon, but as a small example of data discovery, I thought I'd look at "how sorted" data is.

Basically I wanted to know, for an input stream (consisting of CSV files or database tables or whatever), whether the stream is close to being in a sorted state or not.  I am currently looking into approximate and probabilistic methods (like Longest Increasing Subsequence and an interesting "multiplayer" version here), but this post is about a brute-force method of finding the variance of distance between an element in a stream and where it would be if the stream were sorted.

Specifically, I looked at the rank (aka row number) of the incoming data as the row number of the raw input, then in parallel I sorted on the desired columns and ranked the sorted rows. I was looking for the distance between each row's value(s) and how far the rows were from their sorted position(s).  My research (read: Google search and Wikipedia) brought me to the Spearman rank correlation coefficient.

For this I would need to sort the rows, then find the delta between the position of each desired column in the original rowset and the sorted rowset, then find the statistical dependence of the ranked values. There are more sophisticated techniques to determine the relationships between ranked items, but this one suited my purpose :)
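
For readers who want to see the arithmetic, here is a small Java sketch of the same idea outside PDI: rank each row by its original position, rank it again after sorting, and plug the position differences d into the standard no-ties formula rho = 1 - 6*sum(d^2) / (n*(n^2 - 1)). The class Sortedness is hypothetical, and breaking ties by original order (via a stable sort) is an assumption of this sketch, not necessarily what the transformation does.

```java
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical sketch: Spearman's rho between original and sorted positions.
final class Sortedness {

  static <T> double spearmanRho(T[] values, Comparator<? super T> order) {
    int n = values.length;
    if (n < 2) {
      throw new IllegalArgumentException("need at least two rows");
    }
    Integer[] idx = new Integer[n];
    for (int i = 0; i < n; i++) {
      idx[i] = i;
    }
    // Stable sort of the original indexes by value: idx[sortedPos] = originalPos.
    Arrays.sort(idx, (a, b) -> order.compare(values[a], values[b]));
    double sumSq = 0;
    for (int sortedPos = 0; sortedPos < n; sortedPos++) {
      double d = sortedPos - idx[sortedPos];
      sumSq += d * d;
    }
    // No-ties formula: 1 - 6 * sum(d^2) / (n * (n^2 - 1))
    return 1.0 - (6.0 * sumSq) / ((double) n * ((double) n * n - 1.0));
  }

  public static void main(String[] args) {
    String[] names = {"delta", "alpha", "charlie", "bravo", "echo"};
    double rho = spearmanRho(names, Comparator.<String>naturalOrder());
    System.out.println("rho = " + rho + ", |rho| = " + Math.abs(rho));
  }
}
```

An already sorted input gives 1.0 and a reverse-sorted input gives -1.0, which is why taking the absolute value treats both as "sorted".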

Once I found the algorithm I was looking for, I set out to create an example using only PDI steps, with the following caveats:

1) No scripting steps: Of course you can do whatever you like with the Scripting steps, but if you don't know those programming languages, you're left with the rest of what PDI offers. Luckily the choices are plentiful and powerful.

2) No SQL steps: Most databases probably offer the kind of expressive power you'd need to write a "Spearman rho" function inline, and to be honest that's probably the best option performance-wise; but I was looking to create a data-agnostic, language-agnostic way to calculate the "sortedness" of a data set in PDI, as this could be used in a blending or refinery situation.

I decided to use the "customers-100.txt" sample file in PDI, and sorted on full name, in order to determine "how sorted" the customer data was in terms of customers' names.  I designed the following transformation:




This transformation is on Gist and Box.  The results:




The absolute value of the Spearman rho for customers-100.txt (when sorted on name) is 0.001272127. I used absolute value because I didn't care whether the stream was close to being sorted or reverse-sorted; if you care about that in your usage, then leave out the ABS(rho) calculation in the "Spearman's rho" step above.

Because the value is so close to zero, we can conclude that the data is not very well sorted: the Spearman rho tells us there is no tendency for the raw data and the sorted data to follow any monotonic trend (ascending or descending). If the values were to get closer to 1 (or to -1 if not using absolute value), then the stream would be closer to its sorted state and thus "more sorted".  I set up a rudimentary Value Mapper step ("How sorted is the data?" above) to indicate whether the data was well-sorted or not.  If you disable the sort path and enable the direct path around it, then the two rowsets will match and you will get 1.0 as the Spearman rho.

This might not be very useful to the PDI user-at-large, but I learned a lot while working through it so I thought I'd share it here. Stay tuned for more Fun with Pentaho Data Integration ;)


Monday, November 10, 2014

ZooKeeper Input and Output steps in PDI

While working with Apache Drill and PDI (see previous posts), I found myself needing to read and write values to and from Drill's ZooKeeper instance. Since ZooKeeper can be (and is) used for many other applications besides Drill, I thought I'd write some simple ZooKeeper steps for PDI, namely ZooKeeper Input and ZooKeeper Output. Also I thought it would be nice to be able to view and edit values in my ZooKeeper instance while designing transformations, so I integrated a cool UI called Zooviewer into Spoon.

The ZooKeeper Input step takes paths to ZooKeeper values, and fills those values into the field names you select in the dialog:




The above screenshot shows the ZooKeeper Input dialog. Notice that all PDI types are supported, as long as they can represent their values as a byte array.

The ZooKeeper Output step also takes field names and paths, and will recursively create the paths if you check the "Create path(s)?" checkbox.
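
ZooKeeper will not create a node whose parent does not exist, so "recursively create the paths" comes down to creating each ancestor in turn, parents first. The Java sketch below only computes that list of paths and deliberately leaves out the ZooKeeper client calls; the class ZkPaths is hypothetical and is not taken from the plugin.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: which znode paths must exist, in creation order.
final class ZkPaths {

  // "/a/b/c" -> ["/a", "/a/b", "/a/b/c"], parents first.
  static List<String> pathsToCreate(String path) {
    List<String> result = new ArrayList<>();
    StringBuilder current = new StringBuilder();
    for (String part : path.split("/")) {
      if (part.isEmpty()) {
        continue; // skips the leading slash and any doubled slashes
      }
      current.append('/').append(part);
      result.add(current.toString());
    }
    return result;
  }

  public static void main(String[] args) {
    // Each path would be created only if it does not exist yet.
    for (String p : pathsToCreate("/drill/config/value")) {
      System.out.println(p);
    }
  }
}
```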

As of version 1.1 of the plugin (now in the PDI Marketplace), the ZooKeeper Output step also supports variable and field substitution for the Path values, in some pretty cool ways:

1) In the Path column of the Output Fields table, you can use a variable/parameter, such as ${pathParam}

2) You can also use the field-substitution notation, which will inject values from the given field as values for the path(s). This is a little-known feature of PDI and as far as I know has only been implemented in the Mongo plugin and (now) the ZooKeeper Output step. To use this, suppose you have a bunch of key/value pairs on the PDI stream going into the ZooKeeper Output step, where the key is the path where you want the value stored in ZooKeeper. Then you'd set the Path value in the ZooKeeper Output dialog to ?{key}. Notice the question mark instead of the dollar sign; this indicates a field substitution rather than a variable substitution.

3) The ZooKeeper Output step will perform another variable substitution on the field value, in case your field values contain variables.  Below is a screenshot showing this use case:



Notice the ZooKeeper Output dialog uses the ?{key} notation to get its path values from the key field in the stream. The key field values include a variable ${pathParam}, which is filled in at runtime (see "Execute a transformation" dialog under Parameters). Running this will create three paths and store 3 values.
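
The two-pass substitution can be mimicked in plain Java with a pair of regular expressions: first replace ?{field} with the field's value from the current row, then replace ${variable} in the result. This is an illustration only, not the plugin's code; the class PathSubstitution is hypothetical, and leaving unknown names untouched is a choice made for this sketch.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of field substitution followed by variable substitution.
final class PathSubstitution {

  private static final Pattern FIELD = Pattern.compile("\\?\\{([^}]+)\\}");
  private static final Pattern VARIABLE = Pattern.compile("\\$\\{([^}]+)\\}");

  private static String replace(Pattern pattern, String text, Map<String, String> values) {
    Matcher m = pattern.matcher(text);
    StringBuffer sb = new StringBuffer();
    while (m.find()) {
      String value = values.get(m.group(1));
      // Unknown names are left as they were written.
      m.appendReplacement(sb, Matcher.quoteReplacement(value != null ? value : m.group()));
    }
    m.appendTail(sb);
    return sb.toString();
  }

  // Fields first, then variables, so variables inside field values get resolved too.
  static String resolve(String path, Map<String, String> row, Map<String, String> variables) {
    return replace(VARIABLE, replace(FIELD, path, row), variables);
  }

  public static void main(String[] args) {
    Map<String, String> row = Collections.singletonMap("key", "${pathParam}/a");
    Map<String, String> variables = Collections.singletonMap("pathParam", "/test");
    System.out.println(resolve("?{key}", row, variables)); // prints /test/a
  }
}
```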

To view and edit the values in your ZooKeeper instance from PDI, select Manage ZooKeeper from the Tools drop-down menu. This will bring up a view window and an edit window, where you can create and delete child nodes, change values, etc.  Here is a view of my ZooKeeper instance after running the test transformations I showed above:



I'm interested to see if folks find this plugin useful and if so, how they are using it. As always, I welcome all questions, comments, suggestions, and contributions.

Cheers!

Tuesday, October 28, 2014

Scripting Extension Points in PDI

PDI Extension points are an awesome feature added to PDI 5.0 (and updated throughout 5.x) that allow you to hook into the operational aspects of your ETL processes to provide finer-grained control, optimizations, additional auditing/logging, or whatever your heart desires. Extension points abound in the system now, from places such as Transformation Start/Finish, Job Entry start/finish, Mouse Down/Double-click, Carte startup/shutdown, Database connect/disconnect, and the list goes on (see the above link for the current list).

Writing an extension point plugin is already pretty darn easy; a basic template might look like this:

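import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.extension.ExtensionPoint;
import org.pentaho.di.core.extension.ExtensionPointInterface;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.www.WebServer;

@ExtensionPoint(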
  id = "CarteShutdownScript",
  extensionPointId = "CarteShutdown",
  description = "Executes script(s) when a Carte instance is shut down"
)
public class CarteShutdownExecutor implements ExtensionPointInterface {

  public CarteShutdownExecutor() {
    // Do init stuff here
  }

  @Override
  public void callExtensionPoint( LogChannelInterface log, Object o ) throws KettleException {
    WebServer server = (WebServer)o;

    // Do what you want here
  }
}

However, unless you have a reusable project template for your IDE (which is not a bad idea by the way), then building and deploying an extension point plugin may be more difficult than writing one. Also, any code changes require a re-compile and re-deployment. I've written a few of these and wished the whole process, although pretty easy, would be even easier.

With that in mind, I set out to write extension point plugins that take care of the boilerplate code, while still allowing the full expressive power of general-purpose scripting languages. Also I didn't want to pick a particular scripting language (although I prefer Groovy :), so instead I decided to allow any language that provides a JSR-223 compliant script engine. Rhino (JavaScript) and Groovy come with PDI out-of-the-box (and so does AppleScript on a Mac), but I tested a Jython script as well.

To use this capability, go to the PDI Marketplace and install the PDI Extension Point Scripting plugin. This will put the plugin in data-integration/plugins/pdi-script-extension-points.  In that folder you will find two examples, TransformationStart.groovy and TransformationFinish.js.  Every time you run a transformation you should see two additional log messages: one from the script executed with the Groovy script engine, and one from the script executed with the Rhino engine.

The convention for these scripts is as follows:

1) The script must be located in the plugins/pdi-script-extension-points folder
2) The name of the script must start with the extension point ID you wish to implement. After that ID you can put whatever you like; my plugin just does a startsWith() to see if it recognizes the ID.
3) The extension of the script must be recognized by a JSR-223 script engine in your classpath.  So you can use .js for JavaScript files and .groovy for Groovy files, and if you've added something like Jython you can use .py as the extension
4) Besides any variable bindings provided by the engine, two more are added for your use, namely the two provided by the callExtensionPoint method:

  • _log_: a reference to the LogChannelInterface object, good for additional logging
  • _object_: a reference to the context object (Trans for TransformationStart, e.g.) The list is available on the wiki page. With dynamically typed languages you likely don't need to cast the _object_ value to the type listed on the wiki page.
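
The conventions above can be sketched in Java as follows: pick the files whose names start with the extension point ID, look up a JSR-223 engine by file extension, bind _log_ and _object_, and evaluate the file. This is a guess at the general shape, not the plugin's source; the class ExtensionPointScripts and its method names are invented for this example.

```java
import java.io.File;
import java.io.FileReader;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

// Hypothetical sketch of the script lookup convention.
final class ExtensionPointScripts {

  // Files in the folder whose names start with the extension point ID, sorted by name.
  static List<File> scriptsFor(File folder, String extensionPointId) {
    List<File> matches = new ArrayList<>();
    File[] files = folder.listFiles();
    if (files != null) {
      Arrays.sort(files);
      for (File f : files) {
        if (f.isFile() && f.getName().startsWith(extensionPointId)) {
          matches.add(f);
        }
      }
    }
    return matches;
  }

  // "TransformationStart.groovy" -> "groovy"; "" when there is no extension.
  static String extensionOf(String fileName) {
    int dot = fileName.lastIndexOf('.');
    return dot < 0 ? "" : fileName.substring(dot + 1);
  }

  // Evaluates one script with the two extra bindings; returns null if no engine matches.
  static Object run(File script, Object log, Object context) throws Exception {
    ScriptEngine engine =
        new ScriptEngineManager().getEngineByExtension(extensionOf(script.getName()));
    if (engine == null) {
      return null;
    }
    engine.put("_log_", log);
    engine.put("_object_", context);
    try (Reader reader = new FileReader(script)) {
      return engine.eval(reader);
    }
  }

  public static void main(String[] args) {
    File folder = new File(args.length > 0 ? args[0] : ".");
    for (File script : scriptsFor(folder, "TransformationStart")) {
      System.out.println(script.getName() + " -> engine extension " + extensionOf(script.getName()));
    }
  }
}
```

One consequence of a plain prefix match is that a file named for a longer ID sharing the same prefix would also be picked up for the shorter ID, so distinct file names are worth choosing with care.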


The scripts are reloaded every time the extension point is invoked, so you can make updates to your script, re-run your transformation, and the updates will get pulled in. This allows for the use (and development of) scripting during design time that will be applied at run time.

The included examples are trivial; here's a slightly more involved script called StepAfterInitialize_each.groovy that adds a RowListener to each step:

import org.pentaho.di.trans.step.*

_log_.logBasic "${_object_.combi.stepname} after init..."

rowAdapter = [
  rowReadEvent : { rowMeta, row -> 
     _log_.logBasic "Row size: ${row?.length}"
  }
] as RowAdapter

_object_.combi.step.addRowListener(rowAdapter)


Trying this with the "Delay row - Basic example" sample transformation, I get 10 lines of "Row size: 14". The transformation has two steps, so I might think I should get 20 lines (10 per step) of output, but the adapter above only implements rowReadEvent, which is called when a step reads a row, not when it outputs one, so the Generate Rows step (which reads no rows) never triggers it.

I hope this is a helpful addition to the PDI ecosystem, and if so I'd love to hear about how you've used it, what kinds of crazy things you've tried, and especially how this can be improved.  The code is open-source (Apache 2.0 license) on GitHub.

Cheers!

Friday, October 17, 2014

Flatten JSON to key-value pairs in PDI

I've heard a number of comments regarding JSON and PDI, most of them having to do with difficulties parsing nested documents, using JSONPath, etc.  Personally, I've had a JSON doc I'd like to fetch fields from but I didn't want to try to figure out the JSONPath or document structure, I just wanted to get right to the values.

To achieve this (and to prove my point from a previous post on using Groovy Grape in PDI), I wrote a Groovy script to flatten a JSON document into key/value pairs in PDI. I needed the following elements:

1) An Apache Ivy installation/JAR. In a previous post I added the Ivy folder to the launcher.properties; for this I just dropped the single JAR into data-integration/lib before starting Spoon.

2) A JSON document read into the PDI stream and passed to a Script step

To use Groovy with the (still experimental) Script step, ensure the step name ends in ".groovy". This indicates to the Script step to find a JSR-223 Scripting Engine with the name "groovy". Since PDI comes with the Groovy script engine, you can use this out-of-the-box. To use other script engines, simply add the JAR(s) to the classpath (I usually drop them in data-integration/lib).

NOTE: The Script step does not find the Groovy script engine with Java 7 on Mac. This is documented in PDI-13074.  You can use Java 6 but that is not officially supported by Pentaho for PDI 5.0+

For the Groovy script, I decided to use Jackson Databind to parse the JSON, using an example I found on Stack Overflow (here). Databind is not included with PDI, so I used the technique from my PRD post to @Grab it:

@Grab(group='com.fasterxml.jackson.core', module='jackson-databind', version='2.3.3')
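
The flattening itself is a short recursive walk. The Java sketch below assumes the document has already been parsed into nested Maps and Lists (which is what Jackson Databind produces when asked to read JSON into a Map) and builds one key per scalar. The class JsonFlattener is hypothetical, and the key format (dots between object keys, [i] for array positions) is an assumption of this sketch, not necessarily what the script in the Gist emits.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: flatten nested Maps/Lists into key/value pairs.
final class JsonFlattener {

  static Map<String, Object> flatten(Object node) {
    Map<String, Object> out = new LinkedHashMap<>();
    walk("", node, out);
    return out;
  }

  private static void walk(String prefix, Object node, Map<String, Object> out) {
    if (node instanceof Map) {
      for (Map.Entry<?, ?> e : ((Map<?, ?>) node).entrySet()) {
        String key = prefix.isEmpty() ? String.valueOf(e.getKey()) : prefix + "." + e.getKey();
        walk(key, e.getValue(), out);
      }
    } else if (node instanceof List) {
      List<?> list = (List<?>) node;
      for (int i = 0; i < list.size(); i++) {
        walk(prefix + "[" + i + "]", list.get(i), out);
      }
    } else {
      out.put(prefix, node); // a scalar (or null): one output pair
    }
  }

  public static void main(String[] args) {
    Map<String, Object> threadPool = new LinkedHashMap<>();
    threadPool.put("Max", "150");
    Map<String, Object> property = new LinkedHashMap<>();
    property.put("@name", "connectionTimeout");
    Map<String, Object> port = new LinkedHashMap<>();
    port.put("Enabled", "true");
    port.put("ThreadPool", threadPool);
    port.put("Property", Arrays.asList(property));
    Map<String, Object> root = new LinkedHashMap<>();
    root.put("Port", port);
    flatten(root).forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```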

Then I needed the ability to add more than one output row per input row. The script step was designed to operate on a row of data and add fields to that row by setting variables in the script (and specifying those variables/fields in the step dialog).  Since I needed one input row to generate multiple output rows (one per JSON scalar object), I created the output row I wanted by explicitly adding the two fields I intended to add:

outputRowMeta = _step_.getInputRowMeta().clone();
_step_.stepMeta.stepMetaInterface.getFields( outputRowMeta, _step_.getStepname(), null, null, _step_, _step_.repository, _step_.metaStore );
outputRowMeta.addValueMeta(new ValueMetaString("key"))
outputRowMeta.addValueMeta(new ValueMetaString("value"))
outputRow = RowDataUtil.resizeArray( row, outputRowMeta.size()+2 )

Note that the Script step does this for you if you have 1 output row for every input row.  As a result, I added all but the last JSON scalar, then let the Script step do the last one for me:

int outputIndex = rowMeta.size()
int count = 1
int numProps = map.size()
key = null
value = null
map.each {kv ->
  if(count < numProps) {
    keyIndex = outputIndex
    valueIndex = outputIndex+1
    if(keyIndex >= 0 && valueIndex >= 0) {
      outputRow[keyIndex] = kv.key
      outputRow[valueIndex] = kv.value
    }
    _step_.putRow(outputRowMeta, outputRow)
  }
  else {
    key = kv.key
    value = kv.value
  }
  count++
}

The entire script is a Gist located here, and here's a screenshot of the step dialog:


You can see where I added ".groovy" to the step name, as well as specifying the output fields in the table below (and using them in the else branch above).

I ran the step against the following JSON doc:

{
   "Port":
   {
       "@alias": "defaultHttp",
       "Enabled": "true",
       "Number": "10092",
       "Protocol": "http",
       "KeepAliveTimeout": "20000",
       "ThreadPool":
       {
           "@enabled": "false",
           "Max": "150",
           "ThreadPriority": "5"
       },
       "ExtendedProperties":
       {
           "Property":
           [                         
               {
                   "@name": "connectionTimeout",
                   "$": "20000"
               }
           ]
       }
   }
}

And got the following results:



Perhaps this will be helpful for you, either by using the script to flatten JSON, or as an example of using Groovy in the Script step, and/or using @Grab to get dependencies on-the-fly in PDI.

Cheers!

Wednesday, October 1, 2014

List ZooKeeper Nodes and Data with Groovy

Here's a quick Groovy script to recursively list ZooKeeper nodes (and optionally, data), also on Gist here.  What does this have to do with PDI, you may ask?  Stay tuned ;)


@Grab('org.apache.zookeeper:zookeeper:3.4.6')

import org.apache.zookeeper.*
import org.apache.zookeeper.data.*
import org.apache.zookeeper.AsyncCallback.StatCallback
import static org.apache.zookeeper.ZooKeeper.States.*

final int TIMEOUT_MSEC = 5000
final int RETRY_MSEC = 100

def num_retries = 0
def print_data = args?.length > 1 ? Boolean.valueOf(args[1]) : false
def path = args?.length > 0 ? [args[0]] : ['/']
noOpWatcher =  { event -> } as Watcher

listKids = { parentList, level ->
  if(parentList != null) {
    parentList.each { parent ->
      parentPath = parent?.startsWith('/') ? parent : ('/'+parent)
      level.times() {print '  '}
      println parentPath
      dataStat = new Stat()
      try {
        bytes = zk.getData(parentPath, true, dataStat)
        if(dataStat?.dataLength > 0 && bytes && print_data) {
          level.times() {print '  '}
          println new String(bytes)
        }
      }
      catch(e) {}
      try {
        kids = zk.getChildren(parentPath, true)
        if(kids && kids.size() > 0) {
          listKids(kids.collect{parentPath+(parentPath.endsWith('/') ? '' : '/') +it}, level+1)
        }
      }
      catch(e) {}
    }
  }
}
  
zk = new ZooKeeper('localhost:2181', TIMEOUT_MSEC , noOpWatcher)
while( zk.state != CONNECTED && num_retries < (TIMEOUT_MSEC / RETRY_MSEC) ) {
  Thread.sleep(RETRY_MSEC)
  num_retries++
}
if(zk.state != CONNECTED) {
  println "No can do bro, after $TIMEOUT_MSEC ms the status is ${zk.state}"
  //System.exit(1)
}
else {
  listKids(path, 0)
}

zk.close()