// MessageService: implementation of IMessageSevice that reads and writes messages through ImContext,
// maps between DTOs and entities with IMapper, and publishes newly saved messages through a MassTransit IPublishEndpoint.
//
// NOTE(review): this file looks flattened and appears to have lost its generic type arguments
// (e.g. "Task>" and "_mapper.Map>(msgList)"). As laid out, every "//" comment also swallows the code that follows it
// on the same physical line. Restore the line breaks and type arguments from the original source before building.
//
// Members visible in this file:
//   GetMessagesAsync        - Loads the conversation matching conversationId and userId and throws
//                             BaseException(CodeDefine.CONVERSATION_NOT_FOUND) when none is found.
//                             When msgId is set: selects messages with the conversation's StreamKey and Id < msgId,
//                             ordered by Id descending. Otherwise: selects messages with that StreamKey and
//                             Id > conversation.LastReadMessageId. When pageSize is set it is applied with Take.
//                             The loaded rows are then re-sorted in memory by Created, then Id, and mapped.
//   GetUnreadCountAsync, GetUnreadMessagesAsync, MarkAsReadAsync, MarkConversationAsReadAsync, RecallMessageAsync
//                           - Not implemented; each throws NotImplementedException.
//   SendGroupMessageAsync   - Throws BaseException(CodeDefine.GROUP_NOT_FOUND) when no group has Id == groupId and
//                             BaseException(CodeDefine.NO_GROUP_PERMISSION) when senderId is not in GroupMembers for it.
//                             Otherwise maps the dto, sets Sender = senderId and StreamKey = StreamKeyBuilder.Group(groupId),
//                             saves, publishes a mapped copy of the message and returns a mapped copy.
//   SendPrivateMessageAsync - Throws BaseException(CodeDefine.FRIEND_RELATION_NOT_FOUND) when no Friends row has
//                             FriendId == receiverId. Otherwise maps the dto, sets Sender = senderId and
//                             StreamKey = StreamKeyBuilder.Private(dto.SenderId, dto.ReceiverId), saves, publishes and returns as above.
//
// NOTE(review): points to confirm with the author / callers:
//   * GetMessagesAsync never reads its "desc" parameter.
//   * In GetMessagesAsync the msgId == null branch has no OrderBy before Take, so the query itself does not pin which rows are kept.
//   * SendPrivateMessageAsync's friend check does not involve senderId - presumably it should be scoped to the sender; verify against the Friend model.
//   * SendPrivateMessageAsync builds the stream key from dto.SenderId / dto.ReceiverId while Sender comes from the senderId parameter - confirm callers keep them equal.
//   * The constructor's eventBus parameter is unused (its field is commented out) and _logger is assigned but not read in the code shown.
using AutoMapper; using IM_API.Application.Interfaces; using IM_API.Domain.Events; using IM_API.Dtos; using IM_API.Exceptions; using IM_API.Interface.Services; using IM_API.Models; using IM_API.Tools; using MassTransit; using Microsoft.EntityFrameworkCore; namespace IM_API.Services { public class MessageService : IMessageSevice { private readonly ImContext _context; private readonly ILogger _logger; private readonly IMapper _mapper; //Deprecated: RabbitMQ is used here instead //private readonly IEventBus _eventBus; private readonly IPublishEndpoint _endpoint; public MessageService( ImContext context, ILogger logger, IMapper mapper, IEventBus eventBus, IPublishEndpoint publishEndpoint ) { _context = context; _logger = logger; _mapper = mapper; //_eventBus = eventBus; _endpoint = publishEndpoint; } public async Task> GetMessagesAsync(int userId, int conversationId, int? msgId, int? pageSize, bool desc) { //Load the conversation; it supplies the stream key that uniquely identifies the chat between the two parties Conversation? conversation = await _context.Conversations.FirstOrDefaultAsync( x => x.Id == conversationId && x.UserId == userId ); if (conversation is null) throw new BaseException(CodeDefine.CONVERSATION_NOT_FOUND); var query = _context.Messages.AsQueryable(); if(msgId != null) { query = query.Where( x => x.StreamKey == conversation.StreamKey && x.Id < msgId.Value ) .OrderByDescending(x => x.Id); } else { query = query.Where( x => x.StreamKey == conversation.StreamKey && x.Id > conversation.LastReadMessageId ); } if(pageSize != null) { query = query.Take(pageSize.Value); } var msgList = await query .ToListAsync(); msgList = msgList .OrderBy(x => x.Created) .ThenBy(t => t.Id) .ToList(); return _mapper.Map>(msgList); } public Task GetUnreadCountAsync(int userId) { throw new NotImplementedException(); } public Task> GetUnreadMessagesAsync(int userId) { throw new NotImplementedException(); } public Task MarkAsReadAsync(int userId, long messageId) { throw new NotImplementedException(); } public Task MarkConversationAsReadAsync(int userId, int? userBId, int? 
groupId) { throw new NotImplementedException(); } public Task RecallMessageAsync(int userId, int messageId) { throw new NotImplementedException(); } #region 发送群消息 public async Task SendGroupMessageAsync(int senderId, int groupId, MessageBaseDto dto) { //Check that the group exists var isExist = await _context.Groups.AnyAsync(x => x.Id == groupId); if (!isExist) throw new BaseException(CodeDefine.GROUP_NOT_FOUND); //Check that the sender is a member of the group var isMember = await _context.GroupMembers.AnyAsync(x => x.GroupId == groupId && x.UserId == senderId); if (!isMember) throw new BaseException(CodeDefine.NO_GROUP_PERMISSION); var message = _mapper.Map(dto); message.Sender = senderId; message.StreamKey = StreamKeyBuilder.Group( groupId); _context.Messages.Add(message); await _context.SaveChangesAsync(); await _endpoint.Publish(_mapper.Map(message)); return _mapper.Map(message); } #endregion #region 发送私聊消息 public async Task SendPrivateMessageAsync(int senderId, int receiverId, MessageBaseDto dto) { bool isExist = await _context.Friends.AnyAsync(x => x.FriendId == receiverId); if (!isExist) throw new BaseException(CodeDefine.FRIEND_RELATION_NOT_FOUND); var message = _mapper.Map(dto); message.Sender = senderId; //Generate the unique identifier of the message stream message.StreamKey = StreamKeyBuilder.Private(dto.SenderId, dto.ReceiverId); _context.Messages.Add(message); await _context.SaveChangesAsync(); await _endpoint.Publish(_mapper.Map(message)); return _mapper.Map(message); } #endregion } }