Fixed the 'Register as a student' link problem for the GHOP program.
Params and new_params in the student view are also sub-merged now, so the apply pattern may be used by the GHOP student view.
import yamldef get_version_string(): return yaml_get_version_string()def get_version(): cdef int major, minor, patch yaml_get_version(&major, &minor, &patch) return (major, minor, patch)#Mark = yaml.error.MarkYAMLError = yaml.error.YAMLErrorReaderError = yaml.reader.ReaderErrorScannerError = yaml.scanner.ScannerErrorParserError = yaml.parser.ParserErrorComposerError = yaml.composer.ComposerErrorConstructorError = yaml.constructor.ConstructorErrorEmitterError = yaml.emitter.EmitterErrorSerializerError = yaml.serializer.SerializerErrorRepresenterError = yaml.representer.RepresenterErrorStreamStartToken = yaml.tokens.StreamStartTokenStreamEndToken = yaml.tokens.StreamEndTokenDirectiveToken = yaml.tokens.DirectiveTokenDocumentStartToken = yaml.tokens.DocumentStartTokenDocumentEndToken = yaml.tokens.DocumentEndTokenBlockSequenceStartToken = yaml.tokens.BlockSequenceStartTokenBlockMappingStartToken = yaml.tokens.BlockMappingStartTokenBlockEndToken = yaml.tokens.BlockEndTokenFlowSequenceStartToken = yaml.tokens.FlowSequenceStartTokenFlowMappingStartToken = yaml.tokens.FlowMappingStartTokenFlowSequenceEndToken = yaml.tokens.FlowSequenceEndTokenFlowMappingEndToken = yaml.tokens.FlowMappingEndTokenKeyToken = yaml.tokens.KeyTokenValueToken = yaml.tokens.ValueTokenBlockEntryToken = yaml.tokens.BlockEntryTokenFlowEntryToken = yaml.tokens.FlowEntryTokenAliasToken = yaml.tokens.AliasTokenAnchorToken = yaml.tokens.AnchorTokenTagToken = yaml.tokens.TagTokenScalarToken = yaml.tokens.ScalarTokenStreamStartEvent = yaml.events.StreamStartEventStreamEndEvent = yaml.events.StreamEndEventDocumentStartEvent = yaml.events.DocumentStartEventDocumentEndEvent = yaml.events.DocumentEndEventAliasEvent = yaml.events.AliasEventScalarEvent = yaml.events.ScalarEventSequenceStartEvent = yaml.events.SequenceStartEventSequenceEndEvent = yaml.events.SequenceEndEventMappingStartEvent = yaml.events.MappingStartEventMappingEndEvent = yaml.events.MappingEndEventScalarNode = yaml.nodes.ScalarNodeSequenceNode 
= yaml.nodes.SequenceNodeMappingNode = yaml.nodes.MappingNodecdef class Mark: cdef readonly object name cdef readonly int index cdef readonly int line cdef readonly int column cdef readonly buffer cdef readonly pointer def __init__(self, object name, int index, int line, int column, object buffer, object pointer): self.name = name self.index = index self.line = line self.column = column self.buffer = buffer self.pointer = pointer def get_snippet(self): return None def __str__(self): where = " in \"%s\", line %d, column %d" \ % (self.name, self.line+1, self.column+1) return where#class YAMLError(Exception):# pass##class MarkedYAMLError(YAMLError):## def __init__(self, context=None, context_mark=None,# problem=None, problem_mark=None, note=None):# self.context = context# self.context_mark = context_mark# self.problem = problem# self.problem_mark = problem_mark# self.note = note## def __str__(self):# lines = []# if self.context is not None:# lines.append(self.context)# if self.context_mark is not None \# and (self.problem is None or self.problem_mark is None# or self.context_mark.name != self.problem_mark.name# or self.context_mark.line != self.problem_mark.line# or self.context_mark.column != self.problem_mark.column):# lines.append(str(self.context_mark))# if self.problem is not None:# lines.append(self.problem)# if self.problem_mark is not None:# lines.append(str(self.problem_mark))# if self.note is not None:# lines.append(self.note)# return '\n'.join(lines)##class ReaderError(YAMLError):## def __init__(self, name, position, character, encoding, reason):# self.name = name# self.character = character# self.position = position# self.encoding = encoding# self.reason = reason## def __str__(self):# if isinstance(self.character, str):# return "'%s' codec can't decode byte #x%02x: %s\n" \# " in \"%s\", position %d" \# % (self.encoding, ord(self.character), self.reason,# self.name, self.position)# else:# return "unacceptable character #x%04x: %s\n" \# " in \"%s\", position 
%d" \# % (ord(self.character), self.reason,# self.name, self.position)##class ScannerError(MarkedYAMLError):# pass##class ParserError(MarkedYAMLError):# pass##class EmitterError(YAMLError):# pass##cdef class Token:# cdef readonly Mark start_mark# cdef readonly Mark end_mark# def __init__(self, Mark start_mark, Mark end_mark):# self.start_mark = start_mark# self.end_mark = end_mark##cdef class StreamStartToken(Token):# cdef readonly object encoding# def __init__(self, Mark start_mark, Mark end_mark, encoding):# self.start_mark = start_mark# self.end_mark = end_mark# self.encoding = encoding##cdef class StreamEndToken(Token):# pass##cdef class DirectiveToken(Token):# cdef readonly object name# cdef readonly object value# def __init__(self, name, value, Mark start_mark, Mark end_mark):# self.name = name# self.value = value# self.start_mark = start_mark# self.end_mark = end_mark##cdef class DocumentStartToken(Token):# pass##cdef class DocumentEndToken(Token):# pass##cdef class BlockSequenceStartToken(Token):# pass##cdef class BlockMappingStartToken(Token):# pass##cdef class BlockEndToken(Token):# pass##cdef class FlowSequenceStartToken(Token):# pass##cdef class FlowMappingStartToken(Token):# pass##cdef class FlowSequenceEndToken(Token):# pass##cdef class FlowMappingEndToken(Token):# pass##cdef class KeyToken(Token):# pass##cdef class ValueToken(Token):# pass##cdef class BlockEntryToken(Token):# pass##cdef class FlowEntryToken(Token):# pass##cdef class AliasToken(Token):# cdef readonly object value# def __init__(self, value, Mark start_mark, Mark end_mark):# self.value = value# self.start_mark = start_mark# self.end_mark = end_mark##cdef class AnchorToken(Token):# cdef readonly object value# def __init__(self, value, Mark start_mark, Mark end_mark):# self.value = value# self.start_mark = start_mark# self.end_mark = end_mark##cdef class TagToken(Token):# cdef readonly object value# def __init__(self, value, Mark start_mark, Mark end_mark):# self.value = value# 
self.start_mark = start_mark# self.end_mark = end_mark##cdef class ScalarToken(Token):# cdef readonly object value# cdef readonly object plain# cdef readonly object style# def __init__(self, value, plain, Mark start_mark, Mark end_mark, style=None):# self.value = value# self.plain = plain# self.start_mark = start_mark# self.end_mark = end_mark# self.style = stylecdef class CParser: cdef yaml_parser_t parser cdef yaml_event_t parsed_event cdef object stream cdef object stream_name cdef object current_token cdef object current_event cdef object anchors def __init__(self, stream): if yaml_parser_initialize(&self.parser) == 0: raise MemoryError self.parsed_event.type = YAML_NO_EVENT if hasattr(stream, 'read'): self.stream = stream try: self.stream_name = stream.name except AttributeError: self.stream_name = '<file>' yaml_parser_set_input(&self.parser, input_handler, <void *>self) else: if PyUnicode_CheckExact(stream) != 0: stream = PyUnicode_AsUTF8String(stream) self.stream_name = '<unicode string>' else: self.stream_name = '<string>' if PyString_CheckExact(stream) == 0: raise TypeError("a string or stream input is required") self.stream = stream yaml_parser_set_input_string(&self.parser, PyString_AS_STRING(stream), PyString_GET_SIZE(stream)) self.current_token = None self.current_event = None self.anchors = {} def __dealloc__(self): yaml_parser_delete(&self.parser) yaml_event_delete(&self.parsed_event) cdef object _parser_error(self): if self.parser.error == YAML_MEMORY_ERROR: raise MemoryError elif self.parser.error == YAML_READER_ERROR: raise ReaderError(self.stream_name, self.parser.problem_offset, self.parser.problem_value, '?', self.parser.problem) elif self.parser.error == YAML_SCANNER_ERROR \ or self.parser.error == YAML_PARSER_ERROR: context_mark = None problem_mark = None if self.parser.context != NULL: context_mark = Mark(self.stream_name, self.parser.context_mark.index, self.parser.context_mark.line, self.parser.context_mark.column, None, None) if 
self.parser.problem != NULL: problem_mark = Mark(self.stream_name, self.parser.problem_mark.index, self.parser.problem_mark.line, self.parser.problem_mark.column, None, None) if self.parser.error == YAML_SCANNER_ERROR: if self.parser.context != NULL: return ScannerError(self.parser.context, context_mark, self.parser.problem, problem_mark) else: return ScannerError(None, None, self.parser.problem, problem_mark) else: if self.parser.context != NULL: return ParserError(self.parser.context, context_mark, self.parser.problem, problem_mark) else: return ParserError(None, None, self.parser.problem, problem_mark) raise ValueError("no parser error") def raw_scan(self): cdef yaml_token_t token cdef int done cdef int count count = 0 done = 0 while done == 0: if yaml_parser_scan(&self.parser, &token) == 0: error = self._parser_error() raise error if token.type == YAML_NO_TOKEN: done = 1 else: count = count+1 yaml_token_delete(&token) return count cdef object _scan(self): cdef yaml_token_t token if yaml_parser_scan(&self.parser, &token) == 0: error = self._parser_error() raise error token_object = self._token_to_object(&token) yaml_token_delete(&token) return token_object cdef object _token_to_object(self, yaml_token_t *token): start_mark = Mark(self.stream_name, token.start_mark.index, token.start_mark.line, token.start_mark.column, None, None) end_mark = Mark(self.stream_name, token.end_mark.index, token.end_mark.line, token.end_mark.column, None, None) if token.type == YAML_NO_TOKEN: return None elif token.type == YAML_STREAM_START_TOKEN: encoding = None if token.data.stream_start.encoding == YAML_UTF8_ENCODING: encoding = "utf-8" elif token.data.stream_start.encoding == YAML_UTF16LE_ENCODING: encoding = "utf-16-le" elif token.data.stream_start.encoding == YAML_UTF16BE_ENCODING: encoding = "utf-16-be" return StreamStartToken(start_mark, end_mark, encoding) elif token.type == YAML_STREAM_END_TOKEN: return StreamEndToken(start_mark, end_mark) elif token.type == 
YAML_VERSION_DIRECTIVE_TOKEN: return DirectiveToken("YAML", (token.data.version_directive.major, token.data.version_directive.minor), start_mark, end_mark) elif token.type == YAML_TAG_DIRECTIVE_TOKEN: return DirectiveToken("TAG", (token.data.tag_directive.handle, token.data.tag_directive.prefix), start_mark, end_mark) elif token.type == YAML_DOCUMENT_START_TOKEN: return DocumentStartToken(start_mark, end_mark) elif token.type == YAML_DOCUMENT_END_TOKEN: return DocumentEndToken(start_mark, end_mark) elif token.type == YAML_BLOCK_SEQUENCE_START_TOKEN: return BlockSequenceStartToken(start_mark, end_mark) elif token.type == YAML_BLOCK_MAPPING_START_TOKEN: return BlockMappingStartToken(start_mark, end_mark) elif token.type == YAML_BLOCK_END_TOKEN: return BlockEndToken(start_mark, end_mark) elif token.type == YAML_FLOW_SEQUENCE_START_TOKEN: return FlowSequenceStartToken(start_mark, end_mark) elif token.type == YAML_FLOW_SEQUENCE_END_TOKEN: return FlowSequenceEndToken(start_mark, end_mark) elif token.type == YAML_FLOW_MAPPING_START_TOKEN: return FlowMappingStartToken(start_mark, end_mark) elif token.type == YAML_FLOW_MAPPING_END_TOKEN: return FlowMappingEndToken(start_mark, end_mark) elif token.type == YAML_BLOCK_ENTRY_TOKEN: return BlockEntryToken(start_mark, end_mark) elif token.type == YAML_FLOW_ENTRY_TOKEN: return FlowEntryToken(start_mark, end_mark) elif token.type == YAML_KEY_TOKEN: return KeyToken(start_mark, end_mark) elif token.type == YAML_VALUE_TOKEN: return ValueToken(start_mark, end_mark) elif token.type == YAML_ALIAS_TOKEN: value = PyUnicode_DecodeUTF8(token.data.alias.value, strlen(token.data.alias.value), 'strict') return AliasToken(value, start_mark, end_mark) elif token.type == YAML_ANCHOR_TOKEN: value = PyUnicode_DecodeUTF8(token.data.anchor.value, strlen(token.data.anchor.value), 'strict') return AnchorToken(value, start_mark, end_mark) elif token.type == YAML_TAG_TOKEN: handle = PyUnicode_DecodeUTF8(token.data.tag.handle, 
strlen(token.data.tag.handle), 'strict') suffix = PyUnicode_DecodeUTF8(token.data.tag.suffix, strlen(token.data.tag.suffix), 'strict') if not handle: handle = None return TagToken((handle, suffix), start_mark, end_mark) elif token.type == YAML_SCALAR_TOKEN: value = PyUnicode_DecodeUTF8(token.data.scalar.value, token.data.scalar.length, 'strict') plain = False style = None if token.data.scalar.style == YAML_PLAIN_SCALAR_STYLE: plain = True style = '' elif token.data.scalar.style == YAML_SINGLE_QUOTED_SCALAR_STYLE: style = '\'' elif token.data.scalar.style == YAML_DOUBLE_QUOTED_SCALAR_STYLE: style = '"' elif token.data.scalar.style == YAML_LITERAL_SCALAR_STYLE: style = '|' elif token.data.scalar.style == YAML_FOLDED_SCALAR_STYLE: style = '>' return ScalarToken(value, plain, start_mark, end_mark, style) else: raise ValueError("unknown token type") def get_token(self): if self.current_token is not None: value = self.current_token self.current_token = None else: value = self._scan() return value def peek_token(self): if self.current_token is None: self.current_token = self._scan() return self.current_token def check_token(self, *choices): if self.current_token is None: self.current_token = self._scan() if self.current_token is None: return False if not choices: return True token_class = self.current_token.__class__ for choice in choices: if token_class is choice: return True return False def raw_parse(self): cdef yaml_event_t event cdef int done cdef int count count = 0 done = 0 while done == 0: if yaml_parser_parse(&self.parser, &event) == 0: error = self._parser_error() raise error if event.type == YAML_NO_EVENT: done = 1 else: count = count+1 yaml_event_delete(&event) return count cdef object _parse(self): cdef yaml_event_t event if yaml_parser_parse(&self.parser, &event) == 0: error = self._parser_error() raise error event_object = self._event_to_object(&event) yaml_event_delete(&event) return event_object cdef object _event_to_object(self, yaml_event_t *event): 
cdef yaml_tag_directive_t *tag_directive start_mark = Mark(self.stream_name, event.start_mark.index, event.start_mark.line, event.start_mark.column, None, None) end_mark = Mark(self.stream_name, event.end_mark.index, event.end_mark.line, event.end_mark.column, None, None) if event.type == YAML_NO_EVENT: return None elif event.type == YAML_STREAM_START_EVENT: encoding = None if event.data.stream_start.encoding == YAML_UTF8_ENCODING: encoding = "utf-8" elif event.data.stream_start.encoding == YAML_UTF16LE_ENCODING: encoding = "utf-16-le" elif event.data.stream_start.encoding == YAML_UTF16BE_ENCODING: encoding = "utf-16-be" return StreamStartEvent(start_mark, end_mark, encoding) elif event.type == YAML_STREAM_END_EVENT: return StreamEndEvent(start_mark, end_mark) elif event.type == YAML_DOCUMENT_START_EVENT: explicit = False if event.data.document_start.implicit == 0: explicit = True version = None if event.data.document_start.version_directive != NULL: version = (event.data.document_start.version_directive.major, event.data.document_start.version_directive.minor) tags = None if event.data.document_start.tag_directives.start != NULL: tags = {} tag_directive = event.data.document_start.tag_directives.start while tag_directive != event.data.document_start.tag_directives.end: handle = PyUnicode_DecodeUTF8(tag_directive.handle, strlen(tag_directive.handle), 'strict') prefix = PyUnicode_DecodeUTF8(tag_directive.prefix, strlen(tag_directive.prefix), 'strict') tags[handle] = prefix tag_directive = tag_directive+1 return DocumentStartEvent(start_mark, end_mark, explicit, version, tags) elif event.type == YAML_DOCUMENT_END_EVENT: explicit = False if event.data.document_end.implicit == 0: explicit = True return DocumentEndEvent(start_mark, end_mark, explicit) elif event.type == YAML_ALIAS_EVENT: anchor = PyUnicode_DecodeUTF8(event.data.alias.anchor, strlen(event.data.alias.anchor), 'strict') return AliasEvent(anchor, start_mark, end_mark) elif event.type == YAML_SCALAR_EVENT: 
anchor = None if event.data.scalar.anchor != NULL: anchor = PyUnicode_DecodeUTF8(event.data.scalar.anchor, strlen(event.data.scalar.anchor), 'strict') tag = None if event.data.scalar.tag != NULL: tag = PyUnicode_DecodeUTF8(event.data.scalar.tag, strlen(event.data.scalar.tag), 'strict') value = PyUnicode_DecodeUTF8(event.data.scalar.value, event.data.scalar.length, 'strict') plain_implicit = False if event.data.scalar.plain_implicit == 1: plain_implicit = True quoted_implicit = False if event.data.scalar.quoted_implicit == 1: quoted_implicit = True style = None if event.data.scalar.style == YAML_PLAIN_SCALAR_STYLE: style = '' elif event.data.scalar.style == YAML_SINGLE_QUOTED_SCALAR_STYLE: style = '\'' elif event.data.scalar.style == YAML_DOUBLE_QUOTED_SCALAR_STYLE: style = '"' elif event.data.scalar.style == YAML_LITERAL_SCALAR_STYLE: style = '|' elif event.data.scalar.style == YAML_FOLDED_SCALAR_STYLE: style = '>' return ScalarEvent(anchor, tag, (plain_implicit, quoted_implicit), value, start_mark, end_mark, style) elif event.type == YAML_SEQUENCE_START_EVENT: anchor = None if event.data.sequence_start.anchor != NULL: anchor = PyUnicode_DecodeUTF8(event.data.sequence_start.anchor, strlen(event.data.sequence_start.anchor), 'strict') tag = None if event.data.sequence_start.tag != NULL: tag = PyUnicode_DecodeUTF8(event.data.sequence_start.tag, strlen(event.data.sequence_start.tag), 'strict') implicit = False if event.data.sequence_start.implicit == 1: implicit = True flow_style = None if event.data.sequence_start.style == YAML_FLOW_SEQUENCE_STYLE: flow_style = True elif event.data.sequence_start.style == YAML_BLOCK_SEQUENCE_STYLE: flow_style = False return SequenceStartEvent(anchor, tag, implicit, start_mark, end_mark, flow_style) elif event.type == YAML_MAPPING_START_EVENT: anchor = None if event.data.mapping_start.anchor != NULL: anchor = PyUnicode_DecodeUTF8(event.data.mapping_start.anchor, strlen(event.data.mapping_start.anchor), 'strict') tag = None if 
event.data.mapping_start.tag != NULL: tag = PyUnicode_DecodeUTF8(event.data.mapping_start.tag, strlen(event.data.mapping_start.tag), 'strict') implicit = False if event.data.mapping_start.implicit == 1: implicit = True flow_style = None if event.data.mapping_start.style == YAML_FLOW_SEQUENCE_STYLE: flow_style = True elif event.data.mapping_start.style == YAML_BLOCK_SEQUENCE_STYLE: flow_style = False return MappingStartEvent(anchor, tag, implicit, start_mark, end_mark, flow_style) elif event.type == YAML_SEQUENCE_END_EVENT: return SequenceEndEvent(start_mark, end_mark) elif event.type == YAML_MAPPING_END_EVENT: return MappingEndEvent(start_mark, end_mark) else: raise ValueError("unknown token type") def get_event(self): if self.current_event is not None: value = self.current_event self.current_event = None else: value = self._parse() return value def peek_event(self): if self.current_event is None: self.current_event = self._parse() return self.current_event def check_event(self, *choices): if self.current_event is None: self.current_event = self._parse() if self.current_event is None: return False if not choices: return True event_class = self.current_event.__class__ for choice in choices: if event_class is choice: return True return False def check_node(self): self._parse_next_event() if self.parsed_event.type == YAML_STREAM_START_EVENT: yaml_event_delete(&self.parsed_event) self._parse_next_event() if self.parsed_event.type != YAML_STREAM_END_EVENT: return True return False def get_node(self): self._parse_next_event() if self.parsed_event.type != YAML_STREAM_END_EVENT: return self._compose_document() cdef object _compose_document(self): yaml_event_delete(&self.parsed_event) node = self._compose_node(None, None) self._parse_next_event() yaml_event_delete(&self.parsed_event) self.anchors = {} return node cdef object _compose_node(self, object parent, object index): self._parse_next_event() if self.parsed_event.type == YAML_ALIAS_EVENT: anchor = 
PyUnicode_DecodeUTF8(self.parsed_event.data.alias.anchor, strlen(self.parsed_event.data.alias.anchor), 'strict') if anchor not in self.anchors: mark = Mark(self.stream_name, self.parsed_event.start_mark.index, self.parsed_event.start_mark.line, self.parsed_event.start_mark.column, None, None) raise ComposerError(None, None, "found undefined alias", mark) yaml_event_delete(&self.parsed_event) return self.anchors[anchor] anchor = None if self.parsed_event.type == YAML_SCALAR_EVENT \ and self.parsed_event.data.scalar.anchor != NULL: anchor = PyUnicode_DecodeUTF8(self.parsed_event.data.scalar.anchor, strlen(self.parsed_event.data.scalar.anchor), 'strict') elif self.parsed_event.type == YAML_SEQUENCE_START_EVENT \ and self.parsed_event.data.sequence_start.anchor != NULL: anchor = PyUnicode_DecodeUTF8(self.parsed_event.data.sequence_start.anchor, strlen(self.parsed_event.data.sequence_start.anchor), 'strict') elif self.parsed_event.type == YAML_MAPPING_START_EVENT \ and self.parsed_event.data.mapping_start.anchor != NULL: anchor = PyUnicode_DecodeUTF8(self.parsed_event.data.mapping_start.anchor, strlen(self.parsed_event.data.mapping_start.anchor), 'strict') if anchor is not None: if anchor in self.anchors: mark = Mark(self.stream_name, self.parsed_event.start_mark.index, self.parsed_event.start_mark.line, self.parsed_event.start_mark.column, None, None) raise ComposerError("found duplicate anchor; first occurence", self.anchors[anchor].start_mark, "second occurence", mark) self.descend_resolver(parent, index) if self.parsed_event.type == YAML_SCALAR_EVENT: node = self._compose_scalar_node(anchor) elif self.parsed_event.type == YAML_SEQUENCE_START_EVENT: node = self._compose_sequence_node(anchor) elif self.parsed_event.type == YAML_MAPPING_START_EVENT: node = self._compose_mapping_node(anchor) self.ascend_resolver() return node cdef _compose_scalar_node(self, object anchor): start_mark = Mark(self.stream_name, self.parsed_event.start_mark.index, 
self.parsed_event.start_mark.line, self.parsed_event.start_mark.column, None, None) end_mark = Mark(self.stream_name, self.parsed_event.end_mark.index, self.parsed_event.end_mark.line, self.parsed_event.end_mark.column, None, None) value = PyUnicode_DecodeUTF8(self.parsed_event.data.scalar.value, self.parsed_event.data.scalar.length, 'strict') plain_implicit = False if self.parsed_event.data.scalar.plain_implicit == 1: plain_implicit = True quoted_implicit = False if self.parsed_event.data.scalar.quoted_implicit == 1: quoted_implicit = True if self.parsed_event.data.scalar.tag == NULL \ or (self.parsed_event.data.scalar.tag[0] == c'!' and self.parsed_event.data.scalar.tag[1] == c'\0'): tag = self.resolve(ScalarNode, value, (plain_implicit, quoted_implicit)) else: tag = PyUnicode_DecodeUTF8(self.parsed_event.data.scalar.tag, strlen(self.parsed_event.data.scalar.tag), 'strict') style = None if self.parsed_event.data.scalar.style == YAML_PLAIN_SCALAR_STYLE: style = '' elif self.parsed_event.data.scalar.style == YAML_SINGLE_QUOTED_SCALAR_STYLE: style = '\'' elif self.parsed_event.data.scalar.style == YAML_DOUBLE_QUOTED_SCALAR_STYLE: style = '"' elif self.parsed_event.data.scalar.style == YAML_LITERAL_SCALAR_STYLE: style = '|' elif self.parsed_event.data.scalar.style == YAML_FOLDED_SCALAR_STYLE: style = '>' node = ScalarNode(tag, value, start_mark, end_mark, style) if anchor is not None: self.anchors[anchor] = node yaml_event_delete(&self.parsed_event) return node cdef _compose_sequence_node(self, object anchor): cdef int index start_mark = Mark(self.stream_name, self.parsed_event.start_mark.index, self.parsed_event.start_mark.line, self.parsed_event.start_mark.column, None, None) implicit = False if self.parsed_event.data.sequence_start.implicit == 1: implicit = True if self.parsed_event.data.sequence_start.tag == NULL \ or (self.parsed_event.data.sequence_start.tag[0] == c'!' 
and self.parsed_event.data.sequence_start.tag[1] == c'\0'): tag = self.resolve(SequenceNode, None, implicit) else: tag = PyUnicode_DecodeUTF8(self.parsed_event.data.sequence_start.tag, strlen(self.parsed_event.data.sequence_start.tag), 'strict') flow_style = None if self.parsed_event.data.sequence_start.style == YAML_FLOW_SEQUENCE_STYLE: flow_style = True elif self.parsed_event.data.sequence_start.style == YAML_BLOCK_SEQUENCE_STYLE: flow_style = False value = [] node = SequenceNode(tag, value, start_mark, None, flow_style) if anchor is not None: self.anchors[anchor] = node yaml_event_delete(&self.parsed_event) index = 0 self._parse_next_event() while self.parsed_event.type != YAML_SEQUENCE_END_EVENT: value.append(self._compose_node(node, index)) index = index+1 self._parse_next_event() node.end_mark = Mark(self.stream_name, self.parsed_event.end_mark.index, self.parsed_event.end_mark.line, self.parsed_event.end_mark.column, None, None) yaml_event_delete(&self.parsed_event) return node cdef _compose_mapping_node(self, object anchor): start_mark = Mark(self.stream_name, self.parsed_event.start_mark.index, self.parsed_event.start_mark.line, self.parsed_event.start_mark.column, None, None) implicit = False if self.parsed_event.data.mapping_start.implicit == 1: implicit = True if self.parsed_event.data.mapping_start.tag == NULL \ or (self.parsed_event.data.mapping_start.tag[0] == c'!' 
and self.parsed_event.data.mapping_start.tag[1] == c'\0'): tag = self.resolve(MappingNode, None, implicit) else: tag = PyUnicode_DecodeUTF8(self.parsed_event.data.mapping_start.tag, strlen(self.parsed_event.data.mapping_start.tag), 'strict') flow_style = None if self.parsed_event.data.mapping_start.style == YAML_FLOW_MAPPING_STYLE: flow_style = True elif self.parsed_event.data.mapping_start.style == YAML_BLOCK_MAPPING_STYLE: flow_style = False value = [] node = MappingNode(tag, value, start_mark, None, flow_style) if anchor is not None: self.anchors[anchor] = node yaml_event_delete(&self.parsed_event) self._parse_next_event() while self.parsed_event.type != YAML_MAPPING_END_EVENT: item_key = self._compose_node(node, None) item_value = self._compose_node(node, item_key) value.append((item_key, item_value)) self._parse_next_event() node.end_mark = Mark(self.stream_name, self.parsed_event.end_mark.index, self.parsed_event.end_mark.line, self.parsed_event.end_mark.column, None, None) yaml_event_delete(&self.parsed_event) return node cdef int _parse_next_event(self) except 0: if self.parsed_event.type == YAML_NO_EVENT: if yaml_parser_parse(&self.parser, &self.parsed_event) == 0: error = self._parser_error() raise error return 1cdef int input_handler(void *data, char *buffer, int size, int *read) except 0: cdef CParser parser parser = <CParser>data value = parser.stream.read(size) if PyString_CheckExact(value) == 0: raise TypeError("a string value is expected") if PyString_GET_SIZE(value) > size: raise ValueError("a string value it too long") memcpy(buffer, PyString_AS_STRING(value), PyString_GET_SIZE(value)) read[0] = PyString_GET_SIZE(value) return 1cdef class CEmitter: cdef yaml_emitter_t emitter cdef object stream cdef yaml_encoding_t use_encoding cdef int document_start_implicit cdef int document_end_implicit cdef object use_version cdef object use_tags cdef object serialized_nodes cdef object anchors cdef int last_alias_id cdef int closed def __init__(self, stream, 
canonical=None, indent=None, width=None, allow_unicode=None, line_break=None, encoding=None, explicit_start=None, explicit_end=None, version=None, tags=None): if yaml_emitter_initialize(&self.emitter) == 0: raise MemoryError self.stream = stream yaml_emitter_set_output(&self.emitter, output_handler, <void *>self) if canonical is not None: yaml_emitter_set_canonical(&self.emitter, 1) if indent is not None: yaml_emitter_set_indent(&self.emitter, indent) if width is not None: yaml_emitter_set_width(&self.emitter, width) if allow_unicode is not None: yaml_emitter_set_unicode(&self.emitter, 1) if line_break is not None: if line_break == '\r': yaml_emitter_set_break(&self.emitter, YAML_CR_BREAK) elif line_break == '\n': yaml_emitter_set_break(&self.emitter, YAML_LN_BREAK) elif line_break == '\r\n': yaml_emitter_set_break(&self.emitter, YAML_CRLN_BREAK) if encoding == 'utf-16-le': self.use_encoding = YAML_UTF16LE_ENCODING elif encoding == 'utf-16-be': self.use_encoding = YAML_UTF16BE_ENCODING else: self.use_encoding = YAML_UTF8_ENCODING self.document_start_implicit = 1 if explicit_start: self.document_start_implicit = 0 self.document_end_implicit = 1 if explicit_end: self.document_end_implicit = 0 self.use_version = version self.use_tags = tags self.serialized_nodes = {} self.anchors = {} self.last_alias_id = 0 self.closed = -1 def __dealloc__(self): yaml_emitter_delete(&self.emitter) cdef object _emitter_error(self): if self.emitter.error == YAML_MEMORY_ERROR: return MemoryError elif self.emitter.error == YAML_EMITTER_ERROR: return EmitterError(self.emitter.problem) raise ValueError("no emitter error") cdef int _object_to_event(self, object event_object, yaml_event_t *event) except 0: cdef yaml_encoding_t encoding cdef yaml_version_directive_t version_directive_value cdef yaml_version_directive_t *version_directive cdef yaml_tag_directive_t tag_directives_value[128] cdef yaml_tag_directive_t *tag_directives_start cdef yaml_tag_directive_t *tag_directives_end cdef int 
implicit cdef int plain_implicit cdef int quoted_implicit cdef char *anchor cdef char *tag cdef char *value cdef int length cdef yaml_scalar_style_t scalar_style cdef yaml_sequence_style_t sequence_style cdef yaml_mapping_style_t mapping_style event_class = event_object.__class__ if event_class is StreamStartEvent: encoding = YAML_UTF8_ENCODING if event_object.encoding == 'utf-16-le': encoding = YAML_UTF16LE_ENCODING elif event_object.encoding == 'utf-16-be': encoding = YAML_UTF16BE_ENCODING yaml_stream_start_event_initialize(event, encoding) elif event_class is StreamEndEvent: yaml_stream_end_event_initialize(event) elif event_class is DocumentStartEvent: version_directive = NULL if event_object.version: version_directive_value.major = event_object.version[0] version_directive_value.minor = event_object.version[1] version_directive = &version_directive_value tag_directives_start = NULL tag_directives_end = NULL if event_object.tags: if len(event_object.tags) > 128: raise ValueError("too many tags") tag_directives_start = tag_directives_value tag_directives_end = tag_directives_value cache = [] for handle in event_object.tags: prefix = event_object.tags[handle] if PyUnicode_CheckExact(handle): handle = PyUnicode_AsUTF8String(handle) cache.append(handle) if not PyString_CheckExact(handle): raise TypeError("tag handle must be a string") tag_directives_end.handle = PyString_AS_STRING(handle) if PyUnicode_CheckExact(prefix): prefix = PyUnicode_AsUTF8String(prefix) cache.append(prefix) if not PyString_CheckExact(prefix): raise TypeError("tag prefix must be a string") tag_directives_end.prefix = PyString_AS_STRING(prefix) tag_directives_end = tag_directives_end+1 implicit = 1 if event_object.explicit: implicit = 0 if yaml_document_start_event_initialize(event, version_directive, tag_directives_start, tag_directives_end, implicit) == 0: raise MemoryError elif event_class is DocumentEndEvent: implicit = 1 if event_object.explicit: implicit = 0 
yaml_document_end_event_initialize(event, implicit) elif event_class is AliasEvent: anchor = NULL anchor_object = event_object.anchor if PyUnicode_CheckExact(anchor_object): anchor_object = PyUnicode_AsUTF8String(anchor_object) if not PyString_CheckExact(anchor_object): raise TypeError("anchor must be a string") anchor = PyString_AS_STRING(anchor_object) if yaml_alias_event_initialize(event, anchor) == 0: raise MemoryError elif event_class is ScalarEvent: anchor = NULL anchor_object = event_object.anchor if anchor_object is not None: if PyUnicode_CheckExact(anchor_object): anchor_object = PyUnicode_AsUTF8String(anchor_object) if not PyString_CheckExact(anchor_object): raise TypeError("anchor must be a string") anchor = PyString_AS_STRING(anchor_object) tag = NULL tag_object = event_object.tag if tag_object is not None: if PyUnicode_CheckExact(tag_object): tag_object = PyUnicode_AsUTF8String(tag_object) if not PyString_CheckExact(tag_object): raise TypeError("tag must be a string") tag = PyString_AS_STRING(tag_object) value_object = event_object.value if PyUnicode_CheckExact(value_object): value_object = PyUnicode_AsUTF8String(value_object) if not PyString_CheckExact(value_object): raise TypeError("value must be a string") value = PyString_AS_STRING(value_object) length = PyString_GET_SIZE(value_object) plain_implicit = 0 quoted_implicit = 0 if event_object.implicit is not None: plain_implicit = event_object.implicit[0] quoted_implicit = event_object.implicit[1] style_object = event_object.style scalar_style = YAML_PLAIN_SCALAR_STYLE if style_object == "'": scalar_style = YAML_SINGLE_QUOTED_SCALAR_STYLE elif style_object == "\"": scalar_style = YAML_DOUBLE_QUOTED_SCALAR_STYLE elif style_object == "|": scalar_style = YAML_LITERAL_SCALAR_STYLE elif style_object == ">": scalar_style = YAML_FOLDED_SCALAR_STYLE if yaml_scalar_event_initialize(event, anchor, tag, value, length, plain_implicit, quoted_implicit, scalar_style) == 0: raise MemoryError elif event_class is 
SequenceStartEvent: anchor = NULL anchor_object = event_object.anchor if anchor_object is not None: if PyUnicode_CheckExact(anchor_object): anchor_object = PyUnicode_AsUTF8String(anchor_object) if not PyString_CheckExact(anchor_object): raise TypeError("anchor must be a string") anchor = PyString_AS_STRING(anchor_object) tag = NULL tag_object = event_object.tag if tag_object is not None: if PyUnicode_CheckExact(tag_object): tag_object = PyUnicode_AsUTF8String(tag_object) if not PyString_CheckExact(tag_object): raise TypeError("tag must be a string") tag = PyString_AS_STRING(tag_object) implicit = 0 if event_object.implicit: implicit = 1 sequence_style = YAML_BLOCK_SEQUENCE_STYLE if event_object.flow_style: sequence_style = YAML_FLOW_SEQUENCE_STYLE if yaml_sequence_start_event_initialize(event, anchor, tag, implicit, sequence_style) == 0: raise MemoryError elif event_class is MappingStartEvent: anchor = NULL anchor_object = event_object.anchor if anchor_object is not None: if PyUnicode_CheckExact(anchor_object): anchor_object = PyUnicode_AsUTF8String(anchor_object) if not PyString_CheckExact(anchor_object): raise TypeError("anchor must be a string") anchor = PyString_AS_STRING(anchor_object) tag = NULL tag_object = event_object.tag if tag_object is not None: if PyUnicode_CheckExact(tag_object): tag_object = PyUnicode_AsUTF8String(tag_object) if not PyString_CheckExact(tag_object): raise TypeError("tag must be a string") tag = PyString_AS_STRING(tag_object) implicit = 0 if event_object.implicit: implicit = 1 mapping_style = YAML_BLOCK_MAPPING_STYLE if event_object.flow_style: mapping_style = YAML_FLOW_MAPPING_STYLE if yaml_mapping_start_event_initialize(event, anchor, tag, implicit, mapping_style) == 0: raise MemoryError elif event_class is SequenceEndEvent: yaml_sequence_end_event_initialize(event) elif event_class is MappingEndEvent: yaml_mapping_end_event_initialize(event) else: raise TypeError("invalid event %s" % event_object) return 1 def emit(self, 
event_object):
        """Convert a Python event object to a libyaml event and emit it.

        The conversion is delegated to ``self._object_to_event``.  If
        ``yaml_emitter_emit`` returns 0, the exception object built by
        ``self._emitter_error()`` is raised.
        """
        cdef yaml_event_t event
        self._object_to_event(event_object, &event)
        if yaml_emitter_emit(&self.emitter, &event) == 0:
            error = self._emitter_error()
            raise error

    def open(self):
        """Emit the stream-start event and mark the serializer as opened.

        ``self.closed`` is used as a three-state flag: -1 (not opened yet),
        0 (opened) and 1 (closed).

        Raises SerializerError if the serializer is already opened or has
        already been closed.
        """
        cdef yaml_event_t event
        if self.closed == -1:
            yaml_stream_start_event_initialize(&event, self.use_encoding)
            if yaml_emitter_emit(&self.emitter, &event) == 0:
                error = self._emitter_error()
                raise error
            self.closed = 0
        elif self.closed == 1:
            raise SerializerError("serializer is closed")
        else:
            raise SerializerError("serializer is already opened")

    def close(self):
        """Emit the stream-end event and mark the serializer as closed.

        Raises SerializerError if the serializer was never opened.  Calling
        this on an already closed serializer (``self.closed == 1``) does
        nothing.
        """
        cdef yaml_event_t event
        if self.closed == -1:
            raise SerializerError("serializer is not opened")
        elif self.closed == 0:
            yaml_stream_end_event_initialize(&event)
            if yaml_emitter_emit(&self.emitter, &event) == 0:
                error = self._emitter_error()
                raise error
            self.closed = 1

    def serialize(self, node):
        """Serialize one node graph as a complete YAML document.

        Emits a document-start event (with the optional version and tag
        directives taken from ``self.use_version`` / ``self.use_tags``),
        then the events for ``node``, then a document-end event.  The
        per-document state (``serialized_nodes``, ``anchors``,
        ``last_alias_id``) is reset afterwards.

        Raises:
            SerializerError: the serializer is not opened or already closed.
            ValueError: more than 128 tag directives were supplied.
            TypeError: a tag handle or prefix is not a string.
            MemoryError: libyaml failed to initialize the document-start
                event.
            The exception built by ``self._emitter_error()`` when
            ``yaml_emitter_emit`` fails.
        """
        cdef yaml_event_t event
        cdef yaml_version_directive_t version_directive_value
        cdef yaml_version_directive_t *version_directive
        cdef yaml_tag_directive_t tag_directives_value[128]
        cdef yaml_tag_directive_t *tag_directives_start
        cdef yaml_tag_directive_t *tag_directives_end
        if self.closed == -1:
            raise SerializerError("serializer is not opened")
        elif self.closed == 1:
            raise SerializerError("serializer is closed")
        # Holds references to the UTF-8 encoded handle/prefix objects so the
        # char* pointers stored in the tag directives stay valid while the
        # event is being built and emitted.
        cache = []
        version_directive = NULL
        if self.use_version:
            version_directive_value.major = self.use_version[0]
            version_directive_value.minor = self.use_version[1]
            version_directive = &version_directive_value
        tag_directives_start = NULL
        tag_directives_end = NULL
        if self.use_tags:
            # tag_directives_value is a fixed-size array of 128 entries.
            if len(self.use_tags) > 128:
                raise ValueError("too many tags")
            tag_directives_start = tag_directives_value
            tag_directives_end = tag_directives_value
            for handle in self.use_tags:
                prefix = self.use_tags[handle]
                if PyUnicode_CheckExact(handle):
                    handle = PyUnicode_AsUTF8String(handle)
                    cache.append(handle)
                if not PyString_CheckExact(handle):
                    raise TypeError("tag handle must be a string")
                tag_directives_end.handle = PyString_AS_STRING(handle)
                if PyUnicode_CheckExact(prefix):
                    prefix = PyUnicode_AsUTF8String(prefix)
                    cache.append(prefix)
                if not PyString_CheckExact(prefix):
                    raise TypeError("tag prefix must be a string")
                tag_directives_end.prefix = PyString_AS_STRING(prefix)
                # Advance the end pointer to the next free array slot.
                tag_directives_end = tag_directives_end+1
        if yaml_document_start_event_initialize(&event, version_directive,
                tag_directives_start, tag_directives_end,
                self.document_start_implicit) == 0:
            raise MemoryError
        if yaml_emitter_emit(&self.emitter, &event) == 0:
            error = self._emitter_error()
            raise error
        # First pass assigns anchors to nodes that occur more than once,
        # second pass emits the events.
        self._anchor_node(node)
        self._serialize_node(node, None, None)
        yaml_document_end_event_initialize(&event, self.document_end_implicit)
        if yaml_emitter_emit(&self.emitter, &event) == 0:
            error = self._emitter_error()
            raise error
        # Reset per-document state.
        self.serialized_nodes = {}
        self.anchors = {}
        self.last_alias_id = 0

    # Walk the node graph and record every node in self.anchors.
    #
    # A node seen for the first time is stored with the value None and its
    # children (sequence items, mapping keys and values) are visited.  A
    # node seen again while its entry is still None gets a generated anchor
    # name of the form "id%03d" from an incrementing counter.
    #
    # Returns 1 on success; 0 is reserved for exception propagation
    # ("except 0").
    cdef int _anchor_node(self, object node) except 0:
        if node in self.anchors:
            if self.anchors[node] is None:
                self.last_alias_id = self.last_alias_id+1
                self.anchors[node] = "id%03d" % self.last_alias_id
        else:
            self.anchors[node] = None
            node_class = node.__class__
            if node_class is SequenceNode:
                for item in node.value:
                    self._anchor_node(item)
            elif node_class is MappingNode:
                for key, value in node.value:
                    self._anchor_node(key)
                    self._anchor_node(value)
        return 1

    # Emit the libyaml events for `node` and, recursively, its children.
    #
    # `parent` and `index` are passed to self.descend_resolver(): for a
    # sequence item `index` is the item position, for a mapping key it is
    # None, and for a mapping value it is the key node.
    #
    # A node already present in self.serialized_nodes is emitted as an
    # alias event; otherwise it is emitted as a scalar, or as a
    # sequence/mapping start event followed by its children and the
    # matching end event.  A tag is flagged implicit when self.resolve()
    # yields the node's own tag.
    #
    # Raises TypeError when a tag or scalar value is not a string,
    # MemoryError when libyaml cannot initialize an event, and the
    # exception built by self._emitter_error() when emission fails.
    #
    # Returns 1 on success; 0 is reserved for exception propagation
    # ("except 0").
    cdef int _serialize_node(self, object node, object parent, object index) except 0:
        cdef yaml_event_t event
        cdef int implicit
        cdef int plain_implicit
        cdef int quoted_implicit
        cdef char *anchor
        cdef char *tag
        cdef char *value
        cdef int length
        cdef int item_index
        cdef yaml_scalar_style_t scalar_style
        cdef yaml_sequence_style_t sequence_style
        cdef yaml_mapping_style_t mapping_style
        anchor_object = self.anchors[node]
        anchor = NULL
        if anchor_object is not None:
            anchor = PyString_AS_STRING(anchor_object)
        if node in self.serialized_nodes:
            if yaml_alias_event_initialize(&event, anchor) == 0:
                raise MemoryError
            if yaml_emitter_emit(&self.emitter, &event) == 0:
                error = self._emitter_error()
                raise error
        else:
            node_class = node.__class__
            self.serialized_nodes[node] = True
            self.descend_resolver(parent, index)
            if node_class is ScalarNode:
                plain_implicit = 0
                quoted_implicit = 0
                tag_object = node.tag
                if self.resolve(ScalarNode, node.value, (True, False)) == tag_object:
                    plain_implicit = 1
                if self.resolve(ScalarNode, node.value, (False, True)) == tag_object:
                    quoted_implicit = 1
                tag = NULL
                if tag_object is not None:
                    if PyUnicode_CheckExact(tag_object):
                        tag_object = PyUnicode_AsUTF8String(tag_object)
                    if not PyString_CheckExact(tag_object):
                        raise TypeError("tag must be a string")
                    tag = PyString_AS_STRING(tag_object)
                value_object = node.value
                if PyUnicode_CheckExact(value_object):
                    value_object = PyUnicode_AsUTF8String(value_object)
                if not PyString_CheckExact(value_object):
                    raise TypeError("value must be a string")
                value = PyString_AS_STRING(value_object)
                length = PyString_GET_SIZE(value_object)
                style_object = node.style
                # Any style other than the four indicators below falls back
                # to the plain scalar style.
                scalar_style = YAML_PLAIN_SCALAR_STYLE
                if style_object == "'":
                    scalar_style = YAML_SINGLE_QUOTED_SCALAR_STYLE
                elif style_object == "\"":
                    scalar_style = YAML_DOUBLE_QUOTED_SCALAR_STYLE
                elif style_object == "|":
                    scalar_style = YAML_LITERAL_SCALAR_STYLE
                elif style_object == ">":
                    scalar_style = YAML_FOLDED_SCALAR_STYLE
                if yaml_scalar_event_initialize(&event, anchor, tag, value, length,
                        plain_implicit, quoted_implicit, scalar_style) == 0:
                    raise MemoryError
                if yaml_emitter_emit(&self.emitter, &event) == 0:
                    error = self._emitter_error()
                    raise error
            elif node_class is SequenceNode:
                implicit = 0
                tag_object = node.tag
                if self.resolve(SequenceNode, node.value, True) == tag_object:
                    implicit = 1
                tag = NULL
                if tag_object is not None:
                    if PyUnicode_CheckExact(tag_object):
                        tag_object = PyUnicode_AsUTF8String(tag_object)
                    if not PyString_CheckExact(tag_object):
                        raise TypeError("tag must be a string")
                    tag = PyString_AS_STRING(tag_object)
                sequence_style = YAML_BLOCK_SEQUENCE_STYLE
                if node.flow_style:
                    sequence_style = YAML_FLOW_SEQUENCE_STYLE
                if yaml_sequence_start_event_initialize(&event, anchor, tag,
                        implicit, sequence_style) == 0:
                    raise MemoryError
                if yaml_emitter_emit(&self.emitter, &event) == 0:
                    error = self._emitter_error()
                    raise error
                item_index = 0
                for item in node.value:
                    self._serialize_node(item, node, item_index)
                    item_index = item_index+1
                yaml_sequence_end_event_initialize(&event)
                if yaml_emitter_emit(&self.emitter, &event) == 0:
                    error = self._emitter_error()
                    raise error
            elif node_class is MappingNode:
                implicit = 0
                tag_object = node.tag
                if self.resolve(MappingNode, node.value, True) == tag_object:
                    implicit = 1
                tag = NULL
                if tag_object is not None:
                    if PyUnicode_CheckExact(tag_object):
                        tag_object = PyUnicode_AsUTF8String(tag_object)
                    if not PyString_CheckExact(tag_object):
                        raise TypeError("tag must be a string")
                    tag = PyString_AS_STRING(tag_object)
                mapping_style = YAML_BLOCK_MAPPING_STYLE
                if node.flow_style:
                    mapping_style = YAML_FLOW_MAPPING_STYLE
                if yaml_mapping_start_event_initialize(&event, anchor, tag,
                        implicit, mapping_style) == 0:
                    raise MemoryError
                if yaml_emitter_emit(&self.emitter, &event) == 0:
                    error = self._emitter_error()
                    raise error
                for item_key, item_value in node.value:
                    self._serialize_node(item_key, node, None)
                    self._serialize_node(item_value, node, item_key)
                yaml_mapping_end_event_initialize(&event)
                if yaml_emitter_emit(&self.emitter, &event) == 0:
                    error = self._emitter_error()
                    raise error
            # Balance the descend_resolver() call above; without it the
            # resolver's path state keeps growing with every node.
            # NOTE(review): assumes the resolver mixed into this class
            # provides ascend_resolver() alongside descend_resolver() --
            # confirm against the resolver base class.
            self.ascend_resolver()
        return 1

# Write callback handed to libyaml: `data` is cast back to the CEmitter
# instance, the `size` bytes at `buffer` are copied into a Python string and
# passed to the emitter's stream.write().
#
# Returns 1 on success; 0 is reserved for exception propagation ("except 0").
cdef int output_handler(void *data, char *buffer, int size) except 0:
    cdef CEmitter emitter
    emitter = <CEmitter>data
    value = PyString_FromStringAndSize(buffer, size)
    emitter.stream.write(value)
    return 1