Monday, November 9, 2015

Data Lineage internals in PDI 6.0

In Pentaho Data Integration 6.0, we released a great new capability to collect data lineage for PDI transformations and jobs.  Data lineage is an oft-overloaded term, but for the purposes of this blog I will be talking about the flow of data from external sources into steps/entries and possibly out to other external targets. Basically we keep track of all fields as they are created, split, aggregated, transformed, etc. over the course of a transformation or job.  When jobs or transformations call other jobs or transformations, that relationship is also captured. So in that sense you can follow your data all the way through your PDI process.

The code for the current data lineage capability is entirely open source and is available on GitHub here: https://github.com/pentaho/pentaho-metaverse. You may see the term "metaverse" listed throughout the code and documentation (including the project name itself). The term was a pet name for what we envisioned the end product to be: a universe of metadata and relationships between all the artifacts and concepts in the Pentaho stack. Whether that vision is realized the same way depends on the roadmap; it is very possible that the needs of Pentaho's customers will drive the data lineage capabilities in a different direction.

Approach

Collecting lineage information for PDI is non-trivial. It may seem like the fields, steps, and operations are readily available such that the relationships could easily be discovered. However, PDI is a very flexible and powerful tool, and its APIs are more general than uniform, as flexibility has been a more important design goal than introspection. For example, getting the list of fields that are output from a transformation step involves calling the getStepFields() API method. This lets the step report its own outgoing fields, and many times the step needs to know the incoming fields before it can properly report the output fields, so the step in turn calls the previous steps' getStepFields() methods. In the case of a Table Input step, the query is actually executed so that the metadata of the ResultSet is available to determine the output fields. This requires a valid database connection and introduces extra latency into the lineage collection.
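To make the recursion concrete, here is a toy sketch of the idea in plain Java. The classes and method names below are simplified stand-ins, not the actual Kettle API (the real getStepFields() works with row metadata objects, not simple strings):

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of how output-field resolution walks backwards through a
// transformation. ToyStep is a simplified stand-in, NOT the real Kettle API.
class ToyStep {
    final String name;
    final List<ToyStep> previousSteps = new ArrayList<>();
    final List<String> addedFields = new ArrayList<>();

    ToyStep(String name) { this.name = name; }

    // Like getStepFields(): a step reports its outgoing fields, which usually
    // means asking the previous steps for THEIR output fields first.
    List<String> getStepFields() {
        List<String> fields = new ArrayList<>();
        for (ToyStep prev : previousSteps) {
            fields.addAll(prev.getStepFields()); // recursive call up the stream
        }
        fields.addAll(addedFields); // fields this step creates itself
        return fields;
    }
}

public class FieldResolution {
    public static void main(String[] args) {
        ToyStep tableInput = new ToyStep("Table Input");
        tableInput.addedFields.add("customer_id"); // real step: from ResultSet metadata
        ToyStep calculator = new ToyStep("Calculator");
        calculator.previousSteps.add(tableInput);
        calculator.addedFields.add("discount");
        System.out.println(calculator.getStepFields()); // [customer_id, discount]
    }
}
```

The Table Input case is where this toy breaks down: the real step can't simply list its added fields, it has to run the query to discover them.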

Other considerations include variables and parameters. It is possible to parameterize things like table names, field names, etc. This makes it impossible to collect accurate data lineage based on a transformation "at-rest", i.e. during design time. Even using parameters' default values doesn't work as the default value may be meant to fail the transformation (to ensure valid parameter values are passed in).  For this reason, data lineage collection is performed at run-time, such that the fields and values are all resolved and known.  There is one caveat to this that I'll talk about at the end of the blog, but it is unsupported, undocumented, and more geared for the community for the time being.

At present, the architecture and flow for collecting lineage is as follows:

- Just before a transformation is executed, a graph model is created and associated with the running Trans object. We use Apache Tinkerpop 2.6 for this (more on that in a moment).

- If a step/entry is data-driven, we collect lineage information as each row is processed. Data-driven means the step behaves differently based on values coming in via fields in each row. For example, a Table Input step that gets parameters from a previous step, or a REST step that gets its URLs from an incoming field. This is in contrast to a variable or parameter, which is defined and resolved once at the beginning of execution and does not change over the course of the execution.

- Once a transformation/job has finished, we iterate over each step/entry to collect the rest of the lineage information. This is done using a StepAnalyzer or JobEntryAnalyzer. These are individual objects responsible for collecting lineage information for a particular step (Table Output, e.g.), and there are generic versions of each in the event that no specific one exists.  There are also top-level analyzers for transformations and jobs, which actually perform the iteration and add top-level lineage information to the graph.

- The GenericStepMetaAnalyzer and GenericJobEntryMetaAnalyzer make a best-guess effort to collect accurate lineage. There is no encompassing API to collect all the various operations and business logic performed in each step. So all fields are assumed to be pass-through (meaning the structure of a named field, such as its type, hasn't changed as a result of the logic), and any new fields have no relationships to the incoming fields (as we don't know what those relationships are). To report full lineage, a step/entry needs a specific analyzer that understands the business logic of that step or entry.

- There are "decorator" analyzer interfaces (and base implementations) that can be associated with step/entry analyzers, most notably ExternalResourceStepAnalyzer and StepDatabaseConnectionAnalyzer (and the JobEntry versions thereof). These (when present for a step/entry analyzer) are called by the lineage collector to get relationships between steps (and/or their fields) to resources outside the transformation, such as text files, databases, etc.

The workhorse in this situation (the "lineage collector") is implemented in TransformationRuntimeExtensionPoint (there's a Job version too, of course). This is the entry point to be called before a transformation starts, namely at the TransformationStartThreads extension point (see the full list here). Instead of implementing multiple extension point plugins to be activated at the various points during execution, the TransListener interface provided the level of interaction we wanted, so the extension point plugin simply registers a TransListener (an interface also implemented by the TransformationRuntimeExtensionPoint object itself), which will be called by the PDI internals.
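The pattern of using a single extension point to register a listener can be sketched with toy stand-ins (plain Java; these are not the real Trans/TransListener/extension point types, just an illustration of the wiring):

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-ins for TransListener and Trans, to illustrate the pattern only.
interface ToyTransListener {
    void transStarted(ToyTrans trans);
    void transFinished(ToyTrans trans);
}

class ToyTrans {
    final List<ToyTransListener> listeners = new ArrayList<>();
    final List<String> log = new ArrayList<>();
    void addTransListener(ToyTransListener l) { listeners.add(l); }
    void execute() {
        for (ToyTransListener l : listeners) l.transStarted(this);
        log.add("rows processed");
        for (ToyTransListener l : listeners) l.transFinished(this);
    }
}

// The extension point plugin is itself the listener, mirroring how
// TransformationRuntimeExtensionPoint also implements TransListener.
class LineageExtensionPoint implements ToyTransListener {
    // Called once, at the transformation-start extension point
    void callExtensionPoint(ToyTrans trans) { trans.addTransListener(this); }
    public void transStarted(ToyTrans trans) { trans.log.add("graph created"); }
    public void transFinished(ToyTrans trans) { trans.log.add("lineage analysis scheduled"); }
}

public class ListenerSketch {
    public static void main(String[] args) {
        ToyTrans trans = new ToyTrans();
        new LineageExtensionPoint().callExtensionPoint(trans);
        trans.execute();
        System.out.println(trans.log); // [graph created, rows processed, lineage analysis scheduled]
    }
}
```

One registration at the start of execution gets callbacks for the whole lifecycle, instead of one extension point plugin per event.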

The transStarted() method of TransformationRuntimeExtensionPoint creates a new "root node" associated with the client (Spoon, e.g.). This provides a well-known entry point for querying the graph if no other information is known.  When a graph model is created, nodes for all the concepts (transformation, job, database connection, step, field, etc.) are added to the graph as well. The method also creates a future runner for the lineage analysis, which will be called when the transformation is complete.

The transFinished() method spins off a new thread to perform the full lineage analysis. The Runnable calls into the parent entity's analyzer, so a TransformationAnalyzer or JobAnalyzer. The top-level analyzer adds its own lineage/metadata to the graph, then iterates over the steps/entries so their analyzers can add their lineage information (see Graph Model and Usage below).

NOTE: In the code you will see lots of references to ExecutionProfile. This may be tied to the lineage graph someday (and indeed there is some data common to both) but for now it is there to collect something like the PDI Operations Mart and logging do, but in a uniform fashion with a standard format (JSON).

Graph Model and Usage

PDI's data lineage model is based on a property graph model. A graph is composed of nodes (concepts, entities, etc.) and edges connecting nodes (representing relationships between nodes). Nodes and edges can have properties (such as name = "my transformation" or relationship = "knows"). For our model, the nodes are things like the executed jobs/transformations, steps, stream fields, database connections, etc. Also the model includes "concept nodes" that allow for more targeted graph queries. For example, the graph includes a concept node labelled "Transformation", and all executed transformations in that graph have basically an "is-a" relationship with that concept node. In practice, it is a "parentconcept" edge from the concept node to the instance(s) of that concept. In our example, we could use it to start a query from the Transformation concept node and find all nodes connected to it via an outgoing "parentconcept" edge. This query returns nodes corresponding to all transformations executed for this lineage artifact.
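Here is a minimal, self-contained sketch of that model in plain Java (standing in for the Blueprints API; the class names are invented for illustration):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal property-graph sketch. ToyNode/ToyEdge/ToyGraph are stand-ins for
// the Blueprints Vertex/Edge/Graph interfaces, not the real API.
class ToyNode {
    final Map<String, Object> props = new HashMap<>(); // node properties
    ToyNode(String name) { props.put("name", name); }
}

class ToyEdge {
    final ToyNode from, to;
    final String label; // the relationship, e.g. "parentconcept"
    ToyEdge(ToyNode from, String label, ToyNode to) {
        this.from = from; this.label = label; this.to = to;
    }
}

class ToyGraph {
    final List<ToyNode> nodes = new ArrayList<>();
    final List<ToyEdge> edges = new ArrayList<>();
    ToyNode addNode(String name) { ToyNode n = new ToyNode(name); nodes.add(n); return n; }
    void addEdge(ToyNode from, String label, ToyNode to) { edges.add(new ToyEdge(from, label, to)); }
    // All nodes connected to 'start' via an outgoing edge with the given label
    List<ToyNode> out(ToyNode start, String label) {
        List<ToyNode> result = new ArrayList<>();
        for (ToyEdge e : edges) if (e.from == start && e.label.equals(label)) result.add(e.to);
        return result;
    }
}

public class ConceptQuery {
    public static void main(String[] args) {
        ToyGraph g = new ToyGraph();
        ToyNode transConcept = g.addNode("Transformation");   // concept node
        ToyNode t1 = g.addNode("load_customers.ktr");         // executed transformation
        ToyNode t2 = g.addNode("aggregate_sales.ktr");
        g.addEdge(transConcept, "parentconcept", t1);
        g.addEdge(transConcept, "parentconcept", t2);
        // All transformations executed for this lineage artifact:
        for (ToyNode n : g.out(transConcept, "parentconcept"))
            System.out.println(n.props.get("name"));
    }
}
```

In Gremlin terms, the loop at the end corresponds to starting at the concept node and following outgoing "parentconcept" edges.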

For our property graph model implementation, we chose the open-source project Tinkerpop. The 3.x line of Tinkerpop has been accepted to the Apache Incubator, and I certainly congratulate them on that achievement!  Tinkerpop 3.x has absorbed all the 2.x products into a single product, and represents an impressive leap forward in terms of graph planning/processing engines. Having said that, Tinkerpop 3 requires Java 8, and since PDI 6.0 supports Java 7, we had to use the deprecated 2.6 version. However, 2.x had more than enough functionality for us; we just had to bring in the pieces we needed. Those include the following:

Blueprints: This is the generic graph API, upon which all other Tinkerpop products are built, and useful in its own right to work at a low level with the graph model.

Pipes: This is the dataflow framework to allow for graph traversal and processing using a pipeline architecture (hence the name). The process pipeline(s) are themselves modelled as a graph (called a process graph).

Gremlin: This is the actual data traversal language, available as a Java API as well as a fluent Groovy DSL. Graph queries in Pentaho data lineage are materialized as Gremlin statements, which are executed as a process graph using the Pipes framework.

Frames: This is basically an ORM from the graph/process models to Java objects. It allows the lineage code to offer a Java method whose body is essentially a Gremlin query that returns the specified object. There is some overhead involved with this ORM (due to the amount of reflection and such that is needed), so we only use Frames at present for integration testing. However it did increase our productivity and made our tests much less verbose :)

Viewing PDI Lineage Information

There's already a quite excellent blog on this subject by Pedro Alves; I highly recommend it, as it explains where PDI stores lineage, as well as how to retrieve and display it using 3rd party tools such as yEd.


Design-time Lineage Information

As I mentioned, the lineage capability for PDI 6.0 is first-and-foremost a runtime lineage collection engine. However there are some APIs and such for accessing the lineage of an active transformation in Spoon. For example, the Data Services optimizations dialog uses something called the LineageClient to determine the "origin fields" for those fields exposed by a data service, in order to find possible push-down optimizations.

LineageClient contains methods offering a domain-level API for querying the lineage graphs. Inside each of these methods you'll see the Gremlin-Java API at work. Note: we decided not to include Groovy as an additional compile/runtime dependency, to keep the build simple and small. This makes the usage more verbose (see the code), but there was no loss of functionality for us; there's a Tinkerpop wiki page on how to do Gremlin in Java.

To actually build the lineage graph for the active transformation, PDI 6.0 has TransOpenedExtensionPoint and TransChangedExtensionPoint plugins, each of which will create and populate the lineage graph for the active TransMeta object. It uses TransExtensionPointUtil.addLineageGraph() to achieve this.  This didn't need to be in its own thread as we can't collect data-driven lineage and we don't dive all the way down into executed transformations. The latter is because some transformations are dynamically created (using metadata injection for example).

So the extension points create and maintain the lineage graph for the active TransMeta, and the LineageClient can query (at the domain level) said graph.  However the graph(s) are stored in the LineageGraphMap and are thus accessible by anybody (using the active TransMeta as the key).  Similarly, the runtime graphs are available in the TransLineageHolderMap during their execution lifetime.

Get Involved

If you're looking to use the lineage capability from an external application, check out the HTTP API for lineage. If you'd like to get involved in the codebase, one great way is to add a StepAnalyzer or JobEntryAnalyzer for a step/entry that isn't covered yet.  The documentation for how to contribute these is on help.pentaho.com.  If you want to know which steps/entries have analyzers, start up PDI and (using the same HTTP API base as in the above link) point at cxf/lineage/info/steps (or .../entries for Job Entry analyzers).

Summary

Hopefully this sheds some light on the innards of the Pentaho Data Integration 6.0 data lineage capability.  This opens a world of opportunities both inside and outside Pentaho for maintaining provenance, determining change impact, auditing, and metadata management.  Cheers!

Friday, July 10, 2015

Drag-and-Drop support in Spoon

If you've ever played around with Drag-n-Drop in Spoon, you probably know that you can drag a KTR, KJB, or XML file onto the canvas, and it will open that file (if a legal PDI artifact) in Spoon for editing.  Under the hood, this is accomplished with "FileListeners"; transformation and job versions of these listeners are registered automatically at Spoon startup.

However, did you know you can register your own FileListeners?  There is a pretty straightforward interface:

import java.util.Locale;

import org.w3c.dom.Node;

import org.pentaho.di.core.EngineMetaInterface;
import org.pentaho.di.core.exception.KettleMissingPluginsException;

public interface FileListener {

  public boolean open( Node transNode, String fname, boolean importfile ) throws KettleMissingPluginsException;

  public boolean save( EngineMetaInterface meta, String fname, boolean isExport );

  public void syncMetaName( EngineMetaInterface meta, String name );

  public boolean accepts( String fileName );

  public boolean acceptsXml( String nodeName );

  public String[] getSupportedExtensions();

  public String[] getFileTypeDisplayNames( Locale locale );

  public String getRootNodeName();
}

You can implement this interface and it will be called at various points, including when a file is dragged onto the canvas. You can use this to add support for currently unsupported file types.  I will show how I implemented a quick CsvListener to add drag-and-drop support for CSV files to PDI. With this, if you drag a CSV file onto a transformation on the canvas, a "CSV file input" step will be added to the transformation, with the filename already filled in:



I didn't bother with doing a "Get Fields" automatically because I won't know if there's a header row, etc.  Plus this is just a fun proof-of-concept, hopefully I/we will have a more robust Drag-n-Drop system in the future.

The trick is getting your FileListener registered with Spoon. There is no extension point directly for that purpose, but you can use a LifecycleListener plugin and implement the registration in your onStart() callback.

To get this going quickly, I wrote the CsvListener in Groovy, and put that in a file called onStart.groovy. I did that so I could leverage my PDI Extension Point Scripting plugin (available on the Marketplace), then drop my onStart.groovy file into plugins/pdi-script-extension-points/ and start Spoon.

The Groovy script is as follows, and is also available as a Gist:

import org.w3c.dom.*
import org.pentaho.di.core.*
import org.pentaho.di.core.exception.*
import org.pentaho.di.core.gui.*
import org.pentaho.di.core.plugins.*
import org.pentaho.di.trans.step.*
import org.pentaho.di.ui.spoon.*

class CsvListener implements FileListener {

  public boolean open( Node transNode, String fname, boolean importfile ) throws KettleMissingPluginsException {
    def csvInputPlugin = PluginRegistry.instance.findPluginWithName(StepPluginType, 'CSV file input')
    def csvInputMetaClass = PluginRegistry.instance.loadClass(csvInputPlugin)
    csvInputMetaClass.setDefault()
    def pid = PluginRegistry.instance.getPluginId(csvInputPlugin.pluginType, csvInputMetaClass)
    def csv = new StepMeta(pid, csvInputPlugin.name, csvInputMetaClass)
    csv.stepMetaInterface.setFilename(fname)
    csv.setName(fname?.substring(fname?.lastIndexOf(File.separator)+1, fname?.indexOf('.')) ?: 'CSV file input')
    csv?.location = new Point(20, 20)
    csv?.draw = true
    Spoon.instance.activeTransformation?.addStep(csv)
    Spoon.instance.activeTransGraph?.redraw()
    true
  }

  public boolean save( EngineMetaInterface meta, String fname, boolean isExport ) {
    false
  }

  public void syncMetaName( EngineMetaInterface meta, String name ) {
  }

  public boolean accepts( String fileName ) {
    Arrays.asList(getSupportedExtensions()).contains(fileName?.substring(fileName?.indexOf('.')+1))
  }

  public boolean acceptsXml( String nodeName ) {
    false
  }

  public String[] getSupportedExtensions() {
    ['csv'] as String[]
  }

  public String[] getFileTypeDisplayNames( Locale locale ) {
    ['CSV'] as String[]
  }

  public String getRootNodeName() {
    null
  }
}

Spoon.instance.addFileListener(new CsvListener())
There's lots of PDI voodoo going on in the open() method; I won't explain it all here, as I intend to write a proper plugin to do this for various file types. I just wanted to (again) show off how powerful and flexible PDI can be, and have some fun hacking the APIs :)

Cheers!

Tuesday, July 7, 2015

Bring Your Own Marketplace

The Pentaho Data Integration (PDI) Marketplace is a great place to share your PDI/Kettle contributions with the community at-large.  To add your plugin, you can pull down the marketplace.xml file (via our GitHub repo) and add your own entry, then submit a pull-request to have the entry added to the master Marketplace.

But did you know you could 'host' your own PDI Marketplace?  The Marketplace is designed to read in locations of marketplaces from anywhere you like, via a file at $KETTLE_HOME/.kettle/marketplaces.xml (where KETTLE_HOME can be your PDI/Kettle install directory and/or your user's home directory).  Here's an example file on Gist.

The file contains a list of marketplace entries, which are locations of various lists (aka marketplaces) of PDI plugins. The URLs provided are used to read Marketplace XML files, which contain the PDI Marketplace entries.

This is how I test incoming pull-requests for PDI Marketplace plugins. I use the marketplaces.xml file from the Gist link above, then checkout the pull-request from GitHub. Then I start PDI, go to the Marketplace, find the proposed plugin, try to install, open the dialog (if appropriate), then uninstall (NOTE: reboots are required).  Of course, support for the plugin itself is (perhaps) available via the submitter. These details are available in the PDI Marketplace UI before installation, and all licensing, usage, etc. is provided by the submitter.

The benefit of having a marketplaces.xml is that you can decide the list of PDI plugins available for download.  If your clients have a marketplaces.xml that only points at your own repositories / locations for plugins, then you can control which plugins can be downloaded by those clients.  For developers (as I show above), you can use it for testing before submitting your pull-request.  For consultants / OEMs, you can decide which plugins should show up in the list.  This mechanism is very flexible and should support most use cases.

In closing, I personally review many of the PDI Marketplace entries (aka pull-requests in GitHub); please let me know if you have any issues with announcing your plugin or otherwise contributing to our community.

Cheers!
- Matt

Friday, April 3, 2015

Command-line utility for PDI Marketplace (using Spoon?!)

The PDI Marketplace is a great way to extend the capabilities of your PDI installation, using excellent contributions from the community, and some less-excellent ones from yours truly ;)  At present, the Marketplace is a core PDI plugin (meaning it is not in the engine itself, but is included in all PDI distributions, both CE and EE).

As a proper plugin, its classes and "API" methods are not so easily accessible from the outside world, although it is possible using reflection or other methods. The good news is that because it is a core plugin, we know the JAR(s) will always be in a constant location (provided you haven't uninstalled or deleted it manually). Thus we could always add the plugins/market folder to our library/class paths if we needed direct access to the API.

So to get a command-line utility for listing, installing, and uninstalling PDI plugins via the Marketplace API, I could've created a Java project, done some classpath black magic, and gotten it working.  But to be honest that doesn't sound very fun, and this blog is all about fun with PDI :)

Now to the fun part! Did you know that when you run spoon.sh or spoon.bat, you are really leveraging a capability called the Pentaho Application Launcher (on GitHub here), which was designed to offer a better experience for setting up the JVM environment than the "java" executable? For one thing, it allowed us to add all JARs in a folder to the classpath before the java interpreter supported wildcards; it also allows us to build the classpath (as a parent URLClassLoader) dynamically, which avoids the max-length problem with setting the classpath on Windows, as we add a LOT of things to the PDI classpath.

Most users happily run spoon.sh and trust the system to set everything up. However, the application launcher can be configured not only with a properties file (our defaults are in launcher/launcher.properties) but also from the command line, including the following switches:

-main: Allows you to set the main class (instead of Spoon.class) that is executed.
-lib: Allows you to add library folders to the environment.
-cp: Allows you to add classpath folders to the environment.

The -lib and -cp arguments are added to those in launcher.properties, so you don't have to worry about mucking with the existing setup.  In fact, that's the point: I wanted the regular PDI environment but with my own entry point.  The kicker is that the folders have to be relative to the launcher/ folder. If you know where the libraries/JARs are (and don't mind figuring out the relative path from the absolute one), you can just add the relative paths.

My approach was the following: I wanted to use a small Groovy script to list, install, or uninstall plugins for PDI using the Marketplace API. I chose Groovy because I didn't want to set up a whole Java project with what would've been provided dependencies, and build and deploy a JAR with that one simple class.  Here's what the Groovy script looks like:

import org.pentaho.di.core.market.*
import org.pentaho.di.core.market.entry.*
import org.pentaho.di.core.*

KettleEnvironment.init(false)
entries = new MarketEntries()

command = args[0]
if(command == 'list') {
  entries.each {println it.name}
}
else {
  args[1..-1].each { arg ->
    entries.findAll {it.name == arg}.each {
      try {
        switch( command ) {
          case 'install': Market.install(it, null); break;
          case 'uninstall': Market.uninstall(it, null, false); break;
          default: println "Didn't recognize command: $command"
        }
      }
      catch(NullPointerException npe) {
        // eat it, probably trying to get a reference to a Spoon instance
      }
    }
  }
}

Then I needed a way to call this script with Groovy, but with the existing PDI environment available.  The whole thing is pretty easy to do with Gradle, but that approach downloads its own PDI JARs, doesn't support plugins, etc.  Plus it's not as fun as using spoon.sh to run a Groovy script ;)

Another thing I wanted to do was to dynamically find my Groovy interpreter, as I need to add its libraries to the library list for the application launcher. That's easily done in bash:

`dirname $(which groovy)`

That finds the executable; the library folder is one level up, in lib/.  However, I need this path as a relative path to launcher/, which proved to be more difficult. The most concise solution I found (for *nix) was to use Python and os.path.relpath:

python -c "import os.path; print os.path.relpath('`dirname $(which groovy)`'+'/../lib','launcher')"

Then it was a matter of adding that folder and ../plugins/market (the relative location of the PDI Marketplace JAR) to my library path, setting the main class as groovy.ui.GroovyMain, and passing as an argument the above Groovy script (which I called market.groovy, located in the same place as the bash script called market, both in my ~/bin folder).  Here's the resulting bash script:

#!/bin/bash

./spoon.sh -lib $(python -c "import os.path; print os.path.relpath('`dirname $(which groovy)`'+'/../lib','launcher')"):../plugins/market -main groovy.ui.GroovyMain `dirname $0`/market.groovy "$@"


Now I can go to my PDI installation and type "market list", and I get the following output (snippet):
PDI MySQL Plugin
PDI NuoDB Plugin
Apple Push Notification
Android Push Notification
Ivy PDI MongoDB Steps
Ivy PDI Git Steps
Vertica Bulk Loader
...

I can then put any of these friendly names into a command to install/uninstall:

market install "Ivy PDI Git Steps"
market uninstall "Android Push Notification"

The logger should output the status of the install/uninstall operation:
General - Installing plugin in folder: /Users/mburgess/pdi-ee-5.3.0.0-213/pdi-ee/data-integration/plugins/steps/IvyGitSteps

There you have it!  Like I said, there are probably MUCH easier ways to get this same thing done, and perhaps someday I'll write a proper utility (or we'll add a CLI to the Marketplace itself). However it was much more fun to call spoon.sh to get a headless PDI and Groovy to install a Marketplace plugin.

Cheers!

(Note: I don't think this works with pan.sh or kitchen.sh because they already use spoon.sh and set the main class to Pan or Kitchen using the same technique)

Monday, March 2, 2015

Using AppleScript with PDI SuperScript

In a previous blog post, I announced my SuperScript step for PDI, which adds and enhances some capabilities of the built-in Script step.  One notable addition is the ability to use AppleScript on a Mac, as the AppleScript script engine comes with the Mac JDK.

However, the implementation of this script engine is a bit different from that of most other script engines, especially in terms of getting data into the script as variables, arguments, etc. If you just want to call a script for every row (without using incoming fields), you can just write straight AppleScript. But if you want to use incoming field(s), you have to do a little magic to get it all working.

First, the AppleScript script engine in Java will not pass bindings to the script as variables. Instead, it uses a combination of bindings to achieve this:

javax_script_function: This variable is set to the name of a function to be invoked in AppleScript
javax.script.argv: This variable is set to the value to be passed to the function. Since PDI doesn't have a List type, you can only pass one argument into your function in SuperScript. If you need multiple values, you'll have to concatenate them in PDI and pass them in as a single field value.

To make matters worse, SuperScript only passes in "used" fields to the script. To determine used fields, it (like the Script step) simply looks for the field name in the script. In this case, the actual field name used in the function invocation is likely neither of the above properties. To get around this, simply put both of the above variable names in a comment:

(* uses javax_script_function and javax.script.argv *)

Then wrap your logic inside the function call:

on testFunc(testVal)
  return testVal & " World!"
end testFunc

In this example I used a Generate Rows step to set javax_script_function to "testFunc" and javax.script.argv to "Hello".  Then I ran the following sample transformation:




The final script for this example looks like:


The above picture doesn't show it, but for the "result" output field I set "Script result?" to "Y". This way I can use "result" with "Hello World!" later on in the transformation.

Running the trans gives:


Hopefully this helps demystify the use of AppleScript with SuperScript in PDI, and allows you to leverage powerful Apple capabilities alongside the powerful capabilities of PDI.  Cheers!

Wednesday, February 4, 2015

Apache Pig UDF: Call a PDI transformation

For my latest fun side project, I looked at the integration of Pentaho Data Integration (PDI) and Apache Pig.  From the website: "Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs." If you substitute "graphical" for "high-level" and "PDI" for "Apache Pig", you get a pretty accurate description of the Pentaho Data Integration product.  For this reason I thought it natural to look at the ways PDI and Pig could play together in the same pigpen, so to speak :)

Pentaho Data Integration has long offered a "Pig Script Executor" job entry, which allows the user to submit a Pig script to a Hadoop cluster (or a local Pig instance), enabling orchestration of data analysis programs written in Pig. However, it doesn't integrate with other PDI capabilities (such as transformations) that are also data analysis programs.

My idea was to kind of turn the integration idea inside-out, so instead of PDI orchestrating Pig jobs, I wanted to leverage PDI transformations as data analysis programs inside of Pig.  I didn't want to have to include a PDI deployment inside a Pig deployment or vice versa; rather I envisioned a system where both Pig and PDI were installed, and the former could locate and use the latter.  This involved creating the following:
  1. A Pig UDF that can inject data into a PDI transformation and collect it on the other side, without needing PDI as a compile-time dependency
  2. A way to bridge the Pig UDF to a PDI deployment
  3. A way to transform Pig data types/values to/from PDI data types/values

For #2, I noticed that there are many places where this bridge could be leveraged (Hive, Spark, e.g.), so I created a project called pdi-bridge that could be used generally in other places. The project does two things:

First, it supplies classes that will run a transformation, inject rows, and collect result rows using an intermediate data model.

Second, there is a Java file (that is not compiled or included in the pdi-bridge JAR) called KettleBridge; this file needs to be copied into whatever integration project needs it, which in this case was my custom Pig UDF project.  The KettleBridge looks for a system property (then an environment variable) called KETTLE_HOME, which needs to point at a valid PDI deployment. It then does some classloading and reflection stuff in order to wire up its static API methods to the PDI instance:

addRow - injects a row to the given transformation
finishTransformation - signals for the given transformation to stop running
getFieldHolder - returns the field holder (intermediate data model) for the given field name
getKettleClassloader - returns a classloader that includes the necessary PDI JARs
init - initializes the bridge
nextRow - retrieves a row from the PDI transformation "result set"
startTransformation - starts the given transformation
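A rough sketch of the classloading side of this approach is below. The class and method names here are illustrative, not the real KettleBridge (see the pdi-bridge source for that); the point is that PDI classes are loaded and invoked reflectively, so PDI is never a compile-time dependency:

```java
import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: locate a PDI deployment via KETTLE_HOME, build a
// classloader over its JARs, and call into it reflectively.
public class BridgeSketch {

    // Like getKettleClassloader(): a classloader that includes the PDI JARs
    static ClassLoader kettleClassLoader() throws Exception {
        String home = System.getProperty("KETTLE_HOME", System.getenv("KETTLE_HOME"));
        List<URL> urls = new ArrayList<>();
        File libDir = new File(home, "lib"); // assumed layout for this sketch
        File[] files = libDir.listFiles();
        if (files != null) {
            for (File f : files) {
                if (f.getName().endsWith(".jar")) urls.add(f.toURI().toURL());
            }
        }
        // Parent delegation means JDK (and host app) classes still resolve
        return new URLClassLoader(urls.toArray(new URL[0]), BridgeSketch.class.getClassLoader());
    }

    // Invoke a zero-arg static method on a class loaded from that deployment
    static Object callStatic(ClassLoader loader, String className, String methodName) throws Exception {
        Class<?> clazz = Class.forName(className, true, loader);
        Method m = clazz.getMethod(methodName);
        return m.invoke(null);
    }

    public static void main(String[] args) throws Exception {
        ClassLoader loader = kettleClassLoader();
        // Demonstrate with a JDK class; a real bridge would name PDI classes here
        System.out.println(callStatic(loader, "java.lang.System", "lineSeparator") != null);
    }
}
```

The real bridge wraps calls like this behind the static API methods listed above, so the UDF code never imports a PDI class directly.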

The project that uses the bridge is responsible for using getFieldHolder() and other methods to translate from the project's data types to PDI's data types. In my Pig UDF example I have methods like getFieldHolderList() and getKettleType(), which translate a Pig schema into a form which will ultimately be translated into PDI row metadata.  When calling addRow(), the objects passed in must be able to be used by PDI directly, so it is best to translate them to Java types such as String or Integer so the Injector step (see below) can convert the values.
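As a sketch of the kind of translation involved, a Pig-type-to-Java-type mapping might look like the following. The table below is illustrative, not the actual pdi-pig-udfs code:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative version of what a getKettleType()-style helper does: translate
// a source engine's type names into plain Java types the Injector step can
// convert. This mapping is a sketch, not the real pdi-pig-udfs table.
public class TypeMapping {
    static final Map<String, String> PIG_TO_JAVA = new HashMap<>();
    static {
        PIG_TO_JAVA.put("chararray", "java.lang.String");
        PIG_TO_JAVA.put("int", "java.lang.Integer");
        PIG_TO_JAVA.put("long", "java.lang.Long");
        PIG_TO_JAVA.put("double", "java.lang.Double");
        PIG_TO_JAVA.put("boolean", "java.lang.Boolean");
    }

    static String toJavaType(String pigType) {
        String javaType = PIG_TO_JAVA.get(pigType);
        if (javaType == null) {
            throw new IllegalArgumentException("Unmapped Pig type: " + pigType);
        }
        return javaType;
    }

    public static void main(String[] args) {
        System.out.println(toJavaType("chararray")); // java.lang.String
    }
}
```

Passing boxed Java types like these into addRow() lets the Injector step convert the values without the bridge needing to know PDI's row metadata classes.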

You can see the code in my pdi-pig-udfs project, including the copy of KettleBridge and the RunKettleTrans class, the former providing for goal #2 above, and the latter providing for goals #1 and #3 above.

Next, I needed to get my JARs into a place where they could be used by Pig. Getting a Pig script to find and load the UDF jar was easy, using the REGISTER command (see the Pig script below). In order to keep things simple, the KettleBridge class expects to find the pdi-bridge JAR in the same directory as the JAR that contains the KettleBridge class itself. So in this case the pdi-pig-udfs JAR and the pdi-bridge JAR need to be in the same folder.  You can get the pdi-bridge JAR by building from source (see my project link above) or by downloading it here; the same goes for the pdi-pig-udfs JAR, which can be downloaded here.  For my proof-of-concept, I built the pdi-pig-udfs JAR from source, then manually copied the pdi-bridge jar into the build/libs folder next to the other JAR.

Now that the plumbing was in place, I needed a transformation that would be executed as the UDF.  This transformation by convention needs the following things:
  • An Injector step called INPUT to be used to get rows into the transformation
  • A step called OUTPUT used to return rows to the UDF

For my example, I wanted to take firstname and lastname from a Pig script and inject them into a transformation that would uppercase the firstname, then concatenate the two and return a single field called fullname.  The transformation looks like this (don't mind the Text File Input step; that is for local testing outside Pig):



You can find the actual transformation on Gist.  Lastly, I needed a Pig script that registers the UDF JAR, loads in the data, then calls the UDF and dumps the output:

REGISTER '/Users/mburgess/git/pdi-pig-udfs/build/libs/pdi-pig-udfs-1.0.jar';

A = LOAD '../customers-100.txt' USING org.apache.pig.piggybank.storage.CSVExcelStorage( ';', 'NO_MULTILINE', 'NOCHANGE', 'SKIP_INPUT_HEADER') AS (id: int, lastname: chararray, firstname: chararray);

B = FOREACH A GENERATE pdi.pig.RunKettleTrans('/Users/mburgess/pdi-pig.ktr', firstname, lastname) AS fullname;

DUMP B;


Notice I am using an absolute pathname to my UDF JAR; that directory is the same one that contains the pdi-bridge JAR.  Then I'm loading customers-100.txt, getting the first three fields, and calling them "id", "lastname" and "firstname".  The FOREACH..GENERATE command will pass in the tuples to the UDF (called pdi.pig.RunKettleTrans). In this case it will pass in a tuple including the transformation filename, then firstname and lastname from the A dataset.  The transformation returns a single field called fullname, and the Pig output (from DUMP B) looks like this:

((FSJ-FIRSTNAME jwcdf-name))
((TUM-FIRSTNAME flhxu-name))
((GFE-FIRSTNAME xthfg-name))
((BNL-FIRSTNAME ulzrz-name))
((ONX-FIRSTNAME oxhyr-name))
...

Here is the command I ran, from the Pig 0.14 directory, to set KETTLE_HOME to PDI EE 5.2 and execute the above script with Pig in local mode:

KETTLE_HOME=~/pdi-ee-5.2.0.1-218/pdi-ee/data-integration bin/pig -x local ~/pdi-udf.pig


This all might appear terribly complicated, but if you'd like to call PDI transformations from Pig, you should only need to do the following:

1) Download the pdi-pig-udfs and pdi-bridge JARs into a single location. If you're running on Hadoop you might need to put them in HDFS or in some common location on the cluster where Pig can find them while running MapReduce (I only tested in local mode)

2) Create a transformation according to the rules above (Injector step called INPUT, e.g.)

3) Create a Pig script that calls pdi.pig.RunKettleTrans and passes in the location of the transformation and whatever other fields you have identified in your Injector step. You should also be able to use the results within the Pig script.

As this was just a proof-of-concept, there are probably a few bugs in there, and I wouldn't be surprised if more work is needed to get it going on a Hadoop cluster running Pig, but I wanted to show that Pig and other technologies are very approachable and amenable to PDI integration.