Skip to main content
Version: 3.17

Getting Started with Distributed Transactions in the ScalarDB Cluster .NET Client SDK

The ScalarDB Cluster .NET Client SDK supports the distributed transaction functionality of ScalarDB Cluster. The SDK includes transaction and manager abstractions for easier communication within a cluster.

note

Although we recommend using asynchronous methods as in the following examples, you can use synchronous versions instead.

For details about distributed SQL transactions, see Getting Started with Distributed SQL Transactions in the ScalarDB Cluster .NET Client SDK.

Install the SDK​

Install the same major and minor version of the SDK as ScalarDB Cluster into the .NET project. You can do this by using the built-in NuGet package manager, replacing <MAJOR>.<MINOR> with the version that you're using:

dotnet add package ScalarDB.Client --version '<MAJOR>.<MINOR>.*'

Create a settings file​

Create a scalardb-options.json file and add the following, replacing <HOSTNAME_OR_IP_ADDRESS> with the FQDN or the IP address, and <PORT> with the port number (60053 by default) of your cluster:

{
"ScalarDbOptions": {
"Address": "http://<HOSTNAME_OR_IP_ADDRESS>:<PORT>",
"HopLimit": 10
}
}

For details about settings files and other ways to configure the client, see Client configuration.

Get a transaction manager​

You need to get a transaction manager for distributed transactions. To get the transaction manager, you can use TransactionFactory as follows:

// Pass the path to the settings file created in the previous step.
var factory = TransactionFactory.Create("scalardb-options.json");

using var manager = factory.GetTransactionManager();

Manage transactions​

To execute multiple CRUD operations as part of a single transaction, first, you need to begin a transaction. You can begin a transaction by using the transaction manager as follows:

var transaction = await manager.BeginAsync();

You can also resume a transaction that is already being executed as follows:

var transaction = manager.Resume(transactionIdString);
note

The Resume method doesn't have an asynchronous version because it only creates a transaction object. Because of this, resuming a transaction by using the wrong ID is possible.

Begin a read-only transaction​

If you perform only read operations (that is, GetAsync and ScanAsync) in a transaction, you can begin a read-only transaction as follows:

var transaction = await manager.BeginReadOnlyAsync();
note

Read-only transactions are optimized for read operations only. Attempting to perform write operations (InsertAsync, UpsertAsync, UpdateAsync, DeleteAsync, or MutateAsync) in a read-only transaction will result in an error.

When a transaction is ready to be committed, you can call the CommitAsync method of the transaction as follows:

await transaction.CommitAsync();

To roll back the transaction, you can use the RollbackAsync method:

await transaction.RollbackAsync();

Execute CRUD operations​

A transaction has GetAsync, ScanAsync, InsertAsync, UpsertAsync, UpdateAsync, DeleteAsync, and MutateAsync methods to execute CRUD operations against the cluster. As a parameter, these methods have an operation object. An operation object can be created by using the builders listed in this section.

note

CRUD operations can be executed in a one-shot transaction manner without needing to explicitly create a transaction. For that, a manager object has the same CRUD methods as a transaction object.

To use builders, add the following namespace to the using section:

using ScalarDB.Client.Builders;
note

The cluster does not support parallel execution of commands inside one transaction, so make sure to use await for asynchronous methods.

GetAsync method example​

To retrieve a single record, you can use the GetAsync method as follows:

var get =
new GetBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.SetProjections("item_id", "count")
.Build();

var getResult = await transaction.GetAsync(get);

It is possible to retrieve a record by using an index instead of a partition key. To do that, you need to set the type of operation to GetWithIndex as follows:

// ...
using ScalarDB.Client.Core;

// ...

var get =
new GetBuilder()
// ...
.SetGetType(GetOperationType.GetWithIndex)
.AddPartitionKey("index_column", "1")
.Build();

You can also specify arbitrary conditions that a retrieved record must meet, or it won't be returned. The conditions can be set as conjunctions of conditions as follows:

var get =
new GetBuilder()
// ...
.AddConjunction(c => c.AddCondition("cost", 1000, Operator.LessThan))
.AddConjunction(c =>
{
c.AddCondition("cost", 10000, Operator.LessThan);
c.AddCondition("in_stock", true, Operator.Equal);
})
.Build();

In the above example, a record will be returned only if its cost is less than 1000, or if its cost is less than 10000 and in_stock is true.

Handle IResult objects​

The GetAsync and ScanAsync methods return IResult objects. An IResult object contains columns of the retrieved record. The value of the specific column can be retrieved in the following manner:

// Getting an integer value from the "item_id" column.
// If it fails, an exception will be thrown.
var itemId = result.GetValue<int>("item_id");

// Trying to get a string value from the "order_id" column.
// If it fails, no exception will be thrown.
if (result.TryGetValue<string>("order_id", out var orderId))
Console.WriteLine($"order_id: {orderId}");

// Checking if the "count" column is null.
if (result.IsNull("count"))
Console.WriteLine("'count' is null");

For details about which type should be used in GetValue<T> and TryGetValue<T>, see How ScalarDB Column Types Are Converted to and from .NET Types.

ScanAsync method example​

To retrieve a range of records, you can use the ScanAsync method as follows:

var scan =
new ScanBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddStartClusteringKey("item_id", 2)
.SetStartInclusive(true)
.AddEndClusteringKey("item_id", 8)
.SetEndInclusive(true)
.SetProjections("item_id", "count")
.Build();

var scanResult = await transaction.ScanAsync(scan);

It is possible to retrieve a record by using an index instead of a partition key. To do that, you need to set the type of operation to ScanWithIndex as follows:

// ...
using ScalarDB.Client.Core;

// ...

var scan =
new ScanBuilder()
// ...
.SetScanType(ScanOperationType.ScanWithIndex)
.AddPartitionKey("index_column", "1")
.Build();

The arbitrary conditions that a retrieved record must meet can also be set for a scan operation in the same way as for a get operation.

Use a scanner to retrieve large datasets​

The ScanAsync method returns all results at once, which can be memory intensive for large datasets. To handle large datasets more efficiently, you can use a scanner that streams results from the cluster and enables incremental processing.

Create a scanner​

You can create a scanner from a transaction by using the CreateScannerAsync method:

using ScalarDB.Client.Builders;
using ScalarDB.Client.Core;

// ...

var scan = new ScanBuilder()
.SetNamespaceName("ns")
.SetTableName("items")
.SetScanType(ScanOperationType.ScanAll)
.Build();

await using var scanner = await transaction.CreateScannerAsync(scan);
note

The scanner must be disposed of after use to release server-side resources. If the scanner's lifetime is limited to a single scope, use await using (or using for synchronous code) for automatic cleanup. If you need to return a scanner from a method or manage its lifetime across multiple scopes, call DisposeAsync() (or Dispose()) explicitly when done.

Retrieve results one by one​

Use the OneAsync method to retrieve results one at a time:

await using var scanner = await transaction.CreateScannerAsync(scan);

while (true)
{
var result = await scanner.OneAsync();
if (result == null)
break;

// Process the result.
var itemId = result.GetValue<int>("item_id");
Console.WriteLine($"Item ID: {itemId}");
}

If there are no more results, OneAsync returns null. Subsequent calls to OneAsync after receiving null will continue to return null.

Retrieve all remaining results​

Use the AllAsync method to retrieve all remaining results at once:

await using var scanner = await transaction.CreateScannerAsync(scan);

var results = await scanner.AllAsync();
foreach (var result in results)
{
// Process each result.
var itemId = result.GetValue<int>("item_id");
Console.WriteLine($"Item ID: {itemId}");
}
note

If you have already retrieved some results by using OneAsync, the AllAsync method will return only the remaining results that have not yet been retrieved.

Configure scanner fetch size​

You can configure the default fetch size for scanner operations by using ScalarDbOptions. The fetch size determines how many records are fetched from the server in each batch:

var scalarDbOptions = new ScalarDbOptions
{
Address = "http://<HOSTNAME_OR_IP_ADDRESS>:<PORT>",
ScannerFetchSize = 20 // Default is 10.
};

Use a scanner with scan options​

The scanner supports all scan options available in ScanBuilder.

Scan with limit​
using ScalarDB.Client.Builders;
using ScalarDB.Client.Core;

// ...

var scanWithLimit = new ScanBuilder()
.SetNamespaceName("ns")
.SetTableName("items")
.SetScanType(ScanOperationType.ScanAll)
.SetLimit(100) // Limit to 100 results.
.Build();

await using var scanner = await transaction.CreateScannerAsync(scanWithLimit);
var results = await scanner.AllAsync();
Scan with clustering key range​
using ScalarDB.Client.Builders;
using ScalarDB.Client.Core;

// ...

var scanWithRange = new ScanBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.SetScanType(ScanOperationType.Scan)
.AddPartitionKey("order_id", "1")
.AddStartClusteringKey("item_id", 2)
.SetStartInclusive(true)
.AddEndClusteringKey("item_id", 8)
.SetEndInclusive(true)
.Build();

await using var scanner = await transaction.CreateScannerAsync(scanWithRange);

Close the scanner​

The scanner implements both IDisposable and IAsyncDisposable. The recommended approach is to use await using (or using for synchronous code) for automatic disposal:

await using var scanner = await transaction.CreateScannerAsync(scan);
// The scanner is automatically closed when leaving the scope.

If you need to close the scanner explicitly, you can use the CloseAsync method:

var scanner = await transaction.CreateScannerAsync(scan);
try
{
var result = await scanner.OneAsync();
// Process result...
}
finally
{
await scanner.CloseAsync();
}

Synchronous API​

All scanner methods have synchronous versions:

using var scanner = transaction.CreateScanner(scan);

// Retrieve one result.
var result = scanner.One();

// Retrieve all remaining results.
var results = scanner.All();

// Close the scanner.
scanner.Close();

Scanner vs. ScanAsync​

FeatureScannerScanAsync
Memory usageLow (streaming)Higher (all results in memory)
Use caseLarge datasetsSmall to medium datasets
Result retrievalIncremental (OneAsync) or all at once (AllAsync)All at once
Resource managementRequires explicit disposalAutomatic
note

For details on handling exceptions during scanner and other transaction operations, see Exception Handling in the ScalarDB Cluster .NET Client SDK.

InsertAsync method example​

To insert a new record, you can use the InsertAsync method as follows:

var insert =
new InsertBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddColumn("count", 11)
.Build();

await transaction.InsertAsync(insert);

UpsertAsync method example​

To upsert a record (update an existing record or insert a new one), you can use the UpsertAsync method as follows:

var upsert =
new UpsertBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddColumn("count", 11)
.Build();

await transaction.UpsertAsync(upsert);

UpdateAsync method example​

To update an existing record, you can use the UpdateAsync method as follows:

// ...
using ScalarDB.Client.Core;

// ...

var update =
new UpdateBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddColumn("count", 11)
.AddCondition("processed", false, Operator.Equal)
.Build();

await transaction.UpdateAsync(update);

DeleteAsync method example​

To delete a record, you can use the DeleteAsync method as follows:

// ...
using ScalarDB.Client.Core;

// ...

var delete =
new DeleteBuilder()
.SetNamespaceName("ns")
.SetTableName("statements")
.AddPartitionKey("order_id", "1")
.AddClusteringKey("item_id", 2)
.AddCondition("processed", false, Operator.Equal)
.Build();

await transaction.DeleteAsync(delete);

MutateAsync method example​

The MutateAsync method allows you to execute more than one mutation operation in a single call to the cluster. You can do this in the following manner:

// ...
using ScalarDB.Client.Core;

// ...

var mutations = new IMutation[]
{
new InsertBuilder()
// ...
.Build(),
new UpsertBuilder()
// ...
.Build(),
new UpdateBuilder()
// ...
.Build(),
new DeleteBuilder()
// ...
.Build()
};

await transaction.MutateAsync(mutations);
note

To modify data by using the InsertAsync, UpsertAsync, UpdateAsync, DeleteAsync, or MutateAsync method, the data must be retrieved first by using the GetAsync or ScanAsync method.