1. TSPR Layer: State Representation and SAL Updates (Pseudocode)
1.1 State Vector Structure (Python dataclass)
```python
from dataclasses import dataclass, field
from typing import Dict, List

import numpy as np


@dataclass
class UserState:
    user_id: str
    intent_probs: np.ndarray  # shape (num_intents,), e.g. [0.2, 0.7, 0.1]
    profile: Dict[str, float]  # interest tag weights, e.g. {"sports": 0.8, "tech": 0.3}
    session_cnt: int = 0
    last_active_ts: int = 0


@dataclass
class EnvState:
    time_of_day: int  # 0-23
    device_type: str  # "mobile" / "pc"
    network_quality: float  # 0-1


@dataclass
class TaskState:
    current_task: str  # "recommend" / "qa" / "control"
    task_progress: float  # 0-1
    pending_actions: List[str] = field(default_factory=list)


@dataclass
class GlobalState:
    user: UserState
    env: EnvState
    task: TaskState
    version: int = 0  # used for optimistic-lock updates
```
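The `version` field in `GlobalState` is meant for optimistic-lock updates: a writer reads a state together with its version, and the write only succeeds if nobody else bumped the version in between. A minimal sketch of that idea, assuming an in-memory store; `VersionedState`, `VersionedStore`, `compare_and_set` and `increment_sessions` are hypothetical names (in Redis the same check would typically be done with `WATCH`/`MULTI` or a Lua script).

```python
from dataclasses import dataclass, replace
from threading import Lock
from typing import Dict


@dataclass(frozen=True)
class VersionedState:
    # Hypothetical stand-in for GlobalState: only the fields needed for the demo
    session_cnt: int = 0
    version: int = 0


class VersionedStore:
    """Hypothetical in-memory store illustrating optimistic locking on `version`."""

    def __init__(self) -> None:
        self._data: Dict[str, VersionedState] = {}
        self._lock = Lock()  # makes the version check and the write atomic

    def get(self, user_id: str) -> VersionedState:
        return self._data.get(user_id, VersionedState())

    def compare_and_set(self, user_id: str, new: VersionedState, expected_version: int) -> bool:
        with self._lock:
            current = self._data.get(user_id, VersionedState())
            if current.version != expected_version:
                return False  # another writer got there first; caller must re-read and retry
            self._data[user_id] = replace(new, version=expected_version + 1)
            return True


def increment_sessions(store: VersionedStore, user_id: str, max_retries: int = 3) -> bool:
    # Read-modify-write loop: retry when the version check fails
    for _ in range(max_retries):
        old = store.get(user_id)
        updated = replace(old, session_cnt=old.session_cnt + 1)
        if store.compare_and_set(user_id, updated, old.version):
            return True
    return False
```

A writer holding a stale copy (read before someone else's successful update) is rejected instead of silently overwriting the newer state.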
1.2 SAL Real-Time Update Function (Flink KeyedProcessFunction)
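```java
// Java pseudocode (PyFlink is also an option in practice).
// FeedbackEvent, GlobalState, StateUpdateResult, redisClient, toJson and bayesianUpdate are assumed helpers.
public class StateUpdateFunction
        extends KeyedProcessFunction<String, FeedbackEvent, StateUpdateResult> {

    private transient ValueState<GlobalState> globalState;

    @Override
    public void open(Configuration parameters) {
        // Register keyed state (scoped to the current key, i.e. user_id)
        globalState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("global-state", GlobalState.class));
    }

    @Override
    public void processElement(FeedbackEvent event, Context ctx,
                               Collector<StateUpdateResult> out) throws Exception {
        GlobalState oldState = globalState.value();
        if (oldState == null) {
            oldState = GlobalState.initialFor(event.userId); // hypothetical factory for a first-seen user
        }
        GlobalState newState = updateState(oldState, event);
        // Update Flink state before any external side effects
        globalState.update(newState);
        // Write to Redis (use an async client or a dedicated sink in production)
        redisClient.hset("state:" + event.userId, "global", toJson(newState));
        // Publish the state change event downstream (HIC/ACTION)
        out.collect(StateUpdateResult.of(event.userId, newState));
    }

    private GlobalState updateState(GlobalState s, FeedbackEvent e) {
        // Bayesian update of intent probabilities (posterior ∝ prior × likelihood, normalized)
        s.user.intent_probs = bayesianUpdate(s.user.intent_probs, e.observedIntentLikelihood);
        // Update user interests (exponential decay + increment): w = 0.9 * w + 0.1
        for (String tag : e.interactedTags) {
            s.user.profile.put(tag, s.user.profile.getOrDefault(tag, 0.0) * 0.9 + 0.1);
        }
        s.user.session_cnt += 1;
        // Environment state: overwrite directly
        s.env.network_quality = e.networkQuality;
        // Task progress: fixed step per executed action, capped at 1.0
        if (e.actionExecuted != null) {
            s.task.task_progress = Math.min(1.0, s.task.task_progress + 0.1);
        }
        s.version++;
        return s;
    }
}
```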
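The arithmetic inside `updateState` can be checked in isolation. A minimal Python sketch, assuming the Bayesian step means posterior ∝ prior × likelihood followed by normalization, and using plain lists and dicts instead of `np.ndarray` and the dataclasses; `bayesian_update`, `update_interests` and `advance_progress` are hypothetical names.

```python
from typing import Dict, Iterable, List


def bayesian_update(prior: List[float], likelihood: List[float]) -> List[float]:
    # Posterior is proportional to prior * likelihood, renormalized to sum to 1
    unnormalized = [p * l for p, l in zip(prior, likelihood)]
    total = sum(unnormalized)
    if total <= 0:
        return list(prior)  # degenerate observation: keep the prior unchanged
    return [u / total for u in unnormalized]


def update_interests(profile: Dict[str, float], tags: Iterable[str],
                     decay: float = 0.9, increment: float = 0.1) -> Dict[str, float]:
    # Each interacted tag moves toward 1.0: w = decay * w + increment
    updated = dict(profile)
    for tag in tags:
        updated[tag] = updated.get(tag, 0.0) * decay + increment
    return updated


def advance_progress(progress: float, step: float = 0.1) -> float:
    # Task progress grows by a fixed step and is capped at 1.0
    return min(1.0, progress + step)
```

With decay 0.9 and increment 0.1, a tag that is hit repeatedly converges to a weight of 1.0, while a first-time tag starts at 0.1.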
1.3 Redis State Storage Format
```bash
# Key: state:{user_id}
# Hash field "global": the entire state as JSON
# Individual fields are also stored for fast reads
HSET state:user123 global '{"user":{...},"env":{...},"task":{...},"version":5}'
HSET state:user123 intent_probs "0.2,0.7,0.1"
HSET state:user123 profile '{"sports":0.8,"tech":0.3}'
```
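A minimal sketch of producing these hash fields from a state object, assuming the full state goes under `global` as JSON and the denormalized copies use the comma-joined and JSON formats shown. The state is simplified to its user part, and `intent_probs` is a plain list because `np.ndarray` is not JSON-serializable without `.tolist()`. `to_redis_fields` and `redis_key` are hypothetical; with redis-py the resulting dict could be written via `hset(name, mapping=...)`, which is not executed here.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Dict, List


@dataclass
class UserState:
    # Simplified copy of 1.1: intent_probs as a plain list so it serializes to JSON
    user_id: str
    intent_probs: List[float]
    profile: Dict[str, float] = field(default_factory=dict)


def redis_key(user_id: str) -> str:
    return f"state:{user_id}"


def to_redis_fields(user: UserState, version: int) -> Dict[str, str]:
    # One JSON blob for full reads plus flat fields for cheap partial reads
    compact = (",", ":")
    return {
        "global": json.dumps({"user": asdict(user), "version": version}, separators=compact),
        "intent_probs": ",".join(str(p) for p in user.intent_probs),
        "profile": json.dumps(user.profile, separators=compact),
    }
```

Keeping the flat fields in the same hash means a single `HGET` serves the hot path (for example, HIC reading only `profile`), at the cost of writing every field on each update.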
2. HIC Layer: Rule Engine Implementation (OPA + REL Evolution)
2.1 OPA Rule Example (Rego)
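```rego
package hic

import rego.v1

# Hard rule: forbid dangerous actions
default deny := false

deny_reasons contains msg if {
    input.action.type == "control_robot"
    input.action.params.speed > 0.5
    msg := "Speed exceeds safety limit"
}

deny if count(deny_reasons) > 0

# Soft rules: personalized recommendation weighting.
# Each matching rule contributes a multiplier and multiple matches are multiplied
# (two complete rules returning different values for one input would be a conflict error).
weight_factors["sports_interest"] := 1.5 if {
    input.user.profile.sports > 0.6
    input.action.params.category == "sports"
}

weight_factors["purchase_intent"] := 1.2 if {
    input.user.intent_probs[2] > 0.8 # assumes index 2 is "purchase"
    input.action.type == "recommend"
}

default recommend_weight := 1.0

recommend_weight := product([w | some w in weight_factors]) if count(weight_factors) > 0

# Final decision score = raw LLM score * recommend_weight, applied only when deny is false
```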
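To make the intended decision flow concrete, here is a plain-Python mirror of the policy, assuming that matching soft-rule weights are multiplied together (a combination policy chosen for this sketch; in Rego, two complete-rule definitions yielding different values for the same input raise a conflict error) and that a denied action gets no score. Field names follow the Rego `input` document; `deny_reasons`, `recommend_weight` and `decide` are hypothetical Python functions.

```python
from typing import Any, Dict, List, Tuple

SPEED_LIMIT = 0.5
PURCHASE_INTENT_INDEX = 2  # assumption carried over from the Rego comment


def deny_reasons(inp: Dict[str, Any]) -> List[str]:
    # Hard rules: every matching rule adds a human-readable reason
    action = inp["action"]
    reasons = []
    if action["type"] == "control_robot" and action.get("params", {}).get("speed", 0) > SPEED_LIMIT:
        reasons.append("Speed exceeds safety limit")
    return reasons


def recommend_weight(inp: Dict[str, Any]) -> float:
    # Soft rules: start from the default 1.0 and multiply in each matching factor
    user, action = inp["user"], inp["action"]
    weight = 1.0
    if user["profile"].get("sports", 0) > 0.6 and action.get("params", {}).get("category") == "sports":
        weight *= 1.5
    probs = user.get("intent_probs", [])
    if len(probs) > PURCHASE_INTENT_INDEX and probs[PURCHASE_INTENT_INDEX] > 0.8 \
            and action["type"] == "recommend":
        weight *= 1.2
    return weight


def decide(inp: Dict[str, Any], llm_score: float) -> Tuple[bool, float]:
    # Hard rules veto first; otherwise final score = raw LLM score * recommend_weight
    if deny_reasons(inp):
        return False, 0.0
    return True, llm_score * recommend_weight(inp)
```

Keeping hard rules as a veto that runs before any weighting means no combination of soft-rule boosts can push a forbidden action through.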
2.2 REL Evolution Pseudocode (Offline Training + Safe Rollout)
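```python
import logging

from skopt import gp_minimize

# simulate_reward, sandbox_backtest, need_human_review, human_approval,
# deploy_to_traffic, observe_kpi_stable and rollback are platform helpers not defined here.


# Offline training: optimize rule weights on historical feedback
def rel_offline_train(feedback_dataset):
    # Bayesian optimization here; a bandit algorithm is an alternative
    def objective(weights):
        # Negated cumulative reward of replaying historical data with the candidate weights
        return -simulate_reward(weights, feedback_dataset)

    # Initial weights (current production rule weights), used as the search starting point
    initial_weights = [1.0, 1.2, 0.8]
    # Weights are score multipliers, so the search space is kept non-negative
    res = gp_minimize(objective, [(0.0, 2.0)] * len(initial_weights),
                      x0=initial_weights, n_calls=50)
    return res.x


# Safe rollout pipeline
def rel_safe_deploy(new_rules_or_weights):
    # 1. Sandbox validation: backtest on the last day of data; metrics must not decline (p<0.05)
    if not sandbox_backtest(new_rules_or_weights):
        logging.error("Sandbox failed, abort deploy")
        return False
    # 2. Human review (for weight changes > 0.3 or new rules)
    if need_human_review(new_rules_or_weights):
        if not human_approval():
            return False
    # 3. Canary release: 5% -> 20% -> 50% -> 100% of traffic,
    #    watching core KPIs (conversion rate, error rate) for 2 hours at each stage
    for traffic in [0.05, 0.20, 0.50, 1.0]:
        deploy_to_traffic(traffic, new_rules_or_weights)
        if not observe_kpi_stable(hours=2):
            rollback()
            return False
    return True
```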
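The review gate in step 2 ("weight change > 0.3 or a new rule") and the staged traffic ramp in step 3 can be written as small pure functions. A sketch under stated assumptions: rules are represented as a `{rule_id: weight}` dict, `requires_review` is a hypothetical two-argument variant of the one-argument helper above, and `deploy`, `kpi_stable` and `rollback` are injected callables standing in for platform hooks the text does not define.

```python
from typing import Callable, Dict, Sequence

REVIEW_DELTA = 0.3
TRAFFIC_STAGES = (0.05, 0.20, 0.50, 1.0)


def requires_review(current: Dict[str, float], proposed: Dict[str, float],
                    max_delta: float = REVIEW_DELTA) -> bool:
    # A brand-new rule, or any weight moving by more than max_delta, needs a human
    for rule_id, weight in proposed.items():
        if rule_id not in current:
            return True
        if abs(weight - current[rule_id]) > max_delta:
            return True
    return False


def staged_rollout(proposed: Dict[str, float],
                   deploy: Callable[[float, Dict[str, float]], None],
                   kpi_stable: Callable[[float], bool],
                   rollback: Callable[[], None],
                   stages: Sequence[float] = TRAFFIC_STAGES) -> bool:
    # Ramp traffic stage by stage; roll back and stop at the first unstable KPI check
    for fraction in stages:
        deploy(fraction, proposed)
        if not kpi_stable(fraction):
            rollback()
            return False
    return True
```

Injecting the hooks keeps the ramp logic testable without a real traffic router.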
3. Dual-Loop Coordination: Kafka Message Definitions
3.1 Topic Plan
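| Topic name | Message type | Partition key | Retention |
|---|---|---|---|
| raw_feedback | Raw feedback (user behavior, system logs) | user_id | 7 days |
| filtered_feedback | Feedback after quality filtering | user_id | 30 days |
| state_update | State change events produced by SAL | user_id | 3 days |
| rule_update | Rule changes produced by REL (JSON diff) | rule_id | Permanent (log-compacted) |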
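The retention column maps directly onto per-topic Kafka settings. A sketch assuming the standard topic-level configs `retention.ms` and `cleanup.policy`; `TOPIC_PLAN` and `topic_config` are hypothetical names, and the resulting dicts could be handed to an admin client when creating the topics (no client is called here).

```python
from typing import Dict, Optional

DAY_MS = 24 * 60 * 60 * 1000  # 86,400,000 ms per day

# Topic -> (retention in days, or None for "keep forever"; log-compacted?), from the table above
TOPIC_PLAN = {
    "raw_feedback": (7, False),
    "filtered_feedback": (30, False),
    "state_update": (3, False),
    "rule_update": (None, True),
}


def topic_config(retention_days: Optional[int], compacted: bool) -> Dict[str, str]:
    if compacted:
        # Compaction keeps the latest record per key (rule_id) instead of deleting by age
        return {"cleanup.policy": "compact"}
    return {"cleanup.policy": "delete", "retention.ms": str(retention_days * DAY_MS)}


configs = {name: topic_config(days, compacted) for name, (days, compacted) in TOPIC_PLAN.items()}
```

Compaction fits `rule_update` because consumers only need the latest version of each rule, keyed by `rule_id`, to rebuild the current rule set.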
3.2 Message Schema Example (Avro)
```avro
{
  "type": "record",
  "name": "Feedback",
  "fields": [
    {"name": "feedback_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {"name": "action_id", "type": "string"},
    {"name": "reward", "type": "float"},
    {"name": "quality_score", "type": "float", "default": 1.0},
    {"name": "timestamp", "type": "long"}
  ]
}
```
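A dependency-free sketch of a consumer-side check against this schema: it treats a declared default as the value of a missing field (similar in spirit to how Avro schema resolution fills fields a writer did not provide) and verifies each field's primitive type. It is a simplified stand-in for a real Avro library, handling only the `string`, `float` and `long` primitives used here; `conform` is a hypothetical helper.

```python
from typing import Any, Dict

FEEDBACK_SCHEMA = {
    "type": "record",
    "name": "Feedback",
    "fields": [
        {"name": "feedback_id", "type": "string"},
        {"name": "user_id", "type": "string"},
        {"name": "action_id", "type": "string"},
        {"name": "reward", "type": "float"},
        {"name": "quality_score", "type": "float", "default": 1.0},
        {"name": "timestamp", "type": "long"},
    ],
}

# Python types accepted for each Avro primitive (ints are accepted where floats are expected)
PY_TYPES = {"string": (str,), "float": (float, int), "long": (int,)}


def conform(record: Dict[str, Any], schema: Dict[str, Any] = FEEDBACK_SCHEMA) -> Dict[str, Any]:
    out = {}
    for f in schema["fields"]:
        name = f["name"]
        if name in record:
            value = record[name]
        elif "default" in f:
            value = f["default"]  # fall back to the declared default
        else:
            raise ValueError(f"missing required field: {name}")
        # bool is a subclass of int in Python, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, PY_TYPES[f["type"]]):
            raise TypeError(f"{name}: expected {f['type']}, got {type(value).__name__}")
        out[name] = value
    return out
```

The `quality_score` default of 1.0 means producers that never score quality still pass the `> 0.3` filter used downstream.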
3.3 Dual-Loop Stream Processing (Flink Jobs)
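```sql
-- Filter low-quality records out of raw feedback
INSERT INTO filtered_feedback
SELECT * FROM raw_feedback WHERE quality_score > 0.3;

-- SAL: update per-user state with a custom UDAF.
-- A non-windowed GROUP BY emits an updating stream, so state_update needs an upsert sink
-- (e.g. the upsert-kafka connector keyed by user_id).
INSERT INTO state_update
SELECT user_id, update_state(feedback_id, action_id, reward, quality_score)
FROM filtered_feedback
GROUP BY user_id;

-- REL: aggregate feedback in daily tumbling windows and trigger rule optimization (custom UDAF).
-- Assumes filtered_feedback declares an event-time attribute `rowtime` (e.g. derived from `timestamp` with a WATERMARK).
-- Flink SQL window TVFs are time-based; an "every 1000 records" trigger needs a count window
-- (e.g. countWindowAll(1000)) in the DataStream API instead.
INSERT INTO rule_update
SELECT window_start, window_end, rule_update_job(feedback_id, action_id, reward)
FROM TABLE(TUMBLE(TABLE filtered_feedback, DESCRIPTOR(rowtime), INTERVAL '1' DAY))
GROUP BY window_start, window_end;
```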
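The dual-loop routing (quality filter, per-user SAL fold, and a REL trigger every 1000 filtered records) can be prototyped without Flink before writing the jobs. A pure-Python sketch, assuming the 0.3 threshold from the SQL and the Avro default of 1.0 for a missing `quality_score`; `run_dual_loop`, `sal_update` and `rel_optimize` are hypothetical. In Flink the per-user fold would be keyed state and the count trigger a count window in the DataStream API.

```python
from typing import Callable, Dict, Iterable, List

QUALITY_THRESHOLD = 0.3
REL_BATCH_SIZE = 1000


def run_dual_loop(events: Iterable[dict],
                  sal_update: Callable[[dict, dict], dict],
                  rel_optimize: Callable[[List[dict]], None],
                  batch_size: int = REL_BATCH_SIZE) -> Dict[str, dict]:
    states: Dict[str, dict] = {}
    batch: List[dict] = []
    for event in events:
        # 1. Quality filter (raw_feedback -> filtered_feedback): keep only score > threshold
        if event.get("quality_score", 1.0) <= QUALITY_THRESHOLD:
            continue
        # 2. Fast loop (SAL): fold the event into this user's state
        uid = event["user_id"]
        states[uid] = sal_update(states.get(uid, {}), event)
        # 3. Slow loop (REL): hand a full batch to the optimizer every `batch_size` records
        batch.append(event)
        if len(batch) == batch_size:
            rel_optimize(batch)
            batch = []
    return states
```

Every accepted event updates SAL immediately, while REL only sees complete batches, which mirrors the fast-loop/slow-loop split.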
4. Kubernetes Deployment Architecture (Production-Grade)
4.1 Namespace and Services
```yaml
# namespace: taios
apiVersion: v1
kind: Namespace
metadata:
  name: taios
---
# API Gateway (Kong/Ingress)
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: taios-gateway
  namespace: taios
spec:
  rules:
    - host: api.taios.example.com
      http:
        paths:
          - path: /v1/chat
            pathType: Prefix  # required in networking.k8s.io/v1
            backend:
              service:
                name: web-layer
                port:
                  number: 8080
```
4.2 Core Microservice Deployment Example (LLM Layer)
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-gateway
  namespace: taios
spec:
  replicas: 3
  selector:
    matchLabels:
      app: llm-gateway
  template:
    metadata:
      labels:
        app: llm-gateway
    spec:
      containers:
        - name: main
          image: taios/llm-gateway:v1.0
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: llm-secrets
                  key: openai-key
            - name: MODEL_ROUTING_CONFIG
              value: "/config/routing.yaml"
          resources:
            limits:
              cpu: "2"
              memory: "4Gi"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
          volumeMounts:
            - name: config
              mountPath: /config
      volumes:
        - name: config
          configMap:
            name: llm-routing-config
```
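The `livenessProbe` assumes the container answers `GET /health` on port 8080. A minimal standard-library sketch of such an endpoint; the real llm-gateway is not shown in the text, `health_response`, `HealthHandler` and `make_server` are hypothetical, and a production service would more likely use its web framework's routing.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Tuple


def health_response(path: str) -> Tuple[int, bytes]:
    # Liveness only: report that the process can still serve HTTP
    if path == "/health":
        return 200, json.dumps({"status": "ok"}).encode()
    return 404, json.dumps({"error": "not found"}).encode()


class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self) -> None:
        status, body = health_response(self.path)
        self.send_response(status)  # the kubelet treats 200-399 as a passing httpGet probe
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format: str, *args) -> None:
        pass  # silence per-request logging so probe traffic does not flood stderr


def make_server(port: int = 8080) -> HTTPServer:
    # Bind on all interfaces so the kubelet can reach the probe port
    return HTTPServer(("0.0.0.0", port), HealthHandler)
```

`make_server().serve_forever()` would start it. Keeping the liveness check free of calls to upstream LLM providers avoids restarting healthy pods when an external API is slow.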
4.3 State Layer StatefulSet (Redis Cluster)
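```yaml
# Requires a headless Service named redis-cluster (serviceName below).
# --cluster-enabled alone does not form a cluster: once all 6 pods are running, join them once, e.g.
#   redis-cli --cluster create <pod-ip>:6379 ... --cluster-replicas 1   (3 masters + 3 replicas)
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: redis-cluster
  namespace: taios
spec:
  serviceName: redis-cluster
  replicas: 6
  selector:
    matchLabels:
      app: redis-cluster
  template:
    metadata:
      labels:
        app: redis-cluster
    spec:
      containers:
        - name: redis
          image: redis:7-alpine
          command: ["redis-server"]
          args: ["--cluster-enabled", "yes", "--appendonly", "yes"]
          ports:
            - containerPort: 6379
              name: client
            - containerPort: 16379
              name: gossip  # cluster bus port (client port + 10000)
          volumeMounts:
            - name: data
              mountPath: /data
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 100Gi
```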
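In cluster mode every key maps to one of 16384 hash slots, which are split across the masters (three masters, if the cluster is created with one replica per master). A sketch of the slot function as described in the Redis Cluster specification: CRC16 (XMODEM variant) of the key modulo 16384, where a non-empty `{...}` section, if present, is hashed instead of the whole key. This matters for the `state:` keys from 1.3: wrapping the user id in braces (e.g. `state:{user123}`) would place every key sharing that tag in the same slot. The function names are illustrative.

```python
def crc16_xmodem(data: bytes) -> int:
    # CRC-16/XMODEM: polynomial 0x1021, initial value 0, no bit reflection
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_hash_slot(key: str) -> int:
    raw = key.encode("utf-8")
    start = raw.find(b"{")
    if start != -1:
        end = raw.find(b"}", start + 1)
        # Only a non-empty {...} section is used as the hash tag
        if end != -1 and end > start + 1:
            raw = raw[start + 1:end]
    return crc16_xmodem(raw) % 16384
```

Co-locating a user's keys through a hash tag is what allows multi-key commands or transactions on them in cluster mode, at the cost of less even slot distribution for very hot users.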
5. Monitoring and Observability (Prometheus + Grafana)
5.1 Key Metrics
```yaml
# Exposed metrics
- taios_llm_request_duration_seconds  # histogram
- taios_hic_rule_evaluation_total     # counter, labels: rule_id, result
- taios_sal_state_update_latency_ms   # histogram
- taios_rel_rule_deploy_total         # counter, label: status
- taios_feedback_quality_score        # gauge
```
5.2 Alerting Rule Examples
```yaml
groups:
  - name: taios_alerts
    rules:
      - alert: LLMHighLatency
        # histogram_quantile needs the cumulative _bucket series aggregated by le
        expr: histogram_quantile(0.99, sum by (le) (rate(taios_llm_request_duration_seconds_bucket[5m]))) > 5
        for: 5m
        annotations:
          summary: "LLM p99 latency > 5s"
      - alert: RELRuleRollback
        expr: increase(taios_rel_rule_deploy_total{status="rollback"}[1h]) > 0
        annotations:
          summary: "Rule deployment rolled back due to KPI drop"
```
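`histogram_quantile` expects the cumulative `_bucket` series with their `le` label (typically `sum by (le) (rate(<name>_bucket[5m]))`), not the bare histogram name. A simplified Python sketch of the interpolation Prometheus performs, assuming buckets are sorted by upper bound, end with `+Inf`, and `q` lies in (0, 1]; edge cases such as non-positive bounds or non-monotonic counts are ignored, and `bucket_quantile` is a hypothetical name.

```python
import math
from typing import List, Tuple


def bucket_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """buckets: (upper_bound, cumulative_count) pairs sorted by bound; the last bound is +Inf."""
    if len(buckets) < 2 or buckets[-1][1] == 0:
        return math.nan
    rank = q * buckets[-1][1]
    # First finite bucket whose cumulative count reaches the rank (else the +Inf bucket)
    b = next((i for i, (_, c) in enumerate(buckets[:-1]) if c >= rank), len(buckets) - 1)
    if b == len(buckets) - 1:
        # Rank falls into the +Inf bucket: return the highest finite upper bound
        return buckets[-2][0]
    upper, count = buckets[b]
    lower = 0.0
    if b > 0:
        lower, prev_count = buckets[b - 1]
        count -= prev_count
        rank -= prev_count
    # Linear interpolation inside the selected bucket
    return lower + (upper - lower) * (rank / count)
```

One consequence for the `LLMHighLatency` rule: if the highest finite bucket bound were exactly 5 s, the p99 estimate could never exceed 5 and the `> 5` alert would never fire, so the histogram needs bucket bounds above the threshold.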
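6. Recommended Next Steps
- Start now: build the MVP (Phase 1) by running WEB + LLM + HIC static rules locally with Docker Compose.
- Within one week: complete the TSPR state service (Redis + a simple SAL Python service).
- Within two weeks: integrate Kafka + Flink to run the dual loop in real time.
- Ongoing iteration: build a rule sandbox environment and an A/B testing platform.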
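If you need the full repository structure for a specific component (such as a Python FastAPI implementation of llm-gateway), a CI/CD pipeline configuration (GitHub Actions/Jenkins), or a detailed API documentation template, please let me know.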