otrdiena, 2011. gada 25. oktobris

Service Broker: asinhrons trigeris

Service Broker (SB) nodrošina ziņojumu sūtīšanu un rindas SQL Server- par to kas tas īsti ir var lasīt msdn. Šajā rakstā piemērs, kā izmantojot SB var realizēt asinhronu trigeri SQL Server datu bāzē.

Šī ir raksta 2. redakcija- raksts ir uzlabots (patiesībā pārrakstīts) un vismaz šobrīd- pilnīgāks par vairumu "google" atrodamajiem. Asinhrono trigeru gadījumā bieži tiek aizmirsts, ka SB visa komunikācija notiek dialogu formā- ziņojums tiek sūtīts un ziņojumam tiek gaidīta atbilde. Ja šī atbilde nepienāk, tā tiek gaidīta mūžīgi. Nelielā sistēmā tā nebūs diezko liela problēma, tomēr apmainoties ar miljoniem ziņu- tā var kļūt par ļoti nopietnu problēmu (kurai neglīti work-around'i ir pilna pasaule, arī šajā emuārā tāds agrāk vai vēlāk parādīsies- ar treknu norādi, ka kaut kas nav pareizi uztaisīts, ja tas jāizmanto. [papildināts 2011-11-04] Service Broker: dialoga piespiedu pārtraukšana).

Iesaku grāmatu, ja ir vēlme padziļināti apgūt SB.

Kā izskatās rezultāts
Sākšu ar rezultātu, lai jau uzreiz būtu skaidrs, kam tas domāts.
Ievietoju tabulā 'MyTable' 100 ierakstus:
Declare @i int = 0
while @i < 100
Begin
    Insert Into MyTable(Number) Values(@i)
    Set @i += 1; -- šī sintakse tikai > SQL Server 2008
End
Un varu sekot līdzi izpildes rezultātiem izpildot:
Select * From MyTable
Lūk kā pamazām tiek izpildīti uzdevumi- katrs rindas ieraksts šajā gadījumā ir viens uzdevums (protams, skriptā ir pateikts, ka katrs uzdevums jāizpilda piecas sekundes):


Kas ir izveidots
(skripts raksta beigās!)
Ir izveidota tabula, kurā tiek ievietoti dati. Datu ievietošana notiek sinhroni, katrai rindai ieliekot noklusēto statusu '0' (kolonna 'Stat') un aizpildot ievietošanas datumu (kolonna 'DateInserted'). Tabulas trigerī dati tiek nosūtīti asinhronai apstrādei norādot servisu, kas sūta ziņojumu (MyTableService) un servisu, kas saņem un apstrādā ziņojumu (ProcessService). Pēc ziņojuma apstrādes serviss (ProcessService) sūtīta atbildes ziņojumu sūtītāja servisam (MyTableService) par rezultātu. MyTableService piefiksē, kad atbilde ir saņemta (kolonna 'DateProcessed') un nomaina statusu uz '1'.

Šajā gadījumā tabula kalpo tikai kā pierādījums tam, ka tas strādā un trigeris "paslēpj" ziņojumu sūtīšanas komandas. Svarīgākais- notiek divu SB servisu savstarpējā komunikācijā:
  • viens serviss dod uzdevumus un gaida "atskaiti" par rezultātu
  • otrs serviss gaida uzdevumus, tos apstrādā un ziņo par rezultātiem darba devējam.
Mēģināju uzzīmēt bildi, kas nav visai glīta, bet ar tās palīdzību mēģināšu izskaidrot no kā sastāv izveidotie SB servisi.


  • MyTableService - tiek izmantots ziņojumu sūtīšanai. Katram servisam ir divas sastāvdaļas (šajā abstrakcijas līmenī): rinda un process, kas apstrādā rindas elementus. Šim servisam ir rinda, kas saucās 'MyTableQueue'. Rindu apkalpo procedūra 'sb_MyTableService'.
    Tā kā šis serviss ir uzdevumu devējs, tad attiecīgā servisa rindā nonāks paziņojumi par uzdevumu (ziņojumu) apstrādes rezultātiem.
    Procedūra 'sb_MyTableService' gaida, kad rindā parādās jauni procesa izpildes rezultāti. Tos saņem un pa vienam apstrādātā (tiek mainīts statuss un ievietots laiks, kad apstrādāta ziņojuma atbilde).
  • ProcessService - tiek izmantots uzdevumu apstrādei.
    Uzdevumi tiek glabāti rindā 'ProcessQueue'. Tiklīdz rindā parādās jauns uzdevums, tiek izmantota procedūra 'sb_ProcessService' lai apstrādātu uzdevumu(s)- pa vienam elementam. Pēc asptrādes tiek ziņots uzdevuma devējam, ka uzdevums apstrādāts.
Skripti
Lai datu bāzē atļautu Service Broker jāizpilda:
ALTER DATABASE DBNosaukums SET ENABLE_BROKER
Visu nepieciešamo objektu izveide datu bāzē (sanāk daudz.., testēts uz SQL Server 2008 R2 un SQL Server 2005 Express Edition):
Create Table dbo.MyTable
(
    MyTableID int primary key identity,
    Stat bit default 0,
    Number int,
    DateInserted datetime default GetDate(),
    DateProcessed datetime
)
Go
Create Procedure dbo.usp_ProcessData(@data xml)
As
Begin
    Declare @docHandle int;
    EXEC sp_xml_preparedocument @docHandle OUTPUT, @data
    Update m
        Set Stat = 1,
            DateProcessed = GETDATE()
    From dbo.MyTable m inner join
        (
            Select *
            FROM OPENXML(@docHandle, '/DataInserted/row', 2)
            With (MyTableID int )
        ) i On m.MyTableID = i.MyTableID
    EXEC sp_xml_removedocument @docHandle
End
Go
Create Queue MyTableQueue;
Create Service MyTableService On Queue MyTableQueue([DEFAULT])
Create Queue  ProcessQueue;
Create Service ProcessService On Queue ProcessQueue([DEFAULT])
Go
Create Trigger tr_MyTable_insert
    On dbo.MyTable For Insert
As
Begin           
        Declare @h UniqueIdentifier;
        Declare @doc xml;
       
        Set @doc =
        (
            Select MyTableID From inserted
            For XML Raw, Elements, Type, Root('DataInserted')
        );
        Begin Dialog Conversation @h
        From Service MyTableService
            To Service 'ProcessService'
        With Encryption = OFF;


        Send On Conversation @h(@doc)
End
Go
Create Procedure dbo.sb_ProcessService
As
Begin
    Set NoCount On;


    Declare @c uniqueidentifier;
    Declare @h uniqueidentifier;
    Declare @m xml;
    Declare @t sysname;


    While 1 = 1
    Begin
        Begin Transaction
        WaitFor(Get Conversation Group @c From dbo.ProcessQueue), TimeOut 100;


        If @c is Null
        Begin
            Rollback Transaction;
            Break;
        End


        While 1 = 1
        Begin
            Receive Top(1)
                @h = conversation_handle,
                @t = message_type_name,
                @m = cast(message_body as xml)
            From dbo.ProcessQueue
            Where conversation_group_id = @c;
           
            If(@@ROWCOUNT <>1)
                Break;
           
            If @t = N'DEFAULT'
            Begin
                WaitFor DELAY '00:00:05';
                Send On Conversation @h (@m)
                End Conversation @h;
                Continue;
            End
        End;
        Commit Transaction
    End   
End
Go
ALTER QUEUE dbo.ProcessQueue
    WITH ACTIVATION
    (
        STATUS=ON,
        PROCEDURE_NAME = dbo.sb_ProcessService,
        MAX_QUEUE_READERS = 1,
        Execute AS Self
    )
Go
Create Procedure dbo.sb_MyTableService
As
Begin
    Set NoCount On;


    Declare @c uniqueidentifier;
    Declare @h uniqueidentifier;
    Declare @m xml;
    Declare @t sysname;


    While 1 = 1
    Begin
        Begin Transaction
        WaitFor(Get Conversation Group @c From dbo.MyTableQueue), TimeOut 100;


        If @c is Null
        Begin
            Rollback Transaction;
            Break;
        End


        While 1 = 1
        Begin
            Receive Top(1)
                @h = conversation_handle,
                @t = message_type_name,
                @m = cast(message_body as xml)
            From dbo.MyTableQueue
            Where conversation_group_id = @c;
           
            If(@@ROWCOUNT <>1)
                Break;
            If @t = N'DEFAULT'
            Begin
                Exec dbo.usp_ProcessData @m
                Continue;
            End
           
            If @t = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
            Begin
                End Conversation @h;
                Continue;
            End
        End;
        Commit Transaction
    End   
End
Go
ALTER QUEUE dbo.MyTableQueue
    WITH ACTIVATION
    (
        STATUS=ON,
        PROCEDURE_NAME = dbo.sb_MyTableService,
        MAX_QUEUE_READERS = 1,
        Execute AS Self
    )
Go
Lai pēc sevis satīrītu, var izmantot:
-- CleanUp
Drop Table dbo.MyTable
Drop Service ProcessService
Drop Service MyTableService
Drop Queue MyTableQueue
Drop Queue ProcessQueue
Drop Proc dbo.sb_ProcessService
Drop Proc dbo.sb_MyTableService
Drop Proc dbo.usp_ProcessData

Nav komentāru:

Komentāra publicēšana