ScalarDB Cluster での Python をはじめよう
このページは英語版のページが機械翻訳されたものです。英語版との間に矛盾または不一致がある場合は、英語版を正としてください。
このドキュメントでは、Python を使用して ScalarDB Cluster の gRPC クライアントコードを記述する方法について説明します。
前提条件
- Python 3.7以降
- Kubernetes クラスターで実行されている ScalarDB Cluster
- ScalarDB Cluster をローカルにデプロイする方法の手順に従ってデプロイした Kubernetes クラスターで ScalarDB Cluster が実行されていることを前提としています。
ScalarDB Cluster を使用するには、ライセンスキー (試用ライセンスまたは商用ライセンス) が必要です。ライセンスキーをお持ちでない場合は、お問い合わせください。
サンプルアプリケーション
このチュートリアルでは、口座間で送金できる電子マネーアプリケーションを作成するプロセスについて説明します。
ステップ 1. schema.json
を作成する
以下は簡単なサンプルスキーマです。
schema.json
を作成し、ファイルに次の内容を追加します。
{
"emoney.account": {
"transaction": true,
"partition-key": [
"id"
],
"clustering-key": [],
"columns": {
"id": "TEXT",
"balance": "INT"
}
}
}
ステップ 2. database.properties
を作成する
ScalarDB Cluster の Schema Loader 用に database.properties
を作成する必要があります。ただし、まず LoadBalancer
サービス (scalardb-cluster-envoy
) のサービスリソースの EXTERNAL-IP
アドレスを取得する必要があります。
EXTERNAL-IP
アドレスを確認するには、次のコマンドを実行します。
kubectl get svc scalardb-cluster-envoy
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
scalardb-cluster-envoy LoadBalancer 10.105.121.51 localhost 60053:30641/TCP 16h
この場合、EXTERNAL-IP
アドレスは localhost
です。
次に、database.properties
を作成し、ファイルに次の内容を追加します。
scalar.db.transaction_manager=cluster
scalar.db.contact_points=indirect:localhost
ScalarDB Cluster に接続するには、scalar.db.transaction_manager
プロパティに cluster
を指定する必要があります。また、このチュートリアルでは indirect
クライアントモードを使用して Envoy のサービスリソースに接続します。クライアントモードの詳細については、Java API を使用した ScalarDB Cluster の開発者ガイドを参照してください。
ステップ 3. スキーマをロードする
ScalarDB Cluster 経由でスキーマをロードするには、専用の ScalarDB Cluster 用 Schema Loader (Schema Loader for Cluster) を使用する必要があります。Schema Loader for Cluster の使用方法は、JAR ファイルの名前が異なることを除いて、Schema Loader for ScalarDB の使用方法と基本的に同じです。Schema Loader for Cluster は、ScalarDB リリースからダウンロードできます。JAR ファイルをダウンロードしたら、次のコマンドで Schema Loader for Cluster を実行できます。
java -jar scalardb-cluster-schema-loader-3.14.0-all.jar --config database.properties -f schema.json --coordinator
ステップ 4. Python 環境をセットアップする
Python 環境の管理方法は自由に選択できます。このガイドでは、Python アプリケーションが venv
を使用して環境で実行されていることを前提としています。
任意の場所に作業ディレクトリを作成し、そこに移動します。次に、次のコマンドを実行して venv
をアクティブ化します。
python3 -m venv venv
source venv/bin/activate
pip
コマンドを使用して gRPC パッケージをインストールしましょう。
pip install grpcio grpcio-tools
ステップ 5. ScalarDB Cluster gRPC のスタブコードを生成する
ScalarDB Cluster の gRPC サーバーと通信するには、proto ファイルからスタブコードを生成する必要があります。
まず、scalardb-cluster.proto
ファイルをダウンロードし、作業ディレクトリに保存します。商用ライセンスをお持ちの ScalarDB Cluster ユーザーで、scalardb-cluster.proto
ファイルが必要な場合は、お問い合わせください。
次のコマンドを実行すると、スタブコードを生成できます。
python -m grpc_tools.protoc -I . --python_out=. --pyi_out=. --grpc_python_out=. scalardb-cluster.proto
次のファイルが生成されます:
scalardb_cluster_pb2.py
scalardb_cluster_pb2.pyi
scalardb_cluster_pb2_grpc.py
ステップ 6. サンプルアプリケーションの作成
以下は、スタブコードを使用するサンプル Python アプリケーション (electronic_money.py
) です。このプログラムは、ScalarDB をはじめよう の ElectronicMoney.java
プログラムと同じことを行います。環境内の ScalarDB Cluster LoadBalancer
サービスの EXTERNAL-IP
値に基づいて SERVER_ADDRESS
の値を更新する必要があることに注意してください。
import argparse
from typing import Optional
import grpc
import scalardb_cluster_pb2_grpc
from scalardb_cluster_pb2 import (
BeginRequest,
BeginResponse,
Column,
CommitRequest,
Get,
GetRequest,
GetResponse,
Key,
Put,
PutRequest,
RequestHeader,
RollbackRequest,
)
SERVER_ADDRESS = "localhost:60053"
NAMESPACE = "emoney"
TABLENAME = "account"
ID = "id"
BALANCE = "balance"
request_header = RequestHeader(hop_limit=10)
def charge(id: str, amount: int) -> None:
with grpc.insecure_channel(SERVER_ADDRESS) as channel:
stub = scalardb_cluster_pb2_grpc.DistributedTransactionStub(channel)
begin_response: BeginResponse = stub.Begin(
BeginRequest(request_header=request_header)
)
transaction_id = begin_response.transaction_id
try:
pkey = Key(
columns=[
Column(
name=ID,
text_value=Column.TextValue(value=id),
)
]
)
# Retrieve the current balance for id
get = Get(
namespace_name=NAMESPACE,
table_name=TABLENAME,
get_type=Get.GetType.GET_TYPE_GET,
partition_key=pkey,
)
get_response: GetResponse = stub.Get(
GetRequest(
request_header=request_header,
transaction_id=transaction_id,
get=get,
)
)
# Calculate the balance
balance = amount
if get_response.result.columns:
balance_column = next(
c for c in get_response.result.columns if c.name == BALANCE
)
current = balance_column.int_value.value
balance += current
# Update the balance
put = Put(
namespace_name=NAMESPACE,
table_name=TABLENAME,
partition_key=pkey,
columns=[
Column(name=BALANCE, int_value=Column.IntValue(value=balance))
],
)
stub.Put(
PutRequest(
request_header=request_header,
transaction_id=transaction_id,
puts=[put],
)
)
# Commit the transaction
stub.Commit(
CommitRequest(
request_header=request_header,
transaction_id=transaction_id,
)
)
except Exception as e:
# Rollback the transaction
stub.Rollback(
RollbackRequest(
request_header=request_header,
transaction_id=transaction_id,
)
)
raise e
def pay(from_id: str, to_id: str, amount: int) -> None:
with grpc.insecure_channel(SERVER_ADDRESS) as channel:
stub = scalardb_cluster_pb2_grpc.DistributedTransactionStub(channel)
begin_response: BeginResponse = stub.Begin(
BeginRequest(request_header=request_header)
)
transaction_id = begin_response.transaction_id
try:
from_pkey = Key(
columns=[
Column(
name=ID,
text_value=Column.TextValue(value=from_id),
)
]
)
to_pkey = Key(
columns=[
Column(
name=ID,
text_value=Column.TextValue(value=to_id),
)
]
)
# Retrieve the current balances for ids
from_get = Get(
namespace_name=NAMESPACE,
table_name=TABLENAME,
get_type=Get.GetType.GET_TYPE_GET,
partition_key=from_pkey,
)
from_get_response: GetResponse = stub.Get(
GetRequest(
request_header=request_header,
transaction_id=transaction_id,
get=from_get,
)
)
to_get = Get(
namespace_name=NAMESPACE,
table_name=TABLENAME,
get_type=Get.GetType.GET_TYPE_GET,
partition_key=to_pkey,
)
to_get_response: GetResponse = stub.Get(
GetRequest(
request_header=request_header,
transaction_id=transaction_id,
get=to_get,
)
)
# Calculate the balances (it assumes that both accounts exist)
new_from_balance = (
next(
c for c in from_get_response.result.columns if c.name == BALANCE
).int_value.value
- amount
)
new_to_balance = (
next(
c for c in to_get_response.result.columns if c.name == BALANCE
).int_value.value
+ amount
)
if new_from_balance < 0:
raise RuntimeError(from_id + " doesn't have enough balance.")
# Update the balances
from_put = Put(
namespace_name=NAMESPACE,
table_name=TABLENAME,
partition_key=from_pkey,
columns=[
Column(
name=BALANCE, int_value=Column.IntValue(value=new_from_balance)
)
],
)
to_put = Put(
namespace_name=NAMESPACE,
table_name=TABLENAME,
partition_key=to_pkey,
columns=[
Column(
name=BALANCE, int_value=Column.IntValue(value=new_to_balance)
)
],
)
stub.Put(
PutRequest(
request_header=request_header,
transaction_id=transaction_id,
puts=[from_put, to_put],
)
)
# Commit the transaction (records are automatically recovered in case of failure)
stub.Commit(
CommitRequest(
request_header=request_header,
transaction_id=transaction_id,
)
)
except Exception as e:
# Rollback the transaction
stub.Rollback(
RollbackRequest(
request_header=request_header,
transaction_id=transaction_id,
)
)
raise e
def get_balance(id: str) -> Optional[int]:
with grpc.insecure_channel(SERVER_ADDRESS) as channel:
stub = scalardb_cluster_pb2_grpc.DistributedTransactionStub(channel)
begin_response: BeginResponse = stub.Begin(
BeginRequest(request_header=request_header)
)
transaction_id = begin_response.transaction_id
try:
# Retrieve the current balance for id
get = Get(
namespace_name=NAMESPACE,
table_name=TABLENAME,
get_type=Get.GetType.GET_TYPE_GET,
partition_key=Key(
columns=[
Column(
name=ID,
text_value=Column.TextValue(value=id),
)
]
),
)
get_response: GetResponse = stub.Get(
GetRequest(
request_header=request_header,
transaction_id=transaction_id,
get=get,
)
)
balance = None
if get_response.result.columns:
balance_column = next(
c for c in get_response.result.columns if c.name == BALANCE
)
balance = balance_column.int_value.value
# Commit the transaction
stub.Commit(
CommitRequest(
request_header=request_header,
transaction_id=transaction_id,
)
)
return balance
except Exception as e:
# Rollback the transaction
stub.Rollback(
RollbackRequest(
request_header=request_header,
transaction_id=transaction_id,
)
)
raise e
if __name__ == "__main__":
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(required=True)
parser_charge = subparsers.add_parser("charge")
parser_charge.add_argument("-amount", type=int, required=True)
parser_charge.add_argument("-to", type=str, required=True, dest="to_id")
parser_charge.set_defaults(func=lambda args: charge(args.to_id, args.amount))
parser_pay = subparsers.add_parser("pay")
parser_pay.add_argument("-amount", type=int, required=True)
parser_pay.add_argument("-from", type=str, required=True, dest="from_id")
parser_pay.add_argument("-to", type=str, required=True, dest="to_id")
parser_pay.set_defaults(
func=lambda args: pay(args.from_id, args.to_id, args.amount)
)
parser_get_balance = subparsers.add_parser("get-balance")
parser_get_balance.add_argument("-id", type=str, required=True)
parser_get_balance.set_defaults(func=lambda args: print(get_balance(args.id)))
args = parser.parse_args()
args.func(args)
その後、次のようにプログラムを実行できます:
-
user1
に1000
を請求します:python electronic_money.py charge -amount 1000 -to user1
-
merchant1
に0
を請求します (merchant1
のアカウントを作成するだけです):python electronic_money.py charge -amount 0 -to merchant1
-
user1
からmerchant1
に100
を支払います:python electronic_money.py pay -amount 100 -from user1 -to merchant1
-
user1
の残高を取得します。python electronic_money.py get-balance -id user1
-
merchant1
の残高を取得します。python electronic_money.py get-balance -id merchant1
参照
その他の ScalarDB Cluster チュートリアルについては、以下を参照してください。
- ScalarDB Cluster をはじめよう
- ScalarDB Cluster GraphQL をはじめよう
- JDBC 経由の ScalarDB Cluster SQL をはじめよう
- Spring Data JDBC for ScalarDB を使用した ScalarDB Cluster SQL をはじ めよう
- ScalarDB Cluster での Go をはじめよう
Java API で ScalarDB Cluster を使用するアプリケーションの開発の詳細については、以下を参照してください。
ScalarDB Cluster gRPC API の詳細については、以下を参照してください。