1004-05,使用workflow对象创建http任务,redis任务

目录

01_wait_group.cc

02_http_wait.cc

03_httptask_callback.cc

04_http_task_request.cc

05_redis_task_callback.cc

06_redistask_read.cc

07_series.cc

08_series_dynamic.cc

09_context.cc

10_parallel_work.cc

作业:

01 存在下列的redis键值映射关系,使用workflow的redis任务和序列,假如只知道"x1",如何找到最终的"100"?"x1" --> "x2""x2" --> "x3""x3" --> "x4""x4" --> "100"

02 读取某个网站的内容,并且存入redis服务端当中(比如先访问淘宝,再set www.taobao.com 淘宝的html内容)

03 阅读下面的代码并尝试添加注释

01_wait_group.cc

#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>

#include <signal.h>
#include <iostream>

using std::cout;
using std::cerr;

static WFFacilities::WaitGroup WaitGroup(1);

void handler(int signum){
    cout<<"done\n";
    WaitGroup.done();
}

int main(void)
{
    signal(SIGINT,handler);

    //创建任务
    WFHttpTask * httpTask=WFTaskFactory::create_http_task(
            /* "http://www.baidu.com", */
"http://localhost/en/index.html",
            10,
            10,
            nullptr
    );
    //交给框架执行
    httpTask->start();

    WaitGroup.wait();
    cout<<"finish\n";

    return 0;
}


	/* static WFHttpTask *create_http_task(const std::string& url, */
	/* 									int redirect_max,//最大重定向次数 */
	/* 									int retry_max, //最大重试次数*/
	/* 									http_callback_t callback//回调函数); */

02_http_wait.cc

// wait_group 实现有条件的等待
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <iostream>
#include <signal.h>
using std::cout;
using std::cerr;
static WFFacilities::WaitGroup waitGroup(1);
void handler(int signum){
    cout << "done\n";
    waitGroup.done();
}
int main(){
    signal(SIGINT,handler);
    // 用户代码 1 创建任务
    WFHttpTask * httpTask =  WFTaskFactory::create_http_task(
        //"http://www.baidu.com",
        "http://localhost/en/index.html",
        10,
        10,
        nullptr
    );
    // 用户代码 2 把任务交给框架
    httpTask->start();

    waitGroup.wait();
    cout << "finish!\n";
}

03_httptask_callback.cc

#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/HttpUtil.h>

#include <signal.h>
#include <iostream>

using std::cout;
using std::cerr;

//------------------------------------//
static WFFacilities::WaitGroup waitGroup(1);

void sighandler(int signum){
    waitGroup.done();//waitGroup(--num);
    cout<<"done\n";
}

//------------------------------------//
//  httpCallback()
//------------------------------------//
void httpCallback(WFHttpTask* httpTask){
    cout<<"callback is called\n";

    protocol::HttpRequest* req=httpTask->get_req();
    protocol::HttpResponse* resq=httpTask->get_resp();
    int state=httpTask->get_state();
    int error=httpTask->get_error();//错误原因

    //状态处理
    switch(state) {
        case WFT_STATE_SYS_ERROR: // 系统错误
            cerr <<"system error: " << strerror(error) << "\n"; 
            break;
        case WFT_STATE_DNS_ERROR: // DNS错误
            cerr <<"DNS error: " << gai_strerror(error) << "\n"; 
            break;
        case WFT_STATE_SSL_ERROR: // SSL错误
            cerr <<"SSL error: " << error << "\n"; 
            break;
        case WFT_STATE_TASK_ERROR: // 任务错误
            cerr <<"Task error: "<< error << "\n"; 
            break;
        case WFT_STATE_SUCCESS: // 请求成功
            break; 
    }

    //错误处理
    if (state != WFT_STATE_SUCCESS) {
        cerr <<"Failed. Press Ctrl-C to exit.\n";
        return;
    }


    //------------------------------------//
    //请求报文-起始行-首部字段
    cerr<<"method="<<req->get_method()<<"\n";
    cerr<<"version="<<req->get_http_version()<<"\n";
    cerr<<"path& query="<<req->get_request_uri()<<"\n";

    std::string name;
    std::string value;
    protocol::HttpHeaderCursor req_cursor(req);//类似迭代器,初始化
    while(req_cursor.next(name,value)){
        cerr<<"name ="<<name<<"  value ="<<value<<"\n";
    }
    cerr<<"\n";

    //------------------------------------//
    //响应报文-起始行-首部字段-报文体
    cerr<<"version="<<resq->get_http_version()<<"\n";
    cerr<<"state code="<<resq->get_status_code()<<"\n";
    cerr<<"reason phrea="<<resq->get_reason_phrase()<<"\n";

    protocol::HttpHeaderCursor resp_cursor(resq);//类似迭代器,初始化
    while(resp_cursor.next(name,value)){
        cerr<<"name ="<<name<<"  value ="<<value<<"\n";
    }

    const void * body;
    size_t body_len;
    resq->get_parsed_body(&body,&body_len);
    cerr<<static_cast<const char*>(body)<<"\n";
    

}



//------------------------------------//
//  main()
//------------------------------------//
int main(void)
{
    signal(SIGINT,sighandler);

    WFHttpTask* httpTask=WFTaskFactory::create_http_task(
        "http://www.taobao.com",10,10,httpCallback);

    httpTask->start();
    waitGroup.wait();

    cerr<<"\nfinish\n";
    return 0;
}

04_http_task_request.cc

#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/HttpMessage.h>
#include <workflow/HttpUtil.h>

#include <signal.h>
#include <iostream>

using std::cout;
using std::cerr;

//------------------------------------//
static WFFacilities::WaitGroup waitGroup(1);

void sighandler(int signum){
    waitGroup.done();//waitGroup(--num);
    cout<<"done\n";
}

//------------------------------------//
//  httpCallback()
//------------------------------------//
void httpCallback(WFHttpTask* httpTask){
    cout<<"callback is called\n";

    protocol::HttpRequest* req=httpTask->get_req();
    protocol::HttpResponse* resq=httpTask->get_resp();
    int state=httpTask->get_state();
    int error=httpTask->get_error();//错误原因

    //状态处理
    switch(state) {
        case WFT_STATE_SYS_ERROR: // 系统错误
            cerr <<"system error: " << strerror(error) << "\n"; 
            break;
        case WFT_STATE_DNS_ERROR: // DNS错误
            cerr <<"DNS error: " << gai_strerror(error) << "\n"; 
            break;
        case WFT_STATE_SSL_ERROR: // SSL错误
            cerr <<"SSL error: " << error << "\n"; 
            break;
        case WFT_STATE_TASK_ERROR: // 任务错误
            cerr <<"Task error: "<< error << "\n"; 
            break;
        case WFT_STATE_SUCCESS: // 请求成功
            break; 
    }

    //错误处理
    if (state != WFT_STATE_SUCCESS) {
        cerr <<"Failed. Press Ctrl-C to exit.\n";
        return;
    }


    //------------------------------------//
    //请求报文-起始行-首部字段
    cerr<<"method="<<req->get_method()<<"\n";
    cerr<<"version="<<req->get_http_version()<<"\n";
    cerr<<"path& query="<<req->get_request_uri()<<"\n";

    std::string name;
    std::string value;
    protocol::HttpHeaderCursor req_cursor(req);//类似迭代器,初始化
    while(req_cursor.next(name,value)){
        cerr<<"name ="<<name<<"  value ="<<value<<"\n";
    }
    cerr<<"\n";

    //------------------------------------//
    //响应报文-起始行-首部字段-报文体
    cerr<<"version="<<resq->get_http_version()<<"\n";
    cerr<<"state code="<<resq->get_status_code()<<"\n";
    cerr<<"reason phrea="<<resq->get_reason_phrase()<<"\n";

    protocol::HttpHeaderCursor resp_cursor(resq);//类似迭代器,初始化
    while(resp_cursor.next(name,value)){
        cerr<<"name ="<<name<<"  value ="<<value<<"\n";
    }

    /* const void * body; */
    /* size_t body_len; */
    /* resq->get_parsed_body(&body,&body_len); */
    /* cerr<<static_cast<const char*>(body)<<"\n"; */

/* GET: 通常用于请求数据,不会对服务器的状态产生副作用。 */
/* POST: 用于提交数据,通常会导致服务器状态改变或者创建新的资源。 */
/* 百度的接口: */

/* 百度的搜索接口一般是通过 GET 请求来获取搜索结果。因此,即使您将方法设置为 POST,如果服务器只支持 GET,请求仍然会被处理为 GET。 */

}



//------------------------------------//
//  main()
//------------------------------------//
int main(void)
{
    signal(SIGINT,sighandler);

    WFHttpTask* httpTask=WFTaskFactory::create_http_task(
        "http://www.taobao.com",10,10,httpCallback);

    
    //找到请求并设置残数
    protocol::HttpRequest* req=httpTask->get_req();
    req->set_method("POST");
    req->set_request_uri("/s?wd=123");//百度查询接口
    req->add_header_pair("myname","workflow");
    cerr<<"method="<<req->get_method()<<"\n";
    cerr<<"version="<<req->get_http_version()<<"\n";
    cerr<<"path& query="<<req->get_request_uri()<<"\n";
    cerr<<"\n";
    cerr<<"\n";

    httpTask->start();
    waitGroup.wait();

    cerr<<"\nfinish\n";
    return 0;
}

05_redis_task_callback.cc

#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>

#include <signal.h>
#include <iostream>

using std::cout;
using std::cerr;

//------------------------------------//
static WFFacilities::WaitGroup waitGroup(1);

void sighandler(int signum){
    waitGroup.done();//waitGroup(--num);
    cout<<"done\n";
}

//------------------------------------//
//  redisCallback()
//------------------------------------//
void redisCallback(WFRedisTask* redistask){
    cout<<"callback is called\n";


    protocol::RedisRequest *req=redistask->get_req();
    protocol::RedisResponse* resp=redistask->get_resp();
    int state=redistask->get_state();
    int error=redistask->get_error();

    //状态处理
    switch(state) {
        case WFT_STATE_SYS_ERROR: // 系统错误
            cerr <<"system error: " << strerror(error) << "\n"; 
            break;
        case WFT_STATE_DNS_ERROR: // DNS错误
            cerr <<"DNS error: " << gai_strerror(error) << "\n"; 
            break;
        case WFT_STATE_SSL_ERROR: // SSL错误
            cerr <<"SSL error: " << error << "\n"; 
            break;
        case WFT_STATE_TASK_ERROR: // 任务错误
            cerr <<"Task error: "<< error << "\n"; 
            break;
        case WFT_STATE_SUCCESS: // 请求成功
            break; 
    }

    //错误处理
    if (state != WFT_STATE_SUCCESS) {
        cerr <<"Failed. Press Ctrl-C to exit.\n";
        return;
    }


    cout<<"callback is end\n";
}



//------------------------------------//
//  main()
//------------------------------------//
int main(void)
{
    signal(SIGINT,sighandler);

    WFRedisTask* redistask=WFTaskFactory::create_redis_task(
        "redis://127.0.0.1:6379",10,redisCallback);
    cerr<<"\n";

    //找到修改请求
    protocol::RedisRequest *req=redistask->get_req();
    req->set_request("SET",{"huasheng","lovexixi"});

    redistask->start();
    waitGroup.wait();

    cerr<<"\nfinish\n";
    return 0;
}

06_redistask_read.cc

#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>

#include <signal.h>
#include <iostream>

using std::cout;
using std::cerr;

//------------------------------------//
static WFFacilities::WaitGroup waitGroup(1);

void sighandler(int signum){
    waitGroup.done();//waitGroup(--num);
    cout<<"done\n";
}

//------------------------------------//
//  redisCallback()
//------------------------------------//
void redisCallback(WFRedisTask* redistask){
    cout<<"callback is called\n";


    protocol::RedisRequest *req=redistask->get_req();
    protocol::RedisResponse* resp=redistask->get_resp();
    int state=redistask->get_state();
    int error=redistask->get_error();
    protocol::RedisValue val;

    //状态处理
    switch(state) {
        case WFT_STATE_SYS_ERROR: // 系统错误
            cerr <<"system error: " << strerror(error) << "\n"; 
            break;
        case WFT_STATE_DNS_ERROR: // DNS错误
            cerr <<"DNS error: " << gai_strerror(error) << "\n"; 
            break;
        case WFT_STATE_SSL_ERROR: // SSL错误
            cerr <<"SSL error: " << error << "\n"; 
            break;
        case WFT_STATE_TASK_ERROR: // 任务错误
            cerr <<"Task error: "<< error << "\n"; 
            break;
        case WFT_STATE_SUCCESS: // 请求成功
            resp->get_result(val);
            if(val.is_error()){
                cerr<<"error reply,need a password? \n";
                state=WFT_STATE_TASK_ERROR;
            }
            break; 
    }

    //错误处理
    if (state != WFT_STATE_SUCCESS) {
        cerr <<"Failed. Press Ctrl-C to exit.\n";
        return;
    }

    //------------------------------------//
    //查看redis执行的结果
    /* protocol::RedisValue val; */
    /* resp->get_result(val); */
    if(val.is_string()){
        cerr<<"value is string:  "<<val.string_value()<<"\n";
    }
    if(val.is_array()){
        cerr<<"value is array: \n ";
        for(size_t i=0;i<val.arr_size();++i){
            cerr<<i<<"  value:"<<val.arr_at(i).string_value()<<"\n";
        }
    }



    cout<<"callback is end\n";
}



//------------------------------------//
//  main()
//------------------------------------//
int main(void)
{
    signal(SIGINT,sighandler);

    WFRedisTask* redistask=WFTaskFactory::create_redis_task(
        "redis://127.0.0.1:6379",10,redisCallback);
    cerr<<"\n";

    //找到修改请求
    protocol::RedisRequest *req=redistask->get_req();
    /* req->set_request("SET",{"huasheng","lovexixi"}); */
    req->set_request("HGETALL",{"aa"});

    redistask->start();
    waitGroup.wait();

    cerr<<"\nfinish\n";
    return 0;
}

07_series.cc

#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>

#include <signal.h>
#include <unistd.h>
#include <iostream>

using std::cout;
using std::cerr;

//------------------------------------//
static WFFacilities::WaitGroup waitGroup(1);

void sighandler(int signum){
    waitGroup.done();//waitGroup(--num);
    cout<<"done\n";
}

//------------------------------------//
//  redisCallback()
//------------------------------------//
void redisCallback(WFRedisTask* redistask){
    cout<<"callback is called\n";


    protocol::RedisRequest *req=redistask->get_req();
    protocol::RedisResponse* resp=redistask->get_resp();
    int state=redistask->get_state();
    int error=redistask->get_error();
    protocol::RedisValue val;

    //状态处理
    switch(state) {
        case WFT_STATE_SYS_ERROR: // 系统错误
            cerr <<"system error: " << strerror(error) << "\n"; 
            break;
        case WFT_STATE_DNS_ERROR: // DNS错误
            cerr <<"DNS error: " << gai_strerror(error) << "\n"; 
            break;
        case WFT_STATE_SSL_ERROR: // SSL错误
            cerr <<"SSL error: " << error << "\n"; 
            break;
        case WFT_STATE_TASK_ERROR: // 任务错误
            cerr <<"Task error: "<< error << "\n"; 
            break;
        case WFT_STATE_SUCCESS: // 请求成功
            resp->get_result(val);
            if(val.is_error()){
                cerr<<"error reply,need a password? \n";
                state=WFT_STATE_TASK_ERROR;
            }
            break; 
    }

    //错误处理
    if (state != WFT_STATE_SUCCESS) {
        cerr <<"Failed. Press Ctrl-C to exit.\n";
        return;
    }

    //------------------------------------//
    //查看redis执行的结果
    /* protocol::RedisValue val; */
    /* resp->get_result(val); */
    if(val.is_string()){
        cerr<<"value is string:  "<<val.string_value()<<"\n";
    }
    if(val.is_array()){
        cerr<<"value is array: \n ";
        for(size_t i=0;i<val.arr_size();++i){
            cerr<<i<<"  value:"<<val.arr_at(i).string_value()<<"\n";
        }
    }


    /* sleep(2); */
    cout<<"callback is end\n";
}



//------------------------------------//
//  main()
//------------------------------------//
int main(void)
{
    signal(SIGINT,sighandler);

    WFRedisTask* redistask=WFTaskFactory::create_redis_task(
        "redis://127.0.0.1:6379",10,redisCallback);
    protocol::RedisRequest *req=redistask->get_req();
    req->set_request("SET",{"07dada","07lovexixi"});


    WFRedisTask* redistask1=WFTaskFactory::create_redis_task(
        "redis://127.0.0.1:6379",10,redisCallback);
    protocol::RedisRequest *req1=redistask1->get_req();
    req1->set_request("GET",{"07dada"});

    /* redistask->start(); */
    /* redistask1->start(); */
    //没有固定先后顺序

    SeriesWork* series=Workflow::create_series_work(redistask,nullptr);
    series->push_back(redistask1);
    series->start();

    waitGroup.wait();

    cerr<<"\nfinish\n";
    return 0;
}

08_series_dynamic.cc

#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>

#include <signal.h>
#include <unistd.h>
#include <iostream>

using std::cout;
using std::cerr;

//------------------------------------//
static WFFacilities::WaitGroup waitGroup(1);

void sighandler(int signum){
    waitGroup.done();//waitGroup(--num);
    cout<<"done\n";
}

//------------------------------------//
//  redisCallback()
//------------------------------------//
void redisCallback1(WFRedisTask* redistask){
    cout<<"callback111\n";
}

void redisCallback(WFRedisTask* redistask){
    cout<<"callback is called\n";


    protocol::RedisRequest *req=redistask->get_req();
    protocol::RedisResponse* resp=redistask->get_resp();
    int state=redistask->get_state();
    int error=redistask->get_error();
    protocol::RedisValue val;

    //状态处理
    switch(state) {
    case WFT_STATE_SYS_ERROR: // 系统错误
        cerr <<"system error: " << strerror(error) << "\n"; 
        break;
    case WFT_STATE_DNS_ERROR: // DNS错误
        cerr <<"DNS error: " << gai_strerror(error) << "\n"; 
        break;
    case WFT_STATE_SSL_ERROR: // SSL错误
        cerr <<"SSL error: " << error << "\n"; 
        break;
    case WFT_STATE_TASK_ERROR: // 任务错误
        cerr <<"Task error: "<< error << "\n"; 
        break;
    case WFT_STATE_SUCCESS: // 请求成功
        resp->get_result(val);
        if(val.is_error()){
            cerr<<"error reply,need a password? \n";
            state=WFT_STATE_TASK_ERROR;
        }
        break; 
    }

    //错误处理
    if (state != WFT_STATE_SUCCESS) {
        cerr <<"Failed. Press Ctrl-C to exit.\n";
        return;
    }

    //------------------------------------//
    if(val.is_string()){
        cerr<<"value is string:  "<<val.string_value()<<"\n";
        //在正在执行的任务队列中添加任务
        WFRedisTask* redistask1=WFTaskFactory::create_redis_task(
              /* "redis://127.0.0.1:6379",10,redisCallback); */
              "redis://127.0.0.1:6379",10,redisCallback1);
        //递归调用
        redistask1->get_req()->set_request("SET",{"07dada","07lovexixi"});
        series_of(redistask)->push_back(redistask1);
    }
    if(val.is_array()){
        cerr<<"value is array: \n ";
        for(size_t i=0;i<val.arr_size();++i){
            cerr<<i<<"  value:"<<val.arr_at(i).string_value()<<"\n";
        }
    }


    /* sleep(2); */
    cout<<"callback is end\n";
}



//------------------------------------//
//  main()
//------------------------------------//
int main(void)
{
    signal(SIGINT,sighandler);



    WFRedisTask* redistask1=WFTaskFactory::create_redis_task(
                                                             "redis://127.0.0.1:6379",10,redisCallback);
    protocol::RedisRequest *req1=redistask1->get_req();
    req1->set_request("GET",{"07dada"});

    /* redistask->start(); */
    redistask1->start();
    //没有固定先后顺序

    /* SeriesWork* series=Workflow::create_series_work(redistask,nullptr); */
    /* series->push_back(redistask1); */
    /* series->start(); */

    waitGroup.wait();

    cerr<<"\nfinish\n";
    return 0;
}

09_context.cc

#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>

#include <signal.h>
#include <unistd.h>
#include <iostream>

using std::cout;
using std::cerr;
using std::string;
using std::vector;

//------------------------------------//
struct SeriesContext{
    int id;
    std::string name;
};
static WFFacilities::WaitGroup waitGroup(1);

void sighandler(int signum){
    waitGroup.done();//waitGroup(--num);
    cout<<"done\n";
}

//------------------------------------//
//  redisCallback()
//------------------------------------//
void redisCallback1(WFRedisTask *redisTask){
	cerr << "xixi 1 begin\n";
	SeriesContext * context = static_cast<SeriesContext *>(series_of(redisTask)->get_context());
	cerr << "before id = " << context->id << " name = " << context->name << "\n";
	context->id = 1001;
	context->name = "task1";
	cerr << "after id = " << context->id << " name = " << context->name << "\n";
	cerr << "xixi 1 end!\n";
}
void redisCallback2(WFRedisTask *redisTask){
	cerr << "xixi 2 begin\n";
	SeriesContext * context = static_cast<SeriesContext *>(series_of(redisTask)->get_context());
	cerr << "before id = " << context->id << " name = " << context->name << "\n";
	context->id = 1002;
	context->name = "task2";
	cerr << "after id = " << context->id << " name = " << context->name << "\n";
	cerr << "xixi 2 end!\n";
}
void redisCallback3(WFRedisTask *redisTask){
	cerr << "xixi 3 begin\n";
	SeriesContext * context = static_cast<SeriesContext *>(series_of(redisTask)->get_context());
	cerr << "before id = " << context->id << " name = " << context->name << "\n";
	cerr << "xixi 3 end!\n";
}


void Callback(const SeriesWork* series){
    SeriesContext* context=static_cast<SeriesContext*>(series->get_context());
    cerr<<"callback id ="<<context->id<<"  name="<<context->name<<"\n";
    delete context;
}



//------------------------------------//
//  main()
//------------------------------------//
int main(void)
{
    signal(SIGINT,sighandler);


    WFRedisTask * redisTask1 =  WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback1);
    redisTask1->get_req()->set_request("SET",{"key","123"});

    WFRedisTask * redisTask2 =  WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback2);
    redisTask2->get_req()->set_request("SET",{"key","123"});

    WFRedisTask * redisTask3 =  WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback3);
    redisTask3->get_req()->set_request("SET",{"key","123"});


    SeriesWork* series=Workflow::create_series_work(redisTask1,nullptr);
    series->push_back(redisTask2);
    /* series->push_back(redisTask3); */

    SeriesContext* context=new SeriesContext({1000,"mian"});
    series->set_context(context);
    
    series->set_callback([context](const SeriesWork*series){
        cerr<<"callback id ="<<context->id<<"  name="<<context->name<<"\n";
        delete context;
                         });

    series->start();

    waitGroup.wait();

    cerr<<"\nfinish\n";
    return 0;
}

10_parallel_work.cc

// wait_group 实现有条件的等待
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>
#include <iostream>
#include <signal.h>
using std::cout;
using std::cerr;
using std::vector;
using std::string;
struct SeriesContext{
    string url;
    size_t body_len;
};
static WFFacilities::WaitGroup waitGroup(1);
void handler(int signum){
    cout << "done\n";
    waitGroup.done();
}

void httpCallback(WFHttpTask *httpTask){
	protocol::HttpResponse *resp = httpTask->get_resp(); // 获取响应
	int state = httpTask->get_state(); // 获取状态
	int error = httpTask->get_error(); // 获取错误原因
	switch (state){
	case WFT_STATE_SYS_ERROR:
		cerr <<"system error: " << strerror(error) << "\n";
		break;
	case WFT_STATE_DNS_ERROR:
		cerr <<"DNS error: "  << gai_strerror(error) << "\n";
		break;
	case WFT_STATE_SSL_ERROR:
		cerr <<"SSL error\n";
		break;
	case WFT_STATE_TASK_ERROR:
		cerr <<"Task error\n";
		break;
	case WFT_STATE_SUCCESS:
		break;
	}
	if (state != WFT_STATE_SUCCESS){
		cerr <<"Failed. Press Ctrl-C to exit.\n";
		return;
	}

	const void *body;
	size_t body_len;
	resp->get_parsed_body(&body, &body_len); // get_parsed_body找到响应报文的报文体
    SeriesContext * context = static_cast<SeriesContext *>(series_of(httpTask)->get_context());
    context->body_len = body_len;
    cerr << "url = " << context->url << ", len = " << context->body_len << "\n";
}
void parallelCallback(const ParallelWork * parallelWork){
    cerr << "parallel callback\n";
    string name;
    size_t body_len = 0;
    for(int i = 0; i < 3; ++i){
        // 找到内部的(已经执行完成不可修改的)序列
        const SeriesWork * series = parallelWork->series_at(i);
        SeriesContext * context = static_cast<SeriesContext *>(series->get_context());
        cerr << "i = " << i << "url = " << context->url << ", len = " << context->body_len << "\n";
        if(body_len < context->body_len){
            body_len = context->body_len;
            name = context->url;
        }
        delete context;
    }
    cerr << "longest body_len  url = " << name << " body_len = " << body_len <<"\n";

    WFRedisTask * redisTask = WFTaskFactory::create_redis_task(
        "redis://127.0.0.1:6379",10,nullptr
    );
    redisTask->get_req()->set_request("SET",{name,std::to_string(body_len)});
    series_of(parallelWork)->push_back(redisTask);
}
int main(){
    signal(SIGINT,handler);
    // 创建一个空的并行任务
    ParallelWork * parallelWork = Workflow::create_parallel_work(parallelCallback);
    // 创建多个小序列
    vector<string> urls = {
        "http://www.taobao.com",
        "http://www.jd.com",
        "http://www.baidu.com"
    };
    for(int i = 0; i < 3; ++i){
        // 创建一个http任务
        WFHttpTask * httpTask = WFTaskFactory::create_http_task(urls[i],10,10,httpCallback);
        // 根据http任务创建小序列
        SeriesWork * series =  Workflow::create_series_work(httpTask,nullptr);
        // 往序列中加一个context
        SeriesContext *context = new SeriesContext;
        context->url = urls[i];
        series->set_context(context);
        // 把小序列加入并行任务
        parallelWork->add_series(series);
    }

    parallelWork->start();
    waitGroup.wait();
    cout << "finish!\n";
}

作业:

01 存在下列的redis键值映射关系,使用workflow的redis任务和序列,假如只知道"x1",如何找到最终的"100"?"x1" --> "x2""x2" --> "x3""x3" --> "x4""x4" --> "100"

// wait_group 实现有条件的等待
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>

#include <iostream>
#include <signal.h>
using std::cout;
using std::cerr;
static WFFacilities::WaitGroup waitGroup(1);
void handler(int signum){
    cout << "done\n";
    waitGroup.done();
}
void redisCallback(WFRedisTask *redisTask)
{
	protocol::RedisRequest *req = redisTask->get_req();
	protocol::RedisResponse *resp = redisTask->get_resp();

	int state = redisTask->get_state();
	int error = redisTask->get_error();
    // val用来保存redis执行的结果
	protocol::RedisValue val;
    switch (state){
	case WFT_STATE_SYS_ERROR:
		cerr <<"system error: " << strerror(error) << "\n";
		break;
	case WFT_STATE_DNS_ERROR:
		cerr <<"DNS error: "  << gai_strerror(error) << "\n";
		break;
	case WFT_STATE_SSL_ERROR:
		cerr <<"SSL error\n";
		break;
	case WFT_STATE_TASK_ERROR:
		cerr <<"Task error\n";
		break;
	case WFT_STATE_SUCCESS:
    	resp->get_result(val);// 将redis的执行结果保存起来
		if (val.is_error()){
		   cerr <<  "Error reply. Need a password?\n";
		   state = WFT_STATE_TASK_ERROR;
		}
		break;
	}
	if (state != WFT_STATE_SUCCESS){
		cerr <<  "Failed. Press Ctrl-C to exit.\n";
		return;
	}

    if(val.is_string()&& val.string_value()!="100"){
        cerr<<"100 is not found"<<val.string_value()<<"\n";
        WFRedisTask* newtask=WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);
        newtask->get_req()->set_request("GET",{val.string_value()});
        series_of(redisTask)->push_back(newtask);
    }else{
        cerr<<"100 is found\n";
    }

}

int main(){
    signal(SIGINT,handler);
    // 创建任务
    WFRedisTask * redisTask =  WFTaskFactory::create_redis_task(
        "redis://127.0.0.1:6379",
        10,
        redisCallback
    );
    // 找到请求
    protocol::RedisRequest * req =  redisTask->get_req();
    req->set_request("GET", {"x1"});
    // 将任务交给框架
    redisTask->start();
    waitGroup.wait();
    cout << "finish!\n";
}

02 读取某个网站的内容,并且存入redis服务端当中(比如先访问淘宝,再set www.taobao.com 淘宝的html内容)

// wait_group 实现有条件的等待
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>
#include <iostream>
#include <signal.h>
using std::cout;
using std::cerr;
using std::vector;
using std::string;
struct SeriesContext{
    string url;
    size_t body_len;
};
static WFFacilities::WaitGroup waitGroup(1);
void handler(int signum){
    cout << "done\n";
    waitGroup.done();
}

void httpCallback(WFHttpTask *httpTask){
    protocol::HttpResponse *resp = httpTask->get_resp(); // 获取响应
    int state = httpTask->get_state(); // 获取状态
    int error = httpTask->get_error(); // 获取错误原因
    switch (state){
    case WFT_STATE_SYS_ERROR:
        cerr <<"system error: " << strerror(error) << "\n";
        break;
    case WFT_STATE_DNS_ERROR:
        cerr <<"DNS error: "  << gai_strerror(error) << "\n";
        break;
    case WFT_STATE_SSL_ERROR:
        cerr <<"SSL error\n";
        break;
    case WFT_STATE_TASK_ERROR:
        cerr <<"Task error\n";
        break;
    case WFT_STATE_SUCCESS:
        break;
    }
    if (state != WFT_STATE_SUCCESS){
        cerr <<"Failed. Press Ctrl-C to exit.\n";
        return;
    }

    const void *body;
    size_t body_len;
    resp->get_parsed_body(&body, &body_len); // get_parsed_body找到响应报文的报文体
    
    //创建一个redis任务
    WFRedisTask* redistask=WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,nullptr);
    redistask->get_req()->set_request("SET",{"http://www.baidu.com",static_cast<const char*>(body)});
    series_of(httpTask)->push_back(redistask);
}

int main(){
    signal(SIGINT,handler);
    WFHttpTask * httpTask = WFTaskFactory::create_http_task("http://www.baidu.com",10,10,httpCallback);
    /* SeriesWork * series =  Workflow::create_series_work(httpTask,nullptr); */


    httpTask->start();
    waitGroup.wait();
    cout << "finish!\n";
}

03 阅读下面的代码并尝试添加注释

#include <workflow/WFFacilities.h>
#include <workflow/MySQLUtil.h>
#include <workflow/MySQLResult.h>
#include <iostream>
#include <signal.h>
using std::string;
using std::cerr;
static WFFacilities::WaitGroup waitGroup(1);
void sighandler(int signum){
    std::cout << "signum = " << signum << "\n";
    waitGroup.done();
}
void mysqlCallback(WFMySQLTask * mysqlTask){
    if(mysqlTask->get_state() != WFT_STATE_SUCCESS){
        // 在系统层面报错,权限or密码
        cerr << "error_msg =  " << WFGlobal::get_error_string(mysqlTask->get_state(), mysqlTask->get_error()) << "\n";
        return;
    }
    protocol::MySQLResponse * resp = mysqlTask->get_resp();
    if(resp->get_packet_type() == MYSQL_PACKET_ERROR){
        // 在SQL语句报错
        cerr << "error_code = " << resp->get_error_code() << " error_msg = " << resp->get_error_msg() << "\n";
        return;
    }
    protocol::MySQLResultCursor cursor(resp);
    do{
        if(cursor.get_cursor_status() == MYSQL_STATUS_OK){
            // 写类型的SQL语句
            cerr << "write \n";
            cerr << cursor.get_affected_rows() << " rows affected\n";
        }
        else if(cursor.get_cursor_status() == MYSQL_STATUS_GET_RESULT){
            // 读类型的SQL语句
            cerr << "read \n";
            // 读表头 列的信息 field
            const protocol::MySQLField * const * fieldArr;
            fieldArr = cursor.fetch_fields();
            for(int i = 0; i < cursor.get_field_count(); ++i){
                cerr << "db = " << fieldArr[i]->get_db()
                     << " table = " << fieldArr[i]->get_table()
                     << " name = " << fieldArr[i]->get_name()
                     << " type = " << datatype2str(fieldArr[i]->get_data_type()) << "\n";
            }
            // 读表的内容 每一行每一列
            // bool fetch_all(std::vector<std::vector<MySQLCell>>& rows);
            std::vector<std::vector<protocol::MySQLCell>> rows;
            cursor.fetch_all(rows);
            for(auto &row:rows){
                for(auto &cell:row){
                    if(cell.is_int()){
                        cerr << cell.as_int();
                    }
                    else if(cell.is_string()){
                        cerr << cell.as_string();
                    }
                    else if(cell.is_datetime()){
                        cerr << cell.as_datetime();
                    }
                    cerr << "\t";
                }
                cerr << "\n";
            } 
        }
    }while(cursor.next_result_set()); //mysql 任务支持一个任务处理多个SQL语句
}
int main(){
    signal(SIGINT,sighandler);
    WFMySQLTask * mysqlTask =  WFTaskFactory::create_mysql_task("mysql://root:123@localhost",1,mysqlCallback);
    string sql = "insert into mycloud.tbl_user_token (user_name,user_token) values ('Caixukun','singdancerap');";
    //string sql;
    sql += "select * from mycloud.tbl_user_token;";
    mysqlTask->get_req()->set_query(sql);
    mysqlTask->start();
    waitGroup.wait();
    return 0;
}
#include <workflow/WFFacilities.h> // 包含 WFFacilities 库
#include <workflow/MySQLUtil.h>    // 包含 MySQL 相关工具库
#include <workflow/MySQLResult.h>  // 包含 MySQL 结果处理库
#include <iostream>                // 包含输入输出流库
#include <signal.h>                // 包含信号处理库

using std::string; // 使用 string 类型
using std::cerr;   // 使用 cerr 输出错误信息

// 创建一个 WaitGroup 对象,用于同步
static WFFacilities::WaitGroup waitGroup(1);

// 定义信号处理函数,用于处理 SIGINT 信号
void sighandler(int signum){
    std::cout << "signum = " << signum << "\n"; // 输出接收到的信号编号
    waitGroup.done(); // 完成 WaitGroup 的工作,结束程序
}

// MySQL 任务的回调函数
void mysqlCallback(WFMySQLTask * mysqlTask){
    // 检查任务状态是否成功
    if(mysqlTask->get_state() != WFT_STATE_SUCCESS){
        // 输出系统级错误信息,如权限或密码错误
        cerr << "error_msg =  " << WFGlobal::get_error_string(mysqlTask->get_state(), mysqlTask->get_error()) << "\n";
        return;
    }

    // 获取 MySQL 响应对象
    protocol::MySQLResponse * resp = mysqlTask->get_resp();
    
    // 检查返回的数据包类型是否为错误类型
    if(resp->get_packet_type() == MYSQL_PACKET_ERROR){
        // 输出 SQL 语句执行中的错误码和错误信息
        cerr << "error_code = " << resp->get_error_code() << " error_msg = " << resp->get_error_msg() << "\n";
        return;
    }

    // 创建 MySQL 结果游标对象
    protocol::MySQLResultCursor cursor(resp);
    
    // 循环处理结果集
    do{
        // 检查游标状态
        if(cursor.get_cursor_status() == MYSQL_STATUS_OK){
            // 如果是写操作,输出受影响的行数
            cerr << "write \n";
            cerr << cursor.get_affected_rows() << " rows affected\n";
        }
        else if(cursor.get_cursor_status() == MYSQL_STATUS_GET_RESULT){
            // 如果是读操作
            cerr << "read \n";
            
            // 读取字段信息
            const protocol::MySQLField * const * fieldArr;
            fieldArr = cursor.fetch_fields();
            for(int i = 0; i < cursor.get_field_count(); ++i){
                // 输出数据库、表名、字段名及其数据类型
                cerr << "db = " << fieldArr[i]->get_db()
                     << " table = " << fieldArr[i]->get_table()
                     << " name = " << fieldArr[i]->get_name()
                     << " type = " << datatype2str(fieldArr[i]->get_data_type()) << "\n";
            }

            // 读取表的所有内容
            std::vector<std::vector<protocol::MySQLCell>> rows;
            cursor.fetch_all(rows); // 获取所有行
            
            // 遍历每一行和每一个单元格
            for(auto &row : rows){
                for(auto &cell : row){
                    // 根据单元格的数据类型输出相应的内容
                    if(cell.is_int()){
                        cerr << cell.as_int();
                    }
                    else if(cell.is_string()){
                        cerr << cell.as_string();
                    }
                    else if(cell.is_datetime()){
                        cerr << cell.as_datetime();
                    }
                    cerr << "\t"; // 输出制表符
                }
                cerr << "\n"; // 换行
            } 
        }
    } while(cursor.next_result_set()); // 支持一个任务处理多个 SQL 语句
}

int main(){
    signal(SIGINT, sighandler); // 注册 SIGINT 信号处理函数

    // 创建 MySQL 任务并指定连接字符串和回调函数
    WFMySQLTask * mysqlTask =  WFTaskFactory::create_mysql_task("mysql://root:123@localhost", 1, mysqlCallback);
    
    // 构造 SQL 查询语句
    string sql = "insert into mycloud.tbl_user_token (user_name, user_token) values ('Caixukun', 'singdancerap');";
    sql += "select * from mycloud.tbl_user_token;"; // 在插入后选择用户令牌

    // 设置 MySQL 任务的查询
    mysqlTask->get_req()->set_query(sql);
    
    // 启动 MySQL 任务
    mysqlTask->start();
    
    // 等待 WaitGroup 完成
    waitGroup.wait();
    
    return 0; // 返回 0 表示程序正常结束
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/887808.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode讲解篇之139. 单词拆分

文章目录 题目描述题解思路题解代码题目链接 题目描述 题解思路 我们使用一个数组记录字符串s在[0, i)区间能否使用wordDict组成 我们使用左右指针遍历字符串s的子串&#xff0c;左指针 j 为子串的左端点下标&#xff0c;右指针 i 为右端点下标的下一个 遍历过程中如果字符串s…

自然语言处理问答系统

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…

字节放大招:无需LORA训练,小红书写真轻松搞定,Pulid-Flux换脸方案来了

前言 在这之前&#xff0c;SD常用的换脸节点还不支持Flux模型&#xff0c;使用Flux 做虚拟模特最好的方法是炼制人脸lora&#xff0c;但是炼丹是个有技术门槛的活。 之前文章有提过字节跳动的 Pulid团队&#xff0c;率先推出了Pulid-Flux模型&#xff0c;但是之前只能在线上使用…

【Redis】Hash类型的常用命令

背景&#xff1a;redis中存储数据采取key-value键值对的形式&#xff0c;而hash内部也是键值对&#xff0c;为了区别这两个东西&#xff0c;hash内部的键值对称为&#xff1a;field-value&#xff0c;而redis的为key-value&#xff0c;这里的value包括&#xff1a;field-value。…

Elasticsearch基础_5.ES聚合功能

文章目录 一、数据聚合1.1、桶聚合1.1.1、单维度桶聚合1.1.2、聚合结果排序1.1.3、限定聚合范围 1.2、Metric聚合 二、聚合总结 本文只记录ES聚合基本用法&#xff0c;后续有更复杂的需求可以查看相关书籍&#xff0c;如《Elasticsearch搜索引擎构建入门与实战》 一、数据聚合…

通过 Groovy 实现业务逻辑的动态变更

Groovy 1、需求的提出2、为什么是Groovy3、设计参考1_引入Maven依赖2_GroovyEngineUtils工具类3_GroovyScriptVar类4_脚本规则表设计5_对应的实体类6_数据库访问层7_GroovyExecService通用接口 4、测试5、其他的注意事项6、总结 1、需求的提出 在我们日常的开发过程中&#xf…

【机器学习】智驭未来:探索机器学习在食品生产中的革新之路

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀目录 &#x1f50d;1. 引言&#xff1a;探索机器学习在食品生产中的革新之路&#x1f4d2;2. 机器学习在食品质量控制中的应用&#x1f31e;实…

MySQL之复合查询与内外连接

目录 一、多表查询 二、自连接 三、子查询 四、合并查询 五、表的内连接和外连接 1、内连接 2、外连接 前面我们讲解的mysql表的查询都是对一张表进行查询&#xff0c;即数据的查询都是在某一时刻对一个表进行操作的。而在实际开发中&#xff0c;我们往往还需要对多个表…

15分钟学 Python 第41天:Python 爬虫入门(六)第二篇

Day41&#xff1a;Python爬取猫眼电影网站的电影信息 1. 项目背景 在本项目中&#xff0c;我们将使用 Python 爬虫技术从猫眼电影网站抓取电影信息。猫眼电影是一个知名的电影信息平台&#xff0c;提供了丰富的电影相关数据。通过这个练习&#xff0c;您将深入学习如何抓取动…

Android Compose的基本使用

前言: Compose这个东西呢,好处我没发现,坏处就是学习成本和低版本兼容. 不过,看在官方力推的份儿上,有空就学一下吧. 当初的kotlin,很多人说鸡肋(包括我)!现在不也咔咔用纯kotlin做项目吗?哈哈哈哈. 未来的事情,谁说得清呢? 首先创建一个专用的Compose项目 对没错!看到E…

大数据新视界 --大数据大厂之 从 Druid 和 Kafka 到 Polars:大数据处理工具的传承与创新

&#x1f496;&#x1f496;&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎你们来到 青云交的博客&#xff01;能与你们在此邂逅&#xff0c;我满心欢喜&#xff0c;深感无比荣幸。在这个瞬息万变的时代&#xff0c;我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…

【笔记】数据结构12

文章目录 2013年408应用题41方法一方法二 看到的社区的一个知识总结&#xff0c;这里记录一下。 知识点汇总 2013年408应用题41 解决方法&#xff1a; 方法一 &#xff08;1&#xff09;算法思想 算法的策略是从前向后扫描数组元素&#xff0c;标记出一个可能成为主元素的元…

elasticsearch ES DBA常用语句

一、 查看集群状态 curl -uelastic 连接串:端口/_cluster/health?pretty 集群健康有三种状态&#xff1a;green,yellow,red green&#xff1a;所有主要分片、复制分片都可用yellow&#xff1a;所有主要分片可用&#xff0c;但不是所有复制分片都可用red&#xff1a;不是所有…

根据拍摄时间一键将图片分组

说在前面 最近裸辞出去玩了两个多月&#xff0c;旅行的过程中少不了拍照&#xff0c;祖国的大好河山太美了&#xff0c;一趟旅行下来产出了1w多张图片&#xff0c;所以回来后总要整理一下图片&#xff0c;我这边想要的是根据拍摄时间来对图片进行分组,所以回到家后我就写了这样…

通信工程学习:什么是ICP网络内容服务商

ICP&#xff1a;网络内容服务商 ICP&#xff0c;全称Internet Content Provider&#xff0c;即网络内容服务商&#xff0c;是指那些通过互联网向用户提供各种类型内容服务的组织或个人。ICP在数字化时代扮演着至关重要的角色&#xff0c;它们不仅是信息的传播者&#xff0c;更是…

Linux高级编程_29_信号

文章目录 进程间通讯 - 信号信号完整的信号周期信号的编号信号的产生发送信号1 kill 函数(他杀)作用&#xff1a;语法&#xff1a;示例&#xff1a; 2 raise函数(自杀)作用&#xff1a;示例&#xff1a; 3 abort函数(自杀)作用&#xff1a;语法&#xff1a;示例&#xff1a; 4 …

苏州 数字化科技展厅展馆-「世岩科技」一站式服务商

数字化科技展厅展馆设计施工是一个综合性强、技术要求高的项目&#xff0c;涉及到众多方面的要点。以下是对数字化科技展厅展馆设计施工要点的详细分析&#xff1a; 一、明确目标与定位 在设计之初&#xff0c;必须明确展厅的目标和定位。这包括确定展厅的主题、目标受众、展…

详解正确创建好SpringBoot项目后但是找不到Maven的问题

目录 问题 解决步骤&#xff1a; 找到File->Project Structure... 设置SDK 设置SDKs 问题 刚刚在使用IDEA专业版创建好SpringBoot项目后&#xff0c;发现上方导航栏的运行按钮是灰色的&#xff0c;而且左侧导航栏的pom.xml的图标颜色也不是正常的&#xff0c;与此同时我…

PIKACHU | PIKACHU 靶场 XSS 后台配置

关注这个靶场的其他相关笔记&#xff1a;PIKACHU —— 靶场笔记合集-CSDN博客 PIKACHU 自带了一个 XSS 平台&#xff0c;可以辅助我们完成 XSS 攻击&#xff0c;但是该后台需要配置数据库以后才能使用。本教程&#xff0c;就是教大家如何配置 PIKACHU XSS 平台的。 PIKACHU XS…

在线教育的未来:SpringBoot技术实现

1系统概述 1.1 研究背景 随着计算机技术的发展以及计算机网络的逐渐普及&#xff0c;互联网成为人们查找信息的重要场所&#xff0c;二十一世纪是信息的时代&#xff0c;所以信息的管理显得特别重要。因此&#xff0c;使用计算机来管理微服务在线教育系统的相关信息成为必然。开…