websocket实现文件切片上传
故事
前端时间由于项目需要用到视频上传以及转码,于是自己开始造轮子,之后把那不是很⚪的轮子开源了,有人提了isuss说,上传时候切片过多,会导致卡住,总是在最后几个切片没办法提交成功。于是我f12看了一下,最终没办法解决,增加了socket进行切片上传。
HTTP切片上传出现的问题
然后查了半天资料说什么因为前端是异步、浏览器什么限制,涉及到协议什么的,然后就不准备折腾了,直接改用socket传输不香么。
撸起袖子加油干
前端代码
function socketUpload(type,file,dom){
let name = file.name, //文件名
size = file.size; //总大小
//socket数据针过大会导致发送断开
let shardSize = 5 * 1024 * 1024, //以1MB为一个分片
shardCount = Math.ceil(size / shardSize); //总片数
let GUID = guid();
let ws = new WebSocket('ws://localhost:8080/file/' + GUID);
let map = {code:null,type:type,name: null, chunks: null};
ws.onopen = () => {
console.log('建立文件上传通道 ...');
map.chunks = shardCount;
map.name = name;
map.code = 0;
//创建服务器存储目录
ws.send(JSON.stringify(map));
}
ws.onmessage = (evt) => {
console.log('Received Message: ' + evt.data);
let parse = JSON.parse(evt.data);
if (parse.code == 101) {
console.log('通道已建立 ...');
for (let i = 0; i < shardCount; ++i) {
//计算每一片的起始与结束位置
let start = i * shardSize,
end = Math.min(size, start + shardSize);
let fileBlob = file.slice(start, end);
ws.send(fileBlob);
}
}
if (parse.code == 200) {
document.getElementById(dom).innerHTML = "当前上传进度为:" + parse.msg + "%";
} else if (parse.code == 202){
document.getElementById(dom).innerHTML = "文件正在解析";
} else if (parse.code == 201){
document.getElementById(dom).innerHTML = "解析成功,地址为:"+parse.data;
if (type ==='video'){
changeVideo(parse.data);
}
}else {
document.getElementById(dom).innerHTML = parse.data;
}
return null;
}
ws.onclose = function (e) {
console.log('websocket 断开: ' + e.code + ' ' + e.reason + ' ' + e.wasClean)
console.log(e)
}
}
后端代码
package com.wslhome.demo.api;
import com.alibaba.fastjson.JSONObject;
import com.wslhome.demo.config.FilePath;
import com.wslhome.demo.obj.FileDetail;
import com.wslhome.demo.obj.Result;
import com.wslhome.demo.service.FileService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 大文件切片上传改用websocket
*/
@ServerEndpoint(value = "/file/{guid}")
@Controller
@Slf4j
public class FileSocket {
//统计
private static AtomicInteger count = new AtomicInteger(0);
//链接信息
private static Map<String, FileDetail> fileInfo = new ConcurrentHashMap<>();
//切片计数
private static Map<String,AtomicInteger> chunk = new ConcurrentHashMap<>();
private static FilePath filePath;
private static FileService fileService;
@Autowired
private void setFilePath(FilePath filePath){
this.filePath = filePath;
}
@Autowired
private void setFileService(FileService fileService){
this.fileService = fileService;
}
String projectUrl = System.getProperty("user.dir").replaceAll("\\\\", "/");
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
count.incrementAndGet();
log.info("{} 开始建立大文件上传通道。",session.getId());
log.info("当前链接数:{}",count.get());
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session,@PathParam("guid") String guid) {
count.decrementAndGet();
fileInfo.remove(guid);
chunk.remove(guid);
log.info("{} 文件上传通道已关闭",session.getId());
}
/**
* 收到客户端消息后调用的方法
*/
@OnMessage
public void onMessage(String message, @PathParam("guid") String guid, Session session){
if(StringUtils.isBlank(message)){
return;
}
FileDetail fileDetail = JSONObject.parseObject(message, FileDetail.class);
fileInfo.put(guid,fileDetail);
// 临时目录用来存放所有分片文件
String tempFileDir = projectUrl + filePath.getBigPath() + guid;
File parentFileDir = new File(tempFileDir);
if (!parentFileDir.exists()) {
parentFileDir.mkdirs();
}
chunk.put(guid,new AtomicInteger(0));
sendMessage(session,Result.fileBuild());
}
@OnMessage
public void onMessage(byte[] message, @PathParam("guid") String guid, Session session){
int number = chunk.get(guid).incrementAndGet();
log.info("当前切片序号:{}",number);
String tempFileDir = projectUrl + filePath.getBigPath() + guid;
File tempPartFile = new File(new File(tempFileDir), guid + "_" + (number-1) + ".part");
try {
FileUtils.copyInputStreamToFile(new ByteArrayInputStream(message), tempPartFile);
FileDetail fileDetail = fileInfo.get(guid);
if (String.valueOf(number).equals(fileDetail.getChunks().toString())){
//合并文件
sendMessage(session,Result.fileOver());
String path;
if ("video".equals(fileDetail.getType())){
path = fileService.uploadVideoMerge(guid, fileDetail.getName());
}else{
path = fileService.uploadMerge(guid, fileDetail.getName());
}
sendMessage(session,Result.fileSuccess(path));
onClose(session,guid);
}else{
sendMessage(session,Result.success(String.valueOf((int)((float) number/fileInfo.get(guid).getChunks()*100))));
}
}catch (Exception e){
log.error("文件切片存储失败!当前序号:{},异常原因:{}",number,e.getMessage());
sendMessage(session,Result.error("文件上传失败"));
onError(session,guid,e);
}
}
/**
* 发生错误时调用
*/
@OnError
public void onError(Session session,@PathParam("guid") String guid, Throwable error) {
fileInfo.remove(guid);
chunk.remove(guid);
log.error("socket异常:{}",error.getMessage());
}
/**
* 发送消息
* @param session:
* @param message:
*/
public static void sendMessage(Session session, Result message) {
try {
session.getBasicRemote().sendText(JSONObject.toJSONString(message));
} catch (IOException e) {
log.error("发送消息出错:{}", e.getMessage());
e.printStackTrace();
}
}
}
完整源码
完整源码请查看
码云:https://gitee.com/sirwsl/uploadWheel/
码云:https://gitee.com/sirwsl/uploadWheel/
码云:https://gitee.com/sirwsl/uploadWheel/