PQ Signature Performance
Post-Quantum Signature Performance
Analysis of post-quantum cryptographic signature performance in Lean Consensus clients.
This notebook examines:
- Attestation signing time (p50, p95, p99)
- Attestation verification time
- Aggregate signature building and verification
- Performance comparison across clients
Show code
import json
from pathlib import Path
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
# Set default renderer for static HTML output: with the "notebook" renderer
# selected here, every later fig.show() call uses it unless a renderer is
# passed explicitly.
import plotly.io as pio
pio.renderers.default = "notebook"
Show code
# Resolve devnet_id
# `devnet_id` must already be defined when this cell runs (presumably injected
# by a notebook parameters cell -- confirm). None means "use the latest devnet
# listed in the manifest".
DATA_DIR = Path("../data")

if devnet_id is None:
    # Use latest devnet from manifest
    devnets_path = DATA_DIR / "devnets.json"
    if not devnets_path.exists():
        raise ValueError("No devnets.json found. Run 'just detect-devnets' first.")

    with open(devnets_path, encoding="utf-8") as f:
        devnets = json.load(f).get("devnets", [])

    if not devnets:
        # Fail loudly here: otherwise devnet_id stays None and the path join
        # below dies with an unhelpful TypeError.
        raise ValueError(
            "devnets.json contains no devnets. Run 'just detect-devnets' first."
        )

    devnet_id = devnets[-1]["id"]  # Latest
    print(f"Using latest devnet: {devnet_id}")

DEVNET_DIR = DATA_DIR / devnet_id
print(f"Loading data from: {DEVNET_DIR}")
Show code
# Load devnet metadata: find the manifest entry whose id matches devnet_id
# and print its headline facts. devnet_info stays None when nothing matches.
devnets_data = json.loads((DATA_DIR / "devnets.json").read_text())

devnet_info = None
for entry in devnets_data["devnets"]:
    if entry["id"] == devnet_id:
        devnet_info = entry
        break

if devnet_info:
    print(f"Devnet: {devnet_info['id']}")
    print(f"Duration: {devnet_info['duration_hours']:.1f} hours")
    print(f"Time: {devnet_info['start_time']} to {devnet_info['end_time']}")
    print(f"Slots: {devnet_info['start_slot']} → {devnet_info['end_slot']}")
    print(f"Clients: {', '.join(devnet_info['clients'])}")
Load Data
Show code
# Load PQ signature timing data for the selected devnet and report which
# metrics and clients it contains.
timing_path = DEVNET_DIR / "pq_signature_timing.parquet"
timing_df = pd.read_parquet(timing_path)

print(f"Loaded {len(timing_df)} timing records")
timing_metric_names = timing_df["metric"].unique().tolist()
print(f"Metrics: {timing_metric_names}")
timing_client_names = timing_df["client"].unique().tolist()
print(f"Clients: {timing_client_names}")
Show code
# Load PQ signature counts for the selected devnet and report which metrics
# it contains.
counts_path = DEVNET_DIR / "pq_signature_metrics.parquet"
counts_df = pd.read_parquet(counts_path)

print(f"Loaded {len(counts_df)} count records")
count_metric_names = counts_df["metric"].unique().tolist()
print(f"Metrics: {count_metric_names}")
Attestation Signing Time
How long does it take to sign an attestation using post-quantum cryptography?
Show code
# Attestation signing time: keep only the "signing" rows and, when there are
# any, plot them over time with one colour per client and one dash style per
# quantile.
is_signing = timing_df["metric"] == "signing"
signing_df = timing_df[is_signing].copy()

if not signing_df.empty:
    # Scale the raw value by 1000 so the axis reads in milliseconds
    # (assumes the stored value is in seconds -- confirm against the exporter).
    signing_df["value_ms"] = signing_df["value"] * 1000

    signing_labels = {
        "timestamp": "Time",
        "value_ms": "Signing Time (ms)",
        "client": "Client",
        "quantile": "Percentile",
    }
    fig = px.line(
        signing_df,
        x="timestamp",
        y="value_ms",
        color="client",
        line_dash="quantile",
        title="Attestation Signing Time by Client",
        labels=signing_labels,
    )
    fig.update_layout(height=500)
    fig.show()
else:
    print("No signing time data available")
Show code
# Summary statistics by client: mean/min/max signing time (ms) for every
# (client, quantile) pair, rounded to three decimals.
if not signing_df.empty:
    signing_ms_by_group = signing_df.groupby(["client", "quantile"])["value_ms"]
    summary = signing_ms_by_group.agg(["mean", "min", "max"]).round(3)
    summary = summary.rename(
        columns={"mean": "Mean (ms)", "min": "Min (ms)", "max": "Max (ms)"}
    )
    display(summary)
Attestation Verification Time
How long does it take to verify an attestation signature?
Show code
# Attestation verification time: keep only the "verification" rows and, when
# there are any, plot them over time with one colour per client and one dash
# style per quantile.
is_verification = timing_df["metric"] == "verification"
verification_df = timing_df[is_verification].copy()

if not verification_df.empty:
    # Scale the raw value by 1000 so the axis reads in milliseconds
    # (assumes the stored value is in seconds -- confirm against the exporter).
    verification_df["value_ms"] = verification_df["value"] * 1000

    verification_labels = {
        "timestamp": "Time",
        "value_ms": "Verification Time (ms)",
        "client": "Client",
        "quantile": "Percentile",
    }
    fig = px.line(
        verification_df,
        x="timestamp",
        y="value_ms",
        color="client",
        line_dash="quantile",
        title="Attestation Verification Time by Client",
        labels=verification_labels,
    )
    fig.update_layout(height=500)
    fig.show()
else:
    print("No verification time data available")
Aggregate Signature Performance
Time to build and verify aggregated signatures.
Show code
# Aggregate signature timing: box plot per client of the time spent on the
# "agg_building" and "agg_verification" metrics.
agg_metrics = ["agg_building", "agg_verification"]
is_aggregate = timing_df["metric"].isin(agg_metrics)
agg_df = timing_df[is_aggregate].copy()

if not agg_df.empty:
    # Scale the raw value by 1000 so the axis reads in milliseconds
    # (assumes the stored value is in seconds -- confirm against the exporter).
    agg_df["value_ms"] = agg_df["value"] * 1000

    agg_labels = {
        "value_ms": "Time (ms)",
        "client": "Client",
        "metric": "Operation",
    }
    fig = px.box(
        agg_df,
        x="client",
        y="value_ms",
        color="metric",
        title="Aggregate Signature Timing by Client",
        labels=agg_labels,
    )
    fig.update_layout(height=500)
    fig.show()
else:
    print("No aggregate signature timing data available")
Client Comparison: P95 Signing Time
Show code
# Compare p95 signing time across clients: one bar per client showing the
# mean of its 0.95-quantile signing samples.
if signing_df.empty:
    p95_signing = pd.DataFrame()
else:
    p95_signing = signing_df[signing_df["quantile"] == 0.95].copy()

if not p95_signing.empty:
    p95_mean_by_client = p95_signing.groupby("client")["value_ms"].mean().reset_index()
    p95_labels = {
        "value_ms": "P95 Signing Time (ms)",
        "client": "Client",
    }
    fig = px.bar(
        p95_mean_by_client,
        x="client",
        y="value_ms",
        title="Average P95 Attestation Signing Time by Client",
        labels=p95_labels,
        color="client",
    )
    # Bars are already labelled on the x axis, so the legend is redundant.
    fig.update_layout(height=400, showlegend=False)
    fig.show()
else:
    print("No p95 signing data available")
Signature Success Rates
Show code
# Valid vs. invalid aggregated signatures: take each client's largest
# recorded value for both metrics and show them as grouped bars.
valid_df = counts_df[counts_df["metric"] == "lean_pq_sig_aggregated_signatures_valid_total"]
invalid_df = counts_df[counts_df["metric"] == "lean_pq_sig_aggregated_signatures_invalid_total"]

if not valid_df.empty or not invalid_df.empty:

    def get_final_count(df):
        """Return the per-client maximum of `value`, or an empty frame if `df` is empty."""
        if not df.empty:
            return df.groupby("client")["value"].max().reset_index()
        return pd.DataFrame()

    valid_final = get_final_count(valid_df)
    invalid_final = get_final_count(invalid_df)

    # Tag each non-empty frame so the two series can be told apart after concat.
    for final_counts, status_name in ((valid_final, "valid"), (invalid_final, "invalid")):
        if not final_counts.empty:
            final_counts["status"] = status_name

    combined = pd.concat([valid_final, invalid_final], ignore_index=True)

    if not combined.empty:
        count_labels = {
            "value": "Count",
            "client": "Client",
            "status": "Status",
        }
        status_colors = {"valid": "#2ecc71", "invalid": "#e74c3c"}
        fig = px.bar(
            combined,
            x="client",
            y="value",
            color="status",
            barmode="group",
            title="Aggregated Signature Counts by Client",
            labels=count_labels,
            color_discrete_map=status_colors,
        )
        fig.update_layout(height=400)
        fig.show()
else:
    print("No signature count data available")
Summary
Key findings from this devnet iteration:
Show code
# Generate summary statistics: devnet identity, duration, number of clients,
# and the average p95 signing / verification times.
print(f"Devnet: {devnet_id}")
if devnet_info:
    print(f"Duration: {devnet_info['duration_hours']:.1f} hours")
else:
    # devnet_info is None when devnet_id has no entry in devnets.json;
    # subscripting it unconditionally would raise TypeError.
    print("Duration: unknown (devnet not listed in devnets.json)")
print(f"Clients analyzed: {len(timing_df['client'].unique())}")
print()

if not signing_df.empty:
    p95_mean = signing_df[signing_df["quantile"] == 0.95]["value_ms"].mean()
    # The mean is NaN when there are no 0.95-quantile rows; skip the line
    # rather than print "nan ms".
    if pd.notna(p95_mean):
        print(f"Average P95 signing time: {p95_mean:.2f} ms")

if not verification_df.empty:
    p95_ver = verification_df[verification_df["quantile"] == 0.95]["value_ms"].mean()
    if pd.notna(p95_ver):
        print(f"Average P95 verification time: {p95_ver:.2f} ms")