Site icon NSecurity Consulting

Phishing Playbook in Splunk SOAR (Phantom)

49f23f67 9468 45ec 8854 47c1a072bc8e NSecurity Consulting

How to write a Splunk SOAR (Phantom) Python playbook for a phishing emails.
It parses the email, checks URLs/hashes with threat intel, and quarantines the email if anything is malicious.


“””
Flow:
1) on_start: parse email from artifact
2) Enrich URLs / hashes with threat intel
3) If malicious -> move email to Quarantine folder + add note
4) Else -> add note that no malicious IoCs were found
“””

import phantom.rules as phantom

TI_ASSET = “virustotal” # demo TI asset
EMAIL_ASSET = “o365_email” # demo O365/Exchange asset


def on_start(container):
phantom.debug(“on_start()”)

# Get email headers from first artifact (simplified for demo)
artifacts = phantom.collect2(
container=container,
datapath=[“artifact:*.cef.emailHeaders”, “artifact:*.id”]
)
if not artifacts or not artifacts[0][0]:
phantom.debug(“No email headers found”)
return

params = [{
“headers”: artifacts[0][0],
“context”: {“artifact_id”: artifacts[0][1]},
}]

phantom.act(
“email parse”,
parameters=params,
assets=[EMAIL_ASSET],
name=”parse_email”,
callback=parse_email_cb,
)


def parse_email_cb(action, success, container, results, **kwargs):
phantom.debug(“parse_email_cb()”)

urls = set()
hashes = set()
msg_ids = set()

for r in results:
for data in r.get(“data”, []):
for u in data.get(“urls”, []):
urls.add(u)
for att in data.get(“attachments”, []):
h = att.get(“sha256”) or att.get(“md5”)
if h:
hashes.add(h)
mid = data.get(“message_id”)
if mid:
msg_ids.add(mid)

phantom.save_run_data(“phish.msg_ids”, “,”.join(msg_ids))

ti_params = []
for u in urls:
ti_params.append({“url”: u})
for h in hashes:
ti_params.append({“hash”: h})

if not ti_params:
phantom.debug(“No URLs or hashes found – nothing to enrich.”)
phantom.add_note(container, “No IoCs extracted from email.”, “Phishing Demo”)
return

phantom.act(
“reputation”,
parameters=ti_params,
assets=[TI_ASSET],
name=”ti_reputation”,
callback=ti_reputation_cb,
)


def ti_reputation_cb(action, success, container, results, **kwargs):
phantom.debug(“ti_reputation_cb()”)

malicious = False

for r in results:
for d in r.get(“data”, []):
summ = d.get(“summary”, {}) or {}
score = summ.get(“positives”) or summ.get(“score”) or 0
if score and score >= 5: # simple demo threshold
malicious = True

if malicious:
phantom.debug(“Malicious IoCs detected – quarantining email.”)
quarantine_email(container)
else:
phantom.add_note(
container,
“No malicious indicators found in phishing demo playbook.”,
“Phishing Demo Result”,
)
phantom.set_severity(container, “low”)
phantom.set_status(container, “closed”)


def quarantine_email(container):
phantom.debug(“quarantine_email()”)

msg_ids = phantom.get_run_data(“phish.msg_ids”)
if not msg_ids:
phantom.debug(“No message IDs to quarantine.”)
return

params = []
for mid in msg_ids.split(“,”):
if mid:
params.append({“message_id”: mid, “folder”: “Quarantine”})

phantom.act(
“move email”,
parameters=params,
assets=[EMAIL_ASSET],
name=”move_to_quarantine”,
)

phantom.add_note(
container,
“Email moved to Quarantine by phishing demo playbook.”,
“Phishing Demo Containment”,
)
phantom.set_severity(container, “high”)
phantom.set_status(container, “closed”)


def on_finish(container, summary):
phantom.debug(“on_finish()”)
return


 


Enhance analyst efficiency with our SOAR consulting service


Schedule a call

Exit mobile version