Skip to content

Commit 00596ea

Browse files
committed
azsqs: fix retrying
1 parent 7d45485 commit 00596ea

2 files changed

Lines changed: 30 additions & 22 deletions

File tree

azsqs/queue.go

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ func (q *Queue) addBatch(msgs []*msgqueue.Message) error {
356356

357357
body, err := msg.EncodeArgs()
358358
if err != nil {
359-
internal.Logf("EncodeArgs failed: %s", err)
359+
internal.Logf("azsqs: EncodeArgs failed: %s", err)
360360
continue
361361
}
362362
if body == "" {
@@ -384,20 +384,24 @@ func (q *Queue) addBatch(msgs []*msgqueue.Message) error {
384384

385385
out, err := q.sqs.SendMessageBatch(in)
386386
if err != nil {
387-
internal.Logf("sqs.SendMessageBatch failed: %s", err)
387+
internal.Logf("azsqs: SendMessageBatch failed: %s", err)
388388
return err
389389
}
390390

391391
for _, entry := range out.Failed {
392-
internal.Logf(
393-
"sqs.SendMessageBatch failed with code=%s message=%q",
394-
tos(entry.Code), tos(entry.Message),
395-
)
396392
if entry.SenderFault != nil && *entry.SenderFault {
397-
msg := findMessageById(msgs, tos(entry.Id))
398-
if msg != nil {
399-
msg.Err = fmt.Errorf("%s: %s", tos(entry.Code), tos(entry.Message))
400-
}
393+
internal.Logf(
394+
"azsqs: SendMessageBatch failed with code=%s message=%q",
395+
tos(entry.Code), tos(entry.Message),
396+
)
397+
continue
398+
}
399+
400+
msg := findMessageById(msgs, tos(entry.Id))
401+
if msg != nil {
402+
msg.Err = fmt.Errorf("%s: %s", tos(entry.Code), tos(entry.Message))
403+
} else {
404+
internal.Logf("azsqs: can't find message with id=%s", tos(entry.Id))
401405
}
402406
}
403407

@@ -412,13 +416,13 @@ func (q *Queue) splitAddBatch(msgs []*msgqueue.Message) ([]*msgqueue.Message, []
412416
for i, msg := range msgs {
413417
msg, err := msgutil.UnwrapMessage(msg)
414418
if err != nil {
415-
internal.Logf(err.Error())
419+
internal.Logf("azsqs: UnwrapMessage failed: %s", err)
416420
continue
417421
}
418422

419423
body, err := msg.EncodeArgs()
420424
if err != nil {
421-
internal.Logf("EncodeArgs failed: %s", err)
425+
internal.Logf("azsqs: EncodeArgs failed: %s", err)
422426
continue
423427
}
424428

@@ -465,20 +469,24 @@ func (q *Queue) deleteBatch(msgs []*msgqueue.Message) error {
465469
}
466470
out, err := q.sqs.DeleteMessageBatch(in)
467471
if err != nil {
468-
internal.Logf("sqs.DeleteMessageBatch failed: %s", err)
472+
internal.Logf("azsqs: DeleteMessageBatch failed: %s", err)
469473
return err
470474
}
471475

472476
for _, entry := range out.Failed {
473-
internal.Logf(
474-
"sqs.DeleteMessageBatch failed with code=%s message=%q",
475-
tos(entry.Code), tos(entry.Message),
476-
)
477477
if entry.SenderFault != nil && *entry.SenderFault {
478-
msg := findMessageById(msgs, tos(entry.Id))
479-
if msg != nil {
480-
msg.Err = fmt.Errorf("%s: %s", tos(entry.Code), tos(entry.Message))
481-
}
478+
internal.Logf(
479+
"azsqs: DeleteMessageBatch failed with code=%s message=%q",
480+
tos(entry.Code), tos(entry.Message),
481+
)
482+
continue
483+
}
484+
485+
msg := findMessageById(msgs, tos(entry.Id))
486+
if msg != nil {
487+
msg.Err = fmt.Errorf("%s: %s", tos(entry.Code), tos(entry.Message))
488+
} else {
489+
internal.Logf("azsqs: can't find message with id=%s", tos(entry.Id))
482490
}
483491
}
484492
return nil

ironmq/queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ func (q *Queue) deleteBatch(msgs []*msgqueue.Message) error {
305305
return q.q.DeleteReservedMessages(mqMsgs)
306306
})
307307
if err != nil {
308-
internal.Logf("mq.DeleteReservedMessages failed: %s", err)
308+
internal.Logf("ironmq: DeleteReservedMessages failed: %s", err)
309309
return err
310310
}
311311

0 commit comments

Comments
 (0)