diff --git a/backend/IM_API/Application/EventHandlers/ConversationEventHandler.cs b/backend/IM_API/Application/EventHandlers/ConversationEventHandler.cs index 057e5e5..721f9bb 100644 --- a/backend/IM_API/Application/EventHandlers/ConversationEventHandler.cs +++ b/backend/IM_API/Application/EventHandlers/ConversationEventHandler.cs @@ -30,58 +30,26 @@ namespace IM_API.Application.EventHandlers _context = imContext; _mapper = mapper; } - - /* - * 此方法有并发问题,当双方同时第一次发送消息时, - * 会出现同时创建的情况,其中一方会报错, - * 导致也未走到更新逻辑,会话丢失 - */ public async Task Handle(MessageCreatedEvent @event) { - //此处仅处理私聊会话创建 - if (@event.ChatType == ChatType.GROUP) + if(@event.ChatType == ChatType.PRIVATE) { - return; - } - var conversation = await _context.Conversations.FirstOrDefaultAsync( - x => x.UserId == @event.MsgSenderId && x.TargetId == @event.MsgRecipientId - ); - //如果首次发消息则创建双方会话 - if (conversation is null) - { - Conversation senderCon = _mapper.Map(@event); - Conversation ReceptCon = _mapper.Map(@event); - ReceptCon.UserId = @event.MsgRecipientId; - ReceptCon.TargetId = @event.MsgSenderId; - ReceptCon.UnreadCount += 1; - ReceptCon.LastReadMessageId = null; - _context.Conversations.AddRange(senderCon,ReceptCon); - await _context.SaveChangesAsync(); - } - else - { - Conversation senderCon = conversation; - Conversation? ReceptCon = await _context.Conversations.FirstOrDefaultAsync( - x => x.UserId == @event.MsgRecipientId && x.TargetId == @event.MsgSenderId); - if (ReceptCon is null) + Conversation? userAConversation = await _context.Conversations.FirstOrDefaultAsync( + x => x.UserId == @event.MsgSenderId && x.TargetId == @event.MsgRecipientId + ); + Conversation? userBConversation = await _context.Conversations.FirstOrDefaultAsync( + x => x.UserId == @event.MsgRecipientId && x.TargetId == @event.MsgSenderId + ); + if(userAConversation is null || userBConversation is null) { - _logger.LogError("ConversationEventHandlerError:接收者会话对象缺失!Event:{Event}", JsonConvert.SerializeObject(@event)); - throw new BaseException(CodeDefine.SYSTEM_ERROR); + _logger.LogError("消息事件更新会话信息失败:{@event}",@event); } - - //更新发送者conversation - senderCon.UnreadCount = 0; - senderCon.LastReadMessageId = @event.MessageId; - senderCon.LastMessage = @event.MessageContent; - senderCon.LastMessageTime = DateTime.Now; - //更新接收者conversation - ReceptCon.UnreadCount += 1; - ReceptCon.LastMessage = @event.MessageContent; - senderCon.LastMessageTime = DateTime.Now; - _context.Conversations.UpdateRange(senderCon, ReceptCon); + userAConversation.LastMessage = @event.MessageContent; + userAConversation.LastReadMessageId = @event.MessageId; + userBConversation.LastMessage = @event.MessageContent; + userBConversation.UnreadCount += 1; + _context.UpdateRange(userAConversation,userBConversation); await _context.SaveChangesAsync(); - - } } } diff --git a/backend/IM_API/Application/EventHandlers/SignalREventHandler.cs b/backend/IM_API/Application/EventHandlers/SignalREventHandler.cs index 970336f..e2f4f16 100644 --- a/backend/IM_API/Application/EventHandlers/SignalREventHandler.cs +++ b/backend/IM_API/Application/EventHandlers/SignalREventHandler.cs @@ -1,6 +1,9 @@ -using IM_API.Application.Interfaces; +using AutoMapper; +using IM_API.Application.Interfaces; using IM_API.Domain.Events; +using IM_API.Dtos; using IM_API.Hubs; +using IM_API.Models; using IM_API.Tools; using Microsoft.AspNetCore.SignalR; @@ -9,15 +12,20 @@ namespace IM_API.Application.EventHandlers public class SignalREventHandler : IEventHandler { private readonly IHubContext _hub; - public SignalREventHandler(IHubContext hub) + private readonly IMapper _mapper; + public SignalREventHandler(IHubContext hub, IMapper mapper) { _hub = hub; + _mapper = mapper; } public async Task Handle(MessageCreatedEvent @event) { - var streamKey = @event.StreamKey; - await _hub.Clients.Group(streamKey).SendAsync(SignalRMethodDefine.ReceiveMessage, @event); + if(@event.ChatType == Models.ChatType.PRIVATE) + { + MessageBaseDto messageBaseDto = _mapper.Map(_mapper.Map(@event)); + await _hub.Clients.Users(@event.MsgRecipientId.ToString()).SendAsync("ReceiveMessage", messageBaseDto); + } } } } diff --git a/backend/IM_API/Hubs/ChatHub.cs b/backend/IM_API/Hubs/ChatHub.cs index 759f1e6..4851c4a 100644 --- a/backend/IM_API/Hubs/ChatHub.cs +++ b/backend/IM_API/Hubs/ChatHub.cs @@ -1,4 +1,7 @@ -using IM_API.Dtos; +using AutoMapper; +using IM_API.Application.Interfaces; +using IM_API.Domain.Events; +using IM_API.Dtos; using IM_API.Interface.Services; using IM_API.Models; using IM_API.Tools; @@ -11,10 +14,14 @@ namespace IM_API.Hubs { private IMessageSevice _messageService; private readonly IConversationService _conversationService; - public ChatHub(IMessageSevice messageService, IConversationService conversationService) + private readonly IEventBus _eventBus; + private readonly IMapper _mapper; + public ChatHub(IMessageSevice messageService, IConversationService conversationService, IEventBus eventBus, IMapper mapper) { _messageService = messageService; _conversationService = conversationService; + _eventBus = eventBus; + _mapper = mapper; } public async override Task OnConnectedAsync() @@ -51,8 +58,6 @@ namespace IM_API.Hubs { msgInfo = await _messageService.SendGroupMessageAsync(int.Parse(userIdStr), dto.ReceiverId, dto); } - - await Clients.Users(dto.ReceiverId.ToString()).SendAsync("ReceiveMessage", msgInfo); return; } } diff --git a/backend/IM_API/Services/MessageService.cs b/backend/IM_API/Services/MessageService.cs index d6e631b..0966a1a 100644 --- a/backend/IM_API/Services/MessageService.cs +++ b/backend/IM_API/Services/MessageService.cs @@ -1,4 +1,6 @@ using AutoMapper; +using IM_API.Application.Interfaces; +using IM_API.Domain.Events; using IM_API.Dtos; using IM_API.Exceptions; using IM_API.Interface.Services; @@ -13,11 +15,13 @@ namespace IM_API.Services private readonly ImContext _context; private readonly ILogger _logger; private readonly IMapper _mapper; - public MessageService(ImContext context, ILogger logger, IMapper mapper) + private readonly IEventBus _eventBus; + public MessageService(ImContext context, ILogger logger, IMapper mapper, IEventBus eventBus) { _context = context; _logger = logger; _mapper = mapper; + _eventBus = eventBus; } public async Task> GetMessagesAsync(int userId, int conversationId, int? msgId, int? pageSize, bool desc) @@ -92,6 +96,7 @@ namespace IM_API.Services message.StreamKey = StreamKeyBuilder.Group(groupId); _context.Messages.Add(message); await _context.SaveChangesAsync(); + await _eventBus.PublishAsync(_mapper.Map(message)); return _mapper.Map(message); } @@ -107,6 +112,7 @@ namespace IM_API.Services message.StreamKey = StreamKeyBuilder.Private(dto.SenderId, dto.ReceiverId); _context.Messages.Add(message); await _context.SaveChangesAsync(); + await _eventBus.PublishAsync(_mapper.Map(message)); return _mapper.Map(message); } #endregion diff --git a/frontend/web/src/stores/chat.js b/frontend/web/src/stores/chat.js index 1a3547c..f323538 100644 --- a/frontend/web/src/stores/chat.js +++ b/frontend/web/src/stores/chat.js @@ -24,7 +24,6 @@ export const useChatStore = defineStore('chat', { this.messages = []; //先从浏览器缓存加载一部分消息列表 const localHistory = await messagesDb.getPageMessages(sessionId, new Date().toISOString(), this.pageSize); - console.log(localHistory) if (localHistory.length > 0) { this.messages = localHistory; } else { diff --git a/frontend/web/src/stores/conversation.js b/frontend/web/src/stores/conversation.js new file mode 100644 index 0000000..0b7d8ab --- /dev/null +++ b/frontend/web/src/stores/conversation.js @@ -0,0 +1,63 @@ +import { defineStore } from "pinia"; +import { messageService } from "@/services/message"; +import { conversationDb } from "@/utils/db/conversationDB"; +import { useMessage } from "@/components/messages/useAlert"; + +const message = useMessage(); + +export const useConversationStore = defineStore('conversation', { + state: () => ({ + conversations: [] + }), + actions: { + async addConversation(conversation) { + await conversationDb.save(conversation); + this.conversations.unshift(conversation) + }, + /** + * 加载当前会话消息列表 + */ + async loadUserConversations() { + if (this.conversations.length == 0) { + try { + const covnersationsCache = await conversationDb.getAll(); + if (covnersationsCache && covnersationsCache.length > 0) { + covnersationsCache.sort((a, b) => { + return new Date(a.dateTime) - new Date(b.dateTime); + }) + } + } catch (e) { + message.error('读取本地会话缓存失败...'); + console.log('读取本地会话缓存失败:', e); + } + } + await this.fetchConversationsFromServier() + }, + /** + * 从服务器加载新消息 + * @param {*} sessionId + * @returns + */ + async fetchConversationsFromServier() { + const newConversations = (await messageService.getConversations()).data; + if (newConversations.length > 0) { + // 1. 将当前的本地数据转为 Map,方便通过 ID 快速查找 (O(1) 复杂度) + const localMap = new Map(this.conversations.map(item => [item.id, item])); + newConversations.forEach(item => { + const existingItem = localMap.get(item.id); + if (existingItem) { + // --- 局部更新 --- + // 使用 Object.assign 将新数据合并到旧对象上,保持响应式引用 + Object.assign(existingItem, item); + } else { + // --- 插入新会话 --- + this.conversations.unshift(item); + } + // 同步到本地数据库 + conversationDb.save(item); + }); + + } + } + } +}) \ No newline at end of file diff --git a/frontend/web/src/utils/db/baseDb.js b/frontend/web/src/utils/db/baseDb.js new file mode 100644 index 0000000..9768851 --- /dev/null +++ b/frontend/web/src/utils/db/baseDb.js @@ -0,0 +1,19 @@ +import { openDB } from "idb"; + +const DBNAME = 'IM_DB'; +const STORE_NAME = 'messages'; +const CONVERSARION_STORE_NAME = 'conversations'; + +export const dbPromise = openDB(DBNAME, 2, { + upgrade(db) { + if (!db.objectStoreNames.contains(STORE_NAME)) { + const store = db.createObjectStore(STORE_NAME, { keyPath: 'msgId' }); + store.createIndex('by-sessionId', 'sessionId'); + store.createIndex('by-time', 'timeStamp'); + } + if (!db.objectStoreNames.contains(CONVERSARION_STORE_NAME)) { + const store = db.createObjectStore(CONVERSARION_STORE_NAME, { keyPath: 'id' }); + store.createIndex('by-id', 'id'); + } + } +}) \ No newline at end of file diff --git a/frontend/web/src/utils/db/conversationDB.js b/frontend/web/src/utils/db/conversationDB.js new file mode 100644 index 0000000..0dc6995 --- /dev/null +++ b/frontend/web/src/utils/db/conversationDB.js @@ -0,0 +1,18 @@ +import { dbPromise } from "./baseDb"; + +const STORE_NAME = 'conversations'; + +export const conversationDb = { + async save(conversation) { + (await dbPromise).put(STORE_NAME, conversation); + }, + async getById(id) { + return (await dbPromise).getFromIndex(STORE_NAME, 'by-id', id); + }, + async getAll() { + return (await dbPromise).getAll(STORE_NAME); + }, + async clearAll() { + (await dbPromise).clear(STORE_NAME); + } +} \ No newline at end of file diff --git a/frontend/web/src/utils/db/messageDB.js b/frontend/web/src/utils/db/messageDB.js index 49bb17a..d4eb59f 100644 --- a/frontend/web/src/utils/db/messageDB.js +++ b/frontend/web/src/utils/db/messageDB.js @@ -1,18 +1,7 @@ -import { openDB } from "idb"; +import { dbPromise } from "./baseDb"; -const DBNAME = 'IM_DB'; const STORE_NAME = 'messages'; -export const dbPromise = openDB(DBNAME, 1, { - upgrade(db) { - if (!db.objectStoreNames.contains(STORE_NAME)) { - const store = db.createObjectStore(STORE_NAME, { keyPath: 'msgId' }); - store.createIndex('by-sessionId', 'sessionId'); - store.createIndex('by-time', 'timeStamp'); - } - } -}) - export const messagesDb = { async save(msg) { return (await dbPromise).put(STORE_NAME, msg); diff --git a/frontend/web/src/views/messages/MessageContent.vue b/frontend/web/src/views/messages/MessageContent.vue index d6be9a3..1415ac3 100644 --- a/frontend/web/src/views/messages/MessageContent.vue +++ b/frontend/web/src/views/messages/MessageContent.vue @@ -50,6 +50,7 @@ import { formatDate } from '@/utils/formatDate'; import { useChatStore } from '@/stores/chat'; import { generateSessionId } from '@/utils/sessionIdTools'; import { useSignalRStore } from '@/stores/signalr'; +import { useConversationStore } from '@/stores/conversation'; const props = defineProps({ id:{ @@ -60,6 +61,7 @@ const props = defineProps({ const chatStore = useChatStore(); const signalRStore = useSignalRStore(); +const conversationStore = useConversationStore(); const input = ref(''); // 输入框内容 const historyRef = ref(null); // 绑定 DOM 用于滚动 @@ -118,20 +120,20 @@ function toggleEmoji() { } async function loadConversation(conversationId) { + /* const res = await messageService.getConversationById(conversationId); conversationInfo.value = res.data; - console.log(res) -} - -async function loadMessages(conversationId, msgId = null, pageSize = null) { - const res = await messageService.getMessages(conversationId); - messages.value = res.data; + */ + if(conversationStore.conversations.length == 0){ + await conversationStore.loadUserConversations(); + } + conversationInfo.value = conversationStore.conversations.find(x => x.id == Number(conversationId)); } // 初始化时滚动到底部 onMounted(async () => { await loadConversation(props.id); - const sessionid = generateSessionId(conversationInfo.userId, conversationInfo.targetId) + const sessionid = generateSessionId(conversationInfo.value.userId, conversationInfo.value.targetId) await chatStore.swtichSession(sessionid,props.id); scrollToBottom(); }); diff --git a/frontend/web/src/views/messages/MessageList.vue b/frontend/web/src/views/messages/MessageList.vue index 4f182f0..c98ab23 100644 --- a/frontend/web/src/views/messages/MessageList.vue +++ b/frontend/web/src/views/messages/MessageList.vue @@ -13,7 +13,7 @@ class="list-item" :class="{active: activeId === s.id}" @click="selectSession(s)">
- {{ s.unreadCount ?? 0 }} + {{ s.unreadCount ?? 0 }}
@@ -36,26 +36,19 @@ import { ref, computed, nextTick, onMounted } from 'vue' import { messageService } from '@/services/message' import defaultAvatar from '@/assets/default_avatar.png' import { formatDate } from '@/utils/formatDate' +import { useConversationStore } from '@/stores/conversation' + +const conversationStore = useConversationStore(); const searchQuery = ref('') -const input = ref('') -const historyRef = ref(null) const activeId = ref(1) const conversations = ref([]); -const filteredSessions = computed(() => conversations.value.filter(s => s.targetName.includes(searchQuery.value))) - -const currentSession = computed(() => sessions.value.find(s => s.id === activeId.value)) +const filteredSessions = computed(() => conversationStore.conversations.filter(s => s.targetName.includes(searchQuery.value))) function selectSession(s) { activeId.value = s.id router.push(`/messages/chat/${s.id}`) - scrollToBottom() -} - -const scrollToBottom = async () => { - await nextTick() - if (historyRef.value) historyRef.value.scrollTop = historyRef.value.scrollHeight } async function loadConversation() { @@ -65,7 +58,7 @@ async function loadConversation() { } onMounted(async () => { - await loadConversation(); + await conversationStore.loadUserConversations(); })