ScalarDB Cluster での Go をはじめよう
このページは英語版のページが機械翻訳されたものです。英語版との間に矛盾または不一致がある場合は、英語版を正としてください。
このドキュメントでは、Go を使用して ScalarDB Cluster の gRPC クライアントコードを 記述する方法について説明します。
前提条件
- Go (最新の3つのメジャーリリースのいずれか)
- Kubernetes クラスターで実行されている ScalarDB Cluster
- ScalarDB Cluster をローカルにデプロイする方法の手順に従ってデプロイした Kubernetes クラスターで ScalarDB Cluster が実行されていることを前提としています。
ScalarDB Cluster を使用するには、ライセンスキー (試用ライセンスまたは商用ライセンス) が必要です。ライセンスキーをお持ちでない場合は、お問い合わせください。
サンプルアプリケーション
このチュートリアルでは、口座間で送金できる電子マネーアプリケーションを作成するプロセスについて説明します。
ステップ 1. schema.json
を作成する
以下は簡単なサンプルスキーマです。
schema.json
を作成し、ファイルに次の内容を追加します。
{
"emoney.account": {
"transaction": true,
"partition-key": [
"id"
],
"clustering-key": [],
"columns": {
"id": "TEXT",
"balance": "INT"
}
}
}
ステップ 2. database.properties
を作成する
ScalarDB Cluster の Schema Loader 用に database.properties
を作成する必要があります。ただし、まず LoadBalancer
サービス (scalardb-cluster-envoy
) のサービスリソースの EXTERNAL-IP
アドレスを取得する必要があります。
EXTERNAL-IP
アドレスを確認するには、次のコマンドを実行します。
kubectl get svc scalardb-cluster-envoy
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
scalardb-cluster-envoy LoadBalancer 10.105.121.51 localhost 60053:30641/TCP 16h
この場合、EXTERNAL-IP
アドレスは localhost
です。
次に、database.properties
を作成し、ファイルに次の内容を追加します。
scalar.db.transaction_manager=cluster
scalar.db.contact_points=indirect:localhost
ScalarDB Cluster に接続するには、scalar.db.transaction_manager
プロパティに cluster
を指定する必要があります。また、このチュートリアルでは indirect
クライアントモードを使用して Envoy のサービスリソースに接続します。クライアントモードの詳細については、Java API を使用した ScalarDB Cluster の開発者ガイドを参照してください。
ステップ 3. スキーマをロードする
ScalarDB Cluster 経由でスキーマをロードするには、専用の ScalarDB Cluster 用 Schema Loader (Schema Loader for Cluster) を使用する必要があります。Schema Loader for Cluster の使用方法は、JAR ファイルの名前が異なることを除いて、Schema Loader for ScalarDB の使用方法と基本的に同じです。Schema Loader for Cluster は、ScalarDB リリースからダウンロードできます。JAR ファイルをダウンロードしたら、次のコマンドで Schema Loader for Cluster を実行できます。
java -jar scalardb-cluster-schema-loader-3.13.1-all.jar --config database.properties -f schema.json --coordinator
ステップ 4. Go 環境をセットアップする
gRPC クイックスタートドキュメントの Prerequisites セクションに従って、次のコンポーネントをインストールします。
- Go
- プロトコルバッファコンパイラ、
protoc
、バージョン3.15以降 - プロトコルコンパイラ用の Go プラグイン
ステップ 5. ScalarDB Cluster gRPC のスタブコードを生成する
ScalarDB Cluster の gRPC サーバーと通信するには、proto ファイルからスタブコードを生成する必要があります。
まず、新しい作業ディレクトリに、次のコマンドを実行して、gRPC コードの生成に使用 する scalardb-cluster
という名前のディレクトリを作成します。
mkdir scalardb-cluster
次に、scalardb-cluster.proto
ファイルをダウンロードし、作成したディレクトリに保存します。商用ライセンスをお持ちの ScalarDB Cluster ユーザーで、scalardb-cluster.proto
ファイルが必要な場合は、お問い合わせください。
次のコマンドを実行して gRPC コードを生成します。
protoc --go_out=. --go_opt=paths=source_relative \
--go_opt=Mscalardb-cluster/scalardb-cluster.proto=example.com/scalardb-cluster \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
--go-grpc_opt=Mscalardb-cluster/scalardb-cluster.proto=example.com/scalardb-cluster \
scalardb-cluster/scalardb-cluster.proto
コマンドを実行すると、scalardb-cluster
サブディレクトリに scalardb-cluster.pb.go
と scalardb-cluster_grpc.pb.go
という2つのファイルが表示されます。
ステップ 6. サンプルアプリケーションの作成
以下は、gRPC コードを使用するプログラムです。これを作業ディレクトリに main.go
として保存します。このプログラムは、ScalarDB をはじめようの ElectronicMoney.java
プログラムと同じことを行います。環境内の ScalarDB Cluster LoadBalancer
サービスの EXTERNAL-IP
値に基づいて SERVER_ADDRESS
の値を更新する必要があることに注意してください。
package main
import (
"context"
"errors"
"flag"
"fmt"
"log"
"os"
"time"
pb "emoney/scalardb-cluster"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
const (
SERVER_ADDRESS = "localhost:60053"
NAMESPACE = "emoney"
TABLENAME = "account"
ID = "id"
BALANCE = "balance"
)
var requestHeader = pb.RequestHeader{HopLimit: 10}
type TxFn func(ctx context.Context, client pb.DistributedTransactionClient, transactionId string) error
func withTransaction(fn TxFn) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Set up a connection to the server.
conn, err := grpc.Dial(SERVER_ADDRESS, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}
defer conn.Close()
client := pb.NewDistributedTransactionClient(conn)
// Begin a transaction
beginResponse, err := client.Begin(ctx, &pb.BeginRequest{RequestHeader: &requestHeader})
if err != nil {
return err
}
transactionId := beginResponse.TransactionId
// Execute the function
err = fn(ctx, client, transactionId)
if err != nil {
// Rollback the transaction if there is an error
client.Rollback(ctx, &pb.RollbackRequest{TransactionId: transactionId})
return err
}
// Commit the transaction
_, err = client.Commit(ctx, &pb.CommitRequest{RequestHeader: &requestHeader, TransactionId: transactionId})
return err
}
func charge(ctx context.Context, client pb.DistributedTransactionClient, transactionId string, id string, amount int) error {
partitionKey := pb.Key{Columns: []*pb.Column{{Name: ID, Value: &pb.Column_TextValue_{TextValue: &pb.Column_TextValue{Value: &id}}}}}
// Retrieve the current balance for id
get := pb.Get{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &partitionKey, ClusteringKey: nil,
GetType: pb.Get_GET_TYPE_GET,
}
getResponse, err := client.Get(ctx, &pb.GetRequest{RequestHeader: &requestHeader, TransactionId: transactionId, Get: &get})
if err != nil {
return err
}
// Calculate the balance
balance := int32(amount)
if result := getResponse.GetResult(); result != nil {
for _, column := range result.GetColumns() {
if column.Name == BALANCE {
balance += column.GetIntValue().GetValue()
break
}
}
}
// Update the balance
put := pb.Put{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &partitionKey, ClusteringKey: nil,
Columns: []*pb.Column{
{Name: BALANCE, Value: &pb.Column_IntValue_{IntValue: &pb.Column_IntValue{Value: &balance}}},
},
}
_, err = client.Put(ctx, &pb.PutRequest{RequestHeader: &requestHeader, TransactionId: transactionId, Puts: []*pb.Put{&put}})
return err
}
func pay(ctx context.Context, client pb.DistributedTransactionClient, transactionId string, fromId string, toId string, amount int) error {
fromPartitionKey := pb.Key{Columns: []*pb.Column{{Name: ID, Value: &pb.Column_TextValue_{TextValue: &pb.Column_TextValue{Value: &fromId}}}}}
toPartitionKey := pb.Key{Columns: []*pb.Column{{Name: ID, Value: &pb.Column_TextValue_{TextValue: &pb.Column_TextValue{Value: &toId}}}}}
// Retrieve the current balances for ids
fromGet := pb.Get{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &fromPartitionKey, ClusteringKey: nil,
GetType: pb.Get_GET_TYPE_GET,
}
fromGetResponse, err := client.Get(ctx, &pb.GetRequest{RequestHeader: &requestHeader, TransactionId: transactionId, Get: &fromGet})
if err != nil {
return err
}
toGet := pb.Get{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &toPartitionKey, ClusteringKey: nil,
GetType: pb.Get_GET_TYPE_GET,
}
toGetResponse, err := client.Get(ctx, &pb.GetRequest{RequestHeader: &requestHeader, TransactionId: transactionId, Get: &toGet})
if err != nil {
return err
}
// Calculate the balances (it assumes that both accounts exist)
var (
fromBalance int32
toBalance int32
)
for _, column := range fromGetResponse.GetResult().GetColumns() {
if column.Name == BALANCE {
fromBalance = column.GetIntValue().GetValue()
break
}
}
for _, column := range toGetResponse.GetResult().GetColumns() {
if column.Name == BALANCE {
toBalance = column.GetIntValue().GetValue()
break
}
}
newFromBalance := fromBalance - int32(amount)
newToBalance := toBalance + int32(amount)
if newFromBalance < 0 {
return errors.New(fromId + " doesn't have enough balance.")
}
// Update the balances
fromPut := pb.Put{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &fromPartitionKey, ClusteringKey: nil,
Columns: []*pb.Column{
{Name: BALANCE, Value: &pb.Column_IntValue_{IntValue: &pb.Column_IntValue{Value: &newFromBalance}}},
},
}
toPut := pb.Put{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &toPartitionKey, ClusteringKey: nil,
Columns: []*pb.Column{
{Name: BALANCE, Value: &pb.Column_IntValue_{IntValue: &pb.Column_IntValue{Value: &newToBalance}}},
},
}
_, err = client.Put(ctx, &pb.PutRequest{RequestHeader: &requestHeader, TransactionId: transactionId, Puts: []*pb.Put{&fromPut, &toPut}})
return err
}
func getBalance(ctx context.Context, client pb.DistributedTransactionClient, transactionId string, id string) (int, error) {
// Retrieve the current balance for id
get := pb.Get{
NamespaceName: NAMESPACE, TableName: TABLENAME,
PartitionKey: &pb.Key{Columns: []*pb.Column{{Name: ID, Value: &pb.Column_TextValue_{TextValue: &pb.Column_TextValue{Value: &id}}}}},
ClusteringKey: nil,
GetType: pb.Get_GET_TYPE_GET,
}
getResponse, err := client.Get(ctx, &pb.GetRequest{RequestHeader: &requestHeader, TransactionId: transactionId, Get: &get})
if err != nil {
return 0, err
}
if getResponse.GetResult() == nil || len(getResponse.GetResult().GetColumns()) == 0 {
return 0, errors.New("Account " + id + " doesn't exist.")
}
var balance int
for _, column := range getResponse.GetResult().GetColumns() {
if column.Name == BALANCE {
balance = int(column.GetIntValue().GetValue())
break
}
}
return balance, nil
}
func main() {
var (
action = flag.String("action", "", "Action to perform: charge / pay / getBalance")
fromId = flag.String("from", "", "From account (needed for pay)")
toId = flag.String("to", "", "To account (needed for charge and pay)")
id = flag.String("id", "", "Account id (needed for getBalance)")
)
var amount int
flag.IntVar(&amount, "amount", 0, "Amount to transfer (needed for charge and pay)")
flag.Parse()
if *action == "charge" {
if *toId == "" || amount < 0 {
printUsageAndExit()
}
err := withTransaction(func(ctx context.Context, client pb.DistributedTransactionClient, txId string) error {
return charge(ctx, client, txId, *toId, amount)
})
if err != nil {
log.Fatalf("error: %v", err)
}
} else if *action == "pay" {
if *toId == "" || *fromId == "" || amount < 0 {
printUsageAndExit()
}
err := withTransaction(func(ctx context.Context, client pb.DistributedTransactionClient, txId string) error {
return pay(ctx, client, txId, *fromId, *toId, amount)
})
if err != nil {
log.Fatalf("error: %v", err)
}
} else if *action == "getBalance" {
if *id == "" {
printUsageAndExit()
}
var balance int
err := withTransaction(func(ctx context.Context, client pb.DistributedTransactionClient, txId string) error {
var err error
balance, err = getBalance(ctx, client, txId, *id)
return err
})
if err != nil {
log.Fatalf("error: %v", err)
}
fmt.Println(balance)
} else {
fmt.Fprintln(os.Stderr, "Unknown action "+*action)
printUsageAndExit()
}
}
func printUsageAndExit() {
flag.Usage()
os.Exit(1)
}
main.go
ファイルを作成した後、次のコマンドを実行して go.mod
ファイルを作成する必要があります。
go mod init emoney
go mod tidy
これで、ディレクトリ構造は次のようになります。
.
├── go.mod
├── go.sum
├── main.go
└── scalardb-cluster
├── scalardb-cluster.pb.go
├── scalardb-cluster.proto
└── scalardb-cluster_grpc.pb.go
その後、次のようにプログラムを実行できます:
-
user1
に1000
を請求します:go run main.go -action charge -amount 1000 -to user1
-
merchant1
に0
を請求します (merchant1
のアカウントを作成するだけです):go run main.go -action charge -amount 0 -to merchant1