From e1c55b04c73e4ca31fa65c435dd831c8263f619b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A9=BF=E4=BA=91?= Date: Sat, 21 Dec 2024 17:31:21 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8DOracle=E9=87=8D=E8=BF=9E?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../connector/oracle/logminer/LogMiner.java | 167 ++++++++++-------- 1 file changed, 93 insertions(+), 74 deletions(-) diff --git a/dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMiner.java b/dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMiner.java index 351bb22c..f4e60327 100644 --- a/dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMiner.java +++ b/dbsyncer-connector/dbsyncer-connector-oracle/src/main/java/org/dbsyncer/connector/oracle/logminer/LogMiner.java @@ -3,7 +3,6 @@ */ package org.dbsyncer.connector.oracle.logminer; -import org.apache.commons.lang3.time.StopWatch; import org.dbsyncer.sdk.util.DatabaseUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +31,6 @@ public class LogMiner { private String driverClassName; private String miningStrategy = "DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG"; private volatile boolean connected = false; - private final StopWatch stopWatch = StopWatch.create(); private Connection connection; private List currentRedoLogSequences; private TransactionalBuffer transactionalBuffer = new TransactionalBuffer(); @@ -41,6 +39,7 @@ public class LogMiner { // 初始位点 private long startScn = 0; private EventListener listener; + private Worker worker; public LogMiner(String username, String password, String url, String schema, String driverClassName) { this.username = username; @@ -52,12 +51,13 @@ public class LogMiner { public void close() throws SQLException { lock.lock(); - if (!connected) { - logger.error("LogMiner is already stop"); - lock.unlock(); - return; + if (null != worker && !worker.isInterrupted()) { + worker.interrupt(); + worker = null; + } + if (connection != null) { + connection.close(); } - this.connection.close(); connected = false; lock.unlock(); } @@ -69,93 +69,54 @@ public class LogMiner { lock.unlock(); return; } - this.connection = DatabaseUtil.getConnection(driverClassName, url, username, password); connected = true; lock.unlock(); - //get current scn 判断是否第一次没有存储 + connect(); + } + + private void connect() throws SQLException { + this.connection = DatabaseUtil.getConnection(driverClassName, url, username, password); + // 判断是否第一次读取 if (startScn == 0) { startScn = getCurrentScn(connection); } - logger.info("scn start '{}'", startScn); logger.info("start LogMiner..."); LogMinerHelper.setSessionParameter(connection); - // 1.记录当前redoLog,用于下文判断redoLog 是否切换 currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection); - // 2.构建数据字典 && add redo / archived log initializeLogMiner(); + worker = new Worker(); + worker.setName(new StringBuilder("log-miner-parser-").append(url).append("_").append(worker.hashCode()).toString()); + worker.setDaemon(false); + worker.start(); + } - String minerViewQuery = LogMinerHelper.logMinerViewQuery(schema, username); - logger.debug(minerViewQuery); - - try (PreparedStatement minerViewStatement = connection.prepareStatement(minerViewQuery, ResultSet.TYPE_FORWARD_ONLY, - ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT)) { - // while - while (connected) { - // 3.确定 endScn - BigInteger endScn = determineEndScn(); - - // 4.是否发生redoLog切换 - if (redoLogSwitchOccurred()) { - // 如果切换则重启logMiner会话 - logger.debug("restart LogMiner Session"); - restartLogMiner(); - currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection); + private void recover() { + logger.error("Connection interrupted, attempting to reconnect"); + while (connected) { + try { + if (null != worker && !worker.isInterrupted()) { + worker.interrupt(); + worker = null; } - - // 5.start logMiner - LogMinerHelper.startLogMiner(connection, BigInteger.valueOf(startScn), endScn, miningStrategy); - - // 6.查询 logMiner view, 处理结果集 - minerViewStatement.setFetchSize(2000); - minerViewStatement.setFetchDirection(ResultSet.FETCH_FORWARD); - minerViewStatement.setString(1, String.valueOf(startScn)); - minerViewStatement.setString(2, endScn.toString()); - - stopWatch.start(); - - try (ResultSet rs = minerViewStatement.executeQuery()) { - logger.trace("Query V$LOGMNR_CONTENTS spend time {} ms", stopWatch.getTime(TimeUnit.MILLISECONDS)); - stopWatch.reset(); - try{ - logMinerViewProcessor(rs); - }catch (SQLException e){ - if (e.getMessage().contains("ORA-00310")){ - logger.error("ORA-00310 try continue"); - restartLogMiner(); - currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection); - continue; - } - throw e; - } + if (connection != null) { + connection.close(); } - - // 7.确定新的SCN - startScn = Long.parseLong(endScn.toString()); - sleepFiveSeconds(); - } - - } catch (Exception e) { - if (e instanceof SQLRecoverableException) { - logger.error("Connection timed out, attempting to reconnect in 5 seconds"); - reConnect(); - return; + connect(); + logger.info("Reconnect successfully"); + break; + } catch (Exception e) { + logger.error(url, e); + sleepSeconds(5); } - logger.error(e.getMessage(), e); } } - private void reConnect() throws SQLException { - connected = false; - sleepFiveSeconds(); - start(); - } - - private void sleepFiveSeconds() { + private void sleepSeconds(int seconds) { try { - TimeUnit.SECONDS.sleep(5); + TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } @@ -341,4 +302,62 @@ public class LogMiner { return connected; } + final class Worker extends Thread { + + @Override + public void run() { + String minerViewQuery = LogMinerHelper.logMinerViewQuery(schema, username); + try (PreparedStatement minerViewStatement = connection.prepareStatement(minerViewQuery, ResultSet.TYPE_FORWARD_ONLY, + ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT)) { + while (!isInterrupted() && connected) { + // 1.确定 endScn + BigInteger endScn = determineEndScn(); + + // 2.是否发生redoLog切换 + if (redoLogSwitchOccurred()) { + // 如果切换则重启logMiner会话 + restartLogMiner(); + currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection); + } + + // 3.start logMiner + LogMinerHelper.startLogMiner(connection, BigInteger.valueOf(startScn), endScn, miningStrategy); + + // 4.查询 logMiner view, 处理结果集 + minerViewStatement.setFetchSize(2000); + minerViewStatement.setFetchDirection(ResultSet.FETCH_FORWARD); + minerViewStatement.setString(1, String.valueOf(startScn)); + minerViewStatement.setString(2, endScn.toString()); + try (ResultSet rs = minerViewStatement.executeQuery()) { + try { + logMinerViewProcessor(rs); + } catch (SQLException e) { + if (e.getMessage().contains("ORA-00310")) { + logger.error("ORA-00310 try continue"); + restartLogMiner(); + currentRedoLogSequences = LogMinerHelper.getCurrentRedoLogSequences(connection); + continue; + } + throw e; + } + } + + // 5.确定新的SCN + startScn = Long.parseLong(endScn.toString()); + sleepSeconds(3); + } + } catch (Exception e) { + if (e instanceof SQLRecoverableException) { + recover(); + return; + } + logger.error(e.getMessage(), e); + try { + close(); + } catch (SQLException ex) { + logger.error(ex.getMessage(), ex); + } + } + } + } } \ No newline at end of file -- Gitee