CYCAMORE
helper.py
Go to the documentation of this file.
1 """A set of tools for use in integration tests."""
2 import os
3 import tempfile
4 import subprocess
5 import sys
6 from hashlib import sha1
7 import numpy as np
8 import tables
9 from nose.tools import assert_equal
10 
11 
12 CYCLUS_HAS_COIN = None
13 
14 
15 if sys.version_info[0] >= 3:
16  str_types = (bytes, str)
17 else:
18  str_types = (str, unicode)
19 
20 def hasher(x):
21  return int(sha1(x.encode()).hexdigest(), 16)
22 
23 def idx(h):
24  ind = [None] * 5
25  for i in range(4, -1, -1):
26  h, ind[i] = divmod(h, 2**32)
27  return tuple(ind)
28 
29 sha1array = lambda x: np.array(idx(hasher(x)), np.uint32)
30 
31 def table_exist(db, tables):
32  """Checks if hdf5 database contains the specified tables.
33  """
34  return all([t in db.root for t in tables])
35 
36 def find_ids(data, data_table, id_table):
37  """Finds ids of the specified data located in the specified data_table,
38  and extracts the corresponding id from the specified id_table.
39  """
40  ids = []
41  for i, d in enumerate(data_table):
42  if isinstance(d, np.ndarray) and isinstance(data, np.ndarray):
43  if (d == data).all():
44  ids.append(id_table[i])
45  elif isinstance(d, np.ndarray) and not isinstance(data, np.ndarray):
46  if (d == sha1array(data)).all():
47  ids.append(id_table[i])
48  elif d == data:
49  ids.append(id_table[i])
50  return ids
51 
52 def exit_times(agent_id, exit_table):
53  """Finds exit times of the specified agent from the exit table.
54  """
55  i = 0
56  exit_times = []
57  for index in exit_table["AgentId"]:
58  if index == agent_id:
59  exit_times.append(exit_table["ExitTime"][i])
60  i += 1
61 
62  return exit_times
63 
64 
65 def run_cyclus(cyclus, cwd, in_path, out_path):
66  """Runs cyclus with various inputs and creates output databases
67  """
68  holdsrtn = [1] # needed because nose does not send() to test generator
69  # make sure the output target directory exists
70  cmd = [cyclus, "-o", out_path, "--input-file", in_path]
71  check_cmd(cmd, cwd, holdsrtn)
72 
73 
74 def check_cmd(args, cwd, holdsrtn):
75  """Runs a command in a subprocess and verifies that it executed properly.
76  """
77  if not isinstance(args, str_types):
78  args = " ".join(args)
79  print("TESTING: running command in {0}:\n\n{1}\n".format(cwd, args))
80  env = dict(os.environ)
81  env['_'] = subprocess.check_output(['which', 'cyclus'], cwd=cwd).strip()
82  with tempfile.NamedTemporaryFile() as f:
83  rtn = subprocess.call(args, shell=True, cwd=cwd, stdout=f, stderr=f, env=env)
84  if rtn != 0:
85  f.seek(0)
86  print("STDOUT + STDERR:\n\n" + f.read().decode())
87  holdsrtn[0] = rtn
88  assert_equal(rtn, 0)
89 
90 
92  global CYCLUS_HAS_COIN
93  if CYCLUS_HAS_COIN is not None:
94  return CYCLUS_HAS_COIN
95  s = subprocess.check_output(['cyclus', '--version'], universal_newlines=True)
96  s = s.strip().replace('Dependencies:', '')
97  m = {k.strip(): v.strip() for k,v in [line.split()[:2] for line in s.splitlines()
98  if line != '']}
99  CYCLUS_HAS_COIN = m['Coin-Cbc'] != '-1'
100  return CYCLUS_HAS_COIN
101 
cycamore::GrowthRegion int
def run_cyclus(cyclus, cwd, in_path, out_path)
Definition: helper.py:65
def hasher(x)
Definition: helper.py:20
def exit_times(agent_id, exit_table)
Definition: helper.py:52
def idx(h)
Definition: helper.py:23
def check_cmd(args, cwd, holdsrtn)
Definition: helper.py:74
def table_exist(db, tables)
Definition: helper.py:31
sha1array
Definition: helper.py:29
def cyclus_has_coin()
Definition: helper.py:91
def find_ids(data, data_table, id_table)
Definition: helper.py:36