Data quality error records (preview)

When performing data quality checks in Microsoft Purview Unified Catalog, you can configure and publish data quality error records for rule exceptions to help you review and fix these records directly in your managed storage, whether in Microsoft Fabric Lakehouse or Microsoft Azure Data Lake Storage Gen2.

This feature enables Microsoft Purview data quality users to identify, analyze, and address data quality issues, ensuring data accuracy, completeness, consistency, timeliness, and validity across systems and use cases.

Architecture

You need to set up your managed storage in Microsoft Purview to publish data quality error records for rule exceptions. The Data Governance Administrator for your organization is responsible for configuring the storage details on the Microsoft Purview administration page. The Data Quality Steward can enable the toggle either from the Data Quality Manage console or the data quality overview page at Unified Catalog > Health management > Data quality.

Diagram of the data quality error record architecture.

Configuration prerequisites

If your organization uses self-service analytics and insights (preview), then you don't need to configure storage details to store error records.

If you aren't using self-service analytics, the following sections provide details and guidance on how to configure the storage used to publish data quality error records for analytics and remediation.

To configure the storage for metadata self-serve analytics and data quality error records:

  • Sign in to the Microsoft Purview portal using credentials for a Data Governance Administrator.
  • Select the Settings icon in the left navigation or at the top of the page.
  • Under Solution settings, select Unified Catalog.
  • Select Solution integrations (preview), then select Edit.
  • Make a selection from the Storage type dropdown menu.
  • Enter the Location URL.
  • Test the connection.
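
For the Location URL step, the value is the storage location that will hold the error records. The exact format the settings page expects depends on your storage type; as a rough, hypothetical illustration only (the account, container, workspace, and lakehouse names below are placeholders, not values from this article), such URLs generally follow patterns like these:

https://<storageAccountName>.dfs.core.windows.net/<containerName>/<folderName>    (Azure Data Lake Storage Gen2)
https://onelake.dfs.fabric.microsoft.com/<workspaceName>/<lakehouseName>.Lakehouse/Files/<folderName>    (Microsoft Fabric Lakehouse)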

Caution

  • If you don’t have storage and a connection set up to store data quality error records (as mentioned in the prerequisites), your data quality scan job will fail if the error record feature is enabled.
  • Ensure first that self-serve analytics storage and connection are configured. Then you can enable the data quality error record feature either from the Data Quality overview page or by navigating to Health management > Data quality > Manage > Settings.
  • To analyze the published error records, read them using a recent version of Spark (3.x or later) on Synapse or Databricks.
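
As a quick sanity check, you can confirm the Spark runtime version from your Synapse or Databricks notebook before reading the error records; spark below is the session object those notebook environments already provide.

# Confirm the notebook's Spark runtime is version 3.x or later
print(spark.version)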

Data quality issue detection

Select the data asset and configure rules for the critical columns that need a Data Quality measurement. You can use out-of-the-box rules, create custom rules, or use the rule suggestion feature to recommend rules for a data quality scan. Get details about data quality rules.

Enable error recording

To get error records, you need to enable the capture and storage of error records for every data quality job run by following these steps:

  1. Follow the instructions to start a data quality scan on a data asset.
  2. After selecting Run quality scan on the data asset's page, turn on the toggle for Enable publishing of failed rows.

Screenshot of activating the error record feature from the data asset page.

You can also enable error recording for all data assets with data quality rules associated with data products in a governance domain by following these steps:

  • In Unified Catalog, go to Health management > Data quality.
  • Select a governance domain from the list to open its details page.
  • Select Manage, then select Settings.
  • Turn on the toggle for Enable publishing of failed rows.

Set up schedule to run

Set up the schedule to run your data quality job and publish the error records into the configured storage. Get details on how to configure and run data quality scans.

Though data quality scans can be run ad hoc by selecting Run quality scan on a data asset page, in production scenarios the source data is usually updated continuously, so it's best to monitor data quality regularly to detect any issues. Automating the scanning process helps you keep quality scans up to date.

Check the error records in the configured storage

  1. Find the published error rows location from the data quality overview page.
  2. Select View Scan report in the top-right corner, just below the Latest quality score.
  3. Navigate to the path shown in your Data Lake Storage Gen2 or Microsoft Fabric Lakehouse folder, as seen in the example below:

Screenshot of error records in Azure Data Lake Storage Gen2.

Data Lake Storage Gen2 folder hierarchy

Data Lake Storage Gen2 container (for example, a container named DEH):

  1. DataQualityAuditExtracts
  2. Governance domain (BusinessDomain)
  3. DataProduct
  4. DataAsset
  5. RunDate
  6. RunId
  7. Purview DQRuleId
  8. Error record file
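
Putting these levels together, a full path to an error record file follows this pattern (every segment value here is an illustrative placeholder, not a real ID):

deh/DataQualityAuditExtracts/BusinessDomain=<BusinessDomainId>/DataProduct=<DataProductId>/DataAsset=<DataAssetId>/RunDate=<YYYY-MM-DD>/RunId=<JobRunId>/_purviewDQRuleId=<RuleId>/<error record file>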

Read error records from Data Lake Storage Gen2

You can use a Synapse or Databricks notebook to read data from the Data Lake Storage Gen2 container.

# For a particular rule on a particular date
path = "abfss://deh@(storageAccount).dfs.core.windows.net/DataQualityAuditExtracts/BusinessDomain=(BusinessDomainId)/DataProduct=(DataProduct)/DataAsset=(DataAsset)/RunDate=(YYYY-MM-DD)/RunId=(JobRunId)/_purviewDQRuleId=(RuleId)/"
df = spark.read.format("parquet").load(path)
display(df)

# For all rules on a particular date
path = "abfss://deh@(storageAccount).dfs.core.windows.net/DataQualityAuditExtracts/BusinessDomain=(BusinessDomainId)/DataProduct=(DataProduct)/DataAsset=(DataAsset)/RunDate=(YYYY-MM-DD)/RunId=(JobRunId)/*/"
df = spark.read.format("parquet").load(path)
display(df)

# For all runs on a particular date
path = "abfss://deh@(storageAccount).dfs.core.windows.net/DataQualityAuditExtracts/BusinessDomain=(BusinessDomainId)/DataProduct=(DataProduct)/DataAsset=(DataAsset)/RunDate=(YYYY-MM-DD)/*/*/"
df = spark.read.format("parquet").load(path)
display(df)

# For all runs in a particular month
path = "abfss://deh@(storageAccount).dfs.core.windows.net/DataQualityAuditExtracts/BusinessDomain=(BusinessDomainId)/DataProduct=(DataProduct)/DataAsset=(DataAsset)/RunDate=(YYYY-MM)-*/*/*/"
df = spark.read.format("parquet").load(path)
display(df)

Example dataset:

Screenshot of a sample error records dataset.

Read error records from Fabric Lakehouse

  1. Navigate to the path shown in your Lakehouse, and browse all failed records published to the Fabric Lakehouse.
  2. Create a shortcut from the Delta Parquet files to a Delta table, or use a Delta Parquet file to create a dashboard of your data quality error records, as seen in this example:

error records published in Lakehouse

You can use a notebook to create a shortcut table of failed data asset records. Refer to the notebook script in the Script to create shortcut section.
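
Once a shortcut exists under the Tables folder, you can read it from a Fabric Spark notebook. The following is a minimal sketch; the workspace ID, lakehouse ID, and shortcut name are hypothetical placeholders to replace with your own values.

# Read a shortcut table of failed records through its OneLake path (placeholder values)
shortcut_path = "abfss://<workspaceId>@onelake.dfs.fabric.microsoft.com/<lakehouseId>/Tables/DataQualityAuditExtracts/<shortcutName>"
df = spark.read.format("delta").load(shortcut_path)
display(df)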

Output data model

Failed records are available at the granularity of a governed data asset. For each new assessment run that is successful on the data asset, a fresh set of failed records corresponding to each rule evaluation is appended to the failed records table.

Format: Delta

Output schema: Each row of output contains all the columns of an Input data-asset row that failed a rule evaluation, along with a set of metadata columns that can be used to identify and analyze the failed row.

Primary key: Composite key of Input data-asset columns + _purviewDQRunId + _purviewDQRuleId

Metadata columns (each of type string):

  • _purviewDQRunId: A GUID denoting the run ID of the assessment scan, available to the user at the time of the run. Also a Delta partition column.
  • _purviewDQRunDate: The run date in YYYY-MM-DD format. Also a Delta partition column.
  • _purviewDQRuleId: The quality rule ID corresponding to the failed rule. Also a Delta partition column.
  • _purviewDQRuleName: The rule name at the time of the job run.
  • _purviewDQRuleType: The rule type (for example: UniqueCheck, NotNull, Regex).
  • _purviewDQGovernanceDomainId: The governance domain ID of the data asset that was run.
  • _purviewDQDataProductId: The data product ID of the data asset that was run.
  • _purviewDQDataAssetId: The data asset ID.
  • _purviewDQRunSubmitTimestamp: The run submission timestamp in the default UTC time zone, in ISO format.
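
As an example, here's a minimal sketch of how you might slice the failed records by these metadata columns in a Spark notebook; the storage path and run date below are hypothetical placeholders.

from pyspark.sql import functions as F

# Load the published failed records (replace the path with your configured storage location)
errors = spark.read.format("delta").load("<path-to-DataQualityAuditExtracts-for-a-data-asset>")

# Count failed rows per rule for one run date (the date value is only an example)
summary = (
    errors.filter(F.col("_purviewDQRunDate") == "2024-01-31")
          .groupBy("_purviewDQRuleId", "_purviewDQRuleName", "_purviewDQRuleType")
          .agg(F.count("*").alias("failedRowCount"))
)
display(summary)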

Create a Power BI dashboard for your data quality error records

You can use the Business Domain ID to link data products and their associated data assets for reporting data quality error records.

  • One business domain can be linked to multiple data products.
  • Each data product can be linked to multiple data assets.
  • Each data asset can have multiple rules.
  • Each rule can generate multiple error records.

This image illustrates a data model to create a basic report of your data quality error records:

Diagram of the error records reporting data model.

This image illustrates a sample report created with the data model shown above:

Screenshot of a sample error records report.
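
If you prefer to prepare the reporting data in a notebook before modeling it in Power BI, the following is a minimal sketch of building an aggregated error-count table along the relationships listed above; the input path and the output table name (dq_error_summary) are hypothetical.

from pyspark.sql import functions as F

# Load failed records from your configured storage (placeholder path)
errors = spark.read.format("delta").load("<path-to-DataQualityAuditExtracts>")

# Aggregate error counts per governance domain, data product, data asset, and rule
report = (
    errors.groupBy(
        "_purviewDQGovernanceDomainId",
        "_purviewDQDataProductId",
        "_purviewDQDataAssetId",
        "_purviewDQRuleId",
        "_purviewDQRuleName",
    )
    .agg(F.count("*").alias("errorRecordCount"))
)

# Save as a Delta table that a Power BI report can use (hypothetical table name)
report.write.mode("overwrite").format("delta").saveAsTable("dq_error_summary")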

Limitations

  • Up to 100,000 failed records are published per rule for each run.
  • Datasets up to 100 million rows with up to 40 rules have been benchmarked.
  • Data Lake Storage Gen2 and Fabric storage in a virtual network aren't supported.
  • Because the data is stored in your organization's managed storage, role-based access control for the data is owned by your organization. Microsoft publishes data quality error rows to an organization's storage based on the organization's consent.

Script to create shortcut

You can automate the creation of table shortcuts for the failed-row data in the data quality audit extracts by using this script in a notebook:

# Update these three values with your Fabric workspace ID, Lakehouse ID, and Purview BYOC self-serve store path
workspaceId = ""  # Example: f28dc1c8-360c-4788-9f46-e69853b1c40d
lakehouseId = ""  # Example: 77d6df6b-64ab-4628-985f-9365591a85a8
dataqualityauditPath = ""  # Example: "Files/DEH2/DataQualityAuditExtracts"

import sempy.fabric as fabric
from sempy.fabric.exceptions import FabricHTTPException
import re
import os
from collections import deque

# mssparkutils is available by default in Fabric notebooks

SourceUri = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{lakehouseId}/{dataqualityauditPath}"

# Use for lakehouses with Lakehouse schema (public preview feature)
DestinationShortcutUri = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{lakehouseId}/Tables/DataQualityAuditExtracts"

# Use for lakehouses without Lakehouse schema
DestinationShortcutUri = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{lakehouseId}/Tables/DataQualityAuditExtracts"

def is_delta_table(uri: str):
    # A folder is a Delta table if it contains a _delta_log directory
    delta_log_path = os.path.join(uri, "_delta_log")
    return mssparkutils.fs.exists(delta_log_path)

def extract_onelake_https_uri_components(uri: str):
    # Split an abfss OneLake URI into workspace ID, item ID, and path
    pattern = re.compile(r"abfss://([^@]+)@[^/]+/([^/]+)/(.*)")
    match = pattern.search(uri)
    if match:
        workspace_id, item_id, path = match.groups()
        return workspace_id, item_id, path
    return None, None, None

def is_valid_onelake_uri(uri: str) -> bool:
    workspace_id, item_id, path = extract_onelake_https_uri_components(uri)
    if "abfss://" not in uri or workspace_id is None or item_id is None or path is None:
        return False
    return True

def get_onelake_shortcut(workspace_id: str, item_id: str, path: str, name: str):
    shortcut_uri = f"v1/workspaces/{workspace_id}/items/{item_id}/shortcuts/{path}/{name}"
    return client.get(shortcut_uri).json()

def get_last_path_segment(uri: str):
    path = uri.split("/")  # Split the entire URI by '/'
    return path[-1] if path else None

def create_onelake_shortcut(source_uri: str, dest_uri: str, result: list):
    src_workspace_id, src_item_id, src_path = extract_onelake_https_uri_components(source_uri)
    dest_workspace_id, dest_item_id, dest_path = extract_onelake_https_uri_components(dest_uri)

    name = get_last_path_segment(source_uri)
    dest_uri_joined = os.path.join(dest_uri, name)

    # If the destination path already exists, return without creating a shortcut
    if mssparkutils.fs.exists(dest_uri_joined):
        result.append(dest_uri_joined)
        return None

    request_body = {
        "name": name,
        "path": dest_path,
        "target": {
            "oneLake": {
                "itemId": src_item_id,
                "path": src_path,
                "workspaceId": src_workspace_id,
            }
        },
    }

    shortcut_uri = f"v1/workspaces/{dest_workspace_id}/items/{dest_item_id}/shortcuts"
    try:
        client.post(shortcut_uri, json=request_body)
    except FabricHTTPException:
        return None

    return get_onelake_shortcut(dest_workspace_id, dest_item_id, dest_path, name)

client = fabric.FabricRestClient()

queue = deque([SourceUri])
result = []

# Breadth-first traversal of the audit extract folders: create a shortcut for every Delta table found
while queue:
    current_uri = queue.popleft()

    if is_delta_table(current_uri):
        shortcut = create_onelake_shortcut(current_uri, DestinationShortcutUri, result)
        if shortcut is not None:
            result.append(shortcut)
        continue

    # List subitems in the current folder and queue its subfolders
    subitems = mssparkutils.fs.ls(current_uri)
    for item in subitems:
        if item.isDir:
            queue.append(item.path)

print(f"{len(result)} tables added to table shortcut DataQualityAuditExtracts:")
for item in result:
    print(item)