认知中枢实现方案
这是《认知中枢设计》的实现层配套文档。 概念文档定义了认知中枢是什么、Agenda 长什么样、判断主循环的 6 步。本文回答的是:代码怎么组织、现有模块怎么收口、分几步迁移。 如本文与《认知中枢设计》《产品与架构设计》冲突,以后者为准。 最后更新:2026-03-19
一、为什么需要实现设计
概念文档已经定义了"认知中枢是唯一大脑",但当前代码里,认知判断分散在 3 处,各自独立决策:
scripts/awareness/engine.py— 事件驱动,入口先写 Fact,再做三级处理(L0/L1/L2)。它会产出create_concern / advance_concern / notify / record这类感知判断,但不会再直接执行 Concern / 通知动作,而是把signal_payload回写到 Fact 后统一交给cognitive.evaluate_fact()。当前由avatar_core、feishu_bot等显式调用awareness.dispatch_event(),不再通过event_store监听器隐式注入事件。scripts/direct/hooks.py:_detect_followup_concerns()— 对话结束后的 hook,用内部 LLM 检测承诺/委托/风险,直接调engine.raise_concern()创建 followup concern。scripts/concern/scheduler.py:gather_discoveries()+awareness.scan_absence()— 规则引擎,检测逾期任务和卡住审批并先写 absence_detection Fact,再统一通过cognitive.evaluate_fact()判断是否进 watch_item/concern,旧的scan_heartbeat()直接建 concern 的逻辑已经移除。
这 3 处的共同问题:
- 没有统一的评估接口;每处各写一套判断逻辑
- 没有经过 Agenda;concern 直接创建,绕过了 watch_item 缓冲
- 没有去重合并;同一事实可能从多个入口重复立项
- 没有统一决策记录;追溯时需要到 3 个地方查
实现设计的目标,是把这 3 处收口到一个统一的 evaluate() 入口,经过 Agenda 判断后再分流。
二、与现有代码的映射
| 概念层 | 现有代码 | 收口方向 |
|---|---|---|
| Perception(信号解释) | awareness/engine.py 的 handle_event() + handle_fact() + match_attention() + _llm_evaluate() | 保留为 Perception 子能力;先写 Fact,再回写 signal_payload,最终走 cognitive.evaluate_fact() |
| 对话后判断 | hooks.py:_detect_followup_concerns() | 改为产出 Signal,调 cognitive.evaluate(signal) |
| 缺席检测 | scheduler.py:gather_discoveries() + awareness.scan_absence() | 改为先写 absence_detection Fact,再调 cognitive.evaluate_fact(fact_id) |
| Agenda 管理 | 不存在 | 新建 cognitive/agenda.py |
| 统一评估 | 不存在 | 新建 cognitive/core.py 的 evaluate() |
| Workspace 缓存 | awareness/workspace.py | 保留,改为从 Agenda 真源投影刷新 |
| Concern 创建/推进 | concern/engine.py 的 raise_concern() / advance_concern() | 不变,但只允许通过 cognitive.evaluate() 调用 |
三、模块结构
新建 scripts/cognitive/ 包:
scripts/cognitive/
├── __init__.py
├── core.py # 统一评估入口 evaluate()
├── agenda.py # Agenda MongoDB CRUD
└── signal.py # Signal 数据结构定义3.1 signal.py — Signal 数据结构
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class Signal:
"""认知中枢的统一输入信号"""
signal_type: str
# 典型值: delegation / promise / reply / blocker / risk
# completion / absence / preference / work_state
source: str
# 产出来源: "awareness" / "dialog_hook" / "absence_scan"
summary: str
# 一句话描述
person_refs: list[str] = field(default_factory=list)
# 关联 person_id 列表
fact_refs: list[str] = field(default_factory=list)
# 关联 fact/event id 列表
confidence: str = "medium"
# "low" / "medium" / "high"
priority: str = "medium"
# "low" / "medium" / "high"
raw_data: dict = field(default_factory=dict)
# 原始事件/对话数据,供 evaluate 深度判断用Signal 类型枚举详见《认知中枢设计》第 8.2 节。
3.2 agenda.py — Agenda CRUD
MongoDB 单文档,_id: "agenda_qinghai",集合名 agenda。文档结构与概念文档第 6.1 节 JSON 一致。
3.3 core.py — 统一评估入口
唯一对外接口 evaluate(signal),内部实现概念文档的 6 步主循环。
四、统一评估接口
async def evaluate(signal: Signal) -> EvaluateResult:
"""认知中枢统一评估入口
对应概念文档第八节的 6 步主循环:
1. 接收 Fact(Signal 已经包含 fact_refs)
2. Perception 已产出 Signal(由调用方完成)
3. 绑定上下文
4. 评估五个维度
5. 统一裁决
6. 写入与动作执行
Args:
signal: 已形成的结构化信号
Returns:
EvaluateResult(decision_type, detail)
"""4.1 输入
Signal对象(由 3 个入口各自产出后传入)
4.2 内部流程
Signal 到达
↓
Step 3: 绑定上下文
├─ match_existing_agenda(signal)
│ ├─ 是否命中已有 watch_item
│ ├─ 是否命中已有 concern_ref
│ └─ 是否关联已有 focus_queue 条目
├─ 加载相关 PersonContext slice
└─ 加载相关 Concern summary
↓
Step 4: 评估五个维度
├─ impact / urgency / continuity / confidence / scope
└─ 规则优先,不确定时调内部 LLM
↓
Step 5: 统一裁决 → decision_type
├─ ignore / record_only
├─ session_update
├─ person_update
├─ agenda_watch → add_watch_item()
├─ direct_action → 执行一次性动作
├─ create_concern → raise_concern()
├─ advance_concern → advance_concern()
├─ close_concern → resolve_concern()
└─ escalate → 升级给老板
↓
Step 6: apply_agenda_decision(decision)
├─ 写 recent_decisions
├─ 写 Person / Agenda / Concern
├─ 触发 Action
└─ refresh_workspace_cache()4.3 输出
@dataclass
class EvaluateResult:
decision_type: str
# ignore / record_only / session_update / person_update
# agenda_watch / direct_action / create_concern
# advance_concern / close_concern / escalate
concern_id: str | None = None
watch_id: str | None = None
reason: str = ""
action_taken: str = ""4.4 规则优先 + LLM 兜底
evaluate 内部不是每次都调 LLM。判断策略分两层:
| 条件 | 处理方式 | 调 LLM |
|---|---|---|
| signal 命中已有 concern(同人+同主题) | 直接 advance_concern | 否 |
| signal 命中已有 watch_item | 补充证据,判断是否 promote | 否(规则判断) |
| signal.confidence == "high" 且 continuity 明确 | 直接按 signal_type 分流 | 否 |
| signal.confidence == "low" 或维度判断不确定 | 调内部 LLM 做综合裁决 | 是 |
4.5 五个维度的量化标准
每个 Signal 经过 5 个维度评估后,产出结构化评分用于分流裁决。
impact(业务影响)
| 级别 | 条件 | 示例 |
|---|---|---|
critical | 涉及老板显式关注的项目或人 | 老板说"盯紧 v2.0" |
high | 涉及关键项目 deadline 或跨部门阻塞 | v2.0 后端提测逾期 |
medium | 涉及单人单任务正常推进 | 张三日报未提交 |
low | 不影响业务推进 | 闲聊、情绪表达 |
判定依据:Agenda.focus_queue 中是否有相关条目 → 自动提升;Person.org.is_key_person → 提升一级。
urgency(时间紧迫)
| 级别 | 条件 |
|---|---|
immediate | 有硬 deadline 且剩余 ≤2 小时,或老板当场等结果 |
today | deadline 在当天,或承诺"今天给" |
this_week | deadline 在本周内 |
none | 无时间约束 |
判定依据:Signal.detail 中的 deadline / when 字段;已有 watch_item 的 expires_at。
continuity(持续性)
| 值 | 含义 | 典型分流 |
|---|---|---|
needs_followup | 未来需要系统再次介入 | → concern 或 watch |
one_shot | 当前一次处理即可 | → direct_action |
none | 不需要后续 | → record_only 或 ignore |
判定依据:signal_type 自带默认值。delegation/promise/absence → needs_followup;completion/preference → one_shot;work_state → none。可被 LLM 覆盖。
confidence(证据充分度)
| 级别 | 条件 |
|---|---|
high | 明确事实或显式表达("帮我催张三"、任务状态变为已完成) |
medium | 合理推断但非确证("感觉这个可能会延期") |
low | 弱信号、单条线索、模糊表述 |
confidence 在 Signal 产出时由 Perception 初步设定(规则产出默认 high,LLM 产出由模型判断),evaluate() 可根据上下文调整。
scope(影响范围)
| 级别 | 条件 |
|---|---|
cross_project | 影响多个项目或多个部门 |
cross_person | 影响多人但在同一项目内 |
single | 仅涉及单人单事 |
4.6 分流决策树
5 个维度评估完成后,按以下决策树裁决。
对话承诺/委托的分流规则:对话中检测到的 delegation_signal(高 confidence)直接进入 create_concern;promise_signal / risk_signal(中低 confidence)先进 watch,积累证据后再升级。这是统一规则,不因来源(对话 hook 或外部事件)而不同。
命中已有 concern?
├─ 是 → advance_concern(直接推进)
└─ 否 → 继续判断
↓
命中已有 watch_item?
├─ 是 → 补充证据 → 检查 promote 条件
│ ├─ 满足 → promote_watch_to_concern
│ └─ 不满足 → 更新 watch_item,结束
└─ 否 → 继续判断
↓
continuity == needs_followup?
├─ 否 →
│ ├─ impact >= high 或 urgency == immediate → direct_action
│ ├─ impact == medium → person_update 或 record_only
│ └─ 其他 → ignore
└─ 是 → 继续判断
↓
confidence >= high 且 impact >= medium?
├─ 是 → create_concern(直接立项)
└─ 否 → agenda_watch(先观察)
↓
impact >= high 或 urgency <= today?
├─ 是 → create_concern(紧急优先立项)
└─ 否 → agenda_watch + 设置 promote_condition4.7 focus_queue 排序算法
focus_queue 排序不是固定公式,而是以下优先级规则的叠加:
排序权重(从高到低):
1. 老板显式关注 → 置顶(人工标记或 delegation_signal 来源为 owner)
2. urgency == immediate → 排在所有非老板关注项之前
3. impact × urgency 组合:
critical+today > high+today > critical+this_week > high+this_week > medium+today > ...
4. 同等级内按 updated_at 降序(最近有新进展的排前面)
5. 低优且超过 48 小时无进展的自动下沉更新时机:每次 apply_agenda_decision() 执行后自动重排。
五、Agenda 实现
5.1 MongoDB 文档结构
集合 agenda,单文档 _id: "agenda_qinghai"。字段定义与概念文档第 6.1 节完全一致:
{
"_id": "agenda_qinghai",
"focus_queue": [],
"watch_items": [],
"concern_refs": [],
"recent_decisions": [],
"updated_at": "..."
}recent_decisions 滚动窗口,保留最近 100 条。
5.2 CRUD API
def get_agenda() -> dict:
"""获取或初始化 Agenda 文档"""
def add_watch_item(
topic: str,
priority: str,
confidence: str,
source_fact_refs: list[str],
person_refs: list[str],
promote_condition: str,
expires_at: str | None = None,
) -> str:
"""添加 watch_item,返回 watch_id"""
def promote_watch_to_concern(
watch_id: str,
concern_id: str,
) -> None:
"""watch_item 升级为 concern:移除 watch,添加 concern_ref"""
def add_concern_ref(
concern_id: str,
priority: str,
status: str,
agenda_role: str = "active",
) -> None:
"""在 Agenda 中注册新 concern 引用"""
def remove_concern_ref(concern_id: str) -> None:
"""从 Agenda 中移除 concern 引用(concern 关闭时调用)"""
def update_focus_queue(entries: list[dict]) -> None:
"""重排 focus_queue(由 evaluate 裁决后调用)"""
def append_decision(
decision_type: str,
fact_refs: list[str],
signal_refs: list[str],
reason: str,
) -> str:
"""追加决策记录到 recent_decisions,返回 decision_id
自动淘汰超出窗口的旧记录。"""
def find_matching_watch(
person_refs: list[str],
topic_keywords: list[str],
) -> dict | None:
"""查找是否有匹配的 watch_item(去重用)"""5.3 watch_item 升级(promote)规则
watch_item 存在的意义是"缓冲不确定的事项"。升级为 concern 的条件必须明确,否则缓冲层形同虚设。
自动升级条件(满足任一即触发 promote)
| 条件类型 | 规则 | 示例 |
|---|---|---|
| 时间到期 | expires_at 到期且无完成证据 | watch"v2.0 今天提测",18:00 到期无 merge 事件 → 升级 |
| 证据累积 | 同一 watch 收到 ≥2 条同向 Signal | 两条 risk_signal 都指向张三延期 → 升级 |
| 优先级跃迁 | 新 Signal 的 impact ≥ high 且命中已有 watch | 老板追问"那个事怎么样了" → 立即升级 |
| 关联升级 | watch 的 person_refs 同时出现在高优 concern 中 | 张三已有一个 high concern,新 watch 自动升级 |
不升级条件
| 条件 | 处理 |
|---|---|
expires_at 到期但已有 completion_signal | 移除 watch,不升级 |
| watch 超过 7 天无任何新 Signal | 静默移除(过期清理) |
| confidence 始终为 low,无新证据 | 保持 watch,等待 |
promote 流程
watch_item 收到新 Signal
→ 检查上述升级条件
→ 满足 → promote_watch_to_concern(watch_id, new_concern_id)
├─ 调 raise_concern() 创建 concern
├─ 从 watch_items 移除
├─ 写入 concern_refs
└─ 写入 recent_decisions(decision_type="promote")
→ 不满足 → 更新 watch_item 的 source_fact_refs,继续观察5.4 Workspace 缓存同步
Workspace 是 Agenda 的派生缓存,写方向单向。
同步时机
| 触发事件 | 同步内容 |
|---|---|
apply_agenda_decision() 执行后 | 全量刷新 attention_watchlist + working_summary |
| concern 状态变化(resolve/advance) | 增量更新 working_summary 中的 concern 摘要 |
| 定时(每 5 分钟) | 全量刷新(兜底,防止遗漏) |
同步方式
def refresh_workspace_cache():
agenda = get_agenda()
active_concerns = get_active_concerns()
recent_facts = get_recent_facts(hours=24)
# attention_watchlist 投影
watchlist = [
{
"watch_id": w["watch_id"],
"topic": w["topic"],
"priority": w["priority"],
"person_refs": w["person_refs"],
"promote_condition": w["promote_condition"],
"match_rules": _derive_match_rules(w), # 从 topic + person_refs 派生事件匹配规则
"source_agenda_ref": w["watch_id"],
"projected_at": now(),
}
for w in agenda["watch_items"]
]
# working_summary 汇总
summary = _summarize(
focus_queue=agenda["focus_queue"],
concern_summaries=[c["title"] + ": " + c["status"] for c in active_concerns],
recent_facts=recent_facts[:20],
)
update_workspace(working_summary=summary, attention_watchlist=watchlist)六、三个入口的收口方式
6.1 显式事件分发 → awareness.handle_event
当前实现:avatar_core、feishu_bot 在收到入站消息或外部平台事件后,显式调用 awareness.dispatch_event();AwarenessEngine.handle_event() 先写 Fact,再基于 Fact 做 L0/L1/L2 判断,命中动作时把 signal_payload 写回 Fact,然后统一走 cognitive.evaluate_fact(fact_id)。
改造后:
avatar_core / feishu_bot 构造外部事件
→ awareness.dispatch_event(event)
→ awareness.handle_event(event)
→ _write_fact(event)
→ handle_fact(fact_id)
→ L0: 写 workspace.event_buffer
→ L1/L2: 生成 signal_payload
→ cognitive.evaluate_fact(fact_id)
→ Agenda 判断 → 分流执行关键改动点:
- 不再依赖
event_store.register_event_listener之类的隐式监听入口 handle_event()的职责是写 Fact 并完成感知判断;真正的统一裁决入口是cognitive.evaluate_fact(fact_id)- awareness 不再直接执行发送/建 concern 等动作,只负责把
signal_payload写回 Fact,交认知中枢处理 - L1 规则匹配(已有 concern 自动 advance)保留为快速路径,但结果仍通过 Agenda.recent_decisions 留痕
6.2 对话 on_stop → hooks._detect_followup_concerns
现状:_detect_followup_concerns() 用 LLM 检测对话中的承诺/委托,直接调 engine.raise_concern()。
改造后:
on_stop hook
→ _detect_followup_concerns(user_msg, ai_msg, session_id)
→ LLM 检测(不变)
→ 有跟进事项时,构造 Signal(signal_type="promise"|"delegation"|...)
→ cognitive.evaluate(signal)
→ Agenda 判断:可能进 watch_item 或直接建 concern关键改动点:
_detect_followup_concerns()不再直接调engine.raise_concern()- 改为构造
Signal并调cognitive.evaluate() - 好处:低置信度的对话承诺会先进入 watch_item 而非立即建 concern
6.3 定时扫描 → scheduler.scan_absence
现状:gather_discoveries() 收集逾期任务和卡住审批,历史上 scan_heartbeat()/scan_absence() 直接创建 concern,但随着旧路径清理,现在只有 awareness.scan_absence() 在运行。
改造后:
daemon concern_scan 每 5 轮
→ awareness.scan_absence()
→ gather_discoveries()
→ 每个 discovery 写成 absence_detection Fact
→ cognitive.evaluate_fact(fact_id)
→ Agenda 判断:已有 watch 则补证据,否则判断是否建 concern关键改动点:
scan_absence()不再把 discovery 包装成 event 再走handle_event()- discovery 统一先落 Fact,再由
cognitive.evaluate_fact()从事实出发判断 scan_heartbeat()的旧路径已经停用,所有缺席判断目前都由awareness.scan_absence()处理
七、与 daemon 的集成
7.1 concern_scan 任务
daemon._task_concern_scan() 当前逻辑:
# 每 60s
s.scan_due_concerns() # Concern 到期 → agent loop
s.scan_timeouts() # Concern 超时
s.scan_schedules() # 定时 Concern 触发
# 每 5 轮(~300s)
awareness_engine.scan_absence() # 缺席检测改造后保持相同调度节奏,唯一区别是 scan_absence() 内部走 cognitive.evaluate_fact() 而非直接建 concern。
scan_due_concerns / scan_timeouts / scan_schedules 属于 Concern 生命周期管理,不需要经过 evaluate,保持不变。
7.2 awareness 事件分发
daemon 仅负责在启动时调用 scripts.awareness.init_runtime(engine),初始化 awareness runtime。事件分发不再依赖 event_store.register_event_listener;目前所有进入 awareness 的事件都是由调用方显式 dispatch_event():
avatar_core 收到入站消息后 → awareness.dispatch_event(inbound_event)
feishu_bot 接收到非消息平台事件 → awareness.dispatch_event(platform_event)owner 审批回调不再进入 awareness;这类内部控制动作只写 action_result Fact 和事件日志,不参与感知判断。真正进入 awareness 的只有用户入站消息和外部平台事件。每个事件进入 awareness 后都会先写 Fact,再基于 Fact 评估;命中动作时写 signal_payload 并调用 cognitive.evaluate_fact(fact_id)。串行化由 awareness runtime 内部 loop 负责,不再依赖 daemon 自己维护回调监听链。
7.3 cognitive 模块初始化
在 WorkDaemon.__init__() 中,Concern 引擎初始化后立即通过 scripts.cognitive.init_core(...) 初始化认知中枢;awareness 只消费这个全局 core:
from scripts.concern.engine import init_engine as _init_concern
from scripts.cognitive import init_core as _init_cognitive
self.concern_engine = _init_concern(...)
_init_cognitive(self.concern_engine)八、并发与一致性
系统有三个并发入口可能同时调用 evaluate() 并写入 Agenda。本节定义并发保护策略。
8.1 并发场景
| 入口 | 线程模型 | 频率 | 典型 Agenda 写操作 |
|---|---|---|---|
| 显式 dispatch_event() → handle_event() → evaluate_fact() | awareness_loop(独立 async loop,守护线程) | 事件驱动,不定 | add_watch / add_concern_ref / append_decision |
| 对话 on_stop → evaluate() | 对话 async context | 每次对话结束 | 同上 |
| 定时扫描 → evaluate() | daemon 主线程(每 5 轮 ~300s) | 定时 | 同上 |
三个入口可能同时处理涉及同一人的不同 Signal,导致:
- 重复创建 watch_item(同一话题两条)
- 重复 promote watch → concern(两个 Signal 同时触发升级)
- focus_queue 排序冲突
8.2 Agenda 原子操作规范
Agenda 单文档(_id: "agenda_qinghai")的所有写入必须使用 MongoDB 原子操作:
| 操作 | MongoDB 命令 | 原子性保证 |
|---|---|---|
| 添加 watch_item | findOneAndUpdate + $push + 唯一性前置查询 | 单文档原子 |
| 移除 watch_item | updateOne + $pull: {"watch_items": {"watch_id": x}} | 单文档原子 |
| promote watch→concern | findOneAndUpdate 条件含 watch_id 存在性 + $pull + $push | 单次原子,CAS 语义 |
| 添加 concern_ref | updateOne + $push + 前置去重查询 | 单文档原子 |
| 移除 concern_ref | updateOne + $pull | 单文档原子 |
| 追加 decision | updateOne + $push + $slice: -100 | 单文档原子,自动滚动窗口 |
| 重排 focus_queue | findOneAndUpdate + $set: {"focus_queue": [...]} | 单文档原子,整体覆盖 |
promote 原子实现
promote 是并发冲突最敏感的操作。两个 Signal 同时触发同一 watch 的 promote 时,必须保证只有一个成功。
result = db.agenda.find_one_and_update(
{
"_id": "agenda_qinghai",
"watch_items.watch_id": watch_id, # 条件:watch 仍存在
},
{
"$pull": {"watch_items": {"watch_id": watch_id}},
"$push": {"concern_refs": concern_ref_doc},
"$set": {"updated_at": now()},
},
return_document=ReturnDocument.AFTER,
)
if result is None:
# watch 已被另一个 evaluate 先行 promote,跳过
return条件中包含 watch_id 存在性检查。第二个到达的 promote 因条件不满足而自动 no-op(CAS 语义)。
8.3 evaluate() 串行化策略
方案:per-person asyncio.Lock + Agenda 乐观并发
class CognitiveCore:
def __init__(self):
self._person_locks: dict[str, asyncio.Lock] = {}
self._locks_lock = asyncio.Lock()
async def _get_person_lock(self, person_id: str) -> asyncio.Lock:
async with self._locks_lock:
if person_id not in self._person_locks:
self._person_locks[person_id] = asyncio.Lock()
return self._person_locks[person_id]
async def evaluate(self, signal: Signal) -> EvaluateResult:
primary_person = signal.person_refs[0] if signal.person_refs else "_global"
lock = await self._get_person_lock(primary_person)
async with lock:
return await self._evaluate_inner(signal)为什么 per-person 而非全局
- 大部分 Signal 关联不同人,全局锁会让无关 Signal 互相等待
- 同一人的多个 Signal(如同时收到对话承诺 + 事件信号)需要串行,避免重复 watch/concern
- Agenda 写入本身通过 MongoDB 原子操作保证,不需要应用层全局锁
跨人 Signal 处理
当 Signal 的 person_refs 含多人时,取第一个为锁定对象。Agenda 原子操作保证跨人写入不会出现不一致。
锁清理
_person_locks 字典随时间增长,但 person 数量有限(公司人数),不需要主动清理。如需优化,可定期清除 30 分钟无竞争的锁。
8.4 Concern 调度去重
当前问题:scan_due_concerns() 可能在多轮扫描中重复捞到同一 concern。
解决方案:原子 CAS 标记
def process_concern(self, concern_id: str, trigger: str):
# 原子标记:正在处理
result = db.concerns.find_one_and_update(
{
"concern_id": concern_id,
"$or": [
{"processing_at": None},
{"processing_at": {"$lt": now() - timedelta(minutes=10)}},
],
},
{"$set": {"processing_at": now()}},
return_document=ReturnDocument.AFTER,
)
if result is None:
return # 已有另一个扫描在处理
try:
await self._run_agent_loop(result, trigger)
finally:
db.concerns.update_one(
{"concern_id": concern_id},
{"$set": {"processing_at": None}},
)10 分钟超时兜底:防止 agent loop 崩溃后 concern 永久锁定。
8.5 watch_item 去重
添加 watch_item 前,先查是否已有同人同主题的 watch:
def add_watch_item(self, topic, person_refs, ...):
existing = find_matching_watch(person_refs, topic_keywords)
if existing:
_append_evidence(existing["watch_id"], signal.fact_refs)
return existing["watch_id"]
# 原子添加新 watch_item
...find_matching_watch 使用 person_refs 精确匹配 + topic 关键词交集判断。由于 evaluate() 已 per-person 串行,同人的 watch_item 去重不存在竞态。
8.6 daemon 集成约束
三个入口必须共享同一个 CognitiveCore 实例和同一套 per-person lock。
| 入口 | 投递方式 | 锁共享 |
|---|---|---|
| awareness_loop 中的 evaluate() | 直接在 async loop 中调用 | 共享 CognitiveCore 实例 |
| 对话 on_stop 中的 evaluate() | asyncio.run_coroutine_threadsafe() 投递到 awareness_loop | 共享 CognitiveCore 实例 |
| 定时扫描中的 evaluate() | asyncio.run_coroutine_threadsafe() 投递到 awareness_loop | 共享 CognitiveCore 实例 |
关键:所有 evaluate() 调用都汇聚到同一个 async event loop(awareness_loop),确保 per-person lock 在同一个 loop 内生效。
九、渐进式迁移
Phase 1:建基础设施,不改现有逻辑
目标:Agenda 集合可用,evaluate 骨架可调用,现有 3 入口不变。
- 新建
scripts/cognitive/包:signal.py+agenda.py+core.py agenda.py实现完整 CRUD(get_agenda / add_watch_item / append_decision 等)core.py实现evaluate()骨架,内部先只做append_decision()+ 透传到现有逻辑- 现有 3 个入口在执行完原逻辑后,额外调用
agenda.append_decision()写入决策记录 - awareness 动作分发结果写回 Agenda / Fact,确保 create_concern 路径可追踪
hooks._detect_followup_concerns()创建 concern 后,额外调用agenda.add_concern_ref()
验证点:Agenda 文档中能看到所有 concern 创建和推进的决策记录。
Phase 2:统一 evaluate 入口
目标:3 个入口全部改为调 cognitive.evaluate()。
- awareness
handle_event()改为先写 Fact,再把signal_payload写回该 Fact 并调用evaluate_fact() - 发送消息、创建/推进 concern 的真正执行逻辑全部收口到
core.py - hooks
_detect_followup_concerns()改为产出 Signal + 调 evaluate scan_absence()改为写 absence_detection Fact + 调evaluate_fact()- evaluate 内部实现去重合并(
find_matching_watch+ concern 查询) - evaluate 内部实现 watch_item 缓冲(低置信度信号先 watch,不直接建 concern)
验证点:只有 cognitive.evaluate() / cognitive.evaluate_fact() 这条统一认知链才能创建 concern,3 个旧入口不再直接调 raise_concern()。
Phase 3:收口旧逻辑
目标:workspace 从 Agenda 真源投影,旧 heartbeat 逻辑退役。
workspace.py的attention_watchlist改为从Agenda.watch_items + active concern_refs投影生成workspace.py的working_summary改为从Agenda.focus_queue + Concern summaries + recent Facts汇总生成- 实现
refresh_workspace_cache(),由 evaluate 写入后调用;workspace 集合只保留最小运行时字段,不再镜像attention_watchlist/recent_observations - awareness 不再直接执行发送/建 concern 动作
验证点:workspace 文档完全是派生缓存,删掉后系统功能不受影响。
十、接口边界
与概念文档第十三节定义的 5 个接口对齐:
| 概念接口 | 实现位置 | 签名 |
|---|---|---|
evaluate_fact(fact_id) | cognitive/core.py | async def evaluate_fact(fact_id: str) -> EvaluateResult — 从 Fact 出发构造 Signal,调 evaluate |
evaluate_signal(signal) | cognitive/core.py | async def evaluate(signal: Signal) -> EvaluateResult — 统一评估主入口 |
match_existing_agenda(...) | cognitive/core.py 内部 | def _match_existing_agenda(signal: Signal) -> AgendaMatch — 匹配 watch / concern |
apply_agenda_decision(...) | cognitive/core.py 内部 | async def _apply_decision(decision: EvaluateResult, signal: Signal) -> None — 写入并执行 |
refresh_workspace_cache() | cognitive/core.py | def refresh_workspace_cache() -> None — 从真源重建 workspace |
其中 evaluate_fact 是 evaluate 的上层便捷方法:加载 Fact → 读取 signal_payload 或按 Fact 类型补建 Signal → 调 evaluate(signal)。当前 awareness / scan_absence 主链都优先走 evaluate_fact();显式构造 Signal 的入口主要保留给 direct/hooks.py 这类非事件型调用。
十一、当前原则
- 统一入口 — 所有认知判断必须经过
cognitive.evaluate()或cognitive.evaluate_fact(),不允许旁路创建 concern - Fact / Signal 双入口,核心单出口 — 外围入口可以显式构造 Signal 调
evaluate(signal),也可以先写 Fact 再调evaluate_fact(fact_id);但最终都必须落到同一个认知裁决主循环 - Agenda 先行 — 低置信度信号先进 watch_item,不直接建 concern
- 规则优先 — 能用规则判断的不调 LLM,降低成本
- 去重合并 — 同人、同主题、同时间窗口的信号优先合并到已有 watch / concern
- 决策可追溯 — 每次 evaluate 都写入
recent_decisions,含 fact_refs 和 reason - 渐进迁移 — 分 3 阶段收口,每阶段可独立验证,不一次性重写
- 降级兜底 — 不再保留
scan_heartbeat()或其他旧兼容路径,awareness/cognitive 出现问题必须靠监控告警与人工恢复 - 不破坏对话 — evaluate 在对话结束后异步执行,不阻塞回复链路
- workspace 是缓存 — 从 Agenda + Concern + Fact 真源投影,不作为判断依据