From 4e6dffa2380ccf5636b37b0038a8502db4f9b343 Mon Sep 17 00:00:00 2001 From: "daniele.teti" Date: Tue, 13 Apr 2010 22:25:14 +0000 Subject: [PATCH] + some change to the speed test + some formatting + some refactoring --- StompClient.pas | 1070 +++++++++++++++++++------------------- StompTypes.pas | 1010 +++++++++++++++++------------------ test/MainU.pas | 100 ++-- test/teststompclient.dpr | 4 +- 4 files changed, 1091 insertions(+), 1093 deletions(-) diff --git a/StompClient.pas b/StompClient.pas index e787166a..7a245cda 100644 --- a/StompClient.pas +++ b/StompClient.pas @@ -1,538 +1,536 @@ { ******************************************************* } -{ } -{ Stomp Client for Embarcadero Delphi & FreePascal } -{ Tested With ApacheMQ 5.2/5.3 } -{ Copyright (c) 2009-2009 Daniele Teti } -{ } -{ Contributors: } -{ Daniel Gaspary: dgaspary@gmail.com } -{ } -{ WebSite: www.danieleteti.it } -{ email:d.teti@bittime.it } -{ ******************************************************* } - -unit StompClient; - -//For FreePascal users: -//Automatically selected synapse tcp library -{$IFDEF FPC} -{$MODE DELPHI} -{$DEFINE USESYNAPSE} -{$ENDIF} - -//For Delphi users: -//Decomment following line to use synapse also in Delphi -{.$DEFINE USESYNAPSE} - -interface - -uses - StompTypes, - SysUtils, -{$IFNDEF USESYNAPSE} - IdTCPClient, - IdException, - IdExceptionCore, -{$ELSE} - synsock, - blcksock, -{$ENDIF} - Classes; - -type -{$IFDEF USESYNAPSE} - ESynapseTimeout = Exception; -{$ENDIF} - - TStompClient = class(TInterfacedObject, IStompClient) - private -{$IFDEF USESYNAPSE} - FSynapseTCP: TTCPBlockSocket; - FSynapseConnected: boolean; -{$ELSE} - FTCP: TIdTCPClient; -{$ENDIF} - FHeaders: IStompHeaders; - FPassword: string; - FUserName: string; - FTimeout: Integer; - FSession: string; - FInTransaction: boolean; - FTransactions: TStringList; - FReceiptTimeout: Integer; - procedure SetReceiptTimeout(const Value: Integer); - - protected - procedure Init; - procedure DeInit; - procedure MergeHeaders(var AFrame: IStompFrame; var AHeaders: IStompHeaders); - procedure SendFrame(AFrame: IStompFrame); - - public - function SetPassword(const Value: string): IStompClient; - function SetUserName(const Value: string): IStompClient; - function Receive(out StompFrame: IStompFrame; ATimeout: Integer): boolean; overload; - function Receive: IStompFrame; overload; - function Receive(ATimeout: Integer): IStompFrame; overload; - procedure Receipt(const ReceiptID: string); - procedure Connect(Host: string = '127.0.0.1'; Port: Integer = DEFAULT_STOMP_PORT; - ClientID: string = ''); - procedure Disconnect; - procedure Subscribe(QueueOrTopicName: string; Ack: TAckMode = amAuto; - Headers: IStompHeaders = nil); - procedure Unsubscribe(Queue: string); - procedure Send(QueueOrTopicName: string; TextMessage: string; Headers: IStompHeaders = nil); - overload; - procedure Send(QueueOrTopicName: string; TextMessage: string; TransactionIdentifier: string; - Headers: IStompHeaders = nil); overload; - procedure Ack(const MessageID: string; const TransactionIdentifier: string = ''); - procedure BeginTransaction(const TransactionIdentifier: string); - procedure CommitTransaction(const TransactionIdentifier: string); - procedure AbortTransaction(const TransactionIdentifier: string); - /// //////////// - constructor Create; virtual; - destructor Destroy; override; - function Connected: boolean; - function SetReceiveTimeout(const AMilliSeconds: Cardinal): IStompClient; - property Session: string read FSession; - property ReceiptTimeout: Integer read FReceiptTimeout write SetReceiptTimeout; - property Transactions: TStringList read FTransactions; - end; - -implementation - -{$IFDEF FPC} - -const - CHAR0 = #0; - -{$ELSE} - -uses - Windows, - IdGlobal, - Character; - -{$ENDIF} -{ TStompClient } - -procedure TStompClient.AbortTransaction(const TransactionIdentifier: string); -var - Frame: IStompFrame; -begin - if FTransactions.IndexOf(TransactionIdentifier) > -1 then - begin - Frame := TStompFrame.Create; - Frame.SetCommand('ABORT'); - Frame.GetHeaders.Add('transaction', TransactionIdentifier); - SendFrame(Frame); - FInTransaction := False; - FTransactions.Delete(FTransactions.IndexOf(TransactionIdentifier)); - end - else - raise EStomp.CreateFmt('Abort Transaction Error. Transaction [%s] not found', - [TransactionIdentifier]); -end; - -procedure TStompClient.Ack(const MessageID: string; const TransactionIdentifier: string); -var - Frame: IStompFrame; -begin - Frame := TStompFrame.Create; - Frame.SetCommand('ACK'); - Frame.GetHeaders.Add('message-id', MessageID); - if TransactionIdentifier <> '' then - Frame.GetHeaders.Add('transaction', TransactionIdentifier); - SendFrame(Frame); -end; - -procedure TStompClient.BeginTransaction(const TransactionIdentifier: string); -var - Frame: IStompFrame; -begin - if FTransactions.IndexOf(TransactionIdentifier) = -1 then - begin - Frame := TStompFrame.Create; - Frame.SetCommand('BEGIN'); - Frame.GetHeaders.Add('transaction', TransactionIdentifier); - SendFrame(Frame); - // CheckReceipt(Frame); - FInTransaction := True; - FTransactions.Add(TransactionIdentifier); - end - else - raise EStomp.CreateFmt('Begin Transaction Error. Transaction [%s] still open', - [TransactionIdentifier]); -end; - -// procedure TStompClient.CheckReceipt(Frame: TStompFrame); -// var -// ReceiptID: string; -// begin -// if FEnableReceipts then -// begin -// ReceiptID := inttostr(GetTickCount); -// Frame.GetHeaders.Add('receipt', ReceiptID); -// SendFrame(Frame); -// Receipt(ReceiptID); -// end -// else -// SendFrame(Frame); -// end; - -procedure TStompClient.CommitTransaction(const TransactionIdentifier: string); -var - Frame: IStompFrame; -begin - if FTransactions.IndexOf(TransactionIdentifier) > -1 then - begin - Frame := TStompFrame.Create; - Frame.SetCommand('COMMIT'); - Frame.GetHeaders.Add('transaction', TransactionIdentifier); - SendFrame(Frame); - FInTransaction := False; - FTransactions.Delete(FTransactions.IndexOf(TransactionIdentifier)); - end - else - raise EStomp.CreateFmt('Commit Transaction Error. Transaction [%s] not found', - [TransactionIdentifier]); -end; - -procedure TStompClient.Connect(Host: string; Port: Integer; ClientID: string); -var - Frame: IStompFrame; -begin - try - Init; -{$IFDEF USESYNAPSE} - FSynapseTCP.Connect(Host, intToStr(Port)); - FSynapseConnected:=true; -{$ELSE} - FTCP.Connect(Host, Port); - FTCP.IOHandler.MaxLineLength := MaxInt; -{$ENDIF} - Frame := TStompFrame.Create; - Frame.SetCommand('CONNECT'); - Frame.GetHeaders.Add('login', FUserName).Add('passcode', FPassword); - if ClientID <> '' then - Frame.GetHeaders.Add('client-id', ClientID); - SendFrame(Frame); - Frame := nil; - while Frame = nil do - Frame := Receive; - if Frame.GetCommand = 'ERROR' then - raise EStomp.Create(Frame.output); - if Frame.GetCommand = 'CONNECTED' then - begin - FSession := Frame.GetHeaders.Value('session'); - end; - { todo: 'Call event?' } - except - on E: Exception do - begin - raise EStomp.Create(E.message); - end; - end; -end; - -function TStompClient.Connected: boolean; -begin -{$IFDEF USESYNAPSE} - Result := Assigned(FSynapseTCP) and FSynapseConnected; -{$ELSE} - Result := Assigned(FTCP) and FTCP.Connected; -{$ENDIF} -end; - -constructor TStompClient.Create; -begin - inherited; - FInTransaction := False; - FSession := ''; - FUserName := 'guest'; - FPassword := 'guest'; - FHeaders := TStompHeaders.Create; - FTimeout := 200; - FReceiptTimeout := FTimeout; -end; - -procedure TStompClient.DeInit; -begin -{$IFDEF USESYNAPSE} - FreeAndNil(FSynapseTCP); -{$ELSE} - FreeAndNil(FTCP); -{$ENDIF} - FreeAndNil(FTransactions); -end; - -destructor TStompClient.Destroy; -begin - DeInit; - inherited; -end; - -procedure TStompClient.Disconnect; -var - Frame: IStompFrame; -begin - if Connected then - begin - Frame := TStompFrame.Create; - Frame.SetCommand('DISCONNECT'); - SendFrame(Frame); -{$IFDEF USESYNAPSE} - FSynapseTCP.CloseSocket; - FSynapseConnected:=false; -{$ELSE} - FTCP.Disconnect; -{$ENDIF} - end; - DeInit; -end; - -procedure TStompClient.Init; -begin - DeInit; -{$IFDEF USESYNAPSE} - FSynapseTCP := TTCPBlockSocket.Create; -{$ELSE} - FTCP := TIdTCPClient.Create(nil); -{$ENDIF} - FTransactions := TStringList.Create; -end; - -procedure TStompClient.MergeHeaders(var AFrame: IStompFrame; var AHeaders: IStompHeaders); -var - i: Integer; - h: TKeyValue; -begin - if Assigned(AHeaders) then - if AHeaders.Count > 0 then - for i := 0 to AHeaders.Count - 1 do - begin - h := AHeaders.GetAt(i); - AFrame.GetHeaders.Add(h.Key, h.Value); - end; -end; - -procedure TStompClient.Receipt(const ReceiptID: string); -var - Frame: IStompFrame; -begin - if Receive(Frame, FReceiptTimeout) then - begin - if Frame.GetCommand <> 'RECEIPT' then - raise EStomp.Create('Receipt command error'); - if Frame.GetHeaders.Value('receipt-id') <> ReceiptID then - raise EStomp.Create('Receipt receipt-id error'); - end; -end; - -function TStompClient.Receive(out StompFrame: IStompFrame; ATimeout: Integer): boolean; -begin - StompFrame := nil; - StompFrame := Receive(ATimeout); - Result := Assigned(StompFrame); -end; - -function TStompClient.Receive(ATimeout: Integer): IStompFrame; - -{$IFDEF USESYNAPSE} - function InternalReceiveSynapse(ATimeout: Integer): IStompFrame; - var - c: char; - s: string; - tout: boolean; - begin - tout := False; - Result := nil; - try - try - FSynapseTCP.SetRecvTimeout(ATimeout); - s := ''; - try - while True do - begin - c := Chr(FSynapseTCP.RecvByte(ATimeout)); - if FSynapseTCP.LastError = WSAETIMEDOUT then - raise ESynapseTimeout.Create(FSynapseTCP.LastErrorDesc); - if c <> CHAR0 then - s := s + c - else - begin - c := Chr(FSynapseTCP.RecvByte(ATimeout)); - if FSynapseTCP.LastError = WSAETIMEDOUT then - raise ESynapseTimeout.Create('' { FSynapseTCP.LastErrorDesc } ); - Break; - end; - end; - except - on E: ESynapseTimeout do - begin - tout := True; - end; - on E: Exception do - begin - raise ; - end; - end; - if not tout then - begin - Result := StompUtils.CreateFrame(s + CHAR0); - end; - finally - s:=''; - end; - except - on E: Exception do - begin - raise ; - end; - end; - end; -{$ELSE} - function InternalReceiveINDY(ATimeout: Integer): IStompFrame; - var - c: char; - sb: TStringBuilder; - tout: boolean; - begin - tout := False; - Result := nil; - try - try - sb := TStringBuilder.Create(1024); - FTCP.ReadTimeout := ATimeout; - try - FTCP.IOHandler.CheckForDataOnSource(1); - while True do - begin - c := FTCP.IOHandler.ReadChar; - if c <> CHAR0 then - sb.Append(c) - else - begin - FTCP.IOHandler.ReadChar; - Break; - end; - end; - except - on E: EIdReadTimeout do - begin tout := True; - end; - on E: Exception do - begin - raise ; - end; - end; - if not tout then - begin - Result := StompUtils.CreateFrame(sb.ToString + CHAR0); - end; - finally - sb.Free; - end; - except - on E: Exception do - begin - raise ; - end; - end; - end; -{$ENDIF} -begin -{$IFDEF USESYNAPSE} - result:=InternalReceiveSynapse(ATimeout); -{$ELSE} - result:=InternalReceiveINDY(ATimeout); -{$ENDIF} -end; - -function TStompClient.Receive: IStompFrame; -begin - Result := Receive(FTimeout); -end; - -procedure TStompClient.Send(QueueOrTopicName: string; TextMessage: string; Headers: IStompHeaders); -var - Frame: IStompFrame; -begin - Frame := TStompFrame.Create; - Frame.SetCommand('SEND'); - Frame.GetHeaders.Add('destination', QueueOrTopicName); - Frame.SetBody(TextMessage); - MergeHeaders(Frame, Headers); - SendFrame(Frame); -end; - -procedure TStompClient.Send(QueueOrTopicName: string; TextMessage: string; - TransactionIdentifier: string; Headers: IStompHeaders); -var - Frame: IStompFrame; -begin - Frame := TStompFrame.Create; - Frame.SetCommand('SEND'); - Frame.GetHeaders.Add('destination', QueueOrTopicName); - Frame.GetHeaders.Add('transaction', TransactionIdentifier); - Frame.SetBody(TextMessage); - MergeHeaders(Frame, Headers); - SendFrame(Frame); -end; - -procedure TStompClient.SendFrame(AFrame: IStompFrame); -begin -{$IFDEF USESYNAPSE} - FSynapseTCP.SendString(AFrame.output); -{$ELSE} - FTCP.IOHandler.write(TEncoding.ASCII.GetBytes(AFrame.output)); -{$ENDIF} -end; - -function TStompClient.SetPassword(const Value: string): IStompClient; -begin - FPassword := Value; - Result := Self; -end; - -procedure TStompClient.SetReceiptTimeout(const Value: Integer); -begin - FReceiptTimeout := Value; -end; - -function TStompClient.SetReceiveTimeout(const AMilliSeconds: Cardinal): IStompClient; -begin - FTimeout := AMilliSeconds; - Result := Self; -end; - -function TStompClient.SetUserName(const Value: string): IStompClient; -begin - FUserName := Value; - Result := Self; -end; - -procedure TStompClient.Subscribe(QueueOrTopicName: string; Ack: TAckMode = amAuto; - Headers: IStompHeaders = nil); -var - Frame: IStompFrame; -begin - Frame := TStompFrame.Create; - Frame.SetCommand('SUBSCRIBE'); - Frame.GetHeaders.Add('destination', QueueOrTopicName).Add('ack', StompUtils.AckModeToStr(Ack)); - if Headers <> nil then - MergeHeaders(Frame, Headers); - SendFrame(Frame); -end; - -procedure TStompClient.Unsubscribe(Queue: string); -var - Frame: IStompFrame; -begin - Frame := TStompFrame.Create; - Frame.SetCommand('UNSUBSCRIBE'); - Frame.GetHeaders.Add('destination', Queue); - SendFrame(Frame); -end; - +{ } +{ Stomp Client for Embarcadero Delphi & FreePascal } +{ Tested With ApacheMQ 5.2/5.3 } +{ Copyright (c) 2009-2009 Daniele Teti } +{ } +{ Contributors: } +{ Daniel Gaspary: dgaspary@gmail.com } +{ } +{ WebSite: www.danieleteti.it } +{ email:d.teti@bittime.it } +{ ******************************************************* } + +unit StompClient; + +// For FreePascal users: +// Automatically selected synapse tcp library +{$IFDEF FPC} +{$MODE DELPHI} +{$DEFINE USESYNAPSE} +{$ENDIF} +// For Delphi users: +// Decomment following line to use synapse also in Delphi +{$DEFINE USESYNAPSE} + +interface + +uses + StompTypes, + SysUtils, +{$IFNDEF USESYNAPSE} + IdTCPClient, + IdException, + IdExceptionCore, +{$ELSE} + synsock, + blcksock, +{$ENDIF} + Classes; + +type +{$IFDEF USESYNAPSE} + ESynapseTimeout = Exception; +{$ENDIF} + + TStompClient = class(TInterfacedObject, IStompClient) + private +{$IFDEF USESYNAPSE} + FSynapseTCP: TTCPBlockSocket; + FSynapseConnected: boolean; +{$ELSE} + FTCP: TIdTCPClient; +{$ENDIF} + FHeaders: IStompHeaders; + FPassword: string; + FUserName: string; + FTimeout: Integer; + FSession: string; + FInTransaction: boolean; + FTransactions: TStringList; + FReceiptTimeout: Integer; + procedure SetReceiptTimeout(const Value: Integer); + + protected + procedure Init; + procedure DeInit; + procedure MergeHeaders(var AFrame: IStompFrame; var AHeaders: IStompHeaders); + procedure SendFrame(AFrame: IStompFrame); + + public + function SetPassword(const Value: string): IStompClient; + function SetUserName(const Value: string): IStompClient; + function Receive(out StompFrame: IStompFrame; ATimeout: Integer): boolean; overload; + function Receive: IStompFrame; overload; + function Receive(ATimeout: Integer): IStompFrame; overload; + procedure Receipt(const ReceiptID: string); + procedure Connect(Host: string = '127.0.0.1'; Port: Integer = DEFAULT_STOMP_PORT; + ClientID: string = ''); + procedure Disconnect; + procedure Subscribe(QueueOrTopicName: string; Ack: TAckMode = amAuto; + Headers: IStompHeaders = nil); + procedure Unsubscribe(Queue: string); + procedure Send(QueueOrTopicName: string; TextMessage: string; Headers: IStompHeaders = nil); + overload; + procedure Send(QueueOrTopicName: string; TextMessage: string; TransactionIdentifier: string; + Headers: IStompHeaders = nil); overload; + procedure Ack(const MessageID: string; const TransactionIdentifier: string = ''); + procedure BeginTransaction(const TransactionIdentifier: string); + procedure CommitTransaction(const TransactionIdentifier: string); + procedure AbortTransaction(const TransactionIdentifier: string); + /// //////////// + constructor Create; virtual; + destructor Destroy; override; + function Connected: boolean; + function SetReceiveTimeout(const AMilliSeconds: Cardinal): IStompClient; + property Session: string read FSession; + property ReceiptTimeout: Integer read FReceiptTimeout write SetReceiptTimeout; + property Transactions: TStringList read FTransactions; + end; + +implementation + +{$IFDEF FPC} + +const + CHAR0 = #0; +{$ELSE} + +uses + Windows, + IdGlobal, + Character; +{$ENDIF} +{ TStompClient } + +procedure TStompClient.AbortTransaction(const TransactionIdentifier: string); +var + Frame: IStompFrame; +begin + if FTransactions.IndexOf(TransactionIdentifier) > -1 then + begin + Frame := TStompFrame.Create; + Frame.SetCommand('ABORT'); + Frame.GetHeaders.Add('transaction', TransactionIdentifier); + SendFrame(Frame); + FInTransaction := False; + FTransactions.Delete(FTransactions.IndexOf(TransactionIdentifier)); + end + else + raise EStomp.CreateFmt('Abort Transaction Error. Transaction [%s] not found', + [TransactionIdentifier]); +end; + +procedure TStompClient.Ack(const MessageID: string; const TransactionIdentifier: string); +var + Frame: IStompFrame; +begin + Frame := TStompFrame.Create; + Frame.SetCommand('ACK'); + Frame.GetHeaders.Add('message-id', MessageID); + if TransactionIdentifier <> '' then + Frame.GetHeaders.Add('transaction', TransactionIdentifier); + SendFrame(Frame); +end; + +procedure TStompClient.BeginTransaction(const TransactionIdentifier: string); +var + Frame: IStompFrame; +begin + if FTransactions.IndexOf(TransactionIdentifier) = -1 then + begin + Frame := TStompFrame.Create; + Frame.SetCommand('BEGIN'); + Frame.GetHeaders.Add('transaction', TransactionIdentifier); + SendFrame(Frame); + // CheckReceipt(Frame); + FInTransaction := True; + FTransactions.Add(TransactionIdentifier); + end + else + raise EStomp.CreateFmt('Begin Transaction Error. Transaction [%s] still open', + [TransactionIdentifier]); +end; + +// procedure TStompClient.CheckReceipt(Frame: TStompFrame); +// var +// ReceiptID: string; +// begin +// if FEnableReceipts then +// begin +// ReceiptID := inttostr(GetTickCount); +// Frame.GetHeaders.Add('receipt', ReceiptID); +// SendFrame(Frame); +// Receipt(ReceiptID); +// end +// else +// SendFrame(Frame); +// end; + +procedure TStompClient.CommitTransaction(const TransactionIdentifier: string); +var + Frame: IStompFrame; +begin + if FTransactions.IndexOf(TransactionIdentifier) > -1 then + begin + Frame := TStompFrame.Create; + Frame.SetCommand('COMMIT'); + Frame.GetHeaders.Add('transaction', TransactionIdentifier); + SendFrame(Frame); + FInTransaction := False; + FTransactions.Delete(FTransactions.IndexOf(TransactionIdentifier)); + end + else + raise EStomp.CreateFmt('Commit Transaction Error. Transaction [%s] not found', + [TransactionIdentifier]); +end; + +procedure TStompClient.Connect(Host: string; Port: Integer; ClientID: string); +var + Frame: IStompFrame; +begin + try + Init; +{$IFDEF USESYNAPSE} + FSynapseTCP.Connect(Host, intToStr(Port)); + FSynapseConnected := True; +{$ELSE} + FTCP.Connect(Host, Port); + FTCP.IOHandler.MaxLineLength := MaxInt; +{$ENDIF} + Frame := TStompFrame.Create; + Frame.SetCommand('CONNECT'); + Frame.GetHeaders.Add('login', FUserName).Add('passcode', FPassword); + if ClientID <> '' then + Frame.GetHeaders.Add('client-id', ClientID); + SendFrame(Frame); + Frame := nil; + while Frame = nil do + Frame := Receive; + if Frame.GetCommand = 'ERROR' then + raise EStomp.Create(Frame.output); + if Frame.GetCommand = 'CONNECTED' then + begin + FSession := Frame.GetHeaders.Value('session'); + end; + { todo: 'Call event?' } + except + on E: Exception do + begin + raise EStomp.Create(E.message); + end; + end; +end; + +function TStompClient.Connected: boolean; +begin +{$IFDEF USESYNAPSE} + Result := Assigned(FSynapseTCP) and FSynapseConnected; +{$ELSE} + Result := Assigned(FTCP) and FTCP.Connected; +{$ENDIF} +end; + +constructor TStompClient.Create; +begin + inherited; + FInTransaction := False; + FSession := ''; + FUserName := 'guest'; + FPassword := 'guest'; + FHeaders := TStompHeaders.Create; + FTimeout := 200; + FReceiptTimeout := FTimeout; +end; + +procedure TStompClient.DeInit; +begin +{$IFDEF USESYNAPSE} + FreeAndNil(FSynapseTCP); +{$ELSE} + FreeAndNil(FTCP); +{$ENDIF} + FreeAndNil(FTransactions); +end; + +destructor TStompClient.Destroy; +begin + DeInit; + inherited; +end; + +procedure TStompClient.Disconnect; +var + Frame: IStompFrame; +begin + if Connected then + begin + Frame := TStompFrame.Create; + Frame.SetCommand('DISCONNECT'); + SendFrame(Frame); +{$IFDEF USESYNAPSE} + FSynapseTCP.CloseSocket; + FSynapseConnected := False; +{$ELSE} + FTCP.Disconnect; +{$ENDIF} + end; + DeInit; +end; + +procedure TStompClient.Init; +begin + DeInit; +{$IFDEF USESYNAPSE} + FSynapseTCP := TTCPBlockSocket.Create; +{$ELSE} + FTCP := TIdTCPClient.Create(nil); +{$ENDIF} + FTransactions := TStringList.Create; +end; + +procedure TStompClient.MergeHeaders(var AFrame: IStompFrame; var AHeaders: IStompHeaders); +var + i: Integer; + h: TKeyValue; +begin + if Assigned(AHeaders) then + if AHeaders.Count > 0 then + for i := 0 to AHeaders.Count - 1 do + begin + h := AHeaders.GetAt(i); + AFrame.GetHeaders.Add(h.Key, h.Value); + end; +end; + +procedure TStompClient.Receipt(const ReceiptID: string); +var + Frame: IStompFrame; +begin + if Receive(Frame, FReceiptTimeout) then + begin + if Frame.GetCommand <> 'RECEIPT' then + raise EStomp.Create('Receipt command error'); + if Frame.GetHeaders.Value('receipt-id') <> ReceiptID then + raise EStomp.Create('Receipt receipt-id error'); + end; +end; + +function TStompClient.Receive(out StompFrame: IStompFrame; ATimeout: Integer): boolean; +begin + StompFrame := nil; + StompFrame := Receive(ATimeout); + Result := Assigned(StompFrame); +end; + +function TStompClient.Receive(ATimeout: Integer): IStompFrame; +{$IFDEF USESYNAPSE} + function InternalReceiveSynapse(ATimeout: Integer): IStompFrame; + var + c: char; + s: string; + tout: boolean; + begin + tout := False; + Result := nil; + try + try + FSynapseTCP.SetRecvTimeout(ATimeout); + s := ''; + try + while True do + begin + c := Chr(FSynapseTCP.RecvByte(ATimeout)); + if FSynapseTCP.LastError = WSAETIMEDOUT then + raise ESynapseTimeout.Create(FSynapseTCP.LastErrorDesc); + if c <> CHAR0 then + s := s + c //should be improved with a string buffer (daniele.teti) + else + begin + c := Chr(FSynapseTCP.RecvByte(ATimeout)); + if FSynapseTCP.LastError = WSAETIMEDOUT then + raise ESynapseTimeout.Create('' { FSynapseTCP.LastErrorDesc } ); + Break; + end; + end; + except + on E: ESynapseTimeout do + begin + tout := True; + end; + on E: Exception do + begin + raise ; + end; + end; + if not tout then + begin + Result := StompUtils.CreateFrame(s + CHAR0); + end; + finally + s := ''; + end; + except + on E: Exception do + begin + raise ; + end; + end; + end; +{$ELSE} + function InternalReceiveINDY(ATimeout: Integer): IStompFrame; + var + c: char; + sb: TStringBuilder; + tout: boolean; + begin + tout := False; + Result := nil; + try + try + sb := TStringBuilder.Create(1024); + FTCP.ReadTimeout := ATimeout; + try + FTCP.IOHandler.CheckForDataOnSource(1); + while True do + begin + c := FTCP.IOHandler.ReadChar; + if c <> CHAR0 then + sb.Append(c) + else + begin + FTCP.IOHandler.ReadChar; + Break; + end; + end; + except + on E: EIdReadTimeout do + begin + tout := True; + end; + on E: Exception do + begin + raise ; + end; + end; + if not tout then + begin + Result := StompUtils.CreateFrame(sb.ToString + CHAR0); + end; + finally + sb.Free; + end; + except + on E: Exception do + begin + raise ; + end; + end; + end; +{$ENDIF} + +begin +{$IFDEF USESYNAPSE} + Result := InternalReceiveSynapse(ATimeout); +{$ELSE} + Result := InternalReceiveINDY(ATimeout); +{$ENDIF} +end; + +function TStompClient.Receive: IStompFrame; +begin + Result := Receive(FTimeout); +end; + +procedure TStompClient.Send(QueueOrTopicName: string; TextMessage: string; Headers: IStompHeaders); +var + Frame: IStompFrame; +begin + Frame := TStompFrame.Create; + Frame.SetCommand('SEND'); + Frame.GetHeaders.Add('destination', QueueOrTopicName); + Frame.SetBody(TextMessage); + MergeHeaders(Frame, Headers); + SendFrame(Frame); +end; + +procedure TStompClient.Send(QueueOrTopicName: string; TextMessage: string; + TransactionIdentifier: string; Headers: IStompHeaders); +var + Frame: IStompFrame; +begin + Frame := TStompFrame.Create; + Frame.SetCommand('SEND'); + Frame.GetHeaders.Add('destination', QueueOrTopicName); + Frame.GetHeaders.Add('transaction', TransactionIdentifier); + Frame.SetBody(TextMessage); + MergeHeaders(Frame, Headers); + SendFrame(Frame); +end; + +procedure TStompClient.SendFrame(AFrame: IStompFrame); +begin +{$IFDEF USESYNAPSE} + FSynapseTCP.SendString(AFrame.output); +{$ELSE} + FTCP.IOHandler.write(TEncoding.ASCII.GetBytes(AFrame.output)); +{$ENDIF} +end; + +function TStompClient.SetPassword(const Value: string): IStompClient; +begin + FPassword := Value; + Result := Self; +end; + +procedure TStompClient.SetReceiptTimeout(const Value: Integer); +begin + FReceiptTimeout := Value; +end; + +function TStompClient.SetReceiveTimeout(const AMilliSeconds: Cardinal): IStompClient; +begin + FTimeout := AMilliSeconds; + Result := Self; +end; + +function TStompClient.SetUserName(const Value: string): IStompClient; +begin + FUserName := Value; + Result := Self; +end; + +procedure TStompClient.Subscribe(QueueOrTopicName: string; Ack: TAckMode = amAuto; + Headers: IStompHeaders = nil); +var + Frame: IStompFrame; +begin + Frame := TStompFrame.Create; + Frame.SetCommand('SUBSCRIBE'); + Frame.GetHeaders.Add('destination', QueueOrTopicName).Add('ack', StompUtils.AckModeToStr(Ack)); + if Headers <> nil then + MergeHeaders(Frame, Headers); + SendFrame(Frame); +end; + +procedure TStompClient.Unsubscribe(Queue: string); +var + Frame: IStompFrame; +begin + Frame := TStompFrame.Create; + Frame.SetCommand('UNSUBSCRIBE'); + Frame.GetHeaders.Add('destination', Queue); + SendFrame(Frame); +end; + end. diff --git a/StompTypes.pas b/StompTypes.pas index c31ffcad..67aa612e 100644 --- a/StompTypes.pas +++ b/StompTypes.pas @@ -1,499 +1,515 @@ { ******************************************************* } -{ } -{ Stomp Client for Embarcadero Delphi & FreePascal } -{ Tested With ApacheMQ 5.2/5.3 } -{ Copyright (c) 2009-2009 Daniele Teti } -{ } -{ Contributors: } -{ Daniel Gaspary: dgaspary@gmail.com } -{ } -{ WebSite: www.danieleteti.it } -{ email:d.teti@bittime.it } -{ ******************************************************* } - -unit StompTypes; - -{$ifdef FPC} - {$MODE DELPHI} -{$endif} - -interface - -uses - SysUtils, - Classes; - -const - LINE_END: char = #10; - COMMAND_END: char = #0; - DEFAULT_STOMP_PORT = 61613; - -type - TAckMode = (amAuto, amClient); - - EStomp = class(Exception) - end; - - TKeyValue = record - Key: string; - Value: string; - end; - - PKeyValue = ^TKeyValue; - - IStompHeaders = interface - ['{BD087D9D-0576-4C35-88F9-F5D6348E3894}'] - function Add(Key, Value: string): IStompHeaders; overload; - function Add(HeaderItem: TKeyValue): IStompHeaders; overload; - function Value(Key: string): string; - function Remove(Key: string): IStompHeaders; - function IndexOf(Key: string): Integer; - function Count: Cardinal; - function GetAt(const Index: Integer): TKeyValue; - function Output: String; - end; - - IStompFrame = interface - ['{68274885-D3C3-4890-A058-03B769B2191E}'] - function Output: string; - procedure SetHeaders(const Value: IStompHeaders); - function GetCommand: string; - procedure SetCommand(const Value: string); - function GetBody: string; - procedure SetBody(const Value: string); - function GetHeaders: IStompHeaders; - end; - - IStompClient = interface - ['{EDE6EF1D-59EE-4FCC-9CD7-B183E606D949}'] - function Receive(out StompFrame: IStompFrame; ATimeout: Integer): Boolean; overload; - function Receive: IStompFrame; overload; - function Receive(ATimeout: Integer): IStompFrame; overload; - procedure Receipt(const ReceiptID: string); - procedure Connect(Host: string = '127.0.0.1'; Port: Integer = 61613; ClientID: string = ''); - procedure Disconnect; - procedure Subscribe(QueueOrTopicName: string; Ack: TAckMode = amAuto; Headers: IStompHeaders = nil); - procedure Unsubscribe(Queue: string); - procedure Send(QueueOrTopicName: string; TextMessage: string; Headers: IStompHeaders = nil); overload; - procedure Send(QueueOrTopicName: string; TextMessage: string; TransactionIdentifier: string; - Headers: IStompHeaders = nil); overload; - procedure Ack(const MessageID: string; const TransactionIdentifier: string = ''); - procedure BeginTransaction(const TransactionIdentifier: string); - procedure CommitTransaction(const TransactionIdentifier: string); - procedure AbortTransaction(const TransactionIdentifier: string); - /// //////////// - function SetPassword(const Value: string): IStompClient; - function SetUserName(const Value: string): IStompClient; - function SetReceiveTimeout(const AMilliSeconds: cardinal): IStompClient; - function Connected: Boolean; - end; - - TStompHeaders = class(TInterfacedObject, IStompHeaders) - private - FList: TList; - function GetItems(index: Cardinal): TKeyValue; - procedure SetItems(index: Cardinal; const Value: TKeyValue); - - public - class function NewDurableSubscriptionHeader(const SubscriptionName: String): TKeyValue; - class function NewPersistentHeader(const Value: Boolean): TKeyValue; - class function NewReplyToHeader(const DestinationName: String): TKeyValue; - /// /////////////////////////////////////////////7 - function Add(Key, Value: string): IStompHeaders; overload; - function Add(HeaderItem: TKeyValue): IStompHeaders; overload; - function Value(Key: string): string; - function Remove(Key: string): IStompHeaders; - function IndexOf(Key: string): Integer; - function Count: Cardinal; - function GetAt(const Index: Integer): TKeyValue; - constructor Create; - destructor Destroy; override; - function Output: String; - property Items[index: Cardinal]: TKeyValue read GetItems write SetItems; default; - end; - - TStompFrame = class(TInterfacedObject, IStompFrame) - private - FCommand: string; - FBody: string; - FHeaders: IStompHeaders; - procedure SetHeaders(const Value: IStompHeaders); - function GetCommand: string; - procedure SetCommand(const Value: string); - function GetBody: string; - procedure SetBody(const Value: string); - function GetHeaders: IStompHeaders; - - public - constructor Create; - destructor Destroy; override; - property Command: string read GetCommand write SetCommand; - property Body: string read GetBody write SetBody; - // return '', when Key doesn't exist or Value of Key is '' - // otherwise, return Value; - function Output: string; - property Headers: IStompHeaders read GetHeaders write SetHeaders; - end; - - TAddress = record - Host: string; - Port: Integer; - UserName: string; - Password: string; - end; - - TAddresses = array of TAddress; - - IStompClientListener = interface - ['{C4C0D932-8994-43FB-9D32-A03FE86AEFE4}'] - procedure OnMessage(StompFrame: IStompFrame); - end; - - TStompClientListener = class(TThread) - strict protected - FStompClientListener: IStompClientListener; - FStompClient: IStompClient; - procedure Execute; override; - - public - constructor Create(StompClient: IStompClient; StompClientListener: IStompClientListener); - procedure StopListening; - end; - -type - StompUtils = class - class function CreateFrame(Buf: string): TStompFrame; - class function AckModeToStr(AckMode: TAckMode): string; - class function NewHeaders: IStompHeaders; - class function NewFrame: IStompFrame; - class function TimestampAsDateTime(const HeaderValue: String): TDateTime; - end; - -implementation - -uses - Dateutils; - -class function TStompHeaders.NewDurableSubscriptionHeader(const SubscriptionName: String): TKeyValue; -begin - Result.Key := 'activemq.subscriptionName'; - Result.Value := SubscriptionName; -end; - -class function TStompHeaders.NewPersistentHeader(const Value: Boolean): TKeyValue; -begin - Result.Key := 'persistent'; - Result.Value := LowerCase(BoolToStr(Value, true)); -end; - -class function TStompHeaders.NewReplyToHeader(const DestinationName: String): TKeyValue; -begin - Result.Key := 'reply-to'; - Result.Value := DestinationName; -end; - -class function StompUtils.NewHeaders: IStompHeaders; -begin - Result := TStompHeaders.Create; -end; - -class function StompUtils.TimestampAsDateTime(const HeaderValue: String): TDateTime; -begin - Result := EncodeDateTime(1970, 1, 1, 0, 0, 0, 0) + StrToInt64(HeaderValue) / 86400000; -end; - -class function StompUtils.AckModeToStr(AckMode: TAckMode): string; -begin - case AckMode of - amAuto: - Result := 'auto'; - amClient: - Result := 'client'; - else - raise EStomp.Create('Unknown AckMode'); - end; -end; - -constructor TStompFrame.Create; -begin - FHeaders := TStompHeaders.Create; - self.FCommand := ''; - self.FBody := ''; -end; - -destructor TStompFrame.Destroy; -begin - inherited; -end; - -function TStompFrame.GetBody: string; -begin - Result := FBody; -end; - -function TStompFrame.GetCommand: string; -begin - Result := FCommand; -end; - -function TStompFrame.GetHeaders: IStompHeaders; -begin - Result := FHeaders; -end; - -function TStompFrame.Output: String; -begin - Result := FCommand + LINE_END + FHeaders.Output + LINE_END + FBody + LINE_END + COMMAND_END; -end; - -procedure TStompFrame.SetBody(const Value: string); -begin - FBody := Value; -end; - -procedure TStompFrame.SetCommand(const Value: string); -begin - FCommand := Value; -end; - -procedure TStompFrame.SetHeaders(const Value: IStompHeaders); -begin - FHeaders := Value; -end; - -function GetLine(Buf: string; var From: Integer): string; -var - i: Integer; -begin - if (From > Length(Buf)) then - raise EStomp.Create('From out of bound.'); - - i := From; - - while (i <= Length(Buf)) do - begin - if (Buf[i] <> LINE_END) then - inc(i) - else - break; - end; - - if (Buf[i] = LINE_END) then - begin - Result := Copy(Buf, From, i - From); - From := i + 1; - exit; - end - else - raise EStomp.Create('End of Line not found.'); -end; - -class function StompUtils.CreateFrame(Buf: string): TStompFrame; -var - line: string; - i: Integer; - p: Integer; - Key, Value: string; - other: string; - contLen: Integer; - sContLen: string; -begin - Result := TStompFrame.Create; - i := 1; - try - Result.Command := GetLine(Buf, i); - while (true) do - begin - line := GetLine(Buf, i); - if (line = '') then - break; - p := Pos(':', line); - if (p = 0) then - raise Exception.Create('header line error'); - Key := Copy(line, 1, p - 1); - Value := Copy(line, p + 1, Length(line) - p); - Result.Headers.Add(Key, Value); - end; - other := Copy(Buf, i, High(Integer)); - sContLen := Result.Headers.Value('content-length'); - if (sContLen <> '') then - begin - contLen := StrToInt(sContLen); - if Length(other) < contLen + 2 then - raise EStomp.Create('frame too short'); - if Copy(other, contLen + 1, 2) <> COMMAND_END + LINE_END then - raise Exception.Create('frame ending error'); - Result.Body := Copy(other, 1, contLen); - // Buf := Copy(other, contLen + 3, High(Integer)); - end - else - begin - p := Pos(COMMAND_END, other); - if (p = 0) then - raise EStomp.Create('frame no ending'); - Result.Body := Copy(other, 1, p - 2); - // Buf := Copy(other, p + 2, High(Integer)); - end; - except - on EStomp do - begin - // ignore - Result.Free; - Result := nil; - end; - on e: Exception do - begin - Result.Free; - raise EStomp.Create(e.Message); - end; - end; -end; - -class function StompUtils.NewFrame: IStompFrame; -begin - Result := TStompFrame.Create; -end; - -{ TStompHeaders } - -function TStompHeaders.Add(Key, Value: string): IStompHeaders; -var - p: PKeyValue; -begin - New(p); - p^.Key := Key; - p^.Value := Value; - FList.Add(p); - Result := self; -end; - -function TStompHeaders.Add(HeaderItem: TKeyValue): IStompHeaders; -begin - Result := Add(HeaderItem.Key, HeaderItem.Value); -end; - -function TStompHeaders.Count: Cardinal; -begin - Result := FList.Count; -end; - -constructor TStompHeaders.Create; -begin - inherited; - FList := TList.Create; -end; - -destructor TStompHeaders.Destroy; -var - i: Integer; -begin - if FList.Count > 0 then - for i := 0 to FList.Count - 1 do - FreeMem(PKeyValue(FList[i])); - FList.Free; - inherited; -end; - -function TStompHeaders.GetAt(const Index: Integer): TKeyValue; -begin - Result := GetItems(Index) -end; - -function TStompHeaders.GetItems(index: Cardinal): TKeyValue; -begin - Result := PKeyValue(FList[index])^; -end; - -function TStompHeaders.IndexOf(Key: string): Integer; -var - i: Integer; -begin - Result := -1; - for i := 0 to FList.Count - 1 do - begin - if GetItems(i).Key = Key then - begin - Result := i; - break; - end; - end; -end; - -function TStompHeaders.Output: String; -var - i: Integer; - kv: TKeyValue; -begin - Result := ''; - if FList.Count > 0 then - for i := 0 to FList.Count - 1 do - begin - kv := Items[i]; - Result := Result + kv.Key + ':' + kv.Value + LINE_END; - end - else - Result := LINE_END; -end; - -function TStompHeaders.Remove(Key: string): IStompHeaders; -var - p: Integer; -begin - p := IndexOf(Key); - FreeMem(PKeyValue(FList[p])); - FList.Delete(p); - Result := self; -end; - -procedure TStompHeaders.SetItems(index: Cardinal; const Value: TKeyValue); -var - p: Integer; -begin - p := IndexOf(Value.Key); - if p > -1 then - begin - PKeyValue(FList[p])^.Key := Value.Key; - PKeyValue(FList[p])^.Value := Value.Value; - end - else - raise EStomp.Create('Error SetItems'); -end; - -function TStompHeaders.Value(Key: string): string; -var - i: Integer; -begin - Result := ''; - i := IndexOf(Key); - if i > -1 then - Result := GetItems(i).Value; -end; - -{ TStompListener } - -constructor TStompClientListener.Create(StompClient: IStompClient; StompClientListener: IStompClientListener); -begin - inherited Create(true); - FStompClientListener := StompClientListener; - FStompClient := StompClient; - Resume; -end; - -procedure TStompClientListener.Execute; -var - frame: IStompFrame; -begin - while not terminated do - begin - if FStompClient.Receive(frame, 2000) then - FStompClientListener.OnMessage(frame); - end; -end; - -procedure TStompClientListener.StopListening; -begin - Terminate; - WaitFor; -end; - +{ } +{ Stomp Client for Embarcadero Delphi & FreePascal } +{ Tested With ApacheMQ 5.2/5.3 } +{ Copyright (c) 2009-2009 Daniele Teti } +{ } +{ Contributors: } +{ Daniel Gaspary: dgaspary@gmail.com } +{ } +{ WebSite: www.danieleteti.it } +{ email:d.teti@bittime.it } +{ ******************************************************* } + +unit StompTypes; +{$IFDEF FPC} +{$MODE DELPHI} +{$ENDIF} + +interface + +uses + SysUtils, + Classes; + +const + LINE_END: char = #10; + COMMAND_END: char = #0; + DEFAULT_STOMP_PORT = 61613; + +type + TAckMode = (amAuto, amClient); + + EStomp = class(Exception) + end; + + TKeyValue = record + Key: string; + Value: string; + end; + + PKeyValue = ^TKeyValue; + + IStompHeaders = interface + ['{BD087D9D-0576-4C35-88F9-F5D6348E3894}'] + function Add(Key, Value: string): IStompHeaders; overload; + function Add(HeaderItem: TKeyValue): IStompHeaders; overload; + function Value(Key: string): string; + function Remove(Key: string): IStompHeaders; + function IndexOf(Key: string): Integer; + function Count: Cardinal; + function GetAt(const index: Integer): TKeyValue; + function Output: string; + end; + + IStompFrame = interface + ['{68274885-D3C3-4890-A058-03B769B2191E}'] + function Output: string; + procedure SetHeaders(const Value: IStompHeaders); + function GetCommand: string; + procedure SetCommand(const Value: string); + function GetBody: string; + procedure SetBody(const Value: string); + function GetHeaders: IStompHeaders; + end; + + IStompClient = interface + ['{EDE6EF1D-59EE-4FCC-9CD7-B183E606D949}'] + function Receive(out StompFrame: IStompFrame; ATimeout: Integer): Boolean; overload; + function Receive: IStompFrame; overload; + function Receive(ATimeout: Integer): IStompFrame; overload; + procedure Receipt(const ReceiptID: string); + procedure Connect(Host: string = '127.0.0.1'; Port: Integer = 61613; ClientID: string = ''); + procedure Disconnect; + procedure Subscribe(QueueOrTopicName: string; Ack: TAckMode = amAuto; + Headers: IStompHeaders = nil); + procedure Unsubscribe(Queue: string); + procedure Send(QueueOrTopicName: string; TextMessage: string; Headers: IStompHeaders = nil); + overload; + procedure Send(QueueOrTopicName: string; TextMessage: string; TransactionIdentifier: string; + Headers: IStompHeaders = nil); overload; + procedure Ack(const MessageID: string; const TransactionIdentifier: string = ''); + procedure BeginTransaction(const TransactionIdentifier: string); + procedure CommitTransaction(const TransactionIdentifier: string); + procedure AbortTransaction(const TransactionIdentifier: string); + /// //////////// + function SetPassword(const Value: string): IStompClient; + function SetUserName(const Value: string): IStompClient; + function SetReceiveTimeout(const AMilliSeconds: Cardinal): IStompClient; + function Connected: Boolean; + end; + + TStompHeaders = class(TInterfacedObject, IStompHeaders) + private + FList: TList; + function GetItems(index: Cardinal): TKeyValue; + procedure SetItems(index: Cardinal; const Value: TKeyValue); + + public + class function NewDurableSubscriptionHeader(const SubscriptionName: string): TKeyValue; + class function NewPersistentHeader(const Value: Boolean): TKeyValue; + class function NewReplyToHeader(const DestinationName: string): TKeyValue; + /// /////////////////////////////////////////////7 + function Add(Key, Value: string): IStompHeaders; overload; + function Add(HeaderItem: TKeyValue): IStompHeaders; overload; + function Value(Key: string): string; + function Remove(Key: string): IStompHeaders; + function IndexOf(Key: string): Integer; + function Count: Cardinal; + function GetAt(const index: Integer): TKeyValue; + constructor Create; + destructor Destroy; override; + function Output: string; + property Items[index: Cardinal]: TKeyValue read GetItems write SetItems; default; + end; + + TStompFrame = class(TInterfacedObject, IStompFrame) + private + FCommand: string; + FBody: string; + FHeaders: IStompHeaders; + procedure SetHeaders(const Value: IStompHeaders); + function GetCommand: string; + procedure SetCommand(const Value: string); + function GetBody: string; + procedure SetBody(const Value: string); + function GetHeaders: IStompHeaders; + + public + constructor Create; + destructor Destroy; override; + property Command: string read GetCommand write SetCommand; + property Body: string read GetBody write SetBody; + // return '', when Key doesn't exist or Value of Key is '' + // otherwise, return Value; + function Output: string; + property Headers: IStompHeaders read GetHeaders write SetHeaders; + end; + + TAddress = record + Host: string; + Port: Integer; + UserName: string; + Password: string; + end; + + TAddresses = array of TAddress; + + IStompClientListener = interface + ['{C4C0D932-8994-43FB-9D32-A03FE86AEFE4}'] + procedure OnMessage(StompFrame: IStompFrame); + end; + + TStompClientListener = class(TThread) + strict protected + FStompClientListener: IStompClientListener; + FStompClient: IStompClient; + procedure Execute; override; + + public + constructor Create(StompClient: IStompClient; StompClientListener: IStompClientListener); + procedure StopListening; + end; + +type + StompUtils = class + class function CreateFrame(Buf: string): TStompFrame; + class function AckModeToStr(AckMode: TAckMode): string; + class function NewHeaders: IStompHeaders; + class function NewFrame: IStompFrame; + class function TimestampAsDateTime(const HeaderValue: string): TDateTime; + class function NewStomp(Host: string = '127.0.0.1'; + Port: Integer = DEFAULT_STOMP_PORT; ClientID: string = ''; const UserName: string = 'guest'; + const Password: string = 'guest'): IStompClient; + end; + +implementation + +uses + Dateutils, StompClient; + +class function StompUtils.NewStomp(Host: string = '127.0.0.1'; Port: Integer = DEFAULT_STOMP_PORT; + ClientID: string = ''; const UserName: string = 'guest'; const Password: string = 'guest') + : IStompClient; +begin + Result := TStompClient.Create; + Result.SetUserName(UserName); + Result.SetPassword(Password); + Result.Connect(Host, Port, ClientID); +end; + +class function TStompHeaders.NewDurableSubscriptionHeader(const SubscriptionName: string) + : TKeyValue; +begin + Result.Key := 'activemq.subscriptionName'; + Result.Value := SubscriptionName; +end; + +class function TStompHeaders.NewPersistentHeader(const Value: Boolean): TKeyValue; +begin + Result.Key := 'persistent'; + Result.Value := LowerCase(BoolToStr(Value, true)); +end; + +class function TStompHeaders.NewReplyToHeader(const DestinationName: string): TKeyValue; +begin + Result.Key := 'reply-to'; + Result.Value := DestinationName; +end; + +class function StompUtils.NewHeaders: IStompHeaders; +begin + Result := TStompHeaders.Create; +end; + +class function StompUtils.TimestampAsDateTime(const HeaderValue: string): TDateTime; +begin + Result := EncodeDateTime(1970, 1, 1, 0, 0, 0, 0) + StrToInt64(HeaderValue) / 86400000; +end; + +class function StompUtils.AckModeToStr(AckMode: TAckMode): string; +begin + case AckMode of + amAuto: + Result := 'auto'; + amClient: + Result := 'client'; + else + raise EStomp.Create('Unknown AckMode'); + end; +end; + +constructor TStompFrame.Create; +begin + FHeaders := TStompHeaders.Create; + self.FCommand := ''; + self.FBody := ''; +end; + +destructor TStompFrame.Destroy; +begin + inherited; +end; + +function TStompFrame.GetBody: string; +begin + Result := FBody; +end; + +function TStompFrame.GetCommand: string; +begin + Result := FCommand; +end; + +function TStompFrame.GetHeaders: IStompHeaders; +begin + Result := FHeaders; +end; + +function TStompFrame.Output: string; +begin + Result := FCommand + LINE_END + FHeaders.Output + LINE_END + FBody + LINE_END + COMMAND_END; +end; + +procedure TStompFrame.SetBody(const Value: string); +begin + FBody := Value; +end; + +procedure TStompFrame.SetCommand(const Value: string); +begin + FCommand := Value; +end; + +procedure TStompFrame.SetHeaders(const Value: IStompHeaders); +begin + FHeaders := Value; +end; + +function GetLine(Buf: string; var From: Integer): string; +var + i: Integer; +begin + if (From > Length(Buf)) then + raise EStomp.Create('From out of bound.'); + + i := From; + + while (i <= Length(Buf)) do + begin + if (Buf[i] <> LINE_END) then + inc(i) + else + break; + end; + + if (Buf[i] = LINE_END) then + begin + Result := Copy(Buf, From, i - From); + From := i + 1; + exit; + end + else + raise EStomp.Create('End of Line not found.'); +end; + +class function StompUtils.CreateFrame(Buf: string): TStompFrame; +var + line: string; + i: Integer; + p: Integer; + Key, Value: string; + other: string; + contLen: Integer; + sContLen: string; +begin + Result := TStompFrame.Create; + i := 1; + try + Result.Command := GetLine(Buf, i); + while (true) do + begin + line := GetLine(Buf, i); + if (line = '') then + break; + p := Pos(':', line); + if (p = 0) then + raise Exception.Create('header line error'); + Key := Copy(line, 1, p - 1); + Value := Copy(line, p + 1, Length(line) - p); + Result.Headers.Add(Key, Value); + end; + other := Copy(Buf, i, high(Integer)); + sContLen := Result.Headers.Value('content-length'); + if (sContLen <> '') then + begin + contLen := StrToInt(sContLen); + if Length(other) < contLen + 2 then + raise EStomp.Create('frame too short'); + if Copy(other, contLen + 1, 2) <> COMMAND_END + LINE_END then + raise Exception.Create('frame ending error'); + Result.Body := Copy(other, 1, contLen); + // Buf := Copy(other, contLen + 3, High(Integer)); + end + else + begin + p := Pos(COMMAND_END, other); + if (p = 0) then + raise EStomp.Create('frame no ending'); + Result.Body := Copy(other, 1, p - 2); + // Buf := Copy(other, p + 2, High(Integer)); + end; + except + on EStomp do + begin + // ignore + Result.Free; + Result := nil; + end; + on e: Exception do + begin + Result.Free; + raise EStomp.Create(e.message); + end; + end; +end; + +class function StompUtils.NewFrame: IStompFrame; +begin + Result := TStompFrame.Create; +end; + +{ TStompHeaders } + +function TStompHeaders.Add(Key, Value: string): IStompHeaders; +var + p: PKeyValue; +begin + New(p); + p^.Key := Key; + p^.Value := Value; + FList.Add(p); + Result := self; +end; + +function TStompHeaders.Add(HeaderItem: TKeyValue): IStompHeaders; +begin + Result := Add(HeaderItem.Key, HeaderItem.Value); +end; + +function TStompHeaders.Count: Cardinal; +begin + Result := FList.Count; +end; + +constructor TStompHeaders.Create; +begin + inherited; + FList := TList.Create; +end; + +destructor TStompHeaders.Destroy; +var + i: Integer; +begin + if FList.Count > 0 then + for i := 0 to FList.Count - 1 do + FreeMem(PKeyValue(FList[i])); + FList.Free; + inherited; +end; + +function TStompHeaders.GetAt(const index: Integer): TKeyValue; +begin + Result := GetItems(index) +end; + +function TStompHeaders.GetItems(index: Cardinal): TKeyValue; +begin + Result := PKeyValue(FList[index])^; +end; + +function TStompHeaders.IndexOf(Key: string): Integer; +var + i: Integer; +begin + Result := -1; + for i := 0 to FList.Count - 1 do + begin + if GetItems(i).Key = Key then + begin + Result := i; + break; + end; + end; +end; + +function TStompHeaders.Output: string; +var + i: Integer; + kv: TKeyValue; +begin + Result := ''; + if FList.Count > 0 then + for i := 0 to FList.Count - 1 do + begin + kv := Items[i]; + Result := Result + kv.Key + ':' + kv.Value + LINE_END; + end + else + Result := LINE_END; +end; + +function TStompHeaders.Remove(Key: string): IStompHeaders; +var + p: Integer; +begin + p := IndexOf(Key); + FreeMem(PKeyValue(FList[p])); + FList.Delete(p); + Result := self; +end; + +procedure TStompHeaders.SetItems(index: Cardinal; const Value: TKeyValue); +var + p: Integer; +begin + p := IndexOf(Value.Key); + if p > -1 then + begin + PKeyValue(FList[p])^.Key := Value.Key; + PKeyValue(FList[p])^.Value := Value.Value; + end + else + raise EStomp.Create('Error SetItems'); +end; + +function TStompHeaders.Value(Key: string): string; +var + i: Integer; +begin + Result := ''; + i := IndexOf(Key); + if i > -1 then + Result := GetItems(i).Value; +end; + +{ TStompListener } + +constructor TStompClientListener.Create(StompClient: IStompClient; + StompClientListener: IStompClientListener); +begin + inherited Create(true); + FStompClientListener := StompClientListener; + FStompClient := StompClient; + Resume; +end; + +procedure TStompClientListener.Execute; +var + frame: IStompFrame; +begin + while not terminated do + begin + if FStompClient.Receive(frame, 2000) then + FStompClientListener.OnMessage(frame); + end; +end; + +procedure TStompClientListener.StopListening; +begin + Terminate; + WaitFor; +end; + end. diff --git a/test/MainU.pas b/test/MainU.pas index 907a4af1..3bc01373 100644 --- a/test/MainU.pas +++ b/test/MainU.pas @@ -15,15 +15,6 @@ uses StompTypes, Diagnostics; -function NewStomp(Host: string = '127.0.0.1'; Port: Integer = DEFAULT_STOMP_PORT; - ClientID: string = ''): IStompClient; -begin - Result := TStompClient.Create; - Result.SetUserName('guest'); - Result.SetPassword('guest'); - Result.Connect(Host, Port, ClientID); -end; - procedure Test_Unicode_Chars(serveraddress: string); var stomp: IStompClient; @@ -33,7 +24,7 @@ const SVEDESE = 'Vad är Unicode'; ITALIANO = 'Cos''è Unicode'; begin - stomp := NewStomp(serveraddress); + stomp := StompUtils.NewStomp(serveraddress); stomp.Subscribe('/topic/unicode'); stomp.Send('/topic/unicode', ITALIANO); stomp.Send('/topic/unicode', SERBO); @@ -64,8 +55,8 @@ const BODY3 = 'Hello World 3'; BODY4 = 'Hello World 4'; begin - stomp := NewStomp; - recv := NewStomp; + stomp := StompUtils.NewStomp; + recv := StompUtils.NewStomp; stomp.Subscribe(TOPIC); recv.Subscribe(TOPIC); @@ -103,65 +94,58 @@ end; procedure Main(serveraddress: string = 'localhost'); var - stomp: TStompClient; + stomp: IStompClient; frame: IStompFrame; i, c: Integer; msgcount: Cardinal; - sw: TStopWatch; + sw, sw1: TStopWatch; message_data: string; const - MSG = 1000; - MSG_SIZE = 1000; + MSG = 5000; + MSG_SIZE = 300; begin + sw1 := TStopWatch.StartNew; message_data := StringOfChar('X', MSG_SIZE); - WriteLn('TEST MESSAGE (', length(message_data) * sizeof(char), ' bytes):', #13#10, '"', + WriteLn('TEST MESSAGE IS (', length(message_data) * sizeof(char), ' bytes):', #13#10, '"', message_data, '"'#13#10#13#10); - stomp := TStompClient.Create; - try - stomp.SetUserName('Daniele'); - stomp.SetPassword('Paperino'); - stomp.Connect(serveraddress); - stomp.Subscribe('/topic/foo.bar'); + stomp := StompUtils.NewStomp(serveraddress, DEFAULT_STOMP_PORT, '', 'Daniele', 'Teti'); + stomp.Subscribe('/topic/foo.bar'); - for c := 1 to 10 do + for c := 1 to 6 do + begin + WriteLn; + WriteLn('= STATS LOOP ', c, '======================================='); + sw := TStopWatch.StartNew; + for i := 1 to MSG do + stomp.Send('/topic/foo.bar', message_data, + StompUtils.NewHeaders.Add(TStompHeaders.NewPersistentHeader(true))); + WriteLn('Queued ', MSG, ' messages in ', sw.ElapsedMilliseconds, ' ms'); + WriteLn('Now dequeuing...'); + + msgcount := 0; + sw := TStopWatch.StartNew; + while msgcount < MSG do begin - WriteLn; - WriteLn('= STATS LOOP ', c, '======================================='); - for i := 1 to MSG do + frame := stomp.Receive; + if assigned(frame) then begin - stomp.Send('/topic/foo.bar', message_data, - StompUtils.NewHeaders.Add(TStompHeaders.NewPersistentHeader(true))); - // '01234567890123456789012345678901234567890123456789' - if i mod 1000 = 0 then - WriteLn('Queued ', i, ' messages'); - end; - - msgcount := 0; - sw.start; - while msgcount < MSG do - begin - frame := stomp.Receive; - if assigned(frame) then - begin - inc(msgcount); - frame := nil; - end - end; - sw.Stop; - WriteLn(msgcount, ' in ', sw.ElapsedMilliseconds, ' milliseconds and ', sw.ElapsedTicks, - ' ticks'); - WriteLn('Throughput: '); - WriteLn(FormatFloat('###,##0.000', sw.ElapsedMilliseconds / msgcount), ' ms/msg'); - WriteLn(FormatFloat('###,##0.000', msgcount / sw.ElapsedMilliseconds), ' msg/ms'); - WriteLn('= END LOOP ', c, '========================================='#13#10); + inc(msgcount); + assert(frame.GetBody = message_data); + frame := nil; + end end; - - stomp.Unsubscribe('/topic/foo.bar'); - stomp.Disconnect; - write('test finished...'); - finally - stomp.Free; + sw.Stop; + WriteLn('Dequeued ', msgcount, ' stomp messages in ', sw.ElapsedMilliseconds, ' ms'); + WriteLn('Throughput: ', + FormatFloat('###,##0.000', sw.ElapsedMilliseconds / msgcount), ' ms/msg (', + FormatFloat('###,##0.000', msgcount / sw.ElapsedMilliseconds), ' msg/ms)'); + // WriteLn('= END LOOP ', c, '========================================='#13#10); end; + stomp.Unsubscribe('/topic/foo.bar'); + stomp.Disconnect; + sw.Stop; + WriteLn('SPEED TEST FINISHED IN ', FormatFloat('###,##0.000', sw1.ElapsedMilliseconds / 1000), + ' seconds'); end; end. diff --git a/test/teststompclient.dpr b/test/teststompclient.dpr index 54b8bab3..64877156 100644 --- a/test/teststompclient.dpr +++ b/test/teststompclient.dpr @@ -9,7 +9,7 @@ uses begin try - // Main; + Main; MainWithTransaction; // Test_Unicode_Chars; //Non passa Writeln('ALL TESTS OK'); @@ -18,5 +18,5 @@ begin Writeln(E.Classname, ': ', E.message); end; - // readln; + readln; end.