Source code for bare68k.runtime

"""the runtime module drives the m68k processor emulation of bare68k.
"""

import time
import logging

import bare68k.api.machine as mach
import bare68k.api.cpu as cpu
import bare68k.api.mem as mem
import bare68k.api.tools as tools

from bare68k.consts import *
from bare68k.errors import *
from bare68k.cpucfg import *
from bare68k.memcfg import *
from bare68k.runcfg import RunConfig
from bare68k.label import *
from bare68k.handler import EventHandler


class EventStats(object):
    """Store statistics on called events"""

    def __init__(self):
        self.total_events = 0
        self.event_counts = [0] * CPU_NUM_EVENTS

    def count(self, ev_num):
        if ev_num >= 0 and ev_num < CPU_NUM_EVENTS:
            self.event_counts[ev_num] += 1
            self.total_events += 1
        else:
            raise ValueError("invalid event number")

    def get_total_events(self):
        return self.total_events

    def get_event_count(self, ev_num):
        if ev_num >= 0 and ev_num < CPU_NUM_EVENTS:
            return self.event_counts[ev_num]
        else:
            raise ValueError("invalid event number")

    def __repr__(self):
        vals = map(str, self.event_counts)
        return "EventStats(#%d:%s)" % (self.total_events, ",".join(vals))


class RunInfo(object):
    """RunInfo returns information about the CPU run last performed"""

    def __init__(self, total_time, cpu_time, total_cycles, results, stats):
        self.total_time = total_time
        self.cpu_time = cpu_time
        self.total_cycles = total_cycles
        self.results = results
        self.stats = stats
        self.py_time = total_time - cpu_time

    def __repr__(self):
        return "RunInfo({},{},{},{},{})".format(
            self.total_time, self.cpu_time,
            self.total_cycles, repr(self.results),
            repr(self.stats))

    def is_done(self):
        return self.get_last_result() == CPU_EVENT_DONE

    def get_results(self):
        return self.results

    def get_event(self, pos):
        return self.results[pos][1]

    def get_result(self, pos):
        return self.results[pos][0]

    def get_last_result(self):
        if len(self.results) > 0:
            return self.results[-1][0]
        else:
            return None

    def get_last_event(self):
        if len(self.results) > 0:
            return self.results[-1][1]
        else:
            return None

    def get_stats(self):
        return self.stats

    def calc_cpu_mhz(self):
        """from cpu time and cycle count calc cpu clock speed of musashi"""
        if self.cpu_time > 0:
            return self.total_cycles / (self.cpu_time * 1000000)
        else:
            return 0


def log_setup(level=logging.DEBUG):
    """setup logging of the runtime"""
    FORMAT = '%(asctime)-15s %(name)24s:%(levelname)7s:  %(message)s'
    logging.basicConfig(format=FORMAT, level=level)


def init_quick(cpu_type=M68K_CPU_TYPE_68000, ram_pages=1):
    """a simplified init.

    Create a Runtime() instance which is simplified and uses a basic memory
    setup with only a RAM region starting at 0 with given number of pages.
    """
    cpu_cfg = CPUConfig(cpu_type)
    mem_cfg = MemoryConfig()
    mem_cfg.add_ram_range(0, ram_pages)
    run_cfg = RunConfig()
    return Runtime(cpu_cfg, mem_cfg, run_cfg)


[docs]class Runtime(object): """The runtime controls the CPU emulation run and dispatches events. The central entry point for every system emulation done with bare68k is the Runtime. First you configure it by passing in a CPU, memory layout and runtime configuration. Add an optional event handler to control the processing of incoming events. Configure an optional Logger to receive all incoming traces. Args: cpu_cfg (:obj:`bare68k.CPUConfig`): the CPU configuration mem_cfg (:obj:`bare68k.MemoryConfig`): the memory layout for the emulation run_cfg (:obj:`bare68k.RunConfig`): runtime options event_handler (:obj:`bare68k.EventHandler`, optional): event handler that receives all returned events from the CPU emulation. By default the :class:`bare68k.EventHandler` is used. log_channel (:obj:`logging.Logger`, optional): a logger that logs all runtime events. By default a logger with ``__name__`` of the module is created. """ def __init__(self, cpu_cfg, mem_cfg, run_cfg, event_handler=None, log_channel=None): # setup logging if log_channel is None: self._log = logging.getLogger(__name__) else: self._log = log_channel # remember configs self._cpu_cfg = cpu_cfg self._mem_cfg = mem_cfg self._run_cfg = run_cfg # setup event handler backref to runtime if event_handler is None: self._event_handler = EventHandler() else: self._event_handler = event_handler # check mem config max_pages = cpu_cfg.get_max_pages() mem_cfg.check(max_pages=max_pages) # init machine with_labels = run_cfg._with_labels cpu = cpu_cfg.get_cpu_type() num_pages = mem_cfg.get_num_pages() mach.init(cpu, num_pages, with_labels) # realize mem config self._setup_mem(mem_cfg) # setup cpu event handlers self._setup_handlers() # clear state self._reset_pc = None self._reset_sp = None self._end_pcs = [] self._cpu_states = [] # setup label mgr if with_labels: self._label_mgr = LabelMgr() else: self._label_mgr = DummyLabelMgr() # finally attach runtime to handler self._event_handler.attach_runtime(self)
[docs] def get_with_labels(self): """Check is memory labels are enabled for the runtime. The runtime can be either configured to enable or disable memory labels via the :class:`RunConfig`. This function returns True if labels are enabled otherwise False. Returns: bool: True if labels are enabled, otherwise False """ return self._run_cfg._with_labels
[docs] def get_label_mgr(self): """Get the label manager associated with the runtime. If labels are enabled a real :class:`LabelMgr` is returned. If labels are disabled then a fake :class:`DummyLabelMgr` is available. It provides the same interface but does nothing. Returns: :obj:`LabelMgr` or :obj:`DummyLabelMgr`: active label manager """ return self._label_mgr
[docs] def get_cpu_cfg(self): """access the current CPU configuration""" return self._cpu_cfg
[docs] def get_mem_cfg(self): """access the current memory configuration""" return self._mem_cfg
[docs] def get_run_cfg(self): """access the current run configuration""" return self._run_cfg
[docs] def get_cpu(self): """return cpu API 'object'""" return cpu
[docs] def get_mem(self): """return mem API 'object'""" return mem
def get_event_handler(self): return self._event_handler def _setup_mem(self, mem_cfg): """internal helper to realize the memory configuration""" mem_ranges = mem_cfg.get_range_list() for mr in mem_ranges: mt = mr.mem_type start = mr.start_page size = mr.num_pages if mt == MEM_RAM: flags = MEM_FLAGS_RW if mr.traps: flags |= MEM_FLAGS_TRAPS mem.add_memory(start, size, flags) self._log.info( "memory: RAM @%04x +%04x flags=%x", start, size, flags) elif mt == MEM_ROM: flags = MEM_FLAGS_READ if mr.traps: flags |= MEM_FLAGS_TRAPS mem.add_memory(start, size, flags) data = mr.opts if data is not None: mem.w_block(mr.start_addr, data) self._log.info( "memory: ROM @%04x +%04x flags=%x", start, size, flags) elif mt == MEM_SPECIAL: r_func, w_func = mr.opts mem.add_special(start, size, r_func, w_func) self._log.info("memory: spc @%04x +%04x", start, size) elif mt == MEM_EMPTY: value = mr.opts mem.add_empty(start, size, MEM_FLAGS_RW, value) self._log.info("memory: --- @%04x +%04x: %08x", start, size, value) elif mt == MEM_MIRROR: base_page = mr.opts mem.add_mirror(start, size, MEM_FLAGS_RW, base_page) self._log.info("memory: mir @%04x +%04x: <- %04x", start, size, base_page) elif mt == MEM_NOALLOC: self._log.info("memory: ??? @%04x +%04x", start, size) elif mt == MEM_RESERVE: self._log.info("memory: XXX @%04x +%04x", start, size) else: raise ValueError("Invalid memory type: %d" % mt) self._log.info("memory: done. max_pages=%04x", mem_cfg.get_num_pages())
[docs] def shutdown(self): """shutdown runtime after using the runtime you have to shut it down. this frees the allocated resources. After that you can init() again for a new run """ self._cpu_cfg = None self._mem_cfg = None self._run_cfg = None mach.shutdown() self._log.info("shutdown")
[docs] def reset(self, init_pc, init_sp=0x800): """reset the CPU before you can run the CPU you have to reset it. This will write the initial SP and the initial PC to locations 0 and 4 in RAM and pulse a reset in the CPU emulation. After this operation you are free to overwrite these values again. Now proceed to call run(). """ self._reset_pc = init_pc self._reset_sp = init_sp # place SP and PC in memory mem.w32(0, init_sp) mem.w32(4, init_pc) # now pulse reset cpu.pulse_reset() # clear info to reset cycle counts cpu.clear_info() self._log.info("reset: pc=%08x, sp=%08x", init_pc, init_sp)
def get_reset_pc(self): return self._reset_pc def get_reset_sp(self): return self._reset_sp def get_top_end_pc(self): if len(self._end_pcs) == 0: return None else: return self._end_pcs[-1]
[docs] def run(self, reset_end_pc=None, start_pc=None, start_sp=None): """run the CPU until emulation ends This is the main loop of your emulation. The CPU emulation is run and events are processed. The events are dispatched and the associated handlers are called. If a reset opcode is encountered then the execution is terminated. Returns a RunInfo instance giving you timing information. """ # get some config values catch_kb_intr = self._run_cfg._catch_kb_intr cycles_per_run = self._run_cfg._cycles_per_run pc_trace_size = self._run_cfg._pc_trace_size instr_trace = self._run_cfg._instr_trace cpu_mem_trace = self._run_cfg._cpu_mem_trace api_mem_trace = self._run_cfg._api_mem_trace # recursive run() call? if yes then store cpu state rec_depth = len(self._end_pcs) if rec_depth > 0: cpu_state = cpu.get_cpu_context() else: cpu_state = None # set start pc/sp if requested if start_pc is not None: cpu.w_pc(start_pc) if start_sp is not None: cpu.w_sp(start_sp) # keep end pc self._end_pcs.append(reset_end_pc) # timer function timer = time.time # stats stats = EventStats() start_cycles = cpu.get_total_cycles() # pc trace? tools.setup_pc_trace(pc_trace_size) # instr trace? evh = self._event_handler if instr_trace: cpu.set_instr_hook_func(evh.handle_instr_trace) if cpu_mem_trace: mem.mach.set_mem_cpu_trace_func( evh.handle_cpu_mem_trace, as_str=True) if api_mem_trace: mem.mach.set_mem_api_trace_func( evh.handle_api_mem_trace, as_str=True) cpu_time = 0 # main loop self._log.debug("enter run loop #%d", rec_depth) total_start = timer() results = [] stay = True while stay: try: start = timer() # execute CPU code until event occurs num_events = cpu.execute_to_event_checked(cycles_per_run) except KeyboardInterrupt as e: # either abort execution (default) or re-raise exception self._log.debug("keyboard interrupt") if not catch_kb_intr: raise e results.append((CPU_EVENT_USER_ABORT, None)) break finally: end = timer() cpu_time += end - start # dispatch events run_info = cpu.get_info() results = [] for event in run_info.events: handler = event.handler stats.count(event.ev_type) if handler is not None: self._log.debug("trigger handler: %s", CPU_EVENT_NAMES[event.ev_type]) result = handler(event) # handler wants to exit run loop if result is not None: results.append((result, event)) stay = False self._log.debug("run loop exit #%d: result=%s" " (event=%r)", rec_depth, CPU_EVENT_NAMES[result], event) # user abort terminates event loop if result == CPU_EVENT_USER_ABORT: break else: self._log.warning("no handler: result=%s (event=%r)", CPU_EVENT_NAMES[event.ev_type], event) total_end = timer() self._log.debug("leave run loop #%d", rec_depth) # pop end pc self._end_pcs.pop() # restore cpu if cpu_state is not None: cpu.set_cpu_context(cpu_state) # instr trace if instr_trace: cpu.set_instr_hook_func(None) if cpu_mem_trace: mem.set_mem_cpu_trace_func(None) if api_mem_trace: mem.set_mem_api_trace_func(None) # final timing total_time = total_end - total_start end_cycles = cpu.get_total_cycles() total_cycles = end_cycles - start_cycles # create run info result ri = RunInfo(total_time, cpu_time, total_cycles, results, stats) self._log.debug("run info: %s", ri) return ri
def _setup_handlers(self): """internal setter for all machine event handlers""" cfg = self._event_handler eh = { CPU_EVENT_CALLBACK_ERROR: cfg.handle_cb_error, CPU_EVENT_RESET: cfg.handle_reset, CPU_EVENT_ALINE_TRAP: cfg.handle_aline_trap, CPU_EVENT_MEM_ACCESS: cfg.handle_mem_access, CPU_EVENT_MEM_BOUNDS: cfg.handle_mem_bounds, CPU_EVENT_MEM_TRACE: cfg.handle_mem_trace, CPU_EVENT_MEM_SPECIAL: cfg.handle_mem_special, CPU_EVENT_INSTR_HOOK: cfg.handle_instr_hook, CPU_EVENT_INT_ACK: cfg.handle_int_ack, CPU_EVENT_BREAKPOINT: cfg.handle_breakpoint, CPU_EVENT_WATCHPOINT: cfg.handle_watchpoint, CPU_EVENT_TIMER: cfg.handle_timer } for e in eh: cpu.set_event_handler(e, eh[e])
[docs] def set_handler(self, event_type, handler): """set a custom handler for an event type and overwrite default handler""" cpu.set_event_handler(event_type, handler)