sqs_test.go 11 KB


  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "log"
  7. "strconv"
  8. "sync"
  9. "sync/atomic"
  10. "testing"
  11. "time"
  12. "github.com/aws/aws-sdk-go-v2/aws"
  13. "github.com/aws/aws-sdk-go-v2/service/sqs"
  14. "github.com/aws/aws-sdk-go-v2/service/sqs/types"
  15. "github.com/golang/mock/gomock"
  16. . "github.com/smartystreets/goconvey/convey"
  17. "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/sqsclient"
  18. )
  19. func TestSQS(t *testing.T) {
  20. Convey("Context", t, func() {
  21. buf := new(bytes.Buffer)
  22. ipcCtx := NewBrokerContext(log.New(buf, "", 0), "", "")
  23. i := &IPC{ipcCtx}
  24. Convey("Responds to SQS client offers...", func() {
  25. ctrl := gomock.NewController(t)
  26. mockSQSClient := sqsclient.NewMockSQSClient(ctrl)
  27. brokerSQSQueueName := "example-name"
  28. responseQueueURL := aws.String("https://sqs.us-east-1.amazonaws.com/testing")
  29. runSQSHandler := func(sqsHandlerContext context.Context) {
  30. mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqs.CreateQueueInput{
  31. QueueName: aws.String(brokerSQSQueueName),
  32. Attributes: map[string]string{
  33. "MessageRetentionPeriod": strconv.FormatInt(int64((5 * time.Minute).Seconds()), 10),
  34. },
  35. }).Return(&sqs.CreateQueueOutput{
  36. QueueUrl: responseQueueURL,
  37. }, nil).Times(1)
  38. sqsHandler, err := newSQSHandler(sqsHandlerContext, mockSQSClient, brokerSQSQueueName, "example-region", i)
  39. So(err, ShouldBeNil)
  40. go sqsHandler.PollAndHandleMessages(sqsHandlerContext)
  41. }
  42. messageBody := aws.String("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}")
  43. receiptHandle := "fake-receipt-handle"
  44. sqsReceiveMessageInput := sqs.ReceiveMessageInput{
  45. QueueUrl: responseQueueURL,
  46. MaxNumberOfMessages: 10,
  47. WaitTimeSeconds: 15,
  48. MessageAttributeNames: []string{
  49. string(types.QueueAttributeNameAll),
  50. },
  51. }
  52. sqsDeleteMessageInput := sqs.DeleteMessageInput{
  53. QueueUrl: responseQueueURL,
  54. ReceiptHandle: &receiptHandle,
  55. }
  56. Convey("by ignoring it if no client id specified", func(c C) {
  57. sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
  58. mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).MinTimes(1).DoAndReturn(
  59. func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
  60. return &sqs.ReceiveMessageOutput{
  61. Messages: []types.Message{
  62. {
  63. Body: messageBody,
  64. ReceiptHandle: &receiptHandle,
  65. },
  66. },
  67. }, nil
  68. },
  69. )
  70. mockSQSClient.EXPECT().DeleteMessage(sqsHandlerContext, &sqsDeleteMessageInput).MinTimes(1).Do(
  71. func(ctx context.Context, input *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) {
  72. sqsCancelFunc()
  73. },
  74. )
  75. // We expect no queues to be created
  76. mockSQSClient.EXPECT().CreateQueue(gomock.Any(), gomock.Any()).Times(0)
  77. runSQSHandler(sqsHandlerContext)
  78. <-sqsHandlerContext.Done()
  79. })
  80. Convey("by doing nothing if an error occurs upon receipt of the message", func(c C) {
  81. sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
  82. mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).MinTimes(1).DoAndReturn(
  83. func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
  84. sqsCancelFunc()
  85. return nil, errors.New("error")
  86. },
  87. )
  88. // We expect no queues to be created or deleted
  89. mockSQSClient.EXPECT().CreateQueue(gomock.Any(), gomock.Any()).Times(0)
  90. mockSQSClient.EXPECT().DeleteMessage(gomock.Any(), gomock.Any()).Times(0)
  91. runSQSHandler(sqsHandlerContext)
  92. <-sqsHandlerContext.Done()
  93. })
  94. Convey("by attempting to create a new sqs queue...", func() {
  95. clientId := "fake-id"
  96. sqsCreateQueueInput := sqs.CreateQueueInput{
  97. QueueName: aws.String("snowflake-client-fake-id"),
  98. }
  99. validMessage := &sqs.ReceiveMessageOutput{
  100. Messages: []types.Message{
  101. {
  102. Body: messageBody,
  103. MessageAttributes: map[string]types.MessageAttributeValue{
  104. "ClientID": {StringValue: &clientId},
  105. },
  106. ReceiptHandle: &receiptHandle,
  107. },
  108. },
  109. }
  110. Convey("and does not attempt to send a message via SQS if queue creation fails.", func(c C) {
  111. sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
  112. mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).AnyTimes().DoAndReturn(
  113. func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
  114. sqsCancelFunc()
  115. return validMessage, nil
  116. })
  117. mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqsCreateQueueInput).Return(nil, errors.New("error")).AnyTimes()
  118. mockSQSClient.EXPECT().DeleteMessage(sqsHandlerContext, &sqsDeleteMessageInput).AnyTimes()
  119. runSQSHandler(sqsHandlerContext)
  120. <-sqsHandlerContext.Done()
  121. })
  122. Convey("and responds with a proxy answer if available.", func(c C) {
  123. sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
  124. mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).AnyTimes().DoAndReturn(
  125. func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
  126. go func(c C) {
  127. snowflake := ipcCtx.AddSnowflake("fake", "", NATUnrestricted, 0)
  128. <-snowflake.offerChannel
  129. snowflake.answerChannel <- "fake answer"
  130. }(c)
  131. return validMessage, nil
  132. })
  133. mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqsCreateQueueInput).Return(&sqs.CreateQueueOutput{
  134. QueueUrl: responseQueueURL,
  135. }, nil).AnyTimes()
  136. mockSQSClient.EXPECT().DeleteMessage(gomock.Any(), gomock.Any()).AnyTimes()
  137. var numTimes atomic.Uint32
  138. mockSQSClient.EXPECT().SendMessage(sqsHandlerContext, gomock.Any()).MinTimes(1).DoAndReturn(
  139. func(ctx context.Context, input *sqs.SendMessageInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error) {
  140. n := numTimes.Add(1)
  141. if n == 1 {
  142. c.So(input.MessageBody, ShouldEqual, aws.String("{\"answer\":\"fake answer\"}"))
  143. // Ensure that match is correctly recorded in metrics
  144. ipcCtx.metrics.printMetrics()
  145. c.So(buf.String(), ShouldContainSubstring, `client-denied-count 0
  146. client-restricted-denied-count 0
  147. client-unrestricted-denied-count 0
  148. client-snowflake-match-count 8
  149. client-http-count 0
  150. client-http-ips
  151. client-ampcache-count 0
  152. client-ampcache-ips
  153. client-sqs-count 8
  154. client-sqs-ips ??=8
  155. `)
  156. sqsCancelFunc()
  157. }
  158. return &sqs.SendMessageOutput{}, nil
  159. },
  160. )
  161. runSQSHandler(sqsHandlerContext)
  162. <-sqsHandlerContext.Done()
  163. })
  164. })
  165. })
  166. Convey("Cleans up SQS client queues...", func() {
  167. brokerSQSQueueName := "example-name"
  168. responseQueueURL := aws.String("https://sqs.us-east-1.amazonaws.com/testing")
  169. ctrl := gomock.NewController(t)
  170. mockSQSClient := sqsclient.NewMockSQSClient(ctrl)
  171. runSQSHandler := func(sqsHandlerContext context.Context) {
  172. mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqs.CreateQueueInput{
  173. QueueName: aws.String(brokerSQSQueueName),
  174. Attributes: map[string]string{
  175. "MessageRetentionPeriod": strconv.FormatInt(int64((5 * time.Minute).Seconds()), 10),
  176. },
  177. }).Return(&sqs.CreateQueueOutput{
  178. QueueUrl: responseQueueURL,
  179. }, nil).Times(1)
  180. mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, gomock.Any()).AnyTimes().Return(
  181. &sqs.ReceiveMessageOutput{
  182. Messages: []types.Message{},
  183. }, nil,
  184. )
  185. sqsHandler, err := newSQSHandler(sqsHandlerContext, mockSQSClient, brokerSQSQueueName, "example-region", i)
  186. So(err, ShouldBeNil)
  187. // Set the cleanup interval to 1 ns so we can immediately test the cleanup logic
  188. sqsHandler.cleanupInterval = time.Nanosecond
  189. go sqsHandler.PollAndHandleMessages(sqsHandlerContext)
  190. }
  191. Convey("does nothing if there are no open queues.", func() {
  192. var wg sync.WaitGroup
  193. wg.Add(1)
  194. sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
  195. defer wg.Wait()
  196. mockSQSClient.EXPECT().ListQueues(sqsHandlerContext, &sqs.ListQueuesInput{
  197. QueueNamePrefix: aws.String("snowflake-client-"),
  198. MaxResults: aws.Int32(1000),
  199. NextToken: nil,
  200. }).DoAndReturn(func(ctx context.Context, input *sqs.ListQueuesInput, optFns ...func(*sqs.Options)) (*sqs.ListQueuesOutput, error) {
  201. wg.Done()
  202. // Cancel the handler context since we are only interested in testing one iteration of the cleanup
  203. sqsCancelFunc()
  204. return &sqs.ListQueuesOutput{
  205. QueueUrls: []string{},
  206. }, nil
  207. })
  208. runSQSHandler(sqsHandlerContext)
  209. })
  210. Convey("deletes open queue when there is one open queue.", func(c C) {
  211. var wg sync.WaitGroup
  212. wg.Add(1)
  213. sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
  214. clientQueueUrl1 := "https://sqs.us-east-1.amazonaws.com/snowflake-client-1"
  215. clientQueueUrl2 := "https://sqs.us-east-1.amazonaws.com/snowflake-client-2"
  216. gomock.InOrder(
  217. mockSQSClient.EXPECT().ListQueues(sqsHandlerContext, &sqs.ListQueuesInput{
  218. QueueNamePrefix: aws.String("snowflake-client-"),
  219. MaxResults: aws.Int32(1000),
  220. NextToken: nil,
  221. }).Times(1).Return(&sqs.ListQueuesOutput{
  222. QueueUrls: []string{
  223. clientQueueUrl1,
  224. clientQueueUrl2,
  225. },
  226. }, nil),
  227. mockSQSClient.EXPECT().ListQueues(sqsHandlerContext, &sqs.ListQueuesInput{
  228. QueueNamePrefix: aws.String("snowflake-client-"),
  229. MaxResults: aws.Int32(1000),
  230. NextToken: nil,
  231. }).Times(1).DoAndReturn(func(ctx context.Context, input *sqs.ListQueuesInput, optFns ...func(*sqs.Options)) (*sqs.ListQueuesOutput, error) {
  232. // Executed on second iteration of cleanupClientQueues loop. This means that one full iteration has completed and we can verify the results of that iteration
  233. wg.Done()
  234. sqsCancelFunc()
  235. return &sqs.ListQueuesOutput{
  236. QueueUrls: []string{},
  237. }, nil
  238. }),
  239. )
  240. gomock.InOrder(
  241. mockSQSClient.EXPECT().GetQueueAttributes(sqsHandlerContext, &sqs.GetQueueAttributesInput{
  242. QueueUrl: aws.String(clientQueueUrl1),
  243. AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp},
  244. }).Times(1).Return(&sqs.GetQueueAttributesOutput{
  245. Attributes: map[string]string{
  246. string(types.QueueAttributeNameLastModifiedTimestamp): "0",
  247. }}, nil),
  248. mockSQSClient.EXPECT().GetQueueAttributes(sqsHandlerContext, &sqs.GetQueueAttributesInput{
  249. QueueUrl: aws.String(clientQueueUrl2),
  250. AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp},
  251. }).Times(1).Return(&sqs.GetQueueAttributesOutput{
  252. Attributes: map[string]string{
  253. string(types.QueueAttributeNameLastModifiedTimestamp): "0",
  254. }}, nil),
  255. )
  256. gomock.InOrder(
  257. mockSQSClient.EXPECT().DeleteQueue(sqsHandlerContext, &sqs.DeleteQueueInput{
  258. QueueUrl: aws.String(clientQueueUrl1),
  259. }).Return(&sqs.DeleteQueueOutput{}, nil),
  260. mockSQSClient.EXPECT().DeleteQueue(sqsHandlerContext, &sqs.DeleteQueueInput{
  261. QueueUrl: aws.String(clientQueueUrl2),
  262. }).Return(&sqs.DeleteQueueOutput{}, nil),
  263. )
  264. runSQSHandler(sqsHandlerContext)
  265. wg.Wait()
  266. })
  267. })
  268. })
  269. }