メインコンテンツまでスキップ
バージョン: 3.13

ScalarDB Cluster での Python をはじめよう

注記

このページは英語版のページが機械翻訳されたものです。英語版との間に矛盾または不一致がある場合は、英語版を正としてください。

このドキュメントでは、Python を使用して ScalarDB Cluster の gRPC クライアントコードを記述する方法について説明します。

前提条件

警告

ScalarDB Cluster を使用するには、ライセンスキー (試用ライセンスまたは商用ライセンス) が必要です。ライセンスキーをお持ちでない場合は、お問い合わせください。

サンプルアプリケーション

このチュートリアルでは、口座間で送金できる電子マネーアプリケーションを作成するプロセスについて説明します。

ステップ 1. schema.json を作成する

以下は簡単なサンプルスキーマです。

schema.json を作成し、ファイルに次の内容を追加します。

{
"emoney.account": {
"transaction": true,
"partition-key": [
"id"
],
"clustering-key": [],
"columns": {
"id": "TEXT",
"balance": "INT"
}
}
}

ステップ 2. database.properties を作成する

ScalarDB Cluster の Schema Loader 用に database.properties を作成する必要があります。ただし、まず LoadBalancer サービス (scalardb-cluster-envoy) のサービスリソースの EXTERNAL-IP アドレスを取得する必要があります。

EXTERNAL-IP アドレスを確認するには、次のコマンドを実行します。

kubectl get svc scalardb-cluster-envoy
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
scalardb-cluster-envoy LoadBalancer 10.105.121.51 localhost 60053:30641/TCP 16h

この場合、EXTERNAL-IP アドレスは localhost です。

次に、database.properties を作成し、ファイルに次の内容を追加します。

scalar.db.transaction_manager=cluster
scalar.db.contact_points=indirect:localhost

ScalarDB Cluster に接続するには、scalar.db.transaction_manager プロパティに cluster を指定する必要があります。また、このチュートリアルでは indirect クライアントモードを使用して Envoy のサービスリソースに接続します。クライアントモードの詳細については、Java API を使用した ScalarDB Cluster の開発者ガイドを参照してください。

ステップ 3. スキーマをロードする

ScalarDB Cluster 経由でスキーマをロードするには、専用の ScalarDB Cluster 用 Schema Loader (Schema Loader for Cluster) を使用する必要があります。Schema Loader for Cluster の使用方法は、JAR ファイルの名前が異なることを除いて、Schema Loader for ScalarDB の使用方法と基本的に同じです。Schema Loader for Cluster は、ScalarDB リリースからダウンロードできます。JAR ファイルをダウンロードしたら、次のコマンドで Schema Loader for Cluster を実行できます。

java -jar scalardb-cluster-schema-loader-3.13.1-all.jar --config database.properties -f schema.json --coordinator

ステップ 4. Python 環境をセットアップする

Python 環境の管理方法は自由に選択できます。このガイドでは、Python アプリケーションが venv を使用して環境で実行されていることを前提としています。

任意の場所に作業ディレクトリを作成し、そこに移動します。次に、次のコマンドを実行して venv をアクティブ化します。

python3 -m venv venv
source venv/bin/activate

pip コマンドを使用して gRPC パッケージをインストールしましょう。

pip install grpcio grpcio-tools

ステップ 5. ScalarDB Cluster gRPC のスタブコードを生成する

ScalarDB Cluster の gRPC サーバーと通信するには、proto ファイルからスタブコードを生成する必要があります。

まず、scalardb-cluster.proto ファイルをダウンロードし、作業ディレクトリに保存します。商用ライセンスをお持ちの ScalarDB Cluster ユーザーで、scalardb-cluster.proto ファイルが必要な場合は、お問い合わせください。

次のコマンドを実行すると、スタブコードを生成できます。

python -m grpc_tools.protoc -I . --python_out=. --pyi_out=. --grpc_python_out=. scalardb-cluster.proto

次のファイルが生成されます:

  • scalardb_cluster_pb2.py
  • scalardb_cluster_pb2.pyi
  • scalardb_cluster_pb2_grpc.py

ステップ 6. サンプルアプリケーションの作成

以下は、スタブコードを使用するサンプル Python アプリケーション (electronic_money.py) です。このプログラムは、ScalarDB をはじめようElectronicMoney.java プログラムと同じことを行います。環境内の ScalarDB Cluster LoadBalancer サービスの EXTERNAL-IP 値に基づいて SERVER_ADDRESS の値を更新する必要があることに注意してください。

import argparse
from typing import Optional

import grpc

import scalardb_cluster_pb2_grpc
from scalardb_cluster_pb2 import (
BeginRequest,
BeginResponse,
Column,
CommitRequest,
Get,
GetRequest,
GetResponse,
Key,
Put,
PutRequest,
RequestHeader,
RollbackRequest,
)

SERVER_ADDRESS = "localhost:60053"
NAMESPACE = "emoney"
TABLENAME = "account"
ID = "id"
BALANCE = "balance"

request_header = RequestHeader(hop_limit=10)


def charge(id: str, amount: int) -> None:
with grpc.insecure_channel(SERVER_ADDRESS) as channel:
stub = scalardb_cluster_pb2_grpc.DistributedTransactionStub(channel)

begin_response: BeginResponse = stub.Begin(
BeginRequest(request_header=request_header)
)

transaction_id = begin_response.transaction_id

try:
pkey = Key(
columns=[
Column(
name=ID,
text_value=Column.TextValue(value=id),
)
]
)

# Retrieve the current balance for id
get = Get(
namespace_name=NAMESPACE,
table_name=TABLENAME,
get_type=Get.GetType.GET_TYPE_GET,
partition_key=pkey,
)
get_response: GetResponse = stub.Get(
GetRequest(
request_header=request_header,
transaction_id=transaction_id,
get=get,
)
)

# Calculate the balance
balance = amount
if get_response.result.columns:
balance_column = next(
c for c in get_response.result.columns if c.name == BALANCE
)
current = balance_column.int_value.value
balance += current

# Update the balance
put = Put(
namespace_name=NAMESPACE,
table_name=TABLENAME,
partition_key=pkey,
columns=[
Column(name=BALANCE, int_value=Column.IntValue(value=balance))
],
)
stub.Put(
PutRequest(
request_header=request_header,
transaction_id=transaction_id,
puts=[put],
)
)

# Commit the transaction
stub.Commit(
CommitRequest(
request_header=request_header,
transaction_id=transaction_id,
)
)
except Exception as e:
# Rollback the transaction
stub.Rollback(
RollbackRequest(
request_header=request_header,
transaction_id=transaction_id,
)
)
raise e


def pay(from_id: str, to_id: str, amount: int) -> None:
with grpc.insecure_channel(SERVER_ADDRESS) as channel:
stub = scalardb_cluster_pb2_grpc.DistributedTransactionStub(channel)

begin_response: BeginResponse = stub.Begin(
BeginRequest(request_header=request_header)
)

transaction_id = begin_response.transaction_id

try:
from_pkey = Key(
columns=[
Column(
name=ID,
text_value=Column.TextValue(value=from_id),
)
]
)
to_pkey = Key(
columns=[
Column(
name=ID,
text_value=Column.TextValue(value=to_id),
)
]
)

# Retrieve the current balances for ids
from_get = Get(
namespace_name=NAMESPACE,
table_name=TABLENAME,
get_type=Get.GetType.GET_TYPE_GET,
partition_key=from_pkey,
)
from_get_response: GetResponse = stub.Get(
GetRequest(
request_header=request_header,
transaction_id=transaction_id,
get=from_get,
)
)
to_get = Get(
namespace_name=NAMESPACE,
table_name=TABLENAME,
get_type=Get.GetType.GET_TYPE_GET,
partition_key=to_pkey,
)
to_get_response: GetResponse = stub.Get(
GetRequest(
request_header=request_header,
transaction_id=transaction_id,
get=to_get,
)
)

# Calculate the balances (it assumes that both accounts exist)
new_from_balance = (
next(
c for c in from_get_response.result.columns if c.name == BALANCE
).int_value.value
- amount
)
new_to_balance = (
next(
c for c in to_get_response.result.columns if c.name == BALANCE
).int_value.value
+ amount
)

if new_from_balance < 0:
raise RuntimeError(from_id + " doesn't have enough balance.")

# Update the balances
from_put = Put(
namespace_name=NAMESPACE,
table_name=TABLENAME,
partition_key=from_pkey,
columns=[
Column(
name=BALANCE, int_value=Column.IntValue(value=new_from_balance)
)
],
)
to_put = Put(
namespace_name=NAMESPACE,
table_name=TABLENAME,
partition_key=to_pkey,
columns=[
Column(
name=BALANCE, int_value=Column.IntValue(value=new_to_balance)
)
],
)
stub.Put(
PutRequest(
request_header=request_header,
transaction_id=transaction_id,
puts=[from_put, to_put],
)
)

# Commit the transaction (records are automatically recovered in case of failure)
stub.Commit(
CommitRequest(
request_header=request_header,
transaction_id=transaction_id,
)
)
except Exception as e:
# Rollback the transaction
stub.Rollback(
RollbackRequest(
request_header=request_header,
transaction_id=transaction_id,
)
)
raise e


def get_balance(id: str) -> Optional[int]:
with grpc.insecure_channel(SERVER_ADDRESS) as channel:
stub = scalardb_cluster_pb2_grpc.DistributedTransactionStub(channel)

begin_response: BeginResponse = stub.Begin(
BeginRequest(request_header=request_header)
)

transaction_id = begin_response.transaction_id

try:
# Retrieve the current balance for id
get = Get(
namespace_name=NAMESPACE,
table_name=TABLENAME,
get_type=Get.GetType.GET_TYPE_GET,
partition_key=Key(
columns=[
Column(
name=ID,
text_value=Column.TextValue(value=id),
)
]
),
)
get_response: GetResponse = stub.Get(
GetRequest(
request_header=request_header,
transaction_id=transaction_id,
get=get,
)
)

balance = None
if get_response.result.columns:
balance_column = next(
c for c in get_response.result.columns if c.name == BALANCE
)
balance = balance_column.int_value.value

# Commit the transaction
stub.Commit(
CommitRequest(
request_header=request_header,
transaction_id=transaction_id,
)
)

return balance

except Exception as e:
# Rollback the transaction
stub.Rollback(
RollbackRequest(
request_header=request_header,
transaction_id=transaction_id,
)
)
raise e


if __name__ == "__main__":
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(required=True)

parser_charge = subparsers.add_parser("charge")
parser_charge.add_argument("-amount", type=int, required=True)
parser_charge.add_argument("-to", type=str, required=True, dest="to_id")
parser_charge.set_defaults(func=lambda args: charge(args.to_id, args.amount))

parser_pay = subparsers.add_parser("pay")
parser_pay.add_argument("-amount", type=int, required=True)
parser_pay.add_argument("-from", type=str, required=True, dest="from_id")
parser_pay.add_argument("-to", type=str, required=True, dest="to_id")
parser_pay.set_defaults(
func=lambda args: pay(args.from_id, args.to_id, args.amount)
)

parser_get_balance = subparsers.add_parser("get-balance")
parser_get_balance.add_argument("-id", type=str, required=True)
parser_get_balance.set_defaults(func=lambda args: print(get_balance(args.id)))

args = parser.parse_args()
args.func(args)

その後、次のようにプログラムを実行できます:

  • user11000 を請求します:

    python electronic_money.py charge -amount 1000 -to user1
  • merchant10 を請求します (merchant1 のアカウントを作成するだけです):

    python electronic_money.py charge -amount 0 -to merchant1
  • user1 から merchant1100 を支払います:

    python electronic_money.py pay -amount 100 -from user1 -to merchant1
  • user1 の残高を取得します。

    python electronic_money.py get-balance -id user1
  • merchant1 の残高を取得します。

    python electronic_money.py get-balance -id merchant1

参照

その他の ScalarDB Cluster チュートリアルについては、以下を参照してください。

Java API で ScalarDB Cluster を使用するアプリケーションの開発の詳細については、以下を参照してください。

ScalarDB Cluster gRPC API の詳細については、以下を参照してください。