Merhaba,
Sorum sadece web socket 'i kısmı ve özellik çok iş parçacıklı(multi threading) ile ilgili olduğundan dolayı diğer kısımları çıkardım.
Tam kod https://github.com/kerdemdemir/DCryptoWrapper/blob/master/source/Client/ClientHelper/BinanceHelper.d 'de görülebilir.
Multi threading çetrefilli bir iş olduğundan aşağıdaki kodun doğruluğunu size sormak istedim. İllede web socket'lerini dinlemek olarak değilde genel bir multi threading kodu olarak düşünebilirsiniz.
Kullanıcıya iki fonksiyon sunuyorum
1 - LaunchSocket = Socket'i hayata getiriyor ve kullanıcıdan verilen bir süre kadar(maxSeconds) socket'i dinliyor. Bir data geldiğinde
callback fonksiyonunu çağırıyor
2- CloseSocket = Dİyelimki kullanıcının socket'den bir beklentisi yok artık zamanından önce socket'i kapatabiliyor.
Unittest 'lerim doğru çalışıyor ama doğruluk acısından acaba benim kaçırdığım veya sizin eklemek istediğiniz bir şeyler olurmu ?
class BinanceHelper
{
bool LaunchSocket(CallBack)( string name, string txName, string streamName,
int maxSeconds, CallBack callBackfunction )
{
auto isCreated = InitSocket( name, txName, streamName);
if ( isCreated )
{
auto result = vibe.core.concurrency.async( &CallBackLoop!(CallBack), name, txName, streamName, callBackfunction );
while ( maxSeconds-- )
{
sleep(1.seconds);
if ( result.ready() )
{
CloseSocket( name, txName, streamName);
return false;
}
}
}
CloseSocket( name, txName, streamName);
return true;
}
bool CloseSocket( string name, string txName, string streamName )
{
string uniqStreamName = name ~ txName ~ "@" ~ streamName;
auto socket = (uniqStreamName in sockets);
if ( !socket )
{
writeln( " Socket which is to be removed does not exists ");
return false;
}
auto returnVal = sockets.remove(uniqStreamName);
socket.close();
return returnVal;
}
private:
bool InitSocket( string name, string txName, string streamName )
{
import std.uni : toLower;
string uniqStreamName = name ~ txName ~ "@" ~ streamName;
if ( uniqStreamName in sockets )
{
writeln( "Socket with unique name: ", uniqStreamName, " was already existed will return" );
return false;
}
auto ws_url = URL("wss://stream.binance.com:9443/ws/" ~ uniqStreamName);
auto ws = connectWebSocket(ws_url);
if ( !ws.connected )
return false;
sockets[uniqStreamName] = ws;
return true;
}
short CallBackLoop(CallBack)( string name, string txName, string streamName, CallBack callBackfunction )
{
string uniqStreamName = name ~ txName ~ "@" ~ streamName;
auto socket = (uniqStreamName in sockets);
if ( !socket )
{
writeln(" Please be sure socket is initiliazed" );
return -1;
}
while (socket && socket.waitForData())
{
try
{
Json result = parseJsonString(socket.receiveText);
callBackfunction(result);
}
catch ( std.json.JSONException e )
{
writeln("Exception was caught while making the binance socket call: ", e);
continue;
}
}
CloseSocket(name, txName, streamName);
writeln( "Socket will be closed reason was: ", socket.closeReason );
return socket.closeCode;
}
WebSocket[string] sockets;
}
unittest
{
import vibe.core.sync;
import vibe.core.concurrency;
writeln( "***** BinanceHelper Tests *****" );
auto helper = new BinanceHelper();
void testFoo( Json json )
{
writeln(json);
}
// This is a basic test for blocking call for LaunchSocket
assert (helper.LaunchSocket!((typeof(&testFoo)))("eth", "btc", "aggTrade", 10, &testFoo ));
// I expect LaunchSocket to be called with async normally
// I launch for maximum 30 seconds
auto result = vibe.core.concurrency.async( &helper.LaunchSocket!((typeof(&testFoo))),"eth",
"btc", "aggTrade", 30, &testFoo );
sleep(2.seconds);
helper.CloseSocket("eth", "btc", "aggTrade");
//Premature close after 2 seconds result should be false
assert( !result.ready() );
}
Erdemdem
--
[ Bu gönderi, http://ddili.org/forum'dan dönüştürülmüştür. ]
Permalink
Reply