LOGO OA教程 ERP教程 模切知识交流 PMS教程 CRM教程 开发文档 其他文档  
 
网站管理员

【WEB开发】使用WebSocket实现多端通信

admin
2025年5月13日 13:42 本文热度 5

 1、网络通信概念

  1.1、核心定义:

    网络通信是两个或多个计算设备通过传输介质和通信协议进行数据交换的过程。

    本质上是数字信号的传输与解析

  1.2、基本要素:

    通信节点:发送端和接收设备、

    传输介质:有线(光纤/电话线)或无线(wifi/4G/5G)

    通信协议:TCP/IP、HTTP、Websocket等协议

    数据格式:JSON、XML等结构化数据

  1.3、常用的通信方案:

  1.3.1、HTTP通信:

    基本特征:

      请求-响应模型:客户端发起请求,服务端返回响应。

      无状态协议:每个请求相互独立

      基于TCP:默认端口80(http)或者443(https)

    版本演进

      HTTP/1.0:每个请求新建TCP连接    

      HTTP/1.1引入持久连接(Keep-Alive)  支持管道化(pipelining)

      HTTP/2二进制分帧、多路复用、头部压缩    

    优点:简单易用、防火墙友好、丰富的工具链支持

    缺点:单向通信模式、头部开销大、实时性较差

  1.3.2、Socket通信:

    定义:Socket是操作系统提供的底层API,用于实现不同主机之间进程通信,基于传输层协议(TCP、UDP)

    特点:直接操作网络层,支持多种协议(TCP可靠传输、UDP无连接传输),适用于自定义协议的开发

  1.3.3、WebSocket通信

    定义:WebSocket是HTML5引入的应用层协议,通过HTTP升级机制建立持久连接,提供全双工通信能力。

    特点:基于HTTP/HTTPS,以消息为单位传输数据,内置心跳机制(ping/pong),适用于浏览器与服务器的实时交互。

  1.3.4、Socket与WebSocket核心差异

维度

Socket通信

WebSocket通信

协议层

传输层(TCP/UDP)

应用层(基于HTTP升级)

连接建立

直接通过IP+端口建立连接

需HTTP握手(Upgrade: websocket头)

数据格式

字节流(需处理分包/粘包)

消息帧(自动分帧,以消息为单位)

通信模式

全双工(TCP)或单工(UDP)

全双工(基于TCP)

跨域支持

需手动处理(如CORS)

默认支持跨域(同源策略下)

适用环境

任意客户端/服务端(如C++、Python)

主要浏览器环境,但也可用于服务端间通信


2、在Web项目中集成WebSocket

  2.1、创建WebSocket服务端项目

   

  2.2、引用nuget包,此处引用的是存储相关的包,WebSocket已经集成到dotnet webapi项目中。

  <ItemGroup>    <PackageReference Include="CacheManager.SystemRuntimeCaching" Version="2.0.0" />    <PackageReference Include="StackExchange.Redis" Version="2.8.31" />    <PackageReference Include="Swashbuckle.AspNetCore" Version="6.6.2" />  </ItemGroup>

2.3、修改appsettings.json配置文件

{  "Logging": {    "LogLevel": {      "Default": "Information",      "Microsoft.AspNetCore": "Warning"    }  },  "AllowedHosts": "*",  "SocketApi": {    "Server": "http://localhost:5072",    "SocketUrl": "/ws", //Socket监听地址,区分于WebAPI    "OpenNoticeRecord": "true", //开启消息记录    "HistoryNoticeCacheDay": "1", //离线消息缓存时间/天    "GroupAddress": "100001", //群组    "AiringAddress": "public", // 广播地址-所有成员可接收    "OrientAddress": "orient" //定向传播-需要双方携带接收人的账号  },   //当你消息服务是固定账号模式,则可以配置在配置文件,否则可以通过数据库来管理  "SocketAccount": [    {      "Account": "luoni",      "AccountName": "罗尼"    },    {      "Account": "kate",      "AccountName": "卡特"    }  ],  //使用群聊的格式,包括群Id,群名称以及群成员账号。  "GroupSocketAccount": [    {      "GroupID": "100001",      "GroupName": "群聊1",      "GroupUsers": "luoni,kate"    }  ],  "CacheType": "Redis", //Redis/Memory //消息存储方式,缓存离线消息。  "RedisConfig": {    "Connection": "localhost:6379",    "Db": "2"  }}

2.3.1、创建群配置模型GroupSocketAccount.cs

namespace WS.SocketServer.Models{    /// <summary>    /// 群模型    /// </summary>    public class GroupSocketAccount    {        /// <summary>        /// 群Id        /// </summary>        public string GroupID { getset; }        /// <summary>        /// 群名称        /// </summary>        public string GroupName { getset; }        /// <summary>        /// 群成员        /// </summary>        public string GroupUsers { getset; }      }}

2.4、创建消息处理相关操作类

2.4.1、创建消息处理中间件MySocketMiddleware.cs

using System.Net.WebSockets;using System.Text;namespace WS.SocketServer;/// <summary>/// WebSocket服务端处理中间件/// </summary>public class MySocketMiddleware{    private readonly RequestDelegate _next;    private readonly ISocketServiceHandler _handler;    /// <summary>    ///    /// </summary>    /// <param name="next"></param>    /// <param name="handler"></param>    public MySocketMiddleware(RequestDelegate next, ISocketServiceHandler handler)    {        _next = next;        _handler = handler;    }    /// <summary>    /// 根据状态处理    /// </summary>    /// <param name="context"></param>    /// <returns></returns>    public async Task InvokeAsync(HttpContext context)    {        if (context.WebSockets.IsWebSocketRequest)        {            var socket = await context.WebSockets.AcceptWebSocketAsync();            //创建连接对象(此对象存储在内存中)            await _handler.OnConnectedAsync(socket);                      try            {                //消息缓冲区1M(分段接收)                    var buffer = new byte[1024 * 1024 * 1];                while (socket.State == WebSocketState.Open)                {                    var result = await socket.ReceiveAsync(new ArraySegment<byte>(buffer),  CancellationToken.None);                    switch (result.MessageType)                    {                        case WebSocketMessageType.Text: //接收文本消息                            var message = Encoding.UTF8.GetString(buffer, 0, result.Count);                            await _handler.ReceiveTextAsync(socket, message);                            break;                        case WebSocketMessageType.Binary: //接收文件流                            await _handler.ReceiveFileAsync(socket, result, buffer);                            break;                        case WebSocketMessageType.Close: //关闭连接                            _ = _handler.OnDisconnectedAsync(socket);                            break;                        default:                            throw new ArgumentOutOfRangeException();                    }                }            }            catch (Exception e)            {                _ = _handler.OnDisconnectedAsync(socket);                var msg = $"处理Socket状态发生异常,Exception:{e}";                //_logger.Error($"处理Socket状态发生异常,Exception:{e}");                Console.WriteLine(msg);            }        }        else        {            await _next(context);        }    }}


2.4.2、创建Socket对象管理接口ISocketServiceHandler

using System.Net.WebSockets;namespace WS.SocketServer;/// <summary>/// WebSocket对象管理接口/// </summary>public abstract class ISocketServiceHandler{    /// <summary>    /// 连接Socket    /// </summary>    /// <param name="socket"></param>    /// <returns></returns>    public abstract Task OnConnectedAsync(WebSocket socket);    /// <summary>    /// 断开连接的Socket    /// </summary>    /// <param name="socket"></param>    public abstract Task OnDisconnectedAsync(WebSocket socket);    /// <summary>    /// 接收文本消息    /// </summary>    /// <param name="socket"></param>    /// <param name="message"></param>    /// <returns></returns>    public abstract Task ReceiveTextAsync(WebSocket socket, string message);    /// <summary>    /// 处理文件流    /// </summary>    /// <param name="socket"></param>    /// <param name="result"></param>    /// <param name="buffer"></param>    /// <returns></returns>    public abstract Task ReceiveFileAsync(WebSocket socket, WebSocketReceiveResult result,  byte[] buffer);    /// <summary>    /// 单发消息    /// </summary>    /// <param name="request"></param>    /// <returns></returns>    public abstract Task SendMessageAsync(SingleMessageRequest request);    /// <summary>    /// 群发消息    /// </summary>    /// <param name="request"></param>    /// <returns></returns>    public abstract Task SendGroupMessageAsync(GroupMessageRequest request);    /// <summary>    /// 发送全体    /// </summary>    /// <param name="request"></param>    /// <returns></returns>    public abstract Task SendMassMessageAsync(MassMessageRequest request);}


2.4.3、Socket连接对象实现类SocketServiceHandler

using Logger.Service.Interface;using System.Net.Mail;using System.Net.WebSockets;using System.Security.Principal;using System.Text;using System.Text.Encodings.Web;using System.Text.Json;using System.Text.Unicode;using WS.SocketServer.Models;using WS.SocketServer.Services.Dtos;namespace WS.SocketServer;/// <summary>/// Socket连接对象处理类/// </summary>public class SocketServiceHandler : ISocketServiceHandler{    private readonly ISocketDataManager _sockets;    private readonly ILoggerContext _logger;    private readonly INoticeCacheManager _noticeCacheManager;    private readonly IConfiguration _configuration;    /// <summary>    ///    /// </summary>    public SocketServiceHandler(ISocketDataManager sockets, ILoggerContext logger,        INoticeCacheManager noticeCacheManager,IConfiguration configuration)    {        _sockets = sockets;        _logger = logger;        _noticeCacheManager = noticeCacheManager;        _configuration = configuration;    }    /// <summary>    /// 连接Socket    /// </summary>    /// <param name="socket"></param>    /// <returns></returns>    public override async Task OnConnectedAsync(WebSocket socket)    {        try        {            var socketId = Guid.NewGuid().ToString("N");            await _sockets.AddSocket(socketId, socket);        }        catch (Exception e)        {            var logmsg = $"连接Socket异常:{e.Message}";            Console.WriteLine(logmsg);            _logger.Error(logmsg);            throw;        }    }    /// <summary>    /// 断开连接的Socket    /// </summary>    /// <param name="socket"></param>    public override async Task OnDisconnectedAsync(WebSocket socket)    {        try        {            var socketId = _sockets.GetSocketId(socket);            if (socketId != null)            {                await SendCloConnectedNoticeAsync(socket,"");                await _sockets.RemoveSocketAsync(socketId);            }        }        catch (Exception e)        {            var logmsg = $"连接关闭,清除内存中的Socket对象发生异常:{e.Message}";            Console.WriteLine(logmsg);        }    }    /// <summary>    /// 单发消息    /// </summary>    /// <param name="request"></param>    /// <returns></returns>    public override async Task SendMessageAsync(SingleMessageRequest request)    {        try        {            var connection = _sockets.GetAllConnections().Where(it => it.Key == request.SocketId)                .Select(it => new                {                    SocketId = it.Key,                    Socket = it.Value                }).FirstOrDefault();            if (connection != null)            {                if (request.MessageType != MessageTypeEnum.Files)                {                    await SendMessageAsync(connection.Socket, request.Message.ToString());                }                else                {                    await SendFileMessageAsync(connection.Socket, (Stream)request.Message);                }            }        }        catch (Exception e)        {            Console.WriteLine(e);        }    }    /// <summary>    /// 指定连接发送    /// </summary>    /// <param name="request"></param>    /// <returns></returns>    public override async Task SendGroupMessageAsync(GroupMessageRequest request)    {        try        {            var list = _sockets.GetAllConnections().Select(it => new            {                SocketId = it.Key,                Socket = it.Value            }).ToList();            if (request.SocketIds != null && request.SocketIds?.Length > 0)            {                list = list.Where(it => request.SocketIds.Contains(it.SocketId)).ToList();            }            if (list?.Count == 0)            {                throw new Exception("连接不存在");            }            if (request.MessageType != MessageTypeEnum.Files)            {                //群发文本                foreach (var connection in list)                {                    await SendMessageAsync(connection.Socket, request.Message.ToString());                }            }            else            {                //群发附件                foreach (var connection in list)                {                    await SendFileMessageAsync(connection.Socket, (Stream)request.Message);                }            }        }        catch (Exception e)        {            Console.WriteLine(e);        }    }    /// <summary>    /// 发送全部    /// </summary>    /// <param name="request"></param>    /// <returns></returns>    public override async Task SendMassMessageAsync(MassMessageRequest request)    {        try        {            //过滤发送者、发送者不需要自己发送的消息            var list = _sockets.GetAllConnections()                .Where(w => w.Key != request.FromSocketId)                .Select(it => new                {                    SocketId = it.Key,                    Socket = it.Value                }).ToList();            if (list?.Count == 0)            {                throw new Exception("连接不存在");            }            if (request.Message == null)            {                throw new Exception("消息内容为空");            }            if (request.MessageType != MessageTypeEnum.Files)            {                //群发文本                foreach (var connection in list)                {                    await SendMessageAsync(connection.Socket, request.Message.ToString());                }            }            else            {                //群发附件                foreach (var connection in list)                {                    await SendFileMessageAsync(connection.Socket, (Stream)request.Message);                }            }        }        catch (Exception e)        {            Console.WriteLine(e);        }    }    /// <summary>    /// 发送附件数据    /// </summary>    /// <param name="socket"></param>    /// <param name="stream"></param>    /// <returns></returns>    private async Task SendFileMessageAsync(WebSocket socket, Stream stream)    {        if (socket.State != WebSocketState.Open)        {            throw new Exception("连接已关闭");        }        ;        if (stream == null)        {            throw new Exception("附件数据不存在");        }        try        {            byte[] content = new byte[stream.Length];            await stream.ReadAsync(content, 0, content.Length);            stream.Seek(0SeekOrigin.Begin);            await socket.SendAsync(new ArraySegment<byte>(content), WebSocketMessageType.BinarytrueCancellationToken.None);        }        catch (Exception e)        {            Console.WriteLine(e);            throw;        }    }    /// <summary>    /// 发送文本消息    /// </summary>    /// <param name="socket"></param>    /// <param name="message"></param>    /// <returns></returns>    private async Task SendMessageAsync(WebSocket socket, string message)    {        try        {            if (socket.State != WebSocketState.Open)            {                throw new Exception("连接已关闭");            }            ;            byte[] content = Encoding.UTF8.GetBytes(message);            await socket.SendAsync(new ArraySegment<byte>(content), WebSocketMessageType.TexttrueCancellationToken.None);        }        catch (Exception e)        {            Console.WriteLine(e);        }    }    /// <summary>    /// 处理文本消息    /// </summary>    /// <param name="socket"></param>    /// <param name="message"></param>    /// <returns></returns>    public override async Task ReceiveTextAsync(WebSocket socket, string message)    {        //群发、单发、广播三种消息格式        var socketId = _sockets.GetSocketId(socket);        var macAddress = IPHelper.GetMacAddress();        if (!string.IsNullOrEmpty(socketId))        {            //验证是否登录指令            var messageDto = new WSMessageDto();            try            {                messageDto = JsonSerializer.Deserialize<WSMessageDto>(message);                //如果没有建立连接、则根据消息内容建立连接                if (messageDto?.Command == (int)MessageBodyCommandType.VerifyToken)                {                    await GetValidateSuccessResponseAsync(socket, socketId, messageDto);                    //建立连接成功后、推送离线消息                    HandleSocketHistoryNoticesAsync($"{MD5Helper.GenerateMd5ByRequest(messageDto.FromAccount)}_{macAddress}", socket);                }                if (messageDto?.Command == (int)MessageBodyCommandType.Directional)                {                    //单聊、需要检测接收人、推送给指定的Socket                    await HandleNoticeToOnlineSocketAsync(socket, messageDto);                }                if (messageDto?.Command == (int)MessageBodyCommandType.GroupChat)                {                    //群聊、需要检测接收人、推送给指定的Socket                    await HandleNoticeToGroupSocketAsync(socket, messageDto);                }                if (messageDto?.Command == (int)MessageBodyCommandType.Broadcast)                {                    //广播消息、推送给所有在线Socket                    await SendMassMessageAsync(new MassMessageRequest                    {                        Message = message,                        FromSocketId = socketId,                        MessageType = MessageTypeEnum.Text                    });                }            }            catch (Exception e)            {                Console.WriteLine($"解析消息格式异常:{e.Message},{e}");                Console.WriteLine($"消息体格式:{message}");                await GetValidateExceptionResponse(socket, socketId);            }        }    }    /// <summary>    /// 接收附件消息    /// </summary>    /// <param name="socket"></param>    /// <param name="result"></param>    /// <param name="buffer"></param>    /// <returns></returns>    public override async Task ReceiveFileAsync(WebSocket socket, WebSocketReceiveResult result, byte[] buffer)    {        try        {            var socketId = _sockets.GetSocketId(socket);            var macAddress = IPHelper.GetMacAddress();            if (!string.IsNullOrEmpty(socketId) && buffer.Length > 0)            {                if (socketId.Contains($"_{macAddress}"))                {                    //以文件发送、则需要分两次发送、先推送对应关系。                    var stream = new MemoryStream(buffer);                    await SendMassMessageAsync(new MassMessageRequest                    {                        Message = stream,                        FromSocketId = socketId,                        MessageType = MessageTypeEnum.Files                    });                }            }        }        catch (Exception e)        {            Console.WriteLine(e);        }    }    /// <summary>    /// 转发文本消息    /// </summary>    /// <returns></returns>    private async Task HandleNoticeToOnlineSocketAsync(WebSocket socket, WSMessageDto message)    {        /* 消息体格式:         * 从消息体中定位到发送方、接收方、操作指令,消息体中必须包含以下字段         * 第一种格式:认证消息         *{            "Command": 1001,            "CommandName": "登录认证",            "FromAccount": "luoni",            "FromAccountName": "罗尼·库尔曼",            "Content": ""          }          第二种格式:普通消息          {            "Command": 2001,            "CommandName": "普通消息",            "FromAccount": "qiao",            "FromAccountName": "乔·卡特",            "ToAccount": "luoni",            "ToAccountName": "罗尼·库尔曼",            "Content":"Hello 罗尼"          }         */        var macAddress = IPHelper.GetMacAddress();        //序列化成消息模型        string dstAccount = message.ToAccount;        if (string.IsNullOrWhiteSpace(dstAccount))        {            throw new Exception("接收方为空...");        }        else        {            var recipientId = $"{MD5Helper.GenerateMd5ByRequest(dstAccount)}_{macAddress}";            var recipientWebSocket = _sockets.GetSocketById(recipientId);            if (recipientWebSocket == null)            {                var logMsg = $"接收人{dstAccount}不在线或未建立令牌";                Console.WriteLine(logMsg);                if (!string.IsNullOrWhiteSpace(dstAccount))                {                    var cacheKey = $"{MD5Helper.GenerateMd5ByRequest(dstAccount)}_{macAddress}";                    //缓存消息                    _noticeCacheManager.AddNotice(cacheKey, message);                    //推送连接不在线(发送给当前连接Socket)                    await SendCloConnectedNoticeAsync(socket, dstAccount);                }            }            else            {                //接收对象在线并且已经建立令牌,立即推送                await SendMessageAsync(recipientWebSocket, message.Content);                //推送成功、记录消息到发送记录列表,确保必须建立令牌后才可获取                //历史消息堆积会比较大,使用日期分割                var newKey = $"{DateTime.Now.ToString("yyyy-MM-dd")}:{dstAccount}";                //增加消息记录                _noticeCacheManager.AddNoticeRecord(newKey, message);                _logger.Debug("记录消息到Redis");            }        }    }    /// <summary>    /// 处理群聊消息    /// </summary>    /// <param name="socket"></param>    /// <param name="message"></param>    /// <returns></returns>    private async Task HandleNoticeToGroupSocketAsync(WebSocket socket, WSMessageDto message)    {        /**          第三种格式:群发消息          {            "Command": 2002,            "CommandName": "群发消息",            "FromAccount": "luoni",            "FromAccountName": "罗尼·库尔曼",            "ToAccount": "100011101",            "ToAccountName": "测试群"",            "Content":"Hello 各位群友"          }         */        //群发消息针对已经加入群中的用户,实际群管理的操作应该在webapi中处理。        List<GroupSocketAccount> groupUsers = _configuration.GetSection("GroupSocketAccount").Get<List<GroupSocketAccount>>();        var macAddress = IPHelper.GetMacAddress();        var toUsers = groupUsers?.Where(it => it.GroupID == message.ToAccount).FirstOrDefault()?.GroupUsers?.Split(",");        if (toUsers?.Length == 0)        {            throw new Exception("接收方为空...");        }        else        {            foreach (var dstAccount in toUsers)            {                var recipientId = $"{MD5Helper.GenerateMd5ByRequest(dstAccount)}_{macAddress}";                var recipientWebSocket = _sockets.GetSocketById(recipientId);                if (recipientWebSocket == null)                {                    var logMsg = $"接收群组{dstAccount}不在线或没有相应的成员";                    Console.WriteLine(logMsg);                    if (!string.IsNullOrWhiteSpace(dstAccount))                    {                        var cacheKey = $"{MD5Helper.GenerateMd5ByRequest(dstAccount)}_{macAddress}";                        //缓存消息                        _noticeCacheManager.AddNotice(cacheKey, message);                        //推送连接不在线(发送给当前连接Socket)                        await SendCloConnectedNoticeAsync(socket, dstAccount);                    }                }                else                {                    //接收对象在线并且已经建立令牌,立即推送                    await SendMessageAsync(recipientWebSocket, message.Content);                    //推送成功、记录消息到发送记录列表,确保必须建立令牌后才可获取                    //历史消息堆积会比较大,使用日期分割                    var newKey = $"{DateTime.Now.ToString("yyyy-MM-dd")}:{dstAccount}";                    //增加消息记录                    _noticeCacheManager.AddNoticeRecord(newKey, message);                    _logger.Debug("记录消息到Redis");                }            }        }    }    /// <summary>    /// 建立连接成功返回处理    /// </summary>    /// <param name="socket"></param>    /// <param name="socketId"></param>    /// <returns></returns>    private async Task GetValidateSuccessResponseAsync(WebSocket socket, string socketId, WSMessageDto message)    {        /**认证类的消息格式         {            "Command": 1001,            "CommandName": "登录认证",            "FromAccount": "luoni",            "FromAccountName": "罗尼·库尔曼",            "Content": ""        }*/        //如果认证账号需要与配置文件或者数据库匹配,在此处理相关业务逻辑。        var response = new UserTokenValidateResponse        {            Status = 200,            Message = "连接成功",            Token = socketId        };        await SendMessageAsync(socket, JsonSerializer.Serialize(response, new JsonSerializerOptions        {            Encoder = JavaScriptEncoder.Create(UnicodeRanges.All)        }));        //绑定当前Socket对象与FromAccount关联        var macAddress = IPHelper.GetMacAddress();        var recipientId = $"{MD5Helper.GenerateMd5ByRequest(message.FromAccount)}_{macAddress}";        await _sockets.EditSocketKey(socketId, recipientId);        var logMsg = $"建立连接成功,连接信息:SocketId:{socketId}";        Console.WriteLine(logMsg);    }    /// <summary>    /// 建立连接失败返回处理    /// </summary>    /// <param name="socket"></param>    /// <param name="socketId"></param>    /// <returns></returns>    private async Task GetValidateExceptionResponse(WebSocket socket, string socketId)    {        var response = new UserTokenValidateResponse        {            Status = 401,            Message = "消息格式异常,无法发送,请参考文档"        };        await SendMessageAsync(socket, JsonSerializer.Serialize(response, new JsonSerializerOptions        {            Encoder = JavaScriptEncoder.Create(UnicodeRanges.All)        }));        //建立连接失败后、删除缓存中的Socket连接对象、服务端主动断开        await _sockets.RemoveSocketAsync(socketId);        var logMsg = $"建立连接失败,已清除Socket连接:SocketId:{socketId}";        Console.WriteLine(logMsg);    }    /// <summary>    /// 推送离线消息    /// </summary>    /// <param name="cacheNoticeKey"></param>    /// <param name="socket"></param>    /// <returns></returns>    private void HandleSocketHistoryNoticesAsync(string cacheNoticeKey, WebSocket socket)    {        try        {            var historyNotices = _noticeCacheManager.GetUserAllHistoryNotices(cacheNoticeKey);            if (historyNotices != null)            {                foreach (var notice in historyNotices)                {                    _ = SendMessageAsync(socket, notice.ToString());                }                var logMsg = $"建立连接成功,接收历史消息{historyNotices.Count}条";                Console.WriteLine(logMsg);                //推送成功后、清除离线消息                Console.WriteLine(cacheNoticeKey);                _noticeCacheManager.RemoveNotice(cacheNoticeKey);            }        }        catch (Exception e)        {            Console.WriteLine(e);            _logger.Error("推送离线消息异常:", e);        }    }    /// <summary>    /// 通知接收用户不在线    /// </summary>    /// <returns></returns>    private async Task SendCloConnectedNoticeAsync(WebSocket socket, string toAccount)    {        //通知所有在线用户、xxx用户下线        try        {            //读取缓存中的用户名xxx,组装下线通知格式:            var socketId = _sockets.GetSocketId(socket);            var noticeTemplate = $"用户{toAccount}已下线";            //通知所有当前连接用户            await SendMassMessageAsync(new MassMessageRequest            {                FromSocketId = socketId,                Message = noticeTemplate,                MessageType = MessageTypeEnum.Text            });        }        catch (Exception e)        {            Console.WriteLine(e);            //_logger.Error("SendCloConnectedNoticeAsync", e);        }    }}

2.4.4、创建消息Command类型MessageBodyCommandType.cs:

using System;using System.Collections.Generic;using System.ComponentModel;using System.Linq;using System.Text;using System.Threading.Tasks;namespace WS.SocketServer;/// <summary>/// 消息体的Command类型/// </summary>public enum MessageBodyCommandType{    /// <summary>    /// 认证Token    /// </summary>    [Description("认证Token")]    VerifyToken = 1001,    /// <summary>    /// 定向单聊    /// </summary>    [Description("定向单聊")]    Directional = 2001,    /// <summary>    /// 群聊    /// </summary>    [Description("群聊")]    GroupChat = 2002,    /// <summary>    /// 广播    /// </summary>    [Description("广播")]    Broadcast = 2003}

2.4.5、创建一个Socket对象管理接口ISocketDataManager,用于保存和管理Socket连接对象。

using System;using System.Collections.Concurrent;using System.Net.WebSockets;namespace WS.SocketServer;/// <summary>/// Socket对象管理/// </summary>public interface ISocketDataManager{    /// <summary>    /// 获取所有连接    /// </summary>    /// <returns></returns>    ConcurrentDictionary<string, WebSocket> GetAllConnections();    /// <summary>    /// 根据Id获取指定Socket连接    /// </summary>    /// <param name="id"></param>    /// <returns></returns>    WebSocket GetSocketById(string id);    /// <summary>    /// 获取Socket连接ID    /// </summary>    /// <param name="socket"></param>    /// <returns></returns>    string GetSocketId(WebSocket socket);            /// <summary>    /// 连接账号获取SocketId    /// </summary>    /// <param name="account"></param>    /// <returns></returns>    string GetSocketId(string account);    /// <summary>    /// 总连接数    /// </summary>    /// <returns></returns>    int GetCount();    /// <summary>    /// 删除指定ID的Socket,并关闭该连接    /// </summary>    /// <param name="id"></param>    /// <returns></returns>    Task RemoveSocketAsync(string id);    /// <summary>    /// 添加新的Socket    /// </summary>    /// <param name="socketId"></param>    /// <param name="socket"></param>    Task<boolAddSocket(string socketId, WebSocket socket);    /// <summary>    /// 变更Socket令牌    /// </summary>    /// <param name="oldSocketKey"></param>    /// <param name="newSocketKey"></param>    /// <returns></returns>    Task<boolEditSocketKey(string oldSocketKey, string newSocketKey);}

2.4.6、创建Socket连接对象管理的实现类SocketDataManager.cs

using System.Collections.Concurrent;using System.Net.WebSockets;namespace WS.SocketServer;/// <summary>/// Socket连接对象管理/// </summary>public class SocketDataManager : ISocketDataManager{    //WebSocket对象当前是存储在服务器内存中的    private readonly ConcurrentDictionary<string, WebSocketConnections = new();     public SocketDataManager()    {    }    /// <summary>    /// 获取所有Socket连接    /// </summary>    /// <returns></returns>    public ConcurrentDictionary<string, WebSocketGetAllConnections()    {        return Connections;    }    /// <summary>    /// 根据Id获取指定Socket连接    /// </summary>    /// <param name="id"></param>    /// <returns></returns>    public WebSocket GetSocketById(string id)    {        var list = Connections;        return Connections.FirstOrDefault(x => x.Key == id).Value;    }    /// <summary>    /// 获取Socket连接ID    /// </summary>    /// <param name="socket"></param>    /// <returns></returns>    public string GetSocketId(WebSocket socket)    {        return Connections.FirstOrDefault(x => x.Value == socket).Key;    }    /// <summary>    /// 连接账号获取SocketId    /// </summary>    /// <param name="account"></param>    /// <returns></returns>    public string GetSocketId(string account)    {        var cacheKey = $"{MD5Helper.GenerateMd5ByRequest(account)}_verify";        return cacheKey;    }    /// <summary>    /// 删除指定ID的Socket,并关闭该连接    /// </summary>    /// <param name="id"></param>    /// <returns></returns>    public async Task RemoveSocketAsync(string id)    {        Connections.TryRemove(id, out var socket);        if (socket != null && socket?.State == WebSocketState.Open)        {            Console.WriteLine("socket连接关闭");            await socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure"socket连接关闭"CancellationToken.None);        }    }    /// <summary>    /// 添加新的Socket    /// </summary>    /// <param name="socketId"></param>    /// <param name="socket"></param>    public async Task<bool> AddSocket(string socketId, WebSocket socket)    {        var result = Connections.TryAdd(socketId, socket);        return await Task.FromResult(result);    }    /// <summary>    /// 变更Socket令牌    /// </summary>    /// <param name="oldSocketKey"></param>    /// <param name="newSocketKey"></param>    /// <returns></returns>    public async Task<bool> EditSocketKey(string oldSocketKey, string newSocketKey)    {        var result = false;        if (Connections.Any(it => it.Key == oldSocketKey))        {            //清除旧的连接            Connections.TryRemove(oldSocketKey, out var socket1);            result = Connections.TryAdd(newSocketKey, socket1);        }        if (Connections.Any(it => it.Key == newSocketKey))        {            //重新确定、清除已建立令牌的旧的连接            Console.WriteLine($"oldSocketKey:{oldSocketKey}, newSocketKey:{newSocketKey}");            var removeResult = Connections.TryRemove(oldSocketKey, out var socket2);            Connections.TryAdd(newSocketKey, socket2);        }        result = Connections.Any(it => it.Key == newSocketKey);        return await Task.FromResult(result);    }    /// <summary>    /// 总连接数    /// </summary>    /// <returns></returns>    public int GetCount()    {        return Connections.Count;    }}

2.4.7、创建消息类型枚举MessageTypeEnum

using System.ComponentModel;namespace WS.SocketServer{    /// <summary>    /// 消息类型    /// </summary>    public enum MessageTypeEnum    {        [Description("文本")]        Text = 1,        [Description("文件流")]        Files = 2    }}

2.4.8、创建消息手法的参数类MessageInfo.cs

using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;namespace WS.SocketServer;/// <summary>/// 消息内容/// </summary>public class MessageInfo{    /// <summary>    /// 消息Id    /// </summary>    public string NoticeId { getset; }    /// <summary>    /// 消息类型    /// </summary>    public MessageTypeEnum MessageType { getset; }    /// <summary>    /// 消息内容    /// </summary>    public object Message { getset; }}

3、创建缓存模块

  3.1、缓存消息存储在Redis或者使用内存存储,历史消息使用Redis存储 

 3.2、创建CacheManager目录,并创建缓存接口ICacheContext.cs

namespace WS.SocketServer;/// <summary>/// 缓存接口/// </summary>public interface ICacheContext{    /// <summary>    /// Exists    /// </summary>    /// <param name="key"></param>    /// <returns></returns>    bool Exists(string key);    /// <summary>    /// 读取缓存    /// </summary>    /// <typeparam name="T"></typeparam>    /// <param name="key"></param>    /// <returns></returns>    GetCache<T>(string key);    /// <summary>    /// 设置缓存    /// </summary>    /// <typeparam name="T"></typeparam>    /// <param name="key"></param>    /// <param name="value"></param>    /// <returns></returns>    bool SetCache<T>(string key, T value);    /// <summary>    /// 设置缓存    /// </summary>    /// <param name="key"></param>    /// <param name="value"></param>    /// <param name="expire"></param>    bool SetCache<T>(string key, T value, DateTime expire);    /// <summary>    /// 删除缓存    /// </summary>    /// <param name="key"></param>    /// <returns></returns>    bool Remove(string key);    /// <summary>    /// 重命名Key    /// </summary>    /// <param name="key"></param>    /// <param name="newKey"></param>    /// <returns></returns>    bool RenameKey(string key, string newKey);}


3.3、创建缓存接口的内存实现

using System.Runtime.Caching;namespace WS.SocketServer;/// <summary>////// </summary>public class MemoryCacheService : ICacheContext{    /// <summary>    /// 是否存在此缓存    /// </summary>    /// <param name="key"></param>    /// <returns></returns>    public bool Exists(string key)    {        return MemoryCache.Default.Contains(key);    }    /// <summary>    /// 取得缓存数据    /// </summary>    /// <typeparam name="T"></typeparam>    /// <param name="key"></param>    /// <returns></returns>    public T GetCache<T>(string key)    {        return (T)MemoryCache.Default[key];    }    /// <summary>    /// 设置缓存    /// </summary>    /// <param name="key"></param>    /// <param name="value"></param>    public bool SetCache<T>(string key, T value)    {        try        {            MemoryCache.Default.Set(key, valuenew CacheItemPolicy            {                AbsoluteExpiration = DateTime.Now.AddYears(1)            });            return true;        }        catch (Exception e)        {            return false;        }    }    /// <summary>    /// 设置缓存    /// </summary>    /// <param name="key"></param>    /// <param name="value"></param>    /// <param name="expire"></param>    public bool SetCache<T>(string key, T value, DateTime expire)    {        try        {            MemoryCache.Default.Set(key, valuenew CacheItemPolicy            {                AbsoluteExpiration = expire            });            return true;        }        catch (Exception e)        {            return false;        }    }    /// <summary>    /// 移除缓存    /// </summary>    /// <param name="key"></param>    public bool Remove(string key)    {        try        {            MemoryCache.Default.Remove(key);            return true;        }        catch (Exception e)        {            return false;        }    }    /// <summary>    /// 重命名Key    /// </summary>    /// <param name="key"></param>    /// <param name="newKey"></param>    /// <returns></returns>    public bool RenameKey(string key, string newKey)    {        try        {            MemoryCache.Default[key] = newKey;            return true;        }        catch (Exception e)        {            Console.WriteLine(e);            return false;        }    }}

3.4、创建缓存接口的Redis实现

using StackExchange.Redis;using System.Text.Json;namespace WS.SocketServer;public class RedisCacheService : ICacheContext{    private static ConnectionMultiplexer Conn { getset; }    private static IDatabase RedisDb { getset; }    /// <summary>    /// 初始化连接    /// </summary>    /// <param name="conn"></param>    /// <param name="db">默认DB</param>    public static void Init(string conn, int db = 0)    {        Conn = ConnectionMultiplexer.Connect(conn);        RedisDb = Conn.GetDatabase(db);    }    /// <summary>    /// RedisCacheContext    /// </summary>    public RedisCacheService()    {    }    /// <summary>    /// 是否存在此缓存    /// </summary>    /// <param name="key"></param>    /// <returns></returns>    public bool Exists(string key)    {        return RedisDb.KeyExists(key);    }    /// <summary>    /// 获取缓存对象    /// </summary>    /// <typeparam name="T"></typeparam>    /// <param name="key"></param>    /// <returns></returns>    public T GetCache<T>(string key)    {        RedisValue value = RedisDb.StringGet(key);        if (!value.HasValue)            return default;        if (typeof(T) == typeof(string))        {            return (T)Convert.ChangeType(valuetypeof(T));        }        else        {            return JsonSerializer.Deserialize<T>(value);        }    }    /// <summary>    /// 设置缓存    /// </summary>    /// <param name="key"></param>    /// <param name="value"></param>    public bool SetCache<T>(string key, T value)    {        if (typeof(T) == typeof(string))        {            return RedisDb.StringSet(key, value?.ToString());        }        else        {            return RedisDb.StringSet(key, JsonSerializer.Serialize(value));        }    }    /// <summary>    /// 设置缓存    /// </summary>    /// <param name="key"></param>    /// <param name="value"></param>    /// <param name="expire"></param>    public bool SetCache<T>(string key, T value, DateTime expire)    {        if (typeof(T) == typeof(string))        {            return RedisDb.StringSet(key, value?.ToString(), expire - DateTime.Now);        }        else        {            return RedisDb.StringSet(key, JsonSerializer.Serialize(value), expire -  DateTime.Now);        }    }    /// <summary>    /// 删除缓存    /// </summary>    /// <param name="key"></param>    /// <returns></returns>    public bool Remove(string key)    {        return RedisDb.KeyDelete(key);    }    /// <summary>    /// 重命名Key    /// </summary>    /// <param name="key"></param>    /// <param name="newKey"></param>    /// <returns></returns>    public bool RenameKey(string key, string newKey)    {        return RedisDb.KeyRename(key, newKey);    }}

3.5、创建消息的缓存接口INoticeCacheManager.cs和实现类,用于管理离线消息

# INoticeCacheManager.csnamespace WS.SocketServer;/// <summary>/// 消息缓存接口/// </summary>public interface INoticeCacheManager{    /// <summary>    /// 获取用户所有离线消息    /// </summary>    /// <param name="toUser">接收人</param>    /// <returns></returns>    List<objectGetUserAllHistoryNotices(string toUser);    /// <summary>    /// 缓存离线消息    /// </summary>    /// <param name="toUser">接收人</param>    /// <param name="message"></param>    void AddNotice(string toUser, object message);    /// <summary>    /// 缓存消息发送记录    /// </summary>    /// <param name="toUser">接收人</param>    /// <param name="message"></param>    void AddNoticeRecord(string toUser, object message);    /// <summary>    /// 删除缓存中的消息    /// </summary>    /// <param name="toUser">接收人</param>    void RemoveNotice(string toUser);}#NoticeCacheHandler.csnamespace WS.SocketServer;/// <summary>/// 缓存消息处理类/// </summary>public class NoticeCacheHandler : INoticeCacheManager{    private readonly ICacheContext _cacheContext;    /// <summary>    ///    /// </summary>    /// <param name="cacheContext"></param>    public NoticeCacheHandler(ICacheContext cacheContext)    {        _cacheContext = cacheContext;    }    /// <summary>    /// 获取所有离线消息    /// </summary>    /// <param name="toUser">消息接收方</param>    /// <returns></returns>    public List<objectGetUserAllHistoryNotices(string toUser)    {        //消息缓存的格式: 接收者作为Key可快速搜索离线消息        /*         根据当前验证账号获取离线消息         //存储的离线消息针对接收方,当接收方上线后,Socket服务自动推送         以接收对象为Key,接收者上线后自动拉取         */        try        {            var noticeAll = _cacheContext.GetCache<List<object>>(toUser);            return noticeAll;        }        catch (Exception e)        {            Console.WriteLine(e);            throw;        }    }    /// <summary>    /// 缓存离线消息    /// </summary>    /// <param name="toUser">接收者</param>    /// <param name="message">消息内容</param>    public void AddNotice(string toUser, object message)    {        var noticeList = GetUserAllHistoryNotices(toUser);        if (noticeList != null)        {            noticeList.Add(message);        }        else        {            noticeList = new List<object>            {                message            };        }        //覆盖缓存数据        _cacheContext.SetCache(toUser, noticeList);    }    /// <summary>    /// 缓存消息历史    /// </summary>    /// <param name="cacheKey">接收人</param>    /// <param name="message">消息内容</param>    public void AddNoticeRecord(string cacheKey, object message)    {        try        {            AddNotice(cacheKey, message);        }        catch (Exception e)        {            Console.WriteLine(e);            throw;        }    }    /// <summary>    /// 删除缓存消息    /// </summary>    /// <param name="toUser"></param>    public void RemoveNotice(string toUser)    {        try        {            _cacheContext.Remove(toUser);        }        catch (Exception e)        {            Console.WriteLine(e);            throw;        }    }}

4.创建基础工具类

  4.1、创建IP地址获取工具类IPHelper.cs,防止在不同IP登录后消息无法正常收发

using Microsoft.AspNetCore.Http;using System;using System.Collections.Generic;using System.Linq;using System.Net.NetworkInformation;using System.Text;using System.Threading.Tasks;namespace WS.SocketServer;public class IPHelper{    /// <summary>    /// 获得IP地址    /// </summary>    /// <returns>字符串数组</returns>    public static string GetIp()    {        HttpContextAccessor _context = new HttpContextAccessor();        if (_context.HttpContext == null)            return string.Empty;        var ip = _context.HttpContext.Request.Headers["X-Forwarded-For"].ToString();        if (string.IsNullOrEmpty(ip))        {            ip = _context.HttpContext.Connection.RemoteIpAddress.ToString();        }        if (ip == "::1")        {            ip = "127.0.0.1";        }        return ip;    }        /// <summary>    /// 获取Mac地址    /// </summary>    /// <returns></returns>    public static string GetMacAddress()    {        NetworkInterface[] networks = NetworkInterface.GetAllNetworkInterfaces();        if(networks.Length == 0)        {            return string.Empty;        }        var macAddress = string.Empty;        var effectiveNetworks = networks.Where(it=>it.NetworkInterfaceType ==  NetworkInterfaceType.Loopback && it.OperationalStatus == OperationalStatus.Up).ToList();        foreach (NetworkInterface adapter in effectiveNetworks)        {            PhysicalAddress address1 = adapter.GetPhysicalAddress();            macAddress = address1?.ToString();            if (string.IsNullOrWhiteSpace(macAddress))            {                IPInterfaceProperties properties = adapter.GetIPProperties();                var unicastAddress = properties.UnicastAddresses;                if(unicastAddress.Any(it=>it.Address.AddressFamily ==  System.Net.Sockets.AddressFamily.InterNetwork))                {                    var  address2 = adapter.GetPhysicalAddress().ToString();                    macAddress = address2?.ToString();                }            }        }        return macAddress;    }}

4.2、创建MD5加密类MD5Helper.cs,用于格式化SocketID

using System;using System.Collections.Generic;using System.Linq;using System.Security.Cryptography;using System.Text;using System.Threading.Tasks;namespace WS.SocketServer;/// <summary>/// SocketId生成/// </summary>public class MD5Helper{    /// <summary>    /// 创建SocketId    /// </summary>    /// <returns></returns>    public static string GenerateNGuid()    {        return Guid.NewGuid().ToString("N");    }    /// <summary>    /// 根据请求参数生成标准MD5    /// </summary>    /// <param name="request"></param>    /// <returns></returns>    public static string GenerateMd5ByRequest(string request)    {        using (var md5 = MD5.Create())        {            byte[] data = System.Text.Encoding.Default.GetBytes(request);            byte[] result = md5.ComputeHash(data);            string response = "";            for (int i = 0; i < result.Length; i++)                response += result[i].ToString("x").PadLeft(2'0');            return response;        }    }}

4.3、修改Program.cs,配置连接以及依赖注入

using Logger.Service;using Logger.Service.Interface;using WS.SocketServer;var builder = WebApplication.CreateBuilder(args);// Add services to the container.builder.Services.AddControllers();// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbucklebuilder.Services.AddEndpointsApiExplorer();builder.Services.AddSwaggerGen();IConfiguration config = new ConfigurationBuilder().AddJsonFile("appsettings.json",  true).Build();var cacheType = config.GetSection("CacheType").Value;if (cacheType != "Redis"){    builder.Services.AddSingleton<ICacheContextMemoryCacheService>();}else{    var redisConn = config.GetSection("RedisConfig:Connection").Value;    var redisDb = config.GetSection("RedisConfig:Db").Value;    builder.Services.AddSingleton<ICacheContextRedisCacheService>();    RedisCacheService.Init(redisConn, Convert.ToInt32(redisDb));}builder.Services.AddSingleton<IHttpContextAccessorHttpContextAccessor>();builder.Services.AddTransient<INoticeCacheManagerNoticeCacheHandler>();builder.Services.AddTransient<ISocketDataManagerSocketDataManager>();builder.Services.AddTransient<ILoggerContextNLogService>();builder.Services.AddTransient<ISocketServiceHandlerSocketServiceHandler>();//使用配置的Socket地址var url = config.GetSection("SocketApi:Server").Value;builder.WebHost.UseUrls(url);builder.WebHost.UseKestrel(options =>{    options.Limits.MaxRequestBodySize = int.MaxValue;});var app = builder.Build();// Configure the HTTP request pipeline.if (app.Environment.IsDevelopment()){    app.UseSwagger();    app.UseSwaggerUI();}//启用webSocketapp.UseWebSockets(new WebSocketOptions{    KeepAliveInterval = TimeSpan.FromSeconds(15), //心跳机制、15秒检测一次,默认2分钟    ReceiveBufferSize = 1024 * 1024 * 10 //消息长度限制});var serviceProvider = app.Services;var socketService = serviceProvider.GetService<ISocketServiceHandler>();if (socketService != null){    //配置Socket监听地址    var monitorUrl = config.GetSection("SocketApi:SocketUrl").Value;    app.Map(monitorUrl, it => it.UseMiddleware<MySocketMiddleware>(socketService));}app.UseAuthorization();app.MapControllers();app.Run();

5、测试WebSocket

  5.1、用两台电脑分布打开https://wstool.js.org/

    输入ws://localhost:5072/ws测试地址,点击开启连接,提示连接成功后可以开始测试

5.2、登录测试数据

# 根据执行命令来判定具体的操作步骤,比如Command=1001标识单个用户链接{    "Command": 1001,    "CommandName""登录认证",    "FromAccount""luoni",    "FromAccountName""罗尼·库尔曼",    "Content"""}{    "Command": 1001,    "CommandName""登录认证",    "FromAccount""qiao",    "FromAccountName""乔·卡特",    "Content":""}

5.2.1、分别在两台机器上发送登录认证消息,等待正确返回


  5.2.2、在另一个浏览器上执行另一个登录认证

5.3、发送文本消息

# 根据执行命令来判定具体的操作步骤,比如Command=2001标识单个用户链接{    "Command": 2001,    "CommandName""普通消息",    "FromAccount""qiao",    "FromAccountName""乔·卡特",    "ToAccount""luoni",    "ToAccountName""罗尼·库尔曼",    "Content":"Hello 罗尼"}{    "Command": 2001,    "CommandName""普通消息",    "FromAccount""luoni",    "FromAccountName""罗尼·库尔曼",    "ToAccount""kate",    "ToAccountName""乔·卡特",    "Content":"Hello 乔"}

5.3.1、从乔·卡特这个登录账号中发出消息

5.3.2、在另一个浏览器中接收到消息,则表示WebSocket通信已完成

5.4、发送群消息测试

#群发消息、根据Command来识别{    "Command": 2002,    "CommandName""群发消息",    "FromAccount""luoni",    "FromAccountName""罗尼·库尔曼",    "ToAccount""100001",    "ToAccountName""测试群",    "Content":"Hello 各位群友"}

 5.4.1、发送群聊消息,所有链接在线的群成员都可以收到,并且自己也会收到一份,也可以不将发送者配置到群里,具体可根据业务修改逻辑

    WebSocket本身使用起来并不复杂,需要考虑到各种业务场景、以及性能损耗,比如针对一些超长文本的发送,数据传输和反序列化会消耗大量性能,可以使用文件流来传输,上面的代码中已经集成了文件流的传输,并且采用了分段式读取1M/频次。

  当前案例只是提供了WebSocket服务端这块的功能,前端可以使用JavaScript或者其他一些集成库来测试。

  本章节代码量过多,后期将会采用片段代码+在线源码的风格来编写此类文档,方便阅读和理解。

  源码地址:

https://gitee.com/inc-zz/netCoreDemos/tree/master/12-WebSocket/src/WebSocketApi


阅读原文:原文链接


该文章在 2025/5/14 10:08:47 编辑过
关键字查询
相关文章
正在查询...
点晴ERP是一款针对中小制造业的专业生产管理软件系统,系统成熟度和易用性得到了国内大量中小企业的青睐。
点晴PMS码头管理系统主要针对港口码头集装箱与散货日常运作、调度、堆场、车队、财务费用、相关报表等业务管理,结合码头的业务特点,围绕调度、堆场作业而开发的。集技术的先进性、管理的有效性于一体,是物流码头及其他港口类企业的高效ERP管理信息系统。
点晴WMS仓储管理系统提供了货物产品管理,销售管理,采购管理,仓储管理,仓库管理,保质期管理,货位管理,库位管理,生产管理,WMS管理系统,标签打印,条形码,二维码管理,批号管理软件。
点晴免费OA是一款软件和通用服务都免费,不限功能、不限时间、不限用户的免费OA协同办公管理系统。
Copyright 2010-2025 ClickSun All Rights Reserved