-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcrnet.py
150 lines (129 loc) · 4.68 KB
/
crnet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import pandas as pd
import util
class CRStation:
def __init__(self, name, pinyin, tele, conn):
self.name = name
self.pinyin = pinyin
self.tele = tele
self.conn = conn
def __str__(self):
return f'{self.name}, {self.pinyin}, {self.tele}, {self.conn}'
def __repr__(self):
return self.__str__()
class CRStations:
def __init__(self, infos=None, line=None, lazyMap=True):
self.db = {}
self.nmMap = util.MultiDict()
self.pyMap = util.MultiDict()
if infos is not None:
for index, station in infos.iterrows():
content = (CRStation(*station), {line})
self.db[station.tele] = content
if not lazyMap:
self._genMap()
def _genMap(self):
for tele, content in self.db.items():
self.nmMap.add(content[0].name, content)
self.pyMap.add(content[0].pinyin, content)
def _mergeIn(self, crs):
for tele, (station, lines) in crs.db.items():
if tele in self.db:
self.db[tele][1].update(lines)
else:
self.db[tele] = (station, lines)
def getName(self, tele):
return self.db[tele][0].name
def findTele(self, tele):
try:
return self.db[tele]
except KeyError:
return None
def findPinyin(self, pinyin):
return self.pyMap[pinyin]
def findName(self, name):
return self.nmMap[name]
def teleToName(self, tele):
try:
return self.db[tele][0].name
except KeyError:
return None
class CRLine:
def __init__(self, line, num, info):
self.line = line
self.num = num
self.info = info
self.t2r = {}
for index, station in info.iterrows():
self.t2r[station.tele] = index
def dist(self, tele1, tele2):
mile1 = self.info.loc[self.t2r[tele1], 'mile']
mile2 = self.info.loc[self.t2r[tele2], 'mile']
return abs(mile1 - mile2)
def getNodes(self, *teleps):
last = len(self.info) - 1
for index, station in self.info.iterrows():
if station.conn or index in (0, last) \
or station.tele in teleps:
yield station.tele
def getEdges(self, *teleps):
last = len(self.info) - 1
prev_tele = self.info.loc[0, 'tele']
prev_mile = 0
it = self.info.iterrows()
next(it)
for index, station in it:
if not station.conn and index not in (0, last) \
and station.tele not in teleps:
continue
tele = station.tele
mile = station.mile
yield (prev_tele, tele, self.line, mile - prev_mile)
prev_tele, prev_mile = tele, mile
@classmethod
def readLine(cls, filename, line, crs=False):
info = pd.read_excel(filename, header=1, usecols='B,C,D,H,J')
info.drop(len(info) - 1, inplace=True)
assert len(info) > 1
info.columns = ['name', 'pinyin', 'tele', 'mile', 'conn']
info.mile = info.mile.apply(lambda x: int(x[:-2].replace(',', '')))
info.conn = info.conn.apply(lambda x: x[0] == 'Y')
infos = info[['name', 'pinyin', 'tele', 'conn']]
infol = info[['tele', 'mile', 'conn']]
if crs:
return cls(line, len(infol), infol), \
CRStations(infos, line)
else:
return cls(line, len(infol), infol)
class CRNetwork:
def __init__(self, network, stations):
self.network = network
self.stations = stations
@classmethod
def readLines(cls, lines):
network = {}
stations = CRStations()
for line in lines:
crline, crs = CRLine.readLine(line + '.xls', line, crs=True)
network[line] = crline
stations._mergeIn(crs)
stations._genMap()
return cls(network, stations)
@classmethod
def myInit(cls, name, lines=None):
if lines is None:
with open('lines.txt') as f:
lines = f.read().split()
lines.remove('广肇城际线佛肇线')
lines = [line for line in lines if '高速' not in line and '城际' not in line]
ret = cls.readLines(lines)
return ret
if __name__ == '__main__':
# CRLine.readLine('京沪线.xls', '京沪线')
with open('lines.txt') as f:
lines = f.read().split()
lines.remove('广肇城际线佛肇线')
# lines = ['京沪线']
crnet = CRNetwork.readLines(lines)
print(crnet.stations.findPinyin('NTO'))
# print(list(crnet.network['京沪线'].getNodes('-KSH', '-XXX')))
# print(list(crnet.network['京沪线'].getEdges('-KSH', '-XXX')))