Spring Cloud Feign内部实现代码细节

1. 概述

Feign用于服务间调用,它的内部实现是一个包含Ribbon(负载均衡)的**JDK-HttpURLConnection(Http)**调用。虽然调用形式类似于RPC,但实际发起的是Http请求,这也是Feign被称为伪RPC调用的原因。内部调用过程如下:

2. 代码细节

1) BaseLoadBalancer.java配置初始化

重点功能: 1. 初始化负载均衡策略 2. 初始化取服务注册列表调度策略

/**
 * Applies the client configuration to this load balancer: reads the ping
 * timing settings, then wires the rule, ping and stats objects to it.
 *
 * @param clientConfig source of the ping interval / max total ping time settings
 * @param rule         load-balancing rule; may be null, in which case
 *                     {@code setRule} installs its own default
 * @param ping         ping implementation handed to {@code setPing}
 * @param stats        statistics holder handed to {@code setLoadBalancerStats}
 */
void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {
    ...
    // Ping interval: falls back to 30 (seconds) when the property is not configured.
    int pingIntervalTime = Integer.parseInt(""
            + clientConfig.getProperty(
                    CommonClientConfigKey.NFLoadBalancerPingInterval,
                    30));
    // Upper bound for one ping round: falls back to 2 (seconds) when not configured.
    int maxTotalPingTime = Integer.parseInt(""
            + clientConfig.getProperty(
                    CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
                    2));

    setPingInterval(pingIntervalTime);
    setMaxTotalPingTime(maxTotalPingTime);

    // cross associate with each other
    // i.e. Rule,Ping meet your container LB
    // LB, these are your Ping and Rule guys ...

    // Install the load-balancing rule.
    setRule(rule);
    // Install the ping, which (re)schedules the periodic server-list check.
    setPing(ping);
    setLoadBalancerStats(stats);
    // setRule(null) is legal and substitutes a default rule, so the raw
    // parameter must not be dereferenced unconditionally here.
    if (rule != null) {
        rule.setLoadBalancer(this);
    }
    ...
}

2) 负载均衡策略初始化

重点功能: 1. 默认使用轮询策略

BaseLoadBalancer.java

public void setRule(IRule rule) {    if (rule != null) {        this.rule = rule;    } else {        /* default rule */        // 默认使用轮询策略        this.rule = new RoundRobinRule();    }    if (this.rule.getLoadBalancer() != this) {        this.rule.setLoadBalancer(this);    }}

RoundRobinRule.java

private AtomicInteger nextServerCyclicCounter;public Server choose(ILoadBalancer lb, Object key) {    if (lb == null) {        log.warn("no load balancer");        return null;    }    Server server = null;    int count = 0;    while (server == null && count++ < 10) {        List<Server> reachableServers = lb.getReachableServers();        List<Server> allServers = lb.getAllServers();        int upCount = reachableServers.size();        int serverCount = allServers.size();        if ((upCount == 0) || (serverCount == 0)) {            log.warn("No up servers available from load balancer: " + lb);            return null;        }        // 轮询重点算法        int nextServerIndex = incrementAndGetModulo(serverCount);        server = allServers.get(nextServerIndex);        if (server == null) {            /* Transient. */            Thread.yield();            continue;        }        if (server.isAlive() && (server.isReadyToServe())) {            return (server);        }        // Next.        server = null;    }    if (count >= 10) {        log.warn("No available alive servers after 10 tries from load balancer: "                + lb);    }    return server;}private int incrementAndGetModulo(int modulo) {    for (;;) {        int current = nextServerCyclicCounter.get();        int next = (current + 1) % modulo;        if (nextServerCyclicCounter.compareAndSet(current, next))            return next;    }}

3) 初始化取服务注册列表调度策略

重点功能: 1. 设置Ping任务的执行间隔为30s一次

注意: 这里没有做实际的Ping,只是从缓存的注册列表中获取alive的服务,这样做是为了提高性能

BaseLoadBalancer.java

public void setPing(IPing ping) {    if (ping != null) {        if (!ping.equals(this.ping)) {            this.ping = ping;            setupPingTask(); // since ping data changed        }    } else {        this.ping = null;        // cancel the timer task        lbTimer.cancel();    }}void setupPingTask() {    if (canSkipPing()) {        return;    }    if (lbTimer != null) {        lbTimer.cancel();    }    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,            true);    // 这里虽然默认设置是10s一次,但是在初始化的时候,设置了30s一次    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);    forceQuickPing();}class Pinger {    private final IPingStrategy pingerStrategy;    public Pinger(IPingStrategy pingerStrategy) {        this.pingerStrategy = pingerStrategy;    }    public void runPinger() throws Exception {        if (!pingInProgress.compareAndSet(false, true)) {             return; // Ping in progress - nothing to do        }                // we are "in" - we get to Ping        Server[] allServers = null;        boolean[] results = null;        Lock allLock = null;        Lock upLock = null;        try {            /*             * The readLock should be free unless an addServer operation is             * going on...             
*/            allLock = allServerLock.readLock();            allLock.lock();            allServers = allServerList.toArray(new Server[allServerList.size()]);            allLock.unlock();            int numCandidates = allServers.length;            results = pingerStrategy.pingServers(ping, allServers);            final List<Server> newUpList = new ArrayList<Server>();            final List<Server> changedServers = new ArrayList<Server>();            for (int i = 0; i < numCandidates; i++) {                boolean isAlive = results[i];                Server svr = allServers[i];                boolean oldIsAlive = svr.isAlive();                svr.setAlive(isAlive);                if (oldIsAlive != isAlive) {                    changedServers.add(svr);                    logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}",                         name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));                }                if (isAlive) {                    newUpList.add(svr);                }            }            upLock = upServerLock.writeLock();            upLock.lock();            upServerList = newUpList;            upLock.unlock();            notifyServerStatusChangeListener(changedServers);        } finally {            pingInProgress.set(false);        }    }}private static class SerialPingStrategy implements IPingStrategy {    @Override    public boolean[] pingServers(IPing ping, Server[] servers) {        int numCandidates = servers.length;        boolean[] results = new boolean[numCandidates];        logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);        for (int i = 0; i < numCandidates; i++) {            results[i] = false; /* Default answer is DEAD. 
*/            try {                // NOTE: IFF we were doing a real ping                // assuming we had a large set of servers (say 15)                // the logic below will run them serially                // hence taking 15 times the amount of time it takes                // to ping each server                // A better method would be to put this in an executor                // pool                // But, at the time of this writing, we dont REALLY                // use a Real Ping (its mostly in memory eureka call)                // hence we can afford to simplify this design and run                // this                // serially                if (ping != null) {                    results[i] = ping.isAlive(servers[i]);                }            } catch (Exception e) {                logger.error("Exception while pinging Server: '{}'", servers[i], e);            }        }        return results;    }}

4) 最后拼接完整URL使用JDK-HttpURLConnection进行调用

SynchronousMethodHandler.java(io.github.openfeign:feign-core:10.10.1/feign.SynchronousMethodHandler.java)

/**
 * Builds the request from the template, sends it through the configured
 * client and attaches the request/template to the response. The rest of the
 * method is elided in this excerpt.
 *
 * @param template request template to turn into a concrete request
 * @param options  per-call options passed to the client
 * @throws Throwable a FeignException wrapping any IOException raised while executing
 */
Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
    Request request = targetRequest(template);

    if (logLevel != Level.NONE) {
        logger.logRequest(metadata.configKey(), logLevel, request);
    }

    long start = System.nanoTime();
    Response response;
    try {
        Response rawResponse = client.execute(request, options);
        // Rebuild the response so it carries the originating request and template.
        response = rawResponse.toBuilder()
                .request(request)
                .requestTemplate(template)
                .build();
    } catch (IOException ioFailure) {
        if (logLevel != Level.NONE) {
            logger.logIOException(metadata.configKey(), logLevel, ioFailure, elapsedTime(start));
        }
        throw FeignException.errorExecuting(request, ioFailure);
    }
    ...
}

LoadBalancerFeignClient.java

/**
 * Executes the request through the load-balancing client registered under
 * the request URL's host name.
 *
 * @param request the request; the host part of its URL is used as the client name
 * @param options per-call options used to obtain the client configuration
 * @return the response produced by the load-balanced call
 * @throws IOException when the ClientException raised by the call wraps one
 */
@Override
public Response execute(Request request, Request.Options options) throws IOException {
    try {
        // The URL's host is the logical client name, not a real address.
        String clientName = URI.create(request.url()).getHost();
        URI uriWithoutHost = cleanUrl(request.url(), clientName);

        FeignLoadBalancer.RibbonRequest ribbonRequest =
                new FeignLoadBalancer.RibbonRequest(this.delegate, request, uriWithoutHost);
        IClientConfig requestConfig = getClientConfig(options, clientName);

        return lbClient(clientName)
                .executeWithLoadBalancer(ribbonRequest, requestConfig)
                .toResponse();
    } catch (ClientException e) {
        IOException io = findIOException(e);
        if (io == null) {
            // No IOException found for this failure: surface it unchecked.
            throw new RuntimeException(e);
        }
        throw io;
    }
}

AbstractLoadBalancerAwareClient.java

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);    try {        return command.submit(            new ServerOperation<T>() {                @Override                public Observable<T> call(Server server) {                    // 获取真实访问URL                    URI finalUri = reconstructURIWithServer(server, request.getUri());                    S requestForServer = (S) request.replaceUri(finalUri);                    try {                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));                    }                     catch (Exception e) {                        return Observable.error(e);                    }                }            })            .toBlocking()            .single();    } catch (Exception e) {        Throwable t = e.getCause();        if (t instanceof ClientException) {            throw (ClientException) t;        } else {            throw new ClientException(e);        }    }    }

FeignLoadBalancer.java

/**
 * Executes the request with the client it carries, using timeouts taken from
 * the override configuration when one is given.
 *
 * @param request        the request to execute
 * @param configOverride optional per-call configuration; may be null
 * @return the response paired with the request URI
 * @throws IOException when the underlying client call fails
 */
@Override
public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
        throws IOException {
    Request.Options options = resolveRequestOptions(configOverride);
    Response response = request.client().execute(request.toRequest(), options);
    return new RibbonResponse(request.getUri(), response);
}

/** Builds the request options, using this balancer's own timeouts as fallback values. */
private Request.Options resolveRequestOptions(IClientConfig configOverride) {
    if (configOverride == null) {
        return new Request.Options(this.connectTimeout, this.readTimeout);
    }
    RibbonProperties override = RibbonProperties.from(configOverride);
    return new Request.Options(override.connectTimeout(this.connectTimeout),
            override.readTimeout(this.readTimeout));
}

feign.Client.java

@Overridepublic Response execute(Request request, Options options) throws IOException {  HttpURLConnection connection = convertAndSend(request, options);  return convertResponse(connection, request);}Response convertResponse(HttpURLConnection connection, Request request) throws IOException {  // 使用HttpURLConnection 真实进行Http调用  int status = connection.getResponseCode();  String reason = connection.getResponseMessage();  if (status < 0) {    throw new IOException(format("Invalid status(%s) executing %s %s", status,        connection.getRequestMethod(), connection.getURL()));  }  Map<String, Collection<String>> headers = new LinkedHashMap<>();  for (Map.Entry<String, List<String>> field : connection.getHeaderFields().entrySet()) {    // response message    if (field.getKey() != null) {      headers.put(field.getKey(), field.getValue());    }  }  Integer length = connection.getContentLength();  if (length == -1) {    length = null;  }  InputStream stream;  if (status >= 400) {    stream = connection.getErrorStream();  } else {    stream = connection.getInputStream();  }  return Response.builder()      .status(status)      .reason(reason)      .headers(headers)      .request(request)      .body(stream, length)      .build();}

拓展干货阅读:一线大厂面试题、高并发等主流技术资料

以上就是Spring Cloud Feign内部实现代码细节的详细内容,更多关于Spring Cloud Feign的资料请关注其它相关文章!

有志者,事竟成;破釜沉舟,百二秦关终属楚;苦心人,天不负;卧薪尝胆,三千越甲可吞吴。

Spring Cloud Feign内部实现代码细节

相关文章:

你感兴趣的文章:

标签云: