Spring Cloud Hystrix Source Code Analysis
Published: 2019-06-29


Spring Cloud Hystrix Source Code Walkthrough

@EnableCircuitBreaker
Responsibility:

  • Activates the circuit breaker

Initialization order:

  • @EnableCircuitBreaker
  • EnableCircuitBreakerImportSelector
  • HystrixCircuitBreakerConfiguration

HystrixCircuitBreakerConfiguration

Components it initializes

  • HystrixCommandAspect
  • HystrixShutdownHook
  • HystrixStreamEndpoint: a Servlet
  • HystrixMetricsPollerConfiguration

Netflix Hystrix Source Code Walkthrough

HystrixCommandAspect

Dependencies

  • MetaHolderFactory
  • HystrixCommandFactory: creates a HystrixInvokable
  • HystrixInvokable, implemented by:

      • CommandCollapser
      • GenericObservableCommand
      • GenericCommand
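
To make the roles above more concrete, here is a minimal plain-Java sketch of the general idea behind HystrixCommandAspect: intercept a call to an annotated method, wrap it in an invokable object, and execute it with a fallback. It uses a JDK dynamic proxy rather than AspectJ, and every name in it (Guarded, Invokable, GreetingService, SimpleGreetingService, AspectSketch) is hypothetical. None of it is Hystrix or javanica code, and it leaves out thread pools, timeouts, and metrics.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/** Hypothetical sketch: plays the role of the aspect, using a JDK dynamic proxy. */
class AspectSketch {

    /** Returns a proxy that runs @Guarded methods as an Invokable with a fallback. */
    @SuppressWarnings("unchecked")
    static <T> T wrap(T target, Class<T> type) {
        return (T) Proxy.newProxyInstance(
                type.getClassLoader(),
                new Class<?>[] {type},
                (proxy, method, args) -> {
                    Guarded guarded = method.getAnnotation(Guarded.class);
                    if (guarded == null) {
                        // Not annotated: call straight through.
                        return call(target, method, args);
                    }
                    // Look up the fallback with the same parameter types.
                    Method fallback =
                            type.getMethod(guarded.fallbackMethod(), method.getParameterTypes());
                    // Wrap the call (loosely, what a command factory would produce).
                    Invokable command = () -> {
                        try {
                            return call(target, method, args);
                        } catch (Exception e) {
                            return call(target, fallback, args);
                        }
                    };
                    return command.execute();
                });
    }

    /** Invokes reflectively and unwraps the exception thrown by the target method. */
    private static Object call(Object target, Method method, Object[] args) throws Exception {
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        GreetingService service = wrap(new SimpleGreetingService(), GreetingService.class);
        System.out.println(service.greet("World")); // normal path
        System.out.println(service.greet(""));      // greet throws, fallback result is returned
    }
}

/** Hypothetical stand-in for @HystrixCommand: names the fallback method. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Guarded {
    String fallbackMethod();
}

/** Hypothetical stand-in for HystrixInvokable: something that can be executed. */
interface Invokable {
    Object execute() throws Exception;
}

interface GreetingService {
    @Guarded(fallbackMethod = "greetFallback")
    String greet(String name);

    String greetFallback(String name);
}

class SimpleGreetingService implements GreetingService {
    @Override
    public String greet(String name) {
        if (name.isEmpty()) {
            throw new IllegalArgumentException("name must not be empty");
        }
        return "Hello," + name;
    }

    @Override
    public String greetFallback(String name) {
        return "Fallback";
    }
}
```

The proxy reads the annotation from the interface method, which is why Guarded is placed on GreetingService rather than on the implementing class.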

Implementing Circuit Breaking with Future

package com.segumentfault.springcloudlesson9.future;

import java.util.Random;
import java.util.concurrent.*;

/**
 * Implements circuit breaking (a timeout plus a fallback) with {@link Future}.
 */
public class FutureCircuitBreakerDemo {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Initialize the thread pool
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        RandomCommand command = new RandomCommand();
        Future<String> future = executorService.submit(command::run);
        String result = null;
        // 100 ms timeout
        try {
            result = future.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Timed out: call the fallback method
            result = command.fallback();
        }
        System.out.println(result);
        executorService.shutdown();
    }

    /**
     * Shared random number generator
     */
    private static final Random random = new Random();

    /**
     * Command whose execution time is random
     */
    public static class RandomCommand implements Command<String> {

        @Override
        public String run() throws InterruptedException {
            long executeTime = random.nextInt(200);
            // Sleep to simulate the execution time
            System.out.println("Execute Time : " + executeTime + " ms");
            Thread.sleep(executeTime);
            return "Hello,World";
        }

        @Override
        public String fallback() {
            return "Fallback";
        }
    }

    public interface Command<T> {

        /**
         * Executes normally and returns the result.
         *
         * @return the result of normal execution
         */
        T run() throws Exception;

        /**
         * Returns the fault-tolerant result when execution fails.
         *
         * @return the fallback result
         */
        T fallback();
    }
}
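
The same timeout-then-fallback idea in the Future demo can also be written with CompletableFuture, which has offered orTimeout since Java 9. The following is only a sketch under assumptions: the class name TimeoutFallbackSketch and the helper callWithFallback are made up for illustration. Like the demo, it does not track failure rates or open a circuit, and the timed-out task is not interrupted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical sketch: timeout plus fallback on top of CompletableFuture (Java 9+). */
class TimeoutFallbackSketch {

    /**
     * Runs {@code run} on the executor. If it fails or does not finish within
     * {@code timeoutMillis}, the value of {@code fallback} is returned instead.
     * The original task is not cancelled; it keeps running on the executor.
     */
    static <T> T callWithFallback(Supplier<T> run, Supplier<T> fallback,
                                  long timeoutMillis, ExecutorService executor) {
        return CompletableFuture.supplyAsync(run, executor)
                // Complete exceptionally with a TimeoutException after the deadline
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                // Any failure, including the timeout, is mapped to the fallback value
                .exceptionally(error -> fallback.get())
                .join();
    }

    private static String slowHello(long millis) {
        try {
            Thread.sleep(millis); // simulate a slow remote call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Hello,World";
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Finishes well inside the timeout, so the real result is used
            System.out.println(callWithFallback(() -> slowHello(0), () -> "Fallback", 1000, executor));
            // Takes longer than the timeout, so the fallback is used
            System.out.println(callWithFallback(() -> slowHello(500), () -> "Fallback", 100, executor));
        } finally {
            // Interrupt the task that is still sleeping and stop the pool
            executor.shutdownNow();
        }
    }
}
```

Compared with Future.get(timeout), this keeps the fallback logic inside the pipeline, so exceptions thrown by the task and timeouts are handled in one place.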

RxJava Basics

Single value: Single

Single.just("Hello,World")               // emits exactly one item
        .subscribeOn(Schedulers.io())    // run on an I/O thread
        .subscribe(RxJavaDemo::println); // subscribe and consume the item

Multiple values: Observable

List<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

Observable.from(values)                        // emits multiple items
        .subscribeOn(Schedulers.computation()) // run on a computation thread
        .subscribe(RxJavaDemo::println);       // subscribe and consume the items

// Wait for the asynchronous work to finish
Thread.sleep(100);

Using the Standard Reactive Pattern

List<Integer> values = Arrays.asList(1, 2, 3);

Observable.from(values)                      // emits multiple items
        .subscribeOn(Schedulers.newThread()) // run on a new thread
        .subscribe(value -> {
            if (value > 2)
                throw new IllegalStateException("value must not be greater than 2");
            // Consume the item
            println("Consumed: " + value);
        }, e -> {
            // On error, the stream is terminated
            println("Error: " + e.getMessage());
        }, () -> {
            // When the whole stream has completed
            println("Completed");
        });

// Wait for the asynchronous work to finish
Thread.sleep(100);

Java 9 Flow API

For background only.

package concurrent.java9;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * {@link SubmissionPublisher}
 *
 * @author mercyblitz
 **/
public class SubmissionPublisherDemo {

    public static void main(String[] args) throws InterruptedException {
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            // Publisher(100) => A -> B -> C => Done
            publisher.subscribe(new IntegerSubscriber("A"));
            publisher.subscribe(new IntegerSubscriber("B"));
            publisher.subscribe(new IntegerSubscriber("C"));
            // Submit the item to every subscriber
            publisher.submit(100);
        }
        Thread.currentThread().join(1000L);
    }

    private static class IntegerSubscriber implements Flow.Subscriber<Integer> {

        private final String name;

        private Flow.Subscription subscription;

        private IntegerSubscriber(String name) {
            this.name = name;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.printf(
                    "Thread[%s] Current Subscriber[%s] " + "subscribes subscription[%s]\n",
                    Thread.currentThread().getName(), name, subscription);
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(Integer item) {
            System.out.printf(
                    "Thread[%s] Current Subscriber[%s] " + "receives item[%d]\n",
                    Thread.currentThread().getName(), name, item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.printf(
                    "Thread[%s] Current Subscriber[%s] " + "is completed!\n",
                    Thread.currentThread().getName(), name);
        }
    }
}

Reposted from: http://vaqwm.baihongyu.com/
