Flask[async]・WebSocket・MongoDB を使って OCPP 1.6 中央システムを構築する
EV(電気自動車)の充電ステーションを管理するには、リアルタイムな非同期通信、遠隔操作、そして信頼性の高いデータロギングが不可欠です。
本記事では、私たちがどのようにして Flask、WebSocket、MongoDB を使い、**OCPP 1.6 準拠の中央システム(CSMS)**を構築したかをご紹介します。
✅ 構成要素:
- 🐍 Python + Flask[async]
- 📡 WebSocket(
python-ocpp利用) - 🗃 MongoDB によるメッセージ保存
- 💻 Alpine.js ダッシュボード
- 🐳 Docker による分離・展開
このシステムにより、チャージポイントを制御し、その状態やトランザクションの更新を受信し、ダッシュボード上で可視化できます。
🧠 システム構成図(Mermaid)
graph TD
UI["Dashboard UI\n(Alpine.js)\nポート 5050"] -->|API リクエスト| FlaskAPI["Flask API\n(遠隔操作)\nポート 6000"]
FlaskAPI -->|main_loop 経由で OCPP 呼び出し| WebSocket["OCPP WebSocket Server\n(websockets)\nポート 9000"]
FlaskAPI -->|メッセージ保存| MongoDB["MongoDB\nコレクション: messages"]
WebSocket -->|送受信| CP["チャージポイント\n(OCPP 1.6 クライアント)"]
⚙️ 使用技術一覧
| レイヤー | 使用技術 |
|---|---|
| WebSocket | python-ocpp, websockets |
| API サーバー | Flask[async], asyncio |
| データベース | MongoDB |
| フロントエンドUI | Alpine.js + fetch() |
| デプロイ | Docker, docker-compose |
🚀 Flask[async] と WebSocket の連携
1つのアプリ内で Flask API と WebSocket サーバーを動作させるには、asyncio イベントループを共有する必要があります。
Flask 2.0 以降では ネイティブな async view をサポートしており、以下のような連携が可能です:
- 非同期 REST API を Flask[async] で提供
- WebSocket サーバーと同一の
asyncioloop を使用 asyncio.run_coroutine_threadsafe(...)を使って Flask から OCPP 呼び出し
✅ 実装例
async def main():
global main_loop
main_loop = asyncio.get_running_loop()
Flask API 側からは次のように使用します:
future = asyncio.run_coroutine_threadsafe(cp.call(...), main_loop)
result = future.result(timeout=10)
これによりスレッド不要・ノンブロッキングで、完全にイベント駆動型の構成になります。
🛰 OCPP WebSocket サーバー
python-ocpp と websockets.serve(...) を用いて、以下のような OCPP 1.6 の標準アクションに対応:
AuthorizeStartTransactionStopTransactionStatusNotificationDataTransfer
各チャージポイントの接続は connected_charge_points によって管理されます。
MongoDB にはすべてのメッセージを記録:
@on(Action.heartbeat)
async def on_heartbeat(self, **kwargs):
log_client_message(self.id, "2", "Heartbeat", **kwargs)
return call_result.HeartbeatPayload(current_time=iso_now())
🧾 MongoDB によるログ保存
すべてのクライアントメッセージを ocpp.messages コレクションに保存しています。
def log_client_message(cp_id, message_type, action, **payload):
db.messages.insert_one({
"cp_id": cp_id,
"type": message_type,
"action": action,
"payload": payload,
"timestamp": datetime.utcnow()
})
ログは分析、トラブルシュート、監査にも活用可能です。
🛠 Flask API による遠隔制御
提供している REST API:
POST /api/remote_start_transactionPOST /api/remote_stop_transactionPOST /api/unlock_connectorPOST /api/get_diagGET /api/messages/latest
これらは OCPP 経由でチャージポイントに命令を送ります。
💻 Alpine.js ダッシュボード
軽量な UI を Alpine.js で構築し、fetch() で Flask API にアクセス。
チャージポイントごとに以下が可能:
- トランザクション開始 / 停止
- コネクタ解除
- 診断要求
- ログ表示
/api/messages/latest?cp_id=...
async remoteStart(cp_id) {
const idTag = prompt("IDタグを入力:");
const connectorId = prompt("コネクタIDを入力:");
const res = await fetch("/api/remote_start_transaction", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ cp_id, id_tag: idTag, connector_id: parseInt(connectorId) })
});
alert(await res.text());
}
🐳 Docker Compose の詳細
docker-compose.yml で構成される3つのサービス:
services:
mongo:
image: mongo
volumes:
- mongo-data:/data/db
ports:
- "27017:27017"
dashboard:
build: ./dashboard
ports:
- "5050:80"
ocpp:
build: .
volumes:
- .:/app
ports:
- "9000:9000"
- "6030:6000"
depends_on:
- mongo
volumes:
mongo-data:
構成のポイント:
mongo:全メッセージの保存先ocpp:WebSocket と Flask API 両方を担当dashboard:フロントエンド提供- コンテナ間でネットワークを共有、永続ボリュームを使用
📈 最新100件のログ取得API
@app.route("/api/messages/latest", methods=["GET"])
def get_latest_messages():
cp_id = request.args.get("cp_id")
query = {"cp_id": cp_id} if cp_id else {}
results = messages_collection.find(query).sort("_id", -1).limit(100)
return dumps(list(results))
🔮 今後の計画
- 📊 リアルタイムメトリクス対応のダッシュボード
- 🔐 APIとUIの認証
- 💸 請求システム連携
- 📨 Webhook対応
- 📤 自動診断・アップロードフロー
🧩 最後に
Flask[async] と python-ocpp、MongoDB を組み合わせることで、次のようなシステムが簡単に構築できます:
- ✅ OCPP準拠の非同期通信
- ✅ Dockerベースの簡単な展開
- ✅ 高度なログ記録とダッシュボード操作
- ✅ EV充電ネットワークに拡張可能な設計
📬 EVチャージシステムの導入・カスタムをご検討中ですか?
ぜひ https://simplico.net よりお問い合わせください。デモも承ります。
Get in Touch with us
Related Posts
- 都道府県・市町村向けデジタルシステムのリファレンスアーキテクチャ
- 実践的GovTechアーキテクチャ:ERP・GIS・住民向けサービス・データ基盤
- なぜ緊急対応システムは Offline First で設計されるべきなのか(ATAK からの教訓)
- なぜ地方自治体のソフトウェアプロジェクトは失敗するのか —— コードを書く前に防ぐための考え方
- AIブームの後に来るもの:次に起きること(そして日本企業にとって重要な理由)
- システムインテグレーションなしでは、なぜリサイクル業界のAIは失敗するのか
- ISA-95 vs RAMI 4.0:日本の製造業はどちらを使うべきか(そして、なぜ両方が重要なのか)
- なぜローコードはトレンドから外れつつあるのか(そして何が置き換えたのか)
- 2025年に失敗した製品たち —— その本当の理由
- Agentic AI Explained: Manus vs OpenAI vs Google — 日本企業が知るべき選択肢
- AIが実現する病院システムの垂直統合(Vertical Integration)
- Industrial AIにおけるAIアクセラレータ なぜ「チップ」よりもソフトウェアフレームワークが重要なのか
- 日本企業向け|EC・ERP連携に強いAI×ワークフロー型システム開発
- 信頼性の低い「スマート」システムが生む見えないコスト
- GPU vs LPU vs TPU:AIアクセラレータの正しい選び方
- LPUとは何か?日本企業向け実践的な解説と活用事例
- ソフトウェアエンジニアのためのサイバーセキュリティ用語マッピング
- モダンなサイバーセキュリティ監視・インシデント対応システムの設計 Wazuh・SOAR・脅威インテリジェンスを用いた実践的アーキテクチャ
- AI時代におけるクラシック・プログラミングの考え方
- SimpliPOSFlex 現場の「現実」に向き合うためのPOS(日本市場向け)













