Flask[async]・WebSocket・MongoDB を使って OCPP 1.6 中央システムを構築する

EV(電気自動車)の充電ステーションを管理するには、リアルタイムな非同期通信、遠隔操作、そして信頼性の高いデータロギングが不可欠です。

本記事では、私たちがどのようにして Flask、WebSocket、MongoDB を使い、**OCPP 1.6 準拠の中央システム(CSMS)**を構築したかをご紹介します。

✅ 構成要素:

  • 🐍 Python + Flask[async]
  • 📡 WebSocket(python-ocpp 利用)
  • 🗃 MongoDB によるメッセージ保存
  • 💻 Alpine.js ダッシュボード
  • 🐳 Docker による分離・展開

このシステムにより、チャージポイントを制御し、その状態やトランザクションの更新を受信し、ダッシュボード上で可視化できます。


🧠 システム構成図(Mermaid)

graph TD
    UI["Dashboard UI\n(Alpine.js)\nポート 5050"] -->|API リクエスト| FlaskAPI["Flask API\n(遠隔操作)\nポート 6000"]
    FlaskAPI -->|main_loop 経由で OCPP 呼び出し| WebSocket["OCPP WebSocket Server\n(websockets)\nポート 9000"]
    FlaskAPI -->|メッセージ保存| MongoDB["MongoDB\nコレクション: messages"]
    WebSocket -->|送受信| CP["チャージポイント\n(OCPP 1.6 クライアント)"]

⚙️ 使用技術一覧

レイヤー 使用技術
WebSocket python-ocpp, websockets
API サーバー Flask[async], asyncio
データベース MongoDB
フロントエンドUI Alpine.js + fetch()
デプロイ Docker, docker-compose

🚀 Flask[async] と WebSocket の連携

1つのアプリ内で Flask API と WebSocket サーバーを動作させるには、asyncio イベントループを共有する必要があります。

Flask 2.0 以降では ネイティブな async view をサポートしており、以下のような連携が可能です:

  • 非同期 REST API を Flask[async] で提供
  • WebSocket サーバーと同一の asyncio loop を使用
  • asyncio.run_coroutine_threadsafe(...) を使って Flask から OCPP 呼び出し

✅ 実装例

async def main():
    global main_loop
    main_loop = asyncio.get_running_loop()

Flask API 側からは次のように使用します:

future = asyncio.run_coroutine_threadsafe(cp.call(...), main_loop)
result = future.result(timeout=10)

これによりスレッド不要・ノンブロッキングで、完全にイベント駆動型の構成になります。


🛰 OCPP WebSocket サーバー

python-ocppwebsockets.serve(...) を用いて、以下のような OCPP 1.6 の標準アクションに対応:

  • Authorize
  • StartTransaction
  • StopTransaction
  • StatusNotification
  • DataTransfer

各チャージポイントの接続は connected_charge_points によって管理されます。

MongoDB にはすべてのメッセージを記録:

@on(Action.heartbeat)
async def on_heartbeat(self, **kwargs):
    log_client_message(self.id, "2", "Heartbeat", **kwargs)
    return call_result.HeartbeatPayload(current_time=iso_now())

🧾 MongoDB によるログ保存

すべてのクライアントメッセージを ocpp.messages コレクションに保存しています。

def log_client_message(cp_id, message_type, action, **payload):
    db.messages.insert_one({
        "cp_id": cp_id,
        "type": message_type,
        "action": action,
        "payload": payload,
        "timestamp": datetime.utcnow()
    })

ログは分析、トラブルシュート、監査にも活用可能です。


🛠 Flask API による遠隔制御

提供している REST API:

  • POST /api/remote_start_transaction
  • POST /api/remote_stop_transaction
  • POST /api/unlock_connector
  • POST /api/get_diag
  • GET /api/messages/latest

これらは OCPP 経由でチャージポイントに命令を送ります。


💻 Alpine.js ダッシュボード

軽量な UI を Alpine.js で構築し、fetch() で Flask API にアクセス。

チャージポイントごとに以下が可能:

  • トランザクション開始 / 停止
  • コネクタ解除
  • 診断要求
  • ログ表示 /api/messages/latest?cp_id=...
async remoteStart(cp_id) {
  const idTag = prompt("IDタグを入力:");
  const connectorId = prompt("コネクタIDを入力:");
  const res = await fetch("/api/remote_start_transaction", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ cp_id, id_tag: idTag, connector_id: parseInt(connectorId) })
  });
  alert(await res.text());
}

🐳 Docker Compose の詳細

docker-compose.yml で構成される3つのサービス:

services:
  mongo:
    image: mongo
    volumes:
      - mongo-data:/data/db
    ports:
      - "27017:27017"

  dashboard:
    build: ./dashboard
    ports:
      - "5050:80"

  ocpp:
    build: .
    volumes:
      - .:/app
    ports:
      - "9000:9000"
      - "6030:6000"
    depends_on:
      - mongo

volumes:
  mongo-data:

構成のポイント:

  • mongo:全メッセージの保存先
  • ocpp:WebSocket と Flask API 両方を担当
  • dashboard:フロントエンド提供
  • コンテナ間でネットワークを共有、永続ボリュームを使用

📈 最新100件のログ取得API

@app.route("/api/messages/latest", methods=["GET"])
def get_latest_messages():
    cp_id = request.args.get("cp_id")
    query = {"cp_id": cp_id} if cp_id else {}
    results = messages_collection.find(query).sort("_id", -1).limit(100)
    return dumps(list(results))

🔮 今後の計画

  • 📊 リアルタイムメトリクス対応のダッシュボード
  • 🔐 APIとUIの認証
  • 💸 請求システム連携
  • 📨 Webhook対応
  • 📤 自動診断・アップロードフロー

🧩 最後に

Flask[async] と python-ocpp、MongoDB を組み合わせることで、次のようなシステムが簡単に構築できます:

  • ✅ OCPP準拠の非同期通信
  • ✅ Dockerベースの簡単な展開
  • ✅ 高度なログ記録とダッシュボード操作
  • ✅ EV充電ネットワークに拡張可能な設計

📬 EVチャージシステムの導入・カスタムをご検討中ですか?
ぜひ https://simplico.net よりお問い合わせください。デモも承ります。

Related Posts

Our Products


Related Posts

Our Products


Get in Touch with us

Speak to Us or Whatsapp(+66) 83001 0222

Chat with Us on LINEiiitum1984

Our HeadquartersChanthaburi, Thailand