Thread overview
Web socket ile veri dinleme
Apr 05, 2018
kerdemdemir
Apr 05, 2018
kerdemdemir
April 05, 2018

Merhaba,

Sorum sadece web socket 'i kısmı ve özellik çok iş parçacıklı(multi threading) ile ilgili olduğundan dolayı diğer kısımları çıkardım.
Tam kod https://github.com/kerdemdemir/DCryptoWrapper/blob/master/source/Client/ClientHelper/BinanceHelper.d 'de görülebilir.

Multi threading çetrefilli bir iş olduğundan aşağıdaki kodun doğruluğunu size sormak istedim. İllede web socket'lerini dinlemek olarak değilde genel bir multi threading kodu olarak düşünebilirsiniz.

Kullanıcıya iki fonksiyon sunuyorum
1 - LaunchSocket = Socket'i hayata getiriyor ve kullanıcıdan verilen bir süre kadar(maxSeconds) socket'i dinliyor. Bir data geldiğinde
callback fonksiyonunu çağırıyor
2- CloseSocket = Dİyelimki kullanıcının socket'den bir beklentisi yok artık zamanından önce socket'i kapatabiliyor.

Unittest 'lerim doğru çalışıyor ama doğruluk acısından acaba benim kaçırdığım veya sizin eklemek istediğiniz bir şeyler olurmu ?

class BinanceHelper
{
	bool LaunchSocket(CallBack)( string name, string txName, string streamName,
								 int maxSeconds, CallBack callBackfunction )
	{
		auto isCreated = InitSocket( name, txName, streamName);
		if ( isCreated )
		{
			auto result = vibe.core.concurrency.async( &CallBackLoop!(CallBack), name, txName, streamName, callBackfunction );
			while ( maxSeconds-- )
			{
				sleep(1.seconds);
				if ( result.ready() )
				{
					CloseSocket( name, txName, streamName);
					return false;
				}
			}
		}
		CloseSocket( name, txName, streamName);
		return true;
	}

	bool CloseSocket( string name, string txName, string streamName )
	{
		string uniqStreamName = name ~ txName ~ "@" ~ streamName;
		auto socket = (uniqStreamName in sockets);
		if ( !socket )
		{
			writeln( " Socket which is to be removed does not exists ");
			return false;
		}
		auto returnVal = sockets.remove(uniqStreamName);
		socket.close();
		return returnVal;
	}

private:

	bool InitSocket( string name, string txName, string streamName )
	{
		import std.uni : toLower;
		string uniqStreamName = name ~ txName ~ "@" ~ streamName;

		if ( uniqStreamName in sockets )
		{
			writeln( "Socket with unique name: ", uniqStreamName, " was already existed will return"  );
			return false;
		}
		auto ws_url = URL("wss://stream.binance.com:9443/ws/" ~ uniqStreamName);
		auto ws = connectWebSocket(ws_url);
		if ( !ws.connected )
			return false;
		sockets[uniqStreamName] = ws;
		return true;
	}

	short CallBackLoop(CallBack)( string name, string txName, string streamName, CallBack callBackfunction )
	{
		string uniqStreamName = name ~ txName ~ "@" ~ streamName;
		auto socket = (uniqStreamName in sockets);
		if ( !socket )
		{
			writeln(" Please be sure socket is initiliazed" );
			return -1;
		}
		while (socket && socket.waitForData())
		{
			try
			{
				Json result = parseJsonString(socket.receiveText);
				callBackfunction(result);
			}
			catch ( std.json.JSONException e )
			{
				writeln("Exception was caught while making the binance socket call: ", e);
				continue;
			}
		}
		CloseSocket(name, txName, streamName);
		writeln( "Socket will be closed reason was: ", socket.closeReason );
		return socket.closeCode;
	}

	WebSocket[string] sockets;
}

unittest
{
	import vibe.core.sync;
	import vibe.core.concurrency;


	writeln( "***** BinanceHelper Tests  *****" );

	auto helper = new BinanceHelper();

	void testFoo( Json json )
	{
		writeln(json);
	}

	// This is a basic test for blocking call for LaunchSocket
	assert (helper.LaunchSocket!((typeof(&testFoo)))("eth", "btc", "aggTrade", 10, &testFoo ));

	// I expect LaunchSocket to be called with async normally
       // I launch for maximum 30 seconds
	auto result = vibe.core.concurrency.async( &helper.LaunchSocket!((typeof(&testFoo))),"eth",
       "btc", "aggTrade", 30, &testFoo );
	sleep(2.seconds);
	helper.CloseSocket("eth", "btc", "aggTrade");
	//Premature close after 2 seconds result should be false
	assert( !result.ready() );
}

Erdemdem

--
[ Bu gönderi, http://ddili.org/forum'dan dönüştürülmüştür. ]

April 05, 2018

Multi-threading demem yanlis oldu. Fakat sormak istedigim bir sey var. Thread 'leri kullanmiyor olsa bile yinede iki tane fiber ayni referans i kullandiginda yaris halinde(race condition) olmaz mi? Multi-threading'de olan butun sorunlar aynen fiberlerde de olmazmi bu mantikda?

Erdemdem

--
[ Bu gönderi, http://ddili.org/forum'dan dönüştürülmüştür. ]

April 05, 2018

Ben vibe.d'de deneyimli değilim ama multi-threading'den kaçındıklarını ve fiber'ları yeğlediklerini biliyorum. Gördüğüm kadarıyla multi-threading'i de destekliyorlarmış ama async() büyük olasılıkla fiber olarak işletiliyor.

O yüzden, burada bildik multi-threading hataları görmek olanaksız. (Örneğin, veri yazmayla okumanın yarış halinde olması, vs.)

Tek gözüme çarpan, dönüş kodlarında bir durum var: CloseSocket'in dönüş türü gözardı ediliyor; edilmese LaunchSocket'in dönüş türünün anlamı karmaşıklaşacak, vs.

Ali

--
[ Bu gönderi, http://ddili.org/forum'dan dönüştürülmüştür. ]

April 06, 2018

Program mantığı açısından yarış halinde olabilirler: Örneğin, birisi işini tamamlamadan diğeri araya girmemelidir. vibe.d'de nasıl oluyor bilmiyorum ama Phobos'un fiberlerinde diğer fiber'e ancak yield() (veya call()) çağrıldığında geçiliyor.

Onun dışında, multi-thread'de olan yarış hali fiberlerde olmuyor çünkü bütün fiberler aynı thread üzerinde işletiliyorlar; sırayla, hiçbir zaman ikisi aynı anda değil. (Thread'in tanımı gereği ve tek thread olduğundan, o anlamda sorun yok.)

Ali

--
[ Bu gönderi, http://ddili.org/forum'dan dönüştürülmüştür. ]