Developing an ETL in SSIS with C# and Python to Load a Call Centre Data in a complex Excel File

I've decided to build the ETL using Python because of the complexity to read an Excel Spreadsheet with different levels, also, due to the simple way to develop the code.
 

 

This is the complex Excel file I need to load every month:

 

I sorted it out building that Python code: 

 
 
========================================================================================
 ## Class:  SQLConnection.py
========================================================================================
import pandas as pd
import pyodbc
 
def OpenConn():
    connStr = pyodbc.connect('DRIVER={ODBC Driver 13 for SQL Server};SERVER=MyServer;DATABASE=DW;Trusted_Connection=yes') 
    return connStr.cursor()
 
 
========================================================================================
 ## Class:  ExcelConnection.py
========================================================================================
from xlrd import open_workbook
 
def QueueMetricsReportConn():
    book = open_workbook('\\\\MyServerFileName\\QueueMetricsReport.xls')  
    sheet = book.sheet_by_name('Sheet0')
    return sheet
 
 
========================================================================================
 ## Class:  PureCloudDataLoading.py
==========================================================================
 
 import SQLConnection
import ExcelConnection
 
cursor = SQLConnection.OpenConn()    
sheet = ExcelConnection.QueueMetricsReportConn()
 
 
def fnCleanData(val,type):
    
    if type == "time":
        val = val.replace('N/A','00:00:00')
 
    if type == "float":
        val = val.replace('N/A','0.0')
        val = str(round(float(val),2))
 
    return val
 
 
for row in range(sheet.nrows -1):
    try:
        
        strBase = (sheet.cell_value(row,0))
        strBaseNext = (sheet.cell_value((row+1),0))
 
        if (row >= 1): 
            strBasePrev = (sheet.cell_value((row-1),0)) 
 
        if strBaseNext == "Date":
             BranchName = strBase[:strBase.find('-')]
             pass
 
        if (row >= 7 and BranchNamePrev != BranchName):
            BranchNamePrev = BranchName
            pass
           
        if (row > 8 and strBase != "Week Total" and strBase != "Date" and strBaseNext != "Date" and strBasePrev != "Week Total" and strBasePrev != ""):
 
            QueueBranchName = BranchName.replace(" Branch","")
            CallDate = str(sheet.cell_value(row,0))
            NoOfOfferedCalls = str(sheet.cell_value(row,1))
            NoOfAnsweredCalls  = str(sheet.cell_value(row,3))
            NoOfAbandonedCalls = str(sheet.cell_value(row,4))
            PercentAbandonDNUCalls = fnCleanData(str(sheet.cell_value(row,5)),"float")
            PercentServiceLevel = fnCleanData(str(sheet.cell_value(row,6)),"float")
            AvgSpeedAnswer = fnCleanData(str(sheet.cell_value(row,7)),"time")
            AvgTalk = fnCleanData(str(sheet.cell_value(row,9)),"time")
            AvgAfterCallWork = fnCleanData(str(sheet.cell_value(row,10)),"time")
            AvgHandleTime = fnCleanData(str(sheet.cell_value(row,11)),"time")
            AvgHold = fnCleanData(str(sheet.cell_value(row,12)),"time")
            NoOfTransferredCalls = str(sheet.cell_value(row,13))
            PercentTransferCalls = fnCleanData(str(sheet.cell_value(row,14)),"float")
 
 
            cursor.execute("Insert [Stage].[CallCentreQueue]([QueueBranchName], [CallDate],[NoOfOfferedCalls], [NoOfAnsweredCalls], [NoOfAbandonedCalls], [PercentAbandonDNUCalls], [PercentServiceLevel], [AvgSpeedAnswer], [AvgTalk], [AvgAfterCallWork], [AvgHandleTime], [AvgHold], [NoOfTransferredCalls], [PercentTransferCalls]) Values ('" + QueueBranchName + "','" + CallDate+ "'," + NoOfOfferedCalls+ "," + NoOfAnsweredCalls + "," + NoOfAbandonedCalls + "," + PercentAbandonDNUCalls + "," + PercentServiceLevel + ",'" + AvgSpeedAnswer + "','" + AvgTalk + "','" + AvgAfterCallWork+ "','" + AvgHandleTime+ "','" + AvgHold+ "'," + NoOfTransferredCalls+ "," + PercentTransferCalls +")") 
            cursor.commit()
 
    except Exception as err:
            print("Queue Metrics Report loading has failed due to the error: "+ str(err))
            break
        
cursor.close()
 
 
   
 
 
 
 
 
 
========================================================================================
 ## Class:  SQLConnection.py
========================================================================================
import pandas as pd
import pyodbc
 
def OpenConn():
    connStr = pyodbc.connect('DRIVER={ODBC Driver 13 for SQL Server};SERVER=MyServer;DATABASE=DW;Trusted_Connection=yes') 
    return connStr.cursor()
 
 
========================================================================================
 ## Class:  ExcelConnection.py
========================================================================================
from xlrd import open_workbook
 
def QueueMetricsReportConn():
    book = open_workbook('\\\\MyServerFileName\\QueueMetricsReport.xls')  
    sheet = book.sheet_by_name('Sheet0')
    return sheet
 
 
========================================================================================
 ## Class:  PureCloudDataLoading.py
==========================================================================
 
 import SQLConnection
import ExcelConnection
 
cursor = SQLConnection.OpenConn()    
sheet = ExcelConnection.QueueMetricsReportConn()
 
 
def fnCleanData(val,type):
    
    if type == "time":
        val = val.replace('N/A','00:00:00')
 
    if type == "float":
        val = val.replace('N/A','0.0')
        val = str(round(float(val),2))
 
    return val
 
 
for row in range(sheet.nrows -1):
    try:
        
        strBase = (sheet.cell_value(row,0))
        strBaseNext = (sheet.cell_value((row+1),0))
 
        if (row >= 1): 
            strBasePrev = (sheet.cell_value((row-1),0)) 
 
        if strBaseNext == "Date":
             BranchName = strBase[:strBase.find('-')]
             pass
 
        if (row >= 7 and BranchNamePrev != BranchName):
            BranchNamePrev = BranchName
            pass
           
        if (row > 8 and strBase != "Week Total" and strBase != "Date" and strBaseNext != "Date" and strBasePrev != "Week Total" and strBasePrev != ""):
 
            QueueBranchName = BranchName.replace(" Branch","")
            CallDate = str(sheet.cell_value(row,0))
            NoOfOfferedCalls = str(sheet.cell_value(row,1))
            NoOfAnsweredCalls  = str(sheet.cell_value(row,3))
            NoOfAbandonedCalls = str(sheet.cell_value(row,4))
            PercentAbandonDNUCalls = fnCleanData(str(sheet.cell_value(row,5)),"float")
            PercentServiceLevel = fnCleanData(str(sheet.cell_value(row,6)),"float")
            AvgSpeedAnswer = fnCleanData(str(sheet.cell_value(row,7)),"time")
            AvgTalk = fnCleanData(str(sheet.cell_value(row,9)),"time")
            AvgAfterCallWork = fnCleanData(str(sheet.cell_value(row,10)),"time")
            AvgHandleTime = fnCleanData(str(sheet.cell_value(row,11)),"time")
            AvgHold = fnCleanData(str(sheet.cell_value(row,12)),"time")
            NoOfTransferredCalls = str(sheet.cell_value(row,13))
            PercentTransferCalls = fnCleanData(str(sheet.cell_value(row,14)),"float")
 
 
            cursor.execute("Insert [Stage].[CallCentreQueue]([QueueBranchName], [CallDate],[NoOfOfferedCalls], [NoOfAnsweredCalls], [NoOfAbandonedCalls], [PercentAbandonDNUCalls], [PercentServiceLevel], [AvgSpeedAnswer], [AvgTalk], [AvgAfterCallWork], [AvgHandleTime], [AvgHold], [NoOfTransferredCalls], [PercentTransferCalls]) Values ('" + QueueBranchName + "','" + CallDate+ "'," + NoOfOfferedCalls+ "," + NoOfAnsweredCalls + "," + NoOfAbandonedCalls + "," + PercentAbandonDNUCalls + "," + PercentServiceLevel + ",'" + AvgSpeedAnswer + "','" + AvgTalk + "','" + AvgAfterCallWork+ "','" + AvgHandleTime+ "','" + AvgHold+ "'," + NoOfTransferredCalls+ "," + PercentTransferCalls +")") 
            cursor.commit()
 
    except Exception as err:
            print("Queue Metrics Report loading has failed due to the error: "+ str(err))
            break
        
cursor.close()
 
 
   
 
 
To avoid the SSIS loads in case of any file not exists, I'm validating it as a first step:
 
 
========================================================================================
 ## SSIS: File Validation in C# [File Exists]
========================================================================================
 
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
#endregion
using System.IO;
namespace ST_51e924b12cd541e2ae8a5e3f9fea6fd3
{
 
   
[Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{
       
 
     
public void Main()
{
 
            String strLocation = Dts.Variables["User::ExcelFileSource"].Value.ToString();
 
      
 
            if (File.Exists(strLocation))
            {
            
               // Dts.TaskResult = (int)ScriptResults.Success;
                Dts.Variables["User::FileExists"].Value = 1;
            }
            else
            {
 
                //Dts.TaskResult = (int)ScriptResults.Success;
                Dts.Variables["User::FileExists"].Value = 0;
            }
 
      }
 
        #region ScriptResults declaration
        ///
        /// This enum provides a convenient shorthand within the scope of this class for setting the
        /// result of the script.
        /// 
        /// This code was generated automatically.
        ///
        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
        #endregion
 
}
}
 
 
========================================================================================
 ## SSIS: Variable Configuration
========================================================================================
 
 
 
========================================================================================
 ## SSIS: Audit Job to trace all the loading (Starting)
========================================================================================
 
 
========================================================================================
 ## SSIS: Audit Job - Stored Procedure
========================================================================================
 
USE [DW]
GO
 
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
ALTER proc [Audit].[spGetJobID]
 
@JobName varchar(100),
@New char(1) = 'N',
@JobID int output,
@DebugMode char(1) = 'N',
@TestMode char(1) = 'N'
 
as
/*****************************************************************************
 
Name: Audit.spGetJobID
 
Return a job ID to use in JobSteps for Auditing and lineage information.
 
Generate a new JobID if:
- A new JobID is forced by @New = 'Y'
- The previous maximum job ID for the JobName has a finished time
- There is no entry for the JobName
 
******************************************************************************
Change History
******************************************************************************
Date: Author: Description:
----- ------- -------------------
14 Oct 2018 Leandro Buffone Created Initial Version
******************************************************************************
 
Usage:
 
declare @JobID int
 
exec Audit.spGetJobID
@JobName = 'Test1',
@JobID = @JobID output,
--@New = 'Y',
@DebugMode = 'Y',
@TestMode = 'Y'
 
******************************************************************************/
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, concat('** Start ** @JobName=',@JobName, ', @New=',@New))
 
begin try
 
declare @ParentTransaction bit = 0
 
if @@trancount > 0 -- only start a transaction if we are not already in one
set @ParentTransaction = 1
else
begin
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'Beginning a transaction')
begin tran
end
 
if @New = 'Y'
begin
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'New job parameter specified. Insert entry into Audit.Job')
 
insert into Audit.Job(JobName) values (@JobName)
 
set @JobID = scope_identity()
end
else if (select top (1) EndTime from Audit.Job where JobName = @JobName order by JobID desc) is not null
begin
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'Previous job has completed. A new JobID is being generated')
 
insert into Audit.Job(JobName) values (@JobName)
 
set @JobID = scope_identity()
end
else if not exists (select * from Audit.Job where JobName = @JobName)
begin
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'There is no job with this name. A new JobID is being generated')
 
insert into Audit.Job(JobName) values (@JobName)
 
set @JobID = scope_identity()
end
else
begin
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'Get the latest JobID')
 
select @JobID = max(JobID) from Audit.Job where JobName = @JobName
end
 
if @DebugMode = 'Y'
begin
print dbo.fnDebugInfo(@@procid, 'Show JobID')
select Comment = 'Audit.Job', * from Audit.Job where JobID = @JobID
end
 
if @ParentTransaction = 0
begin
if @TestMode = 'N'
begin
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'TestMode = N. Committing changes')
commit tran
end
else
begin
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'TestMode. Rolling back changes')
rollback tran
end
end
 
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, '***** Finish ***** ')
 
end try
 
begin catch
 
if @ParentTransaction = 0
begin
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'Error: Rolling back transaction')
if @@trancount > 0 rollback tran
end
 
exec spErrorHandler
 
end catch
 
========================================================================================
 ## SSIS: Audit Job - Audit.Job (Table)
========================================================================================
 
USE [DW]
GO
 
SET ANSI_NULLS ON
GO
 
SET QUOTED_IDENTIFIER ON
GO
 
CREATE TABLE [Audit].[Job](
[JobID] [int] IDENTITY(1,1) NOT NULL,
[JobName] [varchar](100) NOT NULL,
[StartTime] [datetime] NOT NULL,
[EndTime] [datetime] NULL,
[CreatedBy] [varchar](50) NOT NULL,
 CONSTRAINT [PK_AuditJob] PRIMARY KEY CLUSTERED 
(
[JobID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
 
ALTER TABLE [Audit].[Job] ADD  CONSTRAINT [df_AuditJob_StartTime]  DEFAULT (getdate()) FOR [StartTime]
GO
 
ALTER TABLE [Audit].[Job] ADD  CONSTRAINT [df_AuditJob_CreatedBy]  DEFAULT (suser_name()) FOR [CreatedBy]
GO
 
 
 
========================================================================================
 ## SSIS: Audit Job - Audit.JobStep (Table)
========================================================================================
 
USE [DW]
GO
 
SET ANSI_NULLS ON
GO
 
SET QUOTED_IDENTIFIER ON
GO
 
CREATE TABLE [Audit].[JobStep](
[JobStepID] [int] IDENTITY(1,1) NOT NULL,
[JobID] [int] NOT NULL,
[StepName] [varchar](100) NULL,
[StartTime] [datetime] NOT NULL,
[EndTime] [datetime] NULL,
[RowsAffected] [int] NULL,
[Comment] [varchar](512) NULL,
[ModifiedDate] [datetime] NULL,
[CreatedBy] [varchar](50) NOT NULL,
 CONSTRAINT [PK_AuditJobStep] PRIMARY KEY CLUSTERED 
(
[JobStepID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
 
ALTER TABLE [Audit].[JobStep] ADD  CONSTRAINT [df_AuditJobStep_StartTime]  DEFAULT (getdate()) FOR [StartTime]
GO
 
ALTER TABLE [Audit].[JobStep] ADD  CONSTRAINT [df_AuditJobStep_ModifiedDate]  DEFAULT (getdate()) FOR [ModifiedDate]
GO
 
ALTER TABLE [Audit].[JobStep] ADD  CONSTRAINT [df_AuditJobStep_CreatedBy]  DEFAULT (suser_name()) FOR [CreatedBy]
GO
 
ALTER TABLE [Audit].[JobStep]  WITH CHECK ADD  CONSTRAINT [FK_AuditJobStep_Job] FOREIGN KEY([JobID])
REFERENCES [Audit].[Job] ([JobID])
GO
 
ALTER TABLE [Audit].[JobStep] CHECK CONSTRAINT [FK_AuditJobStep_Job]
GO
 
 
 
 
 
 
========================================================================================
 ## SQL JOB: Calling Python (SQL Integration)
========================================================================================
 
c:\Python\python C:\Python\sql\CallCentreDataLoading\PureCloudDataLoading.py

 

 

 
========================================================================================
 ## SSIS: It's important to give time to SQL to load all the files [30 Seconds Delay]
========================================================================================
 
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
 
 
namespace ST_7007c7cdd86e40169528f8a98064c125
{
   
[Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{
 
public void Main()
{
 
            System.Threading.Thread.Sleep(30000);///wait for 30 secs
Dts.TaskResult = (int)ScriptResults.Success;
}
 
     
        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
     
}
}
 
 
========================================================================================
 ## SSIS: System Package Name Variable (relevant to fill the SP variable for Auditing)
========================================================================================
 
 
 
========================================================================================
 ## SSIS: Passing the System variable to a SP for Auditing)
========================================================================================
 
 
Exec Stage.spLoadFactCallCentreQueue @JobName = ?;
 
 
 
========================================================================================
 ## SQL: Stage Table
========================================================================================
USE [DW]
GO
 
SET ANSI_NULLS ON
GO
 
SET QUOTED_IDENTIFIER ON
GO
 
CREATE TABLE [Stage].[CallCentreQueue](
[CallDate] [varchar](20) NOT NULL,
[QueueBranchName] [varchar](200) NOT NULL,
[NoOfOfferedCalls] [int] NOT NULL,
[NoOfAnsweredCalls] [int] NOT NULL,
[NoOfAbandonedCalls] [int] NOT NULL,
[PercentAbandonDNUCalls] [float] NOT NULL,
[PercentServiceLevel] [float] NOT NULL,
[AvgSpeedAnswer] [time](7) NOT NULL,
[AvgTalk] [time](7) NOT NULL,
[AvgAfterCallWork] [time](7) NOT NULL,
[AvgHandleTime] [time](7) NOT NULL,
[AvgHold] [time](7) NOT NULL,
[NoOfTransferredCalls] [int] NOT NULL,
[PercentTransferCalls] [float] NOT NULL,
[DataHash]  AS (hashbytes('MD5',concat([CallDate],[QueueBranchName]))) PERSISTED,
[ModifiedOn] [datetime] NOT NULL,
[CreatedOn] [datetime] NOT NULL
) ON [PRIMARY]
GO
 
ALTER TABLE [Stage].[CallCentreQueue] ADD  DEFAULT (getdate()) FOR [ModifiedOn]
GO
 
ALTER TABLE [Stage].[CallCentreQueue] ADD  DEFAULT (getdate()) FOR [CreatedOn]
GO
 
 
 
 
========================================================================================
 ## SQL: Fact Table 
========================================================================================
 
USE [DW]
GO
 
 
SET ANSI_NULLS ON
GO
 
SET QUOTED_IDENTIFIER ON
GO
 
CREATE TABLE [Fact].[CallCentreQueue](
[CallCentreQueueID] [int] IDENTITY(1,1) NOT NULL,
[CallDateID] [int] NOT NULL,
[QueueBranchNameID] [int] NOT NULL,
[QueueBranchName] [varchar](200) NOT NULL,
[NoOfOfferedCalls] [int] NOT NULL,
[NoOfAnsweredCalls] [int] NOT NULL,
[NoOfAbandonedCalls] [int] NOT NULL,
[NoOfServiceLevelCalls] [int] NOT NULL,
[AvgSpeedAnswer] [time](7) NOT NULL,
[AvgTalk] [time](7) NOT NULL,
[AvgAfterCallWork] [time](7) NOT NULL,
[AvgHandleTime] [time](7) NOT NULL,
[AvgHold] [time](7) NOT NULL,
[NoOfTransferredCalls] [int] NOT NULL,
[DataHash] [varbinary](16) NULL,
[ModifiedOn] [datetime] NOT NULL,
[CreatedOn] [datetime] NOT NULL,
 CONSTRAINT [PK_CallCentreQueueID] PRIMARY KEY NONCLUSTERED 
(
[CallCentreQueueID] DESC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
 
 
Create index idx_CallCentreQueueCallDateID on Fact.CallCentreQueue(CallDateID)
go
 
Create index idx_CallCentreQueueQueueBranchNameID on Fact.CallCentreQueue(QueueBranchNameID)
go
 
Create index idx_CallCentreQueueDataHash on Fact.CallCentreQueue(DataHash)
go
 
 
 
========================================================================================
 ## SQL: SP to load the data
========================================================================================
 
USE [DW]
GO
 
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
 
CREATE OR ALTER     PROCEDURE [Stage].[spLoadFactCallCentreQueue]
 
@JobName varchar(50),
@DebugMode char(1) = 'N',
@TestMode char(1) = 'N'
 
as
/*****************************************************************************
 
Name: stage.spLoadFactCallCentreQueue
 
Load the DW dimension table Fact.CallCentreQueue from a staging table previously populated
 
******************************************************************************
Change History
******************************************************************************
Date: Author: Description:
----- ------- -------------------
26 Oct 2018 Leandro Buffone Created Initial Version[finPowerConnect_EOM][finPowerConnect_EOM]
******************************************************************************
 
Usage:
 
exec Stage.spLoadFactCallCentreQueue
@JobName = 'Dev Test',
@DebugMode = 'N',
@TestMode = 'N'
 
******************************************************************************/
declare @ProcName varchar(100) = object_schema_name(@@procid) + '.' + object_name(@@procid)
declare @JobStepID int
declare @EndTime datetime
declare @RowsAffected int
declare @Comment varchar(255)
declare @Now datetime
declare @SettingString varchar(2000)
 
begin try
 
declare @ParentTransaction bit = 0
 
if @@trancount > 0 -- only start a transaction if we are not already in one
set @ParentTransaction = 1
else
begin
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'Beginning a transaction')
begin tran
end
 
exec Audit.spAddJobStep
@JobName = @JobName,
@StepName = @ProcName,
@JobStepID = @JobStepID output,
@DebugMode = @DebugMode,
@TestMode = @TestMode
 
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'Merge Staging data into Fact.CallCentreQueue')
 
 
Merge Fact.CallCentreQueue F
Using (
select 
CallDateID = dbo.fnDateKey(CallDate),
QueueBranchName,
QueueBranchNameID = isnull(B.BranchID,-1),
NoOfOfferedCalls,
NoOfAnsweredCalls,
NoOfAbandonedCalls,
PercentAbandonDNUCalls,
PercentServiceLevel,
NoOfServiceLevelCalls = round(PercentServiceLevel * (NoOfAnsweredCalls + NoOfAbandonedCalls),0) ,
AvgSpeedAnswer,
AvgTalk,
AvgAfterCallWork,
AvgHandleTime,
AvgHold,
NoOfTransferredCalls,
PercentTransferCalls,
C.DataHash,
C.ModifiedOn,
C.CreatedOn
from Stage.CallCentreQueue  C
left Join Dim.Branch B on B.BranchName = C.QueueBranchName
 
) S on F.DataHash = S.DataHash 
when matched and S.ModifiedOn > F.ModifiedOn then
Update 
Set QueueBranchName = S.QueueBranchNameID,
QueueBranchNameID = S.QueueBranchNameID,
NoOfOfferedCalls = S.NoOfOfferedCalls,
NoOfAnsweredCalls = S.NoOfAnsweredCalls,
NoOfAbandonedCalls = S.NoOfAbandonedCalls,
NoOfServiceLevelCalls = S.NoOfServiceLevelCalls ,
AvgSpeedAnswer = S.AvgSpeedAnswer,
AvgTalk = S.AvgTalk,
AvgAfterCallWork = S.AvgAfterCallWork,
AvgHandleTime = S.AvgHandleTime,
AvgHold = S.AvgHold,
NoOfTransferredCalls = S.NoOfTransferredCalls,
DataHash = S.DataHash,
ModifiedOn = S.ModifiedOn,
CreatedOn = S.CreatedOn
when not matched then
Insert (  CallDateID,  QueueBranchName,   QueueBranchNameID,   NoOfOfferedCalls,   NoOfAnsweredCalls,   NoOfAbandonedCalls,    NoOfServiceLevelCalls,   AvgSpeedAnswer,   AvgTalk,   AvgAfterCallWork,   AvgHandleTime,   AvgHold,   NoOfTransferredCalls,    DataHash,   ModifiedOn,   CreatedOn) 
Values (S.CallDateID, S.QueueBranchName, S.QueueBranchNameID, S.NoOfOfferedCalls, S.NoOfAnsweredCalls, S.NoOfAbandonedCalls, S.NoOfServiceLevelCalls, S.AvgSpeedAnswer, S.AvgTalk, S.AvgAfterCallWork, S.AvgHandleTime, S.AvgHold, S.NoOfTransferredCalls,  S.DataHash, S.ModifiedOn, S.CreatedOn);
 
 
set @RowsAffected = @@rowcount
 
set @Now = getdate()
 
exec Audit.spUpdateJobStep
@JobStepID = @JobStepID,
@EndTime = @Now,
@RowsAffected = @RowsAffected,
@Comment = 'Fact.CallCentreQueue',
@DebugMode = @DebugMode,
@TestMode = @TestMode
 
 
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'Merge Staging data into Fact.CallCentreQueue')
 
if @ParentTransaction = 0
begin
if @TestMode = 'N'
begin
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'TestMode = N. Committing changes')
commit tran
end
else
begin
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'TestMode. Rolling back changes')
rollback tran
end
end
 
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, '***** Finish ***** ')
 
end try
 
begin catch
 
if @ParentTransaction = 0
begin
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'Error: Rolling back transaction')
if @@trancount > 0 rollback tran
end
 
exec spErrorHandler
@JobStepID = @JobStepID,
@DebugMode = @DebugMode,
@TestMode = @TestMode
 
end catch
 
 
 
 
========================================================================================
 ## SSIS: Moving the File to Fail Folders (in case of any fails)
========================================================================================
 
 
 
========================================================================================
 ## C# Code: Moving the File to Fail Folders (in case of any fails)
========================================================================================
 
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using System.IO;
namespace ST_51e924b12cd541e2ae8a5e3f9fea6fd3
{
 
[Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{
     
        public String fnFolderFile(String strFile)
        {
            String[] fileFolderArray = strFile.Split('\\');
            String fileFolder = "";
 
            for (int i = 1; i < (fileFolderArray.Length - 1); i++)
            {
                fileFolder += @"\" + fileFolderArray[i];
            }
 
            return fileFolder;
 
        }
 
 
        public String fnFileName(String strFile)
        {
            String[] FileNameArray = strFile.Split('\\');
            String fileName = "";
 
            fileName = FileNameArray[FileNameArray.Length-1];
 
            return fileName;
 
        }
 
public void Main()
{
 
            String strLocation = Dts.Variables["User::ExcelFileSource"].Value.ToString();
 
            String FolderLocation = fnFolderFile(strLocation) + @"\Fail\";
 
            String FileName = fnFileName(strLocation);
 
            String FolderDestinationFile = FolderLocation + DateTime.Now.ToString("yyyy-MM-dd") + "_" + FileName;
 
            if (File.Exists(strLocation))
            {
                if (File.Exists(FolderDestinationFile))
                    File.Delete(FolderDestinationFile);
 
                File.Move(strLocation, FolderDestinationFile);
                Dts.TaskResult = (int)ScriptResults.Success;
            }
            else
            {
                Dts.TaskResult = (int)ScriptResults.Success;
            }
}
 
             enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
 
 
}
}
 
 
 
 
========================================================================================
 ## C# Code: Moving the File to Archive Folders (in case of it had success)
========================================================================================
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using System.IO;
namespace ST_51e924b12cd541e2ae8a5e3f9fea6fd3
{
 
[Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{
     
 
        
 
        public String fnFolderFile(String strFile)
        {
            String[] fileFolderArray = strFile.Split('\\');
            String fileFolder = "";
 
            for (int i = 1; i < (fileFolderArray.Length - 1); i++)
            {
                fileFolder += @"\" + fileFolderArray[i];
            }
 
            return fileFolder;
 
        }
 
 
        public String fnFileName(String strFile)
        {
            String[] FileNameArray = strFile.Split('\\');
            String fileName = "";
 
            fileName = FileNameArray[FileNameArray.Length-1];
 
            return fileName;
 
        }
 
public void Main()
{
 
            String strLocation = Dts.Variables["User::ExcelFileSource"].Value.ToString();
 
            String FolderLocation = fnFolderFile(strLocation) + @"\Archive\";
 
            String FileName = fnFileName(strLocation);
 
            String FolderDestinationFile = FolderLocation + DateTime.Now.ToString("yyyy-MM-dd") + "_" + FileName;
 
            if (File.Exists(strLocation))
            {
                if (File.Exists(FolderDestinationFile))
                    File.Delete(FolderDestinationFile);
 
                File.Move(strLocation, FolderDestinationFile);
                Dts.TaskResult = (int)ScriptResults.Success;
            }
            else
            {
                Dts.TaskResult = (int)ScriptResults.Success;
            }
}
 
        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
 
 
}
}
 
 
=======================================================================================
 ## SQL: SP to Finalize the Package loading
========================================================================================
 
USE [DW]
GO
 
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE OR ALTER proc [Audit].[spEndJob]
 
@JobID int = NULL,
@JobName varchar(100) = NULL,
@DebugMode char(1) = 'N',
@TestMode char(1) = 'N'
 
as
/*****************************************************************************
 
Name: Audit.spEndJob
 
Sets the EndTime from a specified JobID. Only when a JobID is not supplied then
the last entry for a matching JobName will be used. If a JobID or JobName
are specified and cannot be matched then nothing will be updated.
 
******************************************************************************
Change History
******************************************************************************
Date: Author: Description:
----- ------- -------------------
15 Oct 2018 Leandro Buffone Created Initial Version
******************************************************************************
 
Usage:
 
exec Audit.spEndJob
@JobName = 'Test',
@JobID = NULL,
@DebugMode = 'Y',
@TestMode = 'Y'
 
******************************************************************************/
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, concat('** Start ** @JobID=',@JobID,', @JobName=',@JobName))
 
begin try
 
declare @ParentTransaction bit = 0
 
if @@trancount > 0 -- only start a transaction if we are not already in one
set @ParentTransaction = 1
else
begin
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'Beginning a transaction')
begin tran
end
 
if @JobID is null
begin
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'JobID not specified. Lookup latest JobID from the supplied JobName')
 
select @JobID = max(JobID) from Audit.Job where JobName = @JobName
end
 
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, concat('Update Audit.Job for JobID=',@JobID))
update Audit.Job set EndTime = getdate() where JobID = @JobID
 
if @ParentTransaction = 0
begin
if @TestMode = 'N'
begin
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'TestMode = N. Committing changes')
commit tran
end
else
begin
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'TestMode. Rolling back changes')
rollback tran
end
end
 
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, '***** Finish ***** ')
 
end try
 
begin catch
 
if @ParentTransaction = 0
begin
if @DebugMode = 'Y' print dbo.fnDebugInfo(@@procid, 'Error: Rolling back transaction')
if @@trancount > 0 rollback tran
end
 
exec spErrorHandler
 
end catch