mirror of
https://github.com/danieleteti/delphimvcframework.git
synced 2024-11-16 00:05:53 +01:00
231 lines
7.3 KiB
ObjectPascal
231 lines
7.3 KiB
ObjectPascal
|
unit LoggerPro.NSQAppender;
|
|||
|
|
|||
|
interface
|
|||
|
|
|||
|
uses Classes, SysUtils, LoggerPro, System.Net.HttpClient;
|
|||
|
|
|||
|
type
|
|||
|
|
|||
|
{
|
|||
|
Author: St<EFBFBD>phane "Fulgan" GROBETY (https://github.com/Fulgan/)
|
|||
|
Log appender for NSQ (https://nsq.io) (https://github.com/nsqio/nsq)
|
|||
|
"NSQ is a realtime message processing system designed to operate at bitly's
|
|||
|
scale, handling billions of messages per day. It promotes distributed and
|
|||
|
decentralized topologies without single points of failure, enabling fault
|
|||
|
tolerance and high availability coupled with a reliable message delivery
|
|||
|
guarantee"
|
|||
|
|
|||
|
For testing, you can navigate to the NSQ folder and type the following commands:
|
|||
|
|
|||
|
This starts the NSQLookup service then starts a listener on the default
|
|||
|
endpoint(http:/127.0.0.1:4151)
|
|||
|
|
|||
|
start nsqlookupd
|
|||
|
start nsqd --lookupd-tcp-address=127.0.0.1:4160
|
|||
|
|
|||
|
|
|||
|
This starts a consumer for the topic "test" that outputs the messages to the console:
|
|||
|
|
|||
|
start nsq_tail --topic=test --lookupd-http-address=127.0.0.1:4161
|
|||
|
|
|||
|
(optional) This starts a consumer for the ephemeral topic "test" that outputs the messages to the console:
|
|||
|
|
|||
|
start nsq_tail --topic=test#ephemeral --lookupd-http-address=127.0.0.1:4161
|
|||
|
|
|||
|
(optional) This starts a NSQAdmin web interface that can be reached on http://localhost:4171/
|
|||
|
|
|||
|
start nsqadmin --lookupd-http-address=127.0.0.1:4161
|
|||
|
|
|||
|
Note about consumers:
|
|||
|
- If there is no consumer to received messages for a channel, NSQ will
|
|||
|
save them to memory and disk unless the topic has been marked as Ephemeral.
|
|||
|
Use NSQAdmin to delete any extra channel created.
|
|||
|
- Ephemeral topics are not saved or cached and the topic will be deleted
|
|||
|
once the last consumer disconnects
|
|||
|
- Writing a consumer is more complex than writing a client. A list of available
|
|||
|
client libraries can be found at https://nsq.io/clients/client_libraries.html
|
|||
|
}
|
|||
|
|
|||
|
|
|||
|
TOnCreateData = procedure(const sender : TObject; const LogItem: TLogItem; var Data: TStream);
|
|||
|
TOnNetSendError = procedure(const sender : TObject; const LogItem: TLogItem; const NetError: ENetHTTPClientException; var RetryCount: Integer);
|
|||
|
TLoggerProNSQAppenderBase = class(TLoggerProAppenderBase, ILogAppender)
|
|||
|
private
|
|||
|
FOnCreateData: TOnCreateData;
|
|||
|
FOnNetSendError: TOnNetSendError;
|
|||
|
procedure SetOnCreateData(const Value: TOnCreateData);
|
|||
|
procedure SetOnNetSendError(const Value: TOnNetSendError);
|
|||
|
protected
|
|||
|
FNSQUrl : string;
|
|||
|
FTopic: String;
|
|||
|
FUserName, FMachineName: string;
|
|||
|
FEphemeral: Boolean;
|
|||
|
FLastSignature: string;
|
|||
|
public
|
|||
|
const DEFAULT_NSQ_URL = 'http://127.0.0.1:4151';
|
|||
|
|
|||
|
function GetNSQUrl: string;
|
|||
|
procedure SetNSQUrl(const Value: string);
|
|||
|
function GetTopic: string;
|
|||
|
procedure SetTopic(const Value: string);
|
|||
|
procedure SetEphemeral(const Value: Boolean);
|
|||
|
/// <summary>TLoggerProNSQAppenderBase.Create
|
|||
|
/// </summary>
|
|||
|
/// <param name="aTopic"> (string) This is the "topic" of the channel. If left
|
|||
|
/// empty, the LogItem's tag will be used. </param>
|
|||
|
/// <param name="aEphemeral"> (Boolean) If true, the NSQ channel will be marked as
|
|||
|
/// Ephemeral: messages sent to this channel will neither be cached nor
|
|||
|
/// queued</param>
|
|||
|
/// <param name="aNSQUrl"> (string) URL of the NSQD service (usually, http://127.0.
|
|||
|
/// 0.1:4151)</param>
|
|||
|
/// <param name="aLogFormat"> (string) Log format to use if no custom log message
|
|||
|
/// creation event is defined </param>
|
|||
|
constructor Create(aTopic: string=''; aEphemeral: Boolean = False;
|
|||
|
aNSQUrl: string=DEFAULT_NSQ_URL;
|
|||
|
aLogFormat: string=DEFAULT_LOG_FORMAT);
|
|||
|
reintroduce;
|
|||
|
property NSQUrl: string read GetNSQUrl write SetNSQUrl;
|
|||
|
property Ephemeral: Boolean read FEphemeral write SetEphemeral;
|
|||
|
property OnCreateData: TOnCreateData read FOnCreateData write SetOnCreateData;
|
|||
|
property OnNetSendError: TOnNetSendError read FOnNetSendError write SetOnNetSendError;
|
|||
|
property Topic: string read GetTopic write SetTopic;
|
|||
|
procedure TearDown; override;
|
|||
|
procedure Setup; override;
|
|||
|
procedure WriteLog(const aLogItem: TLogItem); override;
|
|||
|
function CreateData(const SrcLogItem: TLogItem): TStream; virtual;
|
|||
|
end;
|
|||
|
|
|||
|
implementation
|
|||
|
|
|||
|
uses System.NetEncoding;
|
|||
|
|
|||
|
constructor TLoggerProNSQAppenderBase.Create(aTopic: string; aEphemeral: Boolean;
|
|||
|
aNSQUrl: string; aLogFormat: string);
|
|||
|
begin
|
|||
|
inherited Create(aLogFormat);
|
|||
|
FEphemeral := aEphemeral;
|
|||
|
FNSQUrl := 'http://127.0.0.1:4151';
|
|||
|
FUserName := aNSQUrl;
|
|||
|
FTopic := aTopic;
|
|||
|
end;
|
|||
|
|
|||
|
function TLoggerProNSQAppenderBase.CreateData(
|
|||
|
const SrcLogItem: TLogItem): TStream;
|
|||
|
begin
|
|||
|
result := nil;
|
|||
|
try
|
|||
|
if assigned(FOnCreateData) then
|
|||
|
begin
|
|||
|
FOnCreateData(Self, SrcLogItem, Result);
|
|||
|
end
|
|||
|
else
|
|||
|
begin
|
|||
|
result := TStringStream.Create(FormatLog(SrcLogItem), TEncoding.UTF8);
|
|||
|
end;
|
|||
|
except
|
|||
|
on e: Exception do
|
|||
|
begin
|
|||
|
FreeAndNil(Result);
|
|||
|
raise;
|
|||
|
end;
|
|||
|
end;
|
|||
|
end;
|
|||
|
|
|||
|
function TLoggerProNSQAppenderBase.GetNSQUrl: string;
|
|||
|
begin
|
|||
|
result := FNSQUrl;
|
|||
|
end;
|
|||
|
|
|||
|
function TLoggerProNSQAppenderBase.GetTopic: string;
|
|||
|
begin
|
|||
|
result := FTopic;
|
|||
|
end;
|
|||
|
|
|||
|
procedure TLoggerProNSQAppenderBase.SetEphemeral(const Value: Boolean);
|
|||
|
begin
|
|||
|
FEphemeral := Value;
|
|||
|
end;
|
|||
|
|
|||
|
procedure TLoggerProNSQAppenderBase.SetNSQUrl(const Value: string);
|
|||
|
begin
|
|||
|
FNSQUrl := value;
|
|||
|
end;
|
|||
|
|
|||
|
procedure TLoggerProNSQAppenderBase.SetOnCreateData(const Value: TOnCreateData);
|
|||
|
begin
|
|||
|
FOnCreateData := Value;
|
|||
|
end;
|
|||
|
|
|||
|
procedure TLoggerProNSQAppenderBase.SetOnNetSendError(
|
|||
|
const Value: TOnNetSendError);
|
|||
|
begin
|
|||
|
FOnNetSendError := Value;
|
|||
|
end;
|
|||
|
|
|||
|
procedure TLoggerProNSQAppenderBase.SetTopic(const Value: string);
|
|||
|
begin
|
|||
|
FTopic := value;
|
|||
|
end;
|
|||
|
|
|||
|
procedure TLoggerProNSQAppenderBase.Setup;
|
|||
|
begin
|
|||
|
inherited;
|
|||
|
end;
|
|||
|
|
|||
|
procedure TLoggerProNSQAppenderBase.TearDown;
|
|||
|
begin
|
|||
|
inherited;
|
|||
|
end;
|
|||
|
|
|||
|
procedure TLoggerProNSQAppenderBase.WriteLog(const aLogItem: TLogItem);
|
|||
|
var
|
|||
|
FHTTPCli: THTTPClient;
|
|||
|
URI: string;
|
|||
|
Data: TStream;
|
|||
|
TopicName: string;
|
|||
|
FRetryCount: Integer;
|
|||
|
begin
|
|||
|
FRetryCount := 0;
|
|||
|
FHTTPCli := THTTPClient.Create;
|
|||
|
try
|
|||
|
if Topic.trim.IsEmpty then
|
|||
|
TopicName := aLogItem.LogTag.Trim
|
|||
|
else
|
|||
|
TopicName := Topic.Trim;
|
|||
|
URI :=NSQUrl + '/pub?topic=' + TNetEncoding.URL.Encode(TopicName);
|
|||
|
if Ephemeral then
|
|||
|
URI := URI + '#ephemeral';
|
|||
|
Data := CreateData(aLogItem);
|
|||
|
if Assigned(Data) then
|
|||
|
begin
|
|||
|
repeat
|
|||
|
try
|
|||
|
// Set very short timeouts: this is a local call and we don't want to block the queue for too long.
|
|||
|
{$IF CompilerVersion >= 31}
|
|||
|
FHTTPCli.ConnectionTimeout := 100;
|
|||
|
FHTTPCli.ResponseTimeout := 200;
|
|||
|
{$ENDIF}
|
|||
|
Data.Seek(0, soFromBeginning);
|
|||
|
// ignore the respnse: as long as NSQD has accepted the POST, it will handle the result
|
|||
|
FHTTPCli.Post(URI, Data);
|
|||
|
break;
|
|||
|
except
|
|||
|
on e: ENetHTTPClientException do
|
|||
|
begin
|
|||
|
// if there is an event handler for net exception, call it
|
|||
|
if Assigned(FOnNetSendError) then
|
|||
|
OnNetSendError(self, aLogItem, e, FRetryCount);
|
|||
|
// if the handler has set FRetryCount to a positive value then retry the call
|
|||
|
if FRetryCount <= 0 then
|
|||
|
break;
|
|||
|
end;
|
|||
|
end;
|
|||
|
until false;
|
|||
|
end;
|
|||
|
finally
|
|||
|
FreeAndNil(FHTTPCli);
|
|||
|
end;
|
|||
|
end;
|
|||
|
|
|||
|
end.
|