Flask[async]・WebSocket・MongoDB を使って OCPP 1.6 中央システムを構築する
EV(電気自動車)の充電ステーションを管理するには、リアルタイムな非同期通信、遠隔操作、そして信頼性の高いデータロギングが不可欠です。
本記事では、私たちがどのようにして Flask、WebSocket、MongoDB を使い、**OCPP 1.6 準拠の中央システム(CSMS)**を構築したかをご紹介します。
✅ 構成要素:
- 🐍 Python + Flask[async]
- 📡 WebSocket(
python-ocpp
利用) - 🗃 MongoDB によるメッセージ保存
- 💻 Alpine.js ダッシュボード
- 🐳 Docker による分離・展開
このシステムにより、チャージポイントを制御し、その状態やトランザクションの更新を受信し、ダッシュボード上で可視化できます。
🧠 システム構成図(Mermaid)
graph TD
UI["Dashboard UI\n(Alpine.js)\nポート 5050"] -->|API リクエスト| FlaskAPI["Flask API\n(遠隔操作)\nポート 6000"]
FlaskAPI -->|main_loop 経由で OCPP 呼び出し| WebSocket["OCPP WebSocket Server\n(websockets)\nポート 9000"]
FlaskAPI -->|メッセージ保存| MongoDB["MongoDB\nコレクション: messages"]
WebSocket -->|送受信| CP["チャージポイント\n(OCPP 1.6 クライアント)"]
⚙️ 使用技術一覧
レイヤー | 使用技術 |
---|---|
WebSocket | python-ocpp , websockets |
API サーバー | Flask[async] , asyncio |
データベース | MongoDB |
フロントエンドUI | Alpine.js + fetch() |
デプロイ | Docker , docker-compose |
🚀 Flask[async] と WebSocket の連携
1つのアプリ内で Flask API と WebSocket サーバーを動作させるには、asyncio
イベントループを共有する必要があります。
Flask 2.0 以降では ネイティブな async view をサポートしており、以下のような連携が可能です:
- 非同期 REST API を Flask[async] で提供
- WebSocket サーバーと同一の
asyncio
loop を使用 asyncio.run_coroutine_threadsafe(...)
を使って Flask から OCPP 呼び出し
✅ 実装例
async def main():
global main_loop
main_loop = asyncio.get_running_loop()
Flask API 側からは次のように使用します:
future = asyncio.run_coroutine_threadsafe(cp.call(...), main_loop)
result = future.result(timeout=10)
これによりスレッド不要・ノンブロッキングで、完全にイベント駆動型の構成になります。
🛰 OCPP WebSocket サーバー
python-ocpp
と websockets.serve(...)
を用いて、以下のような OCPP 1.6 の標準アクションに対応:
Authorize
StartTransaction
StopTransaction
StatusNotification
DataTransfer
各チャージポイントの接続は connected_charge_points
によって管理されます。
MongoDB にはすべてのメッセージを記録:
@on(Action.heartbeat)
async def on_heartbeat(self, **kwargs):
log_client_message(self.id, "2", "Heartbeat", **kwargs)
return call_result.HeartbeatPayload(current_time=iso_now())
🧾 MongoDB によるログ保存
すべてのクライアントメッセージを ocpp.messages
コレクションに保存しています。
def log_client_message(cp_id, message_type, action, **payload):
db.messages.insert_one({
"cp_id": cp_id,
"type": message_type,
"action": action,
"payload": payload,
"timestamp": datetime.utcnow()
})
ログは分析、トラブルシュート、監査にも活用可能です。
🛠 Flask API による遠隔制御
提供している REST API:
POST /api/remote_start_transaction
POST /api/remote_stop_transaction
POST /api/unlock_connector
POST /api/get_diag
GET /api/messages/latest
これらは OCPP 経由でチャージポイントに命令を送ります。
💻 Alpine.js ダッシュボード
軽量な UI を Alpine.js で構築し、fetch()
で Flask API にアクセス。
チャージポイントごとに以下が可能:
- トランザクション開始 / 停止
- コネクタ解除
- 診断要求
- ログ表示
/api/messages/latest?cp_id=...
async remoteStart(cp_id) {
const idTag = prompt("IDタグを入力:");
const connectorId = prompt("コネクタIDを入力:");
const res = await fetch("/api/remote_start_transaction", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ cp_id, id_tag: idTag, connector_id: parseInt(connectorId) })
});
alert(await res.text());
}
🐳 Docker Compose の詳細
docker-compose.yml
で構成される3つのサービス:
services:
mongo:
image: mongo
volumes:
- mongo-data:/data/db
ports:
- "27017:27017"
dashboard:
build: ./dashboard
ports:
- "5050:80"
ocpp:
build: .
volumes:
- .:/app
ports:
- "9000:9000"
- "6030:6000"
depends_on:
- mongo
volumes:
mongo-data:
構成のポイント:
mongo
:全メッセージの保存先ocpp
:WebSocket と Flask API 両方を担当dashboard
:フロントエンド提供- コンテナ間でネットワークを共有、永続ボリュームを使用
📈 最新100件のログ取得API
@app.route("/api/messages/latest", methods=["GET"])
def get_latest_messages():
cp_id = request.args.get("cp_id")
query = {"cp_id": cp_id} if cp_id else {}
results = messages_collection.find(query).sort("_id", -1).limit(100)
return dumps(list(results))
🔮 今後の計画
- 📊 リアルタイムメトリクス対応のダッシュボード
- 🔐 APIとUIの認証
- 💸 請求システム連携
- 📨 Webhook対応
- 📤 自動診断・アップロードフロー
🧩 最後に
Flask[async] と python-ocpp
、MongoDB を組み合わせることで、次のようなシステムが簡単に構築できます:
- ✅ OCPP準拠の非同期通信
- ✅ Dockerベースの簡単な展開
- ✅ 高度なログ記録とダッシュボード操作
- ✅ EV充電ネットワークに拡張可能な設計
📬 EVチャージシステムの導入・カスタムをご検討中ですか?
ぜひ https://simplico.net よりお問い合わせください。デモも承ります。
Related Posts
- AIはOdooの会計・在庫管理システムをどう強化するのか(開発視点付き)
- JavaScriptでフルスタックのEコマースシステムを開発しよう
- Python・Langchain・OllamaでエージェンティックAIを構築する方法(eコマース & 工場自動化向け)
- PythonとOBD-IIライブデータでP0420の根本原因を診断する
- スタートアップのアイデアを正しく検証するための『The Mom Test』の活用法
- RasaとLangchain、どちらを選ぶべきか?チャットボット開発の選択基準
- OCR Document Managerのご紹介:書類を簡単にテキスト化できるWebアプリ
- まだバズっていない「売れ筋商品」をAIで発見するツールを作っています ― 興味ありますか?
- あなたのウェブサイトがリードを失っている理由 — それは「沈黙」です
- スマート農業を革新するAgentic AIとは?あなたの農場が今すぐ導入すべき理由
- LangChain + Ollama で RAGチャットボットを作る方法
- SCPI を使った EXFO 機器の自動化:実践ガイド
- レガシーコードを扱いやすくするためのデザインパターン
- 🧠 レガシーコードに安全に新機能を追加する方法
- レガシーソフトウェアを安全に近代化 — 全面リライト不要!
- OpenSearchの仕組みとは?リアルタイム検索エンジンの内部構造を解説
- DjangoでBasicとPremium機能を分けるベストな戦略とは?
- オーダーメイド家具ビジネスをデジタル化しよう — あなたのブランド専用ECプラットフォーム
- simpliPOSのご紹介:ERPNextを基盤にしたスマートPOSシステム
- スマート農業をもっと簡単に:農業資材を効率的に管理・計画するアプリ
Our Products
Related Posts
- AIはOdooの会計・在庫管理システムをどう強化するのか(開発視点付き)
- JavaScriptでフルスタックのEコマースシステムを開発しよう
- Python・Langchain・OllamaでエージェンティックAIを構築する方法(eコマース & 工場自動化向け)
- PythonとOBD-IIライブデータでP0420の根本原因を診断する
- スタートアップのアイデアを正しく検証するための『The Mom Test』の活用法
- RasaとLangchain、どちらを選ぶべきか?チャットボット開発の選択基準
- OCR Document Managerのご紹介:書類を簡単にテキスト化できるWebアプリ
- まだバズっていない「売れ筋商品」をAIで発見するツールを作っています ― 興味ありますか?
- あなたのウェブサイトがリードを失っている理由 — それは「沈黙」です
- スマート農業を革新するAgentic AIとは?あなたの農場が今すぐ導入すべき理由
- LangChain + Ollama で RAGチャットボットを作る方法
- SCPI を使った EXFO 機器の自動化:実践ガイド
- レガシーコードを扱いやすくするためのデザインパターン
- 🧠 レガシーコードに安全に新機能を追加する方法
- レガシーソフトウェアを安全に近代化 — 全面リライト不要!
- OpenSearchの仕組みとは?リアルタイム検索エンジンの内部構造を解説
- DjangoでBasicとPremium機能を分けるベストな戦略とは?
- オーダーメイド家具ビジネスをデジタル化しよう — あなたのブランド専用ECプラットフォーム
- simpliPOSのご紹介:ERPNextを基盤にしたスマートPOSシステム
- スマート農業をもっと簡単に:農業資材を効率的に管理・計画するアプリ