Prerequisites:
On the source side, turn off auto-add and turn off all DDL replication (see step 2), and enable supplemental logging for PK/UK columns.
Only add column, drop column, and truncate are supported for an open target DB.
This assumes Kafka is installed and running, and that SharePlex 8.6.6, 9.0, or 9.1 is installed.
Starting with 9.1, SharePlex supports a SQL Server source to Kafka.
SharePlex supports clustered Kafka brokers, posting to multiple partitions, control over partition assignment, multiple topics, and control over topic naming. Starting in SharePlex 9.0, it also supports JSON format in addition to the XML format.
Steps:
1. Create a topic for SharePlex in Kafka, with a single partition or with multiple partitions
For example, go to the Kafka bin directory, /xx/xx/kafka_2.10-0.10.0.0/bin
To create a topic with one partition:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic splex
Created topic "splex".
To describe the newly created topic:
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic splex
Publish a 'hello' message to the new topic splex:
./kafka-console-producer.sh --broker-list localhost:9092 --topic splex
hello
Consume the message to make sure the topic is working:
./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic splex
hello
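If you prefer to verify the topic from a script instead of the console consumer, the minimal sketch below reads the test message back. It assumes the kafka-python package, a broker on localhost:9092, and the splex topic created above.

# Minimal sketch (assumed: kafka-python installed, broker on localhost:9092)
# that reads the 'hello' test message back from the splex topic.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "splex",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",   # read from the beginning, like --from-beginning
    consumer_timeout_ms=5000,       # stop iterating when no new messages arrive
)
for message in consumer:
    # Each record value is raw bytes; the test message should decode to 'hello'.
    print(message.topic, message.partition, message.offset, message.value.decode("utf-8"))
consumer.close()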
2. Create a config file in SharePlex
For example, to replicate the demo_src table to Kafka:
sp_ctrl (host1:2109)> view config test
datasource:o.orcl
splex.demo_src !kafka host1
Note: for a SQL Server source, the config file looks like the one below. Replace DB_NAME with your SQL Server database name.
datasource:r.DB_NAME
splex.demo_src !kafka host1
sp_ctrl (host1:2109)> activate config test
sp_ctrl (host1:2109)> stop capture
sp_ctrl (host1:2109)> set param SP_OCT_AUTOADD_ENABLE 0
sp_ctrl (host1:2109)> set param SP_OCT_REPLICATE_ALL_DDL 0
sp_ctrl (host1:2109)> set param SP_OCT_USE_SUPP_KEYS 1
sp_ctrl (host1:2109)> start capture
3. Set up the target config for Kafka with the Kafka broker and port number
sp_ctrl (host1:2109)> stop post
sp_ctrl (host1:2109) > target x.kafka set kafka broker=<IP address of kafka broker>:<port#>
If you have clustered brokers, install the latest SharePlex release.
sp_ctrl (host1:2109)> target x.kafka set kafka broker=<IP address of cluster kafka broker1>:<port#>,<IP address of cluster kafka broker2>:<port#>,...
If you are using an older version of Kafka such as 0.8.1, you need to set the parameter below to work with clustered Kafka brokers:
sp_ctrl (host1:2109)> target x.kafka set kafka broker.version.fallback=0.8.1
If you are on Kafka 0.9.x or 0.10.x, you do not need to set the fallback parameter above.
For example, I have a local Kafka broker running on the same server where SharePlex is running, listening on port 9092:
sp_ctrl (host1:2109)> target x.kafka set kafka broker=localhost:9092
sp_ctrl (host1:2109)> start post
sp_ctrl (host1:2109)> target x.kafka show
Queue: (default)
parameters defining data formatting:
date = yyyy-MM-ddTHH:mm:ss
decimal = .
enotation = 14
record = xml
timestamp = yyyy-MM-ddTHH:mm:ss.ffffffffff
parameters for kafka target:
broker = localhost:9092
threshold_size = 10000
topic = shareplex
To change the topic for the SharePlex target config:
sp_ctrl (host1:2109)> stop post
sp_ctrl (host1:2109)> target x.kafka set kafka topic=splex
sp_ctrl (host1:2109)> target x.kafka show
Queue: (default)
parameters defining data formatting:
date = yyyy-MM-ddTHH:mm:ss
decimal = .
enotation = 14
record = xml
timestamp = yyyy-MM-ddTHH:mm:ss.ffffffffff
parameters for kafka target:
broker = localhost:9092
threshold_size = 10000
topic = splex
sp_ctrl (host1:2109)> start post
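Before inserting test data, you may want to confirm from a script that the broker address and port you configured with "target x.kafka set kafka broker=..." are reachable and that the splex topic exists. The sketch below is one way to do that, assuming the kafka-python package and the localhost:9092 broker used in this example.

# Connectivity check sketch (assumed: kafka-python, broker on localhost:9092).
# Constructing the consumer raises kafka.errors.NoBrokersAvailable if the
# broker address/port is wrong; topics() lists the topics the broker knows about.
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
topics = consumer.topics()
print("splex topic exists:", "splex" in topics)
consumer.close()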
4. Insert data on the source
sqlplus splex/splex
insert into demo_src values ('test','test','test');
commit;
5. Check the data on Kafka
For example:
kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic splex
<?xml version="1.0" encoding="UTF-8"?>
<?opentarget version="1.1"?>
<opentarget>
<txn id="9263778" />
<tbl name="SPLEX.DEMO_SRC" utcOffset="-7:00">
<cmd ops="schema">
<schema>
<col name="NAME" xmlType="string" key="true" nullable="true" length="30" />
<col name="ADDRESS" xmlType="string" key="true" nullable="true" length="60" />
<col name="PHONE#" xmlType="string" key="true" nullable="true" length="12" />
</schema>
</cmd>
</tbl>
</opentarget>
<?xml version="1.0" encoding="UTF-8"?>
<?opentarget version="1.1"?>
<opentarget>
<txn id="9263778" msgIdx="1" />
<tbl name="SPLEX09.DEMO_SRC">
<cmd ops="ins">
<row id="AAASvyAAEAAAFIrAAA">
<col name="NAME">test</col>
<col name="ADDRESS">test</col>
<col name="PHONE#">test</col>
</row>
</cmd>
</tbl>
</opentarget>
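A downstream consumer needs to distinguish the schema message (ops="schema") from the data messages (ops="ins", "upd", "del") shown above. The sketch below is one illustrative way to parse the opentarget XML, assuming the kafka-python package and the splex topic; it is not part of SharePlex itself.

# Illustrative sketch: parse the opentarget XML messages from the splex topic.
# Assumptions: kafka-python installed, broker on localhost:9092.
import xml.etree.ElementTree as ET
from kafka import KafkaConsumer

consumer = KafkaConsumer("splex", bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest", consumer_timeout_ms=5000)
for message in consumer:
    root = ET.fromstring(message.value.decode("utf-8"))
    tbl = root.find("tbl")
    cmd = tbl.find("cmd")
    if cmd.get("ops") == "schema":
        # Schema message: list the column definitions for the table.
        cols = [c.get("name") for c in cmd.find("schema").findall("col")]
        print("schema for", tbl.get("name"), cols)
    else:
        # Data message: collect the column values for the row.
        row = cmd.find("row")
        values = {c.get("name"): c.text for c in row.findall("col")}
        print(cmd.get("ops"), "on", tbl.get("name"), values)
consumer.close()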
Support for multiple topics
- A single xpst process can now post to multiple topics.
- The topic is controlled via the "topic" target config setting.
- In the topic setting, '%o' will be replaced by the table owner name and '%t' will be replaced by the table name.
- For example, '%o_%t' will create names like 'SCOTT_TIGER'.
- Customers may configure their Kafka server to auto-create topics.
Note: You need to add "auto.create.topics.enable=true" to the Kafka config/server.properties file and restart the Kafka broker to enable topic auto-creation.
The example below will auto-create topics like owner_tablename:
sp_ctrl> target x.kafka queue p3 set kafka topic=%o_%t
Stop and restart post for this change to take effect.
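When Kafka auto-creates one topic per table, a downstream consumer can subscribe by regex pattern instead of naming every topic. The sketch below assumes the kafka-python package, a broker on localhost:9092, and topics created from the %o_%t setting; the SCOTT_ prefix in the pattern is only an example.

# Sketch: subscribe by pattern to per-table topics such as SCOTT_TIGER.
# Assumptions: kafka-python installed, broker on localhost:9092, %o_%t topic naming.
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest")
consumer.subscribe(pattern="^SCOTT_.*")   # match all topics for the SCOTT owner
for message in consumer:
    print(message.topic, message.value.decode("utf-8"))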
Support for multiple partitions
- The partition is controlled via the "partition" target config setting.
- Options are:
- A fixed number: data will be sent to a single partition.
- The keyword 'rotate': each operation will be sent to the next partition.
- The keyword 'rotate trans': each transaction will be sent to the next partition.
Example:
sp_ctrl> target x.kafka queue p1 set kafka partition=rotate trans
sp_ctrl> target x.kafka queue p2 set kafka partition=rotate
Stop and restart post for this change to take effect.
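If you post to a fixed partition number, one way to verify which partition the messages land on is to read a single partition directly. The sketch below assumes the kafka-python package, a broker on localhost:9092, and the splex topic; partition 0 is only an example.

# Sketch: read only partition 0 of the splex topic to verify partition assignment.
# Assumptions: kafka-python installed, broker on localhost:9092.
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest", consumer_timeout_ms=5000)
consumer.assign([TopicPartition("splex", 0)])   # manual assignment, partition 0 only
for message in consumer:
    print("partition", message.partition, "offset", message.offset,
          message.value.decode("utf-8"))
consumer.close()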
Improved cluster support
- New 'request.required.acks' parameter setting. The default is -1, which means all in-sync replicas must acknowledge. This helps avoid data loss.
- xpst behavior is improved when a broker or the cluster fails: xpst will retry, then exit and re-spawn until the broker is available.
JSON format support in 9.0
Example:
sp_ctrl > target x.kafka queue p1 set format record=json
Stop and restart post for this change to take effect.
Metadata support
You can add metadata columns such as time, userid, op, scn, rowid, trans, seq, host, queue, source, size, idx, and posttime to the XML or JSON output. Use idx for the unique sequential sequence number of messages within a transaction; do not use changeid, which is meant for CDC.
posttime can be used together with time to calculate replication latency.
Example:
sp_ctrl> target x.kafka queue p1 set metadata time, userid, op, scn, rowid, trans, seq, queue, size, idx, posttime
Stop and restart post after the change.
sp_ctrl > target x.kafka queue p1 show
Queue: p1
metadata to be updated on the target:
time: Time operation applied on source
userid: User ID that performed operation
op: Type of operation (INSERT, UPDATE, DELETE, ...)
scn: Source SCN when operation was applied
rowid: ROWID of row that changed
trans: Transaction ID for the operation
seq: Operation order within transaction
queue: Queue name
size: Number of operations in the transaction
idx: Operation index in XML headers
posttime: Time operation was posted
parameters defining data formatting:
record = json
default:
decimal = .
enotation = 14
parameters for kafka target:
partition = rotate trans
topic = test8
default:
api.version.request = false
broker = 10.1.xx.xxx:9095,10.1.xx.yyy:9092
broker.version.fallback = 0.9.0
compression.codec = none
request.required.acks = -1
restart_timeout = 30
threshold_size = 10000
topic_params = request.timeout.ms=60000
parameters for JSON formatting:
default:
before = yes
commit = yes
ddl = yes
eol = yes
indent = 0
meta = yes
schema = yes
JSON parameters explanation:
json before:
Controls whether the before image is included on updates.
json commit:
Controls whether commit records are sent.
json ddl:
Controls whether ALTER TABLE commands are sent. Supported ALTER commands include add column, drop column, and set unused column.
json indent:
Controls the level of indentation. If non-zero, line breaks are also added.
json meta:
Controls whether the "meta" section is included in the JSON. Default is yes.
json schema:
When a new configuration is activated, the schema information for the objects in replication may be sent. Also, if a table has a DDL change, the new schema will be sent. Default is yes.
Example with metadata in JSON format:
{"meta":{"time":"2017-06-17T09:39:59","userid":89,"op":"ins","scn":"142772824"," rowid":"AAAXR1AAEAAAAjjAAA","trans":"8.16.135158","seq":1,"queue":"p1","size":5, "table":"JESSICA.TEST","idx":"1/5","posttime":"2017-06-17T12:40:01"},"data
":{"ID":"1","NAME":"T "}}
{"meta":{"time":"2017-06-17T09:39:59","userid":89,"op":"ins","scn":"142772824"," rowid":"AAAXR1AAEAAAAjjAAB","trans":"8.16.135158","seq":2,"queue":"p1","size":5, "table":"JESSICA.TEST","idx":"2/5","posttime":"2017-06-17T12:40:01"},"data
":{"ID":"2","NAME":"T "}}
{"meta":{"time":"2017-06-17T09:39:59","userid":89,"op":"ins","scn":"142772824"," rowid":"AAAXR1AAEAAAAjjAAC","trans":"8.16.135158","seq":3,"queue":"p1","size":5, "table":"JESSICA.TEST","idx":"3/5","posttime":"2017-06-17T12:40:01"},"data
":{"ID":"3","NAME":"T "}}
{"meta":{"time":"2017-06-17T09:39:59","userid":89,"op":"ins","scn":"142772824"," rowid":"AAAXR1AAEAAAAjjAAD","trans":"8.16.135158","seq":4,"queue":"p1","size":5, "table":"JESSICA.TEST","idx":"4/5","posttime":"2017-06-17T12:40:01"},"data
":{"ID":"4","NAME":"T "}}
{"meta":{"time":"2017-06-17T09:39:59","userid":89,"op":"ins","scn":"142772824"," rowid":"AAAXR1AAEAAAAjjAAE","trans":"8.16.135158","seq":5,"queue":"p1","size":5, "table":"JESSICA.TEST","idx":"5/5","posttime":"2017-06-17T12:40:01"},"data
":{"ID":"5","NAME":"T "}}
{"meta":{"time":"2017-06-17T09:39:59","userid":89,"op":"commit","scn":"142772824 ","rowid":"AAAAAAAAAAAAAAAAAAA","trans":"8.16.135158","seq":0,"queue":"p1","size ":0,"table":"","idx":"0/0","posttime":"2017-06-17T12:40:01"}}