メインコンテンツまでスキップ
バージョン: 3.14

ScalarDB Cluster での Python をはじめよう

注記

このページは英語版のページが機械翻訳されたものです。英語版との間に矛盾または不一致がある場合は、英語版を正としてください。

このドキュメントでは、Python を使用して ScalarDB Cluster の gRPC クライアントコードを記述する方法について説明します。

前提条件

警告

ScalarDB Cluster を使用するには、ライセンスキー (試用ライセンスまたは商用ライセンス) が必要です。ライセンスキーをお持ちでない場合は、お問い合わせください。

サンプルアプリケーション

このチュートリアルでは、口座間で送金できる電子マネーアプリケーションを作成するプロセスについて説明します。

ステップ 1. schema.json を作成する

以下は簡単なサンプルスキーマです。

schema.json を作成し、ファイルに次の内容を追加します。

{
"emoney.account": {
"transaction": true,
"partition-key": [
"id"
],
"clustering-key": [],
"columns": {
"id": "TEXT",
"balance": "INT"
}
}
}

ステップ 2. database.properties を作成する

ScalarDB Cluster の Schema Loader 用に database.properties を作成する必要があります。ただし、まず LoadBalancer サービス (scalardb-cluster-envoy) のサービスリソースの EXTERNAL-IP アドレスを取得する必要があります。

EXTERNAL-IP アドレスを確認するには、次のコマンドを実行します。

kubectl get svc scalardb-cluster-envoy
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
scalardb-cluster-envoy LoadBalancer 10.105.121.51 localhost 60053:30641/TCP 16h

この場合、EXTERNAL-IP アドレスは localhost です。

次に、database.properties を作成し、ファイルに次の内容を追加します。

scalar.db.transaction_manager=cluster
scalar.db.contact_points=indirect:localhost

ScalarDB Cluster に接続するには、scalar.db.transaction_manager プロパティに cluster を指定する必要があります。また、このチュートリアルでは indirect クライアントモードを使用して Envoy のサービスリソースに接続します。クライアントモードの詳細については、Java API を使用した ScalarDB Cluster の開発者ガイドを参照してください。

ステップ 3. スキーマをロードする

ScalarDB Cluster 経由でスキーマをロードするには、専用の ScalarDB Cluster 用 Schema Loader (Schema Loader for Cluster) を使用する必要があります。Schema Loader for Cluster の使用方法は、JAR ファイルの名前が異なることを除いて、Schema Loader for ScalarDB の使用方法と基本的に同じです。Schema Loader for Cluster は、ScalarDB リリースからダウンロードできます。JAR ファイルをダウンロードしたら、次のコマンドで Schema Loader for Cluster を実行できます。

java -jar scalardb-cluster-schema-loader-3.14.1-all.jar --config database.properties -f schema.json --coordinator

ステップ 4. Python 環境をセットアップする

Python 環境の管理方法は自由に選択できます。このガイドでは、Python アプリケーションが venv を使用して環境で実行されていることを前提としています。

任意の場所に作業ディレクトリを作成し、そこに移動します。次に、次のコマンドを実行して venv をアクティブ化します。

python3 -m venv venv
source venv/bin/activate

pip コマンドを使用して gRPC パッケージをインストールしましょう。

pip install grpcio grpcio-tools

ステップ 5. ScalarDB Cluster gRPC のスタブコードを生成する

ScalarDB Cluster の gRPC サーバーと通信するには、proto ファイルからスタブコードを生成する必要があります。

まず、scalardb-cluster.proto ファイルをダウンロードし、作業ディレクトリに保存します。商用ライセンスをお持ちの ScalarDB Cluster ユーザーで、scalardb-cluster.proto ファイルが必要な場合は、お問い合わせください。

次のコマンドを実行すると、スタブコードを生成できます。

python -m grpc_tools.protoc -I . --python_out=. --pyi_out=. --grpc_python_out=. scalardb-cluster.proto

次のファイルが生成されます:

  • scalardb_cluster_pb2.py
  • scalardb_cluster_pb2.pyi
  • scalardb_cluster_pb2_grpc.py

ステップ 6. サンプルアプリケーションの作成

以下は、スタブコードを使用するサンプル Python アプリケーション (electronic_money.py) です。このプログラムは、ScalarDB をはじめようElectronicMoney.java プログラムと同じことを行います。環境内の ScalarDB Cluster LoadBalancer サービスの EXTERNAL-IP 値に基づいて SERVER_ADDRESS の値を更新する必要があることに注意してください。

import argparse
from typing import Optional

import grpc

import scalardb_cluster_pb2_grpc
from scalardb_cluster_pb2 import (
BeginRequest,
BeginResponse,
Column,
CommitRequest,
Get,
GetRequest,
GetResponse,
Key,
Put,
PutRequest,
RequestHeader,
RollbackRequest,
)

SERVER_ADDRESS = "localhost:60053"
NAMESPACE = "emoney"
TABLENAME = "account"
ID = "id"
BALANCE = "balance"

request_header = RequestHeader(hop_limit=10)


def charge(id: str, amount: int) -> None:
with grpc.insecure_channel(SERVER_ADDRESS) as channel:
stub = scalardb_cluster_pb2_grpc.DistributedTransactionStub(channel)

begin_response: BeginResponse = stub.Begin(
BeginRequest(request_header=request_header)
)

transaction_id = begin_response.transaction_id

try:
pkey = Key(
columns=[
Column(
name=ID,
text_value=Column.TextValue(value=id),
)
]
)

# Retrieve the current balance for id
get = Get(
namespace_name=NAMESPACE,
table_name=TABLENAME,
get_type=Get.GetType.GET_TYPE_GET,
partition_key=pkey,
)
get_response: GetResponse = stub.Get(
GetRequest(
request_header=request_header,
transaction_id=transaction_id,
get=get,
)
)

# Calculate the balance
balance = amount
if get_response.result.columns:
balance_column = next(
c for c in get_response.result.columns if c.name == BALANCE
)
current = balance_column.int_value.value
balance += current

# Update the balance
put = Put(
namespace_name=NAMESPACE,
table_name=TABLENAME,
partition_key=pkey,
columns=[
Column(name=BALANCE, int_value=Column.IntValue(value=balance))
],
)
stub.Put(
PutRequest(
request_header=request_header,
transaction_id=transaction_id,
puts=[put],
)
)

# Commit the transaction
stub.Commit(
CommitRequest(
request_header=request_header,
transaction_id=transaction_id,
)
)
except Exception as e:
# Rollback the transaction
stub.Rollback(
RollbackRequest(
request_header=request_header,
transaction_id=transaction_id,
)
)
raise e


def pay(from_id: str, to_id: str, amount: int) -> None:
with grpc.insecure_channel(SERVER_ADDRESS) as channel:
stub = scalardb_cluster_pb2_grpc.DistributedTransactionStub(channel)

begin_response: BeginResponse = stub.Begin(
BeginRequest(request_header=request_header)
)

transaction_id = begin_response.transaction_id

try:
from_pkey = Key(
columns=[
Column(
name=ID,
text_value=Column.TextValue(value=from_id),
)
]
)
to_pkey = Key(
columns=[
Column(
name=ID,
text_value=Column.TextValue(value=to_id),
)
]
)

# Retrieve the current balances for ids
from_get = Get(
namespace_name=NAMESPACE,
table_name=TABLENAME,
get_type=Get.GetType.GET_TYPE_GET,
partition_key=from_pkey,
)
from_get_response: GetResponse = stub.Get(
GetRequest(
request_header=request_header,
transaction_id=transaction_id,
get=from_get,
)
)
to_get = Get(
namespace_name=NAMESPACE,
table_name=TABLENAME,
get_type=Get.GetType.GET_TYPE_GET,
partition_key=to_pkey,
)
to_get_response: GetResponse = stub.Get(
GetRequest(
request_header=request_header,
transaction_id=transaction_id,
get=to_get,
)
)

# Calculate the balances (it assumes that both accounts exist)
new_from_balance = (
next(
c for c in from_get_response.result.columns if c.name == BALANCE
).int_value.value
- amount
)
new_to_balance = (
next(
c for c in to_get_response.result.columns if c.name == BALANCE
).int_value.value
+ amount
)

if new_from_balance < 0:
raise RuntimeError(from_id + " doesn't have enough balance.")

# Update the balances
from_put = Put(
namespace_name=NAMESPACE,
table_name=TABLENAME,
partition_key=from_pkey,
columns=[
Column(
name=BALANCE, int_value=Column.IntValue(value=new_from_balance)
)
],
)
to_put = Put(
namespace_name=NAMESPACE,
table_name=TABLENAME,
partition_key=to_pkey,
columns=[
Column(
name=BALANCE, int_value=Column.IntValue(value=new_to_balance)
)
],
)
stub.Put(
PutRequest(
request_header=request_header,
transaction_id=transaction_id,
puts=[from_put, to_put],
)
)

# Commit the transaction (records are automatically recovered in case of failure)
stub.Commit(
CommitRequest(
request_header=request_header,
transaction_id=transaction_id,
)
)
except Exception as e:
# Rollback the transaction
stub.Rollback(
RollbackRequest(
request_header=request_header,
transaction_id=transaction_id,
)
)
raise e


def get_balance(id: str) -> Optional[int]:
with grpc.insecure_channel(SERVER_ADDRESS) as channel:
stub = scalardb_cluster_pb2_grpc.DistributedTransactionStub(channel)

begin_response: BeginResponse = stub.Begin(
BeginRequest(request_header=request_header)
)

transaction_id = begin_response.transaction_id

try:
# Retrieve the current balance for id
get = Get(
namespace_name=NAMESPACE,
table_name=TABLENAME,
get_type=Get.GetType.GET_TYPE_GET,
partition_key=Key(
columns=[
Column(
name=ID,
text_value=Column.TextValue(value=id),
)
]
),
)
get_response: GetResponse = stub.Get(
GetRequest(
request_header=request_header,
transaction_id=transaction_id,
get=get,
)
)

balance = None
if get_response.result.columns:
balance_column = next(
c for c in get_response.result.columns if c.name == BALANCE
)
balance = balance_column.int_value.value

# Commit the transaction
stub.Commit(
CommitRequest(
request_header=request_header,
transaction_id=transaction_id,
)
)

return balance

except Exception as e:
# Rollback the transaction
stub.Rollback(
RollbackRequest(
request_header=request_header,
transaction_id=transaction_id,
)
)
raise e


if __name__ == "__main__":
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(required=True)

parser_charge = subparsers.add_parser("charge")
parser_charge.add_argument("-amount", type=int, required=True)
parser_charge.add_argument("-to", type=str, required=True, dest="to_id")
parser_charge.set_defaults(func=lambda args: charge(args.to_id, args.amount))

parser_pay = subparsers.add_parser("pay")
parser_pay.add_argument("-amount", type=int, required=True)
parser_pay.add_argument("-from", type=str, required=True, dest="from_id")
parser_pay.add_argument("-to", type=str, required=True, dest="to_id")
parser_pay.set_defaults(
func=lambda args: pay(args.from_id, args.to_id, args.amount)
)

parser_get_balance = subparsers.add_parser("get-balance")
parser_get_balance.add_argument("-id", type=str, required=True)
parser_get_balance.set_defaults(func=lambda args: print(get_balance(args.id)))

args = parser.parse_args()
args.func(args)

その後、次のようにプログラムを実行できます:

  • user11000 を請求します:

    python electronic_money.py charge -amount 1000 -to user1
  • merchant10 を請求します (merchant1 のアカウントを作成するだけです):

    python electronic_money.py charge -amount 0 -to merchant1
  • user1 から merchant1100 を支払います:

    python electronic_money.py pay -amount 100 -from user1 -to merchant1
  • user1 の残高を取得します。

    python electronic_money.py get-balance -id user1
  • merchant1 の残高を取得します。

    python electronic_money.py get-balance -id merchant1

参照

その他の ScalarDB Cluster チュートリアルについては、以下を参照してください。

Java API で ScalarDB Cluster を使用するアプリケーションの開発の詳細については、以下を参照してください。

ScalarDB Cluster gRPC API の詳細については、以下を参照してください。

This website uses cookies to enhance the visitor experience. By continuing to use this website, you acknowledge that you have read and understood our privacy policy and consent to the use of cookies to help improve your browsing experience and provide you with personalized content.