delphimvcframework/sources/MVCFramework.MessagingController.pas

347 lines
10 KiB
ObjectPascal
Raw Normal View History

// ***************************************************************************
//
// Delphi MVC Framework
//
// Copyright (c) 2010-2017 Daniele Teti and the DMVCFramework Team
//
// https://github.com/danieleteti/delphimvcframework
//
// ***************************************************************************
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// *************************************************************************** }
2015-12-22 12:38:17 +01:00
2015-04-01 17:01:23 +02:00
unit MVCFramework.MessagingController;
2013-10-30 00:48:23 +01:00
{$I dmvcframework.inc}
2013-10-30 00:48:23 +01:00
interface
uses
System.SysUtils,
System.DateUtils,
System.SyncObjs,
2013-10-30 00:48:23 +01:00
MVCFramework,
MVCFramework.Commons,
MVCFramework.Logger,
MVCFramework.TypesAliases,
2013-10-30 00:48:23 +01:00
StompClient,
StompTypes;
type
  // Controller whose actions are all routed below '/messages'.
  // It keeps two pieces of per-session state: the client id (stored under
  // CLIENTID_KEY) and a ';'-separated list of destinations stored under the
  // '__subscriptions' session key.
  [MVCPath('/messages')]
  TMVCBUSController = class(TMVCController)
  protected
    // Returns AClientId + '___' + ATopicName, with every '/' of the topic
    // name replaced by '_'.
    function GetUniqueDurableHeader(AClientId, ATopicName: string): string;
    // Calls InternalSubscribeUserToTopic once for every entry of the
    // session's '__subscriptions' list.
    procedure InternalSubscribeUserToTopics(AClientId: string; AStompClient: IStompClient);
    // Subscribes one client to one destination through the given STOMP client.
    procedure InternalSubscribeUserToTopic(AClientId: string; ATopicName: string; AStompClient: IStompClient);
    // Adds ATopic to the session's '__subscriptions' list if it is not there yet.
    procedure AddTopicToUserSubscriptions(const ATopic: string);
    // Removes ATopic from the session's '__subscriptions' list.
    procedure RemoveTopicFromUserSubscriptions(const ATopic: string);
    // Rejects every action with an EMVCException unless the 'messaging'
    // configuration value converts to True.
    procedure OnBeforeAction(AContext: TWebContext; const AActionNAme: string; var AHandled: Boolean); override;
  public
    // POST /messages/clients/($clientid)
    // Stores the 'clientid' URL parameter in the session.
    [MVCHTTPMethod([httpPOST])]
    [MVCPath('/clients/($clientid)')]
    procedure SetClientID(AContext: TWebContext);

    // POST /messages/subscriptions/($topicorqueue)/($name)
    // Subscribes the current client to '/<topicorqueue>/<name>'.
    [MVCPath('/subscriptions/($topicorqueue)/($name)')]
    [MVCHTTPMethod([httpPOST])]
    procedure SubscribeToTopic(AContext: TWebContext);

    // DELETE /messages/subscriptions/($topicorqueue)/($name)
    // Unsubscribes the current client and updates the session list.
    [MVCPath('/subscriptions/($topicorqueue)/($name)')]
    [MVCHTTPMethod([httpDELETE])]
    procedure UnSubscribeFromTopic(AContext: TWebContext);

    // GET /messages
    // Polls the STOMP client for frames addressed to the current client and
    // collects them into a JSON document.
    [MVCHTTPMethod([httpGET])]
    [MVCPath]
    procedure ReceiveMessages(AContext: TWebContext);

    // POST /messages/($type)/($topicorqueue)
    // Validates the 'type' and 'topicorqueue' URL parameters and the presence
    // of a request body.
    [MVCHTTPMethod([httpPOST])]
    [MVCPath('/($type)/($topicorqueue)')]
    procedure EnqueueMessage(AContext: TWebContext);

    // GET /messages/subscriptions
    // Returns the session's '__subscriptions' value as plain text.
    [MVCHTTPMethod([httpGET])]
    [MVCPath('/subscriptions')]
    procedure CurrentlySubscribedTopics(AContext: TWebContext);
  end;
implementation
{ TMVCBUSController }
// Appends ATopic to the ';'-separated list kept in the session under
// '__subscriptions'. When an identical entry already exists the session
// value is left untouched.
procedure TMVCBUSController.AddTopicToUserSubscriptions(const ATopic: string);
var
  LStored: string;
  LEntries: TArray<string>;
  LIndex: Integer;
begin
  LStored := Session['__subscriptions'];
  LEntries := LStored.Split([';']);

  // Nothing to do when the topic is already part of the list.
  for LIndex := Low(LEntries) to High(LEntries) do
    if LEntries[LIndex].Equals(ATopic) then
      Exit;

  // Grow the list by one slot and write it back in joined form.
  SetLength(LEntries, Length(LEntries) + 1);
  LEntries[High(LEntries)] := ATopic;
  Session['__subscriptions'] := string.Join(';', LEntries);
end;
// Sends the raw '__subscriptions' session value (a ';'-separated list) back
// to the caller with a text/plain content type.
procedure TMVCBUSController.CurrentlySubscribedTopics(AContext: TWebContext);
var
  LSubscriptions: string;
begin
  ContentType := TMVCMediaType.TEXT_PLAIN;
  LSubscriptions := Session['__subscriptions'];
  Render(LSubscriptions);
end;
// Validates an enqueue request and answers with a confirmation text.
//
// URL parameters read:
//   type          - trimmed and lower-cased; must be 'topic' or 'queue'
//   topicorqueue  - trimmed; must not be empty
// Raises EMVCException when a parameter is invalid or the request has no body.
//
// NOTE(review): the calls that actually published the message are disabled
// (see the commented-out lines below), so at the moment nothing is sent to
// the broker even though the reply says so - confirm whether this is intended.
procedure TMVCBUSController.EnqueueMessage(AContext: TWebContext);
var
  LKind: string;
  LDestination: string;
begin
  LKind := AContext.Request.Params['type'].Trim.ToLower;
  if not ((LKind = 'topic') or (LKind = 'queue')) then
    raise EMVCException.Create('Valid type are "queue" or "topic", got ' + LKind);

  LDestination := AContext.Request.Params['topicorqueue'].Trim;
  if LDestination.IsEmpty then
    raise EMVCException.Create('Invalid or empty topic');

  if not AContext.Request.ThereIsRequestBody then
    raise EMVCException.Create('Body request required');

  // Disabled publishing code, kept for reference:
  //   EnqueueMessageOnTopicOrQueue(queuetype = 'queue', '/' + queuetype + '/' + topicname,
  //     CTX.Request.BodyAsJSONObject.Clone as TJSONObject, true);
  //   EnqueueMessage('/queue/' + topicname, CTX.Request.BodyAsJSONObject.Clone as TJSONObject, true);

  Render(200, 'Message sent to topic ' + LDestination);
end;
// Builds an identifier of the form '<AClientId>___<topic>' in which every
// '/' of ATopicName has been replaced by '_'.
function TMVCBUSController.GetUniqueDurableHeader(AClientId, ATopicName: string): string;
var
  LFlattenedTopic: string;
begin
  LFlattenedTopic := StringReplace(ATopicName, '/', '_', [rfReplaceAll]);
  Result := AClientId + '___' + LFlattenedTopic;
end;
// Collects pending STOMP frames for the current client and returns them as
// a JSON document of the form
//   { "messages": [ ... ], "_timestamp": "yyyy-mm-dd hh:nn:ss", "_timeout": true|false }
//
// The receive loop ends when one of the following happens:
//   * 10 frames have been collected;
//   * a receive call returns no frame after at least one frame was collected;
//   * no frame arrived for RECEIVE_TIMEOUT seconds ("_timeout" becomes true);
//   * IsShuttingDown becomes true.
//
// Every received frame is acknowledged immediately. Frame bodies that do not
// parse to a JSON object are logged and left out of "messages".
procedure TMVCBUSController.ReceiveMessages(AContext: TWebContext);
var
  Stomp: IStompClient;
  LClientID: string;
  frame: IStompFrame;
  LFrames: TArray<IStompFrame>;
  // Parse result; held as TObject so it can be type-checked with "is" before
  // being treated as a TJSONObject.
  LParsed: TObject;
  res: TJSONObject;
  arr: TJSONArray;
  LLastReceivedMessageTS: TDateTime;
  LTimeOut: Boolean;
const
{$IFDEF TEST}
  RECEIVE_TIMEOUT = 5; // seconds
{$ELSE}
  RECEIVE_TIMEOUT = 60 * 5; // 5 minutes
{$ENDIF}
begin
  // Must be initialised here: the loop body never runs when the server is
  // already shutting down.
  LTimeOut := False;
  LClientID := GetClientID;
  Stomp := GetNewStompClient(LClientID);
  InternalSubscribeUserToTopics(LClientID, Stomp);

  LLastReceivedMessageTS := Now;
  SetLength(LFrames, 0);
  while not IsShuttingDown do
  begin
    LTimeOut := False;
    frame := nil;
    Log.Info('/messages receive', ClassName);
    // Second argument is presumably a wait time in milliseconds - confirm
    // against the StompClient unit.
    Stomp.Receive(frame, 100);
    if Assigned(frame) then
    begin
      // Get 10 messages at most, then send them to the client.
      LLastReceivedMessageTS := Now;
      SetLength(LFrames, Length(LFrames) + 1);
      LFrames[High(LFrames)] := frame;
      Stomp.Ack(frame.MessageID);
      if Length(LFrames) >= 10 then
        Break;
    end
    else
    begin
      // The queue is drained for now: deliver what has been collected.
      if Length(LFrames) > 0 then
        Break;
      if SecondsBetween(Now, LLastReceivedMessageTS) >= RECEIVE_TIMEOUT then
      begin
        LTimeOut := True;
        Break;
      end;
    end;
  end;

  arr := TJSONArray.Create;
  // res owns arr, and arr owns every element added to it, so freeing res in
  // the finally block releases the whole tree.
  res := TJSONObject.Create(TJSONPair.Create('messages', arr));
  try
    for frame in LFrames do
    begin
      if not Assigned(frame) then
        Continue;
      LParsed := TJSONObject.ParseJSONValue(frame.GetBody);
      if LParsed is TJSONObject then
        arr.AddElement(TJSONObject(LParsed))
      else
      begin
        // Either the body is not JSON at all (nil) or it is valid JSON of
        // another kind (array, string, ...). Free is nil-safe.
        LParsed.Free;
        Log.Error(Format
          ('Not valid JSON object in topic requested by user %s. The raw message is "%s"',
          [LClientID, frame.GetBody]), ClassName);
      end;
    end;

    res.AddPair('_timestamp', FormatDateTime('yyyy-mm-dd hh:nn:ss', Now));
    if LTimeOut then
      res.AddPair('_timeout', TJSONTrue.Create)
    else
      res.AddPair('_timeout', TJSONFalse.Create);

    // The frames were already acknowledged on the broker, so the document
    // has to reach the client; otherwise the messages would be lost.
    ContentType := TMVCMediaType.APPLICATION_JSON;
    Render(res.ToJSON);
  finally
    res.Free;
  end;
end;
2015-04-01 17:01:23 +02:00
// Removes every entry equal to ATopic from the ';'-separated list stored in
// the session under '__subscriptions' and writes the remaining entries back
// (an empty string when nothing is left).
procedure TMVCBUSController.RemoveTopicFromUserSubscriptions(const ATopic: string);
var
  LStored: string;
  LTopics, LRemaining: TArray<string>;
  LKeptCount: Integer;
  I: Integer;
begin
  LStored := Session['__subscriptions'];
  LTopics := LStored.Split([';']);

  // Copy the surviving entries to the front of a same-sized buffer.
  SetLength(LRemaining, Length(LTopics));
  LKeptCount := 0;
  for I := 0 to High(LTopics) do
  begin
    if not LTopics[I].Equals(ATopic) then
    begin
      LRemaining[LKeptCount] := LTopics[I];
      Inc(LKeptCount);
    end;
  end;

  // Shrink to the number of entries actually kept. This is correct whether
  // the topic matched zero, one or several entries, and for an empty list.
  SetLength(LRemaining, LKeptCount);

  if LKeptCount = 0 then
    Session['__subscriptions'] := ''
  else
    Session['__subscriptions'] := string.Join(';', LRemaining);
end;
// Stores the 'clientid' URL parameter of the request in the session under
// CLIENTID_KEY.
procedure TMVCBUSController.SetClientID(AContext: TWebContext);
var
  LRequestedId: string;
begin
  LRequestedId := AContext.Request.Params['clientid'];
  Session[CLIENTID_KEY] := LRequestedId;
end;
// Subscribes the current client to the destination '/<topicorqueue>/<name>',
// where both parts come from the lower-cased URL parameters, and answers
// with a confirmation text.
procedure TMVCBUSController.SubscribeToTopic(AContext: TWebContext);
var
  LClientID: string;
  LName: string;
  LKind: string;
  LDestination: string;
  LStomp: IStompClient;
begin
  LClientID := GetClientID;
  LName := AContext.Request.Params['name'].ToLower;
  LKind := AContext.Request.Params['topicorqueue'].ToLower;
  LStomp := GetNewStompClient(LClientID);

  LDestination := '/' + LKind + '/' + LName;
  InternalSubscribeUserToTopic(LClientID, LDestination, LStomp);
  Render(200, 'Subscription OK for ' + LDestination);
  // The STOMP client is not disconnected explicitly here
  // (a Stomp.Disconnect call used to be sketched at this point).
end;
// Walks the ';'-separated '__subscriptions' session value and subscribes
// AClientId to each entry through AStompClient.
procedure TMVCBUSController.InternalSubscribeUserToTopics(AClientId: string; AStompClient: IStompClient);
var
  LStored: string;
  LTopics: TArray<string>;
  I: Integer;
begin
  LStored := Session['__subscriptions'];
  LTopics := LStored.Split([';']);
  for I := Low(LTopics) to High(LTopics) do
    InternalSubscribeUserToTopic(AClientId, LTopics[I], AStompClient);
end;
// Gate executed before every action of this controller: after the inherited
// handling, the 'messaging' configuration value is converted with StrToBool.
// When it is False the request is flagged as handled and an EMVCException is
// raised; otherwise AHandled is set to False.
procedure TMVCBUSController.OnBeforeAction(AContext: TWebContext; const AActionNAme: string;
  var AHandled: Boolean);
begin
  inherited;
  // Handled (and rejected) exactly when messaging is switched off.
  AHandled := not StrToBool(Config['messaging']);
  if AHandled then
    raise EMVCException.Create('Messaging extensions are not enabled');
end;
// Placeholder: subscribing is not available at the moment, so every call
// fails with EMVCException('Not implemented'). None of the parameters is used.
procedure TMVCBUSController.InternalSubscribeUserToTopic(AClientId, ATopicName: string;
  AStompClient: IStompClient);
begin
  raise EMVCException.Create('Not implemented');

  // Earlier implementation, kept as a sketch for whoever re-enables this
  // (it still uses the old parameter names clientid / topicname):
  //
  //   var
  //     LDurSubHeader: string;
  //     LHeaders: IStompHeaders;
  //   ...
  //   LHeaders := TStompHeaders.Create;
  //   LDurSubHeader := GetUniqueDurableHeader(clientid, topicname);
  //   LHeaders.Add(TStompHeaders.NewDurableSubscriptionHeader(LDurSubHeader));
  //
  //   if topicname.StartsWith('/topic') then
  //     LHeaders.Add('id', clientid); // https://www.rabbitmq.com/stomp.html
  //
  //   StompClient.Subscribe(topicname, amClient, LHeaders);
  //   LogE('SUBSCRIBE TO ' + clientid + '@' + topicname + ' dursubheader:' + LDurSubHeader);
  //   AddTopicToUserSubscriptions(topicname);
end;
// Unsubscribes the current client from the destination
// '/<topicorqueue>/<name>' (both parts taken lower-cased from the URL
// parameters), removes that destination from the session's subscription
// list and answers with a confirmation text.
//
// The destination is composed exactly as in SubscribeToTopic so that a
// subscription created through POST can be removed through DELETE for both
// '/topic/...' and '/queue/...' destinations.
procedure TMVCBUSController.UnSubscribeFromTopic(AContext: TWebContext);
var
  LStomp: IStompClient;
  LClientID: string;
  LName: string;
  LKind: string;
  LDestination: string;
begin
  LClientID := GetClientID;
  LName := AContext.Request.Params['name'].ToLower;
  LKind := AContext.Request.Params['topicorqueue'].ToLower;
  LStomp := GetNewStompClient(LClientID);

  LDestination := '/' + LKind + '/' + LName;
  LStomp.Unsubscribe(LDestination);
  RemoveTopicFromUserSubscriptions(LDestination);
  Render(200, 'UnSubscription OK for ' + LDestination);
end;
end.