ScalarDB Cluster .NET Client SDK での分散トランザクションをはじめよう
このページは英語版のページが機械翻訳されたものです。英語版との間に矛盾または不一致がある場合は、英語版を正としてください。
ScalarDB Cluster .NET Client SDK は、ScalarDB Cluster の分散トランザクション機能をサポートします。SDK には、クラスター内での通信を容易にするためのトランザクションとマネージャーの抽象化が含まれています。
次の例のように非同期メソッドを使用することをお勧めしますが、代わりに同期バージョンを使用することもできます。
分散 SQL トランザクションの詳細については、ScalarDB Cluster .NET Client SDK での分散 SQL トランザクションをはじめようを参照してください。
SDK をインストールする
ScalarDB Cluster と同じメジャーバージョンとマイナーバージョンの SDK を .NET プロジェクトにインストールします。組み込みの NuGet パッケージマネージャーを使用して、<MAJOR>.<MINOR>
を使用しているバージョンに置き換えることでこれを行うことができます。
dotnet add package ScalarDB.Client --version '<MAJOR>.<MINOR>.*'
設定ファイルを作成する
scalardb-options.json
ファイルを作成し、次の内容を追加します。<HOSTNAME_OR_IP_ADDRESS>
を FQDN または IP アドレスに、<PORT>
をクラスターのポート番号 (デフォルトでは 60053
) に置き換えます。
{
"ScalarDbOptions": {
"Address": "http://<HOSTNAME_OR_IP_ADDRESS>:<PORT>",
"HopLimit": 10
}
}
設定ファイルやクライアントを設定するその他の方法の詳細については、クライアント設定を参照してください。
トランザクションマネージャーを取得する
分散トランザクションにはトランザクションマネージャーを取得する必要があります。トランザクションマネージャーを取得するには、次のように TransactionFactory
を使用します。
// Pass the path to the settings file created in the previous step.
var factory = TransactionFactory.Create("scalardb-options.json");
using var manager = factory.GetTransactionManager();
トランザクションを管理する
複数の CRUD 操作を単一のトランザクションの一部として実行するには、まずトランザクションを開始する必要があります。次のようにトランザクションマネージャーを使用してトランザクションを開始できます。
var transaction = await manager.BeginAsync();
次のようにして、すでに実行中のトランザクションを再開することもできます。
var transaction = manager.Resume(transactionIdString);
Resume
メソッドはトランザクションオブジェクトを作成するだけなので、非同期バージョンはありません。このため、間違った ID を使用してトランザクションを再開する可能性があります。
トランザクションをコミットする準備ができたら、次のようにトランザクションの CommitAsync
メソッドを呼び出すことができます。
await transaction.CommitAsync();
トランザクションをロールバックするには、RollbackAsync
メソッドを使用できます。
await transaction.RollbackAsync();
CRUD 操作を実行する
トランザクションには、クラスターに対して CRUD 操作を実行するための GetAsync
、ScanAsync
、InsertAsync
、UpsertAsync
、UpdateAsync
、DeleteAsync
、および MutateAsync
メソッドがあります。パラメーターとして、これらのメソッドには操作オブジェクトがあります。操作オブジェクトは、このセクションにリストされているビルダーを使用して作成できます。
CRUD 操作は、トランザクションを明示的に作成する必要なく、ワンショットトランザクション方式で実行できます。そのために、マネージャーオブジェクトには、トランザクションオブジェクトと同じ CRUD メソッドがあります。
ビルダーを使用するには、using
セクションに次の名前空間を追加します。
using ScalarDB.Client.Builders;
クラスターは 1 つのトランザクション内でのコマンドの並列実行をサポートしていないため、非同期メソッドには必ず await
を使用してください。
GetAsync
メソッドの例
単一のレコードを取得するには、次のように GetAsync
メソッドを使用します。
var get =
new GetBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.SetProjections("item_id", "count")
.Build();
var getResult = await transaction.GetAsync(get);
パーティションキーの代わりにインデックスを使用してレコードを取得することも可能です。そのためには、次のように操作の種類を GetWithIndex
に設定する必要があります。
// ...
using ScalarDB.Client.Core;
// ...
var get =
new GetBuilder()
// ...
.SetGetType(GetOperationType.GetWithIndex)
.AddPartitionKey("index_column", "1")
.Build();
取得されたレコードが満たす必要のある任意の条件を指定する こともできます。満たさない場合はレコードは返されません。条件は、次のように条件の結合として設定できます。
var get =
new GetBuilder()
// ...
.AddConjunction(c => c.AddCondition("cost", 1000, Operator.LessThan))
.AddConjunction(c =>
{
c.AddCondition("cost", 10000, Operator.LessThan);
c.AddCondition("in_stock", true, Operator.Equal);
})
.Build();
上記の例では、cost
が 1000
未満の場合、または cost
が 10000
未満で in_stock
が true の場合にのみ、レコードが返されます。
IResult
オブジェクトの処理
GetAsync
メソッドと ScanAsync
メソッドは IResult
オブジェクトを返します。IResult
オブジェクトには、取得されたレコードの列が含まれます。特定の列の値は、次の方法で取得できます。
// Getting an integer value from the "item_id" column.
// If it fails, an exception will be thrown.
var itemId = result.GetValue<int>("item_id");
// Trying to get a string value from the "order_id" column.
// If it fails, no exception will be thrown.
if (result.TryGetValue<string>("order_id", out var orderId))
Console.WriteLine($"order_id: {orderId}");
// Checking if the "count" column is null.
if (result.IsNull("count"))
Console.WriteLine("'count' is null");
GetValue<T>
および TryGetValue<T>
で使用する型の詳細については、ScalarDB 列型と .NET 型間の変換方法 を参照してください。
ScanAsync
メソッドの例
レコードの範囲を取得するには、次のように ScanAsync
メソッドを使用できます。
var scan =
new ScanBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddStartClusteringKey("item_id", 2)
.SetStartInclusive(true)
.AddEndClusteringKey("item_id", 8)
.SetEndInclusive(true)
.SetProjections("item_id", "count")
.Build();
var scanResult = await transaction.ScanAsync(scan);
パーティションキーの代わりにインデックスを使用してレコードを取得することも可能です。そのためには、次のように操作の種類を ScanWithIndex
に設定する必要があります。
// ...
using ScalarDB.Client.Core;
// ...
var scan =
new ScanBuilder()
// ...
.SetScanType(ScanOperationType.ScanWithIndex)
.AddPartitionKey("index_column", "1")
.Build();
取得されたレコードが満たす必要のある任意の条件は、get 操作の場合と同様に、スキャン操作に対しても設定できます。
InsertAsync
メソッドの例
新しいレコードを挿入するには、次のように InsertAsync
メソッドを使用します。
var insert =
new InsertBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddColumn("count", 11)
.Build();
await transaction.InsertAsync(insert);
UpsertAsync
メソッドの例
レコードをUPSERTする (既存のレコードを更新するか、新しいレコードを挿入する) には、次のように UpsertAsync
メソッドを使用できます。
var upsert =
new UpsertBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddColumn("count", 11)
.Build();
await transaction.UpsertAsync(upsert);
UpdateAsync
メソッドの例
既存のレコードを更新するには、次のように UpdateAsync
メソッドを使用します。
// ...
using ScalarDB.Client.Core;
// ...
var update =
new UpdateBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddColumn("count", 11)
.AddCondition("processed", false, Operator.Equal)
.Build();
await transaction.UpdateAsync(update);
DeleteAsync
メソッドの例
レコードを削除するには、次のように DeleteAsync
メソッドを使用します。
// ...
using ScalarDB.Client.Core;
// ...
var delete =
new DeleteBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddCondition("processed", false, Operator.Equal)
.Build();
await transaction.DeleteAsync(delete);
MutateAsync
メソッドの例
MutateAsync
メソッドを使用すると、クラスターへの 1 回の呼び出しで複数のミューテー ション操作を実行できます。これは次の方法で実行できます。
// ...
using ScalarDB.Client.Core;
// ...
var mutations = new IMutation[]
{
new InsertBuilder()
// ...
.Build(),
new UpsertBuilder()
// ...
.Build(),
new UpdateBuilder()
// ...
.Build(),
new DeleteBuilder()
// ...
.Build()
};
await transaction.MutateAsync(mutations);
InsertAsync
、UpsertAsync
、UpdateAsync
、DeleteAsync
、または MutateAsync
メソッドを使用してデータを変更するには、まず GetAsync
または ScanAsync
メソッドを使用してデータを取得する必要があります。