While tinkering away on a passion project, I eventually reached a point where I needed to leverage the ELT capabilities of Azure Data Factory to enable a pipeline that would copy data from Azure Table Storage to Azure SQL DB. The challenge... Azure Data Factory's built-in copy mechanism defaults to append only (i.e. attempt to insert the record; if it already exists: error).
For instance, after setting up a copy activity for the first time, the initial execution of the pipeline succeeds, but subsequent executions fail whenever a record from the source already exists in the destination. When this happens, you may encounter a message similar to the one below.
Violation of PRIMARY KEY constraint '[primary key]'. Cannot insert duplicate key in object '[table name]'. The duplicate key value is ([some value]). The statement has been terminated.
In this scenario, the desired outcome is to alter the copy activity to perform the equivalent of an UPSERT (i.e. if record already exists: update, else: insert). This can be achieved in Azure Data Factory with some additional configuration to invoke a stored procedure during the copy.
Demo: Table Storage to Azure SQL Database
Note: If you are just getting up to speed with Azure Data Factory, check out my previous post which walks through the various key concepts, relationships and a jump start on the visual authoring experience.
- Using Azure Storage Explorer, create a table called employee to hold our source data.
- Load the table by importing some sample content.
- Log on to the Azure SQL Database and create the following objects (code samples below).
a) Table (employee)
b) Data Type (EmployeeType)
c) Stored Procedure (spUpsertEmployee)
- Log on to Azure Data Factory and create a data pipeline using the Copy Data Wizard.
Note: For detailed step-by-step instructions, check out the embedded video. The overview diagram below illustrates the configuration of the copy activity at a glance.
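For a rough sense of what the wizard produces behind the visual experience, the copy activity's sink is what ties the pipeline to the stored procedure and table type created above. The JSON below is an illustrative sketch only — the activity name and source/sink dataset wiring are placeholders, and property names follow the Azure SQL sink of the ADF copy activity:

```json
{
    "name": "CopyEmployeeToSqlDb",
    "type": "Copy",
    "typeProperties": {
        "source": { "type": "AzureTableSource" },
        "sink": {
            "type": "SqlSink",
            "sqlWriterStoredProcedureName": "spUpsertEmployee",
            "sqlWriterTableType": "EmployeeType"
        }
    }
}
```

The key idea: instead of letting the sink issue plain INSERTs, each batch of copied rows is passed to spUpsertEmployee as a table-valued parameter of type EmployeeType, and the MERGE inside the procedure decides between update and insert.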
SQL: Create a Table
CREATE TABLE employee (
    employee_id varchar(20) NOT NULL,
    department varchar(20) NOT NULL,
    first_name varchar(20) NOT NULL,
    last_name varchar(20) NOT NULL,
    gender varchar(6) NOT NULL,
    role varchar(50) NOT NULL,
    PRIMARY KEY (employee_id)
);
SQL: Create a Custom Data Type
CREATE TYPE EmployeeType AS TABLE (
    RowKey varchar(20) NOT NULL,
    PartitionKey varchar(20) NOT NULL,
    FirstName varchar(20) NOT NULL,
    LastName varchar(20) NOT NULL,
    Gender varchar(6) NOT NULL,
    Role varchar(50) NOT NULL
);
SQL: Create a Stored Procedure
CREATE PROCEDURE spUpsertEmployee
    @employee EmployeeType READONLY
AS
BEGIN
    MERGE employee AS target_sqldb
    USING @employee AS source_tblstg
    ON (target_sqldb.employee_id = source_tblstg.RowKey)
    WHEN MATCHED THEN
        UPDATE SET
            department = source_tblstg.PartitionKey,
            first_name = source_tblstg.FirstName,
            last_name = source_tblstg.LastName,
            gender = source_tblstg.Gender,
            role = source_tblstg.Role
    WHEN NOT MATCHED THEN
        INSERT (employee_id, department, first_name, last_name, gender, role)
        VALUES (source_tblstg.RowKey, source_tblstg.PartitionKey, source_tblstg.FirstName,
                source_tblstg.LastName, source_tblstg.Gender, source_tblstg.Role);
END
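Before wiring the procedure into the pipeline, you can sanity-check it directly from a query window by calling it with a table-valued parameter. This is a hedged sketch — the sample employee values below are made up for illustration:

```sql
DECLARE @sample EmployeeType;

-- Stage a sample row in the table-valued parameter.
INSERT INTO @sample (RowKey, PartitionKey, FirstName, LastName, Gender, Role)
VALUES ('E001', 'Finance', 'Jane', 'Doe', 'Female', 'Analyst');

-- First call: no matching employee_id exists, so the MERGE inserts.
EXEC spUpsertEmployee @employee = @sample;

-- Change a value and call again: the MERGE now matches and updates
-- instead of raising a primary key violation.
UPDATE @sample SET Role = 'Senior Analyst';
EXEC spUpsertEmployee @employee = @sample;

SELECT employee_id, role FROM employee WHERE employee_id = 'E001';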
After performing an initial load (i.e. the first run of the pipeline), change some data in the source and trigger a second execution. If all is working, the update should flow through without any problems and the destination will reflect the new values.
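To confirm that the second run behaved as an update rather than a failed insert, a quick check on the destination table is enough — the row count should be unchanged while the modified columns show the new source values (the employee_id below is a hypothetical example):

```sql
-- Row count should match the source row count, not double it.
SELECT COUNT(*) AS total_rows FROM employee;

-- Spot-check a record you changed in Table Storage.
SELECT employee_id, department, role
FROM employee
WHERE employee_id = 'E001';
```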