Tuesday, October 28, 2014

Scripting Extension Points in PDI

PDI extension points are an awesome feature added in PDI 5.0 (and updated throughout 5.x) that let you hook into the operational aspects of your ETL processes to provide finer-grained control, optimizations, additional auditing/logging, or whatever your heart desires. Extension points now abound in the system: Transformation start/finish, Job Entry start/finish, Mouse Down/Double-click, Carte startup/shutdown, Database connect/disconnect, and the list goes on (see the PDI extension points wiki page for the current list).

Writing an extension point plugin is already pretty darn easy; a basic template might look like this:

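import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.extension.ExtensionPoint;
import org.pentaho.di.core.extension.ExtensionPointInterface;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.www.WebServer;

@ExtensionPoint(
  id = "CarteShutdownScript",
  extensionPointId = "CarteShutdown",
  description = "Executes script(s) when a Carte instance is shut down"
)
public class CarteShutdownExecutor implements ExtensionPointInterface {

  public CarteShutdownExecutor() {
    // Do init stuff here
  }

  public void callExtensionPoint( LogChannelInterface log, Object o ) throws KettleException {
    // For the CarteShutdown extension point, the context object is the WebServer
    WebServer server = (WebServer) o;

    // Do what you want here
  }
}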

However, unless you have a reusable project template for your IDE (which is not a bad idea, by the way), building and deploying an extension point plugin may be more difficult than writing one. Also, any code change requires a re-compile and re-deployment. I've written a few of these and wished the whole process, although pretty easy, were even easier.

With that in mind, I set out to write extension point plugins that take care of the boilerplate code, while still allowing the full expressive power of general-purpose scripting languages. Also I didn't want to pick a particular scripting language (although I prefer Groovy :), so instead I decided to allow any language that provides a JSR-223 compliant script engine. Rhino (JavaScript) and Groovy come with PDI out-of-the-box (and so does AppleScript on a Mac), but I tested a Jython script as well.
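To check which script engines (and therefore which file extensions) are available, you can ask the JSR-223 API directly. This is a minimal sketch using only the standard javax.script package; the class name ListScriptEngines is made up for illustration and is not part of the plugin. What it prints depends entirely on what is on your classpath.

```java
import java.util.ArrayList;
import java.util.List;
import javax.script.ScriptEngineFactory;
import javax.script.ScriptEngineManager;

// Hypothetical helper, not part of the plugin: lists the JSR-223 engines visible on the classpath.
class ListScriptEngines {

  // Returns one "EngineName: [ext, ...]" line per discovered engine.
  public static List<String> describeEngines() {
    List<String> lines = new ArrayList<>();
    for ( ScriptEngineFactory factory : new ScriptEngineManager().getEngineFactories() ) {
      lines.add( factory.getEngineName() + ": " + factory.getExtensions() );
    }
    return lines;
  }

  public static void main( String[] args ) {
    describeEngines().forEach( System.out::println );
  }
}
```

If you run this with the same classpath as PDI, any extension listed is one you should be able to use for a script file.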

To use this capability, go to the PDI Marketplace and install the PDI Extension Point Scripting plugin. This will put the plugin in data-integration/plugins/pdi-script-extension-points. In that folder you will find two examples, TransformationStart.groovy and TransformationFinish.js. Every time you run a transformation, you should see two additional log messages: one from the script executed with the Groovy script engine, and one from the script executed with the Rhino engine.

The convention for these scripts is as follows:

1) The script must be located in the plugins/pdi-script-extension-points folder
2) The name of the script must start with the extension point ID you wish to implement. After that ID you can put whatever you like; my plugin just does a startsWith() to see if it recognizes the ID.
3) The extension of the script must be recognized by a JSR-223 script engine in your classpath. So you can use .js for JavaScript files and .groovy for Groovy files, and if you've added something like Jython you can use .py as the extension.
4) Besides any variable bindings provided by the engine, two more are added for your use, namely the two provided by the callExtensionPoint method:

  • _log_: a reference to the LogChannelInterface object, good for additional logging
  • _object_: a reference to the context object (e.g., Trans for TransformationStart). The list of context objects is available on the wiki page. With dynamically typed languages you likely don't need to cast the _object_ value to the type listed there.
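The conventions above can be sketched in a few lines of Java. This is not the plugin's actual source, just an illustration of the idea under stated assumptions: the class ScriptNameConvention and its methods are hypothetical names, and the log argument is typed as Object here so the example compiles without the PDI jars (in PDI it would be a LogChannelInterface).

```java
import java.io.File;
import java.io.FileReader;
import java.io.Reader;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

// Hypothetical sketch of the naming and binding conventions, not the plugin's real code.
class ScriptNameConvention {

  // Convention 2: the file name must start with the extension point ID.
  public static boolean matches( String fileName, String extensionPointId ) {
    return fileName.startsWith( extensionPointId );
  }

  // Convention 3: the text after the last dot is used to look up a script engine.
  public static String extensionOf( String fileName ) {
    int dot = fileName.lastIndexOf( '.' );
    return dot < 0 ? "" : fileName.substring( dot + 1 );
  }

  // Convention 4: bind _log_ and _object_, then evaluate the script.
  // Returns null if no JSR-223 engine recognizes the file's extension.
  public static Object run( File script, Object log, Object object ) throws Exception {
    ScriptEngine engine = new ScriptEngineManager().getEngineByExtension( extensionOf( script.getName() ) );
    if ( engine == null ) {
      return null;
    }
    engine.put( "_log_", log );
    engine.put( "_object_", object );
    try ( Reader reader = new FileReader( script ) ) {
      return engine.eval( reader );
    }
  }

  public static void main( String[] args ) {
    System.out.println( matches( "StepAfterInitialize_each.groovy", "StepAfterInitialize" ) ); // true
    System.out.println( extensionOf( "TransformationFinish.js" ) ); // js
  }
}
```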

The scripts are reloaded every time the extension point is invoked, so you can make updates to your script, re-run your transformation, and the updates will get pulled in. This allows for the use (and development of) scripting during design time that will be applied at run time.
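The reload behavior boils down to reading the script from disk on every invocation instead of caching it. Here is a tiny sketch of that idea, assuming nothing about how the plugin actually implements it; ReloadingScript is a hypothetical name used only for this example.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration of "reload on every call": nothing is cached between invocations.
class ReloadingScript {
  private final Path path;

  public ReloadingScript( Path path ) {
    this.path = path;
  }

  // Reads the file each time it is called, so edits made between runs are picked up.
  public String currentSource() throws IOException {
    return new String( Files.readAllBytes( path ), StandardCharsets.UTF_8 );
  }

  public static void main( String[] args ) throws IOException {
    Path file = Files.createTempFile( "TransformationStart_demo", ".groovy" );
    ReloadingScript script = new ReloadingScript( file );

    Files.write( file, "_log_.logBasic 'first'".getBytes( StandardCharsets.UTF_8 ) );
    System.out.println( script.currentSource() );

    // Simulate editing the script between two transformation runs
    Files.write( file, "_log_.logBasic 'edited'".getBytes( StandardCharsets.UTF_8 ) );
    System.out.println( script.currentSource() );

    Files.delete( file );
  }
}
```

The trade-off is a file read per invocation, which is a small price during design time for not having to restart anything.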

The included examples are trivial; here's a slightly more involved script called StepAfterInitialize_each.groovy that adds a RowListener to each step, so that a message is logged for each row the step reads:

import org.pentaho.di.trans.step.*

_log_.logBasic "${_object_.combi.stepname} after init..."

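rowAdapter = [
  rowReadEvent : { rowMeta, row ->
     _log_.logBasic "Row size: ${row?.length}"
  }
] as RowAdapter

// Register the listener on the step that was just initialized
_object_.combi.step.addRowListener(rowAdapter)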

Trying this with the "Delay row - Basic example" sample transformation, I get 10 lines of "Row size: 14". The transformation has two steps, so you might expect 20 lines of output (10 per step), but a RowListener's rowReadEvent is called when a step reads a row, not when it writes one, so the Generate Rows step never triggers it.

I hope this is a helpful addition to the PDI ecosystem, and if so I'd love to hear about how you've used it, what kinds of crazy things you've tried, and especially how this can be improved.  The code is open-source (Apache 2.0 license) on GitHub.

