admin 1 年之前
父节点
当前提交
9dd9ba6ac0

+ 102 - 0
canalSpringBoot/pom.xml

@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.example</groupId>
+    <artifactId>canalSpringBoot</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>2.2.2.RELEASE</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>2.0.7</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-jdbc</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>cn.gjing</groupId>
+            <artifactId>tools-common</artifactId>
+            <version>1.0.4</version>
+        </dependency>
+        <dependency>
+            <groupId>cn.gjing</groupId>
+            <artifactId>tools-starter-swagger</artifactId>
+            <version>1.0.9</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-dbutils</groupId>
+            <artifactId>commons-dbutils</artifactId>
+            <version>1.7</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mybatis.spring.boot</groupId>
+            <artifactId>mybatis-spring-boot-starter</artifactId>
+            <version>2.2.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.client</artifactId>
+            <version>1.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.example</groupId>
+            <artifactId>canalSpringBoot</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <version>RELEASE</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

+ 15 - 0
canalSpringBoot/src/main/java/org/example/Main.java

@@ -0,0 +1,15 @@
+package org.example;
+
+import cn.gjing.core.EnableSwagger;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cache.annotation.EnableCaching;
+
+@SpringBootApplication
+@EnableSwagger
+@EnableCaching
+public class Main {
+    public static void main(String[] args) {
+        SpringApplication.run(Main.class, args);
+    }
+}

+ 15 - 0
canalSpringBoot/src/main/resources/application.properties

@@ -0,0 +1,15 @@
+# ????
+server.port=8080
+# ???
+spring.application.name=canal-client
+# mysql?????
+spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
+spring.datasource.url=jdbc:mysql://localhost:3306/canal?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&failOverReadOnly=false&useSSL=true&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=UTC
+spring.datasource.username=root
+spring.datasource.password=5608095liyukun
+
+# ??????
+canal.client.instances.example.host=127.0.0.1
+canal.client.instances.example.port=11111
+
+canal.instance.filter.regex=canal.canal_test

+ 404 - 0
canalSpringBoot/src/test/java/CanalTest.java

@@ -0,0 +1,404 @@
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.client.CanalConnectors;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
+import com.alibaba.otter.canal.protocol.Message;
+import com.google.protobuf.ByteString;
+import lombok.extern.slf4j.Slf4j;
+import org.example.Main;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.util.StringUtils;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@Slf4j
+@SpringBootTest(classes = Main.class)
+public class CanalTest {
+
+    CanalConnector canalConnector = CanalConnectors.newSingleConnector(
+            new InetSocketAddress("127.0.0.1", 11111),
+            "example",
+            "",
+            ""
+    );
+
+    /**
+     * 一个简单的canal 的连接测试程序
+     */
+    @Test
+    public void connectionTest() {
+        //1. 创建连接  填充对应的地址信息 ,要监控的实例和相应的用户名和密码
+        //2. 进行连接
+        canalConnector.connect();
+        log.info(">>>连接成功:{}", canalConnector);
+    }
+
+    /**
+     * 获取数据信息. 可以发现,未获取到数据 .  这个应该是实时的.
+     */
+    @Test
+    public void getDataTest() {
+        // 进行连接
+        canalConnector.connect();
+
+        //3. 注册,看使用哪个数据库表
+        canalConnector.subscribe("canal.canal_test");
+
+        //4. 获取 1条数据
+        Message message = canalConnector.get(1);
+        log.info("获取的数据:id:{},数据:{}", message.getId(), message);
+        if (message.getId() == -1) {
+            log.info(">>>未获取到数据");
+            return;
+        }
+        //5. 获取相应的数据集合
+        List<CanalEntry.Entry> entries = message.getEntries();
+        for (CanalEntry.Entry entry : entries) {
+            log.info(">>>获取数据 {}", entry);
+            //获取表名
+            CanalEntry.Header header = entry.getHeader();
+            log.info(">>>获取表名:{}", header.getTableName());
+            CanalEntry.EntryType entryType = entry.getEntryType();
+            log.info(">>获取类型 {}:,对应的信息:{}", entryType.getNumber(), entryType.name());
+
+            //获取数据
+            ByteString storeValue = entry.getStoreValue();
+            log.info(">>>输出存储的值:{}", storeValue);
+        }
+    }
+
+    /**
+     * 获取数据信息. 获取现在的数据.  再次执行时,就没有这个数据了.
+     */
+    @Test
+    public void getNowDataTest() {
+        // 进行连接
+        canalConnector.connect();
+
+        //3. 注册,看使用哪个数据库表
+        canalConnector.subscribe("canal.canal_test");
+        for (;;) {
+            //4. 获取 1条数据
+            Message message = canalConnector.get(1);
+            log.info("获取的数据:id:{},数据:{}", message.getId(), message);
+            if (message.getId() == -1) {
+                log.info(">>>未获取到数据");
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                continue;
+            }
+            //5. 获取相应的数据集合
+            List<CanalEntry.Entry> entries = message.getEntries();
+            for (CanalEntry.Entry entry : entries) {
+                log.info(">>>获取数据 {}", entry);
+                //获取表名
+                CanalEntry.Header header = entry.getHeader();
+                log.info(">>>获取表名:{}", header.getTableName());
+                CanalEntry.EntryType entryType = entry.getEntryType();
+                log.info(">>获取类型 {}:,对应的信息:{}", entryType.getNumber(), entryType.name());
+
+                //获取数据
+                ByteString storeValue = entry.getStoreValue();
+                log.info(">>>输出存储的值:{}", storeValue);
+            }
+        }
+    }
+
+    /**
+     * 将 storeValue 进行解析,解析成我们能看懂的语句.
+     * 对数据库 cud 进行处理操作观看一下.
+     * 发现,点是不好的,也有多余的记录信息.
+     *
+     * @throws Exception 异常
+     */
+    @Test
+    public void convertDataTest() throws Exception {
+        //2. 进行连接
+        canalConnector.connect();
+        canalConnector.subscribe("canal.canal_test");
+
+        for (;;) {
+            //获取信息
+            Message message = canalConnector.get(1);
+            if (message.getId() == -1L) {
+                // log.info("未获取到数据");
+                try {
+                    TimeUnit.MILLISECONDS.sleep(100);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                continue;
+            }
+            List<CanalEntry.Entry> entryList = message.getEntries();
+            //对获取到的数据进行处理
+            log.info(">>获取到{}条数据", entryList.size());
+            for (CanalEntry.Entry entry : entryList) {
+                CanalEntry.Header header = entry.getHeader();
+                log.info(">>>获取表名:{}", header.getTableName());
+                //获取类型.
+                CanalEntry.EntryType entryType = entry.getEntryType();
+                log.info(">>类型编号 {},类型名称:{}", entryType.getNumber(), entryType.name());
+
+                //获取存入日志的值
+                ByteString storeValue = entry.getStoreValue();
+                //将这个值进行解析
+                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
+                String sql = rowChange.getSql();
+                log.info(">>>获取对应的sql:{}", sql);
+                // 这个sql 可能是 批量的sql语句
+                List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
+                for (CanalEntry.RowData rowData : rowDatasList) {
+                    log.info(">>>获取信息:{}", rowData);
+                    //对数据进行处理
+                    List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
+                    List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
+
+                    beforeColumnsList.forEach(
+                            n -> log.info("哪个列{},原先是{},是否被更新{}", n.getName(),
+                                    n.getValue(), n.getUpdated())
+                    );
+                    afterColumnsList.forEach(
+                            n -> log.info("哪个列{},后来是{},是否被更新{}", n.getName(), n.getValue(), n.getUpdated())
+                    );
+                }
+            }
+        }
+
+    }
+
+    /**
+     * 类型转换数据
+     *
+     * @throws Exception 异常
+     */
+    @Test
+    public void dataTypeTest() throws Exception {
+        canalConnector.connect();
+        canalConnector.subscribe("canal.canal_test");
+
+        for(;;){
+            Message message = canalConnector.get(1);
+            if (message.getId() == -1) {
+                TimeUnit.SECONDS.sleep(1);
+                continue;
+            }
+
+            List<CanalEntry.Entry> entries = message.getEntries();
+            for (CanalEntry.Entry entry : entries) {
+                CanalEntry.EntryType entryType = entry.getEntryType();
+                //只要 RowData 数据类型的
+                if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {
+                    continue;
+                }
+                String tableName = entry.getHeader().getTableName();
+                log.info(">>>对表 {} 进行操作", tableName);
+                ByteString storeValue = entry.getStoreValue();
+                CanalEntry.RowChange rowChange = RowChange.parseFrom(storeValue);
+                //行改变
+                CanalEntry.EventType eventType = rowChange.getEventType();
+                switch (eventType) {
+                    case INSERT: {
+                        insertHandler(rowChange);
+                        break;
+                    }
+                    case UPDATE: {
+                        updateHandler(rowChange);
+                        break;
+                    }
+                    case DELETE: {
+                        deleteHandler(rowChange);
+                        break;
+                    }
+                    default: {
+                        break;
+                    }
+                }
+            }
+        }
+
+    }
+
+    private void deleteHandler(RowChange rowChange) {
+        log.info(">>>>执行删除的方法");
+        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
+        for (CanalEntry.RowData rowData : rowDatasList) {
+
+            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
+            for (CanalEntry.Column column : beforeColumnsList) {
+
+                log.info(">>>>>字段 {} 删除数据 {}", column.getName(), column.getValue());
+            }
+        }
+    }
+
+    private void updateHandler(RowChange rowChange) {
+        log.info(">>>执行更新的方法");
+        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
+        for (CanalEntry.RowData rowData : rowDatasList) {
+
+            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
+            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
+            Map<String, String> beforeValueMap = beforeColumnsList.stream().collect(
+                    Collectors.toMap(
+                            CanalEntry.Column::getName,
+                            CanalEntry.Column::getValue
+                    )
+            );
+            Map<String, String> afterValueMap = afterColumnsList.stream().collect(
+                    Collectors.toMap(
+                            CanalEntry.Column::getName,
+                            CanalEntry.Column::getValue
+                    )
+            );
+            beforeValueMap.forEach((column, beforeValue) -> {
+                String afterValue = afterValueMap.get(column);
+                Boolean update = beforeValue.equals(afterValue);
+                log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue,
+                        !update);
+            });
+        }
+
+    }
+
+    /**
+     * 插入数据. 只有后的数据.
+     *
+     * @param rowChange 行改变
+     */
+    private void insertHandler(RowChange rowChange) {
+        log.info(">>>执行添加 的方法");
+        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
+        for (CanalEntry.RowData rowData : rowDatasList) {
+            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
+
+            for (CanalEntry.Column column : afterColumnsList) {
+                if (!StringUtils.hasText(column.getValue())) {
+                    continue;
+                }
+                log.info("字段 {} 插入了数据 {}", column.getName(), column.getValue());
+            }
+
+        }
+    }
+
+    /**
+     * 一次性获取多条数据。
+     * sql 执行多条。
+     */
+//    @Test
+//    public void dataMoreTest() throws Exception {
+//        canalConnector.connect();
+//        // 订阅哪个对象
+//        canalConnector.subscribe("canal.canal_test");
+//
+//        for (; ; ) {
+//
+//            // Message message = canalConnector.get(3, 5L, TimeUnit.SECONDS);
+//            Message message = canalConnector.get(3);
+//            if (message.getId() == -1) {
+//                // 未获取到数据
+//                continue;
+//            }
+//            List<CanalEntry.Entry> entries = message.getEntries();
+//            for (CanalEntry.Entry entry : entries) {
+//                CanalEntry.EntryType entryType = entry.getEntryType();
+//                if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {
+//                    continue;
+//                }
+//                String tableName = entry.getHeader().getTableName();
+//                log.info(">>>>对表{} 执行操作", tableName);
+//
+//                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+//                //对类型进行处理
+//                CanalEntry.EventType eventType = rowChange.getEventType();
+//                switch (eventType) {
+//                    case INSERT: {
+//                        insertHandler(rowChange);
+//                        break;
+//                    }
+//                    case UPDATE: {
+//                        updateHandler(rowChange);
+//                        break;
+//                    }
+//                    case DELETE: {
+//                        deleteHandler(rowChange);
+//                        break;
+//                    }
+//                    default: {
+//                        break;
+//                    }
+//                }
+//
+//            }
+//
+//        }
+//    }
+
+    /**
+     * 一次性获取多条数据。
+     * sql 执行多条。
+     */
+    @Test
+    public void dataMoreTest() throws Exception {
+        canalConnector.connect();
+        // 订阅哪个对象
+        canalConnector.subscribe("canal.canal_test");
+
+        for (; ; ) {
+
+            Message message = canalConnector.getWithoutAck(3, 2L, TimeUnit.SECONDS);
+            if (message.getId() == -1) {
+                // 未获取到数据
+                TimeUnit.MILLISECONDS.sleep(500);
+                continue;
+            }
+            log.info(">>>>获取对应的 id: {}",message.getId());
+            List<CanalEntry.Entry> entries = message.getEntries();
+            for (CanalEntry.Entry entry : entries) {
+                CanalEntry.EntryType entryType = entry.getEntryType();
+                if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {
+                    continue;
+                }
+                String tableName = entry.getHeader().getTableName();
+                log.info(">>>>对表{} 执行操作", tableName);
+
+                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+                //对类型进行处理
+                CanalEntry.EventType eventType = rowChange.getEventType();
+                switch (eventType) {
+                    case INSERT: {
+                        insertHandler(rowChange);
+                        break;
+                    }
+                    case UPDATE: {
+                        updateHandler(rowChange);
+                        break;
+                    }
+                    case DELETE: {
+                        deleteHandler(rowChange);
+                        break;
+                    }
+                    default: {
+                        break;
+                    }
+                }
+
+            }
+            //进行回滚
+            // canalConnector.rollback();
+
+            //确认ack 配置
+            canalConnector.ack(message.getId());
+        }
+    }
+
+}