Skip to content

认知中枢实现方案

这是《认知中枢设计》的实现层配套文档。 概念文档定义了认知中枢是什么、Agenda 长什么样、判断主循环的 6 步。本文回答的是:代码怎么组织、现有模块怎么收口、分几步迁移。 如本文与《认知中枢设计》《产品与架构设计》冲突,以后者为准。 最后更新:2026-03-19


一、为什么需要实现设计

概念文档已经定义了"认知中枢是唯一大脑",但当前代码里,认知判断分散在 3 处,各自独立决策:

  1. scripts/awareness/engine.py — 事件驱动,入口先写 Fact,再做三级处理(L0/L1/L2)。它会产出 create_concern / advance_concern / notify / record 这类感知判断,但不会再直接执行 Concern / 通知动作,而是把 signal_payload 回写到 Fact 后统一交给 cognitive.evaluate_fact()。当前由 avatar_corefeishu_bot 等显式调用 awareness.dispatch_event(),不再通过 event_store 监听器隐式注入事件。
  2. scripts/direct/hooks.py:_detect_followup_concerns() — 对话结束后的 hook,用内部 LLM 检测承诺/委托/风险,直接调 engine.raise_concern() 创建 followup concern。
  3. scripts/concern/scheduler.py:gather_discoveries() + awareness.scan_absence() — 规则引擎,检测逾期任务和卡住审批并先写 absence_detection Fact,再统一通过 cognitive.evaluate_fact() 判断是否进 watch_item/concern,旧的 scan_heartbeat() 直接建 concern 的逻辑已经移除。

这 3 处的共同问题:

  • 没有统一的评估接口;每处各写一套判断逻辑
  • 没有经过 Agenda;concern 直接创建,绕过了 watch_item 缓冲
  • 没有去重合并;同一事实可能从多个入口重复立项
  • 没有统一决策记录;追溯时需要到 3 个地方查

实现设计的目标,是把这 3 处收口到一个统一的 evaluate() 入口,经过 Agenda 判断后再分流。


二、与现有代码的映射

概念层现有代码收口方向
Perception(信号解释)awareness/engine.pyhandle_event() + handle_fact() + match_attention() + _llm_evaluate()保留为 Perception 子能力;先写 Fact,再回写 signal_payload,最终走 cognitive.evaluate_fact()
对话后判断hooks.py:_detect_followup_concerns()改为产出 Signal,调 cognitive.evaluate(signal)
缺席检测scheduler.py:gather_discoveries() + awareness.scan_absence()改为先写 absence_detection Fact,再调 cognitive.evaluate_fact(fact_id)
Agenda 管理不存在新建 cognitive/agenda.py
统一评估不存在新建 cognitive/core.pyevaluate()
Workspace 缓存awareness/workspace.py保留,改为从 Agenda 真源投影刷新
Concern 创建/推进concern/engine.pyraise_concern() / advance_concern()不变,但只允许通过 cognitive.evaluate() 调用

三、模块结构

新建 scripts/cognitive/ 包:

scripts/cognitive/
├── __init__.py
├── core.py        # 统一评估入口 evaluate()
├── agenda.py      # Agenda MongoDB CRUD
└── signal.py      # Signal 数据结构定义

3.1 signal.py — Signal 数据结构

python
from __future__ import annotations
from dataclasses import dataclass, field


@dataclass
class Signal:
    """认知中枢的统一输入信号"""

    signal_type: str
    # 典型值: delegation / promise / reply / blocker / risk
    #         completion / absence / preference / work_state

    source: str
    # 产出来源: "awareness" / "dialog_hook" / "absence_scan"

    summary: str
    # 一句话描述

    person_refs: list[str] = field(default_factory=list)
    # 关联 person_id 列表

    fact_refs: list[str] = field(default_factory=list)
    # 关联 fact/event id 列表

    confidence: str = "medium"
    # "low" / "medium" / "high"

    priority: str = "medium"
    # "low" / "medium" / "high"

    raw_data: dict = field(default_factory=dict)
    # 原始事件/对话数据,供 evaluate 深度判断用

Signal 类型枚举详见《认知中枢设计》第 8.2 节。

3.2 agenda.py — Agenda CRUD

MongoDB 单文档,_id: "agenda_qinghai",集合名 agenda。文档结构与概念文档第 6.1 节 JSON 一致。

3.3 core.py — 统一评估入口

唯一对外接口 evaluate(signal),内部实现概念文档的 6 步主循环。


四、统一评估接口

python
async def evaluate(signal: Signal) -> EvaluateResult:
    """认知中枢统一评估入口

    对应概念文档第八节的 6 步主循环:
      1. 接收 Fact(Signal 已经包含 fact_refs)
      2. Perception 已产出 Signal(由调用方完成)
      3. 绑定上下文
      4. 评估五个维度
      5. 统一裁决
      6. 写入与动作执行

    Args:
        signal: 已形成的结构化信号

    Returns:
        EvaluateResult(decision_type, detail)
    """

4.1 输入

  • Signal 对象(由 3 个入口各自产出后传入)

4.2 内部流程

text
Signal 到达

Step 3: 绑定上下文
  ├─ match_existing_agenda(signal)
  │   ├─ 是否命中已有 watch_item
  │   ├─ 是否命中已有 concern_ref
  │   └─ 是否关联已有 focus_queue 条目
  ├─ 加载相关 PersonContext slice
  └─ 加载相关 Concern summary

Step 4: 评估五个维度
  ├─ impact / urgency / continuity / confidence / scope
  └─ 规则优先,不确定时调内部 LLM

Step 5: 统一裁决 → decision_type
  ├─ ignore / record_only
  ├─ session_update
  ├─ person_update
  ├─ agenda_watch        → add_watch_item()
  ├─ direct_action       → 执行一次性动作
  ├─ create_concern      → raise_concern()
  ├─ advance_concern     → advance_concern()
  ├─ close_concern       → resolve_concern()
  └─ escalate            → 升级给老板

Step 6: apply_agenda_decision(decision)
  ├─ 写 recent_decisions
  ├─ 写 Person / Agenda / Concern
  ├─ 触发 Action
  └─ refresh_workspace_cache()

4.3 输出

python
@dataclass
class EvaluateResult:
    decision_type: str
    # ignore / record_only / session_update / person_update
    # agenda_watch / direct_action / create_concern
    # advance_concern / close_concern / escalate

    concern_id: str | None = None
    watch_id: str | None = None
    reason: str = ""
    action_taken: str = ""

4.4 规则优先 + LLM 兜底

evaluate 内部不是每次都调 LLM。判断策略分两层:

条件处理方式调 LLM
signal 命中已有 concern(同人+同主题)直接 advance_concern
signal 命中已有 watch_item补充证据,判断是否 promote否(规则判断)
signal.confidence == "high" 且 continuity 明确直接按 signal_type 分流
signal.confidence == "low" 或维度判断不确定调内部 LLM 做综合裁决

4.5 五个维度的量化标准

每个 Signal 经过 5 个维度评估后,产出结构化评分用于分流裁决。

impact(业务影响)

级别条件示例
critical涉及老板显式关注的项目或人老板说"盯紧 v2.0"
high涉及关键项目 deadline 或跨部门阻塞v2.0 后端提测逾期
medium涉及单人单任务正常推进张三日报未提交
low不影响业务推进闲聊、情绪表达

判定依据:Agenda.focus_queue 中是否有相关条目 → 自动提升;Person.org.is_key_person → 提升一级。

urgency(时间紧迫)

级别条件
immediate有硬 deadline 且剩余 ≤2 小时,或老板当场等结果
todaydeadline 在当天,或承诺"今天给"
this_weekdeadline 在本周内
none无时间约束

判定依据:Signal.detail 中的 deadline / when 字段;已有 watch_item 的 expires_at。

continuity(持续性)

含义典型分流
needs_followup未来需要系统再次介入→ concern 或 watch
one_shot当前一次处理即可→ direct_action
none不需要后续→ record_only 或 ignore

判定依据:signal_type 自带默认值。delegation/promise/absence → needs_followup;completion/preference → one_shot;work_state → none。可被 LLM 覆盖。

confidence(证据充分度)

级别条件
high明确事实或显式表达("帮我催张三"、任务状态变为已完成)
medium合理推断但非确证("感觉这个可能会延期")
low弱信号、单条线索、模糊表述

confidence 在 Signal 产出时由 Perception 初步设定(规则产出默认 high,LLM 产出由模型判断),evaluate() 可根据上下文调整。

scope(影响范围)

级别条件
cross_project影响多个项目或多个部门
cross_person影响多人但在同一项目内
single仅涉及单人单事

4.6 分流决策树

5 个维度评估完成后,按以下决策树裁决。

对话承诺/委托的分流规则:对话中检测到的 delegation_signal(高 confidence)直接进入 create_concern;promise_signal / risk_signal(中低 confidence)先进 watch,积累证据后再升级。这是统一规则,不因来源(对话 hook 或外部事件)而不同。

text
命中已有 concern?
  ├─ 是 → advance_concern(直接推进)
  └─ 否 → 继续判断

命中已有 watch_item?
  ├─ 是 → 补充证据 → 检查 promote 条件
  │   ├─ 满足 → promote_watch_to_concern
  │   └─ 不满足 → 更新 watch_item,结束
  └─ 否 → 继续判断

continuity == needs_followup?
  ├─ 否 →
  │   ├─ impact >= high 或 urgency == immediate → direct_action
  │   ├─ impact == medium → person_update 或 record_only
  │   └─ 其他 → ignore
  └─ 是 → 继续判断

confidence >= high 且 impact >= medium?
  ├─ 是 → create_concern(直接立项)
  └─ 否 → agenda_watch(先观察)

impact >= high 或 urgency <= today?
  ├─ 是 → create_concern(紧急优先立项)
  └─ 否 → agenda_watch + 设置 promote_condition

4.7 focus_queue 排序算法

focus_queue 排序不是固定公式,而是以下优先级规则的叠加:

text
排序权重(从高到低):

1. 老板显式关注 → 置顶(人工标记或 delegation_signal 来源为 owner)
2. urgency == immediate → 排在所有非老板关注项之前
3. impact × urgency 组合:
   critical+today > high+today > critical+this_week > high+this_week > medium+today > ...
4. 同等级内按 updated_at 降序(最近有新进展的排前面)
5. 低优且超过 48 小时无进展的自动下沉

更新时机:每次 apply_agenda_decision() 执行后自动重排。


五、Agenda 实现

5.1 MongoDB 文档结构

集合 agenda,单文档 _id: "agenda_qinghai"。字段定义与概念文档第 6.1 节完全一致:

json
{
  "_id": "agenda_qinghai",
  "focus_queue": [],
  "watch_items": [],
  "concern_refs": [],
  "recent_decisions": [],
  "updated_at": "..."
}

recent_decisions 滚动窗口,保留最近 100 条。

5.2 CRUD API

python
def get_agenda() -> dict:
    """获取或初始化 Agenda 文档"""

def add_watch_item(
    topic: str,
    priority: str,
    confidence: str,
    source_fact_refs: list[str],
    person_refs: list[str],
    promote_condition: str,
    expires_at: str | None = None,
) -> str:
    """添加 watch_item,返回 watch_id"""

def promote_watch_to_concern(
    watch_id: str,
    concern_id: str,
) -> None:
    """watch_item 升级为 concern:移除 watch,添加 concern_ref"""

def add_concern_ref(
    concern_id: str,
    priority: str,
    status: str,
    agenda_role: str = "active",
) -> None:
    """在 Agenda 中注册新 concern 引用"""

def remove_concern_ref(concern_id: str) -> None:
    """从 Agenda 中移除 concern 引用(concern 关闭时调用)"""

def update_focus_queue(entries: list[dict]) -> None:
    """重排 focus_queue(由 evaluate 裁决后调用)"""

def append_decision(
    decision_type: str,
    fact_refs: list[str],
    signal_refs: list[str],
    reason: str,
) -> str:
    """追加决策记录到 recent_decisions,返回 decision_id
    自动淘汰超出窗口的旧记录。"""

def find_matching_watch(
    person_refs: list[str],
    topic_keywords: list[str],
) -> dict | None:
    """查找是否有匹配的 watch_item(去重用)"""

5.3 watch_item 升级(promote)规则

watch_item 存在的意义是"缓冲不确定的事项"。升级为 concern 的条件必须明确,否则缓冲层形同虚设。

自动升级条件(满足任一即触发 promote)

条件类型规则示例
时间到期expires_at 到期且无完成证据watch"v2.0 今天提测",18:00 到期无 merge 事件 → 升级
证据累积同一 watch 收到 ≥2 条同向 Signal两条 risk_signal 都指向张三延期 → 升级
优先级跃迁新 Signal 的 impact ≥ high 且命中已有 watch老板追问"那个事怎么样了" → 立即升级
关联升级watch 的 person_refs 同时出现在高优 concern 中张三已有一个 high concern,新 watch 自动升级

不升级条件

条件处理
expires_at 到期但已有 completion_signal移除 watch,不升级
watch 超过 7 天无任何新 Signal静默移除(过期清理)
confidence 始终为 low,无新证据保持 watch,等待

promote 流程

text
watch_item 收到新 Signal
  → 检查上述升级条件
  → 满足 → promote_watch_to_concern(watch_id, new_concern_id)
    ├─ 调 raise_concern() 创建 concern
    ├─ 从 watch_items 移除
    ├─ 写入 concern_refs
    └─ 写入 recent_decisions(decision_type="promote")
  → 不满足 → 更新 watch_item 的 source_fact_refs,继续观察

5.4 Workspace 缓存同步

Workspace 是 Agenda 的派生缓存,写方向单向。

同步时机

触发事件同步内容
apply_agenda_decision() 执行后全量刷新 attention_watchlist + working_summary
concern 状态变化(resolve/advance)增量更新 working_summary 中的 concern 摘要
定时(每 5 分钟)全量刷新(兜底,防止遗漏)

同步方式

python
def refresh_workspace_cache():
    agenda = get_agenda()
    active_concerns = get_active_concerns()
    recent_facts = get_recent_facts(hours=24)

    # attention_watchlist 投影
    watchlist = [
        {
            "watch_id": w["watch_id"],
            "topic": w["topic"],
            "priority": w["priority"],
            "person_refs": w["person_refs"],
            "promote_condition": w["promote_condition"],
            "match_rules": _derive_match_rules(w),  # 从 topic + person_refs 派生事件匹配规则
            "source_agenda_ref": w["watch_id"],
            "projected_at": now(),
        }
        for w in agenda["watch_items"]
    ]

    # working_summary 汇总
    summary = _summarize(
        focus_queue=agenda["focus_queue"],
        concern_summaries=[c["title"] + ": " + c["status"] for c in active_concerns],
        recent_facts=recent_facts[:20],
    )

    update_workspace(working_summary=summary, attention_watchlist=watchlist)

六、三个入口的收口方式

6.1 显式事件分发 → awareness.handle_event

当前实现avatar_corefeishu_bot 在收到入站消息或外部平台事件后,显式调用 awareness.dispatch_event()AwarenessEngine.handle_event() 先写 Fact,再基于 Fact 做 L0/L1/L2 判断,命中动作时把 signal_payload 写回 Fact,然后统一走 cognitive.evaluate_fact(fact_id)

改造后

text
avatar_core / feishu_bot 构造外部事件
  → awareness.dispatch_event(event)
  → awareness.handle_event(event)
      → _write_fact(event)
      → handle_fact(fact_id)
          → L0: 写 workspace.event_buffer
          → L1/L2: 生成 signal_payload
          → cognitive.evaluate_fact(fact_id)
              → Agenda 判断 → 分流执行

关键改动点:

  • 不再依赖 event_store.register_event_listener 之类的隐式监听入口
  • handle_event() 的职责是写 Fact 并完成感知判断;真正的统一裁决入口是 cognitive.evaluate_fact(fact_id)
  • awareness 不再直接执行发送/建 concern 等动作,只负责把 signal_payload 写回 Fact,交认知中枢处理
  • L1 规则匹配(已有 concern 自动 advance)保留为快速路径,但结果仍通过 Agenda.recent_decisions 留痕

6.2 对话 on_stop → hooks._detect_followup_concerns

现状_detect_followup_concerns() 用 LLM 检测对话中的承诺/委托,直接调 engine.raise_concern()

改造后

text
on_stop hook
  → _detect_followup_concerns(user_msg, ai_msg, session_id)
      → LLM 检测(不变)
      → 有跟进事项时,构造 Signal(signal_type="promise"|"delegation"|...)
      → cognitive.evaluate(signal)
          → Agenda 判断:可能进 watch_item 或直接建 concern

关键改动点:

  • _detect_followup_concerns() 不再直接调 engine.raise_concern()
  • 改为构造 Signal 并调 cognitive.evaluate()
  • 好处:低置信度的对话承诺会先进入 watch_item 而非立即建 concern

6.3 定时扫描 → scheduler.scan_absence

现状gather_discoveries() 收集逾期任务和卡住审批,历史上 scan_heartbeat()/scan_absence() 直接创建 concern,但随着旧路径清理,现在只有 awareness.scan_absence() 在运行。

改造后

text
daemon concern_scan 每 5 轮
  → awareness.scan_absence()
      → gather_discoveries()
      → 每个 discovery 写成 absence_detection Fact
      → cognitive.evaluate_fact(fact_id)
          → Agenda 判断:已有 watch 则补证据,否则判断是否建 concern

关键改动点:

  • scan_absence() 不再把 discovery 包装成 event 再走 handle_event()
  • discovery 统一先落 Fact,再由 cognitive.evaluate_fact() 从事实出发判断
  • scan_heartbeat() 的旧路径已经停用,所有缺席判断目前都由 awareness.scan_absence() 处理

七、与 daemon 的集成

7.1 concern_scan 任务

daemon._task_concern_scan() 当前逻辑:

python
# 每 60s
s.scan_due_concerns()    # Concern 到期 → agent loop
s.scan_timeouts()        # Concern 超时
s.scan_schedules()       # 定时 Concern 触发

# 每 5 轮(~300s)
awareness_engine.scan_absence()  # 缺席检测

改造后保持相同调度节奏,唯一区别是 scan_absence() 内部走 cognitive.evaluate_fact() 而非直接建 concern。

scan_due_concerns / scan_timeouts / scan_schedules 属于 Concern 生命周期管理,不需要经过 evaluate,保持不变。

7.2 awareness 事件分发

daemon 仅负责在启动时调用 scripts.awareness.init_runtime(engine),初始化 awareness runtime。事件分发不再依赖 event_store.register_event_listener;目前所有进入 awareness 的事件都是由调用方显式 dispatch_event()

avatar_core 收到入站消息后 → awareness.dispatch_event(inbound_event)
feishu_bot 接收到非消息平台事件 → awareness.dispatch_event(platform_event)

owner 审批回调不再进入 awareness;这类内部控制动作只写 action_result Fact 和事件日志,不参与感知判断。真正进入 awareness 的只有用户入站消息和外部平台事件。每个事件进入 awareness 后都会先写 Fact,再基于 Fact 评估;命中动作时写 signal_payload 并调用 cognitive.evaluate_fact(fact_id)。串行化由 awareness runtime 内部 loop 负责,不再依赖 daemon 自己维护回调监听链。

7.3 cognitive 模块初始化

WorkDaemon.__init__() 中,Concern 引擎初始化后立即通过 scripts.cognitive.init_core(...) 初始化认知中枢;awareness 只消费这个全局 core:

python
from scripts.concern.engine import init_engine as _init_concern
from scripts.cognitive import init_core as _init_cognitive

self.concern_engine = _init_concern(...)
_init_cognitive(self.concern_engine)

八、并发与一致性

系统有三个并发入口可能同时调用 evaluate() 并写入 Agenda。本节定义并发保护策略。

8.1 并发场景

入口线程模型频率典型 Agenda 写操作
显式 dispatch_event() → handle_event() → evaluate_fact()awareness_loop(独立 async loop,守护线程)事件驱动,不定add_watch / add_concern_ref / append_decision
对话 on_stop → evaluate()对话 async context每次对话结束同上
定时扫描 → evaluate()daemon 主线程(每 5 轮 ~300s)定时同上

三个入口可能同时处理涉及同一人的不同 Signal,导致:

  • 重复创建 watch_item(同一话题两条)
  • 重复 promote watch → concern(两个 Signal 同时触发升级)
  • focus_queue 排序冲突

8.2 Agenda 原子操作规范

Agenda 单文档(_id: "agenda_qinghai")的所有写入必须使用 MongoDB 原子操作:

操作MongoDB 命令原子性保证
添加 watch_itemfindOneAndUpdate + $push + 唯一性前置查询单文档原子
移除 watch_itemupdateOne + $pull: {"watch_items": {"watch_id": x}}单文档原子
promote watch→concernfindOneAndUpdate 条件含 watch_id 存在性 + $pull + $push单次原子,CAS 语义
添加 concern_refupdateOne + $push + 前置去重查询单文档原子
移除 concern_refupdateOne + $pull单文档原子
追加 decisionupdateOne + $push + $slice: -100单文档原子,自动滚动窗口
重排 focus_queuefindOneAndUpdate + $set: {"focus_queue": [...]}单文档原子,整体覆盖

promote 原子实现

promote 是并发冲突最敏感的操作。两个 Signal 同时触发同一 watch 的 promote 时,必须保证只有一个成功。

python
result = db.agenda.find_one_and_update(
    {
        "_id": "agenda_qinghai",
        "watch_items.watch_id": watch_id,  # 条件:watch 仍存在
    },
    {
        "$pull": {"watch_items": {"watch_id": watch_id}},
        "$push": {"concern_refs": concern_ref_doc},
        "$set": {"updated_at": now()},
    },
    return_document=ReturnDocument.AFTER,
)
if result is None:
    # watch 已被另一个 evaluate 先行 promote,跳过
    return

条件中包含 watch_id 存在性检查。第二个到达的 promote 因条件不满足而自动 no-op(CAS 语义)。

8.3 evaluate() 串行化策略

方案:per-person asyncio.Lock + Agenda 乐观并发

python
class CognitiveCore:
    def __init__(self):
        self._person_locks: dict[str, asyncio.Lock] = {}
        self._locks_lock = asyncio.Lock()

    async def _get_person_lock(self, person_id: str) -> asyncio.Lock:
        async with self._locks_lock:
            if person_id not in self._person_locks:
                self._person_locks[person_id] = asyncio.Lock()
            return self._person_locks[person_id]

    async def evaluate(self, signal: Signal) -> EvaluateResult:
        primary_person = signal.person_refs[0] if signal.person_refs else "_global"
        lock = await self._get_person_lock(primary_person)
        async with lock:
            return await self._evaluate_inner(signal)

为什么 per-person 而非全局

  • 大部分 Signal 关联不同人,全局锁会让无关 Signal 互相等待
  • 同一人的多个 Signal(如同时收到对话承诺 + 事件信号)需要串行,避免重复 watch/concern
  • Agenda 写入本身通过 MongoDB 原子操作保证,不需要应用层全局锁

跨人 Signal 处理

当 Signal 的 person_refs 含多人时,取第一个为锁定对象。Agenda 原子操作保证跨人写入不会出现不一致。

锁清理

_person_locks 字典随时间增长,但 person 数量有限(公司人数),不需要主动清理。如需优化,可定期清除 30 分钟无竞争的锁。

8.4 Concern 调度去重

当前问题:scan_due_concerns() 可能在多轮扫描中重复捞到同一 concern。

解决方案:原子 CAS 标记

python
def process_concern(self, concern_id: str, trigger: str):
    # 原子标记:正在处理
    result = db.concerns.find_one_and_update(
        {
            "concern_id": concern_id,
            "$or": [
                {"processing_at": None},
                {"processing_at": {"$lt": now() - timedelta(minutes=10)}},
            ],
        },
        {"$set": {"processing_at": now()}},
        return_document=ReturnDocument.AFTER,
    )
    if result is None:
        return  # 已有另一个扫描在处理

    try:
        await self._run_agent_loop(result, trigger)
    finally:
        db.concerns.update_one(
            {"concern_id": concern_id},
            {"$set": {"processing_at": None}},
        )

10 分钟超时兜底:防止 agent loop 崩溃后 concern 永久锁定。

8.5 watch_item 去重

添加 watch_item 前,先查是否已有同人同主题的 watch:

python
def add_watch_item(self, topic, person_refs, ...):
    existing = find_matching_watch(person_refs, topic_keywords)
    if existing:
        _append_evidence(existing["watch_id"], signal.fact_refs)
        return existing["watch_id"]
    # 原子添加新 watch_item
    ...

find_matching_watch 使用 person_refs 精确匹配 + topic 关键词交集判断。由于 evaluate() 已 per-person 串行,同人的 watch_item 去重不存在竞态。

8.6 daemon 集成约束

三个入口必须共享同一个 CognitiveCore 实例和同一套 per-person lock。

入口投递方式锁共享
awareness_loop 中的 evaluate()直接在 async loop 中调用共享 CognitiveCore 实例
对话 on_stop 中的 evaluate()asyncio.run_coroutine_threadsafe() 投递到 awareness_loop共享 CognitiveCore 实例
定时扫描中的 evaluate()asyncio.run_coroutine_threadsafe() 投递到 awareness_loop共享 CognitiveCore 实例

关键:所有 evaluate() 调用都汇聚到同一个 async event loop(awareness_loop),确保 per-person lock 在同一个 loop 内生效。


九、渐进式迁移

Phase 1:建基础设施,不改现有逻辑

目标:Agenda 集合可用,evaluate 骨架可调用,现有 3 入口不变。

  • 新建 scripts/cognitive/ 包:signal.py + agenda.py + core.py
  • agenda.py 实现完整 CRUD(get_agenda / add_watch_item / append_decision 等)
  • core.py 实现 evaluate() 骨架,内部先只做 append_decision() + 透传到现有逻辑
  • 现有 3 个入口在执行完原逻辑后,额外调用 agenda.append_decision() 写入决策记录
  • awareness 动作分发结果写回 Agenda / Fact,确保 create_concern 路径可追踪
  • hooks._detect_followup_concerns() 创建 concern 后,额外调用 agenda.add_concern_ref()

验证点:Agenda 文档中能看到所有 concern 创建和推进的决策记录。

Phase 2:统一 evaluate 入口

目标:3 个入口全部改为调 cognitive.evaluate()

  • awareness handle_event() 改为先写 Fact,再把 signal_payload 写回该 Fact 并调用 evaluate_fact()
  • 发送消息、创建/推进 concern 的真正执行逻辑全部收口到 core.py
  • hooks _detect_followup_concerns() 改为产出 Signal + 调 evaluate
  • scan_absence() 改为写 absence_detection Fact + 调 evaluate_fact()
  • evaluate 内部实现去重合并(find_matching_watch + concern 查询)
  • evaluate 内部实现 watch_item 缓冲(低置信度信号先 watch,不直接建 concern)

验证点:只有 cognitive.evaluate() / cognitive.evaluate_fact() 这条统一认知链才能创建 concern,3 个旧入口不再直接调 raise_concern()

Phase 3:收口旧逻辑

目标:workspace 从 Agenda 真源投影,旧 heartbeat 逻辑退役。

  • workspace.pyattention_watchlist 改为从 Agenda.watch_items + active concern_refs 投影生成
  • workspace.pyworking_summary 改为从 Agenda.focus_queue + Concern summaries + recent Facts 汇总生成
  • 实现 refresh_workspace_cache(),由 evaluate 写入后调用;workspace 集合只保留最小运行时字段,不再镜像 attention_watchlist / recent_observations
  • awareness 不再直接执行发送/建 concern 动作

验证点:workspace 文档完全是派生缓存,删掉后系统功能不受影响。


十、接口边界

与概念文档第十三节定义的 5 个接口对齐:

概念接口实现位置签名
evaluate_fact(fact_id)cognitive/core.pyasync def evaluate_fact(fact_id: str) -> EvaluateResult — 从 Fact 出发构造 Signal,调 evaluate
evaluate_signal(signal)cognitive/core.pyasync def evaluate(signal: Signal) -> EvaluateResult — 统一评估主入口
match_existing_agenda(...)cognitive/core.py 内部def _match_existing_agenda(signal: Signal) -> AgendaMatch — 匹配 watch / concern
apply_agenda_decision(...)cognitive/core.py 内部async def _apply_decision(decision: EvaluateResult, signal: Signal) -> None — 写入并执行
refresh_workspace_cache()cognitive/core.pydef refresh_workspace_cache() -> None — 从真源重建 workspace

其中 evaluate_factevaluate 的上层便捷方法:加载 Fact → 读取 signal_payload 或按 Fact 类型补建 Signal → 调 evaluate(signal)。当前 awareness / scan_absence 主链都优先走 evaluate_fact();显式构造 Signal 的入口主要保留给 direct/hooks.py 这类非事件型调用。


十一、当前原则

  1. 统一入口 — 所有认知判断必须经过 cognitive.evaluate()cognitive.evaluate_fact(),不允许旁路创建 concern
  2. Fact / Signal 双入口,核心单出口 — 外围入口可以显式构造 Signal 调 evaluate(signal),也可以先写 Fact 再调 evaluate_fact(fact_id);但最终都必须落到同一个认知裁决主循环
  3. Agenda 先行 — 低置信度信号先进 watch_item,不直接建 concern
  4. 规则优先 — 能用规则判断的不调 LLM,降低成本
  5. 去重合并 — 同人、同主题、同时间窗口的信号优先合并到已有 watch / concern
  6. 决策可追溯 — 每次 evaluate 都写入 recent_decisions,含 fact_refs 和 reason
  7. 渐进迁移 — 分 3 阶段收口,每阶段可独立验证,不一次性重写
  8. 降级兜底 — 不再保留 scan_heartbeat() 或其他旧兼容路径,awareness/cognitive 出现问题必须靠监控告警与人工恢复
  9. 不破坏对话 — evaluate 在对话结束后异步执行,不阻塞回复链路
  10. workspace 是缓存 — 从 Agenda + Concern + Fact 真源投影,不作为判断依据

Boss-AGI · 超级 AI 企业助理