ScalarDB Java API ガイド
このページは英語版のページが機械翻訳されたものです。英語版との間に矛盾または不一致がある場合は、英語版を正としてください。
ScalarDB Java API は、主に管理 API とトランザクション API で設定されています。このガイドでは、どのような種類の API が存在するのか、それらの使用方法、例外の処理方法などの関連トピックについて簡単に説明します。
Administrative API
このセクションでは、ScalarDB の管理 API を使用してプログラムで管理操作を実行する方法について説明します。
管理操作を実行する別の方法は、Schema Loader を使用することです。
DistributedTransactionAdmin
インスタンスを取得する
管理操作を実行するには、まず DistributedTransactionAdmin
インスタンスを取得する必要があります。
DistributedTransactionAdmin
インスタンスを取得するには、次のように TransactionFactory
を使用します。
TransactionFactory transactionFactory = TransactionFactory.create("<CONFIGURATION_FILE_PATH>");
DistributedTransactionAdmin admin = transactionFactory.getTransactionAdmin();
設定の詳細については、ScalarDB 設定を参照してください。
すべての管理操作を実行したら、次のように DistributedTransactionAdmin
インスタンスを閉じる必要があります。
admin.close();
名前空間を作成する
テーブルは1つの名前空間に属するため、テーブルを作成する前に名前空間を作成する必要があります。
名前空間は次のように作成できます。
// Create the namespace "ns". If the namespace already exists, an exception will be thrown.
admin.createNamespace("ns");
// Create the namespace only if it does not already exist.
boolean ifNotExists = true;
admin.createNamespace("ns", ifNotExists);
// Create the namespace with options.
Map<String, String> options = ...;
admin.createNamespace("ns", options);
作成オプション
名前空間の作成やテーブルの作成などの作成操作では、オプション名と値のマップであるオプション (Map<String, String>
) を指定できます。オプションを使用すると、ストレージアダプタ固有の設定ができます。
データベースを選択して、使用可能なオプションを確認します。
- JDBC データベース
- DynamoDB
- Cosmos DB for NoSQL
- Cassandra
JDBC データベースには使用できるオプションはありません。
名前 | 説明 | デフォルト |
---|---|---|
no-scaling | DynamoDB の自動スケーリングを無効にします。 | false |
no-backup | DynamoDB の継続的なバックアップを無効にします。 | false |
ru | 基本リソース単位。 | 10 |
名前 | 説明 | デフォルト |
---|---|---|
ru | 基本リソース単位。 | 400 |
no-scaling | Cosmos DB for NoSQL の自動スケーリングを無効にします。 | false |
名前 | 説明 | デフォルト |
---|---|---|
replication-strategy | Cassandra レプリケーション戦略。SimpleStrategy または NetworkTopologyStrategy である必要があります。 | SimpleStrategy |
compaction-strategy | Cassandra 圧縮戦略。LCS 、STCS 、または TWCS である必要があります。 | STCS |
replication-factor | Cassandra の複製係数。 | 3 |
テーブルを作成する
テーブルを作成するときは、テーブルメタデータを定義してからテーブルを作成する必要があります。
テーブルメタデータを定義するには、TableMetadata
を使用できます。次の図は、テーブルの列、パーティションキー、クラスタリングキー (クラスタリング順序を含む)、およびセカンダリインデックスを定義する方法を示しています。
// Define the table metadata.
TableMetadata tableMetadata =
TableMetadata.newBuilder()
.addColumn("c1", DataType.INT)
.addColumn("c2", DataType.TEXT)
.addColumn("c3", DataType.BIGINT)
.addColumn("c4", DataType.FLOAT)
.addColumn("c5", DataType.DOUBLE)
.addPartitionKey("c1")
.addClusteringKey("c2", Scan.Ordering.Order.DESC)
.addClusteringKey("c3", Scan.Ordering.Order.ASC)
.addSecondaryIndex("c4")
.build();
ScalarDB のデータモデルの詳細については、データモデルを参照してください。
次に、次のようにテーブルを作成します。
// Create the table "ns.tbl". If the table already exists, an exception will be thrown.
admin.createTable("ns", "tbl", tableMetadata);
// Create the table only if it does not already exist.
boolean ifNotExists = true;
admin.createTable("ns", "tbl", tableMetadata, ifNotExists);
// Create the table with options.
Map<String, String> options = ...;
admin.createTable("ns", "tbl", tableMetadata, options);
セカンダリインデックスを作成する
セカンダリインデックスは次のように作成できます。
// Create a secondary index on column "c5" for table "ns.tbl". If a secondary index already exists, an exception will be thrown.
admin.createIndex("ns", "tbl", "c5");
// Create the secondary index only if it does not already exist.
boolean ifNotExists = true;
admin.createIndex("ns", "tbl", "c5", ifNotExists);
// Create the secondary index with options.
Map<String, String> options = ...;
admin.createIndex("ns", "tbl", "c5", options);
テーブルに新しい列を追加する
次のように、テーブルに新しい非パーティションキー列を追加できます。
// Add a new column "c6" with the INT data type to the table "ns.tbl".
admin.addNewColumnToTable("ns", "tbl", "c6", DataType.INT)
テーブルに新しい列を追加する場合は、基盤となるストレージによって実行時間が大きく異なる可能性があるため、慎重に検討する必要があります。それに応じて計画を立て、特にデータベースが本番環境で実行されている場合は、次の点を考慮してください。
- Cosmos DB for NoSQL および DynamoDB の場合: テーブルスキーマは変更されないため、列の追加はほぼ瞬時に行われます。別のテーブルに格納されているテーブルメタデータのみが更新されます。
- Cassandra の場合: 列を追加すると、スキーマメタデータのみが更新され、既存のスキーマレコードは変更されません。クラスタートポロジが実行時間の主な要因です。スキーマメタデータの変更は、ゴシッププロトコルを介して各クラスターノードに共有されます。このため、クラスターが大きいほど、すべてのノードが更新されるまでの時間が長くなります。
- リレーショナルデータベース (MySQL、Oracle など) の場合: 列の追加は実行にそれほど時間がかかりません。
テーブルを切り捨てる
テーブルを切り捨てるには、次のようにします。
// Truncate the table "ns.tbl".
admin.truncateTable("ns", "tbl");
セカンダリインデックスを削除する
セカンダリインデックスは次のように削除できます。
// Drop the secondary index on column "c5" from table "ns.tbl". If the secondary index does not exist, an exception will be thrown.
admin.dropIndex("ns", "tbl", "c5");
// Drop the secondary index only if it exists.
boolean ifExists = true;
admin.dropIndex("ns", "tbl", "c5", ifExists);
テーブルを削除する
テーブルを削除するには、次のようにします。
// Drop the table "ns.tbl". If the table does not exist, an exception will be thrown.
admin.dropTable("ns", "tbl");
// Drop the table only if it exists.
boolean ifExists = true;
admin.dropTable("ns", "tbl", ifExists);
名前空間を削除する
名前空間を削除するには、次のようにします。
// Drop the namespace "ns". If the namespace does not exist, an exception will be thrown.
admin.dropNamespace("ns");
// Drop the namespace only if it exists.
boolean ifExists = true;
admin.dropNamespace("ns", ifExists);
既存の名前空間を取得する
既存の名前空間は次のように取得できます。
Set<String> namespaces = admin.getNamespaceNames();
名前空間のテーブルを取得する
名前空間のテーブルは次のように取得できます。
// Get the tables of the namespace "ns".
Set<String> tables = admin.getNamespaceTableNames("ns");
テーブルメタデータを取得する
テーブルメタデータは次のように取得できます。
// Get the table metadata for "ns.tbl".
TableMetadata tableMetadata = admin.getTableMetadata("ns", "tbl");
名前空間を修復する
名前空間が不明な状態の場合 (たとえば、名前空間が基盤となるストレージに存在するが ScalarDB メタデータが存在しない、またはその逆)、このメソッドは必要に応じて名前空間とそのメタデータを再作成します。
名前空間は次のように修復できます。
// Repair the namespace "ns" with options.
Map<String, String> options = ...;
admin.repairNamespace("ns", options);
テーブルを修復する
テーブルが不明な状態の場合 (テーブルは基盤となるストレージに存在するが ScalarDB メタデータは存在しない、またはその逆)、このメソッドは必要に応じてテーブル、そのセカンダリインデックス、およびそのメタデータを再作成します。
テーブルは次のように修復できます。
// Repair the table "ns.tbl" with options.
TableMetadata tableMetadata =
TableMetadata.newBuilder()
...
.build();
Map<String, String> options = ...;
admin.repairTable("ns", "tbl", tableMetadata, options);
最新の ScalarDB API をサポートするように環境をアップグレードする
ScalarDB API の最新バージョンをサポートするように ScalarDB 環境をアップグレードできます。通常、リリースノートに記載されているように、アプリケーション環境が使用する ScalarDB バージョンを更新した後、このメソッドを実行する必要があります。
// Upgrade the ScalarDB environment.
Map<String, String> options = ...;
admin.upgrade(options);
Coordinator テーブルの操作を指定する
Coordinator テーブルは、Transactional API によってトランザクションのステータスを追跡するために使用されます。
トランザクションマネージャーを使用する場合は、トランザクションを実行するために Coordinator テーブルを作成する必要があります。テーブルの作成に加えて、Coordinator テーブルを切り捨てたり削除したりすることもできます。
Coordinator テーブルを作成する
Coordinator テーブルは次のように作成できます。
// Create the Coordinator table.
admin.createCoordinatorTables();
// Create the Coordinator table only if one does not already exist.
boolean ifNotExist = true;
admin.createCoordinatorTables(ifNotExist);
// Create the Coordinator table with options.
Map<String, String> options = ...;
admin.createCoordinatorTables(options);
Coordinator テーブルを切り捨てる
Coordinator テーブルは次のように切り捨てることができます。
// Truncate the Coordinator table.
admin.truncateCoordinatorTables();
Coordinator テーブルを削除する
Coordinator テーブルは次のように削除できます。
// Drop the Coordinator table.
admin.dropCoordinatorTables();
// Drop the Coordinator table if one exist.
boolean ifExist = true;
admin.dropCoordinatorTables(ifExist);
テーブルをインポートする
既存のテーブルを ScalarDB にインポートするには、次のようにします。
// Import the table "ns.tbl". If the table is already managed by ScalarDB, the target table does not
// exist, or the table does not meet the requirements of the ScalarDB table, an exception will be thrown.
admin.importTable("ns", "tbl", options, overrideColumnsType);
運用環境で ScalarDB にテーブルをインポートする場合は、データベーステーブルと ScalarDB メタデータテーブルにトランザクションメタデータ列が追加されるため、慎重に計画する必要があります。この場合、データベースと ScalarDB の間にはいくつかの違いがあり、いくつかの制限もあります。詳細については、ScalarDB Schema Loader を使用して既存のテーブルを ScalarDB にインポートするを参照してください。
Transactional API
このセクションでは、ScalarDB の Transactional API を使用してトランザクション操作を実行する方法について説明します。
DistributedTransactionManager
インスタンスを取得する
トランザクション操作を実行するには、まず DistributedTransactionManager
インスタンスを取得する必要があります。
DistributedTransactionManager
インスタンスを取得するには、次のように TransactionFactory
を使用します。
TransactionFactory transactionFactory = TransactionFactory.create("<CONFIGURATION_FILE_PATH>");
DistributedTransactionManager transactionManager = transactionFactory.getTransactionManager();
すべてのトランザクション操作を実行した後、次のように DistributedTransactionManager
インスタンスを閉じる必要があります。
transactionManager.close();
トランザクションを実行する
このサブセクションでは、複数の CRUD 操作でトランザクションを実行する方法について説明します。
トランザクションを開始する
トランザクション CRUD 操作を実行する前に、トランザクションを開始する必要があります。
トランザクションは次のように開始できます。
// Begin a transaction.
DistributedTransaction transaction = transactionManager.begin();
または、次のようにトランザクションを開始することもできます。
// Start a transaction.
DistributedTransaction transaction = transactionManager.start();
あるいは、次のようにトランザクション ID を指定して、トランザクションに begin
メソッドを使用することもできます。
// Begin a transaction with specifying a transaction ID.
DistributedTransaction transaction = transactionManager.begin("<TRANSACTION_ID>");
または、次のようにトランザクション ID を指定して、トランザクションに start
メソッドを使用することもできます。
// Start a transaction with specifying a transaction ID.
DistributedTransaction transaction = transactionManager.start("<TRANSACTION_ID>");
トランザクション ID を指定すると、外部システムを ScalarDB にリンクする場合に便利です。それ以外の場合は、begin()
メソッドまたは start()
メソッドを使用する必要があります。
トランザクション ID を指定する場合は、ScalarDB の正確性はトランザクション ID の一意性に依存するため、システム全体で一意の ID (UUID v4など) を指定してください。
トランザクションに参加する
トランザクションに参加することは、トランザクションが複数のクライアントリクエストにまたがるステートフルアプリケーションで特に便利です。このようなシナリオでは、アプリケーションは最初のクライアントリクエスト中にトランザクションを開始できます。その後、後続のクライアントリクエストで、アプリケーションは join()
メソッドを使用して進行中のトランザクションに参加できます。
次のようにトランザクション ID を指定すると、すでに開始されている進行中のトランザクションに参加できます。
// Join a transaction.
DistributedTransaction transaction = transactionManager.join("<TRANSACTION_ID>");
getId()
を使用してトランザクション ID を取得するには、次のように指定します。
tx.getId();
トランザクションを再開する
トランザクションの再開は、トランザクションが複数のクライアントリクエストにまたがるステートフルアプリケーションで特に役立ちます。このようなシナリオでは、アプリケーションは最初のクライアントリクエスト中にトランザクションを開始できます。その後、後続のクライアントリクエストで、アプリケーションは resume()
メソッドを使用して進行中のトランザクションを再開できます。
次のようにトランザクション ID を指定すると、すでに開始した進行中のトランザクションを再開できます。
// Resume a transaction.
DistributedTransaction transaction = transactionManager.resume("<TRANSACTION_ID>");
getId()
を使用してトランザクション ID を取得するには、次のように指定します。
tx.getId();
CRUD 操作を実装する
次のセクションでは、キーの構築と CRUD 操作について説明します。
CRUD 操作のすべてのビルダーは consistency()
メソッドを使用して一貫性を指定できますが、これらのメソッドは無視されます。代わりに、トランザクションでは常に LINEARIZABLE
一貫性レベルが使用されます。
キーの構築
ほとんどの CRUD 操作では、Key
オブジェクト (パーティションキー、クラスタリングキーなど) を指定する必要があります。そのため、CRUD 操作に進む前に、Key
オブジェクトの構築方法を次に説明します。
単一の列キーの場合、次のように Key.of<TYPE_NAME>()
メソッドを使用してキーを構築できます。
// For a key that consists of a single column of INT.
Key key1 = Key.ofInt("col1", 1);
// For a key that consists of a single column of BIGINT.
Key key2 = Key.ofBigInt("col1", 100L);
// For a key that consists of a single column of DOUBLE.
Key key3 = Key.ofDouble("col1", 1.3d);
// For a key that consists of a single column of TEXT.
Key key4 = Key.ofText("col1", "value");
2~5列で設定されるキーの場合は、Key.of()
メソッドを使用して次のようにキーを構築できます。Guava の ImmutableMap.of()
と同様に、列名と値を順番に指定する必要があります。
// For a key that consists of two to five columns.
Key key1 = Key.of("col1", 1, "col2", 100L);
Key key2 = Key.of("col1", 1, "col2", 100L, "col3", 1.3d);
Key key3 = Key.of("col1", 1, "col2", 100L, "col3", 1.3d, "col4", "value");
Key key4 = Key.of("col1", 1, "col2", 100L, "col3", 1.3d, "col4", "value", "col5", false);
5列を超えるキーの場合は、ビルダーを使用して次のようにキーを構築できます。
// For a key that consists of more than five columns.
Key key = Key.newBuilder()
.addInt("col1", 1)
.addBigInt("col2", 100L)
.addDouble("col3", 1.3d)
.addText("col4", "value")
.addBoolean("col5", false)
.addInt("col6", 100)
.build();
Get
操作
Get
は、主キーで指定された単一のレコードを取得する操作です。
まず Get
オブジェクトを作成し、次に次のように transaction.get()
メソッドを使用してオブジェクトを実行する必要があります。
// Create a `Get` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);
Get get =
Get.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.projections("c1", "c2", "c3", "c4")
.where(ConditionBuilder.column("c1").isNotEqualToInt(10))
.build();
// Execute the `Get` operation.
Optional<Result> result = transaction.get(get);
射影を指定して、返される列を選択できます。
WHERE
句を使用する
where()
メソッドを使用して任意の条件を指定することもできます。取得したレコードが where()
メソッドで指定された条件と一致しない場合は、Option.empty()
が返されます。where()
メソッドの引数として、条件、AND 条件セット、または OR 条件セットを指定できます。where()
メソッドを呼び出した後、次のように and()
メソッドまたは or()
メソッドを使用して、さらに条件または条件セットを追加できます。
// Create a `Get` operation with condition sets.
Get get =
Get.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.where(
ConditionSetBuilder.condition(ConditionBuilder.column("c1").isLessThanInt(10))
.or(ConditionBuilder.column("c1").isGreaterThanInt(20))
.build())
.and(
ConditionSetBuilder.condition(ConditionBuilder.column("c2").isLikeText("a%"))
.or(ConditionBuilder.column("c2").isLikeText("b%"))
.build())
.build();
where()
条件メソッドチェーンでは、条件は上記の例のように ConditionalExpression
または OrConditionSet
の AND 結合 (結合正規形と呼ばれる)、または ConditionalExpression
または AndConditionSet
の OR 結合 (分離正規形と呼ばれる) である必要があります。
使用可能な条件と条件セットの詳細については、使用している ScalarDB バージョンの Javadoc の ConditionBuilder
および ConditionSetBuilder
ページを参照してください。
Result
オブジェクトの処理
Get
操作と Scan
操作は Result
オブジェクトを返します。次に、Result
オブジェクトの処理方法を示します。
次のように get<TYPE_NAME>("<COLUMN_NAME>")
メソッドを使用して、結果の列値を取得できます。
// Get the BOOLEAN value of a column.
boolean booleanValue = result.getBoolean("<COLUMN_NAME>");
// Get the INT value of a column.
int intValue = result.getInt("<COLUMN_NAME>");
// Get the BIGINT value of a column.
long bigIntValue = result.getBigInt("<COLUMN_NAME>");
// Get the FLOAT value of a column.
float floatValue = result.getFloat("<COLUMN_NAME>");
// Get the DOUBLE value of a column.
double doubleValue = result.getDouble("<COLUMN_NAME>");
// Get the TEXT value of a column.
String textValue = result.getText("<COLUMN_NAME>");
// Get the BLOB value of a column as a `ByteBuffer`.
ByteBuffer blobValue = result.getBlob("<COLUMN_NAME>");
// Get the BLOB value of a column as a `byte` array.
byte[] blobValueAsBytes = result.getBlobAsBytes("<COLUMN_NAME>");
// Get the DATE value of a column as a `LocalDate`.
LocalDate dateValue = result.getDate("<COLUMN_NAME>");
// Get the TIME value of a column as a `LocalTime`.
LocalTime timeValue = result.getTime("<COLUMN_NAME>");
// Get the TIMESTAMP value of a column as a `LocalDateTime`.
LocalDateTime timestampValue = result.getTimestamp("<COLUMN_NAME>");
// Get the TIMESTAMPTZ value of a column as a `Instant`.
Instant timestampTZValue = result.getTimestampTZ("<COLUMN_NAME>");
列の値が null かどうかを確認する必要がある場合は、isNull("<COLUMN_NAME>")
メソッドを使用できます。
// Check if a value of a column is null.
boolean isNull = result.isNull("<COLUMN_NAME>");
詳細については、使用している ScalarDB のバージョンの Javadoc の Result
ページを参照してください。
セカンダリインデックスを使用して Get
を実行する
セカンダリインデックスを使用して Get
操作を実行できます。
パーティションキーを指定する代わりに、次のようにインデックスキー (インデックス付き列) を指定してセカンダリインデックスを使用できます。
// Create a `Get` operation by using a secondary index.
Key indexKey = Key.ofFloat("c4", 1.23F);
Get get =
Get.newBuilder()
.namespace("ns")
.table("tbl")
.indexKey(indexKey)
.projections("c1", "c2", "c3", "c4")
.where(ConditionBuilder.column("c1").isNotEqualToInt(10))
.build();
// Execute the `Get` operation.
Optional<Result> result = transaction.get(get);
where()
メソッドを使用して任意の条件を指定することもできます。詳細については、WHERE
句を使用するを参照してください。
結果に複数のレコードがある場合、transaction.get()
は例外をスローします。複数の結果を処理する場合は、セカンダリインデックスを使用して Scan
を実行するを参照してください。
Scan
操作
Scan
は、パーティション内の複数のレコードを取得する操作です。Scan
操作では、クラスタリングキーの境界とクラスタリングキー列の順序を指定できます。
まず Scan
オブジェクトを作成し、次に次のように transaction.scan()
メソッドを使用してオブジェクトを実行する必要があります。
// Create a `Scan` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key startClusteringKey = Key.of("c2", "aaa", "c3", 100L);
Key endClusteringKey = Key.of("c2", "aaa", "c3", 300L);
Scan scan =
Scan.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.start(startClusteringKey, true) // Include startClusteringKey
.end(endClusteringKey, false) // Exclude endClusteringKey
.projections("c1", "c2", "c3", "c4")
.orderings(Scan.Ordering.desc("c2"), Scan.Ordering.asc("c3"))
.where(ConditionBuilder.column("c1").isNotEqualToInt(10))
.limit(10)
.build();
// Execute the `Scan` operation.
List<Result> results = transaction.scan(scan);
クラスタリングキー境界を省略するか、start
境界または end
境界のいずれかを指定できます。orderings
を指定しない場合は、テーブルの作成時に定義したクラスタリング順序で結果が並べられます。
さらに、projections
を指定して返される列を選択し、limit
を使用して Scan
操作で返されるレコードの数を指定できます。
WHERE
句を使用する
where()
メソッドを使用してスキャンされたレコードをフィルタリングすることで、任意の条件を指定することもできます。where()
メソッドの引数として、条件、AND 条件セット、または OR 条件セットを指定できます。where()
メソッドを呼び出した後、次のように and()
メソッドまたは or()
メソッドを使用して、さらに条件または条件セットを追加できます。
// Create a `Scan` operation with condition sets.
Scan scan =
Scan.newBuilder()
.namespace("ns")
.table("tbl")
.all()
.where(
ConditionSetBuilder.condition(ConditionBuilder.column("c1").isLessThanInt(10))
.or(ConditionBuilder.column("c1").isGreaterThanInt(20))
.build())
.and(
ConditionSetBuilder.condition(ConditionBuilder.column("c2").isLikeText("a%"))
.or(ConditionBuilder.column("c2").isLikeText("b%"))
.build())
.limit(10)
.build();
where()
条件メソッドチェーンでは、条件は上記の例のように ConditionalExpression
または OrConditionSet
の AND 結合 (結合正規形と呼ばれる)、または ConditionalExpression
または AndConditionSet
の OR 結合 (分離正規形と呼ばれる) である必要があります。
使用可能な条件と条件セットの詳細については、使用している ScalarDB のバージョンの Javadoc の ConditionBuilder
および ConditionSetBuilder
ページを参照してください。
セカンダリインデックスを使用して Scan
を実行する
セカンダリインデックスを使用して、Scan
操作を実行できます。
パーティションキーを指定する代わりに、次のようにインデックスキー (インデックス付き列) を指定してセカンダリインデックスを使用できます。
// Create a `Scan` operation by using a secondary index.
Key indexKey = Key.ofFloat("c4", 1.23F);
Scan scan =
Scan.newBuilder()
.namespace("ns")
.table("tbl")
.indexKey(indexKey)
.projections("c1", "c2", "c3", "c4")
.where(ConditionBuilder.column("c1").isNotEqualToInt(10))
.limit(10)
.build();
// Execute the `Scan` operation.
List<Result> results = transaction.scan(scan);
where()
メソッドを使用して任意の条件を指定することもできます。詳細については、WHERE
句を使用するを参照してください。
セカンダリインデックスを使用して、Scan
でクラスタリングキーの境界と順序を指定することはできません。
パーティションキーを指定せずにクロスパーティション Scan
を実行し、テーブルのすべてのレコードを取得します
ScalarDB プロパティファイルで次の設定を有効にすると、パーティションキーを指定せずに、すべてのパーティションにわたって Scan
操作 (クロスパーティションスキャン* と呼びます) を実行できます。
scalar.db.cross_partition_scan.enabled=true
非 JDBC データベースの場合、SERIALIZABLE
分離レベルでクロスパーティションスキャンを有効にした場合でも、トランザクションは読み取りコミットスナップショット分離 (SNAPSHOT
) で実行される可能性があります。これは、より低い分離レベルです。非 JDBC データベースを使用する場合は、トランザクションの一貫性が重要でない場合にのみ、クロスパーティションスキャンを使用してください。
ビルダーで partitionKey()
メソッドを呼び出す代わりに、次のように all()
メソッドを呼び出して、パーティションキーを指定せずにテーブルをスキャンできます。
// Create a `Scan` operation without specifying a partition key.
Scan scan =
Scan.newBuilder()
.namespace("ns")
.table("tbl")
.all()
.projections("c1", "c2", "c3", "c4")
.limit(10)
.build();
// Execute the `Scan` operation.
List<Result> results = transaction.scan(scan);
非 JDBC データベースを使用する場合、クロスパーティション Scan
で順序を指定することはできません。フィルタリングまたは順序付けを指定したクロスパーティション Scan
の使用方法の詳細については、フィルタリングと順序付けを使用してパーティション間の Scan
を実行するを参照してください。
フィルタリングと順序付けを使用してパーティション間の Scan
を実行する
次のようにフィルタリングと順序付けによるクロスパーティションスキャンオプションを有効にすると、柔軟な条件と順序付けでクロスパーティション Scan
操作を実行できます。
scalar.db.cross_partition_scan.enabled=true
scalar.db.cross_partition_scan.filtering.enabled=true
scalar.db.cross_partition_scan.ordering.enabled=true
非 JDBC データベースでは scalar.db.cross_partition_scan.ordering
を有効にすることはできません。
次のように、all()
メソッドを呼び出した後に where()
メソッドと ordering()
メソッドを呼び出して、任意の条件と順序を指定できます。
// Create a `Scan` operation with arbitrary conditions and orderings.
Scan scan =
Scan.newBuilder()
.namespace("ns")
.table("tbl")
.all()
.where(ConditionBuilder.column("c1").isNotEqualToInt(10))
.projections("c1", "c2", "c3", "c4")
.orderings(Scan.Ordering.desc("c3"), Scan.Ordering.asc("c4"))
.limit(10)
.build();
// Execute the `Scan` operation.
List<Result> results = transaction.scan(scan);
WHERE
句の詳細については、WHERE
句を使用するを参照してください。
Put
操作
Put
操作は ScalarDB 3.13以降では非推奨となり、将来のリリースでは削除される予定です。Put
操作の代わりに、Insert
操作、Upsert
操作、または Update
操作を使用してください。
Put
は、主キーで指定されたレコードを配置する操作です。この操作はレコードの upsert 操作として動作し、レコードが存在する場合はレコードを更新し、レコードが存在しない場合はレコードを挿入します。
既存のレコードを更新する場合、Put
操作を使用する前に Get
または Scan
を使用してレコードを読み取る必要があります。そうしないと、競合により操作が失敗します。これは、トランザクションを適切に管理するための ScalarDB の仕様により発生します。レコードを明示的に読み取る代わりに、暗黙的な事前読み取りを有効にできます。詳細については、Put
操作の暗黙的な事前読み取りを有効にするを参照してください。
まず Put
オブジェクトを作成し、次に次のように transaction.put()
メソッドを使用してオブジェクトを実行する必要があります。
// Create a `Put` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);
Put put =
Put.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();
// Execute the `Put` operation.
transaction.put(put);
次のように null
値を持つレコードを配置することもできます。
Put put =
Put.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", null)
.doubleValue("c5", null)
.build();
Put
操作の暗黙的な事前読み取りを有効にする
Consensus Commit では、レコードが存在する場合、レコードの最新の状態を取得するために、アプリケーションは Put
および Delete
操作でレコードを変更する前にレコードを読み取る必要があります。レコードを明示的に読み取る代わりに、暗黙的な事前読み取り を有効にすることができます。暗黙的な事前読み取りを有効にすると、アプリケーションがトランザクションでレコードを明示的に読み取らない場合は、ScalarDB がトランザクションをコミットする前にアプリケーションに代わってレコードを読み取ります。
Put
操作の暗黙的な事前読み取りを有効にするには、次のように Put
操作ビルダーで enableImplicitPreRead()
を指定します。
Put put =
Put.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.enableImplicitPreRead()
.build();
変更しようとしているレコードが存在しないことが確実な場合は、パフォーマンスを向上させるために、Put
操作の暗黙的な事前読み取りを有効にしないでください。たとえば、初期データをロードする場合は、暗黙的な事前読み取りを有効にしないでください。暗黙的な事前読み取りのない Put
操作は、暗黙的な事前読み取りのある Put
操作よりも高速です。これは、操作によって不要な読み取りがスキップされるためです。
Insert
操作
Insert
は、トランザクションを通じて基礎となるストレージにエントリを挿入する操作です。エントリがすでに存在する場合は、競合エラーが発生します。
まず Insert
オブジェクトを作成し、次に次のように transaction.insert()
メソッドを使用してオブジェクトを実行する必要があります。
// Create an `Insert` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);
Insert insert =
Insert.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();
// Execute the `Insert` operation.
transaction.insert(insert);
Upsert
操作
Upsert
は、トランザクションを通じて基礎となるストレージにエントリを挿入したり、エントリを更新したりする操作です。エントリがすでに存在する場合は更新され、そうでない場合はエントリが挿入されます。
まず Upsert
オブジェクトを作成し、次に次のように transaction.upsert()
メソッドを使用してオブジェクトを実行する必要があります。
// Create an `Upsert` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);
Upsert upsert =
Upsert.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();
// Execute the `Upsert` operation.
transaction.upsert(upsert);
Update
操作
Update
は、トランザクションを通じて基礎となるストレージ内のエントリを更新する操作です。エントリが存在しない場合、操作によって変更は行われません。
まず Update
オブジェクトを作成し、次に次のように transaction.update()
メソッドを使用してオブジェクトを実行する必要があります。
// Create an `Update` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);
Update update =
Update.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();
// Execute the `Update` operation.
transaction.update(update);
Delete
操作
Delete
は、主キーで指定されたレコードを削除する操作です。
レコードを削除する場合、Delete
操作では暗黙的な事前読み取りが常に有効になっているため、事前にレコードを読み取る必要はありません。
まず Delete
オブジェクトを作成し、次に次のように transaction.delete()
メソッドを使用してオブジェクトを実行する必要があります。
// Create a `Delete` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);
Delete delete =
Delete.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.build();
// Execute the `Delete` operation.
transaction.delete(delete);
条件付きの Put
、Delete
、Update
トランザクション内で条件をチェックするロジックを実装することで、コミット前にトランザクションが満たす必要のある任意の条件 (たとえば、銀行口座の残高が0以上である必要がある) を記述できます。または、Put
、Delete
、Update
などのミューテーション操作で単純な条件を記述することもできます。
Put
、Delete
、Update
操作に条件が含まれている場合、指定された条件が満たされた場合にのみ操作が実行されます。操作の実行時に条件が満たされていない場合は、UnsatisfiedConditionException
という例外がスローされます。
Put
操作で条件を指定する場合は、事前にレコードを読み取るか、暗黙的な事前読み取りを有効にする必要があります。
Put
の条件
Put
操作では、次のように条件を指定できます。
// Build a condition.
MutationCondition condition =
ConditionBuilder.putIf(ConditionBuilder.column("c4").isEqualToFloat(0.0F))
.and(ConditionBuilder.column("c5").isEqualToDouble(0.0))
.build();
Put put =
Put.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.condition(condition) // condition
.build();
// Execute the `Put` operation.
transaction.put(put);
putIf
条件を使用するだけでなく、次のように putIfExists
条件と putIfNotExists
条件を指定することもできます。
// Build a `putIfExists` condition.
MutationCondition putIfExistsCondition = ConditionBuilder.putIfExists();
// Build a `putIfNotExists` condition.
MutationCondition putIfNotExistsCondition = ConditionBuilder.putIfNotExists();
Delete
の条件
Delete
操作では、次のように条件を指定できます。
// Build a condition.
MutationCondition condition =
ConditionBuilder.deleteIf(ConditionBuilder.column("c4").isEqualToFloat(0.0F))
.and(ConditionBuilder.column("c5").isEqualToDouble(0.0))
.build();
Delete delete =
Delete.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.condition(condition) // condition
.build();
// Execute the `Delete` operation.
transaction.delete(delete);
deleteIf
条件を使用するだけでなく、次のように deleteIfExists
条件を指定することもできます。
// Build a `deleteIfExists` condition.
MutationCondition deleteIfExistsCondition = ConditionBuilder.deleteIfExists();
Update
の条件
Update
操作では、次のように条件を指定できます。
// Build a condition.
MutationCondition condition =
ConditionBuilder.updateIf(ConditionBuilder.column("c4").isEqualToFloat(0.0F))
.and(ConditionBuilder.column("c5").isEqualToDouble(0.0))
.build();
Update update =
Update.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.condition(condition) // condition
.build();
// Execute the `Update` operation.
transaction.update(update);
updateIf
条件を使用するだけでなく、次のように updateIfExists
条件を指定することもできます。
// Build a `updateIfExists` condition.
MutationCondition updateIfExistsCondition = ConditionBuilder.updateIfExists();
Mutate 操作
Mutate は、Put
、Insert
、Upsert
、Update
、Delete
の複数の操作を実行する操作です。
まず Mutate オブジェクトを作成し、次に次のように transaction.mutate()
メソッドを使用してオブジェクトを実行する必要があります。
// Create `Put` and `Delete` operations.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKeyForPut = Key.of("c2", "aaa", "c3", 100L);
Put put =
Put.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKeyForPut)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();
Key clusteringKeyForDelete = Key.of("c2", "bbb", "c3", 200L);
Delete delete =
Delete.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKeyForDelete)
.build();
// Execute the operations.
transaction.mutate(Arrays.asList(put, delete));
CRUD 操作のデフォルト名前空間
すべての CRUD 操作のデフォルト名前空間は、ScalarDB 設定のプロパティを使用して設定できます。
scalar.db.default_namespace_name=<NAMESPACE_NAME>
名前空間を指定しない操作では、設定されたデフォルトの名前空間が使用されます。
// This operation will target the default namespace.
Scan scanUsingDefaultNamespace =
Scan.newBuilder()
.table("tbl")
.all()
.build();
// This operation will target the "ns" namespace.
Scan scanUsingSpecifiedNamespace =
Scan.newBuilder()
.namespace("ns")
.table("tbl")
.all()
.build();
オペレーション属性
オペレーション属性は、オペレーションに関する追加情報を保持するためのキーと値のペアです。以下のように、オペレーションビルダーのattribute()
メソッドやattributes()
メソッドを使って設定できます。
// Set operation attributes in the `Get` operation.
Get get = Get.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.attribute("attribute1", "value1")
.attributes(ImmutableMap.of("attribute2", "value2", "attribute3", "value3"))
.build();
// Set operation attributes in the `Scan` operation.
Scan scan = Scan.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.projections("c1", "c2", "c3", "c4")
.attribute("attribute1", "value1")
.attributes(ImmutableMap.of("attribute2", "value2", "attribute3", "value3"))
.build();
// Set operation attributes in the `Insert` operation.
Insert insert = Insert.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.attribute("attribute1", "value1")
.attributes(ImmutableMap.of("attribute2", "value2", "attribute3", "value3"))
.build();
// Set operation attributes in the `Upsert` operation.
Upsert upsert = Upsert.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.attribute("attribute1", "value1")
.attributes(ImmutableMap.of("attribute2", "value2", "attribute3", "value3"))
.build();
// Set operation attributes in the `Update` operation.
Update update = Update.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.attribute("attribute1", "value1")
.attributes(ImmutableMap.of("attribute2", "value2", "attribute3", "value3"))
.build();
// Set operation attributes in the `Delete` operation.
Delete delete = Delete.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.attribute("attribute1", "value1")
.attributes(ImmutableMap.of("attribute2", "value2", "attribute3", "value3"))
.build();
ScalarDB では現在、利用可能なオペレーション属性はありません。
トランザクションをコミットする
CRUD 操作を実行した後、トランザクションをコミットして終了する必要があります。
トランザクションは次のようにコミットできます。
// Commit a transaction.
transaction.commit();
トランザクションをロールバックまたはアボートする
トランザクションの実行中にエラーが発生した場合は、トランザクションをロールバックまたはアボートできます。
トランザクションは次のようにロールバックできます。
// Roll back a transaction.
transaction.rollback();
または、次のようにトランザクションをアボートすることもできます。
// Abort a transaction.
transaction.abort();
ScalarDB で例外を処理する方法の詳細については、例外の処理方法を参照してください。
トランザクションを開始せずにトランザクションを実行する
トランザクションを開始せずにトランザクション操作を実行できます。この場合、ScalarDB は操作を実行する前に自動的にトランザクションを開始し、操作の実行後にトランザクションをコミットします。このセクションでは、トランザクションを開始せずにトランザクションを実行する方法について説明します。
Get
操作を実行する
Get
は、主キーで指定された単一のレコードを取得する操作です。
最初に Get
オブジェクトを作成し、次に次のように transactionManager.get()
メソッドを使用してオブジェクトを実行する必要があります。
// Create a `Get` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);
Get get =
Get.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.projections("c1", "c2", "c3", "c4")
.build();
// Execute the `Get` operation.
Optional<Result> result = transactionManager.get(get);
Get
操作の詳細については、Get
操作を参照してください。
Scan
操作の実行
Scan
は、パーティション内の複数のレコードを取得する操作です。Scan
操作では、クラスタリングキーの境界とクラスタリングキー列の順序を指定できます。
まず Scan
オブジェクトを作成し、次に次のように transactionManager.scan()
メソッドを使用してオブジェクトを実行する必要があります。
// Create a `Scan` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key startClusteringKey = Key.of("c2", "aaa", "c3", 100L);
Key endClusteringKey = Key.of("c2", "aaa", "c3", 300L);
Scan scan =
Scan.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.start(startClusteringKey, true) // Include startClusteringKey
.end(endClusteringKey, false) // Exclude endClusteringKey
.projections("c1", "c2", "c3", "c4")
.orderings(Scan.Ordering.desc("c2"), Scan.Ordering.asc("c3"))
.limit(10)
.build();
// Execute the `Scan` operation.
List<Result> results = transactionManager.scan(scan);
Scan
操作の詳細については、Scan
操作を参照してください。
Put
操作を実行します
Put
操作は ScalarDB 3.13以降では非推奨となり、将来のリリースでは削除される予定です。Put
操作の代わりに、Insert
操作、Upsert
操作、または Update
操作を使用してください。
まず Put
オブジェクトを作成し、次に次のように transactionManager.put()
メソッドを使用してオブジェクトを実行する必要があります。
// Create a `Put` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);
Put put =
Put.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();
// Execute the `Put` operation.
transactionManager.put(put);
Put
操作の詳細については、Put
操作を参照してください。
Insert
操作の実行
Insert
は、トランザクションを通じて基礎となるストレージにエントリを挿入する操作です。エントリがすでに存在する場合は、競合エラーが発生します。
まず Insert
オブジェクトを作成し、次に次のように transactionManager.insert()
メソッドを使用してオブジェクトを実行する必要があります。
// Create an `Insert` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);
Insert insert =
Insert.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();
// Execute the `Insert` operation.
transactionManager.insert(insert);
Insert
操作の詳細については、Insert
操作を参照してください。
Upsert
操作を実行する
Upsert
は、トランザクションを通じて基礎となるストレージにエントリを挿入するか、エントリを更新する操作です。エントリがすでに存在する場合は更新され、そうでない場合はエントリが挿入されます。
まず Upsert
オブジェクトを作成し、次に次のように transactionManager.upsert()
メソッドを使用してオブジェクトを実行する必要があります。
// Create an `Upsert` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);
Upsert upsert =
Upsert.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();
// Execute the `Upsert` operation.
transactionManager.upsert(upsert);
Insert
操作の詳細については、Upsert
操作を参照してください。
Update
操作の実行
Update
は、トランザクションを通じて基礎となるストレージ内のエントリを更新する操作です。エントリが存在しない場合、操作によって変更は行われません。
まず Update
オブジェクトを作成し、次に次のように transactionManager.update()
メソッドを使用してオブジェクトを実行する必要があります。
// Create an `Update` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);
Update update =
Update.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();
// Execute the `Update` operation.
transactionManager.update(update);
Update
操作の詳細については、Update
操作を参照してください。
Delete
操作を実行する
Delete
は、主キーで指定されたレコードを削除する操作です。
まず Delete
オブジェクトを作成し、次に次のように transaction.delete()
メソッドを使用してオブジェクトを実行する必要があります。
// Create a `Delete` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);
Delete delete =
Delete.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.build();
// Execute the `Delete` operation.
transactionManager.delete(delete);
Delete
操作の詳細については、Delete
操作を参照してください。
Mutate 操作の実行
Mutate は、複数のミューテーション (Put
、Insert
、Upsert
、Update
、および Delete
操作) を実行する操作です。
まずミューテーションオブジェクトを作成し、次に次のように transactionManager.mutate()
メソッドを使用してオブジェクトを実行する必要があります。
// Create `Put` and `Delete` operations.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKeyForPut = Key.of("c2", "aaa", "c3", 100L);
Put put =
Put.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKeyForPut)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();
Key clusteringKeyForDelete = Key.of("c2", "bbb", "c3", 200L);
Delete delete =
Delete.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKeyForDelete)
.build();
// Execute the operations.
transactionManager.mutate(Arrays.asList(put, delete));
Mutate 操作の詳細については、Mutate 操作を参照してください。
また、ScalarDB での例外の処理方法の詳細については、例外の処理方法を参照してください。
例外の処理方法
トランザクションを実行するときは、例外も適切に処理する必要があります。
例外を適切に処理しないと、異常やデータの不整合が発生する可能性があります。
次のサンプルコードは、例外を処理する方法を示しています。
public class Sample {
public static void main(String[] args) throws Exception {
TransactionFactory factory = TransactionFactory.create("<CONFIGURATION_FILE_PATH>");
DistributedTransactionManager transactionManager = factory.getTransactionManager();
int retryCount = 0;
TransactionException lastException = null;
while (true) {
if (retryCount++ > 0) {
// Retry the transaction three times maximum.
if (retryCount >= 3) {
// Throw the last exception if the number of retries exceeds the maximum.
throw lastException;
}
// Sleep 100 milliseconds before retrying the transaction.
TimeUnit.MILLISECONDS.sleep(100);
}
DistributedTransaction transaction = null;
try {
// Begin a transaction.
transaction = transactionManager.begin();
// Execute CRUD operations in the transaction.
Optional<Result> result = transaction.get(...);
List<Result> results = transaction.scan(...);
transaction.put(...);
transaction.delete(...);
// Commit the transaction.
transaction.commit();
} catch (UnsatisfiedConditionException e) {
// You need to handle `UnsatisfiedConditionException` only if a mutation operation specifies a condition.
// This exception indicates the condition for the mutation operation is not met.
try {
transaction.rollback();
} catch (RollbackException ex) {
// Rolling back the transaction failed. Since the transaction should eventually recover,
// you don't need to do anything further. You can simply log the occurrence here.
}
// You can handle the exception here, according to your application requirements.
return;
} catch (UnknownTransactionStatusException e) {
// If you catch `UnknownTransactionStatusException` when committing the transaction,
// it indicates that the status of the transaction, whether it was successful or not, is unknown.
// In such a case, you need to check if the transaction is committed successfully or not and
// retry the transaction if it failed. How to identify a transaction status is delegated to users.
return;
} catch (TransactionException e) {
// For other exceptions, you can try retrying the transaction.
// For `CrudConflictException`, `CommitConflictException`, and `TransactionNotFoundException`,
// you can basically retry the transaction. However, for the other exceptions, the transaction
// will still fail if the cause of the exception is non-transient. In such a case, you will
// exhaust the number of retries and throw the last exception.
if (transaction != null) {
try {
transaction.rollback();
} catch (RollbackException ex) {
// Rolling back the transaction failed. The transaction should eventually recover,
// so you don't need to do anything further. You can simply log the occurrence here.
}
}
lastException = e;
}
}
}
}
TransactionException
および TransactionNotFoundException
begin()
API は TransactionException
または TransactionNotFoundException
をスローする可能性があります:
TransactionException
をキャッチした場合、この例外は、一時的または非一時的障害が原因でトランザクションを開始できなかったことを示します。トランザクションを再試行することはできますが、非一時的障害が原因でトランザクションを開始できない可能性があります。TransactionNotFoundException
をキャッチした場合、この例外は、一時的障害が原因でトランザクションを開始できなかったことを示します。この場合、トランザクションを再試行できます。
join()
API も TransactionNotFoundException
をスローする可能性があります。この例外は、begin()
API の例外を処理するのと同じ方法で処理できます。
CrudException
および CrudConflictException
CRUD 操作の API (get()
、scan()
、put()
、delete()
、および mutate()
) は、CrudException
または CrudConflictException
をスローする可能性があります:
CrudException
をキャッチした場合、この例外は、一時的または非一時的障害が原因でトランザクション CRUD 操作が失敗したことを示します。トランザクションを最初から再試行できますが、原因が非一時的である場合は、トランザクションが失敗する可能性があります。CrudConflictException
をキャッチした場合、この例外は、一時的な障害 (競合エラーなど) が原因でトランザクション CRUD 操作が失敗したことを示します。この場合、トランザクションを最初から再試行できます。
UnsatisfiedConditionException
ミューテーション操作の API (put()
、delete()
、および mutate()
) も UnsatisfiedConditionException
をスローする可能性があります。
UnsatisfiedConditionException
をキャッチした場合、この例外はミューテーション操作の条件が満たされていないことを示します。この例外は、アプリケーションの要件に従って処理できます。
CommitException
、CommitConflictException
、および UnknownTransactionStatusException
commit()
API は CommitException
、CommitConflictException
、または UnknownTransactionStatusException
をスローする可能性があります。
CommitException
をキャッチした場合、この例外は、一時的または非一時的障害が原因でトランザクションのコミットが失敗したことを示します。トランザクションを最初から再試行できますが、原因が非一時的である場合はトランザクションが失敗する可能性があります。CommitConflictException
をキャッチした場合、この例外は、一時的な障害 (競合エラーなど) が原因でトランザクションのコミットが失敗したことを示します。この場合、トランザクションを最初から再試行できます。UnknownTransactionStatusException
をキャッチした場合、この例外は、トランザクションのステータス (成功したかどうか) が不明であることを示します。この場合、トランザクションが正常にコミットされたかどうかを確認し、失敗した場合はトランザクションを再試行する必要があります。
トランザクションステータスを識別する方法は、ユーザーに委任されています。トランザクションステータステーブルを作成し、他のアプリケーションデータを使用してトランザクション的に更新して、ステータステーブルからトランザクションのステータスを取得できるようにすることができます。
一部の例外に関する注意
サンプルコードには示されていませんが、resume()
API は TransactionNotFoundException
をスローする可能性もあります。この例外は、指定された ID に関連付けられたトランザクションが見つからなかったか、トランザクションの有効期限が切れた可能性があることを示します。いずれの場合も、この例外の原因は基本的に一時的なものであるため、トランザクションを最初から再試行できます。
サンプルコードでは、UnknownTransactionStatusException
の場合、重複操作の可能性を回避するためにアプリケーションがトランザクションが成功したかどうかを確認する必要があるため、トランザクションは再試行されません。その他の例外の場合、例外の原因が一時的または非一時的であるため、トランザクションは再試行されます。例外の原因が一時的な場合、再試行するとトランザクションが成功する可能性があります。ただし、例外の原因が非一時的である場合、再試行してもトランザクションは失敗します。このような場合、再試行回数を使い果たします。
サンプルコードでは、トランザクションは最大3回再試行され、再試行される前に100ミリ秒間スリープします。ただし、アプリケーションの要件に応じて、指数バックオフなどの再試行ポリシーを選択できます。
Coordinator テーブルのグループコミット
Consensus Commit トランザクションに使用される Coordinator テーブルは重要なデータストアであり、堅牢なストレージを使用することをお勧めします。ただし、内部でマルチ AZ またはマルチリージョンレプリケーションを活用するなど、より堅牢なストレージオプションを使用すると、ストレージにレコードを書き込むときにレイテンシが増加し、スループットパフォーマンスが低下する可能性があります。
ScalarDB は、Coordinator テーブルにグループコミット機能を提供します。この機能は、複数のレコードの書き込みを1つの書き込み操作にグループ化し、書き込みスループットを向上させます。この場合、基盤となるデータベースとワークロードに応じて、レイテンシが増加または減少する可能性があります。
グループコミット機能を有効にするには、次の設定を追加します。
# By default, this configuration is set to `false`.
scalar.db.consensus_commit.coordinator.group_commit.enabled=true
# These properties are for tuning the performance of the group commit feature.
# scalar.db.consensus_commit.coordinator.group_commit.group_size_fix_timeout_millis=40
# scalar.db.consensus_commit.coordinator.group_commit.delayed_slot_move_timeout_millis=800
# scalar.db.consensus_commit.coordinator.group_commit.old_group_abort_timeout_millis=30000
# scalar.db.consensus_commit.coordinator.group_commit.timeout_check_interval_millis=10
# scalar.db.consensus_commit.coordinator.group_commit.metrics_monitor_log_enabled=true
制限事項
このセクションでは、グループコミット機能の制限事項について説明します。
ユーザーが渡したカスタムトランザクション ID
グループコミット機能は、内部値を暗黙的に生成し、それをトランザクション ID の一部として使用します。したがって、ユーザーが com.scalar.db.transaction.consensuscommit.ConsensusCommitManager.begin(String txId)
または com.scalar.db.transaction.consensuscommit.TwoPhaseConsensusCommitManager.begin(String txId)
を介して手動で渡したカスタムトランザクション ID は、その後の API 呼び出しでそのまま使用することはできません。代わりに、com.scalar.db.transaction.consensuscommit.ConsensusCommit.getId()
または com.scalar.db.transaction.consensuscommit.TwoPhaseConsensusCommit.getId()
から返されたトランザクション ID を使用する必要があります。
// This custom transaction ID needs to be used for ScalarDB transactions.
String myTxId = UUID.randomUUID().toString();
...
DistributedTransaction transaction = manager.begin(myTxId);
...
// When the group commit feature is enabled, a custom transaction ID passed by users can't be used as is.
// logger.info("The transaction state: {}", manager.getState(myTxId));
logger.info("The transaction state: {}", manager.getState(transaction.getId()));
2フェーズコミットインターフェースでの使用の禁止
グループコミット機能は、進行中のすべてのトランザクションをメモリ内で管理します。この機能が2フェーズコミットインターフェースで有効になっている場合、Coordinator テーブルへの参加者サービスの一貫性のない書き込みによって生じる競合 (グループ間で異なるトランザクション分散が含まれる場合があります) を防ぐために、情報は Coordinator サービスによってのみ維持される必要があります。
この制限により、アプリケーション開発に関連する複雑さと柔軟性が損なわれます。したがって、グループコミット機能と2フェーズコミットインターフェースを組み合わせて使用することは現在禁止されています。
Consensus Commit トランザクションマネージャーエラーの調査
Consensus Commit トランザクションマネージャーの使用時にエラーを調査するには、トランザクションメタデータ列が追加されたテーブルメタデータを返す設定を有効にできます。これは、トランザクション関連の問題を調査するときに役立ちます。この設定は、Consensus Commit トランザクションマネージャーのトラブルシューティング時にのみ使用可能で、DistributedTransactionAdmin.getTableMetadata()
メソッドを使用して、特定のテーブルのトランザクションメタデータ列の詳細を表示できます。
次の設定を追加すると、Get
および Scan
操作の結果に トランザクションメタデータが含まれます。
# By default, this configuration is set to `false`.
scalar.db.consensus_commit.include_metadata.enabled=true