メインコンテンツまでスキップ
バージョン: 3.14

マイクロサービストランザクションを使用した Spring Data JDBC for ScalarDB のサンプルアプリケーション

注記

このページは英語版のページが機械翻訳されたものです。英語版との間に矛盾または不一致がある場合は、英語版を正としてください。

このチュートリアルでは、Spring Data JDBC for ScalarDB を使用してマイクロサービストランザクション用のサンプル Spring Boot アプリケーションを作成する方法について説明します。

これらの機能の詳細については、2 フェーズコミットトランザクション および Spring Data JDBC for ScalarDB ガイドを参照してください。

このサンプルアプリケーションの前提条件

注記

このサンプルアプリケーションは、Eclipse Temurin の OpenJDK でテストされています。ただし、ScalarDB 自体は、さまざまなベンダーの JDK ディストリビューションでテストされています。互換性のある JDK ディストリビューションを含む ScalarDB の要件の詳細については、要件を参照してください。

警告

ScalarDB Cluster を使用するには、ライセンスキー (試用ライセンスまたは商用ライセンス) が必要です。ライセンスキーをお持ちでない場合は、お問い合わせください。

サンプルアプリケーション

概要

このチュートリアルでは、ScalarDB の 2 フェーズコミットインターフェースを使用したトランザクションを通じて、アイテムを注文し、信用枠で支払うことができるサンプル電子商取引アプリケーションを作成するプロセスについて説明します。

このサンプルアプリケーションには、Database-per-service pattern に基づく Customer ServiceOrder Service という 2 つのマイクロサービスがあります。

Customer Service は、クレジット限度額やクレジット合計などのクレジットカード情報を含む顧客情報を管理します。Order Service は、注文の確定や注文履歴の取得などの注文操作を担当します。

各サービスには gRPC エンドポイントがあります。クライアントはエンドポイントを呼び出し、サービスも互いにエンドポイントを呼び出します。Customer Service と Order Service は、それぞれ ScalarDB を介して MySQL と Cassandra を使用します。

概要

各サービスは、専用の ScalarDB Cluster を介してデータベースにアクセスします。

注記

両方の ScalarDB Cluster は、Consensus Commit プロトコルに使用される小さなコーディネーターデータベースにアクセスします。このサンプルアプリケーションでは、セットアップと説明を簡単にするために、コーディネーターデータベースは Order Service の同じ Cassandra インスタンスに共存していますが、もちろん、コーディネーターデータベースは別のデータベースとして管理できます。

また、サンプルアプリケーションでは ScalarDB の使用方法の説明に重点を置いているため、アプリケーション固有のエラー処理、認証処理などは省略されています。

ScalarDB での例外処理の詳細については、例外の処理方法を参照してください。

サービスエンドポイント

サービスで定義されているエンドポイントは次のとおりです。

  • Customer Service

    • getCustomerInfo
    • payment
    • prepare
    • validate
    • commit
    • rollback
    • repayment
  • Order Service

    • placeOrder
    • getOrder
    • getOrders

このサンプルアプリケーションで実行できること

サンプルアプリケーションは、次の種類のトランザクションをサポートしています。

  • Customer Service の getCustomerInfo エンドポイントを介して顧客情報を取得します。
  • Order Service の placeOrder エンドポイントと、Customer Service の paymentpreparevalidatecommitrollback エンドポイントを介して、信用枠を使用して注文を行います。
    • 注文のコストが顧客の信用限度額を下回っているかどうかを確認します。
    • チェックに合格した場合、注文履歴を記録し、顧客が支払った金額を更新します。
  • Order Service の getOrder エンドポイントと、Customer Service の getCustomerInfopreparevalidatecommitrollback エンドポイントを介して、注文 ID で注文情報を取得します。
  • Order Service の getOrders エンドポイントと、Customer Service の getCustomerInfopreparevalidatecommitrollback エンドポイントを介して、顧客 ID で注文情報を取得します。
  • Customer Service の repayment エンドポイントを介して支払いを行います。
    • 顧客が支払った金額を減らします。
注記

getCustomerInfo エンドポイントは、コーディネーターからトランザクション ID を受信するときに、参加者サービスエンドポイントとして機能します。

ScalarDB Cluster の設定

Customer Service 用の ScalarDB Cluster の設定は次のとおりです。

scalar.db.storage=multi-storage
scalar.db.multi_storage.storages=cassandra,mysql
scalar.db.multi_storage.storages.cassandra.storage=cassandra
scalar.db.multi_storage.storages.cassandra.contact_points=cassandra-1
scalar.db.multi_storage.storages.cassandra.username=cassandra
scalar.db.multi_storage.storages.cassandra.password=cassandra
scalar.db.multi_storage.storages.mysql.storage=jdbc
scalar.db.multi_storage.storages.mysql.contact_points=jdbc:mysql://mysql-1:3306/
scalar.db.multi_storage.storages.mysql.username=root
scalar.db.multi_storage.storages.mysql.password=mysql
scalar.db.multi_storage.namespace_mapping=customer_service:mysql,coordinator:cassandra
scalar.db.multi_storage.default_storage=mysql
scalar.db.consensus_commit.isolation_level=SERIALIZABLE

scalar.db.cluster.node.standalone_mode.enabled=true
scalar.db.sql.enabled=true

# License key configurations
scalar.db.cluster.node.licensing.license_key=
scalar.db.cluster.node.licensing.license_check_cert_pem=
  • scalar.db.sql.connection_mode: この設定は、ScalarDB への接続方法を決定します。
  • scalar.db.storage: ScalarDB でマルチストレージトランザクションを使用するには、multi-storage を指定する必要があります。
  • scalar.db.multi_storage.storages: ここでストレージ名を定義する必要があります。
  • scalar.db.multi_storage.storages.cassandra.*: これらの設定は、scalar.db.multi_storage.storages で定義されているストレージ名の 1 つである cassandra ストレージ用です。cassandra ストレージのすべての scalar.db.* プロパティをここで設定できます。
  • scalar.db.multi_storage.storages.mysql.*: これらの設定は、scalar.db.multi_storage.storages で定義されているストレージ名の 1 つである mysql ストレージ用です。ここで、mysql ストレージのすべての scalar.db.* プロパティを設定できます。
  • scalar.db.multi_storage.namespace_mapping: この設定は、名前空間をストレージにマップします。このサンプルアプリケーションでは、customer_service 名前空間テーブルの操作は mysql ストレージにマップされ、order_service 名前空間テーブルの操作は cassandra ストレージにマップされます。また、Consensus Commit トランザクションで使用される coordinator 名前空間にマップされるストレージを定義することもできます。
  • scalar.db.multi_storage.default_storage: この設定は、マッピングされていない名前空間テーブルでの操作に使用されるデフォルトのストレージを設定します。
  • scalar.db.sql.default_transaction_mode: ScalarDB で 2 フェーズコミットトランザクションモードを使用するには、two_phase_commit_transaction を指定する必要があります。
  • scalar.db.consensus_commit.isolation_level: この設定は、ConsensusCommit に使用される分離レベルを決定します。

詳細については、マルチストレージトランザクションを参照してください。

Order Service 用の ScalarDB Cluster の設定は次のとおりです。

scalar.db.storage=cassandra
scalar.db.contact_points=cassandra-1
scalar.db.username=cassandra
scalar.db.password=cassandra
scalar.db.consensus_commit.isolation_level=SERIALIZABLE

scalar.db.cluster.node.standalone_mode.enabled=true
scalar.db.sql.enabled=true

# License key configurations
scalar.db.cluster.node.licensing.license_key=
scalar.db.cluster.node.licensing.license_check_cert_pem=
  • scalar.db.storage: このサービスは Cassandra のみを基盤データベースとして使用するため、cassandra が指定されています。
  • scalar.db.contact_points: この設定では、Cassandra に接続するための連絡先 (ホストなど) を指定します。
  • scalar.db.username: この設定では、Cassandra に接続するためのユーザー名を指定します。
  • scalar.db.password: この設定では、Cassandra に接続するためのパスワードを指定します。
  • scalar.db.sql.default_namespace_name: この設定では、デフォルトの名前空間が order_service に設定されるため、アプリケーションで名前空間を指定する必要がなくなります。
  • scalar.db.sql.default_transaction_mode: ScalarDB で 2 フェーズコミットトランザクションモードを使用するには、two_phase_commit_transaction を指定する必要があります。
  • scalar.db.consensus_commit.isolation_level: この設定は、ConsensusCommit に使用される分離レベルを決定します。

このサンプルアプリケーションでは、ScalarDB Cluster はスタンドアロンモード (scalar.db.cluster.node.standalone_mode.enabled=true) で実行されています。

また、設定ファイルで ScalarDB Cluster のライセンスキー (試用ライセンスまたは商用ライセンス) を設定する必要があります。詳細については、製品ライセンスキーの設定方法を参照してください。

セットアップ

ScalarDB サンプルリポジトリのクローンを作成する

ターミナルを開き、次のコマンドを実行して ScalarDB サンプルリポジトリのクローンを作成します。

git clone https://github.com/scalar-labs/scalardb-samples

次に、次のコマンドを実行して、このサンプルがあるディレクトリに移動します。

cd scalardb-samples/spring-data-microservice-transaction-sample

ライセンスキーを設定する

設定ファイル scalardb-cluster-node-for-customer-service.properties および scalardb-cluster-node-for-order-service.properties で、ScalarDB Clusters のライセンスキー (試用ライセンスまたは商用ライセンス) を設定します。詳細については、製品ライセンスキーの設定方法 を参照してください。

Cassandra、MySQL、および ScalarDB Cluster を起動する

Cassandra、MySQL、および ScalarDB Cluster を起動するには、次の docker-compose コマンドを実行する必要があります。

docker-compose up -d cassandra mysql scalardb-cluster-node-for-customer-service scalardb-cluster-node-for-order-service
注記

コンテナが完全に起動するまで 1 分以上待つ必要があります。

スキーマをロード

サンプルアプリケーションのデータベーススキーマ (データを整理する方法) は、Customer Service の場合は schema-for-customer-service.sql、Order Service の場合は schema-for-order-service.sql で既に定義されています。

スキーマを適用するには、ScalarDB リリースページに移動し、使用する ScalarDB のバージョンに一致する SQL CLI ツールをダウンロードします。

MySQL

schema-for-customer-service.sql のスキーマを MySQL にロードするには、<VERSION> をダウンロードした ScalarDB Schema Loader のバージョンに置き換えて、次のコマンドを実行します。

java -jar scalardb-cluster-sql-cli-<VERSION>-all.jar --config scalardb-cluster-node-for-customer-service-client.properties --file schema-for-customer-service.sql

Cassandra

schema-for-order-service.sql のスキーマを Cassandra にロードするには、<VERSION> をダウンロードした ScalarDB Schema Loader のバージョンに置き換えて、次のコマンドを実行します。

java -jar scalardb-cluster-sql-cli-<VERSION>-all.jar --config scalardb-cluster-node-for-order-service-client.properties --file schema-for-order-service.sql

スキーマの詳細

schema-for-customer-service.sql に示されているように、Customer Service のすべてのテーブルは customer_service 名前空間に作成されます。

  • customer_service.customers: 顧客の情報を管理するテーブル
    • credit_limit: 貸し手が各顧客に信用枠の使用を許可する最大金額
    • credit_total: 各顧客が信用枠を使用してすでに使用した金額

schema-for-order-service.sql に示されているように、Order Service のすべてのテーブルは order_service 名前空間に作成されます。

  • order_service.orders: 注文情報を管理するテーブル
  • order_service.statements: 注文明細情報を管理するテーブル
  • order_service.items: 注文する商品の情報を管理するテーブル

スキーマのエンティティリレーションシップダイアグラムは次のとおりです。

ERD

マイクロサービスを開始する

まず、次のコマンドを使用してサンプルアプリケーションの Docker イメージをビルドする必要があります。

./gradlew docker

次に、次の docker-compose コマンドを使用してマイクロサービスを起動できます。

docker-compose up -d customer-service order-service

初期データ

マイクロサービスが起動すると、初期データが自動的にロードされます。

初期データがロードされたら、次のレコードがテーブルに保存されます:

  • customer_service.customers テーブルの場合:
customer_idnamecredit_limitcredit_total
1Yamada Taro100000
2Yamada Hanako100000
3Suzuki Ichiro100000
  • order_service.items テーブルの場合:
item_idnameprice
1Apple1000
2Orange2000
3Grape2500
4Mango5000
5Melon3000

サンプルアプリケーションを実行する

まず、ID が 1 である顧客に関する情報を取得します。

./gradlew :client:run --args="GetCustomerInfo 1"
...
{"id": 1,"name": "Yamada Taro","credit_limit": 10000}
...

この時点では、credit_total は表示されません。つまり、credit_total の現在の値は 0 です。

次に、顧客 ID 1 を使用して、リンゴ 3 個とオレンジ 2 個を注文します。

注文の形式は <Item ID>:<Count>,<Item ID>:<Count>,... であることに注意してください。

./gradlew :client:run --args="PlaceOrder 1 1:3,2:2"
...
{"order_id": "415a453b-cfee-4c48-b8f6-d103d3e10bdb"}
...

このコマンドを実行すると、注文 ID が表示されます。

注文 ID を使用して注文の詳細を確認してみましょう。

./gradlew :client:run --args="GetOrder 415a453b-cfee-4c48-b8f6-d103d3e10bdb"
...
{"order": {"order_id": "415a453b-cfee-4c48-b8f6-d103d3e10bdb","timestamp": 1686555272435,"customer_id": 1,"customer_name": "Yamada Taro","statement": [{"item_id": 1,"item_name": "Apple","price": 1000,"count": $
,"total": 3000},{"item_id": 2,"item_name": "Orange","price": 2000,"count": 2,"total": 4000}],"total": 7000}}
...

次に、別の注文を出して、顧客 ID 1 の注文履歴を取得しましょう。

./gradlew :client:run --args="PlaceOrder 1 5:1"
...
{"order_id": "069be075-98f7-428c-b2e0-6820693fc41b"}
...
./gradlew :client:run --args="GetOrders 1"
...
{"order": [{"order_id": "069be075-98f7-428c-b2e0-6820693fc41b","timestamp": 1686555279366,"customer_id": 1,"customer_name": "Yamada Taro","statement": [{"item_id": 5,"item_name": "Melon","price": 3000,"count": 1,"total": 3000}],"total": 3000},{"order_id": "415a453b-cfee-4c48-b8f6-d103d3e10bdb","timestamp": 1686555272435,"customer_id": 1,"customer_name": "Yamada Taro","statement": [{"item_id": 1,"item_name": "Apple","price": 1000,"count": 3,"total": 3000},{"item_id": 2,"item_name": "Orange","price": 2000,"count": 2,"total": 4000}],"total": 7000}]}
...

この注文履歴は、タイムスタンプの降順で表示されます。

顧客の現在の credit_total10000 です。顧客は、情報を取得したときに表示された credit_limit に達したため、これ以上注文することはできません。

./gradlew :client:run --args="GetCustomerInfo 1"
...
{"id": 1,"name": "Yamada Taro","credit_limit": 10000,"credit_total": 10000}
...
./gradlew :client:run --args="PlaceOrder 1 3:1,4:1"
...
io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Credit limit exceeded. creditTotal:10000, payment:7500
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:271)
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:252)
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:165)
at sample.rpc.OrderServiceGrpc$OrderServiceBlockingStub.placeOrder(OrderServiceGrpc.java:296)
at sample.client.command.PlaceOrderCommand.call(PlaceOrderCommand.java:38)
at sample.client.command.PlaceOrderCommand.call(PlaceOrderCommand.java:12)
at picocli.CommandLine.executeUserObject(CommandLine.java:2041)
at picocli.CommandLine.access$1500(CommandLine.java:148)
at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2461)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2453)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2415)
at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2273)
at picocli.CommandLine$RunLast.execute(CommandLine.java:2417)
at picocli.CommandLine.execute(CommandLine.java:2170)
at sample.client.Client.main(Client.java:39)
...

支払いが完了すると、顧客は再度注文できるようになります。

./gradlew :client:run --args="Repayment 1 8000"
...
./gradlew :client:run --args="GetCustomerInfo 1"
...
{"id": 1,"name": "Yamada Taro","credit_limit": 10000,"credit_total": 2000}
...
./gradlew :client:run --args="PlaceOrder 1 3:1,4:1"
...
{"order_id": "b6adabd8-0a05-4109-9618-3420fea3161f"}
...

クリーンアップ

Cassandra、MySQL、マイクロサービスを停止するには、次のコマンドを実行します。

docker-compose down

リファレンス - マイクロサービストランザクションの実現方法

注文、単一注文の取得、注文履歴の取得のトランザクションは、マイクロサービストランザクションを実現します。このセクションでは、注文を例に、Customer Service と Order Service にまたがるトランザクションの実装方法に焦点を当てます。

次のシーケンス図は、注文を行うトランザクションを示しています。

シーケンス図

1. 2 フェーズコミットインターフェースによるトランザクションが開始されます

クライアントが Order Service に注文リクエストを送信すると、OrderService.placeOrder() が呼び出され、マイクロサービストランザクションが開始されます。

最初に、Order Service は次のように ScalarDbTwoPcRepository.executeTwoPcTransaction() を使用して 2 フェーズコミットインターフェースによるトランザクションを開始します。参考として、OrderService.java を参照してください。

// Start a two-phase commit interface transaction
TwoPcResult<String> result = orderRepository.executeTwoPcTransaction(txId -> {
...
}, ...);

CRUD 操作が実行される2 フェーズコミットプロトコルを使用してトランザクションがコミットされるエラー処理のアクションは、API によって自動的に実行されます。

2. CRUD 操作が実行される

2 フェーズコミットインターフェイスを使用したトランザクションが開始されると、ScalarDbTwoPcRepository.executeTwoPcTransaction() によって CRUD 操作が実行されます。Order Service は、注文情報を order_service.orders テーブルに、詳細情報を order_service.statements テーブルに次のように格納します。参考までに、OrderService.java を参照してください。

// Put the order info into the `orders` table
orderRepository.insert(order);

AtomicInteger amount = new AtomicInteger();
for (ItemOrder itemOrder : request.getItemOrderList()) {
int itemId = itemOrder.getItemId();
int count = itemOrder.getCount();
// Retrieve the item info from the `items` table
Optional<Item> itemOpt = itemRepository.findById(itemId);
if (!itemOpt.isPresent()) {
String message = "Item not found: " + itemId;
responseObserver.onError(
Status.NOT_FOUND.withDescription(message).asRuntimeException());
throw new ScalarDbNonTransientException(message);
}
Item item = itemOpt.get();

int cost = item.price * count;
// Put the order statement into the `statements` table
statementRepository.insert(new Statement(itemId, orderId, count));
// Calculate the total amount
amount.addAndGet(cost);
}

次に、Order Service は、トランザクション ID とともに Customer Service の payment gRPC エンドポイントを呼び出します。参考については、OrderService.java を参照してください。

customerServiceStub.payment(
PaymentRequest.newBuilder()
.setTransactionId(transactionId)
.setCustomerId(customerId)
.setAmount(amount)
.build());

Customer Service の payment エンドポイントは、まず次のように ScalarDbTwoPcRepository.joinTransactionOnParticipant() を使用してトランザクションを結合します。参考として、CustomerService.java を参照してください。

customerRepository.joinTransactionOnParticipant(request.getTransactionId(), ...);

次に、エンドポイントは顧客情報を取得し、支払い後に顧客のクレジット合計がクレジット限度額を超えているかどうかを確認します。クレジット合計がクレジット限度額を超えていない場合、エンドポイントは顧客のクレジット合計を更新します。参考として、CustomerService.java を参照してください。

Customer customer = getCustomer(responseObserver, request.getCustomerId());

int updatedCreditTotal = customer.creditTotal + request.getAmount();
// Check if the credit total exceeds the credit limit after payment
if (updatedCreditTotal > customer.creditLimit) {
String message = String.format(
"Credit limit exceeded. creditTotal:%d, payment:%d", customer.creditTotal, request.getAmount());
responseObserver.onError(
Status.FAILED_PRECONDITION.withDescription(message).asRuntimeException());
throw new ScalarDbNonTransientException(message);
}

// Reduce `credit_total` for the customer
customerRepository.update(customer.withCreditTotal(updatedCreditTotal));

3. トランザクションは 2 相コミットプロトコルを使用してコミットされます

Order Service は、支払いが成功したという更新を受け取った後、トランザクションをコミットしようとします。

Order Service で呼び出された ScalarDbTwoPcRepository.executeTwoPcTransaction() API は、ローカル Order Service とリモート Customer Service の両方の準備、検証、およびコミットを自動的に実行します。これらの手順は、上記の CRUD 操作が正常に終了した後に順番に実行されます。Customer Service の preparevalidate、および commit gRPC エンドポイントを呼び出す実装は、API にパラメーターとして渡す必要があります。参考として、OrderService.java を参照してください。

TwoPcResult<PlaceOrderResponse> result = orderRepository.executeTwoPcTransaction(txId ->{
...
},

Collections.singletonList(
RemotePrepareCommitPhaseOperations.createSerializable(
this::callPrepareEndpoint,
this::callValidateEndpoint,
this::callCommitEndpoint,
this::callRollbackEndpoint
)
)
);

高レベル 2PC API のシーケンス図

Customer Service の prepare エンドポイントでは、エンドポイントは ScalarDbTwoPcRepository.prepareTransactionOnParticipant() を使用してトランザクションを再開し、準備します。参考として、CustomerService.java を参照してください。

customerRepository.prepareTransactionOnParticipant(request.getTransactionId());

Customer Service の validate エンドポイントでは、エンドポイントは ScalarDbTwoPcRepository.validateTransactionOnParticipant() を使用してトランザクションを再開し、検証します。参考として、CustomerService.java を参照してください。

customerRepository.validateTransactionOnParticipant(request.getTransactionId());

Customer Service の commit エンドポイントでは、エンドポイントは ScalarDbTwoPcRepository.commitTransactionOnParticipant() を使用してトランザクションを再開し、コミットします。参考として、CustomerService.java を参照してください。

customerRepository.commitTransactionOnParticipant(request.getTransactionId());

エラー処理

トランザクションの実行中にエラーが発生した場合、ScalarDbTwoPcRepository.executeTwoPcTransaction() は、ローカルの Order Service とリモートの Customer Service の両方でトランザクションを自動的にロールバックします。Customer Service の rollback gRPC エンドポイントを呼び出す実装も、他の実装とともに API にパラメータとして渡す必要があります。参考として、OrderService.java を参照してください。

TwoPcResult<PlaceOrderResponse> result = orderRepository.executeTwoPcTransaction(txId ->{
...
},

Collections.singletonList(
RemotePrepareCommitPhaseOperations.createSerializable(
this::callPrepareEndpoint,
this::callValidateEndpoint,
this::callCommitEndpoint,
this::callRollbackEndpoint
)
)
);

Customer Service の rollback エンドポイントでは、エンドポイントがトランザクションを再開してロールバックします。参考として、CustomerService.java を参照してください。

customerRepository.rollbackTransactionOnParticipant(request.getTransactionId());

ScalarDB で例外を処理する方法の詳細については、例外の処理方法を参照してください。