解决 Canal Adater 同步 ES8 证书问题,一不小心成了知名开源项目的贡献者?!

真是一个意外之喜,之前写的 08 实操:基于 Canal Adapter 将 MySQL 库中广告数据同步到 ES8 在线索引中 写了怎么使用 Canal Adapter 进行 MySQL 数据同步到 ES8

在这个文章中我提到了这么一个内容,官方自带的 ES8 Adapter 同步类不支持 ES8TLS 认证,所以导致我们在部署 ES8 集群的时候需要关闭这个安全功能。

但是作为技术人员就是见不得功能被阉割,所以就拉取了源码,在原有的基础上进行改造支持了 TLS 认证。

20240514

改完过后本地重新打包实现了功能,本着独乐乐不如众乐乐,就顺手提了一个 PR,结果万万没想到,最近发现这个 PR 被合并到主干了!!!

20240514-1

就这样一不小心成了一个几万星的知名开源项目贡献者,大佬还在 PR 下面回复了一个 tks,突然发现自己和大佬也可以靠的这么近。

20240514-2

问题描述

在没有修复的时候,启动了 canal 适配器过后,在进行 MySQL 数据同步到 ES8 的时候,出现下面的错误,这个错误的原因是因为 ES8 默认开启了安全认证,并且自带了签名证书。Canal Adapter 在适配 ES8 的时候并没有支持这个功能,因此报错了。

2024-04-13 20:55:39.368 [pool-3-thread-1] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - ElasticsearchException[java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: ExecutionException[javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: ValidatorException[PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target]; nested: SunCertPathBuilderException[unable to find valid certification path to requested target];
java.lang.RuntimeException: ElasticsearchException[java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: ExecutionException[javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: ValidatorException[PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target]; nested: SunCertPathBuilderException[unable to find valid certification path to requested target];
 at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:112)
 at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:60)
 at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.sync(ESAdapter.java:104)
 at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.sync(ESAdapter.java:83)
 at com.alibaba.otter.canal.client.adapter.ProxyOuterAdapter.sync(ProxyOuterAdapter.java:42)
 at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.batchSync(AdapterProcessor.java:139)
 at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$1(AdapterProcessor.java:97)
 at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:890)
 at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$2(AdapterProcessor.java:94)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.ElasticsearchException: java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
 at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2695)
 at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171)
 at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2154)
 at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2118)
 at org.elasticsearch.client.IndicesClient.getMapping(IndicesClient.java:538)
 at com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection.getMapping(ESConnection.java:132)
 at com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate.getEsType(ES8xTemplate.java:392)
 at com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate.getValFromData(ES8xTemplate.java:269)
 at com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate.getESDataFromDmlData(ES8xTemplate.java:324)
 at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.singleTableSimpleFiledUpdate(ESSyncService.java:814)
 at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.update(ESSyncService.java:208)
 at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:97)
 ... 12 common frames omitted
Caused by: java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
 at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.getValue(BaseFuture.java:257)
 at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.get(BaseFuture.java:244)
 at org.elasticsearch.common.util.concurrent.BaseFuture.get(BaseFuture.java:75)
 at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2692)
 ... 23 common frames omitted
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
 at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1431)
 at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
 at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
 at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
 at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
 at org.apache.http.nio.reactor.ssl.SSLIOSession.doWrap(SSLIOSession.java:270)
 at org.apache.http.nio.reactor.ssl.SSLIOSession.doHandshake(SSLIOSession.java:316)
 at org.apache.http.nio.reactor.ssl.SSLIOSession.isAppInputReady(SSLIOSession.java:537)
 at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:120)
 at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
 at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
 at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
 at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
 at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
 at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
 ... 1 common frames omitted
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
 at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
 at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
 at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
 at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
 at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1509)
 at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
 at sun.security.ssl.Handshaker.processLoop(Handshaker.java:979)
 at sun.security.ssl.Handshaker$1.run(Handshaker.java:919)
 at sun.security.ssl.Handshaker$1.run(Handshaker.java:916)
 at java.security.AccessController.doPrivileged(Native Method)
 at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1369)
 at org.apache.http.nio.reactor.ssl.SSLIOSession.doRunTask(SSLIOSession.java:288)
 at org.apache.http.nio.reactor.ssl.SSLIOSession.doHandshake(SSLIOSession.java:356)
 ... 9 common frames omitted
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
 at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
 at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
 at sun.security.validator.Validator.validate(Validator.java:260)
 at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:324)
 at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:281)
 at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136)
 at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1496)
 ... 17 common frames omitted
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
 at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
 at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
 at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
 at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:382)
 ... 23 common frames omitted
2024-04-13 20:55:39.370 [Thread-4] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - Outer adapter sync failed!  Error sync and rollback, execute times: 13

解决方案

解决方案有两个:

  1. 部署搭建 ES 集群的时候,关闭这个安全证书的功能,对应 ES 的配置是在 elasticsearch.yml 里面的 xpack.security.enabled falsedocker 部署的 ES 需要进入的容器里面去进行修改,或者在容器启动的时候就配置。
  2. 修改 canal adapter 的源码,兼容证书;

这里主要讲一下方案 2,因为对于方案 1 需要取消 ES8 的安全功能,不推荐。

修改源码,兼容 ES8 安全配置

拷贝证书

在使用 docker 安装和部署 ES8 的时候,默认已经创建好了一个证书,我们需要将证书从容器中拷贝出来,命令如下

docker cp es01:/usr/share/elasticsearch/config/certs/http_ca.crt .

这里的 es01 是容器名称,根据自己的进行替换即可,拷贝出来的路径可以自行替换,记住在哪就行,后面会用到。

修改代码

canal adapter 的源码中,找到下面这类,com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection#ESConnection

20240514-3

将其中的构造方法改成下面这段

public ESConnection(String[] hosts, Map<String, String> properties) throws UnknownHostException {
    String caPath = properties.get("security.ca.path");
    if (StringUtils.isNotEmpty(caPath)) {
        connectEsWithCa(hosts, properties, caPath);
    } else {
        connectEsWithoutCa(hosts, properties);
    }
}
private void connectEsWithCa(String[] hosts, Map<String, String> properties, String caPath) {
    Path caCertificatePath = Paths.get(caPath);
    try (InputStream is = Files.newInputStream(caCertificatePath)) {
        CertificateFactory factory = CertificateFactory.getInstance("X.509");
        Certificate trustedCa = factory.generateCertificate(is);
        KeyStore trustStore = KeyStore.getInstance("pkcs12");
        trustStore.load(nullnull);
        trustStore.setCertificateEntry("ca", trustedCa);
        SSLContextBuilder sslContextBuilder = SSLContexts.custom()
        .loadTrustMaterial(trustStore, null);
        final SSLContext sslContext = sslContextBuilder.build();

        HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
        String nameAndPwd = properties.get("security.auth");
        if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
            String[] nameAndPwdArr = nameAndPwd.split(":");
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                                               new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
            restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                return httpClientBuilder.setSSLContext(sslContext);
                });
            }
            restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build()).setApiCompatibilityMode(true).build();
        } catch (Exception e) {
            throw new RuntimeException(e);
    }
}

private void connectEsWithoutCa(String[] hosts, Map<String, String> properties) {
    HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
    RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
    String nameAndPwd = properties.get("security.auth");
    if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
        String[] nameAndPwdArr = nameAndPwd.split(":");
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
        restClientBuilder.setHttpClientConfigCallback(
                httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
    }
    restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build()).setApiCompatibilityMode(true)
            .build();
}

简单说明

  1. 其中 connectEsWithoutCa 方法为原来的构造方法的实现;
  2. connectEsWithCa 方法为兼容了安全认证的方法构造方法实现;
  3. 这两个方法的使用根据是否配置了 security.ca.path 属性来判断;
  4. security.ca.path 这个配置是在启动器的 outerAdaptersES8properties 下,与 security.auth 同级;

代码修改到这里就结束了,下面看下如何使用

重新打包

修改好了代码过后,通过 maven 重新打包,打包出对应的 es8jar 包即可。

20240514-4

将编译打包后的 jar 重新复制到 canal 适配器的 plugin 目录下面,并且修改一下对应的名称跟下载下来的版本一致即可,比如我这边之前下载的 1.1.7 版本。

20240514-5
img

其中 client-adapter.es8x-1.1.7-jar-with-dependencies.jar.7 是原来下载下来携带的 jarclient-adapter.es8x-1.1.7-jar-with-dependencies.jar 是我重新打包编译后的 jar

修改启动器的配置

前面讲到兼容代码的时候,我们使用了一个叫 security.ca.path 的配置,所以我们需要将前面拷贝的 ca 证书路径,配置在这个属性上,即 security.ca.path: /opt/canal/http_ca.crt

完整的配置如下所示

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    # kafka.bootstrap.servers: 127.0.0.1:9092
    # kafka.enable.auto.commit: false
    # kafka.auto.commit.interval.ms: 1000
    # kafka.auto.offset.reset: latest
    # kafka.request.timeout.ms: 40000
    # kafka.session.timeout.ms: 30000
    # kafka.isolation.level: read_committed
    # kafka.max.poll.records: 1000
    # rocketMQ consumer
    # rocketmq.namespace:
    # rocketmq.namesrv.addr: 127.0.0.1:9876
    # rocketmq.batch.size: 1000
    # rocketmq.enable.message.trace: false
    # rocketmq.customized.trace.topic:
    # rocketmq.access.channel:
    # rocketmq.subscribe.filter:
    # rabbitMQ consumer
    # rabbitmq.host:
    # rabbitmq.virtual.host:
    # rabbitmq.username:
    # rabbitmq.password:
    # rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/database?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
        - name: es8
          key: es-key
          hosts: https://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
          properties:
            mode: rest # transport or rest
            security.auth: elastic:password
            security.ca.path: /opt/canal/http_ca.crt
            cluster.name: docker-cluster
        - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#          druid.stat.enable: false
#          druid.stat.slowSqlMillis: 1000
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase

#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
#      - name: phoenix
#        key: phoenix
#        properties:
#          jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
#          jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
#          jdbc.username:
#          jdbc.password:

配置好了证书路径过后,就可以正常启动和同步数据了,具体的实操也可以看对应的公众号文章下 18 张图手把手教你使用 Canal Adapter 同步 MySQL 数据到 ES8,建议收藏!,这里就不重复演示了。

总结

以前一直想着要参与一下开源项目,没想到这次也算是实现了一个小小的目标,其实这次纯属是一个意外之喜,原本只是在自己学习和研究 canal 的数据同步,然后发现了这个问题,最后就修复了一下,顺手提了一个 PR,没想到还真的被合并了,想想还是很激动的。

这个事情告诉我们只要真正的去参与和使用并了解一个开源项目了过后,还是有机会贡献自己的代码的,哪怕只是一个很小的一部分,也算是为开源项目贡献了一份自己的绵薄之力。

另外最近也发现了另一个开源项目的一些小 bug,回头再提交一下 PR,向着开源的道路继续前行。

评论