Copy, paste, adapt. Every recipe is runnable Python.
These 20 copy-paste recipes demonstrate common patterns for persistent memory in AI applications. Each uses BotWire's simple key-value interface to solve real-world problems like user sessions, caching, coordination, and state management.
Use this to remember and greet users by name even after your bot restarts.
from botwire import Memory
def get_user_greeting(user_id: str, new_name: str = None) -> str:
    """Build a greeting for *user_id*, remembering the name across runs.

    Args:
        user_id: Identifier used to build the storage key ``"<user_id>_name"``.
        new_name: When truthy, it is stored and acknowledged immediately.

    Returns:
        An introduction reply, a welcome-back reply, or a prompt for the name.
    """
    profiles = Memory("user_profiles")
    name_key = f"{user_id}_name"

    # No new name supplied: fall back to whatever was stored earlier.
    if not new_name:
        remembered = profiles.get(name_key)
        if not remembered:
            return "Hello! What's your name?"
        return f"Welcome back, {remembered}!"

    profiles.set(name_key, new_name)
    return f"Nice to meet you, {new_name}!"
# Usage
# The first call passes a name, so it is stored under "user123_name".
print(get_user_greeting("user123", "Alice")) # First time
# The second call omits the name and reads the stored one back.
print(get_user_greeting("user123")) # Returns: Welcome back, Alice!
Cache costly LLM calls to avoid redundant API requests for identical inputs.
from botwire import Memory
import hashlib
import openai
def cached_llm_query(prompt: str, model: str = "gpt-3.5-turbo") -> str:
    """Answer *prompt* with *model*, reusing a stored answer when one exists.

    Args:
        prompt: User message sent as the only chat message.
        model: Model name; it is part of the cache key.

    Returns:
        The text of the first completion choice (cached or fresh).
    """
    store = Memory("llm_cache")
    # The key is the MD5 hex digest of "<prompt>:<model>".
    digest = hashlib.md5(f"{prompt}:{model}".encode()).hexdigest()

    hit = store.get(digest)
    if hit:
        return hit

    # NOTE(review): openai.ChatCompletion is the pre-1.0 SDK interface -
    # confirm the pinned openai version before reusing this recipe.
    completion = openai.ChatCompletion.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
    )
    answer = completion.choices[0].message.content
    store.set(digest, answer)
    return answer
Store conversation history for LangChain applications with automatic message persistence.
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from botwire import BotWireChatHistory
def chat_with_history(session_id: str, user_message: str) -> str:
    """Send *user_message* to the LLM with the session's stored history.

    The user message is appended to the history first, the whole history is
    passed to the model, and the model's reply is appended afterwards.

    Returns:
        The content of the model's reply.
    """
    transcript = BotWireChatHistory(session_id=f"chat_{session_id}")
    model = ChatOpenAI(model="gpt-3.5-turbo")

    transcript.add_user_message(user_message)
    reply = model.invoke(transcript.messages)

    reply_text = reply.content
    transcript.add_ai_message(reply_text)
    return reply_text
Coordinate multiple AI agents by sharing status and handoff information.
from botwire import Memory
from datetime import datetime
from typing import Dict, Any
def update_agent_status(agent_id: str, status: str, data: Dict[str, Any] = None):
    """Publish an agent's status under the key ``"agent_<agent_id>"``.

    Args:
        agent_id: Agent identifier appended to the ``agent_`` key prefix.
        status: Status label stored as given.
        data: Optional extra details; an empty dict is stored when falsy.
    """
    board = Memory("agent_coordination")
    record = dict(
        status=status,
        timestamp=datetime.now().isoformat(),
        data=data if data else {},
    )
    board.set(f"agent_{agent_id}", record)
def get_available_agents() -> list:
    """List the ids of agents whose stored status is ``"available"``.

    Returns:
        Agent ids, i.e. the part of each matching key after the leading
        ``"agent_"`` prefix.
    """
    coordination = Memory("agent_coordination")
    prefix = "agent_"
    available = []
    for key in coordination.keys():
        if not key.startswith(prefix):
            continue
        agent_info = coordination.get(key)
        # .get() tolerates records that lack a "status" field.
        if agent_info and agent_info.get("status") == "available":
            # Strip only the leading prefix: str.replace would also remove
            # "agent_" from inside the id itself (e.g. "agent_smith").
            available.append(key[len(prefix):])
    return available
# Usage
# Publish one busy agent (with task details) and one available agent.
update_agent_status("researcher", "busy", {"task": "web_search"})
update_agent_status("writer", "available")
# Only agents whose stored status equals "available" are listed.
print(get_available_agents()) # ['writer']
Maintain counters that survive application restarts, useful for tracking usage or iterations.
from botwire import Memory
def increment_usage_counter(feature_name: str) -> int:
    """Add one to the stored counter for *feature_name* and return the new value.

    The counter lives under ``"count_<feature_name>"`` and starts from 0 when
    no value is stored. The read and the write are two separate store calls.
    """
    store = Memory("usage_metrics")
    slot = f"count_{feature_name}"
    bumped = store.get(slot, 0) + 1
    store.set(slot, bumped)
    return bumped
def get_counter_value(feature_name: str) -> int:
    """Return the stored counter for *feature_name*, or 0 when none is stored."""
    store = Memory("usage_metrics")
    slot = f"count_{feature_name}"
    return store.get(slot, 0)
def reset_counter(feature_name: str):
    """Delete the stored counter entry for *feature_name*."""
    store = Memory("usage_metrics")
    slot = f"count_{feature_name}"
    store.delete(slot)
# Usage
# Each call adds one to the value stored under "count_api_calls" and
# returns the new total.
count = increment_usage_counter("api_calls")
print(f"API called {count} times")
Control agent behavior with persistent feature flags that can be toggled without code changes.
from botwire import Memory
from typing import Dict, Any
def set_feature_flag(flag_name: str, enabled: bool, config: Dict[str, Any] = None):
    """Store a feature flag together with its optional configuration.

    Args:
        flag_name: Storage key for the flag.
        enabled: Whether the feature is switched on.
        config: Optional settings; an empty dict is stored when falsy.
    """
    store = Memory("feature_flags")
    record = dict(enabled=enabled, config=config if config else {})
    store.set(flag_name, record)
def is_feature_enabled(flag_name: str) -> bool:
    """Return the flag's stored ``enabled`` value, or False when it is absent."""
    store = Memory("feature_flags")
    fallback = {"enabled": False}
    record = store.get(flag_name, fallback)
    return record.get("enabled", False)
def get_feature_config(flag_name: str) -> Dict[str, Any]:
    """Return the flag's stored ``config`` dict, or an empty dict when absent."""
    store = Memory("feature_flags")
    fallback = {"config": {}}
    record = store.get(flag_name, fallback)
    settings = record.get("config", {})
    return settings
# Usage
# Turn the flag on with its settings, then read the settings back only
# if the flag reports as enabled.
set_feature_flag(
    "advanced_reasoning",
    True,
    {"temperature": 0.7, "max_tokens": 1000},
)
if is_feature_enabled("advanced_reasoning"):
    config = get_feature_config("advanced_reasoning")
    print(f"Using advanced reasoning with config: {config}")
Track when users were last seen for analytics, cleanup, or personalized experiences.
from botwire import Memory
from datetime import datetime, timedelta
def record_user_activity(user_id: str):
    """Store the current time (ISO 8601 string) under ``"last_seen_<user_id>"``."""
    log = Memory("user_activity")
    log.set(f"last_seen_{user_id}", datetime.now().isoformat())
def get_inactive_users(days_threshold: int = 7) -> list:
    """List users whose last-seen timestamp is older than the threshold.

    Args:
        days_threshold: Age in days beyond which a user counts as inactive.

    Returns:
        User ids, i.e. the part of each ``"last_seen_<user_id>"`` key after
        the prefix.
    """
    activity = Memory("user_activity")
    prefix = "last_seen_"
    cutoff_date = datetime.now() - timedelta(days=days_threshold)
    inactive_users = []
    for key in activity.keys():
        if not key.startswith(prefix):
            continue
        timestamp_str = activity.get(key)
        if not timestamp_str:
            # Nothing to parse (e.g. the entry vanished after keys() was read).
            continue
        if datetime.fromisoformat(timestamp_str) < cutoff_date:
            # Slice off only the leading prefix; str.replace would also
            # mangle ids that themselves contain "last_seen_".
            inactive_users.append(key[len(prefix):])
    return inactive_users
def get_last_seen(user_id: str) -> str:
    """Return the stored last-seen timestamp, or ``"Never"`` when none is stored."""
    log = Memory("user_activity")
    stamp = log.get(f"last_seen_{user_id}")
    if stamp:
        return stamp
    return "Never"
Implement rate limiting to control your agent's API usage and avoid hitting external limits.
from botwire import Memory
from datetime import datetime, timedelta
import time
def check_rate_limit(action: str, max_requests: int = 10, window_minutes: int = 1) -> bool:
    """Count one request for *action* and report whether it is within the limit.

    Requests are counted in fixed windows of ``window_minutes`` minutes,
    aligned to local midnight (a window that does not divide the day evenly
    is cut short at midnight). With the default of one minute the storage
    key is the same per-minute key as before.

    Args:
        action: Name of the rate-limited action; part of the storage key.
        max_requests: Requests allowed per window.
        window_minutes: Window length in minutes; values below 1 are
            treated as 1.

    Returns:
        True if the request was counted, False if the window is already full.
    """
    limiter = Memory("rate_limits")
    now = datetime.now()

    # Round the current minute down to the start of its window so every
    # request inside the same window shares one counter key.
    span = max(1, int(window_minutes))
    minutes_into_day = now.hour * 60 + now.minute
    window_start = now - timedelta(minutes=minutes_into_day % span)
    window_key = f"{action}_{window_start.strftime('%Y%m%d_%H%M')}"

    current_count = limiter.get(window_key, 0)
    if current_count >= max_requests:
        return False  # Rate limit exceeded

    # The read above and this write are separate store calls.
    limiter.set(window_key, current_count + 1)
    return True
def wait_for_rate_limit(action: str, max_requests: int = 10, window_minutes: int = 1):
    """Block until ``check_rate_limit`` admits a request for *action*.

    Retries every 10 seconds, printing a notice before each wait.
    """
    while True:
        if check_rate_limit(action, max_requests, window_minutes):
            return
        print(f"Rate limit reached for {action}, waiting...")
        time.sleep(10)  # Wait 10 seconds before retry
# Usage
# One check both counts the request and decides which message is shown.
print(
    "Making API call..."
    if check_rate_limit("openai_api", max_requests=5, window_minutes=1)
    else "Rate limit exceeded!"
)
Maintain a todo list for your agents that survives restarts and can be shared across instances.
from botwire import Memory
from datetime import datetime
import uuid
def add_task(agent_id: str, task_description: str, priority: int = 1) -> str:
    """Queue a pending task for *agent_id* and return its generated id.

    The task is stored under ``"<agent_id>_<task_id>"`` where the task id is
    a random UUID4 string.
    """
    queue = Memory("agent_tasks")
    new_id = str(uuid.uuid4())
    queue.set(
        f"{agent_id}_{new_id}",
        {
            "id": new_id,
            "description": task_description,
            "priority": priority,
            "created": datetime.now().isoformat(),
            "status": "pending",
        },
    )
    return new_id
def get_next_task(agent_id: str) -> dict:
    """Return the highest-priority pending task for *agent_id*.

    Args:
        agent_id: Agent whose tasks (stored as ``"<agent_id>_<task_id>"``)
            are searched.

    Returns:
        The pending task dict with the largest ``priority`` (the first one
        encountered on ties), or None when the agent has no pending task.
    """
    tasks = Memory("agent_tasks")
    prefix = f"{agent_id}_"
    pending = []
    for key in tasks.keys():
        if not key.startswith(prefix):
            continue
        task = tasks.get(key)
        if not task:
            continue  # entry disappeared or is empty; nothing to schedule
        # A bare prefix test would also match another agent whose id merely
        # starts with this one ("bot" vs "bot_2"). Rebuild the expected key
        # from the task's own id to accept exact matches only.
        # NOTE(review): assumes every task carries the "id" written by
        # add_task - confirm no other writer stores tasks in this namespace.
        if key != f"{prefix}{task.get('id')}":
            continue
        if task.get("status") == "pending":
            pending.append(task)

    if not pending:
        return None
    # Higher number = higher priority.
    return max(pending, key=lambda candidate: candidate["priority"])
def complete_task(agent_id: str, task_id: str):
    """Mark the stored task as completed; do nothing if it is not found."""
    queue = Memory("agent_tasks")
    slot = f"{agent_id}_{task_id}"
    record = queue.get(slot)
    if not record:
        return
    record["status"] = "completed"
    record["completed"] = datetime.now().isoformat()
    queue.set(slot, record)
Store API keys and credentials with namespace isolation (remember: encrypt sensitive data before storage).
from botwire import Memory
import base64
def store_credentials(service_name: str, credentials: dict):
    """Store *credentials* for *service_name* with each value base64-encoded.

    WARNING: base64 is an encoding, not encryption - the stored values are
    effectively plain text. In production, encrypt credentials before
    storage and decrypt after retrieval.

    Every value is converted with ``str()`` before encoding.
    """
    vault = Memory("api_credentials")
    obfuscated = {
        field: base64.b64encode(str(secret).encode()).decode()
        for field, secret in credentials.items()
    }
    vault.set(service_name, obfuscated)
def get_credentials(service_name: str) -> dict:
    """Return the base64-decoded credentials for *service_name*.

    Returns:
        A dict of decoded string values, or an empty dict when nothing is
        stored for the service.
    """
    vault = Memory("api_credentials")
    stored = vault.get(service_name)
    if not stored:
        return {}
    return {
        field: base64.b64decode(blob.encode()).decode()
        for field, blob in stored.items()
    }
# Usage (remember to encrypt in production!)
# Values are base64-encoded on write and decoded back to strings on read.
store_credentials("openai", {"api_key": "sk-...", "org_id": "org-..."})
creds = get_credentials("openai")
Store and retrieve complex user preference objects for personalized agent behavior.
from botwire import Memory
from typing import Dict, Any
def save_user_preferences(user_id: str, preferences: Dict[str, Any]):
    """Merge *preferences* into the user's stored preferences.

    Existing keys are overwritten by the new values; other keys are kept.
    """
    store = Memory("user_preferences")
    slot = f"prefs_{user_id}"
    merged = store.get(slot, {})
    merged.update(preferences)
    store.set(slot, merged)
def get_user_preference(user_id: str, key: str, default=None):
    """Return one stored preference of the user, or *default* when missing."""
    store = Memory("user_preferences")
    stored = store.get(f"prefs_{user_id}", {})
    return stored.get(key, default)
def get_all_user_preferences(user_id: str) -> Dict[str, Any]:
    """Return every stored preference of the user, or an empty dict."""
    store = Memory("user_preferences")
    slot = f"prefs_{user_id}"
    return store.get(slot, {})
def delete_user_preference(user_id: str, key: str):
    """Remove one preference of the user; do nothing when it is not set."""
    store = Memory("user_preferences")
    slot = f"prefs_{user_id}"
    stored = store.get(slot, {})
    if key not in stored:
        return
    del stored[key]
    store.set(slot, stored)
# Usage
# Merge four settings into whatever is already stored for "user123".
save_user_preferences("user123", {
"language": "es",
"temperature": 0.7,
"max_tokens": 500,
"preferred_style": "casual"
})
# "en" is only the fallback used when no "language" preference is stored.
language = get_user_preference("user123", "language", "en")
print(f"User prefers {language}")
Save progress through complex multi-step processes so they can resume after interruption.
from botwire import Memory
from datetime import datetime
from typing import Dict, Any
def save_workflow_checkpoint(workflow_id: str, step: str, data: Dict[str, Any]):
    """Record the workflow's current step and data, replacing any prior checkpoint.

    The checkpoint is stored under ``"workflow_<workflow_id>"`` with a
    timestamp taken at save time.
    """
    store = Memory("workflow_checkpoints")
    snapshot = dict(
        current_step=step,
        data=data,
        timestamp=datetime.now().isoformat(),
    )
    store.set(f"workflow_{workflow_id}", snapshot)
def load_workflow_checkpoint(workflow_id: str) -> Dict[str, Any]:
    """Return the stored checkpoint for *workflow_id* as the store returns it."""
    store = Memory("workflow_checkpoints")
    slot = f"workflow_{workflow_id}"
    return store.get(slot)
def complete_workflow(workflow_id: str):
    """Mark the stored checkpoint as completed; do nothing if none exists."""
    store = Memory("workflow_checkpoints")
    slot = f"workflow_{workflow_id}"
    snapshot = store.get(slot)
    if not snapshot:
        return
    snapshot["current_step"] = "completed"
    snapshot["completed_at"] = datetime.now().isoformat()
    store.set(slot, snapshot)
# Usage example
def process_document_workflow(doc_id: str):
    """Advance the document workflow by one step from its saved checkpoint.

    Steps run in the order extract -> analyze -> summarize. A workflow with
    no checkpoint is started at "extract" first. Each call performs exactly
    one transition and saves the next step.
    """
    state = load_workflow_checkpoint(doc_id)
    if not state:
        # Start new workflow
        save_workflow_checkpoint(doc_id, "extract", {"progress": 0})
        state = load_workflow_checkpoint(doc_id)

    step = state["current_step"]

    if step == "extract":
        # Do extraction work...
        save_workflow_checkpoint(doc_id, "analyze", {"extracted_text": "..."})
        return

    if step == "analyze":
        # Do analysis work...
        save_workflow_checkpoint(doc_id, "summarize", {"analysis": "..."})
        return

    if step == "summarize":
        # Do summarization...
        complete_workflow(doc_id)
Consistently assign users to test variants and track their assignments across sessions.
from botwire import Memory
import hashlib
def assign_ab_variant(user_id: str, experiment_name: str, variants: list) -> str:
    """Return the user's variant for an experiment, assigning one if needed.

    A stored assignment is returned as-is. Otherwise the variant is picked
    from the MD5 hash of ``"<user_id>_<experiment_name>"`` modulo the number
    of variants, then stored under ``"<experiment_name>_<user_id>"``.
    """
    store = Memory("ab_experiments")
    slot = f"{experiment_name}_{user_id}"

    prior = store.get(slot)
    if prior:
        return prior

    digest = hashlib.md5(f"{user_id}_{experiment_name}".encode()).hexdigest()
    chosen = variants[int(digest, 16) % len(variants)]
    store.set(slot, chosen)
    return chosen
def get_user_variant(user_id: str, experiment_name: str) -> str:
    """Return the stored variant for the user, without assigning a new one."""
    store = Memory("ab_experiments")
    slot = f"{experiment_name}_{user_id}"
    return store.get(slot)
def get_experiment_stats(experiment_name: str) -> dict:
    """Count stored assignments per variant for *experiment_name*.

    Every key starting with ``"<experiment_name>_"`` is counted.
    NOTE(review): an experiment whose name is a prefix of another
    experiment's name would also pick up that experiment's keys - verify
    naming against callers.
    """
    store = Memory("ab_experiments")
    tally = {}
    for key in store.keys():
        if not key.startswith(f"{experiment_name}_"):
            continue
        variant = store.get(key)
        tally[variant] = tally.get(variant, 0) + 1
    return tally
# Usage
# The first call computes and stores the assignment; later calls for the
# same user and experiment return the stored value.
variant = assign_ab_variant("user123", "prompt_style", ["formal", "casual", "technical"])
print(f"User assigned to variant: {variant}")
# Stats are a per-variant count over the stored assignments.
print(f"Experiment stats: {get_experiment_stats('prompt_style')}")
Avoid processing the same content twice by maintaining a set of seen content hashes.
from botwire import Memory
import hashlib
def add_to_seen_set(namespace: str, content: str) -> bool:
    """Record *content* as seen; return True if it was new, False otherwise.

    Content is identified by its SHA-256 hex digest inside the
    ``"seen_<namespace>"`` store.
    """
    tracker = Memory(f"seen_{namespace}")
    fingerprint = hashlib.sha256(content.encode()).hexdigest()

    is_new = not tracker.get(fingerprint)
    if is_new:
        tracker.set(fingerprint, True)
    return is_new
def is_content_seen(namespace: str, content: str) -> bool:
    """Report whether *content* is already recorded, without recording it."""
    tracker = Memory(f"seen_{namespace}")
    fingerprint = hashlib.sha256(content.encode()).hexdigest()
    return True if tracker.get(fingerprint) else False
def clear_seen_set(namespace: str):
    """Delete every recorded entry in the ``"seen_<namespace>"`` store."""
    seen_tracker = Memory(f"seen_{namespace}")
    # Snapshot the keys before deleting so the store is never mutated while
    # its key collection is being iterated (the session cleanup helpers in
    # this file collect keys first for the same reason).
    for key in list(seen_tracker.keys()):
        seen_tracker.delete(key)
def get_seen_count(namespace: str) -> int:
    """Return how many keys the ``"seen_<namespace>"`` store currently holds."""
    tracker = Memory(f"seen_{namespace}")
    recorded = tracker.keys()
    return len(recorded)
# Usage
# add_to_seen_set both records the article and reports whether it was new.
print(
    "Processing new article..."
    if add_to_seen_set("articles", "This is an article about AI")
    else "Article already processed, skipping..."
)
print(f"Total articles seen: {get_seen_count('articles')}")
Maintain conversation summaries that compress old messages while preserving context.
from botwire import Memory
from typing import List, Dict
import json
def add_message_to_buffer(session_id: str, role: str, content: str, max_messages: int = 20):
    """Append a message to the session's buffer, compressing it when too long.

    Args:
        session_id: Session whose buffer (key ``"session_<session_id>"``) is
            updated.
        role: Speaker role stored with the message.
        content: Message text.
        max_messages: Buffer length above which older messages are folded
            into a summary by ``compress_message_buffer``.
    """
    # Imported here because this recipe's own import list does not include
    # datetime; without it the timestamp below raises NameError when the
    # recipe is copied on its own.
    from datetime import datetime

    buffer = Memory("conversation_buffers")
    buffer_key = f"session_{session_id}"

    # Get existing messages
    messages = buffer.get(buffer_key, [])

    # Add new message
    messages.append({
        "role": role,
        "content": content,
        "timestamp": datetime.now().isoformat(),
    })

    # If buffer is too large, summarize old messages
    if len(messages) > max_messages:
        messages = compress_message_buffer(messages, max_messages)

    buffer.set(buffer_key, messages)
def compress_message_buffer(messages: List[Dict], target_size: int) -> List[Dict]:
    """Shrink *messages* to at most ``target_size`` entries (minimum one).

    The oldest messages are replaced by a single ``system`` summary entry;
    the newest ``target_size - 1`` messages are kept unchanged after it.

    Args:
        messages: Message dicts with ``"content"`` and ``"timestamp"`` keys.
        target_size: Desired maximum length. Values of 1 or less yield just
            the summary entry.

    Returns:
        The same list object when it is empty or already short enough,
        otherwise a new list starting with the summary message.
    """
    if not messages or len(messages) <= target_size:
        return messages

    # Split on an explicit index: the slice messages[-0:] would select the
    # whole list, so target_size == 1 used to keep every message and add a
    # summary on top instead of shrinking the buffer.
    keep = max(target_size - 1, 0)
    split_at = len(messages) - keep
    old_messages = messages[:split_at]
    recent_messages = messages[split_at:]

    # The summary quotes the first 50 characters of the last five folded
    # messages.
    previews = [msg["content"][:50] + "..." for msg in old_messages[-5:]]
    summary_content = (
        f"[Summary of {len(old_messages)} earlier messages: "
        + " | ".join(previews)
        + "]"
    )

    summary_message = {
        "role": "system",
        "content": summary_content,
        # old_messages is never empty here, so its last timestamp exists.
        "timestamp": old_messages[-1]["timestamp"],
    }
    return [summary_message] + recent_messages
def get_conversation_buffer(session_id: str) -> List[Dict]:
    """Return the stored message buffer for the session, or an empty list."""
    store = Memory("conversation_buffers")
    slot = f"session_{session_id}"
    return store.get(slot, [])
# Usage
# Both messages are appended to the buffer stored under "session_chat123".
add_message_to_buffer("chat123", "user", "Hello, how are you?")
add_message_to_buffer("chat123", "assistant", "I'm doing well, thank you!")
Enable multiple agents to share notes, findings, and intermediate results in a common workspace.
from botwire import Memory
from datetime import datetime
from typing import Dict, Any, List
def write_to_scratchpad(agent_id: str, key: str, data: Any, tags: List[str] = None):
    """Write an entry to the shared scratchpad, replacing any entry at *key*.

    Args:
        agent_id: Recorded as the entry's author.
        key: Storage key of the entry.
        data: Payload stored as given.
        tags: Optional labels; an empty list is stored when falsy.
    """
    pad = Memory("shared_scratchpad")
    note = dict(
        author=agent_id,
        data=data,
        timestamp=datetime.now().isoformat(),
        tags=tags if tags else [],
    )
    pad.set(key, note)
def read_from_scratchpad(key: str) -> Dict[str, Any]:
    """Return the scratchpad entry stored at *key* as the store returns it."""
    pad = Memory("shared_scratchpad")
    note = pad.get(key)
    return note
def find_scratchpad_entries_by_tag(tag: str) -> Dict[str, Dict[str, Any]]:
    """Return every scratchpad entry whose ``tags`` list contains *tag*.

    Returns:
        A mapping of storage key to entry; empty when nothing matches.
    """
    pad = Memory("shared_scratchpad")
    hits = {}
    for key in pad.keys():
        note = pad.get(key)
        if not note:
            continue
        if tag in note.get("tags", []):
            hits[key] = note
    return hits
def get_scratchpad_by_author(agent_id: str) -> Dict[str, Dict[str, Any]]:
    """Return every scratchpad entry whose ``author`` equals *agent_id*.

    Returns:
        A mapping of storage key to entry; empty when nothing matches.
    """
    pad = Memory("shared_scratchpad")
    mine = {}
    for key in pad.keys():
        note = pad.get(key)
        if not note:
            continue
        if note.get("author") == agent_id:
            mine[key] = note
    return mine
# Usage
# Two agents each write one entry; both entries carry the "analysis" tag.
write_to_scratchpad("researcher", "market_data",
{"trend": "bullish", "confidence": 0.8},
["analysis", "market", "Q4"])
write_to_scratchpad("analyst", "risk_assessment",
{"risk_level": "medium", "factors": ["volatility", "regulation"]},
["analysis", "risk"])
# The lookup returns a dict keyed by storage key for entries tagged "analysis".
market_analyses = find_scratchpad_entries_by_tag("analysis")
Implement temporary session memory with a per-entry TTL: expired entries are dropped lazily when read, or in bulk by an explicit cleanup pass.
from botwire import Memory
from datetime import datetime, timedelta
from typing import Any, Optional
def set_session_data(session_id: str, key: str, value: Any, ttl_hours: int = 24):
    """Store a session value together with its creation time and TTL.

    The entry is written under ``"<session_id>_<key>"``. Expiry is enforced
    by the readers, not by this function.
    """
    store = Memory("temp_sessions")
    record = dict(
        value=value,
        created=datetime.now().isoformat(),
        ttl_hours=ttl_hours,
    )
    store.set(f"{session_id}_{key}", record)
def get_session_data(session_id: str, key: str, default=None) -> Any:
    """Return a session value, or *default* when it is missing or expired.

    An entry older than its own ``ttl_hours`` is deleted before *default*
    is returned.
    """
    store = Memory("temp_sessions")
    slot = f"{session_id}_{key}"

    record = store.get(slot)
    if not record:
        return default

    born = datetime.fromisoformat(record["created"])
    lifetime = timedelta(hours=record["ttl_hours"])
    if datetime.now() - born <= lifetime:
        return record["value"]

    store.delete(slot)  # Clean up expired entry
    return default
def cleanup_expired_sessions():
    """Delete every expired session entry and return how many were removed.

    Expired keys are collected first and deleted in a second pass.
    """
    store = Memory("temp_sessions")

    stale = []
    for key in store.keys():
        record = store.get(key)
        if not record:
            continue
        born = datetime.fromisoformat(record["created"])
        lifetime = timedelta(hours=record["ttl_hours"])
        if datetime.now() - born > lifetime:
            stale.append(key)

    for key in stale:
        store.delete(key)
    return len(stale)
def clear_session(session_id: str):
    """Delete every entry whose key starts with ``"<session_id>_"``.

    Matching keys are collected first and deleted in a second pass.
    """
    store = Memory("temp_sessions")
    doomed = []
    for key in store.keys():
        if key.startswith(f"{session_id}_"):
            doomed.append(key)
    for key in doomed:
        store.delete(key)
# Usage
# Store a value that readers will treat as expired two hours after creation.
set_session_data("session123", "user_context", {"step": 1, "data": "..."}, ttl_hours=2)
# {} is the fallback returned when the entry is missing or expired.
context = get_session_data("session123", "user_context", {})
# Returns the number of expired entries that were deleted.
expired_count = cleanup_expired_sessions()
Export your BotWire memory to JSON for backups and import it back when needed.
from botwire import Memory
import json
from datetime import datetime
from typing import Dict, Any
def export_memory_to_json(namespace: str, filename: str = None) -> str:
    """Serialize every key of *namespace* to a JSON string.

    Args:
        namespace: Store to export.
        filename: When truthy, the JSON is also written to this path.

    Returns:
        The JSON text, containing the namespace, an export timestamp and
        the key/value data. Values that are not JSON-serializable are
        converted with ``str``.
    """
    source = Memory(namespace)

    payload = {
        "namespace": namespace,
        "exported_at": datetime.now().isoformat(),
        "data": {},
    }
    payload["data"].update((key, source.get(key)) for key in source.keys())

    json_str = json.dumps(payload, indent=2, default=str)

    if filename:
        with open(filename, 'w') as f:
            f.write(json_str)
        print(f"Exported {len(payload['data'])} keys to {filename}")

    return json_str
def import_memory_from_json(json_data: str, target_namespace: str = None) -> int:
    """Load exported JSON text into a store and return the number of keys set.

    Args:
        json_data: JSON text with ``"namespace"`` and ``"data"`` members.
        target_namespace: Destination store; when falsy, the namespace
            recorded in the JSON is used.
    """
    payload = json.loads(json_data)
    namespace = target_namespace if target_namespace else payload["namespace"]
    destination = Memory(namespace)

    entries = payload["data"]
    for key, value in entries.items():
        destination.set(key, value)
    imported_count = len(entries)

    print(f"Imported {imported_count} keys into namespace '{namespace}'")
    return imported_count
def import_memory_from_file(filename: str, target_namespace: str = None) -> int:
    """Read a JSON backup file and import it via ``import_memory_from_json``.

    Args:
        filename: Path of the JSON backup.
        target_namespace: Optional destination store, passed through.

    Returns:
        The number of keys imported.
    """
    # JSON text is UTF-8; decode it explicitly instead of relying on the
    # platform default encoding, which differs between systems.
    with open(filename, 'r', encoding='utf-8') as f:
        json_data = f.read()
    return import_memory_from_json(json_data, target_namespace)
# Usage
# Export current memory
# The JSON text is both returned and written to backup.json.
json_backup = export_memory_to_json("user_preferences", "backup.json")
# Import from backup (could be to different namespace)
import_memory_from_file("backup.json", "user_preferences_restored")
Transfer existing Redis keys and data structures into BotWire's memory system.
from botwire import Memory
import redis
import json
from typing import Any, Dict
def migrate_redis_to_botwire(redis_host: str = "localhost", redis_port: int = 6379,
                             redis_db: int = 0, target_namespace: str = "migrated"):
    """Copy string, hash, list and set keys from Redis into a BotWire store.

    String values are JSON-decoded when possible and stored as-is
    otherwise. Sets are stored as lists. Keys of any other Redis type are
    reported and skipped. A failure on one key is reported and does not
    stop the run.

    Args:
        redis_host: Redis server host.
        redis_port: Redis server port.
        redis_db: Redis database index to read from.
        target_namespace: BotWire namespace that receives the keys.

    Returns:
        The number of keys actually written to BotWire.
    """
    # Connect to Redis
    r = redis.Redis(host=redis_host, port=redis_port, db=redis_db, decode_responses=True)
    # Connect to BotWire
    botwire_mem = Memory(target_namespace)

    migrated_keys = 0
    # Get all keys from Redis
    for key in r.keys("*"):
        try:
            key_type = r.type(key)
            if key_type == "string":
                value = r.get(key)
                # Try to parse as JSON, fall back to the raw value. Only
                # decoding failures are expected here; anything else should
                # surface through the outer handler.
                try:
                    value = json.loads(value)
                except (TypeError, ValueError):
                    pass
            elif key_type == "hash":
                value = r.hgetall(key)
            elif key_type == "list":
                value = r.lrange(key, 0, -1)
            elif key_type == "set":
                value = list(r.smembers(key))
            else:
                # Nothing was copied, so the key must not be counted.
                print(f"Skipping key {key}: unsupported Redis type '{key_type}'")
                continue

            botwire_mem.set(key, value)
            migrated_keys += 1
        except Exception as e:
            # Per-key boundary: report and carry on with the remaining keys.
            print(f"Failed to migrate key {key}: {e}")

    print(f"Successfully migrated {migrated_keys} keys to BotWire namespace '{target_namespace}'")
    return migrated_keys
def verify_migration(redis_key: str, botwire_namespace: str,
                     redis_host: str = "localhost", redis_port: int = 6379,
                     redis_db: int = 0):
    """Print a Redis string key next to its migrated value and whether they match.

    Args:
        redis_key: Key to compare; read from Redis with ``GET``, so only
            string keys can be checked.
        botwire_namespace: BotWire namespace the key was migrated into.
        redis_host: Redis server host.
        redis_port: Redis server port.
        redis_db: Redis database index; defaults to 0, matching the
            migration's default.
    """
    r = redis.Redis(host=redis_host, port=redis_port, db=redis_db, decode_responses=True)
    botwire_mem = Memory(botwire_namespace)

    redis_value = r.get(redis_key)
    botwire_value = botwire_mem.get(redis_key)

    # migrate_redis_to_botwire stores JSON-decoded values when the Redis
    # string parses as JSON, so decode the same way before comparing.
    expected = redis_value
    try:
        expected = json.loads(redis_value)
    except (TypeError, ValueError):
        pass

    # The string comparison is kept so that anything that matched before
    # still matches.
    matches = expected == botwire_value or str(redis_value) == str(botwire_value)

    print(f"Redis value: {redis_value}")
    print(f"BotWire value: {botwire_value}")
    print(f"Match: {matches}")
# Usage
# migrated_count = migrate_redis_to_botwire("localhost", 6379, 0, "redis_migration")
# verify_migration("some_key", "redis_migration")
Use BotWire memory within Claude's function calling to maintain state across tool invocations.
Install: pip install botwire