This commit is contained in:
2021-04-19 20:16:55 +02:00
commit a0ff94dca2
839 changed files with 198976 additions and 0 deletions

View File

@@ -0,0 +1,7 @@
py_trace_event allows low-overhead instrumentation of a multi-threaded,
multi-process application in order to study its global performance
characteristics. It uses the trace event format used in Chromium/Chrome's
about:tracing system.
Trace files generated by py_trace_event can be viewed and manipulated by
trace_event_viewer.

View File

@@ -0,0 +1,35 @@
#!/usr/bin/env python
# Copyright (c) 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
import sys
_CATAPULT_PATH = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', '..', '..'))
_PY_TRACE_EVENT_PATH = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..'))
def _RunTestsOrDie(top_level_dir):
# Need everything in one process for tracing to work.
exit_code = run_with_typ.Run(
top_level_dir, path=[_PY_TRACE_EVENT_PATH], jobs=1)
if exit_code:
sys.exit(exit_code)
def _AddToPathIfNeeded(path):
if path not in sys.path:
sys.path.insert(0, path)
if __name__ == '__main__':
_AddToPathIfNeeded(_CATAPULT_PATH)
from catapult_build import run_with_typ
_RunTestsOrDie(_PY_TRACE_EVENT_PATH)

View File

@@ -0,0 +1,12 @@
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
import sys
SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__))
PY_UTILS = os.path.abspath(os.path.join(SCRIPT_DIR, '..', '..', 'py_utils'))
PROTOBUF = os.path.abspath(os.path.join(
SCRIPT_DIR, '..', 'third_party', 'protobuf'))
sys.path.append(PY_UTILS)
sys.path.append(PROTOBUF)

View File

@@ -0,0 +1,12 @@
#!/usr/bin/env python
# Copyright 2011 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from distutils.core import setup
setup(
name='py_trace_event',
packages=['trace_event_impl'],
version='0.1.0',
description='Performance tracing for python',
author='Nat Duca'
)

View File

@@ -0,0 +1,295 @@
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from py_trace_event import trace_time
r"""Instrumentation-based profiling for Python.
trace_event allows you to hand-instrument your code with areas of interest.
When enabled, trace_event logs the start and stop times of these events to a
logfile. These resulting logfiles can be viewed with either Chrome's
about:tracing UI or with the standalone trace_event_viewer available at
http://www.github.com/natduca/trace_event_viewer/
To use trace event, call trace_event_enable and start instrumenting your code:
from trace_event import *
if "--trace" in sys.argv:
trace_enable("myfile.trace")
@traced
def foo():
...
class MyFoo(object):
@traced
def bar(self):
...
trace_event records trace events to an in-memory buffer. If your application is
long running and you want to see the results of a trace before it exits, you can
call trace_flush to write any in-memory events to disk.
To help intregrating trace_event into existing codebases that dont want to add
trace_event as a dependancy, trace_event is split into an import shim
(trace_event.py) and an implementaiton (trace_event_impl/*). You can copy the
shim, trace_event.py, directly into your including codebase. If the
trace_event_impl is not found, the shim will simply noop.
trace_event is safe with regard to Python threads. Simply trace as you normally
would and each thread's timing will show up in the trace file.
Multiple processes can safely output into a single trace_event logfile. If you
fork after enabling tracing, the child process will continue outputting to the
logfile. Use of the multiprocessing module will work as well. In both cases,
however, note that disabling tracing in the parent process will not stop tracing
in the child processes.
"""
try:
import trace_event_impl
except ImportError:
trace_event_impl = None
def trace_can_enable():
"""
Returns True if a trace_event_impl was found. If false,
trace_enable will fail. Regular tracing methods, including
trace_begin and trace_end, will simply be no-ops.
"""
return trace_event_impl != None
# Default TracedMetaClass to type incase trace_event_impl is not defined.
# This is to avoid exception during import time since TracedMetaClass typically
# used in class definition scope.
TracedMetaClass = type
if trace_event_impl:
import time
# Trace file formats
JSON = trace_event_impl.JSON
JSON_WITH_METADATA = trace_event_impl.JSON_WITH_METADATA
PROTOBUF = trace_event_impl.PROTOBUF
def trace_is_enabled():
return trace_event_impl.trace_is_enabled()
def trace_enable(logfile, format=None):
return trace_event_impl.trace_enable(logfile, format)
def trace_disable():
return trace_event_impl.trace_disable()
def trace_flush():
trace_event_impl.trace_flush()
def trace_begin(name, **kwargs):
args_to_log = {key: repr(value) for key, value in kwargs.iteritems()}
trace_event_impl.add_trace_event("B", trace_time.Now(), "python", name,
args_to_log)
def trace_end(name):
trace_event_impl.add_trace_event("E", trace_time.Now(), "python", name)
def trace_set_thread_name(thread_name):
trace_event_impl.add_trace_event("M", trace_time.Now(), "__metadata",
"thread_name", {"name": thread_name})
def trace_add_benchmark_metadata(*args, **kwargs):
trace_event_impl.trace_add_benchmark_metadata(*args, **kwargs)
def trace(name, **kwargs):
return trace_event_impl.trace(name, **kwargs)
TracedMetaClass = trace_event_impl.TracedMetaClass
def traced(fn):
return trace_event_impl.traced(fn)
def clock_sync(sync_id, issue_ts=None):
'''
Add a clock sync event to the trace log.
Args:
sync_id: ID of clock sync event.
issue_ts: Time at which clock sync was issued, in microseconds.
'''
time_stamp = trace_time.Now()
args_to_log = {'sync_id': sync_id}
if issue_ts: # Issuer if issue_ts is set, else reciever.
assert issue_ts <= time_stamp
args_to_log['issue_ts'] = issue_ts
trace_event_impl.add_trace_event(
"c", time_stamp, "python", "clock_sync", args_to_log)
def is_tracing_controllable():
return trace_event_impl.is_tracing_controllable()
else:
import contextlib
# Trace file formats
JSON = None
JSON_WITH_METADATA = None
PROTOBUF = None
def trace_enable():
raise TraceException(
"Cannot enable trace_event. No trace_event_impl module found.")
def trace_disable():
pass
def trace_is_enabled():
return False
def trace_flush():
pass
def trace_begin(name, **kwargs):
del name # unused.
del kwargs # unused.
pass
def trace_end(name):
del name # unused.
pass
def trace_set_thread_name(thread_name):
del thread_name # unused.
pass
@contextlib.contextmanager
def trace(name, **kwargs):
del name # unused
del kwargs # unused
yield
def traced(fn):
return fn
def clock_sync(sync_id, issue_ts=None):
del sync_id # unused.
pass
def is_tracing_controllable():
return False
trace_enable.__doc__ = """Enables tracing.
Once enabled, the enabled bit propagates to forked processes and
multiprocessing subprocesses. Regular child processes, e.g. those created via
os.system/popen, or subprocess.Popen instances, will not get traced. You can,
however, enable tracing on those subprocess manually.
Trace files are multiprocess safe, so you can have multiple processes
outputting to the same tracelog at once.
log_file can be one of three things:
None: a logfile is opened based on sys[argv], namely
"./" + sys.argv[0] + ".json"
string: a logfile of the given name is opened.
file-like object: the fileno() is is used. The underlying file descriptor
must support fcntl.lockf() operations.
"""
trace_disable.__doc__ = """Disables tracing, if enabled.
Will not disable tracing on any existing child proceses that were forked
from this process. You must disable them yourself.
"""
trace_flush.__doc__ = """Flushes any currently-recorded trace data to disk.
trace_event records traces into an in-memory buffer for efficiency. Flushing
is only done at process exit or when this method is called.
"""
trace_is_enabled.__doc__ = """Returns whether tracing is enabled.
"""
trace_begin.__doc__ = """Records the beginning of an event of the given name.
The building block for performance tracing. A typical example is:
from trace_event import *
def something_heavy():
trace_begin("something_heavy")
trace_begin("read")
try:
lines = open().readlines()
finally:
trace_end("read")
trace_begin("parse")
try:
parse(lines)
finally:
trace_end("parse")
trace_end("something_heavy")
Note that a trace_end call must be issued for every trace_begin call. When
tracing around blocks that might throw exceptions, you should use the trace
function, or a try-finally pattern to ensure that the trace_end method is
called.
See the documentation for the @traced decorator for a simpler way to
instrument functions and methods.
"""
trace_end.__doc__ = """Records the end of an event of the given name.
See the documentation for trace_begin for more information.
Make sure to issue a trace_end for every trace_begin issued. Failure to pair
these calls will lead to bizarrely tall looking traces in the
trace_event_viewer UI.
"""
trace_set_thread_name.__doc__ = """Sets the trace's name for the current thread.
"""
trace.__doc__ = """Traces a block of code using a with statement.
Example usage:
from trace_event import *
def something_heavy(lines):
with trace("parse_lines", lines=lines):
parse(lines)
If tracing an entire function call, prefer the @traced decorator.
"""
traced.__doc__ = """
Traces the provided function, using the function name for the actual generated
event.
Prefer this decorator over the explicit trace_begin and trace_end functions
whenever you are tracing the start and stop of a function. It automatically
issues trace_begin/end events, even when the wrapped function throws.
You can also pass the function's argument names to traced, and the argument
values will be added to the trace. Example usage:
from trace_event import *
@traced("url")
def send_request(url):
urllib2.urlopen(url).read()
"""
clock_sync.__doc__ = """
Issues a clock sync marker event.
Clock sync markers are used to synchronize the clock domains of different
traces so that they can be used together. It takes a sync_id, and if it is
the issuer of a clock sync event it will also require an issue_ts. The
issue_ts is a timestamp from when the clocksync was first issued. This is used
to calculate the time difference between clock domains.
"""

View File

@@ -0,0 +1,7 @@
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from log import *
from decorators import *
from meta_class import *
import multiprocessing_shim

View File

@@ -0,0 +1,87 @@
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import contextlib
import inspect
import time
import functools
import log
from py_trace_event import trace_time
@contextlib.contextmanager
def trace(name, **kwargs):
category = "python"
start = trace_time.Now()
args_to_log = {key: repr(value) for key, value in kwargs.iteritems()}
log.add_trace_event("B", start, category, name, args_to_log)
try:
yield
finally:
end = trace_time.Now()
log.add_trace_event("E", end, category, name)
def traced(*args):
def get_wrapper(func):
if inspect.isgeneratorfunction(func):
raise Exception("Can not trace generators.")
category = "python"
arg_spec = inspect.getargspec(func)
is_method = arg_spec.args and arg_spec.args[0] == "self"
def arg_spec_tuple(name):
arg_index = arg_spec.args.index(name)
defaults_length = len(arg_spec.defaults) if arg_spec.defaults else 0
default_index = arg_index + defaults_length - len(arg_spec.args)
if default_index >= 0:
default = arg_spec.defaults[default_index]
else:
default = None
return (name, arg_index, default)
args_to_log = map(arg_spec_tuple, arg_names)
@functools.wraps(func)
def traced_function(*args, **kwargs):
# Everything outside traced_function is done at decoration-time.
# Everything inside traced_function is done at run-time and must be fast.
if not log._enabled: # This check must be at run-time.
return func(*args, **kwargs)
def get_arg_value(name, index, default):
if name in kwargs:
return kwargs[name]
elif index < len(args):
return args[index]
else:
return default
if is_method:
name = "%s.%s" % (args[0].__class__.__name__, func.__name__)
else:
name = "%s.%s" % (func.__module__, func.__name__)
# Be sure to repr before calling func. Argument values may change.
arg_values = {
name: repr(get_arg_value(name, index, default))
for name, index, default in args_to_log}
start = trace_time.Now()
log.add_trace_event("B", start, category, name, arg_values)
try:
return func(*args, **kwargs)
finally:
end = trace_time.Now()
log.add_trace_event("E", end, category, name)
return traced_function
no_decorator_arguments = len(args) == 1 and callable(args[0])
if no_decorator_arguments:
arg_names = ()
return get_wrapper(args[0])
else:
arg_names = args
return get_wrapper

View File

@@ -0,0 +1,63 @@
#!/usr/bin/env python
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import decorators
import logging
import unittest
from trace_test import TraceTest
#from .trace_test import TraceTest
def generator():
yield 1
yield 2
class DecoratorTests(unittest.TestCase):
def test_tracing_object_fails(self):
self.assertRaises(Exception, lambda: decorators.trace(1))
self.assertRaises(Exception, lambda: decorators.trace(""))
self.assertRaises(Exception, lambda: decorators.trace([]))
def test_tracing_generators_fail(self):
self.assertRaises(Exception, lambda: decorators.trace(generator))
class ClassToTest(object):
@decorators.traced
def method1(self):
return 1
@decorators.traced
def method2(self):
return 1
@decorators.traced
def traced_func():
return 1
class DecoratorTests(TraceTest):
def _get_decorated_method_name(self, f):
res = self.go(f)
events = res.findEventsOnThread(res.findThreadIds()[0])
# Sanity checks.
self.assertEquals(2, len(events))
self.assertEquals(events[0]["name"], events[1]["name"])
return events[1]["name"]
def test_func_names_work(self):
expected_method_name = __name__ + '.traced_func'
self.assertEquals(expected_method_name,
self._get_decorated_method_name(traced_func))
def test_method_names_work(self):
ctt = ClassToTest()
self.assertEquals('ClassToTest.method1',
self._get_decorated_method_name(ctt.method1))
self.assertEquals('ClassToTest.method2',
self._get_decorated_method_name(ctt.method2))
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
unittest.main(verbosity=2)

View File

@@ -0,0 +1,364 @@
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import atexit
import json
import os
import sys
import time
import threading
import multiprocessing
import multiprocessing_shim
from py_trace_event.trace_event_impl import perfetto_trace_writer
from py_trace_event import trace_time
from py_utils import lock
# Trace file formats:
# Legacy format: json list of events.
# Events can be written from multiple processes, but since no process
# can be sure that it is the last one, nobody writes the closing ']'.
# So the resulting file is not technically correct json.
JSON = "json"
# Full json with events and metadata.
# This format produces correct json ready to feed into TraceDataBuilder.
# Note that it is the responsibility of the user of py_trace_event to make sure
# that trace_disable() is called after all child processes have finished.
JSON_WITH_METADATA = "json_with_metadata"
# Perfetto protobuf trace format.
PROTOBUF = "protobuf"
_lock = threading.Lock()
_enabled = False
_log_file = None
_cur_events = [] # events that have yet to be buffered
_benchmark_metadata = {}
_tls = threading.local() # tls used to detect forking/etc
_atexit_regsitered_for_pid = None
_control_allowed = True
_original_multiprocessing_process = multiprocessing.Process
class TraceException(Exception):
pass
def _note(msg, *args):
pass
# print "%i: %s" % (os.getpid(), msg)
def _locked(fn):
def locked_fn(*args,**kwargs):
_lock.acquire()
try:
ret = fn(*args,**kwargs)
finally:
_lock.release()
return ret
return locked_fn
def _disallow_tracing_control():
global _control_allowed
_control_allowed = False
def trace_enable(log_file=None, format=None):
""" Enable tracing.
Args:
log_file: file to write trace into. Can be a file-like object,
a name of file, or None. If None, file name is constructed
from executable name.
format: trace file format. See trace_event.py for available options.
"""
if format is None:
format = JSON
_trace_enable(log_file, format)
def _write_header():
tid = threading.current_thread().ident
if not tid:
tid = os.getpid()
if _format == PROTOBUF:
tid = threading.current_thread().ident
perfetto_trace_writer.write_thread_descriptor_event(
output=_log_file,
pid=os.getpid(),
tid=tid,
ts=trace_time.Now(),
)
perfetto_trace_writer.write_event(
output=_log_file,
ph="M",
category="process_argv",
name="process_argv",
ts=trace_time.Now(),
args=sys.argv,
tid=tid,
)
else:
if _format == JSON:
_log_file.write('[')
elif _format == JSON_WITH_METADATA:
_log_file.write('{"traceEvents": [\n')
else:
raise TraceException("Unknown format: %s" % _format)
json.dump({
"ph": "M",
"category": "process_argv",
"pid": os.getpid(),
"tid": threading.current_thread().ident,
"ts": trace_time.Now(),
"name": "process_argv",
"args": {"argv": sys.argv},
}, _log_file)
_log_file.write('\n')
@_locked
def _trace_enable(log_file=None, format=None):
global _format
_format = format
global _enabled
if _enabled:
raise TraceException("Already enabled")
if not _control_allowed:
raise TraceException("Tracing control not allowed in child processes.")
_enabled = True
global _log_file
if log_file == None:
if sys.argv[0] == '':
n = 'trace_event'
else:
n = sys.argv[0]
if _format == PROTOBUF:
log_file = open("%s.pb" % n, "ab", False)
else:
log_file = open("%s.json" % n, "ab", False)
elif isinstance(log_file, basestring):
log_file = open("%s" % log_file, "ab", False)
elif not hasattr(log_file, 'fileno'):
raise TraceException(
"Log file must be None, a string, or file-like object with a fileno()")
_note("trace_event: tracelog name is %s" % log_file)
_log_file = log_file
with lock.FileLock(_log_file, lock.LOCK_EX):
_log_file.seek(0, os.SEEK_END)
lastpos = _log_file.tell()
creator = lastpos == 0
if creator:
_note("trace_event: Opened new tracelog, lastpos=%i", lastpos)
_write_header()
else:
_note("trace_event: Opened existing tracelog")
_log_file.flush()
# Monkeypatch in our process replacement for the multiprocessing.Process class
if multiprocessing.Process != multiprocessing_shim.ProcessShim:
multiprocessing.Process = multiprocessing_shim.ProcessShim
@_locked
def trace_flush():
if _enabled:
_flush()
@_locked
def trace_disable():
global _enabled
if not _control_allowed:
raise TraceException("Tracing control not allowed in child processes.")
if not _enabled:
return
_enabled = False
_flush(close=True)
multiprocessing.Process = _original_multiprocessing_process
def _write_cur_events():
if _format == PROTOBUF:
for e in _cur_events:
perfetto_trace_writer.write_event(
output=_log_file,
ph=e["ph"],
category=e["category"],
name=e["name"],
ts=e["ts"],
args=e["args"],
tid=threading.current_thread().ident,
)
elif _format in (JSON, JSON_WITH_METADATA):
for e in _cur_events:
_log_file.write(",\n")
json.dump(e, _log_file)
else:
raise TraceException("Unknown format: %s" % _format)
del _cur_events[:]
def _write_footer():
if _format in [JSON, PROTOBUF]:
# In JSON format we might not be the only process writing to this logfile.
# So, we will simply close the file rather than writing the trailing ] that
# it technically requires. The trace viewer understands this and
# will insert a trailing ] during loading.
# In PROTOBUF format there's no need for a footer. The metadata has already
# been written in a special proto message.
pass
elif _format == JSON_WITH_METADATA:
_log_file.write('],\n"metadata": ')
json.dump(_benchmark_metadata, _log_file)
_log_file.write('}')
else:
raise TraceException("Unknown format: %s" % _format)
def _flush(close=False):
global _log_file
with lock.FileLock(_log_file, lock.LOCK_EX):
_log_file.seek(0, os.SEEK_END)
if len(_cur_events):
_write_cur_events()
if close:
_write_footer()
_log_file.flush()
if close:
_note("trace_event: Closed")
_log_file.close()
_log_file = None
else:
_note("trace_event: Flushed")
@_locked
def trace_is_enabled():
return _enabled
@_locked
def add_trace_event(ph, ts, category, name, args=None):
global _enabled
if not _enabled:
return
if not hasattr(_tls, 'pid') or _tls.pid != os.getpid():
_tls.pid = os.getpid()
global _atexit_regsitered_for_pid
if _tls.pid != _atexit_regsitered_for_pid:
_atexit_regsitered_for_pid = _tls.pid
atexit.register(_trace_disable_atexit)
_tls.pid = os.getpid()
del _cur_events[:] # we forked, clear the event buffer!
tid = threading.current_thread().ident
if not tid:
tid = os.getpid()
_tls.tid = tid
_cur_events.append({
"ph": ph,
"category": category,
"pid": _tls.pid,
"tid": _tls.tid,
"ts": ts,
"name": name,
"args": args or {},
});
def trace_begin(name, args=None):
add_trace_event("B", trace_time.Now(), "python", name, args)
def trace_end(name, args=None):
add_trace_event("E", trace_time.Now(), "python", name, args)
def trace_set_thread_name(thread_name):
add_trace_event("M", trace_time.Now(), "__metadata", "thread_name",
{"name": thread_name})
def trace_add_benchmark_metadata(
benchmark_start_time_us,
story_run_time_us,
benchmark_name,
benchmark_description,
story_name,
story_tags,
story_run_index,
label=None,
had_failures=None,
):
""" Add benchmark metadata to be written to trace file.
Args:
benchmark_start_time_us: Benchmark start time in microseconds.
story_run_time_us: Story start time in microseconds.
benchmark_name: Name of the benchmark.
benchmark_description: Description of the benchmark.
story_name: Name of the story.
story_tags: List of story tags.
story_run_index: Index of the story run.
label: Optional label.
had_failures: Whether this story run failed.
"""
global _benchmark_metadata
if _format == PROTOBUF:
# Write metadata immediately.
perfetto_trace_writer.write_metadata(
output=_log_file,
benchmark_start_time_us=benchmark_start_time_us,
story_run_time_us=story_run_time_us,
benchmark_name=benchmark_name,
benchmark_description=benchmark_description,
story_name=story_name,
story_tags=story_tags,
story_run_index=story_run_index,
label=label,
had_failures=had_failures,
)
elif _format == JSON_WITH_METADATA:
# Store metadata to write it in the footer.
telemetry_metadata_for_json = {
"benchmarkStart": benchmark_start_time_us / 1000.0,
"traceStart": story_run_time_us / 1000.0,
"benchmarks": [benchmark_name],
"benchmarkDescriptions": [benchmark_description],
"stories": [story_name],
"storyTags": story_tags,
"storysetRepeats": [story_run_index],
}
if label:
telemetry_metadata_for_json["labels"] = [label]
if had_failures:
telemetry_metadata_for_json["hadFailures"] = [had_failures]
_benchmark_metadata = {
# TODO(crbug.com/948633): For right now, we use "TELEMETRY" as the
# clock domain to guarantee that Telemetry is given its own clock
# domain. Telemetry isn't really a clock domain, though: it's a
# system that USES a clock domain like LINUX_CLOCK_MONOTONIC or
# WIN_QPC. However, there's a chance that a Telemetry controller
# running on Linux (using LINUX_CLOCK_MONOTONIC) is interacting
# with an Android phone (also using LINUX_CLOCK_MONOTONIC, but
# on a different machine). The current logic collapses clock
# domains based solely on the clock domain string, but we really
# should to collapse based on some (device ID, clock domain ID)
# tuple. Giving Telemetry its own clock domain is a work-around
# for this.
"clock-domain": "TELEMETRY",
"telemetry": telemetry_metadata_for_json,
}
elif _format == JSON:
raise TraceException("Can't write metadata in JSON format")
else:
raise TraceException("Unknown format: %s" % _format)
def _trace_disable_atexit():
trace_disable()
def is_tracing_controllable():
global _control_allowed
return _control_allowed

View File

@@ -0,0 +1,46 @@
#!/usr/bin/env python
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import sys
import unittest
from log import *
from parsed_trace_events import *
from py_utils import tempfile_ext
class LogIOTest(unittest.TestCase):
def test_enable_with_file(self):
with tempfile_ext.TemporaryFileName() as filename:
trace_enable(open(filename, 'w+'))
trace_disable()
e = ParsedTraceEvents(trace_filename=filename)
self.assertTrue(len(e) > 0)
def test_enable_with_filename(self):
with tempfile_ext.TemporaryFileName() as filename:
trace_enable(filename)
trace_disable()
e = ParsedTraceEvents(trace_filename=filename)
self.assertTrue(len(e) > 0)
def test_enable_with_implicit_filename(self):
expected_filename = "%s.json" % sys.argv[0]
def do_work():
trace_enable()
trace_disable()
e = ParsedTraceEvents(trace_filename=expected_filename)
self.assertTrue(len(e) > 0)
try:
do_work()
finally:
if os.path.exists(expected_filename):
os.unlink(expected_filename)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
unittest.main(verbosity=2)

View File

@@ -0,0 +1,17 @@
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import types
from py_trace_event.trace_event_impl import decorators
class TracedMetaClass(type):
def __new__(cls, name, bases, attrs):
for attr_name, attr_value in attrs.iteritems():
if (not attr_name.startswith('_') and
isinstance(attr_value, types.FunctionType)):
attrs[attr_name] = decorators.traced(attr_value)
return super(TracedMetaClass, cls).__new__(cls, name, bases, attrs)

View File

@@ -0,0 +1,88 @@
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import multiprocessing
import log
import time
_RealProcess = multiprocessing.Process
__all__ = []
class ProcessSubclass(_RealProcess):
def __init__(self, shim, *args, **kwards):
_RealProcess.__init__(self, *args, **kwards)
self._shim = shim
def run(self,*args,**kwargs):
log._disallow_tracing_control()
try:
r = _RealProcess.run(self, *args, **kwargs)
finally:
if log.trace_is_enabled():
log.trace_flush() # todo, reduce need for this...
return r
class ProcessShim():
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
self._proc = ProcessSubclass(self, group, target, name, args, kwargs)
# hint to testing code that the shimming worked
self._shimmed_by_trace_event = True
def run(self):
self._proc.run()
def start(self):
self._proc.start()
def terminate(self):
if log.trace_is_enabled():
# give the flush a chance to finish --> TODO: find some other way.
time.sleep(0.25)
self._proc.terminate()
def join(self, timeout=None):
self._proc.join( timeout)
def is_alive(self):
return self._proc.is_alive()
@property
def name(self):
return self._proc.name
@name.setter
def name(self, name):
self._proc.name = name
@property
def daemon(self):
return self._proc.daemon
@daemon.setter
def daemon(self, daemonic):
self._proc.daemon = daemonic
@property
def authkey(self):
return self._proc._authkey
@authkey.setter
def authkey(self, authkey):
self._proc.authkey = AuthenticationString(authkey)
@property
def exitcode(self):
return self._proc.exitcode
@property
def ident(self):
return self._proc.ident
@property
def pid(self):
return self._proc.pid
def __repr__(self):
return self._proc.__repr__()

View File

@@ -0,0 +1,98 @@
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import math
import json
class ParsedTraceEvents(object):
def __init__(self, events = None, trace_filename = None):
"""
Utility class for filtering and manipulating trace data.
events -- An iterable object containing trace events
trace_filename -- A file object that contains a complete trace.
"""
if trace_filename and events:
raise Exception("Provide either a trace file or event list")
if not trace_filename and events == None:
raise Exception("Provide either a trace file or event list")
if trace_filename:
f = open(trace_filename, 'r')
t = f.read()
f.close()
# If the event data begins with a [, then we know it should end with a ].
# The reason we check for this is because some tracing implementations
# cannot guarantee that a ']' gets written to the trace file. So, we are
# forgiving and if this is obviously the case, we fix it up before
# throwing the string at JSON.parse.
if t[0] == '[':
n = len(t);
if t[n - 1] != ']' and t[n - 1] != '\n':
t = t + ']'
elif t[n - 2] != ']' and t[n - 1] == '\n':
t = t + ']'
elif t[n - 3] != ']' and t[n - 2] == '\r' and t[n - 1] == '\n':
t = t + ']'
try:
events = json.loads(t)
except ValueError:
raise Exception("Corrupt trace, did not parse. Value: %s" % t)
if 'traceEvents' in events:
events = events['traceEvents']
if not hasattr(events, '__iter__'):
raise Exception, 'events must be iteraable.'
self.events = events
self.pids = None
self.tids = None
def __len__(self):
return len(self.events)
def __getitem__(self, i):
return self.events[i]
def __setitem__(self, i, v):
self.events[i] = v
def __repr__(self):
return "[%s]" % ",\n ".join([repr(e) for e in self.events])
def findProcessIds(self):
if self.pids:
return self.pids
pids = set()
for e in self.events:
if "pid" in e and e["pid"]:
pids.add(e["pid"])
self.pids = list(pids)
return self.pids
def findThreadIds(self):
if self.tids:
return self.tids
tids = set()
for e in self.events:
if "tid" in e and e["tid"]:
tids.add(e["tid"])
self.tids = list(tids)
return self.tids
def findEventsOnProcess(self, pid):
return ParsedTraceEvents([e for e in self.events if e["pid"] == pid])
def findEventsOnThread(self, tid):
return ParsedTraceEvents(
[e for e in self.events if e["ph"] != "M" and e["tid"] == tid])
def findByPhase(self, ph):
return ParsedTraceEvents([e for e in self.events if e["ph"] == ph])
def findByName(self, n):
return ParsedTraceEvents([e for e in self.events if e["name"] == n])

View File

@@ -0,0 +1,222 @@
# Copyright 2019 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
""" Classes representing perfetto trace protobuf messages.
This module makes use of neither python-protobuf library nor python classes
compiled from .proto definitions, because currently there's no way to
deploy those to all the places where telemetry is run.
TODO(crbug.com/944078): Remove this module after the python-protobuf library
is deployed to all the bots.
Definitions of perfetto messages can be found here:
https://android.googlesource.com/platform/external/perfetto/+/refs/heads/master/protos/perfetto/trace/
"""
import encoder
import wire_format
class TracePacket(object):
def __init__(self):
self.interned_data = None
self.thread_descriptor = None
self.incremental_state_cleared = None
self.track_event = None
self.trusted_packet_sequence_id = None
self.chrome_benchmark_metadata = None
def encode(self):
parts = []
if self.trusted_packet_sequence_id is not None:
writer = encoder.UInt32Encoder(10, False, False)
writer(parts.append, self.trusted_packet_sequence_id)
if self.track_event is not None:
tag = encoder.TagBytes(11, wire_format.WIRETYPE_LENGTH_DELIMITED)
data = self.track_event.encode()
length = encoder._VarintBytes(len(data))
parts += [tag, length, data]
if self.interned_data is not None:
tag = encoder.TagBytes(12, wire_format.WIRETYPE_LENGTH_DELIMITED)
data = self.interned_data.encode()
length = encoder._VarintBytes(len(data))
parts += [tag, length, data]
if self.incremental_state_cleared is not None:
writer = encoder.BoolEncoder(41, False, False)
writer(parts.append, self.incremental_state_cleared)
if self.thread_descriptor is not None:
tag = encoder.TagBytes(44, wire_format.WIRETYPE_LENGTH_DELIMITED)
data = self.thread_descriptor.encode()
length = encoder._VarintBytes(len(data))
parts += [tag, length, data]
if self.chrome_benchmark_metadata is not None:
tag = encoder.TagBytes(48, wire_format.WIRETYPE_LENGTH_DELIMITED)
data = self.chrome_benchmark_metadata.encode()
length = encoder._VarintBytes(len(data))
parts += [tag, length, data]
return b"".join(parts)
class InternedData(object):
def __init__(self):
self.event_category = None
self.legacy_event_name = None
def encode(self):
parts = []
if self.event_category is not None:
tag = encoder.TagBytes(1, wire_format.WIRETYPE_LENGTH_DELIMITED)
data = self.event_category.encode()
length = encoder._VarintBytes(len(data))
parts += [tag, length, data]
if self.legacy_event_name is not None:
tag = encoder.TagBytes(2, wire_format.WIRETYPE_LENGTH_DELIMITED)
data = self.legacy_event_name.encode()
length = encoder._VarintBytes(len(data))
parts += [tag, length, data]
return b"".join(parts)
class EventCategory(object):
def __init__(self):
self.iid = None
self.name = None
def encode(self):
if (self.iid is None or self.name is None):
raise RuntimeError("Missing mandatory fields.")
parts = []
writer = encoder.UInt32Encoder(1, False, False)
writer(parts.append, self.iid)
writer = encoder.StringEncoder(2, False, False)
writer(parts.append, self.name)
return b"".join(parts)
LegacyEventName = EventCategory
class ThreadDescriptor(object):
def __init__(self):
self.pid = None
self.tid = None
self.reference_timestamp_us = None
def encode(self):
if (self.pid is None or self.tid is None or
self.reference_timestamp_us is None):
raise RuntimeError("Missing mandatory fields.")
parts = []
writer = encoder.UInt32Encoder(1, False, False)
writer(parts.append, self.pid)
writer = encoder.UInt32Encoder(2, False, False)
writer(parts.append, self.tid)
writer = encoder.Int64Encoder(6, False, False)
writer(parts.append, self.reference_timestamp_us)
return b"".join(parts)
class TrackEvent(object):
def __init__(self):
self.timestamp_absolute_us = None
self.timestamp_delta_us = None
self.legacy_event = None
self.category_iids = None
def encode(self):
parts = []
if self.timestamp_delta_us is not None:
writer = encoder.Int64Encoder(1, False, False)
writer(parts.append, self.timestamp_delta_us)
if self.category_iids is not None:
writer = encoder.UInt32Encoder(3, is_repeated=True, is_packed=False)
writer(parts.append, self.category_iids)
if self.legacy_event is not None:
tag = encoder.TagBytes(6, wire_format.WIRETYPE_LENGTH_DELIMITED)
data = self.legacy_event.encode()
length = encoder._VarintBytes(len(data))
parts += [tag, length, data]
if self.timestamp_absolute_us is not None:
writer = encoder.Int64Encoder(16, False, False)
writer(parts.append, self.timestamp_absolute_us)
return b"".join(parts)
class LegacyEvent(object):
def __init__(self):
self.phase = None
self.name_iid = None
def encode(self):
parts = []
if self.name_iid is not None:
writer = encoder.UInt32Encoder(1, False, False)
writer(parts.append, self.name_iid)
if self.phase is not None:
writer = encoder.Int32Encoder(2, False, False)
writer(parts.append, self.phase)
return b"".join(parts)
class ChromeBenchmarkMetadata(object):
def __init__(self):
self.benchmark_start_time_us = None
self.story_run_time_us = None
self.benchmark_name = None
self.benchmark_description = None
self.story_name = None
self.story_tags = None
self.story_run_index = None
self.label = None
self.had_failures = None
def encode(self):
parts = []
if self.benchmark_start_time_us is not None:
writer = encoder.Int64Encoder(1, False, False)
writer(parts.append, self.benchmark_start_time_us)
if self.story_run_time_us is not None:
writer = encoder.Int64Encoder(2, False, False)
writer(parts.append, self.story_run_time_us)
if self.benchmark_name is not None:
writer = encoder.StringEncoder(3, False, False)
writer(parts.append, self.benchmark_name)
if self.benchmark_description is not None:
writer = encoder.StringEncoder(4, False, False)
writer(parts.append, self.benchmark_description)
if self.label is not None:
writer = encoder.StringEncoder(5, False, False)
writer(parts.append, self.label)
if self.story_name is not None:
writer = encoder.StringEncoder(6, False, False)
writer(parts.append, self.story_name)
if self.story_tags is not None:
writer = encoder.StringEncoder(7, is_repeated=True, is_packed=False)
writer(parts.append, self.story_tags)
if self.story_run_index is not None:
writer = encoder.Int32Encoder(8, False, False)
writer(parts.append, self.story_run_index)
if self.had_failures is not None:
writer = encoder.BoolEncoder(9, False, False)
writer(parts.append, self.had_failures)
return b"".join(parts)
def write_trace_packet(output, trace_packet):
tag = encoder.TagBytes(1, wire_format.WIRETYPE_LENGTH_DELIMITED)
output.write(tag)
binary_data = trace_packet.encode()
encoder._EncodeVarint(output.write, len(binary_data))
output.write(binary_data)

View File

@@ -0,0 +1,166 @@
# Copyright 2019 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
""" Functions to write trace data in perfetto protobuf format.
"""
import collections
import perfetto_proto_classes as proto
# Dicts of strings for interning.
# Note that each thread has its own interning index.
_interned_categories_by_tid = collections.defaultdict(dict)
_interned_event_names_by_tid = collections.defaultdict(dict)
# Trusted sequence ids from telemetry should not overlap with
# trusted sequence ids from other trace producers. Chrome assigns
# sequence ids incrementally starting from 1 and we expect all its ids
# to be well below 10000. Starting from 2^20 will give us enough
# confidence that it will not overlap.
_next_sequence_id = 1<<20
_sequence_ids = {}
# Timestamp of the last event from each thread. Used for delta-encoding
# of timestamps.
_last_timestamps = {}
def _get_sequence_id(tid):
global _sequence_ids
global _next_sequence_id
if tid not in _sequence_ids:
_sequence_ids[tid] = _next_sequence_id
_next_sequence_id += 1
return _sequence_ids[tid]
def _intern_category(category, trace_packet, tid):
global _interned_categories_by_tid
categories = _interned_categories_by_tid[tid]
if category not in categories:
# note that interning indices start from 1
categories[category] = len(categories) + 1
if trace_packet.interned_data is None:
trace_packet.interned_data = proto.InternedData()
trace_packet.interned_data.event_category = proto.EventCategory()
trace_packet.interned_data.event_category.iid = categories[category]
trace_packet.interned_data.event_category.name = category
return categories[category]
def _intern_event_name(event_name, trace_packet, tid):
global _interned_event_names_by_tid
event_names = _interned_event_names_by_tid[tid]
if event_name not in event_names:
# note that interning indices start from 1
event_names[event_name] = len(event_names) + 1
if trace_packet.interned_data is None:
trace_packet.interned_data = proto.InternedData()
trace_packet.interned_data.legacy_event_name = proto.LegacyEventName()
trace_packet.interned_data.legacy_event_name.iid = event_names[event_name]
trace_packet.interned_data.legacy_event_name.name = event_name
return event_names[event_name]
def write_thread_descriptor_event(output, pid, tid, ts):
""" Write the first event in a sequence.
Call this function before writing any other events.
Note that this function is NOT thread-safe.
Args:
output: a file-like object to write events into.
pid: process ID.
tid: thread ID.
ts: timestamp in microseconds.
"""
global _last_timestamps
ts_us = int(ts)
_last_timestamps[tid] = ts_us
thread_descriptor_packet = proto.TracePacket()
thread_descriptor_packet.trusted_packet_sequence_id = _get_sequence_id(tid)
thread_descriptor_packet.thread_descriptor = proto.ThreadDescriptor()
thread_descriptor_packet.thread_descriptor.pid = pid
# Thread ID from threading module doesn't fit into int32.
# But we don't need the exact thread ID, just some number to
# distinguish one thread from another. We assume that the last 31 bits
# will do for that purpose.
thread_descriptor_packet.thread_descriptor.tid = tid & 0x7FFFFFFF
thread_descriptor_packet.thread_descriptor.reference_timestamp_us = ts_us
thread_descriptor_packet.incremental_state_cleared = True;
proto.write_trace_packet(output, thread_descriptor_packet)
def write_event(output, ph, category, name, ts, args, tid):
""" Write a trace event.
Note that this function is NOT thread-safe.
Args:
output: a file-like object to write events into.
ph: phase of event.
category: category of event.
name: event name.
ts: timestamp in microseconds.
args: this argument is currently ignored.
tid: thread ID.
"""
del args # TODO(khokhlov): Encode args as DebugAnnotations.
global _last_timestamps
ts_us = int(ts)
delta_ts = ts_us - _last_timestamps[tid]
packet = proto.TracePacket()
packet.trusted_packet_sequence_id = _get_sequence_id(tid)
packet.track_event = proto.TrackEvent()
if delta_ts >= 0:
packet.track_event.timestamp_delta_us = delta_ts
_last_timestamps[tid] = ts_us
else:
packet.track_event.timestamp_absolute_us = ts_us
packet.track_event.category_iids = [_intern_category(category, packet, tid)]
legacy_event = proto.LegacyEvent()
legacy_event.phase = ord(ph)
legacy_event.name_iid = _intern_event_name(name, packet, tid)
packet.track_event.legacy_event = legacy_event
proto.write_trace_packet(output, packet)
def write_metadata(
output,
benchmark_start_time_us,
story_run_time_us,
benchmark_name,
benchmark_description,
story_name,
story_tags,
story_run_index,
label=None,
had_failures=None,
):
metadata = proto.ChromeBenchmarkMetadata()
metadata.benchmark_start_time_us = int(benchmark_start_time_us)
metadata.story_run_time_us = int(story_run_time_us)
metadata.benchmark_name = benchmark_name
metadata.benchmark_description = benchmark_description
metadata.story_name = story_name
metadata.story_tags = list(story_tags)
metadata.story_run_index = int(story_run_index)
if label is not None:
metadata.label = label
if had_failures is not None:
metadata.had_failures = had_failures
packet = proto.TracePacket()
packet.chrome_benchmark_metadata = metadata
proto.write_trace_packet(output, packet)

View File

@@ -0,0 +1,80 @@
#!/usr/bin/env python
# Copyright 2019 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import unittest
import StringIO
from py_trace_event.trace_event_impl import perfetto_trace_writer
class PerfettoTraceWriterTest(unittest.TestCase):
""" Tests functions that write perfetto protobufs.
TODO(crbug.com/944078): Switch to using python-protobuf library
and implement proper protobuf parsing then.
"""
def testWriteThreadDescriptorEvent(self):
result = StringIO.StringIO()
perfetto_trace_writer.write_thread_descriptor_event(
output=result,
pid=1,
tid=2,
ts=1556716807306000,
)
expected_output = (
'\n\x17P\x80\x80@\xc8\x02\x01\xe2\x02\r\x08\x01\x10'
'\x020\x90\xf6\xc2\x82\xb6\xfa\xe1\x02'
)
self.assertEqual(expected_output, result.getvalue())
def testWriteTwoEvents(self):
result = StringIO.StringIO()
perfetto_trace_writer.write_thread_descriptor_event(
output=result,
pid=1,
tid=2,
ts=1556716807306000,
)
perfetto_trace_writer.write_event(
output=result,
ph="M",
category="category",
name="event_name",
ts=1556716807406000,
args={},
tid=2,
)
expected_output = (
'\n\x17P\x80\x80@\xc8\x02\x01\xe2\x02\r\x08\x01\x10'
'\x020\x90\xf6\xc2\x82\xb6\xfa\xe1\x02\n2P\x80\x80@Z\x0c\x08'
'\xa0\x8d\x06\x18\x012\x04\x08\x01\x10Mb\x1e\n\x0c\x08\x01'
'\x12\x08category\x12\x0e\x08\x01\x12\nevent_name'
)
self.assertEqual(expected_output, result.getvalue())
def testWriteMetadata(self):
result = StringIO.StringIO()
perfetto_trace_writer.write_metadata(
output=result,
benchmark_start_time_us=1556716807306000,
story_run_time_us=1556716807406000,
benchmark_name="benchmark",
benchmark_description="description",
story_name="story",
story_tags=["foo", "bar"],
story_run_index=0,
label="label",
had_failures=False,
)
expected_output = (
'\nI\x82\x03F\x08\x90\xf6\xc2\x82\xb6\xfa\xe1'
'\x02\x10\xb0\x83\xc9\x82\xb6\xfa\xe1\x02\x1a\tbenchmark"'
'\x0bdescription*\x05label2\x05story:\x03foo:\x03bar@\x00H\x00'
)
self.assertEqual(expected_output, result.getvalue())

View File

@@ -0,0 +1,48 @@
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import unittest
#from .log import *
#from .parsed_trace_events import *
from log import *
from parsed_trace_events import *
from py_utils import tempfile_ext
class TraceTest(unittest.TestCase):
def __init__(self, *args):
"""
Infrastructure for running tests of the tracing system.
Does not actually run any tests. Look at subclasses for those.
"""
unittest.TestCase.__init__(self, *args)
self._file = None
def go(self, cb):
"""
Enables tracing, runs the provided callback, and if successful, returns a
TraceEvents object with the results.
"""
with tempfile_ext.TemporaryFileName() as filename:
self._file = open(filename, 'a+')
trace_enable(self._file)
try:
cb()
finally:
trace_disable()
e = ParsedTraceEvents(trace_filename=self._file.name)
self._file.close()
self._file = None
return e
@property
def trace_filename(self):
return self._file.name
def tearDown(self):
if trace_is_enabled():
trace_disable()
if self._file:
self._file.close()

View File

@@ -0,0 +1,518 @@
#!/usr/bin/env python
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import contextlib
import json
import logging
import math
import multiprocessing
import os
import time
import unittest
import sys
from py_trace_event import trace_event
from py_trace_event import trace_time
from py_trace_event.trace_event_impl import log
from py_trace_event.trace_event_impl import multiprocessing_shim
from py_utils import tempfile_ext
class TraceEventTests(unittest.TestCase):
@contextlib.contextmanager
def _test_trace(self, disable=True, format=None):
with tempfile_ext.TemporaryFileName() as filename:
self._log_path = filename
try:
trace_event.trace_enable(self._log_path, format=format)
yield
finally:
if disable:
trace_event.trace_disable()
def testNoImpl(self):
orig_impl = trace_event.trace_event_impl
try:
trace_event.trace_event_impl = None
self.assertFalse(trace_event.trace_can_enable())
finally:
trace_event.trace_event_impl = orig_impl
def testImpl(self):
self.assertTrue(trace_event.trace_can_enable())
def testIsEnabledFalse(self):
self.assertFalse(trace_event.trace_is_enabled())
def testIsEnabledTrue(self):
with self._test_trace():
self.assertTrue(trace_event.trace_is_enabled())
def testEnable(self):
with self._test_trace():
with open(self._log_path, 'r') as f:
log_output = json.loads(f.read() + ']')
self.assertEquals(len(log_output), 1)
self.assertTrue(trace_event.trace_is_enabled())
log_output = log_output.pop()
self.assertEquals(log_output['category'], 'process_argv')
self.assertEquals(log_output['name'], 'process_argv')
self.assertTrue(log_output['args']['argv'])
self.assertEquals(log_output['ph'], 'M')
def testDoubleEnable(self):
try:
with self._test_trace():
with self._test_trace():
pass
except log.TraceException:
return
assert False
def testDisable(self):
_old_multiprocessing_process = multiprocessing.Process
with self._test_trace(disable=False):
with open(self._log_path, 'r') as f:
self.assertTrue(trace_event.trace_is_enabled())
self.assertEqual(
multiprocessing.Process, multiprocessing_shim.ProcessShim)
trace_event.trace_disable()
self.assertEqual(
multiprocessing.Process, _old_multiprocessing_process)
self.assertEquals(len(json.loads(f.read() + ']')), 1)
self.assertFalse(trace_event.trace_is_enabled())
def testDoubleDisable(self):
with self._test_trace():
pass
trace_event.trace_disable()
def testFlushChanges(self):
with self._test_trace():
with open(self._log_path, 'r') as f:
trace_event.clock_sync('1')
self.assertEquals(len(json.loads(f.read() + ']')), 1)
f.seek(0)
trace_event.trace_flush()
self.assertEquals(len(json.loads(f.read() + ']')), 2)
def testFlushNoChanges(self):
with self._test_trace():
with open(self._log_path, 'r') as f:
self.assertEquals(len(json.loads(f.read() + ']')),1)
f.seek(0)
trace_event.trace_flush()
self.assertEquals(len(json.loads(f.read() + ']')), 1)
def testDoubleFlush(self):
with self._test_trace():
with open(self._log_path, 'r') as f:
trace_event.clock_sync('1')
self.assertEquals(len(json.loads(f.read() + ']')), 1)
f.seek(0)
trace_event.trace_flush()
trace_event.trace_flush()
self.assertEquals(len(json.loads(f.read() + ']')), 2)
def testTraceBegin(self):
with self._test_trace():
with open(self._log_path, 'r') as f:
trace_event.trace_begin('test_event', this='that')
trace_event.trace_flush()
log_output = json.loads(f.read() + ']')
self.assertEquals(len(log_output), 2)
current_entry = log_output.pop(0)
self.assertEquals(current_entry['category'], 'process_argv')
self.assertEquals(current_entry['name'], 'process_argv')
self.assertTrue( current_entry['args']['argv'])
self.assertEquals( current_entry['ph'], 'M')
current_entry = log_output.pop(0)
self.assertEquals(current_entry['category'], 'python')
self.assertEquals(current_entry['name'], 'test_event')
self.assertEquals(current_entry['args']['this'], '\'that\'')
self.assertEquals(current_entry['ph'], 'B')
def testTraceEnd(self):
with self._test_trace():
with open(self._log_path, 'r') as f:
trace_event.trace_end('test_event')
trace_event.trace_flush()
log_output = json.loads(f.read() + ']')
self.assertEquals(len(log_output), 2)
current_entry = log_output.pop(0)
self.assertEquals(current_entry['category'], 'process_argv')
self.assertEquals(current_entry['name'], 'process_argv')
self.assertTrue(current_entry['args']['argv'])
self.assertEquals(current_entry['ph'], 'M')
current_entry = log_output.pop(0)
self.assertEquals(current_entry['category'], 'python')
self.assertEquals(current_entry['name'], 'test_event')
self.assertEquals(current_entry['args'], {})
self.assertEquals(current_entry['ph'], 'E')
def testTrace(self):
with self._test_trace():
with trace_event.trace('test_event', this='that'):
pass
trace_event.trace_flush()
with open(self._log_path, 'r') as f:
log_output = json.loads(f.read() + ']')
self.assertEquals(len(log_output), 3)
current_entry = log_output.pop(0)
self.assertEquals(current_entry['category'], 'process_argv')
self.assertEquals(current_entry['name'], 'process_argv')
self.assertTrue(current_entry['args']['argv'])
self.assertEquals(current_entry['ph'], 'M')
current_entry = log_output.pop(0)
self.assertEquals(current_entry['category'], 'python')
self.assertEquals(current_entry['name'], 'test_event')
self.assertEquals(current_entry['args']['this'], '\'that\'')
self.assertEquals(current_entry['ph'], 'B')
current_entry = log_output.pop(0)
self.assertEquals(current_entry['category'], 'python')
self.assertEquals(current_entry['name'], 'test_event')
self.assertEquals(current_entry['args'], {})
self.assertEquals(current_entry['ph'], 'E')
def testTracedDecorator(self):
@trace_event.traced("this")
def test_decorator(this="that"):
pass
with self._test_trace():
test_decorator()
trace_event.trace_flush()
with open(self._log_path, 'r') as f:
log_output = json.loads(f.read() + ']')
self.assertEquals(len(log_output), 3)
expected_name = __name__ + '.test_decorator'
current_entry = log_output.pop(0)
self.assertEquals(current_entry['category'], 'process_argv')
self.assertEquals(current_entry['name'], 'process_argv')
self.assertTrue(current_entry['args']['argv'])
self.assertEquals(current_entry['ph'], 'M')
current_entry = log_output.pop(0)
self.assertEquals(current_entry['category'], 'python')
self.assertEquals(current_entry['name'], expected_name)
self.assertEquals(current_entry['args']['this'], '\'that\'')
self.assertEquals(current_entry['ph'], 'B')
current_entry = log_output.pop(0)
self.assertEquals(current_entry['category'], 'python')
self.assertEquals(current_entry['name'], expected_name)
self.assertEquals(current_entry['args'], {})
self.assertEquals(current_entry['ph'], 'E')
def testClockSyncWithTs(self):
with self._test_trace():
with open(self._log_path, 'r') as f:
trace_event.clock_sync('id', issue_ts=trace_time.Now())
trace_event.trace_flush()
log_output = json.loads(f.read() + ']')
self.assertEquals(len(log_output), 2)
current_entry = log_output.pop(0)
self.assertEquals(current_entry['category'], 'process_argv')
self.assertEquals(current_entry['name'], 'process_argv')
self.assertTrue(current_entry['args']['argv'])
self.assertEquals(current_entry['ph'], 'M')
current_entry = log_output.pop(0)
self.assertEquals(current_entry['category'], 'python')
self.assertEquals(current_entry['name'], 'clock_sync')
self.assertTrue(current_entry['args']['issue_ts'])
self.assertEquals(current_entry['ph'], 'c')
def testClockSyncWithoutTs(self):
with self._test_trace():
with open(self._log_path, 'r') as f:
trace_event.clock_sync('id')
trace_event.trace_flush()
log_output = json.loads(f.read() + ']')
self.assertEquals(len(log_output), 2)
current_entry = log_output.pop(0)
self.assertEquals(current_entry['category'], 'process_argv')
self.assertEquals(current_entry['name'], 'process_argv')
self.assertTrue(current_entry['args']['argv'])
self.assertEquals(current_entry['ph'], 'M')
current_entry = log_output.pop(0)
self.assertEquals(current_entry['category'], 'python')
self.assertEquals(current_entry['name'], 'clock_sync')
self.assertFalse(current_entry['args'].get('issue_ts'))
self.assertEquals(current_entry['ph'], 'c')
def testTime(self):
actual_diff = []
def func1():
trace_begin("func1")
start = time.time()
time.sleep(0.25)
end = time.time()
actual_diff.append(end-start) # Pass via array because of Python scoping
trace_end("func1")
with self._test_trace():
start_ts = time.time()
trace_event.trace_begin('test')
end_ts = time.time()
trace_event.trace_end('test')
trace_event.trace_flush()
with open(self._log_path, 'r') as f:
log_output = json.loads(f.read() + ']')
self.assertEquals(len(log_output), 3)
meta_data = log_output[0]
open_data = log_output[1]
close_data = log_output[2]
self.assertEquals(meta_data['category'], 'process_argv')
self.assertEquals(meta_data['name'], 'process_argv')
self.assertTrue(meta_data['args']['argv'])
self.assertEquals(meta_data['ph'], 'M')
self.assertEquals(open_data['category'], 'python')
self.assertEquals(open_data['name'], 'test')
self.assertEquals(open_data['ph'], 'B')
self.assertEquals(close_data['category'], 'python')
self.assertEquals(close_data['name'], 'test')
self.assertEquals(close_data['ph'], 'E')
event_time_diff = close_data['ts'] - open_data['ts']
recorded_time_diff = (end_ts - start_ts) * 1000000
self.assertLess(math.fabs(event_time_diff - recorded_time_diff), 1000)
def testNestedCalls(self):
with self._test_trace():
trace_event.trace_begin('one')
trace_event.trace_begin('two')
trace_event.trace_end('two')
trace_event.trace_end('one')
trace_event.trace_flush()
with open(self._log_path, 'r') as f:
log_output = json.loads(f.read() + ']')
self.assertEquals(len(log_output), 5)
meta_data = log_output[0]
one_open = log_output[1]
two_open = log_output[2]
two_close = log_output[3]
one_close = log_output[4]
self.assertEquals(meta_data['category'], 'process_argv')
self.assertEquals(meta_data['name'], 'process_argv')
self.assertTrue(meta_data['args']['argv'])
self.assertEquals(meta_data['ph'], 'M')
self.assertEquals(one_open['category'], 'python')
self.assertEquals(one_open['name'], 'one')
self.assertEquals(one_open['ph'], 'B')
self.assertEquals(one_close['category'], 'python')
self.assertEquals(one_close['name'], 'one')
self.assertEquals(one_close['ph'], 'E')
self.assertEquals(two_open['category'], 'python')
self.assertEquals(two_open['name'], 'two')
self.assertEquals(two_open['ph'], 'B')
self.assertEquals(two_close['category'], 'python')
self.assertEquals(two_close['name'], 'two')
self.assertEquals(two_close['ph'], 'E')
self.assertLessEqual(one_open['ts'], two_open['ts'])
self.assertGreaterEqual(one_close['ts'], two_close['ts'])
def testInterleavedCalls(self):
with self._test_trace():
trace_event.trace_begin('one')
trace_event.trace_begin('two')
trace_event.trace_end('one')
trace_event.trace_end('two')
trace_event.trace_flush()
with open(self._log_path, 'r') as f:
log_output = json.loads(f.read() + ']')
self.assertEquals(len(log_output), 5)
meta_data = log_output[0]
one_open = log_output[1]
two_open = log_output[2]
two_close = log_output[4]
one_close = log_output[3]
self.assertEquals(meta_data['category'], 'process_argv')
self.assertEquals(meta_data['name'], 'process_argv')
self.assertTrue(meta_data['args']['argv'])
self.assertEquals(meta_data['ph'], 'M')
self.assertEquals(one_open['category'], 'python')
self.assertEquals(one_open['name'], 'one')
self.assertEquals(one_open['ph'], 'B')
self.assertEquals(one_close['category'], 'python')
self.assertEquals(one_close['name'], 'one')
self.assertEquals(one_close['ph'], 'E')
self.assertEquals(two_open['category'], 'python')
self.assertEquals(two_open['name'], 'two')
self.assertEquals(two_open['ph'], 'B')
self.assertEquals(two_close['category'], 'python')
self.assertEquals(two_close['name'], 'two')
self.assertEquals(two_close['ph'], 'E')
self.assertLessEqual(one_open['ts'], two_open['ts'])
self.assertLessEqual(one_close['ts'], two_close['ts'])
# TODO(khokhlov): Fix this test on Windows. See crbug.com/945819 for details.
def disabled_testMultiprocess(self):
def child_function():
with trace_event.trace('child_event'):
pass
with self._test_trace():
trace_event.trace_begin('parent_event')
trace_event.trace_flush()
p = multiprocessing.Process(target=child_function)
p.start()
self.assertTrue(hasattr(p, "_shimmed_by_trace_event"))
p.join()
trace_event.trace_end('parent_event')
trace_event.trace_flush()
with open(self._log_path, 'r') as f:
log_output = json.loads(f.read() + ']')
self.assertEquals(len(log_output), 5)
meta_data = log_output[0]
parent_open = log_output[1]
child_open = log_output[2]
child_close = log_output[3]
parent_close = log_output[4]
self.assertEquals(meta_data['category'], 'process_argv')
self.assertEquals(meta_data['name'], 'process_argv')
self.assertTrue(meta_data['args']['argv'])
self.assertEquals(meta_data['ph'], 'M')
self.assertEquals(parent_open['category'], 'python')
self.assertEquals(parent_open['name'], 'parent_event')
self.assertEquals(parent_open['ph'], 'B')
self.assertEquals(child_open['category'], 'python')
self.assertEquals(child_open['name'], 'child_event')
self.assertEquals(child_open['ph'], 'B')
self.assertEquals(child_close['category'], 'python')
self.assertEquals(child_close['name'], 'child_event')
self.assertEquals(child_close['ph'], 'E')
self.assertEquals(parent_close['category'], 'python')
self.assertEquals(parent_close['name'], 'parent_event')
self.assertEquals(parent_close['ph'], 'E')
@unittest.skipIf(sys.platform == 'win32', 'crbug.com/945819')
def testTracingControlDisabledInChildButNotInParent(self):
def child(resp):
# test tracing is not controllable in the child
resp.put(trace_event.is_tracing_controllable())
with self._test_trace():
q = multiprocessing.Queue()
p = multiprocessing.Process(target=child, args=[q])
p.start()
# test tracing is controllable in the parent
self.assertTrue(trace_event.is_tracing_controllable())
self.assertFalse(q.get())
p.join()
def testMultiprocessExceptionInChild(self):
def bad_child():
trace_event.trace_disable()
with self._test_trace():
p = multiprocessing.Pool(1)
trace_event.trace_begin('parent')
self.assertRaises(Exception, lambda: p.apply(bad_child, ()))
p.close()
p.terminate()
p.join()
trace_event.trace_end('parent')
trace_event.trace_flush()
with open(self._log_path, 'r') as f:
log_output = json.loads(f.read() + ']')
self.assertEquals(len(log_output), 3)
meta_data = log_output[0]
parent_open = log_output[1]
parent_close = log_output[2]
self.assertEquals(parent_open['category'], 'python')
self.assertEquals(parent_open['name'], 'parent')
self.assertEquals(parent_open['ph'], 'B')
self.assertEquals(parent_close['category'], 'python')
self.assertEquals(parent_close['name'], 'parent')
self.assertEquals(parent_close['ph'], 'E')
def testFormatJson(self):
with self._test_trace(format=trace_event.JSON):
trace_event.trace_flush()
with open(self._log_path, 'r') as f:
log_output = json.loads(f.read() + ']')
self.assertEquals(len(log_output), 1)
self.assertEquals(log_output[0]['ph'], 'M')
def testFormatJsonWithMetadata(self):
with self._test_trace(format=trace_event.JSON_WITH_METADATA):
trace_event.trace_disable()
with open(self._log_path, 'r') as f:
log_output = json.load(f)
self.assertEquals(len(log_output), 2)
events = log_output['traceEvents']
self.assertEquals(len(events), 1)
self.assertEquals(events[0]['ph'], 'M')
def testFormatProtobuf(self):
with self._test_trace(format=trace_event.PROTOBUF):
trace_event.trace_flush()
with open(self._log_path, 'r') as f:
self.assertGreater(len(f.read()), 0)
def testAddMetadata(self):
with self._test_trace(format=trace_event.JSON_WITH_METADATA):
trace_event.trace_add_benchmark_metadata(
benchmark_start_time_us=1000,
story_run_time_us=2000,
benchmark_name='benchmark',
benchmark_description='desc',
story_name='story',
story_tags=['tag1', 'tag2'],
story_run_index=0,
)
trace_event.trace_disable()
with open(self._log_path, 'r') as f:
log_output = json.load(f)
self.assertEquals(len(log_output), 2)
telemetry_metadata = log_output['metadata']['telemetry']
self.assertEquals(len(telemetry_metadata), 7)
self.assertEquals(telemetry_metadata['benchmarkStart'], 1)
self.assertEquals(telemetry_metadata['traceStart'], 2)
self.assertEquals(telemetry_metadata['benchmarks'], ['benchmark'])
self.assertEquals(telemetry_metadata['benchmarkDescriptions'], ['desc'])
self.assertEquals(telemetry_metadata['stories'], ['story'])
self.assertEquals(telemetry_metadata['storyTags'], ['tag1', 'tag2'])
self.assertEquals(telemetry_metadata['storysetRepeats'], [0])
def testAddMetadataProtobuf(self):
with self._test_trace(format=trace_event.PROTOBUF):
trace_event.trace_add_benchmark_metadata(
benchmark_start_time_us=1000,
story_run_time_us=2000,
benchmark_name='benchmark',
benchmark_description='desc',
story_name='story',
story_tags=['tag1', 'tag2'],
story_run_index=0,
)
trace_event.trace_disable()
with open(self._log_path, 'r') as f:
self.assertGreater(len(f.read()), 0)
def testAddMetadataInJsonFormatRaises(self):
with self._test_trace(format=trace_event.JSON):
with self.assertRaises(log.TraceException):
trace_event.trace_add_benchmark_metadata(
benchmark_start_time_us=1000,
story_run_time_us=2000,
benchmark_name='benchmark',
benchmark_description='description',
story_name='story',
story_tags=['tag1', 'tag2'],
story_run_index=0,
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
unittest.main(verbosity=2)

View File

@@ -0,0 +1,234 @@
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import ctypes
import ctypes.util
import logging
import os
import platform
import sys
import time
import threading
GET_TICK_COUNT_LAST_NOW = 0
# If GET_TICK_COUNTER_LAST_NOW is less than the current time, the clock has
# rolled over, and this needs to be accounted for.
GET_TICK_COUNT_WRAPAROUNDS = 0
# The current detected platform
_CLOCK = None
_NOW_FUNCTION = None
# Mapping of supported platforms and what is returned by sys.platform.
_PLATFORMS = {
'mac': 'darwin',
'linux': 'linux',
'windows': 'win32',
'cygwin': 'cygwin',
'freebsd': 'freebsd',
'sunos': 'sunos5',
'bsd': 'bsd'
}
# Mapping of what to pass get_clocktime based on platform.
_CLOCK_MONOTONIC = {
'linux': 1,
'freebsd': 4,
'bsd': 3,
'sunos5': 4
}
_LINUX_CLOCK = 'LINUX_CLOCK_MONOTONIC'
_MAC_CLOCK = 'MAC_MACH_ABSOLUTE_TIME'
_WIN_HIRES = 'WIN_QPC'
_WIN_LORES = 'WIN_ROLLOVER_PROTECTED_TIME_GET_TIME'
def InitializeMacNowFunction(plat):
"""Sets a monotonic clock for the Mac platform.
Args:
plat: Platform that is being run on. Unused in GetMacNowFunction. Passed
for consistency between initilaizers.
"""
del plat # Unused
global _CLOCK # pylint: disable=global-statement
global _NOW_FUNCTION # pylint: disable=global-statement
_CLOCK = _MAC_CLOCK
libc = ctypes.CDLL('/usr/lib/libc.dylib', use_errno=True)
class MachTimebaseInfoData(ctypes.Structure):
"""System timebase info. Defined in <mach/mach_time.h>."""
_fields_ = (('numer', ctypes.c_uint32),
('denom', ctypes.c_uint32))
mach_absolute_time = libc.mach_absolute_time
mach_absolute_time.restype = ctypes.c_uint64
timebase = MachTimebaseInfoData()
libc.mach_timebase_info(ctypes.byref(timebase))
ticks_per_second = timebase.numer / timebase.denom * 1.0e9
def MacNowFunctionImpl():
return mach_absolute_time() / ticks_per_second
_NOW_FUNCTION = MacNowFunctionImpl
def GetClockGetTimeClockNumber(plat):
for key in _CLOCK_MONOTONIC:
if plat.startswith(key):
return _CLOCK_MONOTONIC[key]
raise LookupError('Platform not in clock dicitonary')
def InitializeLinuxNowFunction(plat):
"""Sets a monotonic clock for linux platforms.
Args:
plat: Platform that is being run on.
"""
global _CLOCK # pylint: disable=global-statement
global _NOW_FUNCTION # pylint: disable=global-statement
_CLOCK = _LINUX_CLOCK
clock_monotonic = GetClockGetTimeClockNumber(plat)
try:
# Attempt to find clock_gettime in the C library.
clock_gettime = ctypes.CDLL(ctypes.util.find_library('c'),
use_errno=True).clock_gettime
except AttributeError:
# If not able to find int in the C library, look in rt library.
clock_gettime = ctypes.CDLL(ctypes.util.find_library('rt'),
use_errno=True).clock_gettime
class Timespec(ctypes.Structure):
"""Time specification, as described in clock_gettime(3)."""
_fields_ = (('tv_sec', ctypes.c_long),
('tv_nsec', ctypes.c_long))
def LinuxNowFunctionImpl():
ts = Timespec()
if clock_gettime(clock_monotonic, ctypes.pointer(ts)):
errno = ctypes.get_errno()
raise OSError(errno, os.strerror(errno))
return ts.tv_sec + ts.tv_nsec / 1.0e9
_NOW_FUNCTION = LinuxNowFunctionImpl
def IsQPCUsable():
"""Determines if system can query the performance counter.
The performance counter is a high resolution timer on windows systems.
Some chipsets have unreliable performance counters, so this checks that one
of those chipsets is not present.
Returns:
True if QPC is useable, false otherwise.
"""
# Sample output: 'Intel64 Family 6 Model 23 Stepping 6, GenuineIntel'
info = platform.processor()
if 'AuthenticAMD' in info and 'Family 15' in info:
return False
if not hasattr(ctypes, 'windll'):
return False
try: # If anything goes wrong during this, assume QPC isn't available.
frequency = ctypes.c_int64()
ctypes.windll.Kernel32.QueryPerformanceFrequency(
ctypes.byref(frequency))
if float(frequency.value) <= 0:
return False
except Exception: # pylint: disable=broad-except
logging.exception('Error when determining if QPC is usable.')
return False
return True
def InitializeWinNowFunction(plat):
"""Sets a monotonic clock for windows platforms.
Args:
plat: Platform that is being run on.
"""
global _CLOCK # pylint: disable=global-statement
global _NOW_FUNCTION # pylint: disable=global-statement
if IsQPCUsable():
_CLOCK = _WIN_HIRES
qpc_return = ctypes.c_int64()
qpc_frequency = ctypes.c_int64()
ctypes.windll.Kernel32.QueryPerformanceFrequency(
ctypes.byref(qpc_frequency))
qpc_frequency = float(qpc_frequency.value)
qpc = ctypes.windll.Kernel32.QueryPerformanceCounter
def WinNowFunctionImpl():
qpc(ctypes.byref(qpc_return))
return qpc_return.value / qpc_frequency
else:
_CLOCK = _WIN_LORES
kernel32 = (ctypes.cdll.kernel32
if plat.startswith(_PLATFORMS['cygwin'])
else ctypes.windll.kernel32)
get_tick_count_64 = getattr(kernel32, 'GetTickCount64', None)
# Windows Vista or newer
if get_tick_count_64:
get_tick_count_64.restype = ctypes.c_ulonglong
def WinNowFunctionImpl():
return get_tick_count_64() / 1000.0
else: # Pre Vista.
get_tick_count = kernel32.GetTickCount
get_tick_count.restype = ctypes.c_uint32
get_tick_count_lock = threading.Lock()
def WinNowFunctionImpl():
global GET_TICK_COUNT_LAST_NOW # pylint: disable=global-statement
global GET_TICK_COUNT_WRAPAROUNDS # pylint: disable=global-statement
with get_tick_count_lock:
current_sample = get_tick_count()
if current_sample < GET_TICK_COUNT_LAST_NOW:
GET_TICK_COUNT_WRAPAROUNDS += 1
GET_TICK_COUNT_LAST_NOW = current_sample
final_ms = GET_TICK_COUNT_WRAPAROUNDS << 32
final_ms += GET_TICK_COUNT_LAST_NOW
return final_ms / 1000.0
_NOW_FUNCTION = WinNowFunctionImpl
def InitializeNowFunction(plat):
"""Sets a monotonic clock for the current platform.
Args:
plat: Platform that is being run on.
"""
if plat.startswith(_PLATFORMS['mac']):
InitializeMacNowFunction(plat)
elif (plat.startswith(_PLATFORMS['linux'])
or plat.startswith(_PLATFORMS['freebsd'])
or plat.startswith(_PLATFORMS['bsd'])
or plat.startswith(_PLATFORMS['sunos'])):
InitializeLinuxNowFunction(plat)
elif (plat.startswith(_PLATFORMS['windows'])
or plat.startswith(_PLATFORMS['cygwin'])):
InitializeWinNowFunction(plat)
else:
raise RuntimeError('%s is not a supported platform.' % plat)
global _NOW_FUNCTION
global _CLOCK
assert _NOW_FUNCTION, 'Now function not properly set during initialization.'
assert _CLOCK, 'Clock not properly set during initialization.'
def Now():
return _NOW_FUNCTION() * 1e6 # convert from seconds to microseconds
def GetClock():
return _CLOCK
InitializeNowFunction(sys.platform)

View File

@@ -0,0 +1,123 @@
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import contextlib
import logging
import platform
import sys
import unittest
from py_trace_event import trace_time
class TimerTest(unittest.TestCase):
# Helper methods.
@contextlib.contextmanager
def ReplacePlatformProcessorCall(self, f):
try:
old_proc = platform.processor
platform.processor = f
yield
finally:
platform.processor = old_proc
@contextlib.contextmanager
def ReplaceQPCCheck(self, f):
try:
old_qpc = trace_time.IsQPCUsable
trace_time.IsQPCUsable = f
yield
finally:
trace_time.IsQPCUsable = old_qpc
# Platform detection tests.
def testInitializeNowFunction_platformNotSupported(self):
with self.assertRaises(RuntimeError):
trace_time.InitializeNowFunction('invalid_platform')
def testInitializeNowFunction_windows(self):
if not (sys.platform.startswith(trace_time._PLATFORMS['windows'])
or sys.platform.startswith(trace_time._PLATFORMS['cygwin'])):
return True
trace_time.InitializeNowFunction(sys.platform)
self.assertTrue(trace_time.GetClock() == trace_time._WIN_HIRES
or trace_time.GetClock() == trace_time._WIN_LORES)
def testInitializeNowFunction_linux(self):
if not sys.platform.startswith(trace_time._PLATFORMS['linux']):
return True
trace_time.InitializeNowFunction(sys.platform)
self.assertEqual(trace_time.GetClock(), trace_time._LINUX_CLOCK)
def testInitializeNowFunction_mac(self):
if not sys.platform.startswith(trace_time._PLATFORMS['mac']):
return True
trace_time.InitializeNowFunction(sys.platform)
self.assertEqual(trace_time.GetClock(), trace_time._MAC_CLOCK)
# Windows Tests
def testIsQPCUsable_buggyAthlonProcReturnsFalse(self):
if not (sys.platform.startswith(trace_time._PLATFORMS['windows'])
or sys.platform.startswith(trace_time._PLATFORMS['cygwin'])):
return True
def BuggyAthlonProc():
return 'AMD64 Family 15 Model 23 Stepping 6, AuthenticAMD'
with self.ReplacePlatformProcessorCall(BuggyAthlonProc):
self.assertFalse(trace_time.IsQPCUsable())
def testIsQPCUsable_returnsTrueOnWindows(self):
if not (sys.platform.startswith(trace_time._PLATFORMS['windows'])
or sys.platform.startswith(trace_time._PLATFORMS['cygwin'])):
return True
def Proc():
return 'Intel64 Family 15 Model 23 Stepping 6, GenuineIntel'
with self.ReplacePlatformProcessorCall(Proc):
self.assertTrue(trace_time.IsQPCUsable())
def testGetWinNowFunction_QPC(self):
if not (sys.platform.startswith(trace_time._PLATFORMS['windows'])
or sys.platform.startswith(trace_time._PLATFORMS['cygwin'])):
return True
# Test requires QPC to be available on platform.
if not trace_time.IsQPCUsable():
return True
self.assertGreater(trace_time.Now(), 0)
# Works even if QPC would work.
def testGetWinNowFunction_GetTickCount(self):
if not (sys.platform.startswith(trace_time._PLATFORMS['windows'])
or sys.platform.startswith(trace_time._PLATFORMS['cygwin'])):
return True
with self.ReplaceQPCCheck(lambda: False):
self.assertGreater(trace_time.Now(), 0)
# Linux tests.
def testGetClockGetTimeClockNumber_linux(self):
self.assertEquals(trace_time.GetClockGetTimeClockNumber('linux'), 1)
def testGetClockGetTimeClockNumber_freebsd(self):
self.assertEquals(trace_time.GetClockGetTimeClockNumber('freebsd'), 4)
def testGetClockGetTimeClockNumber_bsd(self):
self.assertEquals(trace_time.GetClockGetTimeClockNumber('bsd'), 3)
def testGetClockGetTimeClockNumber_sunos(self):
self.assertEquals(trace_time.GetClockGetTimeClockNumber('sunos5'), 4)
# Smoke Test.
def testMonotonic(self):
time_one = trace_time.Now()
for _ in xrange(1000):
time_two = trace_time.Now()
self.assertLessEqual(time_one, time_two)
time_one = time_two
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
unittest.main(verbosity=2)

View File

@@ -0,0 +1,12 @@
Name: Protobuf
URL: https://developers.google.com/protocol-buffers/
Version: 3.0.0
License: BSD
Description:
Protocol buffers are Google's language-neutral, platform-neutral,
extensible mechanism for serializing structured data.
Local Modifications:
Removed pretty much everything except functions necessary to write
bools, ints, and strings.

View File

@@ -0,0 +1,224 @@
# Protocol Buffers - Google's data interchange format
# Copyright 2008 Google Inc. All rights reserved.
# https://developers.google.com/protocol-buffers/
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import six
import wire_format
def _VarintSize(value):
"""Compute the size of a varint value."""
if value <= 0x7f: return 1
if value <= 0x3fff: return 2
if value <= 0x1fffff: return 3
if value <= 0xfffffff: return 4
if value <= 0x7ffffffff: return 5
if value <= 0x3ffffffffff: return 6
if value <= 0x1ffffffffffff: return 7
if value <= 0xffffffffffffff: return 8
if value <= 0x7fffffffffffffff: return 9
return 10
def _SignedVarintSize(value):
"""Compute the size of a signed varint value."""
if value < 0: return 10
if value <= 0x7f: return 1
if value <= 0x3fff: return 2
if value <= 0x1fffff: return 3
if value <= 0xfffffff: return 4
if value <= 0x7ffffffff: return 5
if value <= 0x3ffffffffff: return 6
if value <= 0x1ffffffffffff: return 7
if value <= 0xffffffffffffff: return 8
if value <= 0x7fffffffffffffff: return 9
return 10
def _VarintEncoder():
"""Return an encoder for a basic varint value (does not include tag)."""
def EncodeVarint(write, value):
bits = value & 0x7f
value >>= 7
while value:
write(six.int2byte(0x80|bits))
bits = value & 0x7f
value >>= 7
return write(six.int2byte(bits))
return EncodeVarint
def _SignedVarintEncoder():
"""Return an encoder for a basic signed varint value (does not include
tag)."""
def EncodeSignedVarint(write, value):
if value < 0:
value += (1 << 64)
bits = value & 0x7f
value >>= 7
while value:
write(six.int2byte(0x80|bits))
bits = value & 0x7f
value >>= 7
return write(six.int2byte(bits))
return EncodeSignedVarint
_EncodeVarint = _VarintEncoder()
_EncodeSignedVarint = _SignedVarintEncoder()
def _VarintBytes(value):
"""Encode the given integer as a varint and return the bytes. This is only
called at startup time so it doesn't need to be fast."""
pieces = []
_EncodeVarint(pieces.append, value)
return b"".join(pieces)
def TagBytes(field_number, wire_type):
"""Encode the given tag and return the bytes. Only called at startup."""
return _VarintBytes(wire_format.PackTag(field_number, wire_type))
def _SimpleEncoder(wire_type, encode_value, compute_value_size):
"""Return a constructor for an encoder for fields of a particular type.
Args:
wire_type: The field's wire type, for encoding tags.
encode_value: A function which encodes an individual value, e.g.
_EncodeVarint().
compute_value_size: A function which computes the size of an individual
value, e.g. _VarintSize().
"""
def SpecificEncoder(field_number, is_repeated, is_packed):
if is_packed:
tag_bytes = TagBytes(field_number, wire_format.WIRETYPE_LENGTH_DELIMITED)
local_EncodeVarint = _EncodeVarint
def EncodePackedField(write, value):
write(tag_bytes)
size = 0
for element in value:
size += compute_value_size(element)
local_EncodeVarint(write, size)
for element in value:
encode_value(write, element)
return EncodePackedField
elif is_repeated:
tag_bytes = TagBytes(field_number, wire_type)
def EncodeRepeatedField(write, value):
for element in value:
write(tag_bytes)
encode_value(write, element)
return EncodeRepeatedField
else:
tag_bytes = TagBytes(field_number, wire_type)
def EncodeField(write, value):
write(tag_bytes)
return encode_value(write, value)
return EncodeField
return SpecificEncoder
Int32Encoder = Int64Encoder = EnumEncoder = _SimpleEncoder(
wire_format.WIRETYPE_VARINT, _EncodeSignedVarint, _SignedVarintSize)
UInt32Encoder = UInt64Encoder = _SimpleEncoder(
wire_format.WIRETYPE_VARINT, _EncodeVarint, _VarintSize)
def BoolEncoder(field_number, is_repeated, is_packed):
"""Returns an encoder for a boolean field."""
false_byte = b'\x00'
true_byte = b'\x01'
if is_packed:
tag_bytes = TagBytes(field_number, wire_format.WIRETYPE_LENGTH_DELIMITED)
local_EncodeVarint = _EncodeVarint
def EncodePackedField(write, value):
write(tag_bytes)
local_EncodeVarint(write, len(value))
for element in value:
if element:
write(true_byte)
else:
write(false_byte)
return EncodePackedField
elif is_repeated:
tag_bytes = TagBytes(field_number, wire_format.WIRETYPE_VARINT)
def EncodeRepeatedField(write, value):
for element in value:
write(tag_bytes)
if element:
write(true_byte)
else:
write(false_byte)
return EncodeRepeatedField
else:
tag_bytes = TagBytes(field_number, wire_format.WIRETYPE_VARINT)
def EncodeField(write, value):
write(tag_bytes)
if value:
return write(true_byte)
return write(false_byte)
return EncodeField
def StringEncoder(field_number, is_repeated, is_packed):
"""Returns an encoder for a string field."""
tag = TagBytes(field_number, wire_format.WIRETYPE_LENGTH_DELIMITED)
local_EncodeVarint = _EncodeVarint
local_len = len
assert not is_packed
if is_repeated:
def EncodeRepeatedField(write, value):
for element in value:
encoded = element.encode('utf-8')
write(tag)
local_EncodeVarint(write, local_len(encoded))
write(encoded)
return EncodeRepeatedField
else:
def EncodeField(write, value):
encoded = value.encode('utf-8')
write(tag)
local_EncodeVarint(write, local_len(encoded))
return write(encoded)
return EncodeField

View File

@@ -0,0 +1,52 @@
# Protocol Buffers - Google's data interchange format
# Copyright 2008 Google Inc. All rights reserved.
# https://developers.google.com/protocol-buffers/
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
TAG_TYPE_BITS = 3 # Number of bits used to hold type info in a proto tag.
WIRETYPE_VARINT = 0
WIRETYPE_FIXED64 = 1
WIRETYPE_LENGTH_DELIMITED = 2
WIRETYPE_START_GROUP = 3
WIRETYPE_END_GROUP = 4
WIRETYPE_FIXED32 = 5
_WIRETYPE_MAX = 5
def PackTag(field_number, wire_type):
"""Returns an unsigned 32-bit integer that encodes the field number and
wire type information in standard protocol message wire format.
Args:
field_number: Expected to be an integer in the range [1, 1 << 29)
wire_type: One of the WIRETYPE_* constants.
"""
if not 0 <= wire_type <= _WIRETYPE_MAX:
raise RuntimeError('Unknown wire type: %d' % wire_type)
return (field_number << TAG_TYPE_BITS) | wire_type