Source code for tecplot.tecutil.util

import collections
import contextlib
import ctypes
import inspect
import logging
import pathlib
import re
import tempfile
import textwrap
import warnings

from collections.abc import Iterable
from ctypes import cast, c_int, POINTER

from ..exception import *
from ..constant import Color

log = logging.getLogger(__name__)

maxint64 = 2**(64 - 1) - 1
minint64 = -maxint64 - 1
maxuint64 = 2**64 - 1

IndexRange = collections.namedtuple('IndexRange', 'min max step')
IndexRange.__new__.__defaults__ = (None, None, None)

[docs]class Index(int): """Position identifier type. This type is used internally to represent a position in a list. It is used to indicate that a change between zero-based indexing and one-based indexing must occur at the TecUtil boundary. This type can be treated exactly like a Python native `int` and is only meaningful internally to the tecplot Python module. """
XY = collections.namedtuple('XY', ('x', 'y')) XY.__new__.__defaults__ = (None, None) XYZ = collections.namedtuple('XYZ', ('x', 'y', 'z')) XYZ.__new__.__defaults__ = (None, None, None) def flatten_args(*args): flatargs = [] for a in args: if isinstance(a, Iterable) and not isinstance(a, str): flatargs += list(a) else: flatargs.append(a) return tuple(flatargs) def array_to_enums(array_pointer, array_size, enum_type): indexes = cast(array_pointer, POINTER(c_int)) return tuple(enum_type(indexes[i]) for i in range(array_size)) def inherited_property(cls): def _copy_property(prop): attr = getattr(cls, prop.__name__) return property(attr.fget, attr.fset, attr.fdel, prop.__doc__) return _copy_property def lock_attributes(cls): """ As a decorator of a class, this ensures that no new attributes are created after __init__() is called. """ if __debug__: def _setattr(self, name, value): if not name.startswith('_') and name not in dir(self): stacknames = [f[0].f_code.co_name for f in inspect.stack()[:3]] if '__init__' not in stacknames: msg = 'No attribute: {}.{}' classname = self.__class__.__name__ raise TecplotAttributeError(msg.format(classname, name)) return super(cls, self).__setattr__(name, value) cls.__setattr__ = _setattr return cls _VarInfo = collections.namedtuple('_VarInfo', ('types', 'values', 'names')) def check_arglist_argtypes(function_name, *args): for arg in args: vinfo = _VarInfo(*arg) for name, value in zip(vinfo.names, vinfo.values): if value is not None: if not isinstance(value, tuple(vinfo.types)): errfmt = '{}: Type of parameter {} must be one of: {}' types = ', '.join(t.__name__ for t in vinfo.types) errmsg = errfmt.format(function_name, name, types) raise TecplotTypeError(errmsg) def color_spec(color, plot=None): """ color_spec(Color.Blue, plot) --> Color.Blue color_spec(Color.MultiColor, plot) --> plot.contour(0) color_spec(Color.MultiColor2, plot) --> plot.contour(1) color_spec(Color.Blue) --> Color.Blue color_spec(plot.contour(0)) --> Color.MultiColor color_spec(plot.contour(1)) --> Color.MultiColor2 color_spec(plot.rgb_coloring) --> Color.RGBColor color_spec(Color.RGBColor, plot) --> plot.rgb_coloring """ if isinstance(color, Iterable): return tuple([color_spec(c, plot) for c in color]) color_spec._indexes = { Color.RGBColor: Index(Color.RGBColor.value), Color.MultiColor: Index(0), Color.MultiColor2: Index(1), Color.MultiColor3: Index(2), Color.MultiColor4: Index(3), Color.MultiColor5: Index(4), Color.MultiColor6: Index(5), Color.MultiColor7: Index(6), Color.MultiColor8: Index(7)} color_spec._multicolors = {v: k for k, v in color_spec._indexes.items()} try: if plot: if color == Color.RGBColor: return plot.rgb_coloring else: return plot.contour(color_spec._indexes[Color(color)]) else: return color_spec._multicolors[Index(color.index)] except (AttributeError, KeyError): return Color(color) def filled_slice(slice_, maxstop): """Convert start, stop, step in slice to real integers. None and negative values are converted to positive default values depending on the maxstop given. """ if slice_.start is None: start = 0 elif slice_.start < 0: start = maxstop + slice_.start else: start = slice_.start start = min(max(start, 0), maxstop) if slice_.stop is None: stop = maxstop elif slice_.stop < 0: stop = maxstop + slice_.stop else: stop = slice_.stop stop = min(max(stop, 0), maxstop) if slice_.step is None: step = 1 else: step = min(max(slice_.step, 1), maxstop) return slice(start, stop, step) def as_slice(index, size): """Convert index to a filled slice. Parameters: index (`slice` or `int`): A slice or index into an array. size (`int`): Maximum index to include in the slice. This is ignored if **index** is an `int`. """ if isinstance(index, slice): return filled_slice(index, size + (index.start or 0)) else: return slice(index, index + 1, 1) def array_to_str(arr, maxlen=10): try: itr = iter(arr) item = next(itr) ret = '[' + str(item) for i, item in enumerate(itr, start=2): if i > maxlen: ret += ' ...' break ret += ', {}'.format(item) return ret + ']' except StopIteration: return '[]' except TypeError: return str(arr) class ListWrapper(object): """Converts a list to a wrapped paragraph of items. Unlike textwrap.TextWrapper, items in the list are not broken over multiple lines even if they contain spaces. """ def __init__(self, initial_indent='', subsequent_indent=' ', initial_width=70, subsequent_width=70, delim=',', prefix='', suffix=''): assert (len(prefix) + len(initial_indent)) < initial_width assert (len(suffix) + len(subsequent_indent)) < subsequent_width self.initial_indent = initial_indent self.subsequent_indent = subsequent_indent self.initial_width = initial_width self.subsequent_width = subsequent_width self.delim = delim self.prefix = prefix self.suffix = suffix def wrap(self, str_list): ret = [] max_space = self.subsequent_width - len(self.subsequent_indent) line = "{}{}".format(self.prefix, self.initial_indent) itr = iter(str_list) try: item = next(itr) line += "'{}'".format(item) except StopIteration: pass # no items in list space_left = self.initial_width - len(line) for item in itr: s = "{} '{}'".format(self.delim, str(item)) if len(s) < space_left or max_space <= space_left: line += s space_left -= len(s) else: ret.append(line + self.delim) line = "{}'{}'".format(self.subsequent_indent, item) space_left = self.subsequent_width - len(line) if len(self.suffix) <= space_left: ret.append(line + self.suffix) else: ret.append(line) ret.append("{}{}".format(self.subsequent_indent, self.suffix)) return ret def fill(self, str_list): return '\n'.join(self.wrap(str_list)) @contextlib.contextmanager def optional(cls, args): """Context for optional arguments that can be None. In this example, variables is an optional parameter. If not `None`, then ``varset`` will be the result of ``IndexSet(*variables)``, if `None`, then ``varset`` will be `None`:: def fn(variables=None): with optional(IndexSet, variables) as varset: _tecutil.Fn(varset) The ``cls`` parameter must be a class and must have the ``__enter__`` and ``__exit__`` methods implemented. """ if args is None: yield None else: with cls(*flatten_args(args)) as obj: yield obj def split_macro(source): """Split macro code into a list of commands.""" comments = re.compile(r'(?<!\\)(\".*?\"|\'.*?\')|(#[^\r\n]*$)', re.MULTILINE) pattern = re.compile(r'(\$!.*?)(?=\$!)|(\$!.*)', re.MULTILINE | re.DOTALL) source = comments.sub(lambda m: m.group(1) or '', source) for match in pattern.finditer(source): yield (match.group(1) or match.group(2)).strip() def normalize_path(nodepath, resolve=True): """Convert relative paths to absolute and normalize. Always use Python's current working directory to convert to absolute paths. However, relative paths are disallowed when connected to a non-local instance of the TecUtil Server. """ if nodepath is not None: if not isinstance(nodepath, pathlib.Path): nodepath = pathlib.Path(str(nodepath)) from tecplot.tecutil import _tecutil_connector if _tecutil_connector.connected: if _tecutil_connector.client.host not in ['localhost', '127.0.0.1']: if not nodepath.is_absolute(): msg = 'paths must be absolute when connected to Tecplot 360' raise TecplotLogicError(msg) return nodepath.resolve() if resolve else nodepath def normalize_filenames(filenames, resolve=True): """Convert a comma-deliminated string to a list of `pathlib.Path` objects. This function validates that the files exist when PyTecplot is running in batch mode or connected to Tecplot 360 on the same system. """ if filenames: if isinstance(filenames, str): filenames = filenames.split(',') elif isinstance(filenames, pathlib.Path): filenames = [filenames] return [normalize_path(f, resolve) for f in filenames] @contextlib.contextmanager def temporary_closed_file(suffix=None): """A named temporary file that has been created and closed. The file will be removed on exit of the context. """ ftmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False) try: ftmp.close() ftmp_path = pathlib.Path(ftmp.name) yield ftmp_path finally: try: ftmp_path.unlink() except OSError as e: warnings.warn(f'Temp file I/O error: {e}') def api_changed(msg, version_changed, sdk_version_changed, warning=False): msg = textwrap.dedent('''\ PyTecplot API Change (version {}) Tecplot 360 version: {} {} ''').format(version_changed, sdk_version_changed, textwrap.fill(msg)) if warning: if __debug__: msg += 'Please update your code as the legacy interface will\n' \ + 'be removed in the next major release of PyTecplot.' warnings.warn(msg, TecplotFutureWarning) else: raise TecplotInterfaceChangeError(msg) def api_moved(old, new, version_changed, sdk_version_changed, warning=False): msg = textwrap.dedent('''\ PyTecplot API Change (version {}) Tecplot 360 version: {} {} has been moved to {} ''').format(version_changed, sdk_version_changed, old, new) if warning: if __debug__: msg += 'Please update your code as the legacy interface will\n' \ + 'be removed in the next major release of PyTecplot.' warnings.warn(msg, TecplotFutureWarning) else: raise TecplotInterfaceChangeError(msg)