Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

High response load #2 #195

Open
egorkovalchuk opened this issue Mar 28, 2024 · 1 comment
Open

High response load #2 #195

egorkovalchuk opened this issue Mar 28, 2024 · 1 comment

Comments

@egorkovalchuk
Copy link

egorkovalchuk commented Mar 28, 2024

Hello

I put a high load on the diameter server, the load goes away, but some of the responses disappear. Him sent, but not processed. Most likely, more than one response is placed in the network buffer, but a few.

Speed send 4000 op/s

what information will you need?

I use an application that sends generated requests to two diameter servers. At low request rates. everything works correctly. When the requests increase, I see that the application does not parse all the answers. At the same time, tkpdump shows that these responses were sent by the servers.

my project https://github.com/egorkovalchuk/go-cdrgenerator/
add you

`
func StartDiameterClient() {
var brt_adress []datatype.Address
for _, ip := range global_cfg.Common.BRT {
brt_adress = append(brt_adress, datatype.Address(net.ParseIP(ip)))
}

//прописываем конфиг
ProcessDebug("Load Diameter config")

diam_cfg := &sm.Settings{
	OriginHost:       datatype.DiameterIdentity(global_cfg.Common.BRT_OriginHost),
	OriginRealm:      datatype.DiameterIdentity(global_cfg.Common.BRT_OriginRealm),
	VendorID:         data.PETER_SERVICE_VENDOR_ID,
	ProductName:      "CDR-generator",
	OriginStateID:    datatype.Unsigned32(time.Now().Unix()),
	FirmwareRevision: 1,
	HostIPAddresses:  brt_adress,
}

// Create the state machine (it's a diam.ServeMux) and client.
mux := sm.New(diam_cfg)
ProcessDebug(mux.Settings())

ProcessDebug("Load Diameter dictionary")
ProcessDebug("Load Diameter client")

// Инициализация конфига клиента
cli := data.Client(mux)

// Set message handlers.
// Можно использовать канал AnswerCCAEvent(BrtDiamChannelAnswer)
mux.Handle("CCA", AnswerCCAEvent())
mux.Handle("DWA", AnswerDWAEvent())

// Запуск потока записи ошибок в лог
go DiamPrintErrors(mux.ErrorReports())
//KeepAlive WTF??
cli.EnableWatchdog = false //true

chk := 0
ProcessDiam("Connecting clients...")
for _, init_connect := range global_cfg.Common.BRT {
	ProcessDebug(init_connect)

	var err error

	brt_connect, err := Dial(cli, init_connect+":"+strconv.Itoa(global_cfg.Common.BRT_port), "", "", false, "tcp")
	if err != nil {
		ProcessError("Connect error ")
		ProcessError(err)
	} else {
		ProcessDebug("Connect to " + init_connect + " done.")
		// Запуск потоков записи по БРТ
		// Отмеаем что клиент запущен
		chk++
		go SendCCREvent(brt_connect, diam_cfg, BrtDiamChannel)
	}
}
// Проверка что клиент запущен
if chk > 0 {
	ProcessDiam("Done. Sending messages...")
} else {
	ProcessDiam("Stopping the client's diameter. No connection is initialized")
	brt = false
}

}
// Кусок для диаметра
// Определение шифрование соединения
func Dial(cli *sm.Client, addr, cert, key string, ssl bool, networkType string) (diam.Conn, error) {
if ssl {
return cli.DialNetworkTLS(networkType, addr, cert, key, nil)
}
return cli.DialNetwork(networkType, addr)
}
// Обработчик-ответа Диаметра
func AnswerCCAEvent() diam.HandlerFunc {
//func AnswerCCAEvent(done chan struct{}) diam.HandlerFunc {
return func(c diam.Conn, m diam.Message) {
// обработчик ошибок, добавить поток(канал) для офлайна?
// Конкуренция по ответам, запись в фаил?
s, sid := data.ResponseDiamHandler(m, ProcessDiam, debugm)
CDRDiamResponseCount.Inc(strconv.Itoa(s))
if s == 4011 || s == 4522 || s == 4012 {
//logdiam.Println("DIAM: Answer CCA code: " + strconv.Itoa(s) + " Session: " + sid)
//переход в оффлайн
val := BrtOfflineCDR.Load(sid) //BrtOfflineCDR[sid]

rr, err := data.CreateCDRRecord(val.RecPool, val.CDRtime, val.Ratio, CDRPatternTask[val.TaskName])
if err != nil {
LogChannel <- LogStruct{"ERROR", err}
} else {
CDRChanneltoFileUni[val.TaskName] <- rr
}
BrtOfflineCDR.Delete(sid)
} else if s == 5030 {
// 5030 пользователь не известен
BrtOfflineCDR.Delete(sid)
} else {
//logdiam.Println("DIAM: Answer CCA code: " + strconv.Itoa(s))
BrtOfflineCDR.Delete(sid)
}

}

}
func AnswerDWAEvent() diam.HandlerFunc {
return func(c diam.Conn, m *diam.Message) {
//обработчик ошибок, вотч дог пишем в обычный лог
s, _ := data.ResponseDiamHandler(m, ProcessDiam, debugm)
ProcessDiam("Answer DWA code: " + strconv.Itoa(s))
}
}
// Горутина приема и записи сообщения по диаметру в брт
func SendCCREvent(c diam.Conn, cfg *sm.Settings, in chan data.DiamCH) {

var err error
server, _, _ := strings.Cut(c.RemoteAddr().String(), ":")
// на подумать, использовать структуру, а потом ее определять или сазу передавать готовое сообщение
// заменить на просто вывод в лог
defer c.Close()

heartbeat := time.Tick(5 * time.Second)
meta, ok := smpeer.FromContext(c.Context())
if !ok {
	ProcessDiam("Client connection does not contain metadata")
	ProcessDiam("Close threads")
}

for {
	select {
	case <-heartbeat:
		// Сделать выход или переоткрытие?
		meta, ok = smpeer.FromContext(c.Context())
		if !ok {
			ProcessDiam("Client connection does not contain metadata")
			ProcessDiam("Close threads")
		}

		// Настройка Watch Dog
		m := diam.NewRequest(280, 4, dict.Default)
		m.NewAVP(avp.OriginHost, avp.Mbit, 0, cfg.OriginHost)
		m.NewAVP(avp.OriginRealm, avp.Mbit, 0, cfg.OriginRealm)
		m.NewAVP(avp.OriginStateID, avp.Mbit, 0, cfg.OriginStateID)
		log.Printf("DIAM: Sending DWR to %s", c.RemoteAddr())
		_, err = m.WriteTo(c)
		if err != nil {
			LogChannel <- LogStruct{"ERROR", err}
		}

	case tmp := <-in:

		diam_message := tmp.Message
		//diam_message := data.CreateCCREventMessage(dict.Default)
		diam_message.NewAVP(avp.OriginHost, avp.Mbit, 0, cfg.OriginHost)
		diam_message.NewAVP(avp.OriginRealm, avp.Mbit, 0, cfg.OriginRealm)
		diam_message.NewAVP(avp.DestinationRealm, avp.Mbit, 0, meta.OriginRealm)
		diam_message.NewAVP(avp.DestinationHost, avp.Mbit, 0, meta.OriginHost)

		/*logdiam.Printf("DIAM: Sending CCR to %s", c.RemoteAddr())
		logdiam.Println(diam_message)*/

		_, err = diam_message.WriteTo(c)
		if err != nil {
			LogChannel <- LogStruct{"ERROR", err}
		} else {
			CDRDiamCount.Inc(server)
		}

	default:

	}
}

}`

@egorkovalchuk
Copy link
Author

Hi

added diagnostics. At some point, responses from one of the two servers stop being processed.

what is the best way to use the machine, for each connection its own or a common one as in your example

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant