Upsert to Azure SQL DB with Azure Data Factory

While tinkering away on a passion project, I eventually reached a point where I needed to leverage the ELT capabilities of Azure Data Factory to build a pipeline that copies data from Azure Table Storage to Azure SQL DB. The challenge: by default, the copy activity in Azure Data Factory only appends data (i.e. it attempts to insert each record and raises an error if the record already exists).

For instance, after a copy activity is set up for the first time, the initial execution of the pipeline succeeds, but subsequent executions fail if a record from the source already exists in the destination. When this happens, you may see an error message similar to the one below.

Violation of PRIMARY KEY constraint '[primary key]'. Cannot insert duplicate key in object '[table name]'. The duplicate key value is ([some value]). The statement has been terminated.

In this scenario, the desired outcome is for the copy activity to perform the equivalent of an UPSERT (i.e. if the record already exists, update it; otherwise, insert it). Azure Data Factory can do this with some additional configuration that invokes a stored procedure during the copy.

Demo: Table Storage to Azure SQL Database

[Figure: high-level view of the Azure Data Factory pipeline]

Note: If you are just getting up to speed with Azure Data Factory, check out my previous post, which walks through the key concepts and how they relate to one another, and gives you a jump start on the visual authoring experience.

Prerequisites

High-Level Steps

  1. Using Azure Storage Explorer, create a table called employee to hold our source data.
  2. Load the table by importing some sample content.
  3. Log on to the Azure SQL Database and create the following objects (code samples below). 
    a) Table (employee)
    b) Data Type (EmployeeType)
    c) Stored Procedure (spUpsertEmployee)
  4. Log on to Azure Data Factory and create a data pipeline using the Copy Data Wizard.

Note: For detailed step-by-step instructions, check out the embedded video. The overview diagram below illustrates the configuration of the copy activity at a glance.

Video

Overview

[Figure: overview of the copy activity configuration for the upsert demo]

SQL: Create a Table

CREATE TABLE employee (
    employee_id varchar(20) NOT NULL,
    department varchar(20) NOT NULL,
    first_name varchar(20) NOT NULL,
    last_name varchar(20) NOT NULL,
    gender varchar(6) NOT NULL,
    role varchar(50) NOT NULL,
    PRIMARY KEY (employee_id)
);

SQL: Create a Custom Data Type

CREATE TYPE EmployeeType AS TABLE(
    RowKey varchar(20) NOT NULL,
    PartitionKey varchar(20) NOT NULL,
    FirstName varchar(20) NOT NULL,
    LastName varchar(20) NOT NULL,
    Gender varchar(6) NOT NULL,
    Role varchar(50) NOT NULL
)

SQL: Create a Stored Procedure

CREATE PROCEDURE spUpsertEmployee @employee EmployeeType READONLY
AS
BEGIN
  MERGE employee AS target_sqldb
  USING @employee AS source_tblstg
  ON (target_sqldb.employee_id = source_tblstg.RowKey)
  WHEN MATCHED THEN
      UPDATE SET 
      department = source_tblstg.PartitionKey,
      first_name = source_tblstg.FirstName,
      last_name = source_tblstg.LastName,
      gender = source_tblstg.Gender,
      role = source_tblstg.Role
  WHEN NOT MATCHED THEN
      INSERT (
          employee_id,
          department,
          first_name,
          last_name,
          gender,
          role
        )
      VALUES (
          source_tblstg.RowKey,
          source_tblstg.PartitionKey,
          source_tblstg.FirstName,
          source_tblstg.LastName,
          source_tblstg.Gender,
          source_tblstg.Role
        );
END
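To check the stored procedure on its own before wiring it into Azure Data Factory, you can call it twice with the same key from a query window. The following is a minimal sketch, assuming the table, type and procedure above already exist. The key TEST-0001 and the employee details are hypothetical test values, not part of the demo data.

```sql
-- Smoke test for spUpsertEmployee, run directly against the database.
-- Assumes employee, EmployeeType and spUpsertEmployee (above) exist.
-- 'TEST-0001' and the other values are hypothetical test data.
DECLARE @batch EmployeeType;

-- First call: the key does not exist yet, so MERGE takes the INSERT branch.
INSERT INTO @batch (RowKey, PartitionKey, FirstName, LastName, Gender, Role)
VALUES ('TEST-0001', 'Finance', 'Test', 'User', 'Female', 'Analyst');
EXEC spUpsertEmployee @employee = @batch;

-- Second call: same key with a changed role. The table variable is only
-- READONLY inside the procedure, so it can be modified here. MERGE should
-- take the UPDATE branch instead of raising a PRIMARY KEY violation.
UPDATE @batch SET Role = 'Senior Analyst' WHERE RowKey = 'TEST-0001';
EXEC spUpsertEmployee @employee = @batch;

-- Expect exactly one row for TEST-0001, with role = 'Senior Analyst'.
SELECT employee_id, department, role
FROM employee
WHERE employee_id = 'TEST-0001';
```

If the second EXEC updates the row rather than failing with the duplicate key error shown earlier, the procedure is behaving as an upsert. Afterwards, remove the test row with DELETE FROM employee WHERE employee_id = 'TEST-0001'; so it does not mix with the data copied by the pipeline.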

After performing an initial load (i.e. the first run of the pipeline), change some data in the source and trigger a second execution. If everything is working, the second run should complete without errors and the changed values should appear in the destination table.