243 lines
10 KiB
C#
243 lines
10 KiB
C#
using AutoMapper;
|
|
using IM_API.Configs.Options;
|
|
using IM_API.Domain.Events;
|
|
using IM_API.Dtos;
|
|
using IM_API.Exceptions;
|
|
using IM_API.Interface.Services;
|
|
using IM_API.Models.Upload;
|
|
using IM_API.Tools;
|
|
using IM_API.VOs;
|
|
using MassTransit;
|
|
using Microsoft.EntityFrameworkCore.Storage;
|
|
using StackExchange.Redis;
|
|
using System.Security.AccessControl;
|
|
using System.Security.Claims;
|
|
using System.Threading.Tasks;
|
|
using IDatabase = StackExchange.Redis.IDatabase;
|
|
|
|
namespace IM_API.Services
|
|
{
|
|
public class LocalStorageService : IStorageService
|
|
{
|
|
// Collaborators and settings; all are assigned once in the constructor.
private readonly IMapper _mapper;
private readonly IHttpContextAccessor _httpContext;
private readonly FileUploadOptions _options;
private readonly IUploadTaskService _uploadTaskService;
private readonly IDatabase _redis;
private readonly IHostEnvironment _env;
private readonly ILogger<LocalStorageService> _logger;
private readonly IPublishEndpoint _endpoint;

/// <summary>
/// Creates the local storage provider from its injected collaborators.
/// </summary>
/// <param name="mapper">Mapper used to convert between upload DTOs, entities and VOs.</param>
/// <param name="httpContextAccessor">Accessor for the current HTTP request and user.</param>
/// <param name="configuration">Configuration root; the "FileUploadOptions" section is bound from it.</param>
/// <param name="uploadTaskService">Service used to read and persist upload tasks.</param>
/// <param name="connectionMultiplexer">Redis connection; its database is obtained via <c>GetDatabase()</c>.</param>
/// <param name="hostEnvironment">Host environment; its content root is the base of the upload directories.</param>
/// <param name="logger">Logger for this service.</param>
/// <param name="publishEndpoint">Endpoint used to publish the merge event.</param>
/// <exception cref="InvalidOperationException">
/// Thrown when the "FileUploadOptions" configuration section cannot be bound.
/// </exception>
public LocalStorageService(IMapper mapper, IHttpContextAccessor httpContextAccessor,
    IConfiguration configuration, IUploadTaskService uploadTaskService,
    IConnectionMultiplexer connectionMultiplexer, IHostEnvironment hostEnvironment,
    ILogger<LocalStorageService> logger, IPublishEndpoint publishEndpoint)
{
    _mapper = mapper;
    _httpContext = httpContextAccessor;

    // Fail fast with a clear message instead of deferring a NullReferenceException
    // to the first use of _options.
    _options = configuration.GetSection("FileUploadOptions").Get<FileUploadOptions>()
        ?? throw new InvalidOperationException(
            "Configuration section 'FileUploadOptions' is missing or could not be bound.");

    _uploadTaskService = uploadTaskService;
    _redis = connectionMultiplexer.GetDatabase();
    _env = hostEnvironment;
    _logger = logger;
    _endpoint = publishEndpoint;
}
|
|
|
|
/// <summary>Upload mode of this provider; always <see cref="UploadMode.Proxy"/>.</summary>
public UploadMode Mode
{
    get { return UploadMode.Proxy; }
}

/// <summary>Name identifying this provider; always "Local".</summary>
public string ProviderName
{
    get { return "Local"; }
}
|
|
|
|
/// <summary>
/// Checks that every part number of the task is present in the Redis set of uploaded parts
/// and, if so, publishes an <see cref="UploadMergeEvent"/> for the task.
/// </summary>
/// <param name="taskId">Identifier of the upload task to complete.</param>
/// <param name="parts">Part descriptors; passed through unchanged on the published event.</param>
/// <returns><paramref name="taskId"/>, once the event has been published.</returns>
/// <exception cref="BaseException">
/// Thrown with <c>CodeDefine.CHUNKE_NOT_FOUND</c> when the task cannot be found or when any
/// part number in 1..TotalChunks is missing from the Redis set.
/// </exception>
public async Task<Guid> CompleteAsync(Guid taskId, List<UploadPartDto> parts)
{
    var uploadTask = await _uploadTaskService.GetTaskAsync(taskId);
    if (uploadTask is null)
    {
        throw new BaseException(CodeDefine.CHUNKE_NOT_FOUND);
    }

    // Part numbers are 1-based: 1..TotalChunks.
    var expectedParts = Enumerable
        .Range(1, uploadTask.TotalChunks)
        .Select(partNumber => (RedisValue)partNumber)
        .ToArray();

    var membership = await _redis.SetContainsAsync(RedisKeys.GetUploadPartKey(taskId), expectedParts);

    // Quick check: any missing part aborts the completion.
    if (membership.Any(present => !present))
    {
        throw new BaseException(CodeDefine.CHUNKE_NOT_FOUND);
    }

    var mergeEvent = new UploadMergeEvent
    {
        AggregateId = taskId.ToString(),
        OccurredAt = DateTime.UtcNow,
        EventId = Guid.NewGuid(),
        OperatorId = 0,
        Parts = parts,
        TaskId = taskId,
        ChunckCount = uploadTask.TotalChunks,
        ObjectName = uploadTask.ObjectName
    };
    await _endpoint.Publish(mergeEvent);

    return taskId;
}
|
|
|
|
/// <summary>
/// Concatenates the chunk files of an upload task into the final file under
/// <c>uploads/files</c>, then deletes the temporary chunk directory, removes the Redis set of
/// uploaded parts, marks the task as completed and writes the final merge status hash.
/// </summary>
/// <param name="taskId">Upload task whose chunks live in <c>uploads/temp/{taskId}</c>.</param>
/// <param name="objectName">
/// Path of the final file relative to <c>uploads/files</c>; also stored as the "url" field of
/// the merge status hash.
/// </param>
/// <param name="totalChunks">
/// Number of chunks; chunk files are read as <c>1.part.tmp</c> .. <c>{totalChunks}.part.tmp</c>.
/// </param>
/// <param name="parts">Not used by this implementation.</param>
/// <exception cref="BaseException">
/// <c>CodeDefine.CHUNKE_NOT_FOUND</c> when a chunk file is missing;
/// <c>CodeDefine.CHUNKE_COMBINE_FAIL</c> for any other failure inside the merge.
/// </exception>
public async Task MergeAsync(Guid taskId, string objectName, int totalChunks, List<UploadPartDto> parts)
{
    // Everything lives in the "uploads" directory under the content root.
    var baseDir = Path.Combine(_env.ContentRootPath, "uploads");
    var tempPath = Path.Combine(baseDir, "temp", taskId.ToString());
    // Final storage path of the merged file.
    var finalPath = Path.Combine(baseDir, "files", objectName);

    var finalDir = Path.GetDirectoryName(finalPath);
    if (!string.IsNullOrEmpty(finalDir))
    {
        Directory.CreateDirectory(finalDir);
    }

    try
    {
        // The final stream is flushed and closed inside the helper, so the cleanup and the
        // "Completed" status below only happen once the merged file is fully on disk.
        await ConcatenateChunksAsync(taskId, tempPath, finalPath, totalChunks);

        Directory.Delete(tempPath, true);
        await _redis.KeyDeleteAsync(RedisKeys.GetUploadPartKey(taskId));
        await _uploadTaskService.UpdateStatusAsync(taskId, UploadStatus.Completed);
        await _redis.HashSetAsync(RedisKeys.MergeStatus(taskId), new HashEntry[]
        {
            new("status", "Completed"),
            new("progress", "100"),
            new("url", objectName)
        });
    }
    catch (Exception e) when (e is not BaseException)
    {
        // Constant message template: the exception text must not be used as a format string.
        _logger.LogError(e, "Failed to merge chunks for upload task {TaskId}", taskId);
        throw new BaseException(CodeDefine.CHUNKE_COMBINE_FAIL);
    }
}

/// <summary>
/// Writes chunks 1..totalChunks from <paramref name="tempPath"/> into
/// <paramref name="finalPath"/>, reporting progress to the merge status hash every fifth
/// chunk and after the last one. On any failure the partially written final file is removed
/// (best effort) and the original exception is rethrown.
/// </summary>
private async Task ConcatenateChunksAsync(Guid taskId, string tempPath, string finalPath, int totalChunks)
{
    try
    {
        await using var finalStream = new FileStream(finalPath, FileMode.Create, FileAccess.Write, FileShare.None);

        for (var i = 1; i <= totalChunks; i++)
        {
            var chunkPath = Path.Combine(tempPath, $"{i}.part.tmp");
            if (!File.Exists(chunkPath))
            {
                throw new BaseException(CodeDefine.CHUNKE_NOT_FOUND);
            }

            // Chunks are only read, so do not request write access to them.
            await using (var chunkStream = new FileStream(chunkPath, FileMode.Open, FileAccess.Read, FileShare.Read))
            {
                await chunkStream.CopyToAsync(finalStream);
            }

            // Report progress after the chunk has actually been copied.
            if (i % 5 == 0 || i == totalChunks)
            {
                var progress = i * 100.0 / totalChunks;
                await _redis.HashSetAsync(RedisKeys.MergeStatus(taskId), new HashEntry[]
                {
                    new("status", "processing"),
                    // Invariant culture keeps the decimal separator stable for consumers.
                    new("progress", progress.ToString("F2", System.Globalization.CultureInfo.InvariantCulture))
                });
            }
        }
    }
    catch
    {
        // finalStream is already disposed here; do not leave a truncated file at the final path.
        try
        {
            File.Delete(finalPath);
        }
        catch (Exception cleanupError)
        {
            _logger.LogWarning(cleanupError, "Failed to remove partially merged file {FinalPath}", finalPath);
        }

        throw;
    }
}
|
|
|
|
/// <summary>
/// Builds the instruction a client needs to upload one part of a task through this API.
/// </summary>
/// <param name="taskId">Upload task the part belongs to.</param>
/// <param name="partNumer">Number of the part to upload.</param>
/// <returns>
/// An instruction with <c>Skip = true</c> and empty headers when the part number is already in
/// the Redis set of uploaded parts; otherwise a POST instruction whose URL is
/// <c>{scheme}://{host}/api/upload/local/{taskId}/parts/{partNumer}</c>, built from the
/// current request.
/// </returns>
public async Task<UploadPartInstructionVo> CreatePartInstructionAsync(Guid taskId, int partNumer)
{
    var alreadyUploaded = await _redis.SetContainsAsync(RedisKeys.GetUploadPartKey(taskId), partNumer);
    if (!alreadyUploaded)
    {
        var currentRequest = _httpContext.HttpContext!.Request;

        // Scheme is http or https; Host.Value is e.g. localhost:5000 or a domain name.
        var uploadUrl =
            $"{currentRequest.Scheme}://{currentRequest.Host.Value}/api/upload/local/{taskId}/parts/{partNumer}";

        var uploadHeaders = new Dictionary<string, string>
        {
            { "Content-Type", "multipart/form-data" }
        };

        return new UploadPartInstructionVo
        {
            Method = "POST",
            PartNumber = partNumer,
            Skip = false,
            Url = uploadUrl,
            Headers = uploadHeaders
        };
    }

    // The part is already recorded: tell the client to skip it.
    return new UploadPartInstructionVo
    {
        PartNumber = partNumer,
        Skip = true,
        Headers = new Dictionary<string, string>()
    };
}
|
|
|
|
/// <summary>
/// Stores a file in a single step: writes <paramref name="stream"/> to the local upload
/// directory and records a completed one-chunk upload task for it. When a task with the same
/// hash already exists, that task is returned with its URL refreshed and nothing is written.
/// </summary>
/// <param name="stream">Content of the file to store.</param>
/// <param name="fileName">Original file name; recorded on the task and used for object naming.</param>
/// <param name="fileType">Content type; recorded on the task and used for object naming.</param>
/// <param name="size">File size in bytes; recorded as the file size and (clamped to int) as the chunk size.</param>
/// <param name="hash">File hash used to look up an existing task.</param>
/// <returns>The existing task for <paramref name="hash"/>, or the newly created one.</returns>
/// <exception cref="InvalidOperationException">
/// Thrown when there is no current HTTP context, or when the current user has no integer
/// <see cref="ClaimTypes.NameIdentifier"/> claim.
/// </exception>
public async Task<UploadTask> UploadSmallFileAsync(Stream stream, string fileName, string fileType, long size, string hash)
{
    // Resolve the context once and fail with a clear message instead of a NullReferenceException.
    var httpContext = _httpContext.HttpContext
        ?? throw new InvalidOperationException("UploadSmallFileAsync requires an active HTTP context.");
    var request = httpContext.Request;
    var baseUrl = $"{request.Scheme}://{request.Host}";

    var taskOld = await _uploadTaskService.GetTaskAsync(hash);
    if (taskOld is not null)
    {
        // NOTE(review): the existing task is returned regardless of its Status — confirm that
        // an unfinished chunked upload with the same hash should be reported this way.
        taskOld.Url = UrlTools.GetFullUrl(taskOld.ObjectName, ProviderName, baseUrl);
        return taskOld;
    }

    var userIdClaim = httpContext.User.FindFirstValue(ClaimTypes.NameIdentifier);
    if (!int.TryParse(userIdClaim, out var userId))
    {
        throw new InvalidOperationException(
            "The current user has no valid integer NameIdentifier claim.");
    }

    var objectname = ObjectNameGenerator.Generate(new ObjectNameContext
    {
        ContentType = fileType,
        FileName = fileName,
        UserId = userId
    });

    // GetDownloadUrl returns the local file path and creates its directory.
    var path = GetDownloadUrl(objectname);

    // Write the incoming stream to the local file.
    await using (var fileStream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None))
    {
        await stream.CopyToAsync(fileStream);
    }

    var task = new UploadTask
    {
        CreatedAt = DateTime.UtcNow,
        // Clamp instead of letting the cast wrap around for sizes above int.MaxValue.
        ChunkSize = (int)Math.Min(size, int.MaxValue),
        ContentType = fileType,
        FileHash = hash,
        FileName = fileName,
        FileSize = size,
        Id = Guid.NewGuid(),
        ObjectName = objectname,
        Url = UrlTools.GetFullUrl(objectname, ProviderName, baseUrl),
        ProviderUploadId = Guid.NewGuid().ToString(),
        Status = UploadStatus.Completed,
        StorageProvider = ProviderName,
        TotalChunks = 1
    };
    await _uploadTaskService.AddAsync(task);
    return task;
}
|
|
|
|
/// <summary>
/// Builds the path of an object under <c>uploads/files</c> in the content root and creates
/// the directory that will contain it.
/// </summary>
/// <remarks>
/// Despite the name, the value returned is a filesystem path built with
/// <see cref="Path.Combine(string, string, string, string)"/>, not a URL.
/// </remarks>
/// <param name="objectname">Object name, relative to <c>uploads/files</c>.</param>
/// <returns>The combined path of the final file.</returns>
public string GetDownloadUrl(string objectname)
{
    // Final storage location: {content root}/uploads/files/{objectname}.
    var targetPath = Path.Combine(_env.ContentRootPath, "uploads", "files", objectname);

    Directory.CreateDirectory(Path.GetDirectoryName(targetPath));

    return targetPath;
}
|
|
|
|
/// <summary>
/// Initializes a chunked upload. When a task with the same file hash already exists it is
/// returned (with <c>Skip</c> set only if that task is completed); otherwise a new task is
/// created with a generated object name and a chunk layout derived from the configured chunk
/// size, and is persisted.
/// </summary>
/// <param name="dto">Description of the file to upload; its <c>FileHash</c> is the dedupe key.</param>
/// <returns>The VO mapped from the existing or the newly created task.</returns>
/// <exception cref="InvalidOperationException">
/// Thrown when a new task must be created but the configured chunk size is not positive,
/// there is no current HTTP context, or the current user has no integer
/// <see cref="ClaimTypes.NameIdentifier"/> claim.
/// </exception>
public async Task<CreateUploadTaskVo> InitTaskAsync(CreateUploadTaskDto dto)
{
    // Look for an existing task first: this path needs neither the mapped entity nor the user.
    var existingTask = await _uploadTaskService.GetTaskAsync(dto.FileHash);
    if (existingTask is not null)
    {
        var existingVo = _mapper.Map<CreateUploadTaskVo>(existingTask);
        // Only a completed upload can be skipped; anything else is returned with Skip = false.
        existingVo.Skip = existingTask.Status == UploadStatus.Completed;
        return existingVo;
    }

    // TotalChunks below divides by the chunk size, so reject a non-positive value up front.
    if (_options.ChunkSize <= 0)
    {
        throw new InvalidOperationException("FileUploadOptions.ChunkSize must be greater than zero.");
    }

    var httpContext = _httpContext.HttpContext
        ?? throw new InvalidOperationException("InitTaskAsync requires an active HTTP context.");
    var userIdClaim = httpContext.User.FindFirstValue(ClaimTypes.NameIdentifier);
    if (!int.TryParse(userIdClaim, out var userId))
    {
        throw new InvalidOperationException(
            "The current user has no valid integer NameIdentifier claim.");
    }

    var task = _mapper.Map<UploadTask>(dto);
    task.ObjectName = ObjectNameGenerator.Generate(new ObjectNameContext
    {
        ContentType = task.ContentType,
        FileName = task.FileName,
        UserId = userId
    });
    task.StorageProvider = ProviderName;
    task.ProviderUploadId = Guid.NewGuid().ToString();
    task.ChunkSize = _options.ChunkSize;
    // Round up so a trailing partial chunk is counted.
    task.TotalChunks = (int)Math.Ceiling((double)task.FileSize / _options.ChunkSize);

    await _uploadTaskService.AddAsync(task);
    return _mapper.Map<CreateUploadTaskVo>(task);
}
|
|
}
|
|
}
|