# HG changeset patch # User cmlenz # Date 1121353830 0 # Node ID 7b10a8b2d0c49893712ce2a93c9a6a6a3e9055a0 # Parent 8ae753a22494d80ac6a75c50de8e0009d39e09b3 Changed BEEP frame sending mechanism to use the `push_with_producer()` function provided by `asynchat`. In preparation for #17. diff --git a/bitten/util/beep.py b/bitten/util/beep.py --- a/bitten/util/beep.py +++ b/bitten/util/beep.py @@ -279,39 +279,6 @@ if channel == 0 and msgno is not None: self.channels[0].profile.send_error(msgno, 550, e) - def send_data_frame(self, cmd, channel, msgno, more, seqno, ansno=None, - payload=''): - """Send the specified data frame to the peer. - - @param cmd: The frame keyword (one of MSG, RPY, ERR, ANS, or NUL) - @param channel: The number of the sending channel - @param msgno: The message number - @param more: Whether this is the last frame belonging to the message - @param seqno: The frame-specific sequence number - @param ansno: For ANS frames, the answer number, otherwise `None` - @param payload: The payload of the frame - """ - headerbits = [cmd, channel, msgno, more and '*' or '.', seqno, - len(payload)] - if cmd == 'ANS': - assert ansno is not None - headerbits.append(ansno) - header = ' '.join([str(bit) for bit in headerbits]) - logging.debug('Sending frame [%s]', header) - self.push('\r\n'.join((header, payload, 'END', ''))) - - def send_seq_frame(self, channel, ackno, window): - """Send a SEQ frame to the peer. - - @param channel: The number of the sending channel - @param ackno: The acknowledgement number. - @param window: The requested window size - """ - headerbits = ['SEQ', channel, ackno, window] - header = ' '.join([str(bit) for bit in headerbits]) - logging.debug('Sending frame [%s]', header) - self.push('\r\n'.join((header, 'END', ''))) - def terminate(self, handle_ok=None, handle_error=None): """Terminate the session by closing all channels.""" def close_next_channel(): @@ -472,34 +439,13 @@ elif cmd == 'NUL': self.profile.handle_nul(msgno) - def _send(self, cmd, msgno, ansno=None, payload=None): - """Send a frame to the peer.""" - data = '' - if payload is not None: - data = payload.as_string() - - # If the size of the payload exceeds the current negotiated window size, - # fragment the message and send in smaller chunks - while len(data) > self.windowsize: - window = data[:self.windowsize] - self.session.send_data_frame(cmd, self.channelno, msgno, True, - self.seqno[1].value, payload=window, - ansno=ansno) - self.seqno[1] += self.windowsize - data = data[self.windowsize:] - - # Send the final frame - self.session.send_data_frame(cmd, self.channelno, msgno, False, - self.seqno[1].value, payload=data, - ansno=ansno) - self.seqno[1] += len(data) - def send_msg(self, payload, handle_reply=None): """Send a MSG frame to the peer. @param payload: The message payload (a `Payload` instance) @param handle_reply: A function that is called when a reply to this message is received + @return: the message number assigned to the message """ while True: # Find a unique message number msgno = self.msgno.next() @@ -508,7 +454,8 @@ self.msgnos.add(msgno) # Flag the chosen message number as in use if handle_reply is not None: self.reply_handlers[msgno] = handle_reply - self._send('MSG', msgno, None, payload) + self.session.push_with_producer(FrameProducer(self, 'MSG', msgno, None, + payload)) return msgno def send_rpy(self, msgno, payload): @@ -517,7 +464,8 @@ @param msgno: The number of the message this reply is in reference to @param payload: The message payload (a `Payload` instance) """ - self._send('RPY', msgno, None, payload) + self.session.push_with_producer(FrameProducer(self, 'RPY', msgno, None, + payload)) def send_err(self, msgno, payload): """Send an ERR frame to the peer. @@ -525,17 +473,20 @@ @param msgno: The number of the message this reply is in reference to @param payload: The message payload (a `Payload` instance) """ - self._send('ERR', msgno, None, payload) + self.session.push_with_producer(FrameProducer(self, 'ERR', msgno, None, + payload)) def send_ans(self, msgno, payload): """Send an ANS frame to the peer. @param msgno: The number of the message this reply is in reference to @param payload: The message payload (a `Payload` instance) + @return: the answer number assigned to the answer """ ansnos = self.ansnos.setdefault(msgno, cycle_through(0, 2147483647)) next_ansno = ansnos.next() - self._send('ANS', msgno, next_ansno, payload) + self.session.push_with_producer(FrameProducer(self, 'ANS', msgno, + next_ansno, payload)) return next_ansno def send_nul(self, msgno): @@ -543,10 +494,67 @@ @param msgno: The number of the message this reply is in reference to """ - self._send('NUL', msgno) + self.session.push_with_producer(FrameProducer(self, 'NUL', msgno)) del self.ansnos[msgno] # dealloc answer numbers for the message +class FrameProducer(object): + """Internal class that emits the frames of a BEEP message, based on the + `asynchat` `push_with_producer()` protocol. + """ + + def __init__(self, channel, cmd, msgno, ansno=None, payload=None): + """Initialize the frame producer. + + @param channel the channel the message is to be sent on + @param cmd the BEEP command/keyword (MSG, RPY, ERR, ANS or NUL) + @param msgno the message number + @param ansno the answer number (only for ANS messages) + @param payload the message payload (an instance of `Payload`) + """ + self.session = channel.session + self.channel = channel + self.cmd = cmd + self.msgno = msgno + self.ansno = ansno + + self.data = '' + if payload is not None: + self.data = payload.as_string() + self.done = False + + def more(self): + """Called by `async_chat` when the producer has been pushed on the + producer FIFO and the channel is about to write.""" + if self.done: + return '' + + if len(self.data) > self.channel.windowsize: + # If the size of the payload exceeds the current negotiated window + # size, fragment the message and send in smaller chunks + frame = self._make_frame(self.data[:self.channel.windowsize], True) + self.channel.seqno[1] += self.channel.windowsize + self.data = self.data[self.channel.windowsize:] + return frame + else: + # Send the final frame + frame = self._make_frame(self.data, False) + self.channel.seqno[1] += len(self.data) + self.done = True + return frame + + def _make_frame(self, payload, more=False): + headerbits = [self.cmd, self.channel.channelno, self.msgno, + more and '*' or '.', self.channel.seqno[1].value, + len(payload)] + if self.cmd == 'ANS': + assert self.ansno is not None + headerbits.append(self.ansno) + header = ' '.join([str(bit) for bit in headerbits]) + logging.debug('Sending frame [%s]', header) + return '\r\n'.join((header, payload, 'END', '')) + + class ProfileHandler(object): """Abstract base class for handlers of specific BEEP profiles. diff --git a/bitten/util/tests/beep.py b/bitten/util/tests/beep.py --- a/bitten/util/tests/beep.py +++ b/bitten/util/tests/beep.py @@ -19,11 +19,31 @@ def close(self): self.closed = True - def send_data_frame(self, cmd, channel, msgno, more, seqno, ansno=None, - payload=''): + def push_with_producer(self, producer): assert not self.closed - self.sent_messages.append((cmd, channel, msgno, more, seqno, ansno, - payload.strip())) + while True: + frame = producer.more() + if not frame: + break + header, rest = frame.split('\r\n', 1) + header = header.split(' ') + cmd = header[0].upper() + channel = int(header[1]) + msgno = int(header[2]) + more = header[3] == '*' + seqno = int(header[4]) + size = int(header[5]) + ansno = None + if cmd == 'ANS': + ansno = int(header[6]) + if size == 0: + payload = '' + else: + payload = rest[:size] + rest = rest[size:] + self.sent_messages.append((cmd, channel, msgno, more, seqno, ansno, + payload.strip())) + assert rest == '\r\nEND\r\n' class MockProfileHandler(object):