# (optional but recommended) activate the virtualenv first
# source ~/venv-nostr/bin/activate
python - <<'PY'
import asyncio, json, time, hashlib, re
import websockets
from coincurve import PrivateKey
# ================== CONFIG ==================
SK_HEX = ""  # <-- insert your 64-hex PRIVATE KEY here
RELAYS = [
    "wss://nostr1.system.direct",
    "wss://nos.lol",
    "wss://relay.damus.io",
    "wss://relay.primal.net",
]
PAGE_LIMIT = 500  # events per REQ page; mind relay-side limits
MAX_EVENTS_TOTAL = 20000  # safety cap on total collected event ids
REASON = "wipe all events for this pubkey"  # human-readable deletion reason
CHUNK_SIZE = 200  # how many event ids per kind-5 delete event
WS_TIMEOUT = 12  # seconds for websocket open/close handshakes
# ===========================================
HEX64 = re.compile(r"^[0-9a-f]{64}$", re.I)  # 64 hex chars, case-insensitive

def must_hex64(name, s):
    """Return *s* stripped and lowercased if it is a 64-char hex string.

    Raises ValueError (mentioning *name*) when the value is missing or
    not exactly 64 hexadecimal characters.
    """
    value = (s or "").strip()
    if HEX64.match(value) is None:
        raise ValueError(f"{name} must be 64 hex chars, got len={len(value)}")
    return value.lower()
def pubkey_from_sk(sk_hex: str) -> str:
    """Derive the Nostr pubkey (x-only, 64 hex chars) from a 64-hex secret key."""
    key = PrivateKey(bytes.fromhex(sk_hex))
    # compressed pubkey is 33 bytes (02/03 prefix + 32-byte x coordinate);
    # drop the prefix byte to get the x-only key Nostr uses
    compressed = key.public_key.format(compressed=True)
    return compressed[1:].hex()
def serialize(pubkey, created_at, kind, tags, content):
    """Canonical NIP-01 serialization of an event (the input to the id hash)."""
    payload = [0, pubkey, created_at, kind, tags, content]
    return json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
def event_id(pubkey, created_at, kind, tags, content):
    """Compute the NIP-01 event id: sha256 of the canonical serialization, hex."""
    canonical = serialize(pubkey, created_at, kind, tags, content)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
def sign_event(sk_hex: str, ev_id_hex: str) -> str:
    """Schnorr-sign the 32-byte event id with the secret key; return hex sig."""
    key = PrivateKey(bytes.fromhex(sk_hex))
    return key.sign_schnorr(bytes.fromhex(ev_id_hex)).hex()
def build_delete_event(pubkey_hex: str, sk_hex: str, target_ids):
    """Build a signed NIP-09 deletion event (kind 5) for the given event ids.

    Each target id becomes an ["e", <id>] tag.  The human-readable REASON is
    placed in `content` as NIP-09 specifies; the previous ["reason", ...] tag
    was non-standard and ignored by relays.

    Returns the complete event dict ready to be sent as ["EVENT", ev].
    """
    created_at = int(time.time())
    tags = [["e", tid] for tid in target_ids]
    kind = 5
    # NIP-09: the deletion reason belongs in the content field, not in a tag.
    content = REASON or ""
    ev_id = event_id(pubkey_hex, created_at, kind, tags, content)
    return {
        "id": ev_id,
        "pubkey": pubkey_hex,
        "created_at": created_at,
        "kind": kind,
        "tags": tags,
        "content": content,
        "sig": sign_event(sk_hex, ev_id),
    }
async def ws_req_all_ids(relay: str, pubkey_hex: str):
    """
    Collect the ids of ALL events of this pubkey from *relay* via paging.

    Pages backwards in time with the filter {authors:[pub], limit, until} and
    skips kind=5 (delete events) so the wipe's own delete events are not
    counted as leftovers.  Returns a set of lowercase 64-hex event ids.

    Fixes over the naive version:
      - messages are matched against the current subscription id, so stale
        EVENTs from a previous page's REQ cannot pollute the paging state
      - each subscription is explicitly CLOSEd after EOSE (NIP-01)
      - recv() is bounded by WS_TIMEOUT so a silent relay cannot hang forever
        (the timeout surfaces to the caller as an exception)
    """
    ids = set()
    until = None
    done = False
    async with websockets.connect(relay, open_timeout=WS_TIMEOUT, close_timeout=WS_TIMEOUT) as ws:
        while not done:
            flt = {"authors": [pubkey_hex], "limit": PAGE_LIMIT}
            if until is not None:
                flt["until"] = until
            sub = f"wipe-{int(time.time()*1000)}"
            await ws.send(json.dumps(["REQ", sub, flt]))
            batch = []
            lowest_created = None
            while True:
                # bound each read so a relay that never sends EOSE can't stall us
                raw = await asyncio.wait_for(ws.recv(), WS_TIMEOUT)
                msg = json.loads(raw)
                t = msg[0]
                if t == "EVENT":
                    # ignore events belonging to any other (stale) subscription
                    if len(msg) < 3 or msg[1] != sub:
                        continue
                    ev = msg[2]
                    k = ev.get("kind")
                    eid = ev.get("id")
                    ca = ev.get("created_at")
                    # skip delete-events (kind=5) so they don't count as leftovers
                    if isinstance(k, int) and k == 5:
                        continue
                    if isinstance(eid, str) and HEX64.match(eid):
                        ids.add(eid.lower())
                    if isinstance(ca, int):
                        batch.append(ca)
                        if lowest_created is None or ca < lowest_created:
                            lowest_created = ca
                    if len(ids) >= MAX_EVENTS_TOTAL:
                        done = True  # safety cap; keep draining until EOSE
                elif t == "EOSE":
                    if len(msg) > 1 and msg[1] != sub:
                        continue  # EOSE of a stale subscription
                    break
                elif t == "NOTICE":
                    # relay may still deliver data; nothing to do here
                    pass
            # politely close this page's subscription (NIP-01) before the next REQ
            await ws.send(json.dumps(["CLOSE", sub]))
            # paging: no created_at seen in this page -> nothing older to fetch
            if done or lowest_created is None:
                done = True
            else:
                # step further back in time; -1s avoids re-fetching the boundary event
                until = max(0, lowest_created - 1)
                # an empty batch means the relay has no more history
                if not batch:
                    done = True
    return ids
async def publish_delete(relay: str, delete_ev: dict):
    """Publish one delete event to *relay* and wait briefly for its verdict.

    Returns (ok, info):
      - (True, detail)       when the relay answered OK
      - (True, last_notice)  when no OK arrived but the event was sent
      - (False, error)       on connection/protocol failure
    """
    last_notice = ""
    try:
        async with websockets.connect(relay, open_timeout=WS_TIMEOUT, close_timeout=WS_TIMEOUT) as ws:
            await ws.send(json.dumps(["EVENT", delete_ev]))
            # wait briefly for OK/NOTICE; some relays send NOTICE before OK
            for _ in range(10):
                try:
                    msg = json.loads(await asyncio.wait_for(ws.recv(), WS_TIMEOUT))
                except asyncio.TimeoutError:
                    break  # relay stayed silent; the event was still sent
                if msg[0] == "OK":
                    ok = bool(msg[2])
                    info = msg[3] if len(msg) > 3 else ""
                    return ok, info
                if msg[0] == "NOTICE":
                    # keep the text for the fallback return instead of dropping it
                    # (it was previously assigned to an unused local)
                    last_notice = msg[1] if len(msg) > 1 else ""
    except Exception as e:
        return False, str(e)
    return True, last_notice  # no OK seen, but the event was sent
async def main():
    """Orchestrate the wipe: scan relays, broadcast chunked deletes, re-check."""
    sk_hex = must_hex64("SK_HEX", SK_HEX)
    pub = pubkey_from_sk(sk_hex)
    print("PUBKEY:", pub)
    print("Relays:", RELAYS)
    # Phase 1: collect event ids from every relay
    collected = set()
    per_relay = {}
    for relay in RELAYS:
        try:
            found = await ws_req_all_ids(relay, pub)
        except Exception as e:
            print(f"{relay} -> ERROR while scanning: {e}")
            continue
        per_relay[relay] = found
        collected |= found
        print(f"{relay} -> found {len(found)} events (excluding kind=5)")
    all_ids = sorted(collected)
    print("\nTOTAL UNIQUE EVENTS TO DELETE (excluding kind=5):", len(all_ids))
    if not all_ids:
        print("Nothing to delete.")
        return
    # Phase 2: broadcast delete events in chunks
    chunks = [all_ids[pos:pos + CHUNK_SIZE] for pos in range(0, len(all_ids), CHUNK_SIZE)]
    print(f"Publishing {len(chunks)} delete-event(s) (chunk size {CHUNK_SIZE})...")
    for idx, chunk in enumerate(chunks, 1):
        delete_ev = build_delete_event(pub, sk_hex, chunk)
        print(f"\n--- DELETE EVENT {idx}/{len(chunks)} ---")
        print("delete_id:", delete_ev["id"], "targets:", len(chunk))
        outcomes = await asyncio.gather(*(publish_delete(relay, delete_ev) for relay in RELAYS))
        for relay, (ok, info) in zip(RELAYS, outcomes):
            print(f"{relay} -> OK={ok} {info}")
    # Phase 3: optional best-effort re-check (some relays cache results)
    print("\nRe-check (excluding kind=5) ...")
    for relay in RELAYS:
        try:
            remaining = await ws_req_all_ids(relay, pub)
            print(f"{relay} -> remaining non-delete events: {len(remaining)}")
        except Exception as e:
            print(f"{relay} -> ERROR while re-checking: {e}")
    print("\nDone. (Ein verbleibendes kind=5 Delete-Event ist normal.)")
# Entry point: run the async workflow (the heredoc script executes as __main__).
asyncio.run(main())
PY