# This is the "Virtual World Simulator". It creates thousands of digital households
# and tracks how their lives change month-by-month over a year.
from numpy.random import SeedSequence, default_rng
# Global multiplier for one-at-a-time (OAT) sensitivity analysis of effect sizes.
# NOTE(review): not referenced anywhere in this chunk — presumably consumed
# elsewhere in the file; confirm before removing or relying on it.
EFFECT_SCALE_MULT = 1.0 # used for sensitivity OAT
# 1. Define the "Households" class.
# This represents thousands of digital people, each with their own health, safety, and stability scores.
class Households:
    """
    A synthetic population of households, each carrying six state scores
    in [0, 1]: vulnerability, protection risk, basic-needs gap, education
    access, morbidity, and service efficiency.
    """

    # (attribute, (alpha, beta)) pairs, drawn in this exact order so that
    # seeded runs reproduce the same population.
    _BETA_PARAMS = (
        ('vulnerability', (2, 5)),
        ('protection_risk', (2.5, 4)),
        ('basic_needs_gap', (2, 6)),
        ('education_access', (2, 3)),
        ('morbidity', (2, 6)),
    )

    def __init__(self, n=5000):
        # Seed each beta-distributed score, clipped into [0, 1] for safety.
        for attr, (alpha, beta) in self._BETA_PARAMS:
            setattr(self, attr, np.clip(np.random.beta(alpha, beta, size=n), 0, 1))
        # Service efficiency starts at zero for every household.
        self.service_efficiency = np.zeros(n)
        self.n = n

    def as_dict(self):
        """Return the six state arrays keyed by attribute name (no copies)."""
        names = [attr for attr, _ in self._BETA_PARAMS] + ['service_efficiency']
        return {attr: getattr(self, attr) for attr in names}
# 2. Define the "Environment" class.
# This handles "Natural Drift"—the tendency for situations to slowly worsen if no help is provided.
class Environment:
    """
    Wraps a Households population and advances it through time.

    natural_drift() models the tendency for conditions to slowly worsen
    (and for service efficiency / education access to erode) when no
    intervention is applied; indicators() summarizes population means.
    """

    def __init__(self, households: 'Households', rng=None, drift_mults=None):
        # households : Households instance (forward-reference annotation so
        #              this class does not depend on definition order).
        # rng        : numpy Generator; a fresh unseeded one if omitted.
        # drift_mults: per-domain multipliers on the monthly drift means.
        self.hh = households
        self.time = 0
        # Explicit None checks instead of `x or default`: a falsy-but-valid
        # argument (e.g. a deliberately empty drift_mults mapping) must not
        # be silently replaced by the defaults.
        self.rng = default_rng() if rng is None else rng
        if drift_mults is None:
            drift_mults = {k: 1.0 for k in ['vulnerability', 'protection', 'basic', 'morbidity', 'service', 'education']}
        self.drift_mults = drift_mults

    def natural_drift(self):
        """Apply one month of stochastic drift to every household state array."""
        n = self.hh.n
        rnorm = self.rng.normal
        # Positive drift means: conditions worsen; negative means on
        # service_efficiency / education_access: those erode over time.
        self.hh.vulnerability = np.clip(self.hh.vulnerability + rnorm(0.005 * self.drift_mults['vulnerability'], 0.01, n), 0, 1)
        self.hh.protection_risk = np.clip(self.hh.protection_risk + rnorm(0.006 * self.drift_mults['protection'], 0.01, n), 0, 1)
        self.hh.basic_needs_gap = np.clip(self.hh.basic_needs_gap + rnorm(0.004 * self.drift_mults['basic'], 0.01, n), 0, 1)
        self.hh.morbidity = np.clip(self.hh.morbidity + rnorm(0.003 * self.drift_mults['morbidity'], 0.01, n), 0, 1)
        self.hh.service_efficiency = np.clip(self.hh.service_efficiency + rnorm(-0.002 * self.drift_mults['service'], 0.01, n), 0, 1)
        self.hh.education_access = np.clip(self.hh.education_access + rnorm(-0.002 * self.drift_mults['education'], 0.01, n), 0, 1)

    def indicators(self):
        """Return population-mean indicators as plain floats (DataFrame-friendly)."""
        return {
            'Vulnerability (average effect)': float(self.hh.vulnerability.mean()),
            'Protection risk (average effect)': float(self.hh.protection_risk.mean()),
            'Basic needs gap (average effect)': float(self.hh.basic_needs_gap.mean()),
            'Education access (average effect)': float(self.hh.education_access.mean()),
            'Morbidity (average effect)': float(self.hh.morbidity.mean()),
            'Service efficiency (average effect)': float(self.hh.service_efficiency.mean()),
        }
# 3. Use a "Gaussian Mixture Model" (GMM) to initialize the digital households.
# This ensures our virtual population has a realistic mix of stable, struggling, and high-risk families.
from sklearn.mixture import GaussianMixture
def fit_gmm_from_synthetic(random_state=None):
    """
    Fit a 3-component GMM to a synthetic multimodal distribution that
    approximates realistic heterogeneity in vulnerability, risk, needs, etc.
    This is a fallback in absence of real survey data.

    Parameters
    ----------
    random_state : int | numpy Generator | None
        Forwarded to GaussianMixture so the fit (and hence every downstream
        gmm_sample draw) can be made reproducible. Default None keeps the
        original unseeded behaviour.
    """
    # Three subpopulations: mostly stable, moderately vulnerable, high-risk.
    synthetic = np.concatenate([
        np.random.beta(2, 6, 4000),  # majority stable households
        np.random.beta(6, 3, 3000),  # moderate vulnerability
        np.random.beta(2, 2, 3000),  # high risk / high need subgroup
    ]).reshape(-1, 1)
    gmm = GaussianMixture(n_components=3, covariance_type='full', max_iter=500,
                          random_state=random_state)
    gmm.fit(synthetic)
    return gmm
# Fit a single GMM up front and reuse it for every draw in this ABM run.
GMM_MODEL = fit_gmm_from_synthetic()


def gmm_sample(n):
    """
    Draw `n` bounded household state values from the calibrated GMM.
    Output is always in [0, 1].
    """
    draws, _labels = GMM_MODEL.sample(n)
    return np.clip(draws.ravel(), 0, 1)
# Build the base household population, replacing the plain beta draws with
# GMM-based draws (one call per attribute, in a fixed order for seeded runs).
np.random.seed(BASE_SEED)
random.seed(BASE_SEED)
H_base = Households(n=HOUSEHOLDS)
for _attr in ('vulnerability', 'protection_risk', 'basic_needs_gap',
              'education_access', 'morbidity', 'service_efficiency'):
    setattr(H_base, _attr, gmm_sample(HOUSEHOLDS))
# Household populations are now initialized from Gaussian Mixture distributions.
# 4. Dynamics rules: how household states respond to intervention effects.
# A bounded logistic response replaces naive additive "state += delta".

# Variables where a higher value is the desirable direction.
INCREASING_VARS = {'education_access', 'service_efficiency'}
# Per-variable responsiveness (tunable).
VAR_STRENGTH = {
    'vulnerability': 0.10,
    'protection_risk': 0.10,
    'basic_needs_gap': 0.14,
    'education_access': 0.08,
    'morbidity': 0.12,
    'service_efficiency': 0.16
}
# Per-variable lag: months before interventions show effects.
VAR_LAG = {
    'vulnerability': 1,
    'protection_risk': 1,
    'basic_needs_gap': 0,
    'education_access': 2,
    'morbidity': 1,
    'service_efficiency': 0
}
# Slope of the logistic saturation curve.
LOGISTIC_STEEPNESS = 1.8


def nonlinear_update_scalar(current, effect_value, month, var_name):
    """
    Logistic, delayed, bounded update for a single scalar state value.

    Returns `current` unchanged until the variable's lag has elapsed; then
    shifts it by strength * (sigmoid(steepness * signed_effect) - 0.5),
    clipped into [0, 1].
    """
    if month < VAR_LAG.get(var_name, 0):
        # The intervention has not taken effect yet for this variable.
        return float(current)
    # For "lower is better" variables a positive effect must push the
    # state DOWN, so flip the sign before the sigmoid.
    if var_name in INCREASING_VARS:
        signed = effect_value
    else:
        signed = -effect_value
    # Sigmoid centred at zero gives a bounded shift in (-0.5, +0.5).
    shift = (1.0 / (1.0 + np.exp(-LOGISTIC_STEEPNESS * signed))) - 0.5
    strength = VAR_STRENGTH.get(var_name, 0.12)
    return float(np.clip(current + strength * shift, 0.0, 1.0))
def get_activity_effects(activity, hh, budget_scale):
    """
    Query an activity for its per-variable effect intensities WITHOUT
    mutating households (MODE A).

    The activity's apply() must support return_effect_only=True and return
    a dict, e.g. {'vulnerability': +0.03, 'basic_needs_gap': -0.02, ...}.
    Raises RuntimeError when the activity does not support that contract.
    """
    try:
        return activity.apply(hh, budget_scale=budget_scale, return_effect_only=True)
    except TypeError as exc:
        message = (
            "Activity.apply(...) must accept return_effect_only=True and return a dict "
            "of per‑variable effects WITHOUT mutating households (MODE A). "
            "Please patch Activity.apply accordingly."
        )
        raise RuntimeError(message) from exc
def apply_collinearity_propagation(hh, deltas):
    """
    Propagate cross-effects after the nonlinear updates, using the
    per-household realized deltas (element-wise), then clip to [0, 1].
    This preserves the original collinearity semantics.
    """
    # (source variable, target variable, propagation coefficient)
    propagation = (
        ('protection_risk', 'vulnerability', COLLINEARITY_PR_VUL),
        ('vulnerability', 'protection_risk', COLLINEARITY_VUL_PR),
        ('basic_needs_gap', 'morbidity', COLLINEARITY_BNG_MOR),
        ('morbidity', 'basic_needs_gap', COLLINEARITY_MOR_BNG),
        ('service_efficiency', 'education_access', COLLINEARITY_SE_EA),
        ('education_access', 'service_efficiency', COLLINEARITY_EA_SE),
        ('basic_needs_gap', 'vulnerability', COLLINEARITY_BNG_VUL),
        ('vulnerability', 'basic_needs_gap', COLLINEARITY_VUL_BNG),
    )
    for source, target, weight in propagation:
        if weight == 0.0:
            continue  # propagation disabled for this pair
        source_delta = deltas.get(source)
        if source_delta is None:
            continue  # no realized delta for this source variable
        target_arr = getattr(hh, target)
        # Linear propagation on realized deltas, written back in place;
        # small coefficients recommended (0–0.3).
        np.clip(target_arr + weight * source_delta, 0.0, 1.0, out=target_arr)
# 5. "Synergy" (interaction) helpers: utilities to reason about whether a
# change in a variable is an improvement, regardless of its orientation.

# Variables where a higher value is the desirable direction.
INCREASING_VARS = {'education_access', 'service_efficiency'}


def is_increasing(var_name: str) -> bool:
    """True when a higher value of `var_name` counts as an improvement."""
    return var_name in INCREASING_VARS


def helpful_delta(var_name: str, delta_arr: np.ndarray) -> np.ndarray:
    """
    Map a raw delta into 'helpful change' space (positive == improvement):
    identity for higher-is-better variables, negated otherwise.
    """
    if is_increasing(var_name):
        return delta_arr
    return -delta_arr


def raw_from_helpful(var_name: str, helpful_arr: np.ndarray) -> np.ndarray:
    """
    Inverse of helpful_delta: map a helpful change back into the target
    variable's raw orientation.
    """
    if is_increasing(var_name):
        return helpful_arr
    return -helpful_arr
# ---------------------------------------------
# Synergy rules:
# Each rule says: when BOTH sources improve (in 'helpful' terms),
# produce an *extra helpful change* on the target.
# coef controls the strength of the second-order effect;
# mode='help_only' ignores adverse (both-worse) cases.
# ---------------------------------------------
SYNERGY_RULES = [
    # Basic needs gap ↓ AND service efficiency ↑ → extra ↓ in vulnerability
    {'sources': ('basic_needs_gap', 'service_efficiency'), 'target': 'vulnerability', 'coef': 0.30, 'mode': 'help_only'},
    # Education access ↑ AND service efficiency ↑ → extra ↓ in protection risk
    {'sources': ('education_access', 'service_efficiency'), 'target': 'protection_risk','coef': 0.15, 'mode': 'help_only'},
    # Basic needs gap ↓ AND protection risk ↓ → extra ↓ in morbidity
    {'sources': ('basic_needs_gap', 'protection_risk'), 'target': 'morbidity', 'coef': 0.20, 'mode': 'help_only'},
]
# Cap on the extra synergy applied per rule, per household, per step —
# keeps second-order effects realistically small.
ABS_SYNERGY_CAP = 0.03
def apply_synergy_effects(hh, base_deltas: dict):
    """
    Apply interaction (synergy) effects based on the *realized base deltas*
    of the current month, BEFORE collinearity propagation.

    For each rule in SYNERGY_RULES: where both source variables moved in
    the helpful direction, add coef * (product of helpful improvements) to
    the target — capped at ±ABS_SYNERGY_CAP — then clip the target to [0, 1].

    Parameters
    ----------
    base_deltas : dict
        {var_name: np.ndarray of per-household raw delta} for this month.
    """
    # (Removed an unused `n = hh.n` local — dead code.)
    for rule in SYNERGY_RULES:
        src_a, src_b = rule['sources']
        target = rule['target']
        coef = float(rule.get('coef', 0.0))
        mode = rule.get('mode', 'help_only')
        if coef == 0.0:
            continue  # rule disabled
        if src_a not in base_deltas or src_b not in base_deltas:
            continue  # missing realized deltas → no synergy this month
        delta_a = base_deltas[src_a]
        delta_b = base_deltas[src_b]
        if delta_a is None or delta_b is None:
            continue
        # Work in 'helpful' space so an improvement is always positive.
        help_a = helpful_delta(src_a, delta_a)
        help_b = helpful_delta(src_b, delta_b)
        # Synergy is the product of helpful improvements; in 'help_only'
        # mode it fires only where BOTH sources actually improved.
        prod = help_a * help_b
        if mode == 'help_only':
            prod = np.where((help_a > 0) & (help_b > 0), prod, 0.0)
        # Cap the extra effect's magnitude for realism.
        extra_helpful = np.clip(coef * prod, -ABS_SYNERGY_CAP, ABS_SYNERGY_CAP)
        # Convert back to the target's raw orientation and apply, clipped.
        extra_raw = raw_from_helpful(target, extra_helpful)
        target_arr = getattr(hh, target)
        target_arr[:] = np.clip(target_arr + extra_raw, 0.0, 1.0)
# 6. The "Simulate" function: the master loop that runs the virtual world.
def simulate(env: Environment, activities: list, months: int = 6, budget_scale: float = 1.0):
    """
    Simulate with synergy:
      0) record indicators
      1) natural drift
      2) collect non-mutating per-activity effects: {var: [(value, idx), ...]}
      3) nonlinear + delayed update per var, applying each (value, idx)
      4) compute base (realized) deltas
      5) apply collinearity (uses realized deltas)
      6) apply synergy (uses base deltas only)

    Returns a pandas DataFrame with months + 1 rows: the initial state
    plus one row per simulated month.
    """
    timeline = []
    for t in range(months + 1):
        # Record indicators BEFORE this month's dynamics, so row 0 captures
        # the untouched initial state.
        timeline.append({**env.indicators(), 'month': t})
        if t == months:
            break  # final row recorded; do not simulate past the horizon
        # 1) Background deterioration, independent of any activity.
        env.natural_drift()
        # 2) Collect (value, idx) effect batches per variable without
        #    mutating household state (the MODE A contract of Activity.apply).
        effects_by_var = {
            'vulnerability': [],
            'protection_risk': [],
            'basic_needs_gap': [],
            'education_access': [],
            'morbidity': [],
            'service_efficiency': []
        }
        for a in activities:
            eff = get_activity_effects(a, env.hh, budget_scale=budget_scale)  # returns {var: (val, idx)}
            if not eff:
                continue
            for var, payload in eff.items():
                if payload is None:
                    continue
                val, idx = payload
                # Skip activities that cover no households this month.
                if idx is None or len(idx) == 0:
                    continue
                # NOTE(review): this raises KeyError for any variable name not
                # in the six keys above — presumably activities only ever emit
                # those names; confirm against Activity.apply.
                effects_by_var[var].append((float(val), idx))
        # Snapshot AFTER drift but BEFORE activity application, so the
        # realized deltas below measure activity-driven change only.
        prev = {
            'vulnerability': env.hh.vulnerability.copy(),
            'protection_risk': env.hh.protection_risk.copy(),
            'basic_needs_gap': env.hh.basic_needs_gap.copy(),
            'education_access': env.hh.education_access.copy(),
            'morbidity': env.hh.morbidity.copy(),
            'service_efficiency': env.hh.service_efficiency.copy()
        }
        # 3) Nonlinear + delayed update, per variable.
        for var_name, batches in effects_by_var.items():
            if not batches:
                continue
            arr = getattr(env.hh, var_name)
            # Apply each activity's (value, idx) to its covered households.
            for effect_value, idx in batches:
                if len(idx) == 0:
                    continue
                # Update ONLY the covered indices (scalar update per household).
                arr[idx] = [
                    nonlinear_update_scalar(arr[i], effect_value, t, var_name)
                    for i in idx
                ]
        # 4) Realized base deltas (post-nonlinear, pre-collinearity).
        base_deltas = {
            'vulnerability': env.hh.vulnerability - prev['vulnerability'],
            'protection_risk': env.hh.protection_risk - prev['protection_risk'],
            'basic_needs_gap': env.hh.basic_needs_gap - prev['basic_needs_gap'],
            'education_access': env.hh.education_access - prev['education_access'],
            'morbidity': env.hh.morbidity - prev['morbidity'],
            'service_efficiency': env.hh.service_efficiency - prev['service_efficiency']
        }
        # 5) Collinearity propagation on realized deltas.
        apply_collinearity_propagation(env.hh, base_deltas)
        # 6) Synergy on base deltas (pre-collinearity): second-order effects.
        apply_synergy_effects(env.hh, base_deltas)
        # Advance the environment clock by one month.
        env.time += 1
    return pd.DataFrame(timeline)
# 7. Simulation pipeline: indicator columns aggregated across replicate runs.
metrics = [
    'Vulnerability (average effect)', 'Protection risk (average effect)', 'Basic needs gap (average effect)',
    'Education access (average effect)', 'Morbidity (average effect)', 'Service efficiency (average effect)'
]


def clone_households(h):
    """Return an independent copy of a Households population (arrays copied)."""
    duplicate = Households(n=h.n)
    for attr, values in h.as_dict().items():
        setattr(duplicate, attr, values.copy())
    return duplicate
all_results = []
for run_id in range(N_RUNS):
    # Independent, reproducible seed tree per replicate run.
    ss_run = SeedSequence(BASE_SEED + run_id)
    rng_env = default_rng(ss_run.spawn(1)[0])
    # One child stream per effect domain, so activity noise is decoupled
    # across domains within a run.
    child_ss = ss_run.spawn(len(all_effect_domains))
    rng_by_domain_run = {dom: default_rng(ss) for dom, ss in zip(all_effect_domains, child_ss)}
    # Rebuild activities with this run's per-domain RNGs.
    scen_built = []
    for name, acts, bscale in all_scenarios:
        # NOTE(review): the `.get(..., default_rng())` fallback hands out an
        # UNSEEDED generator for any srf_domain missing from
        # rng_by_domain_run — confirm every domain is covered, or this
        # silently breaks reproducibility.
        built = [Activity(a.name, a.srf_domain, a.budget_share, coverage=a.coverage, intensity=a.intensity, rng=rng_by_domain_run.get(a.srf_domain, default_rng())) for a in acts]
        scen_built.append((name,built,bscale))
    # ----- Run "No intervention" baseline -----
    # NOTE(review): the SAME rng_env Generator instance is shared (and
    # consumed) by every scenario in this run, so scenarios do not use
    # common random numbers for drift — confirm this is intentional.
    env = Environment(clone_households(H_base), rng=rng_env)
    traj = simulate(env, [], MONTHS, 1.0)
    traj['scenario'] = 'No intervention'; traj['run'] = run_id; all_results.append(traj)
    # ----- Run all other scenarios -----
    for name, acts, bscale in scen_built:
        if name=='No intervention': continue
        env = Environment(clone_households(H_base), rng=rng_env)
        traj = simulate(env, acts, MONTHS, budget_scale=bscale)
        traj['scenario'] = name; traj['run'] = run_id; all_results.append(traj)
# 8. Aggregate the replicate trajectories: mean plus 5th/95th percentile
# bounds per scenario-month for every indicator column.
results_df_ = pd.concat(all_results, ignore_index=True)
agg = []
for (scenario, month), group in results_df_.groupby(['scenario', 'month']):
    summary = {'scenario': scenario, 'month': month}
    for metric in metrics:
        samples = group[metric].values
        summary[metric] = float(samples.mean())
        summary[f'{metric}_lo'] = float(np.percentile(samples, 5))
        summary[f'{metric}_hi'] = float(np.percentile(samples, 95))
    agg.append(summary)
results_df_agg = pd.DataFrame(agg)
# ============================================================
# SAVE OUTPUTS - Persist params/seeds
# ============================================================
os.makedirs('outputs', exist_ok=True)
# Reproducibility manifest: every knob needed to re-run this experiment.
repro = {
    'activity_id': ACTIVITY_ID,
    'params': {'MONTHS': MONTHS, 'HOUSEHOLDS': HOUSEHOLDS, 'USE_LLM': USE_LLM, 'BASE_SEED': BASE_SEED, 'N_RUNS': N_RUNS, 'DEA_VRS': DEA_VRS},
    'run_seeds': [BASE_SEED + i for i in range(N_RUNS)],
    'timestamp': pd.Timestamp.now().isoformat()
}
with open(f'outputs/repro_report_{ACTIVITY_ID}.json', 'w') as f:
    json.dump(repro, f, indent=2)
results_df_agg.to_csv(f'outputs/simulation_results_{ACTIVITY_ID}.csv', index=False)
print("[ Simulation Complete ]")
print(f"Ran {N_RUNS} replicates × {results_df_['scenario'].nunique()} scenarios × {MONTHS} months.")
# BUGFIX: report the row count of the AGGREGATED table (results_df_agg);
# the original printed len(results_df_), the raw per-run concatenation,
# which contradicts the "Aggregated to ... rows" message.
print(f"Aggregated to {len(results_df_agg)} rows (mean ± 90% CI per scenario-month).")