|
|
Практическое использование классов .NET Framework для разработки «корпоративного мессенджера», часть 2
Описываются способы разработки сетевых приложений, на примере разработки корпоративного мессенджера (некий очень облегченный вариант аськи для локальной сети), с серверной частью и соответственно с клиентской.
MSG2_dotsite.zip
Преамбула
Напоминаю о своей первой части статьи данного цикла /Publications/Publication160.aspx, а описываемая статья, будет, судя по всему последней, и в ней мы избавимся от некоторых недостатков мессенджера, замеченных в предыдущей реализации:
- избавимся от блокирующего способа ожидания установления соединения;
- будем использовать XML для разбора передаваемых/принимаемых данных, что ускорит разбор строки и сделает алгоритм более «прозрачным»;
- и внедрим алгоритм сжатия при передаче/приеме данных, основанный, на облегченном варианте известного алгоритма lzss.
Использование XML для передаваемых данных
Понятно, что передаваемая информация должна иметь некую структуру, формат, которой, нужно бы знать и на стороне приемника.
В предыдущей версии мессенджера, я, делал так: public static void Message (string Username, string Date, string Message, string ipAddress, string port)
{
try
{
string message = CMSGMessage + CMSGUsername + Username.PadRight(CMSGUsers.CLengthUsername,' ')+
CMSGMessageDateTime + Date.PadRight(CLengthDateTime,' ')
+ Message;
// устанавливаем соединение с сервером
TcpClient client = new TcpClient(ipAddress, Convert.ToInt32( port ));
if (client != null)
{
Byte[] data = Encoding.Unicode.GetBytes(message);
NetworkStream stream = client.GetStream();
if (stream != null)
{
// отсылаем сообщение
stream.Write(data, 0, data.Length);
Console.WriteLine("Message :: write :: " + message);
}
client.Close();
}
}
catch (SocketException e)
{
Console.WriteLine("SocketException: {0}", e);
}
} // Message
Как видно, передаваемая строка, содержала управляющий префикс, в данном случае CMSGMessage, который, и позволял отличить один пакет данных от другого, например, отличить «вход в систему» от «передачи сообщения». После управляющего префикса, следует информация актуальная для пакета, в приведенном случае, это «имя пользователя», «дата» и «текст сообщения».
Соответственно для обработки пакетов данной структуры, использовался код, не отличающийся ни красотой, ни легкостью сопровождения, который, состоял из подобных отрывков кода (для каждого уникального вида пакета): int iPosMessage = message.IndexOf(CMSGMessage);
if (iPosMessage == 0) // да, похоже на передачу сообщения серверу
{
message = message.Remove(0, CMSGMessage.Length );
// имя пользователя
message = message.Remove(0, CMSGUsername.Length );
string username = message.Substring(0, CMSGUsers.CLengthUsername);
username = username.Trim();
message = message.Remove(0, CMSGUsers.CLengthUsername );
// дата
message = message.Remove(0, CMSGMessageDateTime.Length );
string datetime = message.Substring(0, CLengthDateTime);
datetime = datetime.Trim();
message = message.Remove(0, CLengthDateTime );
// сообщение
// делегируем в GUI
if (onUserMessage != null)
guiControlSender.BeginInvoke(onUserMessage, new object[]{username, datetime, message});
return;
}
Из приведенного кода видно, как осуществлялся разбор строки.
Но ведь разбор передаваемой строки и многое другое, можно с легкостью возложить на классы .NET Framework. И если сделать некое допущение, и, предположить, что принятая строка, есть строка в формате XML, то, уже можно будет использовать такой код, приведу часть уже известного, из прошлой статьи, метода Analysis, потомка класса CSocketThread: public override void Analysis(XmlDocument xd, NetworkStream stream)
{
// разбор сообщения в формате XML
try
{
switch (xd.FirstChild.Name)
{
case CMSGMessage
{
// группа
string group = "";
XmlNodeList xnlGroups = xd.GetElementsByTagName(CMSGGroup);
foreach( XmlNode xn in xnlGroups)
group += xn.InnerText;
// пользователь
string username = "";
XmlNodeList xnlUsernames = xd.GetElementsByTagName(CMSGUsername);
foreach( XmlNode xn in xnlUsernames)
username += xn.InnerText;
// дата и время отправки сообщения
string datetime = "";
XmlNodeList xnlDatetimes = xd.GetElementsByTagName(CMSGMessageDateTime);
foreach( XmlNode xn in xnlDatetimes)
datetime += xn.InnerText;
// текст сообщения
string messages = "";
XmlNodeList xnlMessages = xd.GetElementsByTagName(CMSGMessage);
foreach( XmlNode xn in xnlMessages)
messages += xn.InnerText;
// делегируем в GUI
if (onUserMessaged != null)
guiControlSender.BeginInvoke(onUserMessaged,
new object[]{group, username, datetime, messages});
break;
}
} // switch (xd.FirstChild.Name)
} // try
catch (System.Xml.XmlException xmlEx)
{
Console.WriteLine("XmlException {0}", xmlEx.Message);
}
catch (System.Exception Ex)
{
Console.WriteLine("Exception {0}", Ex.Message);
}
finally
{
}
} // Analysis
Не правда ли, данный код выглядит очень даже симпатично, прозрачно, и, что не менее важно, сопровождать его будет совсем просто, да и он, как мне кажется, более корректен с точки зрения реализации бизнес логики, так как избавлен от некоторых логических ошибок, что могут возникнуть в предыдущей версии.
А, теперь посмотрим на код, формирующий XML-строку. Приведу два метода класса CSocketThread, первый метод: public static void Message (string group, string username, string date, string message, string ipAddress, string port)
{
try
{
string messages = BuildXMLString(CMSGMessage,
new string[8]{
CMSGGroup, group, CMSGUsername, username,
CMSGMessageDateTime, date, CMSGMessage, message}
);
// устанавливаем соединение
TcpClient client = new TcpClient(ipAddress, Convert.ToInt32( port ));
if (client != null)
{
Byte[] data = Encoding.Unicode.GetBytes(messages);
NetworkStream stream = client.GetStream();
if (stream != null)
{
// посылаем данные
stream.Write(data, 0, data.Length);
}
client.Close();
}
}
catch (SocketException e)
{
Console.WriteLine("SocketException: {0}", e);
}
} // Message
И, соответственно, второй: public static string BuildXMLString(string command, string[] parameters)
{
string result = "";
try
{
XmlDocument xd = new XmlDocument();
XmlElement commandXML = xd.CreateElement(command);
xd.AppendChild(commandXML);
if (parameters != null)
{
for(int i=0; i < parameters.Length;)
{
XmlElement elementXML = xd.CreateElement(parameters[i]); i++;
elementXML.InnerText=parameters[i]; i++;
commandXML.AppendChild(elementXML);
}
}
result = xd.OuterXml;
}
catch (System.Xml.XmlException xmlEx)
{
Console.WriteLine("XmlException {0}", xmlEx.Message);
}
catch (System.Exception Ex)
{
Console.WriteLine("Exception {0}", Ex.Message);
}
return result;
}
Алгоритм сжатия передаваемых данных
Алгоритмов сжатия информации много, я, реализовал некий облегченный вариант LZSS, полное описание которого можно с легкостью найти в Internet.
Отличий от стандарта в моей реализации несколько, во-первых, от невнимательности я перепутал биты (править уже не стал), то есть в стандарте бит равный «0» начинает запакованную последовательность, у меня, наоборот, бит равный «0» начинает не запакованную последовательность. Далее, я не реализовывал двоичное дерево для ускорения поиска, и, в моей версии алгоритма нет кольцевого буфера, а есть только входная последовательность, требующая сжатия, и выходная, то есть уже запакованная.
И мой алгоритм ориентирован на сжатие довольно коротких сообщений длиной до 4 Кбайт.
Привожу класс (полностью), реализующий данный алгоритм: using System;
using System.Text;
using System.Collections;
namespace MSGLibrary
{
public class CLZSimple
{
byte[] in_bytes;
int count_in_bytes, current_in;
ArrayList out_bytes;
int count_out_bytes;
// bit collector
byte bits;
int length_bits;
protected void addBit(byte x)
{
if (length_bits > 7)
{
out_bytes.Add(bits);
count_out_bytes++;
length_bits = bits = 0;
}
length_bits++;
bits = (byte)(bits << 1);
bits = (byte)(bits | (x & 0x01));
}
protected void addBits(int x, int length)
{
BitArray ba = new BitArray(length,false);
for (int i = 0; i < length; i++)
{
int bit = (int)Math.Pow(2, length - i - 1);
addBit( (byte)(( (x & bit) == bit)? 1:0) );
}
}
protected void addBits(byte x)
{
addBits(x, 8);
}
protected int search_sequence(out int max_length)
{
int current = 0, position_for_max_length = -1;
int current_in = this.current_in;
max_length = 0;
int position = -1;
while ((current_in < count_in_bytes) && (current < count_in_bytes) && (current < current_in))
{
if ( in_bytes[current] == in_bytes[current_in])
{
position = current;
while (
(current_in < count_in_bytes) &&
(current < current_in) && (in_bytes[current] == in_bytes[current_in])
)
{
current++;
current_in++;
}
if ( (current - position) > max_length )
{
max_length = current - position;
position_for_max_length = position;
}
current_in = this.current_in;
}
else current++;
}
return position_for_max_length;
}
protected string Bytes2String(byte[] bytes, out int charsDecodedCount)
{
string result = "";
Decoder uniDecoder = Encoding.Unicode.GetDecoder();
int charCount = uniDecoder.GetCharCount(bytes, 0, bytes.Length);
char[] chars = new Char[charCount];
charsDecodedCount = uniDecoder.GetChars(bytes, 0, bytes.Length, chars, 0);
foreach( char c in chars)
if (c != (char)0)
result = result + c;
return result;
}
protected byte[] String2Bytes(char[] chars, out int bytesEncodedCount )
{
Encoder uniEncoder = Encoding.Unicode.GetEncoder();
int byteCount = uniEncoder.GetByteCount(chars, 0, chars.Length, true);
Byte[] bytes = new Byte[byteCount];
bytesEncodedCount = uniEncoder.GetBytes(chars, 0, chars.Length, bytes, 0, true);
return bytes;
}
public virtual byte[] Encode( string buffer, out int count_out_bytes)
{
count_in_bytes = count_out_bytes = current_in = 0;
in_bytes = String2Bytes(buffer.ToCharArray(), out count_in_bytes);
out_bytes = new ArrayList();
bits = 0 ; length_bits = 0;
while (current_in < count_in_bytes)
{
int length, position = search_sequence(out length);
if (length > 1)
{
addBit(1);
addBits(position, 12);
if (length > 8)
{
addBit(1);
addBit(1);
addBit(1);
addBits(length-9, 8);
}
else
{
addBits(length-2,3);
}
current_in += length;
}
else
{
addBit(0);
addBits(in_bytes[current_in]);
current_in ++;
}
}
if (length_bits > -1)
{
if (length_bits < 8 )
bits = (byte)(bits << (8 - length_bits));
out_bytes.Add(bits);
this.count_out_bytes++;
}
count_out_bytes = this.count_out_bytes;
object[] arr = out_bytes.ToArray();
byte[] bb = new byte[count_out_bytes];
out_bytes.CopyTo(0, bb, 0, count_out_bytes);
return bb;
}
byte Getbit(byte[] bytes, int bit)
{
int nbyte = 0, remainder = bit;
if ((bit +1) > 8)
{
int rr;
nbyte = Math.DivRem((bit+1), 8, out rr);
if (rr == 0 )
nbyte--;
remainder = (bit+1) - nbyte * 8 -1;
}
byte b = bytes[ nbyte ];
byte nbit = (byte)Math.Pow(2, 7 - remainder);
return (byte)(( (b & nbit) == nbit)? 1:0) ;
}
public virtual string Decode( byte[] buffer, out int charsDecodedCount)
{
out_bytes = new ArrayList(); this.count_out_bytes = 0;
bits = 0; length_bits = 0;
int current_bit =0;
while ((current_bit < buffer.Length*8 ) && ((buffer.Length*8 - current_bit) > 8))
{
byte b = Getbit(buffer, current_bit);
current_bit++;
if (b == 0)
{
for ( int i=0; i<8;i++)
addBit(Getbit(buffer, current_bit + i));
current_bit+=8;
}
else
{
int position = 0;
for ( int i = 0; i< 12; i++, current_bit++)
{
position = (int)(position << 1);
byte bbb = Getbit(buffer, current_bit);
position = (int)(position | bbb);
}
int length = 0;
for ( int i = 0; i< 3; i++, current_bit++)
{
length = (int)(length << 1);
byte bbb = Getbit(buffer, current_bit);
length = (int)(length | bbb);
}
if (length > 6)
{
length = 0;
for ( int i = 0; i< 8; i++, current_bit++)
{
length = (int)(length << 1);
length = (int)(length | (Getbit(buffer, current_bit) & 0x01));
}
length +=9;
}
else
{
length +=2;
}
if (length_bits > 0)
{
out_bytes.Add(bits); bits = 0; length_bits = 0;
this.count_out_bytes++;
}
for ( int i = 0; i < length; i++)
{
out_bytes.Add(out_bytes[position + i]);
this.count_out_bytes++;
}
}
}
if (length_bits > 0)
{
out_bytes.Add(bits); bits = 0; length_bits = 0;
this.count_out_bytes++;
}
count_out_bytes = this.count_out_bytes;
object[] arr = out_bytes.ToArray();
byte[] bb = new byte[count_out_bytes];
out_bytes.CopyTo(0, bb, 0, count_out_bytes);
return Bytes2String(bb, out charsDecodedCount);
}
public virtual string Decode( string buffer, out int charsDecodedCount)
{
byte[] bytebuffer = String2Bytes(buffer.ToCharArray(), out count_in_bytes);
return Decode(bytebuffer, out charsDecodedCount);
}
public CLZSimple()
{
}
} // class CLZSimple
}
Теперь его надо аккуратно вставить в мессенджер, таким образом, что бы передаваемые данные упаковывались, а принимаемые распаковывались, прозрачно для основного алгоритма (не мешая ему или не заставляя его отвлекаться на такие несущественные для него операции, как упаковка/распаковка), для этого, определим в классе CSocketThread два метода: public static void WriteToStream(NetworkStream stream, string message)
{
CLZSimple lzss = new CLZSimple();
try
{
#if LZSS
// упаковываем
int count;
byte[] data = lzss.Encode(message, out count);
#else
Byte[] data = Encoding.Unicode.GetBytes(message);
#endif
// посылаем
stream.Write(data, 0, data.Length);
}
catch (SocketException e)
{
Console.WriteLine("SocketException: {0}", e);
}
catch (System.Exception Ex)
{
Console.WriteLine("Exception {0}", Ex.Message);
}
finally
{
lzss = null;
}
}
public static string ReadFromStream(NetworkStream stream)
{
CLZSimple lzss = new CLZSimple();
string request = "";
try
{
byte[] requestByte = new byte[CMaxLengthMessages];
do
{
int bytes = stream.Read(requestByte, 0, requestByte.Length);
request = String.Concat(request, Encoding.Unicode.GetString(requestByte, 0, bytes));
}
while((stream.DataAvailable));
#if LZSS
// распаковываем
int count;
request = lzss.Decode(request, out count);
#endif
}
catch (SocketException e)
{
Console.WriteLine("SocketException: {0}", e);
}
catch (System.Exception Ex)
{
Console.WriteLine("Exception {0}", Ex.Message);
}
finally
{
lzss = null;
}
return request;
}
И, соответственно все операции приема/передачи в сетевой поток должны происходить через эти методы.
Примечание
Другой способ, более красивый с точки зрения концепции ООП, состоит в определении нового класса, например LZSSNetworkStream, наследника NetworkStream, в котором необходимо переопределить методы Read и Write, с учетом возможностей упаковки/распаковки данных. А если быть более точным, то, сначала неплохо бы определить некий класс, наследник NetworkStream, «подразумевающего» работу с упаковщиками, и уже от него наследовать и создавать новые классы, например: LZSSNetworkStream, LZWNetworkStream, и т.д.
Асинхронные (не блокирующие) способы работы с Socket
Напомню (в первую очередь себе), что значит синхронный и асинхронный вызов метода. Под синхронным способом, иногда еще говорят блокирующим, обычно понимают такой вызов метода (функции, процедуры) при котором модуль, из которого был произведен вызов, будет ожидать окончания выполнения вызванного метода. Это в принципе и нормально, но что делать в том случае, когда неизвестно когда будет отработан метод, и будет ли отработан вообще, то есть программа окажется заблокированной и не сможет делать многое, например, реагировать на действия пользователя.
Асинхронный способ, позволяет вызвать метод таким образом, что модуль, из которого был произведен вызов, сможет выполнять свой алгоритм и далее. Делается это так называемым механизмом обратного вызова - callback. То есть модуль, вызывая асинхронный метод, передает ему некую точку входа, куда будет передано управление после отработки вызванного метода.
Запутал? Давайте посмотрим на код: ...
public ManualResetEvent allDone;
...
public virtual void AcceptAcyncSocket()
{
while (!SocketTerminate)
{
allDone.Reset();
Console.WriteLine("Waiting for a new connection... ");
socketListener.BeginAccept(new AsyncCallback(acceptCallback), socketListener );
allDone.WaitOne();
}
} // AcceptAcyncTcpClient
public virtual void acceptCallback( IAsyncResult ar)
{
Socket listener = (Socket) ar.AsyncState;
Socket handler = listener.EndAccept(ar);
lock (clients)
{
clients.Enqueue(handler);
}
allDone.Set();
}
И некие пояснения к приведенному коду:
- в метод BeginAccept передается параметр, та самая точка входа, куда будет передано управление при получении запроса на соединение от клиента. Выполняясь метод BeginAccept «говорит» системе, что бы она, при получении запроса на соединение, передала управление в точку входа (acceptCallback).
А запрос на соединение может выглядеть, например, так: TcpClient client = new TcpClient(server_ipAddress, Convert.ToInt32( server_port ));)
- далее, идет вызов метода WaitOne(), который заставляет поток «заснуть», до прихода некоего сигнала. То есть метод AcceptAsyncSocket, дойдя до строки allDone.WaitOne() и выполнив ее, «останавливается».
И дальше работа может продолжиться в acceptCallback, то есть, если система обнаружит запрос на соединение, она вызовет метод acceptCallback, в котором, последняя строка allDone.Set() освобождает поток, то есть выставляет некий сигнал, который позволяет методу AcceptAsyncSocket продолжить свою работу.
Ну вот, вроде всё и пояснил.
И на сладкое, покажу вам модуль, который, я, использовал для опробования работы с сокетами, используя описанную выше технологию: using System;
using System.Text;
using System.Collections;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace AsyncSocket
{
class Class1
{
public const int CMaximumPendingQueue = 1000;
public static Socket socketListener;
public static ManualResetEvent allDone;
public static Queue sockets;
public static Thread threadAnalysSocket;
public static Thread threadListenSocket;
public static void AcceptAcyncTcpClient()
{
while (true)
{
allDone.Reset();
Console.WriteLine("Waiting for a new connection... ");
socketListener.BeginAccept(new AsyncCallback(acceptCallback), socketListener );
allDone.WaitOne();
}
} // AcceptAcyncTcpClient
static void acceptCallback( IAsyncResult ar)
{
Socket listener = (Socket) ar.AsyncState;
Socket handler = listener.EndAccept(ar);
if (handler != null)
{
lock (sockets)
{
sockets.Enqueue(handler);
}
}
allDone.Set();
}
public static void AnalysSocket()
{
Console.WriteLine("Waiting a new Socket ...");
while (true)
{
try
{
if (sockets.Count > 0)
{
Socket client = null;
lock (sockets)
{
client = (Socket)sockets.Dequeue() ;
}
Console.WriteLine("Analysis Socket ...");
if (client != null)
{
string request = "";
byte[] requestByte = new byte[10];
do
{
int bytes = client.Receive(requestByte);
request = String.Concat(request, Encoding.Unicode.GetString(requestByte, 0, bytes));
}
while (client.Available > 0 );
Console.WriteLine(" сообщение : " + request + ", размер сообщения : " + request.Length);
// формируем ответ клиенту
request = String.Concat(" в ответ вам шлем то же самое : '", request + "'");
Byte[] data = Encoding.Unicode.GetBytes(request);
client.Send(data);
}
Console.WriteLine("Waiting a new Socket ...");
}
}
catch (SocketException e)
{
Console.WriteLine("SocketException: {0}", e);
}
catch (System.Xml.XmlException xmlEx)
{
Console.WriteLine("XmlException {0}", xmlEx.Message);
}
catch (System.Exception ex)
{
Console.WriteLine("SystemException {0}", ex.Message );
}
}
} // AnalysTcpClient
static void Main(string[] args)
{
// очередь обрабатываемых сокетов
sockets = new Queue();
// подготовка к асинхронному использованию
string server_ipAddress = "127.0.0.1";
Int32 port = 41001;
IPAddress localAddr = IPAddress.Parse( server_ipAddress );
Console.WriteLine("Initializing socket for asynchronous work ...");
allDone = new ManualResetEvent(false);
IPEndPoint localEndPoint = new IPEndPoint(localAddr, port);
socketListener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp );
socketListener.Bind(localEndPoint);
socketListener.Listen(CMaximumPendingQueue);
Console.WriteLine("Initializing socket threads ...");
threadAnalysSocket = new Thread(new ThreadStart(AnalysSocket));
threadListenSocket = new Thread(new ThreadStart(AcceptAcyncTcpClient));
// запуск сервера
threadAnalysSocket.Start();
threadListenSocket.Start();
// запуск клиента
int i = 0;
while( true )
{
// устанавливаем соединение с сервером
TcpClient client = new TcpClient(server_ipAddress, Convert.ToInt32( port ));
if (client != null)
{
NetworkStream stream = client.GetStream();
string message = " сообщение номер :" + i.ToString();
i++;
Byte[] data = Encoding.Unicode.GetBytes(message);
// посылаем
stream.Write(data, 0, data.Length);
// получаем ответ
string request = "";
byte[] requestByte = new byte[1000];
do
{
int bytes = stream.Read(requestByte, 0, requestByte.Length);
request = String.Concat(request, Encoding.Unicode.GetString(requestByte, 0, bytes));
}
while((stream.DataAvailable));
Console.WriteLine( request );
Console.ReadLine();
}
}
}
}
}
Примечателен, он, пожалуй, только тем, что, части одного приложения, взаимодействует друг с другом посредством сетевого протокола TCP. В данном примере, передается строка, не несущая ни какой особой информации, и, является она всего лишь неким флажком (что тоже неплохо), показывающим успешность или не успешность взаимодействия между потоками. Но эту тему можно и развить. Впрочем, развивая ее, мы скорей всего дойдем до .NET Remoting, особенно если удастся разнести потоки по разным доменам.
Резюме
Если будут какие-то сложности или вопросы и предложения, по данной работе, не стесняйтесь, пишите...
|