Skip to content

实战踩坑

ThreadPool 使用实践

NamedThreadFactory

java
public class NamedThreadFactory implements ThreadFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(NamedThreadFactory.class);
    // 避免多个NameThreadFactory对象prefix相同
    private static final AtomicInteger ID = new AtomicInteger(0);
    private final AtomicInteger SEQ = new AtomicInteger(0);
    private final String prefix;
    private final boolean daemon;
    private final Thread.UncaughtExceptionHandler handler;

    public NamedThreadFactory(String prefix) {
        this(prefix, true, null);
    }

    public NamedThreadFactory(String prefix, boolean daemon) {
        this(prefix, daemon, null);
    }

    public NamedThreadFactory(String prefix, boolean daemon, Thread.UncaughtExceptionHandler handler) {
        this.prefix = prefix + "-" + ID.incrementAndGet();
        this.daemon = daemon;
        SecurityManager s = System.getSecurityManager();
        // !!! 非常重要 !!!,请务必设置uncaught exception handler, 否则会使用默认的ThreadGroup中的uncaught exception handler => java.lang.ThreadGroup#uncaughtException默认行为是吞掉异常啥也没有
        // 设置uncaught exception handler.
        this.handler = handler != null ? handler :
                (t, e) -> LOGGER.error("Thread {} catch an uncaught exception.", t, e);
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable, prefix + "-" + SEQ.incrementAndGet());
        thread.setUncaughtExceptionHandler(handler);
        thread.setDaemon(daemon);
        return thread;
    }
}

ThreadPoolExecutor

java
ThreadPoolExecutor EXE = new ThreadPoolExecutor(
        100, // 核心成员100个
        200, // 最多允许200个员工,其中100个外包
        1, // 外包成员没活干的时候,超过一定时间就裁掉
        TimeUnit.MINUTES, // 允许外包人员吃闲饭的时间单位
        // 一般情况下都是核心成员100个干活,允许积压一定的任务,但当积压到1000个的时候,就要聘请外包一起干活
        new LinkedBlockingQueue<>(1000),
        // 给团队起个统一的名称, 成员编号1~n
        new NamedThreadFactory("MyExe"),
        // 单子太多,实在干不完了就推掉
        new ThreadPoolExecutor.AbortPolicy() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                String msg = String.format("线程池耗尽, 线程名称: %s,线程池大小: %d " +
                                "(active: %d, core: %d, max: %d, largest: %d)," +
                                " 任务总数: %d (completed: %d)," +
                                " 执行器状态:(isShutdown:%s, isTerminated:%s, isTerminating:%s).",
                        "BizThread", e.getPoolSize(), e.getActiveCount(),
                        e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                        e.getTaskCount(), e.getCompletedTaskCount(),
                        e.isShutdown(), e.isTerminated(), e.isTerminating());
                RejectedExecutionException rejectedException = new RejectedExecutionException(msg);
                log.error("", rejectedException);
                throw rejectedException;
            }
        }
);

LinkedBlockingQueue 陷阱

注意:如果使用 LinkedBlockingQueue,请配置 Max Threads Size < Queue Size <= Max Threads Size * 5,否则可能会出现并发的任务数量 > Queue Size,但由于 LinkedBlockingQueue 的任务通知机制,无法快速将所有等待线程唤醒,则会出现线程数量足够,但是任务仍然会被 Reject 的情况。

LinkedBlockingQueue 相关解释:

  1. 如果线程池发现 LinkedBlockingQueue 为空的情况下,是所有线程都在 parking 等着 Queue 中有任务。

  2. 此时如果有任务提交到 Queue 中,put 操作会判断当前队列是否为由 empty 状态转变为非 empty 状态,如果是则通知 1 个在等待的线程,将任务取走 1 个;

  3. 被通知到的线程每次取了 1 个任务后,会判断队列是否为空,如果不为空则由被通知的线程再通知另外的等待线程,等待线程的唤醒是链式唤醒的。

  4. 基于上述的表现,则会出现 1 种情况,Queue=300,parking thread=1000,如果此时来了 500 个请求,则会尝试唤醒 300 个线程,但由于线程调度的原因,并不会立刻有 300 个线程开始执行,就会出现线程数明明很多,但是 Queue 满了,无法执行任务的情况。

通俗解释:

相当于很多人在排队领馒头,但是筐子里面只能放 10 个馒头,排队的人有 100 个,平时馒头不够吃的,大家就在那里排队。馒头一下子出锅了 50 个,往筐子里放,放第一个的时候会告诉排队在最前面的人可以拿了,第一个人拿了之后,会告诉第二个人还有更多的馒头可以拿,但是第二个人反应慢啊,没有放馒头的人手快,就出现了放馒头的人放了 10 多个之后,筐子装不下了的情况,也就是 RejectException。

Dubbo SynchronizeQueue

dubbo 默认使用 SynchronizeQueue,这个 Queue 有些特殊,会将等待的线程连接成一个链表,如果有任务则从链表中取出一个线程去执行这个任务。在线程池的使用中,如果没有线程等待执行任务,则直接返回,并出现 rejectException。

!!! 非常重要 !!! 请务必设置 uncaught exception handler,否则会使用默认的 ThreadGroup 中的 uncaught exception handler => java.lang.ThreadGroup#uncaughtException,默认行为是吞掉异常啥也没有。

验证用例

线程数量充裕但是快速放入多个任务到阻塞队列,并且阻塞队列通知过慢,造成队列满了,任务被拒绝的问题复现。

java
public class ThreadPoolTest {
    public static void main(String[] args) throws InterruptedException {

        int threads = 500;
        int producer = 4;

        ThreadPoolExecutor exe = new ThreadPoolExecutor(threads, threads, 1, TimeUnit.MINUTES,
                // 设置queue size < core size.
                new ArrayBlockingQueue<>(100));
        CountDownLatch latch = new CountDownLatch(threads);
        // 保证所有线程都已处于active状态.
        for (int i = 0; i < threads; i++) {
            exe.submit(() -> {
                latch.countDown();
            });
        }
        latch.await();
        // 创建4个线程的线程池, 其目的是快速创建多任务
        ExecutorService poolSubmit = Executors.newFixedThreadPool(producer);
        int circle = 100;
        for (int i = 0; i < producer; i++) {
            poolSubmit.submit(() -> {
                try {
                    for (int j = 0; j < circle; j++) {
                        exe.submit(() -> {
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        });
                    }
                } catch (Throwable cause) {
                    cause.printStackTrace();
                }
            });
        }

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        exe.shutdownNow();
        poolSubmit.shutdownNow();
    }
}

Jackson 配置实践

在使用 Jackson 的过程中,会经历 Json 字符串 <-> Object 对象的互相转化,下面给出一些建议的 Jackson 工具类配置。

以下配置以 Jackson 2.9.9 版本为例,其他版本可能存在部分 API 的变更,需要适当调整。

引入依赖

groovy
compile "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
compile "com.fasterxml.jackson.core:jackson-core:$jacksonVersion"
compile "com.fasterxml.jackson.core:jackson-annotations:$jacksonVersion"

工具类

java
@Slf4j
public class JsonUtils {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    static {
        // 支持非标准json中增加//注释
        OBJECT_MAPPER.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
        // 该属性是用于关闭json key的常量化, 默认jackson会将json的key规范化(调用String#intern)用于减小消耗
        // 常见情况,将Map<String,Object>序列化为json
        // 但如果存在大量非固定key的json需要序列化和反序列化的时候,建议将该属性关闭
        // 如果主动开启该配置,需与CANONICALIZE_FIELD_NAMES, true一起使用.
        OBJECT_MAPPER.getFactory().configure(JsonFactory.Feature.INTERN_FIELD_NAMES, false);
        // 该特性目的是提高jackson的性能,但有可能导致内存泄露,建议关闭.
        // 参考:https://github.com/FasterXML/jackson-core/issues/189
        OBJECT_MAPPER.getFactory().configure(JsonFactory.Feature.USE_THREAD_LOCAL_FOR_BUFFER_RECYCLING, false);
        // 关闭未知属性错误异常,默认开启,建议关闭,对未知属性不处理
        OBJECT_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        // 关闭空类序列化异常,默认开启,建议关闭
        OBJECT_MAPPER.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
    }

    private JsonUtils() {
    }

    // TODO: encode OR decode code

}

Spring Jackson 配置

请在启动类增加如下定义:

java
@Bean
@Primary
@ConditionalOnMissingBean(ObjectMapper.class)
public ObjectMapper objectMapper(Jackson2ObjectMapperBuilder builder) {
    ObjectMapper objMapper = builder.createXmlMapper(false).build();
    // 支持非标准json中增加//注释
    objMapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
    // 该属性是用于关闭json key的常量化, 默认jackson会将json的key规范化(调用String#intern)用于减小消耗
    // 但如果存在大量非固定key的json需要序列化和反序列化的时候,建议将该属性关闭
    // 如果主动开启该配置,需与CANONICALIZE_FIELD_NAMES, true一起使用.
    objMapper.getFactory().configure(JsonFactory.Feature.INTERN_FIELD_NAMES, false);
    // 该特性目的是提高jackson的性能,但有可能导致内存泄露,建议关闭.
    // 参考:https://github.com/FasterXML/jackson-core/issues/189
    objMapper.getFactory().configure(JsonFactory.Feature.USE_THREAD_LOCAL_FOR_BUFFER_RECYCLING, false);
    // 关闭未知属性错误异常,默认开启,建议关闭,对未知属性不处理
    objMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
    // 关闭空类序列化异常,默认开启,建议关闭
    objMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
    return objMapper;
}

String.intern() 说明

注意:String.intern() 方法的作用是,将字符串放到方法区的字符常量池并去重保持单例来减少内存占用。

维度内存优化、性能提升潜在的性能开销、内存管理风险
内存使用减少重复字符串的内存占用,共享唯一实例可能导致字符串常量池膨胀,增加GC负担或引发OOM
性能表现字符串比较更快(可使用 == 代替 equalsintern() 调用本身有开销(池查找、可能插入)
适用场景处理大量重复字符串(如状态码、枚举值)处理大量唯一性高或生命周期短的字符串

为什么配置 INTERN_FIELD_NAMES = false

这样配置的原因?OBJECT_MAPPER.getFactory().configure(JsonFactory.Feature.INTERN_FIELD_NAMES, false);

尽管默认开启,但在以下情况下,将其设置为 false 会是更明智的选择:

首要原因:节省内存,防止潜在 OOM

这是最核心的考量。字符串常量池的大小是有限的(具体取决于 JVM 版本和参数设置),并且通常由 JVM 统一管理,垃圾回收器对其的处理方式也与普通堆内存不同。如果应用程序处理的 JSON 结构千变万化,字段名几乎不重复(例如,字段名是随机生成的 ID、时间戳等),那么启用字段名驻留功能就失去了其"去重"的优势。反而会导致大量唯一的字符串被放入常量池,增加常量池的内存压力,严重时可能成为内存溢出(OutOfMemoryError)的诱因。

处理不可信的 JSON 数据

如果 JSON 数据来源不可控(如对外公开的 API),恶意攻击者可能会构造包含大量不同字段名的巨大 JSON 报文,这是一种潜在的拒绝服务(DoS)攻击向量。关闭此特性可以一定程度上增强应用对此类攻击的韧性。

性能考量(特定场景)

对于字段名重复率极低的场景,intern() 方法本身的调用开销(包括检查池中是否存在该字符串的操作)可能比其带来的收益更大。关闭此特性可以避免这部分不必要的开销。

签名或加解密工具类需使用 ThreadLocal 缓存

JFR 文件分析

生成 Java Flight Recorder (JFR) 文件是进行 Java 应用性能分析的关键第一步。具体采用哪种方法,主要取决于你的具体需求,例如是用于主动性能剖析,还是诊断一个正在运行的应用的问题。

下面这个表格汇总了三种核心方法的对比,可以帮助你快速做出选择。

方法适用场景关键工具/参数主要优势
启动时录制应用启动阶段问题排查,或计划内、固定时长的性能剖析。JVM 参数:-XX:StartFlightRecording设定简单,自动化程度高,适合与启动脚本集成。
运行时动态录制生产环境首选。对正在运行的应用进行即时诊断,无需重启。jcmd 命令灵活且对业务影响最小,可随时开始/停止录制。
通过工具录制图形化操作,适合开发、测试环境,或对命令行不熟悉的用户。Java Mission Control (JMC), Arthas可视化界面,操作直观,易于上手。

UUID.randomUUID 性能问题

UUID.randomUUID 底层共用同一个 SecureRandom 对象,会调用其 engineNextBytes 方法,该方法签名存在对象同步锁 sun.security.provider.SecureRandom#engineNextBytes,存在性能瓶颈。

类似问题

  • MD5/Sha1/Sha256 等签名工具:均通过 MessageDigest.getInstance(algorithm) 执行,该对象的创建底层会公用同一个 java.security.Provider,此操作的性能瓶颈在 java.security.Provider#getService,同样存在锁问题。
  • AES/CBC/PKCS7Padding 等加解密工具:通过 Cipher.getInstance(algorithm) 执行,该对象的创建底层会公用同一个 java.security.Provider,此操作的性能瓶颈在 java.security.Provider#getService,同样存在锁问题。

诸如此类签名及加解密工具,建议操作如 Dubbo 中 MD5Utils,使用 ThreadLocal 缓存对象。

java
public class Md5Utils {

    private static final char[] DIGITS_UPPER =
            {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};

    private static ThreadLocal<MessageDigest> md5Local = ThreadLocal.withInitial(() -> {
        try {
            return MessageDigest.getInstance("MD5");
        } catch (Exception e) {
            throw new RuntimeException("MD5 algorithm not available", e);
        }
    });

    public static String md5(String value) {
        MessageDigest md5 = md5Local.get();
        md5.reset();
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        md5.update(bytes);
        return new String(encodeHex(md5.digest()));
    }

    private static char[] encodeHex(final byte[] data) {
        final int l = data.length;
        final char[] out = new char[l << 1];
        for (int i = 0, j = 0; i < l; i++) {
            out[j++] = DIGITS_UPPER[(0xF0 & data[i]) >>> 4];
            out[j++] = DIGITS_UPPER[0x0F & data[i]];
        }
        return out;
    }
}

ParallelStream 使用误区

ForkJoinPool 原理

Fork/Join 框架的核心是采用分治法的思想,将一个大任务拆分为若干互不依赖的子任务,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务。同时,为了最大限度地提高并行处理能力,采用了工作窃取算法来运行任务,也就是说当某个线程处理完自己工作队列中的任务后,尝试当其他线程的工作队列中窃取一个任务来执行,直到所有任务处理完毕。所以为了减少线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

从 ForkJoin 原理图可以看出,parallelStream 在执行的过程中如果需要先将任务拆分(fork)成多个子任务,分属多个 ForkJoinThread 执行。然后再由部分 ForkJoinThread 执行聚合(Join)任务,将子任务结果聚合成最后的结果。

WorkStealing

工作窃取算法能够有效的提高 ForkJoinPool 中任务的执行效率,基于双端队列可以有效的将空闲的线程利用起来。

任何使用 parallel stream 的程序都有可能成为阻塞程序的源头,并且在执行过程中程序中的其他部分将无法访问这些 workers,这意味着任何依赖 parallel streams 的程序在什么别的东西占用着 common ForkJoinPool 时将会变得不可预知并且暗藏危机。

有些业务方存在 List/Map 等对象,希望对其遍历做一些业务逻辑,会使用 parallelStream,但使用过程中需注意 parallelStream 会存在一些使用上的问题,parallelStream 底层使用 ForkJoinPool 线程池调度多个线程执行任务。

问题1:线程安全问题

java
// 线程不安全的错误例子
List<String> numList = ....;
HashMap<String, Object> hashMap = new HashMap<>();
numList.parallelStream().forEach(curr -> {
    hashMap.put(curr, curr);
});

parallelStream 是运行在多线程环境下的,在多线程环境下对非线程安全的 HashMap 操作就有可能出现 ConcurrentModificationException

问题2:CommonPool 造成性能缓慢

java
List<String> numList = ....;
numList.parallelStream().forEach(item -> {
    // Do some I/O or lang time operation,
    // such as dubbo call, db operation.
});

parallelStream 默认使用的是 CommonPool,是公共线程池,其核心数为 core - 1

在非 ForkJoinWorkerThread 提交的任务,会执行在 CurrentThread + CommonPool,如 8 核心 CPU,则会使用 7+1 个线程。

但是,如果同时执行多个 parallelStream 操作,则会共用 CommonPool,容易因为某些耗时较长的任务造成整体任务的执行缓慢,容易出现大量长尾请求。

使用上述方式,用的越多,共用 CommonPool 的情况越多,越容易出现互相竞争导致的卡顿长尾请求。

java
List<String> numList = ....;
// 并行代码放到ForkJoinPool中执行,就不再使用公共ForkJoinPool.commonPool()
ForkJoinPool forkJoinPool = new ForkJoinPool(${size}); // 请填写数量
forkJoinPool.submit(() -> {
    try {
        numList.parallelStream().forEach(item -> {
            // Do some I/O or lang time operation,
            // such as dubbo call, db operation.
        }).get(1000, TimeUnit.MILLISECONDS);  // 主线程等待超时时间:1000 ms.
    } catch(Throwable cause) { // 务必catch异常
        // do something
    }
});

参考资料:JDK8 并行流 parallelStream: https://www.jianshu.com/p/3d4e76467990

CompletableFuture 使用实践

CompletableFuture 为 Java 多线程调度工具,用于 Java 多线程的编排,非常方便。

使用结论

  • parallelStream 会严重影响 CompletableFuture.get() 的性能;
  • 双层 parallelStream 的影响比单层 parallelStream 的影响要大的多;
  • CompletableFuture.get()CompletableFuture.get(time, unit) 存在性能差异,二者的性能差异在不同场景下表现不同;
  • 高 QPS 场景下,不推荐使用 CompletableFuture.join(),尽量用 CompletableFuture.get(long timeout, TimeUnit unit) 替代。

结果消费

  • thenAccept: 如果 supplyAsync 函数执行成功,则调用该函数,并将正确的结果作为参数传递
  • thenRun: 如果 supplyAsync 函数执行成功,则调用该函数
  • whenComplete: 如果 supplyAsync 函数执行成功或失败,则调用该函数,并将正确的结果或执行过程中产生的异常作为入参
java
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> Integer.parseInt("100"));
future.thenAccept(System.out::println); // 如果supplyAsync函数执行成功,则调用该函数,并将正确的结果作为参数传递
future.thenRun(() -> { // 如果supplyAsync函数执行成功,则调用该函数
    System.out.println("future执行完成了");
});
future.whenComplete((rs, cause) -> System.out.println(rs + "-" + cause)); // 如果supplyAsync函数执行成功或,则调用该函数,并将正确的结果或执行过程中产生的异常作为入参。

结果转换

  • thenApply: 结果类型转换
  • handle: 结果类型转换(可处理异常)
  • thenCompose: 可以用于组合链式调用
java
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
// apply 结果类型转换
CompletableFuture<String> negative = future.thenApply(i -> String.valueOf(-i));
negative.thenAccept(System.out::println);
negative.exceptionally(cause -> {
    System.out.println(cause);
    return null;
});

// handler 结果类型转换
CompletableFuture<String> negative1 = future.handle((i, cause) -> String.valueOf(-i));
negative1.thenAccept(System.out::println);
negative1.exceptionally(cause -> {
    System.out.println(cause);
    return null;
});

// thenCompose, 可以用于组合链式调用
CompletableFuture<String> negative2 = future.thenCompose(i ->
        CompletableFuture.supplyAsync(() -> String.valueOf(-i)));
negative2.thenAccept(System.out::println);
negative2.exceptionally(cause -> {
    System.out.println(cause);
    return null;
});

结果组合消费

  • thenAcceptBoth: 等待二者均完成,然后拿着二者结果消费
  • runAfterBoth: 等待二者均完成,然后执行某一特定函数
  • acceptEither: 等待其中之一完成,然后拿着完成的结果消费
  • runAfterEither: 等待其中之一完成,然后执行某特定函数
java
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
CompletableFuture<Integer> other = CompletableFuture.supplyAsync(() -> 200);
// 等待二者均完成,然后拿着二者结果消费
future.thenAcceptBoth(other, (x, y) -> System.out.println(x + y));
// 等待二者均完成,然后执行某一特定函数
future.runAfterBoth(other, () -> {
    System.out.println("执行完成两个");
});
// 等待其中之一完成,然后拿着完成的结果消费
future.acceptEither(other, System.out::println);
// 等待其中之一完成,然后执行某特定函数
future.runAfterEither(other, () -> {
    System.out.println("执行完成1个");
});

结果组合转换

  • thenCombine: 组合两个结果,使用结果执行转换函数
  • applyToEither: 两个结果之一完成,则使用该结果执行转换函数
java
CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> 100);
CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 200);
// 组合两个结果, 使用结果执行转换函数.
CompletableFuture<Integer> third = first.thenCombine(second, Integer::sum);
third.whenComplete((rs, cause) -> {
    System.out.println(rs + " - " + cause);
});
// 两个结果之一完成,则使用该结果执行转换函数.
CompletableFuture<Integer> four = first.applyToEither(second, num -> -num);
four.whenComplete((rs, cause) -> {
    System.out.println(rs + "->" + cause);
});

多 Future 组合

  • anyOf: 其中之一完成即触发,可通过 anyDone 获取结果,且可以组合不同结果类型的 Future
  • allOf: 所有均完成之后触发
java
CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> 100);
CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 200);
CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> 300);
// 其中之一完成即触发,可通过anyDone获取结果,且可以组合不同结果类型的Future
CompletableFuture<Object> anyDone = CompletableFuture.anyOf(first, second, third);
Object anyRs = null;
try {
    anyRs = anyDone.get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
System.out.println(anyRs);
// 所有均完成之后触发.
CompletableFuture<Void> allDone = CompletableFuture.allOf(first, second, third);
allDone.thenRun(() -> {
    try {
        System.out.println(first.get() + "-" + second.get() + "-" + third.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
});

异常处理

知识点:所有的 CompletableFuture 的操作,返回新的 CompletableFuture 的操作,返回的均是新的 Future 对象。

请保证操作 Future 的过程中至少有一处处理异常,包含有异常的操作函数有 whenCompleteexceptionallyhandle 等。

处理异常时,在某些情况下得到的是原始类型异常,有些情况下得到的 CompletionException 异常:

  • 主动调用 future 对象的 completeExceptionally 方法设置的异常,可以在 future 的各类 Throwable 处理函数中获取得到原始异常;
  • 非调用 completeExceptionally 方法设置的异常,在 future 的各类 Throwable 处理函数中获取得到的异常是 CompletionException
  • future 的转换函数得到的 future1,再调用其各类 Throwable 处理函数中获得的异常,必然是 CompletionException
java
CompletableFuture f1 = new CompletableFuture();
CompletableFuture f2 = f1.whenComplete((rs, cause) -> {
    // IllegalStateException
    System.out.println("f1:" + rs + " - " + cause);
});
assert f1 != f2; // 注意经过whenComplete之后f1就不等于f2了
f2.whenComplete((rs, cause) -> {
    // CompletionException
    System.out.println("f2:" + rs + " - " + cause);
});
// 第一种,通过completeExceptionally设置的异常,能够拿到原始异常
f1.completeExceptionally(new IllegalStateException("exception --- 1"));


CompletableFuture<Object> f3 = CompletableFuture.supplyAsync(() -> {
    throw new IllegalStateException("exception --- 2");
});
f3.whenComplete((rs, cause) -> {
    // CompletionException
    System.out.println("f3:" + rs + " - " + cause);
});
// f1:null - java.lang.IllegalStateException: exception --- 1
// f2:null - java.util.concurrent.CompletionException: java.lang.IllegalStateException: exception --- 1
// f3:null - java.util.concurrent.CompletionException: java.lang.IllegalStateException: exception --- 2

LocalCache 实践

结论先行:如果不需要淘汰算法则选择 ConcurrentHashMap,如果需要淘汰算法和一些丰富的 API,推荐选择 Caffeine。

一、ConcurrentHashMap

最简单的进程内缓存可以通过 JDK 自带的 HashMap 或 ConcurrentHashMap 实现。

  • 适用场景:不需要淘汰的缓存数据。
  • 缺点:无法进行缓存淘汰,内存会无限制的增长。

二、LRUHashMap

可以通过继承 LinkedHashMap 来实现一个简单的 LRUHashMap,即可完成一个简单的 LRU(最近最少使用)算法。

  • 缺点:锁竞争严重,性能比较低;不支持过期时间;不支持自动刷新。

示例:LRUHashMap 的简单实现

java
class LRUCache extends LinkedHashMap {

    private final int max;
    private Object lock;

    public LRUCache(int max) {
        //无需扩容
        super((int) (max * 1.4f), 0.75f, true);
        this.max = max;
        this.lock = new Object();
    }

    /**
     * 重写LinkedHashMap的removeEldestEntry方法即可 在Put的时候判断,如果为true,就会删除最老的
     *
     * @param eldest
     * @return
     */
    @Override
    protected boolean removeEldestEntry(Map.Entry eldest) {
        return size() > max;
    }

    public Object getValue(Object key) {
        synchronized (lock) {
            return get(key);
        }
    }

    public void putValue(Object key, Object value) {
        synchronized (lock) {
            put(key, value);
        }
    }

    public boolean removeValue(Object key) {
        synchronized (lock) {
            return remove(key) != null;
        }
    }

    public boolean removeAll() {
        clear();
        return true;
    }
}

三、Guava Cache

Guava Cache 解决了 LRUHashMap 中的几个缺点。

Guava Cache 提供了基于容量,时间和引用的缓存回收方式。基于容量的方式内部实现采用 LRU 算法,基于引用回收很好的利用了 Java 虚拟机的垃圾回收机制。

其中的缓存构造器 CacheBuilder 采用构建者模式提供了设置好各种参数的缓存对象。缓存核心类 LocalCache 里面的内部类 Segment 与 jdk1.7 及以前的 ConcurrentHashMap 非常相似,分段加锁,减少锁竞争,并且都继承于 ReetrantLock,还有六个队列,以实现丰富的本地缓存方案。Guava Cache 对于过期的 Entry 并没有马上过期(也就是并没有后台线程一直在扫),而是通过进行读写操作的时候进行过期处理,这样做的好处是避免后台线程扫描的时候进行全局加锁。直接通过查询,判断其是否满足刷新条件,进行刷新。

Guava Cache 缓存回收

Guava Cache 提供了三种基本的缓存回收方式。

基于容量回收

  • maximumSize(long):当缓存中的元素数量超过指定值时触发回收。

基于定时回收

  • expireAfterAccess(long, TimeUnit):缓存项在给定时间内没有被读/写访问,则回收。请注意这种缓存的回收顺序和基于大小回收一样。
  • expireAfterWrite(long, TimeUnit):缓存项在给定时间内没有被写访问(创建或覆盖),则回收。如果认为缓存数据总是在固定时候后变得陈旧不可用,这种回收方式是可取的。

如下文所讨论,定时回收周期性地在写操作中执行,偶尔在读操作中执行。

基于引用回收

  • CacheBuilder.weakKeys():使用弱引用存储键。当键没有其它(强或软)引用时,缓存项可以被垃圾回收。
  • CacheBuilder.weakValues():使用弱引用存储值。当值没有其它(强或软)引用时,缓存项可以被垃圾回收。
  • CacheBuilder.softValues():使用软引用存储值。软引用只有在响应内存需要时,才按照全局最近最少使用的顺序回收。

Guava Cache 核心 API

CacheBuilder

缓存构建器。构建缓存的入口,指定缓存配置参数并初始化本地缓存。主要采用 builder 的模式,CacheBuilder 的每一个方法都返回这个 CacheBuilder 知道 build 方法的调用。注意 build 方法有重载,带有参数的为构建一个具有数据加载功能的缓存,不带参数的构建一个没有数据加载功能的缓存。

LocalManualCache

作为 LocalCache 的一个内部类,在构造方法里面会把 LocalCache 类型的变量传入,并且调用方法时都直接或者间接调用 LocalCache 里面的方法。

LocalLoadingCache

可以看到该类继承了 LocalManualCache 并实现接口 LoadingCache。覆盖了 get,getUnchecked 等方法。

LocalCache

Guava Cache 中的核心类,重点了解。

LocalCache 的数据结构与 ConcurrentHashMap 很相似,都由多个 segment 组成,且各 segment 相对独立,互不影响,所以能支持并行操作。每个 segment 由一个 table 和若干队列组成。缓存数据存储在 table 中,其类型为 AtomicReferenceArray。

四、Caffeine

caffeine 是一个使用 JDK8 改进 Guava 缓存的高性能缓存库。推荐 2.8.6 及以上。

Caffeine 实现了 W-TinyLFU(LFU + LRU 算法的变种),其命中率和读写吞吐量大大优于 Guava Cache。

其实现原理较复杂。

五、Ehcache

参考:Ehcache

六、进程内缓存对比

常用进程内缓存技术对比:

比较项ConcurrentHashMapLRUMapEhcacheGuava CacheCaffeine
读写性能很好,分段锁一般,全局加锁好,需要做淘汰操作很好
淘汰算法LRU,一般支持多种淘汰算法,LRU,LFU,FIFOLRU,一般W-TinyLFU,很好
功能丰富程度功能比较简单功能比较单一功能很丰富功能很丰富,支持刷新和虚引用等功能和 Guava Cache 类似
工具大小jdk 自带类,很小基于 LinkedHashMap,较小很大,最新版本 1.4MB是 Guava 工具类中的一个小部分,较小一般,最新版本 644KB
是否持久化
是否支持集群
  • ConcurrentHashMap - 比较适合缓存比较固定不变的元素,且缓存的数量较小的。虽然从上面表格中比起来有点逊色,但是其由于是 JDK 自带的类,在各种框架中依然有大量的使用,比如我们可以用来缓存我们反射的 Method,Field 等等;也可以缓存一些链接,防止其重复建立。在 Caffeine 中也是使用的 ConcurrentHashMap 来存储元素。
  • LRUMap - 如果不想引入第三方包,又想使用淘汰算法淘汰数据,可以使用这个。
  • Ehcache - 由于其 jar 包很大,较重量级。对于需要持久化和集群的一些功能的,可以选择 Ehcache。需要注意的是,虽然 Ehcache 也支持分布式缓存,但是由于其节点间通信方式为 rmi,表现不如 Redis,所以一般不建议用它来作为分布式缓存。
  • Guava Cache - Guava 这个 jar 包在很多 Java 应用程序中都有大量的引入,所以很多时候其实是直接用就好了,并且其本身是轻量级的而且功能较为丰富,在不了解 Caffeine 的情况下可以选择 Guava Cache。
  • Caffeine - 其在命中率,读写性能上都比 Guava Cache 好很多,并且其 API 和 Guava cache 基本一致,甚至会多一点。在真实环境中使用 Caffeine,取得过不错的效果。

总结:如果不需要淘汰算法则选择 ConcurrentHashMap,如果需要淘汰算法和一些丰富的 API,推荐选择 Caffeine。

参考资料

  • caffeine github
  • 深入解密来自未来的缓存-Caffeine
  • Caffeine 缓存
  • Google Guava 官方教程(中文版)
  • Google Guava Cache 全解析
  • 注释驱动的 Spring cache 缓存介绍

ConcurrentHashMap computeIfAbsent 测评

基于 stackoverflow 网友的反馈,computeIfAbsent 在 key 已经存在时仍然会阻塞。JUC 的作者 Doug Lea 也回答了自己的看法,并给出了相关的回答:如果输入集是固定的,提前检查的性能确实更优,吞吐量更大;而当输入集是随机分布时,直接 computeIfAbsent 的性能是更好的。

故为了验证实际的情况,设计实验用例比较两种使用方式:直接 computeIfAbsentget before computeIfAbsent

实验的数据集以命中 key 的难易程度进行设计,为了排除随机数生成对实验的影响,数据集将在运行用例前生成好。

本次实验将从两个角度去进行观察验证,一个是使用 JMH 来分析执行过程的性能,另一个是使用 JFR 来分析程序运行过程中的线程情况。

使用的 JDK 版本为 jdk1.8.0_231。

吞吐量实验(JMH 测试代码)

java
@State(Scope.Benchmark)
public class ComputeIfAbsentBenchmark {
    private static final int SIZE = (2 << 22);
    private static final int RANDOM_STR_LEN = 16;

    @Param({"fixedKey", "randomDistributedKey"})
    private String caseType;

    private Map<String, String> testMap = new ConcurrentHashMap<>();
    private String[] dataSet;

    @State(Scope.Thread)
    public static class ThreadState {
        static final Random random = new Random();
        int index = random.nextInt();
    }

    @Setup
    public void setup() {
        testMap.clear();
        if (caseType.equals("fixedKey")) {
            dataSet = new String[64];
            for (int i = 0; i < 64; i++) {
                dataSet[i] = RandomStringUtils.randomAscii(RANDOM_STR_LEN);
            }
        } else if (caseType.equals("randomDistributedKey")) {
            dataSet = new String[SIZE];
            for (int i = 0; i < SIZE; i++) {
                dataSet[i] = RandomStringUtils.randomAscii(RANDOM_STR_LEN);
            }
        }
    }

    @Benchmark
    @Threads(2)
    public void computeIfAbsent(ThreadState threadState) {
        String key = dataSet[threadState.index++ & dataSet.length - 1];
        testMap.computeIfAbsent(key, s -> new StringBuilder(key).reverse().toString());
    }

    @Benchmark
    @Threads(2)
    public void getBeforeComputeIfAbsent(ThreadState threadState) {
        String key = dataSet[threadState.index++ & dataSet.length - 1];
        String val = testMap.get(key);
        if (val == null) {
            testMap.computeIfAbsent(key, s -> new StringBuilder(key).reverse().toString());
        }
    }

    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                .include(ComputeIfAbsentBenchmark.class.getSimpleName())
                .forks(1)
                .warmupIterations(5)
                .measurementIterations(5)
                .result("result.json")
                .resultFormat(ResultFormatType.JSON)
                .build();

        new Runner(opt).run();
    }
}

线程情况(JFR 测试代码)

java
public class FindBlockThreadByJFRTest {

    private static final int THREADS = 16;

    public static void main(String[] args) throws InterruptedException {
        //map是全新map,直接使用computeIfAbsent
        directComputeIfAbsent();

        //map已填充好所有的key,直接使用computeIfAbsent
//        mapInitializedComputeIfAbsent();

        //map是全新map,在computeIfAbsent提前get
//        getBeforeComputeIfAbsent();
    }

    private static void directComputeIfAbsent() throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(THREADS, new NamedThreadFactory("compute-test", true));
        ConcurrentHashMap<String, Integer> testMap = new ConcurrentHashMap<>();
        System.out.println("started");
        for (int i = 0; i < THREADS; i++) {
            exec.execute(() -> {
                int idx = 0;
                while (idx++ < Integer.MAX_VALUE) {
                    String randomKey = RandomStringUtils.randomAlphabetic(3);
                    testMap.computeIfAbsent(randomKey, key -> 2);
                }
            });
        }
        TimeUnit.SECONDS.sleep(1000);
        System.out.println("finished");
    }

    private static void mapInitializedComputeIfAbsent() throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(THREADS, new NamedThreadFactory("compute-test", true));
        ConcurrentHashMap<Integer, Integer> testMap = new ConcurrentHashMap<>();
        for (int i = 0; i < 10; i++) {
            testMap.put(i, i);
        }
        System.out.println("started");
        for (int i = 0; i < THREADS; i++) {
            exec.execute(() -> {
                int idx = 0;
                while (idx++ < Integer.MAX_VALUE) {
                    int intKey = RandomUtils.nextInt(0, 10);
                    testMap.computeIfAbsent(intKey, key -> 2);
                }
            });
        }
        TimeUnit.SECONDS.sleep(1000);
        System.out.println("finished");
    }

    private static void getBeforeComputeIfAbsent() throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(THREADS, new NamedThreadFactory("compute-test", true));
        ConcurrentHashMap<String, Integer> testMap = new ConcurrentHashMap<>();
        System.out.println("started");
        for (int i = 0; i < THREADS; i++) {
            exec.execute(() -> {
                int idx = 0;
                while (idx++ < Integer.MAX_VALUE) {
                    String randomKey = RandomStringUtils.randomAlphabetic(3);
                    Integer value = testMap.get(randomKey);
                    if (value == null) {
                        testMap.computeIfAbsent(randomKey, s -> 2);
                    }
                }
            });
        }
        TimeUnit.SECONDS.sleep(1000);
        System.out.println("finished");
    }

}

实验分三个测试方法:

  1. 直接 computeIfAbsent
  2. map 初始化后 computeIfAbsent
  3. get before computeIfAbsent

computeIfAbsent 前进行 get,不再发现线程阻塞的情况。

数据分析

评测是 throughput,故分数越高性能越好。

吞吐量实验:fixedKey 和 randomDistributedKey 是两类数据集,fixed 的集合范围小,在实验过程中 key 更容易重复命中,random 的集合范围大,key 不容易命中。结合上面的数据可以发现,当输入的 key 在 map 中已经存在的概率越高,提前 get 比直接 computeIfAbsent 的吞吐量要更高。

线程状态实验:使用 JFR 捕捉线程状态时,提前 get 不会出现线程 BLOCKED 的情况。

源码分析

结合 JFR 的阻塞的代码行,进行分析。

线程阻塞在 1674 行,只有当 key 已经在 map 中存在时才会被锁进行 function 计算更新 value 值。

结论

  • key 在 map 中已经存在时,再次调用 computeIfAbsent 会被锁。
  • 若数据集不固定,key 相对分散时,建议可直接使用 computeIfAbsent 来获取 value,因为第一次的添加不会被锁住。
  • 若数据集固定,尤其是相同 key 反复获取时,建议提前 get 来优化性能。

建议用法

computeIfAbsent 封装:

java
public final class MapUtils {
    /**
     * A temporary workaround for Java 8 specific performance issue JDK-8161372 .<br>
     * This class should be removed once we drop Java 8 support.
     *
     * @see <a href="https://bugs.openjdk.java.net/browse/JDK-8161372">https://bugs.openjdk.java.net/browse/JDK-8161372</a>
     */
    public static <K,V> V computeIfAbsent(Map<K,V> map, K key,
                        Function<? super K, ? extends V> mappingFunction) {
        V val = map.get(key);
        return val != null? val
                : map.computeIfAbsent(key, mappingFunction);
    }

    private MapUtils() {
    }
}

参考资料

  1. 探索的起点,stackoverflow 系列问答 https://stackoverflow.com/questions/26481796/concurrenthashmap-computeifabsent
  2. 大师解惑,Doug lee 的回答 http://cs.oswego.edu/pipermail/concurrency-interest/2014-December/013360.html
  3. concurrency interest http://cs.oswego.edu/pipermail/concurrency-interest/2014-December/thread.html
  4. open jdk 保障 issue https://bugs.openjdk.java.net/browse/JDK-8161372
  5. 其他的测试代码 https://github.com/ben-manes/caffeine/blob/7f98672657a6e3515eac00ec86990b877a9d10dc/caffeine/src/jmh/java/com/github/benmanes/caffeine/cache/ComputeBenchmark.java
  6. 源码 computeIfAbsent 第一次创 node 为何会对临时变量进行锁 http://cs.oswego.edu/pipermail/concurrency-interest/2016-January/014857.html
  7. Doug Lee 对临时变量创锁的解释 https://bugs.openjdk.java.net/browse/JDK-8202416
  8. 大师的彩蛋 https://blog.csdn.net/g6U8W7p06dCO99fQ3/article/details/106846139

Move fast and break things