diff --git a/Apps/MQTTClient/MQTTClientApp.lps b/Apps/MQTTClient/MQTTClientApp.lps index 667bba9..54a7dcb 100644 --- a/Apps/MQTTClient/MQTTClientApp.lps +++ b/Apps/MQTTClient/MQTTClientApp.lps @@ -4,12 +4,12 @@ - + - - + + @@ -18,10 +18,10 @@ + - - - + + @@ -31,10 +31,9 @@ - - + - + @@ -43,14 +42,14 @@ - + - + @@ -58,7 +57,7 @@ - + @@ -67,7 +66,7 @@ - + @@ -76,7 +75,7 @@ - + @@ -85,36 +84,31 @@ - + - - - - - - - - - + + + + - - - - + + + + - - - - + + + + @@ -122,42 +116,43 @@ - + - + - - - - + + + + + - + - + - + @@ -165,145 +160,191 @@ - + - + - + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + diff --git a/Apps/MQTTServer/MQTTServerApp.lpi b/Apps/MQTTServer/MQTTServerApp.lpi index 4661ea7..31823ed 100644 --- a/Apps/MQTTServer/MQTTServerApp.lpi +++ b/Apps/MQTTServer/MQTTServerApp.lpi @@ -25,16 +25,19 @@ - + - + - + - + + + + @@ -92,6 +95,7 @@ + diff --git a/Apps/MQTTServer/MQTTServerApp.lps b/Apps/MQTTServer/MQTTServerApp.lps index 91e0895..7c2424f 100644 --- a/Apps/MQTTServer/MQTTServerApp.lps +++ b/Apps/MQTTServer/MQTTServerApp.lps @@ -7,7 +7,6 @@ - @@ -20,8 +19,9 @@ - - + + + @@ -33,9 +33,9 @@ - + - + @@ -47,7 +47,7 @@ - + @@ -132,7 +132,7 @@ - + @@ -141,14 +141,15 @@ - - - + + - + + + @@ -157,15 +158,15 @@ - + - + - + @@ -181,7 +182,7 @@ - + @@ -216,15 +217,16 @@ - - + + + - + @@ -554,130 +556,130 @@ - + - + - + - + - + - + - + - - + + - + - + - + - + - + - + - - + + - + - + - + - + - + - + - + - + - + - - + + - + - - + + - - + + - - + + - - + + - - + + diff --git a/Apps/MQTTServer/MQTTServerApp.res b/Apps/MQTTServer/MQTTServerApp.res index 4ff746d..877868c 100644 Binary files a/Apps/MQTTServer/MQTTServerApp.res and b/Apps/MQTTServer/MQTTServerApp.res differ diff --git a/Forms/clientfm.lfm b/Forms/clientfm.lfm index be16ecf..f0c408c 100644 --- a/Forms/clientfm.lfm +++ b/Forms/clientfm.lfm @@ -1,7 +1,7 @@ object ClientForm: TClientForm - Left = 546 + Left = 549 Height = 381 - Top = 169 + Top = 191 Width = 695 Caption = 'MQTT Client' ClientHeight = 381 @@ -53,9 +53,9 @@ object ClientForm: TClientForm Height = 296 Top = 48 Width = 664 - ActivePage = LogTab + ActivePage = PacketsInMemTab Anchors = [akTop, akLeft, akRight, akBottom] - TabIndex = 2 + TabIndex = 3 TabOrder = 4 object SubscriptionsTab: TTabSheet Caption = 'Subscriptions' @@ -261,6 +261,31 @@ object ClientForm: TClientForm ) end end + object PacketsInMemTab: TTabSheet + Caption = 'PacketsInMem' + ClientHeight = 265 + ClientWidth = 654 + OnContextPopup = PacketsInMemTabContextPopup + object PacketsMemo: TMemo + Left = 0 + Height = 240 + Top = 25 + Width = 654 + Align = alClient + ScrollBars = ssVertical + TabOrder = 0 + end + object RefreshPacketsBtn: TButton + Left = 0 + Height = 25 + Top = 0 + Width = 654 + Align = alTop + Caption = 'Refresh' + OnClick = RefreshPacketsBtnClick + TabOrder = 1 + end + end end object ConnectBtn: TButton Left = 16 diff --git a/Forms/clientfm.pas b/Forms/clientfm.pas index 3eabc2b..349e0b1 100644 --- a/Forms/clientfm.pas +++ b/Forms/clientfm.pas @@ -7,7 +7,7 @@ interface uses Classes, SysUtils, FileUtil, Forms, Controls, Graphics, Dialogs, StdCtrls, Grids, ComCtrls, Menus, ExtCtrls, Buffers, Logging, lNetComponents, - MQTTConsts, MQTTClient, MQTTSubscriptions, lNet; + MQTTConsts, MQTTClient, MQTTSubscriptions, lNet, Types; type TDebugMessage = record @@ -20,6 +20,7 @@ TDebugMessage = record { TClientForm } TClientForm = class(TForm) + RefreshPacketsBtn: TButton; CBEnabled: TCheckBox; CBFiltered: TCheckBox; cbShowDebugMessages: TCheckBox; @@ -30,8 +31,10 @@ TClientForm = class(TForm) FilterText: TEdit; LogGrid: TStringGrid; LogToolbarPanel: TPanel; + PacketsMemo: TMemo; StatusBar: TStatusBar; LogTab: TTabSheet; + PacketsInMemTab: TTabSheet; TCP: TLTCPComponent; RefreshSubscriptionsItm: TMenuItem; PageControl: TPageControl; @@ -59,8 +62,11 @@ TClientForm = class(TForm) procedure FormCreate(Sender: TObject); procedure FormDestroy(Sender: TObject); procedure PublishBtnClick(Sender: TObject); + procedure RefreshPacketsBtnClick(Sender: TObject); procedure RefreshSubscriptionsItmClick(Sender: TObject); procedure SubscribeBtnClick(Sender: TObject); + procedure PacketsInMemTabContextPopup(Sender: TObject; MousePos: TPoint; + var Handled: Boolean); procedure TCPCanSend(aSocket: TLSocket); procedure TCPConnect(aSocket: TLSocket); procedure TCPDisconnect(aSocket: TLSocket); @@ -87,7 +93,7 @@ implementation {$R *.lfm} uses - ConnectFM, PublishFM, SubscribeFM; + MQTTPackets, MQTTPacketDefs, ConnectFM, PublishFM, SubscribeFM; { TClientForm } @@ -299,6 +305,12 @@ procedure TClientForm.SubscribeBtnClick(Sender: TObject); end; end; +procedure TClientForm.PacketsInMemTabContextPopup(Sender: TObject; MousePos: TPoint; + var Handled: Boolean); +begin + +end; + procedure TClientForm.UnsubscribeBtnClick(Sender: TObject); var LSubscriptions: TMQTTSubscriptionList; @@ -380,6 +392,12 @@ procedure TClientForm.PublishBtnClick(Sender: TObject); Client.Publish(PublishForm.edTopic.Text,PublishForm.edMessage.Text,TMQTTQOSType(PublishForm.cbQOS.ItemIndex),PublishForm.cbRetain.Checked); end; +procedure TClientForm.RefreshPacketsBtnClick(Sender: TObject); +begin + PacketsMemo.Clear; + PacketList.Dump(PacketsMemo.Lines); +end; + procedure TClientForm.ClearRecords; var I: Integer; diff --git a/Forms/serverfm.lfm b/Forms/serverfm.lfm index 891cfbb..345a6d4 100644 --- a/Forms/serverfm.lfm +++ b/Forms/serverfm.lfm @@ -5,31 +5,31 @@ object ServerForm: TServerForm Width = 817 ActiveControl = PageControl Caption = 'MQTT Server' - ClientHeight = 348 + ClientHeight = 346 ClientWidth = 817 Menu = MainMenu OnCreate = FormCreate OnDestroy = FormDestroy Position = poScreenCenter - LCLVersion = '1.6.4.0' + LCLVersion = '1.8.2.0' object PageControl: TPageControl Left = 0 - Height = 348 + Height = 346 Top = 0 Width = 817 - ActivePage = ConnectionsTab + ActivePage = PacketsInMemTab Align = alClient - TabIndex = 0 + TabIndex = 5 TabOrder = 0 object ConnectionsTab: TTabSheet Caption = 'Connections' - ClientHeight = 313 - ClientWidth = 811 + ClientHeight = 315 + ClientWidth = 807 object ConnectionsGrid: TStringGrid Left = 0 - Height = 313 + Height = 315 Top = 0 - Width = 811 + Width = 807 Align = alClient AutoEdit = False AutoFillColumns = True @@ -54,7 +54,7 @@ object ServerForm: TServerForm ButtonStyle = cbsEllipsis SizePriority = 0 Title.Caption = 'Will Message' - Width = 509 + Width = 505 end> DefaultRowHeight = 20 Enabled = False @@ -68,14 +68,14 @@ object ServerForm: TServerForm 80 120 100 - 509 + 505 ) end end object SubscriptionsTab: TTabSheet Caption = 'Subscriptions' - ClientHeight = 313 - ClientWidth = 811 + ClientHeight = 315 + ClientWidth = 807 object SubscriptionsGrid: TStringGrid Left = 0 Height = 336 @@ -105,6 +105,7 @@ object ServerForm: TServerForm SizePriority = 0 Title.Alignment = taRightJustify Title.Caption = 'Age' + Width = 64 end> DefaultRowHeight = 20 Enabled = False @@ -124,8 +125,8 @@ object ServerForm: TServerForm end object SessionsTab: TTabSheet Caption = 'Sessions' - ClientHeight = 313 - ClientWidth = 811 + ClientHeight = 315 + ClientWidth = 807 object SessionsGrid: TStringGrid Left = 0 Height = 336 @@ -172,8 +173,8 @@ object ServerForm: TServerForm end object RetainedMessagesTab: TTabSheet Caption = 'Retained Messages' - ClientHeight = 313 - ClientWidth = 811 + ClientHeight = 315 + ClientWidth = 807 object RetainedMessagesGrid: TStringGrid Left = 0 Height = 336 @@ -220,8 +221,8 @@ object ServerForm: TServerForm end object TabSheet1: TTabSheet Caption = 'Log' - ClientHeight = 313 - ClientWidth = 811 + ClientHeight = 315 + ClientWidth = 807 object LogGrid: TStringGrid Left = 0 Height = 286 @@ -312,6 +313,30 @@ object ServerForm: TServerForm end end end + object PacketsInMemTab: TTabSheet + Caption = 'PacketsInMem' + ClientHeight = 315 + ClientWidth = 807 + object RefreshPacketListBtn: TButton + Left = 0 + Height = 25 + Top = 0 + Width = 807 + Align = alTop + Caption = 'Refresh' + OnClick = RefreshPacketListBtnClick + TabOrder = 0 + end + object PacketListMemo: TMemo + Left = 0 + Height = 290 + Top = 25 + Width = 807 + Align = alClient + ScrollBars = ssVertical + TabOrder = 1 + end + end end object MainMenu: TMainMenu left = 256 @@ -374,7 +399,6 @@ object ServerForm: TServerForm end end object Server: TMQTTServer - MaximumQOS = qtAT_MOST_ONCE RequireAuthentication = False AllowNullClientIDs = True OnAccepted = ServerAccepted diff --git a/Forms/serverfm.pas b/Forms/serverfm.pas index 46f3551..0bff09b 100644 --- a/Forms/serverfm.pas +++ b/Forms/serverfm.pas @@ -25,6 +25,8 @@ TDebugMessage = record { TServerForm } TServerForm = class(TForm) + PacketListMemo: TMemo; + RefreshPacketListBtn: TButton; CBEnabled: TCheckBox; CBFiltered: TCheckBox; ClearBtn: TButton; @@ -36,6 +38,7 @@ TServerForm = class(TForm) RetainedMessagesGrid: TStringGrid; RetainedMessagesTab: TTabSheet; TabSheet1: TTabSheet; + PacketsInMemTab: TTabSheet; TCP: TLTCPComponent; SessionsGrid: TStringGrid; MainMenu: TMainMenu; @@ -72,6 +75,7 @@ TServerForm = class(TForm) procedure LoadConfigurationItmClick(Sender: TObject); procedure PropertiesItmClick(Sender: TObject); procedure RefreshConnectionsItmClick(Sender: TObject); + procedure RefreshPacketListBtnClick(Sender: TObject); procedure RefreshRetainedMessagesItmClick(Sender: TObject); procedure RefreshSessionsItmClick(Sender: TObject); procedure RefreshSubscriptionsItmClick(Sender: TObject); @@ -115,7 +119,7 @@ implementation {$R *.lfm} uses - IniFiles, HelpFM, ServerPropertiesFM; + MQTTPackets, MQTTPacketDefs, IniFiles, HelpFM, ServerPropertiesFM; { TServerForm } @@ -537,6 +541,12 @@ procedure TServerForm.RefreshConnectionsItmClick(Sender: TObject); end; end; +procedure TServerForm.RefreshPacketListBtnClick(Sender: TObject); +begin + PacketListMemo.Clear; + PacketList.Dump(PacketListMemo.Lines); +end; + procedure TServerForm.RefreshSessionsItmClick(Sender: TObject); var I: Integer; diff --git a/src/mqttclient.pas b/src/mqttclient.pas index 059991e..2e2a612 100644 --- a/src/mqttclient.pas +++ b/src/mqttclient.pas @@ -487,8 +487,8 @@ procedure TMQTTClient.DataAvailable; else Bail(MQTT_ERROR_UNHANDLED_PACKETTYPE); end; - if Packet.PacketType = ptPUBLISH then - DestroyPacket := (Packet as TMQTTPUBLISHPacket).QOS = qtAT_MOST_ONCE; + if (Packet.PacketType = ptPUBLISH) and ((Packet as TMQTTPUBLISHPacket).QOS = qtEXACTLY_ONCE) then + DestroyPacket := False; end else if (State = csDisconnecting) then @@ -885,16 +885,22 @@ procedure TMQTTClient.HandlePUBRELPacket(APacket: TMQTTPUBRELPacket); Log.Send(mtDebug,'Received PUBREL (%d)',[APacket.PacketID]); FWaitingForAck.Remove(ptPUBREC,APacket.PacketID); Pkt := FPendingReceive.Find(ptPUBLISH,APacket.PacketID) as TMQTTPUBLISHPacket; - ReceiveMessage(Pkt.Topic,Pkt.Data,Pkt.QOS,Pkt.Retain); - Reply := TMQTTPUBCOMPPacket.Create; - try - Reply.PacketID := APacket.PacketID; - Reply.WriteToBuffer(SendBuffer); - Log.Send(mtDebug,'Sending PUBCOMP (%d)',[Reply.PacketID]); - SendData; - finally - Reply.Free; - end; + Assert(Assigned(Pkt)); + if Assigned(Pkt) then + begin + ReceiveMessage(Pkt.Topic,Pkt.Data,Pkt.QOS,Pkt.Retain); + FPendingReceive.Remove(Pkt); + Pkt.Free; + Reply := TMQTTPUBCOMPPacket.Create; + try + Reply.PacketID := APacket.PacketID; + Reply.WriteToBuffer(SendBuffer); + Log.Send(mtDebug,'Sending PUBCOMP (%d)',[Reply.PacketID]); + SendData; + finally + Reply.Free; + end; + end; end; end; diff --git a/src/mqttpacketdefs.pas b/src/mqttpacketdefs.pas index 4717aac..f83e3df 100644 --- a/src/mqttpacketdefs.pas +++ b/src/mqttpacketdefs.pas @@ -56,6 +56,7 @@ TMQTTCONNECTPacket = class(TMQTTPacket) public constructor Create; destructor Destroy; override; + function AsString: String; override; procedure WriteToBuffer(ABuffer: TBuffer); override; property UsernameFlag: Boolean read FUsernameFlag write FUsernameFlag; property PasswordFlag: Boolean read FPasswordFlag write FPasswordFlag; @@ -99,6 +100,7 @@ TMQTTPUBLISHPacket = class(TMQTTQueuedPacket) function GetPacketType: TMQTTPacketType; override; public destructor Destroy; override; + function AsString: String; override; procedure WriteToBuffer(ABuffer: TBuffer); override; property Duplicate: Boolean read FDuplicate write FDuplicate; property Retain: Boolean read FRetain write FRetain; @@ -828,6 +830,11 @@ destructor TMQTTCONNECTPacket.Destroy; inherited Destroy; end; +function TMQTTCONNECTPacket.AsString: String; +begin + Result:=inherited AsString + ' ClientID: ' + FClientID + ' Username: ' + FUsername; +end; + function TMQTTCONNECTPacket.GetPacketType: TMQTTPacketType; begin Result := ptCONNECT; @@ -990,6 +997,11 @@ destructor TMQTTPUBLISHPacket.Destroy; inherited Destroy; end; +function TMQTTPUBLISHPacket.AsString: String; +begin + Result := inherited AsString + ' QOS: ' + MQTTQOSTypeNames[FQOS] + ' Retain: ' + BoolToStr(FRetain,'true','false') + ' Duplicate: ' + BoolToStr(FDuplicate,'true','false') + ' Topic: ' + FTopic + ' Data: ' + FData; +end; + function TMQTTPUBLISHPacket.ReadFromBuffer(ABuffer: TBuffer): Word; var Success: Boolean; diff --git a/src/mqttpackets.pas b/src/mqttpackets.pas index 5b75e3c..5d733f5 100644 --- a/src/mqttpackets.pas +++ b/src/mqttpackets.pas @@ -20,7 +20,9 @@ TMQTTPacket = class(TObject) function ReadFromBuffer(ABuffer: TBuffer): Word; virtual; abstract; function GetPacketType: TMQTTPacketType; virtual; abstract; public + constructor Create; destructor Destroy; override; + function AsString: String; virtual; // For debugging purposes procedure WriteToBuffer(ABuffer: TBuffer); virtual; abstract; // property PacketType: TMQTTPacketType read GetPacketType; @@ -28,10 +30,13 @@ TMQTTPacket = class(TObject) property RemainingLength: DWORD read FRemainingLength write FRemainingLength; end; + { TMQTTPacketIDPacket } + TMQTTPacketIDPacket = class(TMQTTPacket) protected FPacketID: Word; public + function AsString: String; override; property PacketID: Word read FPacketID write FPacketID; end; @@ -88,6 +93,25 @@ TMQTTPacketIDManager = class(TObject) property Items[Index: Integer]: Word read GetItem; end; + { TMQTTPacketList } + + TMQTTPacketList = class(TObject) // A list of all constructed packets. Created to debug memory leaks. + private + FList: TList; + function GetCount: Integer; + function GetItem(Index: Integer): TMQTTPacket; + public + constructor Create; + destructor Destroy; override; + // + procedure Add(APacket: TMQTTPacket); + procedure Remove(APacket: TMQTTPacket); + procedure Dump(Strings: TStrings); + // + property Count: Integer read GetCount; + property Items[Index: Integer]: TMQTTPacket read GetItem; default; + end; + function ReadWordFromBuffer(ABuffer: TBuffer; out Value: Word): Boolean; procedure WriteWordToBuffer(ABuffer: TBuffer; const Value: Word); function ReadUTF8StringFromBuffer(ABuffer: TBuffer; out Str: UTF8String): Boolean; @@ -98,11 +122,75 @@ function ReadRemainingLengthFromBuffer(ABuffer: TBuffer; out Value: DWORD): Bool procedure WriteRemainingLengthToBuffer(ABuffer: TBuffer; const Value: DWORD); function ReadMQTTPacketFromBuffer(ABuffer: TBuffer; out Packet: TMQTTPacket; Connected: Boolean = True): Word; +var + PacketList: TMQTTPacketList; + implementation uses Sockets, LazUTF8, MQTTPacketDefs; +{ TMQTTPacketIDPacket } + +function TMQTTPacketIDPacket.AsString: String; +begin + Result := inherited AsString + ' PacketID: ' + IntToStr(PacketID); +end; + +{ TMQTTPacketList } + +constructor TMQTTPacketList.Create; +begin + inherited Create; + FList := TList.Create; +end; + +destructor TMQTTPacketList.Destroy; +begin + FList.Free; + inherited Destroy; +end; + +function TMQTTPacketList.GetCount: Integer; +begin + Result := FList.Count; +end; + +function TMQTTPacketList.GetItem(Index: Integer): TMQTTPacket; +begin + Result := TMQTTPacket(FList[Index]); +end; + +procedure TMQTTPacketList.Add(APacket: TMQTTPacket); +begin + FList.Remove(APacket); + FList.Add(APacket); +end; + +procedure TMQTTPacketList.Remove(APacket: TMQTTPacket); +begin + FList.Remove(APacket); +end; + +procedure TMQTTPacketList.Dump(Strings: TStrings); +var + X: Integer; + P: TMQTTPacket; +begin + if Assigned(Strings) then + begin + Strings.Clear; + for X := 0 to FList.Count - 1 do + begin + P := Items[X]; + if Assigned(P) then + begin + Strings.Add(P.AsString); + end; + end; + end; +end; + { TMQTTPacketQueue } function TMQTTPacketQueue.GetCount: Integer; @@ -193,11 +281,23 @@ function TMQTTPacket.Validate: Boolean; Result := not (PacketType in [ptBROKERCONNECT,ptReserved15]); end; +constructor TMQTTPacket.Create; +begin + inherited Create; + PacketList.Add(Self); +end; + destructor TMQTTPacket.Destroy; begin + PacketList.Remove(Self); inherited Destroy; end; +function TMQTTPacket.AsString: String; +begin + Result := 'Name: ' + GetPacketTypeName; +end; + { TMQTTPacketIDManager } function TMQTTPacketIDManager.GetCount: Integer; @@ -535,4 +635,8 @@ function ReadMQTTPacketFromBuffer(ABuffer: TBuffer; out Packet: TMQTTPacket; Con end; end; +initialization + PacketList := TMQTTPacketList.Create; +finalization + PacketList.Free; end. diff --git a/src/mqttserver.pas b/src/mqttserver.pas index d5c3ffe..15f4f9d 100644 --- a/src/mqttserver.pas +++ b/src/mqttserver.pas @@ -129,6 +129,7 @@ TMQTTServer = class(TComponent) FSessions : TMQTTSessionList; FRetainedMessages : TMQTTMessageList; FEnabled : Boolean; + FShutdown : Boolean; FRequireAuthentication : Boolean; FAllowNullClientIDs : Boolean; FStrictClientIDValidation : Boolean; @@ -315,6 +316,7 @@ constructor TMQTTServer.Create(AOwner: TComponent); destructor TMQTTServer.Destroy; begin + FShutdown := True; FThread.FServer := nil; FThread.Terminate; FSessions.Free; @@ -352,7 +354,7 @@ procedure TMQTTServer.UpdateSystemClockMessages; DispatchMessage(nil,'System/Time/Day',IntToStr(LNow.Day),qtAT_MOST_ONCE,true); if (LNow.Hour <> FLastTime.Hour) then DispatchMessage(nil,'System/Time/Hour',IntToStr(LNow.Hour),qtAT_MOST_ONCE,true); - //if (LNow.DayOfWeek <> FLastTime.DayOfWeek) then + //if (LNow.DayOfWeek <> FLastTime.DayOfWeek) then { Doesn't work in Raspberry pi } // DispatchMessage(nil,'System/Time/DOW',IntToStr(LNow.DayOfWeek),qtAT_MOST_ONCE,true); if (LNow.Minute <> FLastTime.Minute) then begin @@ -439,7 +441,7 @@ procedure TMQTTServer.DestroyConnection(Connection: TMQTTServerConnection); procedure TMQTTServer.ConnectionsChanged; begin - if Assigned(FOnConnectionsChanged) then + if (not FShutdown) and Assigned(FOnConnectionsChanged) then FOnConnectionsChanged(Self); end; @@ -769,8 +771,8 @@ procedure TMQTTServerConnection.DataAvailable(Buffer: TBuffer); else Bail(MQTT_ERROR_UNHANDLED_PACKETTYPE); end; - if Packet.PacketType = ptPUBLISH then - DestroyPacket := (Packet as TMQTTPUBLISHPacket).QOS = qtAT_MOST_ONCE; + if (Packet.PacketType = ptPUBLISH) and ((Packet as TMQTTPUBLISHPacket).QOS = qtEXACTLY_ONCE) then + DestroyPacket := False; end else if State = ssDisconnecting then