The SharePlex Post process can connect and write to a Kafka broker. The data is written as XML records that include the data definitions, the operation type, and the changed column values. This data is written as a sequential series of operations as they occurred on the source, which can then be posted in sequential order to a target database or consumed by an external process or program.
Note: For the platforms, datatypes and operations that are supported when using SharePlex to replicate to Kafka, see the SharePlex Release Notes.
When replicating data to Kafka, configure the source database and SharePlex on the source system as follows.
On the source system, enable PK/UK supplemental logging in the Oracle source database. SharePlex must have the Oracle key information to build an appropriate key on the target.
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE) COLUMNS;
On the source system, set the SP_OCT_USE_SUPP_KEYS parameter to a value of 1. This parameter directs SharePlex to use the columns set by Oracle's supplemental logging as the key columns when a row is updated or deleted. Setting both supplemental logging and this parameter ensures that SharePlex can always build a key and that the SharePlex key matches the Oracle key.
See the SharePlex Reference Guide for more information about this parameter.
On the source, create a SharePlex configuration file that specifies capture and routing information. The structure that is required in a configuration file varies depending on your replication strategy, but the following shows the required syntax for routing data to Kafka.
Datasource:o.SID
src_owner.table !kafka[:tgt_owner.table] host
where:
host is the name of the target system.
Note: For more information, see Create configuration files.
Datasource:o.ora112
MY_SCHEMA.MY_TABLE !kafka:"MySchema2"."MyTable2" sysprod
These instructions configure the SharePlex Post process to connect to Kafka. You must have a running Kafka broker.
To configure post to Kafka
Issue the target command to configure posting to a Kafka broker and topic. The following are example commands.
sp_ctrl> target x.kafka set kafka broker=localhost:9092
sp_ctrl> target x.kafka set kafka topic=shareplex
See View and change Kafka settings for command explanations and options.
To view current property settings for output to Kafka, use the following command:
target x.kafka show
To change a property setting, use the following command.
target x.kafka [queue queuename] set kafka property=value
where:
Table 12: Kafka target properties
Property | Input Value | Default |
---|---|---|
broker | Required. The host and port number of the Kafka broker, or a comma delimited list of multiple brokers. This list is the bootstrap into the Kafka cluster. So long as Post can connect to one of these brokers, it will discover any other brokers in the cluster. | localhost:9092 |
compression.code | Optional. Controls whether data is compressed in Kafka. Options are none, gzip or snappy. | None |
topic | Required. The name of the target Kafka topic. This string may contain the special sequences %o or %t. The %o sequence is replaced by the owner name of the table that is being replicated. The %t sequence is replaced by the table name of the table that is being replicated. This feature may be used in conjunction with a Kafka server setting of auto.create.topics.enable set to 'true'. Also check your server settings for default.replication.factor and num.partitions, because these are used as defaults when topics are auto-created. | shareplex |
client_id | Optional. A user-defined string that Post will send in each request to help trace calls. | None |
partition | Optional. One of the following: | None |
request.required.acks | Optional. This is a Kafka client parameter. By default it is set to a value of -1, which means all. Consult the Kafka documentation about this subject, because all really means all in-sync replicas. This parameter can be used in conjunction with the min.insync.replicas server parameter to tune behavior between availability and data consistency. Important: It is possible for data to be lost between a Kafka producer (SharePlex in this case) and a Kafka cluster, depending on these settings. | None |
threshold_size | Optional. The approximate network packet size*, in kilobytes, that Post sends to the Kafka broker. Notes: | 10000KB |
* To avoid latency, if Post detects no more incoming messages, it sends the packet to Kafka immediately without waiting for the threshold to be satisfied.
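The broker list format and the %o and %t topic sequences described in Table 12 can be illustrated with a short Python sketch. The function names expand_topic and parse_brokers are hypothetical and are not part of SharePlex; the code only mirrors the documented substitution and list format, and is not how Post itself is implemented.

```python
import re


def expand_topic(template, owner, table):
    """Replace %o with the owner name and %t with the table name.

    A single pass is used so that a substituted value is never
    scanned again for further sequences.
    """
    values = {"%o": owner, "%t": table}
    return re.sub(r"%[ot]", lambda m: values[m.group(0)], template)


def parse_brokers(broker_list):
    """Split a comma-delimited 'host:port' list into (host, port) tuples."""
    brokers = []
    for entry in broker_list.split(","):
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host:
            raise ValueError("expected host:port, got %r" % entry)
        brokers.append((host, int(port)))
    return brokers


if __name__ == "__main__":
    print(expand_topic("shareplex.%o.%t", "MFG", "PRODUCTS"))
    print(parse_brokers("localhost:9092, kafka2:9093"))
```

With a topic value such as shareplex.%o.%t (an assumed naming pattern), each replicated table maps to its own topic, which is why the broker's automatic topic creation setting matters.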
If the Kafka process aborts suddenly, or if the machine that it is running on aborts, row changes may be written twice to the Kafka topic. The consumer must manage this by detecting and discarding duplicates.
Every record of every row-change operation in a transaction has the same transaction ID and is also marked with a sequence ID. These attributes are id and msgIdx, respectively, under the txn element in the XML output (see About the XML).
The transaction ID is the SCN at the time the transaction was committed, and the sequence ID is the index of the row change in the transaction. These two values are guaranteed to be the same if they are re-written to the Kafka topic in a recovery situation.
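Because the id and msgIdx values are stable across a recovery, a consumer can use them as a duplicate key. The following Python sketch shows one possible approach using only the standard library. The function names and the make_record helper are hypothetical, the in-memory set is an assumption that suits only a demonstration (a real consumer would need a bounded or persistent store), and records without a msgIdx attribute are passed through unfiltered as a design choice of this sketch.

```python
import xml.etree.ElementTree as ET


def record_key(payload):
    """Return (id, msgIdx) from the txn element, or None if msgIdx is absent."""
    txn = ET.fromstring(payload).find("txn")
    if txn is None or txn.get("msgIdx") is None:
        return None
    return (txn.get("id"), txn.get("msgIdx"))


def drop_duplicates(payloads):
    """Yield each payload once, skipping repeats of the same (id, msgIdx)."""
    seen = set()
    for payload in payloads:
        key = record_key(payload)
        if key is not None:
            if key in seen:
                continue  # already consumed before the recovery
            seen.add(key)
        yield payload


def make_record(txn_id, msg_idx):
    """Build a minimal, hypothetical change record for demonstration."""
    return (
        '<?xml version="1.0" encoding="UTF-8"?><?opentarget version="1.1"?>'
        '<opentarget><txn id="%s" msgIdx="%s" msgTot="2" />'
        '<tbl name="MFG.PRODUCTS"><cmd ops="ins" /></tbl></opentarget>'
        % (txn_id, msg_idx)
    ).encode("utf-8")


if __name__ == "__main__":
    stream = [make_record("100", "1"), make_record("100", "2"),
              make_record("100", "2"), make_record("101", "1")]
    for kept in drop_duplicates(stream):
        print(record_key(kept))
```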
If desired, you can configure Post to include additional metadata with every row-change record by using the following command:
target x.kafka [queue queuename] set metadata property[, property]
Table 13: Optional metadata properties
Property | Description |
---|---|
time | The time the operation was applied on the source. |
userid | The ID of the database user that performed the operation. |
trans | The ID of the transaction that included the operation. |
size | The number of operations in the transaction. |
target x.kafka set metadata time, userid, trans, size
To reset the metadata
target x.kafka [queue queuename] reset metadata
To view the metadata
target x.kafka [queue queuename] show metadata
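If these commands are generated by a script, the property names can be checked against Table 13 before the command is issued. The Python sketch below only builds the command text in the syntax shown above; the function name metadata_command is hypothetical, and running the result in sp_ctrl is left to the operator.

```python
# Property names taken from Table 13.
ALLOWED_METADATA = ("time", "userid", "trans", "size")


def metadata_command(properties, queue=None):
    """Return a 'target x.kafka ... set metadata' command string."""
    unknown = [p for p in properties if p not in ALLOWED_METADATA]
    if unknown:
        raise ValueError("unsupported metadata properties: %s" % ", ".join(unknown))
    if not properties:
        raise ValueError("at least one metadata property is required")
    target = "target x.kafka"
    if queue:
        # The queue clause is optional in the documented syntax.
        target += " queue %s" % queue
    return "%s set metadata %s" % (target, ", ".join(properties))


if __name__ == "__main__":
    print(metadata_command(["time", "userid", "trans", "size"]))
```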
The XML format is separated into operation and schema "types" for easier consumption. They are actually the same when viewed from an XSD perspective and are not distinct types. The template XML represents all possible attributes and elements. The individual XML represents the bare minimum output for each supported operation.
After startup, the first time that Post writes a change record for any given table, it first writes a schema record for that table. Each schema record contains the table name and details of interest for each column. A schema record is written only once for each table during a Post run, unless there is a change to that schema, in which case a new schema record is written. If Post stops and starts, schema records are written again, once for each table as Post receives a change record for it.
<?xml version="1.0" encoding="UTF-8" ?>
<?opentarget version="1.0" ?>
<opentarget>
  <txn id="xs:integer" oracleTxnId="xs:string" commitTime="xs:dateTimeStamp" />
  <tbl name="xs:string" utcOffset="xs:integer">
    <cmd ops="schema">
      <schema>
        <col name="xs:string" xmlType="xs:string" key="xs:boolean" nullable="xs:boolean" length="xs:integer" />
      </schema>
    </cmd>
  </tbl>
</opentarget>
Table 14: Explanation of schema template (* = optional)
Element | Attribute | Description |
---|---|---|
txn | | Transaction metadata |
 | id | ID of current transaction |
 | oracleTxnId* | Oracle transaction ID |
 | commitTime* | Transaction commit timestamp |
tbl | | Table metadata |
 | name | Fully qualified name of the table |
 | utcOffset | UTC offset in the log |
cmd | | Operation metadata (In the case of a schema, there are no operations.) |
 | ops | Type of record generated for this table. For a schema, the value is schema. |
schema | | Column metadata |
col | | Metadata for a column (One of these elements appears for every column in the table.) |
 | name | Name of the column |
 | xmlType | XML data type |
 | key | Key flag (true, false) |
 | nullable | Nullable flag |
 | length | Length of the column |
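As an illustration of how a consumer might read a schema record, the following Python sketch parses a record shaped like the schema template into a dictionary of column details. The function name parse_schema and the returned structure are assumptions made for this example; the embedded record uses the PRODUCTS table from the samples in this section.

```python
import xml.etree.ElementTree as ET

# A schema record shaped like the template (bytes, as read from a topic).
SCHEMA_RECORD = b"""<?xml version="1.0" encoding="UTF-8"?>
<?opentarget version="1.1"?>
<opentarget>
  <txn id="2218316945" commitTime="2014-10-10T13:18:43" />
  <tbl name="MFG.PRODUCTS" utcOffset="-5:00">
    <cmd ops="schema">
      <schema>
        <col name="PRODUCT_ID" xmlType="decimal" key="true" nullable="false" length="22" />
        <col name="DESCRIPTION" xmlType="string" key="false" nullable="true" length="600" />
        <col name="PRICE" xmlType="decimal" key="false" nullable="true" length="22" />
      </schema>
    </cmd>
  </tbl>
</opentarget>"""


def parse_schema(payload):
    """Return (table_name, {column_name: details}) for a schema record."""
    tbl = ET.fromstring(payload).find("tbl")
    cmd = tbl.find("cmd")
    if cmd.get("ops") != "schema":
        raise ValueError("not a schema record")
    columns = {}
    for col in cmd.find("schema").findall("col"):
        columns[col.get("name")] = {
            "xmlType": col.get("xmlType"),
            "key": col.get("key") == "true",
            "nullable": col.get("nullable") == "true",
            "length": int(col.get("length")),
        }
    return tbl.get("name"), columns


if __name__ == "__main__":
    name, columns = parse_schema(SCHEMA_RECORD)
    key_columns = [c for c, d in columns.items() if d["key"]]
    print(name, key_columns)
```

Because a new schema record is written whenever the schema changes or Post restarts, a consumer built this way would replace its cached column details each time a schema record arrives for a table.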
<?xml version="1.0" encoding="UTF-8" ?>
<?opentarget version="1.1" ?>
<opentarget>
  <txn id="xs:integer" msgIdx="xs:integer" msgTot="xs:integer" oracleTxnId="xs:string" commitTime="xs:dateTimeStamp" userId="xs:string" />
  <tbl name="xs:string">
    <cmd ops="xs:string">
      <row id="xs:string">
        <col name="xs:string"></col>
        <lkup>
          <col name="xs:string"></col>
        </lkup>
      </row>
    </cmd>
  </tbl>
</opentarget>
Table 15: Explanation of operation template (* = optional)
Element | Attribute | Description |
---|---|---|
txn | | Transaction metadata for the operation |
 | id | ID of current transaction |
 | msgIdx | Index of current record in the transaction |
 | msgTot* | Total number of messages in transaction |
 | oracleTxnId* | Oracle transaction ID, taken from the System Change Number (SCN) |
 | commitTime* | Transaction commit timestamp |
 | userId* | User ID that performed the operation |
tbl | | Table metadata |
 | name | Fully qualified table name |
cmd | | Operation metadata |
 | ops | Operation type (insert, update, delete, truncate) |
row | | Metadata of the row that changed in the operation |
 | id | Oracle ROWID |
col | | Change data for a column (One of these elements appears for every changed column in the operation.) |
 | name | Column name with the after value for that column |
lkup | | Before image for use in update and delete operations |
col | | Before image of column (One of these elements appears for every changed column in the operation.) |
 | name | Column name with the before value or the key value (depending on the operation) for that column |
Note: The id and msgIdx attributes together uniquely identify an operation.
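To show how the operation template maps onto consumer-side data, the Python sketch below extracts the operation type, the after values, and the before image from one record. The function name parse_operation and the dictionary layout are assumptions for this example. Note that the sample records in this section use the short values ins, upd, del, and trunc in the ops attribute.

```python
import xml.etree.ElementTree as ET

# An update record shaped like the operation template.
UPDATE_RECORD = b"""<?xml version="1.0" encoding="UTF-8"?>
<?opentarget version="1.1"?>
<opentarget>
  <txn id="2218318728" msgIdx="1" msgTot="1" commitTime="2014-10-10T13:19:12" />
  <tbl name="MFG.PRODUCTS">
    <cmd ops="upd">
      <row id="AAAmDbAAEAAApRrAAA">
        <col name="PRICE">3599</col>
        <lkup>
          <col name="PRODUCT_ID">230117</col>
          <col name="PRICE">4099</col>
        </lkup>
      </row>
    </cmd>
  </tbl>
</opentarget>"""


def parse_operation(payload):
    """Return a dict describing one change record."""
    root = ET.fromstring(payload)
    txn = root.find("txn")
    tbl = root.find("tbl")
    cmd = tbl.find("cmd")
    row = cmd.find("row")
    after, before, row_id = {}, {}, None
    if row is not None:  # truncate records carry no row element
        row_id = row.get("id")
        # findall("col") matches direct children only, so lkup columns are excluded.
        after = {c.get("name"): c.text for c in row.findall("col")}
        lkup = row.find("lkup")
        if lkup is not None:
            before = {c.get("name"): c.text for c in lkup.findall("col")}
    return {
        "key": (txn.get("id"), txn.get("msgIdx")),  # unique per operation
        "table": tbl.get("name"),
        "ops": cmd.get("ops"),
        "row_id": row_id,
        "after": after,
        "before": before,
    }


if __name__ == "__main__":
    print(parse_operation(UPDATE_RECORD))
```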
See the SharePlex Release Notes for a chart that shows how Oracle datatypes are converted to XML.
This is the table for which the sample operations are generated.
SQL> desc products
Name | Null? | Type |
---|---|---|
PRODUCT_ID | NOT NULL | NUMBER |
DESCRIPTION | | VARCHAR2(600) |
PRICE | | NUMBER |
insert into products values (230117, 'Hamsberry vintage tee, cherry', 4099);
commit;
update products set price=3599 where product_id=230117 and price=4099;
commit;
delete products where product_id=230117;
commit;
truncate table products;
<?xml version="1.0" encoding="UTF-8"?>
<?opentarget version="1.1"?>
<opentarget>
  <txn id="2218316945" commitTime="2014-10-10T13:18:43" userId="85" oracleTxnId="3.10.1339425" />
  <tbl name="MFG.PRODUCTS" utcOffset="-5:00">
    <cmd ops="schema">
      <schema>
        <col name="PRODUCT_ID" xmlType="decimal" key="true" nullable="false" length="22" />
        <col name="DESCRIPTION" xmlType="string" key="false" nullable="true" length="600" />
        <col name="PRICE" xmlType="decimal" key="false" nullable="true" length="22" />
      </schema>
    </cmd>
  </tbl>
</opentarget>
<?xml version="1.0" encoding="UTF-8"?>
<?opentarget version="1.1"?>
<opentarget>
  <txn id="2218316945" msgIdx="1" msgTot="1" commitTime="2014-10-10T13:18:43" userId="85" oracleTxnId="3.10.1339425" />
  <tbl name="MFG.PRODUCTS">
    <cmd ops="ins">
      <row id="AAAmDbAAEAAApRrAAA">
        <col name="PRODUCT_ID">230117</col>
        <col name="DESCRIPTION">Hamsberry vintage tee, cherry</col>
        <col name="PRICE">4099</col>
      </row>
    </cmd>
  </tbl>
</opentarget>
<?xml version="1.0" encoding="UTF-8"?>
<?opentarget version="1.1"?>
<opentarget>
  <txn id="2218318728" msgIdx="1" msgTot="1" commitTime="2014-10-10T13:19:12" userId="85" oracleTxnId="1.17.970754" />
  <tbl name="MFG.PRODUCTS">
    <cmd ops="upd">
      <row id="AAAmDbAAEAAApRrAAA">
        <col name="PRICE">3599</col>
        <lkup>
          <col name="PRODUCT_ID">230117</col>
          <col name="PRICE">4099</col>
        </lkup>
      </row>
    </cmd>
  </tbl>
</opentarget>
<?xml version="1.0" encoding="UTF-8"?>
<?opentarget version="1.1"?>
<opentarget>
  <txn id="2218319446" msgIdx="1" msgTot="1" commitTime="2014-10-10T13:19:25" userId="85" oracleTxnId="5.23.1391276" />
  <tbl name="MFG.PRODUCTS">
    <cmd ops="del">
      <row id="AAAmDbAAEAAApRrAAA">
        <lkup>
          <col name="PRODUCT_ID">230117</col>
        </lkup>
      </row>
    </cmd>
  </tbl>
</opentarget>
<?xml version="1.0" encoding="UTF-8"?>
<?opentarget version="1.1"?>
<opentarget>
  <txn id="2218319938" commitTime="1988-01-01T00:00:00" userId="85" oracleTxnId="11.4.939801" />
  <tbl name="MFG.PRODUCTS">
    <cmd ops="trunc" />
  </tbl>
</opentarget>
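The sample records can be replayed in order by a consumer to reproduce the source changes. The Python sketch below applies condensed versions of the insert, update, delete, and truncate records to an in-memory list of rows. The function name apply_record, the list-of-dictionaries row store, and the rule of locating rows by matching every lkup value are all assumptions of this example, not a description of how SharePlex posts to a database.

```python
import xml.etree.ElementTree as ET

INSERT = (b'<opentarget><txn id="2218316945" msgIdx="1" msgTot="1" />'
          b'<tbl name="MFG.PRODUCTS"><cmd ops="ins"><row id="AAAmDbAAEAAApRrAAA">'
          b'<col name="PRODUCT_ID">230117</col>'
          b'<col name="DESCRIPTION">Hamsberry vintage tee, cherry</col>'
          b'<col name="PRICE">4099</col></row></cmd></tbl></opentarget>')
UPDATE = (b'<opentarget><txn id="2218318728" msgIdx="1" msgTot="1" />'
          b'<tbl name="MFG.PRODUCTS"><cmd ops="upd"><row id="AAAmDbAAEAAApRrAAA">'
          b'<col name="PRICE">3599</col><lkup><col name="PRODUCT_ID">230117</col>'
          b'<col name="PRICE">4099</col></lkup></row></cmd></tbl></opentarget>')
DELETE = (b'<opentarget><txn id="2218319446" msgIdx="1" msgTot="1" />'
          b'<tbl name="MFG.PRODUCTS"><cmd ops="del"><row id="AAAmDbAAEAAApRrAAA">'
          b'<lkup><col name="PRODUCT_ID">230117</col></lkup></row></cmd></tbl>'
          b'</opentarget>')
TRUNCATE = (b'<opentarget><txn id="2218319938" />'
            b'<tbl name="MFG.PRODUCTS"><cmd ops="trunc" /></tbl></opentarget>')


def apply_record(rows, payload):
    """Apply one change record to rows (a list of dicts) in place."""
    cmd = ET.fromstring(payload).find("tbl/cmd")
    ops = cmd.get("ops")
    row = cmd.find("row")
    after, before = {}, {}
    if row is not None:
        after = {c.get("name"): c.text for c in row.findall("col")}
        lkup = row.find("lkup")
        if lkup is not None:
            before = {c.get("name"): c.text for c in lkup.findall("col")}

    def matches(existing):
        # An empty before image must never match, or a delete would wipe the table.
        return bool(before) and all(existing.get(k) == v for k, v in before.items())

    if ops == "ins":
        rows.append(dict(after))
    elif ops == "upd":
        for existing in rows:
            if matches(existing):
                existing.update(after)
    elif ops == "del":
        rows[:] = [r for r in rows if not matches(r)]
    elif ops == "trunc":
        rows.clear()
    # Schema records and unrecognized ops values are ignored in this sketch.


if __name__ == "__main__":
    table = []
    for record in (INSERT, UPDATE, DELETE, TRUNCATE):
        apply_record(table, record)
        print(table)
```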