# Distribuirani Sistemi
## RMI
### RMI Struktura
```.java
// ### IServiceManager.java ###
public interface IServiceManager extends Remote {
// Potpisi metoda
}
// ### ServiceManager.java ###
public class ServiceManager extends UnicastRemoteObject implements IServiceManager {
// Implementacije metoda
}
// ### ServiceServer.java ###
public class ServiceServer {
public void run() {
IServiceManager sm = new ServiceManager();
LocateRegistry.createRegistry(1099);
Naming.bind("rmi://localhost:1099/ServiceManager", sm);
}
public static void main(String[] args) {
ServiceServer s = new ServiceServer();
s.run();
}
}
// ### ServiceClient.java ###
public class ServiceClient {
public void run() {
IServiceManager sm = (IServiceManager) Naming.lookup("rmi://localhost:1099/ServiceManager");
}
// Ukoliko postoji callback koji treba da se implementira
class ServiceCallback extends UnicastRemoteObject implements IServiceCallback {
// Implementacija metoda
}
}
```
## MPI
### Prenosenje redova matrice ptp
```.c
int matrix[m][n];
int local_row[n];
if (rank == 0)
{
/*// Root cvor popunjava matricu
*/
if (size > 1)
{
for (int i = 1; i < m; i++)
{
MPI_Send (&matrix[i][0], n, MPI_INT, i, 0, MPI_COMM_WORLD);
}
}
}
else
{
MPI_Recv(local_row, n, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
```
### Prenosenje kolona matrice PTP
```.c
int matrix[m][n];
int local_col[m];
if (rank == 0)
{
/*// Root cvor popunjava matricu
*/
if (size > 1)
{
int col[m]; // Pomocni niz koji se salje
for (int i = 1; i < n; i++)
{
for (int j = 0; j < m; j++)
{
col[j] = matrix[j][i];
}
MPI_Send(col, m, MPI_INT, i, 0, MPI_COMM_WORLD);
}
}
}
else
{
MPI_Recv(local_col, m, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
```
### Prenosenje redova matrice koriscenjem MPI_Scatter()
```.c
int matrix[m][n];
int local_row[n];
if (rank == 0)
{
/*// Root cvor popunjava matricu
*/
}
// MPI_Scatter je grupna operacija i
// zahteva da se izvrsi u svim procesima
MPI_Scatter (matrix, n, MPI_INT, local_row, n, MPI_INT, 0, MPI_COMM_WORLD);
/*// Obrada
*/
```
### Struktura stabla
```.c
int num;
if (rank == 0)
{
num = 5;
}
int deg = 1;
int step = log2(size);
for (int i = 0; i < step; i++)
{
if (rank < 2 * deg)
{
if (rank < deg)
{
MPI_Send(&num, 1, MPI_INT, rank + deg, 0, MPI_COMM_WORLD);
printf("%d (%d) -> %d\n", rank, i + 1, rank + deg);
}
else {
MPI_Recv(&num, 1, MPI_INT, rank - deg, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
}
deg *= 2;
}
```
### Hiperkub
```.c
int num;
if (rank == 0)
{
num = 5;
}
int deg = 8;
int step = log2(size);
for (int i = 0; i < step; i++)
{
if (rank % (deg / 2) == 0)
{
if (rank % deg == 0)
{
MPI_Send(&num, 1, MPI_INT, rank + (deg / 2), 0, MPI_COMM_WORLD);
printf("%d (%d) -> %d\n", rank, i + 1, rank + (deg/2));
}
else {
MPI_Recv(&num, 1, MPI_INT, rank - (deg / 2), 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
}
deg /= 2;
}
```
### Izracunavanje minimalne vrednosti medju svim procesima
```.c
// l - lokalna vrednost
// g - globalna vrednost
struct { int val; int rank; }l, g;
printf("[%d] %d \n",rank, r);
l.rank = rank;
l.val = r;
MPI_Reduce(&l, &g, 1, MPI_2INT, MPI_MINLOC, 0, MPI_COMM_WORLD);
MPI_Bcast(&g, 1, MPI_2INT, 0, MPI_COMM_WORLD);
printf("\n\n");
if (g.rank == rank) {
printf("[%d] I have the lowest number: %d", rank, g.val);
}
```
### PTP ekvivalenti grupnih funkcija
```.c
// Scatter
if(rank == 0) {
for(i = 0; i < size; i++) {
MPI_Send(b[i*q], q, MPI_INT, i, 0, MPI_COMM_WORLD);
}
}
// Reduce (MAXLOC)
MPI_Send(&max, 1, MPI_2INT, 0, 0, MPI_COMM_WORLD);
if(rank == 0) {
for(i = 1; i < size; i++) {
MPI_Recv(&gmax, 1, MPI_2INT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
if(max.val > gmax.val) {
gmax.val = max.val;
}
}
}
// Bcast
if(rank == 0) {
for(i = 1; i < size; i++) {
MPI_Send(&gmax, 1, MPI_2INT, i, 0, MPI_COMM_WORLD);
}
}else {
MPI_Recv(&gmax, 1, MPI_2INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
// Reduce (SUM)
MPI_Send(c1, n, MPI_INT, gmax.rank, 0, MPI_COMM_WORLD);
if(rank == gmax.rank) {
MPI_Recv(c, n, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
for(i = 1; i < size; i++) {
MPI_Recv(c1, n, MPI_INT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
for(j = 0; j < n; j++){
c[j] += c1[j];
}
}
}
```
### Mnozenje matrica
```.c
/*
r1 - broj redova matrice 1
r2 - broj redova matrice 2
c1 - broj kolona matrice 1
c2 - broj kolona mtarice 2
*/
int mat1[r1][c1];
int mat2[r2][c2];
int res[r1][c2];
for (int i = 0; i < R1; i++) {
for (int j = 0; j < C2; j++)
{
res[i][j] = 0;
for (int k = 0; k < R2; k++)
{
res[i][j] += mat1[i][k] * mat2[k][j];
}
}
}
```
## JMS
### JMS Struktura
```.java
// ### Client.java ###
public class Client {
int id;
Queue q;
QueueConnection qc;
QueueSession qs;
QueueSender qsnd;
QueueReceiver qrcv;
Topic t;
TopicConnection tc;
TopicSession ts;
TopicPublicsher tpub;
TopicSubscriber tsub;
public Client(int id) {
this.id = id;
InitialContext ictx = new InitialContext();
q = (Queue) ictx.lookup("queue");
t = (Topic) ictx.lookup("topic");
QueueConnectionFactory qcf = (QueueConnectionFactory) ictx.lookup("qcf");
TopicConnectionFactory tcf = (TopicConnectionFactory) ictx.lookup("tcf");
ictx.close();
qc = (QueueConnection) qcf.createConnection();
tc = (TopicConnection) tcf.createConnection();
qs = qc.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
ts = (TopicSession) tc.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
qsnd = qs.createSender(q);
qrcv = qs.createReceiver(q, "id =" +id); // Primice poruku ako poruka sadzi njegov id
tpub = ts.createPublisher(t);
tsub = ts.createSubscriber(t);
qrcv.setMessageListener(new QueueMessageListener(this));
tsub.setMessageListener(new TopicMessageListener(this));
qc.start();
tc.start();
}
public void onQueueMessage(Message msg) {
// Obrada poruke
}
public void onTopicMessage(Message msg) {
// Obrada poruke
}
}
// ### QueueMessageListener.java ###
public class QueueMessageListener implements MessageListener {
Client c;
public QueueMessageListener(Client c) { this.c = c; }
public void onMessage(Message msg) {
this.c.onQueueMessage(msg);
}
}
// ### TopicMessageListener.java ###
public class TopicMessageListener implements MessageListener {
Client c;
public TopicMessageListener(Client c) { this.c = c; }
public void onMessage(Message msg) {
this.c.onTopicMessage(msg);
}
}
```
### Slanje poruke
```.java
TextMessage tm = qs.createTextMessage();
tm.setStringProperty("stringKey", value);
tm.setIntProperty("intKey", value);
qsnd.send(tm);
qs.commit(); // Potrebno je ako je session transacted
```
### Prijem poruke
```.java
public void onMessage(Message msg) {
TextMessage tm = (TextMessage) msg;
int intValue = tm.getIntProperty("intKey");
String stringValue = tm.getStringProperty("stringKey");
}
```
## WCF
### WCF Struktura
```.cs
// ### IService.cs ###
[ServiceContract(CallbackContract = typeof(IServiceCallback), SessionMode = SessionMode.Required)]
public interface IService
{
[OperationContract]
void Action();
}
// Ukoliko se prenosi objekat odredjenog tipa
[DataContract]
public class ServiceDataType
{
[DataMember]
public string Prop {get; set;}
}
public ServiceDataType() { }
// ### IServiceCallback.cs ###
internal interface IServiceCallback
{
[OperationContract(IsOneWay = true)]
void CallbackAction();
}
// ### Service.svc ###
/*
InstanceContextMode.Single - jedna instanca servisa kojoj se svi obracaju
InstanceContextMode.PerCall - kreira se nova instanca pri svakom novom pozivu
InstanceContextMode.PerSession - kreira se nova instanca pri svakoj novoj sesiji
*/
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
public class Service : IService
{
// Implementacija metoda
public void Action()
{
// Kreiranje callback objekta
IServiceCallback callback = OperationContext.Current.GetCallbackChannel<IServiceCallback>();
// Pozivanje callback metode
callback.CallbackAction();
}
}
// ### Client.Program.cs ###
class Program
{
static void Main(string[] args)
{
// ServiceClient je referenca na servis
ServiceClient proxy = new ServiceClient();
proxy.Action
}
}
```
#### Web.xml
```.xml
<!--
binding="wsDualHttpBinding" - duplex, omogucava i klijentima i servisima da salju i primaju poruke u istom trenutku
binding="basicHttpBinding" - samo jedna strana moze da prima i salje poruke u datom trenutku
-->
<services>
<service name="Service">
<endpoint binding="wsDualHttpBinding" address="" contract="IService"/>
</service>
</services>
```