max_queue_test.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. //go:build integration
  2. package integration
  3. import (
  4. "context"
  5. "errors"
  6. "fmt"
  7. "log/slog"
  8. "os"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "testing"
  13. "time"
  14. "github.com/ollama/ollama/api"
  15. "github.com/stretchr/testify/require"
  16. )
  17. func TestMaxQueue(t *testing.T) {
  18. if os.Getenv("OLLAMA_TEST_EXISTING") != "" {
  19. t.Skip("Max Queue test requires spawing a local server so we can adjust the queue size")
  20. return
  21. }
  22. // Note: This test can be quite slow when running in CPU mode, so keep the threadCount low unless your on GPU
  23. // Also note that by default Darwin can't sustain > ~128 connections without adjusting limits
  24. threadCount := 32
  25. mq := os.Getenv("OLLAMA_MAX_QUEUE")
  26. if mq != "" {
  27. var err error
  28. threadCount, err = strconv.Atoi(mq)
  29. require.NoError(t, err)
  30. } else {
  31. os.Setenv("OLLAMA_MAX_QUEUE", fmt.Sprintf("%d", threadCount))
  32. }
  33. req := api.GenerateRequest{
  34. Model: "orca-mini",
  35. Prompt: "write a long historical fiction story about christopher columbus. use at least 10 facts from his actual journey",
  36. Options: map[string]interface{}{
  37. "seed": 42,
  38. "temperature": 0.0,
  39. },
  40. }
  41. resp := []string{"explore", "discover", "ocean"}
  42. // CPU mode takes much longer at the limit with a large queue setting
  43. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  44. defer cancel()
  45. client, _, cleanup := InitServerConnection(ctx, t)
  46. defer cleanup()
  47. require.NoError(t, PullIfMissing(ctx, client, req.Model))
  48. // Context for the worker threads so we can shut them down
  49. // embedCtx, embedCancel := context.WithCancel(ctx)
  50. embedCtx := ctx
  51. var genwg sync.WaitGroup
  52. go func() {
  53. genwg.Add(1)
  54. defer genwg.Done()
  55. slog.Info("Starting generate request")
  56. DoGenerate(ctx, t, client, req, resp, 45*time.Second, 5*time.Second)
  57. slog.Info("generate completed")
  58. }()
  59. // Give the generate a chance to get started before we start hammering on embed requests
  60. time.Sleep(5 * time.Millisecond)
  61. threadCount += 10 // Add a few extra to ensure we push the queue past its limit
  62. busyCount := 0
  63. resetByPeerCount := 0
  64. canceledCount := 0
  65. succesCount := 0
  66. counterMu := sync.Mutex{}
  67. var embedwg sync.WaitGroup
  68. for i := 0; i < threadCount; i++ {
  69. go func(i int) {
  70. embedwg.Add(1)
  71. defer embedwg.Done()
  72. slog.Info("embed started", "id", i)
  73. embedReq := api.EmbeddingRequest{
  74. Model: req.Model,
  75. Prompt: req.Prompt,
  76. Options: req.Options,
  77. }
  78. // Fresh client for every request
  79. client, _ = GetTestEndpoint()
  80. resp, genErr := client.Embeddings(embedCtx, &embedReq)
  81. counterMu.Lock()
  82. defer counterMu.Unlock()
  83. switch {
  84. case genErr == nil:
  85. succesCount++
  86. require.Greater(t, len(resp.Embedding), 5) // somewhat arbitrary, but sufficient to be reasonable
  87. case errors.Is(genErr, context.Canceled):
  88. canceledCount++
  89. case strings.Contains(genErr.Error(), "busy"):
  90. busyCount++
  91. case strings.Contains(genErr.Error(), "connection reset by peer"):
  92. resetByPeerCount++
  93. default:
  94. require.NoError(t, genErr, "%d request failed", i)
  95. }
  96. slog.Info("embed finished", "id", i)
  97. }(i)
  98. }
  99. genwg.Wait()
  100. slog.Info("generate done, waiting for embeds")
  101. embedwg.Wait()
  102. slog.Info("embeds completed", "success", succesCount, "busy", busyCount, "reset", resetByPeerCount, "canceled", canceledCount)
  103. require.Equal(t, resetByPeerCount, 0, "Connections reset by peer, have you updated your fd and socket limits?")
  104. require.True(t, busyCount > 0, "no requests hit busy error but some should have")
  105. require.True(t, canceledCount == 0, "no requests should have been canceled due to timeout")
  106. }