# Distribuirani Sistemi

## RMI

### RMI Struktura

```.java
// ### IServiceManager.java ###
public interface IServiceManager extends Remote {
    // Method signatures
}

// ### ServiceManager.java ###
public class ServiceManager extends UnicastRemoteObject implements IServiceManager {
    // Method implementations
}

// ### ServiceServer.java ###
public class ServiceServer {
    public void run() {
        IServiceManager sm = new ServiceManager();
        LocateRegistry.createRegistry(1099);
        Naming.bind("rmi://localhost:1099/ServiceManager", sm);
    }

    public static void main(String[] args) {
        ServiceServer s = new ServiceServer();
        s.run();
    }
}

// ### ServiceClient.java ###
public class ServiceClient {
    public void run() {
        IServiceManager sm = (IServiceManager) Naming.lookup("rmi://localhost:1099/ServiceManager");
    }

    // If there is a callback that needs to be implemented
    class ServiceCallback extends UnicastRemoteObject implements IServiceCallback {
        // Method implementations
    }
}
```

## MPI

### Prenosenje redova matrice PTP

```.c
int matrix[m][n];
int local_row[n];

if (rank == 0) {
    /* Root node fills the matrix */
    if (size > 1) {
        for (int i = 1; i < m; i++) {
            MPI_Send (&matrix[i][0], n, MPI_INT, i, 0, MPI_COMM_WORLD);
        }
    }
} else {
    MPI_Recv(local_row, n, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
```

### Prenosenje kolona matrice PTP

```.c
int matrix[m][n];
int local_col[m];

if (rank == 0) {
    /* Root node fills the matrix */
    if (size > 1) {
        int col[m]; // Helper array that gets sent
        for (int i = 1; i < n; i++) {
            for (int j = 0; j < m; j++) {
                col[j] = matrix[j][i];
            }
            MPI_Send(col, m, MPI_INT, i, 0, MPI_COMM_WORLD);
        }
    }
} else {
    MPI_Recv(local_col, m, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
```

### Prenosenje redova matrice koriscenjem MPI_Scatter()

```.c
int matrix[m][n];
int local_row[n];

if (rank == 0) {
    /* Root node fills the matrix */
}

// MPI_Scatter is a collective operation and
// must be executed by all processes
MPI_Scatter (matrix, n, MPI_INT, local_row, n, MPI_INT, 0,
             MPI_COMM_WORLD);

/* Processing */
```

### Struktura stabla

```.c
int num;
if (rank == 0) {
    num = 5;
}

int deg = 1;
int step = log2(size);

for (int i = 0; i < step; i++) {
    if (rank < 2 * deg) {
        if (rank < deg) {
            MPI_Send(&num, 1, MPI_INT, rank + deg, 0, MPI_COMM_WORLD);
            printf("%d (%d) -> %d\n", rank, i + 1, rank + deg);
        } else {
            MPI_Recv(&num, 1, MPI_INT, rank - deg, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        }
    }
    deg *= 2;
}
```

### Hiperkub

```.c
int num;
if (rank == 0) {
    num = 5;
}

// Start from the full cube size (instead of a hard-coded 8) so that any
// power-of-two number of processes works; identical for size == 8
int deg = size;
int step = log2(size);

for (int i = 0; i < step; i++) {
    if (rank % (deg / 2) == 0) {
        if (rank % deg == 0) {
            MPI_Send(&num, 1, MPI_INT, rank + (deg / 2), 0, MPI_COMM_WORLD);
            printf("%d (%d) -> %d\n", rank, i + 1, rank + (deg/2));
        } else {
            MPI_Recv(&num, 1, MPI_INT, rank - (deg / 2), 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        }
    }
    deg /= 2;
}
```

### Izracunavanje minimalne vrednosti medju svim procesima

```.c
// l - local value
// g - global value
struct {
    int val;
    int rank;
} l, g;

printf("[%d] %d \n",rank, r);

l.rank = rank;
l.val = r;

MPI_Reduce(&l, &g, 1, MPI_2INT, MPI_MINLOC, 0, MPI_COMM_WORLD);
MPI_Bcast(&g, 1, MPI_2INT, 0, MPI_COMM_WORLD);

printf("\n\n");
if (g.rank == rank) {
    printf("[%d] I have the lowest number: %d", rank, g.val);
}
```

### PTP ekvivalenti grupnih funkcija

```.c
// Scatter
if(rank == 0) {
    for(i = 0; i < size; i++) {
        // Pass the address of the i-th chunk, not the element value
        MPI_Send(&b[i*q], q, MPI_INT, i, 0, MPI_COMM_WORLD);
    }
}

// Reduce (MAXLOC)
if(rank != 0) {
    MPI_Send(&max, 1, MPI_2INT, 0, 0, MPI_COMM_WORLD);
} else {
    struct { int val; int rank; } tmp; // same layout as MPI_2INT

    // Start from the root's own candidate, then fold in every other process;
    // receiving into a temporary keeps the running maximum from being overwritten
    gmax.val = max.val;
    gmax.rank = max.rank;
    for(i = 1; i < size; i++) {
        MPI_Recv(&tmp, 1, MPI_2INT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        if(tmp.val > gmax.val) {
            gmax.val = tmp.val;
            gmax.rank = tmp.rank;
        }
    }
}

// Bcast
if(rank == 0) {
    for(i = 1; i < size; i++) {
        MPI_Send(&gmax, 1, MPI_2INT, i, 0, MPI_COMM_WORLD);
    }
} else {
    MPI_Recv(&gmax, 1, MPI_2INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}

// Reduce (SUM)
MPI_Send(c1, n, MPI_INT, gmax.rank, 0, MPI_COMM_WORLD);
if(rank == gmax.rank) {
    MPI_Recv(c, n, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    for(i = 1; i < size; i++) {
        MPI_Recv(c1, n, MPI_INT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        for(j = 0; j < n; j++){
            c[j] += c1[j];
        }
    }
}
```

### Mnozenje matrica

```.c
/*
    r1 - number of rows of matrix 1
    r2 - number of rows of matrix 2
    c1 - number of columns of matrix 1
    c2 - number of columns of matrix 2
*/
int mat1[r1][c1];
int mat2[r2][c2];
int res[r1][c2];

for (int i = 0; i < r1; i++) {
    for (int j = 0; j < c2; j++) {
        res[i][j] = 0;
        for (int k = 0; k < r2; k++) {
            res[i][j] += mat1[i][k] * mat2[k][j];
        }
    }
}
```

## JMS

### JMS Struktura

```.java
// ### Client.java ###
public class Client {
    int id;

    Queue q;
    QueueConnection qc;
    QueueSession qs;
    QueueSender qsnd;
    QueueReceiver qrcv;

    Topic t;
    TopicConnection tc;
    TopicSession ts;
    TopicPublisher tpub;
    TopicSubscriber tsub;

    public Client(int id) {
        this.id = id;

        InitialContext ictx = new InitialContext();
        q = (Queue) ictx.lookup("queue");
        t = (Topic) ictx.lookup("topic");
        QueueConnectionFactory qcf = (QueueConnectionFactory) ictx.lookup("qcf");
        TopicConnectionFactory tcf = (TopicConnectionFactory) ictx.lookup("tcf");
        ictx.close();

        qc = (QueueConnection) qcf.createConnection();
        tc = (TopicConnection) tcf.createConnection();

        qs = qc.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
        ts = (TopicSession) tc.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);

        qsnd = qs.createSender(q);
        qrcv = qs.createReceiver(q, "id =" +id); // Receives only messages whose id matches this client's id

        tpub = ts.createPublisher(t);
        tsub = ts.createSubscriber(t);

        qrcv.setMessageListener(new QueueMessageListener(this));
        tsub.setMessageListener(new TopicMessageListener(this));

        qc.start();
        tc.start();
    }

    public void onQueueMessage(Message msg) {
        // Process the message
    }

    public void onTopicMessage(Message msg) {
        // Process the message
    }
}

// ### QueueMessageListener.java ###
public class QueueMessageListener implements MessageListener {
    Client c;

    public QueueMessageListener(Client c) {
        this.c = c;
    }

    public void onMessage(Message msg) {
        this.c.onQueueMessage(msg);
    }
}

// ### TopicMessageListener.java ###
public class TopicMessageListener implements MessageListener {
    Client c;

    public TopicMessageListener(Client c) {
        this.c = c;
    }

    public void onMessage(Message msg) {
        this.c.onTopicMessage(msg);
    }
}
```

### Slanje poruke

```.java
TextMessage tm = qs.createTextMessage();
tm.setStringProperty("stringKey", value);
tm.setIntProperty("intKey", value);
qsnd.send(tm);
qs.commit(); // Required when the session is transacted
```

### Prijem poruke

```.java
public void onMessage(Message msg) {
    TextMessage tm = (TextMessage) msg;
    int intValue = tm.getIntProperty("intKey");
    String stringValue = tm.getStringProperty("stringKey");
}
```

## WCF

### WCF Struktura

```.cs
// ### IService.cs ###
[ServiceContract(CallbackContract = typeof(IServiceCallback), SessionMode = SessionMode.Required)]
public interface IService
{
    [OperationContract]
    void Action();
}

// If an object of a specific type is transferred
[DataContract]
public class ServiceDataType
{
    [DataMember]
    public string Prop {get; set;}

    public ServiceDataType() { }
}

// ### IServiceCallback.cs ###
internal interface IServiceCallback
{
    [OperationContract(IsOneWay = true)]
    void CallbackAction();
}

// ### Service.svc ###
/*
    InstanceContextMode.Single - a single service instance that everyone talks to
    InstanceContextMode.PerCall - a new instance is created for every new call
    InstanceContextMode.PerSession - a new instance is created for every new session
*/
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
public class Service : IService
{
    // Method implementations
    public void Action()
    {
        // Obtain the callback object
        IServiceCallback callback = OperationContext.Current.GetCallbackChannel<IServiceCallback>();
        // Invoke the callback method
        callback.CallbackAction();
    }
}

// ### Client.Program.cs ###
class Program
{
    static void Main(string[] args)
    {
        // ServiceClient is the reference to the service
        ServiceClient proxy = new ServiceClient();
        proxy.Action();
    }
}
```

#### Web.xml

```.xml
<!--
    binding="wsDualHttpBinding" - duplex, allows both clients and services
                                  to send and receive messages at the same time
    binding="basicHttpBinding" - only one side can receive and send
                                 messages at a given moment
-->
<services>
    <service name="Service">
        <endpoint binding="wsDualHttpBinding" address="" contract="IService"/>
    </service>
</services>
```