2フェーズコミットインターフェースを使用したトランザクション
このページは英語版のページが機械翻訳されたものです。英語版との間に矛盾または不一致がある場合は、英語版を正としてください。
ScalarDB は、2フェーズコミットインターフェースを使用したトランザクションの実行をサポートしています。2フェーズコミットインターフェースを使用すると、マイクロサービスアー キテクチャのように、複数のプロセスまたはアプリケーションにまたがるトランザクションを実行できます。
このページでは、2フェーズコミットインターフェースを使用したトランザクションが ScalarDB でどのように機能するか、および ScalarDB でそれらを設定して実行する方法について説明します。
ScalarDB での2フェーズコミットインターフェースを使用したトランザクションの動作
ScalarDB は通常、1フェーズコミットインターフェースを使用して単一のトランザクションマネージャーインスタンスでトランザクションを実行します。1フェーズコミットインターフェースを使用したトランザクションでは、トランザクションを開始し、CRUD 操作を実行し、同じトランザクションマネージャーインスタンスでトランザクションをコミットします。
ScalarDB では、複数のトランザクションマネージャーインスタンスにまたがる2フェーズコミットインターフェースを使用してトランザクションを実行できます。トランザクションマネージャーインスタンスは、同じプロセスまたはアプリケーションに存在しても、異なるプロセスまたはアプリケーションに存在してもかまいません。たとえば、複数のマイクロサービスにトランザクションマネージャーインスタンスがある場合は、複数のマイクロサービスにまたがるトランザクションを実行できます。
2フェーズコミットインターフェースを使用したトランザクションでは、Coordinator と参加者という2つのロールが1つのトランザクションを共同で実行します。
Coordinator プロセスと参加者プロセスはすべて、異なるトランザクションマネージャーインスタンスを持っています。Coordinator プロセスが最初にトランザクションを開始または開始し、参加者プロセスがトランザクションに参加します。 CRUD 操作を実行した後、Coordinator プロセスと参加プロセスは2フェーズインターフェイスを使用してトランザクションをコミットします。
2フェーズコミットインターフェースを使用してトランザクションを実行する方法
2フェーズコミットインターフェイスを使用してトランザクションを実行するには、トランザクションマネージャーインスタンスを取得する必要があります。その後、Coordinator プロセスはトランザクションを開始または起動でき、参加者はトランザクションを処理できます。
TwoPhaseCommitTransactionManager
インスタンスを取得する
2フェーズコミットインターフェースを使用してトランザクションを実行するには、まず TwoPhaseCommitTransactionManager
インスタンスを取得する必要があります。
TwoPhaseCommitTransactionManager
インスタンスを取得するには、次のように TransactionFactory
を使用できます。
TransactionFactory factory = TransactionFactory.create("<CONFIGURATION_FILE_PATH>");
TwoPhaseCommitTransactionManager transactionManager = factory.getTwoPhaseCommitTransactionManager();
トランザクションを開始する (Coordinator 用)
トランザクションを開始するプロセスまたはアプリケーションが Coordinator として機能するには、次の begin
メソッドを使用する必要があります。
// Begin a transaction.
TwoPhaseCommitTransaction tx = transactionManager.begin();
または、トランザクションを開始するプロセスまたはアプリケーションが Coordinator として機能するようにするには、次の start
メソッドを使用する必要があります。
// Start a transaction.
TwoPhaseCommitTransaction tx = transactionManager.start();
あるいは、次のようにトランザクション ID を指定して、トランザクションに begin
メソッドを使用することもできます。
// Begin a transaction by specifying a transaction ID.
TwoPhaseCommitTransaction tx = transactionManager.begin("<TRANSACTION_ID>");
または、次のようにトランザクション ID を指定して、トランザクションに start
メソッドを使用することもできます。
// Start a transaction by specifying a transaction ID.
TwoPhaseCommitTransaction tx = transactionManager.start("<TRANSACTION_ID>");
トランザクションに参加する (参加者向け)
参加者は 、次のように、Coordinator が開始または開始したトランザクションに関連付けられたトランザクション ID を指定して、トランザクションに参加できます。
TwoPhaseCommitTransaction tx = transactionManager.join("<TRANSACTION_ID>");
getId()
を使用してトランザクション ID を取得するには、次のように指定します。
tx.getId();
トランザクションの CRUD 操作
TwoPhaseCommitTransacton
の CRUD 操作は、DistributedTransaction
の操作と同じです。詳細については、CRUD 操作 を参照してください。
以下は、2フェーズコミットインターフェイスを使用したトランザクションの CRUD 操作のサンプルコードです。
TwoPhaseCommitTransaction tx = ...
// Retrieve the current balances by ID.
Get fromGet =
Get.newBuilder()
.namespace(NAMESPACE)
.table(TABLE)
.partitionKey(new Key(ID, fromId))
.build();
Get toGet =
Get.newBuilder()
.namespace(NAMESPACE)
.table(TABLE)
.partitionKey(new Key(ID, toId))
.build();
Optional<Result> fromResult = tx.get(fromGet);
Optional<Result> toResult = tx.get(toGet);
// Calculate the balances (assuming that both accounts exist).
int newFromBalance = fromResult.get().getInt(BALANCE) - amount;
int newToBalance = toResult.get().getInt(BALANCE) + amount;
// Update the balances.
Put fromPut =
Put.newBuilder()
.namespace(NAMESPACE)
.table(TABLE)
.partitionKey(new Key(ID, fromId))
.intValue(BALANCE, newFromBalance)
.build();
Put toPut =
Put.newBuilder()
.namespace(NAMESPACE)
.table(TABLE)
.partitionKey(new Key(ID, toId))
.intValue(BALANCE, newToBalance)
.build();
tx.put(fromPut);
tx.put(toPut);
トランザクションを準備、コミット、またはロールバックする
CRUD 操作が完了したら、トランザクションをコミットする必要があります。標準の2フェーズコミットプロトコルと同様に、準備とコミットの2つのフェーズがあります。
すべての Coordinator プロセスと参加者プロセスでは、次のようにトランザクションを準備してからコミットする必要があります。
TwoPhaseCommitTransaction tx = ...
try {
// Execute CRUD operations in the Coordinator and participant processes.
...
// Prepare phase: Prepare the transaction in all the Coordinator and participant processes.
tx.prepare();
...
// Commit phase: Commit the transaction in all the Coordinator and participant processes.
tx.commit();
...
} catch (TransactionException e) {
// If an error happens, you will need to roll back the transaction in all the Coordinator and participant processes.
tx.rollback();
...
}
prepare()
の場合、Coordinator または参加者プロセスのいずれかがトランザクションの準備に失敗した場合、すべての Coordinator および参加者プロセスで rollback()
(または abort()
) を呼び出す必要があります。
commit()
の場合、Coordinator または参加者プロセスのいずれかがトランザクションを正常にコミットした場合、トランザクションはコミットされたと見なすことができます。トランザクションがコミットされたら、他の Coordinator および参加者プロセスで発生したエラーは無視できます。すべての Coordinator および参加者プロセスがトランザクションのコミットに失敗した場合、すべての Coordinator および参加者プロセスで rollback()
(または abort()
) を呼び出す必要があります。
パフォーマンスを向上させるには、Coordinator および参加者プロセスでそれぞれ prepare()
、commit()
、および rollback()
を並行して呼び出すことができます。
トランザクションを検証する
同時実行制御プロトコルに応じて、以下に示すように、prepare()
の後、commit()
の前に、すべての Coordinator および参加者プロセスで validate()
を呼び出す必要があります。
// Prepare phase 1: Prepare the transaction in all the Coordinator and participant processes.
tx.prepare();
...
// Prepare phase 2: Validate the transaction in all the Coordinator and participant processes.
tx.validate();
...
// Commit phase: Commit the transaction in all the Coordinator and participant processes.
tx.commit();
...
prepare()
と同様に、Coordinator または参加者プロセスのいずれかがトランザクションの検証に失敗した場合は、すべての Coordinator および参加者プロセスで rollback()
(または abort()
) を呼び出す必要があります。さらに、パフォーマンスを向上させるために、Coordinator および参加者プロセスで validate()
を並行して呼び出すこともできます。
scalar.db.consensus_commit.serializable_strategy
の値として EXTRA_READ
を設定し、scalar.db.consensus_commit.isolation_level
の値として SERIALIZABLE
を設定して Consensus Commit トランザクションマネージャーを使用する場合は、validate()
を呼び出す必要があります。ただし、Consensus Commit を使用していない場合は、validate()
を指定しても効果はありません 。
複数のトランザクションマネージャーインスタンスを使用してトランザクションを実行する
上記の API を使用すると、次のように複数のトランザクションマネージャーインスタンスを使用してトランザクションを実行できます。
TransactionFactory factory1 =
TransactionFactory.create("<PATH_TO_CONFIGURATION_FILE_FOR_TRANSACTION_MANAGER_1>");
TwoPhaseCommitTransactionManager transactionManager1 =
factory1.getTwoPhaseCommitTransactionManager();
TransactionFactory factory2 =
TransactionFactory.create("<PATH_TO_CONFIGURATION_FILE_FOR_TRANSACTION_MANAGER_2>");
TwoPhaseCommitTransactionManager transactionManager2 =
factory2.getTwoPhaseCommitTransactionManager();
TwoPhaseCommitTransaction transaction1 = null;
TwoPhaseCommitTransaction transaction2 = null;
try {
// Begin a transaction.
transaction1 = transactionManager1.begin();
// Join the transaction begun by `transactionManager1` by getting the transaction ID.
transaction2 = transactionManager2.join(transaction1.getId());
// Execute CRUD operations in the transaction.
Optional<Result> result = transaction1.get(...);
List<Result> results = transaction2.scan(...);
transaction1.put(...);
transaction2.delete(...);
// Prepare the transaction.
transaction1.prepare();
transaction2.prepare();
// Validate the transaction.
transaction1.validate();
transaction2.validate();
// Commit the transaction. If any of the transactions successfully commit,
// you can regard the transaction as committed.
AtomicReference<TransactionException> exception = new AtomicReference<>();
boolean anyMatch =
Stream.of(transaction1, transaction2)
.anyMatch(
t -> {
try {
t.commit();
return true;
} catch (TransactionException e) {
exception.set(e);
return false;
}
});
// If all the transactions fail to commit, throw the exception and roll back the transaction.
if (!anyMatch) {
throw exception.get();
}
} catch (TransactionException e) {
// Roll back the transaction.
if (transaction1 != null) {
try {
transaction1.rollback();
} catch (RollbackException e1) {
// Handle the exception.
}
}
if (transaction2 != null) {
try {
transaction2.rollback();
} catch (RollbackException e1) {
// Handle the exception.
}
}
}