From beed7bc0a45a66522797e391d98087ecb072fc93 Mon Sep 17 00:00:00 2001 From: Daniele Teti Date: Mon, 17 Oct 2016 14:14:45 +0200 Subject: [PATCH] fix to some hearbeat issue --- StompClient.pas | 63 ++++++++++++++++++++++++++++++++++++++++++++----- StompTypes.pas | 2 +- 2 files changed, 58 insertions(+), 7 deletions(-) diff --git a/StompClient.pas b/StompClient.pas index 306b36a7..3b1e3540 100644 --- a/StompClient.pas +++ b/StompClient.pas @@ -84,6 +84,7 @@ type FHeartBeatThread: THeartBeatThread; FServerIncomingHeartBeats: Int64; FServerOutgoingHeartBeats: Int64; + FOnHeartBeatError: TNotifyEvent; procedure ParseHeartBeat(Headers: IStompHeaders); procedure SetReceiptTimeout(const Value: Integer); procedure SetConnectionTimeout(const Value: UInt32); @@ -103,6 +104,8 @@ type procedure SendHeartBeat; function FormatErrorFrame(const AErrorFrame: IStompFrame): string; function ServerSupportsHeartBeat: boolean; + procedure OnHeartBeatErrorHandler(Sender: TObject); + procedure DoHeartBeatErrorHandler; public function SetPassword(const Value: string): IStompClient; function SetUserName(const Value: string): IStompClient; @@ -138,7 +141,7 @@ type AcceptVersion: TStompAcceptProtocol = TStompAcceptProtocol. Ver_1_0): IStompClient; overload; virtual; destructor Destroy; override; - procedure SetHeartBeat(const OutgoingHeartBeats, IncomingHeartBeats: Int64); + function SetHeartBeat(const OutgoingHeartBeats, IncomingHeartBeats: Int64): IStompClient; function Clone: IStompClient; function Connected: boolean; function SetReceiveTimeout(const AMilliSeconds: Cardinal): IStompClient; @@ -155,6 +158,7 @@ type write FOnBeforeSendFrame; property OnAfterSendFrame: TSenderFrameEvent read FOnAfterSendFrame write FOnAfterSendFrame; + property OnHeartBeatError: TNotifyEvent read FOnHeartBeatError write FOnHeartBeatError; end; THeartBeatThread = class(TThread) @@ -162,11 +166,14 @@ type FStompClient: TStompClient; FLock: TObject; FOutgoingHeatBeatTimeout: Int64; + FOnHeartBeatError: TNotifyEvent; protected procedure Execute; override; + procedure DoHeartBeatError; public constructor Create(StompClient: TStompClient; Lock: TObject; OutgoingHeatBeatTimeout: Int64); virtual; + property OnHeartBeatError: TNotifyEvent read FOnHeartBeatError write FOnHeartBeatError; end; implementation @@ -349,6 +356,7 @@ begin if ServerSupportsHeartBeat then begin FHeartBeatThread := THeartBeatThread.Create(Self, FLock, FServerOutgoingHeartBeats); + FHeartBeatThread.OnHeartBeatError := OnHeartBeatErrorHandler; FHeartBeatThread.Start; end; @@ -447,6 +455,17 @@ begin DeInit; end; +procedure TStompClient.DoHeartBeatErrorHandler; +begin + if Assigned(FOnHeartBeatError) then + begin + try + FOnHeartBeatError(Self); + except + end; + end; +end; + function TStompClient.FormatErrorFrame(const AErrorFrame: IStompFrame): string; begin if AErrorFrame.Command <> 'ERROR' then @@ -534,6 +553,16 @@ begin SendFrame(Frame); end; +procedure TStompClient.OnHeartBeatErrorHandler(Sender: TObject); +begin + FHeartBeatThread.Terminate; + FHeartBeatThread.WaitFor; + FHeartBeatThread.Free; + FHeartBeatThread := nil; + Disconnect; + DoHeartBeatErrorHandler; +end; + procedure TStompClient.ParseHeartBeat(Headers: IStompHeaders); var lValue: string; @@ -541,7 +570,7 @@ var begin FServerOutgoingHeartBeats := 0; FServerIncomingHeartBeats := 0; - //WARNING!! server heart beat is reversed + // WARNING!! server heart beat is reversed lValue := Headers.Value('heart-beat'); if Trim(lValue) <> '' then begin @@ -665,7 +694,7 @@ function TStompClient.Receive(ATimeout: Integer): IStompFrame; lHeartBeat := lLine = ''; // here is not timeout because of the previous line if lHeartBeat then - WinApi.Windows.Beep(1500,200); + Winapi.Windows.Beep(1500, 200); if FServerProtocolVersion = '1.1' then // 1.1 supports heart-beats begin @@ -850,7 +879,7 @@ procedure TStompClient.SendHeartBeat; begin TMonitor.Enter(FLock); try - Winapi.Windows.Beep(600, 200); + // Winapi.Windows.Beep(600, 200); {$IFDEF USESYNAPSE} FSynapseTCP.SendString(LF); {$ELSE} @@ -877,11 +906,12 @@ begin FConnectionTimeout := Value; end; -procedure TStompClient.SetHeartBeat(const OutgoingHeartBeats, - IncomingHeartBeats: Int64); +function TStompClient.SetHeartBeat(const OutgoingHeartBeats, IncomingHeartBeats: Int64) + : IStompClient; begin FOutgoingHeartBeats := OutgoingHeartBeats; FIncomingHeartBeats := IncomingHeartBeats; + Result := Self; end; function TStompClient.SetPassword(const Value: string): IStompClient; @@ -943,6 +973,22 @@ begin FOutgoingHeatBeatTimeout := OutgoingHeatBeatTimeout; end; +procedure THeartBeatThread.DoHeartBeatError; +begin + if Assigned(FOnHeartBeatError) then + begin + try + // TThread.Synchronize(nil, + // procedure + // begin + // FOnHeartBeatError(Self); + // end); + except + // do nothing here + end; + end; +end; + procedure THeartBeatThread.Execute; var lStart: TDateTime; @@ -955,7 +1001,12 @@ begin Sleep(100); end; if not Terminated then + begin + // If the connection is down, the socket is invalidated so + // it is not necessary to informa the main thread about + // such kind of disconnection. FStompClient.SendHeartBeat; + end; end; end; diff --git a/StompTypes.pas b/StompTypes.pas index 9837c6da..cccc147b 100644 --- a/StompTypes.pas +++ b/StompTypes.pas @@ -101,7 +101,7 @@ type function SetPassword(const Value: string): IStompClient; function SetUserName(const Value: string): IStompClient; function SetReceiveTimeout(const AMilliSeconds: Cardinal): IStompClient; - procedure SetHeartBeat(const OutgoingHeartBeats, IncomingHeartBeats: Int64); + function SetHeartBeat(const OutgoingHeartBeats, IncomingHeartBeats: Int64): IStompClient; function Connected: Boolean; function GetProtocolVersion: string; function GetServer: string;