1use super::traits::{Memory, MemoryCategory, MemoryEntry, ProceduralMessage};
8use async_trait::async_trait;
9use chrono::Local;
10use parking_lot::Mutex;
11use rusqlite::{Connection, params};
12use std::path::Path;
13use std::sync::Arc;
14
/// Kind of memory operation recorded in the audit log.
///
/// The [`Display`](std::fmt::Display) rendering of a variant is the text written to the
/// `operation` column of the `memory_audit` table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AuditOp {
    /// A write through `store`, `store_with_metadata` or `store_with_agent`.
    Store,
    /// A search through `recall`, `recall_namespaced` or `recall_for_agents`.
    Recall,
    /// A keyed lookup through `get` or `get_for_agent`.
    Get,
    /// An enumeration through `list`.
    List,
    /// A keyed deletion through `forget` or `forget_for_agent`.
    Forget,
    /// A session-wide deletion through `purge_session_for_agent`.
    Purge,
    /// A write of procedural messages through `store_procedural`.
    StoreProcedural,
}
26
27impl std::fmt::Display for AuditOp {
28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29 match self {
30 Self::Store => write!(f, "store"),
31 Self::Recall => write!(f, "recall"),
32 Self::Get => write!(f, "get"),
33 Self::List => write!(f, "list"),
34 Self::Forget => write!(f, "forget"),
35 Self::Purge => write!(f, "purge"),
36 Self::StoreProcedural => write!(f, "store_procedural"),
37 }
38 }
39}
40
/// Wrapper around a [`Memory`] backend that records operations in a SQLite audit table
/// (`memory_audit`) before forwarding them to the wrapped backend.
pub struct AuditedMemory<M: Memory> {
    /// The wrapped backend that every call is forwarded to.
    inner: M,
    /// Connection to the audit database, shared behind a mutex.
    audit_conn: Arc<Mutex<Connection>>,
}
46
47impl<M: Memory> ::zeroclaw_api::attribution::Attributable for AuditedMemory<M> {
48 fn role(&self) -> ::zeroclaw_api::attribution::Role {
49 self.inner.role()
50 }
51 fn alias(&self) -> &str {
52 self.inner.alias()
53 }
54}
55
56impl<M: Memory> AuditedMemory<M> {
57 pub fn new(inner: M, workspace_dir: &Path) -> anyhow::Result<Self> {
58 let db_path = workspace_dir.join("memory").join("audit.db");
59 if let Some(parent) = db_path.parent() {
60 std::fs::create_dir_all(parent)?;
61 }
62
63 let conn = Connection::open(&db_path)?;
64 conn.execute_batch(
65 "PRAGMA journal_mode = WAL;
66 PRAGMA synchronous = NORMAL;
67 CREATE TABLE IF NOT EXISTS memory_audit (
68 id INTEGER PRIMARY KEY AUTOINCREMENT,
69 operation TEXT NOT NULL,
70 key TEXT,
71 namespace TEXT,
72 session_id TEXT,
73 timestamp TEXT NOT NULL,
74 metadata TEXT
75 );
76 CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON memory_audit(timestamp);
77 CREATE INDEX IF NOT EXISTS idx_audit_operation ON memory_audit(operation);",
78 )?;
79
80 Ok(Self {
81 inner,
82 audit_conn: Arc::new(Mutex::new(conn)),
83 })
84 }
85
86 fn log_audit(
87 &self,
88 op: AuditOp,
89 key: Option<&str>,
90 namespace: Option<&str>,
91 session_id: Option<&str>,
92 metadata: Option<&str>,
93 ) {
94 let conn = self.audit_conn.lock();
95 let now = Local::now().to_rfc3339();
96 let op_str = op.to_string();
97 let _ = conn.execute(
98 "INSERT INTO memory_audit (operation, key, namespace, session_id, timestamp, metadata)
99 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
100 params![op_str, key, namespace, session_id, now, metadata],
101 );
102 }
103
104 pub fn prune_older_than(&self, retention_days: u32) -> anyhow::Result<u64> {
106 let conn = self.audit_conn.lock();
107 let cutoff =
108 (Local::now() - chrono::Duration::days(i64::from(retention_days))).to_rfc3339();
109 let affected = conn.execute(
110 "DELETE FROM memory_audit WHERE timestamp < ?1",
111 params![cutoff],
112 )?;
113 Ok(u64::try_from(affected).unwrap_or(0))
114 }
115
116 pub fn audit_count(&self) -> anyhow::Result<usize> {
118 let conn = self.audit_conn.lock();
119 let count: i64 =
120 conn.query_row("SELECT COUNT(*) FROM memory_audit", [], |row| row.get(0))?;
121 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
122 Ok(count as usize)
123 }
124}
125
126#[async_trait]
127impl<M: Memory> Memory for AuditedMemory<M> {
128 fn name(&self) -> &str {
129 self.inner.name()
130 }
131
132 async fn store(
133 &self,
134 key: &str,
135 content: &str,
136 category: MemoryCategory,
137 session_id: Option<&str>,
138 ) -> anyhow::Result<()> {
139 self.log_audit(AuditOp::Store, Some(key), None, session_id, None);
140 self.inner.store(key, content, category, session_id).await
141 }
142
143 async fn recall(
144 &self,
145 query: &str,
146 limit: usize,
147 session_id: Option<&str>,
148 since: Option<&str>,
149 until: Option<&str>,
150 ) -> anyhow::Result<Vec<MemoryEntry>> {
151 self.log_audit(
152 AuditOp::Recall,
153 None,
154 None,
155 session_id,
156 Some(&format!("query={query}")),
157 );
158 self.inner
159 .recall(query, limit, session_id, since, until)
160 .await
161 }
162
163 async fn get(&self, key: &str) -> anyhow::Result<Option<MemoryEntry>> {
164 self.log_audit(AuditOp::Get, Some(key), None, None, None);
165 self.inner.get(key).await
166 }
167
168 async fn get_for_agent(
169 &self,
170 key: &str,
171 agent_id: &str,
172 ) -> anyhow::Result<Option<MemoryEntry>> {
173 self.log_audit(
174 AuditOp::Get,
175 Some(key),
176 None,
177 None,
178 Some(&format!("agent_id={agent_id}")),
179 );
180 self.inner.get_for_agent(key, agent_id).await
181 }
182
183 async fn list(
184 &self,
185 category: Option<&MemoryCategory>,
186 session_id: Option<&str>,
187 ) -> anyhow::Result<Vec<MemoryEntry>> {
188 self.log_audit(AuditOp::List, None, None, session_id, None);
189 self.inner.list(category, session_id).await
190 }
191
192 async fn forget(&self, key: &str) -> anyhow::Result<bool> {
193 self.log_audit(AuditOp::Forget, Some(key), None, None, None);
194 self.inner.forget(key).await
195 }
196
197 async fn forget_for_agent(&self, key: &str, agent_id: &str) -> anyhow::Result<bool> {
198 self.log_audit(
199 AuditOp::Forget,
200 Some(key),
201 None,
202 None,
203 Some(&format!("agent_id={agent_id}")),
204 );
205 self.inner.forget_for_agent(key, agent_id).await
206 }
207
208 async fn purge_session_for_agent(
209 &self,
210 session_id: &str,
211 agent_id: &str,
212 ) -> anyhow::Result<usize> {
213 self.log_audit(
214 AuditOp::Purge,
215 None,
216 None,
217 Some(session_id),
218 Some(&format!("agent_id={agent_id}")),
219 );
220 self.inner
221 .purge_session_for_agent(session_id, agent_id)
222 .await
223 }
224
225 async fn count(&self) -> anyhow::Result<usize> {
226 self.inner.count().await
227 }
228
229 async fn health_check(&self) -> bool {
230 self.inner.health_check().await
231 }
232
233 async fn store_procedural(
234 &self,
235 messages: &[ProceduralMessage],
236 session_id: Option<&str>,
237 ) -> anyhow::Result<()> {
238 self.log_audit(
239 AuditOp::StoreProcedural,
240 None,
241 None,
242 session_id,
243 Some(&format!("messages={}", messages.len())),
244 );
245 self.inner.store_procedural(messages, session_id).await
246 }
247
248 async fn recall_namespaced(
249 &self,
250 namespace: &str,
251 query: &str,
252 limit: usize,
253 session_id: Option<&str>,
254 since: Option<&str>,
255 until: Option<&str>,
256 ) -> anyhow::Result<Vec<MemoryEntry>> {
257 self.log_audit(
258 AuditOp::Recall,
259 None,
260 Some(namespace),
261 session_id,
262 Some(&format!("query={query}")),
263 );
264 self.inner
265 .recall_namespaced(namespace, query, limit, session_id, since, until)
266 .await
267 }
268
269 async fn store_with_metadata(
270 &self,
271 key: &str,
272 content: &str,
273 category: MemoryCategory,
274 session_id: Option<&str>,
275 namespace: Option<&str>,
276 importance: Option<f64>,
277 ) -> anyhow::Result<()> {
278 self.log_audit(AuditOp::Store, Some(key), namespace, session_id, None);
279 self.inner
280 .store_with_metadata(key, content, category, session_id, namespace, importance)
281 .await
282 }
283
284 async fn store_with_agent(
285 &self,
286 key: &str,
287 content: &str,
288 category: MemoryCategory,
289 session_id: Option<&str>,
290 namespace: Option<&str>,
291 importance: Option<f64>,
292 agent_id: Option<&str>,
293 ) -> anyhow::Result<()> {
294 self.log_audit(AuditOp::Store, Some(key), namespace, session_id, None);
295 self.inner
296 .store_with_agent(
297 key, content, category, session_id, namespace, importance, agent_id,
298 )
299 .await
300 }
301
302 async fn recall_for_agents(
303 &self,
304 allowed_agent_ids: &[&str],
305 query: &str,
306 limit: usize,
307 session_id: Option<&str>,
308 since: Option<&str>,
309 until: Option<&str>,
310 ) -> anyhow::Result<Vec<MemoryEntry>> {
311 self.log_audit(
312 AuditOp::Recall,
313 None,
314 None,
315 session_id,
316 Some(&format!("query={query}")),
317 );
318 self.inner
319 .recall_for_agents(allowed_agent_ids, query, limit, session_id, since, until)
320 .await
321 }
322
323 async fn ensure_agent_uuid(&self, alias: &str) -> anyhow::Result<String> {
324 self.inner.ensure_agent_uuid(alias).await
325 }
326}
327
#[cfg(test)]
mod tests {
    use super::*;
    use crate::none::NoneMemory;
    use tempfile::TempDir;

    /// A single `store` call must leave exactly one audit row behind.
    #[tokio::test]
    async fn audited_memory_logs_store_operation() {
        let tmp = TempDir::new().unwrap();
        let inner = NoneMemory::new("none");
        let audited = AuditedMemory::new(inner, tmp.path()).unwrap();

        audited
            .store("test_key", "test_value", MemoryCategory::Core, None)
            .await
            .unwrap();

        assert_eq!(audited.audit_count().unwrap(), 1);
    }

    /// A single `recall` call must leave exactly one audit row behind, whatever the
    /// wrapped backend returns.
    #[tokio::test]
    async fn audited_memory_logs_recall_operation() {
        let tmp = TempDir::new().unwrap();
        let inner = NoneMemory::new("none");
        let audited = AuditedMemory::new(inner, tmp.path()).unwrap();

        let _ = audited.recall("query", 10, None, None, None).await;

        assert_eq!(audited.audit_count().unwrap(), 1);
    }

    /// Pruning must succeed, and the one logged row must be either removed or still
    /// present, never both and never neither.
    #[tokio::test]
    async fn audited_memory_prune_works() {
        let tmp = TempDir::new().unwrap();
        let inner = NoneMemory::new("none");
        let audited = AuditedMemory::new(inner, tmp.path()).unwrap();

        audited
            .store("k1", "v1", MemoryCategory::Core, None)
            .await
            .unwrap();

        let pruned = audited.prune_older_than(0).unwrap();
        let remaining = u64::try_from(audited.audit_count().unwrap()).unwrap();

        // With zero retention the cutoff is "now". Whether the row written a moment ago
        // is strictly older depends on clock resolution, so assert the invariant that
        // holds either way.
        assert!(pruned <= 1);
        assert_eq!(pruned + remaining, 1);
    }

    /// Non-audited methods must pass straight through to the wrapped backend.
    #[tokio::test]
    async fn audited_memory_delegates_correctly() {
        let tmp = TempDir::new().unwrap();
        let inner = NoneMemory::new("none");
        let audited = AuditedMemory::new(inner, tmp.path()).unwrap();

        assert_eq!(audited.name(), "none");
        assert!(audited.health_check().await);
        assert_eq!(audited.count().await.unwrap(), 0);
    }
}