FarmScript:農業IoTのためにDSLをゼロから設計した話

なぜ農業IoTにDSLが必要だったのか

農業IoTの自動化システムを構築するとき、エンジニアはたいてい次のどちらかを選ぶ。

  1. YAML/JSONベースの設定ファイル — シンプルだが、条件分岐が複雑になると破綻する
  2. 汎用言語(Python/Node.js)で直接実装 — 柔軟だが、農業の専門家がロジックを読めない

どちらも根本的な問題を抱えている。農業の自動化ロジックは「ドメインエキスパートが書き、エンジニアが実装する」という構造になりがちで、仕様の齟齬と保守コストが膨らむ。

FarmScriptはこの問題に対して、ドメイン特化言語(DSL)を設計するというアプローチで答えを出した。タイ・チャンタブリー県のドリアン農園を主要ユースケースとして、コンパイラをゼロから実装した事例を紹介する。


DSL設計の動機:「設定ファイル」では足りない理由

まず、なぜDSLが必要だったかを技術的に整理する。

典型的な農業IoTの自動化要件はこうなる:

  • 朝5時30分に土壌水分が55%未満なら灌漑ポンプを25分間起動
  • ただし開花ステージがbudding以上であれば灌漑を禁止(開花期の水やりはドリアンの致命的な収量損失につながる)
  • 気温が38°Cを超えたらミストシステムを起動し、LINEに通知。40°Cを超えたら同時にSMSも送信
  • 無降雨14日超かつ開花ステージがnoneのときは、干ばつストレス蓄積中のアラートを送信

これをYAMLで表現しようとすると、すぐに限界が来る。

# YAMLによる試みの例 — すぐに破綻する
rules:
  - condition:
      and:
        - sensor: soil_moisture_z1
          op: lt
          value: 55
        - sensor: flowering_stage
          op: eq
          value: none
    action:
      type: activate
      target: pump_z1
      duration_minutes: 25

when A and B and not C then X else if D then Y のような複合条件が増えるにつれ、YAMLは読めなくなる。ネストが深くなり、条件の優先順位も不明瞭になる。そして何より、農業の専門家がレビューできない

一方でPythonで直書きすれば表現力は十分だが、農園管理者が仕様を確認できる形式ではない。仕様変更のたびにエンジニアがボトルネックになる。

DSLはこのトレードオフを解消する:農業ドメインの語彙と構文を持ちながら、コンパイラが検証と変換を担う


言語設計の方針

FarmScriptの設計で決めた主要な方針は以下のとおりだ。

宣言的ブロック構造

プログラムは意味のある単位でブロックに分割される。センサーの宣言、アクチュエータの宣言、スケジュール、ルール、アラート設定がそれぞれ独立したブロックになる。依存関係は暗黙的——ruleブロック内でセンサー名を参照すれば、コンパイラが自動的にsensorsブロックとの対応を検証する。

リアクティブなルールセマンティクス

ルールはイベント駆動ではなくポーリング駆動だ。生成されたランタイムが30秒ごとに全ルールを評価する。これはシンプルさを優先した選択で、ESP32やRaspberry Piのような組み込みターゲットでの実装を容易にする。将来的にはMQTTイベントトリガーへの移行も検討している。

型システムは最小限に

センサーの型はnumericenumの2種類のみ。numericは比較演算子(>, <, ==, !=)をサポートし、enumは等値比較のみ許可する。型チェックはセマンティック解析フェーズで行われ、型エラーはコンパイル時に検出される。

エラーメッセージはドメイン語彙で

コンパイラエラーはエンジニア向けではなく農業の専門家向けに書かれている。これは意図的な設計判断だ。FarmScriptのファイルを書くのはエンジニアかもしれないが、レビューするのはドメインエキスパートであるべきだという考え方による。


文法の全体像

FarmScriptのプログラムは以下のブロック構造を持つ:

program
  := farm_block
     sensors_block
     actuators_block
     alerts_block
     (schedule_block | rule_block)*

各ブロックの文法を順に見ていこう。

sensors ブロック

sensors {
  soil_moisture_z1:  analog pin A0  range 0..100  unit "%"
  soil_moisture_z2:  analog pin A1  range 0..100  unit "%"
  air_temperature:   i2c device DHT22  unit "°C"
  air_humidity:      i2c device DHT22  unit "%"
  rainfall:          digital pin D5    unit "mm"
  days_without_rain: computed from rainfall
  flowering_stage:   visual pin D6
                     values [none, budding, detected, full_bloom]
  soil_temp_z1:      i2c device DS18B20  unit "°C"
}

センサー宣言はプロトコル種別(analog, i2c, digital, computed, visual)、ハードウェアパラメータ(ピン番号またはデバイス名)、型情報(range指定ならnumericvalues指定ならenum)を持つ。computedは他のセンサーから派生する仮想センサーで、days_without_rainのようなステートフルな集計値に使う。

actuators ブロック

actuators {
  pump_z1:      relay pin D7   label "区画1 灌漑ポンプ"
  pump_z2:      relay pin D8   label "区画2 灌漑ポンプ"
  pump_z3:      relay pin D9   label "区画3 灌漑ポンプ"
  pump_z4:      relay pin D10  label "区画4 灌漑ポンプ"
  all_pumps:    group [pump_z1, pump_z2, pump_z3, pump_z4]
  mist_system:  relay pin D11
  fertigation:  relay pin D12
}

groupは複数アクチュエータへの一括命令を可能にするシンタックスシュガーだ。コード生成時に展開される。

rule ブロック

ルールはwhen節の集合で、各when節は条件式と命令列を持つ。

rule flowering_control {

  when days_without_rain > 14 and flowering_stage == none {
    alert "無降雨{days_without_rain}日目 — 開花サインを監視" via line
    log "干ばつストレス蓄積中。Z1:{soil_moisture_z1}% Z2:{soil_moisture_z2}%"
  }

  when flowering_stage == budding {
    turn off all_pumps
    alert "開花(蕾)検知 — 全灌漑停止。灌漑厳禁。"
      via line priority high
    alert "開花検知 — 全灌漑停止"
      via sms priority high
  }

  when flowering_stage == full_bloom {
    turn off all_pumps
    alert "満開確認。灌漑継続停止。4〜6日以内に結実見込み。" via line
  }
}

rule heat_stress {
  when air_temperature > 38 {
    turn on mist_system
    alert "熱ストレス: {air_temperature}°C — ミスト起動" via line
  }
  when air_temperature > 40 {
    turn on mist_system
    alert "危険高温: {air_temperature}°C — 落果リスク。直ちに確認。"
      via line priority high
    alert "危険高温 {air_temperature}°C" via sms priority high
  }
  when air_temperature < 34 {
    turn off mist_system
  }
}

条件式の文法:

condition := expr (("and" | "or") expr)*
expr      := sensor_ref op literal
op        := ">" | "<" | ">=" | "<=" | "==" | "!="
literal   := NUMBER | IDENTIFIER  # NUMBERはnumeric型、IDENTIFIERはenum値

andorの優先順位は左結合。括弧による明示的なグルーピングは現バージョンでは未サポート(計画中)。


コンパイラの実装

コンパイラはPythonで実装されており、4つのフェーズで構成される。

フェーズ1:レクサー(字句解析)

レクサーは手書きのステートマシンで実装した。正規表現ベースのレクサーに比べてエラーメッセージの制御がしやすく、農業語彙のキーワード認識も柔軟に対応できる。

class Lexer:
    KEYWORDS = {
        'farm', 'sensors', 'actuators', 'alerts', 'schedule', 'rule',
        'when', 'and', 'or', 'turn', 'on', 'off', 'for', 'alert',
        'log', 'report', 'via', 'at', 'daily', 'on', 'computed', 'from',
        'group', 'range', 'unit', 'values', 'priority', 'high'
    }

    def tokenize(self, source: str) -> list[Token]:
        tokens = []
        pos = 0
        while pos < len(source):
            # スキップ: 空白・コメント
            if source[pos] in ' \t\n\r':
                pos += 1
                continue
            if source[pos:pos+1] == '#':
                while pos < len(source) and source[pos] != '\n':
                    pos += 1
                continue
            # 文字列リテラル
            if source[pos] == '"':
                end = source.index('"', pos + 1)
                tokens.append(Token(TokenType.STRING, source[pos+1:end]))
                pos = end + 1
                continue
            # 識別子 / キーワード
            if source[pos].isalpha() or source[pos] == '_':
                start = pos
                while pos < len(source) and (source[pos].isalnum() or source[pos] == '_'):
                    pos += 1
                word = source[start:pos]
                tt = TokenType.KEYWORD if word in self.KEYWORDS else TokenType.IDENTIFIER
                tokens.append(Token(tt, word))
                continue
            # 数値
            if source[pos].isdigit():
                start = pos
                while pos < len(source) and (source[pos].isdigit() or source[pos] == '.'):
                    pos += 1
                tokens.append(Token(TokenType.NUMBER, float(source[start:pos])))
                continue
            # 演算子・区切り文字
            two_char = source[pos:pos+2]
            if two_char in ('>=', '<=', '==', '!=', '..'):
                tokens.append(Token(TokenType.OPERATOR, two_char))
                pos += 2
                continue
            tokens.append(Token(TokenType.SYMBOL, source[pos]))
            pos += 1
        return tokens

フェーズ2:パーサー(構文解析)

再帰降下パーサーで実装した。各ブロック種別に対応するパース関数があり、それらが相互に呼び出す構造になっている。

class Parser:
    def __init__(self, tokens: list[Token]):
        self.tokens = tokens
        self.pos = 0

    def parse_program(self) -> ProgramNode:
        nodes = []
        while not self.at_end():
            if self.peek_keyword('farm'):
                nodes.append(self.parse_farm_block())
            elif self.peek_keyword('sensors'):
                nodes.append(self.parse_sensors_block())
            elif self.peek_keyword('actuators'):
                nodes.append(self.parse_actuators_block())
            elif self.peek_keyword('alerts'):
                nodes.append(self.parse_alerts_block())
            elif self.peek_keyword('schedule'):
                nodes.append(self.parse_schedule_block())
            elif self.peek_keyword('rule'):
                nodes.append(self.parse_rule_block())
            else:
                raise ParseError(f"予期しないトークン: {self.current()}")
        return ProgramNode(nodes)

    def parse_rule_block(self) -> RuleNode:
        self.expect_keyword('rule')
        name = self.expect(TokenType.IDENTIFIER).value
        self.expect_symbol('{')
        clauses = []
        while not self.peek_symbol('}'):
            self.expect_keyword('when')
            condition = self.parse_condition()
            self.expect_symbol('{')
            body = self.parse_command_list()
            self.expect_symbol('}')
            clauses.append(WhenClause(condition, body))
        self.expect_symbol('}')
        return RuleNode(name, clauses)

    def parse_condition(self) -> ConditionNode:
        left = self.parse_expr()
        while self.peek_keyword('and') or self.peek_keyword('or'):
            op = self.advance().value
            right = self.parse_expr()
            left = BinaryCondition(left, op, right)
        return left

    def parse_expr(self) -> ExprNode:
        sensor = self.expect(TokenType.IDENTIFIER).value
        op = self.expect(TokenType.OPERATOR).value
        if self.peek(TokenType.NUMBER):
            value = self.advance().value
            return CompareExpr(SensorRef(sensor), op, NumericLiteral(value))
        else:
            value = self.advance().value
            return CompareExpr(SensorRef(sensor), op, EnumLiteral(value))

フェーズ3:セマンティック解析

セマンティック解析では以下の検証を行う:

class SemanticAnalyzer:
    def __init__(self, program: ProgramNode):
        self.program = program
        self.sensor_types: dict[str, SensorType] = {}
        self.actuator_names: set[str] = set()
        self.alert_channels: set[str] = set()
        self.errors: list[SemanticError] = []

    def analyze(self) -> bool:
        self._collect_declarations()
        self._validate_rules()
        return len(self.errors) == 0

    def _collect_declarations(self):
        for node in self.program.nodes:
            if isinstance(node, SensorsBlock):
                for decl in node.declarations:
                    if decl.values:  # enum型
                        self.sensor_types[decl.name] = EnumSensorType(decl.values)
                    else:            # numeric型
                        self.sensor_types[decl.name] = NumericSensorType(decl.range)
            elif isinstance(node, ActuatorsBlock):
                for decl in node.declarations:
                    self.actuator_names.add(decl.name)
                    if isinstance(decl, GroupDecl):
                        self.actuator_names.add(decl.name)
            elif isinstance(node, AlertsBlock):
                for channel in node.channels:
                    self.alert_channels.add(channel.type)

    def _validate_expr(self, expr: ExprNode, line: int):
        sensor_name = expr.left.name
        if sensor_name not in self.sensor_types:
            # タイプミス候補を提案
            candidates = difflib.get_close_matches(
                sensor_name, self.sensor_types.keys(), n=1
            )
            suggestion = f"  もしかして: '{candidates[0]}' ですか?" if candidates else ""
            self.errors.append(SemanticError(
                line,
                f"センサー '{sensor_name}' が宣言されていません。\n{suggestion}"
            ))
            return

        sensor_type = self.sensor_types[sensor_name]
        if isinstance(sensor_type, EnumSensorType):
            if expr.op not in ('==', '!='):
                self.errors.append(SemanticError(
                    line,
                    f"enum型センサー '{sensor_name}' には '==' か '!=' のみ使用できます。"
                ))
            if isinstance(expr.right, NumericLiteral):
                self.errors.append(SemanticError(
                    line,
                    f"'{sensor_name}' はenum型です。数値との比較はできません。"
                    f"有効な値: {sensor_type.values}"
                ))
            elif expr.right.value not in sensor_type.values:
                self.errors.append(SemanticError(
                    line,
                    f"'{expr.right.value}' は '{sensor_name}' の有効な値ではありません。"
                    f"有効な値: {sensor_type.values}"
                ))

フェーズ4:コード生成

ASTを辿ってPythonモジュールを生成する。生成コードはクリーンで読みやすく、手で書いたかのような品質を目標にしている。

class CodeGenerator:
    def generate_rule(self, rule: RuleNode) -> str:
        lines = [
            f"# ルール: {rule.name}",
            f"# FarmScript v1.0 生成 — 手動編集禁止",
            f"def rule_{rule.name}():",
        ]
        # ルール内で使われるセンサーを収集してread文を生成
        sensors_used = self._collect_sensors(rule)
        for sensor in sorted(sensors_used):
            lines.append(f"    {sensor} = sensors.read('{sensor}')")
        lines.append("")

        for i, clause in enumerate(rule.clauses):
            keyword = "if" if i == 0 else "elif"
            condition_str = self._generate_condition(clause.condition)
            lines.append(f"    {keyword} {condition_str}:")
            for cmd in clause.body:
                lines.extend(self._generate_command(cmd, indent=8))
            lines.append("")
        return "\n".join(lines)

    def _generate_condition(self, cond: ConditionNode) -> str:
        if isinstance(cond, CompareExpr):
            sensor = cond.left.name
            op = cond.op
            if isinstance(cond.right, EnumLiteral):
                return f'{sensor} == "{cond.right.value}"'
            else:
                return f"{sensor} {op} {cond.right.value}"
        elif isinstance(cond, BinaryCondition):
            left = self._generate_condition(cond.left)
            right = self._generate_condition(cond.right)
            return f"({left} {cond.op} {right})"

開花制御ルールの生成結果:

# ルール: flowering_control
# FarmScript v1.0 生成 — 手動編集禁止

def rule_flowering_control():
    days_without_rain = sensors.read('days_without_rain')
    flowering_stage   = sensors.read('flowering_stage')
    soil_moisture_z1  = sensors.read('soil_moisture_z1')
    soil_moisture_z2  = sensors.read('soil_moisture_z2')

    if days_without_rain > 14 and flowering_stage == "none":
        alerts.line(
            f"無降雨{days_without_rain}日目 — 開花サインを監視"
        )
        log(f"干ばつストレス蓄積中。Z1:{soil_moisture_z1}% Z2:{soil_moisture_z2}%")

    elif flowering_stage == "budding":
        actuators.group_off("all_pumps")
        alerts.line(
            "開花(蕾)検知 — 全灌漑停止。灌漑厳禁。",
            priority="high"
        )
        alerts.sms("開花検知 — 全灌漑停止", priority="high")

    elif flowering_stage == "full_bloom":
        actuators.group_off("all_pumps")
        alerts.line("満開確認。灌漑継続停止。4〜6日以内に結実見込み。")

ランタイムアーキテクチャ

生成されたPythonコードはfarmscript_rtランタイムライブラリに依存する。このライブラリはハードウェア抽象化レイヤーを提供し、生成コードがセンサーの物理プロトコルやアクチュエータの制御方法を意識しなくて済む設計になっている。

┌─────────────────────────────────┐
│         generated/main.py       │  ← FarmScriptが生成
│   rule_flowering_control()      │
│   rule_heat_stress()            │
│   rule_irrigation()             │
└──────────────┬──────────────────┘
               │ imports
┌──────────────▼──────────────────┐
│        farmscript_rt/           │  ← ランタイムライブラリ
│   SensorManager   (I2C/GPIO)    │
│   ActuatorManager (Relay/PWM)   │
│   AlertManager    (LINE/SMS)    │
│   TimerManager    (async tasks) │
│   LogManager      (JSON/file)   │
└─────────────────────────────────┘
               │ hardware calls
┌──────────────▼──────────────────┐
│      Raspberry Pi 4 / ESP32     │
│   GPIO, I2C, 4G modem           │
└─────────────────────────────────┘

ポーリングループ:

# generated/main.py

import time
import signal
import logging
from rules import rule_flowering_control, rule_heat_stress, rule_irrigation
from farmscript_rt import RuntimeContext

ctx = RuntimeContext.from_config("farm_config.json")

def main():
    logging.info("FarmScript runtime 起動")
    while True:
        try:
            rule_flowering_control()
            rule_heat_stress()
            rule_irrigation()
        except SensorReadError as e:
            logging.error(f"センサー読み取りエラー: {e}")
            ctx.alerts.line(f"センサーエラー: {e}", priority="high")
        except Exception as e:
            logging.critical(f"予期しないエラー: {e}", exc_info=True)
        time.sleep(ctx.poll_interval_seconds)  # デフォルト30秒

if __name__ == "__main__":
    signal.signal(signal.SIGTERM, lambda *_: ctx.shutdown())
    main()

設計上のトレードオフと今後の課題

ポーリング vs イベント駆動

現在の実装はポーリングベースで、30秒ごとに全ルールを評価する。シンプルで実装が容易だが、高頻度センサー(例:雨量センサーの立ち上がりエッジ)には不向きだ。次バージョンではMQTTイベントブリッジを追加し、ポーリングとイベント駆動のハイブリッドにする予定だ。

条件式の表現力

現在、括弧によるグルーピングがサポートされていない。when A and B or C(A and B) or Cとして左結合で評価されるが、A and (B or C)は書けない。ユーザーには別のwhen節に分割することで回避してもらっている。パーサーへの括弧サポート追加は実装コストが低く、次期リリースに含める。

タイマー処理

turn on pump_z1 for 25 minutesのようなタイマー付き命令は、TimerManagerが非同期で管理する。競合状態(例:タイマー実行中に別のルールが同じアクチュエータを操作する)の検出はランタイム側でロックを使って対処しているが、FarmScriptレベルで静的に検出できるようにしたい。コンパイル時のアクチュエータ競合解析は今後の課題だ。

マルチターゲットコンパイル

現在はCPython(Raspberry Pi)のみをターゲットとしているが、MicroPython(ESP32)へのコンパイルも計画している。コード生成器をターゲット別に分離するプラグインアーキテクチャに移行中だ。


まとめ

FarmScriptの設計と実装を通じて得た主な知見:

  • DSLの有効性は「どこで複雑さを管理するか」の問題だ。 YAMLや汎用言語で表現しにくい複合条件がドメインに存在するなら、DSLは投資に値する。
  • 再帰降下パーサーは実装コストが低く、農業のような中程度の複雑さのDSLに適している。 パーサージェネレータ(ANTLR等)は表現力が高いが、エラーメッセージのカスタマイズが難しい。
  • セマンティック解析でドメインルールを強制することが、DSLの信頼性の核心だ。 型チェックだけでなく、ドメイン固有の制約(開花期には灌漑不可など)をコンパイル時に検出できる仕組みを将来的に組み込みたい。
  • 生成コードの可読性は重要だ。 農業の専門家がデプロイ後の動作を確認できるよう、生成Pythonは手書きと見分けがつかない品質を目標にしている。

Simplico Co., Ltd. はバンコクを拠点とするソフトウェアエンジニアリング&プロダクトスタジオです。IoT自動化システム、AI/RAGアプリケーション、エンタープライズインテグレーションを専門としています。技術的なご質問やコラボレーションのご相談は simplico.net よりお気軽にどうぞ。


タグ: DSL設計 コンパイラ 農業IoT Python Raspberry Pi 再帰降下パーサー FarmScript MicroPython スマート農業


Get in Touch with us

Chat with Us on LINE

iiitum1984

Speak to Us or Whatsapp

(+66) 83001 0222

Related Posts

Our Products