Collibra DQ + Databricks

Introduction

This page explains how to upload Collibra DQ jars to a Databricks cluster and run a Collibra DQ job by invoking Collibra DQ APIs. Worked examples are included throughout.

Architecture

Running CDQ jobs from Scala and PySpark notebooks.

Collibra DQ Environment Setup

This section explains the steps involved in setting up your Collibra DQ environment in Databricks. This is the first step towards invoking Collibra DQ APIs in Databricks.

Step 1: Extract the Collibra DQ core jar from the Collibra DQ package archive

Start by obtaining the Collibra DQ package file. Once you have it, extract the jars by running the following command:

tar -xvf package.tar.gz

For example, tar -xvf owl-2022.04-RC1-default-package-base.tar.gz

Running this command extracts the files from the compressed archive. From the extracted files, upload owl-core-xxxx-jar-with-dependencies.jar to your Databricks file system (DBFS), as explained in the next step.
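After extraction, it can help to confirm which file is the core jar before uploading it. The following is a minimal Scala sketch, not part of Collibra DQ: the helper names findCoreJars and listFileNames and the ./packages directory are hypothetical, and the pattern only assumes that the file name starts with owl-core- and ends with .jar.

```scala
import java.io.File

// Assumption: the core jar name starts with "owl-core-" and ends with ".jar".
val coreJarPattern = "owl-core-.+\\.jar".r

// Returns the names in `fileNames` that look like the Collibra DQ core jar.
def findCoreJars(fileNames: Seq[String]): Seq[String] =
  fileNames.filter(name => coreJarPattern.pattern.matcher(name).matches())

// Lists the file names in a directory; returns an empty list if it does not exist.
def listFileNames(dir: String): Seq[String] =
  Option(new File(dir).listFiles()).map(_.toSeq.map(_.getName)).getOrElse(Seq.empty)

// Hypothetical extraction directory; replace it with the folder tar extracted into.
val candidates = findCoreJars(listFileNames("./packages"))
candidates.foreach(println)
```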

Extracting the owl jar files from owl package zipped file.

Step 2: Upload the Collibra DQ core jar file to the Databricks file system using the UI

Upload the jar manually to the Databricks file system. The steps below are a quick summary. For more information on uploading files to Databricks, refer to the official Databricks documentation.

  1. Log in to your Databricks account.
  2. Click Data in the sidebar.
  3. Click DBFS at the top of the page.
  4. Upload owl-core-xxxx-jar-with-dependencies.jar to your desired path.

Upload owl-core-xxxx-jar-with-dependencies.jar to DBFS.

Step 3: Install the Collibra DQ library in your Databricks cluster

Install owl-core-xxxx-jar-with-dependencies.jar in your cluster.

Once this step is completed, you can create a workspace and start using Collibra DQ APIs.

Step 4: Update the Spark Config in your Databricks cluster

Note: This step is required if your cluster uses Spark 3.2.1 or later.

As of the Collibra DQ 2023.01 release, when you bring Collibra DQ jars into Databricks on Spark 3.2.1 or later, you must set the property spark.sql.sources.disabledJdbcConnProviderList='basic,oracle,mssql' at either the Spark cluster level or the SparkSession level before using any Collibra DQ functions.

Setting the property in the Spark Config of your Databricks cluster

If you have an active Databricks cluster, you can set the property in the Spark Config section of your Databricks cluster.

  1. On your Databricks cluster configuration page, click the Advanced Options toggle and select the Spark tab.
  2. In the Spark Config section, enter the following configuration property as one key-value pair per line, with the key and the value separated by a space:
    spark.sql.sources.disabledJdbcConnProviderList basic,oracle,mssql

Updating the Databricks cluster Spark config

Setting the property in a SparkSession

If you have your own Spark data source, you can set the property in the SparkSession. For example:
SparkSession.getActiveSession.get.conf.set("spark.sql.sources.disabledJdbcConnProviderList", "basic,oracle,mssql")
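
Because the property only matters on Spark 3.2.1 or later, a notebook can check the running version before setting it. The following is a minimal sketch, not part of the Collibra DQ API: the helper names parseVersion and needsProviderOverride are hypothetical, and the commented usage line assumes the notebook's predefined spark session.

```scala
import scala.util.Try

// Parses the leading major.minor[.patch] numbers of a version string such as "3.2.1".
def parseVersion(version: String): Option[(Int, Int, Int)] = {
  val parts = version.split("[.\\-]").take(3).map(p => Try(p.toInt).toOption)
  parts match {
    case Array(Some(major), Some(minor), Some(patch)) => Some((major, minor, patch))
    case Array(Some(major), Some(minor))              => Some((major, minor, 0))
    case _                                            => None
  }
}

// True when the version is 3.2.1 or later; false for older or unparseable versions.
def needsProviderOverride(sparkVersion: String): Boolean = {
  val ordering = implicitly[Ordering[(Int, Int, Int)]]
  parseVersion(sparkVersion).exists(v => ordering.gteq(v, (3, 2, 1)))
}

// Usage in a notebook (assumes the predefined `spark` session):
// if (needsProviderOverride(spark.version))
//   spark.conf.set("spark.sql.sources.disabledJdbcConnProviderList", "basic,oracle,mssql")
```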

Step 5 (Optional): Update datasource pool size

Note: This step is necessary only if you get a PoolExhaustedException when you call Collibra DQ APIs.

Update the connection pool size in the Spark environment using the following environment variables:

SPRING_DATASOURCE_POOL_MAX_WAIT=500

SPRING_DATASOURCE_POOL_MAX_SIZE=30

SPRING_DATASOURCE_POOL_INITIAL_SIZE=5

For more information on setting up Databricks environment variables, refer to the official Databricks documentation.
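
To confirm that the variables reached the cluster, a notebook cell can inspect the driver's environment. This is a minimal sketch under the assumption that cluster environment variables are visible to the driver through sys.env; the helper name missingOrInvalid is hypothetical.

```scala
import scala.util.Try

// The three pool settings named in this step.
val poolVars = Seq(
  "SPRING_DATASOURCE_POOL_MAX_WAIT",
  "SPRING_DATASOURCE_POOL_MAX_SIZE",
  "SPRING_DATASOURCE_POOL_INITIAL_SIZE"
)

// Returns the pool variables that are absent from `env` or are not valid integers.
def missingOrInvalid(env: Map[String, String]): Seq[String] =
  poolVars.filterNot(name => env.get(name).exists(v => Try(v.trim.toInt).isSuccess))

val problems = missingOrInvalid(sys.env)
if (problems.isEmpty) println("All pool settings are present.")
else println(s"Missing or non-numeric: ${problems.mkString(", ")}")
```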

Add CDQ environment variables to the Databricks cluster

Enabling multi-tenancy for notebook APIs

The following Scala code enables multi-tenancy for notebook APIs by setting up database connection parameters and assigning them to the opt object. In this example, "finance" is the name of the non-public tenant for Collibra DQ.

// Enable Multi Tenancy for Notebook API
val tenantName = "finance"
val pgHost = "xxxx.amazonaws.com"
val pgDatabase = "postgres"
val pgSchema = "public"
val pgUser = "???????" 
val pgPass = "????"
val pgPort = "0000"

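// Assign these values to the opt object
val opt = new OwlOptions()
opt.host = pgHost
// The tenant name is passed as the schema of the metastore connection
opt.port = s"$pgPort/$pgDatabase?currentSchema=$tenantName"
opt.pgUser = pgUser
opt.pgPassword = pgPass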

After you run the job, check the specific tenant's job page for the results.

Collibra DQ Working Example in Databricks

Import Collibra DQ library

import org.apache.spark.sql.SparkSession 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types._ 
import scala.collection.JavaConverters._ 
import java.util.Date
import java.time.LocalDate
import java.text.SimpleDateFormat
import spark.implicits._ 
import java.util.{ArrayList, List, UUID}
// CDQ Imports 
import com.owl.core.Owl 
import com.owl.common.options._
import com.owl.common.domain2._
import com.owl.core.util.OwlUtils
spark.catalog.clearCache

Bringing customer data from another database

Bringing customer data from a file

val df = (spark.read
  .format("csv")
  .option("header", true)
  .option("delimiter", ",")
  .load("dbfs:/FileStore/nyse.csv")
)

Bringing customer data from a database

val connProps = Map(
  "driver" -> "org.postgresql.Driver",
  "user" -> "your-username",
  "password" -> "your-password",
  "url" -> "jdbc:postgresql://abc:1234/postgres",
  "dbtable" -> "public.example_data"
)
//--- Load Spark DataFrame ---//
val df = spark.read.format("jdbc").options(connProps).load()
display(df) // view your data
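
If the same connection details are reused across notebooks, the options map can be assembled by a small helper. This is a minimal sketch that reuses the placeholder values from the example above; the function names postgresProps and redacted are hypothetical and are not part of Spark or Collibra DQ.

```scala
// Builds the option map passed to spark.read.format("jdbc").options(...).
def postgresProps(host: String, port: Int, database: String, table: String,
                  user: String, password: String): Map[String, String] =
  Map(
    "driver" -> "org.postgresql.Driver",
    "url" -> s"jdbc:postgresql://$host:$port/$database",
    "dbtable" -> table,
    "user" -> user,
    "password" -> password
  )

// Returns a copy that is safe to print: the password value is masked.
def redacted(props: Map[String, String]): Map[String, String] =
  props.map { case (key, value) => if (key == "password") key -> "****" else key -> value }

val props = postgresProps("abc", 1234, "postgres", "public.example_data",
  "your-username", "your-password")
println(redacted(props))
```

Printing the redacted copy instead of the original map keeps the password out of notebook output.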

Variables to set up Collibra DQ Metastore database location

val pgHost = "xxxx.amazonaws.com" 
val pgDatabase = "postgres" 
val pgSchema = "public"
val pgUser = "???????" 
val pgPass = "????"
val pgPort = "0000"

Create a Collibra DQ Test (Rules) and detect breaks

Note: If the rules have already been created and assigned to a dataset in the UI, calling owlCheck() automatically executes all rules associated with that dataset, so there is no need to recreate them in the notebook.

val dataset = "cdq_notebook_db_rules"
var date = "2018-01-11"

// Options
val opt = new OwlOptions()
opt.dataset = dataset
opt.runId = date
opt.host = pgHost
opt.port = pgPort
opt.pgUser = pgUser
opt.pgPassword = pgPass

opt.setDatasetSafeOff(false) // to enable historical overwrite of dataset

// Create a simple rule and assign it to dataset
val simpleRule = OwlUtils.createRule(opt.dataset)
simpleRule.setRuleNm("nyse-stocks-symbol")
simpleRule.setRuleValue("symbol == 'BHK'")
simpleRule.setRuleType("SQLG")
simpleRule.setPerc(1.0)
simpleRule.setPoints(1)
simpleRule.setIsActive(1)
simpleRule.setUserNm("admin")
simpleRule.setPreviewLimit(8)

// Create a rule from generic rules that are created from UI:
val genericRule = OwlUtils.createRule(opt.dataset)
genericRule.setRuleNm("exchangeRule") // this could be any name
genericRule.setRuleType("CUSTOM")
genericRule.setPoints(1)
genericRule.setIsActive(1)
genericRule.setUserNm("admin")
genericRule.setRuleRepo("exchangeCheckRule") // validate the generic rule name from the UI
genericRule.setRuleValue("EXCH") // column associated with the rule

// Pre Routine
val cdq = com.owl.core.util.OwlUtils.OwlContext(df, opt)
cdq.removeAllRules(opt.dataset)
  .register(opt)
  .addRule(simpleRule)

// Scan
cdq.owlCheck()
val results = cdq.hoot() // returns object Hoot, not a DataFrame
// See JSON results (optional, for downstream processing)
println("--------------Results:----------------\n")
println(results) // optional

// Post routine: see DataFrame results (optional, for downstream processing)
val breaks = cdq.getRuleBreakRows("nyse-stocks-symbol")
println("--------------Breaks:----------------\n")
display(breaks)

// Different Options for handling bad records
val badRecords = breaks.drop("_dataset","_run_id", "_rule_name", "owl_id")
display(badRecords)

val goodRecords = df.except(badRecords)
display(goodRecords)
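
One detail of the last step is worth noting: Spark's except has EXCEPT DISTINCT semantics, so goodRecords contains each remaining row once even if it appeared several times in df. The following plain-collection sketch models that behavior without Spark; the function name exceptDistinct and the sample rows are hypothetical.

```scala
// Plain-collection model of df.except(badRecords):
// the distinct rows of `all` that do not appear in `bad`.
def exceptDistinct[A](all: Seq[A], bad: Seq[A]): Seq[A] = {
  val badSet = bad.toSet
  all.distinct.filterNot(badSet.contains)
}

// Hypothetical rows of (symbol, price); the IBM row is duplicated on purpose.
val rows = Seq(("BHK", 10.0), ("IBM", 20.0), ("IBM", 20.0), ("AAPL", 30.0))
val bad = Seq(("BHK", 10.0))

// The duplicated IBM row appears only once in the result.
println(exceptDistinct(rows, bad)) // List((IBM,20.0), (AAPL,30.0))
```

If duplicate good rows must be preserved, a different approach is needed, for example filtering on a unique row identifier.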

Write the breaks (bad records) DataFrame to a Parquet file

// Remove the file if it exists
dbutils.fs.rm("/tmp/databricks-df-example.parquet", true)
breaks.write.parquet("/tmp/databricks-df-example.parquet")

The following image shows the code snippet and the result in Databricks:

Create CDQ Test Rules In DataBricks

Copying rules from one dataset to another

Steps to reassign the rules of one dataset to another via the API.

The breaks and the rules can also be viewed in Collibra DQ web.

Create a Collibra DQ Test (Profile)

val dataset = "cdq_notebook_nyse_profile"

// Seq is used because the earlier imports bring java.util.List into scope
val runList = Seq("2018-01-01", "2018-01-02", "2018-01-03", "2018-01-04", "2018-01-05")
for (runId <- runList) {
  // Options
  val options = new OwlOptions()
  options.dataset = dataset
  options.runId = runId // assumption: record each iteration under its own run ID, as in the other examples
  options.host = pgHost
  options.port = pgPort
  options.pgUser = pgUser
  options.pgPassword = pgPass

  // Profile settings
  val profileOpt = new ProfileOpt
  profileOpt.on = true
  profileOpt.setShape(true)
  profileOpt.setShapeSensitivity(5.0)
  profileOpt.setShapeMaxPerCol(10)
  profileOpt.setShapeMaxColSize(10)
  profileOpt.setShapeGranular(true)
  profileOpt.behaviorEmptyCheck = true
  profileOpt.behaviorMaxValueCheck = true
  profileOpt.behaviorMinValueCheck = true
  profileOpt.behaviorNullCheck = true
  profileOpt.behaviorRowCheck = true
  profileOpt.behaviorMeanValueCheck = true
  profileOpt.behaviorUniqueCheck = true
  profileOpt.behaviorMinSupport = 5 // default is 4
  profileOpt.behaviorLookback = 5
  options.profile = profileOpt

  // Keep only the rows for this run date
  val df_1 = df.where($"TRADE_DATE" === runId)

  // Scan
  val cdq = OwlUtils.OwlContext(df_1, options)
  cdq.register(options)
  cdq.owlCheck()
  val profile = cdq.profileDF()
  profile.show()
}

CDQ Profile Run In Databricks

The Profile result can be viewed in CDQ Web.
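
The run dates in the profile example are consecutive days, so for longer ranges the list can be generated instead of typed out. This is a minimal sketch using java.time; the helper name runDates is hypothetical.

```scala
import java.time.LocalDate

// Returns `days` consecutive ISO dates (yyyy-MM-dd) starting at `start`.
def runDates(start: String, days: Int): Seq[String] = {
  val first = LocalDate.parse(start)
  (0 until days).map(offset => first.plusDays(offset.toLong).toString)
}

// Same five dates as the hard-coded list in the profile example.
val generatedRunList = runDates("2018-01-01", 5)
generatedRunList.foreach(println)
```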

Create a Collibra DQ Test (Dupes)

val dataset = "cdq_notebook_db_dupe"
var date = "2018-01-11"

// Options
val options = new OwlOptions()
options.dataset = dataset
options.runId = date
options.host = pgHost
options.port = pgPort
options.pgUser = pgUser
options.pgPassword = pgPass

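// Configure the dupe check on the same options object that is registered below
options.dupe.ignoreCase = true
options.dupe.on = true
options.dupe.lowerBound = 99
options.dupe.include = Array("SYMBOL", "TRADE_DATE")

// Scan
val cdq = OwlUtils.OwlContext(df, options)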
cdq.register(options)
cdq.owlCheck()

val dupesDf = cdq.getDupeRecords
dupesDf.show()

CDQ Dupes Run In Databricks

Dupes results can be viewed in CDQ Web.
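
To build intuition for what the include and ignoreCase settings mean, the sketch below groups rows by the chosen key columns and ignores case. It is an illustration only and is not Collibra DQ's algorithm: it covers exact matches only and does not model the similarity scoring implied by lowerBound. The function name exactDupes and the sample rows are hypothetical.

```scala
// Groups rows by the chosen key columns, ignoring case,
// and keeps only the groups that contain more than one row.
def exactDupes(rows: Seq[Map[String, String]],
               keyCols: Seq[String]): Seq[Seq[Map[String, String]]] =
  rows
    .groupBy(row => keyCols.map(col => row.getOrElse(col, "").toLowerCase))
    .values
    .filter(_.size > 1)
    .toSeq

// Hypothetical rows: the first two differ only in the case of SYMBOL.
val trades = Seq(
  Map("SYMBOL" -> "BHK", "TRADE_DATE" -> "2018-01-11"),
  Map("SYMBOL" -> "bhk", "TRADE_DATE" -> "2018-01-11"),
  Map("SYMBOL" -> "IBM", "TRADE_DATE" -> "2018-01-11")
)

val dupeGroups = exactDupes(trades, Seq("SYMBOL", "TRADE_DATE"))
dupeGroups.foreach(group => println(group.mkString(" | ")))
```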

Create a Collibra DQ Test (Outlier)

import scala.collection.JavaConverters._
import java.util
import java.util.{ArrayList, List, UUID}

val dataset = "cdq_notebook_db_outlier"
var date = "2018-01-11"

// Options
val options = new OwlOptions()
options.dataset = dataset
options.runId = date
options.host = pgHost
options.port = pgPort
options.pgUser = pgUser
options.pgPassword = pgPass

// Configure the outlier check on the same options object that is registered below
options.dupe.on = false

val dlMulti: util.List[OutlierOpt] = new util.ArrayList[OutlierOpt]
val outlierOpt = new OutlierOpt()
outlierOpt.combine = true
outlierOpt.dateColumn = "trade_date"
outlierOpt.lookback = 4
outlierOpt.key = Array("symbol")
outlierOpt.include = Array("high")
outlierOpt.historyLimit = 10
dlMulti.add(outlierOpt)

options.setOutliers(dlMulti)

val cdq = OwlUtils.OwlContext(df, options)
  .register(options)

cdq.owlCheck
val outliers = cdq.getOutliers()
outliers.show
outliers.select("value").show

Create a Collibra DQ Test (ValidateSource)

import com.owl.common.options.SourceOpt
import java.util
import java.util.{ArrayList, List}

var opt = new OwlOptions()
val dataset = "weather-validateSrc"
opt.setDataset(dataset)
opt.runId = "2018-02-23"
clearPreviousScans(opt)
val src = Seq(
  ("abc", true, 55.5, "2018-02-23 08:30:02"),
  ("def", true, 55.5, "2018-02-23 08:30:02"),
  ("xyz", true, 55.5, "2018-02-23 08:30:02")
).toDF("name", "sunny", "feel-like-temp", "d_date")

val target = Seq(
  ("abc", 72, false, 55.5, "2018-02-23 08:30:02"), // true to false
  ("xyz", 72, true, 65.5, "2018-02-23 09:30:02") // 08 to 09
).toDF("name", "temp", "sunny", "feel-like-temp", "d_date")

val optSource = new SourceOpt
optSource.on = true
optSource.dataset = dataset
optSource.key = Array("name")
opt.setSource(optSource)

//scan
val cdq = OwlUtils.OwlContext(src, target, opt)
  .register(opt)
cdq.owlCheck

val breakCountDf = cdq.getSourceBreaksCount()
breakCountDf.show()
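
To see what kinds of differences a source-to-target comparison keyed on name can surface for this sample data, the sketch below compares the two datasets with plain Scala collections. It is an illustration under stated assumptions, not Collibra DQ's implementation: values are held as strings, only columns present on both sides are compared, and the names Row, missingKeys, and mismatches are hypothetical.

```scala
// One row per key ("name"), stored as column -> value.
type Row = Map[String, String]

val srcRows: Map[String, Row] = Map(
  "abc" -> Map("sunny" -> "true", "feel-like-temp" -> "55.5", "d_date" -> "2018-02-23 08:30:02"),
  "def" -> Map("sunny" -> "true", "feel-like-temp" -> "55.5", "d_date" -> "2018-02-23 08:30:02"),
  "xyz" -> Map("sunny" -> "true", "feel-like-temp" -> "55.5", "d_date" -> "2018-02-23 08:30:02")
)

val targetRows: Map[String, Row] = Map(
  "abc" -> Map("temp" -> "72", "sunny" -> "false", "feel-like-temp" -> "55.5", "d_date" -> "2018-02-23 08:30:02"),
  "xyz" -> Map("temp" -> "72", "sunny" -> "true", "feel-like-temp" -> "65.5", "d_date" -> "2018-02-23 09:30:02")
)

// Keys present in the source but absent from the target.
val missingKeys: Seq[String] = (srcRows.keySet -- targetRows.keySet).toSeq.sorted

// For keys present on both sides, the shared columns whose values differ.
val mismatches: Map[String, Seq[String]] =
  (srcRows.keySet & targetRows.keySet).toSeq.sorted.flatMap { key =>
    val differing = srcRows(key).keys.toSeq.sorted
      .filter(col => targetRows(key).get(col).exists(_ != srcRows(key)(col)))
    if (differing.isEmpty) None else Some(key -> differing)
  }.toMap

println(s"Missing in target: $missingKeys")
println(s"Value mismatches: $mismatches")
```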

Known API Limitations

Collibra DQ activities cannot currently be called independently. The owlCheck() function must be called before any of the activities. For example, to get the profile DataFrame, run the following code snippet:

cdq.owlCheck()
cdq.getProfileDF()