Replicating Data from OGG to Kafka

goqiw 2025-01-12 13:54:20 Technical Tutorials

Goal: test one-way replication of data to Kafka using OGG.

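Overview: Kafka is deployed as a single node with a single broker; a simple standalone ZooKeeper is deployed separately; Java 1.8 is required; two OGG editions are needed, one for Oracle and one for Big Data.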

1 Environment

Server IP                   Role
192.168.141.100 (source)    Oracle 11.2.0.4, OGG 12.2.0.1.1
192.168.141.107 (target)    Zookeeper-3.6.2, Kafka_2.13-2.7.0, OGG(Big Data)-12.3.2.1.1


Software           Package
oracle             11.2.0.4
zookeeper          apache-zookeeper-3.6.2-bin.tar.gz
kafka              kafka_2.13-2.7.0.tgz
ogg for bigdata    OGG_BigData_Linux_x64_12.3.2.1.1.zip
ogg for oracle     fbo_ggs_Linux_x64_shiphome.zip
jdk                jdk-8u181-linux-x64.tar.gz
CentOS             CentOS-6.9-x86_64-bin-DVD1

2 Installing Java

[root@test01 ~]# tar -zxvf jdk-8u181-linux-x64.tar.gz -C /usr/local/

[root@test01 ~]# cd /usr/local/jdk1.8.0_181/bin/

[root@test01 bin]# ./java -version

java version "1.8.0_181"

Java(TM) SE Runtime Environment (build 1.8.0_181-b13)

Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)

Add the environment variables (append them to the end of the file):

vi /etc/profile

export JAVA_HOME=/usr/local/jdk1.8.0_181

export PATH=$JAVA_HOME/bin:$PATH

3 Installing ZooKeeper

3.1 Basic ZooKeeper installation

[root@test01 ~]# tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz -C /usr/local

[root@test01 ~]# mv /usr/local/apache-zookeeper-3.6.2-bin /usr/local/apache-zookeeper-3.6.2

3.2 Editing the zoo.cfg configuration file

Change to ZooKeeper's conf directory:

[root@test01 ~]# cd /usr/local/apache-zookeeper-3.6.2/conf

Copy zoo_sample.cfg to zoo.cfg, then change the dataDir property and leave the other parameters unchanged:

[root@test01 conf]# cp zoo_sample.cfg zoo.cfg

[root@test01 conf]# vi zoo.cfg

# Data directory

dataDir=/usr/local/apache-zookeeper-3.6.2/zkdata

# Client port; the default is 2181

clientPort=2181

3.3 Configuring environment variables

[root@test01 ~]# vi /etc/profile    (append at the end of the file)

export ZOOKEEPER_HOME=/usr/local/apache-zookeeper-3.6.2

export JAVA_HOME=/usr/local/jdk1.8.0_181

export PATH=$ZOOKEEPER_HOME/bin:$JAVA_HOME/bin:$PATH

3.4 Starting and stopping ZooKeeper

[root@test01 bin]# cd /usr/local/apache-zookeeper-3.6.2/bin

[root@test01 bin]# ./zkServer.sh start

ZooKeeper JMX enabled by default

Using config: /usr/local/apache-zookeeper-3.6.2/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

[root@test01 bin]# ./zkServer.sh stop

3.5 Checking the process with jps

[root@test01 bin]# jps

1971 QuorumPeerMain

5645 Jps

4 Installing Kafka

4.1 Single-node Kafka deployment

[root@test01 ~]# tar -zxvf kafka_2.13-2.7.0.tgz -C /usr/local/

4.2 Configuring Kafka

In Kafka's config directory, edit server.properties and add the following settings:

[root@test01 ~]# cd /usr/local/kafka_2.13-2.7.0/config

[root@test01 config]# vi server.properties

# Globally unique broker ID; must not be duplicated

broker.id=0

# Listeners

#listeners=PLAINTEXT://:9092

listeners=PLAINTEXT://192.168.141.107:9092

advertised.listeners=PLAINTEXT://192.168.141.107:9092

# Log directory

log.dirs=/usr/local/kafka_2.13-2.7.0/kafka-logs

# ZooKeeper connection (if ZooKeeper is not on this host, use its IP address or hostname)

#zookeeper.connect=localhost:2181

zookeeper.connect=192.168.141.107:2181
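
The settings above can be sanity-checked before the broker is started. The following is a minimal sketch in Python, not part of Kafka itself: parse_properties and check_broker_config are hypothetical helper names, and the rules they apply (required keys, no empty or 0.0.0.0 host in advertised.listeners) are assumptions chosen for this single-broker setup.

```python
def parse_properties(text):
    """Parse key=value lines, ignoring blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_broker_config(props):
    """Return a list of problems found by a few illustrative sanity checks."""
    problems = []
    for key in ("broker.id", "listeners", "log.dirs", "zookeeper.connect"):
        if key not in props:
            problems.append("missing " + key)
    advertised = props.get("advertised.listeners", "")
    # Clients connect to the advertised address, so it needs a concrete host.
    if "://:" in advertised or "://0.0.0.0" in advertised:
        problems.append("advertised.listeners needs a reachable host")
    return problems


# The values used in this article's server.properties.
SAMPLE = """\
broker.id=0
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.141.107:9092
advertised.listeners=PLAINTEXT://192.168.141.107:9092
log.dirs=/usr/local/kafka_2.13-2.7.0/kafka-logs
zookeeper.connect=192.168.141.107:2181
"""

props = parse_properties(SAMPLE)
print(check_broker_config(props))  # []
```

To check the real file, read /usr/local/kafka_2.13-2.7.0/config/server.properties and pass its contents to parse_properties.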

4.3 Adding environment variables

[root@test01 config]# vi /etc/profile

export ZOOKEEPER_HOME=/usr/local/apache-zookeeper-3.6.2

export JAVA_HOME=/usr/local/jdk1.8.0_181

export KAFKA_HOME=/usr/local/kafka_2.13-2.7.0

export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server

export PATH=$KAFKA_HOME/bin:$ZOOKEEPER_HOME/bin:$JAVA_HOME/bin:$PATH

4.4 Starting and stopping Kafka

Start:

[root@test01 bin]# cd /usr/local/kafka_2.13-2.7.0/bin

[root@test01 bin]# ./kafka-server-start.sh $KAFKA_HOME/config/server.properties &

Stop:

[root@test01 bin]# ./kafka-server-stop.sh

4.5 Checking the processes with jps

[root@test01 kafka_2.13-2.7.0]# jps

1971 QuorumPeerMain

2702 Jps

2287 Kafka

4.6 Testing Kafka

[root@test01 ~]# cd /usr/local/kafka_2.13-2.7.0/bin

Create a topic:

[root@test01 bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Created topic test.

List all topics:

[root@test01 bin]# ./kafka-topics.sh --list --zookeeper localhost:2181

test

The output 'test' confirms that the topic was created.
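
Besides jps and kafka-topics.sh, a quick TCP probe can confirm that the ZooKeeper and Kafka ports accept connections from another host. This is a minimal sketch using only the Python standard library; port_open and report are hypothetical helper names. An open port only shows that something is listening, not that the broker is healthy.

```python
import socket


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def report(services):
    """Map each service name to 'up' or 'down' based on a TCP connect."""
    return {name: ("up" if port_open(host, port) else "down")
            for name, (host, port) in services.items()}
```

For this article's environment the call would be report({"zookeeper": ("192.168.141.107", 2181), "kafka": ("192.168.141.107", 9092)}).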

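5 OGG source-side installation

5.1 Unpacking the installation package

[oracle@oracle01 ~]$ unzip fbo_ggs_Linux_x64_shiphome.zip

5.2 Installation (graphical installation steps omitted)

[oracle@oracle01 ~]$ cd fbo_ggs_Linux_x64_shiphome/Disk1

[oracle@oracle01 Disk1]$ ./runInstaller

(steps omitted)

After the graphical installer finishes, the OGG subdirectories have already been created.


Verify the installation: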

[oracle@oracle01 ogg12.2]$ ldd ggsci

linux-vdso.so.1 => (0x00007ffdb9efa000)

librt.so.1 => /lib64/librt.so.1 (0x0000003df0600000)

libdl.so.2 => /lib64/libdl.so.2 (0x0000003defa00000)

libgglog.so => /u01/ogg12.2/./libgglog.so (0x00007f02e8ccc000)

libggrepo.so => /u01/ogg12.2/./libggrepo.so (0x00007f02e8a5a000)

libdb-6.1.so => /u01/ogg12.2/./libdb-6.1.so (0x00007f02e8675000)

libggperf.so => /u01/ogg12.2/./libggperf.so (0x00007f02e8445000)

libggparam.so => /u01/ogg12.2/./libggparam.so (0x00007f02e733b000)

libicui18n.so.48 => /u01/ogg12.2/./libicui18n.so.48 (0x00007f02e6f4b000)

libicuuc.so.48 => /u01/ogg12.2/./libicuuc.so.48 (0x00007f02e6bca000)

libicudata.so.48 => /u01/ogg12.2/./libicudata.so.48 (0x00007f02e5405000)

libpthread.so.0 => /lib64/libpthread.so.0 (0x0000003df0200000)

libxerces-c.so.28 => /u01/ogg12.2/./libxerces-c.so.28 (0x00007f02e4e3e000)

libantlr3c.so => /u01/ogg12.2/./libantlr3c.so (0x00007f02e4c25000)

libnnz11.so => /u01/app/oracle/product/11.2.0/dbhome_1/lib/libnnz11.so (0x00007f02e4857000)

libclntsh.so.11.1 => /u01/app/oracle/product/11.2.0/dbhome_1/lib/libclntsh.so.11.1 (0x00007f02e1ded000)

libggnnzitp.so => /u01/ogg12.2/./libggnnzitp.so (0x00007f02e1696000)

libm.so.6 => /lib64/libm.so.6 (0x0000003df0a00000)

libc.so.6 => /lib64/libc.so.6 (0x0000003defe00000)

/lib64/ld-linux-x86-64.so.2 (0x0000003def600000)

libstdc++.so.6 => /usr/lib64/libstdc++.so.6 (0x0000003df3200000)

libgcc_s.so.1 => /lib64/libgcc_s.so.1 (0x0000003df2e00000)

libnsl.so.1 => /lib64/libnsl.so.1 (0x0000003df2a00000)

libaio.so.1 => /lib64/libaio.so.1 (0x00007f02e1492000)


Initialize the directories (the graphical installer already creates them):

GGSCI (oracle01) 2> create SUBDIRS

Creating subdirectories under current directory /u01/ogg12.2


Parameter files /u01/ogg12.2/dirprm: already exists

Report files /u01/ogg12.2/dirrpt: already exists

Checkpoint files /u01/ogg12.2/dirchk: already exists

Process status files /u01/ogg12.2/dirpcs: already exists

SQL script files /u01/ogg12.2/dirsql: already exists

Database definitions files /u01/ogg12.2/dirdef: already exists

Extract data files /u01/ogg12.2/dirdat: already exists

Temporary files /u01/ogg12.2/dirtmp: already exists

Credential store files /u01/ogg12.2/dircrd: already exists

Masterkey wallet files /u01/ogg12.2/dirwlt: already exists

Dump files /u01/ogg12.2/dirdmp: already exists

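5.3 Checking the database configuration and changing parameters

5.3.1 The source database must be in archivelog mode, with supplemental logging and force logging enabled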

SQL> archive log list;

Database log mode Archive Mode

Automatic archival Enabled

Archive destination USE_DB_RECOVERY_FILE_DEST

Oldest online log sequence 8

Next log sequence to archive 10

Current log sequence 10

SQL> select name,supplemental_log_data_min,force_logging from v$database;

NAME SUPPLEMENTAL_LOG_DATA_MI FORCE_LOG

--------------------------- ------------------------ ---------

TDB01 YES YES


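If changes are needed, use the following statements (alter database archivelog must be run while the database is mounted but not open):

alter database archivelog;

alter database force logging;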

alter database add supplemental log data;

ALTER SYSTEM SET ENABLE_GOLDENGATE_REPLICATION = TRUE SCOPE=BOTH;

5.3.2 Creating the OGG user and a dedicated tablespace

create tablespace ts_ogg datafile '/u01/app/oracle/oradata/tdb01/ts_ogg01.dbf' size 200M AUTOEXTEND on extent management local segment space management auto;

create user ggusr identified by ggusr default tablespace ts_ogg;

For convenience in this test environment, grant the DBA role directly:

grant resource,connect,dba to ggusr;

5.3.3 Configuring the test user

alter user scott identified by scott account unlock;

grant select_catalog_role to scott;

Create a new table in the scott schema to use as the test table:

create table test_ogg(id int ,name varchar(20),primary key(id));

5.4 Configuring MGR

GGSCI (oracle01) 3> edit param mgr

PORT 7809

DYNAMICPORTLIST 7810-7909

--AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3

PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

LAGREPORTHOURS 1

LAGINFOMINUTES 30

LAGCRITICALMINUTES 45

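Notes:

PORT is the port on which the Manager listens;

DYNAMICPORTLIST is the list of ports from which the Manager assigns ports to dynamically started processes, such as the Collector that serves a remote data pump; the maximum number of ports that can be listed depends on the OGG release (256 in older releases);

AUTORESTART, as written here, restarts all EXTRACT processes up to 5 times, waiting 3 minutes between attempts;

PURGEOLDEXTRACTS periodically purges old trail files.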
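
To see how many ports a DYNAMICPORTLIST value actually covers, the range can be expanded with a few lines of Python. This is only an illustrative sketch: expand_port_list is a hypothetical helper, and it assumes the value is a comma-separated list of single ports and low-high ranges.

```python
def expand_port_list(spec):
    """Expand a spec such as '7810-7909' or '7830-7835, 7839' into port numbers."""
    ports = []
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            low, high = (int(p) for p in part.split("-", 1))
            if low > high:
                raise ValueError("descending range: " + part)
            ports.extend(range(low, high + 1))
        else:
            ports.append(int(part))
    return ports


ports = expand_port_list("7810-7909")
print(len(ports), ports[0], ports[-1])  # 100 7810 7909
```

The range used in this article therefore provides 100 ports and does not overlap the Manager's own port 7809.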

5.5 Adding supplemental logging for the replicated table

Log in as the OGG user ggusr:

GGSCI (oracle01) 5> dblogin userid ggusr, password ggusr

Successfully logged into database.


GGSCI (oracle01 as ggusr@tdb01) 6> info trandata scott.test_ogg

Logging of supplemental redo log data is disabled for table SCOTT.TEST_OGG.


GGSCI (oracle01 as ggusr@tdb01) 7> add trandata scott.test_ogg

Logging of supplemental redo data enabled for table SCOTT.TEST_OGG.

TRANDATA for scheduling columns has been added on table 'SCOTT.TEST_OGG'.

TRANDATA for instantiation CSN has been added on table 'SCOTT.TEST_OGG'.

5.6 Configuring the EXTRACT capture process

GGSCI (oracle01) 18> edit param extkafka

extract extkafka

setenv (ORACLE_SID=tdb01)

setenv (ORACLE_HOME=/u01/app/oracle/product/11.2.0/dbhome_1)

setenv (NLS_LANG="AMERICAN_AMERICA.UTF8")

userid ggusr,password ggusr

exttrail ./dirdat/ka

table scott.test_ogg;


Add the process:

add extract extkafka,tranlog,begin now

Bind the trail file:

add exttrail ./dirdat/ka,extract extkafka

5.7 Configuring the PUMP delivery process (another use of the EXTRACT process)

GGSCI (oracle01) 19> edit param pukafka

extract pukafka

setenv (ORACLE_SID=tdb01)

setenv (ORACLE_HOME=/u01/app/oracle/product/11.2.0/dbhome_1)

passthru

dynamicresolution

userid ggusr,password ggusr

rmthost 192.168.141.107,mgrport 7809

rmttrail ./dirdat/pa

table scott.test_ogg;


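Notes:

The first line specifies the extract process name;

passthru tells the pump to forward trail data as-is without looking up table definitions in Oracle; because the pump only transports data here, no database interaction is needed;

dynamicresolution resolves table names dynamically, as each table is first encountered;

userid ggusr,password ggusr is the account and password OGG uses to connect to the Oracle database;

rmthost and mgrport are the address and listening port of the OGG mgr service on the target (Kafka) side;

rmttrail is the location and name of the trail file on the target.


Bind the local trail file and the remote trail file to the PUMP process: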

add extract pukafka,exttrailsource ./dirdat/ka

add rmttrail ./dirdat/pa,extract pukafka

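5.8 Configuring the definitions file that describes the table mapping

Transfers between Oracle and MySQL or the Hadoop ecosystem (HDFS, Hive, Kafka, and so on) are heterogeneous transfers, so the table structure mapping must be defined.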


GGSCI (oracle01) 1> edit param test_ogg

defsfile /u01/ogg12.2/dirdef/scott.test_ogg

userid ggusr,password ggusr

table scott.test_ogg;


GGSCI (oracle01) 3> view param test_ogg

defsfile /u01/ogg12.2/dirdef/scott.test_ogg

userid ggusr,password ggusr

table scott.test_ogg;


Run the following from the OGG home directory (as the oracle user):

./defgen paramfile dirprm/test_ogg.prm

This generates the file scott.test_ogg in the /u01/ogg12.2/dirdef directory.


[oracle@oracle01 ogg12.2]$ ./defgen paramfile dirprm/test_ogg.prm

***********************************************************************

Oracle GoldenGate Table Definition Generator for Oracle

Version 12.2.0.1.1 OGGCORE_12.2.0.1.0_PLATFORMS_151211.1401

Linux, x64, 64bit (optimized), Oracle 11g on Dec 11 2015 21:37:21

Copyright (C) 1995, 2015, Oracle and/or its affiliates. All rights reserved.

Starting at 2021-03-10 11:02:19

***********************************************************************

Operating System Version:

Linux

Version #1 SMP Tue May 10 17:27:01 UTC 2016, Release 2.6.32-642.el6.x86_64

Node: oracle01

Machine: x86_64

soft limit hard limit

Address Space Size : unlimited unlimited

Heap Size : unlimited unlimited

File Size : unlimited unlimited

CPU Time : unlimited unlimited

Process id: 2602

**********************************************************************

** Running with the following parameters **

***********************************************************************

defsfile /u01/ogg12.2/dirdef/scott.test_ogg

userid ggusr,password ***

table scott.test_ogg;

Retrieving definition for SCOTT.TEST_OGG.

Definitions generated for 1 table in /u01/ogg12.2/dirdef/scott.test_ogg.


Copy the generated file /u01/ogg12.2/dirdef/scott.test_ogg to the dirdef directory under the OGG home on the target:

scp /u01/ogg12.2/dirdef/scott.test_ogg root@192.168.141.107:/u01/ogg12.3/dirdef

6 OGG target-side (Kafka) installation

6.1 Confirming that the ZooKeeper and Kafka services are running

[root@test01 dirdef]# jps

3760 Jps

1971 QuorumPeerMain

2287 Kafka

6.2 Configuring MGR

GGSCI (test01) 3> edit param mgr

PORT 7809

DYNAMICPORTLIST 7810-7909

--AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 -- not needed for now

PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

LAGREPORTHOURS 1

LAGINFOMINUTES 30

LAGCRITICALMINUTES 45

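6.3 Configuring the checkpoint

A checkpoint is a record of the replication position that makes progress traceable; adding a checkpoint table to the global configuration is enough.

GGSCI (test01) 14> edit param ./GLOBALS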

CHECKPOINTTABLE ggusr.checkpoint

6.4 Configuring the replicat process

GGSCI (test01) 6> EDIT PARAMS rekafka

replicat rekafka

sourcedefs /u01/ogg12.3/dirdef/scott.test_ogg

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props

REPORTCOUNT EVERY 1 MINUTES, RATE

GROUPTRANSOPS 10000

map scott.test_ogg,target scott.test_ogg;


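Notes:

REPLICAT rekafka defines the replicat process name;

sourcedefs is the table definitions file copied over from the source server;

TARGETDB LIBFILE specifies the adapter library for Kafka and its configuration file, which is dirprm/kafka.props under the OGG home directory;

REPORTCOUNT is how often the replication task reports its record count;

GROUPTRANSOPS is the number of operations grouped into one transaction when applying, which reduces I/O;

MAP is the mapping between the source and the target.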

6.5 The kafka.props configuration file

Note: the // comments below are explanatory only; remove them from the actual file, otherwise errors will occur.

[root@test01 ~]# cd /u01/ogg12.3/dirprm

[root@test01 dirprm]# vi kafka.props

gg.handlerlist=kafkahandler //handler list

gg.handler.kafkahandler.type=kafka

gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //Kafka producer configuration file

gg.handler.kafkahandler.topicMappingTemplate=test_ogg //Kafka topic name; it does not need to be created manually

gg.handler.kafkahandler.format=json //message format; json, xml, and others are supported

gg.handler.kafkahandler.mode=op //delivery mode in OGG for Big Data: op sends one message per operation, tx sends one per transaction

##gg.classpath=dirprm/:/u01/ogg12.3/:/u01/ogg12.3/lib/*

The classpath must include the library files from the Kafka installation, otherwise errors will keep occurring:

gg.classpath=dirprm/:/u01/ogg12.3/*:/u01/ogg12.3/lib/*:/u01/ogg12.3/ggjava/resources/lib/*:/usr/local/kafka_2.13-2.7.0/libs/*
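
Because the annotated listing cannot be pasted into the file as-is, the trailing // annotations have to be removed first. One way to do that is sketched below in Python; strip_trailing_comments is a hypothetical helper, and it assumes an annotation always starts with whitespace followed by //, so that values containing :// are left untouched.

```python
import re

# A '//' annotation counts only when preceded by whitespace, so values such as
# PLAINTEXT://host:9092 are left alone.
_TRAILING_COMMENT = re.compile(r"\s+//.*$")


def strip_trailing_comments(text):
    """Remove trailing ' //...' annotations from every line of a properties file."""
    lines = [_TRAILING_COMMENT.sub("", line).rstrip() for line in text.splitlines()]
    return "\n".join(lines) + "\n"


ANNOTATED = """\
gg.handlerlist=kafkahandler //handler list
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.format=json //message format
gg.handler.kafkahandler.mode=op //op or tx
"""

print(strip_trailing_comments(ANNOTATED), end="")
```

The same function can be applied to custom_kafka_producer.properties, which is annotated in the same way.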

6.6 Editing custom_kafka_producer.properties

Note: remove the // comments from the actual file.

[root@test01 dirprm]# vi custom_kafka_producer.properties

bootstrap.servers=192.168.141.107:9092 //Kafka broker address

acks=1

compression.type=gzip //compression type

reconnect.backoff.ms=1000 //reconnect delay

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

batch.size=102400

linger.ms=10000

6.7 Adding the trail file to the replicat process

GGSCI (test01) 14> add replicat rekafka exttrail ./dirdat/pa,checkpointtable ggusr.checkpoint

7 Testing

7.1 Starting and checking the processes

On the source, start mgr, extkafka, and pukafka:

Start mgr

Start extkafka

Start pukafka

GGSCI (oracle01) 1> info all

Program Status Group Lag at Chkpt Time Since Chkpt


MANAGER RUNNING

EXTRACT RUNNING EXTKAFKA 00:00:00 00:00:06

EXTRACT RUNNING PUKAFKA 00:00:00 00:00:00


On the target, start mgr and rekafka:

Start mgr

Start rekafka

GGSCI (test01) 1> info all


Program Status Group Lag at Chkpt Time Since Chkpt


MANAGER RUNNING

REPLICAT RUNNING REKAFKA 00:00:00 00:00:07
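
Checking the info all output by eye works for three processes; for repeated checks the text can be parsed. The sketch below is an assumption-laden illustration in Python: parse_info_all and not_running are hypothetical helpers, and they assume the whitespace-separated column layout shown above, with ABENDED used as an example of a non-running status.

```python
PROCESS_TYPES = {"MANAGER", "EXTRACT", "REPLICAT"}


def parse_info_all(output):
    """Return (program, group, status) tuples from GGSCI 'info all' text."""
    rows = []
    for line in output.splitlines():
        fields = line.split()
        if len(fields) >= 2 and fields[0] in PROCESS_TYPES:
            # The MANAGER row has no group column.
            group = fields[2] if len(fields) > 2 else None
            rows.append((fields[0], group, fields[1]))
    return rows


def not_running(output):
    """Return the rows whose status is anything other than RUNNING."""
    return [row for row in parse_info_all(output) if row[2] != "RUNNING"]


SAMPLE = """\
Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER RUNNING
REPLICAT RUNNING REKAFKA 00:00:00 00:00:07
"""

print(not_running(SAMPLE))  # []
```

An empty list means every listed process reports RUNNING.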

7.2 Insert, update, and delete tests

On the source database, as the scott user, run insert, update, and delete operations against the test_ogg table:

insert into test_ogg values(1,'test');

commit;

update test_ogg set name='zhangsan' where id=1;

commit;

delete from test_ogg where id=1;

commit;


On the target, check whether the topic has been created:

[root@test01 bin]# ./kafka-topics.sh --list --zookeeper 192.168.141.107:2181

test_ogg

The topic test_ogg, the name defined in kafka.props, is listed, which indicates that replication is working.


Use a consumer to check whether the data is replicated:

Start a Kafka console consumer, which keeps running in the foreground; operations performed on the table at the source then appear in the consumer in real time.

[root@test01 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.141.107:9092 --topic test_ogg --from-beginning

{"table":"SCOTT.TEST_OGG","op_type":"I","op_ts":"2021-03-10 12:27:13.417910","current_ts":"2021-03-10T12:27:20.018000","pos":"00000000000000001817","after":{"ID":1,"NAME":"test"}}


{"table":"SCOTT.TEST_OGG","op_type":"U","op_ts":"2021-03-10 12:32:36.417849","current_ts":"2021-03-10T12:32:43.324000","pos":"00000000000000001945","before":{"ID":1,"NAME":"test"},"after":{"ID":1,"NAME":"zhangsan"}}


{"table":"SCOTT.TEST_OGG","op_type":"D","op_ts":"2021-03-10 15:19:28.414300","current_ts":"2021-03-10T15:19:35.464000","pos":"00000000000000002098","before":{"ID":1,"NAME":"zhangsan"}}


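The data is replicated to Kafka in JSON format. The op_type field indicates the operation type; its values can be configured, or the defaults below can be used: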

gg.handler.kafkahandler.format.insertOpKey = I

gg.handler.kafkahandler.format.updateOpKey = U

gg.handler.kafkahandler.format.deleteOpKey = D

before holds the row data before the operation, and after holds the row data after it.
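
To illustrate how a downstream consumer might use op_type, before, and after, the three messages captured above can be replayed against an in-memory copy of the table. This is a minimal Python sketch, not an OGG or Kafka API: apply_message is a hypothetical helper, and it assumes ID is the primary key and that each message carries full row images as in the sample output.

```python
import json


def apply_message(table, message, key="ID"):
    """Apply one OGG JSON change record to a dict keyed by primary key."""
    op = message["op_type"]
    if op in ("I", "U"):
        row = message["after"]
        if op == "U" and "before" in message:
            # Drop the old entry first in case the key itself changed.
            table.pop(message["before"][key], None)
        table[row[key]] = row
    elif op == "D":
        table.pop(message["before"][key], None)
    else:
        raise ValueError("unexpected op_type: " + repr(op))
    return table


# The three messages shown by the console consumer in this test.
MESSAGES = [
    '{"table":"SCOTT.TEST_OGG","op_type":"I","op_ts":"2021-03-10 12:27:13.417910","current_ts":"2021-03-10T12:27:20.018000","pos":"00000000000000001817","after":{"ID":1,"NAME":"test"}}',
    '{"table":"SCOTT.TEST_OGG","op_type":"U","op_ts":"2021-03-10 12:32:36.417849","current_ts":"2021-03-10T12:32:43.324000","pos":"00000000000000001945","before":{"ID":1,"NAME":"test"},"after":{"ID":1,"NAME":"zhangsan"}}',
    '{"table":"SCOTT.TEST_OGG","op_type":"D","op_ts":"2021-03-10 15:19:28.414300","current_ts":"2021-03-10T15:19:35.464000","pos":"00000000000000002098","before":{"ID":1,"NAME":"zhangsan"}}',
]

state = {}
for raw in MESSAGES:
    apply_message(state, json.loads(raw))
print(state)  # {}
```

After the insert and update the copy holds the row with NAME zhangsan, and after the delete it is empty again, matching the source table.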
