#
# Jasy - Web Tooling Framework
# Copyright 2010-2012 Zynga Inc.
# Copyright 2013-2014 Sebastian Werner
#
import sys
import os
import yaml
import json
import jasy.core.Console as Console
import jasy.core.File as File
from jasy import UserError
from jasy.core.Util import getKey
# Names exported by a star-import of this module; matchesType (defined below) is not part of this list.
__all__ = ("Config", "findConfig", "isConfigName", "loadConfig", "writeConfig")
def isConfigName(fileName, requiredBaseName=None):
    """
    Returns whether the given file name looks like a configuration file.

    A name qualifies when its extension is exactly ``.yaml`` or ``.json``.
    When ``requiredBaseName`` is given, the base name of the file (directory
    and extension removed) must additionally be equal to it.
    """

    stem, extension = os.path.splitext(os.path.basename(fileName))

    # Only look at the extension when the base name constraint (if any) holds
    if requiredBaseName is None or requiredBaseName == stem:
        return extension in (".yaml", ".json")

    return False
def findConfig(fileName):
    """
    Resolves the given name to an existing configuration file.

    When the name has no extension, ``.yaml`` and then ``.json`` are appended
    and the first candidate that exists as a file is returned. When the name
    already has an extension, it is returned unchanged if the file exists and
    the extension is ``.yaml`` or ``.json``.

    Returns None when no matching file was found.
    """

    supportedExts = (".yaml", ".json")
    givenExt = os.path.splitext(fileName)[1]

    # Explicit extension: the file must exist and be of a supported type
    if givenExt:
        if os.path.isfile(fileName) and givenExt in supportedExts:
            return fileName

        return None

    # Auto discovery: probe the supported extensions in order of preference
    for candidateExt in supportedExts:
        candidate = fileName + candidateExt
        if os.path.isfile(candidate):
            return candidate

    return None
def loadConfig(fileName, encoding="utf-8"):
    """
    Loads the given configuration file and returns the parsed object structure.

    The name is resolved through findConfig(), so it may be given with or
    without the ``.yaml``/``.json`` extension.

    :param fileName: Name of the configuration file
    :type fileName: string
    :param encoding: Text encoding used to read the file
    :type encoding: string
    :raises UserError: When findConfig() does not resolve the name to a file
    """

    configName = findConfig(fileName)
    if configName is None:
        raise UserError("Unsupported config file: %s" % fileName)

    fileExt = os.path.splitext(configName)[1]

    # The context manager closes the handle even when the parser raises
    with open(configName, mode="r", encoding=encoding) as fileHandle:
        if fileExt == ".yaml":
            # NOTE: yaml.load() without an explicit Loader is deprecated since
            # PyYAML 5.1 and fails on PyYAML 6. safe_load() only builds plain
            # data (dict/list/str/number/bool/None), which is what a config
            # file is expected to contain, and never instantiates arbitrary
            # Python objects from the document.
            return yaml.safe_load(fileHandle)

        # findConfig() only ever returns .yaml or .json names
        return json.load(fileHandle)
def writeConfig(data, fileName, indent=2, encoding="utf-8"):
    """
    Writes the given data structure to the given file name.

    The file format is chosen from the extension of the file name, which has
    to be either ``.yaml`` or ``.json``.

    :param data: Data structure to serialize
    :param fileName: Destination file name including extension
    :type fileName: string
    :param indent: Number of spaces used per indentation level
    :type indent: integer
    :param encoding: Text encoding used to write the file
    :type encoding: string
    :raises UserError: When the extension is neither ``.yaml`` nor ``.json``
    """

    fileExt = os.path.splitext(fileName)[1]

    # Validate before opening: mode "w" creates/truncates the target, so an
    # unsupported extension must be rejected without touching the file system.
    if fileExt not in (".yaml", ".json"):
        raise UserError("Unsupported config type: %s" % fileExt)

    # The context manager closes the handle even when serialization fails
    with open(fileName, mode="w", encoding=encoding) as fileHandle:
        if fileExt == ".yaml":
            yaml.dump(data, fileHandle, default_flow_style=False, indent=indent, allow_unicode=True)
        else:
            json.dump(data, fileHandle, indent=indent, ensure_ascii=False)
def matchesType(value, expected):
    """
    Returns boolean for whether the given value matches the given type name.

    The type name is compared case-insensitively. The exact type of the value
    decides which names are accepted:

    - int: integer, int, number, num, primitive
    - float: float, number, num, primitive
    - str: string, str, primitive
    - bool: boolean, bool, primitive
    - dict: dict, map
    - list: array, list

    Values of any other type never match.
    """

    namesByType = {
        int: ("integer", "number", "int", "num", "primitive"),
        float: ("float", "number", "num", "primitive"),
        str: ("string", "str", "primitive"),
        bool: ("boolean", "bool", "primitive"),
        dict: ("dict", "map"),
        list: ("array", "list"),
    }

    wanted = expected.lower()

    # Lookup by exact type: subclasses (e.g. bool vs. int) are kept apart
    accepted = namesByType.get(type(value))
    if accepted is None:
        return False

    return wanted in accepted
class Config:
    """Wrapper around YAML/JSON with easy to use import tools for using question files, command line arguments, etc."""

    def __init__(self, data=None):
        """
        Initializes the configuration object.

        :param data: Optional initial data structure; an empty dict is used when omitted or empty
        :type data: dict
        """

        self.__data = data or {}

    def debug(self):
        """Prints data to the console."""

        print(self.__data)

    def export(self):
        """
        Returns a flat data structure of the internal data.

        Nested dicts are folded into one level where the keys of the path are joined with dots, e.g. ``{"a": {"b": 1}}``
        becomes ``{"a.b": 1}``.
        """

        result = {}

        def recurse(data, prefix):
            for key, value in data.items():
                if isinstance(value, dict):
                    recurse(value, prefix + key + ".")
                else:
                    result[prefix + key] = value

        recurse(self.__data, "")

        return result

    def injectValues(self, parse=True, **argv):
        """
        Injects a list of arguments into the configuration file, typically used for injecting command line arguments.

        :param parse: Whether each value should be evaluated as a Python expression first (see set)
        :type parse: boolean
        """

        for key in argv:
            self.set(key, argv[key], parse=parse)

    def loadValues(self, fileName, optional=False, encoding="utf-8"):
        """
        Imports the values of the given config file. Returns True when the file was found and processed.

        Note: Supports dotted names to store into sub trees
        Note: This method overrides keys when they are already defined!

        :raises UserError: When the file could not be found and ``optional`` is False
        """

        configFile = findConfig(fileName)
        if configFile is None:
            if optional:
                return False

            # Report the name that was asked for; configFile is always None here
            raise UserError("Could not find configuration file (values): %s" % fileName)

        # An empty file is parsed as None; treat it like a file without values
        data = loadConfig(configFile, encoding=encoding) or {}
        for key in data:
            self.set(key, data[key])

        return True

    def readQuestions(self, fileName, force=False, autoDelete=True, optional=False, encoding="utf-8"):
        """
        Reads the given configuration file with questions and deletes the file afterwards (by default).

        Every entry needs the keys ``question`` and ``name`` and may define ``accept``, ``required``, ``default`` and
        ``force``. The ``force`` argument of this method is used for entries which do not define their own ``force``.

        Returns True when the file was found and processed.

        :raises UserError: When the file could not be found and ``optional`` is False
        """

        configFile = findConfig(fileName)
        if configFile is None:
            if optional:
                return False

            # Report the name that was asked for; configFile is always None here
            raise UserError("Could not find configuration file (questions): %s" % fileName)

        # An empty file is parsed as None; treat it like a file without questions
        data = loadConfig(configFile, encoding=encoding) or []
        for entry in data:
            question = entry["question"]
            name = entry["name"]

            accept = getKey(entry, "accept", None)
            required = getKey(entry, "required", True)
            default = getKey(entry, "default", None)

            # Kept in a separate local so that one entry does not change the fallback used by the following entries
            entryForce = getKey(entry, "force", force)

            self.ask(question, name, accept=accept, required=required, default=default, force=entryForce)

        if autoDelete:
            File.rm(configFile)

        return True

    def executeScript(self, fileName, autoDelete=True, optional=False, encoding="utf-8"):
        """
        Executes the given script for configuration proposes and deletes the file afterwards (by default).

        The script sees this object as ``config`` and the file helper module as ``file``.

        Returns True when the file was found and processed.

        :raises UserError: When the script could not be found and ``optional`` is False
        """

        if not os.path.exists(fileName):
            if optional:
                return False

            raise UserError("Could not find configuration script: %s" % fileName)

        env = {
            "config": self,
            "file": File
        }

        # Close the script file right after reading instead of leaking the handle
        with open(fileName, "r", encoding=encoding) as scriptHandle:
            code = scriptHandle.read()

        # NOTE(security): runs the script with full privileges of this process; only use with trusted files.
        exec(compile(code, os.path.abspath(fileName), "exec"), globals(), env)

        if autoDelete:
            File.rm(fileName)

        return True

    def has(self, name):
        """
        Returns whether there is a value for the given field name.

        Dotted names are resolved through nested dicts. A path which runs through a value that is not a dict is reported
        as missing.
        """

        if "." not in name:
            return name in self.__data

        current = self.__data
        for split in name.split("."):
            # Only dicts can hold sub keys; do not probe into strings, numbers, lists etc.
            if not isinstance(current, dict) or split not in current:
                return False

            current = current[split]

        return True

    def get(self, name, default=None):
        """
        Returns the value of the given field or the given default (None if omitted) when the field is not set.

        Dotted names are resolved through nested dicts. A path which runs through a value that is not a dict yields the
        default.
        """

        if "." not in name:
            return getKey(self.__data, name, default)

        splits = name.split(".")
        current = self.__data

        for split in splits[:-1]:
            if isinstance(current, dict) and split in current:
                current = current[split]
            else:
                return default

        # The parent of the final key has to be a dict as well
        if not isinstance(current, dict):
            return default

        return getKey(current, splits[-1], default)

    def ask(self, question, name, accept=None, required=True, default=None, force=False, parse=True):
        """
        Asks the user for value for the given configuration field:

        :param question: Question to ask the user
        :type question: string
        :param name: Name of field to store value in
        :type name: string
        :param accept: Any of the supported types to validate for (see matchesType)
        :type accept: string
        :param required: Whether the field is required
        :type required: boolean
        :param default: Default value whenever user has given no value
        :param force: Whether to ask even when the field already has a value
        :type force: boolean
        :param parse: Whether the entered text should be evaluated as a Python expression first (see set)
        :type parse: boolean

        The question is repeated until a value was stored. An optional field without default is left unset when the
        user enters nothing.
        """

        # The prompt does not change between retries
        msg = "- %s?" % question
        if accept is not None:
            msg += Console.colorize(" [%s]" % accept, "grey")

        if default is None:
            msg += Console.colorize(" (%s)" % name, "magenta")
        else:
            msg += Console.colorize(" (%s=%s)" % (name, default), "magenta")

        msg += ": "

        while True:
            sys.stdout.write(msg)

            # Do not ask user for solved items
            if not force and self.has(name):
                print("%s %s" % (self.get(name), Console.colorize("(pre-filled)", "cyan")))
                return

            # Make sure the prompt is visible before blocking on user input
            sys.stdout.flush()

            # Read user input, but ignore any leading/trailing white space
            value = input().strip()

            # Fallback to default if no value is given and field is not required
            if not required and value == "":
                value = default

                # Optional field without default: nothing to store. Asking again would repeat forever
                # because set() rejects None.
                if value is None:
                    return

            # Don't accept empty values
            if value == "":
                continue

            # Try setting the current value
            if self.set(name, value, accept=accept, parse=parse):
                break

    def set(self, name, value, accept=None, parse=False):
        """
        Saves the given value under the given field.

        :param name: Name of the field; dots address nested dicts which are created on demand
        :type name: string
        :param value: Value to store; None is rejected
        :param accept: Any of the supported types to validate for (see matchesType)
        :type accept: string
        :param parse: Whether the value should be evaluated as a Python expression first; the raw value is kept
            when the evaluation fails
        :type parse: boolean

        Returns True when the value was stored and False when it was rejected.

        Note: When a dotted name runs through an existing value which is not a dict, that value is replaced by a new
        sub tree.
        """

        # Don't accept None value
        if value is None:
            return False

        # Parse value for easy type checks
        if parse:
            try:
                # NOTE(security): eval() executes arbitrary expressions; only enable parse for trusted input.
                parsedValue = eval(value)
            except Exception:
                # Not a valid expression (or not a string at all): keep the raw value.
                # KeyboardInterrupt/SystemExit are deliberately not swallowed here.
                pass
            else:
                value = parsedValue

                # Convert tuples/sets into JSON compatible array
                if isinstance(value, (tuple, set, frozenset)):
                    value = list(value)

        # Convert tuples/sets into JSON compatible array
        if isinstance(value, (tuple, set, frozenset)):
            value = list(value)

        # Check for given type
        if accept is not None and not matchesType(value, accept):
            print(Console.colorize("  - Invalid value: %s" % str(value), "red"))
            return False

        splits = name.split(".")
        current = self.__data

        # Walk (and create) the parent dicts of the final key
        for split in splits[:-1]:
            child = current.get(split)
            if not isinstance(child, dict):
                child = current[split] = {}

            current = child

        current[splits[-1]] = value

        return True

    def write(self, fileName, indent=2, encoding="utf-8"):
        """Uses config writer to write the configuration file to the application."""

        writeConfig(self.__data, fileName, indent=indent, encoding=encoding)