【Flume】flume中Avro Source到Avro Sink之间通过SSL传输数据的

首先你需要了解JAVA KEYSTORE

该SSL用于Avro Sink到Avro Source之间的数据传输 该场景主要用于分布式Flume之间的数据传输,从分散的各个flume agent到中心汇集节点的flume agent

下面来看下如何实现的?

Avro Sink SSL

在这个传输过程中,sink其实就相当于socket的client端了 flume源码中有个类NettyAvroRpcClient,该类中还有个内部类SSLCompressionChannelFactory 其中定义了如下属性:

private final boolean enableCompression; private final int compressionLevel; private final boolean enableSsl; private final boolean trustAllCerts; private final String truststore; private final String truststorePassword; private final String truststoreType; private final List excludeProtocols;

1、要使用SSL进行数据传输,首先要将ssl开关打开,true 2、truststore指定生成的keystore文件 3、truststorepassword指定密码(这里注意生成的keypass和storepass一定相同,否则报错)

KeyStore keystore = null;if (truststore != null) {if (truststorePassword == null) {throw new NullPointerException(“truststore password is null”);}InputStream truststoreStream = new FileInputStream(truststore);keystore = KeyStore.getInstance(truststoreType);keystore.load(truststoreStream, truststorePassword.toCharArray());}TrustManagerFactory tmf = TrustManagerFactory.getInstance(“SunX509”);// null keystore is OK, with SunX509 it defaults to system CA Certs// see http://docs.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#X509TrustManagertmf.init(keystore);managers = tmf.getTrustManagers();

该段代码就去加载了keystore文件 TrustManagerFactory是JDK原生的一个信任管理器工厂,每个新人管理器管理特定类型的由安全套接字使用的信任材料。信任材料是基于keystore或提供者特定源。 init方法通过证书授权源和相关的信任材料初始化此工厂 最后为此信任材料返回一个信任管理器

SSLContext sslContext = SSLContext.getInstance(“TLS”);sslContext.init(null, managers, null);SSLEngine sslEngine = sslContext.createSSLEngine();sslEngine.setUseClientMode(true);List<String> enabledProtocols = new ArrayList<String>();for (String protocol : sslEngine.getEnabledProtocols()) {if (!excludeProtocols.contains(protocol)) {enabledProtocols.add(protocol);}}sslEngine.setEnabledProtocols(enabledProtocols.toArray(new String[0]));logger.info(“SSLEngine protocols enabled: ” +Arrays.asList(sslEngine.getEnabledProtocols()));pipeline.addFirst(“ssl”, new SslHandler(sslEngine));

1、返回指定协议的SSLContext对象 TLS安全传输层协议 2、初始化此上下文,初始化参数只有信任管理器 3、初始化SSLEngine,并指定引擎在握手时使用客户端模式

最终这个安全的Socket就建立起来了

Avro Source SSL

source我们可以认为是socket的server端,,打开连接后,等待客户端的连接

private static final String PORT_KEY = “port”; private static final String BIND_KEY = “bind”; private static final String COMPRESSION_TYPE = “compression-type”; private static final String SSL_KEY = “ssl”; private static final String IP_FILTER_KEY = “ipFilter”; private static final String IP_FILTER_RULES_KEY = “ipFilterRules”; private static final String KEYSTORE_KEY = “keystore”; private static final String KEYSTORE_PASSWORD_KEY = “keystore-password”; private static final String KEYSTORE_TYPE_KEY = “keystore-type”; private static final String EXCLUDE_PROTOCOLS = “exclude-protocols”;

以上Avro Source的一些配置属性

try {KeyStore ks = KeyStore.getInstance(keystoreType);ks.load(new FileInputStream(keystore), keystorePassword.toCharArray());} catch (Exception ex) {throw new FlumeException(“Avro source configured with invalid keystore: ” + keystore, ex);}

从上面代码可以看出,source端在configure方法执行的时候就会load该keystore

if (enableSsl) {SSLEngine sslEngine = createServerSSLContext().createSSLEngine();sslEngine.setUseClientMode(false);List<String> enabledProtocols = new ArrayList<String>();for (String protocol : sslEngine.getEnabledProtocols()) {if (!excludeProtocols.contains(protocol)) {enabledProtocols.add(protocol);}}sslEngine.setEnabledProtocols(enabledProtocols.toArray(new String[0]));logger.info(“SSLEngine protocols enabled: ” +Arrays.asList(sslEngine.getEnabledProtocols()));pipeline.addFirst(“ssl”, new SslHandler(sslEngine));}

注意这里的SSLEngine就配置了引擎在握手时使用的服务器模式 最终返回对象ChannelPipeline

以上所有内容可能理解起来比较费劲,大家不妨先来看看这篇文章 Channel与Pipeline这里写链接内容

SSL在flume中的使用

首先准备一个keystore文件 Sink配置

但我想说,我做了一个善良的平凡女子,并且一直在爱,

【Flume】flume中Avro Source到Avro Sink之间通过SSL传输数据的

相关文章:

你感兴趣的文章:

标签云: