Cognitive Memory System Setup & Test Scenarios

Overview

This document outlines the architecture and behaviour of the Cognitive Memory System, designed to replicate human-like memory functions for real-time AI contextual awareness. The system uses a multi-tiered approach to manage snapshots of audio-visual and semantic data, with memory decaying or being promoted based on relevance and usage.

Memory System Setup

The Cognitive Memory System uses three core components:
1. memory_snapshot.py: Defines structured snapshots capturing the current state.

 

from dataclasses import dataclass, field
from typing import Optional, Any
import time

@dataclass
class MemorySnapshot:
    timestamp: float
    location: Optional[str] = None
    scene: Optional[str] = None
    people: list[dict[str, Any]] = field(default_factory=list)
    objects: list[str] = field(default_factory=list)
    speech: list[dict[str, str]] = field(default_factory=list)
    audio_cues: list[str] = field(default_factory=list)
    relevance_score: float = 0.5
    pointers: list[str] = field(default_factory=list)
    context_id: Optional[str] = None  # semantic thread identifier
    embedding: Optional[list[float]] = None  # vector for semantic similarity
    id: Optional[str] = None  # UUID or hash for persistence

    def is_relevant(self, threshold=0.6) -> bool:
        return self.relevance_score >= threshold

    def update_relevance(self, delta: float):
        self.relevance_score = max(0.0, min(1.0, self.relevance_score + delta))


2. memory_tiers.py: Handles buffering, decay, and tiered memory logic.

 

from collections import deque
from typing import Deque
from .memory_snapshot import MemorySnapshot
import time

class CognitiveMemoryTier:
    def __init__(self, name: str, max_length: int, decay_seconds: float):
        self.name = name
        self.buffer: Deque[MemorySnapshot] = deque()
        self.max_length = max_length
        self.decay_seconds = decay_seconds

    def add_snapshot(self, snapshot: MemorySnapshot):
        self.buffer.append(snapshot)
        if len(self.buffer) > self.max_length:
            self.buffer.popleft()

    def get_recent(self):
        return list(self.buffer)

    def prune(self):
        now = time.time()
        self.buffer = deque([
            snap for snap in self.buffer
            if now - snap.timestamp < self.decay_seconds or snap.is_relevant()
        ])

    def promote(self, condition_fn) -> list[MemorySnapshot]:
        promoted = []
        for snap in list(self.buffer):
            if condition_fn(snap):
                promoted.append(snap)
                self.buffer.remove(snap)
        return promoted


3. memory_manager.py: Oversees ingestion, pruning, and inter-tier movement.

 

from .memory_tiers import CognitiveMemoryTier
from .memory_snapshot import MemorySnapshot
from .semantic_cluster_manager import SemanticClusterManager
from inference_pipeline.core.utils import merge_into
import time
import threading

class CognitiveMemoryManager:
    def __init__(self, cluster_manager: SemanticClusterManager):
        self.fast_memory = CognitiveMemoryTier("fast", max_length=100, decay_seconds=10)
        self.working_memory = CognitiveMemoryTier("working", max_length=500, decay_seconds=300)
        self.deprectiated_working_memory = CognitiveMemoryTier("depreciated_working", max_length=2000, decay_seconds=1800)
        self.mid_term_memory = CognitiveMemoryTier("mid_term", max_length=4000, decay_seconds=5400)
        self.cluster_manager = cluster_manager
        self.recent_snapshots = []
        self.max_recent = 50  # limit context window for clustering

        self.frame_memory = {}
        self.action_backlog = []
        self.max_frame_history = 60 #seconds

        self.lock = threading.Lock()

    def add_frame_data(self, ts, data):
        with self.lock:
            self.frame_memory[ts] = data
            self._expire_old_frames()

    def add_delayed_action(self, action_list):
        """Accepts a list of {timestamp, persons[]} results"""
        with self.lock:
            self.action_backlog.extend(action_list)
            self._merge_backlog()

    def _merge_backlog(self):
        new_backlog = []
        for item in self.action_backlog:
            ts = item["timestamp"]
            if ts in self.frame_memory:
                merge_into(self.frame_memory[ts], item, "ActionDetectionModule")
            else:
                new_backlog.append(item)
        self.action_backlog = new_backlog

    def get_recent_memory(self):
        with self.lock:
            return dict(self.frame_memory)

    def _expire_old_frames(self):
        now = time.time()
        self.frame_memory = {
            ts: data for ts, data in self.frame_memory.items()
            if now - ts <= self.max_frame_history
        }

    def reset(self):
        with self.lock:
            self.frame_memory.clear()
            self.action_backlog.clear()    

    def ingest_snapshot(self, snapshot: MemorySnapshot):
        if snapshot.embedding:
            self.cluster_manager.assign_context(snapshot, self.recent_snapshots)

        self.fast_memory.add_snapshot(snapshot)
        self.recent_snapshots.append(snapshot)
        if len(self.recent_snapshots) > self.max_recent:
            self.recent_snapshots.pop(0)

    def cycle_memory(self):
        # Move snapshots to working memory if still relevant or recently accessed
        promoted = self.fast_memory.promote(lambda s: s.is_relevant())
        for snap in promoted:
            self.working_memory.add_snapshot(snap)

        self.fast_memory.prune()
        self.working_memory.prune()

    def get_context(self):
        return {
            "fast": self.fast_memory.get_recent(),
            "working": self.working_memory.get_recent()
        }

    def elevate_from_long_term(self, snapshot: MemorySnapshot):
        # This would be called when querying from SQL or FAISS
        self.working_memory.add_snapshot(snapshot)

4. semantic_cluster_manager.py

 

import uuid
import numpy as np
from typing import List, Optional
from sklearn.metrics.pairwise import cosine_similarity
from .memory_snapshot import MemorySnapshot

class SemanticClusterManager:
    def __init__(self, similarity_threshold: float = 0.65):
        self.similarity_threshold = similarity_threshold
        self.active_threads = {}  # context_id -> list of embeddings

    def _generate_context_id(self) -> str:
        return str(uuid.uuid4())

    def assign_context(self, snapshot: MemorySnapshot, recent_snapshots: List[MemorySnapshot]) -> str:
        if not snapshot.embedding:
            return self._generate_context_id()

        max_similarity = 0
        best_context_id: Optional[str] = None

        for past in recent_snapshots:
            if past.embedding and past.context_id:
                sim = cosine_similarity(
                    [snapshot.embedding], [past.embedding]
                )[0][0]

                if sim > max_similarity and sim >= self.similarity_threshold:
                    max_similarity = sim
                    best_context_id = past.context_id

        if best_context_id:
            # Extend existing thread
            snapshot.context_id = best_context_id
            self.active_threads[best_context_id].append(snapshot.embedding)
        else:
            # Create a new thread
            new_id = self._generate_context_id()
            snapshot.context_id = new_id
            self.active_threads[new_id] = [snapshot.embedding]

        return snapshot.context_id

    def summarize_thread(self, context_id: str) -> Optional[np.ndarray]:
        # Returns mean embedding as thread summary
        embeddings = self.active_threads.get(context_id)
        if embeddings:
            return np.mean(embeddings, axis=0)
        return None

Example usage:

 

import time
from modules.cognitive_memory.memory_manager import CognitiveMemoryManager
from modules.cognitive_memory.memory_snapshot import MemorySnapshot

memory = CognitiveMemoryManager()

snapshot = MemorySnapshot(
    timestamp=time.time(),
    scene="cafe",
    people=[{"name": "Michelle", "emotion": "happy"}],
    objects=["cup", "cat"],
    speech=[{"speaker": "Michelle", "text": "Did you see that cat?", "sentiment": "curious"}]
)

memory.ingest_snapshot(snapshot)
memory.cycle_memory()  # Prune + promote as needed

context = memory.get_context()

Tiered Memory Architecture

Memory is organized into the following tiers:
- Fast Memory: Stored in GPU or high-speed RAM, retains data for ~10 seconds.
- Working Memory: Stored in CPU RAM, retains data for ~5 minutes.
- Long-Term Memory (planned): Stored in persistent SQL and FAISS, holds memory indefinitely.

Snapshots are tagged with relevance scores (0.0 to 1.0). A snapshot remains in a tier if it is actively referenced or remains contextually important. Otherwise, it decays and is either promoted or demoted to a different tier.

 

Promotion and Demotion Logic

Snapshots are evaluated in each memory cycle. The logic is as follows:
- If a snapshot in Fast Memory has a high relevance score (e.g., >= 0.6), it is promoted to Working Memory.
- If a snapshot in any tier is unreferenced beyond its decay window and has low relevance, it is pruned or demoted.
- Upon reference via keyword or identity (e.g., speaker name, object reappearance), the memory manager retrieves the relevant memory from lower tiers and re-inserts it into Fast or Working Memory.

 

Example:
- A cat walks past briefly. Snapshot is added to Fast Memory.
- No follow-up relevance. It is demoted to Long-Term with a pointer left in Working Memory.
- Later someone says, 'Did you see that cat?'
- The pointer triggers retrieval from Long-Term Memory and elevates it for active use.

 

Pointer System

Pointers are lightweight references stored in higher tiers that point to detailed snapshots in lower tiers. They include a summary, memory ID, and timestamp. These are retained in Working Memory even after the original snapshot is moved to persistent storage.

Example Pointer:
{
  "pointer": "memory_id:catwalk123",
  "summary": "A cat walked by unnoticed",
  "stored_at": "psql",
  "last_referenced": 1681023904
}

Test Scenarios & Evaluation

Scenario 1: Temporary Conversation Interruption

The user tells the AI 'hold on a minute' and takes a 2-minute unrelated call. Upon return, they say 'sorry, where were we?'

Evaluation: System generally performs well due to relevance tracking and pointer-based recall. Edge cases require enhancements in semantic linking and conversational thread tracking.

 

Scenario 2: Multi-Party Interaction with AI Input

Two people converse while occasionally addressing the AI for feedback or questions.

Evaluation: System generally performs well due to relevance tracking and pointer-based recall. Edge cases require enhancements in semantic linking and conversational thread tracking.

 

Scenario 3: Relevance Resurfacing

A bird is briefly noted. Minutes later, someone refers to it again, requiring recall.

Evaluation: System generally performs well due to relevance tracking and pointer-based recall. Edge cases require enhancements in semantic linking and conversational thread tracking.

Scenario 4: Emotion Change Over Time

A person starts happy but becomes frustrated. AI needs to track mood evolution.

Evaluation: System generally performs well due to relevance tracking and pointer-based recall. Edge cases require enhancements in semantic linking and conversational thread tracking.

 

Scenario 5: Visual Reappearance

A person walks by, leaves, and returns later. The AI should remember seeing them before.

Evaluation: System generally performs well due to relevance tracking and pointer-based recall. Edge cases require enhancements in semantic linking and conversational thread tracking.

 

Scenario 6: Interrupted Task

During a guided task, an interruption happens. Later, the user asks to resume.

Evaluation: System generally performs well due to relevance tracking and pointer-based recall. Edge cases require enhancements in semantic linking and conversational thread tracking.

 

Scenario 7: Misunderstood Object

Someone refers to 'that weird thing earlier'. AI must infer the object from vague memory.

Evaluation: System generally performs well due to relevance tracking and pointer-based recall. Edge cases require enhancements in semantic linking and conversational thread tracking.