forked from hunkim/PyTorchZeroToAll
-
Notifications
You must be signed in to change notification settings - Fork 2
/
seq2seq_models.py
143 lines (107 loc) · 5.06 KB
/
seq2seq_models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# Original code from
# https://github.com/spro/practical-pytorch/blob/master/seq2seq-translation/seq2seq-translation.ipynb
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F
# Presumably the maximum sequence length; not referenced in the visible code.
MAX_LENGTH = 100
# Start-of-sequence marker, stored as a one-character string (code point 0).
# NOTE(review): this is a str while EOS_token is an int -- presumably so it
# can be passed through str2tensor(); confirm against the callers.
SOS_token = chr(0)
# End-of-sequence marker, stored as an integer code. str2tensor() appends it
# directly to the list of character codes when eos=True.
EOS_token = 1
# Helper that wraps a tensor in a Variable, placing it on the GPU
# whenever CUDA is available.
def cuda_variable(tensor):
    """Return ``tensor`` wrapped in a ``Variable``.

    Args:
        tensor: the tensor to wrap.

    Returns:
        ``Variable(tensor.cuda())`` when ``torch.cuda.is_available()`` is
        true, otherwise ``Variable(tensor)``.
    """
    # The move to the GPU has to happen before the Variable wrapping.
    payload = tensor.cuda() if torch.cuda.is_available() else tensor
    return Variable(payload)
# String to tensor of character codes
def str2tensor(msg, eos=False):
    """Convert ``msg`` into a LongTensor Variable of per-character code points.

    Args:
        msg: the string to encode; each character is mapped through ``ord``.
        eos: when truthy, ``EOS_token`` is appended after the character codes.

    Returns:
        The result of ``cuda_variable`` applied to a 1-D ``torch.LongTensor``
        holding the codes.
    """
    terminator = [EOS_token] if eos else []
    codes = list(map(ord, msg)) + terminator
    return cuda_variable(torch.LongTensor(codes))
# To demonstrate seq2seq, we don't handle batches in this code (the batch
# size is fixed to 1), and the decoders run one step at a time.
# This is extremely slow; please do not use it in practice.
# Real code needs (1) batching and (2) data parallelism:
# http://pytorch.org/tutorials/beginner/former_torchies/parallelism_tutorial.html
class EncoderRNN(nn.Module):
    """GRU encoder that embeds a symbol sequence and runs it in a single call.

    The batch dimension is fixed to 1 throughout.

    Args:
        input_size: number of rows in the embedding table (distinct symbols).
        hidden_size: width of both the embedding and the GRU hidden state.
        n_layers: number of stacked GRU layers.
    """

    def __init__(self, input_size, hidden_size, n_layers=1):
        # nn.Module has to be initialised before any attribute is assigned
        # on the instance; doing it first keeps this safe if submodules or
        # parameters are ever added above the plain attributes.
        super(EncoderRNN, self).__init__()

        self.hidden_size = hidden_size
        self.n_layers = n_layers

        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size, n_layers)

    def forward(self, word_inputs, hidden):
        """Encode the whole input sequence at once.

        Args:
            word_inputs: tensor of symbol indices; its ``len()`` is taken as
                the sequence length.
            hidden: initial GRU state, e.g. the value from ``init_hidden()``.

        Returns:
            ``(output, hidden)`` exactly as returned by the GRU, where the
            GRU input was shaped ``seq_len x 1 x hidden_size``.
        """
        seq_len = len(word_inputs)
        # Reshape to S x B(=1) x H, the layout nn.GRU expects by default.
        embedded = self.embedding(word_inputs).view(seq_len, 1, -1)
        output, hidden = self.gru(embedded, hidden)
        return output, hidden

    def init_hidden(self):
        """Return an all-zero initial state of shape (n_layers, 1, hidden_size)."""
        # (num_layers * num_directions, batch, hidden_size)
        return cuda_variable(torch.zeros(self.n_layers, 1, self.hidden_size))
class DecoderRNN(nn.Module):
    """GRU decoder that produces one output step per call (batch size 1).

    Args:
        hidden_size: width of both the embedding and the GRU hidden state.
        output_size: number of output symbols; also the embedding table size.
        n_layers: number of stacked GRU layers.
    """

    def __init__(self, hidden_size, output_size, n_layers=1):
        super(DecoderRNN, self).__init__()

        # init_hidden() reads these, so they must be stored on the instance.
        self.hidden_size = hidden_size
        self.n_layers = n_layers

        self.embedding = nn.Embedding(output_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size, n_layers)
        self.out = nn.Linear(hidden_size, output_size)

    def forward(self, input, hidden):
        """Run a single decoding step.

        Args:
            input: tensor holding the index of the current input symbol; its
                embedding is reshaped to ``1 x 1 x hidden_size``.
            hidden: previous GRU state.

        Returns:
            ``(output, hidden)`` where ``output`` is the linear projection of
            the GRU output for this step (unnormalised scores over
            ``output_size`` symbols) and ``hidden`` is the new GRU state.
        """
        # input shape: S(=1) x B(=1) x I (input size)
        # Note: we run this one step at a time. (Sequence size = 1)
        output = self.embedding(input).view(1, 1, -1)
        output, hidden = self.gru(output, hidden)
        output = self.out(output[0])
        # No softmax here, since the loss used is CrossEntropyLoss
        return output, hidden

    def init_hidden(self):
        """Return an all-zero initial state of shape (n_layers, 1, hidden_size)."""
        # (num_layers * num_directions, batch, hidden_size)
        return cuda_variable(torch.zeros(self.n_layers, 1, self.hidden_size))
class AttnDecoderRNN(nn.Module):
    """GRU decoder with attention over the encoder states (batch size 1).

    Each call decodes one step: the GRU output for the current symbol is
    scored against every encoder state, the softmax-normalised scores weight
    the encoder states into a context vector, and the concatenation of GRU
    output and context is projected to ``output_size`` scores.

    Args:
        hidden_size: width of the embedding, GRU state and encoder states.
        output_size: number of output symbols; also the embedding table size.
        n_layers: number of stacked GRU layers.
        dropout_p: dropout probability handed to the GRU.
    """

    def __init__(self, hidden_size, output_size, n_layers=1, dropout_p=0.1):
        super(AttnDecoderRNN, self).__init__()

        # Linear map applied to an encoder state before the dot product
        self.attn = nn.Linear(hidden_size, hidden_size)

        # Define layers
        self.embedding = nn.Embedding(output_size, hidden_size)
        # nn.GRU applies dropout only between stacked layers, so with a
        # single layer it is a no-op that merely triggers a warning.
        gru_dropout = dropout_p if n_layers > 1 else 0.0
        self.gru = nn.GRU(hidden_size, hidden_size,
                          n_layers, dropout=gru_dropout)
        self.out = nn.Linear(hidden_size * 2, output_size)

    def forward(self, word_input, last_hidden, encoder_hiddens):
        """Run a single decoding step with attention.

        Args:
            word_input: tensor holding the index of the current input symbol
                (the previously emitted symbol).
            last_hidden: previous GRU state.
            encoder_hiddens: encoder outputs, indexed by source position
                along dim 0 (``S x 1 x hidden_size`` as produced by an
                encoder with batch size 1).

        Returns:
            ``(output, hidden, attn_weights)``: the unnormalised scores for
            the next symbol, the new GRU state, and the ``1 x 1 x S``
            attention weights (useful for visualisation).
        """
        # Note: we run this one step (S=1) at a time
        rnn_input = self.embedding(word_input).view(1, 1, -1)  # S=1 x B x I
        rnn_output, hidden = self.gru(rnn_input, last_hidden)
        rnn_output = rnn_output.squeeze(0)  # S(=1) x B x I -> B x I

        # Attention weights from the current RNN output and all encoder
        # states, then applied to the encoder states.
        attn_weights = self.get_att_weight(rnn_output, encoder_hiddens)
        context = attn_weights.bmm(
            encoder_hiddens.transpose(0, 1))  # B x S(=1) x I
        context = context.squeeze(1)  # B x S(=1) x I -> B x I

        # Final output layer (next symbol prediction) from the RNN output
        # and the context vector
        output = self.out(torch.cat((rnn_output, context), 1))
        return output, hidden, attn_weights

    def get_att_weight(self, hidden, encoder_hiddens):
        """Return softmax-normalised attention weights shaped ``1 x 1 x S``.

        Args:
            hidden: current decoder RNN output.
            encoder_hiddens: encoder states, one per position along dim 0.
        """
        # One scalar score per encoder position. Stacking the scores keeps
        # the result on the same device as the module's own tensors.
        attn_scores = torch.stack(
            [self.get_att_score(hidden, encoder_hidden)
             for encoder_hidden in encoder_hiddens])
        # Normalise over the sequence positions (dim 0 of the score vector),
        # then resize to 1 x 1 x seq_len for bmm().
        return F.softmax(attn_scores, dim=0).view(1, 1, -1)

    # score = h^T W h^e = h dot (W h^e)
    # TODO: We need to implement different score models
    def get_att_score(self, hidden, encoder_hidden):
        """Return the scalar score ``hidden . attn(encoder_hidden)``."""
        score = self.attn(encoder_hidden)
        return torch.dot(hidden.view(-1), score.view(-1))