
SharePlex 8.6.6 - Installation Guide


Configure Replication to a Kafka target

Overview

The SharePlex Post process can connect and write to a Kafka broker. The data is written as XML records that include the data definitions, the operation type, and the changed column values. This data is written as a sequential series of operations as they occurred on the source, which can then be posted in sequential order to a target database or consumed by an external process or program.
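
For a first look at that output, the following is a minimal consumer sketch. It assumes the third-party kafka-python package, the default topic name shareplex, and a broker on localhost:9092 (all assumptions to adapt to your environment); it simply prints the XML as Post writes it.

from kafka import KafkaConsumer

# Read SharePlex XML output from the target topic and print it.
# Topic name and broker address are assumptions; adjust as needed.
consumer = KafkaConsumer('shareplex',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest')
for message in consumer:
    print(message.value.decode('utf-8'))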

Guidelines for posting to Kafka

  • A SharePlex Post process acts as a Kafka producer, writing to a single topic with the default partition of 0 in a single broker. To support multiple topics or multiple brokers, configure multiple Post processes using named Post queues. For more information, see Configure named post queues.
  • The SharePlex Post process does not create topics. You must create the topic through Kafka; a sketch of one way to do this follows these guidelines.
  • For performance reasons, Post batches messages together before sending them to the Kafka broker. Tuning parameters are available to control the batch size. These can be used to balance performance with latency.

Note: For the platforms, datatypes and operations that are supported when using SharePlex to replicate to Kafka, see the SharePlex Release Notes.
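
Because Post does not create topics, create the target topic before activating replication. You can use the kafka-topics shell script that ships with Kafka, or a client library. The following is a minimal sketch using the third-party kafka-python package; the broker address, topic name, and partition and replication counts are assumptions to adapt to your cluster.

from kafka.admin import KafkaAdminClient, NewTopic

# Create the topic that Post will write to (Post does not create it).
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
admin.create_topics([NewTopic(name='shareplex',
                              num_partitions=3,
                              replication_factor=1)])
admin.close()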

Configure SharePlex on the source

When replicating data to Kafka, configure the source database and SharePlex on the source system as follows.

Enable supplemental logging

On the source system, enable PK/UK supplemental logging in the Oracle source database. SharePlex must have the Oracle key information to build an appropriate key on the target.

SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE) COLUMNS;

Set SP_OCT_USE_SUPP_KEYS parameter

On the source system, set the SP_OCT_USE_SUPP_KEYS parameter to a value of 1. This parameter directs SharePlex to use the columns set by Oracle's supplemental logging as the key columns when a row is updated or deleted. When both supplemental logging and this parameter are set, it ensures that SharePlex can always build a key and that the SharePlex key will match the Oracle key.

See the SharePlex Reference Guide for more information about this parameter.
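
For example, a typical way to set a SharePlex parameter is through sp_ctrl. Verify the exact syntax against the Reference Guide, but it generally follows this pattern:

sp_ctrl> set param SP_OCT_USE_SUPP_KEYS 1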

Configure replication

On the source, create a SharePlex configuration file that specifies capture and routing information. The structure that is required in a configuration file varies, depending on your replication strategy, but the following template shows the syntax that is required to route data to Kafka.

Datasource:o.SID
src_owner.table !kafka[:tgt_owner.table] host

where:

  • SID is the Oracle SID of the source Oracle database.
  • src_owner.table is the owner and name of the source table.
  • !kafka is a required keyword indicating SharePlex is posting to Kafka.
  • :tgt_owner.table is optional and specifies the owner and name of the target table. Use it if either component differs from that of the source table. Do not put spaces between !kafka and :tgt_owner.table. Type case-sensitive names in the correct case and enclose them in double quotes, as in "MySchema"."MyTable"
  • host is the name of the target system.

Note:  For more information, see Create configuration files.

Source configuration example

Datasource:o.ora112

MY_SCHEMA.MY_TABLE !kafka:"MySchema2"."MyTable2" sysprod

Configure SharePlex on the target

These instructions configure the SharePlex Post process to connect to Kafka. You must have a running Kafka broker.

To configure Post to Kafka

  1. Create a Kafka topic.
  2. Start sp_cop. (Do not activate the configuration yet.)
  3. Run sp_ctrl.
  4. Issue the target command to configure posting to a Kafka broker and topic. The following are example commands.

    sp_ctrl> target x.kafka set kafka broker=localhost:9092

    sp_ctrl> target x.kafka set kafka topic=shareplex

    See View and change Kafka settings for command explanations and options.

View and change Kafka settings

To view current property settings for output to Kafka, use the following command:

target x.kafka show

To change a property setting, use the following command.

target x.kafka [queue queuename] set kafka property=value

where:

  • queue queuename is the name of a Post queue. Use this option if there are multiple Post processes.
  • property and value are shown in the following table.

Table 12: Kafka target properties

broker

Required. The host and port number of the Kafka broker, or a comma-delimited list of multiple brokers. This list is the bootstrap into the Kafka cluster. As long as Post can connect to one of these brokers, it discovers any other brokers in the cluster.

Default: localhost:9092

compression.codec

Optional. Controls whether data is compressed in Kafka. Options are none, gzip, or snappy.

Default: none

topic

Required. The name of the target Kafka topic.

This string may contain the special sequences %o or %t. The %o sequence is replaced by the owner name of the table that is being replicated. The %t sequence is replaced by the table name of the table that is being replicated. This feature may be used in conjunction with the Kafka server setting auto.create.topics.enable set to true. Also review your server settings for default.replication.factor and num.partitions, because these are used as defaults when topics are auto-created.

Default: shareplex

client_id

Optional. A user-defined string that Post sends in each request to help trace calls.

Default: none

partition

Optional. One of the following:

  • A fixed partition number: Directs Post to post messages only to the specified partition number. For example, setting it to 0 directs Post to post only to partition 0. This option is suitable for testing, or for targets that have multiple channels of data posting to the same Kafka topic.
  • The keyword rotate: Directs Post to apply messages to all of the partitions of a topic in a round-robin fashion. The partition changes with each new message. For example, if a topic has three partitions, the messages are posted to partitions 0, 1, 2, 0, 1, 2, and so on in that order.
  • The keyword rotate trans: Similar to the rotate option, except that the partition is incremented with each transaction rather than with each message. For example, if a topic has three partitions, the messages are posted to partition 0 until the commit, then to partition 1 until the next commit, and so on in that order. This option is suitable if you are replicating multiple tables to a single topic. It allows you to distribute data across several partitions, while still keeping all of the operations of a transaction together in a single partition. This enables a consumer that reads from a single partition to receive a stream of complete transactions.

Default: none

request.required.acks

Optional. This is a Kafka client parameter. By default it is set to -1, which means all. Consult the Kafka documentation on this subject, because all really means all in-sync replicas. This parameter can be used in conjunction with the min.insync.replicas server parameter to tune the balance between availability and data consistency. Important: It is possible for data to be lost between a Kafka producer (SharePlex in this case) and a Kafka cluster, depending on these settings.

Default: none (the Kafka client default of -1 applies)

threshold_size

Optional. The approximate network packet size*, in kilobytes, that Post sends to the Kafka broker.

Notes:

  • Maximum packet size is 128000 KB.
  • Packet size is approximate.

Default: 10000 KB

* To avoid latency, if Post detects no more incoming messages, it sends the packet to Kafka immediately without waiting for the threshold to be satisfied.
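
As an illustration of this syntax, the following hypothetical sp_ctrl session directs Post to rotate partitions per transaction and to compress data with gzip; the property values shown are examples only.

sp_ctrl> target x.kafka set kafka partition=rotate trans

sp_ctrl> target x.kafka set kafka compression.codec=gzip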

Set recovery options

If the Kafka process aborts suddenly, or if the machine that it is running on aborts, row changes may be written twice to the Kafka topic. The consumer must manage this by detecting and discarding duplicates.

Every record of every row-change operation in a transaction has the same transaction ID and is also marked with a sequence ID. These attributes are id and msgIdx, respectively, under the txn element in the XML output (see About the XML).

The transaction ID is the SCN at the time the transaction was committed, and the sequence ID is the index of the row change in the transaction. These two values are guaranteed to be the same if they are re-written to the Kafka topic in a recovery situation.
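
Because id and msgIdx are stable across a re-write, a consumer can discard duplicates by remembering which (id, msgIdx) pairs it has already applied. The following is a minimal sketch in Python, assuming one XML record per Kafka message; a production consumer would persist its position durably rather than hold it in memory.

import xml.etree.ElementTree as ET

seen = set()   # (transaction id, msgIdx) pairs already applied

def is_duplicate(xml_record):
    """Return True if this row-change record was already applied."""
    txn = ET.fromstring(xml_record).find('txn')
    key = (txn.get('id'), txn.get('msgIdx'))
    if key in seen:
        return True
    seen.add(key)
    return False

Since records are re-written in their original order, a consumer can often track just the most recently applied pair instead of an unbounded set.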

If desired, you can configure Post to include additional metadata with every row-change record by using the following command:

target x.kafka [queue queuename] set metadata property[, property]

Table 13: Optional metadata properties

Property Description
time The time the operation was applied on the source.
userid The ID of the database user that performed the operation.
trans The ID of the transaction that included the operation.
size The number of operations in the transaction.

Example

target x.kafka set metadata time, userid, trans, size

To reset the metadata

target x.kafka [queue queuename] reset metadata

To view the metadata

target x.kafka [queue queuename] show metadata

About the XML

The XML format is separated into operation and schema "types" for easier consumption. From an XSD perspective, however, they are the same type, not distinct types. The template XML represents all possible attributes and elements. The individual XML represents the bare minimum output for each supported operation.

After startup, the first time that Post writes a change record for any given table, it first writes a schema record for that table. Each schema record contains the table name and details of interest for each column. A schema record is written only once for each table during a Post run, unless there is a change to that schema, in which case a new schema record is written. If Post stops and starts, schema records are written again, once for each table, as Post receives a change record for it.

Schema record template

<?xml version="1.0" encoding="UTF-8" ?>  
<?opentarget version="1.0" ?>  
<opentarget>  
    <txn  
        id="xs:integer"  
        oracleTxnId="xs:string"   
        commitTime="xs:dateTimeStamp" />  
    <tbl  
        name="xs:string"  
        utcOffset="xs:integer"
        <cmd ops="schema">  
            <schema>  
                <col  
                    name="xs:string"  
                    xmlType="xs:string"  
                    key="xs:boolean"  
                    nullable="xs:boolean"  
                    length="xs:integer"  
                  />  
            </schema>  
        </cmd>  
    </tbl>  
</opentarget>

Table 14: Explanation of schema template (* = optional)

txn: Transaction metadata
    id: ID of the current transaction
    oracleTxnId*: Oracle transaction ID
    commitTime*: Transaction commit timestamp

tbl: Table metadata
    name: Fully qualified name of the table
    utcOffset: UTC offset in the log

cmd: Operation metadata (in the case of a schema record, there are no operations)
    ops: Type of record generated for this table. For a schema record, the value is schema.

schema: Column metadata

col: Metadata for a column (one of these elements appears for every column in the table)
    name: Name of the column
    xmlType: XML data type
    key: Key flag (true or false)
    nullable: Nullable flag
    length: Length of the column

Operation record template

<?xml version="1.0" encoding="UTF-8" ?>
<?opentarget version="1.1" ?>
<opentarget>
    <txn
        id="xs:integer" 
        msgIdx="xs:integer" 
        msgTot="xs:integer" 
        oracleTxnId="xs:string"
        commitTime="xs:dateTimeStamp"
        userId="xs:string" />
    <tbl 
        name="xs:string"
        <cmd ops="xs:string">
            <row id="xs:string">
                <col name="xs:string"></col>
                <lkup>
                    <col name="xs:string"></col>
                </lkup>
            </row>
        </cmd>
    </tbl>
</opentarget>

Table 15: Explanation of operation template (* = optional)

txn: Transaction metadata for the operation
    id: ID of the current transaction
    msgIdx: Index of the current record in the transaction
    msgTot*: Total number of messages in the transaction
    oracleTxnId*: Oracle transaction ID, taken from the System Change Number (SCN)
    commitTime*: Transaction commit timestamp
    userId*: ID of the user that performed the operation

tbl: Table metadata
    name: Fully qualified table name

cmd: Operation metadata
    ops: Operation type (insert, update, delete, truncate)

row: Metadata of the row that changed in the operation
    id: Oracle ROWID

col: Change data for a column (one of these elements appears for every changed column in the operation)
    name: Column name, with the after value for that column as the element content

lkup: Before image for use in update and delete operations
    col: Before image of a column (one of these elements appears for every changed column in the operation)
        name: Column name, with the before value or the key value (depending on the operation) as the element content

Note: The id and msgIdx attributes together uniquely identify an operation.
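
To tie the two record types together, the sketch below caches each schema record and extracts the before and after values from operation records. It is an illustrative example rather than SharePlex-supplied code: it uses Python's standard xml.etree module and presumes one XML record per Kafka message.

import xml.etree.ElementTree as ET

schemas = {}   # table name -> {column name: xmlType}

def handle_record(xml_record):
    root = ET.fromstring(xml_record)         # <opentarget> element
    tbl = root.find('tbl')
    cmd = tbl.find('cmd')
    if cmd.get('ops') == 'schema':
        # Cache column metadata from the schema record.
        schemas[tbl.get('name')] = {
            c.get('name'): c.get('xmlType')
            for c in cmd.find('schema').findall('col')}
        return
    row = cmd.find('row')                    # absent for truncate
    if row is None:
        print(cmd.get('ops'), tbl.get('name'))
        return
    after = {c.get('name'): c.text for c in row.findall('col')}
    lkup = row.find('lkup')                  # before image (upd/del)
    before = ({c.get('name'): c.text for c in lkup.findall('col')}
              if lkup is not None else {})
    print(cmd.get('ops'), tbl.get('name'), after, before)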

Supported datatypes

See the SharePlex Release Notes for a chart that shows how Oracle datatypes are converted to XML.

Sample XML records

Source table

This is the table for which the sample operations are generated.

SQL> desc products

 Name                                Null?     Type
 ----------------------------------- --------- --------------
 PRODUCT_ID                          NOT NULL  NUMBER
 DESCRIPTION                                   VARCHAR2(600)
 PRICE                                         NUMBER

Source DML operations

insert into products values (230117, 'Hamsberry vintage tee, cherry', 4099);
commit;
update products set price=3599 where product_id=230117 and price=4099;
commit;
delete products where product_id=230117;
commit;
truncate table products;

Schema record

<?xml version="1.0" encoding="UTF-8"?>
<?opentarget version="1.1"?>
<opentarget>
<txn id="2218316945" commitTime="2014-10-10T13:18:43" userId="85" oracleTxnId="3.10.1339425" />
<tbl name="MFG.PRODUCTS" utcOffset="-5:00">
<cmd ops="schema">
<schema>
<col name="PRODUCT_ID" xmlType="decimal" key="true" nullable="false" length="22" />
<col name="DESCRIPTION" xmlType="string" key="false" nullable="true" length="600" />
<col name="PRICE" xmlType="decimal" key="false" nullable="true" length="22" />
</schema>
</cmd>
</tbl>
</opentarget>

Insert record

<?xml version="1.0" encoding="UTF-8"?>
<?opentarget version="1.1"?>
<opentarget>
<txn id="2218316945" msgIdx="1" msgTot="1" commitTime="2014-10-10T13:18:43" userId="85" oracleTxnId="3.10.1339425" />
<tbl name="MFG.PRODUCTS">
<cmd ops="ins">
<row id="AAAmDbAAEAAApRrAAA">
<col name="PRODUCT_ID">230117</col>
<col name="DESCRIPTION">Hamsberry vintage tee, cherry</col>
<col name="PRICE">4099</col>
</row>
</cmd>
</tbl>
</opentarget>

Update record

<?xml version="1.0" encoding="UTF-8"?>
<?opentarget version="1.1"?>
<opentarget>
<txn id="2218318728" msgIdx="1" msgTot="1" commitTime="2014-10-10T13:19:12" userId="85" oracleTxnId="1.17.970754" />
<tbl name="MFG.PRODUCTS">
<cmd ops="upd">
<row id="AAAmDbAAEAAApRrAAA">
<col name="PRICE">3599</col>
<lkup>
<col name="PRODUCT_ID">230117</col>
<col name="PRICE">4099</col>
</lkup>
</row>
</cmd>
</tbl>
</opentarget>

Delete record

<?xml version="1.0" encoding="UTF-8"?>
<?opentarget version="1.1"?>
<opentarget>
<txn id="2218319446" msgIdx="1" msgTot="1" commitTime="2014-10-10T13:19:25" userId="85" oracleTxnId="5.23.1391276" />
<tbl name="MFG.PRODUCTS">
<cmd ops="del">
<row id="AAAmDbAAEAAApRrAAA">
<lkup>
<col name="PRODUCT_ID">230117</col>
</lkup>
</row>
</cmd>
</tbl>
</opentarget>

Truncate record

<?xml version="1.0" encoding="UTF-8"?>
<?opentarget version="1.1"?>
<opentarget>
<txn id="2218319938" commitTime="1988-01-01T00:00:00" userId="85" oracleTxnId="11.4.939801" />
<tbl name="MFG.PRODUCTS">
<cmd ops="trunc" />
</tbl>
</opentarget>