Skip to content

Commit

Permalink
part 3
Browse files Browse the repository at this point in the history
  • Loading branch information
AmarnathCJD committed Nov 29, 2024
1 parent 109fc63 commit 88dda88
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 22 deletions.
35 changes: 16 additions & 19 deletions mtproto.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ type MTProto struct {
proxy *url.URL
transport transport.Transport

ctxCancel []context.CancelFunc
ctxCancel context.CancelFunc
routineswg sync.WaitGroup
memorySession bool
tcpActive bool
tcpActive atomic.Bool
timeOffset int64
mode mode.Variant

Expand Down Expand Up @@ -121,11 +121,9 @@ func NewMTProto(c Config) (*MTProto, error) {
return nil, errors.Wrap(err, "loading session")
}
}

if c.Logger == nil {
c.Logger = utils.NewLogger("gogram [mtproto]").SetLevel(utils.InfoLevel)
}

mtproto := &MTProto{
sessionStorage: c.SessionStorage,
Addr: c.ServerHost,
Expand Down Expand Up @@ -332,7 +330,7 @@ func (m *MTProto) CreateConnection(withLog bool) error {
m.stopRoutines()

ctx, cancelfunc := context.WithCancel(context.Background())
m.ctxCancel = append(m.ctxCancel, cancelfunc)
m.ctxCancel = cancelfunc
if withLog {
m.Logger.Info(fmt.Sprintf("connecting to [%s] - <%s> ...", utils.FmtIp(m.Addr), utils.Vtcp(m.IpV6)))
} else {
Expand All @@ -343,7 +341,7 @@ func (m *MTProto) CreateConnection(withLog bool) error {
m.Logger.Error(errors.Wrap(err, "creating connection"))
return err
}
m.tcpActive = true
m.tcpActive.Store(true)
if withLog {
if m.proxy != nil && m.proxy.Host != "" {
m.Logger.Info(fmt.Sprintf("connection to (~%s)[%s] - <%s> established", utils.FmtIp(m.proxy.Host), m.Addr, utils.Vtcp(m.IpV6)))
Expand Down Expand Up @@ -398,11 +396,12 @@ func (m *MTProto) connect(ctx context.Context) error {

func (m *MTProto) makeRequest(data tl.Object, expectedTypes ...reflect.Type) (any, error) {
if !m.TcpActive() {
_ = m.CreateConnection(false)
time.Sleep(20 * time.Millisecond)
}

resp, _, err := m.sendPacket(data, expectedTypes...)
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") || strings.Contains(err.Error(), "transport is closed") {
if strings.Contains(err.Error(), "use of closed network connection") || strings.Contains(err.Error(), "transport is closed") || strings.Contains(err.Error(), "connection was forcibly closed") {
m.Logger.Info("connection closed due to broken tcp, reconnecting to [" + m.Addr + "]" + " - <Tcp> ...")
err = m.Reconnect(false)
if err != nil {
Expand Down Expand Up @@ -433,9 +432,10 @@ func (m *MTProto) makeRequest(data tl.Object, expectedTypes ...reflect.Type) (an
}

func (m *MTProto) makeRequestCtx(ctx context.Context, data tl.Object, expectedTypes ...reflect.Type) (any, error) {
if !m.TcpActive() {
_ = m.CreateConnection(false)
for !m.TcpActive() {
time.Sleep(20 * time.Millisecond)
}

resp, msgId, err := m.sendPacket(data, expectedTypes...)
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") || strings.Contains(err.Error(), "transport is closed") {
Expand Down Expand Up @@ -483,21 +483,18 @@ func (m *MTProto) InvokeRequestWithoutUpdate(data tl.Object, expectedTypes ...re
}

func (m *MTProto) TcpActive() bool {
return m.tcpActive
return m.tcpActive.Load()
}

func (m *MTProto) stopRoutines() {
newCtxCancel := m.ctxCancel[:0]
for _, f := range m.ctxCancel {
f() // Call the function
if m.ctxCancel != nil {
m.ctxCancel()
}

m.ctxCancel = newCtxCancel
}

func (m *MTProto) Disconnect() error {
m.tcpActive.Store(false)
m.stopRoutines()
m.tcpActive = false

return nil
}
Expand All @@ -506,7 +503,7 @@ func (m *MTProto) Terminate() error {
m.stopRoutines()
m.responseChannels.Close()
m.transport.Close()
m.tcpActive = false
m.tcpActive.Store(false)
return nil
}

Expand Down Expand Up @@ -567,7 +564,7 @@ func (m *MTProto) startReadingResponses(ctx context.Context) {
case <-ctx.Done():
return
default:
if !m.tcpActive {
if !m.tcpActive.Load() {
m.Logger.Warn("connection is not established with, stopping Updates Queue")
return
}
Expand Down
15 changes: 12 additions & 3 deletions telegram/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,11 +639,20 @@ func (c *Client) GetChatJoinRequests(channelID any, lim int, query ...string) ([
break
}

c.Cache.UpdatePeersToCache(chatInviteImporters.Users, []Chat{})
var users = chatInviteImporters.Users

//c.Cache.UpdatePeersToCache(chatInviteImporters.Users, []Chat{})
// Add all UserObj objects to allUsers slice
for _, user := range chatInviteImporters.Importers {
userObj, err := c.GetUser(user.UserID)
if err != nil {
var userObj *UserObj
for _, u := range users {
if u, ok := u.(*UserObj); ok && u.ID == user.UserID {
userObj = u
break
}
}

if userObj == nil {
userObj = &UserObj{ID: user.UserID}
}
u := &JoinRequest{
Expand Down
6 changes: 6 additions & 0 deletions telegram/media.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ func (c *Client) DownloadMedia(file any, Opts ...*DownloadOptions) (string, erro
}
wg.Wait()

retrySinglePart:
for _, p := range getUndoneParts(&doneArray, int(totalParts)) {
wg.Add(1)
sem <- struct{}{}
Expand Down Expand Up @@ -579,6 +580,7 @@ func (c *Client) DownloadMedia(file any, Opts ...*DownloadOptions) (string, erro
c.Log.Debug("seq-downloaded part ", p, "/", totalParts, " len: ", len(v.Bytes)/1024, "KB")
fs.WriteAt(v.Bytes, int64(p)*int64(partSize))
doneBytes.Add(int64(len(v.Bytes)))
doneArray.Store(p, true)
case *UploadFileCdnRedirect:
panic("cdn redirect not implemented") // TODO
case nil:
Expand All @@ -591,6 +593,10 @@ func (c *Client) DownloadMedia(file any, Opts ...*DownloadOptions) (string, erro
}(p)
}

if len(getUndoneParts(&doneArray, int(totalParts))) > 0 { // Loop through failed parts
goto retrySinglePart
}

wg.Wait()
close(sem)
close(progressTicker)
Expand Down

0 comments on commit 88dda88

Please sign in to comment.