百木园-与人分享,
就是让自己快乐。

FastDFS并发问题的排查经历

附件的上传和下载用的是 FastDFS。本地开发时没有考虑到多个文件同时上传会有并发问题,比如多个文件只上传成功了一个,或者上传成功了但文档内容缺失,变成 0 字节。

呵。。都是一次难忘的经历。

经过在本地模拟大批量的上传下载,发现 FastDFS 工具类是在启动时就初始化了 tracker 和 storage 连接,之后所有请求都共用这一对连接,而每次调用完它的接口后连接都会被关闭,这样就导致并发时上传不完整或者不成功。这个原因也是后来在下面这篇博客里看到的,非常感谢这篇文章。https://blog.csdn.net/AFSGEFEGH/article/details/109034532?spm=1001.2101.3001.6650.3&utm_medium=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-3-109034532-blog-114929991.235^v27^pc_relevant_multi_platform_whitelistv3&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-3-109034532-blog-114929991.235^v27^pc_relevant_multi_platform_whitelistv3&utm_relevant_index=6

记得在调用 FastDFS 的方法上加上 synchronized,保证同一时间只有一个请求在使用连接。

点击查看代码

	/**
	 * Streams the requested attachments to the HTTP response as one ZIP archive.
	 *
	 * <p>Every entry is renamed before it is written: when {@code paperToName} is
	 * {@code "1"} the entry becomes {@code 10356_<prefix>_LW<ext>}, otherwise
	 * {@code <prefix>_jjlw<ext>}. The prefix is cut from the original file name at the
	 * position returned by {@code Common.findNumber} (presumably the index of the n-th
	 * underscore; verify against that helper).
	 *
	 * <p>The method is {@code synchronized}, so only one batch download runs at a time
	 * on this controller instance. A file that cannot be downloaded does not abort the
	 * batch: the failure is logged, its entry stays empty and the next file is processed.
	 *
	 * @param fileInfoList descriptors of the files to pack; must not be empty
	 * @throws EducationException if {@code fileInfoList} is null or empty
	 */
	@RequestMapping(value = \"/batchDownloadForThesisCheck2\", method = RequestMethod.POST)
	public synchronized void batchDownloadForThesisCheck2(@RequestBody List<FileInfoDto> fileInfoList)  {
		if(CollectionUtils.isEmpty(fileInfoList)){
			throw new EducationException(\"下载附件失败\");
		}
		// NOTE(review): differentFileName presumably makes the file names unique; duplicate
		// entry names would make putNextEntry fail and end the batch early.
		List<FileInfoDto> fileInfoDtos = differentFileName(fileInfoList);

		String zipName = request.getParameter(\"zipName\");
		if (StringUtils.isEmpty(zipName)) {
			zipName = \"批量下载\";
		}

		// 1-based counter of successfully written files, used for progress logging only.
		int index = 1;

		try {
			response.setContentType(\"application/octet-stream; charset=UTF-8\");
			response.setHeader(\"Access-Control-Expose-Headers\", \"fileName\");
			response.setHeader(\"fileName\", URLEncoder.encode(zipName, \"UTF-8\"));

			// Closing the ZIP stream finishes the last entry and closes the response stream.
			try (ZipOutputStream zipOS = new ZipOutputStream(response.getOutputStream())) {
				for (FileInfoDto info : fileInfoDtos) {
					String itemFileName = info.getFileName();
					int secondShowIndex = Common.findNumber(itemFileName,\"_\",2);
					int firstShowIndex = Common.findNumber(itemFileName,\"_\",1);

					// A name without a dot gets an empty extension instead of failing on substring(-1).
					int dotIndex = itemFileName.lastIndexOf(\".\");
					String extension = dotIndex >= 0 ? itemFileName.substring(dotIndex) : \"\";

					if (\"1\".equals(info.getPaperToName())) {
						// Format 1: schoolCode_studentNo_LW.<ext>
						itemFileName = \"10356_\" + itemFileName.substring(0, firstShowIndex) + \"_LW\" + extension;
					} else {
						// Format 2: studentNo_name_jjlw.<ext>
						itemFileName = itemFileName.substring(0, secondShowIndex) + \"_jjlw\" + extension;
					}
					logger.error(\"已下载学生:{} \"  ,itemFileName);

					// putNextEntry also closes the previous entry, if any.
					zipOS.putNextEntry(new ZipEntry(itemFileName));
					try (InputStream is = fastDFS.downloadFile(info.getFileId())) {
						byte[] buffer = new byte[1024*8];
						int len;
						while ((len = is.read(buffer)) != -1) {
							zipOS.write(buffer, 0, len);
						}
						logger.error(\"已下载”{}\",index);
						index ++;
					} catch (Exception e) {
						// Best effort: keep going with the remaining files, but record why this
						// entry ended up empty or truncated instead of hiding the failure.
						logger.error(\"下载文件失败, fileId: {}, entry: {}\", info.getFileId(), itemFileName, e);
					}
					zipOS.flush();
				}
			}
		} catch (IOException e) {
			logger.error(\"批量下载发生错误: \" + e.getMessage(), e);
		} finally {
			logger.warn(\"关闭机检下载 :\"  );
		}
	}

下面是封装的 FastDFS 上传下载工具类

点击查看代码
package fastdfs.config;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import javax.annotation.PostConstruct;

import org.apache.commons.lang.StringUtils;
import org.csource.common.NameValuePair;
import org.csource.fastdfs.ClientGlobal;
import org.csource.fastdfs.ProtoCommon;
import org.csource.fastdfs.StorageClient1;
import org.csource.fastdfs.StorageServer;
import org.csource.fastdfs.TrackerClient;
import org.csource.fastdfs.TrackerGroup;
import org.csource.fastdfs.TrackerServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;

import com.xx.commons.exception.EducationException;

import lombok.extern.slf4j.Slf4j;



/**
 * Helper around the FastDFS Java client: upload, delete, replace and download files.
 *
 * <p>An upload returns the file id under which the file can be fetched again, e.g.
 * {@code group1/M00/00/01/wKjIgFWOYc6APpjAAAD-qk29i78248.jpg}.
 *
 * <p>No tracker or storage connection is kept as instance state. Every operation
 * creates its own {@link StorageClient1}, so concurrent callers never share a socket.
 * NOTE(review): this relies on the no-arg {@code StorageClient1} resolving its
 * connections from {@code ClientGlobal.g_tracker_group} for each call — confirm
 * against the client version in use.
 */
@Slf4j
public class FastDFS {

    @Autowired
    private  FastDFSProperty fastDFSProperty;

    /**
     * Default charset handed to the FastDFS client.
     */
    private static final String DEFAULT_ENCODING = \"UTF-8\";

    /**
     * Configures the global FastDFS client settings and verifies once, at startup,
     * that the tracker and a storage node are reachable.
     *
     * <p>The connections opened for this check are closed again before returning;
     * they are not reused by later operations.
     *
     * @throws EducationException if the configuration is invalid or the servers
     *                            cannot be reached
     */
    @PostConstruct
    public void init(){
        TrackerServer trackerServer = null;
        StorageServer storageServer = null;
        try {
            ClientGlobal.setG_charset(DEFAULT_ENCODING);
            ClientGlobal.setG_connect_timeout(fastDFSProperty.getConnect_timeout());
            ClientGlobal.setG_network_timeout(fastDFSProperty.getNetwork_timeout());
            ClientGlobal.setG_secret_key(fastDFSProperty.getSecret_key());
            ClientGlobal.setG_tracker_http_port(fastDFSProperty.getTracker_http_port());

            // The tracker address is configured as "host:port".
            String tracker_server = fastDFSProperty.getTracker_server();
            int colonIndex = tracker_server.indexOf(\':\');
            InetSocketAddress isadd = new InetSocketAddress(
                    tracker_server.substring(0, colonIndex),
                    Integer.parseInt(tracker_server.substring(colonIndex + 1)));
            InetSocketAddress[] tracker_servers  = {isadd};
            ClientGlobal.setG_tracker_group(new TrackerGroup(tracker_servers));

            // Fail fast: probe tracker and storage once so a broken setup surfaces at startup.
            TrackerClient trackerClient = new TrackerClient(ClientGlobal.g_tracker_group);
            trackerServer = trackerClient.getConnection();
            if (trackerServer == null) {
                throw new EducationException(\"getConnection return null\");
            }
            storageServer = trackerClient.getStoreStorage(trackerServer);
            if (storageServer == null) {
                throw new EducationException(\"getStoreStorage return null\");
            }
            ProtoCommon.activeTest(storageServer.getSocket());
        } catch (Exception e) {
            throw new EducationException(\"初始化 fastdfs 配置失败\", e);
        } finally {
            // The probe connections must not outlive init(): nothing else uses them.
            if (storageServer != null) {
                try {
                    storageServer.close();
                } catch (Exception e) {
                    log.warn(\"Close probe StorageServer failed\", e);
                }
            }
            if (trackerServer != null) {
                try {
                    trackerServer.close();
                } catch (Exception e) {
                    log.warn(\"Close probe TrackerServer failed\", e);
                }
            }
        }
    }

    /**
     * Uploads the complete content of a local file.
     *
     * @param file
     *            file whose bytes are uploaded
     * @param fileName
     *            name used only to derive the extension stored with the file
     * @return the file id returned by the client; {@code null} means the upload failed
     * @throws EducationException if the file cannot be read or the client throws
     */
    public String uploadFile(File file, String fileName) {
        try {
            NameValuePair[] meta_list = null;
            // Read the whole file; available() plus a single read() may return fewer bytes.
            byte[] file_buff = Files.readAllBytes(file.toPath());
            return newStorageClient().upload_file1(file_buff, getFileExt(fileName), meta_list);
        } catch (Exception ex) {
            throw new EducationException(\"上传文件错误\",ex);
        }
    }

    /**
     * Uploads an in-memory file and stores its name, extension and size as metadata.
     *
     * @param bytes file content
     * @param name  original file name; its extension (without the dot) is sent to FastDFS
     * @param size  file size recorded in the {@code filesize} metadata entry
     * @return the file id returned by the client
     * @throws EducationException if the client throws
     */
    public String uploadFile(byte[] bytes, String name, Long size) {
        try {
            // A name without a dot yields an empty extension rather than the whole name.
            String ext = getFileExt(name);
            NameValuePair[] meta_list = new NameValuePair[3];
            meta_list[0] = new NameValuePair(\"filename\",name);
            meta_list[1] = new NameValuePair(\"fileext\",ext);
            meta_list[2] = new NameValuePair(\"filesize\",String.valueOf(size));
            return newStorageClient().upload_file1(bytes, ext, meta_list);
        } catch (Exception ex) {
            throw new EducationException(\"上传文件错误\",ex);
        }
    }

    /**
     * Deletes a file identified by group name and remote file name.
     *
     * @param groupName
     *            e.g. \"group1\"; when blank, \"group1\" is used
     * @param fileName
     *            e.g. \"M00/00/00/wKgxgk5HbLvfP86RAAAAChd9X1Y736.jpg\"
     * @return 0 on success, otherwise the error code reported by the client
     * @throws EducationException if the client throws
     */
    public int deleteFile(String groupName, String fileName) {
        try {
            String group = StringUtils.isBlank(groupName) ? \"group1\" : groupName;
            return newStorageClient().delete_file(group, fileName);
        } catch (Exception ex) {
            throw new EducationException(\"删除文件错误\",ex);
        }
    }

    /**
     * Deletes a file identified by its file id (group name plus remote file name).
     *
     * @param fileId
     *            e.g. group1/M00/00/00/ooYBAFM6MpmAHM91AAAEgdpiRC0012.xml
     * @return 0 on success, otherwise the error code reported by the client
     * @throws EducationException if the client throws
     */
    public int deleteFile(String fileId) {
        try {
            return newStorageClient().delete_file1(fileId);
        } catch (Exception ex) {
            throw new EducationException(\"删除文件错误\",ex);
        }
    }

    /**
     * Replaces an existing file: uploads the new file first, then deletes the old one.
     *
     * <p>When the delete step fails, {@code null} is returned although the new file
     * has already been uploaded.
     *
     * @param oldFileId
     *            file id of the file to replace, e.g. group1/M00/00/00/ooYBAFM6MpmAHM91AAAEgdpiRC0012.xml
     * @param file
     *            the new file
     * @param filePath
     *            path or name of the new file, used to derive its extension
     * @return the new file id, or {@code null} if the upload or the delete failed
     * @throws EducationException if the upload or the delete throws
     */
    public String modifyFile(String oldFileId, File file, String filePath) {
        String fileid;
        try {
            // Upload first so the old file survives a failed upload.
            fileid = uploadFile(file, filePath);
            if (fileid == null) {
                return null;
            }
            if (deleteFile(oldFileId) != 0) {
                return null;
            }
        } catch (Exception ex) {
            throw new EducationException(\"修改一个已经存在的文件错误\",ex);
        }
        return fileid;
    }

    /**
     * Downloads a file and exposes its content as a stream.
     *
     * @param fileId file id of the file to download
     * @return an in-memory stream over the downloaded bytes
     * @throws EducationException if the client throws or returns no data
     */
    public InputStream downloadFile(String fileId) {
        byte[] bytes = downloadFileToByte(fileId);
        if (bytes == null) {
            // Previously surfaced as an accidental NullPointerException wrapped in the same error.
            throw new EducationException(\"文件下载错误\");
        }
        return new ByteArrayInputStream(bytes);
    }

    /**
     * Downloads a file into memory.
     *
     * @param fileId file id of the file to download
     * @return the bytes returned by the client; may be {@code null} when the client
     *         returns no data
     * @throws EducationException if the client throws
     */
    public byte[] downloadFileToByte(String fileId) {
        try {
            return newStorageClient().download_file1(fileId);
        } catch (Exception ex) {
            throw new EducationException(\"文件下载错误\",ex);
        }
    }

    /**
     * Downloads several files and packs them into one ZIP archive.
     *
     * <p>The archive is built in a uniquely named temporary file that is deleted when
     * the returned stream is closed, so the caller must close the stream.
     *
     * @param fileList one map per file with the keys {@code fileName} (entry name in
     *                 the archive) and {@code filePath} (FastDFS file id)
     * @return a stream over the finished ZIP archive
     * @throws EducationException if {@code fileList} is empty or any download fails
     */
    public InputStream downloadFile(List<Map<String, String>> fileList){
        if(CollectionUtils.isEmpty(fileList)){
            throw new EducationException(\"文件下载错误,fileList为空!\");
        }

        Path zipPath = null;
        try {
            // A unique temp file avoids clashes between concurrent calls in the same millisecond.
            zipPath = Files.createTempFile(\"fastdfs-\", \".zip\");
            try (ZipOutputStream zos = new ZipOutputStream(Files.newOutputStream(zipPath))) {
                for (Map<String, String> map : fileList) {
                    zos.putNextEntry(new ZipEntry(map.get(\"fileName\")));
                    byte[] bytes = newStorageClient().download_file1(map.get(\"filePath\"));
                    if (bytes == null) {
                        throw new IOException(\"download_file1 returned null, filePath: \" + map.get(\"filePath\"));
                    }
                    zos.write(bytes);
                    zos.closeEntry();
                }
            }
            // Open the archive only after the ZIP stream is closed, i.e. fully written.
            return Files.newInputStream(zipPath, StandardOpenOption.DELETE_ON_CLOSE);
        } catch (Exception ex) {
            if (zipPath != null) {
                try {
                    Files.deleteIfExists(zipPath);
                } catch (IOException e) {
                    log.warn(\"Delete temp zip {} failed\", zipPath, e);
                }
            }
            throw new EducationException(\"文件下载错误\", ex);
        }
    }

    /**
     * Creates a client for exactly one operation.
     *
     * <p>The client is created without a preset tracker or storage server so that no
     * connection is shared between calls or threads.
     */
    private StorageClient1 newStorageClient() {
        return new StorageClient1();
    }

    /**
     * Returns the file extension without the leading dot.
     *
     * @return e.g. \"jpg\", or \"\" when the name is blank or contains no dot
     */
    private  String getFileExt(String fileName) {
        if (StringUtils.isBlank(fileName) || !fileName.contains(\".\")) {
            return \"\";
        }
        return fileName.substring(fileName.lastIndexOf(\'.\') + 1);
    }
}

FastDFS 客户端源码中,storageServer 每次用完都会被关闭:

image


来源:https://www.cnblogs.com/heavenTang/p/17266440.html
本站部分图文来源于网络,如有侵权请联系删除。

未经允许不得转载:百木园 » FastDFS并发问题的排查经历

相关推荐

  • 暂无文章