ScalarDB Cluster .NET Client SDK での分散トランザクションをはじめよう
このページは英語版のページが機械翻訳されたものです。英語版との間に矛盾または不一致がある場合は、英語版を正としてください。
ScalarDB Cluster .NET Client SDK は、ScalarDB Cluster の分散トランザクション機能をサポートします。SDK には、クラスター内での通信を容易にするためのトランザクションとマネージャーの抽象化が含まれています。
次の例のように非同期メソッドを使用することをお勧めしますが、代わりに同期バージョンを使用することもできます。
分散 SQL トランザクションの詳細については、ScalarDB Cluster .NET Client SDK での分散 SQL トランザクションをはじめようを参照してください。
SDK をインストールする
ScalarDB Cluster と同じメジャーバージョンとマイナーバージョンの SDK を .NET プロジェクトにインストールします。組み込みの NuGet パッケージマネージャーを使用して、<MAJOR>.<MINOR> を使用しているバージョンに置き換えることでこれを行うことができます。
dotnet add package ScalarDB.Client --version '<MAJOR>.<MINOR>.*'
設定ファイルを作成する
scalardb-options.json ファイルを作成し、次の内容を追加します。<HOSTNAME_OR_IP_ADDRESS> を FQDN または IP アドレスに、<PORT> をクラスターのポート番号 (デフォルトでは 60053) に置き換えます。
{
"ScalarDbOptions": {
"Address": "http://<HOSTNAME_OR_IP_ADDRESS>:<PORT>",
"HopLimit": 10
}
}
設定ファイルやクライアントを設定するその他の方法の詳細については、クライアント設定を参照してください。
トランザクションマネージャーを取得する
分散トランザクションにはトランザクションマネージャーを取得する必要があります。トランザクションマネージャーを取得するには、次のように TransactionFactory を使用します。
// Pass the path to the settings file created in the previous step.
var factory = TransactionFactory.Create("scalardb-options.json");
using var manager = factory.GetTransactionManager();
トランザクションを管理する
複数の CRUD 操作を単一のトランザクションの一部として実行するには、まずトランザクションを開始する必要があります。次のようにトランザクションマネージャーを使用してトランザクションを開始できます。
var transaction = await manager.BeginAsync();
次のようにして、すでに実行中のトランザクションを再開することもできます。
var transaction = manager.Resume(transactionIdString);
Resume メソッドはトランザクションオブジェクトを作成するだけなので、非同期バージョンはありません。このため、間違った ID を使用してトランザクションを再開する可能性があります。
読み取り専用トランザクションを開始する
トランザクション内で読み取り操作 (GetAsync および ScanAsync) のみを実行する場合、次のように読み取り専用トランザクションを開始できます。
var transaction = await manager.BeginReadOnlyAsync();
読み取り専用トランザクションは読み取り操作専用に最適化されています。読み取り専用トランザクション内で書き込み操作 (InsertAsync、UpsertAsync、UpdateAsync、DeleteAsync、MutateAsync、または書き込み操作を含む BatchAsync) を実行しようとする とエラーになります。
トランザクションをコミットする準備ができたら、次のようにトランザクションの CommitAsync メソッドを呼び出すことができます。
await transaction.CommitAsync();
トランザクションをロールバックするには、RollbackAsync メソッドを使用できます。
await transaction.RollbackAsync();
CRUD 操作を実行する
トランザクションには、クラスターに対して CRUD 操作を実行するための GetAsync、ScanAsync、InsertAsync、UpsertAsync、UpdateAsync、DeleteAsync、および MutateAsync メソッドがあります。パラメーターとして、これらのメソッドには操作オブジェクトがあ ります。操作オブジェクトは、このセクションにリストされているビルダーを使用して作成できます。
CRUD 操作は、トランザクションを明示的に作成する必要なく、ワンショットトランザクション方式で実行できます。そのために、マネージャーオブジェクトには、トランザクションオブジェクトと同じ CRUD メソッドがあります。
ビルダーを使用するには、using セクションに次の名前空間を追加します。
using ScalarDB.Client.Builders;
クラスターは1つのトランザクション内でのコマンドの並列実行をサポートしていないため、非同期メソッドには必ず await を使用してください。
GetAsync メソッドの例
単一のレコードを取得するには、次のように GetAsync メソッドを使用します。
var get =
new GetBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.SetProjections("item_id", "count")
.Build();
var getResult = await transaction.GetAsync(get);
パーティションキーの代わりにインデックスを使用してレコードを取得することも可能です。そのためには、次のように操作の種類を GetWithIndex に設定する必要があります。
// ...
using ScalarDB.Client.Core;
// ...
var get =
new GetBuilder()
// ...
.SetGetType(GetOperationType.GetWithIndex)
.AddPartitionKey("index_column", "1")
.Build();
取得されたレコードが満たす必要のある任意の条件を指定することもできます。満たさない場合はレコードは返されません。条件は、次のように条件の結合として設定できます。
var get =
new GetBuilder()
// ...
.AddConjunction(c => c.AddCondition("cost", 1000, Operator.LessThan))
.AddConjunction(c =>
{
c.AddCondition("cost", 10000, Operator.LessThan);
c.AddCondition("in_stock", true, Operator.Equal);
})
.Build();
上記の例では、cost が 1000 未満の場合、または cost が 10000 未満で in_stock が true の場合にのみ、レコードが返されます。
IResult オブジェクトの処理
GetAsync メソッドと ScanAsync メソッドは IResult オブジェクトを返します。IResult オブジェクトには、取得されたレコードの列が含まれます。特定の列の値は、次の方法で取得できます。
// Getting an integer value from the "item_id" column.
// If it fails, an exception will be thrown.
var itemId = result.GetValue<int>("item_id");
// Trying to get a string value from the "order_id" column.
// If it fails, no exception will be thrown.
if (result.TryGetValue<string>("order_id", out var orderId))
Console.WriteLine($"order_id: {orderId}");
// Checking if the "count" column is null.
if (result.IsNull("count"))
Console.WriteLine("'count' is null");
GetValue<T> および TryGetValue<T> で使用する型の詳細については、ScalarDB 列型と .NET 型間の変換方法 を参照してください。
ScanAsync メソッドの例
レコードの範囲を取得するには、次のように ScanAsync メソッドを使用できます。
var scan =
new ScanBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddStartClusteringKey("item_id", 2)
.SetStartInclusive(true)
.AddEndClusteringKey("item_id", 8)
.SetEndInclusive(true)
.SetProjections("item_id", "count")
.Build();
var scanResult = await transaction.ScanAsync(scan);
パーティションキーの代わりにインデックスを使用してレコードを取得することも可能です。そのためには、次のように操作の種類を ScanWithIndex に設定する必要があります。
// ...
using ScalarDB.Client.Core;
// ...
var scan =
new ScanBuilder()
// ...
.SetScanType(ScanOperationType.ScanWithIndex)
.AddPartitionKey("index_column", "1")
.Build();
取得されたレコードが満たす必要のある任意の条件は、get 操作の場合と同様に、スキャン操作に対しても設定できます。
Scanner を使用して大規模データセットを取得する
ScanAsync メソッドはすべての結果を一度に返すため、大規模なデータセットではメモリを大量に消費する可能性があります。大規模なデータセットをより効率的に処理するには、クラスターから結果をストリーミングし、段階的に処理できる Scanner を使用できます。
Scanner を作成する
CreateScannerAsync メソッドを使用して、トランザクションから Scanner を作成できます。
using ScalarDB.Client.Builders;
using ScalarDB.Client.Core;
// ...
var scan = new ScanBuilder()
.SetNamespaceName("ns")
.SetTableName("items")
.SetScanType(ScanOperationType.ScanAll)
.Build();
await using var scanner = await transaction.CreateScannerAsync(scan);
Scanner は、サーバー側のリソースを解放するために使用後に破棄する必要があります。Scanner のライフタイムが単一のスコープに限定されている場合は、自動クリーンアップのために await using (同期コードの場合は using) を使用してください。Scanner をメソッドから返す必要がある場合や、複数のスコープにまたがってライフタイムを管理する必要がある場合は、使用終了時に DisposeAsync() (または Dispose()) を明示的に呼び出してください。
結果を1つずつ取得する
OneAsync メソッドを使用して、結果を1つずつ取得します。
await using var scanner = await transaction.CreateScannerAsync(scan);
while (true)
{
var result = await scanner.OneAsync();
if (result == null)
break;
// 結果を処理する。
var itemId = result.GetValue<int>("item_id");
Console.WriteLine($"Item ID: {itemId}");
}
結果がなくなると、OneAsync は null を返します。null を受け取った後に OneAsync を呼び出しても、引き続き null が返されます。
残りのすべての結果を取得する
AllAsync メソッドを使用して、残りのすべての結果を一度に取得します。
await using var scanner = await transaction.CreateScannerAsync(scan);
var results = await scanner.AllAsync();
foreach (var result in results)
{
// 各結果を処理する。
var itemId = result.GetValue<int>("item_id");
Console.WriteLine($"Item ID: {itemId}");
}
OneAsync を使用して一部の結果を既に取得している場合、AllAsync メソッドはまだ取得されていない残りの結果のみを返します。
Scanner のフェッチサイズを設定する
ScalarDbOptions を使用して、Scanner 操作のデフォルトのフェッチサイズを設定できます。フェッチサイズは、各バッチでサーバーからフェッチされるレコードの数を決定します。
var scalarDbOptions = new ScalarDbOptions
{
Address = "http://<HOSTNAME_OR_IP_ADDRESS>:<PORT>",
ScannerFetchSize = 20 // デフォルトは 10。
};
スキャンオプション付きの Scanner を使用する
Scanner は ScanBuilder で使用可能なすべてのスキャンオプションをサポートしています。
制限付きスキャン
using ScalarDB.Client.Builders;
using ScalarDB.Client.Core;
// ...
var scanWithLimit = new ScanBuilder()
.SetNamespaceName("ns")
.SetTableName("items")
.SetScanType(ScanOperationType.ScanAll)
.SetLimit(100) // 結果を100件に制限。
.Build();
await using var scanner = await transaction.CreateScannerAsync(scanWithLimit);
var results = await scanner.AllAsync();
クラスタリングキー範囲付きスキャン
using ScalarDB.Client.Builders;
using ScalarDB.Client.Core;
// ...
var scanWithRange = new ScanBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.SetScanType(ScanOperationType.Scan)
.AddPartitionKey("order_id", "1")
.AddStartClusteringKey("item_id", 2)
.SetStartInclusive(true)
.AddEndClusteringKey("item_id", 8)
.SetEndInclusive(true)
.Build();
await using var scanner = await transaction.CreateScannerAsync(scanWithRange);
Scanner を閉じる
Scanner は IDisposable と IAsyncDisposable の両方を実装しています。自動破棄のために await using (同期コードの場合は using) を使用することを推奨します。
await using var scanner = await transaction.CreateScannerAsync(scan);
// スコープを離れると Scanner は自動的に閉じられます。
明示的に Scanner を閉じる必要がある場合は、CloseAsync メソッドを使用できます。
var scanner = await transaction.CreateScannerAsync(scan);
try
{
var result = await scanner.OneAsync();
// 結果を処理...
}
finally
{
await scanner.CloseAsync();
}
同期 API
すべての Scanner メソッドには同期バージョンがあります。
using var scanner = transaction.CreateScanner(scan);
// 1つの結果を取得。
var result = scanner.One();
// 残りのすべての結果を取得。
var results = scanner.All();
// Scanner を閉じる。
scanner.Close();
Scanner と ScanAsync の比較
| 機能 | Scanner | ScanAsync |
|---|---|---|
| メモリ使用量 | 低 (ストリーミング) | 高 (すべての結果がメモリに) |
| ユースケース | 大規模データセット | 小規模から中規模のデータセット |
| 結果の取得 | 段階的 (OneAsync) または一括 (AllAsync) | 一括 |
| リソース管理 | 明示的な破棄が必要 | 自動 |
Scanner やその他のトランザクション操作中の例外処理の詳細については、ScalarDB Cluster .NET Client SDK での例外処理を参照してください。
InsertAsync メソッドの例
新しいレコードを挿入するには、次のように InsertAsync メソッドを使用します。
var insert =
new InsertBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddColumn("count", 11)
.Build();
await transaction.InsertAsync(insert);
UpsertAsync メソッドの例
レコードをUPSERTする (既存のレコードを更新するか、新しいレコードを挿入する) には、次のように UpsertAsync メソッドを使用できます。
var upsert =
new UpsertBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddColumn("count", 11)
.Build();
await transaction.UpsertAsync(upsert);
UpdateAsync メソッドの例
既存のレコードを更新するには、次のように UpdateAsync メソッドを使用します。
// ...
using ScalarDB.Client.Core;
// ...
var update =
new UpdateBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddColumn("count", 11)
.AddCondition("processed", false, Operator.Equal)
.Build();
await transaction.UpdateAsync(update);
DeleteAsync メソッドの例
レコードを削除するには、次のように DeleteAsync メソッドを使用します。
// ...
using ScalarDB.Client.Core;
// ...
var delete =
new DeleteBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddCondition("processed", false, Operator.Equal)
.Build();
await transaction.DeleteAsync(delete);
MutateAsync メソッドの例
MutateAsync メソッドを使用 すると、クラスターへの1回の呼び出しで複数のミューテーション操作を実行できます。これは次の方法で実行できます。
// ...
using ScalarDB.Client.Core;
// ...
var mutations = new IMutation[]
{
new InsertBuilder()
// ...
.Build(),
new UpsertBuilder()
// ...
.Build(),
new UpdateBuilder()
// ...
.Build(),
new DeleteBuilder()
// ...
.Build()
};
await transaction.MutateAsync(mutations);
BatchAsync メソッドの例
BatchAsync メソッドを使用すると、読み取り操作と書き込み操作の両方を含む複数の操作を1回のリクエストで実行できます。これにより、ネットワークラウンドトリップが削減され、パフォーマンスが向上します。このメソッドは次のように使用できます。
using ScalarDB.Client.Core;
// ...
var operations = new IOperation[]
{
new GetBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.Build(),
new ScanBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "2")
.Build(),
new InsertBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "3")
.AddClusteringKey("item_id", 1)
.AddColumn("count", 5)
.Build(),
new DeleteBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "4")
.AddClusteringKey("item_id", 1)
.Build()
};
var results = await transaction.BatchAsync(operations);
結果は入力操作と同じ順序で返されます。各結果は、型チェックプロパティを使用して次のように確認できます。
for (var i = 0; i < results.Count; i++)
{
var result = results[i];
if (result.IsGetResult)
{
// レコードが見つからなかった場合、GetResult は null になります。
if (result.GetResult != null)
{
var count = result.GetResult.GetValue<int>("count");
}
}
else if (result.IsScanResult)
{
foreach (var record in result.ScanResults)
{
var itemId = record.GetValue<int>("item_id");
}
}
else if (result.IsInsertResult || result.IsUpsertResult
|| result.IsUpdateResult || result.IsDeleteResult)
{
// ミューテーション操作はデータを返しません。
}
}
BatchAsync でサポートされている操作タイプは、Get、Scan、Insert、Upsert、Update、Delete です。