212 lines
8.6 KiB
C#
212 lines
8.6 KiB
C#
using AutoMapper;
|
||
using IM_API.Application.Interfaces;
|
||
using IM_API.Domain.Events;
|
||
using IM_API.Dtos;
|
||
using IM_API.Dtos.Message;
|
||
using IM_API.Exceptions;
|
||
using IM_API.Interface.Services;
|
||
using IM_API.Models;
|
||
using IM_API.Tools;
|
||
using IM_API.VOs.Message;
|
||
using MassTransit;
|
||
using Microsoft.EntityFrameworkCore;
|
||
using Newtonsoft.Json;
|
||
using StackExchange.Redis;
|
||
using System.Text.Json;
|
||
using System.Text.RegularExpressions;
|
||
using System.Threading.Tasks;
|
||
using static MassTransit.Monitoring.Performance.BuiltInCounters;
|
||
using static Microsoft.EntityFrameworkCore.DbLoggerCategory;
|
||
|
||
namespace IM_API.Services
|
||
{
|
||
public class MessageService : IMessageSevice
|
||
{
|
||
// Collaborators supplied by dependency injection; all are assigned once in the constructor.
private readonly ImContext _context;
private readonly ILogger<MessageService> _logger;
private readonly IMapper _mapper;
// Deprecated: RabbitMQ is used here instead of the former event bus.
//private readonly IEventBus _eventBus;
private readonly IPublishEndpoint _endpoint;
private readonly ISequenceIdService _sequenceIdService;
private readonly IUserService _userService;
private readonly IUploadTaskService _uploadService;
private readonly IHttpContextAccessor _httpContextAccessor;

/// <summary>
/// Creates the service and stores every injected dependency in its backing field.
/// </summary>
public MessageService(
    ImContext context,
    ILogger<MessageService> logger,
    IMapper mapper,
    IPublishEndpoint publishEndpoint,
    ISequenceIdService sequenceIdService,
    IUserService userService,
    IUploadTaskService uploadTaskService,
    IHttpContextAccessor httpContextAccessor)
{
    (_context, _logger, _mapper) = (context, logger, mapper);
    (_endpoint, _sequenceIdService) = (publishEndpoint, sequenceIdService);
    (_userService, _uploadService) = (userService, uploadTaskService);
    _httpContextAccessor = httpContextAccessor;
}
|
||
|
||
/// <summary>
/// Loads one page of messages from a conversation owned by <paramref name="userId"/> and
/// fills in the sender's display name and avatar for each message.
/// </summary>
/// <param name="userId">Id of the user the conversation must belong to.</param>
/// <param name="dto">
/// Paging query: the conversation id, an optional cursor (a sequence id), the direction
/// (0 = messages before the cursor, anything else = messages after it) and the page size.
/// </param>
/// <returns>
/// The page in ascending sequence-id order. Empty when paging forward without a cursor.
/// </returns>
/// <exception cref="BaseException">
/// Thrown with <c>CONVERSATION_NOT_FOUND</c> when no such conversation exists for the user.
/// </exception>
public async Task<List<MessageBaseVo>> GetMessagesAsync(int userId,MessageQueryDto dto)
{
    // The conversation supplies the stream key that identifies this chat's message stream.
    Conversation? conversation = await _context.Conversations.FirstOrDefaultAsync(
        x => x.Id == dto.ConversationId && x.UserId == userId
    );
    if (conversation is null) throw new BaseException(CodeDefine.CONVERSATION_NOT_FOUND);

    var baseQuery = _context.Messages.Where(x => x.StreamKey == conversation.StreamKey);
    List<MessageBaseVo> messages;
    if (dto.Direction == 0)
    {
        // "Before": take the newest rows below the cursor (or the newest overall when no cursor is given).
        if (dto.Cursor.HasValue)
            baseQuery = baseQuery.Where(m => m.SequenceId < dto.Cursor.Value);

        var list = await baseQuery
            .OrderByDescending(m => m.SequenceId)
            .Take(dto.Limit)
            .Select(m => _mapper.Map<MessageBaseVo>(m))
            .ToListAsync();

        // The query ran newest-first so Take keeps the rows nearest the cursor; flip back to ascending.
        messages = list.OrderBy(s => s.SequenceId).ToList();
    }
    else
    {
        // "After" (gap filling / refresh) is meaningless without an anchor, so return nothing.
        if (!dto.Cursor.HasValue) return new List<MessageBaseVo>();

        messages = await baseQuery
            .Where(m => m.SequenceId > dto.Cursor.Value)
            .OrderBy(m => m.SequenceId)
            .Take(dto.Limit)
            .Select(m => _mapper.Map<MessageBaseVo>(m))
            .ToListAsync();
    }

    if (messages.Count == 0) return messages;

    // Fetch every distinct sender once and index by id for the per-message lookup below.
    var ids = messages
        .Select(s => s.SenderId)
        .Distinct()
        .ToList();
    var userinfoList = await _userService.GetUserInfoListAsync(ids);
    var userDict = userinfoList.ToDictionary(x => x.Id, x => x);

    // The base URL is identical for every message, so resolve it once.
    // HttpContext is null when there is no active HTTP request; dereferencing it used to
    // throw NullReferenceException as soon as a non-text message was encountered.
    var request = _httpContextAccessor.HttpContext?.Request;
    string? baseUrl = request is null ? null : $"{request.Scheme}://{request.Host}";
    if (baseUrl is null && messages.Any(m => m.Type != MessageMsgType.Text))
    {
        _logger.LogWarning(
            "No active HTTP request; non-text message content for conversation {ConversationId} is returned without URL processing.",
            dto.ConversationId);
    }

    foreach (var item in messages)
    {
        // Only non-text content goes through URL processing, and only when a base URL is known.
        if (item.Type != MessageMsgType.Text && baseUrl is not null)
        {
            item.Content = UrlTools.ProcessMessageUrl(item.Content, baseUrl);
        }

        if (userDict.TryGetValue(item.SenderId, out var user))
        {
            item.SenderName = user.NickName;
            item.SenderAvatar = user.Avatar ?? "";
        }
    }

    return messages;
}
|
||
|
||
/// <summary>
/// Placeholder: not implemented yet.
/// </summary>
/// <param name="userId">Unused.</param>
/// <exception cref="NotImplementedException">Always thrown, synchronously.</exception>
public Task<int> GetUnreadCountAsync(int userId) =>
    throw new NotImplementedException();
|
||
|
||
/// <summary>
/// Placeholder: not implemented yet.
/// </summary>
/// <param name="userId">Unused.</param>
/// <exception cref="NotImplementedException">Always thrown, synchronously.</exception>
public Task<List<MessageBaseDto>> GetUnreadMessagesAsync(int userId) =>
    throw new NotImplementedException();
|
||
|
||
/// <summary>
/// Placeholder: not implemented yet.
/// </summary>
/// <param name="userId">Unused.</param>
/// <param name="messageId">Unused.</param>
/// <exception cref="NotImplementedException">Always thrown, synchronously.</exception>
public Task<bool> MarkAsReadAsync(int userId, long messageId) =>
    throw new NotImplementedException();
|
||
|
||
/// <summary>
/// Placeholder: not implemented yet.
/// </summary>
/// <param name="userId">Unused.</param>
/// <param name="userBId">Unused.</param>
/// <param name="groupId">Unused.</param>
/// <exception cref="NotImplementedException">Always thrown, synchronously.</exception>
public Task<bool> MarkConversationAsReadAsync(int userId, int? userBId, int? groupId) =>
    throw new NotImplementedException();
|
||
|
||
/// <summary>
/// Placeholder: not implemented yet.
/// </summary>
/// <param name="userId">Unused.</param>
/// <param name="messageId">Unused.</param>
/// <exception cref="NotImplementedException">Always thrown, synchronously.</exception>
public Task<bool> RecallMessageAsync(int userId, int messageId) =>
    throw new NotImplementedException();
|
||
|
||
/// <summary>
/// Adds <paramref name="message"/> to the Messages set and saves the context.
/// </summary>
/// <param name="message">The message entity to store.</param>
public async Task MakeMessageAsync(Message message)
{
    var messageSet = _context.Messages;
    messageSet.Add(message);

    await _context.SaveChangesAsync();
}
|
||
#region 发送群消息
|
||
/// <summary>
/// Checks that the group exists and that the sender is a member, assigns the next sequence
/// id in the group's stream, and publishes a <see cref="MessageCreatedEvent"/> for the message.
/// This method does not save the message itself; it only publishes the event.
/// </summary>
/// <param name="senderId">Id of the user sending the message; must be a member of the group.</param>
/// <param name="groupId">Id of the target group.</param>
/// <param name="dto">The message payload, mapped to a <see cref="Message"/>.</param>
/// <returns>The outgoing message, including its stream key and sequence id, mapped to a view object.</returns>
/// <exception cref="BaseException">
/// Thrown with <c>GROUP_NOT_FOUND</c> when the group does not exist, or with
/// <c>NO_GROUP_PERMISSION</c> when the sender is not a member.
/// </exception>
public async Task<MessageBaseVo> SendGroupMessageAsync(int senderId, int groupId, MessageBaseDto dto)
{
    // The group must exist.
    var isExist = await _context.Groups.AnyAsync(x => x.Id == groupId);
    if (!isExist) throw new BaseException(CodeDefine.GROUP_NOT_FOUND);

    // The sender must be a member of it.
    var isMember = await _context.GroupMembers.AnyAsync(x => x.GroupId == groupId && x.UserId == senderId);
    if (!isMember) throw new BaseException(CodeDefine.NO_GROUP_PERMISSION);

    var message = _mapper.Map<Message>(dto);
    message.StreamKey = StreamKeyBuilder.Group(groupId);
    message.SequenceId = await _sequenceIdService.GetNextSquenceIdAsync(message.StreamKey);

    var publishData = _mapper.Map<MessageCreatedEvent>(message);

    // HttpContext is null when there is no active HTTP request. Dereferencing it used to throw
    // NullReferenceException after a sequence id had already been consumed; now BaseUrl is
    // simply left at its mapped value and the event is still published.
    var request = _httpContextAccessor.HttpContext?.Request;
    if (request is not null)
    {
        publishData.BaseUrl = $"{request.Scheme}://{request.Host}";
    }
    else
    {
        _logger.LogWarning(
            "No active HTTP request; publishing group message for group {GroupId} without a base URL.",
            groupId);
    }

    await _endpoint.Publish(publishData);
    return _mapper.Map<MessageBaseVo>(message);
}
|
||
#endregion
|
||
#region 发送私聊消息
|
||
/// <summary>
/// Checks that the sender has the receiver as a friend, assigns the next sequence id in the
/// pair's private stream, and publishes a <see cref="MessageCreatedEvent"/> for the message.
/// This method does not save the message itself; it only publishes the event.
/// </summary>
/// <param name="senderId">Id of the user sending the message.</param>
/// <param name="receiverId">Id of the user receiving the message.</param>
/// <param name="dto">The message payload, mapped to a <see cref="Message"/>.</param>
/// <returns>The outgoing message, including its stream key and sequence id, mapped to a view object.</returns>
/// <exception cref="BaseException">
/// Thrown with <c>FRIEND_RELATION_NOT_FOUND</c> when the sender has no friend relation to the receiver.
/// </exception>
public async Task<MessageBaseVo> SendPrivateMessageAsync(int senderId, int receiverId, MessageBaseDto dto)
{
    // The relation must belong to the sender. The previous check matched on FriendId alone,
    // so it passed whenever *anyone* had the receiver as a friend.
    // NOTE(review): assumes Friend exposes the owning user as UserId, as Conversation and
    // GroupMember do elsewhere in this class — confirm against the Friend model.
    bool isExist = await _context.Friends.AnyAsync(
        x => x.UserId == senderId && x.FriendId == receiverId);
    if (!isExist) throw new BaseException(CodeDefine.FRIEND_RELATION_NOT_FOUND);

    var message = _mapper.Map<Message>(dto);
    message.StreamKey = StreamKeyBuilder.Private(senderId, receiverId);
    message.SequenceId = await _sequenceIdService.GetNextSquenceIdAsync(message.StreamKey);

    var publishData = _mapper.Map<MessageCreatedEvent>(message);

    // HttpContext is null when there is no active HTTP request. Dereferencing it used to throw
    // NullReferenceException after a sequence id had already been consumed; now BaseUrl is
    // simply left at its mapped value and the event is still published.
    var request = _httpContextAccessor.HttpContext?.Request;
    if (request is not null)
    {
        publishData.BaseUrl = $"{request.Scheme}://{request.Host}";
    }
    else
    {
        _logger.LogWarning(
            "No active HTTP request; publishing private message from {SenderId} to {ReceiverId} without a base URL.",
            senderId,
            receiverId);
    }

    await _endpoint.Publish(publishData);
    return _mapper.Map<MessageBaseVo>(message);
}
|
||
#endregion
|
||
|
||
/// <summary>
/// For non-text messages, reads the <c>fileId</c> from the JSON content, looks up the
/// corresponding upload task, and writes its object name, storage provider and file size
/// back into the content as <c>url</c>, <c>provider</c> and <c>size</c>.
/// Text messages are returned unchanged.
/// </summary>
/// <param name="dto">The message whose <c>Content</c> is rewritten in place.</param>
/// <returns>The same <paramref name="dto"/> instance.</returns>
/// <exception cref="BaseException">
/// Thrown with <c>PARAMETER_ERROR</c> when the content is missing, is not a JSON object, or
/// has no valid GUID in <c>fileId</c>; thrown with <c>FILE_NOT_FOUND</c> when no upload task
/// matches the id.
/// </exception>
public async Task<MessageBaseDto> HandleFileMessageContentAsync(MessageBaseDto dto)
{
    if (dto.Type == MessageMsgType.Text)
    {
        return dto;
    }

    // Content comes from the caller, so every malformed shape must surface as PARAMETER_ERROR
    // rather than as a raw ArgumentNullException / JSON / FormatException.
    if (string.IsNullOrWhiteSpace(dto.Content))
    {
        throw new BaseException(CodeDefine.PARAMETER_ERROR);
    }

    Dictionary<string, object>? dic;
    try
    {
        dic = JsonConvert.DeserializeObject<Dictionary<string, object>>(dto.Content);
    }
    // Fully qualified: the file imports both Newtonsoft.Json and System.Text.Json,
    // so the bare name JsonException would be ambiguous.
    catch (Newtonsoft.Json.JsonException)
    {
        throw new BaseException(CodeDefine.PARAMETER_ERROR);
    }

    // TryParse rejects a JSON null, a non-string value and a malformed GUID alike.
    if (dic is null
        || !dic.TryGetValue("fileId", out var fileIdObj)
        || !Guid.TryParse(fileIdObj?.ToString(), out var fileId))
    {
        throw new BaseException(CodeDefine.PARAMETER_ERROR);
    }

    var fileInfo = await _uploadService.GetTaskAsync(fileId);
    if (fileInfo is null)
    {
        throw new BaseException(CodeDefine.FILE_NOT_FOUND);
    }

    // Overwrite whatever the caller supplied with the values recorded for the upload task.
    dic["url"] = fileInfo.ObjectName;
    dic["provider"] = fileInfo.StorageProvider;
    dic["size"] = fileInfo.FileSize;

    dto.Content = JsonConvert.SerializeObject(dic);
    return dto;
}
|
||
}
|
||
}
|