Source code for FabLabKasse.cashPayment.server.NV11.NV11Device

#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-

# Interface client for Innovative Technology NV11 banknote validator/changer with eSSP Protocol
# based on the official specification (Innovative Technology GA138 SSP Protocol Manual) and a lot of own practical tests

# (C) 2013 Max Gaukler <development@maxgaukler.de>

#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  The text of the license conditions can be read at
#  <http://www.gnu.org/licenses/>.

import serial
import copy
import time
import traceback
from crc_algorithms import Crc
import Crypto.Cipher.AES
import Crypto.Random.random
import logging
from ..hex import hex
import unittest


[docs]class ESSPDevice(object): """low layer eSSP protocol - implements the network layer and all communication-related commands""" def __init__(self, port, presharedKey=0x0123456701234567, slaveID=0): ESSPDevice.Helper.unitTest() ESSPDevice.ByteStreamReader.unitTest() self.ser = serial.serial_for_url(port, 9600, timeout=0.2) self.seq = 0 self.slaveID = slaveID self.rawBuffer = [] self.buffer = [] self.lastPacket = None self.crypt = None self.encryptionCounter = None self.unencryptedCommand([0x11]) # sync: start communications self.initCrypto(presharedKey) # debug functions def __repr__(self): return "<ESSP>"
[docs] def printDebug(self, s, debugLevel): logLevels = {-1: logging.ERROR, 0: logging.WARNING, 1: logging.INFO, 2: logging.DEBUG, 3: logging.DEBUG - 1} logging.getLogger(self.__repr__()).log(logLevels[debugLevel], s)
[docs] def error(self, s): self.printDebug(s, -1)
[docs] def log(self, s): self.printDebug(s, 1)
[docs] def warn(self, s): self.printDebug(s, 0)
[docs] def debug(self, s): self.printDebug(s, 2)
[docs] def debug2(self, s): self.printDebug(s, 3)
[docs] class Helper(object): CRC = Crc(width=16, poly=0x8005, reflect_in=False, xor_in=0xFFFF, reflect_out=False, xor_out=0x0000) # specification is slightly unclear, figured out by trial-and-error
[docs] @staticmethod def unitTest(): assert ESSPDevice.Helper.crc([0x80, 0x01, 0x01]) == [0x06, 0x02] assert ESSPDevice.Helper.crc([0x80, 0x01, 0xF0]) == [0x23, 0x80]
[docs] @classmethod def splitBytes(cls, uint16): # uint16 -> [lowByte, highByte] return [uint16 & 0xFF, uint16 >> 8]
[docs] @classmethod def crc(cls, data): c = cls.CRC.bit_by_bit(data) return cls.splitBytes(c)
[docs] @classmethod def Unsigned32ToBytes(cls, x): # return little-endian representation return ESSPDevice.Helper.UnsignedToBytes(x, n=4)
[docs] @staticmethod def Unsigned64ToBytes(x): # return little-endian representation return ESSPDevice.Helper.UnsignedToBytes(x, n=8)
[docs] @staticmethod def UnsignedToBytes(x, n): r = [] for _ in range(n): r.append(x & 0xFF) x = x >> 8 return r
[docs] @classmethod def AsciiToBytes(cls, x): return [ord(c) for c in x]
[docs] @staticmethod def byteArrayToString(data): return b''.join([chr(x) for x in data])
[docs] @staticmethod def stringToByteArray(string): return ([ord(x) for x in string])
[docs] class ByteStreamReader(object): """ read data values from a list of bytes """ def __init__(self, bytesList): self.buffer = copy.deepcopy(bytesList)
[docs] @staticmethod def unitTest(): test = ESSPDevice.ByteStreamReader([0x00, 0x1C, 0x96, 0x2C]) assert test.readUnsigned32BigEndian() == 0x1c962c test.assertFinished()
[docs] def readData(self, n): assert len(self.buffer) >= n data = copy.deepcopy(self.buffer[0:n]) del self.buffer[0:n] return data
[docs] def hasData(self): return len(self.buffer) > 0
[docs] def readByte(self): # unsigned int8 return self.readData(1)[0]
# read a n-bytes unsigned integer, little or big endian
[docs] def readUnsigned(self, numBytes, littleEndian): data = self.readData(numBytes) if littleEndian: data.reverse() result = 0 for byte in data: result *= 256 result += byte return result
[docs] def readUnsigned32(self, CheckOverrun=True): d = self.readUnsigned(numBytes=4, littleEndian=True) # values should be far far away from the integer maximum. # if the most significant byte is very large, a decoding error is very likely # even the value counters won't be that large for heavy use: # 10k€/month * 100 ct/euro * 12 months/year * 10 years < 2**31 ! if CheckOverrun: assert d <= 2 ** 31 return d
[docs] def readUnsigned32BigEndian(self): # big-endian version of previous function d = self.readUnsigned(numBytes=4, littleEndian=False) assert d <= 2 ** 31 return d
[docs] def readUnsigned24(self): # unsigned int24 little-endian return self.readUnsigned(numBytes=3, littleEndian=True)
[docs] def readUnsigned24BigEndian(self): return self.readUnsigned(numBytes=3, littleEndian=False)
[docs] def readAscii(self, n): d = self.readData(n) for byte in d: # only printable ASCII characters allowed assert byte >= 32 assert byte <= 126 # ASCII byte-array to string return "".join(map(chr, d))
[docs] def assertFinished(self): # assert that there is no data left - use this at the end of parsing fixed-length replies assert not self.hasData()
# # Low-Level Send/Receive #
[docs] def send(self, data): self.debug2("encoding packet: " + hex(data)) # alternate sequence bit self.seq = int(not self.seq) content = [self.seq * 128 + self.slaveID, len(data)] + data content = content + ESSPDevice.Helper.crc(content) def bytestuff(bytesList): output = [] for b in bytesList: output.append(b) if b == 0x7F: # 0x7F is repeated once ("byte stuffing") output.append(b) return output content = bytestuff(content) packet = [0x7F] + content self.debug2("sending raw packet: " + hex(packet)) packetString = "" for byte in packet: packetString = packetString + chr(byte) self.ser.write(packetString) self.lastPacket = packetString
[docs] def resendLast(self): self.debug("resending last packet") self.ser.write(self.lastPacket)
# clear all read buffers
[docs] def flushRead(self): while self.ser.inWaiting() > 0: self.warn("flushing serial read buffer") self.ser.read(self.ser.inWaiting()) if self.rawBuffer: self.warn("flushing raw input buffer") self.rawBuffer = [] if self.buffer: self.warn("flushing input buffer") self.buffer = []
# get data from serial port - call this repeatedly until it returns False, then again after some waiting time # if data was received for the right slaveID, it returns a Response object containing data and status
[docs] def read(self): bytesToRead = self.ser.inWaiting() if bytesToRead > 0: self.debug2("received " + str(bytesToRead) + " bytes") rxString = self.ser.read(bytesToRead) self.debug2("raw buffer at start of read() (old data): " + hex(self.rawBuffer)) for char in rxString: self.rawBuffer.append(ord(char)) self.debug2("raw buffer before byte-destuffing: " + hex(self.rawBuffer)) # read bytes for de-stuffing while self.rawBuffer: if self.rawBuffer[0] == 0x7F: # the STX byte needs special care (byte-stuffing needs to be reversed) if len(self.rawBuffer) == 1: # a 0x7F byte at the end of the buffer cannot be processed yet (unclear if it will be followed by another by 0x7F) break if self.rawBuffer[1] == 0x7F: # we got a byte-stuffed 0x7F databyte self.buffer.append(0x7F) del self.rawBuffer[0:2] # delete first two elements (indices 0 and 1) else: # we received a real STX byte # store it as -1 (special value) del self.rawBuffer[0] self.buffer.append(-1) else: # accept other bytes unmodified self.buffer.append(self.rawBuffer.pop(0)) # parse data # data format AFTER bytestuffing: # 0: STX byte (originally 0x7F, now -1) # 1: sequence bit + slave ID # 2: length # 3 ... 3+length-1: data # 4+length-1 ... 5+length-1: CRC # attention, python subranging is a bit strange: array[a:b] returns array[a] ... array[b-1] (not including b!) while self.buffer: self.debug2("buffer: " + hex(self.buffer)) # check validity of packet while len(self.buffer) > 0 and self.buffer[0] != -1: self.warn("wrong sync-start of packet - discarding one byte") self.error("halting because of wrong sync-start. buffer: " + hex(self.buffer)) raise Exception("comms error or device firmware bug. halting.") # del self.buffer[0] if len(self.buffer) < 3: # not enough data received return False length = self.buffer[2] if -1 in self.buffer[1:length + 5]: # note: python does not throw exceptions if the buffer is not long enough for the range 1 ... length+5, but just returns the longest possible part self.warn("packet data contains a sync-start, skipping the start of this malformed (too short?) packet and waiting for next sync") del self.buffer[0] continue if len(self.buffer) < length + 5: # not enough data received to parse the whole packet return False crc = self.buffer[length + 3:length + 4 + 1] if ESSPDevice.Helper.crc(self.buffer[1:3 + length - 1 + 1]) != crc: self.warn("CRC error - discarding this packet start and waiting for next sync-start") # do not delete whole packet from buffer (the length might have been corrupted), but only the first byte (sync-start), wait until the next start del self.buffer[0] continue # valid packet - return the data if self.buffer[1] & 0x7F == self.slaveID: if bool(self.buffer[1] & 128) != bool(self.seq): self.warn("response has wrong sequence number, discarding it.") data = False else: data = ESSPDevice.Response(self.buffer[3:length + 3]) self.debug("response: {0}".format(data)) else: # the packet was not for us, but for another slave data = False del self.buffer[0:length + 5] return data return False
# # High-Level Send/Receive: Command/Response #
[docs] class Response(object): """ status + data """ statusStrings = { -1: "decoded response contains no data", 0xF0: "OK", 0xF2: "Command not known", 0xF3: "Wrong number of parameters", 0xF4: "Param out of range", 0xF5: "Command cannot be processed at this time, possibly busy or not correctly configured", 0xF6: "Software error", 0xF8: "Command failure", 0xFA: "Encryption key not set", 0x7E: "Encrypted Data" } def __init__(self, data): if not data: # empty response AFTER decoding self.status = -1 self.data = [] self.status = data[0] self.data = copy.copy(data[1:])
[docs] def isOkay(self): return self.status == 0xF0
[docs] def isSoftFail(self): return self.status == 0xF5
[docs] def isHardFail(self): return not(self.isOkay() or self.isSoftFail())
[docs] def isEncrypted(self): return self.status == 0x7E
[docs] def statusString(self): try: return self.statusStrings[self.status] except KeyError: return "unknown statuscode " + str(self.status)
def __repr__(self): return "<Response: status=" + hex(self.status) + " (" + self.statusString() + "), data=" + hex(self.data) + ">"
[docs] def getData(self): return copy.copy(self.data)
[docs] def getDataStream(self): return ESSPDevice.ByteStreamReader(self.getData())
[docs] def getStatus(self): return self.status()
[docs] def unencryptedCommand(self, data, allowSoftFail=False): return self.command(data, allowSoftFail, encrypted=False)
[docs] def command(self, data, allowSoftFail=False, encrypted=True): if encrypted: self.debug2("encrypting command, original data:" + hex(data)) data = self.encryptData(data) self.debug2("encrypted data:" + hex(data)) time.sleep(0.25) self.flushRead() self.send(data) num_retries = 3 for _ in range(num_retries): for __ in range(20): time.sleep(0.01) r = self.read() if r is not False: break if encrypted: if r is not False and r.isEncrypted(): r = self.decryptResponse(r) else: if r is False: self.log("response timeout") else: self.log("unsuccessful response: " + str(r)) if r is False: self.warn("Timeout or CRC/Crypto error -- resend necessary (not fatal, this may happen rarely)") self.resendLast() continue if not r.isOkay(): self.warn("got response with error status:" + str(r)) if not (r.isSoftFail() and allowSoftFail): self.error(("Command failed: Cmd " + hex(data) + ", Resp " + str(r))) raise Exception("Command failed: Cmd " + hex(data) + ", Resp " + str(r)) else: self.debug2("got response:" + str(r)) return r self.error("No valid response after 3 retries") raise Exception("No reply received")
# # Crypto #
[docs] def initCrypto(self, presharedKey): # todo use real random primes here ????????? whatever, the whole crypto stuff is rather a marketing gag than a useful security measure, it has no MITM protection # and AFAIK SSL also uses fixed generator and modulus generator = 0x5a7ccab modulus = 0x4c564cf assert modulus < generator self.unencryptedCommand([0x4A] + ESSPDevice.Helper.Unsigned64ToBytes(generator)) # set generator self.unencryptedCommand([0x4B] + ESSPDevice.Helper.Unsigned64ToBytes(modulus)) # set modulus # hostRandomNumber=1 # random number hostRandomNumber = 0x4e9efc7 hostTempKey = pow(generator, hostRandomNumber, modulus) # generator ** hostRandomNumber % modulus # assert hostTempKey==0x1d9ecb1 s = self.unencryptedCommand([0x4C] + ESSPDevice.Helper.Unsigned64ToBytes(hostTempKey)).getDataStream() slaveTempKey = s.readUnsigned(numBytes=8, littleEndian=True) # SlaveInter=0x10ada5d # slaveTempKey=SlaveInter s.assertFinished() negotiatedKey = pow(slaveTempKey, hostRandomNumber, modulus) # (slaveTempKey ** hostRandomNumber) % modulus key = negotiatedKey * (2 ** 64) + presharedKey # convert to byte array keyBytearray = ESSPDevice.Helper.UnsignedToBytes(key, n=16) # convert to string keyString = ESSPDevice.Helper.byteArrayToString(keyBytearray) self.crypt = Crypto.Cipher.AES.new(keyString) self.encryptionCounter = 0
[docs] def encryptData(self, data): d = [len(data)] + ESSPDevice.Helper.Unsigned32ToBytes(self.encryptionCounter) + data self.encryptionCounter += 1 # padding: make the final data length a multiple of 16 bytes while not (len(d) + 2) % 16 == 0: d.append(Crypto.Random.random.randint(0, 255)) d += ESSPDevice.Helper.crc(d) stream = ESSPDevice.ByteStreamReader(d) encryptedData = [0x7E] # start byte of encrypted transmission while stream.hasData(): dataStr = ESSPDevice.Helper.byteArrayToString(stream.readData(16)) cryptedStr = self.crypt.encrypt(dataStr) encryptedData += ESSPDevice.Helper.stringToByteArray(cryptedStr) return encryptedData
[docs] def decryptResponse(self, r): try: assert r.isEncrypted() # check start byte 0x7E, it is the response status byte assert len(r.data) % 16 == 0, "padding length" encryptedStream = r.getDataStream() decryptedData = [] while encryptedStream.hasData(): dataStr = ESSPDevice.Helper.byteArrayToString(encryptedStream.readData(16)) decryptedStr = self.crypt.decrypt(dataStr) decryptedData += ESSPDevice.Helper.stringToByteArray(decryptedStr) self.debug2("parsing decrypted data: " + hex(decryptedData)) stream = ESSPDevice.ByteStreamReader(decryptedData) length = stream.readByte() assert length > 0, "nonzero length" receivedEncryptionCounter = stream.readUnsigned32(CheckOverrun=False) assert self.encryptionCounter == receivedEncryptionCounter, "encryption counter: expected {0}, received {1}".format(self.encryptionCounter, receivedEncryptionCounter) data = stream.readData(length) # discard padding # # len,counter,data,padding,CRC have a total length of n*16 # 1 + 4 + len + x + 2 = n*16 # # => (7 + len) + x - n*16 = 0 # => x = n*16 - (7*length) so that x >= 0 stream.readData((- (7 + length)) % 16) # CRC on all bytes except the start byte and the CRC itself assert stream.readData(2) == ESSPDevice.Helper.crc(decryptedData[0:-2]), "decrypted CRC mismatch" stream.assertFinished() return ESSPDevice.Response(data) except AssertionError: self.warn("failed to decrypt response, discarding it. " + traceback.format_exc()) # self.encryptionCounter-=1 return False
# # Basic Commands # # resets the device - ATTENTION, the usb device also detaches after reset, so you need to reopen the port!
[docs] def reset(self): r = self.unencryptedCommand([0x01]) if r != 0x01: raise Exception("Reset failed")
[docs] def setEnabled(self, enabled): if enabled: self.command([0x0A]) else: self.command([0x09])
[docs]class NV11Device(ESSPDevice): """Interface client for Innovative Technology NV11 banknote validator/changer with eSSP Protocol""" def __init__(self, port, presharedKey=0x0123456701234567, slaveID=0): ESSPDevice.__init__(self, port, presharedKey, slaveID) self.command([0x06, 0x07]) # Host protocol version 7 self.unitData = self._getUnitData() # set Value reporting: Value - if this is changed, a lot of the event decoding will break! self.command([0x45, 0x00]) # report by value, not by channel ID self.command([0x5C, 0x01]) # enable payout device self.setEnabled(False) self.setEnabledChannels() def __repr__(self): return "<NV11>" # # Commands #
[docs] def setEnabledChannels(self, enabledChannels=None, upTo=0): """ enable cash input for certain channels (denominations). Use either of the two parameters. If both are used, the result will be combined with logical ``or``. :param list[int] enabledChannels: IDs of channels to explicitly enable, even if their denominaion is below the value of ``upTo``. :param int upTo: maximum allowed denomination, or 0 """ # channel numbers are counted from 1 on bitmask = 0 enabledChannels = set(enabledChannels or []) for i in range(self.unitData["numChannels"]): if self.unitData["real channel value"][i] <= upTo: enabledChannels.add(i + 1) for c in list(enabledChannels): assert c > 0 assert c <= 16 bitmask = bitmask | (1 << (c - 1)) bytesList = ESSPDevice.Helper.splitBytes(bitmask) self.command([0x02] + bytesList) self.setEnabled(bitmask != 0)
[docs] def getChannelValue(self, channelId, reportedValue=False): assert channelId > 0 assert channelId <= 16 if reportedValue: return self.unitData["reported channel value"][channelId - 1] else: return self.unitData["real channel value"][channelId - 1]
def _getUnitData(self): """get device setup data -- see ESSP specification""" unitData = {} # Serial number s = self.command([0xC]).getDataStream() unitData["serial number"] = s.readUnsigned32BigEndian() # serial number is big-endian - WTF s.assertFinished() # Unit data s = self.command([0xD]).getDataStream() unitData["unit type"] = s.readByte() unitData["firmware version"] = s.readAscii(4) unitData["country"] = s.readAscii(3) unitData["internal value multiplier"] = s.readUnsigned24BigEndian() # the official documentation example looks like this should be little-endian, but it isn't unitData["protocol version"] = s.readByte() s.assertFinished() # Setup Request s = self.command([0x5]).getDataStream() assert s.readByte() == 7 # BNV with NoteFloat assert s.readAscii(4) == unitData["firmware version"] assert s.readAscii(3) == unitData["country"] assert unitData["internal value multiplier"] == s.readUnsigned24BigEndian() assert unitData["internal value multiplier"] != 0 # assuming old-style dataset - if this fails, see official docs and rewrite unitData["numChannels"] = s.readByte() assert unitData["numChannels"] in range(1, 17) unmultipliedChannelValue = [s.readByte() for _ in range(unitData["numChannels"])] unitData["channel security (obsolete)"] = [s.readByte() for _ in range(unitData["numChannels"])] unitData["real value multiplier"] = s.readUnsigned24BigEndian() # second value multiplier assert unitData["internal value multiplier"] != 0 unitData["reported channel value"] = [x * unitData["internal value multiplier"] for x in unmultipliedChannelValue] unitData["real channel value"] = [x * unitData["internal value multiplier"] * unitData["real value multiplier"] for x in unmultipliedChannelValue] assert s.readByte() == 7 # current protocol version # multi currency datasets are not implemented. assume all country codes are equal for i in range(unitData["numChannels"]): assert s.readAscii(3) == unitData["country"] # full channel value for i in range(unitData["numChannels"]): assert s.readUnsigned32() == unitData["reported channel value"][i] s.assertFinished() self.log("Unit data: {0}".format(unitData)) return unitData
[docs] def setRouteToPayout(self, values): """route all notes in the given list of values to the payout-store. others will be directly put to the cashbox and are not availble for return. """ for v in values: assert v % self.unitData["real value multiplier"] == 0 assert v / self.unitData["real value multiplier"] in self.unitData["reported channel value"] for v in self.unitData["reported channel value"]: # set denomination route route = 0x01 # default: to cashbox if v * self.unitData["real value multiplier"] in values: route = 0x00 # route to payout-store # docs are unclear: here the real and not the reported value is used!!!! self.command([0x3B, route] + ESSPDevice.Helper.Unsigned32ToBytes(v * self.unitData["real value multiplier"]) + ESSPDevice.Helper.AsciiToBytes(self.unitData["country"]))
[docs] def getPayoutValues(self): """get values of notes on payout stack. The last one of these is on top of the stack and will be paid out by the payout-command. """ s = self.command([0x41]).getDataStream() # get note positions num = s.readByte() values = [s.readUnsigned32() for _ in range(num)] s.assertFinished() self.debug("payout values:" + str(values)) return values
[docs] def tryPayout(self, value): l = self.getPayoutValues() if len(l) == 0 or l[-1] > value: return False # payout seems possible self.log("trying payout of note {0}".format(l[-1])) r = self.command([0x42], allowSoftFail=True) # payout last stored note if not r.isOkay(): self.log("could not payout (busy if data==3, otherwise error):" + str(r)) return r.isOkay() # False = could not payout, True = starting payout / waiting for start
[docs] def stackFromPayout(self): """ move the current note from payout store to the cashbox, so that a smaller note that isn't on top of the stack can be paid out. does not check if this is useful, these checks need to be done at a higher level! """ l = self.getPayoutValues() assert len(l) > 0 self.log("moving note {0} from payout-store to cashbox. payout store contents before:{1} ".format(l[-1], l)) self.command([0x43])
[docs] def empty(self): self.log("counters:") s = self.command([0x58]).getDataStream() msg = "Counters: " assert s.readByte() == 5 # 5x4bytes msg += "{0} stacked, ".format(s.readUnsigned32()) msg += "{0} stored, ".format(s.readUnsigned32()) msg += "{0} dispensed, ".format(s.readUnsigned32()) msg += "{0} transferred from store to stacker, ".format(s.readUnsigned32()) msg += "{0} rejected.".format(s.readUnsigned32()) s.assertFinished() self.log(msg) self.log("emptying (smart-empty)") self.command([0x52])
[docs] def poll(self): # TODO properly document return type # should we use the POLL_WITH_ACK command? # from the docs, it looks better than POLL, but in practice it caused some strange trouble USE_POLL_WITH_ACK = False if USE_POLL_WITH_ACK: resp = self.command([0x56]) # Poll with ACK else: resp = self.command([0x07]) # Poll (normal, does not need extra ACK) fullData = resp.getData() # copy for logging self.debug("event response:" + hex(fullData)) eventData = resp.getDataStream() # stream for parsing # event importance (log level) # error=-1 is not used as long as the communication is okay warning = 0 # something bad happened - email the operator! log = 1 # interesting, but usual debug = 2 # uninteresting noise, only for debugging # non-ACK events with no data: simpleEvents = { # 0xB5: ["all input channels disabled", debug], 0xB6: ["booting, please wait", log], 0xC2: ["emptying, please wait", debug], 0xC3: ["emptied", log], 0xC6: ["payout device went out of service - (TODO: implement re-enabling by ENABLE PAYOUT DEVICE)", warning], 0xC7: ["payout device removed", warning], 0xC8: ["payout device attached", log], 0xCF: ["payout device full", log], # TODO official documentation says that this event is 0xC9, which already has a meaning. WTF 0xCC: ["note stacking", debug], 0xE3: ["cashbox removed", log], 0xE4: ["cashbox reinserted", log], 0xE7: ["stacker full", warning], 0xE9: ["note jammed, possibly removable by user", warning], 0xEA: ["note jammed, safe (not retrievable by user)", warning], 0xEB: ["note stacked", debug], 0xEC: ["note rejected", debug], 0xED: ["rejecting note", debug], 0xF1: ["power reset", log], } # TODO filter out repetitions???? # TODO email for errors # events with 1 byte data (channel or 0=in progress) # only for NV9/NV11 validators - needs many changes for other device types! eventsWithChannelInfo = {0xE6: ["Fraud attemtpted", True, warning], 0xE1: ["Note rejected to user at powerup", True, warning], 0xE2: ["Note cleared to cashbox at powerup", True, warning], 0xEE: ["Credit note ", True, log] } # events with and without ACK that have a data response like: # numItems, [Countrycode, Value], [Countrycode, Value], ... eventsWithValueReporting = { # Code: [name, needsACK, logLevel, meaning], ... # meaning: see code below - how should the code handle this event 0xB3: ["emptying (smart-empty), current value ", False, debug, "ignore"], 0xB4: ["emptied (smart-empty)", True, log, "ignore"], 0xCA: ["note cleared to stacker at powerup", True, warning, "LogSingleItem"], 0xCB: ["note cleared to payout-store at powerup", True, warning, "LogSingleItem"], 0xCD: ["note cleared to user (dispensed!) at powerup", True, warning, "LogSingleItem"], 0xCE: ["dispensed note held in bezel", False, log, "LogSingleItem"], 0xD2: ["payout completed", True, log, "EndOfPayout"], 0xD5: ["jammed - needs manual intervention, currently paid out value:", False, warning, "EndOfPayout"], # TODO should this really be EndOfPayout ????? # TODO 0xD5 really EndOfPayout ????? 0xD6: ["payout halted (requested by host), currently paid out value", False, log, "Unsupported"], 0xD9: ["Timeout: unable to complete payout request. paid out value:", True, warning, "EndOfPayout"], # TODO should this really be EndOfPayout ????? 0xDA: ["payout is active, currently paid out:", False, debug, "ignore"], 0xDB: ["note stored in payout", False, log, "LogSingleItem"], # official documentation unclear! 0xDC: ["Payout request before powerup was interrupted. floated value / requested value:", True, warning, "LogTwoValues"], 0xDD: ["Float request before powerup was interrupted. floated value / requested value: ", True, warning, "LogTwoValues"], 0xC9: ["note moved from payout to stacker", True, log, "LogSingleItem"], } eventNeedsACK = False r = {} # return r["acceptActive"] = True r["received"] = [] r["dispensed"] = [] r["payoutActive"] = False r["finished"] = False r["stackedFromPayout"] = False r["smartEmptyFinished"] = False while eventData.hasData(): ev = eventData.readByte() self.debug("event " + hex(ev)) if ev in simpleEvents: self.printDebug("event " + hex(ev) + ": " + simpleEvents[ev][0], simpleEvents[ev][1]) elif ev == 0xEF: channel = eventData.readByte() if channel == 0: self.debug("reading note...") else: self.log("note in escrow, channel " + str(channel)) # we could also reject the note here, but then we must send Hold/Reject command before ANY ack is sent out, even if other ACK events are pending!!! elif ev == 0xE8: self.debug("shutdown") r["finished"] = True elif ev == 0xB5: self.debug("accept-shutdown (all inputs disabled)") r["acceptActive"] = False elif ev in eventsWithChannelInfo.keys(): message = eventsWithChannelInfo[ev][0] if eventsWithChannelInfo[ev][1]: eventNeedsACK = True logSeverity = eventsWithChannelInfo[ev][2] channel = eventData.readByte() self.printDebug(message + " - channel " + str(channel), logSeverity) if ev == 0xEE: # credit note r["received"] += [self.getChannelValue(channel)] # TODO we can't easily tell if the note was stacked or put into the cashbox # self.setEnabledChannels() will be called after parsing of all events has been completed elif ev in eventsWithValueReporting.keys(): message = eventsWithValueReporting[ev][0] if eventsWithValueReporting[ev][1]: # needs ACK eventNeedsACK = True logSeverity = eventsWithValueReporting[ev][2] meaning = eventsWithValueReporting[ev][3] assert meaning in ["EndOfPayout", "Unsupported", "ignore", "LogTwoValues", "LogSingleItem"] assert meaning != "Unsupported" if ev == 0xDA: # payout in progress r["payoutActive"] = True if ev == 0xC9: r["stackedFromPayout"] = True if ev == 0xB4: r["smartEmptyFinished"] = True if True: # meaning != "LogSingleItem": TODO # array of (country, value) pairs, first byte is the number of array-items # we don't implement multi-country, so assert there is only one assert eventData.readByte() == 1 # for "LogSingleItems" the array length is not transmitted and fixed to 1 value = eventData.readUnsigned32() # assuming reporting by value (this is set in init) if meaning == "LogTwoValues": # this event has an answer with two values: (value1 value2 country) value = [value, eventData.readUnsigned32()] country = eventData.readAscii(3) message += " {0} {1}. ".format(value, country) if meaning == "EndOfPayout": self.log(hex(ev) + "end of payout, dispensed: " + str(value)) assert r["dispensed"] == [] # prevent duplicate counting of one pay-out in single poll response - TODO is this okay? assert value > 0 # empty completed payouts are usually reported after communication errors - TODO may this happen? r["dispensed"] += [value] self.printDebug(message, logSeverity) else: remainingData = [] while eventData.hasData(): remainingData.append(eventData.readByte()) self.error("event decode error: " + hex(fullData) + ", trouble at " + hex(ev) + ", remaining unparsed data:" + hex(remainingData)) raise Exception("unknown event - probably decode error") logging.debug("event parsing finished.") if r["received"]: # a banknote has been received. # Do not allow further notes before explicit reactivation. self.log("disabling channels after a banknote was received") self.setEnabledChannels() if eventNeedsACK and USE_POLL_WITH_ACK: # when using poll-with-ACK, certain events need to be confirmed via ACK, otherwise they will reappear in the next poll response. # because of strange bugs, we have a extra safety delay added here. time.sleep(1) self.command([0x57]) # event ACK time.sleep(1) return r
[docs]class NV11DeviceTest(unittest.TestCase): """Test NV11Device class"""
[docs] def test_ESSPDevice_crc(self): """unittest: check crc of ESSPDevice.Helper""" ESSPDevice.Helper.unitTest()
[docs] def test_ESSPDevice_ByteStreamReader(self): """unittest: test the ByteStreamReader of ESSSPDevice""" ESSPDevice.ByteStreamReader.unitTest()