使用更改跟踪实现数据同步

    技术2022-05-19  25

    SQL Server 2008 引入了更改跟踪,这是一种轻量型解决方案,它为应用程序提供了一种有效的更改跟踪机制。通常,若要使应用程序能够查询对数据库中的数据所做的更改和访问与这些更改相关的信息,应用程序开发人员必须实现自定义更改跟踪机制。创建这些机制通常涉及多项工作,并且常常涉及使用触发器、timestamp 列和新表组合来存储跟踪信息,同时还会涉及使用自定义清除过程。

    通过更改跟踪,可以很容易地编写同步数据的应用,下面是一个使用更改跟踪实现单向数据同步的示例。

    1. 建立示例环境

    -- ====================================================

    -- 测试的数据库

    USE master;

    GO

    CREATE DATABASE DB_test;

    GO

    -- 启用更改跟踪

    ALTER DATABASE DB_test SET

        CHANGE_TRACKING = ON

        (

               AUTO_CLEANUP = ON,          -- 打开自动清理选项

               CHANGE_RETENTION = 1 HOURS  -- 数据保存期为1 时

        );

     

    ALTER DATABASE DB_test SET

        ALLOW_SNAPSHOT_ISOLATION ON;       -- 允许在测试数据库中使用 SNAPSHOT 事务隔离级别

    GO

     

    -- ====================================================

    -- 测试的表

    USE DB_test;

    GO

    -- a. 同步的源表

    CREATE TABLE dbo.tb_source

    (

        pk_id int IDENTITY

           PRIMARY KEY,

        col1 int,

        col2 varchar(10),

        col3 nvarchar(max),

        col4 xml

    );

    GO

    -- 启用更改跟踪

    ALTER TABLE dbo.tb_source

        ENABLE CHANGE_TRACKING

           WITH

           (

               TRACK_COLUMNS_UPDATED = ON  -- 记录UPDATE 的列信息

           );

    GO

     

    -- b. 同步的目录表

    CREATE TABLE dbo.tb_Target

    (

        pk_id int

           PRIMARY KEY,

        col1 int,

        col2 varchar(10),

        col3 nvarchar(max),

        col4 xml

    );

    GO

     

    -- 记录同步情况的表

    CREATE TABLE dbo.tb_Change_Tracking

    (

        id int IDENTITY

           PRIMARY KEY,

        object_name sysname  UNIQUE,

        last_sync_version bigint,

        last_update_date datetime

    );

    GO

     

    2. 实现同步处理的存储过程

    -- ====================================================

    -- 数据同步处理的存储过程

    USE DB_test;

    GO

    -- 数据同步的存储过程- 同步未更新的数据

    -- 单次更新,更新完成后退出

    CREATE PROC dbo.p_SyncChangeData_tb_Srouce_Target

        @last_sync_version bigint = NULL OUTPUT,

        @min_valid_version bigint = NULL OUTPUT

    AS

    SET NOCOUNT ON;

    -- ========================================

    -- TRY...CATCH 中的标准事务处理模块

    -- a. 当前的事务数

    DECLARE    @__trancount int;

    SELECT    @__trancount = @@TRANCOUNT;

     

    -- TRY...CATCH 处理

    BEGIN TRY

        -- ========================================

        -- 源表信息

        DECLARE   @object_name sysname,@object_id int;

        SELECT @object_name = N'dbo.tb_source',

            @object_id = OBJECT_ID(@object_name); 

        -- ========================================

        -- 最后一次同步的版本

        IF @last_sync_version IS NULL

        BEGIN

           SELECT

               @last_sync_version = last_sync_version

           FROM dbo.tb_Change_Tracking

           WHERE object_name = @object_name;

     

           IF @@ROWCOUNT = 0

           BEGIN

               SET @last_sync_version = CHANGE_TRACKING_MIN_VALID_VERSION(@object_id);

     

               INSERT dbo.tb_Change_Tracking

               (

                  object_name, 

                  last_sync_version

                )

               VALUES

               (

                  @object_name, 

                  @last_sync_version

                );

           END;

        END;

     

        -- ========================================

        -- TRY...CATCH 中的标准事务处理模块

        -- b. 开启事务, 或者设置事务保存点

        SET TRANSACTION ISOLATION LEVEL SNAPSHOT; -- 使用快照隔离级别的事务

        IF @__trancount = 0

           BEGIN TRAN;

        ELSE

           SAVE TRAN __TRAN_SavePoint;

     

        -- ========================================

        -- 版本验证

        -- a. 验证是否有数据变更(如果上次同步的版本号= 当前数据库的最大版本号,则视为无数据变化)

        IF @last_sync_version = CHANGE_TRACKING_CURRENT_VERSION()

           GOTO lb_Return;

     

        -- b. 验证同步的版本号是否有效(如果上次同步的版本号< 当前可用的最小版本号,则视为无效)

        IF @last_sync_version < CHANGE_TRACKING_MIN_VALID_VERSION(@object_id)

        BEGIN

           SET @min_valid_version = CHANGE_TRACKING_MIN_VALID_VERSION(@object_id);     

           GOTO lb_Return;

        END;

     

        -- c. 验证同步的版本号是否有效(如果上次同步的版本号> 当前数据库的最大版本号,则视为无效)

        IF @last_sync_version > CHANGE_TRACKING_CURRENT_VERSION()

        BEGIN

           SET @last_sync_version = NULL;

           GOTO lb_Return;

        END;

     

        -- ========================================

        -- 同步数据

        -- a. 插入

        WITH

        CHG AS

        (

           SELECT  DATA.*

           FROM dbo.tb_source DATA

           INNER JOIN 

           CHANGETABLE(CHANGES dbo.tb_source, @last_sync_version) CHG  

           ON CHG.pk_id = DATA.pk_id

           WHERE CHG.SYS_CHANGE_OPERATION = N'I'

        )

        INSERT dbo.tb_Target

        SELECT * FROM CHG;

     

        -- b. 删除

        WITH

        CHG AS

        (

           SELECT CHG.*

           FROM CHANGETABLE(CHANGES dbo.tb_source, @last_sync_version) CHG

           WHERE CHG.SYS_CHANGE_OPERATION = N'D'

        )

        DELETE DATA

        FROM dbo.tb_Target DATA

        INNER JOIN CHG  

        ON CHG.pk_id = DATA.pk_id;

     

        -- c. 更新

        WITH

        COL AS

        (

           SELECT *

           FROM

           (

               SELECT name, column_id

               FROM sys.columns

               WHERE object_id = @object_id

           ) DATA

     

           PIVOT

           (

              MAX(column_id)

              FOR name IN(  -- 源表的所有列的列表

                  [col1], [col2], [col3], [col4])

           )P               

        ),

        CHG AS

        (

           SELECT DATA.*,__SYS_CHANGE_COLUMNS = CHG.SYS_CHANGE_COLUMNS

           FROM dbo.tb_source DATA INNER JOIN 

           CHANGETABLE(CHANGES dbo.tb_source, @last_sync_version) CHG

           ON CHG.pk_id = DATA.pk_id

           WHERE CHG.SYS_CHANGE_OPERATION = N'U'

        )

        UPDATE DATA SET             -- 判断列是否需要更新(也可以不判断,直接更新所有的列)

           [col1] = CASE

                      WHEN CHG.__SYS_CHANGE_COLUMNS IS NULL OR CHANGE_TRACKING_IS_COLUMN_IN_MASK(COL.[col1], CHG.__SYS_CHANGE_COLUMNS) = 1

                         THEN CHG.[col1]

                      ELSE DATA.[col1]

                  END,

           [col2] = CASE

                      WHEN CHG.__SYS_CHANGE_COLUMNS IS NULL OR CHANGE_TRACKING_IS_COLUMN_IN_MASK(COL.[col2], CHG.__SYS_CHANGE_COLUMNS) = 1

                         THEN CHG.[col2]

                      ELSE DATA.[col2]

                  END,

           [col3] = CASE

                      WHEN CHG.__SYS_CHANGE_COLUMNS IS NULL OR CHANGE_TRACKING_IS_COLUMN_IN_MASK(COL.[col3], CHG.__SYS_CHANGE_COLUMNS) = 1

                         THEN CHG.[col3]

                      ELSE DATA.[col3]

                  END,

           [col4] = CASE

                      WHEN CHG.__SYS_CHANGE_COLUMNS IS NULL OR CHANGE_TRACKING_IS_COLUMN_IN_MASK(COL.[col4], CHG.__SYS_CHANGE_COLUMNS) = 1

                         THEN CHG.[col4]

                      ELSE DATA.[col4]

                  END

        FROM dbo.tb_Target DATA

        INNER JOIN CHG

        ON CHG.pk_id = DATA.pk_id

        CROSS JOIN COL;

     

        -- ========================================

        -- 更新最后一次同步的版本号

        UPDATE dbo.tb_Change_Tracking SET

           @last_sync_version = CHANGE_TRACKING_CURRENT_VERSION(),

           last_sync_version = @last_sync_version,

           last_update_date = GETDATE();

     

        -- ========================================

        -- TRY...CATCH 中的标准事务处理模块

        -- c. 提交事务

        --    有可提交的事务, 并且事务是在当前模块中开启的情况下, 才提交事务

        IF XACT_STATE() = 1 AND @__trancount = 0

           COMMIT;

     

    lb_Return:

        -- ========================================

        -- TRY...CATCH 中的标准事务处理模块

        -- d. 为了防止TRY 块中有遗漏的事务处理, 在TRY 模块的结束部分做最终的事务处理

        IF @__trancount = 0

        BEGIN

           IF XACT_STATE() = -1

               ROLLBACK TRAN;

           ELSE

           BEGIN     

               WHILE @@TRANCOUNT > 0

                  COMMIT TRAN;

           END;

        END;

    END TRY

    BEGIN CATCH

        -- ========================================

        -- TRY...CATCH 中的标准事务处理模块

        -- e. CATCH 块的事务回滚

        IF XACT_STATE() <> 0

        BEGIN

           IF @__trancount = 0

               ROLLBACK TRAN;

           -- XACT_STATE 为-1 时, 不能回滚到事务保存点, 这种情况留给外层调用者做统一的事务回滚

           -- 通过@@TRANCOUNT > @__trancount 的判断, 即使在TRY 模块中没有设置事务保存点的情况下跳到此步骤, 也不会出错

           ELSE IF XACT_STATE() = 1 AND @@TRANCOUNT > @__trancount

               ROLLBACK TRAN __TRAN_SavePoint;

        END;

     

        -- ========================================

        -- 错误消息处理

        -- a. 获取错误信息

        --    这提提取了错误相关的全部信息, 可以根据实际需要调整

        DECLARE @__error_number int,

           @__error_message nvarchar(2048),

           @__error_severity int,

           @__error_state int,

           @__error_line int,

           @__error_procedure nvarchar(126),

           @__user_name nvarchar(128),

           @__host_name nvarchar(128);

     

        SELECT @__error_number = ERROR_NUMBER(),

           @__error_message = ERROR_MESSAGE(),

           @__error_severity = ERROR_SEVERITY(),

           @__error_state = ERROR_STATE(),

           @__error_line = ERROR_LINE(),

           @__error_procedure = ERROR_PROCEDURE(),

           @__user_name = SUSER_SNAME(),

           @__host_name = HOST_NAME();

     

        -- c. 如果没有打算在CATCH 模块中对错误进行处理, 则应该抛出错误给调用者

        RAISERROR

        (

           N'User: %s, Host: %s, Procedure: %s, Error %d, Level %d, State %d, Line %d, Message: %s ',

           @__error_severity,

           1,

           @__user_name,

           @__host_name,

           @__error_procedure,

           @__error_number,

           @__error_severity,

           @__error_state,

           @__error_line,

           @__error_message

         );

    END CATCH;

    GO

     

    -- 数据同步的存储过程- 同步未更新的数据

    -- 循环更新,直到手工结束

    -- 可设置一个在SQL Agent服务启动时工作的JOB 来调用这个存储过程,这样就可以实现自动同步

    CREATE PROC dbo.p_SyncChangeData_Circle_tb_Srouce_Target

        @interval int = 1 -- 循环检测之间相隔的秒数(指定负数或NULL值会调整为)

    AS

    SET NOCOUNT ON;

     

    IF @interval IS NULL OR @interval < 0

        SET @interval = 1;

     

    DECLARE @last_sync_version bigint,

        @min_valid_version bigint,

        @date_delay datetime;

     

    WHILE 1 = 1

    BEGIN

        EXEC dbo.p_SyncChangeData_tb_Srouce_Target

           @last_sync_version = @last_sync_version OUTPUT,

           @min_valid_version = @min_valid_version OUTPUT;

     

        IF @min_valid_version IS NOT NULL

        BEGIN

           RAISERROR(N'某些未同步的数据已经丢失', 16, 1);

           BREAK;

        END;

        IF @last_sync_version IS NULL

        BEGIN

           RAISERROR(N'源数据异常,同步版本号小于已经同步的版本号', 16, 1);

           BREAK;

        END;

     

        SET @date_delay = DATEADD(Second, @interval, GETDATE());

        WAITFOR TIME @date_delay;

    END

    GO

     

    3.同步测试

    -- ====================================================

    -- 测试

    USE DB_test;

    GO

    -- A.01. 变更源表数据

    INSERT dbo.tb_source

    (

        col1

    )

    VALUES

    (

        1

    );

     

    INSERT dbo.tb_source

    (

        col1

    )

    VALUES

    (

        2

     ),

    (

        3

    );

     

    UPDATE dbo.tb_source SET

        col2 = 'update'

    WHERE pk_id = 2;

     

    -- A.02. 同步源表数据,并查询同步结果

    DECLARE @last_sync_version bigint,

        @min_valid_version bigint;

     

    EXEC dbo.p_SyncChangeData_tb_Srouce_Target

        @last_sync_version = @last_sync_version OUTPUT,

        @min_valid_version = @min_valid_version OUTPUT;

     

    IF @min_valid_version IS NOT NULL

        RAISERROR(N'某些未同步的数据已经丢失', 16, 1);

     

    IF @last_sync_version IS NULL

        RAISERROR(N'源数据异常,同步版本号小于已经同步的版本号', 16, 1);

     

    SELECT * FROM dbo.tb_source;

    SELECT * FROM dbo.tb_Target;

    SELECT * FROM dbo.tb_Change_Tracking;

     

    -- B.01. 变更源表数据

    INSERT dbo.tb_source

    (

        col1

    )

    VALUES

    (

        11

     );

     

    UPDATE dbo.tb_source SET

        col2 = 'update.1'

    WHERE pk_id = 2;

     

    UPDATE dbo.tb_source SET

        col2 = 'update 3'

    WHERE pk_id = 3;

     

    DELETE FROM dbo.tb_source

    WHERE pk_id < 3;

     

    SET IDENTITY_INSERT dbo.tb_source ON;

     

    INSERT dbo.tb_source

    (

        pk_id, col1, col2

    )

    VALUES

    (

        1, 11, 'insert'

     );

    SET IDENTITY_INSERT dbo.tb_source OFF;

     

    -- B.02. 同步源表数据,并查询同步结果

    EXEC dbo.p_SyncChangeData_tb_Srouce_Target

        @last_sync_version = @last_sync_version OUTPUT,

        @min_valid_version = @min_valid_version OUTPUT;

     

    IF @min_valid_version IS NOT NULL

        RAISERROR(N'某些未同步的数据已经丢失', 16, 1);

    IF @last_sync_version IS NULL

        RAISERROR(N'源数据异常,同步版本号小于已经同步的版本号', 16, 1);

     

    SELECT * FROM dbo.tb_source;

    SELECT * FROM dbo.tb_Target;

    SELECT * FROM dbo.tb_Change_Tracking;

    GO

     

    4.删除示例测试环境

    -- ====================================================

    -- /* -- 删除测试环境

    USE DB_test;

    GO

    IF OBJECT_ID(N'dbo.p_SyncChangeData_tb_Srouce_Target') IS NOT NULL

        DROP PROC dbo.p_SyncChangeData_tb_Srouce_Target;

     

    IF OBJECT_ID(N'dbo.tb_source') IS NOT NULL

        DROP TABLE dbo.tb_source;

     

    IF OBJECT_ID(N'dbo.tb_Target') IS NOT NULL

        DROP TABLE dbo.tb_Target;

     

    IF OBJECT_ID(N'dbo.tb_Change_Tracking') IS NOT NULL

        DROP TABLE dbo.tb_Change_Tracking;

    GO

    --*/

    /*-- 删除测试数据库

    USE master;

    GO

    ALTER DATABASE DB_test SET

        SINGLE_USER

        WITH

           ROLLBACK AFTER 0;

    GO

    DROP DATABASE DB_test;

    --*/

     

    附:

    关于使用更改跟踪实现数据同步的一些补充说明:

    1.  使用 SNAPSHOT 事务隔离级别

    在进行同步的处理中,需要记录当前已经处理的最后一个版本号;而在查询更改跟踪信息时,无法指定要查询的更改跟踪信息的结束版本号。这就要求同步处理过程中,无论是查询更改的最后一个版本号,还是查询更改跟踪信息,都能保证是同一个变更版本的结束版本号。把所有的处理放在事务中,并使用SNAPSHOT事务隔离级别可以保证这点。

    2.  自动同步

    更改跟踪的信息只能被查询,所以在示例中,是在数据变更后,手动运行同步的存储过程实现数据同步。如果要自动同步,则只能使用定时查询的方式。本示例提供了一个名为“dbo.p_SyncChangeData_Circle_tb_Srouce_Target”的存储过程,可以使用Job或者其他程序在SQL Server启动时,调用这个存储过程,则这个存储过程会定时检查数据变更,并实现数据变更的自动同步。


    最新回复(0)