Yukarıda verdiğim örnek her istemci yanıtı için tek işçi başlatıyordu. O tek işçi bütün yanıtı kendisi adım adım oluşturuyor ve istemciye gönderiyordu. (Tabii örnekte "oluşturma" diye bir kavram yoktu; hep aynı sonucu alıyorduk.)
Eğer yanıtın kendisi birden fazla işçi tarafından aynı anda oluşturulabiliyorsa, o tek işçi kendisi başka alt işçiler başlatabilir. Aşağıdaki örnek bunu gösteriyor.
Yaptığım temel değişiklik, sunucuİşçi()'de. sunucuİşçi kendisi dört alt işçi başlatıyor ve istemcinin yanıtını dört işçiye aynı anda parça parça oluşturtuyor.
import std.stdio;
import std.socket;
import std.getopt;
import std.string;
import std.range;
import std.concurrency;
import core.thread;
import std.algorithm;
/* 'Unix doman socket'ler dosya sisteminde dosya olarak
* beliriyorlarmış. Anladığım kadarıyla kullanıcının bu dosyayla doğrudan bir
* işi olmuyor.
*
* Dosya isminin ilk karakterinin '\0' olması da en azından dosya isim
* çakışmalarını önlüyormuş. Hatta, böyle isimdeki dosyaların remove() ile
* silinmeleri de gerekmiyormuş.
*/
enum soketİsmi = "\0/tmp/deneme_soketi";
int main(string[] parametreler)
{
// Sunucu rolünde mi olalım istemci mi
enum Rol { sunucu, istemci }
Rol rol;
try {
// Programın ismi dahil, iki parametre olmalı
enforce(parametreler.length == 2);
// Rol olamayacak değer geldiğinde hata atar
getopt(parametreler, "rol", &rol);
} catch {
stderr.writefln("Kullanım:\n %s --rol={sunucu|istemci}",
parametreler[0]);
return 1;
}
final switch (rol) {
case Rol.sunucu: sunucuOl(soketİsmi); break;
case Rol.istemci: istemciOl(soketİsmi); break;
}
return 0;
}
void sunucuOl(string soketİsmi)
{
auto sunucu = Sunucu(soketİsmi);
sunucu.hizmetEt();
}
void istemciOl(string soketİsmi)
{
auto istemci = İstemci(soketİsmi);
istemci.iste();
}
/* Bu sunucu her isteğe karşılık farklı bir işçi başlatır, istemcinin
* bağlantısını ona devreder ve hiç vakit geçirmeden yeni istemci beklemeye
* başlar.
*
* Bu tasarımda işçiler spawn ile başlatılıyorlar ve hemen
* unutuluyorlar. Dolayısıyla, istemcinin yanıtını göndermek bütünüyle işçinin
* görevi oluyor.
*/
struct Sunucu
{
Socket dinleyici;
this(string soketİsmi)
{
// Önce bağlantıları karşılayacak olan soketi hazırlıyoruz
dinleyici = new Socket(AddressFamily.UNIX, SocketType.STREAM);
auto adres = new UnixAddress(soketİsmi);
dinleyici.bind(adres);
dinleyici.listen(1);
}
~this()
{
dinleyici.shutdown(SocketShutdown.BOTH);
dinleyici.close();
destroy(dinleyici);
// Dosyanın silinmesinin gerekip gerekmediğinden aslında emin değilim:
remove(soketİsmi.toStringz);
}
/* Sonsuz döngüde istemci bekler ve hiç zaman geçirmeden onun isteğini
* yerine getirecek olan bir işçi başlatır. */
void hizmetEt()
{
while (true) {
// İstemci bekliyoruz
writeln("Bekliyorum...");
// Şimdi o sokette bağlantı kabul ediyoruz. Bu Socket nesnesinin
// sahipliğini sunucuİşlev devralacak.
Socket istemci = dinleyici.accept();
writefln("İstemci bağlandı: %s", istemci.remoteAddress());
/* Eş zamanlı işleyecek olan bir işçi başlatıyoruz ve hemen tekrar
* beklemeye dönüyoruz. Burada işçi sayısında hiçbir kısıtlama
* getirilmiyor. İstendiğinde başka tasarımlar da
* düşünülebilir. */
auto işçi = spawn(&sunucuİşçi, cast(shared)istemci);
}
}
}
/*
* İstemcilere eş zamanlı olarak hizmet eden işlev. Parametre olarak bir
* istemci alır ve ona hizmet eder. Bu tasarımda kendisini başlatan ana iş
* parçacığı ile hiçbir iletişimi olmuyor. Yanıtını gönderdikten sonra hemen
* sonlanıyor.
*/
void sunucuİşçi(shared(Socket) istemci_)
{
/* Bu tasarımda istemcinin sorumluluğu bu işleve ait; çıkarken
* sonlandıralım. */
scope (exit) {
destroy(istemci_);
}
/* std.concurrency modülü henüz shared ile tam uyumlu olarak
* çalışmıyor. Şimdilik en iyisi parametreyi yukarıda yaptığımız gibi
* shared olarak almak ve işlev içinde kullanmadan önce tür dönüşümü ile
* shared'i kaldırmak: */
auto istemci = cast()istemci_;
string istek = istemci.tekParçaOlarakOku();
/*
* ...
*
* Burada isteği ayrıştırdığımızı ve tam olarak ne istendiğini
* anladığımızı varsayalım.
*
* Ek olarak, bu isteğe karşılık gönderilecek olan yanıtın dört parçadan
* oluştuğunu ve bu dört parçanın birbirlerinden bağımsız olarak
* oluşturulabileceğini varsayalım. Örneğin, bu dört parçadan birisi veri
* tabanına erişerek oluşturulabilir, bir başkası başka bir sunucudan
* edinilecek olan bilgidir, vs.
*
* Bu parçalar birbirlerinden bağımsız olduklarından onları eş zamanlı
* olarak dört alt işçiye oluşturabiliriz. Görevleri dağıttıktan sonra
* bize kalan, sonuçları beklemek ve yanıtı oluşturmaktır.
*
* ...
*/
string[4] parçalar;
/* Bu örnek görevleri başlatırken her işçiye isteğin tamamını
* gönderiyor. Onun yerine istekten ayrıştırılmış olan bilgiler de
* gönderilebilir. */
spawn(&başlıkHazırlayıcı, istek, 0U);
spawn(&veriTabanındanBirŞeylerToplayıcı, istek, 1U);
spawn(&birSunucuİleKonuşucu, istek, 2U);
/* Farklı parametreli bir örnek olsun diye buna fazladan bir bilgi daha
* gönderelim. */
spawn(&birHesapYapıcı, istek, 3U, "fazladan biraz daha bilgi", 1.5, "vs.");
/* Bu yerel işlev bütün parçaların hazır olup olmadığını bildirir. */
bool bütünParçalarHazır_mı()
{
return !parçalar[].canFind!(parça => parça.empty);
}
/* Şimdi sonuçların gelmelerini bekliyoruz. */
while (!bütünParçalarHazır_mı()) {
/* İşçilerden sonuç bekliyoruz. Her işçi, parça numarasından ve
* sonuçtan oluşan bir çokuzlu gönderiyor. Her sonucu 'parçalar'
* dizisinde ait olduğu yere yerleştireceğiz. */
auto işSonucu = receiveOnly!(size_t, string)();
auto parçaNumarası = işSonucu[0];
auto parça = işSonucu[1];
/* Daha önceden aldığımız bir sonucu almadığımızı denetleyelim. */
assert(parçalar[parçaNumarası].empty);
/* Parçayı yerine yerleştirelim. */
parçalar[parçaNumarası] = parça;
}
/* while sonlandığına göre bütün parçalar hazır. Sonucu oluşturalım. */
string yanıt;
foreach (parça; parçalar) {
yanıt ~= parça;
}
istemci.send(yanıt);
}
void başlıkHazırlayıcı(string istek, size_t parçaNumarası)
{
ownerTid.send(
parçaNumarası,
"HTTP/1.0 200 OK\nContent-Type: text/html; charset=utf-8\n\n");
}
void veriTabanındanBirŞeylerToplayıcı(string istek, size_t parçaNumarası)
{
string veriler = "<p>veri tabanından bir şeyler toplamış olalım</p>";
ownerTid.send(parçaNumarası, veriler);
}
void birSunucuİleKonuşucu(string istek, size_t parçaNumarası)
{
string sunucudanGelenVeri = "<p>bunu da bir sunucudan edinmiş olalım</p>";
ownerTid.send(parçaNumarası, sunucudanGelenVeri);
}
void birHesapYapıcı(string istek,
size_t parçaNumarası,
string fazladanKullanacağımızBirParametre,
double başkaBirParametre,
string dahaBaşkaBirParametre)
{
auto sonuç = fazladanKullanacağımızBirParametre.length
+ başkaBirParametre
+ dahaBaşkaBirParametre.length;
string hesapSonucu =
format("<p>şöyle bir değer hesaplamış olalım: %s</p>", sonuç);
ownerTid.send(parçaNumarası, hesapSonucu);
}
/* Socket.receive'in aksine, okunan parçaları birleştirir ve tek string olarak
* döndürür. */
string tekParçaOlarakOku(Socket soket)
{
string okunan;
// Okunan veriyi parça parça bu belleğe alacağız
ubyte[1000] parça;
bool bütünVeriOkundu_mu = false;
while (!bütünVeriOkundu_mu) {
const adet = soket.receive(parça);
if (adet == Socket.ERROR) {
stderr.writeln("OKUMA HATASI");
break;
}
okunan ~= parça[0..adet];
bütünVeriOkundu_mu = (adet < parça.length);
}
return okunan;
}
struct İstemci
{
Socket soket;
this(string soketİsmi)
{
soket = new Socket(AddressFamily.UNIX, SocketType.STREAM);
soket.connect(new UnixAddress(soketİsmi));
}
~this()
{
soket.shutdown(SocketShutdown.BOTH);
soket.close();
destroy(soket);
}
void iste()
{
string mesaj = format("%s%s%s", "merhaba", 42, "dünya");
const adet = soket.send(mesaj);
writefln("%s bayt gönderdim", adet);
string yanıt = soket.tekParçaOlarakOku();
writefln("Yanıt aldım: %s", yanıt);
}
}
Ali
--
[ Bu gönderi, http://ddili.org/forum'dan dönüştürülmüştür. ]