Tuesday, September 23, 2008

Steps to build Oracle Table level Streams?

Oracle Streams were introduced in oracle 9.2. Oracle Streams are a generic mechanism to replicate data between one database to another database. The processing of streams is divided into three main processes(Capture, Staging and Apply).

This article provides step by step instruction to build the three way streams. This article is tested in oracle 9.2.0.8. We have three database DB1, DB2, DB3. The steam is setup on dept table for scott schema on all three instance. Any changes(DML/DDL) made on any of the database will be replicate to other two database.

Instance Setup

The below init parmeters needs to be set on intance level for all three database(DB1,DB2,DB3).

ALTER SYSTEM SET JOB_QUEUE_PROCESSES = 20;
ALTER SYSTEM SET AQ_TM_PROCESSES = 2;
ALTER SYSTEM SET GLOBAL_NAMES = TRUE;
ALTER SYSTEM SET COMPATIBLE='9.2.0' SCOPE = SPFILE;
ALTER SYSTEM SET LOG_PARALLELISM=1 SCOPE = SPFILE;
SHUTDOWN IMMEDIATE;
STARTUP;

Stream admin Setup on DB1

CONNECT SYS/password@db1 AS SYSDBA;


CREATE TABLESPACE LOGMNRTS DATAFILE 'C:/ORACLE/ORADATA/DB1/logmnrts.dbf' SIZE 25M
AUTOEXTEND ON MAXSIZE UNLIMITED
/

BEGIN
DBMS_LOGMNR_D.SET_TABLESPACE('LOGMNRTS');
END;
/

create user strmadmin identified by strmadmin
/

alter user strmadmin default tablespace LOGMNRTS
/

grant connect,resource,aq_administrator_role,dba to strmadmin
/

GRANT SELECT ANY DICTIONARY TO STRMADMIN;

GRANT EXECUTE ON DBMS_AQ TO STRMADMIN;
GRANT EXECUTE ON DBMS_AQADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_FLASHBACK TO STRMADMIN;
GRANT EXECUTE ON DBMS_STREAMS_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_CAPTURE_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_APPLY_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_RULE_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_PROPAGATION_ADM TO STRMADMIN;


BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'ENQUEUE_ANY',
grantee => 'STRMADMIN',
admin_option => FALSE);
END;
/

BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'DEQUEUE_ANY',
grantee => 'STRMADMIN',
admin_option => FALSE);
END;
/

BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'MANAGE_ANY',
grantee => 'STRMADMIN',
admin_option => TRUE);
END;
/

BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
grantee => 'STRMADMIN',
grant_option => TRUE);
END;
/

BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_ANY_RULE_SET,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.ALTER_ANY_RULE_SET,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ANY_RULE_SET,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_ANY_RULE,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.ALTER_ANY_RULE,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ANY_RULE,
grantee => 'STRMADMIN',
grant_option => TRUE);
END;
/

BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ANY_EVALUATION_CONTEXT,
grantee => 'STRMADMIN',
grant_option => TRUE);
END;
/

Stream admin Setup on DB2

CONNECT SYS/PASSWORD@DB2 AS SYSDBA;

CREATE TABLESPACE LOGMNRTS DATAFILE 'C:/ORACLE/ORADATA/DB2/logmnrts.dbf' SIZE 25M
AUTOEXTEND ON MAXSIZE UNLIMITED
/


BEGIN
DBMS_LOGMNR_D.SET_TABLESPACE('LOGMNRTS');
END;
/

create user strmadmin identified by strmadmin
/

alter user strmadmin default tablespace LOGMNRTS
/

grant connect,resource,aq_administrator_role,dba to strmadmin
/

GRANT SELECT ANY DICTIONARY TO STRMADMIN;

GRANT EXECUTE ON DBMS_AQ TO STRMADMIN;
GRANT EXECUTE ON DBMS_AQADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_FLASHBACK TO STRMADMIN;
GRANT EXECUTE ON DBMS_STREAMS_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_CAPTURE_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_APPLY_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_RULE_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_PROPAGATION_ADM TO STRMADMIN;


BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'ENQUEUE_ANY',
grantee => 'STRMADMIN',
admin_option => FALSE);
END;
/

BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'DEQUEUE_ANY',
grantee => 'STRMADMIN',
admin_option => FALSE);
END;
/

BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'MANAGE_ANY',
grantee => 'STRMADMIN',
admin_option => TRUE);
END;
/

BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
grantee => 'STRMADMIN',
grant_option => TRUE);
END;
/

BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_ANY_RULE_SET,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.ALTER_ANY_RULE_SET,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ANY_RULE_SET,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_ANY_RULE,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.ALTER_ANY_RULE,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ANY_RULE,
grantee => 'STRMADMIN',
grant_option => TRUE);
END;
/

BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ANY_EVALUATION_CONTEXT,
grantee => 'STRMADMIN',
grant_option => TRUE);
END;
/

Stream admin Setup on DB3

CONNECT SYS/password@db3 AS SYSDBA;

CREATE TABLESPACE LOGMNRTS DATAFILE 'C:/ORACLE/ORADATA/DB3/logmnrts.dbf' SIZE 25M
AUTOEXTEND ON MAXSIZE UNLIMITED
/

BEGIN
DBMS_LOGMNR_D.SET_TABLESPACE('LOGMNRTS');
END;
/

create user strmadmin identified by strmadmin
default tablespace LOGMNRTS
/

grant connect,resource,aq_administrator_role,dba to strmadmin
/

GRANT SELECT ANY DICTIONARY TO STRMADMIN;

GRANT EXECUTE ON DBMS_AQ TO STRMADMIN;
GRANT EXECUTE ON DBMS_AQADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_FLASHBACK TO STRMADMIN;
GRANT EXECUTE ON DBMS_STREAMS_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_CAPTURE_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_APPLY_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_RULE_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_PROPAGATION_ADM TO STRMADMIN;


BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'ENQUEUE_ANY',
grantee => 'STRMADMIN',
admin_option => FALSE);
END;
/

BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'DEQUEUE_ANY',
grantee => 'STRMADMIN',
admin_option => FALSE);
END;
/

BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'MANAGE_ANY',
grantee => 'STRMADMIN',
admin_option => TRUE);
END;
/

BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
grantee => 'STRMADMIN',
grant_option => TRUE);
END;
/

BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_ANY_RULE_SET,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.ALTER_ANY_RULE_SET,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ANY_RULE_SET,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_ANY_RULE,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.ALTER_ANY_RULE,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ANY_RULE,
grantee => 'STRMADMIN',
grant_option => TRUE);
END;
/

BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ANY_EVALUATION_CONTEXT,
grantee => 'STRMADMIN',
grant_option => TRUE);
END;
/

Define and start the Apply Process in DB1
connect strmadmin/strmadmin@db1;

CREATE DATABASE LINK DB2
CONNECT TO STRMADMIN IDENTIFIED BY STRMADMIN USING
' (DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = NYC-LAPTOP05)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = db2)
)
)'
/

CREATE DATABASE LINK DB3
CONNECT TO STRMADMIN IDENTIFIED BY STRMADMIN USING
' (DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = NYC-LAPTOP05)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = db3)
)
)'
/


begin
dbms_streams_adm.set_up_queue(
queue_table => 'DB_STREAMS_QUEUE_TABLE',
storage_clause => 'TABLESPACE TOOLS',
queue_name => 'DB_STREAMS_IN_QUEUE',
queue_user => 'STRMADMIN' );
end;
/

begin
dbms_streams_adm.set_up_queue(
queue_table => 'DB_STREAMS_QUEUE_TABLE',
storage_clause => 'TABLESPACE TOOLS',
queue_name => 'DB_STREAMS_OUT_QUEUE',
queue_user => 'STRMADMIN' );
end;
/


BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'SCOTT.DEPT',
streams_type => 'APPLY',
streams_name => 'DB_APPLY_DB2',
queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE',
include_dml => true,
include_ddl => true,
source_database => 'DB2');
END;
/

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'SCOTT.DEPT',
streams_type => 'APPLY',
streams_name => 'DB_APPLY_DB3',
queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE',
include_dml => true,
include_ddl => true,
source_database => 'DB3');
END;
/


BEGIN
dbms_apply_adm.alter_apply(
apply_name => 'DB_APPLY_DB2',
apply_user => 'SCOTT');

dbms_apply_adm.alter_apply(
apply_name => 'DB_APPLY_DB3',
apply_user => 'SCOTT');

END;
/

BEGIN
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'DB_APPLY_DB2',
parameter => 'DISABLE_ON_ERROR',
value => 'N' );

DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'DB_APPLY_DB3',
parameter => 'DISABLE_ON_ERROR',
value => 'N' );

END;
/


declare
v_started number;
begin
select decode(status,'ENABLED',1,0) into v_started
from dba_apply
where apply_name = 'DB_APPLY_DB2';

if ( v_started = 0 ) then
dbms_apply_adm.start_apply( apply_name => 'DB_APPLY_DB2' );
end if;

select decode(status,'ENABLED',1,0) into v_started
from dba_apply
where apply_name = 'DB_APPLY_DB3';

if ( v_started = 0 ) then
dbms_apply_adm.start_apply( apply_name => 'DB_APPLY_DB3' );
end if;

end;
/


Define and start the Apply Process in DB2

CONNECT strmadmin/strmadmin@DB2;

CREATE DATABASE LINK DB1
CONNECT TO STRMADMIN IDENTIFIED BY STRMADMIN USING
' (DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = NYC-LAPTOP05)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = db1)
)
)'
/


CREATE DATABASE LINK DB3
CONNECT TO STRMADMIN IDENTIFIED BY STRMADMIN USING
' (DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = NYC-LAPTOP05)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = db3)
)
)'
/


begin
dbms_streams_adm.set_up_queue(
queue_table => 'DB_STREAMS_QUEUE_TABLE',
storage_clause => 'TABLESPACE TOOLS',
queue_name => 'DB_STREAMS_IN_QUEUE',
queue_user => 'STRMADMIN' );
end;
/

begin
dbms_streams_adm.set_up_queue(
queue_table => 'DB_STREAMS_QUEUE_TABLE',
storage_clause => 'TABLESPACE TOOLS',
queue_name => 'DB_STREAMS_OUT_QUEUE',
queue_user => 'STRMADMIN' );
end;
/


BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'SCOTT.DEPT',
streams_type => 'APPLY',
streams_name => 'DB_APPLY_DB1',
queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE',
include_dml => true,
include_ddl => true,
source_database => 'DB1');
END;
/

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'SCOTT.DEPT',
streams_type => 'APPLY',
streams_name => 'DB_APPLY_DB3',
queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE',
include_dml => true,
include_ddl => true,
source_database => 'DB3');
END;
/


BEGIN
dbms_apply_adm.alter_apply(
apply_name => 'DB_APPLY_DB1',
apply_user => 'SCOTT');

dbms_apply_adm.alter_apply(
apply_name => 'DB_APPLY_DB3',
apply_user => 'SCOTT');

END;
/

BEGIN
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'DB_APPLY_DB1',
parameter => 'DISABLE_ON_ERROR',
value => 'N' );

DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'DB_APPLY_DB3',
parameter => 'DISABLE_ON_ERROR',
value => 'N' );

END;
/


declare
v_started number;
begin
select decode(status,'ENABLED',1,0) into v_started
from dba_apply
where apply_name = 'DB_APPLY_DB1';

if ( v_started = 0 ) then
dbms_apply_adm.start_apply( apply_name => 'DB_APPLY_DB1' );
end if;

select decode(status,'ENABLED',1,0) into v_started
from dba_apply
where apply_name = 'DB_APPLY_DB3';

if ( v_started = 0 ) then
dbms_apply_adm.start_apply( apply_name => 'DB_APPLY_DB3' );
end if;

end;
/


Define and start the Apply Process in DB3

CONNECT strmadmin/strmadmin@DB3;

CREATE DATABASE LINK DB1
CONNECT TO STRMADMIN IDENTIFIED BY STRMADMIN USING
' (DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = NYC-LAPTOP05)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = db1)
)
)'
/


CREATE DATABASE LINK DB2
CONNECT TO STRMADMIN IDENTIFIED BY STRMADMIN USING
' (DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = NYC-LAPTOP05)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = db2)
)
)'
/


begin
dbms_streams_adm.set_up_queue(
queue_table => 'DB_STREAMS_QUEUE_TABLE',
storage_clause => 'TABLESPACE TOOLS',
queue_name => 'DB_STREAMS_IN_QUEUE',
queue_user => 'STRMADMIN' );
end;
/

begin
dbms_streams_adm.set_up_queue(
queue_table => 'DB_STREAMS_QUEUE_TABLE',
storage_clause => 'TABLESPACE TOOLS',
queue_name => 'DB_STREAMS_OUT_QUEUE',
queue_user => 'STRMADMIN' );
end;
/


BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'SCOTT.DEPT',
streams_type => 'APPLY',
streams_name => 'DB_APPLY_DB1',
queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE',
include_dml => true,
include_ddl => true,
source_database => 'DB1');
END;
/

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'SCOTT.DEPT',
streams_type => 'APPLY',
streams_name => 'DB_APPLY_DB2',
queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE',
include_dml => true,
include_ddl => true,
source_database => 'DB2');
END;
/



BEGIN
dbms_apply_adm.alter_apply(
apply_name => 'DB_APPLY_DB1',
apply_user => 'SCOTT');

dbms_apply_adm.alter_apply(
apply_name => 'DB_APPLY_DB2',
apply_user => 'SCOTT');

END;
/

BEGIN
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'DB_APPLY_DB1',
parameter => 'DISABLE_ON_ERROR',
value => 'N' );

DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'DB_APPLY_DB2',
parameter => 'DISABLE_ON_ERROR',
value => 'N' );

END;
/



declare
v_started number;
begin
select decode(status,'ENABLED',1,0) into v_started
from dba_apply
where apply_name = 'DB_APPLY_DB1';

if ( v_started = 0 ) then
dbms_apply_adm.start_apply( apply_name => 'DB_APPLY_DB1' );
end if;

select decode(status,'ENABLED',1,0) into v_started
from dba_apply
where apply_name = 'DB_APPLY_DB2';

if ( v_started = 0 ) then
dbms_apply_adm.start_apply( apply_name => 'DB_APPLY_DB2' );
end if;

end;
/

Setup Propagation Rule for DB1


CONNECT SYS/PASSWORD@DB1 AS SYSDBA
ALTER TABLE SCOTT.DEPT DROP SUPPLEMENTAL LOG GROUP log_group_dept_pk;

ALTER TABLE SCOTT.DEPT ADD SUPPLEMENTAL LOG GROUP log_group_dept_pk (DEPTNO,DNAME,LOC) ALWAYS;

CONNECT STRMADMIN/STRMADMIN@DB1

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'SCOTT.DEPT',
streams_type => 'CAPTURE',
streams_name => 'DB_CAPTURE',
queue_name => 'STRMADMIN.DB_STREAMS_OUT_QUEUE',
include_dml => true,
include_ddl => true,
source_database => 'DB1');
END;
/


BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
table_name => 'SCOTT.DEPT',
streams_name => 'DB_PROPAGATE_DB2',
source_queue_name => 'STRMADMIN.DB_STREAMS_OUT_QUEUE',
destination_queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE@DB2',
include_dml => true,
include_ddl => true,
source_database => 'DB1');
END;
/

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
table_name => 'SCOTT.DEPT',
streams_name => 'DB_PROPAGATE_DB3',
source_queue_name => 'STRMADMIN.DB_STREAMS_OUT_QUEUE',
destination_queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE@DB3',
include_dml => true,
include_ddl => true,
source_database => 'DB1');
END;
/

Setup Propagation Rule for DB2


CONNECT SYS/PASSWORD@DB2 AS SYSDBA

ALTER TABLE SCOTT.DEPT DROP SUPPLEMENTAL LOG GROUP log_group_dept_pk;

ALTER TABLE SCOTT.DEPT ADD SUPPLEMENTAL LOG GROUP log_group_dept_pk (DEPTNO,DNAME,LOC) ALWAYS;


CONNECT STRMADMIN/STRMADMIN@DB2

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'SCOTT.DEPT',
streams_type => 'CAPTURE',
streams_name => 'DB_CAPTURE',
queue_name => 'STRMADMIN.DB_STREAMS_OUT_QUEUE',
include_dml => true,
include_ddl => true,
source_database => 'DB2');
END;
/


BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
table_name => 'SCOTT.DEPT',
streams_name => 'DB_PROPAGATE_DB1',
source_queue_name => 'STRMADMIN.DB_STREAMS_OUT_QUEUE',
destination_queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE@DB1',
include_dml => true,
include_ddl => true,
source_database => 'DB2');
END;
/

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
table_name => 'SCOTT.DEPT',
streams_name => 'DB_PROPAGATE_DB3',
source_queue_name => 'STRMADMIN.DB_STREAMS_OUT_QUEUE',
destination_queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE@DB3',
include_dml => true,
include_ddl => true,
source_database => 'DB2');
END;
/

Setup Propagation Rule for DB3

CONNECT SYS/PASSWORD@DB3 AS SYSDBA

ALTER TABLE SCOTT.DEPT DROP SUPPLEMENTAL LOG GROUP log_group_dept_pk;
ALTER TABLE SCOTT.DEPT ADD SUPPLEMENTAL LOG GROUP log_group_dept_pk (DEPTNO,DNAME,LOC) ALWAYS;


CONNECT STRMADMIN/STRMADMIN@DB3

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'SCOTT.DEPT',
streams_type => 'CAPTURE',
streams_name => 'DB_CAPTURE',
queue_name => 'STRMADMIN.DB_STREAMS_OUT_QUEUE',
include_dml => true,
include_ddl => true,
source_database => 'DB3');
END;
/


BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
table_name => 'SCOTT.DEPT',
streams_name => 'DB_PROPAGATE_DB1',
source_queue_name => 'STRMADMIN.DB_STREAMS_OUT_QUEUE',
destination_queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE@DB1',
include_dml => true,
include_ddl => true,
source_database => 'DB3');
END;
/

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
table_name => 'SCOTT.DEPT',
streams_name => 'DB_PROPAGATE_DB2',
source_queue_name => 'STRMADMIN.DB_STREAMS_OUT_QUEUE',
destination_queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE@DB2',
include_dml => true,
include_ddl => true,
source_database => 'DB3');
END;
/
Setup Instantiation SCN for DB1

CONNECT STRMADMIN/STRMADMIN@DB1

set echo on
set feedback on
set serverout on

declare
iscn number;
begin
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_OUTPUT.PUT_LINE('Instantiation SCN is: 'iscn );
end;
/

CONNECT STRMADMIN/STRMADMIN@DB2

BEGIN
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'SCOTT.DEPT',
source_database_name => 'DB1',
instantiation_scn => iscn);
END;
/

CONNECT STRMADMIN/STRMADMIN@DB3

BEGIN
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'SCOTT.DEPT',
source_database_name => 'DB1',
instantiation_scn => iscn);
END;
/

CONNECT STRMADMIN/STRMADMIN@DB1

EXEC DBMS_CAPTURE_ADM.START_CAPTURE( CAPTURE_NAME => 'DB_CAPTURE')
/
Setup Instantiation SCN for DB2

CONNECT STRMADMIN/STRMADMIN@DB2

set echo on
set feedback on
set serverout on

declare
iscn number;
begin
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_OUTPUT.PUT_LINE('Instantiation SCN is: 'iscn );
end;
/

CONNECT STRMADMIN/STRMADMIN@DB1

BEGIN
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'SCOTT.DEPT',
source_database_name => 'DB2',
instantiation_scn => vscn);
END;
/

CONNECT STRMADMIN/STRMADMIN@DB3

BEGIN
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'SCOTT.DEPT',
source_database_name => 'DB2',
instantiation_scn => vscn);
END;
/

CONNECT STRMADMIN/STRMADMIN@DB2

EXEC DBMS_CAPTURE_ADM.START_CAPTURE( CAPTURE_NAME => 'DB_CAPTURE')
/
Setup Instantiation SCN for DB3

CONNECT STRMADMIN/STRMADMIN@DB3

set echo on
set feedback on
set serverout on

declare
iscn number;
begin
iscn :=
DBMS_OUTPUT.PUT_LINE('Instantiation SCN is: 'iscn );
end;
/

CONNECT STRMADMIN/STRMADMIN@DB1

BEGIN
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'SCOTT.DEPT',
source_database_name => 'DB3',
instantiation_scn => vscn);
END;
/

CONNECT STRMADMIN/STRMADMIN@DB2

BEGIN
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'SCOTT.DEPT',
source_database_name => 'DB3',
instantiation_scn => vscn);
END;
/

CONNECT STRMADMIN/STRMADMIN@DB3

EXEC DBMS_CAPTURE_ADM.START_CAPTURE( CAPTURE_NAME => 'DB_CAPTURE')
/
Stream Testing

Scenario 1

Here is the records in dept table for DB1 instance
scott@DB1.US.ORACLE.COM> select * from dept;

DEPTNO DNAME LOC
---------- -------------- -------------
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON

scott@DB1.US.ORACLE.COM>

Now insert one record in DB1 instance and check this record is replicated in DB2, DB3 instance.

scott@DB1.US.ORACLE.COM> insert into
2 dept values(50,'IT','HOUSTON');

1 row created.

scott@DB1.US.ORACLE.COM> COMMIT;

Commit complete.

scott@DB1.US.ORACLE.COM>

scott@DB2.US.ORACLE.COM> SELECT * FROM DEPT;

DEPTNO DNAME LOC
---------- -------------- -------------
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
50 IT HOUSTON

scott@DB2.US.ORACLE.COM>

scott@DB3.US.ORACLE.COM> SELECT * FROM DEPT;

DEPTNO DNAME LOC
---------- -------------- -------------
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
50 IT HOUSTON

scott@DB3.US.ORACLE.COM
>

Now the changes in DB1 is replicated to DB2, DB3.

Scenario 2

Let us update the record in DB2 and see the changes are replicating to DB1, DB3.

scott@DB2.US.ORACLE.COM> UPDATE dept
2 set dname='Testing'
3 where deptno=50;

1 row updated.

scott@DB2.US.ORACLE.COM> commit;

Commit complete.

scott@DB2.US.ORACLE.COM>

scott@DB1.US.ORACLE.COM> SELECT * FROM DEPT;

DEPTNO DNAME LOC
---------- -------------- -------------
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
50 Testing HOUSTON

scott@DB1.US.ORACLE.COM>

scott@DB3.US.ORACLE.COM> SELECT * FROM DEPT;

DEPTNO DNAME LOC
---------- -------------- -------------
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
50 Testing HOUSTON

scott@DB3.US.ORACLE.COM>

Now the changes in DB2 is replicated to DB1, DB3.

Scenario 3

Now delete one record in DB3 instance and check this record is replicated in DB1, DB2 instance.

scott@DB3.US.ORACLE.COM> delete dept
2 where deptno=50;

1 row deleted.

scott@DB3.US.ORACLE.COM> commit;

Commit complete.

scott@DB3.US.ORACLE.COM>
scott@DB1.US.ORACLE.COM> select * from dept;

DEPTNO DNAME LOC
---------- -------------- -------------
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON

scott@DB1.US.ORACLE.COM>
scott@DB2.US.ORACLE.COM> select * from dept;

DEPTNO DNAME LOC
---------- -------------- -------------
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON

scott@DB2.US.ORACLE.COM>

No comments:

Post a Comment