1、优化消息排序逻辑 2、新增加载历史消息 3、修复已知问题 后端: 1、优化消息排序逻辑 2、增加用户信息缓存机制 3、修改日期类型为DateTimeOffset改善时区信息丢失问题 4、修复了已知问题 数据库: 1、新增SequenceId字段用于消息排序 2、新增ClientMsgId字段用于客户端消息回执
139 lines
5.6 KiB
C#
139 lines
5.6 KiB
C#
using AutoMapper;
|
||
using IM_API.Application.Interfaces;
|
||
using IM_API.Domain.Events;
|
||
using IM_API.Dtos;
|
||
using IM_API.Dtos.Message;
|
||
using IM_API.Exceptions;
|
||
using IM_API.Interface.Services;
|
||
using IM_API.Models;
|
||
using IM_API.Tools;
|
||
using IM_API.VOs.Message;
|
||
using MassTransit;
|
||
using Microsoft.EntityFrameworkCore;
|
||
using StackExchange.Redis;
|
||
using System.Text.RegularExpressions;
|
||
using static MassTransit.Monitoring.Performance.BuiltInCounters;
|
||
using static Microsoft.EntityFrameworkCore.DbLoggerCategory;
|
||
|
||
namespace IM_API.Services
|
||
{
|
||
public class MessageService : IMessageSevice
|
||
{
|
||
// Collaborators supplied through the constructor; each is assigned exactly once.
private readonly ImContext _context;
private readonly ILogger<MessageService> _logger;
private readonly IMapper _mapper;
// Deprecated: RabbitMQ is used here instead of the event bus.
//private readonly IEventBus _eventBus;
private readonly IPublishEndpoint _endpoint;
private readonly ISequenceIdService _sequenceIdService;

/// <summary>
/// Creates the service and stores the supplied dependencies in its private fields.
/// </summary>
/// <param name="context">Database context used for conversation, group, friend and message access.</param>
/// <param name="logger">Logger for this service.</param>
/// <param name="mapper">Object mapper used to convert between DTOs, entities, events and VOs.</param>
/// <param name="publishEndpoint">Endpoint used to publish message-created events.</param>
/// <param name="sequenceIdService">Service that issues the next sequence id for a stream key.</param>
public MessageService(
    ImContext context,
    ILogger<MessageService> logger,
    IMapper mapper,
    IPublishEndpoint publishEndpoint,
    ISequenceIdService sequenceIdService)
{
    (_context, _logger, _mapper) = (context, logger, mapper);
    (_endpoint, _sequenceIdService) = (publishEndpoint, sequenceIdService);
}
|
||
|
||
/// <summary>
/// Returns one page of messages for a conversation owned by <paramref name="userId"/>,
/// paged by <c>SequenceId</c> relative to an optional cursor.
/// </summary>
/// <param name="userId">Id of the requesting user; the conversation must have this user id.</param>
/// <param name="dto">
/// Paging request. <c>ConversationId</c> selects the conversation, <c>Direction</c> equal to 0 means
/// "before the cursor" and any other value means "after the cursor", <c>Cursor</c> is the anchor
/// <c>SequenceId</c>, and <c>Limit</c> is the maximum number of rows taken.
/// </param>
/// <returns>
/// Messages in ascending <c>SequenceId</c> order. For the "after" direction without a cursor an
/// empty list is returned.
/// </returns>
/// <exception cref="BaseException">
/// Thrown with <c>CodeDefine.CONVERSATION_NOT_FOUND</c> when no conversation matches both the id and the user.
/// </exception>
public async Task<List<MessageBaseVo>> GetMessagesAsync(int userId, MessageQueryDto dto)
{
    // Load the caller's conversation; its StreamKey identifies the message stream of this chat.
    // The entity is only read, so change tracking is skipped.
    Conversation? conversation = await _context.Conversations
        .AsNoTracking()
        .FirstOrDefaultAsync(x => x.Id == dto.ConversationId && x.UserId == userId);
    if (conversation is null)
    {
        throw new BaseException(CodeDefine.CONVERSATION_NOT_FOUND);
    }

    // Read-only query: AsNoTracking avoids registering every fetched message in the change tracker.
    var baseQuery = _context.Messages
        .AsNoTracking()
        .Where(x => x.StreamKey == conversation.StreamKey);

    // NOTE(review): dto.Limit is passed to Take unvalidated — confirm it is bounded by the caller/DTO validation.
    if (dto.Direction == 0)
    {
        // Before: rows strictly below the cursor (or the latest rows when no cursor is given).
        if (dto.Cursor.HasValue)
        {
            baseQuery = baseQuery.Where(m => m.SequenceId < dto.Cursor.Value);
        }

        // Fetch newest-first so Take keeps the rows closest to the cursor.
        var newestFirst = await baseQuery
            .OrderByDescending(m => m.SequenceId)
            .Take(dto.Limit)
            .ToListAsync();

        // Map in memory (outside the EF expression tree), then return in ascending order.
        return newestFirst
            .Select(m => _mapper.Map<MessageBaseVo>(m))
            .OrderBy(vo => vo.SequenceId)
            .ToList();
    }

    // After: a cursor is required; without one there is nothing meaningful to return.
    if (!dto.Cursor.HasValue)
    {
        return new List<MessageBaseVo>();
    }

    var oldestFirst = await baseQuery
        .Where(m => m.SequenceId > dto.Cursor.Value)
        .OrderBy(m => m.SequenceId)
        .Take(dto.Limit)
        .ToListAsync();

    // Already ascending from the database; only the mapping remains.
    return oldestFirst
        .Select(m => _mapper.Map<MessageBaseVo>(m))
        .ToList();
}
|
||
|
||
/// <summary>Placeholder: not implemented yet.</summary>
/// <param name="userId">Id of the user; currently unused.</param>
/// <exception cref="NotImplementedException">Always thrown, synchronously at call time.</exception>
public Task<int> GetUnreadCountAsync(int userId) =>
    throw new NotImplementedException();
|
||
|
||
/// <summary>Placeholder: not implemented yet.</summary>
/// <param name="userId">Id of the user; currently unused.</param>
/// <exception cref="NotImplementedException">Always thrown, synchronously at call time.</exception>
public Task<List<MessageBaseDto>> GetUnreadMessagesAsync(int userId) =>
    throw new NotImplementedException();
|
||
|
||
/// <summary>Placeholder: not implemented yet.</summary>
/// <param name="userId">Id of the user; currently unused.</param>
/// <param name="messageId">Id of the message; currently unused.</param>
/// <exception cref="NotImplementedException">Always thrown, synchronously at call time.</exception>
public Task<bool> MarkAsReadAsync(int userId, long messageId) =>
    throw new NotImplementedException();
|
||
|
||
/// <summary>Placeholder: not implemented yet.</summary>
/// <param name="userId">Id of the user; currently unused.</param>
/// <param name="userBId">Optional id of the other user; currently unused.</param>
/// <param name="groupId">Optional id of the group; currently unused.</param>
/// <exception cref="NotImplementedException">Always thrown, synchronously at call time.</exception>
public Task<bool> MarkConversationAsReadAsync(int userId, int? userBId, int? groupId) =>
    throw new NotImplementedException();
|
||
|
||
/// <summary>Placeholder: not implemented yet.</summary>
/// <param name="userId">Id of the user; currently unused.</param>
/// <param name="messageId">Id of the message; currently unused.</param>
/// <exception cref="NotImplementedException">Always thrown, synchronously at call time.</exception>
public Task<bool> RecallMessageAsync(int userId, int messageId) =>
    throw new NotImplementedException();
|
||
|
||
/// <summary>
/// Adds the given message to the context's message set and saves the pending changes.
/// </summary>
/// <param name="message">The message entity to add.</param>
public async Task MakeMessageAsync(Message message)
{
    var messageSet = _context.Messages;
    messageSet.Add(message);

    // The number of written rows is not needed by callers.
    _ = await _context.SaveChangesAsync();
}
|
||
#region 发送群消息
|
||
/// <summary>
/// Validates that the group exists and that the sender is a member, then builds the message,
/// assigns its stream key and sequence id, and publishes a <c>MessageCreatedEvent</c>.
/// </summary>
/// <param name="senderId">Id of the user sending the message; must be a member of the group.</param>
/// <param name="groupId">Id of the target group.</param>
/// <param name="dto">Message payload that is mapped to a <c>Message</c>.</param>
/// <returns>The built message mapped to <c>MessageBaseVo</c>.</returns>
/// <exception cref="BaseException">
/// Thrown with <c>CodeDefine.GROUP_NOT_FOUND</c> when the group does not exist, or with
/// <c>CodeDefine.NO_GROUP_PERMISSION</c> when the sender is not a member of it.
/// </exception>
public async Task<MessageBaseVo> SendGroupMessageAsync(int senderId, int groupId, MessageBaseDto dto)
{
    // The group must exist.
    bool groupExists = await _context.Groups.AnyAsync(g => g.Id == groupId);
    if (!groupExists)
    {
        throw new BaseException(CodeDefine.GROUP_NOT_FOUND);
    }

    // The sender must be a member of that group.
    bool senderIsMember = await _context.GroupMembers
        .AnyAsync(gm => gm.GroupId == groupId && gm.UserId == senderId);
    if (!senderIsMember)
    {
        throw new BaseException(CodeDefine.NO_GROUP_PERMISSION);
    }

    Message outgoing = _mapper.Map<Message>(dto);
    outgoing.StreamKey = StreamKeyBuilder.Group(groupId);
    outgoing.SequenceId = await _sequenceIdService.GetNextSquenceIdAsync(outgoing.StreamKey);

    // Publish the creation event, then hand the view object back to the caller.
    var createdEvent = _mapper.Map<MessageCreatedEvent>(outgoing);
    await _endpoint.Publish(createdEvent);

    return _mapper.Map<MessageBaseVo>(outgoing);
}
|
||
#endregion
|
||
#region 发送私聊消息
|
||
/// <summary>
/// Validates that the sender has a friend relation to the receiver, then builds the message,
/// assigns its stream key and sequence id, and publishes a <c>MessageCreatedEvent</c>.
/// </summary>
/// <param name="senderId">Id of the user sending the message.</param>
/// <param name="receiverId">Id of the user receiving the message.</param>
/// <param name="dto">Message payload that is mapped to a <c>Message</c>.</param>
/// <returns>The built message mapped to <c>MessageBaseVo</c>.</returns>
/// <exception cref="BaseException">
/// Thrown with <c>CodeDefine.FRIEND_RELATION_NOT_FOUND</c> when the sender has no friend relation to the receiver.
/// </exception>
public async Task<MessageBaseVo> SendPrivateMessageAsync(int senderId, int receiverId, MessageBaseDto dto)
{
    // The relation must belong to the sender. Matching on FriendId alone would accept any
    // receiver who is a friend of *some* user, letting non-friends send private messages.
    // NOTE(review): assumes Friend.UserId is the owning side of the relation — confirm against the Friend model.
    bool isFriend = await _context.Friends
        .AnyAsync(x => x.UserId == senderId && x.FriendId == receiverId);
    if (!isFriend)
    {
        throw new BaseException(CodeDefine.FRIEND_RELATION_NOT_FOUND);
    }

    var message = _mapper.Map<Message>(dto);
    message.StreamKey = StreamKeyBuilder.Private(senderId, receiverId);
    message.SequenceId = await _sequenceIdService.GetNextSquenceIdAsync(message.StreamKey);

    await _endpoint.Publish(_mapper.Map<MessageCreatedEvent>(message));
    return _mapper.Map<MessageBaseVo>(message);
}
|
||
#endregion
|
||
}
|
||
}
|