Unflattening the Maze: Automating CFF Deobfuscation using Miasm (P1)
1. Introduction
In recent years, control flow flattening (CFF) has become a common obfuscation technique in malware, and many analysts consider it one of the most annoying to deal with. In this blog post, I will show a general approach to deobfuscating it, which you can also apply to more complex samples. In part 1, I will use the Miasm framework. The benefits that Miasm brings include:
Lifting & Intermediate Representation (IR): Miasm allows translating machine code (x86, ARM, etc.) into an intermediate representation (IR). This helps us analyze the logic without depending on the complexity of each specific CPU instruction set.
Powerful Data-flow Analysis: Miasm provides tools like ReachingDefinitions and DiGraphDefUse. These are the keys for us to trace how the State Variable changes across each code block, thereby finding the original execution path.
2. Anatomy of CFF
The components present in the structure of Control Flow Flattening:
State Variable: This is the "heart" of the CFF structure. The value of this variable (usually located in a register like EAX or a memory location) determines which code block will be executed next in the logical flow of the program.
Dispatcher: The central intersection of the function. All roads lead back here. Its task is to read the current value of the State Variable and push the execution flow into the comparison chain to find the destination.
Backbone: A continuous sequence of comparison (CMP) and conditional jump (Jcc) instructions. It acts as a filter, matching the value of the state variable against predefined constants to identify the correct relevant block to run.
Relevant Blocks: Where the actual logic of the original program is located (such as calculations, function calls...). An important characteristic is that at the end of each of these blocks, the State Variable is updated with a new value before returning to the loop. In some variants, the relevant block also functions as the pre-dispatcher block. Therefore, you need to identify these components accurately before proceeding with deobfuscation.
Pre-dispatcher: The final gathering point of the relevant blocks. After finishing executing the logic, instead of jumping directly back to the Dispatcher, the malware often jumps through this intermediate block to perform some cleanup operations or simply to further obfuscate the function graph.
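To make these roles concrete, here is a minimal Python sketch of a hand-flattened function. It is a toy illustration, not code from the sample: the function names and the state constants (0x1A2B, 0x3C4D, 0x5E6F) are made up for the example.

```python
def gcd_original(a, b):
    """Reference implementation with natural control flow."""
    while b != 0:
        a, b = b, a % b
    return a


def gcd_flattened(a, b):
    """The same logic after a hand-made control flow flattening."""
    state = 0x1A2B                  # state variable, initialised in the prologue
    while True:                     # dispatcher: every block comes back here
        # backbone: chain of comparisons against predefined constants
        if state == 0x1A2B:         # relevant block: loop condition
            state = 0x3C4D if b != 0 else 0x5E6F
        elif state == 0x3C4D:       # relevant block: loop body
            a, b = b, a % b
            state = 0x1A2B
        elif state == 0x5E6F:       # relevant block: exit
            return a
        # reaching the end of the chain plays the role of the pre-dispatcher


result_original = gcd_original(48, 18)
result_flattened = gcd_flattened(48, 18)
```

Both functions compute the same result, but in the flattened version the order of the blocks is only visible by following the values written to `state`.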
3. Deobfuscation Strategy
Overall there are 5 steps. You can apply them to cases from simple to complex.
Step 1: Identify key components
First, we need to know where the dispatcher, pre-dispatchers, and prologue are. We count the predecessors (parent nodes) of every block in the function's control flow graph. In this variant, the block with the second-highest number of predecessors is one of the pre-dispatchers, and its successor is the Dispatcher. The remaining Pre-dispatchers are then located through their relationship with this Dispatcher.
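The heuristic can be tried on a toy graph without any framework. The sketch below assumes the control flow graph is a plain dictionary mapping a block address to its successor addresses; the addresses and the helper name `find_dispatcher` are hypothetical and only mirror the idea described above.

```python
from collections import defaultdict


def find_dispatcher(succs):
    """Locate the dispatcher and pre-dispatchers from predecessor counts."""
    preds = defaultdict(list)
    for src, dsts in succs.items():
        for dst in dsts:
            preds[dst].append(src)
    # Rank blocks by number of predecessors, highest first.
    ranked = sorted(succs, key=lambda addr: len(preds[addr]), reverse=True)
    main_pre_dispatcher = ranked[1]            # second-highest count
    dispatcher = succs[main_pre_dispatcher][0]
    # Keep only predecessors located after the dispatcher (skips the prologue).
    pre_dispatchers = sorted(p for p in preds[dispatcher] if p > dispatcher)
    return dispatcher, pre_dispatchers


# Toy flattened function: prologue, dispatcher, five compare blocks,
# six relevant blocks and one shared pre-dispatcher.
toy_cfg = {
    0x1000: [0x1010],            # prologue
    0x1010: [0x1020],            # dispatcher
    0x1020: [0x1100, 0x1030],    # backbone
    0x1030: [0x1110, 0x1040],
    0x1040: [0x1120, 0x1050],
    0x1050: [0x1130, 0x1060],
    0x1060: [0x1140, 0x1150],
    0x1100: [0x1200],            # relevant blocks going through the pre-dispatcher
    0x1110: [0x1200],
    0x1120: [0x1200],
    0x1130: [0x1010],            # relevant blocks jumping straight back
    0x1140: [0x1010],
    0x1150: [],                  # exit block
    0x1200: [0x1010],            # main pre-dispatcher
}

dispatcher, pre_dispatchers = find_dispatcher(toy_cfg)
```

Like the heuristic itself, this only works when the dispatcher and the main pre-dispatcher clearly dominate the predecessor ranking, so the result should be checked against the graph view.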
Step 2: Trace the state variable using Data-flow
This is where Miasm demonstrates its strength. We use data-flow analysis to find the value that each block writes to the state variable. There are two cases to handle:
Simple case: The value is assigned directly by a constant.
Complex case: The value is passed back and forth between intermediate registers, requiring tracing back to the parent blocks to find the actual number.
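Both cases can be illustrated with a small framework-free sketch. It assumes each block is a list of `(destination, source)` assignments, where the source is either an integer constant or the name of another register. The block names, register names and constants are invented, and `resolve_state_values` is a hypothetical helper, not a Miasm API.

```python
def resolve_state_values(blocks, preds, label, var, seen=None):
    """Return the constants that `var` may hold at the end of block `label`."""
    seen = set() if seen is None else seen
    if (label, var) in seen:          # already explored: avoid endless loops
        return set()
    seen.add((label, var))
    # Walk the block backwards, following register-to-register copies.
    for dst, src in reversed(blocks[label]):
        if dst != var:
            continue
        if isinstance(src, int):
            return {src}              # simple case: direct constant
        var = src                     # complex case: keep tracing the source
    # Not resolved inside this block: ask every parent block.
    values = set()
    for parent in preds.get(label, []):
        values |= resolve_state_values(blocks, preds, parent, var, seen)
    return values


blocks = {
    "simple": [("EAX", 0xAAAA)],      # state variable set from a constant
    "then":   [("ESI", 0x1111)],      # two parents prepare different values
    "else":   [("ESI", 0x2222)],
    "join":   [("EAX", "ESI")],       # state variable copied from a register
}
preds = {"join": ["then", "else"]}

simple_values = resolve_state_values(blocks, preds, "simple", "EAX")
join_values = resolve_state_values(blocks, preds, "join", "EAX")
```

A block that resolves to more than one value, like `join` here, is the kind of block that originally ended a conditional statement.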
As you can see in the image below, the state variable value is assigned directly to the EAX register. We will extract this value to use.
Step 3: Decode the Backbone's instruction table
Next, we analyze the backbone chain. We iterate through the IR blocks to find comparisons against constants. The goal is to build a complete mapping table: for each value of the state variable, the address the program actually jumps to.
Observe the backbone block represented in IR. Currently, this block is comparing with the constant 0x724743A3 and has 2 branches to jump: loc_40d0b1 and loc_40cc40.
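As a rough sketch of the table-building logic, assume every backbone block has already been reduced to a tuple `(constant, first_target, second_target)`. The tuples, the addresses and the helper `build_backbone_map` below are hypothetical; only the selection rule (prefer the first target unless it leads back to the dispatcher) follows the approach described in this post.

```python
def build_backbone_map(compare_blocks, known_values, dispatcher):
    """Map each known state value to the block the backbone sends it to."""
    backbone = {}
    for const, first_target, second_target in compare_blocks:
        if const not in known_values:
            continue                  # constant never written to the state variable
        target = first_target
        if target == dispatcher:      # this branch only loops back: take the other
            target = second_target
        backbone[const] = target
    return backbone


compare_blocks = [
    (0x11111111, 0x401100, 0x401030),
    (0x22222222, 0x401010, 0x401200),  # first target is the dispatcher
    (0x33333333, 0x401300, 0x401050),  # constant not in the known values
]
backbone_map = build_backbone_map(
    compare_blocks,
    known_values={0x11111111, 0x22222222},
    dispatcher=0x401010,
)
```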
Step 4: Restore the original logic flow
Once we have the information from Step 2 and Step 3, we simply link the two together. If block A sets the state variable to 10, and the mapping table from the previous step says that the value 10 leads to block B, then the real execution flow goes from A to B. We save these link pairs for the final step.
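The linking step is essentially a dictionary join. The sketch below assumes two inputs with made-up contents: `next_state` (relevant block address to the value it writes) and `backbone` (value to destination address). The helper name `link_blocks` is hypothetical.

```python
def link_blocks(next_state, backbone):
    """Join 'block -> state value' with 'state value -> destination'."""
    links = {}
    unresolved = []
    for block, value in next_state.items():
        if value in backbone:
            links[block] = backbone[value]
        else:
            unresolved.append(block)  # value never matched by the backbone
    return links, unresolved


next_state = {0x401100: 10, 0x401200: 20, 0x401300: 99}
backbone = {10: 0x401200, 20: 0x401300}

links, unresolved = link_blocks(next_state, backbone)
```

Keeping the unresolved blocks in a separate list makes it easier to spot relevant blocks whose state value was not recovered correctly.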
Step 5: Execute patching the binary file
This is the "cleanup" step that brings clarity back to the code. Based on the links found, we will:
Remove the Dispatcher and redundant state variable assignment instructions using NOP instructions.
Replace the roundabout jumps with a direct JMP instruction to the actual destination address.
Overwrite these changes directly into the executable file to get a clean binary.
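As a sketch of what such a patch looks like at the byte level, the snippet below encodes an x86 near jump by hand (opcode E9 followed by a signed 32-bit displacement relative to the end of the instruction) and pads the remaining space with NOPs (0x90). The helper names and addresses are hypothetical; the script later in this post relies on Miasm's assembler instead.

```python
import struct

NOP = b"\x90"


def jmp_rel32(src, dst):
    """Encode `JMP dst` for an instruction placed at address `src`."""
    displacement = dst - (src + 5)    # relative to the end of the 5-byte JMP
    return b"\xE9" + struct.pack("<i", displacement)


def make_patch(src, dst, available):
    """Build a JMP that overwrites `available` bytes, NOP-padded at the end."""
    patch = jmp_rel32(src, dst)
    if len(patch) > available:
        raise ValueError("not enough room for a 5-byte JMP")
    return patch.ljust(available, NOP)


# Redirect a 7-byte region at 0x401000 to a block at 0x401100.
patch = make_patch(0x401000, 0x401100, 7)
```

Placing the JMP first and the padding after it keeps the jump at the address used to compute the displacement.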
Below is an image of the graph before and after deobfuscating CFF on the same function.
4. Implementation
Now we will write the script using the Miasm framework.
Phase 1: Locate the dispatcher and function structure
First, we need a function to scan through the entire execution graph and find the key points like Dispatcher or Pre-dispatchers. This function uses an algorithm to count the number of parent nodes to determine where all logical flows converge.
# Identify the dispatcher block and the list of pre-dispatchers
def get_cff_info(asmcfg):
    preds = {}
    for blk in asmcfg.blocks:
        offset = asmcfg.loc_db.get_location_offset(blk.loc_key)
        preds[offset] = asmcfg.predecessors(blk.loc_key)
    # Heuristic: The block with the 2nd highest number of predecessors is usually the main pre-dispatcher
    pre_dispatcher_main = sorted(preds, key=lambda key: len(preds[key]), reverse=True)[1]
    # The dispatcher is the successor of the main pre-dispatcher
    dispatcher = asmcfg.successors(asmcfg.loc_db.get_offset_location(pre_dispatcher_main))[0]
    dispatcher = asmcfg.loc_db.get_location_offset(dispatcher)
    # Find all other pre-dispatchers by looking at the dispatcher's predecessors
    all_predecessors = asmcfg.predecessors(asmcfg.loc_db.get_offset_location(dispatcher))
    pre_dispatchers = []
    for pred in all_predecessors:
        addr_pred = asmcfg.loc_db.get_location_offset(pred)
        # Pre-dispatchers typically have a higher address than the dispatcher in this CFF pattern
        if addr_pred > dispatcher:
            pre_dispatchers.append(addr_pred)
    return dispatcher, pre_dispatchers
Phase 2: Initialize and lift the code up to IR
To analyze the logic more deeply without getting lost in assembly instructions, we lift the machine code to an intermediate representation (IR). This function isolates each code block so it can be analyzed separately, which keeps the processing lighter and more accurate.
# Initialize ASMCFG and IRCFG for a specific block to aid deobfuscation
from miasm.analysis.machine import Machine
from miasm.analysis.simplifier import IRCFGSimplifierCommon
from miasm.core.locationdb import LocationDB

# `cont` (the loaded container) and `stop_on_jmp` are defined elsewhere in the full script
def init_ircfg(addr, state_var):
    machine = Machine(cont.arch)
    loc_db = LocationDB()
    mdis = machine.dis_engine(cont.bin_stream, loc_db=loc_db)
    # Use callback to isolate only the relevant block
    mdis.dis_block_callback = stop_on_jmp
    asmcfg = mdis.dis_multiblock(addr)
    # Lift to IR and simplify
    lifter = machine.lifter_model_call(loc_db)
    ircfg = lifter.new_ircfg_from_asmcfg(asmcfg)
    ircfg_simplifier = IRCFGSimplifierCommon(lifter)
    ircfg_simplifier.simplify(ircfg, addr)
    # Locate the address where the state variable is set (for NOPing)
    nop_addrs = find_state_var_usedefs(ircfg, state_var)
    return nop_addrs, ircfg, asmcfg
Phase 3: Trace and extract the state variable
Use data flow analysis to find what value the state variable is updated to at each execution block. This is the key to knowing what the next code block will be.
from future.utils import viewitems
from miasm.analysis.data_flow import DiGraphDefUse, ReachingDefinitions
from miasm.expression.expression import ExprId, ExprInt

# Return the address of the instruction that sets the state variable (0 if not found)
def find_state_var_usedefs(ircfg, state_var):
    var_addrs = 0
    # Initialize reaching definitions and build a Def-Use graph for data-flow tracking
    reachings = ReachingDefinitions(ircfg)
    digraph = DiGraphDefUse(reachings)
    for head in digraph.heads():
        # Check if the instruction involves the state variable
        if head.var == state_var:
            for x in digraph.reachable_parents(head):
                # Get the assignment at this node
                dst, src = ircfg.get_block(x.label)[x.index].items()[0]
                # Check if it's an assignment of a constant or identifier
                if isinstance(src, (ExprInt, ExprId)):
                    var_addrs = ircfg.get_block(x.label)[x.index].instr.offset
                if var_addrs != 0:
                    break
            if var_addrs != 0:
                break
    return var_addrs

# Return the value assigned to the state variable: an int (type 1) or a register (type 2)
def find_var_asg(ircfg, state_var):
    val_state_var = None
    for lbl, irblk in viewitems(ircfg.blocks):
        for assignblk in irblk:
            # Check if this assignment block modifies our state variable
            result = set(assignblk).intersection(state_var)
            if not result:
                continue
            else:
                dst, src = assignblk.items()[0]
                # if state var is int => relevant type 1
                if isinstance(src, ExprInt):
                    val_state_var = int(src)
                    break
                # if state var is register => relevant type 2
                elif isinstance(src, ExprId):
                    val_state_var = src
    return val_state_var
Phase 4: Analyze the backbone and build the link map
In this phase, we map the state values we found with the comparison instructions in the backbone chain to find the true destination address of each code block.
# Step 3: Map state values to destination blocks (Analyze the "Backbone")
for lbl, irblk in viewitems(ircfg_org.blocks):
    for assignblk in irblk:
        asg_items = assignblk.items()
        if asg_items:
            dst, src = asg_items[0]
            if isinstance(src, ExprOp):
                # Look for comparison logic in the backbone
                if src.op == 'FLAG_EQ_CMP':
                    arg = src.args[1]
                    if isinstance(arg, ExprInt):
                        if int(arg) in val_list:
                            cmp_val = int(arg)
                            var, locs = irblk[-1].items()[0]
                            true_dst = ircfg_org.loc_db.get_location_offset(locs.src1.loc_key)
                            # If target is dispatcher, take the alternative path
                            if true_dst == dispatcher:
                                true_dst = ircfg_org.loc_db.get_location_offset(locs.src2.loc_key)
                            backbone[cmp_val] = true_dst
Phase 5: Patch the binary file to restore execution flow
Finally, with the original execution path in hand, we replace the roundabout jumps with direct jumps and remove the redundant obfuscation components.
# Step 5: Generate Patches
# Measure the dispatcher block so it can be overwritten entirely
len_patch = 0
for l in dispatcher_blk.lines:
    len_patch += l.l
# NOP the state variable assignment in the prologue
nop_addrs, head_ircfg, head_asmcfg = init_ircfg(ad, state_var)
val_state_var = find_var_asg(head_ircfg, {state_var})
for blk in head_asmcfg.blocks:
    for l in blk.lines:
        if l.offset == nop_addrs:
            patches[l.offset] = l.l * b'\x90'
            break
# Patch dispatcher to JMP directly to the first real block
jmp_patch_str = f"JMP {rel(dispatcher, backbone[val_state_var])}"
jmp_patch = asmb(jmp_patch_str, asmcfg_org.loc_db)
# JMP first, then NOP padding, so the jump sits at the address passed to rel()
patches[dispatcher] = jmp_patch.ljust(len_patch, b'\x90')
# NOP out the backbone comparisons (CMP/Jcc pairs)
for blk in asmcfg_org.blocks:
    lines = blk.lines
    if len(lines) == 2:
        len_patch = 0
        if lines[0].name.startswith('CMP') and lines[1].name.startswith('J'):
            for l in lines:
                len_patch += l.l
            patches[lines[0].offset] = len_patch * b'\x90'
...
Finally, we link the relevant blocks in the correct order. The relevant blocks in this variant come in two forms. Type 1 assigns a constant to the state variable directly. Type 2 assigns the state variable from an intermediate register, and the actual value is set in the predecessors of the relevant block; type 2 blocks usually correspond to conditional statements. We therefore handle the two cases separately.
# Link relevant blocks to their logical successors
for offset, true_dst in fixed_cfg.items():
    loc_rel = asmcfg_org.loc_db.get_offset_location(offset)
    loc_suc_rel = asmcfg_org.successors(loc_rel)[0]
    addr_suc_rel = asmcfg_org.loc_db.get_location_offset(loc_suc_rel)
    blk = asmcfg_org.getby_offset(offset)
    # Type 1 Relevant Block handling
    if addr_suc_rel == dispatcher:
        addr_mov_insn = rel_blk_info[offset]
        for l in blk.lines:
            if l.offset == addr_mov_insn:
                # NOP the state variable assignment instruction
                patches[addr_mov_insn] = l.l * b'\x90'
                # Replace the final JMP with a direct jump to the next logical block
                if blk.lines[-1].name == 'JMP':
                    offset_jmp = rel(blk.lines[-1].offset, fixed_cfg[offset])
                    if int(offset_jmp, 0) == 0:
                        jmp_patch = blk.lines[-1].l * b'\x90'
                    else:
                        jmp_patch_str = f"{blk.lines[-1].name} {offset_jmp}"
                        jmp_patch = asmb(jmp_patch_str, asmcfg_org.loc_db)
                    if len(jmp_patch) > blk.lines[-1].l:
                        print('wrong size')
                    patches[blk.lines[-1].offset] = jmp_patch.ljust(blk.lines[-1].l, b'\x90')
                    print(hex(offset), jmp_patch_str, patches[blk.lines[-1].offset].hex())
                break
    # Type 2 Relevant Block handling
    elif addr_suc_rel in pre_dispatchers or addr_suc_rel in fixed_cfg or addr_suc_rel in dum_jmp:
        is_jmp_condition = blk.lines[-1].name.startswith("J") and blk.lines[-1].name != 'JMP'
        if is_jmp_condition:
            # Determine location of the JMP to patch based on MOV instruction position
            if blk.lines[-3].name == 'MOV':
                addr_patch_jmp = blk.lines[-3].offset + blk.lines[-2].l
            else:
                addr_patch_jmp = blk.lines[2].offset
            jmp_patch_str = f"{blk.lines[-1].name} {rel(addr_patch_jmp, fixed_cfg[offset])}"
            jmp_insn = asmb(jmp_patch_str, asmcfg_org.loc_db)
            addr_mov_insn = rel_blk_info.get(offset)
            len_patch = blk.lines[-1].offset + blk.lines[-1].l - addr_mov_insn
            if blk.lines[-3].name == 'MOV':
                patches[addr_mov_insn] = blk.lines[-2].b
                patches[addr_patch_jmp] = jmp_insn.ljust(len_patch - blk.lines[-2].l, b'\x90')
            else:
                patches[addr_mov_insn] = jmp_insn.ljust(len_patch, b'\x90')
            print(hex(offset), jmp_patch_str, patches[addr_mov_insn].hex())
        else:
            # Handle blocks that only have MOV without a JMP instruction
            addr_mov_insn = rel_blk_info[offset]
            len_patch = 0
            for l in blk.lines:
                if l.offset == addr_mov_insn:
                    len_patch += l.l
                    if blk.lines[-1].name == 'JMP':
                        len_patch += blk.lines[-1].l
                    jmp_patch_str = f"JMP {rel(addr_mov_insn, fixed_cfg[offset])}"
                    jmp_patch = asmb(jmp_patch_str, asmcfg_org.loc_db)
                    if len(jmp_patch) > len_patch:
                        print('wrong size')
                    patches[addr_mov_insn] = jmp_patch.ljust(len_patch, b'\x90')
                    print(hex(offset), jmp_patch_str, patches[addr_mov_insn].hex())
                    break
# Save patches to a pickle file for persistence
with open('patches.pkl', 'wb') as f:
    pickle.dump(patches, f)
return patches
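The patches dictionary maps virtual addresses to replacement bytes, so it still has to be written into the file. Below is a minimal, framework-free sketch of that last step. It assumes you already know how to translate a virtual address into a file offset; the `va_to_offset` callback and the toy section layout (section mapped at 0x401000, stored at file offset 0x400) are hypothetical stand-ins for what a PE parser would provide.

```python
def apply_patches(data, patches, va_to_offset):
    """Return a copy of `data` with every patch written at its file offset."""
    out = bytearray(data)
    for va, patch in sorted(patches.items()):
        start = va_to_offset(va)
        end = start + len(patch)
        if start < 0 or end > len(out):
            raise ValueError(f"patch at {va:#x} falls outside the file")
        out[start:end] = patch        # same length in and out: no bytes shift
    return bytes(out)


def toy_va_to_offset(va):
    """Toy mapping: one section at VA 0x401000 stored at file offset 0x400."""
    return va - 0x401000 + 0x400


original = bytes(0x500)                           # placeholder file content
patches = {0x401010: b"\x90\x90", 0x401020: b"\xE9\x00\x00\x00\x00"}
patched = apply_patches(original, patches, toy_va_to_offset)
```

Writing into a copy rather than the original buffer makes it easy to compare both versions of the function side by side in a disassembler.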
5. Results
After running the script, we get the result shown in the image below. Although some dead code remains, the function is now clean enough to analyze further. The full code and sample are available on GitHub.



