+ some change to the speed test

+ some formatting
+ some refactoring
This commit is contained in:
daniele.teti 2010-04-13 22:25:14 +00:00
parent 4f043bb2fa
commit 4e6dffa238
4 changed files with 1091 additions and 1093 deletions

View File

@ -19,10 +19,9 @@ unit StompClient;
{$MODE DELPHI}
{$DEFINE USESYNAPSE}
{$ENDIF}
// For Delphi users:
// Decomment following line to use synapse also in Delphi
{.$DEFINE USESYNAPSE}
{$DEFINE USESYNAPSE}
interface
@ -105,14 +104,12 @@ implementation
const
CHAR0 = #0;
{$ELSE}
uses
Windows,
IdGlobal,
Character;
{$ENDIF}
{ TStompClient }
@ -206,7 +203,7 @@ begin
Init;
{$IFDEF USESYNAPSE}
FSynapseTCP.Connect(Host, intToStr(Port));
FSynapseConnected:=true;
FSynapseConnected := True;
{$ELSE}
FTCP.Connect(Host, Port);
FTCP.IOHandler.MaxLineLength := MaxInt;
@ -283,7 +280,7 @@ begin
SendFrame(Frame);
{$IFDEF USESYNAPSE}
FSynapseTCP.CloseSocket;
FSynapseConnected:=false;
FSynapseConnected := False;
{$ELSE}
FTCP.Disconnect;
{$ENDIF}
@ -337,7 +334,6 @@ begin
end;
function TStompClient.Receive(ATimeout: Integer): IStompFrame;
{$IFDEF USESYNAPSE}
function InternalReceiveSynapse(ATimeout: Integer): IStompFrame;
var
@ -358,7 +354,7 @@ function TStompClient.Receive(ATimeout: Integer): IStompFrame;
if FSynapseTCP.LastError = WSAETIMEDOUT then
raise ESynapseTimeout.Create(FSynapseTCP.LastErrorDesc);
if c <> CHAR0 then
s := s + c
s := s + c //should be improved with a string buffer (daniele.teti)
else
begin
c := Chr(FSynapseTCP.RecvByte(ATimeout));
@ -419,7 +415,8 @@ function TStompClient.Receive(ATimeout: Integer): IStompFrame;
end;
except
on E: EIdReadTimeout do
begin tout := True;
begin
tout := True;
end;
on E: Exception do
begin
@ -441,11 +438,12 @@ function TStompClient.Receive(ATimeout: Integer): IStompFrame;
end;
end;
{$ENDIF}
begin
{$IFDEF USESYNAPSE}
result:=InternalReceiveSynapse(ATimeout);
Result := InternalReceiveSynapse(ATimeout);
{$ELSE}
result:=InternalReceiveINDY(ATimeout);
Result := InternalReceiveINDY(ATimeout);
{$ENDIF}
end;

View File

@ -12,10 +12,9 @@
{ ******************************************************* }
unit StompTypes;
{$ifdef FPC}
{$IFDEF FPC}
{$MODE DELPHI}
{$endif}
{$ENDIF}
interface
@ -49,8 +48,8 @@ type
function Remove(Key: string): IStompHeaders;
function IndexOf(Key: string): Integer;
function Count: Cardinal;
function GetAt(const Index: Integer): TKeyValue;
function Output: String;
function GetAt(const index: Integer): TKeyValue;
function Output: string;
end;
IStompFrame = interface
@ -72,9 +71,11 @@ type
procedure Receipt(const ReceiptID: string);
procedure Connect(Host: string = '127.0.0.1'; Port: Integer = 61613; ClientID: string = '');
procedure Disconnect;
procedure Subscribe(QueueOrTopicName: string; Ack: TAckMode = amAuto; Headers: IStompHeaders = nil);
procedure Subscribe(QueueOrTopicName: string; Ack: TAckMode = amAuto;
Headers: IStompHeaders = nil);
procedure Unsubscribe(Queue: string);
procedure Send(QueueOrTopicName: string; TextMessage: string; Headers: IStompHeaders = nil); overload;
procedure Send(QueueOrTopicName: string; TextMessage: string; Headers: IStompHeaders = nil);
overload;
procedure Send(QueueOrTopicName: string; TextMessage: string; TransactionIdentifier: string;
Headers: IStompHeaders = nil); overload;
procedure Ack(const MessageID: string; const TransactionIdentifier: string = '');
@ -84,7 +85,7 @@ type
/// ////////////
function SetPassword(const Value: string): IStompClient;
function SetUserName(const Value: string): IStompClient;
function SetReceiveTimeout(const AMilliSeconds: cardinal): IStompClient;
function SetReceiveTimeout(const AMilliSeconds: Cardinal): IStompClient;
function Connected: Boolean;
end;
@ -95,9 +96,9 @@ type
procedure SetItems(index: Cardinal; const Value: TKeyValue);
public
class function NewDurableSubscriptionHeader(const SubscriptionName: String): TKeyValue;
class function NewDurableSubscriptionHeader(const SubscriptionName: string): TKeyValue;
class function NewPersistentHeader(const Value: Boolean): TKeyValue;
class function NewReplyToHeader(const DestinationName: String): TKeyValue;
class function NewReplyToHeader(const DestinationName: string): TKeyValue;
/// /////////////////////////////////////////////7
function Add(Key, Value: string): IStompHeaders; overload;
function Add(HeaderItem: TKeyValue): IStompHeaders; overload;
@ -105,10 +106,10 @@ type
function Remove(Key: string): IStompHeaders;
function IndexOf(Key: string): Integer;
function Count: Cardinal;
function GetAt(const Index: Integer): TKeyValue;
function GetAt(const index: Integer): TKeyValue;
constructor Create;
destructor Destroy; override;
function Output: String;
function Output: string;
property Items[index: Cardinal]: TKeyValue read GetItems write SetItems; default;
end;
@ -166,15 +167,29 @@ type
class function AckModeToStr(AckMode: TAckMode): string;
class function NewHeaders: IStompHeaders;
class function NewFrame: IStompFrame;
class function TimestampAsDateTime(const HeaderValue: String): TDateTime;
class function TimestampAsDateTime(const HeaderValue: string): TDateTime;
class function NewStomp(Host: string = '127.0.0.1';
Port: Integer = DEFAULT_STOMP_PORT; ClientID: string = ''; const UserName: string = 'guest';
const Password: string = 'guest'): IStompClient;
end;
implementation
uses
Dateutils;
Dateutils, StompClient;
class function TStompHeaders.NewDurableSubscriptionHeader(const SubscriptionName: String): TKeyValue;
class function StompUtils.NewStomp(Host: string = '127.0.0.1'; Port: Integer = DEFAULT_STOMP_PORT;
ClientID: string = ''; const UserName: string = 'guest'; const Password: string = 'guest')
: IStompClient;
begin
Result := TStompClient.Create;
Result.SetUserName(UserName);
Result.SetPassword(Password);
Result.Connect(Host, Port, ClientID);
end;
class function TStompHeaders.NewDurableSubscriptionHeader(const SubscriptionName: string)
: TKeyValue;
begin
Result.Key := 'activemq.subscriptionName';
Result.Value := SubscriptionName;
@ -186,7 +201,7 @@ begin
Result.Value := LowerCase(BoolToStr(Value, true));
end;
class function TStompHeaders.NewReplyToHeader(const DestinationName: String): TKeyValue;
class function TStompHeaders.NewReplyToHeader(const DestinationName: string): TKeyValue;
begin
Result.Key := 'reply-to';
Result.Value := DestinationName;
@ -197,7 +212,7 @@ begin
Result := TStompHeaders.Create;
end;
class function StompUtils.TimestampAsDateTime(const HeaderValue: String): TDateTime;
class function StompUtils.TimestampAsDateTime(const HeaderValue: string): TDateTime;
begin
Result := EncodeDateTime(1970, 1, 1, 0, 0, 0, 0) + StrToInt64(HeaderValue) / 86400000;
end;
@ -241,7 +256,7 @@ begin
Result := FHeaders;
end;
function TStompFrame.Output: String;
function TStompFrame.Output: string;
begin
Result := FCommand + LINE_END + FHeaders.Output + LINE_END + FBody + LINE_END + COMMAND_END;
end;
@ -314,7 +329,7 @@ begin
Value := Copy(line, p + 1, Length(line) - p);
Result.Headers.Add(Key, Value);
end;
other := Copy(Buf, i, High(Integer));
other := Copy(Buf, i, high(Integer));
sContLen := Result.Headers.Value('content-length');
if (sContLen <> '') then
begin
@ -344,7 +359,7 @@ begin
on e: Exception do
begin
Result.Free;
raise EStomp.Create(e.Message);
raise EStomp.Create(e.message);
end;
end;
end;
@ -394,9 +409,9 @@ begin
inherited;
end;
function TStompHeaders.GetAt(const Index: Integer): TKeyValue;
function TStompHeaders.GetAt(const index: Integer): TKeyValue;
begin
Result := GetItems(Index)
Result := GetItems(index)
end;
function TStompHeaders.GetItems(index: Cardinal): TKeyValue;
@ -419,7 +434,7 @@ begin
end;
end;
function TStompHeaders.Output: String;
function TStompHeaders.Output: string;
var
i: Integer;
kv: TKeyValue;
@ -471,7 +486,8 @@ end;
{ TStompListener }
constructor TStompClientListener.Create(StompClient: IStompClient; StompClientListener: IStompClientListener);
constructor TStompClientListener.Create(StompClient: IStompClient;
StompClientListener: IStompClientListener);
begin
inherited Create(true);
FStompClientListener := StompClientListener;

View File

@ -15,15 +15,6 @@ uses
StompTypes,
Diagnostics;
function NewStomp(Host: string = '127.0.0.1'; Port: Integer = DEFAULT_STOMP_PORT;
ClientID: string = ''): IStompClient;
begin
Result := TStompClient.Create;
Result.SetUserName('guest');
Result.SetPassword('guest');
Result.Connect(Host, Port, ClientID);
end;
procedure Test_Unicode_Chars(serveraddress: string);
var
stomp: IStompClient;
@ -33,7 +24,7 @@ const
SVEDESE = 'Vad är Unicode';
ITALIANO = 'Cos''è Unicode';
begin
stomp := NewStomp(serveraddress);
stomp := StompUtils.NewStomp(serveraddress);
stomp.Subscribe('/topic/unicode');
stomp.Send('/topic/unicode', ITALIANO);
stomp.Send('/topic/unicode', SERBO);
@ -64,8 +55,8 @@ const
BODY3 = 'Hello World 3';
BODY4 = 'Hello World 4';
begin
stomp := NewStomp;
recv := NewStomp;
stomp := StompUtils.NewStomp;
recv := StompUtils.NewStomp;
stomp.Subscribe(TOPIC);
recv.Subscribe(TOPIC);
@ -103,65 +94,58 @@ end;
procedure Main(serveraddress: string = 'localhost');
var
stomp: TStompClient;
stomp: IStompClient;
frame: IStompFrame;
i, c: Integer;
msgcount: Cardinal;
sw: TStopWatch;
sw, sw1: TStopWatch;
message_data: string;
const
MSG = 1000;
MSG_SIZE = 1000;
MSG = 5000;
MSG_SIZE = 300;
begin
sw1 := TStopWatch.StartNew;
message_data := StringOfChar('X', MSG_SIZE);
WriteLn('TEST MESSAGE (', length(message_data) * sizeof(char), ' bytes):', #13#10, '"',
WriteLn('TEST MESSAGE IS (', length(message_data) * sizeof(char), ' bytes):', #13#10, '"',
message_data, '"'#13#10#13#10);
stomp := TStompClient.Create;
try
stomp.SetUserName('Daniele');
stomp.SetPassword('Paperino');
stomp.Connect(serveraddress);
stomp := StompUtils.NewStomp(serveraddress, DEFAULT_STOMP_PORT, '', 'Daniele', 'Teti');
stomp.Subscribe('/topic/foo.bar');
for c := 1 to 10 do
for c := 1 to 6 do
begin
WriteLn;
WriteLn('= STATS LOOP ', c, '=======================================');
sw := TStopWatch.StartNew;
for i := 1 to MSG do
begin
stomp.Send('/topic/foo.bar', message_data,
StompUtils.NewHeaders.Add(TStompHeaders.NewPersistentHeader(true)));
// '01234567890123456789012345678901234567890123456789'
if i mod 1000 = 0 then
WriteLn('Queued ', i, ' messages');
end;
WriteLn('Queued ', MSG, ' messages in ', sw.ElapsedMilliseconds, ' ms');
WriteLn('Now dequeuing...');
msgcount := 0;
sw.start;
sw := TStopWatch.StartNew;
while msgcount < MSG do
begin
frame := stomp.Receive;
if assigned(frame) then
begin
inc(msgcount);
assert(frame.GetBody = message_data);
frame := nil;
end
end;
sw.Stop;
WriteLn(msgcount, ' in ', sw.ElapsedMilliseconds, ' milliseconds and ', sw.ElapsedTicks,
' ticks');
WriteLn('Throughput: ');
WriteLn(FormatFloat('###,##0.000', sw.ElapsedMilliseconds / msgcount), ' ms/msg');
WriteLn(FormatFloat('###,##0.000', msgcount / sw.ElapsedMilliseconds), ' msg/ms');
WriteLn('= END LOOP ', c, '========================================='#13#10);
WriteLn('Dequeued ', msgcount, ' stomp messages in ', sw.ElapsedMilliseconds, ' ms');
WriteLn('Throughput: ',
FormatFloat('###,##0.000', sw.ElapsedMilliseconds / msgcount), ' ms/msg (',
FormatFloat('###,##0.000', msgcount / sw.ElapsedMilliseconds), ' msg/ms)');
// WriteLn('= END LOOP ', c, '========================================='#13#10);
end;
stomp.Unsubscribe('/topic/foo.bar');
stomp.Disconnect;
write('test finished...');
finally
stomp.Free;
end;
sw.Stop;
WriteLn('SPEED TEST FINISHED IN ', FormatFloat('###,##0.000', sw1.ElapsedMilliseconds / 1000),
' seconds');
end;
end.

View File

@ -9,7 +9,7 @@ uses
begin
try
// Main;
Main;
MainWithTransaction;
// Test_Unicode_Chars; //Non passa
Writeln('ALL TESTS OK');
@ -18,5 +18,5 @@ begin
Writeln(E.Classname, ': ', E.message);
end;
// readln;
readln;
end.