メインコンテンツまでスキップ
バージョン: 3.13

ScalarDB Cluster .NET Client SDK での分散トランザクションをはじめよう

注記

このページは英語版のページが機械翻訳されたものです。英語版との間に矛盾または不一致がある場合は、英語版を正としてください。

ScalarDB Cluster .NET Client SDK は、ScalarDB Cluster の分散トランザクション機能をサポートします。SDK には、クラスター内での通信を容易にするためのトランザクションとマネージャーの抽象化が含まれています。

注記

次の例のように非同期メソッドを使用することをお勧めしますが、代わりに同期バージョンを使用することもできます。

分散 SQL トランザクションの詳細については、ScalarDB Cluster .NET Client SDK での分散 SQL トランザクションをはじめようを参照してください。

SDK をインストールする

ScalarDB Cluster と同じメジャーバージョンとマイナーバージョンの SDK を .NET プロジェクトにインストールします。組み込みの NuGet パッケージマネージャーを使用して、<MAJOR>.<MINOR> を使用しているバージョンに置き換えることでこれを行うことができます。

dotnet add package ScalarDB.Client --version '<MAJOR>.<MINOR>.*'

設定ファイルを作成する

scalardb-options.json ファイルを作成し、次の内容を追加します。<HOSTNAME_OR_IP_ADDRESS> を FQDN または IP アドレスに、<PORT> をクラスターのポート番号 (デフォルトでは 60053) に置き換えます。

{
"ScalarDbOptions": {
"Address": "http://<HOSTNAME_OR_IP_ADDRESS>:<PORT>",
"HopLimit": 10
}
}

設定ファイルやクライアントを設定するその他の方法の詳細については、クライアント設定を参照してください。

トランザクションマネージャーを取得する

分散トランザクションにはトランザクションマネージャーを取得する必要があります。トランザクションマネージャーを取得するには、次のように TransactionFactory を使用します。

// Pass the path to the settings file created in the previous step.
var factory = TransactionFactory.Create("scalardb-options.json");

using var manager = factory.GetTransactionManager();

トランザクションを管理する

複数の CRUD 操作を単一のトランザクションの一部として実行するには、まずトランザクションを開始する必要があります。次のようにトランザクションマネージャーを使用してトランザクションを開始できます。

var transaction = await manager.BeginAsync();

次のようにして、すでに実行中のトランザクションを再開することもできます。

var transaction = manager.Resume(transactionIdString);
注記

Resume メソッドはトランザクションオブジェクトを作成するだけなので、非同期バージョンはありません。このため、間違った ID を使用してトランザクションを再開する可能性があります。

トランザクションをコミットする準備ができたら、次のようにトランザクションの CommitAsync メソッドを呼び出すことができます。

await transaction.CommitAsync();

トランザクションをロールバックするには、RollbackAsync メソッドを使用できます。

await transaction.RollbackAsync();

CRUD 操作を実行する

トランザクションには、クラスターに対して CRUD 操作を実行するための GetAsyncScanAsyncInsertAsyncUpsertAsyncUpdateAsyncDeleteAsync、および MutateAsync メソッドがあります。パラメーターとして、これらのメソッドには操作オブジェクトがあります。操作オブジェクトは、このセクションにリストされているビルダーを使用して作成できます。

注記

CRUD 操作は、トランザクションを明示的に作成する必要なく、ワンショットトランザクション方式で実行できます。そのために、マネージャーオブジェクトには、トランザクションオブジェクトと同じ CRUD メソッドがあります。

ビルダーを使用するには、using セクションに次の名前空間を追加します。

using ScalarDB.Client.Builders;
注記

クラスターは 1 つのトランザクション内でのコマンドの並列実行をサポートしていないため、非同期メソッドには必ず await を使用してください。

GetAsync メソッドの例

単一のレコードを取得するには、次のように GetAsync メソッドを使用します。

var get =
new GetBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.SetProjections("item_id", "count")
.Build();

var getResult = await transaction.GetAsync(get);

パーティションキーの代わりにインデックスを使用してレコードを取得することも可能です。そのためには、次のように操作の種類を GetWithIndex に設定する必要があります。

// ...
using ScalarDB.Client.Core;

// ...

var get =
new GetBuilder()
// ...
.SetGetType(GetOperationType.GetWithIndex)
.AddPartitionKey("index_column", "1")
.Build();

取得されたレコードが満たす必要のある任意の条件を指定することもできます。満たさない場合はレコードは返されません。条件は、次のように条件の結合として設定できます。

var get =
new GetBuilder()
// ...
.AddConjunction(c => c.AddCondition("cost", 1000, Operator.LessThan))
.AddConjunction(c =>
{
c.AddCondition("cost", 10000, Operator.LessThan);
c.AddCondition("in_stock", true, Operator.Equal);
})
.Build();

上記の例では、cost1000 未満の場合、または cost10000 未満で in_stock が true の場合にのみ、レコードが返されます。

IResult オブジェクトの処理

GetAsync メソッドと ScanAsync メソッドは IResult オブジェクトを返します。IResult オブジェクトには、取得されたレコードの列が含まれます。特定の列の値は、次の方法で取得できます。

// Getting an integer value from the "item_id" column.
// If it fails, an exception will be thrown.
var itemId = result.GetValue<int>("item_id");

// Trying to get a string value from the "order_id" column.
// If it fails, no exception will be thrown.
if (result.TryGetValue<string>("order_id", out var orderId))
Console.WriteLine($"order_id: {orderId}");

// Checking if the "count" column is null.
if (result.IsNull("count"))
Console.WriteLine("'count' is null");

GetValue<T> および TryGetValue<T> で使用する型の詳細については、ScalarDB 列型と .NET 型間の変換方法 を参照してください。

ScanAsync メソッドの例

レコードの範囲を取得するには、次のように ScanAsync メソッドを使用できます。

var scan =
new ScanBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddStartClusteringKey("item_id", 2)
.SetStartInclusive(true)
.AddEndClusteringKey("item_id", 8)
.SetEndInclusive(true)
.SetProjections("item_id", "count")
.Build();

var scanResult = await transaction.ScanAsync(scan);

パーティションキーの代わりにインデックスを使用してレコードを取得することも可能です。そのためには、次のように操作の種類を ScanWithIndex に設定する必要があります。

// ...
using ScalarDB.Client.Core;

// ...
var scan =
new ScanBuilder()
// ...
.SetScanType(ScanOperationType.ScanWithIndex)
.AddPartitionKey("index_column", "1")
.Build();

取得されたレコードが満たす必要のある任意の条件は、get 操作の場合と同様に、スキャン操作に対しても設定できます。

InsertAsync メソッドの例

新しいレコードを挿入するには、次のように InsertAsync メソッドを使用します。

var insert =
new InsertBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddColumn("count", 11)
.Build();

await transaction.InsertAsync(insert);

UpsertAsync メソッドの例

レコードをUPSERTする (既存のレコードを更新するか、新しいレコードを挿入する) には、次のように UpsertAsync メソッドを使用できます。

var upsert =
new UpsertBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddColumn("count", 11)
.Build();

await transaction.UpsertAsync(upsert);

UpdateAsync メソッドの例

既存のレコードを更新するには、次のように UpdateAsync メソッドを使用します。

// ...
using ScalarDB.Client.Core;

// ...

var update =
new UpdateBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddColumn("count", 11)
.AddCondition("processed", false, Operator.Equal)
.Build();

await transaction.UpdateAsync(update);

DeleteAsync メソッドの例

レコードを削除するには、次のように DeleteAsync メソッドを使用します。

// ...
using ScalarDB.Client.Core;

// ...
var delete =
new DeleteBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddCondition("processed", false, Operator.Equal)
.Build();

await transaction.DeleteAsync(delete);

MutateAsync メソッドの例

MutateAsync メソッドを使用すると、クラスターへの 1 回の呼び出しで複数のミューテーション操作を実行できます。これは次の方法で実行できます。

// ...
using ScalarDB.Client.Core;

// ...
var mutations = new IMutation[]
{
new InsertBuilder()
// ...
.Build(),
new UpsertBuilder()
// ...
.Build(),
new UpdateBuilder()
// ...
.Build(),
new DeleteBuilder()
// ...
.Build()
};

await transaction.MutateAsync(mutations);
注記

InsertAsyncUpsertAsyncUpdateAsyncDeleteAsync、または MutateAsync メソッドを使用してデータを変更するには、まず GetAsync または ScanAsync メソッドを使用してデータを取得する必要があります。