Fixing Canal Adapter's ES8 sync, and accidentally becoming a contributor to a well-known open-source project?!
Quoted from 程序员子悠 on May 14, 2024, 10:03 PM
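What a pleasant surprise. In an earlier post, "Hands-on 08: Syncing Ad Data from MySQL to an ES8 Online Index with Canal Adapter", I described how to use Canal Adapter to sync MySQL data to ES8.
In that post I mentioned that the official ES8 Adapter sync class does not support ES8's TLS authentication, so we had to switch that security feature off when deploying the ES8 cluster.
As an engineer I can't stand seeing a feature crippled, so I pulled the source and modified it to support TLS authentication.
After the change I rebuilt it locally and it worked. Since sharing beats keeping it to myself, I opened a PR while I was at it. To my surprise, I recently found that the PR had been merged into the main branch!
Just like that, I accidentally became a contributor to a well-known open-source project with tens of thousands of stars. A maintainer even replied "tks" under the PR, and I suddenly realized I could get this close to the big names.
Problem description
Before the fix, after starting the canal adapter, syncing MySQL data to ES8 produced the error below. The cause is that ES8 enables security by default and generates its own self-signed certificate. Canal Adapter's ES8 support did not handle this, so the TLS handshake failed.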
2024-04-13 20:55:39.368 [pool-3-thread-1] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - ElasticsearchException[java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: ExecutionException[javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: ValidatorException[PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target]; nested: SunCertPathBuilderException[unable to find valid certification path to requested target];
java.lang.RuntimeException: ElasticsearchException[java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: ExecutionException[javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: ValidatorException[PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target]; nested: SunCertPathBuilderException[unable to find valid certification path to requested target];
at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:112)
at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:60)
at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.sync(ESAdapter.java:104)
at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.sync(ESAdapter.java:83)
at com.alibaba.otter.canal.client.adapter.ProxyOuterAdapter.sync(ProxyOuterAdapter.java:42)
at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.batchSync(AdapterProcessor.java:139)
at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$1(AdapterProcessor.java:97)
at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:890)
at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$2(AdapterProcessor.java:94)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.ElasticsearchException: java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2695)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2154)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2118)
at org.elasticsearch.client.IndicesClient.getMapping(IndicesClient.java:538)
at com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection.getMapping(ESConnection.java:132)
at com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate.getEsType(ES8xTemplate.java:392)
at com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate.getValFromData(ES8xTemplate.java:269)
at com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate.getESDataFromDmlData(ES8xTemplate.java:324)
at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.singleTableSimpleFiledUpdate(ESSyncService.java:814)
at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.update(ESSyncService.java:208)
at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:97)
... 12 common frames omitted
Caused by: java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.getValue(BaseFuture.java:257)
at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.get(BaseFuture.java:244)
at org.elasticsearch.common.util.concurrent.BaseFuture.get(BaseFuture.java:75)
at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2692)
... 23 common frames omitted
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1431)
at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
at org.apache.http.nio.reactor.ssl.SSLIOSession.doWrap(SSLIOSession.java:270)
at org.apache.http.nio.reactor.ssl.SSLIOSession.doHandshake(SSLIOSession.java:316)
at org.apache.http.nio.reactor.ssl.SSLIOSession.isAppInputReady(SSLIOSession.java:537)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:120)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
... 1 common frames omitted
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1509)
at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
at sun.security.ssl.Handshaker.processLoop(Handshaker.java:979)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:919)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:916)
at java.security.AccessController.doPrivileged(Native Method)
at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1369)
at org.apache.http.nio.reactor.ssl.SSLIOSession.doRunTask(SSLIOSession.java:288)
at org.apache.http.nio.reactor.ssl.SSLIOSession.doHandshake(SSLIOSession.java:356)
... 9 common frames omitted
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
at sun.security.validator.Validator.validate(Validator.java:260)
at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:324)
at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:281)
at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136)
at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1496)
... 17 common frames omitted
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:382)
... 23 common frames omitted
2024-04-13 20:55:39.370 [Thread-4] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - Outer adapter sync failed! Error sync and rollback, execute times: 13
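The trace is long, but the decisive line is the last "Caused by": the JVM could not build a certification path to the ES8 certificate. Below is a minimal sketch, using only the JDK, of walking a cause chain down to its root. The class name RootCause and the stand-in exceptions in main are illustrative assumptions; none of this is canal code.

```java
import java.util.concurrent.ExecutionException;
import javax.net.ssl.SSLHandshakeException;

/** Illustrative helper (not part of canal): finds the innermost cause of an exception. */
final class RootCause {
    private RootCause() {}

    static Throwable of(Throwable t) {
        Throwable current = t;
        // Follow getCause() until the chain ends; stop if a cause points at itself.
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-in chain that mirrors the nesting in the log, built from plain JDK exceptions.
        Exception root = new java.security.cert.CertificateException(
                "unable to find valid certification path to requested target");
        SSLHandshakeException ssl = new SSLHandshakeException("General SSLEngine problem");
        ssl.initCause(root);
        Exception wrapped = new RuntimeException(new ExecutionException(ssl));
        System.out.println(RootCause.of(wrapped).getMessage());
    }
}
```

Logging the root cause next to the top-level message makes it quicker to see that this is a trust problem rather than a network or authentication problem.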
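Solutions
There are two options:
- Option 1: disable the security feature when deploying the ES cluster by setting xpack.security.enabled to false in elasticsearch.yml. For an ES instance deployed with docker, either enter the container and edit the file, or pass the setting when the container starts.
- Option 2: modify the canal adapter source so that it accepts the certificate.
This post focuses on option 2. Option 1 means giving up ES8's security features, so it is not recommended.
Modify the source to support ES8 security
Copy the certificate
When ES8 is installed and deployed with docker, a certificate is created by default. We need to copy it out of the container with the following command:
docker cp es01:/usr/share/elasticsearch/config/certs/http_ca.crt .
Here es01 is the container name; replace it with your own. The destination path is up to you, just remember where it is because it is needed later.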
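Before wiring the copied file into the adapter, it can help to confirm that it really parses as an X.509 certificate. The following is a minimal sketch using only the JDK; the class name CaFileCheck and the default file name passed to main are assumptions for illustration.

```java
import java.io.ByteArrayInputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.Optional;

/** Illustrative check (not part of canal): does a byte array hold a parsable X.509 certificate? */
final class CaFileCheck {
    private CaFileCheck() {}

    static Optional<X509Certificate> parse(byte[] encoded) {
        try {
            CertificateFactory factory = CertificateFactory.getInstance("X.509");
            // Accepts both PEM and DER encodings.
            return Optional.of((X509Certificate)
                    factory.generateCertificate(new ByteArrayInputStream(encoded)));
        } catch (CertificateException e) {
            // Unparsable input is reported as "no certificate".
            return Optional.empty();
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical default; pass the real location of the copied file as the first argument.
        String path = args.length > 0 ? args[0] : "http_ca.crt";
        Optional<X509Certificate> ca = parse(Files.readAllBytes(Paths.get(path)));
        if (ca.isPresent()) {
            System.out.println("subject: " + ca.get().getSubjectX500Principal());
            System.out.println("expires: " + ca.get().getNotAfter());
        } else {
            System.out.println("not a parsable X.509 certificate: " + path);
        }
    }
}
```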
Modify the code
In the canal adapter source, find the following class, com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection#ESConnection
and replace its constructor with the code below:
public ESConnection(String[] hosts, Map<String, String> properties) throws UnknownHostException {
    // Choose the connection mode based on whether a CA certificate path is configured.
    String caPath = properties.get("security.ca.path");
    if (StringUtils.isNotEmpty(caPath)) {
        connectEsWithCa(hosts, properties, caPath);
    } else {
        connectEsWithoutCa(hosts, properties);
    }
}

private void connectEsWithCa(String[] hosts, Map<String, String> properties, String caPath) {
    Path caCertificatePath = Paths.get(caPath);
    try (InputStream is = Files.newInputStream(caCertificatePath)) {
        // Load the CA certificate and put it into an in-memory trust store.
        CertificateFactory factory = CertificateFactory.getInstance("X.509");
        Certificate trustedCa = factory.generateCertificate(is);
        KeyStore trustStore = KeyStore.getInstance("pkcs12");
        trustStore.load(null, null);
        trustStore.setCertificateEntry("ca", trustedCa);
        SSLContextBuilder sslContextBuilder = SSLContexts.custom()
            .loadTrustMaterial(trustStore, null);
        final SSLContext sslContext = sslContextBuilder.build();
        HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
        String nameAndPwd = properties.get("security.auth");
        // Note: as written, the SSLContext is only installed when security.auth is configured.
        if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
            String[] nameAndPwdArr = nameAndPwd.split(":");
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
            restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                return httpClientBuilder.setSSLContext(sslContext);
            });
        }
        restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build()).setApiCompatibilityMode(true).build();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

private void connectEsWithoutCa(String[] hosts, Map<String, String> properties) {
    // Original behavior: optional basic auth, default JVM trust store.
    HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
    RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
    String nameAndPwd = properties.get("security.auth");
    if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
        String[] nameAndPwdArr = nameAndPwd.split(":");
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
        restClientBuilder.setHttpClientConfigCallback(
            httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
    }
    restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build()).setApiCompatibilityMode(true)
        .build();
}
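The core of the change is an SSLContext that trusts the ES8 CA instead of relying on the JVM's default trust store. The constructor above builds it with Apache HttpComponents (SSLContexts.custom().loadTrustMaterial). As a rough illustration of the same idea with the JDK alone, here is a sketch; the class name TrustOnlyContext and the "ca-N" alias scheme are assumptions of mine, not part of the merged PR.

```java
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.util.List;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

/** Illustrative JDK-only sketch: an SSLContext whose trust store holds only the given CAs. */
final class TrustOnlyContext {
    private TrustOnlyContext() {}

    static SSLContext build(List<? extends Certificate> trustedCas) {
        try {
            KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
            trustStore.load(null, null); // start from an empty in-memory store
            int index = 0;
            for (Certificate ca : trustedCas) {
                trustStore.setCertificateEntry("ca-" + index++, ca);
            }
            TrustManagerFactory tmf =
                    TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(trustStore);
            SSLContext context = SSLContext.getInstance("TLS");
            // No client key material: the adapter only needs to verify the server.
            context.init(null, tmf.getTrustManagers(), null);
            return context;
        } catch (GeneralSecurityException | IOException e) {
            throw new IllegalStateException("could not build SSLContext", e);
        }
    }
}
```

A context built this way would be handed to the HTTP client in the same place where the code above calls setSSLContext.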
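Brief notes
- connectEsWithoutCa is the implementation of the original constructor;
- connectEsWithCa is the constructor implementation that supports the security certificate;
- which of the two is used depends on whether the security.ca.path property is configured;
- security.ca.path is set in the launcher configuration, under properties of the ES8 entry in outerAdapters, at the same level as security.auth.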
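The property handling described in these notes can be sketched in isolation. This is a minimal sketch, assuming only the two property names from the post; the class SecurityProps is hypothetical. It also deviates from the code above in one labeled way: it splits security.auth on the first colon only, so a password that itself contains ":" stays intact.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Illustrative sketch (not canal code) of how the two security properties drive the connection. */
final class SecurityProps {
    private SecurityProps() {}

    /** True when security.ca.path is set, i.e. the CA-aware connection should be used. */
    static boolean useCa(Map<String, String> props) {
        String caPath = props.get("security.ca.path");
        return caPath != null && !caPath.isEmpty();
    }

    /** Returns {user, password} parsed from security.auth, or empty when it is absent or malformed. */
    static Optional<String[]> credentials(Map<String, String> props) {
        String auth = props.get("security.auth");
        if (auth == null || !auth.contains(":")) {
            return Optional.empty();
        }
        // Assumption: limit 2 keeps everything after the first ':' as the password.
        return Optional.of(auth.split(":", 2));
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("security.auth", "elastic:password");
        props.put("security.ca.path", "/opt/canal/http_ca.crt");
        System.out.println("use CA: " + useCa(props));
        System.out.println("user: " + credentials(props).map(c -> c[0]).orElse("<none>"));
    }
}
```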
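That is all for the code change. Next, how to use it.
Repackage
After modifying the code, rebuild with maven to produce the es8 jar.
Copy the newly built jar into the canal adapter's plugin directory and rename it to match the version you downloaded; in my case that was 1.1.7.
Here client-adapter.es8x-1.1.7-jar-with-dependencies.jar.7 is the jar that shipped with the download, and client-adapter.es8x-1.1.7-jar-with-dependencies.jar is the jar I rebuilt.
Modify the launcher configuration
The compatibility code reads a property called security.ca.path, so we set it to the path of the CA certificate copied earlier, i.e. security.ca.path: /opt/canal/http_ca.crt
The complete configuration is shown below: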
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    # kafka.bootstrap.servers: 127.0.0.1:9092
    # kafka.enable.auto.commit: false
    # kafka.auto.commit.interval.ms: 1000
    # kafka.auto.offset.reset: latest
    # kafka.request.timeout.ms: 40000
    # kafka.session.timeout.ms: 30000
    # kafka.isolation.level: read_committed
    # kafka.max.poll.records: 1000
    # rocketMQ consumer
    # rocketmq.namespace:
    # rocketmq.namesrv.addr: 127.0.0.1:9876
    # rocketmq.batch.size: 1000
    # rocketmq.enable.message.trace: false
    # rocketmq.customized.trace.topic:
    # rocketmq.access.channel:
    # rocketmq.subscribe.filter:
    # rabbitMQ consumer
    # rabbitmq.host:
    # rabbitmq.virtual.host:
    # rabbitmq.username:
    # rabbitmq.password:
    # rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/database?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: es8
        key: es-key
        hosts: https://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # transport or rest
          security.auth: elastic:password
          security.ca.path: /opt/canal/http_ca.crt
          cluster.name: docker-cluster
      - name: logger
      # - name: rdb
      #   key: mysql1
      #   properties:
      #     jdbc.driverClassName: com.mysql.jdbc.Driver
      #     jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
      #     jdbc.username: root
      #     jdbc.password: 121212
      #     druid.stat.enable: false
      #     druid.stat.slowSqlMillis: 1000
      # - name: rdb
      #   key: oracle1
      #   properties:
      #     jdbc.driverClassName: oracle.jdbc.OracleDriver
      #     jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
      #     jdbc.username: mytest
      #     jdbc.password: m121212
      # - name: rdb
      #   key: postgres1
      #   properties:
      #     jdbc.driverClassName: org.postgresql.Driver
      #     jdbc.url: jdbc:postgresql://localhost:5432/postgres
      #     jdbc.username: postgres
      #     jdbc.password: 121212
      #     threads: 1
      #     commitSize: 3000
      # - name: hbase
      #   properties:
      #     hbase.zookeeper.quorum: 127.0.0.1
      #     hbase.zookeeper.property.clientPort: 2181
      #     zookeeper.znode.parent: /hbase
      # - name: kudu
      #   key: kudu
      #   properties:
      #     kudu.master.address: 127.0.0.1 # ',' split multi address
      # - name: phoenix
      #   key: phoenix
      #   properties:
      #     jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
      #     jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
      #     jdbc.username:
      #     jdbc.password:
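Two details in this configuration are easy to get wrong: the certificate path must be readable by the adapter process, and hosts must use https once TLS is in play. A small pre-flight check along these lines can catch both before startup. This is only a sketch; the class AdapterPreflight is hypothetical, and treating hosts as a comma-separated list is my assumption.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Illustrative pre-flight check (not canal code) for the ES8 adapter's TLS-related settings. */
final class AdapterPreflight {
    private AdapterPreflight() {}

    static List<String> check(String hosts, String caPath) {
        List<String> problems = new ArrayList<>();
        if (caPath == null || caPath.isEmpty()) {
            return problems; // no CA configured: nothing TLS-specific to verify
        }
        if (!Files.isReadable(Paths.get(caPath))) {
            problems.add("CA file not readable: " + caPath);
        }
        // Assumption: multiple hosts are separated by commas.
        for (String host : hosts.split(",")) {
            String trimmed = host.trim();
            if (!trimmed.startsWith("https://")) {
                problems.add("host is not https: " + trimmed);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // Values taken from the configuration in the post.
        check("https://127.0.0.1:9200", "/opt/canal/http_ca.crt").forEach(System.out::println);
    }
}
```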
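Once the certificate path is configured, the adapter starts and syncs data normally. For the full walkthrough, see the corresponding official-account article, "18 screenshots: a step-by-step guide to syncing MySQL data to ES8 with Canal Adapter"; I won't repeat the demo here.
Summary
I had long wanted to take part in an open-source project, and this time I reached that small goal without planning to. It really was a happy accident: I was only studying canal's data sync for myself, ran into this problem, fixed it, and opened a PR along the way. I did not expect it to actually be merged, and I am still quite excited about it.
The lesson is that once you genuinely use, get involved with, and understand an open-source project, there is a real chance to contribute code to it. Even a very small change counts as doing your modest part.
I have also recently found a few small bugs in another open-source project and will submit a PR for them later, continuing down the open-source road.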