メインコンテンツまでスキップ
バージョン: 3.15

ScalarDB Cluster での Go をはじめよう

注記

このページは英語版のページが機械翻訳されたものです。英語版との間に矛盾または不一致がある場合は、英語版を正としてください。

このドキュメントでは、Go を使用して ScalarDB Cluster の gRPC クライアントコードを記述する方法について説明します。

前提条件

  • Go (最新の3つのメジャーリリースのいずれか)
  • Kubernetes クラスターで実行されている ScalarDB Cluster
警告

ScalarDB Cluster を使用するには、ライセンスキー (試用ライセンスまたは商用ライセンス) が必要です。ライセンスキーをお持ちでない場合は、お問い合わせください。

サンプルアプリケーション

このチュートリアルでは、口座間で送金できる電子マネーアプリケーションを作成するプロセスについて説明します。

ステップ 1. schema.json を作成する

以下は簡単なサンプルスキーマです。

schema.json を作成し、ファイルに次の内容を追加します。

{
"emoney.account": {
"transaction": true,
"partition-key": [
"id"
],
"clustering-key": [],
"columns": {
"id": "TEXT",
"balance": "INT"
}
}
}

ステップ 2. database.properties を作成する

ScalarDB Cluster の Schema Loader 用に database.properties を作成する必要があります。ただし、まず LoadBalancer サービス (scalardb-cluster-envoy) のサービスリソースの EXTERNAL-IP アドレスを取得する必要があります。

EXTERNAL-IP アドレスを確認するには、次のコマンドを実行します。

kubectl get svc scalardb-cluster-envoy
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
scalardb-cluster-envoy LoadBalancer 10.105.121.51 localhost 60053:30641/TCP 16h

この場合、EXTERNAL-IP アドレスは localhost です。

次に、database.properties を作成し、ファイルに次の内容を追加します。

scalar.db.transaction_manager=cluster
scalar.db.contact_points=indirect:localhost

ScalarDB Cluster に接続するには、scalar.db.transaction_manager プロパティに cluster を指定する必要があります。また、このチュートリアルでは indirect クライアントモードを使用して Envoy のサービスリソースに接続します。クライアントモードの詳細については、Java API を使用した ScalarDB Cluster の開発者ガイドを参照してください。

ステップ 3. スキーマをロードする

ScalarDB Cluster 経由でスキーマをロードするには、専用の ScalarDB Cluster 用 Schema Loader (Schema Loader for Cluster) を使用する必要があります。Schema Loader for Cluster の使用方法は、JAR ファイルの名前が異なることを除いて、Schema Loader for ScalarDB の使用方法と基本的に同じです。Schema Loader for Cluster は、ScalarDB リリースからダウンロードできます。JAR ファイルをダウンロードしたら、次のコマンドで Schema Loader for Cluster を実行できます。

java -jar scalardb-cluster-schema-loader-3.15.1-all.jar --config database.properties -f schema.json --coordinator

ステップ 4. Go 環境をセットアップする

gRPC クイックスタートドキュメントの Prerequisites セクションに従って、次のコンポーネントをインストールします。

  • Go
  • プロトコルバッファコンパイラ、protoc、バージョン3.15以降
  • プロトコルコンパイラ用の Go プラグイン

ステップ 5. ScalarDB Cluster gRPC のスタブコードを生成する

ScalarDB Cluster の gRPC サーバーと通信するには、proto ファイルからスタブコードを生成する必要があります。

まず、新しい作業ディレクトリに、次のコマンドを実行して、gRPC コードの生成に使用する scalardb-cluster という名前のディレクトリを作成します。

mkdir scalardb-cluster

次に、scalardb-cluster.proto ファイルをダウンロードし、作成したディレクトリに保存します。商用ライセンスをお持ちの ScalarDB Cluster ユーザーで、scalardb-cluster.proto ファイルが必要な場合は、お問い合わせください。

次のコマンドを実行して gRPC コードを生成します。

protoc --go_out=. --go_opt=paths=source_relative \
--go_opt=Mscalardb-cluster/scalardb-cluster.proto=example.com/scalardb-cluster \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
--go-grpc_opt=Mscalardb-cluster/scalardb-cluster.proto=example.com/scalardb-cluster \
scalardb-cluster/scalardb-cluster.proto

コマンドを実行すると、scalardb-cluster サブディレクトリに scalardb-cluster.pb.goscalardb-cluster_grpc.pb.go という2つのファイルが表示されます。

ステップ 6. サンプルアプリケーションの作成

以下は、gRPC コードを使用するプログラムです。これを作業ディレクトリに main.go として保存します。このプログラムは、ScalarDB をはじめようElectronicMoney.java プログラムと同じことを行います。環境内の ScalarDB Cluster LoadBalancer サービスの EXTERNAL-IP 値に基づいて SERVER_ADDRESS の値を更新する必要があることに注意してください。

package main

import (
"context"
"errors"
"flag"
"fmt"
"log"
"os"
"time"

pb "emoney/scalardb-cluster"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
SERVER_ADDRESS = "localhost:60053"
NAMESPACE = "emoney"
TABLENAME = "account"
ID = "id"
BALANCE = "balance"
)

var requestHeader = pb.RequestHeader{HopLimit: 10}

type TxFn func(ctx context.Context, client pb.DistributedTransactionClient, transactionId string) error

func withTransaction(fn TxFn) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Set up a connection to the server.
conn, err := grpc.Dial(SERVER_ADDRESS, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}
defer conn.Close()

client := pb.NewDistributedTransactionClient(conn)

// Begin a transaction
beginResponse, err := client.Begin(ctx, &pb.BeginRequest{RequestHeader: &requestHeader})
if err != nil {
return err
}
transactionId := beginResponse.TransactionId

// Execute the function
err = fn(ctx, client, transactionId)
if err != nil {
// Rollback the transaction if there is an error
client.Rollback(ctx, &pb.RollbackRequest{TransactionId: transactionId})
return err
}

// Commit the transaction
_, err = client.Commit(ctx, &pb.CommitRequest{RequestHeader: &requestHeader, TransactionId: transactionId})
return err
}

func charge(ctx context.Context, client pb.DistributedTransactionClient, transactionId string, id string, amount int) error {
partitionKey := pb.Key{Columns: []*pb.Column{{Name: ID, Value: &pb.Column_TextValue_{TextValue: &pb.Column_TextValue{Value: &id}}}}}

// Retrieve the current balance for id
get := pb.Get{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &partitionKey, ClusteringKey: nil,
GetType: pb.Get_GET_TYPE_GET,
}
getResponse, err := client.Get(ctx, &pb.GetRequest{RequestHeader: &requestHeader, TransactionId: transactionId, Get: &get})
if err != nil {
return err
}

// Calculate the balance
balance := int32(amount)
if result := getResponse.GetResult(); result != nil {
for _, column := range result.GetColumns() {
if column.Name == BALANCE {
balance += column.GetIntValue().GetValue()
break
}
}
}

// Update the balance
put := pb.Put{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &partitionKey, ClusteringKey: nil,
Columns: []*pb.Column{
{Name: BALANCE, Value: &pb.Column_IntValue_{IntValue: &pb.Column_IntValue{Value: &balance}}},
},
}
_, err = client.Put(ctx, &pb.PutRequest{RequestHeader: &requestHeader, TransactionId: transactionId, Puts: []*pb.Put{&put}})
return err
}

func pay(ctx context.Context, client pb.DistributedTransactionClient, transactionId string, fromId string, toId string, amount int) error {
fromPartitionKey := pb.Key{Columns: []*pb.Column{{Name: ID, Value: &pb.Column_TextValue_{TextValue: &pb.Column_TextValue{Value: &fromId}}}}}
toPartitionKey := pb.Key{Columns: []*pb.Column{{Name: ID, Value: &pb.Column_TextValue_{TextValue: &pb.Column_TextValue{Value: &toId}}}}}

// Retrieve the current balances for ids
fromGet := pb.Get{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &fromPartitionKey, ClusteringKey: nil,
GetType: pb.Get_GET_TYPE_GET,
}
fromGetResponse, err := client.Get(ctx, &pb.GetRequest{RequestHeader: &requestHeader, TransactionId: transactionId, Get: &fromGet})
if err != nil {
return err
}
toGet := pb.Get{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &toPartitionKey, ClusteringKey: nil,
GetType: pb.Get_GET_TYPE_GET,
}
toGetResponse, err := client.Get(ctx, &pb.GetRequest{RequestHeader: &requestHeader, TransactionId: transactionId, Get: &toGet})
if err != nil {
return err
}

// Calculate the balances (it assumes that both accounts exist)
var (
fromBalance int32
toBalance int32
)
for _, column := range fromGetResponse.GetResult().GetColumns() {
if column.Name == BALANCE {
fromBalance = column.GetIntValue().GetValue()
break
}
}
for _, column := range toGetResponse.GetResult().GetColumns() {
if column.Name == BALANCE {
toBalance = column.GetIntValue().GetValue()
break
}
}
newFromBalance := fromBalance - int32(amount)
newToBalance := toBalance + int32(amount)

if newFromBalance < 0 {
return errors.New(fromId + " doesn't have enough balance.")
}

// Update the balances
fromPut := pb.Put{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &fromPartitionKey, ClusteringKey: nil,
Columns: []*pb.Column{
{Name: BALANCE, Value: &pb.Column_IntValue_{IntValue: &pb.Column_IntValue{Value: &newFromBalance}}},
},
}
toPut := pb.Put{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &toPartitionKey, ClusteringKey: nil,
Columns: []*pb.Column{
{Name: BALANCE, Value: &pb.Column_IntValue_{IntValue: &pb.Column_IntValue{Value: &newToBalance}}},
},
}
_, err = client.Put(ctx, &pb.PutRequest{RequestHeader: &requestHeader, TransactionId: transactionId, Puts: []*pb.Put{&fromPut, &toPut}})
return err
}

func getBalance(ctx context.Context, client pb.DistributedTransactionClient, transactionId string, id string) (int, error) {
// Retrieve the current balance for id
get := pb.Get{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &pb.Key{Columns: []*pb.Column{{Name: ID, Value: &pb.Column_TextValue_{TextValue: &pb.Column_TextValue{Value: &id}}}}},
ClusteringKey: nil,
GetType: pb.Get_GET_TYPE_GET,
}
getResponse, err := client.Get(ctx, &pb.GetRequest{RequestHeader: &requestHeader, TransactionId: transactionId, Get: &get})
if err != nil {
return 0, err
}
if getResponse.GetResult() == nil || len(getResponse.GetResult().GetColumns()) == 0 {
return 0, errors.New("Account " + id + " doesn't exist.")
}

var balance int
for _, column := range getResponse.GetResult().GetColumns() {
if column.Name == BALANCE {
balance = int(column.GetIntValue().GetValue())
break
}
}
return balance, nil
}

func main() {
var (
action = flag.String("action", "", "Action to perform: charge / pay / getBalance")
fromId = flag.String("from", "", "From account (needed for pay)")
toId = flag.String("to", "", "To account (needed for charge and pay)")
id = flag.String("id", "", "Account id (needed for getBalance)")
)
var amount int
flag.IntVar(&amount, "amount", 0, "Amount to transfer (needed for charge and pay)")
flag.Parse()

if *action == "charge" {
if *toId == "" || amount < 0 {
printUsageAndExit()
}
err := withTransaction(func(ctx context.Context, client pb.DistributedTransactionClient, txId string) error {
return charge(ctx, client, txId, *toId, amount)
})
if err != nil {
log.Fatalf("error: %v", err)
}
} else if *action == "pay" {
if *toId == "" || *fromId == "" || amount < 0 {
printUsageAndExit()
}
err := withTransaction(func(ctx context.Context, client pb.DistributedTransactionClient, txId string) error {
return pay(ctx, client, txId, *fromId, *toId, amount)
})
if err != nil {
log.Fatalf("error: %v", err)
}
} else if *action == "getBalance" {
if *id == "" {
printUsageAndExit()
}
var balance int
err := withTransaction(func(ctx context.Context, client pb.DistributedTransactionClient, txId string) error {
var err error
balance, err = getBalance(ctx, client, txId, *id)
return err
})
if err != nil {
log.Fatalf("error: %v", err)
}
fmt.Println(balance)
} else {
fmt.Fprintln(os.Stderr, "Unknown action "+*action)
printUsageAndExit()
}
}

func printUsageAndExit() {
flag.Usage()
os.Exit(1)
}

main.go ファイルを作成した後、次のコマンドを実行して go.mod ファイルを作成する必要があります。

go mod init emoney
go mod tidy

これで、ディレクトリ構造は次のようになります。

.
├── go.mod
├── go.sum
├── main.go
└── scalardb-cluster
├── scalardb-cluster.pb.go
├── scalardb-cluster.proto
└── scalardb-cluster_grpc.pb.go

その後、次のようにプログラムを実行できます:

  • user11000 を請求します:

    go run main.go -action charge -amount 1000 -to user1
  • merchant10 を請求します (merchant1 のアカウントを作成するだけです):

    go run main.go -action charge -amount 0 -to merchant1
  • user1 から merchant1100 を支払います:

    go run main.go -action pay -amount 100 -from user1 -to merchant1
  • user1 の残高を取得します。

    go run main.go -action getBalance -id user1
  • merchant1 の残高を取得します。

    go run main.go -action getBalance -id merchant1

go build を使用してバイナリを取得してから実行することもできます。

go build
./emoney -action getBalance -id user1

参照

その他の ScalarDB Cluster チュートリアルについては、以下を参照してください。

Java API で ScalarDB Cluster を使用するアプリケーションの開発の詳細については、以下を参照してください。

ScalarDB Cluster gRPC API の詳細については、以下を参照してください。

This website uses cookies to enhance the visitor experience. By continuing to use this website, you acknowledge that you have read and understood our privacy policy and consent to the use of cookies to help improve your browsing experience and provide you with personalized content.