ADD Some headers names constants

This commit is contained in:
daniele.teti@gmail.com 2013-10-15 08:14:36 +00:00
parent 6688fc17a3
commit 41d42ff9b9
2 changed files with 56 additions and 77 deletions

View File

@ -1,6 +1,6 @@
// Stomp Client for Embarcadero Delphi & FreePascal // Stomp Client for Embarcadero Delphi & FreePascal
// Tested With ApacheMQ 5.2/5.3, Apache Apollo 1.2/1.6 // Tested With ApacheMQ 5.2/5.3, Apache Apollo 1.2
// Copyright (c) 2009-2013 Daniele Teti // Copyright (c) 2009-2012 Daniele Teti
// //
// Contributors: // Contributors:
// Daniel Gaspary: dgaspary@gmail.com // Daniel Gaspary: dgaspary@gmail.com
@ -102,7 +102,7 @@ type
procedure Receipt(const ReceiptID: string); procedure Receipt(const ReceiptID: string);
procedure Connect(Host: string = '127.0.0.1'; Port: Integer = DEFAULT_STOMP_PORT; procedure Connect(Host: string = '127.0.0.1'; Port: Integer = DEFAULT_STOMP_PORT;
ClientID: string = ''; AcceptVersion: TStompAcceptProtocol = STOMP_Version_1_0); ClientID: string = ''; AcceptVersion: TStompAcceptProtocol = STOMP_Version_1_0);
procedure Disconnect(WithoutSendingFrame: boolean = false); procedure Disconnect;
procedure Subscribe(QueueOrTopicName: string; Ack: TAckMode = amAuto; procedure Subscribe(QueueOrTopicName: string; Ack: TAckMode = amAuto;
Headers: IStompHeaders = nil); Headers: IStompHeaders = nil);
procedure Unsubscribe(Queue: string); procedure Unsubscribe(Queue: string);
@ -161,7 +161,7 @@ begin
Frame.SetCommand('ABORT'); Frame.SetCommand('ABORT');
Frame.GetHeaders.Add('transaction', TransactionIdentifier); Frame.GetHeaders.Add('transaction', TransactionIdentifier);
SendFrame(Frame); SendFrame(Frame);
FInTransaction := false; FInTransaction := False;
FTransactions.Delete(FTransactions.IndexOf(TransactionIdentifier)); FTransactions.Delete(FTransactions.IndexOf(TransactionIdentifier));
end end
else else
@ -175,7 +175,7 @@ var
begin begin
Frame := TStompFrame.Create; Frame := TStompFrame.Create;
Frame.SetCommand('ACK'); Frame.SetCommand('ACK');
Frame.GetHeaders.Add('message-id', MessageID); Frame.GetHeaders.Add(TStompHeaders.MESSAGE_ID, MessageID);
if TransactionIdentifier <> '' then if TransactionIdentifier <> '' then
Frame.GetHeaders.Add('transaction', TransactionIdentifier); Frame.GetHeaders.Add('transaction', TransactionIdentifier);
SendFrame(Frame); SendFrame(Frame);
@ -200,6 +200,21 @@ begin
[TransactionIdentifier]); [TransactionIdentifier]);
end; end;
// procedure TStompClient.CheckReceipt(Frame: TStompFrame);
// var
// ReceiptID: string;
// begin
// if FEnableReceipts then
// begin
// ReceiptID := inttostr(GetTickCount);
// Frame.GetHeaders.Add('receipt', ReceiptID);
// SendFrame(Frame);
// Receipt(ReceiptID);
// end
// else
// SendFrame(Frame);
// end;
procedure TStompClient.CommitTransaction(const TransactionIdentifier: string); procedure TStompClient.CommitTransaction(const TransactionIdentifier: string);
var var
Frame: IStompFrame; Frame: IStompFrame;
@ -210,7 +225,7 @@ begin
Frame.SetCommand('COMMIT'); Frame.SetCommand('COMMIT');
Frame.GetHeaders.Add('transaction', TransactionIdentifier); Frame.GetHeaders.Add('transaction', TransactionIdentifier);
SendFrame(Frame); SendFrame(Frame);
FInTransaction := false; FInTransaction := False;
FTransactions.Delete(FTransactions.IndexOf(TransactionIdentifier)); FTransactions.Delete(FTransactions.IndexOf(TransactionIdentifier));
end end
else else
@ -228,7 +243,7 @@ begin
{$IFDEF USESYNAPSE} {$IFDEF USESYNAPSE}
FSynapseConnected := false; FSynapseConnected := False;
FSynapseTCP.Connect(Host, intToStr(Port)); FSynapseTCP.Connect(Host, intToStr(Port));
FSynapseConnected := True; FSynapseConnected := True;
@ -291,7 +306,7 @@ end;
constructor TStompClient.Create; constructor TStompClient.Create;
begin begin
inherited; inherited;
FInTransaction := false; FInTransaction := False;
FSession := ''; FSession := '';
FUserName := 'guest'; FUserName := 'guest';
FPassword := 'guest'; FPassword := 'guest';
@ -318,39 +333,28 @@ end;
destructor TStompClient.Destroy; destructor TStompClient.Destroy;
begin begin
try Disconnect;
Disconnect(false);
except
end;
DeInit; DeInit;
inherited; inherited;
end; end;
procedure TStompClient.Disconnect(WithoutSendingFrame: boolean); procedure TStompClient.Disconnect;
var var
Frame: IStompFrame; Frame: IStompFrame;
begin begin
if Connected then if Connected then
begin begin
if not WithoutSendingFrame then
begin
try
Frame := TStompFrame.Create; Frame := TStompFrame.Create;
Frame.SetCommand('DISCONNECT'); Frame.SetCommand('DISCONNECT');
SendFrame(Frame); SendFrame(Frame);
except
// socket could be already dead
end;
end;
{$IFDEF USESYNAPSE} {$IFDEF USESYNAPSE}
FSynapseTCP.CloseSocket; FSynapseTCP.CloseSocket;
FSynapseConnected := false; FSynapseConnected := False;
{$ELSE} {$ELSE}
FTCP.Socket.Close;
FTCP.Disconnect; FTCP.Disconnect;
{$ENDIF} {$ENDIF}
@ -404,7 +408,7 @@ begin
if (Reason = HR_Error) and (FSynapseTCP.LastError <> WSAETIMEDOUT) if (Reason = HR_Error) and (FSynapseTCP.LastError <> WSAETIMEDOUT)
then then
begin begin
FSynapseConnected := false; FSynapseConnected := False;
end; end;
end; end;
@ -467,7 +471,7 @@ function TStompClient.Receive(ATimeout: Integer): IStompFrame;
s : string; s : string;
tout: boolean; tout: boolean;
begin begin
tout := false; tout := False;
Result := nil; Result := nil;
try try
try try
@ -526,14 +530,14 @@ function TStompClient.Receive(ATimeout: Integer): IStompFrame;
begin begin
// UTF8Encoding := TEncoding.UTF8; // UTF8Encoding := TEncoding.UTF8;
UTF8Encoding := IndyTextEncoding_UTF8(); UTF8Encoding := IndyTextEncoding_UTF8();
tout := false; tout := False;
Result := nil; Result := nil;
try try
sb := TStringBuilder.Create(1024 * 4); sb := TStringBuilder.Create(1024 * 4);
try try
FTCP.ReadTimeout := ATimeout; FTCP.ReadTimeout := ATimeout;
try try
FirstValidChar := false; FirstValidChar := False;
FTCP.Socket.CheckForDataOnSource(1); FTCP.Socket.CheckForDataOnSource(1);
while True do while True do
begin begin
@ -566,7 +570,7 @@ function TStompClient.Receive(ATimeout: Integer): IStompFrame;
begin begin
Result := StompUtils.CreateFrame(sb.toString + CHAR0); Result := StompUtils.CreateFrame(sb.toString + CHAR0);
if Result.GetCommand = 'ERROR' then if Result.GetCommand = 'ERROR' then
raise EStompDisconnectionError.Create(Result.GetHeaders.Value('message')); raise EStomp.Create(Result.GetHeaders.Value('message'));
end; end;
finally finally
sb.Free; sb.Free;
@ -583,7 +587,6 @@ function TStompClient.Receive(ATimeout: Integer): IStompFrame;
begin begin
try
{$IFDEF USESYNAPSE} {$IFDEF USESYNAPSE}
@ -595,16 +598,6 @@ begin
{$ENDIF} {$ENDIF}
except
on E: EStompDisconnectionError do
begin
Disconnect;
raise;
end;
on E: Exception do
raise;
end
end; end;
function TStompClient.Receive: IStompFrame; function TStompClient.Receive: IStompFrame;
@ -640,8 +633,6 @@ end;
procedure TStompClient.SendFrame(AFrame: IStompFrame); procedure TStompClient.SendFrame(AFrame: IStompFrame);
begin begin
if not Connected then
raise EStomp.Create('StompClient not connected');
{$IFDEF USESYNAPSE} {$IFDEF USESYNAPSE}

View File

@ -1,6 +1,6 @@
// Stomp Client for Embarcadero Delphi & FreePascal // Stomp Client for Embarcadero Delphi & FreePascal
// Tested With ApacheMQ 5.2/5.3 // Tested With ApacheMQ 5.2/5.3
// Copyright (c) 2009-2013 Daniele Teti // Copyright (c) 2009-2009 Daniele Teti
// //
// Contributors: // Contributors:
// Daniel Gaspary: dgaspary@gmail.com // Daniel Gaspary: dgaspary@gmail.com
@ -34,10 +34,6 @@ type
EStomp = class(Exception) EStomp = class(Exception)
end; end;
EStompDisconnectionError = class(EStomp)
end;
TKeyValue = record TKeyValue = record
Key: string; Key: string;
Value: string; Value: string;
@ -76,7 +72,7 @@ type
procedure Receipt(const ReceiptID: string); procedure Receipt(const ReceiptID: string);
procedure Connect(Host: string = '127.0.0.1'; Port: Integer = 61613; ClientID: string = ''; procedure Connect(Host: string = '127.0.0.1'; Port: Integer = 61613; ClientID: string = '';
AcceptVersion: TStompAcceptProtocol = STOMP_Version_1_0); AcceptVersion: TStompAcceptProtocol = STOMP_Version_1_0);
procedure Disconnect(WithoutSendingFrame: Boolean = false); procedure Disconnect;
procedure Subscribe(QueueOrTopicName: string; Ack: TAckMode = amAuto; procedure Subscribe(QueueOrTopicName: string; Ack: TAckMode = amAuto;
Headers: IStompHeaders = nil); Headers: IStompHeaders = nil);
procedure Unsubscribe(Queue: string); procedure Unsubscribe(Queue: string);
@ -110,7 +106,12 @@ type
class function NewDurableSubscriptionHeader(const SubscriptionName: string): TKeyValue; class function NewDurableSubscriptionHeader(const SubscriptionName: string): TKeyValue;
class function NewPersistentHeader(const Value: Boolean): TKeyValue; class function NewPersistentHeader(const Value: Boolean): TKeyValue;
class function NewReplyToHeader(const DestinationName: string): TKeyValue; class function NewReplyToHeader(const DestinationName: string): TKeyValue;
/// /////////////////////////////////////////////7 /// /////////////////////////////////////////////7
const
MESSAGE_ID: string = 'message-id';
TRANSACTION: string = 'transaction';
/// /
function Add(Key, Value: string): IStompHeaders; overload; function Add(Key, Value: string): IStompHeaders; overload;
function Add(HeaderItem: TKeyValue): IStompHeaders; overload; function Add(HeaderItem: TKeyValue): IStompHeaders; overload;
function Value(Key: string): string; function Value(Key: string): string;
@ -533,26 +534,16 @@ var
frame : IStompFrame; frame : IStompFrame;
StopListen: Boolean; StopListen: Boolean;
begin begin
StopListen := false; StopListen := False;
while (not terminated) and (not StopListen) do while not terminated do
begin begin
try
if FStompClient.Receive(frame, 2000) then if FStompClient.Receive(frame, 2000) then
begin begin
try
FStompClientListener.OnMessage(FStompClient, frame, StopListen); FStompClientListener.OnMessage(FStompClient, frame, StopListen);
except if StopListen then
end;
end;
except
on e: EStompDisconnectionError do
begin begin
try
StopListen := true;
FStompClientListener.OnStopListen(FStompClient); FStompClientListener.OnStopListen(FStompClient);
except StopListening;
end;
end; end;
end; end;
end; end;
@ -566,11 +557,8 @@ end;
procedure TStompClientListener.StopListening; procedure TStompClientListener.StopListening;
begin begin
Terminate; Terminate;
WaitFor; Free;
try // WaitFor;
FStompClientListener.OnStopListen(FStompClient);
except
end;
end; end;
function TStompClientListener._AddRef: Integer; function TStompClientListener._AddRef: Integer;