FarmScript:我们如何从零设计一门农业IoT领域特定语言

为什么农业IoT需要一门DSL?

在构建农业IoT自动化系统时,工程师通常面临两个选择:

  1. YAML/JSON配置文件 — 简单,但遇到复杂条件分支就会崩溃
  2. 直接用通用语言(Python/Node.js)实现 — 灵活,但领域专家无法阅读业务逻辑

两者都有根本性的问题。农业自动化逻辑往往形成"领域专家定义规则、工程师实现代码"的双轨结构,导致需求错位和维护成本不断膨胀。

FarmScript的答案是:设计一门领域特定语言(DSL)。以泰国尖竹汶府的榴莲果园为主要用例,从零实现了一个编译器。本文分享这一过程中的设计决策与工程实现。


动机:为什么"配置文件"不够用

先从技术层面梳理为什么需要DSL。

一个典型的榴莲农场自动化需求是这样的:

  • 早上5:30,如果土壤湿度低于55%,启动灌溉泵25分钟
  • 但如果开花阶段状态为budding或以上,禁止灌溉(开花期浇水是金枕榴莲减产的主要原因)
  • 气温超过38°C时启动喷雾系统并发送LINE通知;超过40°C时同时发送SMS
  • 连续14天无降雨且开花阶段为none时,发送干旱胁迫积累预警

用YAML来表达这些逻辑:

# YAML的尝试 — 很快变得不可读
rules:
  - condition:
      and:
        - sensor: soil_moisture_z1
          op: lt
          value: 55
        - sensor: flowering_stage
          op: eq
          value: none
    action:
      type: activate
      target: pump_z1
      duration_minutes: 25

when A and B and not C then X elif D then Y这类复合条件增多时,YAML迅速失控——嵌套层级加深,条件优先级不清晰,最关键的是领域专家无法审核

而直接用Python实现虽然表达能力足够,但规格变更时工程师成为瓶颈,果园管理者无法自主验证逻辑正确性。

DSL解决了这个矛盾:拥有农业领域的词汇和语法,同时由编译器负责验证和转换


语言设计决策

声明式块结构

程序按语义分块:传感器声明、执行器声明、计划任务、规则、告警配置各自独立。依赖关系隐式处理——在rule块内引用传感器名称,编译器自动与sensors块进行关联验证。

响应式规则语义

规则是轮询驱动而非事件驱动的。生成的运行时每30秒对所有规则求值一次。这是优先保证简单性的选择,便于在ESP32或Raspberry Pi等嵌入式目标上实现。未来计划支持MQTT事件触发。

最小化类型系统

传感器类型只有numericenum两种。numeric支持比较运算符(>, <, ==, !=),enum只允许等值比较。类型检查在语义分析阶段完成,类型错误在编译时发现。

错误信息面向领域专家

编译器错误信息面向农业从业者而非工程师——这是有意为之的设计决策。FarmScript文件可能由工程师编写,但应该由领域专家来审核。


语法全貌

FarmScript程序的整体结构:

program
  := farm_block
     sensors_block
     actuators_block
     alerts_block
     (schedule_block | rule_block)*

sensors

sensors {
  soil_moisture_z1:  analog pin A0  range 0..100  unit "%"
  soil_moisture_z2:  analog pin A1  range 0..100  unit "%"
  air_temperature:   i2c device DHT22  unit "°C"
  air_humidity:      i2c device DHT22  unit "%"
  rainfall:          digital pin D5    unit "mm"
  days_without_rain: computed from rainfall
  flowering_stage:   visual pin D6
                     values [none, budding, detected, full_bloom]
  soil_temp_z1:      i2c device DS18B20  unit "°C"
}

传感器声明包含:协议类型(analog, i2c, digital, computed, visual)、硬件参数(引脚号或设备名)、类型信息(指定range则为numeric,指定values则为enum)。computed是派生自其他传感器的虚拟传感器,用于days_without_rain这类有状态的聚合值。

actuators

actuators {
  pump_z1:      relay pin D7   label "1区灌溉泵"
  pump_z2:      relay pin D8   label "2区灌溉泵"
  pump_z3:      relay pin D9   label "3区灌溉泵"
  pump_z4:      relay pin D10  label "4区灌溉泵"
  all_pumps:    group [pump_z1, pump_z2, pump_z3, pump_z4]
  mist_system:  relay pin D11
  fertigation:  relay pin D12
}

group是多执行器批量操作的语法糖,在代码生成阶段展开。

rule

规则由多个when子句组成,每个子句包含条件表达式和命令列表:

rule flowering_control {

  when days_without_rain > 14 and flowering_stage == none {
    alert "已连续{days_without_rain}天无雨 — 请监测开花信号" via line
    log "干旱胁迫积累中。Z1:{soil_moisture_z1}% Z2:{soil_moisture_z2}%"
  }

  when flowering_stage == budding {
    turn off all_pumps
    alert "检测到开花(花蕾期)— 已停止全部灌溉。严禁浇水!"
      via line priority high
    alert "检测到开花 — 全部灌溉已停止"
      via sms priority high
  }

  when flowering_stage == full_bloom {
    turn off all_pumps
    alert "已确认满开。灌溉持续暂停。预计4至6天内坐果。" via line
  }
}

rule heat_stress {
  when air_temperature > 38 {
    turn on mist_system
    alert "热应激预警:{air_temperature}°C — 喷雾已启动" via line
  }
  when air_temperature > 40 {
    turn on mist_system
    alert "危急高温:{air_temperature}°C — 存在落果风险,请立即检查!"
      via line priority high
    alert "果园危急高温 {air_temperature}°C" via sms priority high
  }
  when air_temperature < 34 {
    turn off mist_system
  }
}

条件表达式的文法:

condition := expr (("and" | "or") expr)*
expr      := sensor_ref op literal
op        := ">" | "<" | ">=" | "<=" | "==" | "!="
literal   := NUMBER | IDENTIFIER   # NUMBER为numeric类型,IDENTIFIER为enum值

andor左结合。括号显式分组在当前版本未支持(规划中)。


编译器实现

编译器用Python实现,分为四个阶段。

阶段一:词法分析器

采用手写状态机而非正则表达式驱动的词法分析器,便于控制错误信息质量,并灵活支持农业词汇的关键字识别。

class Lexer:
    KEYWORDS = {
        'farm', 'sensors', 'actuators', 'alerts', 'schedule', 'rule',
        'when', 'and', 'or', 'turn', 'on', 'off', 'for', 'alert',
        'log', 'report', 'via', 'at', 'daily', 'computed', 'from',
        'group', 'range', 'unit', 'values', 'priority', 'high'
    }

    def tokenize(self, source: str) -> list[Token]:
        tokens = []
        pos = 0
        while pos < len(source):
            if source[pos] in ' \t\n\r':
                pos += 1
                continue
            if source[pos] == '#':             # 注释
                while pos < len(source) and source[pos] != '\n':
                    pos += 1
                continue
            if source[pos] == '"':             # 字符串字面量
                end = source.index('"', pos + 1)
                tokens.append(Token(TokenType.STRING, source[pos+1:end]))
                pos = end + 1
                continue
            if source[pos].isalpha() or source[pos] == '_':   # 标识符/关键字
                start = pos
                while pos < len(source) and (source[pos].isalnum() or source[pos] == '_'):
                    pos += 1
                word = source[start:pos]
                tt = TokenType.KEYWORD if word in self.KEYWORDS else TokenType.IDENTIFIER
                tokens.append(Token(tt, word))
                continue
            if source[pos].isdigit():          # 数值字面量
                start = pos
                while pos < len(source) and (source[pos].isdigit() or source[pos] == '.'):
                    pos += 1
                tokens.append(Token(TokenType.NUMBER, float(source[start:pos])))
                continue
            two_char = source[pos:pos+2]
            if two_char in ('>=', '<=', '==', '!=', '..'):
                tokens.append(Token(TokenType.OPERATOR, two_char))
                pos += 2
                continue
            tokens.append(Token(TokenType.SYMBOL, source[pos]))
            pos += 1
        return tokens

阶段二:语法分析器

采用递归下降解析器。每种块类型有对应的解析函数,相互递归调用。

class Parser:
    def __init__(self, tokens: list[Token]):
        self.tokens = tokens
        self.pos = 0

    def parse_program(self) -> ProgramNode:
        nodes = []
        while not self.at_end():
            if self.peek_keyword('farm'):      nodes.append(self.parse_farm_block())
            elif self.peek_keyword('sensors'): nodes.append(self.parse_sensors_block())
            elif self.peek_keyword('rule'):    nodes.append(self.parse_rule_block())
            elif self.peek_keyword('schedule'):nodes.append(self.parse_schedule_block())
            # ... 其他块类型
            else:
                raise ParseError(f"意外的token:{self.current()}")
        return ProgramNode(nodes)

    def parse_rule_block(self) -> RuleNode:
        self.expect_keyword('rule')
        name = self.expect(TokenType.IDENTIFIER).value
        self.expect_symbol('{')
        clauses = []
        while not self.peek_symbol('}'):
            self.expect_keyword('when')
            condition = self.parse_condition()
            self.expect_symbol('{')
            body = self.parse_command_list()
            self.expect_symbol('}')
            clauses.append(WhenClause(condition, body))
        self.expect_symbol('}')
        return RuleNode(name, clauses)

    def parse_condition(self) -> ConditionNode:
        left = self.parse_expr()
        while self.peek_keyword('and') or self.peek_keyword('or'):
            op = self.advance().value
            right = self.parse_expr()
            left = BinaryCondition(left, op, right)
        return left

    def parse_expr(self) -> ExprNode:
        sensor = self.expect(TokenType.IDENTIFIER).value
        op = self.expect(TokenType.OPERATOR).value
        if self.peek(TokenType.NUMBER):
            value = self.advance().value
            return CompareExpr(SensorRef(sensor), op, NumericLiteral(value))
        else:
            value = self.advance().value
            return CompareExpr(SensorRef(sensor), op, EnumLiteral(value))

阶段三:语义分析器

语义分析阶段验证以下内容:

class SemanticAnalyzer:
    def __init__(self, program: ProgramNode):
        self.sensor_types: dict[str, SensorType] = {}
        self.actuator_names: set[str] = set()
        self.alert_channels: set[str] = set()
        self.errors: list[SemanticError] = []

    def analyze(self) -> bool:
        self._collect_declarations()
        self._validate_rules()
        return len(self.errors) == 0

    def _validate_expr(self, expr: ExprNode, line: int):
        sensor_name = expr.left.name
        if sensor_name not in self.sensor_types:
            # 提供拼写建议
            candidates = difflib.get_close_matches(
                sensor_name, self.sensor_types.keys(), n=1
            )
            hint = f"\n  您是否想输入:'{candidates[0]}'?" if candidates else ""
            self.errors.append(SemanticError(
                line,
                f"传感器 '{sensor_name}' 尚未声明。{hint}"
            ))
            return

        sensor_type = self.sensor_types[sensor_name]
        if isinstance(sensor_type, EnumSensorType):
            if expr.op not in ('==', '!='):
                self.errors.append(SemanticError(
                    line,
                    f"enum类型传感器 '{sensor_name}' 只能使用 '==' 或 '!='。"
                ))
            if isinstance(expr.right, NumericLiteral):
                self.errors.append(SemanticError(
                    line,
                    f"'{sensor_name}' 是enum类型,不能与数值比较。"
                    f"有效值:{sensor_type.values}"
                ))
            elif expr.right.value not in sensor_type.values:
                self.errors.append(SemanticError(
                    line,
                    f"'{expr.right.value}' 不是 '{sensor_name}' 的有效枚举值。"
                    f"有效值:{sensor_type.values}"
                ))

阶段四:代码生成器

遍历经验证的AST,生成简洁可读的Python代码——目标质量是与手写代码无异。

class CodeGenerator:
    def generate_rule(self, rule: RuleNode) -> str:
        lines = [
            f"# 规则: {rule.name}",
            f"# 由 FarmScript v1.0 生成 — 请勿手动修改",
            f"def rule_{rule.name}():",
        ]
        sensors_used = self._collect_sensors(rule)
        for sensor in sorted(sensors_used):
            lines.append(f"    {sensor} = sensors.read('{sensor}')")
        lines.append("")

        for i, clause in enumerate(rule.clauses):
            keyword = "if" if i == 0 else "elif"
            condition_str = self._generate_condition(clause.condition)
            lines.append(f"    {keyword} {condition_str}:")
            for cmd in clause.body:
                lines.extend(self._generate_command(cmd, indent=8))
            lines.append("")
        return "\n".join(lines)

开花控制规则的生成结果:

# 规则: flowering_control
# 由 FarmScript v1.0 生成 — 请勿手动修改

def rule_flowering_control():
    days_without_rain = sensors.read('days_without_rain')
    flowering_stage   = sensors.read('flowering_stage')
    soil_moisture_z1  = sensors.read('soil_moisture_z1')
    soil_moisture_z2  = sensors.read('soil_moisture_z2')

    if days_without_rain > 14 and flowering_stage == "none":
        alerts.line(
            f"已连续{days_without_rain}天无雨 — 请监测开花信号"
        )
        log(f"干旱胁迫积累中。Z1:{soil_moisture_z1}% Z2:{soil_moisture_z2}%")

    elif flowering_stage == "budding":
        actuators.group_off("all_pumps")
        alerts.line(
            "检测到开花(花蕾期)— 已停止全部灌溉。严禁浇水!",
            priority="high"
        )
        alerts.sms("检测到开花 — 全部灌溉已停止", priority="high")

    elif flowering_stage == "full_bloom":
        actuators.group_off("all_pumps")
        alerts.line("已确认满开。灌溉持续暂停。预计4至6天内坐果。")

运行时架构

生成的Python代码依赖farmscript_rt运行时库,该库提供硬件抽象层,使生成代码无需关心传感器的物理协议和执行器的控制细节。

┌─────────────────────────────────┐
│        generated/main.py        │  ← FarmScript生成
│   rule_flowering_control()      │
│   rule_heat_stress()            │
│   rule_irrigation()             │
└──────────────┬──────────────────┘
               │ imports
┌──────────────▼──────────────────┐
│         farmscript_rt/          │  ← 运行时库
│   SensorManager   (I2C/GPIO)    │
│   ActuatorManager (Relay/PWM)   │
│   AlertManager    (LINE/SMS)    │
│   TimerManager    (async tasks) │
│   LogManager      (JSON/file)   │
└─────────────────────────────────┘
               │ hardware calls
┌──────────────▼──────────────────┐
│     Raspberry Pi 4 / ESP32      │
│   GPIO, I2C, 4G调制解调器        │
└─────────────────────────────────┘

轮询主循环:

# generated/main.py

import time, signal, logging
from rules import rule_flowering_control, rule_heat_stress, rule_irrigation
from farmscript_rt import RuntimeContext

ctx = RuntimeContext.from_config("farm_config.json")

def main():
    logging.info("FarmScript 运行时启动")
    while True:
        try:
            rule_flowering_control()
            rule_heat_stress()
            rule_irrigation()
        except SensorReadError as e:
            logging.error(f"传感器读取错误: {e}")
            ctx.alerts.line(f"传感器错误: {e}", priority="high")
        except Exception as e:
            logging.critical(f"未预期错误: {e}", exc_info=True)
        time.sleep(ctx.poll_interval_seconds)  # 默认30秒

if __name__ == "__main__":
    signal.signal(signal.SIGTERM, lambda *_: ctx.shutdown())
    main()

设计权衡与已知局限

轮询 vs 事件驱动

当前实现基于轮询,每30秒对所有规则求值一次。实现简单,但对高频传感器(如雨量传感器的上升沿)不够理想。下一版本计划增加MQTT事件桥接,支持轮询与事件驱动的混合模式。

条件表达式的表达能力

目前不支持括号显式分组。when A and B or C按左结合解析为(A and B) or C,无法表达A and (B or C)。当前规避方式是拆分为多个when子句。括号支持的语法改造成本较低,计划纳入下个版本。

定时器处理

turn on pump_z1 for 25 minutes这类带定时器的指令由TimerManager异步管理。竞态条件(如定时器运行中另一条规则操作同一执行器)通过运行时锁处理,但理想情况是在编译器层面静态检测执行器冲突——这是下一步的工作。

多目标编译

目前只支持CPython(Raspberry Pi)。MicroPython(ESP32)编译支持正在开发中,代码生成器正在迁移到目标插件化架构。


横向比较:Node-RED、Home Assistant等替代方案

这类问题工程师必然会问。

方案 优势 劣势
Node-RED 可视化、生态成熟 流程复杂后难以版本控制;领域专家仍难审核
Home Assistant 面向消费端IoT成熟 农业特有概念(开花阶段、区域分组)需要大量自定义
直接写Python 完整表达能力 领域专家无法自主验证;工程师成为变更瓶颈
YAML规则引擎 声明式、易于解析 复杂条件表达能力差;无编译时验证
FarmScript 领域语义清晰;编译时验证;可读可审核 初期开发成本高;需维护编译器

FarmScript并非在所有场景都是最优选择。如果你的系统足够简单,Node-RED或YAML配置完全够用。DSL的价值在于领域知识复杂到一定程度、且需要领域专家直接参与逻辑审核的情境。


总结

FarmScript的设计与实现过程提炼出以下核心经验:

  • DSL的有效性本质上是"复杂度在哪里管理"的问题。 当领域存在YAML或通用语言难以清晰表达的复合条件逻辑,DSL值得投入。
  • 递归下降解析器是中等复杂度DSL的合理选择。 相比ANTLR等解析器生成器,实现成本低,错误信息定制能力强。
  • 语义分析是DSL可靠性的核心。 不只是类型检查——未来希望将领域规则(如开花期禁止灌溉)也编码为编译时约束。
  • 生成代码的可读性同样重要。 果园管理者应该能通过阅读生成的Python来验证部署后的行为。

如果你正在构建农业IoT、工业控制或任何其他"领域专家需要直接参与逻辑定义"的系统,DSL都是值得认真考虑的架构选项。


Simplico Co., Ltd. 是一家总部位于曼谷的软件工程与产品工作室,专注于IoT自动化系统、AI/RAG应用和企业级系统集成。如有技术交流或合作意向,欢迎通过 simplico.net 联系我们。


标签: DSL设计 编译器 农业IoT Python 递归下降 Raspberry Pi FarmScript MicroPython 智慧农业 语法分析


Get in Touch with us

Chat with Us on LINE

iiitum1984

Speak to Us or Whatsapp

(+66) 83001 0222

Related Posts

Our Products