Getting Started with Distributed Transactions in the ScalarDB Cluster .NET Client SDK
The ScalarDB Cluster .NET Client SDK supports the distributed transaction functionality of ScalarDB Cluster. The SDK includes transaction and manager abstractions for easier communication within a cluster.
Although we recommend using asynchronous methods as in the following examples, you can use synchronous versions instead.
For details about distributed SQL transactions, see Getting Started with Distributed SQL Transactions in the ScalarDB Cluster .NET Client SDK.
Install the SDK
Install the same major and minor version of the SDK as ScalarDB Cluster into the .NET project. You can do this by using the built-in NuGet package manager, replacing <MAJOR>.<MINOR>
with the version that you're using:
dotnet add package ScalarDB.Client --version '<MAJOR>.<MINOR>.*'
Create a settings file
Create a scalardb-options.json
file and add the following, replacing <HOSTNAME_OR_IP_ADDRESS>
with the FQDN or the IP address, and <PORT>
with the port number (60053
by default) of your cluster:
{
"ScalarDbOptions": {
"Address": "http://<HOSTNAME_OR_IP_ADDRESS>:<PORT>",
"HopLimit": 10
}
}
For details about settings files and other ways to configure the client, see Client configuration.
Get a transaction manager
You need to get a transaction manager for distributed transactions. To get the transaction manager, you can use TransactionFactory
as follows:
// Pass the path to the settings file created in the previous step.
var factory = TransactionFactory.Create("scalardb-options.json");
using var manager = factory.GetTransactionManager();
Manage transactions
To execute multiple CRUD operations as part of a single transaction, first, you need to begin a transaction. You can begin a transaction by using the transaction manager as follows:
var transaction = await manager.BeginAsync();
You can also resume a transaction that is already being executed as follows:
var transaction = manager.Resume(transactionIdString);
The Resume
method doesn't have an asynchronous version because it only creates a transaction object. Because of this, resuming a transaction by using the wrong ID is possible.
When a transaction is ready to be committed, you can call the CommitAsync
method of the transaction as follows:
await transaction.CommitAsync();
To roll back the transaction, you can use the RollbackAsync
method:
await transaction.RollbackAsync();
Execute CRUD operations
A transaction has GetAsync
, ScanAsync
, InsertAsync
, UpsertAsync
, UpdateAsync
, DeleteAsync
, and MutateAsync
methods to execute CRUD operations against the cluster. As a parameter, these methods have an operation object. An operation object can be created by using the builders listed in this section.
CRUD operations can be executed in a one-shot transaction manner without needing to explicitly create a transaction. For that, a manager object has the same CRUD methods as a transaction object.
To use builders, add the following namespace to the using
section:
using ScalarDB.Client.Builders;
The cluster does not support parallel execution of commands inside one transaction, so make sure to use await
for asynchronous methods.
GetAsync
method example
To retrieve a single record, you can use the GetAsync
method as follows:
var get =
new GetBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.SetProjections("item_id", "count")
.Build();
var getResult = await transaction.GetAsync(get);
It is possible to retrieve a record by using an index instead of a partition key. To do that, you need to set the type of operation to GetWithIndex
as follows:
// ...
using ScalarDB.Client.Core;
// ...
var get =
new GetBuilder()
// ...
.SetGetType(GetOperationType.GetWithIndex)
.AddPartitionKey("index_column", "1")
.Build();
You can also specify arbitrary conditions that a retrieved record must meet, or it won't be returned. The conditions can be set as conjunctions of conditions as follows:
var get =
new GetBuilder()
// ...
.AddConjunction(c => c.AddCondition("cost", 1000, Operator.LessThan))
.AddConjunction(c =>
{
c.AddCondition("cost", 10000, Operator.LessThan);
c.AddCondition("in_stock", true, Operator.Equal);
})
.Build();
In the above example, a record will be returned only if its cost
is less than 1000
, or if its cost
is less than 10000
and in_stock
is true.
Handle IResult
objects
The GetAsync
and ScanAsync
methods return IResult
objects. An IResult
object contains columns of the retrieved record. The value of the specific column can be retrieved in the following manner:
// Getting an integer value from the "item_id" column.
// If it fails, an exception will be thrown.
var itemId = result.GetValue<int>("item_id");
// Trying to get a string value from the "order_id" column.
// If it fails, no exception will be thrown.
if (result.TryGetValue<string>("order_id", out var orderId))
Console.WriteLine($"order_id: {orderId}");
// Checking if the "count" column is null.
if (result.IsNull("count"))
Console.WriteLine("'count' is null");
For details about which type should be used in GetValue<T>
and TryGetValue<T>
, see How ScalarDB Column Types Are Converted to and from .NET Types.
ScanAsync
method example
To retrieve a range of records, you can use the ScanAsync
method as follows:
var scan =
new ScanBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddStartClusteringKey("item_id", 2)
.SetStartInclusive(true)
.AddEndClusteringKey("item_id", 8)
.SetEndInclusive(true)
.SetProjections("item_id", "count")
.Build();
var scanResult = await transaction.ScanAsync(scan);
It is possible to retrieve a record by using an index instead of a partition key. To do that, you need to set the type of operation to ScanWithIndex
as follows:
// ...
using ScalarDB.Client.Core;
// ...
var scan =
new ScanBuilder()
// ...
.SetScanType(ScanOperationType.ScanWithIndex)
.AddPartitionKey("index_column", "1")
.Build();
The arbitrary conditions that a retrieved record must meet can also be set for a scan operation in the same way as for a get operation.
InsertAsync
method example
To insert a new record, you can use the InsertAsync
method as follows:
var insert =
new InsertBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddColumn("count", 11)
.Build();
await transaction.InsertAsync(insert);
UpsertAsync
method example
To upsert a record (update an existing record or insert a new one), you can use the UpsertAsync
method as follows:
var upsert =
new UpsertBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddColumn("count", 11)
.Build();
await transaction.UpsertAsync(upsert);
UpdateAsync
method example
To update an existing record, you can use the UpdateAsync
method as follows:
// ...
using ScalarDB.Client.Core;
// ...
var update =
new UpdateBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddColumn("count", 11)
.AddCondition("processed", false, Operator.Equal)
.Build();
await transaction.UpdateAsync(update);
DeleteAsync
method example
To delete a record, you can use the DeleteAsync
method as follows:
// ...
using ScalarDB.Client.Core;
// ...
var delete =
new DeleteBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddCondition("processed", false, Operator.Equal)
.Build();
await transaction.DeleteAsync(delete);
MutateAsync
method example
The MutateAsync
method allows you to execute more than one mutation operation in a single call to the cluster. You can do this in the following manner:
// ...
using ScalarDB.Client.Core;
// ...
var mutations = new IMutation[]
{
new InsertBuilder()
// ...
.Build(),
new UpsertBuilder()
// ...
.Build(),
new UpdateBuilder()
// ...
.Build(),
new DeleteBuilder()
// ...
.Build()
};
await transaction.MutateAsync(mutations);
To modify data by using the InsertAsync
, UpsertAsync
, UpdateAsync
, DeleteAsync
, or MutateAsync
method, the data must be retrieved first by using the GetAsync
or ScanAsync
method.