メインコンテンツまでスキップ
バージョン: 3.16

ScalarDB Java API ガイド

注記

このページは英語版のページが機械翻訳されたものです。英語版との間に矛盾または不一致がある場合は、英語版を正としてください。

ScalarDB Java API は、主に管理 API とトランザクション API で設定されています。このガイドでは、どのような種類の API が存在するのか、それらの使用方法、例外の処理方法などの関連トピックについて簡単に説明します。

Administrative API

このセクションでは、ScalarDB の管理 API を使用してプログラムで管理操作を実行する方法について説明します。

警告

管理 API の呼び出しが基盤データベースに書き込みを行う場合、複数の書き込み操作がトリガーされます。ただし、これらの操作はアトミックに実行されないため、呼び出しが途中で失敗した場合、一貫性のない状態に遭遇する可能性があります。この非一貫性の問題を解決するには、テーブルを修復してください。詳細については、次のページを参照してください。

注記

管理操作を実行する別の方法は、Schema Loader を使用することです。

DistributedTransactionAdmin インスタンスを取得する

管理操作を実行するには、まず DistributedTransactionAdmin インスタンスを取得する必要があります。

DistributedTransactionAdmin インスタンスを取得するには、次のように TransactionFactory を使用します。

TransactionFactory transactionFactory = TransactionFactory.create("<CONFIGURATION_FILE_PATH>");
DistributedTransactionAdmin admin = transactionFactory.getTransactionAdmin();

設定の詳細については、ScalarDB 設定を参照してください。

すべての管理操作を実行したら、次のように DistributedTransactionAdmin インスタンスを閉じる必要があります。

admin.close();

名前空間を作成する

テーブルは1つの名前空間に属するため、テーブルを作成する前に名前空間を作成する必要があります。

名前空間は次のように作成できます。

// Create the namespace "ns". If the namespace already exists, an exception will be thrown.
admin.createNamespace("ns");

// Create the namespace only if it does not already exist.
boolean ifNotExists = true;
admin.createNamespace("ns", ifNotExists);

// Create the namespace with options.
Map<String, String> options = ...;
admin.createNamespace("ns", options);

作成オプション

名前空間の作成やテーブルの作成などの作成操作では、オプション名と値のマップであるオプション (Map<String, String>) を指定できます。オプションを使用すると、ストレージアダプタ固有の設定ができます。

データベースを選択して、使用可能なオプションを確認します。

JDBC データベースには使用できるオプションはありません。

テーブルを作成する

テーブルを作成するときは、テーブルメタデータを定義してからテーブルを作成する必要があります。

テーブルメタデータを定義するには、TableMetadata を使用できます。次の図は、テーブルの列、パーティションキー、クラスタリングキー (クラスタリング順序を含む)、およびセカンダリインデックスを定義する方法を示しています。

// Define the table metadata.
TableMetadata tableMetadata =
TableMetadata.newBuilder()
.addColumn("c1", DataType.INT)
.addColumn("c2", DataType.TEXT)
.addColumn("c3", DataType.BIGINT)
.addColumn("c4", DataType.FLOAT)
.addColumn("c5", DataType.DOUBLE)
.addPartitionKey("c1")
.addClusteringKey("c2", Scan.Ordering.Order.DESC)
.addClusteringKey("c3", Scan.Ordering.Order.ASC)
.addSecondaryIndex("c4")
.build();

ScalarDB のデータモデルの詳細については、データモデルを参照してください。

次に、次のようにテーブルを作成します。

// Create the table "ns.tbl". If the table already exists, an exception will be thrown.
admin.createTable("ns", "tbl", tableMetadata);

// Create the table only if it does not already exist.
boolean ifNotExists = true;
admin.createTable("ns", "tbl", tableMetadata, ifNotExists);

// Create the table with options.
Map<String, String> options = ...;
admin.createTable("ns", "tbl", tableMetadata, options);

セカンダリインデックスを作成する

セカンダリインデックスは次のように作成できます。

// Create a secondary index on column "c5" for table "ns.tbl". If a secondary index already exists, an exception will be thrown.
admin.createIndex("ns", "tbl", "c5");

// Create the secondary index only if it does not already exist.
boolean ifNotExists = true;
admin.createIndex("ns", "tbl", "c5", ifNotExists);

// Create the secondary index with options.
Map<String, String> options = ...;
admin.createIndex("ns", "tbl", "c5", options);

テーブルに新しい列を追加する

次のように、テーブルに新しい非パーティションキー列を追加できます。

// Add a new column "c6" with the INT data type to the table "ns.tbl".
admin.addNewColumnToTable("ns", "tbl", "c6", DataType.INT)
警告

テーブルに新しい列を追加する場合は、基盤となるストレージによって実行時間が大きく異なる可能性があるため、慎重に検討する必要があります。特にデータベースが本番環境で実行されている場合は、以下の点を考慮して適切に計画を立ててください。

  • Cosmos DB for NoSQL および DynamoDB の場合: テーブルスキーマは変更されないため、列の追加はほぼ瞬時に行われます。別のテーブルに格納されているテーブルメタデータのみが更新されます。
  • Cassandra の場合: 列を追加すると、スキーマメタデータのみが更新され、既存のスキーマレコードは変更されません。クラスタートポロジが実行時間の主な要因です。スキーマメタデータの変更は、ゴシッププロトコルを介して各クラスターノードに共有されます。このため、クラスターが大きいほど、すべてのノードが更新されるまでの時間が長くなります。
  • リレーショナルデータベース (MySQL、Oracle など) の場合: 列の追加は実行にそれほど時間がかかりません。

テーブルを切り捨てる

テーブルを切り捨てるには、次のようにします。

// Truncate the table "ns.tbl".
admin.truncateTable("ns", "tbl");

セカンダリインデックスを削除する

セカンダリインデックスは次のように削除できます。

// Drop the secondary index on column "c5" from table "ns.tbl". If the secondary index does not exist, an exception will be thrown.
admin.dropIndex("ns", "tbl", "c5");

// Drop the secondary index only if it exists.
boolean ifExists = true;
admin.dropIndex("ns", "tbl", "c5", ifExists);

テーブルを削除する

テーブルを削除するには、次のようにします。

// Drop the table "ns.tbl". If the table does not exist, an exception will be thrown.
admin.dropTable("ns", "tbl");

// Drop the table only if it exists.
boolean ifExists = true;
admin.dropTable("ns", "tbl", ifExists);

名前空間を削除する

名前空間を削除するには、次のようにします。

// Drop the namespace "ns". If the namespace does not exist, an exception will be thrown.
admin.dropNamespace("ns");

// Drop the namespace only if it exists.
boolean ifExists = true;
admin.dropNamespace("ns", ifExists);

既存の名前空間を取得する

既存の名前空間は次のように取得できます。

Set<String> namespaces = admin.getNamespaceNames();

名前空間のテーブルを取得する

名前空間のテーブルは次のように取得できます。

// Get the tables of the namespace "ns".
Set<String> tables = admin.getNamespaceTableNames("ns");

テーブルメタデータを取得する

テーブルメタデータは次のように取得できます。

// Get the table metadata for "ns.tbl".
TableMetadata tableMetadata = admin.getTableMetadata("ns", "tbl");

名前空間を修復する

名前空間が不明な状態の場合 (たとえば、名前空間が基盤となるストレージに存在するが ScalarDB メタデータが存在しない、またはその逆)、このメソッドは必要に応じて名前空間とそのメタデータを再作成します。

名前空間は次のように修復できます。

// Repair the namespace "ns" with options.
Map<String, String> options = ...;
admin.repairNamespace("ns", options);

テーブルを修復する

テーブルが不明な状態の場合 (テーブルは基盤となるストレージに存在するが ScalarDB メタデータは存在しない、またはその逆)、このメソッドは必要に応じてテーブル、そのセカンダリインデックス、およびそのメタデータを再作成します。

テーブルは次のように修復できます。

// Repair the table "ns.tbl" with options.
TableMetadata tableMetadata =
TableMetadata.newBuilder()
...
.build();
Map<String, String> options = ...;
admin.repairTable("ns", "tbl", tableMetadata, options);

最新の ScalarDB API をサポートするように環境をアップグレードする

ScalarDB API の最新バージョンをサポートするように ScalarDB 環境をアップグレードできます。通常、リリースノートに記載されているように、アプリケーション環境が使用する ScalarDB バージョンを更新した後、このメソッドを実行する必要があります。

// Upgrade the ScalarDB environment.
Map<String, String> options = ...;
admin.upgrade(options);

Coordinator テーブルの操作を指定する

Coordinator テーブルは、Transactional API によってトランザクションのステータスを追跡するために使用されます。

トランザクションマネージャーを使用する場合は、トランザクションを実行するために Coordinator テーブルを作成する必要があります。テーブルの作成に加えて、Coordinator テーブルを切り捨てたり削除したりすることもできます。

Coordinator テーブルを作成する

Coordinator テーブルは次のように作成できます。

// Create the Coordinator table.
admin.createCoordinatorTables();

// Create the Coordinator table only if one does not already exist.
boolean ifNotExist = true;
admin.createCoordinatorTables(ifNotExist);

// Create the Coordinator table with options.
Map<String, String> options = ...;
admin.createCoordinatorTables(options);

Coordinator テーブルを切り捨てる

Coordinator テーブルは次のように切り捨てることができます。

// Truncate the Coordinator table.
admin.truncateCoordinatorTables();

Coordinator テーブルを削除する

Coordinator テーブルは次のように削除できます。

// Drop the Coordinator table.
admin.dropCoordinatorTables();

// Drop the Coordinator table if one exist.
boolean ifExist = true;
admin.dropCoordinatorTables(ifExist);

テーブルをインポートする

既存のテーブルを ScalarDB にインポートするには、次のようにします。

// Import the table "ns.tbl". If the table is already managed by ScalarDB, the target table does not
// exist, or the table does not meet the requirements of the ScalarDB table, an exception will be thrown.
admin.importTable("ns", "tbl", options, overrideColumnsType);
警告

運用環境で ScalarDB にテーブルをインポートする場合は、データベーステーブルと ScalarDB メタデータテーブルにトランザクションメタデータ列が追加されるため、慎重に計画する必要があります。この場合、データベースと ScalarDB の間にはいくつかの違いがあり、いくつかの制限もあります。詳細については、ScalarDB Schema Loader を使用して既存のテーブルを ScalarDB にインポートするを参照してください。

Transactional API

このセクションでは、ScalarDB の Transactional API を使用してトランザクション操作を実行する方法について説明します。

DistributedTransactionManager インスタンスを取得する

トランザクション操作を実行するには、まず DistributedTransactionManager インスタンスを取得する必要があります。

DistributedTransactionManager インスタンスを取得するには、次のように TransactionFactory を使用します。

TransactionFactory transactionFactory = TransactionFactory.create("<CONFIGURATION_FILE_PATH>");
DistributedTransactionManager transactionManager = transactionFactory.getTransactionManager();

すべてのトランザクション操作を実行した後、次のように DistributedTransactionManager インスタンスを閉じる必要があります。

transactionManager.close();

トランザクションを実行する

このサブセクションでは、複数の CRUD 操作でトランザクションを実行する方法について説明します。

トランザクションを開始する

トランザクション CRUD 操作を実行する前に、トランザクションを開始する必要があります。

トランザクションは次のように開始できます。

// Begin a transaction.
DistributedTransaction transaction = transactionManager.begin();

または、次のようにトランザクションを開始することもできます。

// Start a transaction.
DistributedTransaction transaction = transactionManager.start();

あるいは、次のようにトランザクション ID を指定して、トランザクションに begin メソッドを使用することもできます。

// Begin a transaction with specifying a transaction ID.
DistributedTransaction transaction = transactionManager.begin("<TRANSACTION_ID>");

または、次のようにトランザクション ID を指定して、トランザクションに start メソッドを使用することもできます。

// Start a transaction with specifying a transaction ID.
DistributedTransaction transaction = transactionManager.start("<TRANSACTION_ID>");
注記

トランザクション ID を指定すると、外部システムを ScalarDB にリンクする場合に便利です。それ以外の場合は、begin() メソッドまたは start() メソッドを使用する必要があります。

トランザクション ID を指定する場合は、ScalarDB の正確性はトランザクション ID の一意性に依存するため、システム全体で一意の ID (UUID v4など) を指定してください。

読み取り専用モードでトランザクションを開始またはスタートする

読み取り専用モードでトランザクションを開始またはスタートすることもできます。この場合、トランザクションでは書き込み操作は許可されず、読み取り操作に最適化されます。

注記

パフォーマンスを向上させ、リソース使用量を削減するために、読み取り専用操作には読み取り専用トランザクションを使用することを強く推奨します。

次のように読み取り専用モードでトランザクションを開始またはスタートできます。

// Begin a transaction in read-only mode.
DistributedTransaction transaction = transactionManager.beginReadOnly();
// Start a transaction in read-only mode.
DistributedTransaction transaction = transactionManager.startReadOnly();

または、次のようにトランザクション ID を指定して、beginReadOnly メソッドと startReadOnly メソッドを使用することもできます。

// Begin a transaction in read-only mode by specifying a transaction ID.
DistributedTransaction transaction = transactionManager.beginReadOnly("<TRANSACTION_ID>");
// Start a transaction in read-only mode by specifying a transaction ID.
DistributedTransaction transaction = transactionManager.startReadOnly("<TRANSACTION_ID>");
注記

トランザクション ID を指定する方法は、外部システムを ScalarDB にリンクしたい場合に利用できます。それ以外の場合は、beginReadOnly() メソッドまたは startReadOnly() メソッドを使用する必要があります。

トランザクション ID を指定する場合は、ScalarDB の正確性はトランザクション ID の一意性に依存するため、システム全体で一意の ID (UUID v4など) を指定してください。

トランザクションに参加する

トランザクションに参加することは、トランザクションが複数のクライアントリクエストにまたがるステートフルアプリケーションで特に便利です。このようなシナリオでは、アプリケーションは最初のクライアントリクエスト中にトランザクションを開始できます。その後、後続のクライアントリクエストで、アプリケーションは join() メソッドを使用して進行中のトランザクションに参加できます。

次のようにトランザクション ID を指定すると、すでに開始されている進行中のトランザクションに参加できます。

// Join a transaction.
DistributedTransaction transaction = transactionManager.join("<TRANSACTION_ID>");
注記

getId() を使用してトランザクション ID を取得するには、次のように指定します。

tx.getId();

トランザクションを再開する

トランザクションの再開は、トランザクションが複数のクライアントリクエストにまたがるステートフルアプリケーションで特に役立ちます。このようなシナリオでは、アプリケーションは最初のクライアントリクエスト中にトランザクションを開始できます。その後、後続のクライアントリクエストで、アプリケーションは resume() メソッドを使用して進行中のトランザクションを再開できます。

次のようにトランザクション ID を指定すると、すでに開始した進行中のトランザクションを再開できます。

// Resume a transaction.
DistributedTransaction transaction = transactionManager.resume("<TRANSACTION_ID>");
注記

getId() を使用してトランザクション ID を取得するには、次のように指定します。

tx.getId();

CRUD 操作を実装する

次のセクションでは、キーの構築と CRUD 操作について説明します。

注記

CRUD 操作のすべてのビルダーは consistency() メソッドを使用して一貫性を指定できますが、これらのメソッドは無視されます。代わりに、トランザクションでは常に LINEARIZABLE 一貫性レベルが使用されます。

キーの構築

ほとんどの CRUD 操作では、Key オブジェクト (パーティションキー、クラスタリングキーなど) を指定する必要があります。そのため、CRUD 操作に進む前に、Key オブジェクトの構築方法を次に説明します。

単一の列キーの場合、次のように Key.of<TYPE_NAME>() メソッドを使用してキーを構築できます。

// For a key that consists of a single column of INT.
Key key1 = Key.ofInt("col1", 1);

// For a key that consists of a single column of BIGINT.
Key key2 = Key.ofBigInt("col1", 100L);

// For a key that consists of a single column of DOUBLE.
Key key3 = Key.ofDouble("col1", 1.3d);

// For a key that consists of a single column of TEXT.
Key key4 = Key.ofText("col1", "value");

2~5列で設定されるキーの場合は、Key.of() メソッドを使用して次のようにキーを構築できます。Guava の ImmutableMap.of() と同様に、列名と値を順番に指定する必要があります。

// For a key that consists of two to five columns.
Key key1 = Key.of("col1", 1, "col2", 100L);
Key key2 = Key.of("col1", 1, "col2", 100L, "col3", 1.3d);
Key key3 = Key.of("col1", 1, "col2", 100L, "col3", 1.3d, "col4", "value");
Key key4 = Key.of("col1", 1, "col2", 100L, "col3", 1.3d, "col4", "value", "col5", false);

5列を超えるキーの場合、ビルダーを使用して次のようにキーを構築できます。

// For a key that consists of more than five columns.
Key key = Key.newBuilder()
.addInt("col1", 1)
.addBigInt("col2", 100L)
.addDouble("col3", 1.3d)
.addText("col4", "value")
.addBoolean("col5", false)
.addInt("col6", 100)
.build();
Get 操作

Get は、主キーで指定された単一のレコードを取得する操作です。

まず Get オブジェクトを作成し、次に次のように transaction.get() メソッドを使用してオブジェクトを実行する必要があります。

// Create a `Get` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Get get =
Get.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.projections("c1", "c2", "c3", "c4")
.where(ConditionBuilder.column("c1").isNotEqualToInt(10))
.build();

// Execute the `Get` operation.
Optional<Result> result = transaction.get(get);

射影を指定して、返される列を選択できます。

WHERE 句を使用する

where() メソッドを使用して任意の条件を指定することもできます。取得したレコードが where() メソッドで指定された条件と一致しない場合は、Option.empty() が返されます。where() メソッドの引数として、条件、AND 条件セット、または OR 条件セットを指定できます。where() メソッドを呼び出した後、次のように and() メソッドまたは or() メソッドを使用して、さらに条件または条件セットを追加できます。

// Create a `Get` operation with condition sets.
Get get =
Get.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.where(
ConditionSetBuilder.condition(ConditionBuilder.column("c1").isLessThanInt(10))
.or(ConditionBuilder.column("c1").isGreaterThanInt(20))
.build())
.and(
ConditionSetBuilder.condition(ConditionBuilder.column("c2").isLikeText("a%"))
.or(ConditionBuilder.column("c2").isLikeText("b%"))
.build())
.build();
注記

where() 条件メソッドチェーンでは、条件は上記の例のように ConditionalExpression または OrConditionSet の AND 結合 (結合正規形と呼ばれる)、または ConditionalExpression または AndConditionSet の OR 結合 (分離正規形と呼ばれる) である必要があります。

使用可能な条件と条件セットの詳細については、Javadoc の ConditionBuilder および ConditionSetBuilder ページを参照してください。

Result オブジェクトの処理

Get 操作と Scan 操作は Result オブジェクトを返します。次に、Result オブジェクトの処理方法を示します。

次のように get<TYPE_NAME>("<COLUMN_NAME>") メソッドを使用して、結果の列値を取得できます。

// Get the BOOLEAN value of a column.
boolean booleanValue = result.getBoolean("<COLUMN_NAME>");

// Get the INT value of a column.
int intValue = result.getInt("<COLUMN_NAME>");

// Get the BIGINT value of a column.
long bigIntValue = result.getBigInt("<COLUMN_NAME>");

// Get the FLOAT value of a column.
float floatValue = result.getFloat("<COLUMN_NAME>");

// Get the DOUBLE value of a column.
double doubleValue = result.getDouble("<COLUMN_NAME>");

// Get the TEXT value of a column.
String textValue = result.getText("<COLUMN_NAME>");

// Get the BLOB value of a column as a `ByteBuffer`.
ByteBuffer blobValue = result.getBlob("<COLUMN_NAME>");

// Get the BLOB value of a column as a `byte` array.
byte[] blobValueAsBytes = result.getBlobAsBytes("<COLUMN_NAME>");

// Get the DATE value of a column as a `LocalDate`.
LocalDate dateValue = result.getDate("<COLUMN_NAME>");

// Get the TIME value of a column as a `LocalTime`.
LocalTime timeValue = result.getTime("<COLUMN_NAME>");

// Get the TIMESTAMP value of a column as a `LocalDateTime`.
LocalDateTime timestampValue = result.getTimestamp("<COLUMN_NAME>");

// Get the TIMESTAMPTZ value of a column as a `Instant`.
Instant timestampTZValue = result.getTimestampTZ("<COLUMN_NAME>");

列の値が null かどうかを確認する必要がある場合は、isNull("<COLUMN_NAME>") メソッドを使用できます。

// Check if a value of a column is null.
boolean isNull = result.isNull("<COLUMN_NAME>");

詳細については、Javadoc の Result ページを参照してください。

セカンダリインデックスを使用して Get を実行する

セカンダリインデックスを使用して Get 操作を実行できます。

パーティションキーを指定する代わりに、次のようにインデックスキー (インデックス付き列) を指定してセカンダリインデックスを使用できます。

// Create a `Get` operation by using a secondary index.
Key indexKey = Key.ofFloat("c4", 1.23F);

Get get =
Get.newBuilder()
.namespace("ns")
.table("tbl")
.indexKey(indexKey)
.projections("c1", "c2", "c3", "c4")
.where(ConditionBuilder.column("c1").isNotEqualToInt(10))
.build();

// Execute the `Get` operation.
Optional<Result> result = transaction.get(get);

where() メソッドを使用して任意の条件を指定することもできます。詳細については、WHERE 句を使用するを参照してください。

注記

結果に複数のレコードがある場合、transaction.get() は例外をスローします。複数の結果を処理する場合は、セカンダリインデックスを使用して Scan を実行するを参照してください。

Scan 操作

Scan は、パーティション内の複数のレコードを取得する操作です。Scan 操作では、クラスタリングキーの境界とクラスタリングキー列の順序を指定できます。Scan 操作を実行するには、transaction.scan() メソッドまたは transaction.getScanner() メソッドを使用できます。

  • transaction.scan():
    • このメソッドは、指定された Scan 操作を即座に実行し、一致するすべてのレコードのリストを返します。結果セットがメモリに収まる程度の小さい場合に適しています。
  • transaction.getScanner():
    • このメソッドは、結果セットを遅延的にイテレートできる Scanner オブジェクトを返します。結果セットが大きくなる可能性がある場合に有用で、すべてのレコードを一度にメモリに読み込むことを避けられます。

まず Scan オブジェクトを作成し、次に transaction.scan() メソッドまたは transaction.getScanner() メソッドを使用してオブジェクトを実行する必要があります。

// Create a `Scan` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key startClusteringKey = Key.of("c2", "aaa", "c3", 100L);
Key endClusteringKey = Key.of("c2", "aaa", "c3", 300L);

Scan scan =
Scan.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.start(startClusteringKey, true) // Include startClusteringKey
.end(endClusteringKey, false) // Exclude endClusteringKey
.projections("c1", "c2", "c3", "c4")
.orderings(Scan.Ordering.desc("c2"), Scan.Ordering.asc("c3"))
.where(ConditionBuilder.column("c1").isNotEqualToInt(10))
.limit(10)
.build();

// Execute the `Scan` operation by using the `transaction.scan()` method.
List<Result> results = transaction.scan(scan);

// Or, execute the `Scan` operation by using the `transaction.getScanner()` method.
try (TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan)) {
// Fetch the next result from the scanner
Optional<Result> result = scanner.one();

// Fetch all remaining results from the scanner
List<Result> allResults = scanner.all();
}

クラスタリングキー境界を省略するか、start 境界または end 境界のいずれかを指定できます。orderings を指定しない場合は、テーブルの作成時に定義したクラスタリング順序で結果が並べられます。

さらに、projections を指定して返される列を選択し、limit を使用して Scan 操作で返されるレコードの数を指定できます。

WHERE 句を使用する

where() メソッドを使用してスキャンされたレコードをフィルタリングすることで、任意の条件を指定することもできます。where() メソッドの引数として、条件、AND 条件セット、または OR 条件セットを指定できます。where() メソッドを呼び出した後、次のように and() メソッドまたは or() メソッドを使用して、さらに条件または条件セットを追加できます。

// Create a `Scan` operation with condition sets.
Scan scan =
Scan.newBuilder()
.namespace("ns")
.table("tbl")
.all()
.where(
ConditionSetBuilder.condition(ConditionBuilder.column("c1").isLessThanInt(10))
.or(ConditionBuilder.column("c1").isGreaterThanInt(20))
.build())
.and(
ConditionSetBuilder.condition(ConditionBuilder.column("c2").isLikeText("a%"))
.or(ConditionBuilder.column("c2").isLikeText("b%"))
.build())
.limit(10)
.build();
注記

where() 条件メソッドチェーンでは、条件は上記の例のように ConditionalExpression または OrConditionSet の AND 結合 (結合正規形と呼ばれる)、または ConditionalExpression または AndConditionSet の OR 結合 (分離正規形と呼ばれる) である必要があります。

使用可能な条件と条件セットの詳細については、Javadoc の ConditionBuilder および ConditionSetBuilder ページを参照してください。

セカンダリインデックスを使用して Scan を実行する

セカンダリインデックスを使用して、Scan 操作を実行できます。

パーティションキーを指定する代わりに、次のようにインデックスキー (インデックス付き列) を指定してセカンダリインデックスを使用できます。

// Create a `Scan` operation by using a secondary index.
Key indexKey = Key.ofFloat("c4", 1.23F);

Scan scan =
Scan.newBuilder()
.namespace("ns")
.table("tbl")
.indexKey(indexKey)
.projections("c1", "c2", "c3", "c4")
.where(ConditionBuilder.column("c1").isNotEqualToInt(10))
.limit(10)
.build();

// Execute the `Scan` operation.
List<Result> results = transaction.scan(scan);

where() メソッドを使用して任意の条件を指定することもできます。詳細については、WHERE 句を使用するを参照してください。

注記

セカンダリインデックスを使用して、Scan でクラスタリングキーの境界と順序を指定することはできません。

パーティションキーを指定せずにクロスパーティション Scan を実行し、テーブルのすべてのレコードを取得します

ScalarDB プロパティファイルで次の設定を有効にすると、パーティションキーを指定せずに、すべてのパーティションにわたって Scan 操作 (クロスパーティションスキャン* と呼びます) を実行できます。

scalar.db.cross_partition_scan.enabled=true
警告

非 JDBC データベースの場合、SERIALIZABLE 分離レベルでクロスパーティションスキャンを有効にした場合でも、トランザクションは読み取りコミットスナップショット分離 (SNAPSHOT) で実行される可能性があります。これは、より低い分離レベルです。非 JDBC データベースを使用する場合は、トランザクションの一貫性が重要でない場合にのみ、クロスパーティションスキャンを使用してください。

ビルダーで partitionKey() メソッドを呼び出す代わりに、次のように all() メソッドを呼び出して、パーティションキーを指定せずにテーブルをスキャンできます。

// Create a `Scan` operation without specifying a partition key.
Scan scan =
Scan.newBuilder()
.namespace("ns")
.table("tbl")
.all()
.projections("c1", "c2", "c3", "c4")
.limit(10)
.build();

// Execute the `Scan` operation.
List<Result> results = transaction.scan(scan);
注記

非 JDBC データベースを使用する場合、クロスパーティション Scan で順序を指定することはできません。フィルタリングまたは順序付けを指定したクロスパーティション Scan の使用方法の詳細については、フィルタリングと順序付けを使用してパーティション間の Scan を実行するを参照してください。

フィルタリングと順序付けを使用してパーティション間の Scan を実行する

次のようにフィルタリングと順序付けによるクロスパーティションスキャンオプションを有効にすると、柔軟な条件と順序付けでクロスパーティション Scan 操作を実行できます。

scalar.db.cross_partition_scan.enabled=true
scalar.db.cross_partition_scan.filtering.enabled=true
scalar.db.cross_partition_scan.ordering.enabled=true
注記

非 JDBC データベースでは scalar.db.cross_partition_scan.ordering を有効にすることはできません。

次のように、all() メソッドを呼び出した後に where() メソッドと ordering() メソッドを呼び出して、任意の条件と順序を指定できます。

// Create a `Scan` operation with arbitrary conditions and orderings.
Scan scan =
Scan.newBuilder()
.namespace("ns")
.table("tbl")
.all()
.where(ConditionBuilder.column("c1").isNotEqualToInt(10))
.projections("c1", "c2", "c3", "c4")
.orderings(Scan.Ordering.desc("c3"), Scan.Ordering.asc("c4"))
.limit(10)
.build();

// Execute the `Scan` operation.
List<Result> results = transaction.scan(scan);

WHERE 句の詳細については、WHERE 句を使用するを参照してください。

Put 操作
注記

Put 操作は ScalarDB 3.13以降では非推奨となり、将来のリリースでは削除される予定です。Put 操作の代わりに、Insert 操作、Upsert 操作、または Update 操作を使用してください。

Put は、主キーで指定されたレコードを配置する操作です。この操作はレコードの upsert 操作として動作し、レコードが存在する場合はレコードを更新し、レコードが存在しない場合はレコードを挿入します。

注記

既存のレコードを更新する場合、Put 操作を使用する前に Get または Scan を使用してレコードを読み取る必要があります。そうしないと、競合により操作が失敗します。これは、トランザクションを適切に管理するための ScalarDB の仕様により発生します。レコードを明示的に読み取る代わりに、暗黙的な事前読み取りを有効にできます。詳細については、Put 操作の暗黙的な事前読み取りを有効にするを参照してください。

まず Put オブジェクトを作成し、次に次のように transaction.put() メソッドを使用してオブジェクトを実行する必要があります。

// Create a `Put` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Put put =
Put.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();

// Execute the `Put` operation.
transaction.put(put);

次のように null 値を持つレコードを配置することもできます。

Put put =
Put.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", null)
.doubleValue("c5", null)
.build();
Put 操作の暗黙的な事前読み取りを有効にする

Consensus Commit では、レコードが存在する場合、レコードの最新の状態を取得するために、アプリケーションは Put および Delete 操作でレコードを変更する前にレコードを読み取る必要があります。レコードを明示的に読み取る代わりに、暗黙的な事前読み取り を有効にすることができます。暗黙的な事前読み取りを有効にすると、アプリケーションがトランザクションでレコードを明示的に読み取らない場合は、ScalarDB がトランザクションをコミットする前にアプリケーションに代わってレコードを読み取ります。

Put 操作の暗黙的な事前読み取りを有効にするには、次のように Put 操作ビルダーで enableImplicitPreRead() を指定します。

Put put =
Put.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.enableImplicitPreRead()
.build();
注記

変更しようとしているレコードが存在しないことが確実な場合は、パフォーマンスを向上させるために、Put 操作の暗黙的な事前読み取りを有効にしないでください。たとえば、初期データをロードする場合は、暗黙的な事前読み取りを有効にしないでください。暗黙的な事前読み取りのない Put 操作は、暗黙的な事前読み取りのある Put 操作よりも高速です。これは、操作によって不要な読み取りがスキップされるためです。

Insert 操作

Insert は、トランザクションを通じて基礎となるストレージにエントリを挿入する操作です。エントリがすでに存在する場合は、競合エラーが発生します。

まず Insert オブジェクトを作成し、次に次のように transaction.insert() メソッドを使用してオブジェクトを実行する必要があります。

// Create an `Insert` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Insert insert =
Insert.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();

// Execute the `Insert` operation.
transaction.insert(insert);
Upsert 操作

Upsert は、トランザクションを通じて基礎となるストレージにエントリを挿入したり、エントリを更新したりする操作です。エントリがすでに存在する場合は更新され、そうでない場合はエントリが挿入されます。

まず Upsert オブジェクトを作成し、次に次のように transaction.upsert() メソッドを使用してオブジェクトを実行する必要があります。

// Create an `Upsert` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Upsert upsert =
Upsert.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();

// Execute the `Upsert` operation.
transaction.upsert(upsert);
Update 操作

Update は、トランザクションを通じて基礎となるストレージ内のエントリを更新する操作です。エントリが存在しない場合、操作によって変更は行われません。

まず Update オブジェクトを作成し、次に次のように transaction.update() メソッドを使用してオブジェクトを実行する必要があります。

// Create an `Update` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Update update =
Update.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();

// Execute the `Update` operation.
transaction.update(update);
Delete 操作

Delete は、主キーで指定されたレコードを削除する操作です。

注記

レコードを削除する場合、Delete 操作では暗黙的な事前読み取りが常に有効になっているため、事前にレコードを読み取る必要はありません。

まず Delete オブジェクトを作成し、次に次のように transaction.delete() メソッドを使用してオブジェクトを実行する必要があります。

// Create a `Delete` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Delete delete =
Delete.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.build();

// Execute the `Delete` operation.
transaction.delete(delete);
条件付きの PutDeleteUpdate

トランザクション内で条件をチェックするロジックを実装することで、コミット前にトランザクションが満たす必要のある任意の条件 (たとえば、銀行口座の残高が0以上である必要がある) を記述できます。または、PutDeleteUpdate などのミューテーション操作で単純な条件を記述することもできます。

PutDeleteUpdate 操作に条件が含まれている場合、指定された条件が満たされた場合にのみ操作が実行されます。操作の実行時に条件が満たされていない場合は、UnsatisfiedConditionException という例外がスローされます。

注記

Put 操作で条件を指定する場合は、事前にレコードを読み取るか、暗黙的な事前読み取りを有効にする必要があります。

Put の条件

Put 操作では、次のように条件を指定できます。

// Build a condition.
MutationCondition condition =
ConditionBuilder.putIf(ConditionBuilder.column("c4").isEqualToFloat(0.0F))
.and(ConditionBuilder.column("c5").isEqualToDouble(0.0))
.build();

Put put =
Put.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.condition(condition) // condition
.build();

// Execute the `Put` operation.
transaction.put(put);

putIf 条件を使用するだけでなく、次のように putIfExists 条件と putIfNotExists 条件を指定することもできます。

// Build a `putIfExists` condition.
MutationCondition putIfExistsCondition = ConditionBuilder.putIfExists();

// Build a `putIfNotExists` condition.
MutationCondition putIfNotExistsCondition = ConditionBuilder.putIfNotExists();
Delete の条件

Delete 操作では、次のように条件を指定できます。

// Build a condition.
MutationCondition condition =
ConditionBuilder.deleteIf(ConditionBuilder.column("c4").isEqualToFloat(0.0F))
.and(ConditionBuilder.column("c5").isEqualToDouble(0.0))
.build();

Delete delete =
Delete.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.condition(condition) // condition
.build();

// Execute the `Delete` operation.
transaction.delete(delete);

deleteIf 条件を使用するだけでなく、次のように deleteIfExists 条件を指定することもできます。

// Build a `deleteIfExists` condition.
MutationCondition deleteIfExistsCondition = ConditionBuilder.deleteIfExists();
Update の条件

Update 操作では、次のように条件を指定できます。

// Build a condition.
MutationCondition condition =
ConditionBuilder.updateIf(ConditionBuilder.column("c4").isEqualToFloat(0.0F))
.and(ConditionBuilder.column("c5").isEqualToDouble(0.0))
.build();

Update update =
Update.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.condition(condition) // condition
.build();

// Execute the `Update` operation.
transaction.update(update);

updateIf 条件を使用するだけでなく、次のように updateIfExists 条件を指定することもできます。

// Build a `updateIfExists` condition.
MutationCondition updateIfExistsCondition = ConditionBuilder.updateIfExists();
Mutate 操作

Mutate は、PutInsertUpsertUpdateDelete の複数の操作を実行する操作です。

まず Mutate オブジェクトを作成し、次に次のように transaction.mutate() メソッドを使用してオブジェクトを実行する必要があります。

// Create `Put` and `Delete` operations.
Key partitionKey = Key.ofInt("c1", 10);

Key clusteringKeyForPut = Key.of("c2", "aaa", "c3", 100L);

Put put =
Put.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKeyForPut)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();

Key clusteringKeyForDelete = Key.of("c2", "bbb", "c3", 200L);

Delete delete =
Delete.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKeyForDelete)
.build();

// Execute the operations.
transaction.mutate(Arrays.asList(put, delete));
CRUD 操作のデフォルト名前空間

すべての CRUD 操作のデフォルト名前空間は、ScalarDB 設定のプロパティを使用して設定できます。

scalar.db.default_namespace_name=<NAMESPACE_NAME>

名前空間を指定しない操作では、設定されたデフォルトの名前空間が使用されます。

// This operation will target the default namespace.
Scan scanUsingDefaultNamespace =
Scan.newBuilder()
.table("tbl")
.all()
.build();
// This operation will target the "ns" namespace.
Scan scanUsingSpecifiedNamespace =
Scan.newBuilder()
.namespace("ns")
.table("tbl")
.all()
.build();
オペレーション属性

オペレーション属性は、オペレーションに関する追加情報を保持するためのキーと値のペアです。以下のように、オペレーションビルダーのattribute()メソッドやattributes()メソッドを使って設定できます。

// Set operation attributes in the `Get` operation.
Get get = Get.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.attribute("attribute1", "value1")
.attributes(ImmutableMap.of("attribute2", "value2", "attribute3", "value3"))
.build();

// Set operation attributes in the `Scan` operation.
Scan scan = Scan.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.projections("c1", "c2", "c3", "c4")
.attribute("attribute1", "value1")
.attributes(ImmutableMap.of("attribute2", "value2", "attribute3", "value3"))
.build();

// Set operation attributes in the `Insert` operation.
Insert insert = Insert.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.attribute("attribute1", "value1")
.attributes(ImmutableMap.of("attribute2", "value2", "attribute3", "value3"))
.build();

// Set operation attributes in the `Upsert` operation.
Upsert upsert = Upsert.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.attribute("attribute1", "value1")
.attributes(ImmutableMap.of("attribute2", "value2", "attribute3", "value3"))
.build();

// Set operation attributes in the `Update` operation.
Update update = Update.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.attribute("attribute1", "value1")
.attributes(ImmutableMap.of("attribute2", "value2", "attribute3", "value3"))
.build();

// Set operation attributes in the `Delete` operation.
Delete delete = Delete.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.attribute("attribute1", "value1")
.attributes(ImmutableMap.of("attribute2", "value2", "attribute3", "value3"))
.build();
注記

ScalarDB では現在、利用可能なオペレーション属性はありません。

トランザクションをコミットする

CRUD 操作を実行した後、トランザクションをコミットして終了する必要があります。

トランザクションは次のようにコミットできます。

// Commit a transaction.
transaction.commit();

トランザクションをロールバックまたはアボートする

トランザクションの実行中にエラーが発生した場合は、トランザクションをロールバックまたはアボートできます。

トランザクションは次のようにロールバックできます。

// Roll back a transaction.
transaction.rollback();

または、次のようにトランザクションをアボートすることもできます。

// Abort a transaction.
transaction.abort();

ScalarDB で例外を処理する方法の詳細については、例外の処理方法を参照してください。

トランザクションを開始せずにトランザクションを実行する

トランザクションを開始せずにトランザクション操作を実行できます。この場合、ScalarDB は操作を実行する前に自動的にトランザクションを開始し、操作の実行後にトランザクションをコミットします。このセクションでは、トランザクションを開始せずにトランザクションを実行する方法について説明します。

Get 操作を実行する

Get は、主キーで指定された単一のレコードを取得する操作です。

最初に Get オブジェクトを作成し、次に次のように transactionManager.get() メソッドを使用してオブジェクトを実行する必要があります。

// Create a `Get` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Get get =
Get.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.projections("c1", "c2", "c3", "c4")
.build();

// Execute the `Get` operation.
Optional<Result> result = transactionManager.get(get);

Get 操作の詳細については、Get 操作を参照してください。

Scan 操作の実行

Scan は、パーティション内の複数のレコードを取得する操作です。Scan 操作では、クラスタリングキーの境界とクラスタリングキー列の順序を指定できます。Scan 操作を実行するには、transactionManager.scan() メソッドまたは transactionManager.getScanner() メソッドを使用できます。

  • transactionManager.scan():
    • このメソッドは、指定された Scan 操作を即座に実行し、一致するすべてのレコードのリストを返します。結果セットがメモリに収まる程度の小さい場合に適しています。
  • transactionManager.getScanner():
    • このメソッドは、結果セットを遅延的にイテレートできる Scanner オブジェクトを返します。結果セットが大きくなる可能性がある場合に有用で、すべてのレコードを一度にメモリに読み込むことを避けられます。

まず Scan オブジェクトを作成し、次に transactionManager.scan() メソッドまたは transactionManager.getScanner() メソッドを使用してオブジェクトを実行する必要があります。

// Create a `Scan` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key startClusteringKey = Key.of("c2", "aaa", "c3", 100L);
Key endClusteringKey = Key.of("c2", "aaa", "c3", 300L);

Scan scan =
Scan.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.start(startClusteringKey, true) // Include startClusteringKey
.end(endClusteringKey, false) // Exclude endClusteringKey
.projections("c1", "c2", "c3", "c4")
.orderings(Scan.Ordering.desc("c2"), Scan.Ordering.asc("c3"))
.limit(10)
.build();

// Execute the `Scan` operation by using the `transactionManager.scan()` method.
List<Result> results = transactionManager.scan(scan);

// Or, execute the `Scan` operation by using the `transactionManager.getScanner()` method.
try (TransactionManagerCrudOperable.Scanner scanner = transactionManager.getScanner(scan)) {
// Fetch the next result from the scanner
Optional<Result> result = scanner.one();

// Fetch all remaining results from the scanner
List<Result> allResults = scanner.all();
}

Scan 操作の詳細については、Scan 操作を参照してください。

Put 操作を実行します

注記

Put 操作は ScalarDB 3.13以降では非推奨となり、将来のリリースでは削除される予定です。Put 操作の代わりに、Insert 操作、Upsert 操作、または Update 操作を使用してください。

まず Put オブジェクトを作成し、次に次のように transactionManager.put() メソッドを使用してオブジェクトを実行する必要があります。

// Create a `Put` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Put put =
Put.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();

// Execute the `Put` operation.
transactionManager.put(put);

Put 操作の詳細については、Put 操作を参照してください。

Insert 操作の実行

Insert は、トランザクションを通じて基礎となるストレージにエントリを挿入する操作です。エントリがすでに存在する場合は、競合エラーが発生します。

まず Insert オブジェクトを作成し、次に次のように transactionManager.insert() メソッドを使用してオブジェクトを実行する必要があります。

// Create an `Insert` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Insert insert =
Insert.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();

// Execute the `Insert` operation.
transactionManager.insert(insert);

Insert 操作の詳細については、Insert 操作を参照してください。

Upsert 操作を実行する

Upsert は、トランザクションを通じて基礎となるストレージにエントリを挿入するか、エントリを更新する操作です。エントリがすでに存在する場合は更新され、そうでない場合はエントリが挿入されます。

まず Upsert オブジェクトを作成し、次に次のように transactionManager.upsert() メソッドを使用してオブジェクトを実行する必要があります。

// Create an `Upsert` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Upsert upsert =
Upsert.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();

// Execute the `Upsert` operation.
transactionManager.upsert(upsert);

Insert 操作の詳細については、Upsert 操作を参照してください。

Update 操作の実行

Update は、トランザクションを通じて基礎となるストレージ内のエントリを更新する操作です。エントリが存在しない場合、操作によって変更は行われません。

まず Update オブジェクトを作成し、次に次のように transactionManager.update() メソッドを使用してオブジェクトを実行する必要があります。

// Create an `Update` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Update update =
Update.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();

// Execute the `Update` operation.
transactionManager.update(update);

Update 操作の詳細については、Update 操作を参照してください。

Delete 操作を実行する

Delete は、主キーで指定されたレコードを削除する操作です。

まず Delete オブジェクトを作成し、次に次のように transaction.delete() メソッドを使用してオブジェクトを実行する必要があります。

// Create a `Delete` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Delete delete =
Delete.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.build();

// Execute the `Delete` operation.
transactionManager.delete(delete);

Delete 操作の詳細については、Delete 操作を参照してください。

Mutate 操作の実行

Mutate は、複数のミューテーション (PutInsertUpsertUpdate、および Delete 操作) を実行する操作です。

まずミューテーションオブジェクトを作成し、次に次のように transactionManager.mutate() メソッドを使用してオブジェクトを実行する必要があります。

// Create `Put` and `Delete` operations.
Key partitionKey = Key.ofInt("c1", 10);

Key clusteringKeyForPut = Key.of("c2", "aaa", "c3", 100L);

Put put =
Put.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKeyForPut)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();

Key clusteringKeyForDelete = Key.of("c2", "bbb", "c3", 200L);

Delete delete =
Delete.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKeyForDelete)
.build();

// Execute the operations.
transactionManager.mutate(Arrays.asList(put, delete));

Mutate 操作の詳細については、Mutate 操作を参照してください。

また、ScalarDB での例外の処理方法の詳細については、例外の処理方法を参照してください。

例外の処理方法

トランザクションを実行するときは、例外も適切に処理する必要があります。

警告

例外を適切に処理しないと、異常やデータの不整合が発生する可能性があります。

次のサンプルコードは、例外を処理する方法を示しています。

public class Sample {
public static void main(String[] args) throws Exception {
TransactionFactory factory = TransactionFactory.create("<CONFIGURATION_FILE_PATH>");
DistributedTransactionManager transactionManager = factory.getTransactionManager();

int retryCount = 0;
TransactionException lastException = null;

while (true) {
if (retryCount++ > 0) {
// Retry the transaction three times maximum.
if (retryCount >= 3) {
// Throw the last exception if the number of retries exceeds the maximum.
throw lastException;
}

// Sleep 100 milliseconds before retrying the transaction.
TimeUnit.MILLISECONDS.sleep(100);
}

DistributedTransaction transaction = null;
try {
// Begin a transaction.
transaction = transactionManager.begin();

// Execute CRUD operations in the transaction.
Optional<Result> result = transaction.get(...);
List<Result> results = transaction.scan(...);
transaction.put(...);
transaction.delete(...);

// Commit the transaction.
transaction.commit();
} catch (UnsatisfiedConditionException e) {
// You need to handle `UnsatisfiedConditionException` only if a mutation operation specifies a condition.
// This exception indicates the condition for the mutation operation is not met.

try {
transaction.rollback();
} catch (RollbackException ex) {
// Rolling back the transaction failed. Since the transaction should eventually recover,
// you don't need to do anything further. You can simply log the occurrence here.
}

// You can handle the exception here, according to your application requirements.

return;
} catch (UnknownTransactionStatusException e) {
// If you catch `UnknownTransactionStatusException` when committing the transaction,
// it indicates that the status of the transaction, whether it was successful or not, is unknown.
// In such a case, you need to check if the transaction is committed successfully or not and
// retry the transaction if it failed. How to identify a transaction status is delegated to users.
return;
} catch (TransactionException e) {
// For other exceptions, you can try retrying the transaction.

// For `CrudConflictException`, `CommitConflictException`, and `TransactionNotFoundException`,
// you can basically retry the transaction. However, for the other exceptions, the transaction
// will still fail if the cause of the exception is non-transient. In such a case, you will
// exhaust the number of retries and throw the last exception.

if (transaction != null) {
try {
transaction.rollback();
} catch (RollbackException ex) {
// Rolling back the transaction failed. The transaction should eventually recover,
// so you don't need to do anything further. You can simply log the occurrence here.
}
}

lastException = e;
}
}
}
}

TransactionException および TransactionNotFoundException

begin() API は TransactionException または TransactionNotFoundException をスローする可能性があります:

  • TransactionException をキャッチした場合、この例外は、一時的または非一時的障害が原因でトランザクションを開始できなかったことを示します。トランザクションを再試行することはできますが、非一時的障害が原因でトランザクションを開始できない可能性があります。
  • TransactionNotFoundException をキャッチした場合、この例外は、一時的障害が原因でトランザクションを開始できなかったことを示します。この場合、トランザクションを再試行できます。

join() API も TransactionNotFoundException をスローする可能性があります。この例外は、begin() API の例外を処理するのと同じ方法で処理できます。

CrudException および CrudConflictException

CRUD 操作の API (get()scan()put()delete()、および mutate()) は、CrudException または CrudConflictException をスローする可能性があります:

  • CrudException をキャッチした場合、この例外は、一時的または非一時的障害が原因でトランザクション CRUD 操作が失敗したことを示します。トランザクションを最初から再試行できますが、原因が非一時的である場合は、トランザクションが失敗する可能性があります。
  • CrudConflictException をキャッチした場合、この例外は、一時的な障害 (競合エラーなど) が原因でトランザクション CRUD 操作が失敗したことを示します。この場合、トランザクションを最初から再試行できます。

UnsatisfiedConditionException

ミューテーション操作の API (put()delete()、および mutate()) も UnsatisfiedConditionException をスローする可能性があります。

UnsatisfiedConditionException をキャッチした場合、この例外はミューテーション操作の条件が満たされていないことを示します。この例外は、アプリケーションの要件に従って処理できます。

CommitExceptionCommitConflictException、および UnknownTransactionStatusException

commit() API は CommitExceptionCommitConflictException、または UnknownTransactionStatusException をスローする可能性があります。

  • CommitException をキャッチした場合、この例外は、一時的または非一時的障害が原因でトランザクションのコミットが失敗したことを示します。トランザクションを最初から再試行できますが、原因が非一時的である場合はトランザクションが失敗する可能性があります。
  • CommitConflictException をキャッチした場合、この例外は、一時的な障害 (競合エラーなど) が原因でトランザクションのコミットが失敗したことを示します。この場合、トランザクションを最初から再試行できます。
  • UnknownTransactionStatusException をキャッチした場合、この例外は、トランザクションのステータス (成功したかどうか) が不明であることを示します。この場合、トランザクションが正常にコミットされたかどうかを確認し、失敗した場合はトランザクションを再試行する必要があります。

トランザクションステータスを識別する方法は、ユーザーに委任されています。トランザクションステータステーブルを作成し、他のアプリケーションデータを使用してトランザクション的に更新して、ステータステーブルからトランザクションのステータスを取得できるようにすることができます。

一部の例外に関する注意

サンプルコードには示されていませんが、resume() API は TransactionNotFoundException をスローする可能性もあります。この例外は、指定された ID に関連付けられたトランザクションが見つからなかったか、トランザクションの有効期限が切れた可能性があることを示します。いずれの場合も、この例外の原因は基本的に一時的なものであるため、トランザクションを最初から再試行できます。

サンプルコードでは、UnknownTransactionStatusException の場合、重複操作の可能性を回避するためにアプリケーションがトランザクションが成功したかどうかを確認する必要があるため、トランザクションは再試行されません。その他の例外の場合、例外の原因が一時的または非一時的であるため、トランザクションは再試行されます。例外の原因が一時的な場合、再試行するとトランザクションが成功する可能性があります。ただし、例外の原因が非一時的である場合、再試行してもトランザクションは失敗します。このような場合、再試行回数を使い果たします。

注記

サンプルコードでは、トランザクションは最大3回再試行され、再試行される前に100ミリ秒間スリープします。ただし、アプリケーションの要件に応じて、指数バックオフなどの再試行ポリシーを選択できます。

Coordinator テーブルのグループコミット

Consensus Commit トランザクションに使用される Coordinator テーブルは重要なデータストアであり、堅牢なストレージを使用することをお勧めします。ただし、内部でマルチ AZ またはマルチリージョンレプリケーションを活用するなど、より堅牢なストレージオプションを使用すると、ストレージにレコードを書き込むときにレイテンシが増加し、スループットパフォーマンスが低下する可能性があります。

ScalarDB は、Coordinator テーブルにグループコミット機能を提供します。この機能は、複数のレコードの書き込みを1つの書き込み操作にグループ化し、書き込みスループットを向上させます。この場合、基盤となるデータベースとワークロードに応じて、レイテンシが増加または減少する可能性があります。

グループコミット機能を有効にするには、次の設定を追加します。

# By default, this configuration is set to `false`.
scalar.db.consensus_commit.coordinator.group_commit.enabled=true

# These properties are for tuning the performance of the group commit feature.
# scalar.db.consensus_commit.coordinator.group_commit.group_size_fix_timeout_millis=40
# scalar.db.consensus_commit.coordinator.group_commit.delayed_slot_move_timeout_millis=800
# scalar.db.consensus_commit.coordinator.group_commit.old_group_abort_timeout_millis=30000
# scalar.db.consensus_commit.coordinator.group_commit.timeout_check_interval_millis=10
# scalar.db.consensus_commit.coordinator.group_commit.metrics_monitor_log_enabled=true

制限事項

このセクションでは、グループコミット機能の制限事項について説明します。

ユーザーが渡したカスタムトランザクション ID

グループコミット機能は、内部値を暗黙的に生成し、それをトランザクション ID の一部として使用します。したがって、ユーザーが com.scalar.db.transaction.consensuscommit.ConsensusCommitManager.begin(String txId) または com.scalar.db.transaction.consensuscommit.TwoPhaseConsensusCommitManager.begin(String txId) を介して手動で渡したカスタムトランザクション ID は、その後の API 呼び出しでそのまま使用することはできません。代わりに、com.scalar.db.transaction.consensuscommit.ConsensusCommit.getId() または com.scalar.db.transaction.consensuscommit.TwoPhaseConsensusCommit.getId() から返されたトランザクション ID を使用する必要があります。

   // This custom transaction ID needs to be used for ScalarDB transactions.
String myTxId = UUID.randomUUID().toString();

...

DistributedTransaction transaction = manager.begin(myTxId);

...

// When the group commit feature is enabled, a custom transaction ID passed by users can't be used as is.
// logger.info("The transaction state: {}", manager.getState(myTxId));
logger.info("The transaction state: {}", manager.getState(transaction.getId()));

2フェーズコミットインターフェースでの使用の禁止

グループコミット機能は、進行中のすべてのトランザクションをメモリ内で管理します。この機能が2フェーズコミットインターフェースで有効になっている場合、Coordinator テーブルへの参加者サービスの一貫性のない書き込みによって生じる競合 (グループ間で異なるトランザクション分散が含まれる場合があります) を防ぐために、情報は Coordinator サービスによってのみ維持される必要があります。

この制限により、アプリケーション開発に関連する複雑さと柔軟性が損なわれます。したがって、グループコミット機能と2フェーズコミットインターフェースを組み合わせて使用​​することは現在禁止されています。

Consensus Commit トランザクションマネージャーエラーの調査

Consensus Commit トランザクションマネージャーの使用時にエラーを調査するには、トランザクションメタデータ列が追加されたテーブルメタデータを返す設定を有効にできます。これは、トランザクション関連の問題を調査するときに役立ちます。この設定は、Consensus Commit トランザクションマネージャーのトラブルシューティング時にのみ使用可能で、DistributedTransactionAdmin.getTableMetadata() メソッドを使用して、特定のテーブルのトランザクションメタデータ列の詳細を表示できます。

次の設定を追加すると、Get および Scan 操作の結果に トランザクションメタデータが含まれます。

# By default, this configuration is set to `false`.
scalar.db.consensus_commit.include_metadata.enabled=true