delphimvcframework/StompClient.pas

1122 lines
30 KiB
ObjectPascal
Raw Normal View History

// Stomp Client for Embarcadero Delphi & FreePascal
// Tested With ApacheMQ 5.2/5.3, Apache Apollo 1.2, RabbitMQ
2016-10-14 15:19:06 +02:00
// Copyright (c) 2009-2016 Daniele Teti
2011-04-14 11:13:25 +02:00
//
// Contributors:
// Daniel Gaspary: dgaspary@gmail.com
// Oliver Marr: oliver.sn@wmarr.de
2013-05-10 18:00:10 +02:00
// Marco Mottadelli: mottadelli75@gmail.com
2011-04-14 11:13:25 +02:00
// WebSite: www.danieleteti.it
// email:d.teti@bittime.it
// *******************************************************
unit StompClient;
// For FreePascal users:
// Automatically selected synapse tcp library
{$IFDEF FPC}
{$MODE DELPHI}
{$DEFINE USESYNAPSE}
{$ENDIF}
// For Delphi users:
// Uncomment following line to use synapse also in Delphi
{ .$DEFINE USESYNAPSE }
interface
uses
StompTypes,
SysUtils,
2016-10-14 15:19:06 +02:00
DateUtils,
2015-01-21 14:59:20 +01:00
{$IFNDEF USESYNAPSE}
IdTCPClient,
IdException,
IdExceptionCore,
2016-06-28 15:21:47 +02:00
IdHeaderList,
IdIOHandler, IdIOHandlerSocket, IdIOHandlerStack, IdSSL, IdSSLOpenSSL, // SSL
System.SyncObjs,
2015-01-21 14:59:20 +01:00
{$ELSE}
synsock,
blcksock,
2015-01-21 14:59:20 +01:00
{$ENDIF}
Classes;
type
{ TStompClient }
// Method-pointer type of the OnBeforeSendFrame / OnAfterSendFrame events;
// the handler receives the frame that is being transmitted.
TSenderFrameEvent = procedure(AFrame: IStompFrame) of object;
2016-10-14 15:19:06 +02:00
THeartBeatThread = class;
// STOMP client. The transport is Synapse when USESYNAPSE is defined and
// Indy (optionally over OpenSSL) otherwise. Implements IStompClient.
TStompClient = class(TInterfacedObject, IStompClient)
private
  // --- transport ---------------------------------------------------------
{$IFDEF USESYNAPSE}
  FSynapseTCP: TTCPBlockSocket;
  FSynapseConnected: boolean;
{$ELSE}
  FTCP: TIdTCPClient;
  FIOHandlerSocketOpenSSL: TIdSSLIOHandlerSocketOpenSSL;
{$ENDIF}
  // --- session state and configuration -----------------------------------
  FOnConnect: TStompConnectNotifyEvent; // Added by GC 26/01/2011
  FHeaders: IStompHeaders;
  FPassword: string;
  FUserName: string;
  FTimeout: Integer; // default receive timeout (ms)
  FSession: string;
  FInTransaction: boolean;
  FTransactions: TStringList; // identifiers of the open transactions
  FReceiptTimeout: Integer;
  FServerProtocolVersion: string;
  FClientAcceptProtocolVersion: TStompAcceptProtocol;
  FServer: string;
  FOnBeforeSendFrame: TSenderFrameEvent;
  FOnAfterSendFrame: TSenderFrameEvent;
  FHost: string;
  FPort: Integer;
  FClientID: string;
  FUseSSL: boolean; // SSL
  FsslKeyFile: string; // SSL
  FsslCertFile: string; // SSL
  FsslKeyPass: string; // SSL
  FAcceptVersion: TStompAcceptProtocol;
  FConnectionTimeout: UInt32;
  // --- heart-beat ----------------------------------------------------------
  FOutgoingHeartBeats: Int64;
  FIncomingHeartBeats: Int64;
  FLock: TObject; // monitor object guarding socket writes
  FHeartBeatThread: THeartBeatThread;
  FServerIncomingHeartBeats: Int64;
  FServerOutgoingHeartBeats: Int64;
  FOnHeartBeatError: TNotifyEvent;
  procedure ParseHeartBeat(Headers: IStompHeaders);
  procedure SetReceiptTimeout(const Value: Integer);
  procedure SetConnectionTimeout(const Value: UInt32);
  function GetOnConnect: TStompConnectNotifyEvent;
  procedure SetOnConnect(const Value: TStompConnectNotifyEvent);
protected
{$IFDEF USESYNAPSE}
  procedure SynapseSocketCallBack(Sender: TObject; Reason: THookSocketReason; const Value: string);
{$ENDIF}
  procedure Init;
  procedure DeInit;
  procedure MergeHeaders(var AFrame: IStompFrame; var AHeaders: IStompHeaders);
  procedure SendFrame(AFrame: IStompFrame);
  procedure SendHeartBeat;
  function FormatErrorFrame(const AErrorFrame: IStompFrame): string;
  function ServerSupportsHeartBeat: boolean;
  procedure OnHeartBeatErrorHandler(Sender: TObject);
  procedure DoHeartBeatErrorHandler;
  procedure OpenSSLGetPassword(var Password: String);
public
  // --- fluent configuration ------------------------------------------------
  Function SetUseSSL(const boUseSSL: boolean; const KeyFile: string = '';
    const CertFile: string = ''; const PassPhrase: string = ''): IStompClient; // SSL
  function SetPassword(const Value: string): IStompClient;
  function SetUserName(const Value: string): IStompClient;
  // --- receiving -----------------------------------------------------------
  function Receive(out StompFrame: IStompFrame; ATimeout: Integer): boolean; overload;
  function Receive: IStompFrame; overload;
  function Receive(ATimeout: Integer): IStompFrame; overload;
  procedure Receipt(const ReceiptID: string);
  // --- connection ----------------------------------------------------------
  procedure Connect(Host: string = '127.0.0.1'; Port: Integer = DEFAULT_STOMP_PORT;
    ClientID: string = '';
    AcceptVersion: TStompAcceptProtocol = TStompAcceptProtocol.Ver_1_0);
  procedure Disconnect;
  // --- messaging -----------------------------------------------------------
  procedure Subscribe(QueueOrTopicName: string; Ack: TAckMode = TAckMode.amAuto;
    Headers: IStompHeaders = nil);
  // STOMP 1.1: the id header must match the id of the previous SUBSCRIBE.
  procedure Unsubscribe(Queue: string; const subscriptionId: string = '');
  procedure Send(QueueOrTopicName: string; TextMessage: string;
    Headers: IStompHeaders = nil); overload;
  procedure Send(QueueOrTopicName: string; TextMessage: string;
    TransactionIdentifier: string; Headers: IStompHeaders = nil); overload;
  // STOMP 1.1 ACK has two REQUIRED headers: message-id (matching the MESSAGE
  // being acknowledged) and subscription (matching the subscription's id).
  procedure Ack(const MessageID: string; const subscriptionId: string = '';
    const TransactionIdentifier: string = '');
  { STOMP 1.1 }
  // NACK takes the same headers as ACK: message-id (mandatory),
  // subscription (mandatory) and transaction (OPTIONAL).
  procedure Nack(const MessageID: string; const subscriptionId: string = '';
    const TransactionIdentifier: string = '');
  // --- transactions --------------------------------------------------------
  procedure BeginTransaction(const TransactionIdentifier: string);
  procedure CommitTransaction(const TransactionIdentifier: string);
  procedure AbortTransaction(const TransactionIdentifier: string);
  // --- lifetime ------------------------------------------------------------
  constructor Create; overload; virtual;
  class function CreateAndConnect(Host: string = '127.0.0.1';
    Port: Integer = DEFAULT_STOMP_PORT; ClientID: string = '';
    AcceptVersion: TStompAcceptProtocol = TStompAcceptProtocol.Ver_1_0): IStompClient;
    overload; virtual;
  destructor Destroy; override;
  function SetHeartBeat(const OutgoingHeartBeats, IncomingHeartBeats: Int64): IStompClient;
  function Clone: IStompClient;
  function Connected: boolean;
  function SetReceiveTimeout(const AMilliSeconds: Cardinal): IStompClient;
  function GetProtocolVersion: string;
  function GetServer: string;
  function GetSession: string;
  property ReceiptTimeout: Integer read FReceiptTimeout write SetReceiptTimeout;
  property Transactions: TStringList read FTransactions;
  property ConnectionTimeout: UInt32 read FConnectionTimeout write SetConnectionTimeout;
  // * Manage Events
  property OnBeforeSendFrame: TSenderFrameEvent read FOnBeforeSendFrame write FOnBeforeSendFrame;
  property OnAfterSendFrame: TSenderFrameEvent read FOnAfterSendFrame write FOnAfterSendFrame;
  property OnHeartBeatError: TNotifyEvent read FOnHeartBeatError write FOnHeartBeatError;
  // Added by GC 26/01/2011
  property OnConnect: TStompConnectNotifyEvent read GetOnConnect write SetOnConnect;
end;
2016-10-14 15:19:06 +02:00
// Worker thread that periodically calls TStompClient.SendHeartBeat.
THeartBeatThread = class(TThread)
private
  FStompClient: TStompClient; // client whose SendHeartBeat is called
  FLock: TObject; // lock object handed over by the client
  FOutgoingHeatBeatTimeout: Int64; // pause between two heart-beats (ms)
  FOnHeartBeatError: TNotifyEvent;
protected
  procedure Execute; override;
  procedure DoHeartBeatError;
public
  constructor Create(StompClient: TStompClient; Lock: TObject; OutgoingHeatBeatTimeout: Int64);
    virtual;
  property OnHeartBeatError: TNotifyEvent read FOnHeartBeatError write FOnHeartBeatError;
end;
implementation
{$IFDEF FPC}
2016-10-14 15:19:06 +02:00
const
CHAR0 = #0;
{$ELSE}
2016-10-14 15:19:06 +02:00
uses
2013-05-10 18:00:10 +02:00
// Windows, // Remove windows unit for compiling on ios
2015-01-21 14:59:20 +01:00
IdGlobal,
2016-06-28 15:21:47 +02:00
IdGlobalProtocols,
2016-10-14 15:19:06 +02:00
Character, Winapi.Windows;
{$ENDIF}
{ TStompClient }
// Sends an ABORT frame for a transaction previously opened with
// BeginTransaction and removes it from the list of open transactions.
// Raises EStomp when the identifier is not in that list.
procedure TStompClient.AbortTransaction(const TransactionIdentifier: string);
var
  lAbortFrame: IStompFrame;
  lIndex: Integer;
begin
  // Guard clause: unknown transactions are rejected before anything is sent.
  if FTransactions.IndexOf(TransactionIdentifier) < 0 then
    raise EStomp.CreateFmt
      ('Abort Transaction Error. Transaction [%s] not found',
      [TransactionIdentifier]);

  lAbortFrame := TStompFrame.Create;
  lAbortFrame.Command := 'ABORT';
  lAbortFrame.Headers.Add('transaction', TransactionIdentifier);
  SendFrame(lAbortFrame);

  FInTransaction := False;
  // Looked up again after sending, exactly as the frame was sent first.
  lIndex := FTransactions.IndexOf(TransactionIdentifier);
  FTransactions.Delete(lIndex);
end;
// Sends an ACK frame for MessageID. The subscription and transaction
// headers are only added when the corresponding argument is not empty.
procedure TStompClient.Ack(const MessageID: string; const subscriptionId: string;
  const TransactionIdentifier: string);
var
  lAckFrame: IStompFrame;
  lHasSubscription, lHasTransaction: boolean;
begin
  lHasSubscription := subscriptionId <> '';
  lHasTransaction := TransactionIdentifier <> '';

  lAckFrame := TStompFrame.Create;
  lAckFrame.Command := 'ACK';
  lAckFrame.Headers.Add(TStompHeaders.MESSAGE_ID, MessageID);
  if lHasSubscription then
    lAckFrame.Headers.Add('subscription', subscriptionId);
  if lHasTransaction then
    lAckFrame.Headers.Add('transaction', TransactionIdentifier);
  SendFrame(lAckFrame);
end;
// Sends a BEGIN frame and records the identifier as an open transaction.
// Raises EStomp when a transaction with the same identifier is already open.
procedure TStompClient.BeginTransaction(const TransactionIdentifier: string);
var
  lBeginFrame: IStompFrame;
begin
  // Guard clause: refuse to open the same transaction twice.
  if FTransactions.IndexOf(TransactionIdentifier) <> -1 then
    raise EStomp.CreateFmt
      ('Begin Transaction Error. Transaction [%s] still open',
      [TransactionIdentifier]);

  lBeginFrame := TStompFrame.Create;
  lBeginFrame.Command := 'BEGIN';
  lBeginFrame.Headers.Add('transaction', TransactionIdentifier);
  SendFrame(lBeginFrame);
  // CheckReceipt(Frame);
  FInTransaction := True;
  FTransactions.Add(TransactionIdentifier);
end;
2013-10-15 10:14:36 +02:00
// procedure TStompClient.CheckReceipt(Frame: TStompFrame);
// var
// ReceiptID: string;
// begin
// if FEnableReceipts then
// begin
// ReceiptID := inttostr(GetTickCount);
2016-10-14 15:19:06 +02:00
// Frame.Headers.Add('receipt', ReceiptID);
2013-10-15 10:14:36 +02:00
// SendFrame(Frame);
// Receipt(ReceiptID);
// end
// else
// SendFrame(Frame);
// end;
2016-09-23 22:36:36 +02:00
// Creates a second, independent client that is configured like this one
// (credentials, SSL settings, heart-beat settings, connection timeout) and
// connects it to the same host, port, client id and protocol version.
// Any exception raised by Connect propagates to the caller; the partially
// built clone is released through its interface reference.
function TStompClient.Clone: IStompClient;
var
  lClone: TStompClient;
begin
  lClone := TStompClient.Create;
  // Hand the instance to the interface reference right away so that it is
  // reference counted (and freed) even if one of the calls below raises.
  // Working through the typed object reference avoids hard-casting an
  // interface back to a class, which is not portable across compilers.
  Result := lClone;
  lClone.SetUserName(FUserName).SetPassword(FPassword);
  // Without these the clone of an SSL / heart-beating client would try to
  // talk plain text and negotiate the default heart-beat values.
  lClone.SetUseSSL(FUseSSL, FsslKeyFile, FsslCertFile, FsslKeyPass);
  lClone.SetHeartBeat(FOutgoingHeartBeats, FIncomingHeartBeats);
  lClone.ConnectionTimeout := FConnectionTimeout;
  lClone.Connect(FHost, FPort, FClientID, FAcceptVersion);
end;
// Sends a COMMIT frame for a transaction previously opened with
// BeginTransaction and removes it from the list of open transactions.
// Raises EStomp when the identifier is not in that list.
procedure TStompClient.CommitTransaction(const TransactionIdentifier: string);
var
  lCommitFrame: IStompFrame;
  lIndex: Integer;
begin
  // Guard clause: unknown transactions are rejected before anything is sent.
  if FTransactions.IndexOf(TransactionIdentifier) < 0 then
    raise EStomp.CreateFmt
      ('Commit Transaction Error. Transaction [%s] not found',
      [TransactionIdentifier]);

  lCommitFrame := TStompFrame.Create;
  lCommitFrame.Command := 'COMMIT';
  lCommitFrame.Headers.Add('transaction', TransactionIdentifier);
  SendFrame(lCommitFrame);

  FInTransaction := False;
  lIndex := FTransactions.IndexOf(TransactionIdentifier);
  FTransactions.Delete(lIndex);
end;
2016-07-26 17:41:05 +02:00
// Opens the TCP (optionally SSL) connection, sends the CONNECT frame and
// waits for the server reply.
//   Host, Port     - broker address
//   ClientID       - sent as 'client-id' header when not empty
//   AcceptVersion  - Ver_1_1 adds the 'accept-version: 1.1' and 'heart-beat'
//                    headers, anything else announces version 1.0
// On a CONNECTED reply the session, protocol version, server name and
// heart-beat values are stored; when the server supports heart-beats the
// heart-beat thread is started. Finally OnConnect is fired.
// Every failure is re-raised as EStomp carrying the original message.
procedure TStompClient.Connect(Host: string; Port: Integer; ClientID: string;
  AcceptVersion: TStompAcceptProtocol);
var
  Frame: IStompFrame;
  lHeartBeat: string;
  lHandshakeStart: TDateTime;
begin
  FHost := Host;
  FPort := Port;
  FClientID := ClientID;
  FAcceptVersion := AcceptVersion;
  try
    Init;
{$IFDEF USESYNAPSE}
    FSynapseConnected := False;
    FSynapseTCP.Connect(Host, intToStr(Port));
    FSynapseConnected := True;
{$ELSE}
    if FUseSSL then
    begin
      FIOHandlerSocketOpenSSL.OnGetPassword := OpenSSLGetPassword;
      FIOHandlerSocketOpenSSL.Port := 0;
      FIOHandlerSocketOpenSSL.DefaultPort := 0;
      FIOHandlerSocketOpenSSL.SSLOptions.Method := sslvTLSv1_2; // sslvSSLv3; //sslvSSLv23;
      FIOHandlerSocketOpenSSL.SSLOptions.KeyFile := FsslKeyFile;
      FIOHandlerSocketOpenSSL.SSLOptions.CertFile := FsslCertFile;
      FIOHandlerSocketOpenSSL.SSLOptions.Mode := sslmUnassigned; // sslmClient;
      // NOTE(review): an empty VerifyMode with depth 0 means the peer
      // certificate is not verified - confirm this is intended.
      FIOHandlerSocketOpenSSL.SSLOptions.VerifyMode := [];
      FIOHandlerSocketOpenSSL.SSLOptions.VerifyDepth := 0;
      // FIOHandlerSocketOpenSSL.OnBeforeConnect := BeforeConnect;
      FTCP.IOHandler := FIOHandlerSocketOpenSSL;
    end
    else
    begin
      FTCP.IOHandler := nil;
    end;
    FTCP.ConnectTimeout := FConnectionTimeout;
    FTCP.Connect(Host, Port);
    FTCP.IOHandler.MaxLineLength := MaxInt;
{$ENDIF}
    Frame := TStompFrame.Create;
    Frame.Command := 'CONNECT';
    FClientAcceptProtocolVersion := AcceptVersion;
    if FClientAcceptProtocolVersion = TStompAcceptProtocol.Ver_1_1 then
    begin
      Frame.Headers.Add('accept-version', '1.1'); // stomp 1.1
      lHeartBeat := Format('%d,%d', [FOutgoingHeartBeats, FIncomingHeartBeats]);
      Frame.Headers.Add('heart-beat', lHeartBeat); // stomp 1.1
    end
    else
    begin
      Frame.Headers.Add('accept-version', '1.0'); // stomp 1.0
    end;
    Frame.Headers.Add('login', FUserName).Add('passcode', FPassword);
    if ClientID <> '' then
    begin
      Frame.Headers.Add('client-id', ClientID);
    end;
    SendFrame(Frame);

    // Wait for the server reply. Receive returns nil on every receive
    // timeout, so the wait is bounded by ConnectionTimeout; otherwise a
    // server that accepts the socket but never answers would block forever.
    // A ConnectionTimeout of 0 keeps the unbounded wait.
    lHandshakeStart := Now;
    Frame := nil;
    repeat
      Frame := Receive;
    until Assigned(Frame) or ((FConnectionTimeout > 0) and
      (MilliSecondsBetween(Now, lHandshakeStart) >= Int64(FConnectionTimeout)));
    if not Assigned(Frame) then
      raise EStomp.CreateFmt('No reply to CONNECT from %s:%d within %d ms',
        [Host, Port, Int64(FConnectionTimeout)]);

    if Frame.Command = 'ERROR' then
      raise EStomp.Create(FormatErrorFrame(Frame));
    if Frame.Command = 'CONNECTED' then
    begin
      FSession := Frame.Headers.Value('session');
      FServerProtocolVersion := Frame.Headers.Value('version'); // stomp 1.1
      FServer := Frame.Headers.Value('server'); // stomp 1.1
      ParseHeartBeat(Frame.Headers);
    end;
    // Let's start the heart-beat thread
    if ServerSupportsHeartBeat then
    begin
      FHeartBeatThread := THeartBeatThread.Create(Self, FLock, FServerOutgoingHeartBeats);
      FHeartBeatThread.OnHeartBeatError := OnHeartBeatErrorHandler;
      FHeartBeatThread.Start;
    end;
    { todo: 'Call event?' -> by Gc }
    // Add by GC 26/01/2011
    if Assigned(FOnConnect) then
      FOnConnect(Self, Frame);
  except
    on E: Exception do
    begin
      // Callers only need to handle EStomp, whatever the transport raised.
      raise EStomp.Create(E.message);
    end;
  end;
end;
// True while the underlying socket object exists and is considered open.
function TStompClient.Connected: boolean;
begin
{$IFDEF USESYNAPSE}
  Result := False;
  if Assigned(FSynapseTCP) then
    Result := FSynapseConnected;
{$ELSE} // ClosedGracefully <> FTCP.Connected !!!
  Result := False;
  // Checked step by step so that IOHandler is only touched on a live socket.
  if not Assigned(FTCP) then
    Exit;
  if not FTCP.Connected then
    Exit;
  Result := not FTCP.IOHandler.ClosedGracefully;
{$ENDIF}
end;
2016-09-23 22:36:36 +02:00
// Factory: creates a client and connects it with the given parameters.
// Returns the connected client; exceptions raised by Connect propagate and
// the new instance is released through its interface reference.
class function TStompClient.CreateAndConnect(Host: string; Port: Integer;
  ClientID: string; AcceptVersion: TStompAcceptProtocol): IStompClient;
begin
  // Create is virtual and this is a virtual class method: instantiate through
  // Self so that a descendant class gets an instance of its own type instead
  // of a hard-coded TStompClient.
  Result := Create;
  Result.Connect(Host, Port, ClientID, AcceptVersion);
end;
// Sets the defaults: guest/guest credentials, no SSL, 200 ms receive and
// receipt timeout, 10 s connection timeout, incoming heart-beats every 10 s
// and outgoing heart-beats disabled.
constructor TStompClient.Create;
begin
  inherited;
  // objects
  FLock := TObject.Create;
  FHeaders := TStompHeaders.Create;
  // session
  FSession := '';
  FInTransaction := False;
  // credentials / transport
  FUserName := 'guest';
  FPassword := 'guest';
  FUseSSL := False;
  // timeouts (milliseconds)
  FTimeout := 200;
  FReceiptTimeout := FTimeout;
  FConnectionTimeout := 10 * 1000; // 10secs
  // heart-beat
  FOutgoingHeartBeats := 0; // disabled
  FIncomingHeartBeats := 10000; // 10secs
end;
// Releases the socket objects and the transaction list and resets the
// fields to nil, so the routine can safely be called more than once.
procedure TStompClient.DeInit;
begin
{$IFDEF USESYNAPSE}
  FreeAndNil(FSynapseTCP);
{$ELSE}
  // The TCP client goes first, then the SSL handler that may be attached to it.
  FreeAndNil(FTCP);
  FreeAndNil(FIOHandlerSocketOpenSSL);
{$ENDIF}
  FreeAndNil(FTransactions);
end;
// Disconnects from the server and releases everything the client owns.
destructor TStompClient.Destroy;
begin
  try
    // Disconnect may raise (for instance when the DISCONNECT frame cannot be
    // written to a broken socket).
    Disconnect;
  finally
    // Must run even when Disconnect failed, otherwise the socket objects,
    // the lock object and the inherited clean-up would be skipped.
    DeInit;
    FLock.Free;
    inherited;
  end;
end;
2013-10-15 10:14:36 +02:00
// Stops the heart-beat thread, sends DISCONNECT when the socket is still
// open, closes the socket and releases the transport objects (DeInit).
// Safe to call repeatedly and on a client that never connected.
procedure TStompClient.Disconnect;
var
  Frame: IStompFrame;
begin
  try
    // The thread is stopped whenever it exists, not only while the socket is
    // still connected: after a dropped connection it would otherwise keep
    // running against a client whose socket (or the client itself) is gone.
    // FreeAndNil avoids leaving a dangling reference for the next call.
    if Assigned(FHeartBeatThread) then
    begin
      FHeartBeatThread.Terminate;
      FHeartBeatThread.WaitFor;
      FreeAndNil(FHeartBeatThread);
    end;
    if Connected then
    begin
      Frame := TStompFrame.Create;
      Frame.Command := 'DISCONNECT';
      SendFrame(Frame);
{$IFDEF USESYNAPSE}
      FSynapseTCP.CloseSocket;
      FSynapseConnected := False;
{$ELSE}
      FTCP.Disconnect;
{$ENDIF}
    end;
  finally
    // Release the transport even when sending DISCONNECT raised.
    DeInit;
  end;
end;
2016-10-17 14:14:45 +02:00
// Fires the OnHeartBeatError event, if one is assigned.
procedure TStompClient.DoHeartBeatErrorHandler;
begin
  if not Assigned(FOnHeartBeatError) then
    Exit;
  try
    FOnHeartBeatError(Self);
  except
    // Exceptions raised by the user handler are swallowed on purpose.
  end;
end;
2016-07-26 11:21:07 +02:00
// Builds the text '<message header>: <body>' from an ERROR frame.
// Raises EStomp when the frame is not an ERROR frame.
function TStompClient.FormatErrorFrame(const AErrorFrame: IStompFrame): string;
var
  lMessage, lBody: string;
begin
  if AErrorFrame.Command <> 'ERROR' then
    raise EStomp.Create('Not an ERROR frame');
  lMessage := AErrorFrame.Headers.Value('message');
  lBody := AErrorFrame.Body;
  Result := lMessage + ': ' + lBody;
end;
// Read accessor of the OnConnect property.
function TStompClient.GetOnConnect: TStompConnectNotifyEvent;
begin
Result := FOnConnect;
end;
// Returns the 'version' header value stored from the server's CONNECTED frame.
function TStompClient.GetProtocolVersion: string;
begin
Result := FServerProtocolVersion;
end;
// Returns the 'server' header value stored from the server's CONNECTED frame.
function TStompClient.GetServer: string;
begin
Result := FServer;
end;
// Returns the 'session' header value stored from the server's CONNECTED frame.
function TStompClient.GetSession: string;
begin
Result := FSession;
end;
// (Re)creates the transport objects and the transaction list. Whatever was
// left from an earlier connection is released first through DeInit.
procedure TStompClient.Init;
begin
  DeInit;
{$IFDEF USESYNAPSE}
  FSynapseTCP := TTCPBlockSocket.Create;
  // Socket status changes are reported to SynapseSocketCallBack.
  FSynapseTCP.OnStatus := SynapseSocketCallBack;
  FSynapseTCP.RaiseExcept := True;
{$ELSE}
  FIOHandlerSocketOpenSSL := TIdSSLIOHandlerSocketOpenSSL.Create(nil);
  FTCP := TIdTCPClient.Create(nil);
{$ENDIF}
  FTransactions := TStringList.Create;
end;
{$IFDEF USESYNAPSE}
2016-10-14 15:19:06 +02:00
// Synapse status hook: marks the connection as broken on socket errors.
procedure TStompClient.SynapseSocketCallBack(Sender: TObject;
  Reason: THookSocketReason; const Value: string);
begin
  // As seen at TBlockSocket.ExceptCheck procedure, it SEEMS safe to say
  // when an error occurred and is not a Timeout, the connection is broken
  if Reason <> HR_Error then
    Exit;
  if FSynapseTCP.LastError = WSAETIMEDOUT then
    Exit;
  FSynapseConnected := False;
end;
{$ENDIF}
2016-10-14 15:19:06 +02:00
2016-09-23 22:36:36 +02:00
// Copies every entry of AHeaders (when assigned) into the frame headers and
// adds a 'content-length' header when the frame reports a content length
// greater than zero.
procedure TStompClient.MergeHeaders(var AFrame: IStompFrame;
  var AHeaders: IStompHeaders);
var
  lIdx: Integer;
  lEntry: TKeyValue;
begin
  if AHeaders <> nil then
  begin
    // An empty header list simply yields zero iterations.
    for lIdx := 0 to AHeaders.Count - 1 do
    begin
      lEntry := AHeaders.GetAt(lIdx);
      AFrame.Headers.Add(lEntry.Key, lEntry.Value);
    end;
  end;
  // If the frame has some content, then set the length of that content.
  if AFrame.ContentLength > 0 then
    AFrame.Headers.Add('content-length', intToStr(AFrame.ContentLength));
end;
// Sends a NACK frame for MessageID. The subscription and transaction
// headers are only added when the corresponding argument is not empty.
procedure TStompClient.Nack(const MessageID, subscriptionId, TransactionIdentifier: string);
var
  lNackFrame: IStompFrame;
  lHasSubscription, lHasTransaction: boolean;
begin
  lHasSubscription := subscriptionId <> '';
  lHasTransaction := TransactionIdentifier <> '';

  lNackFrame := TStompFrame.Create;
  lNackFrame.Command := 'NACK';
  lNackFrame.Headers.Add('message-id', MessageID);
  if lHasSubscription then
    lNackFrame.Headers.Add('subscription', subscriptionId);
  if lHasTransaction then
    lNackFrame.Headers.Add('transaction', TransactionIdentifier);
  SendFrame(lNackFrame);
end;
2016-10-17 14:14:45 +02:00
// Handler assigned to the heart-beat thread's OnHeartBeatError event: stops
// and frees the thread, disconnects and then notifies the client's own
// OnHeartBeatError subscriber.
// NOTE(review): WaitFor would block if this ever ran inside the heart-beat
// thread itself - confirm it is only invoked from another thread.
procedure TStompClient.OnHeartBeatErrorHandler(Sender: TObject);
begin
  FHeartBeatThread.Terminate;
  FHeartBeatThread.WaitFor;
  FreeAndNil(FHeartBeatThread);
  Disconnect;
  DoHeartBeatErrorHandler;
end;
// OnGetPassword callback of the OpenSSL IO handler (wired up in Connect):
// returns the pass phrase that was supplied through SetUseSSL.
procedure TStompClient.OpenSSLGetPassword(var Password: String);
begin
Password := FsslKeyPass;
end;
2016-10-14 15:19:06 +02:00
// Reads the 'heart-beat' header ("<first>,<second>", milliseconds) of the
// given headers. Both values are reset to 0 first, so a missing or blank
// header leaves heart-beating disabled.
// Raises EConvertError when a value is missing or not a valid integer.
procedure TStompClient.ParseHeartBeat(Headers: IStompHeaders);
var
  lValue, lFirst, lSecond: string;
  lCommaPos: Integer;
begin
  FServerOutgoingHeartBeats := 0;
  FServerIncomingHeartBeats := 0;
  // WARNING!! server heart beat is reversed: the first number of the header
  // is stored in FServerIncomingHeartBeats, the second one in
  // FServerOutgoingHeartBeats (the interval handed to the heart-beat thread).
  lValue := Trim(Headers.Value('heart-beat'));
  if lValue = '' then
    Exit;
  // Split with Pos/Copy rather than Indy's Fetch so the routine also
  // compiles when the Synapse transport is selected (no Indy units in scope).
  lCommaPos := Pos(',', lValue);
  if lCommaPos > 0 then
  begin
    lFirst := Copy(lValue, 1, lCommaPos - 1);
    lSecond := Copy(lValue, lCommaPos + 1, MaxInt);
  end
  else
  begin
    lFirst := lValue;
    lSecond := '';
  end;
  // The fields are Int64, so convert with the 64-bit routine.
  FServerIncomingHeartBeats := StrToInt64(Trim(lFirst));
  FServerOutgoingHeartBeats := StrToInt64(Trim(lSecond));
end;
// Waits up to ReceiptTimeout for a frame and checks that it is the RECEIPT
// for ReceiptID. Nothing happens when no frame arrives in time.
// Raises EStomp when another command or another receipt-id is received.
procedure TStompClient.Receipt(const ReceiptID: string);
var
  lReply: IStompFrame;
begin
  if not Receive(lReply, FReceiptTimeout) then
    Exit;
  if lReply.Command <> 'RECEIPT' then
    raise EStomp.Create('Receipt command error');
  if lReply.Headers.Value('receipt-id') <> ReceiptID then
    raise EStomp.Create('Receipt receipt-id error');
end;
2016-09-23 22:36:36 +02:00
// Receives one frame within ATimeout milliseconds.
// Returns True and the frame in StompFrame, or False and nil on timeout.
function TStompClient.Receive(out StompFrame: IStompFrame;
  ATimeout: Integer): boolean;
begin
  StompFrame := nil;
  StompFrame := Receive(ATimeout);
  Result := StompFrame <> nil;
end;
// Reads one STOMP frame from the socket, waiting at most ATimeout
// milliseconds for it to start. Returns nil when nothing arrived in time.
// The Indy implementation raises EStomp when an ERROR frame is received.
function TStompClient.Receive(ATimeout: Integer): IStompFrame;

{$IFDEF USESYNAPSE}
  // Synapse flavour: collects bytes up to the frame-terminating null.
  function InternalReceiveSynapse(ATimeout: Integer): IStompFrame;
  var
    lByte: char;
    lBuffer: string;
    lTimedOut: boolean;
  begin
    lTimedOut := False;
    Result := nil;
    FSynapseTCP.SetRecvTimeout(ATimeout);
    lBuffer := '';
    try
      // should be improved with a string buffer (daniele.teti)
      repeat
        lByte := Chr(FSynapseTCP.RecvByte(ATimeout));
        if lByte <> CHAR0 then
          lBuffer := lBuffer + lByte;
      until lByte = CHAR0;
      // One more byte is read (and discarded) right after the terminator.
      lByte := Chr(FSynapseTCP.RecvByte(ATimeout));
    except
      on E: ESynapseError do
      begin
        // A receive timeout just means "no frame"; everything else propagates.
        if E.ErrorCode <> WSAETIMEDOUT then
          raise;
        lTimedOut := True;
      end;
    end;
    if not lTimedOut then
      Result := StompUtils.CreateFrame(lBuffer + CHAR0);
  end;
{$ELSE}
  // Indy flavour: command line, header lines, then the body.
  function InternalReceiveINDY(ATimeout: Integer): IStompFrame;
  var
    lLine: string;
    lRaw: TStringBuilder;
    lHeaders: TIdHeaderList;
    lBodySize: Integer;
    lCharset: string;
    lStarted: TDateTime;
{$IF CompilerVersion < 24}
    lEncoding: TIdTextEncoding;
    lOwnsEncoding: boolean;
{$ELSE}
    lEncoding: IIdTextEncoding;
{$ENDIF}
  begin
    Result := nil;
    lRaw := TStringBuilder.Create(1024 * 4);
    try
      FTCP.Socket.ReadTimeout := ATimeout;
      FTCP.Socket.DefStringEncoding :=
{$IF CompilerVersion < 24}TIdTextEncoding.UTF8{$ELSE}IndyTextEncoding_UTF8{$ENDIF};
      try
        lStarted := Now;
        // read command line
        repeat
          lLine := FTCP.Socket.ReadLn(LF, ATimeout, -1,
            FTCP.Socket.DefStringEncoding);
          if FTCP.Socket.ReadLnTimedout then
            Break;
          // Only with protocol 1.1 an empty line is treated as a heart-beat
          // and skipped; in every other case the line read ends the scan.
          if (FServerProtocolVersion <> '1.1') or (lLine <> '') then
            Break;
        until MilliSecondsBetween(lStarted, Now) >= ATimeout;
        if lLine = '' then
          Exit(nil);
        lRaw.Append(lLine + LF);
        // read headers
        lHeaders := TIdHeaderList.Create(QuotePlain);
        try
          while True do
          begin
            lLine := FTCP.Socket.ReadLn;
            lRaw.Append(lLine + LF);
            if lLine = '' then
              Break;
            // in case of duplicated header, only the first is considered
            // https://stomp.github.io/stomp-specification-1.1.html#Repeated_Header_Entries
            if lHeaders.IndexOfName(Fetch(lLine, ':', False, False)) = -1 then
              lHeaders.Add(lLine);
          end;
          // read body
          //
          // NOTE: non-text data really should be read as a Stream instead of a String!!!
          //
          if IsHeaderMediaType(lHeaders.Values['content-type'], 'text') then
          begin
            lCharset := lHeaders.Params['content-type', 'charset'];
            if lCharset = '' then
              lCharset := 'utf-8';
            lEncoding := CharsetToEncoding(lCharset);
{$IF CompilerVersion < 24}
            lOwnsEncoding := True;
{$ENDIF}
          end
          else
          begin
            lEncoding := IndyTextEncoding_8Bit();
{$IF CompilerVersion < 24}
            lOwnsEncoding := False;
{$ENDIF}
          end;
{$IF CompilerVersion < 24}
          try
{$ENDIF}
            if lHeaders.IndexOfName('content-length') <> -1 then
            begin
              // length specified, read exactly that many bytes
              lBodySize := IndyStrToInt(lHeaders.Values['content-length']);
              if lBodySize > 0 then
              begin
                lLine := FTCP.Socket.ReadString(lBodySize, lEncoding);
                lRaw.Append(lLine);
              end;
              // frame must still be terminated by a null
              FTCP.Socket.ReadLn(#0 + LF);
            end
            else
            begin
              // no length specified, body terminated by frame terminating null
              lLine := FTCP.Socket.ReadLn(#0 + LF, lEncoding);
              lRaw.Append(lLine);
            end;
            lRaw.Append(#0);
{$IF CompilerVersion < 24}
          finally
            if lOwnsEncoding then
              lEncoding.Free;
          end;
{$ENDIF}
        finally
          lHeaders.Free;
        end;
      except
        on E: Exception do
        begin
          // Nothing read yet: let the original exception through untouched.
          if lRaw.Length = 0 then
            raise;
          // Otherwise report it together with what was read so far.
          raise EStomp.Create(E.message + sLineBreak + lRaw.toString);
        end;
      end;
      Result := StompUtils.CreateFrame(lRaw.toString);
      if Result.Command = 'ERROR' then
        raise EStomp.Create(FormatErrorFrame(Result));
    finally
      lRaw.Free;
    end;
  end;
{$ENDIF}

begin
{$IFDEF USESYNAPSE}
  Result := InternalReceiveSynapse(ATimeout);
{$ELSE}
  Result := InternalReceiveINDY(ATimeout);
{$ENDIF}
end;
// Receives one frame using the default receive timeout (FTimeout, set via
// SetReceiveTimeout); delegates to Receive(ATimeout).
function TStompClient.Receive: IStompFrame;
begin
Result := Receive(FTimeout);
end;
2016-09-23 22:36:36 +02:00
// Sends TextMessage to the given destination. Optional extra headers are
// merged into the frame before it is transmitted.
procedure TStompClient.Send(QueueOrTopicName: string; TextMessage: string;
  Headers: IStompHeaders);
var
  lSendFrame: IStompFrame;
begin
  lSendFrame := TStompFrame.Create;
  lSendFrame.Command := 'SEND';
  lSendFrame.Headers.Add('destination', QueueOrTopicName);
  // The body is set before merging: MergeHeaders derives content-length from it.
  lSendFrame.Body := TextMessage;
  MergeHeaders(lSendFrame, Headers);
  SendFrame(lSendFrame);
end;
// Sends TextMessage to the given destination as part of the transaction
// named by TransactionIdentifier. Optional extra headers are merged in.
procedure TStompClient.Send(QueueOrTopicName: string; TextMessage: string;
  TransactionIdentifier: string; Headers: IStompHeaders);
var
  lSendFrame: IStompFrame;
begin
  lSendFrame := TStompFrame.Create;
  lSendFrame.Command := 'SEND';
  lSendFrame.Headers.Add('destination', QueueOrTopicName);
  lSendFrame.Headers.Add('transaction', TransactionIdentifier);
  // The body is set before merging: MergeHeaders derives content-length from it.
  lSendFrame.Body := TextMessage;
  MergeHeaders(lSendFrame, Headers);
  SendFrame(lSendFrame);
end;
// Writes the frame to the socket while holding the FLock monitor, firing
// OnBeforeSendFrame / OnAfterSendFrame around the write.
// Nothing is sent (and no event is fired) when the client is not connected.
procedure TStompClient.SendFrame(AFrame: IStompFrame);
begin
  TMonitor.Enter(FLock);
  try
    // Test if error on Socket
    if not Connected then
      Exit;
    if Assigned(FOnBeforeSendFrame) then
      FOnBeforeSendFrame(AFrame);
{$IFDEF USESYNAPSE}
    FSynapseTCP.SendString(AFrame.output);
{$ELSE}
    // FTCP.IOHandler.write(TEncoding.ASCII.GetBytes(AFrame.output));
{$IF CompilerVersion < 25}
    FTCP.IOHandler.write(TEncoding.UTF8.GetBytes(AFrame.output));
{$IFEND}
{$IF CompilerVersion >= 25}
    FTCP.IOHandler.write(IndyTextEncoding_UTF8.GetBytes(AFrame.output));
{$IFEND}
{$ENDIF}
    if Assigned(FOnAfterSendFrame) then
      FOnAfterSendFrame(AFrame);
  finally
    TMonitor.Exit(FLock);
  end;
end;
// Writes a single LF (the heart-beat) to the socket while holding the FLock
// monitor. Does nothing when the client is not connected.
procedure TStompClient.SendHeartBeat;
begin
  TMonitor.Enter(FLock);
  try
    if not Connected then
      Exit;
    // Winapi.Windows.Beep(600, 200);
{$IFDEF USESYNAPSE}
    FSynapseTCP.SendString(LF);
{$ELSE}
{$IF CompilerVersion < 25}
    FTCP.IOHandler.write(TEncoding.UTF8.GetBytes(LF));
{$IFEND}
{$IF CompilerVersion >= 25}
    FTCP.IOHandler.write(IndyTextEncoding_UTF8.GetBytes(LF));
{$IFEND}
{$ENDIF}
  finally
    TMonitor.Exit(FLock);
  end;
end;
// True when the server answered with protocol version 1.1 and the stored
// FServerOutgoingHeartBeats interval is greater than zero.
function TStompClient.ServerSupportsHeartBeat: boolean;
var
  lIsVersion11, lHasInterval: boolean;
begin
  lIsVersion11 := FServerProtocolVersion = '1.1';
  lHasInterval := FServerOutgoingHeartBeats > 0;
  Result := lIsVersion11 and lHasInterval;
end;
2016-09-23 22:36:36 +02:00
// Write accessor of the ConnectionTimeout property; Connect passes the value
// to the Indy client's ConnectTimeout.
procedure TStompClient.SetConnectionTimeout(const Value: UInt32);
begin
FConnectionTimeout := Value;
end;
2016-10-17 14:14:45 +02:00
// Stores the two values that Connect sends in the 'heart-beat' header
// ("<outgoing>,<incoming>") when protocol 1.1 is requested.
// Returns the client itself to allow call chaining.
function TStompClient.SetHeartBeat(const OutgoingHeartBeats, IncomingHeartBeats: Int64)
  : IStompClient;
begin
  Result := Self;
  FIncomingHeartBeats := IncomingHeartBeats;
  FOutgoingHeartBeats := OutgoingHeartBeats;
end;
// Write accessor of the OnConnect property; the handler is called at the
// end of Connect with the frame received from the server.
procedure TStompClient.SetOnConnect(const Value: TStompConnectNotifyEvent);
begin
FOnConnect := Value;
end;
// Stores the password sent as the 'passcode' header of the CONNECT frame.
// Returns the client itself to allow call chaining.
function TStompClient.SetPassword(const Value: string): IStompClient;
begin
FPassword := Value;
Result := Self;
end;
// Write accessor of the ReceiptTimeout property; Receipt waits this long
// for the RECEIPT frame.
procedure TStompClient.SetReceiptTimeout(const Value: Integer);
begin
FReceiptTimeout := Value;
end;
2016-09-23 22:36:36 +02:00
// Sets the default timeout (milliseconds) used by the parameterless Receive.
// Returns the client itself to allow call chaining.
function TStompClient.SetReceiveTimeout(const AMilliSeconds: Cardinal)
  : IStompClient;
begin
  // FTimeout is a signed Integer: values above MaxInt would wrap to a
  // negative timeout (or raise a range-check error), so they are clamped.
  if AMilliSeconds > Cardinal(MaxInt) then
    FTimeout := MaxInt
  else
    FTimeout := Integer(AMilliSeconds);
  Result := Self;
end;
// Stores the user name sent as the 'login' header of the CONNECT frame.
// Returns the client itself to allow call chaining.
function TStompClient.SetUserName(const Value: string): IStompClient;
begin
FUserName := Value;
Result := Self;
end;
// Enables or disables SSL for the next Connect and stores the key file,
// certificate file and key pass phrase handed to the OpenSSL IO handler.
// Returns the client itself to allow call chaining.
function TStompClient.SetUseSSL(const boUseSSL: boolean; const KeyFile,
CertFile, PassPhrase: string): IStompClient;
begin
FUseSSL := boUseSSL;
FsslKeyFile := KeyFile;
FsslCertFile := CertFile;
FsslKeyPass := PassPhrase;
Result := Self;
end;
2016-09-23 22:36:36 +02:00
// Sends a SUBSCRIBE frame for the given queue or topic with the requested
// acknowledge mode. Extra headers, when supplied, are merged into the frame.
procedure TStompClient.Subscribe(QueueOrTopicName: string;
  Ack: TAckMode = TAckMode.amAuto; Headers: IStompHeaders = nil);
var
  lSubscribeFrame: IStompFrame;
  lAckMode: string;
begin
  lSubscribeFrame := TStompFrame.Create;
  lSubscribeFrame.Command := 'SUBSCRIBE';
  lAckMode := StompUtils.AckModeToStr(Ack);
  lSubscribeFrame.Headers.Add('destination', QueueOrTopicName)
    .Add('ack', lAckMode);
  if Assigned(Headers) then
    MergeHeaders(lSubscribeFrame, Headers);
  SendFrame(lSubscribeFrame);
end;
// Sends an UNSUBSCRIBE frame for the given destination. The 'id' header is
// only added when subscriptionId is not empty.
procedure TStompClient.Unsubscribe(Queue: string; const subscriptionId: string = '');
var
  lUnsubscribeFrame: IStompFrame;
  lHasId: boolean;
begin
  lHasId := subscriptionId <> '';
  lUnsubscribeFrame := TStompFrame.Create;
  lUnsubscribeFrame.Command := 'UNSUBSCRIBE';
  lUnsubscribeFrame.Headers.Add('destination', Queue);
  if lHasId then
    lUnsubscribeFrame.Headers.Add('id', subscriptionId);
  SendFrame(lUnsubscribeFrame);
end;
2016-10-14 15:19:06 +02:00
{ THeartBeatThread }
// Creates the thread suspended; the caller starts it explicitly.
//   StompClient             - client whose SendHeartBeat is called
//   Lock                    - lock object supplied by the client
//   OutgoingHeatBeatTimeout - pause between two heart-beats (milliseconds)
constructor THeartBeatThread.Create(StompClient: TStompClient; Lock: TObject;
  OutgoingHeatBeatTimeout: Int64);
begin
  inherited Create(True);
  FOutgoingHeatBeatTimeout := OutgoingHeatBeatTimeout;
  FLock := Lock;
  FStompClient := StompClient;
end;
2016-10-17 14:14:45 +02:00
// Intended to notify the OnHeartBeatError subscriber. The synchronized call
// below is commented out, so at present this routine does nothing even when
// a handler is assigned.
procedure THeartBeatThread.DoHeartBeatError;
begin
if Assigned(FOnHeartBeatError) then
begin
try
// TThread.Synchronize(nil,
// procedure
// begin
// FOnHeartBeatError(Self);
// end);
except
// do nothing here
end;
end;
end;
2016-10-14 15:19:06 +02:00
// Thread body: waits FOutgoingHeatBeatTimeout milliseconds (polling the
// Terminated flag every 100 ms) and then asks the client to send a
// heart-beat, until the thread is terminated.
procedure THeartBeatThread.Execute;
var
  lCycleStart: TDateTime;
begin
  while not Terminated do
  begin
    lCycleStart := Now;
    // Sleep in short slices so that Terminate is honoured quickly.
    repeat
      if Terminated then
        Exit;
      if MilliSecondsBetween(Now, lCycleStart) >= FOutgoingHeatBeatTimeout then
        Break;
      Sleep(100);
    until False;
    if Terminated then
      Exit;
    // If the connection is down, the socket is invalidated so
    // it is not necessary to inform the main thread about
    // such kind of disconnection.
    FStompClient.SendHeartBeat;
  end;
end;
end.