Skip to content

mysql数据库中间件

lnkForKing edited this page Jan 12, 2020 · 1 revision

MysqlSchedulerImpl.java

import com.gzkit.crm.mcs.mapper.TimedTaskMapper;
import com.gzkit.crm.mcs.utils.CacheUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.beans.factory.annotation.Autowired;
import pres.lnk.springframework.AbstractScheduler;

import java.util.concurrent.TimeUnit;

/**
 * {@link AbstractScheduler} implementation that keeps task locks and the
 * "maxLevel" heartbeat row in MySQL through {@link TimedTaskMapper}.
 *
 * <p>All expiry values handed to the mapper are in seconds, because the mapper's
 * SQL adds them to {@code UNIX_TIMESTAMP(now())}.
 *
 * @Author lnk
 * @Date 2018/2/28
 */
public class MysqlSchedulerImpl extends AbstractScheduler {
    /** Row id under which the currently highest alive server level is stored. */
    private static final String MAX_LEVEL = "maxLevel";

    @Autowired
    private TimedTaskMapper timedTaskMapper;

    /**
     * Tells whether the task is currently free.
     *
     * @param id task id
     * @return {@code true} when no unexpired lock row exists for {@code id}
     */
    @Override
    public boolean check(String id) {
        return timedTaskMapper.check(id) == 0;
    }

    /**
     * Tries to acquire the lock for a task.
     *
     * <p>When no row exists yet the lock is taken by inserting one. If that insert
     * fails (for example because another server inserted the same id between the
     * existence check and the insert), the method falls back to the conditional
     * update, which only succeeds when the existing lock has expired. The caller
     * therefore gets {@code false} instead of an exception when it loses the race.
     *
     * @param id            task id
     * @param timeoutMillis lock lifetime in milliseconds; truncated to whole seconds
     * @return {@code true} if this call acquired the lock
     */
    @Override
    public boolean lock(String id, long timeoutMillis) {
        // The table stores second-resolution timestamps; sub-second remainders are dropped.
        long timeout = timeoutMillis / 1000;

        if (timedTaskMapper.hasId(id) == 0) {
            try {
                if (timedTaskMapper.insert(id, timeout, null) == 1) {
                    return true;
                }
            } catch (RuntimeException ignored) {
                // Most likely a concurrent insert of the same primary key; the row
                // exists now, so let the conditional update below decide.
            }
        }
        return timedTaskMapper.lock(id, timeout) == 1;
    }

    /**
     * Unconditionally pushes the expiry of an existing lock row forward.
     *
     * @param id            task id
     * @param timeoutMillis new lifetime in milliseconds; truncated to whole seconds
     */
    @Override
    public void relock(String id, long timeoutMillis) {
        long timeout = timeoutMillis / 1000;
        // The mapper's update also writes the value column; lock rows carry no value.
        timedTaskMapper.update(id, timeout, null);
    }

    /**
     * Returns the database server's clock so that all servers share one time source.
     *
     * @return database time in milliseconds (second precision)
     */
    @Override
    public long currentTimeMillis() {
        // time() is UNIX_TIMESTAMP(now()), i.e. seconds; convert to the unit this method promises.
        return timedTaskMapper.time() * 1000L;
    }

    /**
     * Heartbeat: publishes or refreshes this server's level in the "maxLevel" row.
     *
     * <p>The row expires {@code getHeartTime() + 5} seconds after each refresh.
     */
    @Override
    public void keepAlive() {
        int timeout = getHeartTime() + 5;
        int level = getLevel();
        String levelText = String.valueOf(level);

        // Level currently recorded in the database (null when missing or expired).
        String stored = timedTaskMapper.getMaxLevel(MAX_LEVEL);

        if (!NumberUtils.isDigits(stored)) {
            // No live level recorded: store this server's level.
            try {
                timedTaskMapper.insert(MAX_LEVEL, timeout, levelText);
            } catch (RuntimeException e) {
                // The row already exists (e.g. expired); overwrite it if the mapper's condition allows.
                timedTaskMapper.updateLevel(MAX_LEVEL, levelText, timeout);
            }
            return;
        }

        int storedLevel = Integer.parseInt(stored);
        if (storedLevel == level) {
            // Same level as recorded: just extend the expiry.
            timedTaskMapper.updateLevelTime(MAX_LEVEL, levelText, timeout);
        } else if (storedLevel > level) {
            // This server outranks the recorded one (a smaller number wins in this
            // comparison), so replace the recorded level.
            timedTaskMapper.updateLevel(MAX_LEVEL, levelText, timeout);
        }
        // Otherwise a higher-ranked server is alive; leave the row alone.
    }

    /**
     * Reads the level recorded in the "maxLevel" row.
     *
     * @return the recorded level when it is present, unexpired and numeric;
     *         otherwise this server's own level
     */
    @Override
    public int getMaxAliveLevel() {
        String stored = timedTaskMapper.getMaxLevel(MAX_LEVEL);
        return NumberUtils.isDigits(stored) ? Integer.parseInt(stored) : getLevel();
    }
}

TimedTaskMapper.java

import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;

/**
 * MyBatis mapper for the {@code t_timed_task} table, which holds one row per lock id
 * ({@code id}, {@code timeout}, {@code value}). All {@code timeout} arguments are
 * offsets in seconds that the SQL adds to {@code UNIX_TIMESTAMP(now())}.
 *
 * @Author lnk
 * @Date 2018/3/6
 */
@Mapper
public interface TimedTaskMapper {

    /**
     * Counts unexpired lock rows for an id.
     *
     * @param id lock id
     * @return 1 if the id is currently locked (row exists and has not expired), otherwise 0
     */
    @Select("select count(*) from t_timed_task " +
            "where id = #{id, jdbcType=VARCHAR} and timeout >= UNIX_TIMESTAMP(now())")
    int check(String id);

    /**
     * Counts rows for an id regardless of expiry.
     *
     * @param id lock id
     * @return 1 if a row exists for the id, otherwise 0
     */
    @Select("select count(*) from t_timed_task where id = #{id, jdbcType=VARCHAR}")
    int hasId(String id);

    /**
     * Takes over an expired lock by moving its expiry into the future. The affected-row
     * count tells the caller whether it won the lock.
     *
     * @param id      lock id
     * @param timeout lifetime in seconds from now
     * @return 1 if the row was expired and has been re-locked, otherwise 0
     */
    @Update("update t_timed_task set timeout = UNIX_TIMESTAMP(now()) + #{timeout, jdbcType=BIGINT} " +
            "where id = #{id, jdbcType=VARCHAR} and timeout < UNIX_TIMESTAMP(now())")
    int lock(@Param("id") String id, @Param("timeout") long timeout);

    /**
     * Inserts a new lock row. The inserted-row count tells the caller whether the lock
     * was taken; inserting an id that already exists violates the primary key.
     *
     * @param id      lock id
     * @param timeout lifetime in seconds from now
     * @param value   value stored with the lock, may be {@code null}
     * @return number of rows inserted
     */
    @Insert("insert into t_timed_task values ( " +
            "#{id, jdbcType=VARCHAR}," +
            "UNIX_TIMESTAMP(now()) + #{timeout, jdbcType=BIGINT}," +
            "#{value, jdbcType=VARCHAR} )")
    int insert(@Param("id") String id, @Param("timeout") long timeout, @Param("value") String value);

    /**
     * Unconditionally rewrites the expiry and value of a lock row.
     *
     * @param id      lock id
     * @param timeout lifetime in seconds from now
     * @param value   new value, may be {@code null}
     * @return number of rows updated
     */
    // The space before "where" matters: without it a null value bound as a literal
    // produces "nullwhere" and the statement no longer parses.
    @Update("update t_timed_task set " +
            "timeout = UNIX_TIMESTAMP(now()) + #{timeout, jdbcType=BIGINT}," +
            "value = #{value, jdbcType=VARCHAR} " +
            "where id = #{id, jdbcType=VARCHAR}")
    int update(@Param("id") String id, @Param("timeout") long timeout, @Param("value") String value);

    /**
     * Reads the database server's current time.
     *
     * @return Unix timestamp in seconds
     */
    @Select("select UNIX_TIMESTAMP(now())")
    long time();

    /**
     * Replaces the recorded server level, but only when the recorded level is numerically
     * greater than the new one or the row has expired.
     *
     * @param id      row id of the level record
     * @param value   new level as a decimal string
     * @param timeout lifetime in seconds from now
     * @return number of rows updated
     */
    // Levels are stored as VARCHAR; cast both sides so "10" compares greater than "9".
    // "<=" mirrors getMaxLevel, which already treats timeout == now as expired.
    @Update("update t_timed_task set value = #{value, jdbcType=VARCHAR}," +
            "timeout = UNIX_TIMESTAMP(now()) + #{timeout, jdbcType=BIGINT} " +
            "where id = #{id, jdbcType=VARCHAR} " +
            "and (CAST(value AS SIGNED) > CAST(#{value, jdbcType=VARCHAR} AS SIGNED) " +
            "or timeout <= UNIX_TIMESTAMP(now()))")
    int updateLevel(@Param("id") String id, @Param("value") String value, @Param("timeout") long timeout);

    /**
     * Extends the expiry of the level record when the recorded level equals the given one.
     *
     * @param id      row id of the level record
     * @param value   level that must match the recorded value
     * @param timeout lifetime in seconds from now
     * @return number of rows updated
     */
    @Update("update t_timed_task set timeout = UNIX_TIMESTAMP(now()) + #{timeout, jdbcType=BIGINT} " +
            "where id = #{id, jdbcType=VARCHAR} and value = #{value, jdbcType=VARCHAR}")
    int updateLevelTime(@Param("id") String id, @Param("value") String value, @Param("timeout") long timeout);

    /**
     * Reads the recorded level of the highest-level alive server.
     *
     * @param id row id of the level record
     * @return the stored value, or {@code null} when the row is missing or expired
     */
    @Select("select value from t_timed_task where id = #{id, jdbcType=VARCHAR} and timeout > UNIX_TIMESTAMP(now())")
    String getMaxLevel(String id);
}

表结构

t_timed_task

| 字段 | 类型 | 说明 |
| --- | --- | --- |
| id | varchar(255) PRIMARY KEY NOT NULL | 任务id |
| timeout | bigint NOT NULL | 锁的失效时间 |
| value | varchar(255) DEFAULT NULL | 锁对应的值 |

-- Recreates the lock table used by TimedTaskMapper. Dropping it discards all existing rows.
DROP TABLE IF EXISTS `t_timed_task`;

-- One row per lock id; the primary key on id makes a second insert of the same id fail.
CREATE TABLE `t_timed_task` (
  -- task / lock id
  `id` varchar(255) NOT NULL,
  -- expiry of the lock; the mapper compares it with UNIX_TIMESTAMP(now()), i.e. epoch seconds
  `timeout` bigint(20) NOT NULL,
  -- optional value stored with the lock (the "maxLevel" row keeps the server level here)
  `value` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Clone this wiki locally