メインコンテンツまでスキップ
バージョン: 3.14

ScalarDB Cluster での Go をはじめよう

注記

このページは英語版のページが機械翻訳されたものです。英語版との間に矛盾または不一致がある場合は、英語版を正としてください。

このドキュメントでは、Go を使用して ScalarDB Cluster の gRPC クライアントコードを記述する方法について説明します。

前提条件

  • Go (最新の 3 つのメジャーリリースのいずれか)
  • Kubernetes クラスターで実行されている ScalarDB Cluster
警告

ScalarDB Cluster を使用するには、ライセンスキー (試用ライセンスまたは商用ライセンス) が必要です。ライセンスキーをお持ちでない場合は、お問い合わせください。

サンプルアプリケーション

このチュートリアルでは、口座間で送金できる電子マネーアプリケーションを作成するプロセスについて説明します。

ステップ 1. schema.json を作成する

以下は簡単なサンプルスキーマです。

schema.json を作成し、ファイルに次の内容を追加します。

{
"emoney.account": {
"transaction": true,
"partition-key": [
"id"
],
"clustering-key": [],
"columns": {
"id": "TEXT",
"balance": "INT"
}
}
}

ステップ 2. database.properties を作成する

ScalarDB Cluster の Schema Loader 用に database.properties を作成する必要があります。ただし、まず LoadBalancer サービス (scalardb-cluster-envoy) のサービスリソースの EXTERNAL-IP アドレスを取得する必要があります。

EXTERNAL-IP アドレスを確認するには、次のコマンドを実行します。

kubectl get svc scalardb-cluster-envoy
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
scalardb-cluster-envoy LoadBalancer 10.105.121.51 localhost 60053:30641/TCP 16h

この場合、EXTERNAL-IP アドレスは localhost です。

次に、database.properties を作成し、ファイルに次の内容を追加します。

scalar.db.transaction_manager=cluster
scalar.db.contact_points=indirect:localhost

ScalarDB Cluster に接続するには、scalar.db.transaction_manager プロパティに cluster を指定する必要があります。また、このチュートリアルでは indirect クライアントモードを使用して Envoy のサービスリソースに接続します。クライアントモードの詳細については、Java API を使用した ScalarDB Cluster の開発者ガイドを参照してください。

ステップ 3. スキーマをロードする

ScalarDB Cluster 経由でスキーマをロードするには、専用の ScalarDB Cluster 用 Schema Loader (Schema Loader for Cluster) を使用する必要があります。Schema Loader for Cluster の使用方法は、JAR ファイルの名前が異なることを除いて、Schema Loader for ScalarDB の使用方法と基本的に同じです。Schema Loader for Cluster は、ScalarDB リリースからダウンロードできます。JAR ファイルをダウンロードしたら、次のコマンドで Schema Loader for Cluster を実行できます。

java -jar scalardb-cluster-schema-loader-3.14.0-all.jar --config database.properties -f schema.json --coordinator

ステップ 4. Go 環境をセットアップする

gRPC クイックスタートドキュメントの Prerequisites セクションに従って、次のコンポーネントをインストールします。

  • Go
  • プロトコルバッファコンパイラ、protoc、バージョン 3.15 以降
  • プロトコルコンパイラ用の Go プラグイン

ステップ 5. ScalarDB Cluster gRPC のスタブコードを生成する

ScalarDB Cluster の gRPC サーバーと通信するには、proto ファイルからスタブコードを生成する必要があります。

まず、新しい作業ディレクトリに、次のコマンドを実行して、gRPC コードの生成に使用する scalardb-cluster という名前のディレクトリを作成します。

mkdir scalardb-cluster

次に、scalardb-cluster.proto ファイルをダウンロードし、作成したディレクトリに保存します。商用ライセンスをお持ちの ScalarDB Cluster ユーザーで、scalardb-cluster.proto ファイルが必要な場合は、お問い合わせください。

次のコマンドを実行して gRPC コードを生成します。

protoc --go_out=. --go_opt=paths=source_relative \
--go_opt=Mscalardb-cluster/scalardb-cluster.proto=example.com/scalardb-cluster \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
--go-grpc_opt=Mscalardb-cluster/scalardb-cluster.proto=example.com/scalardb-cluster \
scalardb-cluster/scalardb-cluster.proto

コマンドを実行すると、scalardb-cluster サブディレクトリに scalardb-cluster.pb.goscalardb-cluster_grpc.pb.go という 2 つのファイルが表示されます。

ステップ 6. サンプルアプリケーションの作成

以下は、gRPC コードを使用するプログラムです。これを作業ディレクトリに main.go として保存します。このプログラムは、ScalarDB をはじめようElectronicMoney.java プログラムと同じことを行います。環境内の ScalarDB Cluster LoadBalancer サービスの EXTERNAL-IP 値に基づいて SERVER_ADDRESS の値を更新する必要があることに注意してください。

package main

import (
"context"
"errors"
"flag"
"fmt"
"log"
"os"
"time"

pb "emoney/scalardb-cluster"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
SERVER_ADDRESS = "localhost:60053"
NAMESPACE = "emoney"
TABLENAME = "account"
ID = "id"
BALANCE = "balance"
)

var requestHeader = pb.RequestHeader{HopLimit: 10}

type TxFn func(ctx context.Context, client pb.DistributedTransactionClient, transactionId string) error

func withTransaction(fn TxFn) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Set up a connection to the server.
conn, err := grpc.Dial(SERVER_ADDRESS, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}
defer conn.Close()

client := pb.NewDistributedTransactionClient(conn)

// Begin a transaction
beginResponse, err := client.Begin(ctx, &pb.BeginRequest{RequestHeader: &requestHeader})
if err != nil {
return err
}
transactionId := beginResponse.TransactionId

// Execute the function
err = fn(ctx, client, transactionId)
if err != nil {
// Rollback the transaction if there is an error
client.Rollback(ctx, &pb.RollbackRequest{TransactionId: transactionId})
return err
}

// Commit the transaction
_, err = client.Commit(ctx, &pb.CommitRequest{RequestHeader: &requestHeader, TransactionId: transactionId})
return err
}

func charge(ctx context.Context, client pb.DistributedTransactionClient, transactionId string, id string, amount int) error {
partitionKey := pb.Key{Columns: []*pb.Column{{Name: ID, Value: &pb.Column_TextValue_{TextValue: &pb.Column_TextValue{Value: &id}}}}}

// Retrieve the current balance for id
get := pb.Get{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &partitionKey, ClusteringKey: nil,
GetType: pb.Get_GET_TYPE_GET,
}
getResponse, err := client.Get(ctx, &pb.GetRequest{RequestHeader: &requestHeader, TransactionId: transactionId, Get: &get})
if err != nil {
return err
}

// Calculate the balance
balance := int32(amount)
if result := getResponse.GetResult(); result != nil {
for _, column := range result.GetColumns() {
if column.Name == BALANCE {
balance += column.GetIntValue().GetValue()
break
}
}
}

// Update the balance
put := pb.Put{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &partitionKey, ClusteringKey: nil,
Columns: []*pb.Column{
{Name: BALANCE, Value: &pb.Column_IntValue_{IntValue: &pb.Column_IntValue{Value: &balance}}},
},
}
_, err = client.Put(ctx, &pb.PutRequest{RequestHeader: &requestHeader, TransactionId: transactionId, Puts: []*pb.Put{&put}})
return err
}

func pay(ctx context.Context, client pb.DistributedTransactionClient, transactionId string, fromId string, toId string, amount int) error {
fromPartitionKey := pb.Key{Columns: []*pb.Column{{Name: ID, Value: &pb.Column_TextValue_{TextValue: &pb.Column_TextValue{Value: &fromId}}}}}
toPartitionKey := pb.Key{Columns: []*pb.Column{{Name: ID, Value: &pb.Column_TextValue_{TextValue: &pb.Column_TextValue{Value: &toId}}}}}

// Retrieve the current balances for ids
fromGet := pb.Get{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &fromPartitionKey, ClusteringKey: nil,
GetType: pb.Get_GET_TYPE_GET,
}
fromGetResponse, err := client.Get(ctx, &pb.GetRequest{RequestHeader: &requestHeader, TransactionId: transactionId, Get: &fromGet})
if err != nil {
return err
}
toGet := pb.Get{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &toPartitionKey, ClusteringKey: nil,
GetType: pb.Get_GET_TYPE_GET,
}
toGetResponse, err := client.Get(ctx, &pb.GetRequest{RequestHeader: &requestHeader, TransactionId: transactionId, Get: &toGet})
if err != nil {
return err
}

// Calculate the balances (it assumes that both accounts exist)
var (
fromBalance int32
toBalance int32
)
for _, column := range fromGetResponse.GetResult().GetColumns() {
if column.Name == BALANCE {
fromBalance = column.GetIntValue().GetValue()
break
}
}
for _, column := range toGetResponse.GetResult().GetColumns() {
if column.Name == BALANCE {
toBalance = column.GetIntValue().GetValue()
break
}
}
newFromBalance := fromBalance - int32(amount)
newToBalance := toBalance + int32(amount)

if newFromBalance < 0 {
return errors.New(fromId + " doesn't have enough balance.")
}

// Update the balances
fromPut := pb.Put{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &fromPartitionKey, ClusteringKey: nil,
Columns: []*pb.Column{
{Name: BALANCE, Value: &pb.Column_IntValue_{IntValue: &pb.Column_IntValue{Value: &newFromBalance}}},
},
}
toPut := pb.Put{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &toPartitionKey, ClusteringKey: nil,
Columns: []*pb.Column{
{Name: BALANCE, Value: &pb.Column_IntValue_{IntValue: &pb.Column_IntValue{Value: &newToBalance}}},
},
}
_, err = client.Put(ctx, &pb.PutRequest{RequestHeader: &requestHeader, TransactionId: transactionId, Puts: []*pb.Put{&fromPut, &toPut}})
return err
}

func getBalance(ctx context.Context, client pb.DistributedTransactionClient, transactionId string, id string) (int, error) {
// Retrieve the current balance for id
get := pb.Get{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &pb.Key{Columns: []*pb.Column{{Name: ID, Value: &pb.Column_TextValue_{TextValue: &pb.Column_TextValue{Value: &id}}}}},
ClusteringKey: nil,
GetType: pb.Get_GET_TYPE_GET,
}
getResponse, err := client.Get(ctx, &pb.GetRequest{RequestHeader: &requestHeader, TransactionId: transactionId, Get: &get})
if err != nil {
return 0, err
}
if getResponse.GetResult() == nil || len(getResponse.GetResult().GetColumns()) == 0 {
return 0, errors.New("Account " + id + " doesn't exist.")
}

var balance int
for _, column := range getResponse.GetResult().GetColumns() {
if column.Name == BALANCE {
balance = int(column.GetIntValue().GetValue())
break
}
}
return balance, nil
}

func main() {
var (
action = flag.String("action", "", "Action to perform: charge / pay / getBalance")
fromId = flag.String("from", "", "From account (needed for pay)")
toId = flag.String("to", "", "To account (needed for charge and pay)")
id = flag.String("id", "", "Account id (needed for getBalance)")
)
var amount int
flag.IntVar(&amount, "amount", 0, "Amount to transfer (needed for charge and pay)")
flag.Parse()

if *action == "charge" {
if *toId == "" || amount < 0 {
printUsageAndExit()
}
err := withTransaction(func(ctx context.Context, client pb.DistributedTransactionClient, txId string) error {
return charge(ctx, client, txId, *toId, amount)
})
if err != nil {
log.Fatalf("error: %v", err)
}
} else if *action == "pay" {
if *toId == "" || *fromId == "" || amount < 0 {
printUsageAndExit()
}
err := withTransaction(func(ctx context.Context, client pb.DistributedTransactionClient, txId string) error {
return pay(ctx, client, txId, *fromId, *toId, amount)
})
if err != nil {
log.Fatalf("error: %v", err)
}
} else if *action == "getBalance" {
if *id == "" {
printUsageAndExit()
}
var balance int
err := withTransaction(func(ctx context.Context, client pb.DistributedTransactionClient, txId string) error {
var err error
balance, err = getBalance(ctx, client, txId, *id)
return err
})
if err != nil {
log.Fatalf("error: %v", err)
}
fmt.Println(balance)
} else {
fmt.Fprintln(os.Stderr, "Unknown action "+*action)
printUsageAndExit()
}
}

func printUsageAndExit() {
flag.Usage()
os.Exit(1)
}

main.go ファイルを作成した後、次のコマンドを実行して go.mod ファイルを作成する必要があります。

go mod init emoney
go mod tidy

これで、ディレクトリ構造は次のようになります。

.
├── go.mod
├── go.sum
├── main.go
└── scalardb-cluster
├── scalardb-cluster.pb.go
├── scalardb-cluster.proto
└── scalardb-cluster_grpc.pb.go

その後、次のようにプログラムを実行できます:

  • user11000 を請求します:

    go run main.go -action charge -amount 1000 -to user1
  • merchant10 を請求します (merchant1 のアカウントを作成するだけです):

    go run main.go -action charge -amount 0 -to merchant1
  • user1 から merchant1100 を支払います:

    go run main.go -action pay -amount 100 -from user1 -to merchant1
  • user1 の残高を取得します。

    go run main.go -action getBalance -id user1
  • merchant1 の残高を取得します。

    go run main.go -action getBalance -id merchant1

go build を使用してバイナリを取得してから実行することもできます。

go build
./emoney -action getBalance -id user1

参照

その他の ScalarDB Cluster チュートリアルについては、以下を参照してください。

Java API で ScalarDB Cluster を使用するアプリケーションの開発の詳細については、以下を参照してください。

ScalarDB Cluster gRPC API の詳細については、以下を参照してください。