
Unflattening the Maze: Automating CFF Deobfuscation using Miasm (P1)


1. Introduction

In recent years, control flow flattening (CFF) has become a common obfuscation technique in malware, and it is one of the most annoying for malware analysts to deal with. In this blog post, I show a general approach to deobfuscating it that you can also apply to more complex samples. In Part 1, I use the Miasm framework, which brings the following benefits:

  • Lifting & Intermediate Representation (IR): Miasm translates machine code (x86, ARM, etc.) into an intermediate representation (IR). This lets us analyze the logic without dealing with the quirks of each specific CPU instruction set.

  • Powerful Data-flow Analysis: Miasm provides tools such as ReachingDefinitions and DiGraphDefUse. They are key to tracing how the state variable changes across code blocks and, from that, recovering the original execution path.

2. Anatomy of CFF

A control flow flattening structure is made up of the following components:

  • State Variable: This is the "heart" of the CFF structure. The value of this variable (usually located in a register like EAX or a memory location) determines which code block will be executed next in the logical flow of the program.

  • Dispatcher: The central intersection of the function. All roads lead back here. Its task is to read the current value of the State Variable and push the execution flow into the comparison chain to find the destination.

  • Backbone: A continuous sequence of comparison (CMP) and conditional jump (Jcc) instructions. It acts as a filter, matching the value of the state variable with predefined constants to identify the correct Relevant block to run.

  • Relevant Blocks: Where the actual logic of the original program is located (such as calculations, function calls...). An important characteristic is that at the end of each of these blocks, the State Variable will be updated with a new value before returning to the loop. In some variants, the relevant block also functions as the pre-dispatcher block. Therefore, you need to identify these components accurately before proceeding with deobfuscation.

  • Pre-dispatcher: The final gathering point of the relevant blocks. After finishing executing the logic, instead of jumping directly back to the Dispatcher, the malware often jumps through this intermediate block to perform some cleanup operations or simply to further obfuscate the function graph.
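To make these roles concrete, here is a toy Python model of a tiny function before and after flattening. It is our own illustration, not code recovered from the sample, and the state constants (0x1111, 0x2222, 0x3333) are made up: `state` plays the state variable, the `while True` loop is the dispatcher, the `if`/`elif` chain is the backbone, and each branch body is a relevant block. A real sample would route the end of each block through a pre-dispatcher; here the loop's back edge plays that role.

```python
def original(x):
    """Reference version: the logic before flattening."""
    x = x * 3
    if x > 10:
        x -= 10
    return x + 1


def flattened(x):
    """Same logic after a toy CFF transformation."""
    state = 0x1111                    # prologue initializes the state variable
    while True:                       # dispatcher: every block loops back here
        # backbone: compare the state variable against constants
        if state == 0x1111:
            x = x * 3                 # relevant block A
            # conditional update: the next block depends on the data
            state = 0x2222 if x > 10 else 0x3333
        elif state == 0x2222:
            x -= 10                   # relevant block B
            state = 0x3333
        elif state == 0x3333:
            return x + 1              # relevant block C (exit)
```

Both versions compute the same result; the flattened one hides the order of the relevant blocks inside the state constants, which is what the deobfuscation steps recover.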

3. Deobfuscation Strategy

Overall, there are five steps, and they apply to cases ranging from simple to complex.

Step 1: Identify key components

First, we need to locate the dispatcher, the pre-dispatchers, and the prologue. Using a graph heuristic, we find the block with the second-highest number of parent nodes (predecessors) in the function, which is one of the pre-dispatchers, and identify the dispatcher from there. The remaining pre-dispatchers are then located based on their relationship with the dispatcher.
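The heuristic can be tried out on a hand-made CFG without Miasm. The sketch below is a simplified stand-in for the Miasm-based `get_cff_info` in the implementation section: it models the graph as a plain dict of successor lists keyed by made-up block addresses, ranks blocks by predecessor count, and assumes, like the post's pattern, that the main pre-dispatcher ranks second and that the other pre-dispatchers sit at higher addresses than the dispatcher. `find_dispatcher` and `predecessors` are hypothetical helper names.

```python
def predecessors(cfg):
    """Invert a successor map {block: [successors]} into {block: [predecessors]}."""
    preds = {blk: [] for blk in cfg}
    for src, dsts in cfg.items():
        for dst in dsts:
            preds.setdefault(dst, []).append(src)
    return preds


def find_dispatcher(cfg):
    preds = predecessors(cfg)
    # Rank blocks by in-degree; the 2nd entry is taken as the main pre-dispatcher
    ranked = sorted(preds, key=lambda blk: len(preds[blk]), reverse=True)
    pre_dispatcher_main = ranked[1]
    # The dispatcher is the (single) successor of the main pre-dispatcher
    dispatcher = cfg[pre_dispatcher_main][0]
    # Other pre-dispatchers: dispatcher predecessors at higher addresses
    pre_dispatchers = sorted(p for p in preds[dispatcher] if p > dispatcher)
    return dispatcher, pre_dispatchers


# Toy flattened function (addresses are made up)
CFG = {
    0x100: [0x110],          # prologue
    0x110: [0x120],          # dispatcher
    0x120: [0x200, 0x130],   # backbone compare
    0x130: [0x210, 0x140],   # backbone compare
    0x140: [0x220, 0x230],   # backbone compare
    0x200: [0x300],          # relevant block A
    0x210: [0x300],          # relevant block B
    0x220: [0x310],          # relevant block C
    0x230: [],               # relevant block D (returns)
    0x300: [0x110],          # main pre-dispatcher
    0x310: [0x110],          # secondary pre-dispatcher
}
```

On this graph the dispatcher (three predecessors) ranks first and the main pre-dispatcher (two) ranks second, which is why the heuristic looks at the second entry.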

Step 2: Trace the state variable using Data-flow

This is where Miasm shows its strength. We use data-flow analysis to find the value assigned to the state variable in each block. Two cases are worth handling:

  • Simple case: The value is assigned directly by a constant.

  • Complex case: The value is passed back and forth between intermediate registers, requiring tracing back to the parent blocks to find the actual number.

In the image below, the state variable's value is assigned directly to the EAX register; this is the constant we extract.

[Image: the state variable assigned a constant in EAX]
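The two cases can be illustrated with a toy backward trace. This is not Miasm's `ReachingDefinitions`/`DiGraphDefUse` API; it is a minimal pure-Python sketch that assumes each block is a list of `(dst, src)` assignments, where `src` is either an integer constant or a register name. A constant ends the search (simple case); a register copy is followed backwards, into the parent blocks if necessary (complex case). `resolve_state` and the block names are hypothetical.

```python
def resolve_state(blocks, preds, block, var, seen=None):
    """Return the set of constants `var` can hold when `block` finishes.

    blocks: block name -> list of (dst, src) assignments in execution order,
            where src is an int constant or a register name (str).
    preds:  block name -> list of parent block names.
    """
    seen = set() if seen is None else seen
    if (block, var) in seen:          # avoid looping forever on CFG cycles
        return set()
    seen.add((block, var))
    # Scan the block backwards for the last write to `var`
    for dst, src in reversed(blocks.get(block, [])):
        if dst != var:
            continue
        if isinstance(src, int):
            return {src}              # simple case: direct constant
        var = src                     # complex case: follow the register copy
    # Not resolved inside this block: continue in every parent block
    values = set()
    for parent in preds.get(block, []):
        values |= resolve_state(blocks, preds, parent, var, seen)
    return values


blocks = {
    "A": [("EAX", 0x724743A3)],                   # simple case
    "P1": [("ECX", 0x11111111)],
    "P2": [("ECX", 0x22222222)],
    "B": [("EBX", "ECX"), ("EAX", "EBX")],        # copies through registers
}
preds = {"B": ["P1", "P2"]}
```

Block `B` yields one value per parent, which resembles the situation the post later handles as a type 2 relevant block.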

Step 3: Decode the Backbone's instruction table

Next, we analyze the backbone chain by iterating through the IR blocks to find comparisons against constants. The goal is a complete mapping table that gives, for each value of the state variable, the address the program actually jumps to.

Consider the backbone block below in IR form. It compares against the constant 0x724743A3 and has two branch targets: loc_40d0b1 and loc_40cc40.

[Image: a backbone block in IR comparing against 0x724743A3]
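Once the comparisons are extracted, building the mapping table is a simple loop. In this sketch, each backbone compare is reduced to a hypothetical `(constant, equal_target, not_equal_target)` tuple with made-up addresses; it also applies a fallback similar to the dispatcher check in the post's Miasm backbone-mapping code, taking the other branch when the "equal" edge leads back to the dispatcher.

```python
def build_backbone_table(compares, dispatcher):
    """Map each state constant to the block that runs when it matches.

    compares: list of (constant, equal_target, not_equal_target) tuples,
    one per CMP/Jcc pair in the backbone.
    """
    table = {}
    for const, equal_target, not_equal_target in compares:
        target = equal_target
        # If the "equal" edge loops back to the dispatcher, the real block
        # is on the other edge
        if target == dispatcher:
            target = not_equal_target
        table[const] = target
    return table


compares = [
    (0x10, 0x200, 0x130),
    (0x20, 0x210, 0x140),
    (0x30, 0x110, 0x220),   # equal edge returns to the dispatcher at 0x110
]
```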

Step 4: Restore the original logic flow

Once we have the information from Steps 2 and 3, we simply link the blocks logically. If block A sets the state variable to 10, and the mapping table from the previous step says that the value 10 leads to block B, then the real execution flow goes from A to B. We save these link pairs for the final step.
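Linking is then a join between the two tables. A minimal sketch, assuming hypothetical inputs `state_writes` (block address to the set of state values it can write, from Step 2) and `backbone_table` (state value to target block, from Step 3); the addresses are made up, and the value 10 reproduces the A-to-B example above.

```python
def link_blocks(state_writes, backbone_table):
    """Turn {block: {state values}} and {state value: target} into real edges."""
    edges = []
    for block in sorted(state_writes):
        for value in sorted(state_writes[block]):
            if value not in backbone_table:
                raise KeyError(f"block {block:#x} writes {value:#x}, not in backbone")
            edges.append((block, backbone_table[value]))
    return edges


state_writes = {0xA00: {10}, 0xB00: {20, 30}}          # block A writes 10
backbone_table = {10: 0xB00, 20: 0xC00, 30: 0xD00}    # 10 leads to block B
```

A block with two possible values (0xB00 here) produces two edges, one per branch of the original condition.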

Step 5: Patch the binary file

This is the "cleanup" step that restores a readable control flow. Based on the recovered links, we:

  • Overwrite the dispatcher and the redundant state-variable assignments with NOP instructions.

  • Replace the roundabout jumps with a direct JMP to the actual destination address.

  • Write these changes directly into the executable file to get a clean binary.
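For the patching itself, the key detail is that an x86 relative jump is encoded relative to the end of the instruction, so the bytes depend on where the JMP is written. This sketch hand-encodes the 5-byte `JMP rel32` form (opcode `E9`) and pads the rest of the overwritten region with NOPs. The helper names are ours; the post's script assembles jumps with its own helpers (`asmb`, `rel`), which may also pick the shorter 2-byte `EB rel8` form.

```python
import struct

NOP = b"\x90"


def jmp_rel32(src, dst):
    """Encode `JMP rel32` (E9 cd) placed at address `src`, jumping to `dst`."""
    # The displacement is measured from the end of the 5-byte instruction
    disp = dst - (src + 5)
    return b"\xE9" + struct.pack("<i", disp)


def patch_region(src, dst, region_len):
    """Bytes that overwrite `region_len` bytes at `src` with a JMP to `dst` plus NOPs."""
    code = jmp_rel32(src, dst)
    if len(code) > region_len:
        raise ValueError("region too small for a 5-byte JMP")
    # ljust: the JMP must stay at `src`, where its displacement was computed
    return code.ljust(region_len, NOP)
```

Padding on the right keeps the JMP at `src`; padding on the left would shift the instruction and invalidate its displacement.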

Below is an image of the graph before and after deobfuscating CFF on the same function.

4. Implementation

Now let's write the script using the Miasm framework.

Phase 1: Locate the dispatcher and function structure

First, we need a function that scans the whole control-flow graph and finds key points such as the dispatcher and the pre-dispatchers. It counts each block's predecessors (parent nodes) to determine where the logical flows converge.

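import pickle

from future.utils import viewitems
from miasm.analysis.binary import Container
from miasm.analysis.data_flow import ReachingDefinitions, DiGraphDefUse
from miasm.analysis.machine import Machine
from miasm.analysis.simplifier import IRCFGSimplifierCommon
from miasm.core.locationdb import LocationDB
from miasm.expression.expression import ExprInt, ExprId, ExprOp

# `cont` (the loaded Container), `stop_on_jmp`, `asmb` and `rel` are
# helpers defined elsewhere in the full script.

# Identify the dispatcher block and the list of pre-dispatchers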
def get_cff_info(asmcfg):
    preds = {}
    for blk in asmcfg.blocks:
        offset = asmcfg.loc_db.get_location_offset(blk.loc_key)
        preds[offset] = asmcfg.predecessors(blk.loc_key)

    # Heuristic: The block with the 2nd highest number of predecessors is usually the main pre-dispatcher
    pre_dispatcher_main = sorted(preds, key=lambda key: len(preds[key]), reverse=True)[1]
    # The dispatcher is the successor of the main pre-dispatcher
    dispatcher = asmcfg.successors(asmcfg.loc_db.get_offset_location(pre_dispatcher_main))[0]
    dispatcher = asmcfg.loc_db.get_location_offset(dispatcher)
    # Find all other pre-dispatchers by looking at the dispatcher's predecessors
    all_predecessors = asmcfg.predecessors(asmcfg.loc_db.get_offset_location(dispatcher))
    pre_dispatchers = []
    for pred in all_predecessors:
        addr_pred = asmcfg.loc_db.get_location_offset(pred)
        # Pre-dispatchers typically have a higher address than the dispatcher in this CFF pattern
        if addr_pred > dispatcher:
            pre_dispatchers.append(addr_pred)
    return dispatcher, pre_dispatchers

Phase 2: Initialize and lift the code to IR

To analyze the logic in depth without getting lost in assembly instructions, we lift the machine code to Miasm's intermediate representation (IR). This function isolates each code block so it can be analyzed separately, which makes processing lighter and more accurate.

# Initialize ASMCFG and IRCFG for a specific block to aid deobfuscation
def init_ircfg(addr, state_var):
    machine = Machine(cont.arch)
    loc_db = LocationDB()
    mdis = machine.dis_engine(cont.bin_stream, loc_db=loc_db)
    # Use callback to isolate only the relevant block
    mdis.dis_block_callback = stop_on_jmp
    asmcfg = mdis.dis_multiblock(addr)

    # Lift to IR and simplify
    lifter = machine.lifter_model_call(loc_db)
    ircfg = lifter.new_ircfg_from_asmcfg(asmcfg)
    ircfg_simplifier = IRCFGSimplifierCommon(lifter)
    ircfg_simplifier.simplify(ircfg, addr)

    # Locate the address where the state variable is set (for NOPing)
    nop_addrs = find_state_var_usedefs(ircfg, state_var)

    return nop_addrs, ircfg, asmcfg

Phase 3: Trace and extract the state variable

We use data-flow analysis to find the value the state variable is updated to in each block. This value tells us which code block runs next.

def find_state_var_usedefs(ircfg, state_var):
    var_addr = 0
    # Initialize reaching definitions and build a Def-Use graph for data-flow tracking
    reachings = ReachingDefinitions(ircfg)
    digraph = DiGraphDefUse(reachings)
    for head in digraph.heads():
        # Only follow nodes that define the state variable
        if head.var != state_var:
            continue
        for node in digraph.reachable_parents(head):
            assignblk = ircfg.get_block(node.label)[node.index]
            # Read the value assigned to the variable this node defines
            # (items()[0] could pick an unrelated destination such as a flag)
            src = assignblk[node.var]
            # Keep the first assignment of a constant or an identifier
            if isinstance(src, (ExprInt, ExprId)):
                # Address of the native instruction (the NOP target)
                var_addr = assignblk.instr.offset
                break
        if var_addr != 0:
            break
    return var_addr

def find_var_asg(ircfg, state_var):
    val_state_var = None
    for lbl, irblk in viewitems(ircfg.blocks):
        for assignblk in irblk:
            # Check whether this assignment block writes the state variable
            written = set(assignblk).intersection(state_var)
            if not written:
                continue
            src = assignblk[written.pop()]
            # Constant => relevant block type 1
            if isinstance(src, ExprInt):
                val_state_var = int(src)
                break
            # Register => relevant block type 2 (value set in a predecessor)
            elif isinstance(src, ExprId):
                val_state_var = src

    return val_state_var

Phase 4: Map state values to backbone destinations

In this phase, we match the state values found earlier against the comparisons in the backbone chain to find the true destination address of each code block.

    # Step 3: Map state values to destination blocks (Analyze the "Backbone")
    for lbl, irblk in viewitems(ircfg_org.blocks):
        for assignblk in irblk:
            asg_items = assignblk.items()
            if asg_items:
                dst, src = asg_items[0]
                if isinstance(src, ExprOp):
                    # Look for comparison logic in the backbone
                    if src.op == 'FLAG_EQ_CMP':
                        arg = src.args[1]
                        if isinstance(arg, ExprInt):
                            if int(arg) in val_list:
                                cmp_val = int(arg)
                                var, locs = irblk[-1].items()[0]
                                true_dst = ircfg_org.loc_db.get_location_offset(locs.src1.loc_key)
                                # If target is dispatcher, take the alternative path
                                if true_dst == dispatcher:
                                    true_dst = ircfg_org.loc_db.get_location_offset(locs.src2.loc_key)
                                backbone[cmp_val] = true_dst

Phase 5: Patch the binary file to restore execution flow

Finally, with the original execution path recovered, we replace the roundabout jumps with direct jumps and remove the redundant obfuscation components.

    # Step 5: Generate Patches
    # NOP the dispatcher block
    len_patch = 0
    for l in dispatcher_blk.lines:
        len_patch += l.l

    # NOP the state variable assignment in the prologue
    nop_addrs, head_ircfg, head_asmcfg = init_ircfg(ad, state_var)
    val_state_var = find_var_asg(head_ircfg, {state_var})
    for blk in head_asmcfg.blocks:
        for l in blk.lines:
            if l.offset == nop_addrs:
                patches[l.offset] = l.l * b'\x90'
                break
    # Patch dispatcher to JMP directly to the first real block
    jmp_patch_str = f"JMP {rel(dispatcher, backbone[val_state_var])}"
    jmp_patch = asmb(jmp_patch_str, asmcfg_org.loc_db)
    # Pad on the right (ljust) so the JMP stays at the dispatcher address
    # its displacement was computed from
    patches[dispatcher] = jmp_patch.ljust(len_patch, b'\x90')

    # NOP out the backbone comparisons (CMP/Jcc pairs)
    for blk in asmcfg_org.blocks:
        lines = blk.lines
        if len(lines) == 2:
            len_patch = 0
            if lines[0].name.startswith('CMP') and lines[1].name.startswith('J'):
                for l in lines:
                    len_patch += l.l
                patches[lines[0].offset] = len_patch * b'\x90'
...

Finally, we link the relevant blocks in the correct order. In this variant, relevant blocks come in two forms: type 1 blocks assign a constant to the state variable directly, while type 2 blocks assign it from an intermediate variable whose value is set in the block's predecessors. Type 2 blocks usually correspond to conditional statements, so we handle the two forms separately.
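For a conditional (type 2) block, the two possible state values resolve to two real successors, so the block's tail must become a two-way branch. One common encoding is a near `Jcc` to one successor followed by a `JMP` to the other. The sketch below hand-encodes that pair (`0F 8x rel32` plus `E9 rel32`, 11 bytes) under the assumption that the region being overwritten is large enough; it is an illustration with hypothetical helper names, whereas the post's code below reuses the block's existing `Jcc` and its fall-through path.

```python
import struct

# Second opcode byte of the near Jcc forms (0F 8x rel32) for a few conditions
JCC_NEAR = {"JZ": 0x84, "JNZ": 0x85, "JL": 0x8C, "JGE": 0x8D, "JLE": 0x8E, "JG": 0x8F}


def jcc_rel32(mnemonic, src, dst):
    """Encode a 6-byte near Jcc at address `src` jumping to `dst`."""
    disp = dst - (src + 6)  # relative to the end of the instruction
    return bytes([0x0F, JCC_NEAR[mnemonic]]) + struct.pack("<i", disp)


def jmp_rel32(src, dst):
    """Encode a 5-byte JMP rel32 at address `src` jumping to `dst`."""
    return b"\xE9" + struct.pack("<i", dst - (src + 5))


def patch_conditional(mnemonic, src, true_dst, false_dst, region_len):
    """Rewrite the region at `src` as `Jcc true_dst; JMP false_dst` plus NOPs.

    The caller picks `mnemonic` so that its condition is the one under which
    the original block selected the state value leading to `true_dst`.
    """
    code = jcc_rel32(mnemonic, src, true_dst)
    # The JMP starts right after the 6-byte Jcc
    code += jmp_rel32(src + len(code), false_dst)
    if len(code) > region_len:
        raise ValueError(f"need {len(code)} bytes, region has {region_len}")
    return code.ljust(region_len, b"\x90")
```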

    # Link relevant blocks to their logical successors
    for offset, true_dst in fixed_cfg.items():
        loc_rel = asmcfg_org.loc_db.get_offset_location(offset)
        loc_suc_rel = asmcfg_org.successors(loc_rel)[0]
        addr_suc_rel = asmcfg_org.loc_db.get_location_offset(loc_suc_rel)
        blk = asmcfg_org.getby_offset(offset)

        # Type 1 Relevant Block handling
        if addr_suc_rel == dispatcher:
            addr_mov_insn = rel_blk_info[offset]
            for l in blk.lines:
                if l.offset == addr_mov_insn:
                    # NOP the state variable assignment instruction
                    patches[addr_mov_insn] = l.l * b'\x90'
                    # Replace the final JMP with a direct jump to the next logical block
                    if blk.lines[-1].name == 'JMP':
                        offset_jmp = rel(blk.lines[-1].offset, fixed_cfg[offset])
                        if int(offset_jmp, 0) == 0:
                            # Zero displacement: NOP the JMP out instead of re-assembling it
                            jmp_patch_str = 'NOP'
                            jmp_patch = blk.lines[-1].l * b'\x90'
                        else:
                            jmp_patch_str = f"{blk.lines[-1].name} {offset_jmp}"
                            jmp_patch = asmb(jmp_patch_str, asmcfg_org.loc_db)
                        if len(jmp_patch) > blk.lines[-1].l:
                            print('wrong size')
                        patches[blk.lines[-1].offset] = jmp_patch.ljust(blk.lines[-1].l, b'\x90')
                        print(hex(offset), jmp_patch_str, patches[blk.lines[-1].offset].hex())
                    break

        # Type 2 Relevant Block handling
        elif addr_suc_rel in pre_dispatchers or addr_suc_rel in fixed_cfg or addr_suc_rel in dum_jmp:
            is_jmp_condition = blk.lines[-1].name.startswith("J") and blk.lines[-1].name != 'JMP'
            if is_jmp_condition:
                # The state-variable MOV is where the rewritten tail starts
                addr_mov_insn = rel_blk_info.get(offset)
                # Place the Jcc after the relocated compare when a MOV precedes it;
                # otherwise the Jcc is written at the MOV address itself, so its
                # displacement must be computed from there
                if blk.lines[-3].name == 'MOV':
                    addr_patch_jmp = blk.lines[-3].offset + blk.lines[-2].l
                else:
                    addr_patch_jmp = addr_mov_insn

                jmp_patch_str = f"{blk.lines[-1].name} {rel(addr_patch_jmp, fixed_cfg[offset])}"
                jmp_insn = asmb(jmp_patch_str, asmcfg_org.loc_db)
                len_patch = blk.lines[-1].offset + blk.lines[-1].l - addr_mov_insn

                if blk.lines[-3].name == 'MOV':
                    patches[addr_mov_insn] = blk.lines[-2].b
                    patches[addr_patch_jmp] = jmp_insn.ljust(len_patch - blk.lines[-2].l, b'\x90')
                else:
                    patches[addr_mov_insn] = jmp_insn.ljust(len_patch, b'\x90')
                print(hex(offset), jmp_patch_str, patches[addr_mov_insn].hex())
            else:
                # Handle blocks that only have MOV without a JMP instruction
                addr_mov_insn = rel_blk_info[offset]
                len_patch = 0
                for l in blk.lines:
                    if l.offset == addr_mov_insn:
                        len_patch += l.l
                        if blk.lines[-1].name == 'JMP':
                            len_patch += blk.lines[-1].l
                        jmp_patch_str = f"JMP {rel(addr_mov_insn, fixed_cfg[offset])}"
                        jmp_patch = asmb(jmp_patch_str, asmcfg_org.loc_db)
                        if len(jmp_patch) > len_patch:
                            print('wrong size')
                        patches[addr_mov_insn] = jmp_patch.ljust(len_patch, b'\x90')
                        print(hex(offset), jmp_patch_str, patches[addr_mov_insn].hex())
                        break

    # Save patches to a pickle file for persistence
    with open('patches.pkl', 'wb') as f:
        pickle.dump(patches, f)

    return patches
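The script only pickles the patches, which are keyed by virtual address. Writing them into the file requires converting each address to a raw file offset. A minimal sketch, assuming all patches fall inside a single PE section whose RVA and raw file offset you supply (read them from the section header, or use a PE parser such as pefile's `get_offset_from_rva`); `apply_patches` is a hypothetical helper.

```python
def apply_patches(data, patches, image_base, section_rva, section_raw):
    """Return a copy of the raw file bytes `data` with `patches` applied.

    patches: {virtual_address: replacement_bytes}, as built by the script.
    Assumes every patched address lies in one section mapped at
    image_base + section_rva in memory and at section_raw in the file.
    """
    out = bytearray(data)
    for va, new_bytes in sorted(patches.items()):
        # VA -> RVA -> file offset within the section
        file_off = va - image_base - section_rva + section_raw
        if not 0 <= file_off <= len(out) - len(new_bytes):
            raise ValueError(f"patch at {va:#x} falls outside the file")
        out[file_off:file_off + len(new_bytes)] = new_bytes
    return bytes(out)
```

To use it, load the dictionary from `patches.pkl` with `pickle.load`, read the original file as bytes, and write the returned bytes to a new file so the original sample stays untouched.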

5. Results

After running the script, we get the result shown in the image below. Although some dead code remains, the output is clean enough for further analysis. The full code and sample are available on GitHub.