百木园-与人分享,
就是让自己快乐。

Spring Boot异步请求处理框架

Spring Boot异步请求处理框架

1、前言

​	在Spring Boot项目中,经常会遇到处理时间过长,导致出现HTTP请求超时问题,状态码:502。
​	例如一个文件导入接口需要导入一个Excel文件的学员记录,原来是针对一个班的学员,最多500条记录,1分钟的HTTP超时时长之内基本可以响应。现在将很多班级的学员混在一起,放在一个Excel文件中(这样可以提高操作人员的工作效率),比如5万条学员记录,于是就出现HTTP请求超时问题。
​	​解决方案有:1)Ajax异步请求方式;2)WebSocket方式;3)异步请求处理方式:请求+轮询。
​	方案1,需要调整HTTP超时设置,Spring Boot开启异步处理(使用@EnableAsync和@Async),这种方式,问题是超时时长要设置多大,没有底。
​	方案2,需要前端支持Web2.0,对浏览器有所限制,代码也变得复杂。
​	方案3,使用异步请求处理。所谓异步请求处理,就是将请求异步化,前端发起请求后,后端很快就响应,返回一个任务ID,前端再用这个任务ID去轮询,获取处理进程和结果信息。需要两个接口:任务请求接口和任务信息轮询接口。
​	显然,对于长时间的业务处理,通过轮询,获取处理进程信息,可以获得较好的用户体验。正如大文件下载,用户可以了解下载的进度一样,业务处理同样可以通过输出处理日志信息和进度,使得长时间业务处理过程可视化,而不至于让用户长时间面对一个在空转鼠标符号。
​	本文针对方案3,提出一种通用的处理框架。使用这个通用的异步请求处理框架,可以适应各种不同的需要长时间异步处理的业务需求。

2、异步请求处理框架描述

​	本异步请求处理框架,主要包括任务信息、任务执行类(Runnable)、任务管理器。

2.1、任务信息对象类TaskInfo

​	任务信息对象,用于存储任务信息,其生命周期为:创建任务==>加入任务队列==>加入线程池工作线程队列==>任务执行==>任务执行完成==>任务对象缓存超期销毁。
​	1)任务识别信息:
​	1.1)任务ID:任务ID用于识别任务信息,一个任务ID对应一个任务,是全局唯一的,不区分任务类型。
​	1.2)任务名称:即任务类型的名称,对应于业务处理类型名称,如查询商品单价、查询商品库存等,这样可方便可视化识别任务。一个任务名称可以有多个任务实例。
​	1.3)会话ID(sessionId):用于身份识别,这样只有请求者才能根据返回的任务ID来查询任务信息,其它用户无权访问。想象一下同步请求,谁发起请求,响应给谁。
​	2)任务调用信息:使得任务管理器可以使用反射方法,调用任务处理方法。
​	2.1)任务处理对象:这是业务处理对象,为Object类型,一般为Service实现类对象。
​	2.2)任务处理方法:这是一个Method对象类型,为异步处理的业务处理方法对象。这个方法必须是public的方法。
​	2.3)任务方法参数:这是一个Map<String,Object>类型字典对象,可适应任意参数结构。
​	3)任务处理过程和结果相关信息:可以提供任务处理过程和结果可视化的信息。
​	3.1)任务状态:表示任务目前的处理状态,0-未处理,1-处理中,2-处理结束。
​	3.2)处理日志:这是一个List<String>类型的字符串列表,用于存放处理日志。处理日志格式化:\"time level taskId taskName --- logInfo\",便于前端展示。
​	3.3)处理进度百分比:这是double类型数据,0.0-100.0,业务单元可视需要使用。
​	3.4)处理结果:这是一个Object类型对象,真实数据类型由业务单元约定。在未处理结束前,该值为null,处理结束后,如有返回值,此时赋值。
​	3.5)返回码:业务处理,可能遇到异常,如需设置返回码,此处赋值。
​	3.6)返回消息:与返回码相联系的提示信息。
​	3.7)开始处理时间戳:在任务启动(开始执行时)设置,用于计算业务处理的耗时时长。
​	4)任务缓存到期时间:任务处理完成后,任务信息会缓存一段时间(如60秒),等待前端获取,超期后,任务对象被销毁,意味着再也无法获取任务信息了。后端系统不可能累积存放超期的任务信息,否则可能导致OOM(Out Of Memory)异常。

2.2、任务执行类TaskRunnable

​	任务执行类,实现Runnable接口,是为线程池的工作线程提供处理方法。
​	任务执行类,使用任务信息对象作为参数,并调用任务信息的任务调用信息,使用反射方法,来执行任务处理。

2.3、任务管理器类TaskManService

​	任务管理器,全局对象,使用@Service注解,加入Spring容器。这样,任何需要异步处理的业务都可以访问任务管理器。
​	任务管理器,包含下列属性:
​	1)任务队列:LinkedBlockingQueue<TaskInfo>类型,考虑到OOM问题,容量使用有限值,如1万,即最大缓存1万个任务,相当于二级缓存。
​	2)任务信息字典:Map<Integer,TaskInfo>类型,key为taskId,目的是为了方便根据taskId快速查询任务信息。
​	3)线程池:ThreadPoolExecutor类型,工作线程队列长度为线程池的最大线程数,相当于一级缓存。可以设置核心线程数,最大线程数,工作线程队列长度等参数,如设置核心线程数为5,最大线程数为100,工作线程队列长度为100。线程工厂ThreadFactory使用Executors.defaultThreadFactory()。
​	4)任务ID计数器:AtomicInteger类型,用于分配唯一的任务ID。
​	5)监视线程:用于任务调度,以及检查缓存到期时间超期的已结束任务信息。
​	6)监视线程的执行类对象:Runnable对象,提供监视线程的执行方法。
​	7)上次检查时间戳:用于检查缓存到期时间,每秒1次检查。
​	任务管理器,包含下列接口方法:
​	1)添加任务:addTask,获取sessionId,检查任务处理对象、方法及参数是否为null,然后分配任务ID,创建任务对象,加入任务队列。如果参数为void,也需要构造一个空的Map<String,Object>字典对象。如果任务队列未满,就将任务加入任务队列中,并返回包含任务ID的字典,否则抛出“任务队列已满”的异常信息。
​	2)获取任务信息:getTaskInfo,参数为request和任务ID,如果sessionId与请求时相同,且任务对象能在任务信息字典中找到,就返回任务信息对象,否则抛出相关异常。
​	任务管理器的核心方法:
​	1)初始化:使用@PostConstruct注解,启动监视线程,并预启动线程池的一个核心线程。
​	2)监视线程的执行类run方法:实现每秒一次的超期已处理结束的任务信息的检查,以及任务调度。任务调度方法:
​	2.1)如果任务队列非空,且线程池未满,则取出一个任务信息对象,并创建一个任务执行类对象,加入到线程池的工作线程队列(execute方法加入)。
​	2.2)如果任务队列非空,且线程池已满,则等待100毫秒。
​	2.3)如果任务队列为空,则等待100毫秒。

3、异步请求处理框架代码

3.1、任务信息对象类TaskInfo

​	任务信息对象类TaskInfo,代码如下:
package com.abc.example.asyncproc;

import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import lombok.Data;

/**
 * @className	: TaskInfo
 * @description	: 任务信息
 * @summary	:
 * @history	:
 * ------------------------------------------------------------------------------
 * date			version		modifier		remarks                   
 * ------------------------------------------------------------------------------
 * 2022/08/17	1.0.0		sheng.zheng		初版
 *
 */
@Data
public class TaskInfo {
	
	// ////////////////////////////////////////////////
	// 任务识别信息
	
	// 任务ID
	private Integer taskId = 0;
	
	// sessionId,用于识别请求者
	private String sessionId = \"\";

	// 任务名称,即业务处理的名称,如查询商品最低价,导入学员名册
	private String taskName = \"\";
	
	// ////////////////////////////////////////////////
	// 任务执行相关的
	
	// 请求参数,使用字典进行封装,以便适应任意数据结构
	private Map<String, Object> params;
	
	// 处理对象,一般是service对象
	private Object procObject;
	
	// 处理方法
	private Method method;
	
	// ////////////////////////////////////////////////
	// 任务处理产生的数据,中间数据,结果
	
	// 处理状态,0-未处理,1-处理中,2-处理结束
	private int procStatus = 0;
	
	// 处理结果,数据类型由业务单元约定
	private Object result;

	// 处理日志,包括中间结果,格式化显示:Time level taskId taskName logInfo
	private List<String> logList = new ArrayList<String>();	
	
	// 处理进度百分比
	private double progress = 0;	
	
	// 到期时间,UTC,任务完成后才设置,超时后销毁
	private long expiredTime = 0;	

	// 返回码,保留,0表示操作成功
	private int resultCode = 0;
	
	// 响应消息,保留
	private String message = \"\";
	
	// 开始处理时间,便于统计任务处理时长
	private long startTime = 0;	
	
	// ////////////////////////////////////////////////
	// 日志相关的方法
	private DateTimeFormatter df = DateTimeFormatter.ofPattern(\"yyyy-MM-dd HH:mm:ss.SSS\");
		
	// 添加处理日志
	public void addLogInfo(String level,String logInfo) {
		// 格式化显示:Time level taskId taskName logInfo
		LocalDateTime current = LocalDateTime.now();
		String strCurrent = current.format(df);
		String log = String.format(\"%s %s %d %s --- %s\",
				strCurrent,level,taskId,taskName,logInfo);
		logList.add(log);
	}
	
	
	// ////////////////////////////////////////////////
	// 不同状态的参数设置接口
	
	// 设置任务初始化,未开始
	public void init(Integer taskId,String taskName,String sessionId,
			Object procObject,Method method,Map<String, Object> params) {
		this.procStatus = 0;
		this.taskId = taskId;
		this.taskName = taskName;
		this.sessionId = sessionId;
		this.procObject = procObject;
		this.method = method;
		this.params = params;
	}
	
	// 启动任务
	public void start() {
		this.procStatus = 1;
		addLogInfo(TaskConstants.LEVEL_INFO,\"开始处理任务...\");
		// 记录任务开始处理的时间
		startTime = System.currentTimeMillis();
	}
	
	// 结束任务
	public void finish(Object result) {
		this.result = result;
		this.procStatus = 2;
		// 设置结果缓存的到期时间
		long current = System.currentTimeMillis();
		this.expiredTime = current + TaskConstants.PROC_EXPIRE_TIME;
		long duration = 0;
		double second = 0.0;
		duration = current - startTime;
		second = duration / 1000.0;
		addLogInfo(TaskConstants.LEVEL_INFO,\"任务处理结束,耗时(s):\"+second);
	}
	
	// 处理异常
	public void error(int resultCode,String message) {
		this.resultCode = resultCode;
		this.message = message;
		this.procStatus = 2;
	}
}

​	说明:任务信息对象类TaskInfo提供了几个常用的处理方法,如addLogInfo、init、start、finish、error,便于简化属性值设置。

3.2、任务执行类TaskRunnable

​	任务执行类TaskRunnable,代码如下:
package com.abc.example.asyncproc;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import com.abc.example.common.utils.LogUtil;
import com.abc.example.exception.BaseException;
import com.abc.example.exception.ExceptionCodes;

/**
 * @className	: TaskRunnable
 * @description	: 可被线程执行的任务执行类
 * @summary	:
 * @history	:
 * ------------------------------------------------------------------------------
 * date			version		modifier		remarks                   
 * ------------------------------------------------------------------------------
 * 2022/08/17	1.0.0		sheng.zheng		初版
 *
 */
public class TaskRunnable implements Runnable {
	// 任务信息
	private TaskInfo taskInfo;
	
	public TaskRunnable(TaskInfo taskInfo) {
		this.taskInfo = taskInfo;
	}
	
	// 获取任务ID
	public Integer getTaskId() {
		if (taskInfo != null) {
			return taskInfo.getTaskId();
		}
		return 0;
	}
		
	@Override
	public void run() {
		Object procObject = taskInfo.getProcObject();
		Method method = taskInfo.getMethod();
				
		try {
			// 使用反射方法,调用方法来处理任务
			method.invoke(procObject, taskInfo);
		}catch(BaseException e) {
			// 优先处理业务处理异常
			taskInfo.error(e.getCode(),e.getMessage());
			LogUtil.error(e);
		}catch(InvocationTargetException e) {
			taskInfo.error(ExceptionCodes.ERROR.getCode(),e.getMessage());
			LogUtil.error(e);
		}catch(IllegalAccessException e) {
			taskInfo.error(ExceptionCodes.ERROR.getCode(),e.getMessage());
			LogUtil.error(e);			
		}catch(IllegalArgumentException e) {
			taskInfo.error(ExceptionCodes.ERROR.getCode(),e.getMessage());
			LogUtil.error(e);			
		}catch(Exception e) {
			// 最后处理未知异常
			taskInfo.error(ExceptionCodes.ERROR.getCode(),e.getMessage());
			LogUtil.error(e);			
		}
	}
}

3.3、任务常量类TaskConstants

​	任务常量类TaskConstants,提供异步请求处理框架模块的相关常量设置,代码如下:
package com.abc.example.asyncproc;

/**
 * @className	: TaskConstants
 * @description	: 任务处理相关常量
 * @summary	:
 * @history	:
 * ------------------------------------------------------------------------------
 * date			version		modifier		remarks                   
 * ------------------------------------------------------------------------------
 * 2022/08/18	1.0.0		sheng.zheng		初版
 *
 */
public class TaskConstants {
    // 任务缓存过期时间,单元毫秒,即任务处理完成后,设置此时长,超期销毁
    public static final int PROC_EXPIRE_TIME = 60000;  
    
    // 线程池核心线程数
    public static final int CORE_POOL_SIZE = 5;

    // 线程池最大线程数
    public static final int MAX_POOL_SIZE  = 100;
    
    // 线程池KeepAlive参数,单位秒
    public static final long KEEP_ALIVE_SECONDS = 10;    
    
    // 任务队列最大数目
    public static final int MAX_TASK_NUMS  = 10000;    
    
    // 日志信息告警等级
    public static final String LEVEL_INFO = \"INFO\";    
    public static final String LEVEL_ERROR = \"ERROR\";     

}

3.4、任务管理器类TaskManService

​	任务管理器类TaskManService,代码如下:
package com.abc.example.asyncproc;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletRequest;
import org.springframework.stereotype.Service;
import com.abc.example.common.utils.LogUtil;
import com.abc.example.exception.BaseException;
import com.abc.example.exception.ExceptionCodes;

/**
 * @className	: TaskManService
 * @description	: 任务管理器
 * @summary	:
 * @history	:
 * ------------------------------------------------------------------------------
 * date			version		modifier		remarks                   
 * ------------------------------------------------------------------------------
 * 2022/08/18	1.0.0		sheng.zheng		初版
 *
 */
@Service
public class TaskManService {
	// 任务队列,考虑OOM(Out Of Memory)问题,限定任务队列长度,相当于二级缓存
	private BlockingQueue<TaskInfo> taskQueue = 
			new LinkedBlockingQueue<TaskInfo>(TaskConstants.MAX_TASK_NUMS);
	
	// 任务信息字典,key为taskId,目的是为了方便根据taskId查询任务信息
	private Map<Integer,TaskInfo> taskMap = new HashMap<Integer,TaskInfo>();
	
	// 线程池,工作线程队列长度为线程池的最大线程数,相当于一级缓存
	private ThreadPoolExecutor executor = new ThreadPoolExecutor(
			TaskConstants.CORE_POOL_SIZE, 
			TaskConstants.MAX_POOL_SIZE, 
			TaskConstants.KEEP_ALIVE_SECONDS,
			TimeUnit.SECONDS, 
			new LinkedBlockingQueue<>(TaskConstants.MAX_POOL_SIZE), 
			Executors.defaultThreadFactory());
		
	// 任务ID计数器,累加
	private AtomicInteger taskIdCounter = new AtomicInteger();
	
	// 用于缓存上次检查时间
	private long lastTime = 0;	
	
	// 监视线程,用于任务调度,以及检查已结束任务的缓存到期时间
	private Thread monitor;
	@PostConstruct
	public void init(){
		// 启动线程实例
		monitor = new Thread(checkRunnable);
		monitor.start();
		
		// 启动一个核心线程
		executor.prestartCoreThread();
	}			
	
	// 检查已结束任务的缓存到期时间,超期的销毁
	private Runnable checkRunnable = new Runnable() {
		@Override
        public void run() {
			while (true) {
				long current = System.currentTimeMillis();
				if(current - lastTime >= 1000) {
					// 离上次检查时间超过1秒
					checkAndremove();
					// 更新lastTime
					lastTime = current;
				}
				synchronized(this) {
					try {
						// 检查任务队列
						if(taskQueue.isEmpty()) {
							// 如果任务队列为空,则等待100ms
							Thread.sleep(100);
						}else {
							// 如果任务队列不为空
							// 检查线程池队列
							if (executor.getQueue().size() < TaskConstants.MAX_POOL_SIZE) {
								// 如果线程池队列未满
								// 从任务队列中获取一个任务
								TaskInfo taskInfo = taskQueue.take();
								// 创建Runnable对象
								TaskRunnable tr = new TaskRunnable(taskInfo);
								// 调用线程池执行任务
								executor.execute(tr);
							}else {
								// 如果线程池队列已满,则等待100ms
								Thread.sleep(100);
							}
						}											
					}catch (InterruptedException e) {
						LogUtil.error(e);
					}
				}
			}
		}
	};
	
	/**
	 * 
	 * @methodName	: checkAndremove
	 * @description	: 检查并移除过期对象
	 * @history	:
	 * ------------------------------------------------------------------------------
	 * date			version		modifier		remarks                   
	 * ------------------------------------------------------------------------------
	 * 2022/08/15	1.0.0		sheng.zheng		初版
	 *
	 */
	private void checkAndremove() {
		synchronized(taskMap) {
			if (taskMap.size() == 0) {
				// 如果无对象
				return;
			}
			long current = System.currentTimeMillis();
			Iterator<Map.Entry<Integer,TaskInfo>> iter = taskMap.entrySet().iterator();
			while(iter.hasNext()) {
				Map.Entry<Integer,TaskInfo> entry = iter.next();
				TaskInfo taskInfo = entry.getValue();
				long expiredTime = taskInfo.getExpiredTime();
				if ((expiredTime != 0) && ((current - expiredTime) > TaskConstants.PROC_EXPIRE_TIME)) {
					// 如果过期,移除
					iter.remove();
				}
			}					
		}
	}	
	
	/**
	 * 
	 * @methodName		: addTask
	 * @description         : 添加任务
	 * @param request	: request对象
	 * @param taskName	: 任务名称
	 * @param procObject    : 处理对象
	 * @param method	: 处理方法
	 * @param params	: 方法参数,透明传递到处理方法中
	 * @return		: 处理ID,唯一标识该请求的处理
	 * @history		:
	 * ------------------------------------------------------------------------------
	 * date			version		modifier		remarks                   
	 * ------------------------------------------------------------------------------
	 * 2022/08/19	1.0.0		sheng.zheng		初版
	 *
	 */
	public Integer addTask(HttpServletRequest request,
			String taskName,Object procObject,Method method,
			Map<String, Object> params) {
		// 获取sessionId
		String sessionId = null;
		if (request.getSession() != null) {
			sessionId = request.getSession().getId();
		}else {
			// 无效的session
			throw new BaseException(ExceptionCodes.SESSION_IS_NULL);			
		}
		
		// 空指针保护
		if (procObject == null) {
			throw new BaseException(ExceptionCodes.ARGUMENTS_ERROR,\"procObject对象为null\");				
		}
		if (method == null) {
			throw new BaseException(ExceptionCodes.ARGUMENTS_ERROR,\"method对象为null\");				
		}
		if (params == null) {
			throw new BaseException(ExceptionCodes.ARGUMENTS_ERROR,\"params对象为null\");				
		}		

		// 获取可用的任务ID
		Integer taskId = taskIdCounter.incrementAndGet();
		
		// 生成任务处理信息对象		
		TaskInfo item = new TaskInfo();
		// 初始化任务信息
		item.init(taskId,taskName,sessionId,procObject,method,params);
		
		// 加入处理队列
		try {
			synchronized(taskQueue) {
				taskQueue.add(item);				
			}
		}catch(IllegalStateException e) {
			// 队列已满
			throw new BaseException(ExceptionCodes.ADD_OBJECT_FAILED,\"任务队列已满\");
		}
		
		// 加入字典
		synchronized(taskMap) {
			taskMap.put(taskId, item);
		}
				
		return taskId;
	}
	
	/**
	 * 
	 * @methodName		: getTaskInfo
	 * @description	        : 获取任务信息
	 * @param request	: request对象
	 * @param taskId	: 任务ID	
	 * @return		: TaskInfo对象
	 * @history		:
	 * ------------------------------------------------------------------------------
	 * date			version		modifier		remarks                   
	 * ------------------------------------------------------------------------------
	 * 2022/08/19	1.0.0		sheng.zheng		初版
	 *
	 */
	public TaskInfo getTaskInfo(HttpServletRequest request,Integer taskId) {
		TaskInfo item = null;
		synchronized(taskMap) {
			if (taskMap.containsKey(taskId)) {
				item = taskMap.get(taskId);
				String sessionId = request.getSession().getId();
				if (!sessionId.equals(item.getSessionId())) {
					throw new BaseException(ExceptionCodes.TASKID_NOT_RIGHTS);				
				}
			}else {
				throw new BaseException(ExceptionCodes.TASKID_NOT_EXIST);
			}			
		}
				
		return item;
	}	
	
}

3.5、异常处理类BaseException

​	异常处理类BaseException,代码如下:
package com.abc.example.exception;

import lombok.Data;

/**
 * @className	: BaseException
 * @description	: 异常信息基类
 * @summary	: 可以处理系统异常和自定义异常
 * @history	:
 * ------------------------------------------------------------------------------
 * date			version		modifier		remarks                   
 * ------------------------------------------------------------------------------
 * 2021/01/01	1.0.0		sheng.zheng		初版
 *
 */
@Data
public class BaseException extends RuntimeException{
	private static final long serialVersionUID = 4359709211352401087L;
	
	// 异常码
    private int  code ;
    
    // 异常信息ID
    private String messageId;
    
    // 异常信息
    private String message;
    
  	// =============== 以下为各种构造函数,重载 ===================================
    
    public BaseException(String message) {
        this.message = message;
    }

    public BaseException(String message, Throwable e) {
        this.message = message;
    }

    public BaseException(int code, String message) {
        this.message = message;
        this.code = code;
    }

    public BaseException(ExceptionCodes e) {
        this.code = e.getCode();
        this.messageId = e.getMessageId();
        this.message = e.getMessage();
    }
    
    public BaseException(ExceptionCodes e,String message) {
        this.code = e.getCode();
        this.messageId = e.getMessageId();
        this.message = e.getMessage() + \":\" + message;
    }    

    public BaseException(int code, String message, Throwable e) {
        this.message = message;
        this.code = code;   
        
    }
}

3.6、异常信息枚举类ExceptionCodes

​	异常信息枚举类ExceptionCodes,代码如下:
package com.abc.example.exception;

/**
 * @className	: ExceptionCodes
 * @description	: 异常信息枚举类
 * @summary	:
 * @history	:
 * ------------------------------------------------------------------------------
 * date			version		modifier		remarks                   
 * ------------------------------------------------------------------------------
 * 2021/01/01	1.0.0		sheng.zheng		初版
 *
 */
public enum ExceptionCodes {
    // 0-99,reserved for common exception
    SUCCESS(0, \"message.SUCCESS\", \"操作成功\"),
    FAILED(1, \"message.FAILED\", \"操作失败\"),
    ERROR(99, \"message.ERROR\", \"操作异常\"),
    ARGUMENTS_ERROR(2, \"message.ARGUMENTS_ERROR\",\"参数错误\"),
    TASKID_NOT_EXIST(16, \"message.TASKID_NOT_EXIST\",\"任务ID不存在,可能已过期销毁\"),
    TASKID_NOT_RIGHTS(17, \"message.TASKID_NOT_RIGHTS\",\"无权访问此任务ID\"),
    SESSION_IS_NULL(18, \"message.SESSION_IS_NULL\",\"session为空,请重新登录\"),

    ARGUMENTS_IS_EMPTY(22, \"message.ARGUMENTS_IS_EMPTY\",\"参数值不能为空\"),
    ADD_OBJECT_FAILED(30, \"message.ADD_OBJECT_FAILED\", \"新增对象失败\"),

    ;	// 定义结束
	
	// 返回码
    private int code;
    public int getCode() {
    	return this.code;
    }
    
    // 返回消息ID
    private String messageId;
    public String getMessageId() {
    	return this.messageId;
    }

    // 返回消息
    private String message;
    public String getMessage() {
    	return this.message;
    }
    
    ExceptionCodes(int code, String messageId, String message) {
        this.code = code;
        this.messageId = messageId;
        this.message = message;
    }
}

3.7、通用异常处理类UniveralExceptionHandler

​	通用异常处理类UniveralExceptionHandler,这是一个异常信息捕获的拦截器,代码如下:
package com.abc.example.exception;

import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * @className	: UniveralExceptionHandler
 * @description	: 通用异常处理类
 * @summary	:
 * @history	:
 * ------------------------------------------------------------------------------
 * date			version		modifier		remarks                   
 * ------------------------------------------------------------------------------
 * 2021/01/01	1.0.0		sheng.zheng		初版
 *
 */
@ControllerAdvice
public class UniveralExceptionHandler {
	Logger logger = LoggerFactory.getLogger(getClass());
	
	/**
	 * 
	 * @methodName	: handleException
	 * @description	: 拦截非业务异常
	 * @param e	: Exception类型的异常
	 * @return	: JSON格式的异常信息
	 * @history     :
	 * ------------------------------------------------------------------------------
	 * date			version		modifier		remarks                   
	 * ------------------------------------------------------------------------------
	 * 2021/01/01	1.0.0		sheng.zheng		初版
	 *
	 */
    @ResponseBody
    @ExceptionHandler(Exception.class)
    public Map<String,Object> handleException(Exception e) {
    	//将异常信息写入日志
        logger.error(e.getMessage(), e);
        //输出通用错误代码和信息
        Map<String,Object> map = new HashMap<>();
        map.put(\"code\", ExceptionCodes.ERROR.getCode());
        map.put(\"message\", ExceptionCodes.ERROR.getMessage());
        return map;
    }

    /**
     * 
     * @methodName	: handleBaseException
     * @description	: 拦截业务异常
     * @param e		: BaseException类型的异常
     * @return		: JSON格式的异常信息
     * @history		:
     * ------------------------------------------------------------------------------
     * date			version		modifier		remarks                   
     * ------------------------------------------------------------------------------
     * 2021/01/01	1.0.0		sheng.zheng		初版
     *
     */
    @ResponseBody
    @ExceptionHandler(BaseException.class)
    public Map<String,Object> handleBaseException(BaseException e) {
    	//将异常信息写入日志
        logger.error(\"业务异常:code:{},messageId:{},message:{}\", e.getCode(), e.getMessageId(), e.getMessage());
        //输出错误代码和信息
        Map<String,Object> map = new HashMap<>();
        map.put(\"code\", e.getCode());
        map.put(\"message\" ,e.getMessage());
        return map;
    }
    
}

3.8、日志工具类LogUtil

​	日志工具类LogUtil,相关方法代码如下:
package com.abc.example.common.utils;


import lombok.extern.slf4j.Slf4j;

/**
 * @className	: LogUtil
 * @description	: 日志工具类
 * @summary	:
 * @history	:
 * ------------------------------------------------------------------------------
 * date			version		modifier		remarks                   
 * ------------------------------------------------------------------------------
 * 2021/01/01	1.0.0		sheng.zheng		初版
 *
 */
@Slf4j
public class LogUtil {
	/**
	 * 
	 * @methodName	: error
	 * @description	: 输出异常信息
	 * @param e	: Exception对象
	 * @history	:
	 * ------------------------------------------------------------------------------
	 * date			version		modifier		remarks                   
	 * ------------------------------------------------------------------------------
	 * 2021/01/01	1.0.0		sheng.zheng		初版
	 *
	 */
	public static void error(Exception e) {
		e.printStackTrace();
		String ex = getString(e);
		log.error(ex);
	}	

	/**
	 * 
	 * @methodName	: getString
	 * @description	: 获取Exception的getStackTrace信息
	 * @param ex	: Exception对象
	 * @return	: 错误调用栈信息
	 * @history	:
	 * ------------------------------------------------------------------------------
	 * date			version		modifier		remarks                   
	 * ------------------------------------------------------------------------------
	 * 2021/01/01	1.0.0		sheng.zheng		初版
	 *
	 */
    public static String getString(Exception ex) {

        StringBuilder stack = new StringBuilder();
        StackTraceElement[] sts = ex.getStackTrace();
        for (StackTraceElement st : sts) {
            stack.append(st.toString()).append(\"\\r\\n\");
        }
        return stack.toString();
    } 
}

4、异步请求处理测试例子

​	下面使用一个测试例子,来说明如何使用此框架。

4.1、异步任务的业务处理类

​	假设有一个测试任务服务类TestTaskService,简单起见,不用接口类了,直接就是可实例化的类。这个类有一个需要异步处理的方法,方法名为testTask。
​	testTask方法只接受TaskInfo类型的参数,但实际参数params为Map字典(相当于JSON对象),包含repeat和delay,这两个参数是testTask方法所需要的。处理结果result此处为字符串类型,这个类型在实际处理时可以是任意类型,只需要与前端有约定即可。
​	为方便控制器调用,TestTaskService提供两个接口方法:addAsyncTask和getTaskInfo。
​	addAsyncTask方法,有2个参数,request和请求参数params,请求参数params是控制器@RequestBody的请求参数,或者重新封装的适应testTask处理的参数。对于业务处理类TestTaskService来说,testTask方法需要什么形式和类型的参数,属于内部约定,只要两者匹配即可。本例子比较简单,直接透传HTTP请求参数,作为任务处理的方法参数。addAsyncTask方法,执行输入参数校验(不要等执行任务时,再去校验参数),然后调用任务管理器addTask方法,加入一个任务,并获取任务ID,返回前端。
​	getTaskInfo方法,有2个参数,request和请求参数params,请求参数params包含任务ID参数,调用任务管理器的getTaskInfo方法。获取TaskInfo对象,然后屏蔽一些不需要展示的信息,返回前端。getTaskInfo方法用于前端轮询,查询任务执行过程和结果。

​	测试任务服务类TestTaskService,代码如下:
package com.abc.example.asyncproc;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

import javax.servlet.http.HttpServletRequest;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.abc.example.common.utils.LogUtil;
import com.abc.example.common.utils.Utility;
import com.abc.example.exception.BaseException;
import com.abc.example.exception.ExceptionCodes;

/**
 * @className	: TestTaskService
 * @description	: 测试任务服务类
 * @summary	:
 * @history	:
 * ------------------------------------------------------------------------------
 * date			version		modifier		remarks                   
 * ------------------------------------------------------------------------------
 * 2022/08/19	1.0.0		sheng.zheng		初版
 *
 */
@Service
public class TestTaskService {
	// 任务管理器
	@Autowired
	private TaskManService taskManService;
	
	/**
	 * 
	 * @methodName		: addAsyncTask
	 * @description	        : 新增一个异步任务
	 * @summary		: 新增测试任务类型的异步任务,
	 * 	如果处理队列未满,可立即获取任务ID:
	 * 		根据此任务ID,可以通过调用getTaskInfo,获取任务的处理进度信息;
	 * 		如果任务处理完毕,任务信息缓存60秒,过期后无法再获取;
	 * 	如果处理队列已满,返回任务队列已满的失败提示。
	 * @param request	: request对象
	 * @param params	: 请求参数,形式如下:
	 * 	{
	 * 		\"repeat\"	: 10,	// 重复次数,默认为10,可选
	 * 		\"delay\"		: 1000,	// 延时毫秒数,默认为1000,可选
	 * 	}
	 * @return			: JSON对象,形式如下:
	 * 	{
	 * 		\"taskId\"	: 1,	// 任务ID
	 * 	}
	 * @history		:
	 * ------------------------------------------------------------------------------
	 * date			version		modifier		remarks                   
	 * ------------------------------------------------------------------------------
	 * 2022/08/19	1.0.0		sheng.zheng		初版
	 *
	 */
	public Map<String, Object> addAsyncTask(HttpServletRequest request,
			Map<String, Object> params){
		// 参数校验
		Integer repeat = (Integer)params.get(\"repeat\");
		if (repeat == null) {
			repeat = 10;
		}
		Integer delay = (Integer)params.get(\"delay\");
		if (delay == null) {
			delay = 1000;
		}
		if (repeat <= 0) {
			// 参数错误
			throw new BaseException(ExceptionCodes.ARGUMENTS_ERROR,\"repeat\");
		}
		if (delay <= 10) {
			// 参数错误
			throw new BaseException(ExceptionCodes.ARGUMENTS_ERROR,\"delay\");
		}
		
		// 任务名称
		String taskName = \"测试任务\";
		// 任务处理对象
		Object procObject = this;
		// 任务执行方法
		Method method = Utility.getMethodByName(this,\"testTask\");
		// 调用任务管理器,添加任务
		Integer taskId = taskManService.addTask(request, taskName, procObject, method, params);

		// 返回值处理
		Map<String, Object> map = new HashMap<String, Object>();
		map.put(\"taskId\", taskId);
		return map;
	}
	
	/**
	 * 
	 * @methodName		: getTaskInfo
	 * @description	        : 根据任务ID,获取任务信息
	 * @summary		: 如果任务ID对应的任务,属于当前用户,则可以有权获取信息,否则拒绝。
	 * 	如果任务状态为未处理或处理中,可以获取任务信息。
	 * 	如果任务状态为处理结束,且在缓存到期时间之前,也可以获取任务信息。否则,无法获取任务信息。
	 * @param request	: request对象
	 * @param params	: 请求参数,形式如下:
	 * 	{
	 * 		\"taskId\"	: 1,	// 任务ID,必选
	 * 	}
	 * @return			: JSON对象,形式如下:
	 * 	{
	 * 		\"taskId\"	: 1,	// 任务ID
	 * 		\"procStatus\": 1,	// 处理状态,0-未处理,1-处理中,2-处理结束
	 * 		\"progress\"	: 0.0,	// 处理进度百分比
	 * 		\"logList\"	: [],	// 处理日志,字符串列表,格式化显示:Time level taskId taskName logInfo
	 * 		\"result\"	: \"\",	// 处理结果,字符串类型
	 * 		\"resultCode\": 0,	// 返回码,0表示操作成功
	 * 		\"message\"	: \"\",	// 响应消息
	 * 	}
	 * @history		:
	 * ------------------------------------------------------------------------------
	 * date			version		modifier		remarks                   
	 * ------------------------------------------------------------------------------
	 * 2022/08/19	1.0.0		sheng.zheng		初版
	 *
	 */
	public Map<String, Object> getTaskInfo(HttpServletRequest request,
			Map<String, Object> params){
		// 从请求参数中获取taskId
		Integer taskId = (Integer)params.get(\"taskId\");
		if(taskId == null) {
			throw new BaseException(ExceptionCodes.ARGUMENTS_IS_EMPTY,\"taskId\");
		}
		// 调用任务管理器的方法,获取任务信息对象
		TaskInfo taskInfo = taskManService.getTaskInfo(request, taskId);
		
		// 返回值处理
		// 从任务信息对象,筛选一些属性返回
		Map<String, Object> map = new HashMap<String, Object>();
		// 任务ID
		map.put(\"taskId\", taskId);
		// 任务状态
		map.put(\"procStatus\", taskInfo.getProcStatus());
		// 处理结果,如任务未结束,则为null
		map.put(\"result\", taskInfo.getResult());
		// 处理日志
		map.put(\"logList\", taskInfo.getLogList());
		// 处理进度
		map.put(\"progress\", taskInfo.getProgress());
		// 可能的返回码和消息
		map.put(\"resultCode\", taskInfo.getResultCode());
		map.put(\"message\", taskInfo.getMessage());
		
		return map;		
	}

	/**
	 * 
	 * @methodName		: testTask
	 * @description	        : 测试任务,异步执行
	 * @param taskInfo	: 任务信息
	 * @history		:
	 * ------------------------------------------------------------------------------
	 * date			version		modifier		remarks                   
	 * ------------------------------------------------------------------------------
	 * 2022/08/19	1.0.0		sheng.zheng		初版
	 *
	 */
	public void testTask(TaskInfo taskInfo) {
		// 开始处理任务
		taskInfo.start();
		
		// 获取参数
		Map<String, Object> params = (Map<String, Object>)taskInfo.getParams();
		Integer repeat = (Integer)params.get(\"repeat\");
		if (repeat == null) {
			repeat = 10;
		}
		Integer delay = (Integer)params.get(\"delay\");
		if (delay == null) {
			delay = 1000;
		}
		
		String result = \"\";
		
		// 重复n次
		for(int i = 0; i < repeat; i++) {
			taskInfo.addLogInfo(TaskConstants.LEVEL_INFO, \"处理步骤\" + (i+1));
			// 显示处理进度
			taskInfo.setProgress((i+1)*1.0/repeat*100);
			// 延迟delay毫秒
			try {
				Thread.sleep(delay);
			} catch (InterruptedException e) {
				LogUtil.error(e);
			}					
		}
		result = \"OK\";
		
		// 处理完毕
		taskInfo.finish(result);
	}
}

4.2、相关工具类

​	4.1中涉及到工具类Utility的getMethodByName方法,代码如下:

package com.abc.example.common.utils;

import java.lang.reflect.Method;

/**
 * @className	: Utility
 * @description	: 工具类
 * @summary	:
 * @history	:
 * ------------------------------------------------------------------------------
 * date			version		modifier		remarks                   
 * ------------------------------------------------------------------------------
 * 2021/01/01	1.0.0		sheng.zheng		初版
 *
 */
public class Utility {
	/**
	 * 
	 * @methodName		: getMethodByName
	 * @description	        : 根据方法名称获取方法对象
	 * @param object	: 方法所在的类对象
	 * @param methodName    : 方法名
	 * @return		: Method类型对象
	 * @history		:
	 * ------------------------------------------------------------------------------
	 * date			version		modifier		remarks                   
	 * ------------------------------------------------------------------------------
	 * 2021/01/01	1.0.0		sheng.zheng		初版
	 *
	 */
	public static Method getMethodByName(Object object,String methodName) {
		Class<?> class1 = object.getClass();
		Method retItem = null;
		Method[] methods = class1.getMethods();
		for (int i = 0; i < methods.length; i++) {
			Method item = methods[i];
			if (item.getName().equals(methodName)) {
				retItem = item;
				break;
			}
		}
		return retItem;
	}    
}    

4.3、异步测试任务控制器类AsycTestController

​	异步测试任务控制器类AsycTestController,提供HTTP访问接口,代码如下:
package com.abc.example.controller;

import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.abc.example.asyncproc.TestTaskService;
import com.abc.example.vo.common.BaseResponse;

/**
 * @className	: AsycTestController
 * @description	: 异步任务测试控制器类
 * @summary	:
 * @history	:
 * ------------------------------------------------------------------------------
 * date			version		modifier		remarks                   
 * ------------------------------------------------------------------------------
 * 2022/08/19	1.0.0		sheng.zheng		初版
 *
 */
@RequestMapping(\"/asycTest\")
@RestController
public class AsycTestController extends BaseController{
	@Autowired	
	private TestTaskService tts;
	
	@RequestMapping(\"/addTask\")
	public BaseResponse<Map<String,Object>> addTask(HttpServletRequest request,
			 @RequestBody Map<String, Object> params) {
		Map<String,Object> map = tts.addAsyncTask(request,params);
		return successResponse(map);
	}	
	
	@RequestMapping(\"/getTaskInfo\")
	public BaseResponse<Map<String,Object>> getTaskInfo(HttpServletRequest request,
			 @RequestBody Map<String, Object> params) {
		Map<String,Object> map = tts.getTaskInfo(request, params);
		return successResponse(map);
	}	
}

4.4、基本响应消息体对象类BaseResponse

​	基本响应消息体对象类BaseResponse,提供标准形式的HTTP响应格式,代码如下:
package com.abc.example.vo.common;

import lombok.Data;

/**
 * @className	: BaseResponse
 * @description	: 基本响应消息体对象
 * @summary	:
 * @history	:
 * ------------------------------------------------------------------------------
 * date			version		modifier		remarks                   
 * ------------------------------------------------------------------------------
 * 2021/01/01	1.0.0		sheng.zheng		初版
 *
 */
@Data
public class BaseResponse<T> {
    // 响应码
    private int code;

    // 响应消息
    private String message;
        
    // 响应实体信息
    private T data;

    // 简单起见,屏蔽下列信息
    // 分页信息
    // private Page page;

    // 附加通知信息
    // private Additional additional;
}

4.5、控制器基类BaseController

​	控制器基类BaseController,支持事务处理,并提供操作成功的方法,代码如下:
package com.abc.example.controller;

import java.util.List;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RestController;
import com.abc.example.exception.ExceptionCodes;
import com.abc.example.vo.common.BaseResponse;
import com.abc.example.vo.common.Page;
import com.github.pagehelper.PageInfo;

/**
 * @className	: BaseController
 * @description	: 控制器基类
 * @summary	: 支持事务处理,并提供操作成功的方法
 * @history	:
 * ------------------------------------------------------------------------------
 * date			version		modifier		remarks                   
 * ------------------------------------------------------------------------------
 * 2021/01/01	1.0.0		sheng.zheng		初版
 *
 */
@RestController
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public class BaseController {
	/**
	 * 
	 * @methodName		: successResponse
	 * @description	        : 操作成功,返回信息不含数据体
	 * @param <T>		: 模板类型
	 * @return		: 操作成功的返回码和消息,不含数据体
	 * @history		:
	 * ------------------------------------------------------------------------------
	 * date			version		modifier		remarks                   
	 * ------------------------------------------------------------------------------
	 * 2021/01/01	1.0.0		sheng.zheng		初版
	 *
	 */
    protected <T> BaseResponse<T> successResponse() {
        BaseResponse<T> response = new BaseResponse<T>();
        response.setCode(ExceptionCodes.SUCCESS.getCode());
        response.setMessage(ExceptionCodes.SUCCESS.getMessage());
        return response;
    }

    /**
     * 
     * @methodName		: successResponse
     * @description	        : 操作成功,返回信息含数据体
     * @param <T>		: 模板类型
     * @param data		: 模板类型的数据
     * @return			: 操作成功的返回码和消息,并包含数据体
     * @history		        :
     * ------------------------------------------------------------------------------
     * date			version		modifier		remarks                   
     * ------------------------------------------------------------------------------
     * 2021/01/01	1.0.0		sheng.zheng		初版
     *
     */
    protected <T> BaseResponse<T> successResponse(T data) {
        BaseResponse<T> response = new BaseResponse<T>();
        response.setCode(ExceptionCodes.SUCCESS.getCode());
        response.setMessage(ExceptionCodes.SUCCESS.getMessage());
        response.setData(data);
        return response;
    }
}

5、测试

​	使用postman来测试。

5.1、添加测试任务

url:/asycTest/addTask
method: POST
request body:
{
    \"repeat\" : 10,	// 步骤数目
    \"delay\" : 2000	// 等待时长,毫秒
}
response:
{
    \"code\": 0,
    \"message\": \"操作成功\",
    \"data\": {
        \"taskId\": 1
    }
}

5.2、获取任务信息,任务处理过程中

url:/asycTest/getTaskInfo
method: POST
request body:
{
    \"taskId\": 1
}
response:
{
    \"code\": 0,
    \"message\": \"操作成功\",
    \"data\": {
        \"result\": null,
        \"procStatus\": 1,
        \"logList\": [
            \"2022-08-19 16:26:29.919 INFO 1 测试任务 --- 开始处理任务...\",
            \"2022-08-19 16:26:29.921 INFO 1 测试任务 --- 处理步骤1\",
            \"2022-08-19 16:26:31.923 INFO 1 测试任务 --- 处理步骤2\",
            \"2022-08-19 16:26:33.928 INFO 1 测试任务 --- 处理步骤3\",
            \"2022-08-19 16:26:35.933 INFO 1 测试任务 --- 处理步骤4\",
            \"2022-08-19 16:26:37.937 INFO 1 测试任务 --- 处理步骤5\",
            \"2022-08-19 16:26:39.942 INFO 1 测试任务 --- 处理步骤6\",
            \"2022-08-19 16:26:41.946 INFO 1 测试任务 --- 处理步骤7\",
            \"2022-08-19 16:26:43.951 INFO 1 测试任务 --- 处理步骤8\",
        ],
        \"resultCode\": 0,
        \"progress\": 80.0,
        \"message\": \"\",
        \"taskId\": 1
    }
}

5.3、获取任务信息,处理结束

url:/asycTest/getTaskInfo
method: POST
request body:
{
    \"taskId\": 1
}
response:
{
    \"code\": 0,
    \"message\": \"操作成功\",
    \"data\": {
        \"result\": \"OK\",
        \"procStatus\": 2,
        \"logList\": [
            \"2022-08-19 16:26:29.919 INFO 1 测试任务 --- 开始处理任务...\",
            \"2022-08-19 16:26:29.921 INFO 1 测试任务 --- 处理步骤1\",
            \"2022-08-19 16:26:31.923 INFO 1 测试任务 --- 处理步骤2\",
            \"2022-08-19 16:26:33.928 INFO 1 测试任务 --- 处理步骤3\",
            \"2022-08-19 16:26:35.933 INFO 1 测试任务 --- 处理步骤4\",
            \"2022-08-19 16:26:37.937 INFO 1 测试任务 --- 处理步骤5\",
            \"2022-08-19 16:26:39.942 INFO 1 测试任务 --- 处理步骤6\",
            \"2022-08-19 16:26:41.946 INFO 1 测试任务 --- 处理步骤7\",
            \"2022-08-19 16:26:43.951 INFO 1 测试任务 --- 处理步骤8\",
            \"2022-08-19 16:26:45.956 INFO 1 测试任务 --- 处理步骤9\",
            \"2022-08-19 16:26:47.960 INFO 1 测试任务 --- 处理步骤10\",
            \"2022-08-19 16:26:49.965 INFO 1 测试任务 --- 任务处理结束,耗时(s):20.044\"
        ],
        \"resultCode\": 0,
        \"progress\": 100.0,
        \"message\": \"\",
        \"taskId\": 1
    }
}

5.4、获取任务信息,处理结束后任务信息缓存时间过期

url:/asycTest/getTaskInfo
method: POST
request body:
{
    \"taskId\": 1
}
response:
{
    \"code\": 16,
    \"message\": \"任务ID不存在,可能已过期销毁\"
}

6、异步处理的注意事项

​	编写异步处理方法(如测试例子中的testTask方法)时,有几点注意事项:
​	1)request参数失效,如果要传递request参数,将之封装到params参数中,会有很多问题,如Session为null等,可以参考此文:https://www.shuzhiduo.com/A/gVdnaPp85W/。
​	2)事务处理:线程方法的@Transactional会失效,如需要使用事务处理,可参考此文:https://blog.csdn.net/u013844437/article/details/112983780。

作者:阿拉伯1999
出处:http://www.cnblogs.com/alabo1999/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利.
养成良好习惯,好文章随手顶一下。


来源:https://www.cnblogs.com/alabo1999/p/16607827.html
本站部分图文来源于网络,如有侵权请联系删除。

未经允许不得转载:百木园 » Spring Boot异步请求处理框架

相关推荐

  • 暂无文章