-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathvirtual_card.py
155 lines (134 loc) · 5.08 KB
/
virtual_card.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import logging
import sys
import atexit
import socket
import struct
import errno
_Csizeof_short = len(struct.pack('h', 0))
VPCD_CTRL_LEN = 1
VPCD_CTRL_OFF = 0
VPCD_CTRL_ON = 1
VPCD_CTRL_RESET = 2
VPCD_CTRL_ATR = 4
class VirtualCard(object):
"""
This class is responsible for maintaining the communication of the virtual
PCD and the emulated smartcard. vpicc and vpcd communicate via a socket.
The vpcd sends command APDUs (which it receives from an application) to the
vicc. The vicc passes these CAPDUs on to an emulated smartcard, which
produces a response APDU. This RAPDU is then passed back by the vicc to
the vpcd, which forwards it to the application.
"""
def __init__(self, host, port, os, attack):
self.os = os
self.attack = attack
# Connect to the VPCD
self.host = host
self.port = port
try:
self.sock = self.connectToPort(host, port)
self.sock.settimeout(None)
self.server_sock = None
except socket.error as e:
logging.error("Failed to open socket: %s", str(e))
logging.error("Is pcscd running at %s? Is vpcd loaded? Is a \
firewall blocking port %u?", host, port)
sys.exit()
logging.info("Connected to virtual PCD at %s:%u", host, port)
atexit.register(self.stop)
@staticmethod
def connectToPort(host, port):
"""
Open a connection to a given host on a given port.
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
return sock
@staticmethod
def openPort(port):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('', port))
server_socket.listen(0)
logging.info("Waiting for vpcd on port " + str(port))
(client_socket, address) = server_socket.accept()
return (client_socket, server_socket, address[0])
def __sendToVPICC(self, msg):
""" Send a message to the vpcd """
self.sock.sendall(struct.pack('!H', len(msg)) + msg)
def __recvFromVPICC(self):
""" Receive a message from the vpcd """
# receive message size
while True:
try:
sizestr = self.sock.recv(_Csizeof_short)
except socket.error as e:
if e.errno == errno.EINTR:
continue
break
if len(sizestr) == 0:
logging.info("Virtual PCD shut down")
raise socket.error
size = struct.unpack('!H', sizestr)[0]
# receive and return message
if size:
while True:
try:
msg = self.sock.recv(size)
except socket.error as e:
if e.errno == errno.EINTR:
continue
break
if len(msg) == 0:
logging.info("Virtual PCD shut down")
raise socket.error
else:
msg = None
return size, msg
def run(self):
"""
Main loop of the vpicc. Receives command APDUs via a socket from the
vpcd, dispatches them to the emulated smartcard and sends the resulting
respsonse APDU back to the vpcd.
"""
while True:
try:
(size, msg) = self.__recvFromVPICC()
except socket.error as e:
if not self.host:
logging.info("Waiting for vpcd on port " + str(self.port))
(self.sock, address) = self.server_sock.accept()
continue
else:
sys.exit()
if not size:
logging.warning("Error in communication protocol (missing \
size parameter)")
elif size == VPCD_CTRL_LEN:
if msg == chr(VPCD_CTRL_OFF):
logging.info("Power Down")
logging.info("-" * 70)
self.os.powerDown()
elif msg == chr(VPCD_CTRL_ON):
logging.info("Power Up")
self.os.powerUp()
elif msg == chr(VPCD_CTRL_RESET):
logging.info("Reset")
self.os.reset()
elif msg == chr(VPCD_CTRL_ATR):
#logging.info("ATR")
msg = self.os.getATR()
#logging.info("\nATR (%d bytes):\n%s", len(msg), hexdump(msg))
self.__sendToVPICC(msg)
else:
logging.warning("unknown control command")
else:
if size != len(msg):
logging.warning("Expected %u bytes, but received only %u",
size, len(msg))
answer = self.attack.user_execute(msg)
self.__sendToVPICC(answer)
def stop(self):
self.sock.close()
if self.server_sock:
self.server_sock.close()