Особенности:
репликация между базами данных Oracle и Не-Oracle через Heterogenous Services поддерживается только для шлюзов "Transparent Gateway", к которым не относится Gateway for ODBC. Проблема возникает при попытке применить захваченные процессом Streams Capture данные к таблице Postgres через Gateway из-за того, что Gateway for ODBC не поддерживает двухфазный commit.
Чтобы обойти проблему можно извлекать захваченные изменения из очереди в отдельном job-е и применять к таблице Postgres программно. Однако, чтобы можно было извлекать изменения непосредственно из очереди, процедура захвата должна быть настроена в синхронном режиме (SYNC_CAPTURE).
Порядок настройки:
1. Настроить доступ к PostgreSQL через ODBC
1.1. установить Postgres (64-битная версия)
1.2. Установить UnixODBC (тестировал версию unixODBC-2.3.0) для сборки необходимо установить параметры окружения:
PATH=/usr/bin:/usr/sfw/bin:/usr/ccs/bin:/opt/oracle10/fvb/postgres/postgres/9.0-pgdg/lib/64:/usr/local/bin:/usr/ccs/bin:/usr/sbin:/usr/openwin/bin:/opt/oracle10/product/11.1.0/Db_1/bin
LD_LIBRARY_PATH=/usr/sfw/lib/64:/opt/oracle10/fvb/postgres/postgres/9.0-pgdg/lib/64:/usr/local/lib
CFLAGS=-m64
запускать autogonfig:
>CFLAGS=-m64 ./configure
для сборки использовать gcc 3.4.3. из стандартной поставки Solaris 10 (версии, доступные для скачивания черех freesolaris, не содержат 64-битных библиотек)
1.3. Установить Postgres ODBC (тестировал версию psqlodbc-08.04.0200) переменные окружения - см. выше.
зарегистрировать Postgres ODBC в odbc.ini:
/usr/local/etc/odbcinst.ini: [PostgreSQL] Description = PostgreSQL driver Driver = /usr/local/lib/psqlodbcw.so FileUsage = 1
/usr/local/etc/odbc.ini [pgtest] Description = Test to Postgres Driver = PostgreSQL Trace = Yes TraceFile = sql.log Database = pgtest Servername = localhost UserName = myuser Password = myuser Port = 5432 Protocol = 6.4 ReadOnly = No RowVersioning = No ShowSystemTables = No ShowOidColumn = No FakeOidIndex = No QuotedId = No ConnSettings =
протестировать доступность Postgres через ODBC с помощью isql:
isql pgtest
если тест не проходит, более подробно сообщение об ошибке можно посмотреть с помощью нативного клиента postgres: /usr/local/pgsql/bin/psql -h <host name> -d <database name> -U <user>
в случае проблем с кодировкой можно поменять кодировку на стороне клиента с помощью команды
alter user <user> SET client_encoding to LATIN1; эта кодировка должна быть такой же, как указана при настройке Heterogenous Services в переменной HS_LANGUAGE
нужно иметь в виду, что есть bug:
DG4ODBC Using MySQL ODBC Driver Corrupts Multibyte Characters [ID 1209943.1] из-за которого невожможно работать через DG4ODBC со сторонними базами в unicode-кодировке.
2. Настроить доступ к PostgreSQL из Oracle через Gateway for ODBC
2.1. Удостовериться, что в RDBMS Oracle установлены параметры:
global_names=true job_queue_processes > 2 compatible > 10.2.0
2.2. Сконфигурировать Heterogenous Services:
2.2.1. сконфигурировать listener в $ORACLE_HOME/network/admin/listener.ora:
LHS = (ADDRESS_LIST = ( ADDRESS= (PROTOCOL=tcp) (HOST = blader) (PORT = 1526) ) )
SID_LIST_LHS = (SID_LIST = (SID_DESC = (GLOBAL_DBNAME = PGTEST.MYDOMAIN.RU) (SID_NAME = pgtest) (ORACLE_HOME = /opt/oracle10/product/11.1.0/Db_1) (PROGRAM = dg4odbc) (ENVS="LD_LIBRARY_PATH=/usr/sfw/lib/64:/opt/oracle10/fvb/postgres/postgres/9.0-pgdg/lib/64:/usr/local/lib") ) )
2.2.2. сконфигурировать HS в $ORACLE_HOME/hs/admin/initpgtest.ora:
HS_FDS_CONNECT_INFO = pgtest HS_FDS_TRACE_LEVEL = DEBUG HS_FDS_SHAREABLE_NAME = /usr/local/lib/psqlodbcw.so HS_LANGUAGE=american_america.we8iso8859p1 HS_FDS_SUPPORT_STATISTICS=FALSE HS_DB_NAME=PGTEST HS_DB_DOMAIN=MYDOMAIN.RU set ODBCINI=/usr/local/etc/odbc.ini
2.3. создать DB LINK в БД Oracle
2.3.1. добавить запись в tnsnames.ora
pgtest = (DESCRIPTION = (ADDRESS = (PROTOCOL = TCP)(HOST = blader) (PORT = 1526) ) (CONNECT_DATA = (SID = pgtest) ) (HS = OK) )
2.3.2. создать DB_LINK
CREATE PUBLIC DATABASE LINK dblink CONNECT TO "user" IDENTIFIED BY "password" USING 'tns_name_entry';
2.4. запустить listener: lsnrctl start LHS
2.5. протестировать доступность Postgres из Oracle
2.6. понизить уровень логирования в $ORACLE_HOME/hs/admin/initpgtest.ora:
HS_FDS_TRACE_LEVEL = 0
3. Настроить репликацию с использованием Streams
3.1. Если объекты репликации создаются в отдельной схеме, нужно дать соотв. права:
begin DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE('SCHEMA_OWNER'); end;
3.2. Создать очередь anydata
begin DBMS_STREAMS_ADM.set_up_queue( 'MY_QUEUE_TABLE', 'tablespace NCC storage (initial 124M)', 'STREAMS_QUEUE', 'SCHEMA_OWNER'); end;
нужно увеличить количество попыток извлечения из очереди - у нас может быть недоступен канал и нельзя допустить, чтобы сообщения протухали
begin dbms_aqadm.alter_queue( queue_name => 'STREAMS_QUEUE', max_retries => 1000000, retry_delay => null, retention_time => null, auto_commit => false); end;
если требуется пересоздать таблицу, то вот команда: BEGIN DBMS_AQADM.DROP_QUEUE_TABLE( queue_table => 'MY_QUEUE_TABLE'); END;
3.3. Сконфигурировать применение изменений (на живой таблице нужно делать перед конфигурированием захвата, а то будут ошибки при insert'ах)
BEGIN DBMS_APPLY_ADM.CREATE_APPLY( queue_name => 'STREAMS_QUEUE', apply_name => 'TEST_APPLY', apply_captured => false); END;
если требуется удалить apply, то вот команда: BEGIN DBMS_APPLY_ADM.DROP_APPLY( apply_name => 'TEST_APPLY'); END;
3.4. Сконфигурировать захват изменений на таблице
begin dbms_streams_adm.add_table_rules( table_name => 'myschema.my_replicated_tab', streams_type => 'SYNC_CAPTURE', streams_name => 'TEST_CAPTURE', queue_name => 'STREAMS_QUEUE'); end; если требуется удалить правило, нужно сначала посмотреть его имя в DBA_STREAMS_RULES, а потом удалить: begin DBMS_STREAMS_ADM.REMOVE_RULE('SOME_RULE_NAME', 'CAPTURE', 'TEST_CAPTURE'); end;
3.5. Скомпилировать пакет для применения изменений:
-- пакет pkg_apply_pg применяет изменения в автономной транзакции -- Автономная транзакция нужна, чтобы исключить ошибку "ORA-02047 can not join distributed transaction in progress" create or replace package pkg_apply_pg is
procedure do_insert(p_val varchar2); procedure do_update(p_key varchar2, p_val varchar2); procedure do_delete(p_key varchar2);
end pkg_apply_pg;
create or replace package body pkg_apply_pg is
procedure do_insert(p_val varchar2) is pragma autonomous_transaction; begin insert into "public"."mytab"@pgtest.mydomain.ru values (p_val); commit; end; procedure do_update(p_key varchar2, p_val varchar2) is pragma autonomous_transaction; begin update "public"."mytab"@pgtest.mydomain.ru set "a" = p_val where "a" = p_key; commit; end; procedure do_delete(p_key varchar2) is pragma autonomous_transaction; begin delete from "public"."mytab"@pgtest.mydomain.ru where "a" = p_key; commit; end; end pkg_apply_pg;
3.6. Скомпилировать процедуру для извлечения изменений из очереди и поместить ее в периодически запускаемый JOB:
declare deq_data ANYDATA; msgid RAW(16); deqopt DBMS_AQ.DEQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; proceed BOOLEAN := true; next_trans EXCEPTION; no_messages EXCEPTION; pragma exception_init (next_trans, -25235); pragma exception_init (no_messages, -25228); num_var pls_integer; lcr SYS.LCR$_ROW_RECORD; rc PLS_INTEGER; object_owner VARCHAR2(30); object_name VARCHAR2(40); dmlcommand VARCHAR2(10); old_val VARCHAR2(100); new_val VARCHAR2(100); x number; BEGIN deqopt.consumer_name := 'TEST_APPLY'; deqopt.wait := 1; WHILE (proceed) LOOP BEGIN DBMS_AQ.DEQUEUE( queue_name => 'STREAMS_QUEUE', dequeue_options => deqopt, message_properties => mprop, payload => deq_data, msgid => msgid); deqopt.navigation := DBMS_AQ.NEXT; rc := deq_data.GETOBJECT(lcr); object_owner := lcr.GET_OBJECT_OWNER(); object_name := lcr.GET_OBJECT_NAME(); dmlcommand := lcr.GET_COMMAND_TYPE(); if (dmlcommand = 'INSERT') then x := lcr.GET_VALUE('NEW', 'A').getvarchar2(new_val); pkg_apply_pg.do_insert(new_val); elsif (dmlcommand = 'UPDATE') then x := lcr.GET_VALUE('OLD', 'A').getvarchar2(old_val); x := lcr.GET_VALUE('NEW', 'A').getvarchar2(new_val); pkg_apply_pg.do_update(old_val, new_val); elsif (dmlcommand = 'DELETE') then x := lcr.GET_VALUE('OLD', 'A').getvarchar2(old_val); pkg_apply_pg.do_delete(old_val); end if; commit; EXCEPTION WHEN next_trans THEN deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION; WHEN no_messages THEN proceed := FALSE; DBMS_OUTPUT.PUT_LINE('No more messages'); END; END LOOP; END;
3.7. Запустить синхронный захват изменений
!! а это похоже не обязательно !!
BEGIN DBMS_CAPTURE_ADM.START_CAPTURE('TEST_CAPTURE'); END;
Проверить, что все сконфигурировано:
select * from DBA_SYNC_CAPTURE_TABLES select * from DBA_STREAMS_RULES
3.8. Обеспечить периодическое удаление сообщений в состоянии PROCESSED:
declare po dbms_aqadm.aq$_purge_options_t; begin po.block := FALSE; dbms_aqadm.purge_queue_table( queue_table => 'STREAMS_QUEUE_TABLE', purge_condition => 'msg_state = ''PROCESSED''', purge_options => po); end;
Особенности реализации:
1. Чтобы не съедалось много ресурсов на попытки достучаться до сервера PostgreSQL в случае, когда сервер не доступен, нужно предусмотреть механизм приостановки применения изменений из очереди на определенный период времени с последующим автоматическим возобновлением (retry timeout).
2. Операции по применению изменений полезно не коммитить по отдельности, а объединять в пачки.
Ссылки:
Oracle 11g Streams Implementer's Guide
|