blob: 0639e96834589eaaeba82e16561e8890bc8c959d [file]
"An experimental (and incomplete) yaml parser for starlark"
# https://github.com/bazelbuild/starlark/issues/219
# Identifiers of the parser states. Every entry on the parser stack is a dict
# whose "id" field holds one of these values.
_STATE = struct(
    # Consume extraneous space and keep track of indent level
    CONSUME_SPACE = 0,
    # Consume whitespace within a flow, where indentation isn't a factor
    CONSUME_SPACE_FLOW = 1,
    # Parse the next key or value
    PARSE_NEXT = 2,
    # Parse a multiline string
    PARSE_MULTILINE_STRING = 3,
    # Parse the next key or value inside of a flow
    PARSE_NEXT_FLOW = 4,
    # Pseudo-states that don't perform any logic but indicate the current
    # hierarchical result being formed in the starlark output.
    KEY = 5,
    SEQUENCE = 6,
)
# Sentinel appended after the last character of the input so the handlers can
# react to the end of the document. It is a dict, so it never compares equal
# to a character of the input.
EOF = {}
def parse(yaml):
    """Convert a yaml document into native starlark values.

    The document is fed one character at a time, followed by the EOF marker,
    to the handler that belongs to the state on top of the parser stack.

    Args:
        yaml: string, the yaml content to parse

    Returns:
        A (result, error) tuple: the equivalent starlark value and None, or
        None and the error as soon as a handler records one.
    """
    handlers = {
        _STATE.CONSUME_SPACE: _handle_CONSUME_SPACE,
        _STATE.CONSUME_SPACE_FLOW: _handle_CONSUME_SPACE_FLOW,
        _STATE.PARSE_NEXT: _handle_PARSE_NEXT,
        _STATE.PARSE_MULTILINE_STRING: _handle_PARSE_MULTILINE_STRING,
        _STATE.PARSE_NEXT_FLOW: _handle_PARSE_NEXT_FLOW,
    }

    yaml = _normalize_yaml(yaml)
    starlark = {"result": None, "error": None}

    # Parsing always starts by skipping whitespace at indent level zero
    stack = [_new_CONSUME_SPACE(indent = "")]

    chars = yaml.elems()[:]
    chars.append(EOF)

    for char in chars:
        current = _peek(stack)
        handler = handlers.get(current["id"])
        if handler == None:
            msg = "Unknown state {}".format(current["id"])
            fail(msg)
        else:
            handler(current, char, stack, starlark)

        # Stop at the first error a handler reports
        if starlark["error"] != None:
            return None, starlark["error"]

    return starlark["result"], None
def _handle_CONSUME_SPACE(state, input, stack, starlark):
    """Handle one input while skipping whitespace outside of a flow.

    Whitespace is accumulated in state["indent"] (reset on every newline);
    the first non-space input decides which state replaces this one on the
    stack.

    Args:
        state: the CONSUME_SPACE state dict, as taken from the top of the stack
        input: the next single-character string of the document, or EOF
        stack: the parser state stack, mutated in place
        starlark: dict holding the "result" built so far and any "error"
    """
    if input == EOF:
        return

    if input == "\n":
        # Reset the indentation
        state["indent"] = ""
    elif input.isspace():
        # Count the leading indentation
        state["indent"] += input
    elif input == "{":
        stack.pop()

        # We are at the beginning of a new flow map
        stack.append(_new_KEY(
            key = "",
            flow = True,
        ))
        _initialize_result_value(starlark, stack)

        # Consume any space following the {
        stack.append(_new_CONSUME_SPACE_FLOW())
    elif input == "?":
        # Complex mapping keys are indicated by a "?" at the beginning of the line.
        # NOTE: currently only supports string value keys.
        stack.pop()

        # We just started parsing a key
        _pop_higher_indented_states(stack, state["indent"])
        if len(stack) < 1 or len(state["indent"]) > len(_peek(stack, _STATE.KEY)["indent"]):
            # The key is part of a new map
            stack.append(_new_KEY(
                key = "",
                indent = state["indent"],
                flow = False,
                complex = True,
            ))
        else:
            # The key is a sibling in the map
            _peek(stack, _STATE.KEY)["key"] = ""
            _peek(stack, _STATE.KEY)["complex"] = True
        _initialize_result_value(starlark, stack)

        # Consume any space following the ?. Add an extra space in place of the "?"
        stack.append(_new_CONSUME_SPACE(indent = state["indent"] + " "))
    elif input == ":" and len(stack) > 1 and stack[-2]["id"] == _STATE.KEY and stack[-2]["complex"]:
        # A ":" seen while the state directly below this one is a KEY flagged
        # as complex, i.e. one that was started by "?"
        stack.pop()

        # Add an extra space in place of the ":"
        stack.append(_new_CONSUME_SPACE(indent = state["indent"] + " "))
    elif input == "[":
        stack.pop()

        # We are at the beginning of a new flow sequence
        stack.append(_new_SEQUENCE(
            index = 0,
            flow = True,
        ))
        _initialize_result_value(starlark, stack)

        # Consume any space following the [
        stack.append(_new_CONSUME_SPACE_FLOW())
    elif input == "|":
        stack.pop()

        # We are at the beginning of a multiline string
        stack.append(_new_PARSE_MULTILINE_STRING(
            indent = state["indent"],
        ))
    else:
        # Reached the beginning of a value or key
        stack.pop()
        stack.append(_new_PARSE_NEXT(
            indent = state["indent"],
            buffer = input,
        ))
def _handle_PARSE_NEXT(state, input, stack, starlark):
    """Handle one input while collecting the text of a key, scalar or "-" marker.

    Non-space input is appended to state["buffer"]. Whitespace or EOF makes
    the handler look at the buffer to decide whether it holds a mapping key
    (ends with ":"), a sequence marker ("-") or a scalar value.

    Args:
        state: the PARSE_NEXT state dict, as taken from the top of the stack
        input: the next single-character string of the document, or EOF
        stack: the parser state stack, mutated in place
        starlark: dict holding the "result" built so far and any "error"
    """
    if input == EOF:
        stack.pop()

        # We just parsed a scalar
        _set_result_value(starlark, stack, _parse_scalar(state["buffer"]))

        # Consume any space following the scalar
        stack.append(_new_CONSUME_SPACE(
            indent = "",
        ))
    elif input.isspace():
        if state["buffer"].endswith(":"):
            stack.pop()

            # We just parsed a key
            _pop_higher_indented_states(stack, state["indent"])
            if len(stack) < 1 or len(state["indent"]) > len(_peek(stack, _STATE.KEY)["indent"]):
                # The key is part of a new map
                stack.append(_new_KEY(
                    key = _parse_key(state["buffer"][0:-1]),
                    indent = state["indent"],
                    flow = False,
                    complex = False,
                ))
            else:
                # The key is a sibling in the map
                _peek(stack, _STATE.KEY)["key"] = _parse_key(state["buffer"][0:-1])
                _peek(stack, _STATE.KEY)["complex"] = False
            _initialize_result_value(starlark, stack)

            # Consume any space following the key
            stack.append(_new_CONSUME_SPACE(
                indent = state["indent"] if not input == "\n" else "",
            ))
        elif state["buffer"] == "-":
            stack.pop()
            if len(stack) > 0 and stack[-1]["id"] == _STATE.SEQUENCE:
                # We are at the next item in a non-flow sequence
                stack[-1]["index"] += 1
            else:
                # We are at the beginning of a non-flow sequence
                stack.append(_new_SEQUENCE(
                    indent = state["indent"],
                    index = 0,
                    flow = False,
                ))
            _initialize_result_value(starlark, stack)

            # Consume any space following the sequence marker
            stack.append(_new_CONSUME_SPACE(
                indent = state["indent"] if not input == "\n" else "",
            ))
        elif input == "\n":
            stack.pop()
            top = _peek(stack) if len(stack) > 0 else None
            if top and top["id"] == _STATE.KEY and top["complex"] and not top["key"]:
                # We just parsed a ?: key
                top["key"] = _parse_scalar(state["buffer"])
            else:
                # We just parsed a scalar
                _set_result_value(starlark, stack, _parse_scalar(state["buffer"]))

            # Consume any space following the scalar
            stack.append(_new_CONSUME_SPACE(
                indent = "",
            ))
        else:
            # Accumulate the space as part of the next thing to parse
            state["buffer"] += input
    else:
        # Accumulate the current text until we know what to do with it
        state["buffer"] += input
def _handle_PARSE_MULTILINE_STRING(state, input, stack, starlark):
    """Handle one input while reading a literal ("|") multiline string.

    The handler goes through three phases:
      1. the rest of the indicator line, where "+" requests keeping and "-"
         requests stripping trailing newlines;
      2. establishing the indentation of the value from its first content line;
      3. collecting lines in state["value"] until EOF or until a non-space
         character shows up on a line that does not start with that indentation.

    Args:
        state: the PARSE_MULTILINE_STRING state dict, as taken from the top of the stack
        input: the next single-character string of the document, or EOF
        stack: the parser state stack, mutated in place
        starlark: dict holding the "result" built so far and any "error"
    """
    if input == EOF and (not state["consumed_first_newline"] or state["value_indent"] == None):
        # The document ended before the first content line. Report it the same
        # way the flow handlers do instead of calling .isspace() on the EOF marker.
        starlark["error"] = "Unexpected EOF"
        return

    if not state["consumed_first_newline"]:
        # Consume the rest of the line after the multiline indicator
        if input == "+":
            state["keep"] = True
        elif input == "-":
            state["strip"] = True
        elif input == "\n":
            state["consumed_first_newline"] = True
        elif input.isspace():
            pass
        else:
            msg = "Unexpected input '{}' after start of multiline string".format(input)
            fail(msg)
    elif state["value_indent"] == None:
        # Establish the indent of the value
        if input == "\n" and state["buffer"] == "":
            state["value"] += "\n"
        elif input.isspace():
            state["buffer"] += input
        else:
            state["value_indent"] = state["buffer"]
            if not state["value_indent"].startswith(state["indent"]):
                fail("Value indent of multiline string does not match indent of property")
            elif len(state["value_indent"]) <= len(state["indent"]):
                fail("Value indent of multiline is not greater than the indent of the property")
            state["buffer"] += input
    elif input == EOF:
        # A last line without a trailing newline is still sitting in the
        # buffer; add it to the value so that it is not lost.
        if state["buffer"].startswith(state["value_indent"]):
            state["value"] += state["buffer"][len(state["value_indent"]):]
            state["buffer"] = ""

        # Parse all lines of the string value
        state["value"] = _finalize_multiline_string(state["value"], state["keep"], state["strip"])
        _set_result_value(starlark, stack, state["value"])
    elif input == "\n":
        state["buffer"] += input
        if state["buffer"] == "\n":
            # Blank line inside the value
            state["value"] += "\n"
        else:
            # Drop the value indentation from the completed line
            state["value"] += state["buffer"][len(state["value_indent"]):]
        state["buffer"] = ""
    elif not input.isspace() and not state["buffer"].startswith(state["value_indent"]):
        # A non-space character on a line that is not indented like the value
        # ends the multiline string
        state["value"] = _finalize_multiline_string(state["value"], state["keep"], state["strip"])
        _set_result_value(starlark, stack, state["value"])
        stack.pop()

        # Parse the next thing
        stack.append(_new_PARSE_NEXT(
            indent = state["buffer"],
            buffer = input,
        ))
    else:
        state["buffer"] += input
def _handle_CONSUME_SPACE_FLOW(_, input, stack, starlark):
    """Handle one input while skipping whitespace inside of a flow ([...] or {...}).

    Args:
        _: the CONSUME_SPACE_FLOW state dict (unused, it carries no data)
        input: the next single-character string of the document, or EOF
        stack: the parser state stack, mutated in place
        starlark: dict holding the "result" built so far and any "error"
    """
    if input == EOF:
        # A flow must be closed before the document ends
        starlark["error"] = "Unexpected EOF"
        return

    if input.isspace():
        pass
    elif input == "[":
        # We started a new inner flow sequence
        stack.pop()
        stack.append(_new_SEQUENCE(
            index = 0,
            flow = True,
        ))
        _initialize_result_value(starlark, stack)

        # Consume any space following the [
        stack.append(_new_CONSUME_SPACE_FLOW())
    elif input == "{":
        # We started a new inner flow map
        stack.pop()
        stack.append(_new_KEY(
            key = "",
            flow = True,
        ))
        _initialize_result_value(starlark, stack)

        # Consume any space following the {
        stack.append(_new_CONSUME_SPACE_FLOW())
    elif input == "]" and _in_flow_sequence(stack):
        # We are at the end of a flow sequence
        stack.pop()
        _pop(stack, _STATE.SEQUENCE)

        # Consume any space before the next thing to parse and escape the flow if needed
        stack.append(_new_CONSUME_SPACE_FLOW() if _in_flow(stack) else _new_CONSUME_SPACE(
            indent = _peek(stack)["indent"] if len(stack) > 0 else "",
        ))
    elif input == "}" and _in_flow_map(stack):
        # We are at the end of a flow map
        stack.pop()
        _pop(stack, _STATE.KEY)

        # Consume any space before the next thing to parse and escape the flow if needed
        stack.append(_new_CONSUME_SPACE_FLOW() if _in_flow(stack) else _new_CONSUME_SPACE(
            indent = _peek(stack)["indent"] if len(stack) > 0 else "",
        ))
    elif input == "," and _in_flow_sequence(stack):
        # A comma seen in this state follows a nested collection that is
        # already in the result (e.g. the first "]" in [[1], [2]]). Advance to
        # the next item so that the following value gets its own slot.
        stack.pop()
        sequence_flow_state = _peek(stack, _STATE.SEQUENCE)
        sequence_flow_state["index"] += 1

        # Consume any space before the next sequence value
        stack.append(_new_CONSUME_SPACE_FLOW())
    elif input == "," and _in_flow_map(stack):
        # If we come across a comma but we are in the consume space state,
        # then it means we just parsed a non-scalar which is already in the
        # result, so just move on (I think...)
        stack.pop()

        # Consume any space before the next key
        stack.append(_new_CONSUME_SPACE_FLOW())
    else:
        # Reached the beginning of a value or key
        stack.pop()
        stack.append(_new_PARSE_NEXT_FLOW(
            buffer = input,
        ))
def _handle_PARSE_NEXT_FLOW(state, input, stack, starlark):
    """Handle one input while collecting the text of a key or value inside of a flow.

    Input is appended to state["buffer"] until a "," or a closing bracket
    completes a value, or whitespace after a trailing ":" completes a key of
    a flow map.

    Args:
        state: the PARSE_NEXT_FLOW state dict, as taken from the top of the stack
        input: the next single-character string of the document, or EOF
        stack: the parser state stack, mutated in place
        starlark: dict holding the "result" built so far and any "error"
    """
    if input == EOF:
        # A flow must be closed before the document ends
        starlark["error"] = "Unexpected EOF"
        return

    if input == "[":
        # A "[" in the middle of a key or value is not supported
        fail("Unhandled case")
    elif input == "," and _in_flow_sequence(stack):
        # We parsed the next value in a flow sequence
        _set_result_value(starlark, stack, _parse_scalar(state["buffer"]))
        stack.pop()
        sequence_flow_state = _peek(stack, _STATE.SEQUENCE)
        sequence_flow_state["index"] += 1

        # Consume any space before the next sequence value
        stack.append(_new_CONSUME_SPACE_FLOW())
    elif input == "," and _in_flow_map(stack):
        # We parsed a value corresponding to the current key
        _set_result_value(starlark, stack, _parse_scalar(state["buffer"]))
        stack.pop()

        # Reset the key
        map_flow_state = _peek(stack, _STATE.KEY)
        map_flow_state["key"] = ""

        # Consume any space before the next key
        stack.append(_new_CONSUME_SPACE_FLOW())
    elif input == "]" and _in_flow_sequence(stack):
        # We are at the end of a flow sequence
        _set_result_value(starlark, stack, _parse_scalar(state["buffer"]))
        stack.pop()
        _pop(stack, _STATE.SEQUENCE)

        # Consume any space before the next thing to parse and escape the flow if needed
        stack.append(_new_CONSUME_SPACE_FLOW() if _in_flow(stack) else _new_CONSUME_SPACE(
            indent = _peek(stack)["indent"] if len(stack) > 0 else "",
        ))
    elif input == "}" and _in_flow_map(stack):
        # We are at the end of a flow map
        _set_result_value(starlark, stack, _parse_scalar(state["buffer"]))
        stack.pop()
        _pop(stack, _STATE.KEY)

        # Consume any space before the next thing to parse and escape the flow if needed
        stack.append(_new_CONSUME_SPACE_FLOW() if _in_flow(stack) else _new_CONSUME_SPACE(
            indent = _peek(stack)["indent"] if len(stack) > 0 else "",
        ))
    elif input.isspace() and state["buffer"].endswith(":") and _in_flow_map(stack):
        # We just parsed a key
        stack.pop()
        _peek(stack, _STATE.KEY)["key"] = _parse_key(state["buffer"][0:-1])
        stack.append(_new_CONSUME_SPACE_FLOW())
    else:
        # Accumulate the current text until we know what to do with it
        state["buffer"] += input
def _new_CONSUME_SPACE(indent):
    """Build a CONSUME_SPACE state that starts out with the given indentation."""
    return dict(
        id = _STATE.CONSUME_SPACE,
        indent = indent,
    )
def _new_CONSUME_SPACE_FLOW():
    """Build a CONSUME_SPACE_FLOW state; it carries nothing but its id."""
    state = {}
    state["id"] = _STATE.CONSUME_SPACE_FLOW
    return state
def _new_PARSE_NEXT(indent, buffer):
    """Build a PARSE_NEXT state with the given indentation and initial buffer text."""
    return dict(
        id = _STATE.PARSE_NEXT,
        indent = indent,
        buffer = buffer,
    )
def _new_PARSE_MULTILINE_STRING(indent):
    """Build a PARSE_MULTILINE_STRING state for a property at the given indentation.

    The value indentation starts out unknown (None), buffer and value start
    out empty and all flags start out False.
    """
    return dict(
        id = _STATE.PARSE_MULTILINE_STRING,
        # In case we support "folded" (>) later
        type = "literal",
        indent = indent,
        value_indent = None,
        buffer = "",
        value = "",
        strip = False,
        keep = False,
        consumed_first_newline = False,
    )
def _new_PARSE_NEXT_FLOW(buffer):
    """Build a PARSE_NEXT_FLOW state whose buffer starts with the given text."""
    return dict(
        id = _STATE.PARSE_NEXT_FLOW,
        buffer = buffer,
    )
def _new_KEY(key, flow, complex = False, indent = None):
    """Build a KEY pseudo-state describing the map entry currently being filled.

    Args:
        key: the current key of the map
        flow: whether the map is a flow map
        complex: value stored in the "complex" field of the state
        indent: indentation of the key; stored as None when flow is set
    """
    state = {"id": _STATE.KEY, "key": key}

    # Indentation is not recorded for flow maps
    state["indent"] = None if flow else indent
    state["flow"] = flow
    state["complex"] = complex
    return state
def _new_SEQUENCE(index, flow, indent = None):
    """Build a SEQUENCE pseudo-state describing the list currently being filled.

    Args:
        index: index of the current item of the sequence
        flow: whether the sequence is a flow sequence
        indent: indentation of the sequence; stored as None when flow is set
    """

    # Indentation is not recorded for flow sequences
    recorded_indent = None if flow else indent
    return dict(
        id = _STATE.SEQUENCE,
        indent = recorded_indent,
        index = index,
        flow = flow,
    )
def _normalize_yaml(yaml):
    """Return the yaml content with every carriage return removed."""
    return "".join(yaml.split("\r"))
def _initialize_result_value(starlark, stack):
    """Initialize empty starlark maps or list values for the current pseudostates in the stack

    Walks the KEY/SEQUENCE pseudo-states from the outermost one and makes
    sure that each of them has a container in the result for the state that
    follows it.

    Args:
        starlark: dict holding the "result" built so far and any "error";
            "error" is set when a non-container value is found on the way
        stack: the parser state stack
    """
    kns_states = _get_key_and_sequence_states(stack)
    if len(kns_states) == 0:
        return

    if starlark["result"] == None:
        starlark["result"] = _empty_value_for_state(kns_states[0])

    curr_result = starlark["result"]
    for (i, state) in enumerate(kns_states[0:-1]):
        if type(curr_result) == "dict":
            curr_result = curr_result.setdefault(state["key"], _empty_value_for_state(kns_states[i + 1]))
        elif type(curr_result) == "list":
            if not "index" in state:
                fail("Invalid yaml state under {}".format(_stack_path(stack)))
            if state["index"] >= len(curr_result):
                curr_result.append(_empty_value_for_state(kns_states[i + 1]))
            curr_result = curr_result[state["index"]]
        else:
            # curr_result is neither a dict nor a list here, so it has to be
            # converted before it can be concatenated to the message
            starlark["error"] = "Unknown result state: " + str(curr_result)
            return
def _stack_path(stack):
    """Describe the position in the document that the stack represents.

    Only used to build error messages. States that have neither a "key" nor
    an "index" field are left out instead of failing the lookup.

    Args:
        stack: the parser state stack

    Returns:
        A list with the key of every state that has one and the index of
        every other state that has one, outermost first.
    """
    path = []
    for state in stack:
        if "key" in state:
            path.append(state["key"])
        elif "index" in state:
            path.append(state["index"])
    return path
def _set_result_value(starlark, stack, value):
    """Add a new value to the starlark result corresponding to the last pseudostate in the stack

    Without any KEY/SEQUENCE pseudo-state the value becomes the whole result.
    Otherwise the containers of the outer pseudo-states are followed and the
    value is stored under the current key of a dict or appended to a list.

    Args:
        starlark: dict holding the "result" built so far and any "error";
            "error" is set when the innermost container is not a dict or list
        stack: the parser state stack
        value: the parsed value to store
    """
    kns_states = _get_key_and_sequence_states(stack)
    if len(kns_states) == 0:
        starlark["result"] = value
        return

    curr_result = starlark["result"]
    for state in kns_states[0:-1]:
        if type(curr_result) == "dict":
            curr_result = curr_result[state["key"]]
        else:
            curr_result = curr_result[state["index"]]

    if type(curr_result) == "dict":
        curr_result[kns_states[-1]["key"]] = value
    elif type(curr_result) == "list":
        curr_result.append(value)
    else:
        # Neither operand is guaranteed to be a string, so both have to be
        # converted before they can be concatenated to the message
        starlark["error"] = "Unknown result state: " + str(curr_result) + " for value " + str(value)
def _empty_value_for_state(state):
    """Return the empty container for a pseudo-state: {} for KEY, [] for SEQUENCE.

    Fails for any other state.
    """
    state_id = state["id"]
    if state_id == _STATE.KEY:
        return {}
    if state_id == _STATE.SEQUENCE:
        return []

    msg = "State {} has no empty type".format(state_id)
    fail(msg)
def _peek(stack, expected = False):
    """Return the state on top of the stack without removing it.

    Args:
        stack: the parser state stack; must not be empty
        expected: when truthy, the id that the top state must have;
            the call fails when the id differs
    """
    current = stack[-1]
    mismatch = bool(expected) and bool(current) and current["id"] != expected
    if mismatch:
        fail("Expected state {} but got {}".format(expected, current["id"]))
    return current
def _parse_scalar(value):
    """Convert the text of a scalar into a starlark value.

    Surrounding whitespace is ignored. Integers and floats, optionally
    preceded by "-" or "+", become numbers and "true"/"false" become
    booleans. Anything else is returned as a string without its surrounding
    single or double quotes.

    Args:
        value: string, the raw text of the scalar

    Returns:
        An int, float, bool or string.
    """
    value = value.strip()

    # Detect the number on the text without its sign so that negative (and
    # explicitly positive) numbers are not returned as strings.
    sign = value[:1]
    unsigned = value[1:] if sign in ("-", "+") else value

    if _is_int(unsigned):
        number = int(unsigned)
        return -number if sign == "-" else number
    elif _is_float(unsigned):
        number = float(unsigned)
        return -number if sign == "-" else number
    elif _is_bool(value):
        return _to_bool(value)
    elif value.startswith("'"):
        return value.strip("'")
    else:
        return value.strip("\"")
def _finalize_multiline_string(value, keep, strip):
    """Apply the trailing-newline treatment of a multiline string.

    Args:
        value: string, the collected lines of the multiline string
        keep: return the value untouched
        strip: remove all trailing spaces, tabs and newlines

    Returns:
        The value as is when keep is set, without trailing whitespace when
        strip is set, and with its trailing whitespace replaced by a single
        newline when neither is set. Fails when both are set.
    """
    if keep and strip:
        fail("Error: a multiline string cannot both keep and strip trailing newlines. This is probably a bug.")

    if keep:
        return value

    trimmed = value.rstrip(" \t\n")
    return trimmed if strip else trimmed + "\n"
def _pop(stack, *types):
    """Pop one state per given id off the stack, checking each id first.

    Args:
        stack: the parser state stack, mutated in place
        *types: the ids of the states to pop, topmost first; the call fails
            when the state on top does not have the id that is next in line
    """
    for expected_id in types:
        actual_id = _peek(stack)["id"]
        if actual_id != expected_id:
            fail("Expected state {} but found {}".format(expected_id, actual_id))
        stack.pop()
def _is_float(value):
    """Tell whether the value consists of digits with at most one "." among them."""
    without_first_point = value.replace(".", "", 1)
    return without_first_point.isdigit()
def _is_int(value):
    """Tell whether the value is a non-empty string made of digits only."""
    only_digits = value.isdigit()
    return only_digits
def _is_bool(value):
    """Tell whether the value is exactly "true" or "false"."""
    return value in ("true", "false")
def _to_bool(value):
    """Convert "true" or "false" to the starlark boolean; fail for anything else."""
    booleans = {"true": True, "false": False}
    if value in booleans:
        return booleans[value]

    msg = "Cannot convert scalar {} to a starlark boolean".format(value)
    fail(msg)
def _parse_key(key):
    """Return the key without the quotes around it.

    A key that starts with a single or double quote has every leading and
    trailing occurrence of that quote removed; any other key is returned
    unchanged.
    """
    for quote in ("'", "\""):
        if key.startswith(quote):
            return key.strip(quote)
    return key
def _get_key_and_sequence_states(stack):
    """Return the KEY and SEQUENCE pseudo-states of the stack, in stack order."""
    pseudo_states = []
    for entry in stack:
        if entry["id"] == _STATE.KEY or entry["id"] == _STATE.SEQUENCE:
            pseudo_states.append(entry)
    return pseudo_states
def _in_flow(stack):
    """Tell whether the innermost KEY/SEQUENCE pseudo-state belongs to a flow."""
    pseudo_states = _get_key_and_sequence_states(stack)
    if len(pseudo_states) == 0:
        return False

    innermost = pseudo_states[-1]
    return innermost["id"] in [_STATE.SEQUENCE, _STATE.KEY] and innermost["flow"]
def _in_flow_sequence(stack):
    """Tell whether the innermost KEY/SEQUENCE pseudo-state is a flow sequence."""
    pseudo_states = _get_key_and_sequence_states(stack)
    if len(pseudo_states) == 0:
        return False

    innermost = pseudo_states[-1]
    return innermost["id"] == _STATE.SEQUENCE and innermost["flow"]
def _in_flow_map(stack):
    """Tell whether the innermost KEY/SEQUENCE pseudo-state is a flow map."""
    pseudo_states = _get_key_and_sequence_states(stack)
    if len(pseudo_states) == 0:
        return False

    innermost = pseudo_states[-1]
    return innermost["id"] == _STATE.KEY and innermost["flow"]
def _pop_higher_indented_states(stack, indent):
    """Remove the KEY/SEQUENCE pseudo-states that are indented deeper than `indent`.

    Args:
        stack: the parser state stack, mutated in place
        indent: string, the indentation to compare against (by length)
    """
    pseudo_ids = [_STATE.KEY, _STATE.SEQUENCE]

    # Collect first and remove afterwards so that the stack is not modified
    # while it is being iterated
    deeper = [
        entry
        for entry in stack
        if entry["id"] in pseudo_ids and len(entry["indent"]) > len(indent)
    ]
    for entry in deeper:
        stack.remove(entry)