Dubbo源码阅读笔记3

### 扩展点加载(ExtensionLoader)

每一种类型的扩展点都有一个ExtensionLoader实例

    变量说明

    /**
     * Excerpt of ExtensionLoader showing only its fields: shared constants and caches first,
     * then the state held by the loader of one extension point type.
     */
    public class ExtensionLoader<T> {
        // Directory scanned for Dubbo service definitions.
        private static final String SERVICES_DIRECTORY = "META-INF/services/";
        // Directory scanned for extension point configuration (the one to use for custom extensions).
        private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
        // Directory scanned for Dubbo's internal extension point configuration.
        private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";
        // Matches one or more commas with optional surrounding whitespace.
        private static final Pattern NAME_SEPARATOR = Pattern.compile("\\s*[,]+\\s*");
        // Cache of ExtensionLoader instances keyed by extension interface.
        private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();
        // Cache of extension instances keyed by implementation class.
        private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();
        // Everything above is a constant; everything below is per-loader state.
        // ==============================
        // Interface type of the extension point handled by this loader.
        private final Class<?> type;
        // Object factory used to look up values for setter injection.
        private final ExtensionFactory objectFactory;
        // Maps a class to a name; presumably implementation class -> extension name (filled outside this excerpt).
        private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<Class<?>, String>();
        // All configured implementation classes of this extension point, keyed by name.
        private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String, Class<?>>>();
        // Activate annotations keyed by a string (presumably the extension name).
        // NOTE(review): the original note described this as "adaptive" annotation data, but the value type is Activate.
        private final Map<String, Activate> cachedActivates = new ConcurrentHashMap<String, Activate>();
        // Extension instances of this loader, each wrapped in a Holder and keyed by name.
        private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<String, Holder<Object>>();
        // The adaptive extension instance.
        private final Holder<Object> cachedAdaptiveInstance = new Holder<Object>();
        // The adaptive extension class.
        private volatile Class<?> cachedAdaptiveClass = null;
        // Name of the default extension.
        private String cachedDefaultName;
        // Wrapper classes configured for this extension point.
        private Set<Class<?>> cachedWrapperClasses;
        // Failure recorded by getAdaptiveExtension() when creating the adaptive instance throws.
        private volatile Throwable createAdaptiveInstanceError;
        // Exceptions keyed by a string; how the map is filled is not part of this excerpt.
        private Map<String, IllegalStateException> exceptions = new ConcurrentHashMap<String, IllegalStateException>();
        // ...
    }

    初始化

先从全局缓存里面取,如果取不到则新建

/**
 * Returns the loader responsible for the given extension interface, creating and caching it
 * on first use.
 *
 * @param type the extension interface; must be non-null, an interface, and accepted by
 *             {@code withExtensionAnnotation}
 * @return the cached loader for {@code type}
 * @throws IllegalArgumentException if {@code type} fails any of the checks above
 */
@SuppressWarnings("unchecked")
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    if (type == null) {
        throw new IllegalArgumentException("Extension type == null");
    }
    if (!type.isInterface()) {
        throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
    }
    if (!withExtensionAnnotation(type)) {
        throw new IllegalArgumentException("Extension type(" + type +
                ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
    }

    ExtensionLoader<T> cached = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    if (cached != null) {
        return cached;
    }
    // putIfAbsent keeps whichever loader was registered first; read it back so that every
    // caller ends up with that same instance.
    EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
    return (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}

ExtensionLoader构造方法,保存扩展点接口类型和对象工厂。对象工厂本身也是扩展点,同样是通过ExtensionLoader加载出来的。

private ExtensionLoader(Class<?> type) {    this.type = type;    objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());}
    获取扩展点实例

先从缓存中取,如果没有则开始创建。Holder对象主要是上同步锁的时候用,锁在Holder级别,保证之后get和set操作的原子性。

/**
 * Returns the extension registered under {@code name}, creating it on first request.
 *
 * @param name the extension name; the literal {@code "true"} selects the default extension
 * @return the extension instance
 * @throws IllegalArgumentException if {@code name} is null or empty
 */
@SuppressWarnings("unchecked")
public T getExtension(String name) {
    if (name == null || name.length() == 0) {
        throw new IllegalArgumentException("Extension name == null");
    }
    if ("true".equals(name)) {
        return getDefaultExtension();
    }

    // One Holder per name: it is both the cache slot and the monitor guarding creation.
    Holder<Object> slot = cachedInstances.get(name);
    if (slot == null) {
        cachedInstances.putIfAbsent(name, new Holder<Object>());
        slot = cachedInstances.get(name);
    }

    Object extension = slot.get();
    if (extension != null) {
        return (T) extension;
    }
    synchronized (slot) {
        // Re-read under the lock: another thread may have filled the slot in the meantime.
        extension = slot.get();
        if (extension == null) {
            extension = createExtension(name);
            slot.set(extension);
        }
    }
    return (T) extension;
}

createExtension是在同步块(锁住对应name的Holder)中调用的,所以方法本身不需要再加synchronized,对同一个name而言是线程安全的。

private T createExtension(String name) {    // 取出对应类型    Class<?> clazz = getExtensionClasses().get(name);    if (clazz == null) {        throw findException(name);    }    try {        // 从缓存的实例取出,如果没有则新建        T instance = (T) EXTENSION_INSTANCES.get(clazz);        if (instance == null) {            EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());            instance = (T) EXTENSION_INSTANCES.get(clazz);        }                // 给实例注入属性        injectExtension(instance);                // 如果有配置包装类,则实例化包装类并注入属性        Set<Class<?>> wrapperClasses = cachedWrapperClasses;        if (wrapperClasses != null && wrapperClasses.size() > 0) {            for (Class<?> wrapperClass : wrapperClasses) {                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));            }        }        return instance;    } catch (Throwable t) {        throw new IllegalStateException("Extension instance(name: " + name + ", class: " +                type + ")  could not be instantiated: " + t.getMessage(), t);    }}// 获取所有扩展点类型的map,如果缓存中没有就从配置文件中取出private Map<String, Class<?>> getExtensionClasses() {    Map<String, Class<?>> classes = cachedClasses.get();    if (classes == null) {        synchronized (cachedClasses) {            classes = cachedClasses.get();            if (classes == null) {                classes = loadExtensionClasses();                cachedClasses.set(classes);            }        }    }    return classes;}
private T injectExtension(T instance) {    try {        if (objectFactory != null) {            for (Method method : instance.getClass().getMethods()) {                // 只处理set开头,只有一个参数且是public的方法                if (method.getName().startsWith("set")                        && method.getParameterTypes().length == 1                        && Modifier.isPublic(method.getModifiers())) {                    Class<?> pt = method.getParameterTypes()[0];                    try {                        String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";                        // 从对象工厂中获取属性值,对象工厂中会递归注入值                        Object object = objectFactory.getExtension(pt, property);                        if (object != null) {                            method.invoke(instance, object);                        }                    } catch (Exception e) {                        logger.error("fail to inject via method " + method.getName()                                + " of interface " + type.getName() + ": " + e.getMessage(), e);                    }                }            }        }    } catch (Exception e) {        logger.error(e.getMessage(), e);    }    return instance;}

默认的对象工厂实现是AdaptiveExtensionFactory,其实就是SpringExtensionFactory和SpiExtensionFactory两个一起用。主要看SpiExtensionFactory的实现,可以看出这里进入了递归,直到相关扩展点全部加载完成。

/**
 * Resolves an injection value for an SPI-annotated interface by returning that extension
 * point's adaptive extension.
 *
 * @param type the requested type
 * @param name the property name; not used by this implementation
 * @return the adaptive extension, or {@code null} when {@code type} is not an SPI-annotated
 *         interface or has no supported extensions
 */
public <T> T getExtension(Class<T> type, String name) {
    if (!type.isInterface() || !type.isAnnotationPresent(SPI.class)) {
        return null;
    }
    ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(type);
    if (loader.getSupportedExtensions().size() > 0) {
        return loader.getAdaptiveExtension();
    }
    return null;
}

前面的代码是返回普通扩展点,接下来的是返回自适应扩展点,AdaptiveExtension自适应扩展点不同的地方在于,不是直接返回扩展点实现,而是通过字节码技术生成一个代理类,代理类会根据调用时的参数不同,再去选择不同的扩展点实现。也就是调用了获取扩展点的方法getExtension(name)

// 和普通扩展点基本一致public T getAdaptiveExtension() {    Object instance = cachedAdaptiveInstance.get();    if (instance == null) {        if (createAdaptiveInstanceError == null) {            synchronized (cachedAdaptiveInstance) {                instance = cachedAdaptiveInstance.get();                if (instance == null) {                    try {                        instance = createAdaptiveExtension();                        cachedAdaptiveInstance.set(instance);                    } catch (Throwable t) {                        createAdaptiveInstanceError = t;                        throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);                    }                }            }        } else {            throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);        }    }    return (T) instance;}// 这里类型不是从getExtensionClasses中取而是getAdaptiveExtensionClassprivate T createAdaptiveExtension() {    try {        return injectExtension((T) getAdaptiveExtensionClass().newInstance());    } catch (Exception e) {        throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);    }}private Class<?> getAdaptiveExtensionClass() {    getExtensionClasses();    if (cachedAdaptiveClass != null) {        return cachedAdaptiveClass;    }    return cachedAdaptiveClass = createAdaptiveExtensionClass();}// 这里使用字节码技术,生成了代理类private Class<?> createAdaptiveExtensionClass() {    String code = createAdaptiveExtensionClassCode();    ClassLoader classLoader = findClassLoader();    com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();    return compiler.compile(code, classLoader);}

createAdaptiveExtensionClassCode代码太长就不贴出来了。下面是其中一个扩展点生成的源代码,可以看出代码里根据url中的参数选择合适的扩展点实现。这些用反射、用动态代理也是可以做的,不过效率肯定没字节码好,这个可以学习下。

package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.extension.ExtensionLoader;

/**
 * Generated adaptive implementation of Protocol. export and refer pick the concrete Protocol
 * by the protocol name carried in the URL and delegate to it; the other methods are not
 * adaptive and always throw.
 */
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    // Not an adaptive method: always rejected.
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    // Not an adaptive method: always rejected.
    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    // Delegates export to the Protocol extension named by the invoker's URL.
    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        // The extension name is the URL's protocol, falling back to "dubbo" when it is null.
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        // This is the call back into the regular lookup: getExtension(name).
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

    // Delegates refer to the Protocol extension named by the given URL.
    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null)
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        // Same selection rule as in export: URL protocol, defaulting to "dubbo".
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}

为了方便理解,下面上个小例子。

新建一个maven项目,结构如下:

|--pom.xml
|--src
    |--main
        |--java
            |--com
                |--serviceloader
                    |--service
                        |--Book.java
                        |--Car.java
                        |--English.java
                        |--Honda.java
                        |--Human.java
                        |--Man.java
                        |--Woman.java
                    |--ServiceLoader.java
                    |--SPI.java
        |--resources
            |--config.properties

SPI注解,用来指定实现者

// SPI.java@Documented@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.TYPE})public @interface SPI {    String value() default "";}

建3个接口,并加上注解,设置默认实现者

/** Service interface for something readable; the default implementation name is "english". */
@SPI("english")
public interface Book {
    /** Returns the text of the book. */
    String read();
}

/** Service interface for a car; the default implementation name is "honda". */
@SPI("honda")
public interface Car {
    /** Drives the car; {@code name} is supplied by the caller. */
    void driver(String name);
}

/** Service interface for a person; the default implementation name is "man". */
// The default could just as well be "woman".
@SPI("man")
public interface Human {
    /** Returns this person's greeting. */
    String sayHello();
}

以及实现者

public class Man implements Human {    private Car car;    private Book book;    @Override    public String sayHello() {        return "hello man";    }    public Car getCar() {        return car;    }    public void setCar(Car car) {        this.car = car;    }    public Book getBook() {        return book;    }    public void setBook(Book book) {        this.book = book;    }}public class Woman implements Human {    @Override    public String sayHello() {        return "hello man";    }}public class Honda implements Car {    private Book book;    @Override    public void driver(String name) {        System.out.println("i am " + name);    }    public Book getBook() {        return book;    }    public void setBook(Book book) {        this.book = book;    }}public class English implements Book {    @Override    public String read() {        return "hello my name is denis";    }}

配置文件,用来配置实现者的类型

man=com.serviceloader.service.Man
woman=com.serviceloader.service.Woman
english=com.serviceloader.service.English
honda=com.serviceloader.service.Honda

最后是服务加载器

public class ServiceLoader {    private static ConcurrentMap<Class<?>, Object> SERVICE_INSTANCES = new ConcurrentHashMap<>();    private static ConcurrentMap<String, Class<?>> SERVICE_CLASS;    @SuppressWarnings("unchecked")    public static <T> T get(Class<T> clazz) {        if (SERVICE_CLASS == null) {            SERVICE_CLASS = getServiceClass();        }        SPI spi = clazz.getAnnotation(SPI.class);        if (spi == null) {            throw new RuntimeException("不是SPI接口");        }        Class<?> targetClass = SERVICE_CLASS.get(spi.value());  // 这里可以根据其它配置更换实现者        if (targetClass == null) {            throw new RuntimeException("没有配置实现类型");        }        try {            T instance = (T) SERVICE_INSTANCES.get(clazz);            if (instance == null) {                SERVICE_INSTANCES.putIfAbsent(clazz, targetClass.newInstance());                instance = (T) SERVICE_INSTANCES.get(clazz);            }            injectExtension(instance);            return instance;        } catch (InstantiationException | IllegalAccessException e) {            throw new RuntimeException(e.getMessage());        }    }    /**     * 注入属性     *     * @param instance     * @param <T>     */    private static <T> void injectExtension(T instance) {        Method[] methods = instance.getClass().getMethods();        for (Method method : methods) {            if (method.getName().startsWith("set") && method.getName().length() > 3                    && method.getParameterTypes().length == 1                    && Modifier.isPublic(method.getModifiers())) {                try {                    Class<?> pt = method.getParameterTypes()[0];                    Object object = get(pt);  // 递归                    if (object != null) {                        method.invoke(instance, object);                    }                } catch (IllegalAccessException | InvocationTargetException e) {                    e.printStackTrace();                }            }        }    }    /**     
* 从配置文件中取出实现者名称与类型对应map     *     * @return     */    private static ConcurrentMap<String,Class<?>> getServiceClass() {        try {            if (SERVICE_CLASS == null) {                synchronized (ServiceLoader.class) {                    if (SERVICE_CLASS == null) {                        SERVICE_CLASS = new ConcurrentHashMap<>();                        InputStream is = ServiceLoader.class.getClassLoader().getResourceAsStream("config.properties");                        Properties p = new Properties();                        p.load(is);                        Set<String> keys = p.stringPropertyNames();                        for (String key : keys) {                            Class<?> clazz = Class.forName(String.valueOf(p.get(key)));                            SERVICE_CLASS.putIfAbsent(key, clazz);                        }                    }                }            }        } catch (Exception e) {            e.printStackTrace();        }        return SERVICE_CLASS;    }    public static void main(String[] args) {        Human human = ServiceLoader.get(Human.class);        System.out.println("class : " + human.getClass().getName());        System.out.println(human.sayHello());        Car car = ServiceLoader.get(Car.class);        System.out.println("class : " + car.getClass().getName());        car.driver("大卡车");        Book book = ServiceLoader.get(Book.class);        System.out.println("class : " + book.getClass().getName());        System.out.println(book.read());    }}

运行后输出:

class : com.serviceloader.service.Man
hello man
class : com.serviceloader.service.Honda
i am 大卡车
class : com.serviceloader.service.English
hello my name is denis

平平淡淡才是真

Dubbo源码阅读笔记3

相关文章:

你感兴趣的文章:

标签云: