Tuesday, October 28, 2014

Scripting Extension Points in PDI

PDI Extension points are an awesome feature added to PDI 5.0 (and updated throughout 5.x) that allow you to hook into the operational aspects of your ETL processes to provide finer-grained control, optimizations, additional auditing/logging, or whatever your heart desires. Extension points abound in the system now, from places such as Transformation Start/Finish, Job Entry start/finish, Mouse Down/Double-click, Carte startup/shutdown, Database connect/disconnect, and the list goes on (see the above link for the current list).

Writing an extension point plugin is already pretty darn easy, a basic template might look like this:

@ExtensionPoint(
  id = "CarteShutdownScript",
  extensionPointId = "CarteShutdown",
  description = "Executes script(s) when a Carte instance is shut down"
)
public class CarteShutdownExecutor implements ExtensionPointInterface {

  public CarteShutdownExecutor() {
    // Do init stuff here
  }

  @Override
  public void callExtensionPoint( LogChannelInterface log, Object o ) throws KettleException {
    WebServer server = (WebServer)o;

    // Do what you want here
  }
}

However, unless you have a reusable project template for your IDE (which is not a bad idea by the way), then building and deploying an extension point plugin may be more difficult than writing one. Also, any code changes require a re-compile and re-deployment. I've written a few of these and wished the whole process, although pretty easy, would be even easier.

With that in mind, I set out to write extension point plugins that take care of the boilerplate code, while still allowing the full expressive power of general-purpose scripting languages. Also I didn't want to pick a particular scripting language (although I prefer Groovy :), so instead I decided to allow any language that provides a JSR-223 compliant script engine. Rhino (JavaScript) and Groovy come with PDI out-of-the-box (and so does AppleScript on a Mac), but I tested a Jython script as well.

To use this capability, go to the PDI Marketplace and install the PDI Extension Point Scripting plugin. This will put the plugin in data-integration/plugins/pdi-script-extension-points.  In that folder you will find two examples, TransformationStart.groovy and TransformationFinish.js.  If you run a transformation you should see two additional log messages every time you run a transformation. One script is executed with the Groovy script engine, and one with the Rhino engine, respectively.

The convention for these scripts is as follows:

1) The script must be located in the plugins/pdi-script-extension-points folder
2) The name of the script must start with the extension point ID you wish to implement. After that ID you can put whatever you like, my plugin just does a startsWith() to see if it recognizes the ID.
3) The extension of the script must be recognized by a JSR-223 script engine in your classpath.  So you can use .js for JavaScript files and .groovy for Groovy files, and if you've added something like Jython you can use .py as the extension
4) Besides any variable bindings provided by the engine, two more are added for your use, namely the two provided by the callExtensionPoint method:

  • _log_: a reference to the LogChannelInterface object, good for additional logging
  • _object_: a reference to the context object (Trans for TransformationStart, e.g.) The list is available on the wiki page. With dynamically typed languages you likely don't need to cast the _object_ value to the type listed on the wiki page.


The scripts are reloaded every time the extension point is invoked, so you can make updates to your script, re-run your transformation, and the updates will get pulled in. This allows for the use (and development of) scripting during design time that will be applied at run time.

The included examples are trivial, here's a slightly more involved script called StepAfterInitialize_each.groovy that adds a RowListener to each row for each step:

import org.pentaho.di.trans.step.*

_log_.logBasic "${_object_.combi.stepname} after init..."

rowAdapter = [
  rowReadEvent : { rowMeta, row -> 
     _log_.logBasic "Row size: ${row?.length}"
  }
] as RowAdapter

_object_.combi.step.addRowListener(rowAdapter)


Trying this with the "Delay row - Basic example" sample transformation, I get 10 lines of "Row size: 14". The transformation has two steps, so I might think I should get 20 lines (10 per step) of output, but a RowListener is called when a step reads a row, not outputs a row, so the Generate Rows step does not invoke the RowListeners.

I hope this is a helpful addition to the PDI ecosystem, and if so I'd love to hear about how you've used it, what kinds of crazy things you've tried, and especially how this can be improved.  The code is open-source (Apache 2.0 license) on GitHub.

Cheers!

Friday, October 17, 2014

Flatten JSON to key-value pairs in PDI

I've heard a number of comments regarding JSON and PDI, most of them having to do with difficulties parsing nested documents, using JSONPath, etc.  Personally, I've had a JSON doc I'd like to fetch fields from but I didn't want to try to figure out the JSONPath or document structure, I just wanted to get right to the values.

To achieve this (and to prove my point from a previous post on using Groovy Grape in PDI), I wrote a Groovy script to flatten a JSON document into key/value pairs in PDI. I needed the following elements:

1) An Apache Ivy installation/JAR. In a previous post I added the Ivy folder to the launcher.properties; for this I just dropped the single JAR into data-integration/lib before starting Spoon.

2) A JSON document read into the PDI stream and passed to a Script step

To use Groovy with the (still experimental) Script step, ensure the step name ends in ".groovy". This indicates to the Script step to find a JSR-223 Scripting Engine with the name "groovy". Since PDI comes with the Groovy script engine, you can use this out-of-the-box. To use other script engines, simply add the JAR(s) to the classpath (I usually drop them in data-integration/lib).

NOTE: The Script step does not find the Groovy script engine with Java 7 on Mac. This is documented in PDI-13074.  You can use Java 6 but that is not officially supported by Pentaho for PDI 5.0+

For the Groovy script, I decided to use Jackson Databind to parse the JSON, using an example I found on Stack Overflow (here). Databind is not included with PDI, so I used the technique from my PRD post to @Grab it:

@Grab(group='com.fasterxml.jackson.core', module='jackson-databind', version='2.3.3')

Then I needed the ability to add more than one output row per input row. The script step was designed to operate on a row of data and add fields to that row by setting variables in the script (and specifying those variables/fields in the step dialog).  Since I needed one input row to generate multiple output rows (one per JSON scalar object), I created the output row I wanted by explicitly adding the two fields I intended to add:

outputRowMeta = _step_.getInputRowMeta().clone();
_step_.stepMeta.stepMetaInterface.getFields( outputRowMeta, _step_.getStepname(), null, null, _step_, _step_.repository, _step_.metaStore );
outputRowMeta.addValueMeta(new ValueMetaString("key"))
outputRowMeta.addValueMeta(new ValueMetaString("value"))
outputRow = RowDataUtil.resizeArray( row, outputRowMeta.size()+2 )

Note that the Script step does this for you if you have 1 output row for every input row.  As a result, I added all but the last JSON scalar, then let the Script step do the last one for me:

int outputIndex = rowMeta.size()
int count = 1
int numProps = map.size()
key = null
value = null
map.each {kv ->
  if(count < numProps) {
    keyIndex = outputIndex
    valueIndex = outputIndex+1
    if(keyIndex >= 0 && valueIndex >= 0) {
      outputRow[keyIndex] = kv.key
      outputRow[valueIndex] = kv.value
    }
    _step_.putRow(outputRowMeta, outputRow)
  }
  else {
    key = kv.key
    value = kv.value
  }
  count++
}

The entire script is a Gist located here, and here's a screenshot of the step dialog:


You can see where I added ".groovy" to the step name, as well as specifying the output fields in the table below (and using them in the else loop above).

I ran the step against the following JSON doc:

{
   "Port":
   {
       "@alias": "defaultHttp",
       "Enabled": "true",
       "Number": "10092",
       "Protocol": "http",
       "KeepAliveTimeout": "20000",
       "ThreadPool":
       {
           "@enabled": "false",
           "Max": "150",
           "ThreadPriority": "5"
       },
       "ExtendedProperties":
       {
           "Property":
           [                         
               {
                   "@name": "connectionTimeout",
                   "$": "20000"
               }
           ]
       }
   }
}

And got the following results:



Perhaps this will be helpful for you, either by using the script to flatten JSON, or as an example of using Groovy in the Script step, and/or using @Grab to get dependencies on-the-fly in PDI.

Cheers!

Wednesday, October 1, 2014

List Zookeeper Nodes and Data with Groovy

Here's a quick Groovy script to recursively list Zookeeper nodes (and optionally, data), also on Gist here.  What does this have to do with PDI, you may ask?  Stay tuned ;)


@Grab('org.apache.zookeeper:zookeeper:3.4.6')

import org.apache.zookeeper.*
import org.apache.zookeeper.data.*
import org.apache.zookeeper.AsyncCallback.StatCallback
import static org.apache.zookeeper.ZooKeeper.States.*

final int TIMEOUT_MSEC = 5000
final int RETRY_MSEC = 100

def num_retries = 0
def print_data = args?.length > 1 ? Boolean.valueOf(args[1]) : false
def path = args?.length > 0 ? [args[0]] : ['/']
noOpWatcher =  { event -> } as Watcher

listKids = { parentList, level ->
  if(parentList != null) {
    parentList.each { parent ->
      parentPath = parent?.startsWith('/') ? parent : ('/'+parent)
      level.times() {print '  '}
      println parentPath
      dataStat = new Stat()
      try {
        bytes = zk.getData(parentPath, true, dataStat)
        if(dataStat?.dataLength > 0 && bytes && print_data) {
          level.times() {print '  '}
          println new String(bytes)
        }
      }
      catch(e) {}
      try {
        kids = zk.getChildren(parentPath, true)
        if(kids && kids.size() > 0) {
          listKids(kids.collect{parentPath+(parentPath.endsWith('/') ? '' : '/') +it}, level+1)
        }
      }
      catch(e) {}
    }
  }
}
  
zk = new ZooKeeper('localhost:2181', TIMEOUT_MSEC , noOpWatcher)
while( zk.state != CONNECTED && num_retries < (TIMEOUT_MSEC / RETRY_MSEC) ) {
  Thread.sleep(RETRY_MSEC)
  num_retries++
}
if(zk.state != CONNECTED) {
  println "No can do bro, after $TIMEOUT_MSEC ms the status is ${zk.state}"
  //System.exit(1)
}
else {
  listKids(path, 0)
}

zk.close()