123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- //go:build integration
- package integration
- import (
- "context"
- "errors"
- "fmt"
- "log/slog"
- "os"
- "strconv"
- "strings"
- "sync"
- "testing"
- "time"
- "github.com/ollama/ollama/api"
- "github.com/stretchr/testify/require"
- )
- func TestMaxQueue(t *testing.T) {
- if os.Getenv("OLLAMA_TEST_EXISTING") != "" {
- t.Skip("Max Queue test requires spawing a local server so we can adjust the queue size")
- return
- }
- // Note: This test can be quite slow when running in CPU mode, so keep the threadCount low unless your on GPU
- // Also note that by default Darwin can't sustain > ~128 connections without adjusting limits
- threadCount := 32
- mq := os.Getenv("OLLAMA_MAX_QUEUE")
- if mq != "" {
- var err error
- threadCount, err = strconv.Atoi(mq)
- require.NoError(t, err)
- } else {
- os.Setenv("OLLAMA_MAX_QUEUE", fmt.Sprintf("%d", threadCount))
- }
- req := api.GenerateRequest{
- Model: "orca-mini",
- Prompt: "write a long historical fiction story about christopher columbus. use at least 10 facts from his actual journey",
- Options: map[string]interface{}{
- "seed": 42,
- "temperature": 0.0,
- },
- }
- resp := []string{"explore", "discover", "ocean"}
- // CPU mode takes much longer at the limit with a large queue setting
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
- defer cancel()
- client, _, cleanup := InitServerConnection(ctx, t)
- defer cleanup()
- require.NoError(t, PullIfMissing(ctx, client, req.Model))
- // Context for the worker threads so we can shut them down
- // embedCtx, embedCancel := context.WithCancel(ctx)
- embedCtx := ctx
- var genwg sync.WaitGroup
- go func() {
- genwg.Add(1)
- defer genwg.Done()
- slog.Info("Starting generate request")
- DoGenerate(ctx, t, client, req, resp, 45*time.Second, 5*time.Second)
- slog.Info("generate completed")
- }()
- // Give the generate a chance to get started before we start hammering on embed requests
- time.Sleep(5 * time.Millisecond)
- threadCount += 10 // Add a few extra to ensure we push the queue past its limit
- busyCount := 0
- resetByPeerCount := 0
- canceledCount := 0
- succesCount := 0
- counterMu := sync.Mutex{}
- var embedwg sync.WaitGroup
- for i := 0; i < threadCount; i++ {
- go func(i int) {
- embedwg.Add(1)
- defer embedwg.Done()
- slog.Info("embed started", "id", i)
- embedReq := api.EmbeddingRequest{
- Model: req.Model,
- Prompt: req.Prompt,
- Options: req.Options,
- }
- // Fresh client for every request
- client, _ = GetTestEndpoint()
- resp, genErr := client.Embeddings(embedCtx, &embedReq)
- counterMu.Lock()
- defer counterMu.Unlock()
- switch {
- case genErr == nil:
- succesCount++
- require.Greater(t, len(resp.Embedding), 5) // somewhat arbitrary, but sufficient to be reasonable
- case errors.Is(genErr, context.Canceled):
- canceledCount++
- case strings.Contains(genErr.Error(), "busy"):
- busyCount++
- case strings.Contains(genErr.Error(), "connection reset by peer"):
- resetByPeerCount++
- default:
- require.NoError(t, genErr, "%d request failed", i)
- }
- slog.Info("embed finished", "id", i)
- }(i)
- }
- genwg.Wait()
- slog.Info("generate done, waiting for embeds")
- embedwg.Wait()
- slog.Info("embeds completed", "success", succesCount, "busy", busyCount, "reset", resetByPeerCount, "canceled", canceledCount)
- require.Equal(t, resetByPeerCount, 0, "Connections reset by peer, have you updated your fd and socket limits?")
- require.True(t, busyCount > 0, "no requests hit busy error but some should have")
- require.True(t, canceledCount == 0, "no requests should have been canceled due to timeout")
- }
|