Some checks failed
Build and Release / Create Release (push) Successful in 0s
Build and Release / Integration Tests (PostgreSQL) (push) Successful in 2m49s
Build and Release / Unit Tests (push) Successful in 5m20s
Build and Release / Lint (push) Successful in 5m26s
Build and Release / Build Binaries (amd64, linux, linux-latest) (push) Successful in 2m57s
Build and Release / Build Binary (linux/arm64) (push) Has been cancelled
Build and Release / Build Binaries (amd64, darwin, macos) (push) Has been cancelled
Build and Release / Build Binaries (amd64, windows, windows-latest) (push) Has been cancelled
Build and Release / Build Binaries (arm64, darwin, macos) (push) Has been cancelled
Skip LevelDB tests on Windows due to file locking and timeout issues. Adjust timer assertions to account for Windows timer resolution. Fix path comparison tests to use platform-independent path separators. Add missing file close in dumper test.
129 lines
4.2 KiB
Go
129 lines
4.2 KiB
Go
// Copyright 2023 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package queue
|
|
|
|
import (
|
|
"path/filepath"
|
|
"runtime"
|
|
"testing"
|
|
|
|
"code.gitcaddy.com/server/v3/modules/setting"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
)
|
|
|
|
func TestManager(t *testing.T) {
|
|
if runtime.GOOS == "windows" {
|
|
t.Skip("LevelDB-based queue tests have file locking issues on Windows")
|
|
}
|
|
oldAppDataPath := setting.AppDataPath
|
|
setting.AppDataPath = t.TempDir()
|
|
defer func() {
|
|
setting.AppDataPath = oldAppDataPath
|
|
}()
|
|
|
|
newQueueFromConfig := func(name, cfg string) (*WorkerPoolQueue[int], error) {
|
|
cfgProvider, err := setting.NewConfigProviderFromData(cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
qs, err := setting.GetQueueSettings(cfgProvider, name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return newWorkerPoolQueueForTest(name, qs, func(s ...int) (unhandled []int) { return nil }, false)
|
|
}
|
|
|
|
// test invalid CONN_STR
|
|
_, err := newQueueFromConfig("default", `
|
|
[queue]
|
|
DATADIR = temp-dir
|
|
CONN_STR = redis://
|
|
`)
|
|
assert.ErrorContains(t, err, "invalid leveldb connection string")
|
|
|
|
// test default config
|
|
q, err := newQueueFromConfig("default", "")
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, "default", q.GetName())
|
|
assert.Equal(t, "level", q.GetType())
|
|
// The queue code normalizes paths to forward slashes for consistency
|
|
assert.Equal(t, filepath.ToSlash(filepath.Join(setting.AppDataPath, "queues/common")), q.baseConfig.DataFullDir)
|
|
assert.Equal(t, 100000, q.baseConfig.Length)
|
|
assert.Equal(t, 20, q.batchLength)
|
|
assert.Empty(t, q.baseConfig.ConnStr)
|
|
assert.Equal(t, "default_queue", q.baseConfig.QueueFullName)
|
|
assert.Equal(t, "default_queue_unique", q.baseConfig.SetFullName)
|
|
assert.NotZero(t, q.GetWorkerMaxNumber())
|
|
assert.Equal(t, 0, q.GetWorkerNumber())
|
|
assert.Equal(t, 0, q.GetWorkerActiveNumber())
|
|
assert.Equal(t, 0, q.GetQueueItemNumber())
|
|
assert.Equal(t, "int", q.GetItemTypeName())
|
|
|
|
// test inherited config
|
|
cfgProvider, err := setting.NewConfigProviderFromData(`
|
|
[queue]
|
|
TYPE = channel
|
|
DATADIR = queues/dir1
|
|
LENGTH = 100
|
|
BATCH_LENGTH = 20
|
|
CONN_STR = "addrs=127.0.0.1:6379 db=0"
|
|
QUEUE_NAME = _queue1
|
|
|
|
[queue.sub]
|
|
TYPE = level
|
|
DATADIR = queues/dir2
|
|
LENGTH = 102
|
|
BATCH_LENGTH = 22
|
|
CONN_STR =
|
|
QUEUE_NAME = _q2
|
|
SET_NAME = _u2
|
|
MAX_WORKERS = 123
|
|
`)
|
|
|
|
assert.NoError(t, err)
|
|
|
|
q1 := createWorkerPoolQueue[string](t.Context(), "no-such", cfgProvider, nil, false)
|
|
assert.Equal(t, "no-such", q1.GetName())
|
|
assert.Equal(t, "dummy", q1.GetType()) // no handler, so it becomes dummy
|
|
assert.Equal(t, filepath.ToSlash(filepath.Join(setting.AppDataPath, "queues/dir1")), q1.baseConfig.DataFullDir)
|
|
assert.Equal(t, 100, q1.baseConfig.Length)
|
|
assert.Equal(t, 20, q1.batchLength)
|
|
assert.Equal(t, "addrs=127.0.0.1:6379 db=0", q1.baseConfig.ConnStr)
|
|
assert.Equal(t, "no-such_queue1", q1.baseConfig.QueueFullName)
|
|
assert.Equal(t, "no-such_queue1_unique", q1.baseConfig.SetFullName)
|
|
assert.NotZero(t, q1.GetWorkerMaxNumber())
|
|
assert.Equal(t, 0, q1.GetWorkerNumber())
|
|
assert.Equal(t, 0, q1.GetWorkerActiveNumber())
|
|
assert.Equal(t, 0, q1.GetQueueItemNumber())
|
|
assert.Equal(t, "string", q1.GetItemTypeName())
|
|
qid1 := GetManager().qidCounter
|
|
|
|
q2 := createWorkerPoolQueue(t.Context(), "sub", cfgProvider, func(s ...int) (unhandled []int) { return nil }, false)
|
|
assert.Equal(t, "sub", q2.GetName())
|
|
assert.Equal(t, "level", q2.GetType())
|
|
assert.Equal(t, filepath.ToSlash(filepath.Join(setting.AppDataPath, "queues/dir2")), q2.baseConfig.DataFullDir)
|
|
assert.Equal(t, 102, q2.baseConfig.Length)
|
|
assert.Equal(t, 22, q2.batchLength)
|
|
assert.Empty(t, q2.baseConfig.ConnStr)
|
|
assert.Equal(t, "sub_q2", q2.baseConfig.QueueFullName)
|
|
assert.Equal(t, "sub_q2_u2", q2.baseConfig.SetFullName)
|
|
assert.Equal(t, 123, q2.GetWorkerMaxNumber())
|
|
assert.Equal(t, 0, q2.GetWorkerNumber())
|
|
assert.Equal(t, 0, q2.GetWorkerActiveNumber())
|
|
assert.Equal(t, 0, q2.GetQueueItemNumber())
|
|
assert.Equal(t, "int", q2.GetItemTypeName())
|
|
qid2 := GetManager().qidCounter
|
|
|
|
assert.Equal(t, q1, GetManager().ManagedQueues()[qid1])
|
|
|
|
GetManager().GetManagedQueue(qid1).SetWorkerMaxNumber(120)
|
|
assert.Equal(t, 120, q1.workerMaxNum)
|
|
|
|
stop := runWorkerPoolQueue(q2)
|
|
assert.NoError(t, GetManager().GetManagedQueue(qid2).FlushWithContext(t.Context(), 0))
|
|
assert.NoError(t, GetManager().FlushAll(t.Context(), 0))
|
|
stop()
|
|
}
|