Note
Access to this page requires authorization. You can try signing in or changing directories.
Access to this page requires authorization. You can try changing directories.
Предыдущие посты по этой теме:
· Репликация таблиц средствами Change Tracking
· Краткое введение в сервис-брокер
· Репликация средствами Change Tracking. Небольшое упражнение на FOR XML PATH и XQuery.
В посте Репликация таблиц средствами Change Tracking мы рассмотрели вариант синхронизации таблиц tbl_1 и tbl_2 при помощи появившегося в SQL Server 2008 механизма отслеживания изменений Change Tracking. В данном посте мы разовьем этот сценарий на случай, когда таблицы находятся на разных серверах. В качестве транспорта будет использоваться появившийся в SQL Server 2005 механизм асинхронного взаимодействия сервис-брокер. Таблица tbl_1 будет находиться на сервере Маша. Change Tracking будет отслеживать происходящие над ней изменения (delete, insert, update). Эти изменения будут превращаться в XML-сообщение и доставляться сервис-брокером на сервер Дубровский, где XML превратится обратно в DML-команды, которые будут применены к таблице tbl_2 на этом сервере. Вместо сервис-брокера можно задействовать свой транспорт по доставке XML, тогда этот сценарий может применяться, когда Маша и Дубровский оба SQL Expressы. Сервис-брокер входит в состав SQL Express, однако два SQL Expressа через него общаться не могут. Необходимо, чтобы хотя бы одна сторона имела взрослую редакцию.
В упрощенном примере обе таблицы будут находиться на одном сервере в одной базе ChangeTracking_Test.
use tempdb
if exists(select 1 from sys.databases where name = 'ChangeTracking_Test') begin
alter database ChangeTracking_Test set single_user with rollback immediate
drop database ChangeTracking_Test
end
create database ChangeTracking_Test
use ChangeTracking_Test
Скрипт 1
В базе со стороны Маши будут работать два процесса: имитация пользовательской активности, вносящая в tbl_1 случайные изменения, и периодическая синхронизация. Чтобы заморозить tbl_1 на момент синхронизации, используется уровень изоляции snapshot. Его нужно включить на базе со стороны Маши:
alter database ChangeTracking_Test set single_user with rollback immediate
alter database ChangeTracking_Test set read_committed_snapshot on
alter database ChangeTracking_Test set multi_user
alter database ChangeTracking_Test set allow_snapshot_isolation on
Скрипт 2
На базе со стороны Маши должен быть поднят Change Tracking:
if not exists (select 1 from sys.change_tracking_databases where database_id = db_id('ChangeTracking_Test'))
alter database ChangeTracking_Test set change_tracking = on (change_retention = 10 minutes, auto_cleanup = on)
Скрипт 3
Со стороны Маши и со стороны Дубровского должен быть задействован сервис-брокер. Практикой хорошего тона при использовании сервис-брокера является иметь мастер-ключ на базе, чтобы потом не вылезла ошибка Краткое введение в сервис-брокер\Скрипт 3.
create master key encryption by password = 'AbraCadabra'
if (select is_broker_enabled from sys.databases where name = 'ChangeTracking_Test') = 0
alter database ChangeTracking_Test set enable_broker with rollback immediate
Скрипт 4
На стороне Маши имеется таблица tbl_1
if object_id('dbo.tbl_1', 'U') is not null drop table tbl_1
create table tbl_1 (
id1 int identity,
id2 int default (datepart(ns, sysdatetime()) / 100),
fld1 varchar(10),
fld2 sql_variant,
primary key(id1, id2)
)
Скрипт 5
с которой будет синхронизироваться таблица tbl_2 на стороне Дубровского:
if object_id('dbo.tbl_2', 'U') is not null drop table tbl_2
create table tbl_2 (
id1 int,
id2 int,
fld1 varchar(10),
fld2 sql_variant,
primary key(id1, id2)
)
Скрипт 6
Включаем отслеживание изменений по tbl_1:
if not exists (select 1 from sys.change_tracking_tables where object_id = object_id('tbl_1'))
alter table tbl_1 enable change_tracking
Скрипт 7
На стороне Маши создаем вспомогательную таблицу для хранения предыдущей версии синхронизации и журналирования результатов
use ChangeTracking_Test
if object_id('dbo.Sync_Log', 'U') is not null drop table dbo.Sync_Log
create table dbo.Sync_Log (dt datetime default sysdatetime(), version bigint default change_tracking_current_version(),
source sysname, destination sysname, status nvarchar(200),
deleted bigint, inserted bigint, updated bigint)
insert dbo.Sync_Log (version, source, destination)
values (change_tracking_min_valid_version(object_id('dbo.tbl_1')), 'tbl_1', 'tbl_2')
Скрипт 8
Работающий на периодической основе со стороны Маши скрипт синхронизации будет отлавливать при помощи функции ChangeTable изменения в tbl_1 и превращать их в XML вида Небольшое упражнение на FOR XML PATH и XQuery\Скрипт 2. Я для него даже создал схему:
if exists (select 1 from sys.xml_schema_collections where name = 'CT_Changes_tbl_1_xsd') drop xml schema collection CT_Changes_tbl_1_xsd
create xml schema collection CT_Changes_tbl_1_xsd as
N'<?xml version="1.0" encoding="utf-16"?>
<xs:schema xmlns:xs="https://www.w3.org/2001/XMLSchema">
<xs:element name="CT_Changes">
<xs:complexType>
<xs:sequence>
<xs:element ref="Record" maxOccurs="unbounded"/>
</xs:sequence>
<xs:attribute name="table_name" type="xs:string" use="required" />
<xs:attribute name="version_since" type="xs:long" use="required" />
<xs:attribute name="version_upto" type="xs:long" use="required" />
</xs:complexType>
</xs:element>
<xs:element name="Record">
<xs:complexType>
<xs:sequence>
<xs:element ref="PK" minOccurs ="1" maxOccurs ="1"/>
<xs:element name="fld1" type="xs:string" minOccurs="0" />
<xs:element name="fld2" type="xs:string" minOccurs="0" />
</xs:sequence>
<xs:attribute name="operation" type="xs:string" use="required" />
<xs:attribute name="change_no" type="xs:long" use="required" />
<xs:attribute name="commit_time" type="xs:dateTime" use="required" />
</xs:complexType>
</xs:element>
<xs:element name="PK">
<xs:complexType>
<xs:sequence>
<xs:element name="id1" type="xs:int" />
<xs:element name="id2" type="xs:int" />
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>'
Скрипт 9
Конфигурируем сервис-брокер, создавая тип сообщения для передачи изменений (он будет валидироваться схемой CT_Changes_tbl_1_xsd), контракт, по которому будут передаваться сообщения этого типа (имена чувствительны к регистру невзирая на коллацию), очереди для сообщений, сервисы как конечные точки и открываем диалог, в рамках которого сервис Маша будет передавать сервису Дубровский сообщения по только что определенному контракту, короче, Краткое введение в сервис-брокер\Скрипты 4 - 9.
if exists(select 1 from sys.services where name = 'Masha') drop service Masha
if exists(select 1 from sys.services where name = 'Dubrovsky') drop service Dubrovsky
if exists(select 1 from sys.service_contracts where name = 'CT_Changes_tbl_1_Contract')
drop contract CT_Changes_tbl_1_Contract
if exists(select 1 from sys.service_message_types where name = 'CT_Changes_tbl_1_MessageType')
drop message type CT_Changes_tbl_1_MessageType
create message type CT_Changes_tbl_1_MessageType validation = valid_xml with schema collection CT_Changes_tbl_1_xsd
create contract CT_Changes_tbl_1_Contract (CT_Changes_tbl_1_MessageType sent by initiator)
if exists(select 1 from sys.service_queues where name = 'QueueMashi') drop queue QueueMashi
create queue QueueMashi
if exists(select 1 from sys.service_queues where name = 'QueueDubrovskogo') drop queue QueueDubrovskogo
create queue QueueDubrovskogo
create service Masha on queue QueueMashi (CT_Changes_tbl_1_Contract)
create service Dubrovsky on queue QueueDubrovskogo (CT_Changes_tbl_1_Contract)
declare @ch uniqueidentifier
begin dialog conversation @ch from service Masha to service 'Dubrovsky' on contract CT_Changes_tbl_1_Contract
Скрипт 10
С отдельного коннекта в SSMS на стороне Маши запускаем имитацию пользовательской активности Репликация таблиц средствами Change Tracking\Скрипт 5.
Модифицируем скрипт синхронизации Репликация таблиц средствами Change Tracking\Скрипт 7. Скрипт состоит из 3-х частей, разделенных комментарными линиями. Первая часть осталась без изменений. Вторая изменена с тем, чтобы он не напрямую применял изменения к tbl_2, а превращал их в XML (Небольшое упражнение на FOR XML PATH и XQuery\Скрипт 3) и кидал в очередь брокеру. Третья, как и первая, осталась без изменений. Она опциональна. В ней я пользуюсь тем, что таблицы на самом деле находятся в одной базе, и сравниваю их, чтобы убедиться, что синхронизация работает.
while 1 = 1 begin
-----------------------------------------------------------------------------------------------------
waitfor delay '00:01:00'
set transaction isolation level snapshot
begin tran
declare @lastVersion bigint --здесь будет храниться последняя версия, которой синхронизирована tbl_2
declare @curVersion table (curVersion bigint); delete from @curVersion --здесь будет храниться текущая версия изменений
select @lastVersion = isnull(max(version), 0) from dbo.Sync_Log where source = 'tbl_1' and destination = 'tbl_2' --берем последнюю версию из нашего журнала
insert dbo.Sync_Log (source, destination) output inserted.Version into @curVersion values ('tbl_1', 'tbl_2') --отмечаем в журнале текущий факт синхронизации
--Если autocleanup успел почистить изменения tbl_1, которые еще не были доставлены на tbl_2, поднимаем аварийную ситуацию.
if @lastVersion < change_tracking_min_valid_version(object_id('dbo.tbl_1')) begin
declare @msg nvarchar(200) = 'Часть изменений потеряна! Требуется ручная синхронизация!'
update dbo.Sync_Log set status = @msg where version = (select curVersion from @curVersion) --фиксируем ее в журнале
raiserror (@msg, 21, 1) with log --и вызываем строгую ошибку, которая прерывает выполнение скрипта
end
--Если за период с прошлой синхронизации ничего нового не произошло, можно не париться.
if @lastVersion = change_tracking_current_version() goto konec
-----------------------------------------------------------------------------------------------------
--Превращаем результат changetable в xml и передаем его в очередь.
declare @x xml = (
select 'tbl_1' as [@table_name], @lastVersion as [@version_since], change_tracking_current_version() as [@version_upto],
(
select ct.SYS_CHANGE_OPERATION as [@operation], ct.SYS_CHANGE_VERSION as [@change_no], sct.commit_time as [@commit_time],
ct.id1 as [PK/id1], ct.id2 as [PK/id2], t.fld1 as fld1, t.fld2 as fld2
from changetable(changes tbl_1, @lastVersion) ct
join sys.dm_tran_commit_table sct on ct.sys_change_version = sct.commit_ts
left join tbl_1 t on t.id1 = ct.id1 and t.id2 = ct.id2
for xml path('Record'), type
)
for xml path('CT_Changes')
)
declare @ch uniqueidentifier =
(
select top 1 ce.conversation_handle from sys.conversation_endpoints ce join
sys.services s on ce.service_id = s.service_id
join sys.service_queues sq on s.service_queue_id = sq.object_id
where s.name = 'Masha' and ce.far_service = 'Dubrovsky' and ce.is_initiator = 1 and ce.state <> 'ER'
) --всегда открыт только один диалог, инициатором которого является Маша
;send on conversation @ch message type CT_Changes_tbl_1_MessageType (@x) --в него и зафигачиваем этот XML
-----------------------------------------------------------------------------------------------------
konec:
--Сравнение копии с оригиналом.
declare @n1 bigint, @n2 bigint
select @n1 = count(1) from (select * from tbl_1 except select * from tbl_2) t --сколько записей в оригинале не хватает в копии
select @n2 = count(1) from (select * from tbl_2 except select * from tbl_1) t --и наоборот
update dbo.Sync_Log set status = case when @n1 <> 0 or @n2 <> 0 then 'Обнаружено ' + cast(@n1 as varchar(20)) + ' записей в tbl_1, не совпадающих с tbl_2, и ' + cast(@n2 as varchar(20)) + ' записей в tbl_2, не совпадающих с tbl_1.' else 'OK' end where version = (select curVersion from @curVersion) --отражаем несовпадения в журнале, а если их нет, то ОК
commit
set transaction isolation level read committed
end
Скрипт 11
Запускаем этот скрипт с нового коннекта в SSMS. Каждую минуту в очередь Дубровского будет капать сообщение от Маши. Можно их посмотреть select *, cast(message_body as xml) from QueueDubrovskogo и убедиться, что в message_body приходит нечто по образу Небольшое упражнение на FOR XML PATH и XQuery\Скрипт 2. На несовпадения между tbl_1 и tbl_2, о которых сообщается в таблице Sync_Log, пока не обращаем внимания. Мы убедились, что Change Tracking исправно отслеживает изменения в tbl_1, а сервис-брокер исправно доставляет их на сервер с tbl_2. Теперь на Дубровском напишем процедуру очереди, которая будет разгребать валящиеся сообщения, превращать XML обратно в операторы DML и применять их к tbl_2, чтобы синхронизировать ее с tbl_1. Джойним tbl_2 по полям РК c записями, полученными из XML. Те, у которых operation="D", удаляются, "I" - вставляются, "U" - обновляются. Апдейты dbo.Sync_Log, которые идут после каждой операции, предназначены для контрольных целей. По-хорошему, Sync_Log нужно было разделить, сделав журнал на стороне отправки и журнал на стороне приема. Я не стал этим заморачиваться, беззастенчиво воспользовавшись тем, что в данном примере стороны физически совпадают. Процедура очереди будет написана на основе Краткое введение в сервис-брокер\Скрипты 21-22 и Небольшое упражнение на FOR XML PATH и XQuery\Скрипты 4-5.
if object_id('ProcessSyncMessages', 'P') is not null drop proc ProcessSyncMessages
go
--Процедура производит синхронизацию tbl_2 с tbl_1
create proc ProcessSyncMessages as begin
declare @ch uniqueidentifier, @msgtype sysname, @body varbinary(max)
--Читаем из очереди сообщение с изменениями
while 1 = 1 begin
waitfor (receive top(1) @ch = conversation_handle, @msgtype = message_type_name, @body = message_body from QueueDubrovskogo)
if @@rowcount = 0 return
if @msgtype <> 'CT_Changes_tbl_1_MessageType' return
select @@rowcount, @msgtype
declare @x xml = @body
declare @curVersion bigint = (select x.value('@version_upto[1]', 'bigint') from @x.nodes('CT_Changes') d(x))
--Удаляем удаленные записи
delete t from tbl_2 t join (select x.value('(PK/id1)[1]', 'int') id1, x.value('(PK/id2)[1]', 'int') id2 from @x.nodes('CT_Changes/Record[@operation="D"]') d(x)) ct on t.id1 = ct.id1 and t.id2 = ct.id2
update dbo.Sync_Log set deleted = @@rowcount where version = @curVersion --их количество вносим в журнал
--Добавляем новые
insert tbl_2 select x.value('(PK/id1)[1]', 'int') id1, x.value('(PK/id2)[1]', 'int') id2, x.value('fld1[1]', 'nvarchar(10)') fld1, x.value('fld2[1]', 'nvarchar(10)') fld2 from @x.nodes('CT_Changes/Record[@operation="I"]') d(x)
update dbo.Sync_Log set inserted = @@rowcount where version = @curVersion --их количество вносим в журнал
--Обновляем модифицированные
;with cte as (select x.value('(PK/id1)[1]', 'int') id1, x.value('(PK/id2)[1]', 'int') id2, x.value('fld1[1]', 'nvarchar(10)') fld1, x.value('fld2[1]', 'nvarchar(10)') fld2 from @x.nodes('CT_Changes/Record[@operation="U"]') d(x))
update t2 set t2.fld1 = cte.fld1, t2.fld2 = cte.fld2 from tbl_2 t2 join cte on t2.id1 = cte.id1 and t2.id2 = cte.id2
update dbo.Sync_Log set updated = @@rowcount where version = @curVersion --их количество вносим в журнал
end
end
go
alter queue QueueDubrovskogo with activation
(
status = on,
procedure_name = ProcessSyncMessages,
max_queue_readers = 1,
execute as self
)
Скрипт 12
Пускаем по-новой, выжидаем, смотрим, что получилось.
Рис.1
Не, ну это просто праздник какой-то. Все ж работает. Change Tracking, Service Broker, все рулит. А что в журнале?
Рис.2
В журнале тоже все правильно. Черт, единственно, я промахнулся с полем status. Третью часть из Скрипта 11 (Сравнение копии с оригиналом) надо было перетащить в процедуру очереди (Скрипт 12). Не сообразил, осталось исторически с поста Репликация таблиц средствами Change Tracking, где все было синхронно. А сейчас получается, что она отдала изменения в очередь и сразу полезла проверять таблицы на совпадение, а изменения еще не успели дойти и примениться. Поэтому вместо ОК в поле status везде несовпадения. Принципиально это ни на что не влияет, но выглядит неаккуратно. Ладно, предоставляется читателям в качестве самостоятельного упражнения.
По окончании демонстрации стопятся Скрипты 10, 11. Чтобы погасить процедуру обработки очереди, нужно выполнить скрипт
alter queue QueueDubrovskogo with activation
(
status = off
)
select * from sys.dm_broker_activated_tasks
kill 35
где вместо 35 нужно поставить спид, на котором она болтается в конкретном случае.