Building an OCPP 1.6 Central System with Flask async, WebSockets, and MongoDB
Managing EV chargers requires real-time, asynchronous communication, remote control, and reliable data logging. In this post, we share how we built a production-ready, OCPP 1.6-compatible central system (CSMS) using:
- 🐍 Python + Flask[async]
- 📡 WebSocket (via
python-ocpp) - 🗃 MongoDB for message storage
- 💻 Alpine.js dashboard
- 🐳 Docker for modular deployment
This system allows us to control charge points, receive their status and transaction updates, and visualize system activity through a dashboard.
🧠 Architecture Overview
graph TD
UI["Dashboard UI\n(Alpine.js)\nPort 5050"] -->|Fetch API| FlaskAPI["Flask API\n(Remote Control)\nPort 6000"]
FlaskAPI -->|Send OCPP call\nvia main_loop| WebSocket["OCPP WebSocket Server\n(websockets)\nPort 9000"]
FlaskAPI -->|Log Messages| MongoDB["MongoDB\nCollection: messages"]
WebSocket -->|Receive / Respond| CP["Charge Point\n(OCPP 1.6 Client)"]
⚙️ Tech Stack
| Layer | Tool / Tech |
|---|---|
| WebSocket | python-ocpp, websockets |
| API Server | Flask[async], asyncio |
| Database | MongoDB |
| UI Frontend | Alpine.js + fetch() |
| Deployment | Docker, docker-compose |
🚀 Flask[async] + WebSockets: Working Together
Running both a WebSocket server and Flask together in one app requires careful coordination of the asyncio event loop. Flask 2.0 introduced native async view support, making it possible to:
- Serve async REST APIs using Flask[async]
- Share a common event loop with the OCPP WebSocket server
awaitcharge point communication in Flask routes usingasyncio.run_coroutine_threadsafe(...)
✅ Example
We capture the main loop like this:
async def main():
global main_loop
main_loop = asyncio.get_running_loop()
...
Then in any Flask API, we safely send OCPP commands like:
future = asyncio.run_coroutine_threadsafe(cp.call(...), main_loop)
result = future.result(timeout=10)
This lets us keep everything in a single asyncio context — no threading, no blocking, and full event-driven behavior.
🛰 OCPP WebSocket Server
We use websockets.serve(...) and python-ocpp to handle standard OCPP 1.6 actions like:
AuthorizeStartTransactionStopTransactionStatusNotificationDataTransfer
Each connected charge point is tracked in a global connected_charge_points dictionary. We also log every incoming OCPP payload into MongoDB:
@on(Action.heartbeat)
async def on_heartbeat(self, **kwargs):
log_client_message(self.id, "2", "Heartbeat", **kwargs)
return call_result.HeartbeatPayload(current_time=iso_now())
🧾 MongoDB Logging
We store all client payloads in the ocpp.messages collection using a generic logger:
def log_client_message(cp_id, message_type, action, **payload):
db.messages.insert_one({
"cp_id": cp_id,
"type": message_type,
"action": action,
"payload": payload,
"timestamp": datetime.utcnow()
})
This makes it easy to query historical activity, debug field installations, or integrate with reporting systems.
🛠 Flask API for Remote Control
We expose REST APIs to send remote commands:
POST /api/remote_start_transactionPOST /api/remote_stop_transactionPOST /api/unlock_connectorPOST /api/get_diagGET /api/messages/latest
Each API sends a command to the corresponding charge point using cp.call(...).
💻 Alpine.js Dashboard
The frontend is powered by Alpine.js and talks to Flask using simple fetch() calls.
Each charge point shows controls to:
- Start / stop a transaction
- Unlock the connector
- Run diagnostics
- View logs via
/api/messages/latest?cp_id=...
Example:
async remoteStart(cp_id) {
const idTag = prompt("Enter ID tag:");
const connectorId = prompt("Connector ID:");
const res = await fetch("/api/remote_start_transaction", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ cp_id, id_tag: idTag, connector_id: parseInt(connectorId) })
});
alert(await res.text());
}
🐳 Docker Compose Explained
We use docker-compose.yml to orchestrate the following services:
services:
mongo:
image: mongo
volumes:
- mongo-data:/data/db
ports:
- "27017:27017"
dashboard:
build: ./dashboard
ports:
- "5050:80"
ocpp:
build: .
volumes:
- .:/app
ports:
- "9000:9000" # WebSocket server
- "6030:6000" # Flask API
depends_on:
- mongo
volumes:
mongo-data:
🔍 Breakdown:
mongostores all messages.ocppruns both the WebSocket server and Flask async API.dashboardserves the Alpine.js frontend.- Port
6030(host) is mapped to Flask’s port6000(container). - All services share a network and MongoDB is persistent across restarts.
This setup makes it easy to scale, debug, or run locally or in the cloud.
📈 Example Query: Show Last 100 Logs
We expose an API to query messages:
@app.route("/api/messages/latest", methods=["GET"])
def get_latest_messages():
cp_id = request.args.get("cp_id")
query = {"cp_id": cp_id} if cp_id else {}
results = messages_collection.find(query).sort("_id", -1).limit(100)
return dumps(list(results))
🔮 What’s Next
- 📊 Real-time metrics dashboard
- 🔒 Authentication for API and dashboard
- 💸 Billing system integration
- 📨 Webhook support for external apps
- 📤 Automatic diagnostics + upload handling
🧩 Final Thoughts
With Flask[async], python-ocpp, and a few lines of glue, you can build a scalable and standards-compliant OCPP backend:
- ✅ OCPP-compliant and fully async
- ✅ Modular and Dockerized
- ✅ Logs every message to MongoDB
- ✅ Web dashboard for live control
Whether you're an EV operator, developer, or hardware integrator — this stack gives you the control and observability you need.
📬 Want help launching your own EV platform?
Reach out to Simplico Co., Ltd. or request a demo.
Get in Touch with us
Related Posts
- 面向市级与区级政府的数字化系统参考架构
- Reference Architecture for Provincial / Municipal Digital Systems
- 实用型 GovTech 架构:ERP、GIS、政务服务平台与数据中台
- A Practical GovTech Architecture: ERP, GIS, Citizen Portal, and Data Platform
- 为什么应急响应系统必须采用 Offline First 设计(来自 ATAK 的启示)
- Why Emergency Systems Must Work Offline First (Lessons from ATAK)
- 为什么地方政府的软件项目会失败 —— 如何在编写代码之前避免失败
- Why Government Software Projects Fail — And How to Prevent It Before Writing Code
- AI 热潮之后:接下来会发生什么(以及这对中国企业意味着什么)
- After the AI Hype: What Always Comes Next (And Why It Matters for Business)
- 为什么没有系统集成,回收行业的 AI 项目往往会失败
- Why AI in Recycling Fails Without System Integration
- ISA-95 vs RAMI 4.0:中国制造业应该如何选择(以及为什么两者缺一不可)
- ISA-95 vs RAMI 4.0: Which One Should You Use (And Why Both Matter)
- 为什么低代码正在退潮(以及它正在被什么取代)
- Why Low‑Code Is Falling Out of Trend (and What Replaced It)
- 2025 年失败的产品 —— 真正的原因是什么?
- The Biggest Product Failures of 2025 — And the Real Reason They Failed
- Agentic AI Explained: Manus vs OpenAI vs Google —— 中国企业的实践选择
- Agentic AI Explained: Manus vs OpenAI vs Google — What Enterprises Really Need













