Skip to content

Commit

Permalink
added in a couple new message types to handle disconnect notices as w…
Browse files Browse the repository at this point in the history
…ell as authentication requests.

added in code to handle the message content types more efficiently using switch case.
  • Loading branch information
roger.castaldo@gmail.com committed Mar 24, 2013
1 parent 2bdf53a commit fcd5df0
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 102 deletions.
Binary file modified FreeswitchConfigSockets.suo
Binary file not shown.
198 changes: 100 additions & 98 deletions Library/ASocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ internal static bool StringsEqual(string str1, string str2)
private const string REMOVE_EVENT_COMMAND = "nixevent {0}";
private const string EVENT_FILTER_COMMAND = "filter {0} {1}";
private const string REMOVE_EVENT_FILTER_COMMAND = "filter delete {0} {1}";
private const string AUTH_COMMAND = "auth {0}";
private const string BACKGROUND_API_RESPONSE_EVENT = "SWITCH_EVENT_BACKGROUND_JOB";
private const string API_ISSUE_COMMAND = "bgapi {0}";

Expand All @@ -106,6 +105,7 @@ protected MT19937 Random
protected bool IsConnected
{
get { return _isConnected; }
set { _isConnected = value; }
}

private FreeSwitchLogLevels _currentLevel = FreeSwitchLogLevels.CONSOLE;
Expand All @@ -125,13 +125,11 @@ public FreeSwitchLogLevels LogLevel
private string _textReceived;
private List<string> _splitMessages;
private List<string> _processingMessages;
private bool _processing = false;
private List<sEventHandler> _handlers;
private Queue<byte[]> _awaitingCommands;
protected Queue<byte[]> _awaitingCommands;
private bool _exit = false;
private IPAddress _ipAddress;
private int _port;
private string _password;
private string _currentCommandID;
private delProcessEventMessage _eventProcessor;
private Queue<ManualResetEvent> _awaitingCommandsEvents;
Expand Down Expand Up @@ -206,7 +204,7 @@ protected string _IssueAPICommand(string command, bool api)
return "";
}

protected ASocket(IPAddress ip, int port,string password)
protected ASocket(IPAddress ip, int port)
{
_textReceived = "";
_processingMessages = new List<string>();
Expand All @@ -221,7 +219,6 @@ protected ASocket(IPAddress ip, int port,string password)
_isConnected = false;
_ipAddress = ip;
_port = port;
_password = password;
_awaitingCommands = new Queue<byte[]>();
_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_mreMessageWaiting = new ManualResetEvent(false);
Expand All @@ -243,11 +240,8 @@ private void BackgroundRun()
try
{
_socket.Connect(_ipAddress, _port);
byte[] data = ASCIIEncoding.ASCII.GetBytes(string.Format(AUTH_COMMAND, _password) + MESSAGE_END_STRING);
_socket.Send(data, 0, data.Length, SocketFlags.None);
_backgroundDataReader = new Thread(new ThreadStart(_SocketDataReaderStart));
_backgroundDataReader.Start();
_isConnected = true;
}
catch (Exception e)
{
Expand All @@ -258,14 +252,6 @@ private void BackgroundRun()
break;
}
}
if (!_exit)
{
lock (_awaitingCommands)
{
while (_awaitingCommands.Count > 0)
_sendCommand(_awaitingCommands.Dequeue());
}
}
}

protected void _sendCommand(string commandString)
Expand All @@ -289,7 +275,19 @@ protected void _sendCommand(byte[] commandBytes)
}
else
{
_socket.Send(commandBytes, 0, commandBytes.Length, SocketFlags.None);
try
{
_socket.Send(commandBytes, 0, commandBytes.Length, SocketFlags.None);
}
catch (Exception e)
{
if (e is ObjectDisposedException)
{
_exit = true;
_isConnected = false;
}
throw e;
}
}
}

Expand Down Expand Up @@ -430,106 +428,110 @@ private void _MessageProcessorStart()
//fail safe for delayed header
if (!pars.ContainsKey("Content-Type"))
{
if (pars.ContainsKey("Event-Name"))
{
_processingMessages.Insert(0, origMsg);
origMsg = "Content-Type:text/event-plain\nContent-Length:" + origMsg.Length.ToString() + "\n";
pars = ASocketMessage.ParseProperties(origMsg);
}
else
{
if (DisposeInvalidMessage != null)
DisposeInvalidMessage(origMsg);
break;
}
if (_disposeInvalidMesssage != null)
_disposeInvalidMesssage(origMsg);
break;
}
if (pars.ContainsKey("Content-Length"))
{
if (_processingMessages.Count > 0)
{
subMsg = _processingMessages[0];
_processingMessages.RemoveAt(0);
}
else
if (int.Parse(pars["Content-Length"]) > 0)
{
_processingMessages.Insert(0, origMsg);
break;
if (_processingMessages.Count > 0)
{
subMsg = _processingMessages[0];
_processingMessages.RemoveAt(0);
}
else
{
_processingMessages.Insert(0, origMsg);
break;
}
}
}
if (pars["Content-Type"] == "text/event-plain")
switch (pars["Content-Type"])
{
if (subMsg == "")
{
_processingMessages.Insert(0, origMsg);
break;
}
else
{
SocketEvent se;
se = new SocketEvent(subMsg);
if (se["Content-Length"] != null)
case "text/event-plain":
if (subMsg == "")
{
if (_processingMessages.Count > 0)
{
se.Message = _processingMessages[0];
_processingMessages.RemoveAt(0);
}
else
{
_processingMessages.Insert(0, origMsg);
_processingMessages.Insert(1, subMsg);
break;
}
_processingMessages.Insert(0, origMsg);
break;
}
if (se.EventName == "BACKGROUND_JOB")
else
{
lock (_commandThreads)
SocketEvent se;
se = new SocketEvent(subMsg);
if (se["Content-Length"] != null)
{
if (_processingMessages.Count > 0)
{
se.Message = _processingMessages[0];
_processingMessages.RemoveAt(0);
}
else
{
_processingMessages.Insert(0, origMsg);
_processingMessages.Insert(1, subMsg);
break;
}
}
if (se.EventName == "BACKGROUND_JOB")
{
if (_commandThreads.ContainsKey(se["Job-UUID"]))
lock (_commandThreads)
{
lock (_awaitingCommandReturns)
if (_commandThreads.ContainsKey(se["Job-UUID"]))
{
_awaitingCommandReturns.Add(se["Job-UUID"], se.Message.Trim('\n'));
lock (_awaitingCommandReturns)
{
_awaitingCommandReturns.Add(se["Job-UUID"], se.Message.Trim('\n'));
}
ManualResetEvent mre = _commandThreads[se["Job-UUID"]];
_commandThreads.Remove(se["Job-UUID"]);
mre.Set();
}
ManualResetEvent mre = _commandThreads[se["Job-UUID"]];
_commandThreads.Remove(se["Job-UUID"]);
mre.Set();
}
}
msgs.Enqueue(se);
}
msgs.Enqueue(se);
}
}
else if (pars["Content-Type"] == "command/reply")
{
CommandReplyMessage crm = new CommandReplyMessage(origMsg, subMsg);
msgs.Enqueue(crm);
if (crm["Job-UUID"] != null)
{
lock (_awaitingCommandsEvents)
break;
case "command/reply":
CommandReplyMessage crm = new CommandReplyMessage(origMsg, subMsg);
msgs.Enqueue(crm);
if (crm["Job-UUID"] != null)
{
lock (_awaitingCommandsEvents)
{
_currentCommandID = crm["Job-UUID"];
_awaitingCommandsEvents.Dequeue().Set();
}
}
break;
case "log/data":
SocketLogMessage lg;
lg = new SocketLogMessage(subMsg);
if (_processingMessages.Count > 0)
{
_currentCommandID = crm["Job-UUID"];
_awaitingCommandsEvents.Dequeue().Set();
string eventMsg = _processingMessages[0];
_processingMessages.RemoveAt(0);
lg.FullMessage = eventMsg;
msgs.Enqueue(lg);
}
else
{
_processingMessages.Insert(0, origMsg);
_processingMessages.Insert(1, subMsg);
break;
}
}
}
else if (pars["Content-Type"] == "log/data")
{
SocketLogMessage lg;
lg = new SocketLogMessage(subMsg);
if (_processingMessages.Count > 0)
{
string eventMsg = _processingMessages[0];
_processingMessages.RemoveAt(0);
lg.FullMessage = eventMsg;
msgs.Enqueue(lg);
}
else
{
_processingMessages.Insert(0, origMsg);
_processingMessages.Insert(1, subMsg);
break;
}
case "text/disconnect-notice":
msgs.Enqueue(new DisconnectNoticeMessage(origMsg));
break;
case "auth/request":
msgs.Enqueue(new AuthenticationRequestMessage(origMsg));
break;
default:
if (_disposeInvalidMesssage != null)
_disposeInvalidMesssage(origMsg);
break;
}
}
if (msgs.Count > 0)
Expand Down
12 changes: 12 additions & 0 deletions Library/Messages/AuthenticationRequestMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Org.Reddragonit.FreeSwitchSockets.Messages
{
public class AuthenticationRequestMessage : ASocketMessage
{
public AuthenticationRequestMessage(string message)
: base(message) { }
}
}
27 changes: 27 additions & 0 deletions Library/Messages/DisconnectNoticeMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Org.Reddragonit.FreeSwitchSockets.Messages
{
public class DisconnectNoticeMessage : ASocketMessage
{
public DisconnectNoticeMessage(string message)
: base(message) { }

public int LingerTime
{
get { return int.Parse(this["Linger-Time"]); }
}

public string ChannelName
{
get { return this["Channel-Name"]; }
}

public string SessionUUID
{
get { return this["Controlled-Session-UUID"]; }
}
}
}
23 changes: 21 additions & 2 deletions Library/Outbound/OutboundSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class OutboundSocket : ASocket
public const string DEFAULT_EVENT_SOCKET_LISTEN_IP = "127.0.0.1";
public const int DEFAULT_EVENT_SOCKET_LISTEN_PORT = 8021;
public const string DEFAULT_EVENT_SOCKET_PASSWORD = "ClueCon";
private const string AUTH_COMMAND = "auth {0}\n\n";

private struct sCommand
{
Expand Down Expand Up @@ -54,14 +55,16 @@ public sCommand(string command, bool api,MT19937 random)
private delProcessEventMessage _eventDelegate;
private delReloadXml _preReloadCall;
private delReloadXml _postReloadCall;
private string _password;

public OutboundSocket(IPAddress ip,int port,string password,delProcessEventMessage eventDelegate,delProcessLogMessage logDelegate,delReloadXml preReloadCall,delReloadXml postReloadCall)
: base(ip,port,password)
: base(ip,port)
{
_eventDelegate = eventDelegate;
_logDelegate = logDelegate;
_preReloadCall = preReloadCall;
_postReloadCall = postReloadCall;
_password = password;
}

public string IssueCommand(string command)
Expand All @@ -88,7 +91,23 @@ protected override void _processMessageQueue(Queue<ASocketMessage> messages)
while (messages.Count > 0)
{
ASocketMessage asm = messages.Dequeue();
new Thread(new ParameterizedThreadStart(_processMessage)).Start(asm);
if (asm is AuthenticationRequestMessage)
{
socket.Send(ASCIIEncoding.ASCII.GetBytes(string.Format(AUTH_COMMAND, _password)));
IsConnected = true;
lock (_awaitingCommands)
{
if (!_exit)
{
while (_awaitingCommands.Count > 0)
{
byte[] commandBytes = _awaitingCommands.Dequeue();
socket.Send(commandBytes, 0, commandBytes.Length, SocketFlags.None);
}
}
}
}else
new Thread(new ParameterizedThreadStart(_processMessage)).Start(asm);
}
}

Expand Down
Loading

0 comments on commit fcd5df0

Please sign in to comment.