Building an OCPP 1.6 Central System with Flask async, WebSockets, and MongoDB

Managing EV chargers requires real-time, asynchronous communication, remote control, and reliable data logging. In this post, we share how we built a production-ready, OCPP 1.6-compatible central system (CSMS) using:

  • 🐍 Python + Flask[async]
  • 📡 WebSocket (via python-ocpp)
  • 🗃 MongoDB for message storage
  • 💻 Alpine.js dashboard
  • 🐳 Docker for modular deployment

This system allows us to control charge points, receive their status and transaction updates, and visualize system activity through a dashboard.


🧠 Architecture Overview

graph TD
    UI["Dashboard UI\n(Alpine.js)\nPort 5050"] -->|Fetch API| FlaskAPI["Flask API\n(Remote Control)\nPort 6000"]
    FlaskAPI -->|Send OCPP call\nvia main_loop| WebSocket["OCPP WebSocket Server\n(websockets)\nPort 9000"]
    FlaskAPI -->|Log Messages| MongoDB["MongoDB\nCollection: messages"]
    WebSocket -->|Receive / Respond| CP["Charge Point\n(OCPP 1.6 Client)"]

⚙️ Tech Stack

Layer       | Tool / Tech
WebSocket   | python-ocpp, websockets
API Server  | Flask[async], asyncio
Database    | MongoDB
UI Frontend | Alpine.js + fetch()
Deployment  | Docker, docker-compose

🚀 Flask[async] + WebSockets: Working Together

Running both a WebSocket server and Flask in one app requires careful coordination of the asyncio event loop. Flask 2.0 introduced native async view support. Combined with a saved reference to the WebSocket server's event loop, this lets us:

  • Serve async REST APIs using Flask[async]
  • Run every OCPP call on the same event loop as the OCPP WebSocket server
  • Wait for charge point replies in Flask routes using asyncio.run_coroutine_threadsafe(...)

✅ Example

We capture the main loop like this:

async def main():
    global main_loop
    main_loop = asyncio.get_running_loop()
    ...

Then, in any Flask route, we can safely send OCPP commands like this:

future = asyncio.run_coroutine_threadsafe(cp.call(...), main_loop)
result = future.result(timeout=10)

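This keeps all OCPP traffic on a single asyncio event loop. asyncio.run_coroutine_threadsafe(...) exists for exactly this cross-thread hand-off: the Flask route runs outside that loop, submits the coroutine to it, and blocks only its own worker thread (for at most 10 seconds here) while it waits for the charge point's reply. The event loop itself is never blocked, so the WebSocket server stays fully event-driven.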
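
As a rough illustration of that hand-off, the sketch below uses only the standard library: an event loop runs in one thread, and a synchronous caller (standing in for a Flask route) submits a coroutine to it. `FakeChargePoint`, `start_loop_in_thread`, and `send_from_sync_code` are hypothetical names invented for this example, not part of python-ocpp or Flask, and which side of the app gets its own thread is an assumption that may differ from our actual layout.

```python
import asyncio
import threading


class FakeChargePoint:
    """Hypothetical stand-in for a connected charge point object."""

    async def call(self, action: str) -> dict:
        await asyncio.sleep(0.01)  # pretend to wait for the charger's reply
        return {"action": action, "status": "Accepted"}


def start_loop_in_thread() -> asyncio.AbstractEventLoop:
    """Create an event loop, run it forever in a daemon thread, and return it."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


def send_from_sync_code(loop, cp, action, timeout=10.0):
    """What a synchronous web handler would do: submit, then wait with a timeout."""
    future = asyncio.run_coroutine_threadsafe(cp.call(action), loop)
    return future.result(timeout=timeout)  # blocks only the calling thread


main_loop = start_loop_in_thread()
result = send_from_sync_code(main_loop, FakeChargePoint(), "RemoteStartTransaction")
print(result)
main_loop.call_soon_threadsafe(main_loop.stop)  # shut the demo loop down
```

The timeout matters in practice: a charger that never answers would otherwise hold the calling thread indefinitely.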


🛰 OCPP WebSocket Server

We use websockets.serve(...) and python-ocpp to handle standard OCPP 1.6 actions like:

  • Authorize
  • StartTransaction
  • StopTransaction
  • StatusNotification
  • DataTransfer

Each connected charge point is tracked in a global connected_charge_points dictionary. We also log every incoming OCPP payload into MongoDB:

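from ocpp.routing import on
from ocpp.v16 import call_result
from ocpp.v16.enums import Action

@on(Action.heartbeat)
async def on_heartbeat(self, **kwargs):
    # Message type "2" marks an OCPP CALL, i.e. a request sent by the charge point.
    log_client_message(self.id, "2", "Heartbeat", **kwargs)
    return call_result.HeartbeatPayload(current_time=iso_now())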
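
The connected_charge_points dictionary mentioned above could be maintained roughly as follows. This is a minimal sketch using only the standard library: `FakeChargePoint` and `handle_connection` are hypothetical names, and the `start()` method here merely sleeps where a real handler would read messages until the socket closes.

```python
import asyncio

# Maps charge point ID -> handler object for every live connection.
connected_charge_points: dict = {}


class FakeChargePoint:
    """Hypothetical stand-in for a charge point handler object."""

    def __init__(self, cp_id: str, session_seconds: float):
        self.id = cp_id
        self._session_seconds = session_seconds

    async def start(self) -> None:
        # A real handler would process messages until the connection drops.
        await asyncio.sleep(self._session_seconds)


async def handle_connection(cp: FakeChargePoint) -> None:
    """Keep the charge point registered for the lifetime of its connection."""
    connected_charge_points[cp.id] = cp
    try:
        await cp.start()
    finally:
        # Remove only our own entry: a reconnect may already have replaced it.
        if connected_charge_points.get(cp.id) is cp:
            del connected_charge_points[cp.id]


async def demo() -> tuple:
    task = asyncio.create_task(handle_connection(FakeChargePoint("CP_1", 0.05)))
    await asyncio.sleep(0.01)  # the connection is open at this point
    during = sorted(connected_charge_points)
    await task  # the connection has closed
    after = sorted(connected_charge_points)
    return during, after


print(asyncio.run(demo()))
```

Cleaning up in a `finally` block means a dropped connection cannot leave a stale entry behind for the remote-control API to pick up.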

🧾 MongoDB Logging

We store all client payloads in the ocpp.messages collection using a generic logger:

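from datetime import datetime, timezone

# `db` is the pymongo database handle for the "ocpp" database.
def log_client_message(cp_id, message_type, action, **payload):
    db.messages.insert_one({
        "cp_id": cp_id,
        "type": message_type,
        "action": action,
        "payload": payload,
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time.
        "timestamp": datetime.now(timezone.utc)
    })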

This makes it easy to query historical activity, debug field installations, or integrate with reporting systems.
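
To make the querying side concrete, here is a small sketch of a filter builder over the fields written by the logger (cp_id, action, timestamp). `build_history_query` is a hypothetical helper introduced for illustration, and the final comment shows only one plausible way to pass the result to pymongo.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def build_history_query(cp_id: Optional[str] = None,
                        action: Optional[str] = None,
                        since: Optional[datetime] = None) -> dict:
    """Build a MongoDB filter; arguments left as None are not constrained."""
    query: dict = {}
    if cp_id:
        query["cp_id"] = cp_id
    if action:
        query["action"] = action
    if since:
        query["timestamp"] = {"$gte": since}
    return query


one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)
query = build_history_query(cp_id="CP_1", action="StatusNotification",
                            since=one_hour_ago)
print(query)
# With pymongo this could be used roughly as:
#   db.messages.find(query).sort("timestamp", -1)
```

If queries like this become frequent, a compound index on cp_id and timestamp is probably worth adding.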


🛠 Flask API for Remote Control

We expose REST APIs to send remote commands:

  • POST /api/remote_start_transaction
  • POST /api/remote_stop_transaction
  • POST /api/unlock_connector
  • POST /api/get_diag
  • GET /api/messages/latest

Each API sends a command to the corresponding charge point using cp.call(...).
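
The logic behind such an endpoint might look like the sketch below, written without Flask so it runs on the standard library alone. `remote_start`, `FakeChargePoint`, the plain-dict payload, and the HTTP status codes chosen here are all assumptions for illustration; a real route would build a python-ocpp request object and return the tuple through Flask.

```python
import asyncio
import threading
from concurrent.futures import TimeoutError as FutureTimeout


class FakeChargePoint:
    """Hypothetical stand-in; a real object would send the OCPP request."""

    async def call(self, payload: dict) -> dict:
        return {"status": "Accepted", "echo": payload}


def remote_start(body: dict, registry: dict, loop, timeout: float = 10.0):
    """Return (http_status, response_dict) for a remote-start request."""
    missing = [key for key in ("cp_id", "id_tag") if not body.get(key)]
    if missing:
        return 400, {"error": "missing fields: " + ", ".join(missing)}
    cp = registry.get(body["cp_id"])
    if cp is None:
        return 404, {"error": "charge point not connected"}
    payload = {"id_tag": body["id_tag"], "connector_id": body.get("connector_id")}
    # Run the call on the OCPP event loop and wait for it from this thread.
    future = asyncio.run_coroutine_threadsafe(cp.call(payload), loop)
    try:
        return 200, future.result(timeout=timeout)
    except FutureTimeout:
        future.cancel()
        return 504, {"error": "charge point did not answer in time"}


# Demo: an event loop in a background thread plays the OCPP server's loop.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()
registry = {"CP_1": FakeChargePoint()}

ok = remote_start({"cp_id": "CP_1", "id_tag": "ABC123", "connector_id": 1},
                  registry, loop)
not_connected = remote_start({"cp_id": "CP_9", "id_tag": "ABC123"}, registry, loop)
bad_request = remote_start({"id_tag": "ABC123"}, registry, loop)
print(ok, not_connected, bad_request, sep="\n")
loop.call_soon_threadsafe(loop.stop)
```

Checking the registry before submitting anything to the loop lets the API answer immediately when a charger is offline instead of waiting for a timeout.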


💻 Alpine.js Dashboard

The frontend is powered by Alpine.js and talks to Flask using simple fetch() calls.

Each charge point shows controls to:

  • Start / stop a transaction
  • Unlock the connector
  • Run diagnostics
  • View logs via /api/messages/latest?cp_id=...

Example:

async remoteStart(cp_id) {
  const idTag = prompt("Enter ID tag:");
  const connectorId = prompt("Connector ID:");
  const res = await fetch("/api/remote_start_transaction", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ cp_id, id_tag: idTag, connector_id: parseInt(connectorId, 10) })
  });
  alert(await res.text());
}

🐳 Docker Compose Explained

We use docker-compose.yml to orchestrate the following services:

services:
  mongo:
    image: mongo
    volumes:
      - mongo-data:/data/db
    ports:
      - "27017:27017"

  dashboard:
    build: ./dashboard
    ports:
      - "5050:80"

  ocpp:
    build: .
    volumes:
      - .:/app
    ports:
      - "9000:9000"     # WebSocket server
      - "6030:6000"     # Flask API
    depends_on:
      - mongo

volumes:
  mongo-data:

🔍 Breakdown:

  • mongo stores all messages.
  • ocpp runs both the WebSocket server and Flask async API.
  • dashboard serves the Alpine.js frontend.
  • Port 6030 (host) is mapped to Flask’s port 6000 (container).
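  • All services share the default Compose network, so the ocpp container can reach MongoDB at the hostname mongo, and the mongo-data named volume keeps the database intact across restarts.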

This setup makes it easy to scale, debug, or run locally or in the cloud.
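
Because Compose service names resolve as hostnames on the shared network, the ocpp service can locate MongoDB with a small helper like the one sketched here. The `MONGO_URI` environment variable and the `mongo_uri` function are hypothetical; the compose file above does not define them.

```python
import os


def mongo_uri(env=None) -> str:
    """Return the MongoDB URI, defaulting to the Compose service name `mongo`."""
    env = os.environ if env is None else env
    return env.get("MONGO_URI", "mongodb://mongo:27017")


# Inside the Compose network the default applies.
print(mongo_uri({}))
# Running the app directly on the host, one might override it instead.
print(mongo_uri({"MONGO_URI": "mongodb://localhost:27017"}))
```

Keeping the address in one overridable place lets the same code run inside the container and on a developer machine.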


📈 Example Query: Show Last 100 Logs

We expose an API to query messages:

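from bson.json_util import dumps  # serializes ObjectId and datetime values
from flask import Response, request

@app.route("/api/messages/latest", methods=["GET"])
def get_latest_messages():
    cp_id = request.args.get("cp_id")
    query = {"cp_id": cp_id} if cp_id else {}
    # ObjectIds are roughly time-ordered, so a descending _id sort gives newest first.
    results = messages_collection.find(query).sort("_id", -1).limit(100)
    return Response(dumps(list(results)), mimetype="application/json")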

🔮 What’s Next

  • 📊 Real-time metrics dashboard
  • 🔒 Authentication for API and dashboard
  • 💸 Billing system integration
  • 📨 Webhook support for external apps
  • 📤 Automatic diagnostics + upload handling

🧩 Final Thoughts

With Flask[async], python-ocpp, and a few lines of glue, you can build a scalable and standards-compliant OCPP backend:

  • ✅ OCPP-compliant and fully async
  • ✅ Modular and Dockerized
  • ✅ Logs every message to MongoDB
  • ✅ Web dashboard for live control

Whether you're an EV operator, developer, or hardware integrator, this stack gives you the control and observability you need.


📬 Want help launching your own EV platform?
Reach out to Simplico Co., Ltd. or request a demo.
