FarmScript:我们如何从零设计一门农业IoT领域特定语言
为什么农业IoT需要一门DSL?
在构建农业IoT自动化系统时,工程师通常面临两个选择:
- YAML/JSON配置文件 — 简单,但遇到复杂条件分支就会崩溃
- 直接用通用语言(Python/Node.js)实现 — 灵活,但领域专家无法阅读业务逻辑
两者都有根本性的问题。农业自动化逻辑往往形成"领域专家定义规则、工程师实现代码"的双轨结构,导致需求错位和维护成本不断膨胀。
FarmScript的答案是:设计一门领域特定语言(DSL)。以泰国尖竹汶府的榴莲果园为主要用例,从零实现了一个编译器。本文分享这一过程中的设计决策与工程实现。
动机:为什么"配置文件"不够用
先从技术层面梳理为什么需要DSL。
一个典型的榴莲农场自动化需求是这样的:
- 早上5:30,如果土壤湿度低于55%,启动灌溉泵25分钟
- 但如果开花阶段状态为
budding或以上,禁止灌溉(开花期浇水是金枕榴莲减产的主要原因) - 气温超过38°C时启动喷雾系统并发送LINE通知;超过40°C时同时发送SMS
- 连续14天无降雨且开花阶段为
none时,发送干旱胁迫积累预警
用YAML来表达这些逻辑:
# YAML的尝试 — 很快变得不可读
rules:
- condition:
and:
- sensor: soil_moisture_z1
op: lt
value: 55
- sensor: flowering_stage
op: eq
value: none
action:
type: activate
target: pump_z1
duration_minutes: 25
当when A and B and not C then X elif D then Y这类复合条件增多时,YAML迅速失控——嵌套层级加深,条件优先级不清晰,最关键的是领域专家无法审核。
而直接用Python实现虽然表达能力足够,但规格变更时工程师成为瓶颈,果园管理者无法自主验证逻辑正确性。
DSL解决了这个矛盾:拥有农业领域的词汇和语法,同时由编译器负责验证和转换。
语言设计决策
声明式块结构
程序按语义分块:传感器声明、执行器声明、计划任务、规则、告警配置各自独立。依赖关系隐式处理——在rule块内引用传感器名称,编译器自动与sensors块进行关联验证。
响应式规则语义
规则是轮询驱动而非事件驱动的。生成的运行时每30秒对所有规则求值一次。这是优先保证简单性的选择,便于在ESP32或Raspberry Pi等嵌入式目标上实现。未来计划支持MQTT事件触发。
最小化类型系统
传感器类型只有numeric和enum两种。numeric支持比较运算符(>, <, ==, !=),enum只允许等值比较。类型检查在语义分析阶段完成,类型错误在编译时发现。
错误信息面向领域专家
编译器错误信息面向农业从业者而非工程师——这是有意为之的设计决策。FarmScript文件可能由工程师编写,但应该由领域专家来审核。
语法全貌
FarmScript程序的整体结构:
program
:= farm_block
sensors_block
actuators_block
alerts_block
(schedule_block | rule_block)*
sensors 块
sensors {
soil_moisture_z1: analog pin A0 range 0..100 unit "%"
soil_moisture_z2: analog pin A1 range 0..100 unit "%"
air_temperature: i2c device DHT22 unit "°C"
air_humidity: i2c device DHT22 unit "%"
rainfall: digital pin D5 unit "mm"
days_without_rain: computed from rainfall
flowering_stage: visual pin D6
values [none, budding, detected, full_bloom]
soil_temp_z1: i2c device DS18B20 unit "°C"
}
传感器声明包含:协议类型(analog, i2c, digital, computed, visual)、硬件参数(引脚号或设备名)、类型信息(指定range则为numeric,指定values则为enum)。computed是派生自其他传感器的虚拟传感器,用于days_without_rain这类有状态的聚合值。
actuators 块
actuators {
pump_z1: relay pin D7 label "1区灌溉泵"
pump_z2: relay pin D8 label "2区灌溉泵"
pump_z3: relay pin D9 label "3区灌溉泵"
pump_z4: relay pin D10 label "4区灌溉泵"
all_pumps: group [pump_z1, pump_z2, pump_z3, pump_z4]
mist_system: relay pin D11
fertigation: relay pin D12
}
group是多执行器批量操作的语法糖,在代码生成阶段展开。
rule 块
规则由多个when子句组成,每个子句包含条件表达式和命令列表:
rule flowering_control {
when days_without_rain > 14 and flowering_stage == none {
alert "已连续{days_without_rain}天无雨 — 请监测开花信号" via line
log "干旱胁迫积累中。Z1:{soil_moisture_z1}% Z2:{soil_moisture_z2}%"
}
when flowering_stage == budding {
turn off all_pumps
alert "检测到开花(花蕾期)— 已停止全部灌溉。严禁浇水!"
via line priority high
alert "检测到开花 — 全部灌溉已停止"
via sms priority high
}
when flowering_stage == full_bloom {
turn off all_pumps
alert "已确认满开。灌溉持续暂停。预计4至6天内坐果。" via line
}
}
rule heat_stress {
when air_temperature > 38 {
turn on mist_system
alert "热应激预警:{air_temperature}°C — 喷雾已启动" via line
}
when air_temperature > 40 {
turn on mist_system
alert "危急高温:{air_temperature}°C — 存在落果风险,请立即检查!"
via line priority high
alert "果园危急高温 {air_temperature}°C" via sms priority high
}
when air_temperature < 34 {
turn off mist_system
}
}
条件表达式的文法:
condition := expr (("and" | "or") expr)*
expr := sensor_ref op literal
op := ">" | "<" | ">=" | "<=" | "==" | "!="
literal := NUMBER | IDENTIFIER # NUMBER为numeric类型,IDENTIFIER为enum值
and和or左结合。括号显式分组在当前版本未支持(规划中)。
编译器实现
编译器用Python实现,分为四个阶段。
阶段一:词法分析器
采用手写状态机而非正则表达式驱动的词法分析器,便于控制错误信息质量,并灵活支持农业词汇的关键字识别。
class Lexer:
KEYWORDS = {
'farm', 'sensors', 'actuators', 'alerts', 'schedule', 'rule',
'when', 'and', 'or', 'turn', 'on', 'off', 'for', 'alert',
'log', 'report', 'via', 'at', 'daily', 'computed', 'from',
'group', 'range', 'unit', 'values', 'priority', 'high'
}
def tokenize(self, source: str) -> list[Token]:
tokens = []
pos = 0
while pos < len(source):
if source[pos] in ' \t\n\r':
pos += 1
continue
if source[pos] == '#': # 注释
while pos < len(source) and source[pos] != '\n':
pos += 1
continue
if source[pos] == '"': # 字符串字面量
end = source.index('"', pos + 1)
tokens.append(Token(TokenType.STRING, source[pos+1:end]))
pos = end + 1
continue
if source[pos].isalpha() or source[pos] == '_': # 标识符/关键字
start = pos
while pos < len(source) and (source[pos].isalnum() or source[pos] == '_'):
pos += 1
word = source[start:pos]
tt = TokenType.KEYWORD if word in self.KEYWORDS else TokenType.IDENTIFIER
tokens.append(Token(tt, word))
continue
if source[pos].isdigit(): # 数值字面量
start = pos
while pos < len(source) and (source[pos].isdigit() or source[pos] == '.'):
pos += 1
tokens.append(Token(TokenType.NUMBER, float(source[start:pos])))
continue
two_char = source[pos:pos+2]
if two_char in ('>=', '<=', '==', '!=', '..'):
tokens.append(Token(TokenType.OPERATOR, two_char))
pos += 2
continue
tokens.append(Token(TokenType.SYMBOL, source[pos]))
pos += 1
return tokens
阶段二:语法分析器
采用递归下降解析器。每种块类型有对应的解析函数,相互递归调用。
class Parser:
def __init__(self, tokens: list[Token]):
self.tokens = tokens
self.pos = 0
def parse_program(self) -> ProgramNode:
nodes = []
while not self.at_end():
if self.peek_keyword('farm'): nodes.append(self.parse_farm_block())
elif self.peek_keyword('sensors'): nodes.append(self.parse_sensors_block())
elif self.peek_keyword('rule'): nodes.append(self.parse_rule_block())
elif self.peek_keyword('schedule'):nodes.append(self.parse_schedule_block())
# ... 其他块类型
else:
raise ParseError(f"意外的token:{self.current()}")
return ProgramNode(nodes)
def parse_rule_block(self) -> RuleNode:
self.expect_keyword('rule')
name = self.expect(TokenType.IDENTIFIER).value
self.expect_symbol('{')
clauses = []
while not self.peek_symbol('}'):
self.expect_keyword('when')
condition = self.parse_condition()
self.expect_symbol('{')
body = self.parse_command_list()
self.expect_symbol('}')
clauses.append(WhenClause(condition, body))
self.expect_symbol('}')
return RuleNode(name, clauses)
def parse_condition(self) -> ConditionNode:
left = self.parse_expr()
while self.peek_keyword('and') or self.peek_keyword('or'):
op = self.advance().value
right = self.parse_expr()
left = BinaryCondition(left, op, right)
return left
def parse_expr(self) -> ExprNode:
sensor = self.expect(TokenType.IDENTIFIER).value
op = self.expect(TokenType.OPERATOR).value
if self.peek(TokenType.NUMBER):
value = self.advance().value
return CompareExpr(SensorRef(sensor), op, NumericLiteral(value))
else:
value = self.advance().value
return CompareExpr(SensorRef(sensor), op, EnumLiteral(value))
阶段三:语义分析器
语义分析阶段验证以下内容:
class SemanticAnalyzer:
def __init__(self, program: ProgramNode):
self.sensor_types: dict[str, SensorType] = {}
self.actuator_names: set[str] = set()
self.alert_channels: set[str] = set()
self.errors: list[SemanticError] = []
def analyze(self) -> bool:
self._collect_declarations()
self._validate_rules()
return len(self.errors) == 0
def _validate_expr(self, expr: ExprNode, line: int):
sensor_name = expr.left.name
if sensor_name not in self.sensor_types:
# 提供拼写建议
candidates = difflib.get_close_matches(
sensor_name, self.sensor_types.keys(), n=1
)
hint = f"\n 您是否想输入:'{candidates[0]}'?" if candidates else ""
self.errors.append(SemanticError(
line,
f"传感器 '{sensor_name}' 尚未声明。{hint}"
))
return
sensor_type = self.sensor_types[sensor_name]
if isinstance(sensor_type, EnumSensorType):
if expr.op not in ('==', '!='):
self.errors.append(SemanticError(
line,
f"enum类型传感器 '{sensor_name}' 只能使用 '==' 或 '!='。"
))
if isinstance(expr.right, NumericLiteral):
self.errors.append(SemanticError(
line,
f"'{sensor_name}' 是enum类型,不能与数值比较。"
f"有效值:{sensor_type.values}"
))
elif expr.right.value not in sensor_type.values:
self.errors.append(SemanticError(
line,
f"'{expr.right.value}' 不是 '{sensor_name}' 的有效枚举值。"
f"有效值:{sensor_type.values}"
))
阶段四:代码生成器
遍历经验证的AST,生成简洁可读的Python代码——目标质量是与手写代码无异。
class CodeGenerator:
def generate_rule(self, rule: RuleNode) -> str:
lines = [
f"# 规则: {rule.name}",
f"# 由 FarmScript v1.0 生成 — 请勿手动修改",
f"def rule_{rule.name}():",
]
sensors_used = self._collect_sensors(rule)
for sensor in sorted(sensors_used):
lines.append(f" {sensor} = sensors.read('{sensor}')")
lines.append("")
for i, clause in enumerate(rule.clauses):
keyword = "if" if i == 0 else "elif"
condition_str = self._generate_condition(clause.condition)
lines.append(f" {keyword} {condition_str}:")
for cmd in clause.body:
lines.extend(self._generate_command(cmd, indent=8))
lines.append("")
return "\n".join(lines)
开花控制规则的生成结果:
# 规则: flowering_control
# 由 FarmScript v1.0 生成 — 请勿手动修改
def rule_flowering_control():
days_without_rain = sensors.read('days_without_rain')
flowering_stage = sensors.read('flowering_stage')
soil_moisture_z1 = sensors.read('soil_moisture_z1')
soil_moisture_z2 = sensors.read('soil_moisture_z2')
if days_without_rain > 14 and flowering_stage == "none":
alerts.line(
f"已连续{days_without_rain}天无雨 — 请监测开花信号"
)
log(f"干旱胁迫积累中。Z1:{soil_moisture_z1}% Z2:{soil_moisture_z2}%")
elif flowering_stage == "budding":
actuators.group_off("all_pumps")
alerts.line(
"检测到开花(花蕾期)— 已停止全部灌溉。严禁浇水!",
priority="high"
)
alerts.sms("检测到开花 — 全部灌溉已停止", priority="high")
elif flowering_stage == "full_bloom":
actuators.group_off("all_pumps")
alerts.line("已确认满开。灌溉持续暂停。预计4至6天内坐果。")
运行时架构
生成的Python代码依赖farmscript_rt运行时库,该库提供硬件抽象层,使生成代码无需关心传感器的物理协议和执行器的控制细节。
┌─────────────────────────────────┐
│ generated/main.py │ ← FarmScript生成
│ rule_flowering_control() │
│ rule_heat_stress() │
│ rule_irrigation() │
└──────────────┬──────────────────┘
│ imports
┌──────────────▼──────────────────┐
│ farmscript_rt/ │ ← 运行时库
│ SensorManager (I2C/GPIO) │
│ ActuatorManager (Relay/PWM) │
│ AlertManager (LINE/SMS) │
│ TimerManager (async tasks) │
│ LogManager (JSON/file) │
└─────────────────────────────────┘
│ hardware calls
┌──────────────▼──────────────────┐
│ Raspberry Pi 4 / ESP32 │
│ GPIO, I2C, 4G调制解调器 │
└─────────────────────────────────┘
轮询主循环:
# generated/main.py
import time, signal, logging
from rules import rule_flowering_control, rule_heat_stress, rule_irrigation
from farmscript_rt import RuntimeContext
ctx = RuntimeContext.from_config("farm_config.json")
def main():
logging.info("FarmScript 运行时启动")
while True:
try:
rule_flowering_control()
rule_heat_stress()
rule_irrigation()
except SensorReadError as e:
logging.error(f"传感器读取错误: {e}")
ctx.alerts.line(f"传感器错误: {e}", priority="high")
except Exception as e:
logging.critical(f"未预期错误: {e}", exc_info=True)
time.sleep(ctx.poll_interval_seconds) # 默认30秒
if __name__ == "__main__":
signal.signal(signal.SIGTERM, lambda *_: ctx.shutdown())
main()
设计权衡与已知局限
轮询 vs 事件驱动
当前实现基于轮询,每30秒对所有规则求值一次。实现简单,但对高频传感器(如雨量传感器的上升沿)不够理想。下一版本计划增加MQTT事件桥接,支持轮询与事件驱动的混合模式。
条件表达式的表达能力
目前不支持括号显式分组。when A and B or C按左结合解析为(A and B) or C,无法表达A and (B or C)。当前规避方式是拆分为多个when子句。括号支持的语法改造成本较低,计划纳入下个版本。
定时器处理
turn on pump_z1 for 25 minutes这类带定时器的指令由TimerManager异步管理。竞态条件(如定时器运行中另一条规则操作同一执行器)通过运行时锁处理,但理想情况是在编译器层面静态检测执行器冲突——这是下一步的工作。
多目标编译
目前只支持CPython(Raspberry Pi)。MicroPython(ESP32)编译支持正在开发中,代码生成器正在迁移到目标插件化架构。
横向比较:Node-RED、Home Assistant等替代方案
这类问题工程师必然会问。
| 方案 | 优势 | 劣势 |
|---|---|---|
| Node-RED | 可视化、生态成熟 | 流程复杂后难以版本控制;领域专家仍难审核 |
| Home Assistant | 面向消费端IoT成熟 | 农业特有概念(开花阶段、区域分组)需要大量自定义 |
| 直接写Python | 完整表达能力 | 领域专家无法自主验证;工程师成为变更瓶颈 |
| YAML规则引擎 | 声明式、易于解析 | 复杂条件表达能力差;无编译时验证 |
| FarmScript | 领域语义清晰;编译时验证;可读可审核 | 初期开发成本高;需维护编译器 |
FarmScript并非在所有场景都是最优选择。如果你的系统足够简单,Node-RED或YAML配置完全够用。DSL的价值在于领域知识复杂到一定程度、且需要领域专家直接参与逻辑审核的情境。
总结
FarmScript的设计与实现过程提炼出以下核心经验:
- DSL的有效性本质上是"复杂度在哪里管理"的问题。 当领域存在YAML或通用语言难以清晰表达的复合条件逻辑,DSL值得投入。
- 递归下降解析器是中等复杂度DSL的合理选择。 相比ANTLR等解析器生成器,实现成本低,错误信息定制能力强。
- 语义分析是DSL可靠性的核心。 不只是类型检查——未来希望将领域规则(如开花期禁止灌溉)也编码为编译时约束。
- 生成代码的可读性同样重要。 果园管理者应该能通过阅读生成的Python来验证部署后的行为。
如果你正在构建农业IoT、工业控制或任何其他"领域专家需要直接参与逻辑定义"的系统,DSL都是值得认真考虑的架构选项。
Simplico Co., Ltd. 是一家总部位于曼谷的软件工程与产品工作室,专注于IoT自动化系统、AI/RAG应用和企业级系统集成。如有技术交流或合作意向,欢迎通过 simplico.net 联系我们。
标签: DSL设计 编译器 农业IoT Python 递归下降 Raspberry Pi FarmScript MicroPython 智慧农业 语法分析
Get in Touch with us
Related Posts
- FarmScript: How We Designed a Programming Language for Chanthaburi Durian Farmers
- 智慧农业项目为何止步于试点阶段
- Why Smart Farming Projects Fail Before They Leave the Pilot Stage
- ERP项目为何总是超支、延期,最终令人失望
- ERP Projects: Why They Cost More, Take Longer, and Disappoint More Than Expected
- AI Security in Production: What Enterprise Teams Must Know in 2026
- 弹性无人机蜂群设计:具备安全通信的无领导者容错网状网络
- Designing Resilient Drone Swarms: Leaderless-Tolerant Mesh Networks with Secure Communications
- NumPy广播规则详解:为什么`(3,)`和`(3,1)`行为不同——以及它何时会悄悄给出错误答案
- NumPy Broadcasting Rules: Why `(3,)` and `(3,1)` Behave Differently — and When It Silently Gives Wrong Answers
- 关键基础设施遭受攻击:从乌克兰电网战争看工业IT/OT安全
- Critical Infrastructure Under Fire: What IT/OT Security Teams Can Learn from Ukraine’s Energy Grid
- LM Studio代码开发的系统提示词工程:`temperature`、`context_length`与`stop`词详解
- LM Studio System Prompt Engineering for Code: `temperature`, `context_length`, and `stop` Tokens Explained
- LlamaIndex + pgvector: Production RAG for Thai and Japanese Business Documents
- simpliShop:专为泰国市场打造的按需定制多语言电商平台
- simpliShop: The Thai E-Commerce Platform for Made-to-Order and Multi-Language Stores
- ERP项目为何失败(以及如何让你的项目成功)
- Why ERP Projects Fail (And How to Make Yours Succeed)
- Payment API幂等性设计:用Stripe、支付宝、微信支付和2C2P防止重复扣款













